摘要:本文整理自阿里云高级产品专家黄鹏程和阿里云技术专家陈婧敏在 FFA 2023 平台建设专场中的分享。内容主要为以下五部分:

阿里云实时计算 Flink 产品化思考 产品化实践 SQL 产品化思考及实践 展望

接上篇:阿里云实时计算Flink的产品化思考与实践【上】

三、产品化实践

4、技术民主化

这涉及到了一个关键问题:如何确保每个人都能够参与实时数据处理和分析的过程中。这正对应了国内常见的问题,即业务和使用门槛的高度挑战。

Snowflake 之所以广受欢迎,不仅归功于其卓越的技术架构,还在于它推动了技术的民主化过程。即便是非技术人员,只要掌握一些 SQL 知识就能轻松使用。相比之下,Flink 在实现技术民主化方面还面临着较长的道路。

作为一家云服务提供商,阿里云 Flink 提供了一个一站式的开发和运维平台。它支持用户的全方位需求,包括开发、测试、调试、以及底层资源的运行调度,并在运行维护方面(包括问题诊断、资源优化、日志排查、监控与告警等)提供全面服务。

4.1 流批一体的作业开发与运行

我们提供一个平台式的流批一体作业开发运行环境,支持流作业和批作业。在今年的云栖大会上,我们宣布了我们有了自己的调度引擎,能够让用户在平台上一次把所有的流和批都做完。

4.2 纯SQL 开发,简单易用,专注业务

我们的开发过程中,我们鼓励用户采用 SQL 进行开发。尽管 Datastream API 和 Table API 提供了极高的灵活性,我们仍旧希望用户能够在确保业务需求得到满足的前提下,优先选择使用 Flink SQL。目前在云端,大约70%的用户选择使用纯 Flink SQL,而剩下的 30% 则部署了使用 Datastream API 或 Table API 的 JAR 作业,这显示了 SQL 在我们平台上的首要地位。

为了进一步支持用户,我们在云端提供了一个可视化 Web IDE,它不仅能协助用户更便捷地开发 Flink 作业,还提供语法校验和错误提示,从而增强开发体验。此外,该 IDE 还支持用户进行数据探查,管理用户定义的函数(UDF),组织作业,管理元数据,以及进行 SQL 优化分析,全方位满足不同开发者的需求。

4.3 快速实现SQL 调试,支持模拟数据生成

调试部分分为以下几个步骤:第一,可快速地反复运行;第二,确定测试数据的来源;第三,如何保证不影响线上数据。们会按照用户要求在规则的限制下使用专门的 Connector 快速生成一些类似于业务数据的数据,可以用户自行上载测试数据,整个过程不会影响线上的实际生产数据。

4.4 智能自动调优模式,轻松应对洪峰

项目进入开发测试阶段后,便开始了在云环境上运行作业的过程。此时,我们引入了智能调优模式,旨在帮助用户高效应对业务高峰。该模式能根据作业运行情况,对 Flink 的配置提出针对性的调整建议或采取相应措施,无论配置偏高或偏低均可。它通过分析作业指标(包括作业本身的指标和底层环境的指标,以及日志信息),辅助用户对作业执行参数进行优化调整,以更好地适应当前的业务需求,例如通过调整作业的并发度。此外,当需要对资源进行伸缩调整时,系统还可利用上边提到的参数动态更新模式,实现资源配置的迅速变更。在右下角展示的案例中,显示了一个 AGG 算子的处理能力遭遇瓶颈。随着作业运行一段时间,系统会提示用户考虑增加 MiniBatch 配置以提升处理能力。

目前,我们拥有一套丰富的智能自动调优策略,这套策略涵盖了三个方面:基于 CPU、基于内存以及基于延迟的参数调整。未来,还计划通过考虑 State 的情况来进一步完善这套自动调优策略。

除了完全自动调优外,还有一种定时调优。

因为自动调整是为了应对意料之外的流量,但如果每个人对自己的业务较为熟悉,或者业务运作具有一定的计划性—比如双十一购物节、教育培训行业的周末和晚间时段—那么用户可以按照时间制定资源计划。这样,他们就能在特定的时间段重启作业,或者采用动态更新的方式调整资源配置,比如横向调整并发处理的数量、纵向调整资源规模,或者根据需求高峰期调整适用于 Flink 的相关配置。

