目录

1.背景

2.coGroup算子源码分析

2.1完整的coGroup算子调用流程

2.2coGroup方法入口

2.3 CoGroupedStreams对象分析

2.4WithWindow内部类分析

2.5CoGroupWindowFunction函数分析

3.修改源码支持获取迟到数据测输出流

3.1复制CoGroupedStreams

3.2新增WithWindow.sideOutputLateData方法

3.3新增WithWindow构造方法

3.4修改apply方法

3.5开放UnionTypeInfo类的public权限

​3.6编译Flink源码flink-streaming-java模块

3.7项目中查看maven是否已经刷新为最新代码

4.测试

1.背景

coGroup算子开窗到时间关闭之后,迟到数据无法通过测输出流提取,intervalJoin算子提供了api,因为join算子底层就是coGroup算子,所以Join算子也不行。

flink版本 v1.17.1

2.coGroup算子源码分析

2.1完整的coGroup算子调用流程

input1.coGroup(input2)

.where(keySelector1)

.equalTo(keySelector2)

.window(windowAssigner)

.trigger(trigger)

.evictor(evictor)

.allowedLateness(allowedLateness)

.apply(cgroupFunction)

通过上述代码可以看到没有sideOutputLateData的相关方法,用来提取窗口关闭之后的迟到数据

2.2coGroup方法入口

其中创建了一个CoGroupedStreams流对象

/**

* Creates a join operation. See {@link CoGroupedStreams} for an example of how the keys and

* window can be specified.

*/

public CoGroupedStreams coGroup(DataStream otherStream) {

return new CoGroupedStreams<>(this, otherStream);

}

2.3 CoGroupedStreams对象分析

他可以理解为构造设计模式的一个Builder类,通过where方法配置第一条流的KeySelector,再返回一个CoGroupedStreams的内部类Where,再通过equalTo方法配置第二条流的KeySelector,再返回EqualTo内部类,window方法配置窗口划分器,返回WithWindow内部类,后续都是窗口的配置 trigger,evictor,allowedLateness配置窗口参数,最后调用apply方法传送用户业务函数

2.4WithWindow内部类分析

WithWindow是最终保存所有配置的内部类包括两条流,窗口配置,key提取器的配置,最终会用户调用apply方法触发CoGroup的业务,在apply方法中通过union联合两条流,然后通过keyby转为KeyedStream,再通过window配置窗口,最终调用窗口函数的apply方法,传入WindowFunction,做CoGroup的业务与用户业务。

具体代码如下已写好备注

/**

* A co-group operation that has {@link KeySelector KeySelectors} defined for both inputs as

* well as a {@link WindowAssigner}.

*

* @param Type of the elements from the first input

* @param Type of the elements from the second input

* @param Type of the key. This must be the same for both inputs

* @param Type of {@link Window} on which the co-group operation works.

*/

@Public

