1.概述

Apache Flink中的KeyBy算子是一种根据指定Key将数据流分区的算子。在使用KeyBy算子时,需要指定一个或多个Key,Flink会根据这些Key将数据流分成不同的分区,以便并行处理。 KeyBy算子通常用于实现基于Key的聚合操作,如求和、平均值等。它可以将具有相同Key的数据分配到同一个分区中,并在该分区上执行聚合操作。此外,KeyBy算子还可以用于流数据的状态管理,如将具有相同Key的数据存储在同一个状态中进行管理。 KeyBy算子的使用非常简单,只需要在代码中调用DataStream的keyBy方法,并指定一个或多个Key即可,例如:

DataStream> dataStream = ...;

// 使用KeyBy算子将数据流分区

DataStream> keyedStream = dataStream.keyBy(0); // 根据第一个元素作为Key

 

在上述代码中,我们使用KeyBy算子将数据流按照第一个元素作为Key进行分区,并返回一个新的DataStream对象。在实际应用中,我们可以根据不同的需求指定不同的Key,以达到最优的分区效果。 需要注意的是,KeyBy算子只能将数据流按照指定的Key进行分区,而无法对分区进行任何修改。如果需要对分区进行修改或者调整,可以使用其他分区算子,例如Shuffle、Rebalance等。

2.使用示例

2.1单分区键

假设我们有一份订单数据流,包含订单ID、用户ID和订单金额三个字段,我们希望根据用户ID将数据流分区,并对每个用户的订单金额求和。代码如下所示:

// 定义订单数据类

public class Order {

public String orderId;

public String userId;

public double amount;

public Order(String orderId, String userId, double amount) {

this.orderId = orderId;

this.userId = userId;

this.amount = amount;

}

}

// 生成订单数据流

List orders = new ArrayList<>();

orders.add(new Order("1", "user1", 100.0));

orders.add(new Order("2", "user2", 200.0));

orders.add(new Order("3", "user1", 300.0));

orders.add(new Order("4", "user2", 400.0));

DataStream orderStream = env.fromCollection(orders);

// 使用KeyBy算子将数据流按照用户ID分区,并对每个用户的订单金额求和

DataStream> sumStream = orderStream

.keyBy(order -> order.userId) // 根据用户ID作为Key进行分区

.sum("amount"); // 对每个分区的订单金额求和

// 输出分区后的结果

sumStream.print();

在上述代码中,我们首先定义了一个Order类,用于表示订单数据。然后生成一份包含4个订单的数据流,并使用KeyBy算子将数据流按照用户ID分区。接着,我们调用sum算子对每个分区的订单金额求和,并将计算结果打印输出。 运行代码后,我们可以得到如下的输出结果:

(user1, 400.0)

(user2, 600.0)

从输出结果可以看出,KeyBy算子将订单数据流按照用户ID分成了两个分区,分别对应用户user1和user2,而sum算子则对每个分区的订单金额进行了求和。这样,我们就成功地完成了基于Key的聚合操作。 需要注意的是,KeyBy算子只能将数据流按照指定的Key进行分区,而无法对分区进行任何修改。如果需要对分区进行修改或者调整,可以使用其他分区算子,例如Shuffle、Rebalance等。

2.2多分区键

假设我们有一份订单数据流,包含订单ID、用户ID、商家ID和订单金额四个字段,我们希望根据用户ID和商家ID将数据流分区,并对每个用户和商家的订单金额求和。代码如下所示:

// 定义订单数据类

public class Order {

public String orderId;

public String userId;

public String merchantId;

public double amount;

public Order(String orderId, String userId, String merchantId, double amount) {

this.orderId = orderId;

this.userId = userId;

this.merchantId = merchantId;

this.amount = amount;

}

}

// 生成订单数据流

List orders = new ArrayList<>();

orders.add(new Order("1", "user1", "merchant1", 100.0));

orders.add(new Order("2", "user2", "merchant2", 200.0));

orders.add(new Order("3", "user1", "merchant1", 300.0));

orders.add(new Order("4", "user2", "merchant2", 400.0));

DataStream orderStream = env.fromCollection(orders);

// 使用KeyBy算子将数据流按照用户ID和商家ID分区,并对每个用户和商家的订单金额求和

DataStream> sumStream = orderStream

.keyBy(order -> Tuple2.of(order.userId, order.merchantId)) // 根据用户ID和商家ID作为Key进行分区