4.5 作业智能诊断, 一键诊断问题

阿里云 Flink 进一步降低了用户在问题诊断上的使用门槛,这通常是使用 Flink 时压力最大、最耗费时间和精力的环节。Flink 的智能诊断功能覆盖了作业开发和运行的整个过程,包括在出现异常时提供的建议。在整个应用周期内,无论是在运行阶段还是上线过程中,它都能进行错误检测,明确指出错误原因以及纠正方法。在运行状态下,Flink 还会提供问题提示。尤其是当作业运行失败时,Flink 能够根据当前的作业状况与规则库匹配,提出针对性的建议,帮助用户迅速回归到业务开发上,免去底层细节问题的困扰。

全生命周期的作业状态管理还可以降低使用门槛,这关系到如何管理状态的问题。具体包括状态的(定期)生成与删除、状态的展示、状态的兼容性,以及状态的恢复。

用户往往需要确认状态是否按照预期进行操作并希望通过状态来观察业务的进展情况,特别是当使用 Datastream API 的情况下,对于状态的操作更加的灵活。平台提供了检查 Checkpoint 的正确性、数量、所需时间以及探查状态的中间结果等方面的功能。另外,我们计划后续推出一套状态探查工具和结果分析功能,以帮助用户更深入地了解和监控状态。

对于状态兼容性,无论对于 Apache Flink 还是云服务的能力而言,我们都渴望在业务逻辑发生变化后,现有的状态能够保持兼容,避免需要重新追溯数据。为此,我们引入了状态兼容性检查功能,此功能能够告知我们作业的兼容性情况,帮助我们判断是否能够直接重启业务。此外,我们还致力于在兼容性方面进行一定程度的提升。这里所说的“提升”,并非指向状态周期管理的具体功能上,而是指在 SQL 核心级别进行的改进,以期望用户在修改特定 SQL 语句后,仍旧保持兼容性。关于如何修改 SQL 以保持状态兼容的详尽文档,可以在阿里云 Flink 官方网站找到。

对于状态的恢复,具体而言,涉及从状态(state)、检查点(checkpoint)进行恢复,未来还将支持兼容 Flink 开源状态的云端恢复。此外,我们已经支持跨作业状态恢复,允许使用 B 作业的状态来启动A作业。这在进行 A/B 测试或构建容错机制时特别有用,因为这些场景常常要求根据一个作业的状态来顺序启动或建立关联启动另一个作业。

5、实时数据可观测性

它与我们平时谈论的可观测性内容没有区别,我们现在只是站在了实时领域的角度进行分析。

首先,数据处理的状态:这指的是作业处理过程中的各个阶段需要被观测。其次,元数据的管理,涉及到作业当前的数据背后的业务含义。最后,数据血缘,关注的是数据之间的相互关系。

5.1 作业运行状态

产品具备极其详细且全面的监控告警功能,同时与阿里云的云产品 ARMS 实现了对接,为用户提供了一种更为灵活的方式,以查看所有指标的状态。

5.2 数据血缘

近些年实时数仓常常被提起,它涉及在实时环境中根据数据仓库理论进行数据的分层建设。在这个过程中,作业之间的关系变得更加复杂。这不仅仅关乎单个作业的运行,而是关于一个作业的产出如何成为另一个作业的原始输入的过程。对于这一流程,流式作业与批处理作业之间存在诸多差异。批处理作业的数据血缘可以在一定程度上通过调度工具获取,而流式作业并不涉及调度,缺乏调度器提供的信息,因此需要借助平台来完成这一部分工作。

具体的实施方法以及达到的效果将在后续由陈老师进行分享。

5.3 元数据管理

无论是云端用户还是开源用户,我们都希望他们能够使用 Catalog。尽管从开源或商业角度来看,Catalog 目前支持的数据组件数量相对较少,但它已经支持了包括 Kafka、MySQL 在内的常见数据组件,以及阿里云的 Hologres、ADB、MaxCompute、数据湖 Paimon 等。

