以下笔记基于对尚硅谷Java版Flink(2020版)的学习,Flink版本1.10
状态管理
算子状态(operator state)
算子状态的作用范围限定为算子任务。 这意味着由同一并行任务所处理的所有数据都可以访问到相同的状态,状态对于同一任务而言是共享的。算子状态不能由相同或不同算子的另一个任务访问。 Flink为算子状态提供三种基本数据结构: 列表状态(List state) 将状态表示为一组数据的列表。 联合列表状态(Union list state) 也将状态表示为数据的列表。它与常规列表状态的区别在于,在发生故障时,或者从保存点(savepoint)启动应用程序时如何恢复。 广播状态(Broadcast state) 如果一个算子有多项任务,而它的每项任务状态又都相同,那么这种特殊情况最适合应用广播状态。 官方文档示例
public class CountingFunction
// this count is the number of elements in the parallel subtask
private long count;
// 状态快照
@Override
public List
// return a single element - our count
return Collections.singletonList(count);
}
// 状态恢复
@Override
public void restoreState(List
// in case of scale in, this adds up counters from different original subtasks
// in case of scale out, list this may be empty
for (Long l : state) {
count += l;
}
}
@Override
public Tuple2
count++;
return new Tuple2<>(value, count);
}
}
键控状态(keyed state)
键控状态是根据输入数据流中定义的键(key)来维护和访问的。 Flink为每个键值维护一个状态实例,并将具有相同键的所有数据,都分区到同一个算子任务中,这个任务会维护和处理这个key对应的状态。 当任务处理一条数据时,它会自动将状态的访问范围限定为当前数据的key。因此,具有相同key的所有数据都会访问相同的状态。 Keyed State很类似于一个分布式的key-value map数据结构,只能用于KeyedStream(keyBy算子处理之后)
Flink的Keyed State支持以下数据类型: ValueState[T]保存单个的值,值的类型为T。 创建:getRuntimeContext().getState(new ValueStateDescriptor
状态后端(State Backends)
MemoryStateBackend 内存级的状态后端,会将键控状态作为内存中的对象进行管理,将它们存储在TaskManager的JVM堆上;而将checkpoint存储在JobManager的内存中。 特点:快速、低延迟,但不稳定
// 状态后端配置
env.setStateBackend( new MemoryStateBackend());
FsStateBackend 将checkpoint存到远程的持久化文件系统(FileSystem)上。而对于本地状态,跟MemoryStateBackend一样,也会存在TaskManager的JVM堆上。 同时拥有内存级的本地访问速度和更好的容错保证
// 状态后端配置
env.setStateBackend( new FsStateBackend(String checkpointDataUri));
RocksDBStateBackend 将所有状态序列化后,存入本地的RocksDB中存储。 RocksDB本身是内存+文件的方式,适合状态值占用空间大容易耗尽内存的情况。
// 状态后端配置
env.setStateBackend( new RocksDBStateBackend(String checkpointDataUri));
可以在flink-conf.yaml里配置默认的状态后端
state.backend: jobmanager/filesystem/rocksdb
state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints
state.savepoints.dir: hdfs://namenode-host:port/flink-savepoints
state.backend.incremental: false/true
容错机制
一致性检查点(Checkpoints)
Flink故障恢复机制的核心,就是应用状态的一致性检查点 有状态流应用的一致性检查点,其实就是所有任务的状态,在某个时间点的一份拷贝(快照);这个时间点,应该是所有任务都恰好处理完一个相同的输入数据的时候
从检查点恢复状态
在执行流应用程序期间,Flink会定期保存状态的一致检查点 如果发生故障,Flink将会使用最近的检查点来一致恢复应用程序的状态,并重新启动处理流程 遇到故障之后,第一步就是重启应用 第二步是从checkpoint中读取状态,将状态重置 从检查点重启应用程序后,其内部状态与检查点完成时的状态完全相同 第三步开始消费并处理检查点到发生故障之间的数据 这种检查点的保存和恢复机制可以为应用程序状态提供“精确一致”(exactly-once)的一致性,因为所有算子都会保存检查点并恢复其所有状态,这样所有的输入流都会被重置到检查点完成时的位置
检查点的实现算法
Flink检查点算法的正式名称是异步分界线快照(asynchronous barrier snapshotting) 该算法大致基于Chandy-Lamport分布式快照算法 将检查点的保存和数据处理分离开,不暂停整个应用 Flink的检查点算法用到了一种称为分界线(barrier)的特殊数据形式,用来把一条流上数据按照不同的检查点分开 分界线之前来到的数据导致的状态更改,都会被包含在当前分界线所属的检查点中;分界线之后的数据导致的所有更改,就会被包含在之后的检查点中 JobManager会向每个source任务发送一条带有新检查点ID的消息,通过这种方式来启动检查点 数据源将它们的状态写入检查点,并发出一个检查点barrier,状态后端在状态存入检查点后会返回通知给source任务,source任务就会向JobManager确认检查点完成 分界线对齐:barrier向下游传递,任务会等待所有输入分区的barrier到达,对于barrier已到达的分区,继续到达的数据会被缓存,而barrier未到达的分区,数据就会被正常处理 当收到所有输入分区的barrier时,任务就将其状态保存到后端的检查点中,然后将barrier继续向下游转发,任务继续正常的数据处理 Sink任务向JobManager确认状态保存到checkpoint完毕 当所有任务都已经成功将状态保存到检查点,检查点就真正完成了
保存点(Savepoints)
Flink 还提供了可以自定义的镜像保存功能,就是保存点(savepoints) 原则上,创建保存点使用的算法与检查点完全相同,因此保存点可以认为就是特殊的检查点 Flink不会自动创建保存点,因此用户(或者外部调度程序)必须明确地触发创建操作 保存点是一个强大的功能。除了故障恢复外,保存点可以用于:有计划的手动备份,更新应用程序,版本迁移,暂停和重启应用,等等
检查点配置代码
// 检查点配置,传入检查点周期
env.enableCheckpointing(300);
// 高级选项
// 检查点模式
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 检查点超时时间
env.getCheckpointConfig().setCheckpointTimeout(60000L);
// 最大同时进行的检查点
env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
// 前一个检查点结束到后一个检查点开始的最小间隔
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(100L);
// 恢复时是否检查点优先于保存点
env.getCheckpointConfig().setPreferCheckpointForRecovery(true);
// 可容忍的检查点故障次数
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(0);
// 重启策略配置
// 固定延迟重启,允许重启3次,每次重启间隔10秒
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000L));
// 失败率重启,10分钟内允许重启3次,每次重启间隔1分钟
env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.minutes(10), Time.minutes(1)));
状态一致性
一致性级别
at-most-once: 这其实是没有正确性保障的委婉说法——故障发生之后,计数结果可能丢失。同样的还有udp。 at-least-once: 这表示计数结果可能大于正确值,但绝不会小于正确值。也就是说,计数程序在发生故障后可能多算,但是绝不会少算。 exactly-once: 这指的是系统保证在发生故障后得到的计数结果与正确值一致。
端到端(end-to-end)状态一致性
端到端的一致性保证,意味着结果的正确性贯穿了整个流处理应用的始终;每一个组件都保证了它自己的一致性,整个端到端的一致性级别取决于所有组件中一致性最弱的组件。具体可以划分如下: 内部保证 —— 依赖checkpoint source 端 —— 需外部源,可重设数据的读取位置 sink 端 —— 需保证,从故障恢复时,数据不会重复写入外部系统 而对于sink端,又有两种具体的实现方式:幂等(Idempotent)写入和事务性(Transactional)写入。 幂等写入 所谓幂等操作,是说一个操作,可以重复执行很多次,但只导致一次结果更改,也就是说,后面再重复执行就不起作用了。 事务写入 需要构建事务来写入外部系统,构建的事务对应着 checkpoint,等到 checkpoint 真正完成的时候,才把所有对应的结果写入 sink 系统中。 对于事务性写入,具体又有两种实现方式:预写日志(WAL)和两阶段提交(2PC)。 预写日志(WAL) 把结果数据先当成状态保存,然后在收到checkpoint完成的通知时,一次性写入sink 系统 简单易于实现,由于数据提前在状态后端中做了缓存,所以无论什么sink系统,都能用这种方式一批搞定 DataStream API提供了一个模板类:GenericWriteAheadSink,来实现这种事务性sink 两阶段提交(2PC) 对于每个checkpoint,sink任务会启动一个事务,并将接下来所有接收的数据添加到事务里 然后将这些数据写入外部sink系统,但不提交它们——这时只是“预提交” 当它收到checkpoint完成的通知时,它才正式提交事务,实现结果的真正写入 这种方式真正实现了exactly-once,它需要一个提供事务支持的外部sink系统。Flink提供了TwoPhaseCommitSinkFunction接口。 两阶段提交(2PC)对外部sink系统的要求 外部 sink系统必须提供事务支持,或者sink任务必须能够模拟外部系统上的事务 在checkpoint的间隔期间里,必须能够开启一个事务并接受数据写入在收到checkpoint完成的通知之前,事务必须是"等待提交"的状态。在故障恢复的情况下,这可能需要一些时间。如果这个时候sink系统关闭事务(例如超时了),那么未提交的数据就会丢失 sink 任务必须能够在进程失败后恢复事务 提交事务必须是幂等操作
不同 Source 和 Sink 的一致性保证可以用下表说明:
sink\source不可重置可重置任意(Any)At-most-onceAt-least-once幂等At-most-once Exactly-once (故源恢复时会出现国时不一致) 预写日志(WAL)At-most-onceAt-least-once两阶段提交(2PC)At-most-onceExactly-once
ProcessFunction API(底层API)
DataStream API提供了一系列的Low-Level转换算子。可以访问时间戳、watermark以及注册定时事件。还可以输出特定的一些事件,例如超时事件等。Process Function用来构建事件驱动的应用以及实现自定义的业务逻辑(使用之前的window函数和转换算子无法实现)。例如,Flink SQL就是使用Process Function实现的。 Flink提供了8个Process Function: • ProcessFunction • KeyedProcessFunction • CoProcessFunction • ProcessJoinFunction • BroadcastProcessFunction • KeyedBroadcastProcessFunction • ProcessWindowFunction • ProcessAllWindowFunction
KeyedProcessFunction
KeyedProcessFunction用来操作KeyedStream。KeyedProcessFunction会处理流的每一个元素,输出为0个、1个或者多个元素。 所有的Process Function都继承自RichFunction接口,所以都有open()、close()和getRuntimeContext()等方法。 而KeyedProcessFunction[KEY, IN, OUT]还额外提供了两个方法: • processElement(v: IN, ctx: Context, out: Collector[OUT]) 流中的每一个元素都会调用这个方法,调用结果将会放在Collector数据类型中输出。 Context可以访问元素的时间戳,元素的key,以及TimerService时间服务。 Context还可以将结果输出到别的流(side outputs)。 • onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[OUT]) 是一个回调函数。当之前注册的定时器触发时调用。 参数timestamp为定时器所设定的触发的时间戳。 Collector为输出结果的集合。 OnTimerContext和processElement的Context参数一样,提供了上下文的一些信息,例如定时器触发的时间信息(事件时间或者处理时间)。
TimerService 和 定时器(Timers)
Context和OnTimerContext所持有的TimerService对象拥有以下方法: • currentProcessingTime(): Long 返回当前处理时间 • currentWatermark(): Long 返回当前watermark的时间戳 • registerProcessingTimeTimer(timestamp: Long): Unit 会注册当前key的processing time的定时器。当processing time到达定时时间时,触发timer。 • registerEventTimeTimer(timestamp: Long): Unit 会注册当前key的event time 定时器。当水位线大于等于定时器注册的时间时,触发定时器执行回调函数。 • deleteProcessingTimeTimer(timestamp: Long): Unit 删除之前注册处理时间定时器。如果没有这个时间戳的定时器,则不执行。 • deleteEventTimeTimer(timestamp: Long): Unit 删除之前注册的事件时间定时器,如果没有此时间戳的定时器,则不执行。 当定时器timer触发时,会执行回调函数onTimer()。 注意定时器timer只能在keyed streams上面使用。
KeyedProcessFunction和定时器的使用示例
// 实现自定义处理函数,检测一段时间内的温度连续上升,输出报警
public static class TempConsIncreWarning extends KeyedProcessFunction
// 定义私有属性,当前统计的时间间隔
private Integer interval;
public TempConsIncreWarning(Integer interval) {
this.interval = interval;
}
// 定义状态,保存上一次的温度值,定时器时间戳
private ValueState
private ValueState
@Override
public void open(Configuration parameters) throws Exception {
lastTempState = getRuntimeContext().getState(new ValueStateDescriptor
timerTsState = getRuntimeContext().getState(new ValueStateDescriptor
}
@Override
public void processElement(SensorReading value, Context ctx, Collector
// 取出状态
Double lastTemp = lastTempState.value();
Long timerTs = timerTsState.value();
// 如果温度上升并且没有定时器,注册10秒后的定时器,开始等待
if( value.getTemperature() > lastTemp && timerTs == null ){
// 计算出定时器时间戳
Long ts = ctx.timerService().currentProcessingTime() + interval * 1000L;
ctx.timerService().registerProcessingTimeTimer(ts);
timerTsState.update(ts);
}
// 如果温度下降,那么删除定时器
else if( value.getTemperature() < lastTemp && timerTs != null ){
ctx.timerService().deleteProcessingTimeTimer(timerTs);
timerTsState.clear();
}
// 更新温度状态
lastTempState.update(value.getTemperature());
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector
// 定时器触发,输出报警信息
out.collect("传感器" + ctx.getCurrentKey().getField(0) + "温度值连续" + interval + "s上升");
timerTsState.clear();
}
@Override
public void close() throws Exception {
lastTempState.clear();
}
}
侧输出流(SideOutput)
// 定义一个OutputTag,用来表示侧输出流低温流
OutputTag
};
// 测试ProcessFunction,自定义侧输出流实现分流操作
SingleOutputStreamOperator
@Override
public void processElement(SensorReading value, Context ctx, Collector
// 判断温度,大于30度,高温流输出到主流;小于低温流输出到侧输出流
if( value.getTemperature() > 30 ){
out.collect(value);
}
else {
ctx.output(lowTempTag, value);
}
}
});
highTempStream.print("high-temp");
highTempStream.getSideOutput(lowTempTag).print("low-temp");
CoProcessFunction
对于两条输入流,DataStream API提供了CoProcessFunction这样的low-level操作。 CoProcessFunction提供了操作每一个输入流的方法: processElement1()和processElement2()。 类似于ProcessFunction,这两种方法都通过Context对象来调用。这个Context对象可以访问事件数据,定时器时间戳,TimerService,以及side outputs。CoProcessFunction也提供了onTimer()回调函数。
好文阅读
发表评论