在介绍数据流中算子时,我们根据继承关系介绍,从父类向子类介绍。

下面,首先我们来看一下数据流中算子的相关接口。

1 KeyContext:支持键控流算子的接口

KeyContext 接口为键控算子(Keyed Operation)定义了设置和获取当前键(key)的方法:

setCurrentKey(Object key):将当前键值设为 keygetCurrentKey():获取当前键值

源码|Github|org.apache.flink.streaming.api.operators.KeyContext

public interface KeyContext {

void setCurrentKey(Object key);

Object getCurrentKey();

}

2 KeyContextHandler(内部 API)

KeyContextHandler 接口用于减少 Input#setKeyContextElement 方法的调用。通过 KeyContextHandler 接口的 hasKeyContext() 方法,可以获知算子是否包含 KeyContext,从而省略不存在 KeyContext 算子的 setKeyContextElement() 方法的调用。具体地,KeyContextHandler 接口定义了如下 3 个方法:

hasKeyContext():获取输入流是否包含 KeyContext,如果返回 False,则可以不再调用 Input#setKeyContextElement 方法hasKeyContext1():获取第一个输入流是否包含 KeyContexthasKeyContext2():获取第二个输入流是否包含 KeyContext

源码|Github|org.apache.flink.streaming.api.operators.KeyContextHandler

@Internal

public interface KeyContextHandler {

default boolean hasKeyContext() {

return hasKeyContext1();

}

default boolean hasKeyContext1() {

return true;

}

default boolean hasKeyContext2() {

return true;

}

}

3 CheckpointListener:支持 checkpoint 监听器的接口

CheckpointListener 接口用于在 checkpoint 事件发生后需执行其他操作的算子。例如一旦完成 checkpoint,则需要再外部系统提交事务的场景。CheckpointListener 接口定义了如下 2 个方法:

notifyCheckpointComplete(long checkpointId):告知监听者 checkpointId 对应的 checkpoint 已经完成被并确认。notifyCheckpointAborted(long checkpointId):告知监听者 checkpointId 对应的 checkpoint 已经被中止。这个方法有默认实现,在默认实现中不执行任何操作。

源码|Github|org.apache.flink.api.common.state.CheckpointListener

@Public

public interface CheckpointListener {

void notifyCheckpointComplete(long checkpointId) throws Exception;

default void notifyCheckpointAborted(long checkpointId) throws Exception {}

}

4 StreamOperator:数据流算子的基础接口

StreamOperator 接口时数据流算子(operator)的基础接口。在具体实现算子时,应该实现 OneInputStreamOperator 接口或 TwoInputStreamOperator 接口。

StreamOperator 接口继承了 CheckpointListener 接口,用于支持 checkpoint 监听器;继承了 KeyContext 接口,用于支持键控算子。

StreamOperator 接口有一个泛型 OUT,表示输出数据流的类型。

StreamOperator 接口定义了如下 3 个生命周期相关的方法:

open():这个方法会在处理数据元素之前被调用,应包含算子的初始化逻辑finish():这个方法会在数据处理完成后被调用close():这个方法会在算子生命周期结束前被调用,无论算子运行成功、失败还是取消都会被调用

StreamOperator 接口定义了如下 3 个状态快照相关的方法:

prepareSnapshotPreBarrier(long checkpointId):这个方法告知算子需要在发出它的 checkpoint barrier 之前制作一个快照OperatorSnapshotFutures snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions, CheckpointStreamFactory storageLocation):构造一个算子的状态快照initializeState(StreamTaskStateInitializer streamTaskStateManager):根据 streamTaskStateManager 初始化算子的所有状态

此外,StreamOperator 还定义了如下 4 个方法:

setKeyContextElement1(StreamRecord record):设置在第 1 个数据流中的 keysetKeyContextElement2(StreamRecord record):设置在第 2 个数据流中的 keygetMetricGroup():获取指标组getOperatorID():获取算子 ID

源码|Github|org.apache.flink.streaming.api.operators.StreamOperator(修改注释)

@PublicEvolving

public interface StreamOperator extends CheckpointListener, KeyContext, Serializable {

// 生命周期

void open() throws Exception;

void finish() throws Exception;

void close() throws Exception;

// 状态快照

void prepareSnapshotPreBarrier(long checkpointId) throws Exception;

OperatorSnapshotFutures snapshotState(

long checkpointId,

long timestamp,

CheckpointOptions checkpointOptions,

CheckpointStreamFactory storageLocation)

throws Exception;

void initializeState(StreamTaskStateInitializer streamTaskStateManager) throws Exception;

// 其他方法

void setKeyContextElement1(StreamRecord record) throws Exception;

void setKeyContextElement2(StreamRecord record) throws Exception;

OperatorMetricGroup getMetricGroup();

OperatorID getOperatorID();

}

5 UserFunctionProvider:支持包含 UDF 算子的接口(内部 API)

UserFunctionProvider 接口提供了对包含 UDF 算子的支持。