6、实时离线一体化

6.1 Apache Paimon构建流批一体存储基础

首先,Apache Paimon 本身采用的是湖仓一体化的架构,主打的是离线数据在成本可控的情况下进一步的实时化改造升级;其次,它并不直接提供存储服务,而是依托于对象存储技术,例如阿里云的 OSS,可以视作与 Flink 高度兼容的 Hudi 或 Iceberg 的延伸。Apache Paimon 具备流表二元属性,能够同时支持文件存储(File Store)和日志存储(Log Store)。在其架构下,支持流式读写和批量读写操作,同时也能实现流式写入后的批量处理。

6.2 Flink + Paimon构建流式湖仓

Apache Paimon 上构建的 Streamhouse 旨在从用户的数据库或日志系统中提取数据后进行湖上数仓分层构建准实时数据分析体系。对于数据库数据,我们通过 Flink CDC 实时抽取数据,并将其存储在 OSS 云服务上的 Paimon 数据湖中。接着,我们可以进一步进行数据流的分层处理,遵循标准的数据仓库分层架构。每一层都可以采用流处理或批处理的方式构建,且每一层的数据都能被多种外部独立供应商的查询工具,如 Presto、Hologres、StarRocks,以及阿里云的 MaxCompute 等,进行查询。这种模式相较于以往依赖 Kafka 的纯开源方案,使得数据沉淀更加便捷,且数据的查询变得更加高效。

该架构实现了流批处理的一体化,存储全部基于 OSS,计算通过 Flink 加上 Paimon 的 API 来完成,整个链路的实时处理能力意味着每一层都能生成供下一层消费的日志。由于整个系统建立在 OSS 之上,它实现了近乎实时的数据处理,同时保持了较低的成本。此外,其开放的数据架构确保了每一层都可以被不同的 OLAP 引擎查询,增强了数据的可访问性和灵活性。

四、SQL产品化思考及实践

这部分主要介绍近一年 SQL 在产品上做出的重要突破及未来的规划。

1、降本增效

我们在保持了社区 API 不变的前提下,内部引入了增强的特性,能够更好地帮助用户达到降本增效的效果。

1.1 引入增强的 QUERY HINTS

(1)针对双流 Join 的 TTL Hint

在社区1.18之前,SQL 作业只能在作业粒度设置状态算子 TTL,这意味着如果作业中有多个状态算子,只能设置同样的 TTL。我们以双流 Join 为例简要介绍一下这个过程。当作业在从 Execplan 生成 Transformation 时 TableConfig 会被解析出来作为入参去创建 StreamingJoinOperator。在 Runtime 阶段,当 Operator 的 Open 函数被调用时,TTL 值会分别用于创建左流和右流的 StateDescriptor。

FLIP-292 中提出了一种通用的在算子粒度修改 State TTL 的方法。但该方法要求用户修改 ExecPlan 序列化之后的 JSON 文件,交互没那么友好。产品上我们针对双流 Join 这种 hint 传播没有歧义的场景下引入了 JOIN_STATE_TTL,这样用户就可以通过写 SQL 来修改左、右流的State TTL 达到预定效果。在实现方面,优化器会先在 LogicalPlan 解析和校验 Hint。在 ExecPlan 到 Transformation 的生成过程中,把 Hint 的 Value 转化成 FLIP-292 引入的 StateMetadata,达到跟社区保持兼容的目的。

下面展示了阿里云内部作业的优化效果:

该作业中左流是大流,右流是小流,左流的数据是右流的 20-50 倍,右流有长周期保存的需求。理想状态下,右流的 State 需要保存 18 天,但出于权衡,业务方最终设置了 10 天为 TTL。在这种情况下,可以看到其 State Size 约为 5.8T,而当用户使用了 JOIN_STATE_TTL 之后,它的左流可以设置为 12 个小时,进而提高其右流数据的计算精度达到 18 天。在不牺牲数据正确性的前提下,其 State Size 降低到了 582 G,整个作业的资源消耗从原来的 700CU 降低到了现在的 300-400CU。

可以看出,对于左流跟右流需要保存不同的 TTL 的场景,Hint 的优化效果比较明显。

