1. 概述

Apache Flink是一个大数据处理框架,允许程序员以非常高效和可扩展的方式处理大量数据。

在本文中,我们将介绍Apache FlinkJava API 中提供的一些核心 API 概念和标准数据转换。这个 API 的流畅风格使得使用 Flink 的中心结构——分布式集合变得容易。

首先,我们将看一下 Flink 的DataSetAPI 转换,并使用它们来实现字数统计程序。然后我们将简要介绍一下 Flink 的DataStreamAPI,它允许您以实时方式处理事件流。

2. Maven 依赖

首先,我们需要将 Maven 依赖项添加到 flink-java 和flink-test-utils库中:

org.apache.flink

flink-java

1.2.0

org.apache.flink

flink-test-utils_2.10

1.2.0

test

Copy

3. 核心 API 概念

在使用 Flink 时,我们需要了解与其 API 相关的几件事:

每个 Flink 程序都对分布式数据集合执行转换。提供了多种数据转换功能,包括过滤、映射、联接、分组和聚合

Flink 中的sink操作会触发流的执行以产生程序的预期结果,例如将结果保存到文件系统或将其打印到标准输出

Flink 转换是惰性的,这意味着在调用接收器操作之前不会执行它们

Apache Flink API 支持两种操作模式——批处理和实时。如果要处理可在批处理模式下处理的有限数据源,则将使用数据集API。如果要实时处理无限数据流,则需要使用DataStreamAPI

4. 数据集 API 转换

Flink 程序的入口点是ExecutionEnvironment类的一个实例——它定义了程序执行的上下文。

让我们创建一个执行环境来开始我们的处理:

ExecutionEnvironment env

= ExecutionEnvironment.getExecutionEnvironment();Copy

请注意,当您在本地机器上启动应用程序时,它将在本地 JVM 上执行处理。如果要在计算机集群上开始处理,则需要在这些计算机上安装Apache Flink并相应地配置执行环境。

4.1. 创建DataSet

要开始执行数据转换,我们需要为程序提供数据。

让我们使用我们的执行环境创建DataSet类的实例:

DataSet amounts = env.fromElements(1, 29, 40, 50);Copy

您可以从多个源创建数据集,例如 Apache Kafka、CSV、文件或几乎任何其他数据源。

4.2. Filter和Reduce

创建DataSet类的实例后,可以对其应用转换。

假设您要过滤超过某个阈值的数字,然后将它们全部相加。您可以使用filter() 和reduce() 转换来实现此目的:

int threshold = 30;

List collect = amounts

.filter(a -> a > threshold)

.reduce((integer, t1) -> integer + t1)

.collect();

assertThat(collect.get(0)).isEqualTo(90);

Copy

请注意,collect() 方法是触发实际数据转换的接收器操作。

4.3. map

假设您有一个Person对象的数据集:

private static class Person {

private int age;

private String name;

// standard constructors/getters/setters

}Copy

接下来,让我们创建这些对象的数据集:

DataSet personDataSource = env.fromCollection(

Arrays.asList(

new Person(23, "Tom"),

new Person(75, "Michael")));Copy

假设您只想从集合的每个对象中提取年龄字段。您可以使用map() 转换来仅获取Person类的特定字段:

List ages = personDataSource

.map(p -> p.age)

.collect();

assertThat(ages).hasSize(2);

assertThat(ages).contains(23, 75);Copy

4.4. Join

 

当您有两个数据集时,您可能希望将它们连接到某个id字段上。为此,您可以使用join() 转换。

让我们创建用户的交易和地址的集合:

Tuple3 address

= new Tuple3<>(1, "5th Avenue", "London");

DataSet> addresses

= env.fromElements(address);

Tuple2 firstTransaction

= new Tuple2<>(1, "Transaction_1");

DataSet> transactions

= env.fromElements(firstTransaction, new Tuple2<>(12, "Transaction_2"));

Copy

两个元组中的第一个字段都是Integer类型,这是一个id字段,我们希望在其上连接两个数据集。

为了执行实际的连接逻辑,我们需要为地址和事务实现一个KeySelector接口:

private static class IdKeySelectorTransaction

implements KeySelector, Integer> {

@Override

public Integer getKey(Tuple2 value) {

return value.f0;

}

}

private static class IdKeySelectorAddress

implements KeySelector, Integer> {

@Override

public Integer getKey(Tuple3 value) {

return value.f0;

}

}Copy

每个选择器仅返回应对其执行连接的字段。

不幸的是,这里不能使用 lambda 表达式,因为 Flink 需要泛型类型信息。

接下来,让我们使用这些选择器实现合并逻辑:

List, Tuple3>>

joined = transactions.join(addresses)

.where(new IdKeySelectorTransaction())

.equalTo(new IdKeySelectorAddress())

.collect();

assertThat(joined).hasSize(1);

assertThat(joined).contains(new Tuple2<>(firstTransaction, address));

Copy

4.5. Sort

假设您有以下Tuple2 集合:

Tuple2 secondPerson = new Tuple2<>(4, "Tom");

Tuple2 thirdPerson = new Tuple2<>(5, "Scott");

Tuple2 fourthPerson = new Tuple2<>(200, "Michael");

Tuple2 firstPerson = new Tuple2<>(1, "Jack");

DataSet> transactions = env.fromElements(

fourthPerson, secondPerson, thirdPerson, firstPerson);

Copy

