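Contents
1. Background
2. coGroup Operator Source Code Analysis
2.1 The Complete coGroup Call Chain
2.2 The coGroup Entry Method
2.3 The CoGroupedStreams Class
2.4 The WithWindow Inner Class
2.5 The CoGroupWindowFunction Class
3. Modifying the Source to Expose Late Data as a Side Output
3.1 Copy CoGroupedStreams
3.2 Add the WithWindow.sideOutputLateData Method
3.3 Add a New WithWindow Constructor
3.4 Modify the apply Method
3.5 Make the UnionTypeInfo Class public
3.6 Build the flink-streaming-java Module of the Flink Source
3.7 Check That the Project's Maven Dependency Has Picked Up the New Code
4. Testing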
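1. Background
Once a coGroup window has closed, late records cannot be retrieved through a side output. intervalJoin does provide an API for this. The window join operator, however, is built on top of coGroup, so join has the same limitation.
Flink version: v1.17.1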
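2. coGroup Operator Source Code Analysis
2.1 The Complete coGroup Call Chain
input1.coGroup(input2)
        .where(keySelector1)
        .equalTo(keySelector2)
        .window(windowAssigner)
        .trigger(trigger)
        .evictor(evictor)
        .allowedLateness(allowedLateness)
        .apply(coGroupFunction);
The chain above has no sideOutputLateData method. As a result, records that arrive after the window has closed cannot be retrieved and are silently dropped.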
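For reference, the plain-Java sketch below shows the rule an event-time tumbling window applies when deciding that a record is "late". It has no Flink dependency, and the class and method names are hypothetical. A record counts as late once the watermark reaches the window's last timestamp (end - 1) plus allowedLateness. At that point the record is either dropped or, on a single-stream `WindowedStream` configured with `sideOutputLateData`, emitted to the side output. The coGroup builder chain never exposes that option.

```java
// Hypothetical helper mirroring, in plain Java, how an event-time tumbling
// window decides whether a record arrives too late (no Flink dependency).
final class LatenessRule {

    // Start of the tumbling window containing ts (offset 0 and ts >= 0 assumed).
    static long windowStart(long ts, long windowSize) {
        return ts - (ts % windowSize);
    }

    // A window [start, end) has maxTimestamp = end - 1.
    static long windowMaxTimestamp(long ts, long windowSize) {
        return windowStart(ts, windowSize) + windowSize - 1;
    }

    // Late once maxTimestamp + allowedLateness <= current watermark, i.e. the
    // window's state has been cleaned up and the record can no longer join it.
    static boolean isLate(long ts, long windowSize, long allowedLateness, long watermark) {
        return windowMaxTimestamp(ts, windowSize) + allowedLateness <= watermark;
    }

    public static void main(String[] args) {
        long size = 10_000L;    // 10 s tumbling window
        long lateness = 3_000L; // like allowedLateness(Time.seconds(3))
        // ts = 1 s belongs to [0 s, 10 s); its cleanup time is 9_999 + 3_000 = 12_999 ms.
        System.out.println(isLate(1_000L, size, lateness, 12_998L)); // false
        System.out.println(isLate(1_000L, size, lateness, 12_999L)); // true
    }
}
```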
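2.2 The coGroup Entry Method
DataStream.coGroup simply creates a CoGroupedStreams object:
/**
 * Creates a join operation. See {@link CoGroupedStreams} for an example of how the keys and
 * window can be specified.
 */
public <T2> CoGroupedStreams<T, T2> coGroup(DataStream<T2> otherStream) {
    return new CoGroupedStreams<>(this, otherStream);
}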
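2.3 The CoGroupedStreams Class
CoGroupedStreams works like the Builder in the builder pattern:
- where sets the KeySelector of the first stream and returns the inner class Where.
- equalTo sets the KeySelector of the second stream and returns the inner class EqualTo.
- window sets the window assigner and returns the inner class WithWindow.
- trigger, evictor and allowedLateness then configure the window.
- Finally, apply passes in the user's business function.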
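The staged-builder idea can be illustrated with a Flink-free miniature. All class names here are hypothetical and only echo Flink's. Each step returns a different type, so equalTo is reachable only after where, and apply only after window. Every option a user can set must therefore be a method on one of these stage classes.

```java
import java.util.function.Function;

// Hypothetical, Flink-free miniature of the CoGroupedStreams builder chain:
// every step returns a different type, so calls can only be made in order.
final class StagedBuilderDemo {

    // Entry point, playing the role of CoGroupedStreams<T1, T2>.
    static final class CoGrouped<T1, T2> {
        <K> Where<T1, T2, K> where(Function<T1, K> key1) {
            return new Where<>(key1);
        }
    }

    // Holds the first key selector; only equalTo(...) is available here.
    static final class Where<T1, T2, K> {
        private final Function<T1, K> key1;
        Where(Function<T1, K> key1) { this.key1 = key1; }
        EqualTo<T1, T2, K> equalTo(Function<T2, K> key2) {
            return new EqualTo<>(key1, key2);
        }
    }

    // Holds both key selectors; only window(...) is available here.
    static final class EqualTo<T1, T2, K> {
        private final Function<T1, K> key1;
        private final Function<T2, K> key2;
        EqualTo(Function<T1, K> key1, Function<T2, K> key2) {
            this.key1 = key1;
            this.key2 = key2;
        }
        WithWindow<T1, T2, K> window(long sizeMs) {
            return new WithWindow<>(key1, key2, sizeMs);
        }
    }

    // Final stage holding the whole configuration; apply() ends the chain.
    static final class WithWindow<T1, T2, K> {
        final Function<T1, K> key1;
        final Function<T2, K> key2;
        final long sizeMs;
        WithWindow(Function<T1, K> key1, Function<T2, K> key2, long sizeMs) {
            this.key1 = key1;
            this.key2 = key2;
            this.sizeMs = sizeMs;
        }
        String apply() {
            return "coGroup over " + sizeMs + " ms windows";
        }
    }

    public static void main(String[] args) {
        String plan = new CoGrouped<String, Integer>()
                .where(s -> s.length()) // key of the first input: the String's length
                .equalTo(i -> i)        // key of the second input: the Integer itself
                .window(10_000L)
                .apply();
        System.out.println(plan); // coGroup over 10000 ms windows
    }
}
```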
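2.4 The WithWindow Inner Class
WithWindow is the inner class that holds the complete configuration: both streams, the window settings and the key selectors. The coGroup logic starts when the user calls apply, which does the following:
1. Unions the two streams.
2. Turns the result into a KeyedStream with keyBy.
3. Applies the window.
4. Calls the windowed stream's apply method with a WindowFunction that performs the coGroup step and then invokes the user function.
The code is shown below, with comments added: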
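/**
 * A co-group operation that has {@link KeySelector KeySelectors} defined for both inputs as
 * well as a {@link WindowAssigner}.
 *
 * @param <T1> Type of the elements from the first input
 * @param <T2> Type of the elements from the second input
 * @param <KEY> Type of the key. This must be the same for both inputs
 * @param <W> Type of {@link Window} on which the co-group operation works.
 */
@Public
public static class WithWindow<T1, T2, KEY, W extends Window> {
    // first input stream
    private final DataStream<T1> input1;
    // second input stream
    private final DataStream<T2> input2;
    // key selector of the first stream
    private final KeySelector<T1, KEY> keySelector1;
    // key selector of the second stream
    private final KeySelector<T2, KEY> keySelector2;
    // type of the key
    private final TypeInformation<KEY> keyType;
    // window assigner
    private final WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner;
    // window trigger
    private final Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger;
    // window evictor
    private final Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor;
    // allowed lateness of the window
    private final Time allowedLateness;
    // windowed stream, built inside apply()
    private WindowedStream<TaggedUnion<T1, T2>, KEY, W> windowedStream;

    // the constructor assigns all of the fields above
    protected WithWindow(
            DataStream<T1> input1,
            DataStream<T2> input2,
            KeySelector<T1, KEY> keySelector1,
            KeySelector<T2, KEY> keySelector2,
            TypeInformation<KEY> keyType,
            WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner,
            Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger,
            Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor,
            Time allowedLateness) {
        this.input1 = input1;
        this.input2 = input2;
        this.keySelector1 = keySelector1;
        this.keySelector2 = keySelector2;
        this.keyType = keyType;
        this.windowAssigner = windowAssigner;
        this.trigger = trigger;
        this.evictor = evictor;
        this.allowedLateness = allowedLateness;
    }

    /**
     * Completes the co-group operation with the user function that is executed for windowed
     * groups.
     *
     * <p>Note: This method's return type does not support setting an operator-specific
     * parallelism. Due to binary backwards compatibility, this cannot be altered. Use the
     * {@link #with(CoGroupFunction, TypeInformation)} method to set an operator-specific
     * parallelism.
     */
    public <T> DataStream<T> apply(
            CoGroupFunction<T1, T2, T> function, TypeInformation<T> resultType) {
        // clean the closure
        function = input1.getExecutionEnvironment().clean(function);

        // common TypeInfo for both streams; records of input1 and input2 are mapped to it
        UnionTypeInfo<T1, T2> unionType =
                new UnionTypeInfo<>(input1.getType(), input2.getType());
        // KeySelector that works on the union type
        UnionKeySelector<T1, T2, KEY> unionKeySelector =
                new UnionKeySelector<>(keySelector1, keySelector2);

        // map the records of input1 to TaggedUnion (typed by unionType)
        SingleOutputStreamOperator<TaggedUnion<T1, T2>> taggedInput1 =
                input1.map(new Input1Tagger<T1, T2>());
        taggedInput1.getTransformation().setParallelism(input1.getParallelism(), false);
        taggedInput1.returns(unionType);

        // map the records of input2 to TaggedUnion (typed by unionType)
        SingleOutputStreamOperator<TaggedUnion<T1, T2>> taggedInput2 =
                input2.map(new Input2Tagger<T1, T2>());
        taggedInput2.getTransformation().setParallelism(input2.getParallelism(), false);
        taggedInput2.returns(unionType);

        // union the two tagged streams
        DataStream<TaggedUnion<T1, T2>> unionStream = taggedInput1.union(taggedInput2);

        // keyBy and apply the window
        windowedStream =
                new KeyedStream<TaggedUnion<T1, T2>, KEY>(
                                unionStream, unionKeySelector, keyType)
                        .window(windowAssigner);

        // configure the trigger
        if (trigger != null) {
            windowedStream.trigger(trigger);
        }
        // configure the evictor
        if (evictor != null) {
            windowedStream.evictor(evictor);
        }
        // configure allowedLateness
        if (allowedLateness != null) {
            windowedStream.allowedLateness(allowedLateness);
        }
        // wrap the user function in a CoGroupWindowFunction
        return windowedStream.apply(
                new CoGroupWindowFunction<T1, T2, T, KEY, W>(function), resultType);
    }

    /**
     * Completes the co-group operation with the user function that is executed for windowed
     * groups.
     *
     * <p>Note: This is a temporary workaround while the {@link #apply(CoGroupFunction,
     * TypeInformation)} method has the wrong return type and hence does not allow one to set an
     * operator-specific parallelism
     *
     * @deprecated This method will be removed once the {@link #apply(CoGroupFunction,
     *     TypeInformation)} method is fixed in the next major version of Flink (2.0).
     */
    @PublicEvolving
    @Deprecated
    public <T> SingleOutputStreamOperator<T> with(
            CoGroupFunction<T1, T2, T> function, TypeInformation<T> resultType) {
        return (SingleOutputStreamOperator<T>) apply(function, resultType);
    }

    @VisibleForTesting
    Time getAllowedLateness() {
        return allowedLateness;
    }

    // Returns the internal windowed stream. It is package-private and marked @VisibleForTesting,
    // so user code cannot call it; if it could, the windowed stream could be used to obtain
    // the late-data side output.
    @VisibleForTesting
    WindowedStream<TaggedUnion<T1, T2>, KEY, W> getWindowedStream() {
        return windowedStream;
    }
}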
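To make the wiring that apply sets up before the window more concrete, here is a plain-Java simulation with no Flink involved. `Tagged`, `tagAndUnion` and `groupByKey` are hypothetical stand-ins for TaggedUnion, the Input1Tagger/Input2Tagger maps plus union, and keyBy with UnionKeySelector. Each input is wrapped in a common tagged type and the two wrapped inputs are concatenated. The result is then grouped by a key function chosen according to the tag, so records from both sides with the same key land in the same group. As in Flink's TaggedUnion, values are assumed to be non-null.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

final class TagUnionKeyBy {
    // Like Input1Tagger / Input2Tagger followed by union: one list of tagged values.
    static <T1, T2> List<Tagged<T1, T2>> tagAndUnion(List<T1> in1, List<T2> in2) {
        List<Tagged<T1, T2>> union = new ArrayList<>();
        for (T1 v : in1) union.add(Tagged.one(v));
        for (T2 v : in2) union.add(Tagged.two(v));
        return union;
    }

    // Like UnionKeySelector + keyBy: use key1 or key2 depending on the tag.
    static <T1, T2, K> Map<K, List<Tagged<T1, T2>>> groupByKey(
            List<Tagged<T1, T2>> union, Function<T1, K> key1, Function<T2, K> key2) {
        Map<K, List<Tagged<T1, T2>>> groups = new LinkedHashMap<>();
        for (Tagged<T1, T2> t : union) {
            K k = t.isOne() ? key1.apply(t.one) : key2.apply(t.two);
            groups.computeIfAbsent(k, x -> new ArrayList<>()).add(t);
        }
        return groups;
    }

    public static void main(String[] args) {
        Function<String, String> key = s -> s.split(":")[0];
        Map<String, List<Tagged<String, String>>> g =
                groupByKey(tagAndUnion(List.of("a:1", "b:2"), List.of("a:10")), key, key);
        System.out.println(g.keySet());        // [a, b]
        System.out.println(g.get("a").size()); // 2
    }
}

// Hypothetical stand-in for CoGroupedStreams.TaggedUnion: exactly one side is set.
final class Tagged<T1, T2> {
    final T1 one;
    final T2 two;
    private Tagged(T1 one, T2 two) { this.one = one; this.two = two; }
    static <T1, T2> Tagged<T1, T2> one(T1 v) { return new Tagged<>(v, null); }
    static <T1, T2> Tagged<T1, T2> two(T2 v) { return new Tagged<>(null, v); }
    boolean isOne() { return one != null; }
}
```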
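2.5 The CoGroupWindowFunction Class
CoGroupWindowFunction is another inner class of CoGroupedStreams and performs the actual coGroup step. It splits the records of the current window back into the two inputs. It then hands them to the user function, which is the coGroupFunction passed to apply in 2.1.
private static class CoGroupWindowFunction<T1, T2, T, KEY, W extends Window>
        extends WrappingFunction<CoGroupFunction<T1, T2, T>>
        implements WindowFunction<TaggedUnion<T1, T2>, T, KEY, W> {

    private static final long serialVersionUID = 1L;

    public CoGroupWindowFunction(CoGroupFunction<T1, T2, T> userFunction) {
        super(userFunction);
    }

    @Override
    public void apply(KEY key, W window, Iterable<TaggedUnion<T1, T2>> values, Collector<T> out)
            throws Exception {
        // buffers the records of input 1 in the current window
        List<T1> oneValues = new ArrayList<>();
        // buffers the records of input 2 in the current window
        List<T2> twoValues = new ArrayList<>();
        for (TaggedUnion<T1, T2> val : values) {
            if (val.isOne()) {
                oneValues.add(val.getOne());
            } else {
                twoValues.add(val.getTwo());
            }
        }
        // pass both lists to the user function
        wrappedFunction.coGroup(oneValues, twoValues, out);
    }
}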
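The split performed by CoGroupWindowFunction.apply can be reproduced in plain Java. In the sketch below, `CoGroupFn` and `Tagged` are hypothetical stand-ins for CoGroupFunction and TaggedUnion, and no Flink classes are used. Both sides are fully buffered in memory for each key and window before the user function runs once. Either list may be empty, which is how coGroup can express outer-join-style results.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class CoGroupSplit {
    // Hypothetical stand-in for CoGroupFunction<T1, T2, O>.
    interface CoGroupFn<T1, T2, O> {
        void coGroup(Iterable<T1> first, Iterable<T2> second, Consumer<O> out);
    }

    // Hypothetical stand-in for TaggedUnion<T1, T2>: exactly one field is non-null.
    static final class Tagged<T1, T2> {
        final T1 one;
        final T2 two;
        Tagged(T1 one, T2 two) { this.one = one; this.two = two; }
        boolean isOne() { return one != null; }
    }

    // Mirrors CoGroupWindowFunction.apply: buffer each side, then call the user function once.
    static <T1, T2, O> void applyWindow(
            Iterable<Tagged<T1, T2>> values, CoGroupFn<T1, T2, O> fn, Consumer<O> out) {
        List<T1> oneValues = new ArrayList<>();
        List<T2> twoValues = new ArrayList<>();
        for (Tagged<T1, T2> val : values) {
            if (val.isOne()) {
                oneValues.add(val.one);
            } else {
                twoValues.add(val.two);
            }
        }
        fn.coGroup(oneValues, twoValues, out);
    }

    public static void main(String[] args) {
        List<Tagged<String, Integer>> window = List.of(
                new Tagged<String, Integer>("a", null),
                new Tagged<String, Integer>(null, 1),
                new Tagged<String, Integer>("b", null));
        List<String> results = new ArrayList<>();
        Consumer<String> sink = results::add;
        applyWindow(window, (first, second, out) -> out.accept(first + "======" + second), sink);
        System.out.println(results); // [[a, b]======[1]]
    }
}
```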
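3. Modifying the Source to Expose Late Data as a Side Output
Approach: copy CoGroupedStreams into a new class, NewCoGroupedStreams. Then add a sideOutputLateData method to its WithWindow inner class so that the user can pass in an OutputTag. Records that arrive after the window has closed are then emitted through that side output.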
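3.1 Copy CoGroupedStreams
In the flink-streaming-java module, copy CoGroupedStreams into a new class named NewCoGroupedStreams, renaming the class and its constructor accordingly. All of the following changes are made in this copy.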
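3.2 Add the WithWindow.sideOutputLateData Method
Add this method, which takes an OutputTag. The WithWindow constructor it calls is the one added in 3.3.
@PublicEvolving
public WithWindow<T1, T2, KEY, W> sideOutputLateData(
        OutputTag<TaggedUnion<T1, T2>> outputTag) {
    return new WithWindow<>(
            input1,
            input2,
            keySelector1,
            keySelector2,
            keyType,
            windowAssigner,
            trigger,
            evictor,
            allowedLateness,
            outputTag);
}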
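3.3 Add a New WithWindow Constructor
Add a field, lateDataOutputTag, that stores the OutputTag passed to the new constructor. The field must not be final. The new constructor first delegates to the existing one with this(...), and the existing constructor never assigns the field.
// new field holding the late-data OutputTag (null if not configured)
private OutputTag<TaggedUnion<T1, T2>> lateDataOutputTag;

protected WithWindow(
        DataStream<T1> input1,
        DataStream<T2> input2,
        KeySelector<T1, KEY> keySelector1,
        KeySelector<T2, KEY> keySelector2,
        TypeInformation<KEY> keyType,
        WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner,
        Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger,
        Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor,
        Time allowedLateness,
        OutputTag<TaggedUnion<T1, T2>> lateDataOutputTag) {
    this(
            input1,
            input2,
            keySelector1,
            keySelector2,
            keyType,
            windowAssigner,
            trigger,
            evictor,
            allowedLateness);
    this.lateDataOutputTag = lateDataOutputTag;
}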
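The two additions in 3.2 and 3.3 follow the style of the rest of the builder. A configuration call returns a new WithWindow rather than mutating the current one, and the extra constructor delegates to the existing one. One consequence is worth checking in the copied class. The existing trigger, evictor and allowedLateness methods still build their new WithWindow through the original nine-argument constructor, so a tag set earlier in the chain would be lost. Either pass the tag through in those methods as well, or call sideOutputLateData last (the test in section 4 does this). The Flink-free sketch below shows the pass-through variant. `WindowConfig` and its fields are hypothetical.

```java
// Hypothetical, Flink-free miniature of WithWindow's copy-on-configure style.
final class WindowConfig {
    private final long windowSizeMs;
    private final Long allowedLatenessMs; // null = not configured
    private String lateDataTag;           // non-final: assigned after this(...)

    WindowConfig(long windowSizeMs, Long allowedLatenessMs) {
        this.windowSizeMs = windowSizeMs;
        this.allowedLatenessMs = allowedLatenessMs;
    }

    // Extra constructor in the style of 3.3: delegate, then set the new field.
    WindowConfig(long windowSizeMs, Long allowedLatenessMs, String lateDataTag) {
        this(windowSizeMs, allowedLatenessMs);
        this.lateDataTag = lateDataTag;
    }

    // Like allowedLateness(...), but passing the tag through so call order does not matter.
    WindowConfig allowedLateness(long ms) {
        return new WindowConfig(windowSizeMs, ms, lateDataTag);
    }

    // Like sideOutputLateData(...) from 3.2: returns a copy, the receiver is unchanged.
    WindowConfig sideOutputLateData(String tag) {
        return new WindowConfig(windowSizeMs, allowedLatenessMs, tag);
    }

    String describe() {
        return "size=" + windowSizeMs + ", lateness=" + allowedLatenessMs + ", lateTag=" + lateDataTag;
    }

    public static void main(String[] args) {
        WindowConfig base = new WindowConfig(10_000L, null);
        WindowConfig full = base.sideOutputLateData("late").allowedLateness(3_000L);
        System.out.println(base.describe()); // size=10000, lateness=null, lateTag=null
        System.out.println(full.describe()); // size=10000, lateness=3000, lateTag=late
    }
}
```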
3.4 Modify the apply Method
Check whether lateDataOutputTag is null. If it is not, call sideOutputLateData on windowedStream so that late records are sent to the side output identified by that tag.
/**
 * Completes the co-group operation with the user function that is executed for windowed
 * groups.
 *
 * <p>Note: This method's return type does not support setting an operator-specific
 * parallelism. Due to binary backwards compatibility, this cannot be altered. Use the
 * {@link #with(CoGroupFunction, TypeInformation)} method to set an operator-specific
 * parallelism.
 */
public <T> DataStream<T> apply(
        CoGroupFunction<T1, T2, T> function, TypeInformation<T> resultType) {
    // clean the closure
    function = input1.getExecutionEnvironment().clean(function);

    UnionTypeInfo<T1, T2> unionType =
            new UnionTypeInfo<>(input1.getType(), input2.getType());
    UnionKeySelector<T1, T2, KEY> unionKeySelector =
            new UnionKeySelector<>(keySelector1, keySelector2);

    SingleOutputStreamOperator<TaggedUnion<T1, T2>> taggedInput1 =
            input1.map(new Input1Tagger<T1, T2>());
    taggedInput1.getTransformation().setParallelism(input1.getParallelism(), false);
    taggedInput1.returns(unionType);

    SingleOutputStreamOperator<TaggedUnion<T1, T2>> taggedInput2 =
            input2.map(new Input2Tagger<T1, T2>());
    taggedInput2.getTransformation().setParallelism(input2.getParallelism(), false);
    taggedInput2.returns(unionType);

    DataStream<TaggedUnion<T1, T2>> unionStream = taggedInput1.union(taggedInput2);

    // we explicitly create the keyed stream to manually pass the key type information in
    windowedStream =
            new KeyedStream<TaggedUnion<T1, T2>, KEY>(
                            unionStream, unionKeySelector, keyType)
                    .window(windowAssigner);

    if (trigger != null) {
        windowedStream.trigger(trigger);
    }
    if (evictor != null) {
        windowedStream.evictor(evictor);
    }
    if (allowedLateness != null) {
        windowedStream.allowedLateness(allowedLateness);
    }
    // New: if lateDataOutputTag is set, call windowedStream.sideOutputLateData with it
    // so that records arriving after the window has closed go to the side output
    if (lateDataOutputTag != null) {
        windowedStream.sideOutputLateData(lateDataOutputTag);
    }

    return windowedStream.apply(
            new CoGroupWindowFunction<T1, T2, T, KEY, W>(function), resultType);
}
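The behavioural change from the added sideOutputLateData call can be simulated without Flink. All names below are hypothetical, and the rule assumes a 10 s tumbling event-time window with 3 s of allowed lateness. Records whose window has already been cleaned up are dropped by default. With a late-data tag set, they are emitted to the side output instead. Because the window operator's input is the union stream, those late records are still TaggedUnion values, which is why the OutputTag in section 4 is typed as TaggedUnion<WaterSensor, WaterSensor>.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of a 10 s tumbling event-time window with
// allowedLateness = 3 s, with and without a late-data OutputTag.
final class LateRouting {

    // Stand-in for a TaggedUnion record: which input it came from plus its timestamp.
    static final class Element {
        final String side;
        final long ts;
        Element(String side, long ts) { this.side = side; this.ts = ts; }
        @Override public String toString() { return side + "@" + ts; }
    }

    static final long WINDOW_MS = 10_000L;
    static final long LATENESS_MS = 3_000L;

    // Adds on-time records to 'accepted' and returns the records sent to the side output.
    // Without a tag (lateTagSet == false), late records are simply dropped.
    static List<Element> route(List<Element> input, long watermark, boolean lateTagSet,
                               List<Element> accepted) {
        List<Element> sideOutput = new ArrayList<>();
        for (Element e : input) {
            long windowMaxTs = e.ts - (e.ts % WINDOW_MS) + WINDOW_MS - 1;
            boolean late = windowMaxTs + LATENESS_MS <= watermark;
            if (!late) {
                accepted.add(e);   // buffered in window state
            } else if (lateTagSet) {
                sideOutput.add(e); // what sideOutputLateData(tag) enables
            }                      // otherwise: dropped
        }
        return sideOutput;
    }

    public static void main(String[] args) {
        List<Element> input = List.of(new Element("one", 1_000L), new Element("two", 15_000L));
        List<Element> accepted = new ArrayList<>();
        System.out.println(route(input, 13_000L, true, accepted));           // [one@1000]
        System.out.println(accepted);                                        // [two@15000]
        System.out.println(route(input, 13_000L, false, new ArrayList<>())); // []
    }
}
```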
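3.5 Make the UnionTypeInfo Class public
UnionTypeInfo is the type of the unioned stream. oneType is the type of input1's records, and twoType is the type of input2's records. It is not public in the original class. It has to be made public so that user code can construct the OutputTag's TypeInformation with it, as the test in section 4 does.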
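3.6 Build the flink-streaming-java Module of the Flink Source
Change into the flink-streaming-java directory of the Flink source tree and build it with:
mvn clean install -DskipTests -Dfast
The build completes successfully.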
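3.7 Check That the Project's Maven Dependency Has Picked Up the New Code
After the build, the imported Maven artifact contains the new NewCoGroupedStreams class. The Flink version of the Maven dependencies in your project must match the version of the compiled source (1.17.1 here). Otherwise the locally installed artifact is not picked up.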
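4. Testing
The test proceeds as follows:
1. Create two streams and construct a NewCoGroupedStreams object from them with new.
2. After allowedLateness, set the OutputTag with sideOutputLateData.
3. Run the business logic with with. Internally, with also calls apply; it only casts the returned stream to SingleOutputStreamOperator, which makes the side output retrievable.
4. Extract the side output with with.getSideOutput(outputTag), and use map to convert each record into a Tuple2<Integer, WaterSensor>.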
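// the tag id "late-data" is arbitrary
OutputTag<NewCoGroupedStreams.TaggedUnion<WaterSensor, WaterSensor>> outputTag = new OutputTag<>("late-data",
        new NewCoGroupedStreams.UnionTypeInfo<>(Types.POJO(WaterSensor.class), Types.POJO(WaterSensor.class)));
// ds1 and ds2 are the two WaterSensor input streams. The key selectors (WaterSensor::getId)
// and the 10 s event-time window are assumptions: these lines were lost from the original listing.
NewCoGroupedStreams<WaterSensor, WaterSensor> newCoGroupedStreams = new NewCoGroupedStreams<>(ds1, ds2);
SingleOutputStreamOperator<String> with = newCoGroupedStreams
        .where(WaterSensor::getId)
        .equalTo(WaterSensor::getId)
        .window(TumblingEventTimeWindows.of(Time.seconds(10)))
        .allowedLateness(Time.seconds(3))
        .sideOutputLateData(outputTag)
        .with(new RichCoGroupFunction<WaterSensor, WaterSensor, String>() {
            @Override
            public void coGroup(Iterable<WaterSensor> first, Iterable<WaterSensor> second, Collector<String> out) throws Exception {
                out.collect(first.toString() + "======" + second.toString());
            }
        });
with.print();
with.getSideOutput(outputTag).map(new MapFunction<NewCoGroupedStreams.TaggedUnion<WaterSensor, WaterSensor>, Tuple2<Integer, WaterSensor>>() {
    @Override
    public Tuple2<Integer, WaterSensor> map(NewCoGroupedStreams.TaggedUnion<WaterSensor, WaterSensor> value) throws Exception {
        // 1 = record from the first stream, 2 = record from the second stream
        return value.isOne() ? Tuple2.of(1, value.getOne()) : Tuple2.of(2, value.getTwo());
    }
}).print();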
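In the output, ts is the timestamp. The first line is printed by the RichCoGroupFunction and shows that the 1~10 s window has closed. When WaterSensor{id='a', ts=1, vc=1} is entered afterwards, it comes out of the side output and is printed as a Tuple2.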