(2)针对维表 Join 的 Shuffle Hint

除了双流 Join 场景之外,我们也为维表 Join 场景引入了优化的 Hint。以 Cache All 为例,过去如果要使用 Cache All 方式做维表关联,每个 Subtask 上面都会缓存维表全部数据,故而较大的维表很难完成该操作。

最近我们引入了 SHUFFLE_HASH Hint。它会指定流表在关联之前先按照连接 Key 做一次 Hash。同一个 Key 值对应的记录在做完 Hash Shuffle 之后,必然会落在某个确定的 Subtask 上,与此同时,维表在 Open 和 Loading 过程中可以只过滤出当前 Key 所对应的维表的数据。假设作业的并发是 N,在数据分布均匀的情况下,通过一次 Hash Shuffle 之后,每个 Subtask 上只需要加载 1/N 的数据,如此一来大维表也可以较好地支持 Cache All 操作。

在数据分布不均匀时,通过一次 Hash Shuffle,Key 会落在某个或某几个 Subtask 上,造成作业出现数据热点,拖垮整个作业处理数据的速度。

针对这种情况,我们引入了 REPLICATED_SHUFFLE_HASH hint。

核心思路是随机打散。通过种分桶机制指定 Bucket 的数量,这样对于热点数据,就会把它随机 Shuffle 到 b 个桶,即 b 个 Subtask 上,相当于把维表的数据随机地在 b 个 Subtask 上 Loading。通过这种方式就完成了动态维表的复制,用户不需要自己再去把维表复制 b 份打散,在 Runtime 阶段会自动完成,来缓解由于 Shuffle 导致的数据热点问题。

1.2 Mini-batch 双流 Join

以当前 Query 为例,这是一个左表关联右表的场景。假设关联 Key a0 和 b0 都分别是 A 表跟 B 表的主键。接下来通过一个小动画展示有更新流的场景 Outer Join 存在的问题。假设在 t1 时刻,B 流流入了一条 +I 数据,由于 A 流还没有数据,则此时不会有数据发送。同时,会把 B 流来的数据更新到 B 流所对应的 State 中;在下一时刻,A 流流入了一条数据,同样 A 流的数据会更新到它对应的 State 中,假设 a0 跟 b0 可以关联到,此时就会输出一条关联之后的数据;在下一刻,收到了一条来自 B 流的 -U 的数据,此时,B 流对应的 State 被清空,发出一条 -U 的消息撤回之前的操作,同时在此 Outer Join 场景下,根据语义再发送一条左流 Pad Null 数据,因为此时 B 里面已经没有数据了;同样,如果此时 B 流流入了一条更新的数据,就会先把之前的 Pad Null 数据撤回,再发送更新之后的数据。

以此类推,如果 B 流更新了两次,则会发送九条数据。其中有七条是因为 B 流的状态变更导致的中间结果,也就是冗余的中间消息。假设下游并不需要完整 Changelog,则没有必要去发送中间的冗余消息,也不需要每次都更新 State。

针对这种情况,我们最近引入了 Mini-batch 的双流 Join 优化。其核心思想很简单——攒批。数据流入之后先不进行处理,而是攒在 Buffer 中,等到指定的时间或 Buffer 填满时,统一进行处理。在实现层面,攒批既降低了 State 的更新,又能避免发送中间的冗余消息。以刚才的 Example 为例来介绍 Mini-batch 的攒批实现。