public static class WithWindow {

//第一条流

private final DataStream input1;

//第二条流

private final DataStream input2;

//第一个key提取器

private final KeySelector keySelector1;

//第二个Key提取器

private final KeySelector keySelector2;

//Key的类型

private final TypeInformation keyType;

//窗口分配器

private final WindowAssigner, W> windowAssigner;

//窗口出发计算器

private final Trigger, ? super W> trigger;

private final Evictor, ? super W> evictor;

private final Time allowedLateness;

private WindowedStream, KEY, W> windowedStream;

//构造函数给上面对象赋值

protected WithWindow(

DataStream input1,

DataStream input2,

KeySelector keySelector1,

KeySelector keySelector2,

TypeInformation keyType,

WindowAssigner, W> windowAssigner,

Trigger, ? super W> trigger,

Evictor, ? super W> evictor,

Time allowedLateness) {

this.input1 = input1;

this.input2 = input2;

this.keySelector1 = keySelector1;

this.keySelector2 = keySelector2;

this.keyType = keyType;

this.windowAssigner = windowAssigner;

this.trigger = trigger;

this.evictor = evictor;

this.allowedLateness = allowedLateness;

}

/**

* Completes the co-group operation with the user function that is executed for windowed

* groups.

*

*

Note: This method's return type does not support setting an operator-specific

* parallelism. Due to binary backwards compatibility, this cannot be altered. Use the

* {@link #with(CoGroupFunction, TypeInformation)} method to set an operator-specific

* parallelism.

*/

public DataStream apply(

CoGroupFunction function, TypeInformation resultType) {

// clean the closure

function = input1.getExecutionEnvironment().clean(function);

//创建合并两个流的公共TypeInfo,UnionTypeInfo最终会将Input1,Input2的数据通过map算子转换为该类型

UnionTypeInfo unionType =

new UnionTypeInfo<>(input1.getType(), input2.getType());

//转换成union的KeySelector

UnionKeySelector unionKeySelector =

new UnionKeySelector<>(keySelector1, keySelector2);

//将taggedInput1的数据类容map成UnionTypeInfo类型

SingleOutputStreamOperator> taggedInput1 =

input1.map(new Input1Tagger());

taggedInput1.getTransformation().setParallelism(input1.getParallelism(), false);

taggedInput1.returns(unionType);

//将taggedInput2的数据类容map成UnionTypeInfo类型

SingleOutputStreamOperator> taggedInput2 =

input2.map(new Input2Tagger());

taggedInput2.getTransformation().setParallelism(input2.getParallelism(), false);

taggedInput2.returns(unionType);

//将两个流进行union

DataStream> unionStream = taggedInput1.union(taggedInput2);

//keyBy并且开窗

windowedStream =

new KeyedStream, KEY>(

unionStream, unionKeySelector, keyType)

.window(windowAssigner);

//配置窗口触发器

if (trigger != null) {

windowedStream.trigger(trigger);

}

//配置移除器

if (evictor != null) {

windowedStream.evictor(evictor);

}

//配置allowedLateness

if (allowedLateness != null) {

windowedStream.allowedLateness(allowedLateness);

}

//创建CoGroupWindowFunction ,并把用户函数传入进去

return windowedStream.apply(

new CoGroupWindowFunction(function), resultType);

}

/**

* Completes the co-group operation with the user function that is executed for windowed

* groups.

*

*

Note: This is a temporary workaround while the {@link #apply(CoGroupFunction,

* TypeInformation)} method has the wrong return type and hence does not allow one to set an

* operator-specific parallelism

*

* @deprecated This method will be removed once the {@link #apply(CoGroupFunction,

* TypeInformation)} method is fixed in the next major version of Flink (2.0).

*/

@PublicEvolving

@Deprecated

public SingleOutputStreamOperator with(

CoGroupFunction function, TypeInformation resultType) {

return (SingleOutputStreamOperator) apply(function, resultType);

}

@VisibleForTesting

Time getAllowedLateness() {

return allowedLateness;

}

//获取窗口包装流,但是标记为VisibleForTesting,用户无法调用,如果可以调用的话可以通过该方法获取包装流之后通过窗口流获取迟到数据的测输出流

@VisibleForTesting

WindowedStream, KEY, W> getWindowedStream() {

return windowedStream;

}

}

2.5CoGroupWindowFunction函数分析

CoGroupWindowFunction也是CoGroupedStreams内部类,负责做CoGroup的业务,最终将数据封装好转发给用户函数(也就是2.1中apply中的cgroupFunction)

private static class CoGroupWindowFunction

extends WrappingFunction>

implements WindowFunction, T, KEY, W> {

private static final long serialVersionUID = 1L;

public CoGroupWindowFunction(CoGroupFunction userFunction) {

super(userFunction);

}

@Override

public void apply(KEY key, W window, Iterable> values, Collector out)

throws Exception {

//缓存当前窗口里1号流的数据

List oneValues = new ArrayList<>();

//缓存当前窗口里2号流的数据

List twoValues = new ArrayList<>();

for (TaggedUnion val : values) {

if (val.isOne()) {

oneValues.add(val.getOne());

} else {

twoValues.add(val.getTwo());

}

}

//传入到用户函数中

wrappedFunction.coGroup(oneValues, twoValues, out);

}

}

3.修改源码支持获取迟到数据测输出流

思路 复制CoGroupedStreams新增一个NewCoGroupedStreams,在WithWindow函数中增加方法sideOutputLateData,让用户传入outputTag,用于提取窗口关闭后的测输出流。

3.1复制CoGroupedStreams

3.2新增WithWindow.sideOutputLateData方法

新增该方法,传入outputTag,下图WithWindow构造方法是3.3新增的

@PublicEvolving

public WithWindow sideOutputLateData(

OutputTag> outputTag) {

return new WithWindow<>(

input1,

input2,

keySelector1,

keySelector2,

keyType,

windowAssigner,

trigger,

evictor,

allowedLateness,

outputTag

);

}

3.3新增WithWindow构造方法

新增属性laterDataOutputTag,用来保存构造函数中传入的laterOutputTag

protected WithWindow(

DataStream input1,

DataStream input2,

KeySelector keySelector1,

KeySelector keySelector2,

TypeInformation keyType,

WindowAssigner, W> windowAssigner,

Trigger, ? super W> trigger,

Evictor, ? super W> evictor,

Time allowedLateness,

OutputTag> laterOutputTag

) {

this(

input1,

input2,

keySelector1,

keySelector2,

keyType,

windowAssigner,

trigger,

evictor,

allowedLateness);

this.lateDataOutputTag = laterOutputTag;

}