接口包含继承自 Function 的泛型 F,其中 Function 为 org.apache.flink.api.common.functions.Function,是所有 UDF 的基础接口,详见 3.2 Function 接口的源码。

接口定义了方法 getUserFunction(),用于获取 UDF。

源码|Github|org.apache.flink.streaming.api.operators.UserFunctionProvider

@Internal

public interface UserFunctionProvider {

F getUserFunction();

}

6 Input:处理 1 个上游数据流的接口

Input 接口提供了处理上游数据流元素的支持。包含泛型 IN,表示输入数据的类型。定义了如下 6 个方法:

processElement(StreamRecord element):处理数据流中的数据记录processWatermark(Watermark mark):处理数据流中的 watermarkprocessWatermarkStatus(WatermarkStatus watermarkStatus):处理数据流中的 Watermark 空闲状态变化processLatencyMarker(LatencyMarker latencyMarker):处理数据流中的延迟标记setKeyContextElement(StreamRecord record):设置数据流中的键的上下文,用于从 record 中提取键,并将该键传给状态后端processRecordAttributes(RecordAttributes recordAttributes):处理记录属性(默认不执行任何操作,实验性功能)

源码|Github|org.apache.flink.streaming.api.operators.Input

@PublicEvolving

public interface Input {

void processElement(StreamRecord element) throws Exception;

void processWatermark(Watermark mark) throws Exception;

void processWatermarkStatus(WatermarkStatus watermarkStatus) throws Exception;

void processLatencyMarker(LatencyMarker latencyMarker) throws Exception;

void setKeyContextElement(StreamRecord record) throws Exception;

@Experimental

default void processRecordAttributes(RecordAttributes recordAttributes) throws Exception {}

}

7 OneInputStreamOperator:包含 1 个上游数据流的算子

OneInputStreamOperator 接口为包含 1 个上游数据流算子的接口,继承了 StreamOperator 的数据流算子的基础接口以及 Input 处理上游数据流的接口。接口有 2 个泛型,IN 表示输入数据流类型,OUT 表示输出数据流类型。在 OneInputStreamOperator 接口中,重写了 Input 中的 setKeyContextElement(StreamRecord record) 方法,令其直接执行 StreamOperator 的 setKeyContextElement1(record) 接口。

源码|Github|org.apache.flink.streaming.api.operators.OneInputStreamOperator

@PublicEvolving

public interface OneInputStreamOperator extends StreamOperator, Input {

@Override

default void setKeyContextElement(StreamRecord record) throws Exception {

setKeyContextElement1(record);

}

}

8 TwoInputStreamOperator:包含 2 个上游数据流的算子

TwoInputStreamOperator 接口继承了 StreamOperator,并重新将 Input 接口中的处理上游数据流的方法写成了处理 2 个上游数据流的两套。接口有 3 个泛型,IN1 表示第 1 个输入数据流的类型,IN2 表示第 2 个输入数据流的类型,OUT 表示输出数据流的类型。具体地,TwoInputStreamOperator 接口定义了如下方法:

processElement1(StreamRecord element):处理第 1 个输入流中的数据记录processElement2(StreamRecord element):处理第 2 个输入流中的数据记录processWatermark1(Watermark mark):处理第 1 个输入流中的 WatermarkprocessWatermark2(Watermark mark):处理第 2 个输入流中的 WatermarkprocessLatencyMarker1(LatencyMarker latencyMarker):处理第 1 个输入流中的延迟标记processLatencyMarker2(LatencyMarker latencyMarker):处理第 2 个输入流中的延迟标记processWatermarkStatus1(WatermarkStatus watermarkStatus):处理第 1 个输入流中的 Watermark 空闲状态变化processWatermarkStatus2(WatermarkStatus watermarkStatus):处理第 2 个输入流中的 Watermark 空闲状态变化processRecordAttributes1(RecordAttributes recordAttributes):处理第 1 个输入流中的记录属性(默认不执行任何操作,实验性功能)processRecordAttributes2(RecordAttributes recordAttributes):处理第 2 个输入流中的记录属性(默认不执行任何操作,实验性功能)

源码|Github|org.apache.flink.streaming.api.operators.TwoInputStreamOperator

@PublicEvolving

public interface TwoInputStreamOperator extends StreamOperator {

void processElement1(StreamRecord element) throws Exception;

void processElement2(StreamRecord element) throws Exception;

void processWatermark1(Watermark mark) throws Exception;

void processWatermark2(Watermark mark) throws Exception;

void processLatencyMarker1(LatencyMarker latencyMarker) throws Exception;

void processLatencyMarker2(LatencyMarker latencyMarker) throws Exception;

void processWatermarkStatus1(WatermarkStatus watermarkStatus) throws Exception;

void processWatermarkStatus2(WatermarkStatus watermarkStatus) throws Exception;

@Experimental

default void processRecordAttributes1(RecordAttributes recordAttributes) throws Exception {}

@Experimental

default void processRecordAttributes2(RecordAttributes recordAttributes) throws Exception {}

}

推荐链接

评论可见,请评论后查看内容,谢谢!!!评论后请刷新页面。