现在对于 A 流跟 B 流来说,除了对应的 State 之外,还多了对应的 In-memory Buffer 用来缓存每个 Batch 之内的数据。假设 t1 时刻,B 流流入了一条数据,将其放进 B 流 Buffer;在 t2 时刻,A 流流入了一条数据,同样将其放进 A 流的 Buffer,此时 A 流跟 B 流都流入了数据,但由于仍未触发 Buffer Flush,则不会更新 State 也不会输出关联结果;假设在 t3 时刻 Buffer 已经充满,则 A 流的 Buffer 就会 Flush 到 State,B 流同样如此。同时,因为 a0 和 b0 可以互相关联,则此时输出第一条关联记录。当 B 流流入了一条 -U 数据,就会把它放进对应 Buffer 中,同时不更新 State;当 B 流流入了一条 +U 数据,把它放在 Buffer 的 List 中,同样不更新 State;t5 时刻又流入了一条 -U 的消息,此时发现 Buffer 中的数据变成了一开始的 -U 的消息,因为这一组数据在写入 Buffer 时被消除了,因为不考虑 Rowkind 时记录的内容是相同的,相当于 Accumulate 数据在前,Retract 数据在后,进行了抵消;t7 时刻,流入了一条 +U 的消息,因为它不可以与前面的消息折叠,则会被放在 Buffer 中;到 t8 时刻,当 Flush Buffer 时,可以看到 Buffer里只有两条数据。最后,只会输出对于第一条消息的 -U 和第二条消息的 +U 带来的更新操作。

这里还可以进一步优化,因为假定 a0 和 b0 分别是 A 流和 B 流的主键,当其既是主键又是 Join Key 时,只需要保留 Join Key 下最新的记录。也就是在更新 State 时,只需要把最后一条 +U 的数据更新到 State 即可。这样,一方面减少了中间冗余消息的输出,另一方面也减少了 State 的操作。

1.3 级联 Join 优化 (WIP)

由于目前所有流上的 Join 都是算子两两之间的 Join,故而在级联 Join 场景,随着加入 Join 的流的增多,State 会存在逐级放大的现象。目前我们正在探索是否有可能从 Plan 结合 Multiple Input 算子的层面消除这种中间 State 的冗余。

2、易用性提升

2.1 SQL优化建议

虽然 SQL 已经可以非常方便地帮助用户快速搭建实时的 Pipeline,但与批模式下执行 SQL 相比,流 SQL 的语义门槛仍相对较高,我们在社区或内部值班时,经常会遇到 Changelog 乱序,Retraction 造成非预期结果,或是 State 过期,或是使用了非确定性的函数等导致的种种问题。用户往往会觉得 SQL 编写不易,且无从排查。FLIP-280 试图解决这类问题,如果我们在框架层面可以提供一种新的 Explain Level——PLAN_ADVICE,当用户执行 Explain 语句时,将当前 Query 可能存在的风险和优化随执行 Plan 一起打印出来,给出比较有针对性的建议,可能会缓解这种情况。

我们在内部的产品上集成了社区的框架,并且引入了更多的 Analyzer 去分析问题。当用户点击 SQL 的深度校验之后,Plan 和优化建议会一起返回。

2.2 元数据血缘

除了plan Advice 之外,我们还引入了元数据血缘功能。

如果用户使用 Catalog Table 读写的话,他可以在作业界面查看表与表之间关系。在实现方面,用户在提交作业时,就会触发异步的任务,通过 Calcite 分析含有字段的血缘,并将返回的结果保存下来。另外,如果是在 Catalog 视角,还可以查询字段之间的血缘关系。

以上就是最近一年阿里云在 SQL 产品方面做出的一些比较重要的突破。

五、展望

后面近期内阿里云 Flink 主要有两项工作:

先是优化流式湖仓的构建,具体而言,我们将探索如何进一步改善 Flink 与 Paimon 的集成,以便用户能够实现真正的流批一体化体验和应用方式。此外,我们不仅会在流式存储方面持续优化 Paimon,还将研究更适合 Flink 这一流式计算领域领导者的高效存储解决方案;同时,我们计划对数据治理体系进行优化,包括元数据管理和数据血缘等关键方面的改进。

其次,我们计划积极拥抱AI技术,探讨如何将 AI 或 AIGC 的高级能力融入 Flink 产品之中。这包括从 AI 自动生成代码开始,到利用AI技术加速并智能化地处理当前云用户的工单服务内容,从而在产品中更快速、更智能地实现智能诊断。此外,在前文提及的能力之中,我们还希望能够进一步拓展 Flink 的实时分析功能,包括情绪分析、舆情评估等多样化应用场景,在与 AIGC 的结合使用过程中,实现功能的进一步丰富和优化。

参考链接

评论可见,请评论后查看内容,谢谢!!!评论后请刷新页面。