3.4修改apply方法

判断lateDataOutputTag 是否为null,如果不为null则调用windowedStream的sideOutputLateData设置迟到数据tag

/**

* Completes the co-group operation with the user function that is executed for windowed

* groups.

*

*

Note: This method's return type does not support setting an operator-specific

* parallelism. Due to binary backwards compatibility, this cannot be altered. Use the

* {@link #with(CoGroupFunction, TypeInformation)} method to set an operator-specific

* parallelism.

*/

public DataStream apply(

CoGroupFunction function, TypeInformation resultType) {

// clean the closure

function = input1.getExecutionEnvironment().clean(function);

UnionTypeInfo unionType =

new UnionTypeInfo<>(input1.getType(), input2.getType());

UnionKeySelector unionKeySelector =

new UnionKeySelector<>(keySelector1, keySelector2);

SingleOutputStreamOperator> taggedInput1 =

input1.map(new Input1Tagger());

taggedInput1.getTransformation().setParallelism(input1.getParallelism(), false);

taggedInput1.returns(unionType);

SingleOutputStreamOperator> taggedInput2 =

input2.map(new Input2Tagger());

taggedInput2.getTransformation().setParallelism(input2.getParallelism(), false);

taggedInput2.returns(unionType);

DataStream> unionStream = taggedInput1.union(taggedInput2);

// we explicitly create the keyed stream to manually pass the key type information in

windowedStream =

new KeyedStream, KEY>(

unionStream, unionKeySelector, keyType)

.window(windowAssigner);

if (trigger != null) {

windowedStream.trigger(trigger);

}

if (evictor != null) {

windowedStream.evictor(evictor);

}

if (allowedLateness != null) {

windowedStream.allowedLateness(allowedLateness);

}

//判断lateDataOutputTag是否为NULL,如果不为NULL,则调用windowedStream

//的sideOutputLateData方法,传入lateDataOutputTag让迟到数据输出到测输出流中

if (lateDataOutputTag != null) {

windowedStream.sideOutputLateData(lateDataOutputTag);

}

return windowedStream.apply(

new CoGroupWindowFunction(function), resultType);

}

3.5开放UnionTypeInfo类的public权限

该类就是union之后的公共类的类型 oneType代表Input1流的数据类型,TwoType代表Input2流的数据类型

3.6编译Flink源码flink-streaming-java模块

进入到flink-streaming-java所在磁盘目录输入以下命令编译

mvn clean install -DskipTests -Dfast

编译成功

3.7项目中查看maven是否已经刷新为最新代码

编译之后,可以看到导入的maven包已经有了新增的NewCoGroupedStreams类了,注意项目中的maven依赖中的flink版本,要与编译源码的版本一致,否则无法引入到。

4.测试

新建两个流,通过new NewCoGroupedStreams创建对象,在allowedLateness之后通过sideOutputLateData设置outputTag,然后通过with方法触发业务,with底层也是调用了apply,只不过他帮我们把返回的流转为了SingleOutputStreamOperator类型,可以用于提取测输出流。最后通过with.getSideOutput(outputTag)提取测输出流,最后通过map转换为 Tuple2 类型进行打印

OutputTag> outputTag = new OutputTag<>("later",

new NewCoGroupedStreams.UnionTypeInfo<>(Types.POJO(WaterSensor.class), Types.POJO(WaterSensor.class)));

NewCoGroupedStreams newCgroupStream = new NewCoGroupedStreams<>(ds1, ds2);

SingleOutputStreamOperator with = newCgroupStream.where((x) -> x.getId()).equalTo(x -> x.getId()).window(TumblingEventTimeWindows.of(Time.seconds(10)))

.allowedLateness(Time.seconds(3))

.sideOutputLateData(outputTag)

.with(new RichCoGroupFunction() {

@Override

public void coGroup(Iterable first, Iterable second, Collector out) throws Exception {

out.collect(first.toString() + "======" + second.toString());

}

});

with.print();

with.getSideOutput(outputTag).map(new MapFunction, Tuple2>() {

@Override

public Tuple2 map(NewCoGroupedStreams.TaggedUnion value) throws Exception {

return value.isOne() ? Tuple2.of(1, value.getOne()) : Tuple2.of(2, value.getTwo());

}

}).print();

可以看到下图结果,ts代表时间戳,第一个打印是RichCoGroupFunction打印,代表关闭了1~10s的时间窗,后面我们在输入,WaterSensor{id='a', ts=1, vc=1} 就通过测输出流打印为二元组了

参考文章

评论可见,请评论后查看内容,谢谢!!!评论后请刷新页面。