如果要按元组的第一个字段对此集合进行排序,可以使用sortPartitions()转换:

List> sorted = transactions

.sortPartition(new IdKeySelectorTransaction(), Order.ASCENDING)

.collect();

assertThat(sorted)

.containsExactly(firstPerson, secondPerson, thirdPerson, fourthPerson);Copy

5. 字数统计

字数统计问题通常用于展示大数据处理框架的功能。基本解决方案涉及计算文本输入中的单词出现次数。让我们使用 Flink 来实现这个问题的解决方案。

作为解决方案的第一步,我们创建一个LineSplitter类,该类将我们的输入拆分为标记(单词),为每个标记收集键值对的Tuple2。在每个元组中,键是在文本中找到的单词,值是整数 (1)。

此类实现FlatMapFunction接口,该接口将字符串作为输入并生成Tuple2

public class LineSplitter implements FlatMapFunction> {

@Override

public void flatMap(String value, Collector> out) {

Stream.of(value.toLowerCase().split("\\W+"))

.filter(t -> t.length() > 0)

.forEach(token -> out.collect(new Tuple2<>(token, 1)));

}

}Copy

我们在Collector类上调用collect() 方法,以在处理管道中向前推送数据。

我们的下一步也是最后一步是按元组的第一个元素(单词)对元组进行分组,然后对第二个元素执行总和聚合以生成单词出现的计数:

 

public static DataSet> startWordCount(

ExecutionEnvironment env, List lines) throws Exception {

DataSet text = env.fromCollection(lines);

return text.flatMap(new LineSplitter())

.groupBy(0)

.aggregate(Aggregations.SUM, 1);

}Copy

我们使用三种类型的 Flink 转换:flatMap()、groupBy() 和aggregate()。

让我们编写一个测试来断言字数统计实现按预期工作:

List lines = Arrays.asList(

"This is a first sentence",

"This is a second sentence with a one word");

DataSet> result = WordCount.startWordCount(env, lines);

List> collect = result.collect();

assertThat(collect).containsExactlyInAnyOrder(

new Tuple2<>("a", 3), new Tuple2<>("sentence", 2), new Tuple2<>("word", 1),

new Tuple2<>("is", 2), new Tuple2<>("this", 2), new Tuple2<>("second", 1),

new Tuple2<>("first", 1), new Tuple2<>("with", 1), new Tuple2<>("one", 1));Copy

6. 数据流接口

6.1. 创建数据流

Apache Flink 还支持通过其 DataStream API 处理事件流。如果我们想开始使用事件,我们首先需要使用StreamExecutionEnvironment类:

StreamExecutionEnvironment executionEnvironment

= StreamExecutionEnvironment.getExecutionEnvironment();Copy

接下来,我们可以使用来自各种来源的executionEnvironment创建事件流。它可以是像Apache Kafka 这样的消息总线,但在这个例子中,我们将简单地从几个字符串元素创建一个源:

DataStream dataStream = executionEnvironment.fromElements(

"This is a first sentence",

"This is a second sentence with a one word");Copy

我们可以像在普通的 DataSet类中一样将转换应用于DataStream的每个元素:

SingleOutputStreamOperator upperCase = text.map(String::toUpperCase);Copy

要触发执行,我们需要调用一个接收器操作,例如print(),该操作只会将转换的结果打印到标准输出中,然后是StreamExecutionEnvironment类上的execute()方法:

upperCase.print();

env.execute();Copy

它将生成以下输出:

1> THIS IS A FIRST SENTENCE

2> THIS IS A SECOND SENTENCE WITH A ONE WORDCopy

6.2. 事件的窗口化

实时处理事件流时,有时可能需要将事件分组在一起,并在这些事件的窗口上应用一些计算。

假设我们有一个事件流,其中每个事件都是一对,由事件编号和事件发送到我们系统时的时间戳组成,我们可以容忍无序事件,但前提是它们迟到不超过二十秒。

对于此示例,让我们首先创建一个模拟相隔几分钟的两个事件的流,并定义一个时间戳提取器来指定我们的延迟阈值:

SingleOutputStreamOperator> windowed

= env.fromElements(

new Tuple2<>(16, ZonedDateTime.now().plusMinutes(25).toInstant().getEpochSecond()),

new Tuple2<>(15, ZonedDateTime.now().plusMinutes(2).toInstant().getEpochSecond()))

.assignTimestampsAndWatermarks(

new BoundedOutOfOrdernessTimestampExtractor

>(Time.seconds(20)) {

@Override

public long extractTimestamp(Tuple2 element) {

return element.f1 * 1000;

}

});Copy

接下来,让我们定义一个窗口操作,将事件分组为五秒窗口,并对这些事件应用转换:

SingleOutputStreamOperator> reduced = windowed

.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))

.maxBy(0, true);

reduced.print();Copy

它将每五秒获得窗口的最后一个元素,因此它会打印出来:

1> (15,1491221519)Copy

请注意,我们看不到第二个事件,因为它的到达时间晚于指定的延迟阈值。

7. 结论

在本文中,我们介绍了 Apache Flink 框架,并查看了其 API 提供的一些转换。

我们使用 Flink 流畅且实用的 DataSet API 实现了一个字数统计程序。然后,我们查看了 DataStream API,并对事件流实现了简单的实时转换。

相关链接

评论可见,请评论后查看内容,谢谢!!!评论后请刷新页面。