我们以 kafka 为例,看一下 Kafka-client 对版本的依赖情况,从图中可以看出 beam 2.6.0 版本的 api 改变基本是稳定的。当然,现在用的比较多的2.4、2.5版本。吐个槽,2.6版本之前的兼容性问题,上个版本还有这个类或方法,下一个版本就没有了,兼容性不是很好。

4. SDK beam-sdks-java-io-kafka 读取源码剖析

① 指定 KafkaIO 的模型,从源码中不难看出这个地方的 KafkaIO 类型是 Long 和 String 类型,也可以换成其他类型。

pipeline.apply(KafkaIO.read() pipeline.apply(KafkaIO.read()

② 设置 Kafka 集群的集群地址。

.withBootstrapServers(“broker_1:9092,broker_2:9092”)

③ 设置 Kafka 的主题类型,源码中使用了单个主题类型,如果是多个主题类型则用 withTopics(List) 方法进行设置。设置情况基本跟 Kafka 原生是一样的。

.withTopic(“my_topic”) // use withTopics(List) to read from multiple topics.

④ 设置序列化类型。Apache Beam KafkaIO 在序列化的时候做了很大的简化,例如原生 Kafka 可能要通过 Properties 类去设置 ,还要加上很长一段 jar 包的名字。

Beam KafkaIO 的写法:

.withKeyDeserializer(LongDeserializer.class)

.withValueDeserializer(StringDeserializer.class)

原生 Kafka 的设置:

Properties props = new Properties();

props.put(“key.deserializer”,“org.apache.kafka.common.serialization.ByteArrayDeserializer”);

props.put(“value.deserializer”,“org.apache.kafka.common.serialization.ByteArrayDeserializer”);

⑤ 设置 Kafka 的消费者属性,这个地方还可以设置其他的属性。源码中是针对消费分组进行设置。

.updateConsumerProperties(ImmutableMap.of(“group.id”, my_beam_app_1"))

⑥ 设置 Kafka 吞吐量的时间戳,可以是默认的,也可以自定义。

.withLogAppendTime()

⑦ 相当于 Kafka 中 “isolation.level” , “read_committed”,指定 KafkaConsumer 只应读取非事务性消息,或从其输入主题中提交事务性消息。流处理应用程序通常在多个读取处理写入阶段处理其数据,每个阶段使用前一阶段的输出作为其输入。通过指定 read_committed 模式,我们可以在所有阶段完成一次处理。针对 “Exactly-once” 语义,支持 Kafka 0.11 版本。

.withReadCommitted()

⑧ 设置 Kafka 是否自动提交属性 “AUTO_COMMIT”,默认为自动提交,使用 Beam 的方法来设置。

set CommitOffsetsInFinalizeEnabled(boolean commitOffsetInFinalize)

.commitOffsetsInFinalize()

⑨ 设置是否返回 Kafka 的其他数据,例如 offset 信息和分区信息,不用可以去掉。

.withoutMetadata() // PCollection>

⑩ 设置只返回 values 值,不用返回 key。例如 PCollection,而不是 PCollection

.apply(Values.create()) // PCollection

在写入 Kafka 时完全一次性地提供语义,这使得应用程序能够在 Beam 管道中的一次性语义之上提供端到端的一次性保证。它确保写入接收器的记录仅在 Kafka 上提交一次,即使在管道执行期间重试某些处理也是如此。重试通常在应用程序重新启动时发生(如在故障恢复中)或者在重新分配任务时(如在自动缩放事件中)。Flink runner 通常为流水线的结果提供精确一次的语义,但不提供变换中用户代码的副作用。如果诸如 Kafka 接收器之类的转换写入外部系统,则这些写入可能会多次发生。

在此处启用 EOS 时,接收器转换将兼容的 Beam Runners 中的检查点语义与 Kafka 中的事务联系起来,以确保只写入一次记录。由于实现依赖于 runners checkpoint 语义,因此并非所有 runners 都兼容。Beam 中 FlinkRunner 针对 Kafka 0.11+ 版本才支持,然而 Dataflow runner 和 Spark runner 如果操作 kafkaIO 是完全支持的。

关于性能的注意事项:

“Exactly-once” 在接收初始消息的时候,除了将原来的数据进行格式化转换外,还经历了 2 个序列化 - 反序列化循环。根据序列化的数量和成本,CPU 可能会涨的很明显。通过写入二进制格式数据(即在写入 Kafka 接收器之前将数据序列化为二进制数据)可以降低 CPU 成本。

5. Pipeline

您输入的数据存储在哪里?

首先要确定你要构造几条数据源,在 Beam 可以构建多条,构建之前可以选择自己的 SDK 的 IO。

您的数据类型是什么样的?

Beam 提供的是键值对的数据类型,你的数据可能是日志文本,格式化设备事件,数据库的行,所以在 PCollection 就应该确定数据集的类型。

您想怎么去处理数据?

对数据进行转换,过滤处理,窗口计算,SQL 处理等。在管道中提供了通用的 ParDo 转换类,算子计算以及 BeamSQL 等操作。

您打算把数据最后输出到哪里去?

在管道末尾进行 Write 操作,把数据最后写入您自己想存放或最后流向的地方。

重要的是要理解变换不消耗 PCollections;相反,他们会考虑 a 的每个元素 PCollection 并创建一个新 PCollection 的输出。这样,您可以对不同的元素执行不同的操作 PCollection。这里是出现了两条管,例如输入 AR,AI,VAR,BT,BMP。

例如不同的数据源,有数据库,文件,以及缓存等输入进行合并。

一种是收费的拓蓝公司出品叫 Talend Big Data Studio,有没有免费的呢?

有的,它叫 kettle-beam。例如不同的数据源,有数据库,文件,以及缓存等输入进行合并。大家可以去 github 去看一下插件相应的安装及使用说明。从图中可以看出大部分 beam 的输入输出现在都是支持的。

https://github.com/mattcasters/kettle-beam

6. Runners

我们在看一下运行平台,这是运行平台支持度的截图。例如不同的数据源,有数据库,文件,以及缓存等输入进行合并。

Runners 在 Beam Model 模型中有4个支持的维度:

What,如何对数据进行计算?例如,机器学习中训练学习模型可以用 Sum 或者 Join 等。在 Beam SDK 中由 Pipeline 中的操作符指定。 Where,数据在什么范围中计算?例如,基于 Process-Time 的时间窗口、基于 Event-Time 的时间窗口、滑动窗口等等。在 Beam SDK 中由 Pipeline 的窗口指定。 When,何时输出计算结果?例如,在 1 小时的 Event-Time 时间窗口中,每隔 1 分钟将当前窗口计算结果输出。在 Beam SDK 中由 Pipeline 的 Watermark 和触发器指定。 How,迟到数据如何处理?例如,将迟到数据计算增量结果输出,或是将迟到数据计算结果和窗口内数据计算结果合并成全量结果输出。在 Beam SDK 中由 Accumulation 指定。

① What

对数据如果处理,计算。分组的矩阵图,提到这里说一下,这些运行平台已经集成到 Beam,只是没有更新到官方首页而已。以及或者是官方不打算主推的,就没有写上去。

② Where

窗口处理矩阵能力图,大家从图中可以看出很多都是全部支持的。

③ When

对于事件处理,流计算引擎Apache Flink,Google Cloud ,Dataflow 以及 Jstorm 都支持性比较好。

④ How

最后是对迟到数据的数据处理能力矩阵图。

7. FlinkRunner Beam

我们以最近两年最火的 Apache Flink 为例子,帮大家解析一下 beam 集成情况。大家可以从图中看出,flink 集成情况。

然后看一下,FlinkRunner 具体解析了哪些参数,以及代码中怎样设置。

8. Beam SQL

Apache Calcite 是一种保准 SQL 的解析器,用于大数据处理和一些流增强功能,基于它做 SQL 引擎的有很多,例如 spark,Cassandra,druid 和我们的 Beam。

我们看一下 Beam SQL 的设计思路:首先是我们写的 SQL 语句,进行查询解析,验证来源的类型,数据格式,建一个执行计划,然后通过优化,设计计划规则或逻辑,封装在 Beam 管道中,进行编译器编译,最后提交 job 到运行平台执行。

表中是 beam SQL 和 Calcite 的类型支持度,是把 Calcite 进行映射。

Beam SQL 和 Apache Calcite 函数的支持度。里面有一些现在不支持的,需要大家做的时候多多关注,特别是架构师设计时候。

从图中可以看出,首先要设置好数据类型,在设置数据,最后填充到管道数据集,最后做 SQL 的操作。其实这样写还是不方便的。有没有很好的解决方式,有。大家继续往下看…

Beam SQL 的扩展。Beam SQL 的 CREATE EXTERNAL TABLE 语句注册一个映射到外部存储系统的虚拟表 。对于某些存储系统,CREATE EXTERNAL TABLE 在写入发生之前不会创建物理表。物理表存在后,您可以使用访问表 SELECT,JOIN 和 INSERT INTO 语句。通过虚拟表,可以动态的操作数据,最后写入到数据库就可以了。这块可以做成视图抽象的。

Create 创建一个动态表,tableName 后面是列名。TYPE 是数据来源的类型,限制支持 bigquery,pubsub,kafka,text 等。Location 下面为表的数据类型配置, 这里以 kafka 为例。

▌****AloT PB 级实时数据,怎么构建自己的“AI微服务”?

在 AIoT 里面,实时性数据比较大,例如视频分析,视频挖掘,合规检测,语音分析等等。130W 路的摄像头每秒写入300多 G 的视频,一天就是 25PB,有人说可以晚上用批方式上数据,其实 AIoT 场景跟其他的场景是不一样的,例如做智能儿童手表,我们晚上上报数据的频度可以变低,白天儿童上学放学路上可以正常上报数据。AIoT 场景下摄像头24小时监控的,并且宽带主杆线都换成千兆光线,其实也支持不了每秒 300G 的实时写入。我们是怎么处理呢?

首先在设计架构方案的时候,相信很多架构师都会这样想,不想第一个去吃螃蟹,因为稳定性,安全性,及不确定性原因会导致整个项目的成败。那我们看一下 Beam 有哪些大厂在使用。

知道他们使用 Beam ,咱们了解一下他们用 Beam 做了什么?例如:

使用 Apache Beam 进行大规模流分析 使用 Apache Beam 运行定量分析 使用 Apache Beam 构建大数据管道 从迁移到 Apache Beam 进行地理数据可视化 使用 Apache Beam & tf.Transform 对 TensorFlow 管道进行预处理 卫星图像的土地利用分类 智慧城市大数据集成 平安城市及质量实时风控 电商平台双十一活动实时数据处理

小编13年上海交大毕业,曾经在小公司待过,也去过华为、OPPO等大厂,18年进入阿里一直到现在。

深知大多数初中级Java工程师,想要提升技能,往往是自己摸索成长,但自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前!

因此收集整理了一份《2024年最新Java开发全套学习资料》送给大家,初衷也很简单,就是希望能够帮助到想自学提升又不知道该从何学起的朋友,同时减轻大家的负担。

由于文件比较大,这里只是将部分目录截图出来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频

如果你觉得这些内容对你有帮助,可以添加下面V无偿领取!(备注Java)

华为、OPPO等大厂,18年进入阿里一直到现在。**

深知大多数初中级Java工程师,想要提升技能,往往是自己摸索成长,但自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前!

因此收集整理了一份《2024年最新Java开发全套学习资料》送给大家,初衷也很简单,就是希望能够帮助到想自学提升又不知道该从何学起的朋友,同时减轻大家的负担。 [外链图片转存中…(img-Zv12zTdS-1710869089448)] [外链图片转存中…(img-Kh0DV5A8-1710869089448)] [外链图片转存中…(img-YK2bttsI-1710869089449)]

由于文件比较大,这里只是将部分目录截图出来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频

如果你觉得这些内容对你有帮助,可以添加下面V无偿领取!(备注Java) [外链图片转存中…(img-5Zsb9kcP-1710869089449)]

相关链接

评论可见,请评论后查看内容,谢谢!!!评论后请刷新页面。
大家都在看: