在介绍数据流中算子时,我们根据继承关系介绍,从父类向子类介绍。
下面,首先我们来看一下数据流中算子的相关接口。
1 KeyContext:支持键控流算子的接口
KeyContext 接口为键控算子(Keyed Operation)定义了设置和获取当前键(key)的方法:
setCurrentKey(Object key):将当前键值设为 keygetCurrentKey():获取当前键值
源码|Github|org.apache.flink.streaming.api.operators.KeyContext
public interface KeyContext {
void setCurrentKey(Object key);
Object getCurrentKey();
}
2 KeyContextHandler(内部 API)
KeyContextHandler 接口用于减少 Input#setKeyContextElement 方法的调用。通过 KeyContextHandler 接口的 hasKeyContext() 方法,可以获知算子是否包含 KeyContext,从而省略不存在 KeyContext 算子的 setKeyContextElement() 方法的调用。具体地,KeyContextHandler 接口定义了如下 3 个方法:
hasKeyContext():获取输入流是否包含 KeyContext,如果返回 False,则可以不再调用 Input#setKeyContextElement 方法hasKeyContext1():获取第一个输入流是否包含 KeyContexthasKeyContext2():获取第二个输入流是否包含 KeyContext
源码|Github|org.apache.flink.streaming.api.operators.KeyContextHandler
@Internal
public interface KeyContextHandler {
default boolean hasKeyContext() {
return hasKeyContext1();
}
default boolean hasKeyContext1() {
return true;
}
default boolean hasKeyContext2() {
return true;
}
}
3 CheckpointListener:支持 checkpoint 监听器的接口
CheckpointListener 接口用于在 checkpoint 事件发生后需执行其他操作的算子。例如一旦完成 checkpoint,则需要再外部系统提交事务的场景。CheckpointListener 接口定义了如下 2 个方法:
notifyCheckpointComplete(long checkpointId):告知监听者 checkpointId 对应的 checkpoint 已经完成被并确认。notifyCheckpointAborted(long checkpointId):告知监听者 checkpointId 对应的 checkpoint 已经被中止。这个方法有默认实现,在默认实现中不执行任何操作。
源码|Github|org.apache.flink.api.common.state.CheckpointListener
@Public
public interface CheckpointListener {
void notifyCheckpointComplete(long checkpointId) throws Exception;
default void notifyCheckpointAborted(long checkpointId) throws Exception {}
}
4 StreamOperator:数据流算子的基础接口
StreamOperator 接口时数据流算子(operator)的基础接口。在具体实现算子时,应该实现 OneInputStreamOperator 接口或 TwoInputStreamOperator 接口。
StreamOperator 接口继承了 CheckpointListener 接口,用于支持 checkpoint 监听器;继承了 KeyContext 接口,用于支持键控算子。
StreamOperator 接口有一个泛型 OUT,表示输出数据流的类型。
StreamOperator 接口定义了如下 3 个生命周期相关的方法:
open():这个方法会在处理数据元素之前被调用,应包含算子的初始化逻辑finish():这个方法会在数据处理完成后被调用close():这个方法会在算子生命周期结束前被调用,无论算子运行成功、失败还是取消都会被调用
StreamOperator 接口定义了如下 3 个状态快照相关的方法:
prepareSnapshotPreBarrier(long checkpointId):这个方法告知算子需要在发出它的 checkpoint barrier 之前制作一个快照OperatorSnapshotFutures snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions, CheckpointStreamFactory storageLocation):构造一个算子的状态快照initializeState(StreamTaskStateInitializer streamTaskStateManager):根据 streamTaskStateManager 初始化算子的所有状态
此外,StreamOperator 还定义了如下 4 个方法:
setKeyContextElement1(StreamRecord> record):设置在第 1 个数据流中的 keysetKeyContextElement2(StreamRecord> record):设置在第 2 个数据流中的 keygetMetricGroup():获取指标组getOperatorID():获取算子 ID
源码|Github|org.apache.flink.streaming.api.operators.StreamOperator(修改注释)
@PublicEvolving
public interface StreamOperator
// 生命周期
void open() throws Exception;
void finish() throws Exception;
void close() throws Exception;
// 状态快照
void prepareSnapshotPreBarrier(long checkpointId) throws Exception;
OperatorSnapshotFutures snapshotState(
long checkpointId,
long timestamp,
CheckpointOptions checkpointOptions,
CheckpointStreamFactory storageLocation)
throws Exception;
void initializeState(StreamTaskStateInitializer streamTaskStateManager) throws Exception;
// 其他方法
void setKeyContextElement1(StreamRecord> record) throws Exception;
void setKeyContextElement2(StreamRecord> record) throws Exception;
OperatorMetricGroup getMetricGroup();
OperatorID getOperatorID();
}
5 UserFunctionProvider:支持包含 UDF 算子的接口(内部 API)
UserFunctionProvider 接口提供了对包含 UDF 算子的支持。
接口包含继承自 Function 的泛型 F,其中 Function 为 org.apache.flink.api.common.functions.Function,是所有 UDF 的基础接口,详见 3.2 Function 接口的源码。
接口定义了方法 getUserFunction(),用于获取 UDF。
源码|Github|org.apache.flink.streaming.api.operators.UserFunctionProvider
@Internal
public interface UserFunctionProvider
F getUserFunction();
}
6 Input:处理 1 个上游数据流的接口
Input 接口提供了处理上游数据流元素的支持。包含泛型 IN,表示输入数据的类型。定义了如下 6 个方法:
processElement(StreamRecord
源码|Github|org.apache.flink.streaming.api.operators.Input
@PublicEvolving
public interface Input
void processElement(StreamRecord
void processWatermark(Watermark mark) throws Exception;
void processWatermarkStatus(WatermarkStatus watermarkStatus) throws Exception;
void processLatencyMarker(LatencyMarker latencyMarker) throws Exception;
void setKeyContextElement(StreamRecord
@Experimental
default void processRecordAttributes(RecordAttributes recordAttributes) throws Exception {}
}
7 OneInputStreamOperator:包含 1 个上游数据流的算子
OneInputStreamOperator 接口为包含 1 个上游数据流算子的接口,继承了 StreamOperator 的数据流算子的基础接口以及 Input 处理上游数据流的接口。接口有 2 个泛型,IN 表示输入数据流类型,OUT 表示输出数据流类型。在 OneInputStreamOperator 接口中,重写了 Input 中的 setKeyContextElement(StreamRecord
源码|Github|org.apache.flink.streaming.api.operators.OneInputStreamOperator
@PublicEvolving
public interface OneInputStreamOperator
@Override
default void setKeyContextElement(StreamRecord
setKeyContextElement1(record);
}
}
8 TwoInputStreamOperator:包含 2 个上游数据流的算子
TwoInputStreamOperator 接口继承了 StreamOperator
processElement1(StreamRecord
源码|Github|org.apache.flink.streaming.api.operators.TwoInputStreamOperator
@PublicEvolving
public interface TwoInputStreamOperator
void processElement1(StreamRecord
void processElement2(StreamRecord
void processWatermark1(Watermark mark) throws Exception;
void processWatermark2(Watermark mark) throws Exception;
void processLatencyMarker1(LatencyMarker latencyMarker) throws Exception;
void processLatencyMarker2(LatencyMarker latencyMarker) throws Exception;
void processWatermarkStatus1(WatermarkStatus watermarkStatus) throws Exception;
void processWatermarkStatus2(WatermarkStatus watermarkStatus) throws Exception;
@Experimental
default void processRecordAttributes1(RecordAttributes recordAttributes) throws Exception {}
@Experimental
default void processRecordAttributes2(RecordAttributes recordAttributes) throws Exception {}
}
推荐链接
发表评论