.sum("amount"); // 对每个分区的订单金额求和

// 输出分区后的结果

sumStream.print();

在上述代码中,我们首先定义了一个Order类,用于表示订单数据。然后生成一份包含4个订单的数据流,并使用KeyBy算子将数据流按照用户ID和商家ID分区。接着,我们调用sum算子对每个分区的订单金额求和,并将计算结果打印输出。 需要注意的是,对于多Key的情况,我们需要将多个Key封装成一个Tuple类型,以便进行分区。在上述代码中,我们使用了Flink提供的Tuple2.of方法将用户ID和商家ID封装成了一个Tuple2对象。 运行代码后,我们可以得到如下的输出结果:

(user1,merchant1,400.0)

(user2,merchant2,600.0)

从输出结果可以看出,KeyBy算子将订单数据流按照用户ID和商家ID分成了两个分区,分别对应(user1,merchant1)和(user2,merchant2),而sum算子则对每个分区的订单金额进行了求和。这样,我们就成功地完成了基于多Key的聚合操作。

3.源代码剖析

在Flink的源代码中,KeyBy算子是一个Transformation类型的算子,其主要实现逻辑在KeyedStream类中。KeyedStream类继承自DataStream类,表示一个分区的数据流,其中的数据是根据指定的Key进行分区的。KeyedStream类提供了若干个聚合算子,例如sum、min、max等,可以对分区中的数据进行聚合计算。 在KeyBy算子的具体实现中,其主要流程如下:

接收指定的KeySelector,用于从数据流中抽取Key;

根据Key将数据流进行分区,并返回一个KeyedStream对象;

对KeyedStream对象进行聚合计算,例如sum、min、max等。 在具体实现上,KeyBy算子的核心代码如下所示:

public KeyedStream keyBy(KeySelector keySelector) {

// 根据KeySelector对数据流进行分区,返回一个KeyedStream对象

return new KeyedStream<>(this, Objects.requireNonNull(keySelector), getType());

}

 

在上述代码中,KeyBy算子接收一个KeySelector对象,用于从数据流中抽取Key。然后,KeyedStream类的构造函数中会使用KeySelector对象对数据流进行分区,并返回一个新的KeyedStream对象,其中的数据根据指定的Key进行了分区,并将分组 key 保存在 keySelector 成员变量中。最后,KeyedStream对象提供了若干个聚合算子,可以对分区中的数据进行聚合计算。

KeyedStream 对象的构造函数定义如下:

public KeyedStream(DataStream input, KeySelector keySelector, String callLocationName) {

super(input.getExecutionEnvironment(), new PartitionTransformation<>(input.getTransformation(), keySelector, callLocationName));

this.keySelector = Preconditions.checkNotNull(keySelector);

}

在构造函数中,会调用 PartitionTransformation 的构造函数,将原数据流的 Transformation 对象作为参数,并将分组 key 保存在 PartitionTransformation 对象中。PartitionTransformation 是 Flink 中用于对数据流进行分区的算子。在 KeyedStream 中,它用于对数据流按照指定的 key 进行分组。 KeyedStream 对象中,还定义了一系列用于对分组后的数据流进行操作的方法,如 reduce()、sum()、window() 等。这些方法都是返回一个新的 DataStream 对象,表示对分组后的数据流进行了一些转换操作。例如 reduce() 方法的定义如下:

public DataStream reduce(ReduceFunction reducer) {

return transform("KeyedReduce", getType(), new ReduceOperator<>(reducer), new StreamExchangeModeProperty(StreamExchangeMode.BATCH_ONLY));

}

可以看到,reduce() 方法内部调用了 transform() 方法,传入了一个 ReduceOperator 对象作为参数。ReduceOperator 是 Flink 中用于对数据流进行聚合操作的算子。在 transform() 方法中,会将当前 KeyedStream 对象的算子链与 ReduceOperator 组成一个新的算子链,并返回一个新的 DataStream 对象。 总的来说,KeyBy 算子是 Flink 中用于对数据流进行分组的核心算子之一,它将数据流按照指定的 key 进行分组,并返回一个 KeyedStream 对象,方便用户对分组后的数据流进行操作。

需要注意的是,KeyBy算子的实现主要依赖于Flink的DataStream API,其具体实现可能因版本而异。以上代码仅为Flink 1.12版本的实现方式,其他版本实现可能略有不同。

相关文章

评论可见,请评论后查看内容,谢谢!!!评论后请刷新页面。