在批处理统计中,我们可以等待一批数据都到齐后,统一处理。但是在实时处理统计中,我们是来一条就得处理一条,那么我们怎么统计最近一段时间内的数据呢?引入“窗口”。

所谓的“窗口”,一般就是划定的一段时间范围,也就是“时间窗”;对在这范围内的数据进行处理,就是所谓的窗口计算。所以窗口和时间往往是分不开的。

一.窗口(Window)

1.1 窗口的概念

Flink是一种流式计算引擎,主要是来处理无界数据流的,数据源源不断、无穷无尽。想要更加方便高效地处理无界流,一种方式就是将无限数据切割成有限的“数据块”进行处理,这就是所谓的“窗口”(Window)。

正确理解:在Flink中,窗口其实并不是一个“框”,应该把窗口理解成一个“桶”。在Flink中,窗口可以把流切割成有限大小的多个“存储桶” (bcket): 每个数据都会分发到对应的桶中,当到达窗口结束时间时,就对每个桶中收集的数据进行计算处理。

且Flink 中的窗口并不是事先创建好的,而是动态创建的。当有落在窗口范围中的数据到达时才会创建对应的窗口。

例如需要将数据按照时间进行统计计算,就可以将数据按小时进行分桶,0点~1点放在一个桶中,1点~两点放到一个桶中。

窗口是由窗口分配器和窗口函数组成的。

1.2 窗口的分类

Flink 中除了最简单的时间窗口外,还可以使用各种不同类型的窗口来实现需求。

1.2.1 按照驱动(度量)类型分

窗口其实截取有界流的一种方式,如何定义截取的开始时机和结束时机,这就叫做窗口的驱动类型。

(1) 时间窗口(Time Window)

时间窗口就是以时间点来定义窗口的开始和结束,截取出的就是某一时间段的数据。到达结束时间,窗口则不再继续收集数据,触发计算输出结果,并将窗口销毁关闭。

时间窗口并不是以第一条数据来的时间+窗口长度为一个窗口,而且整数向下取整。

例如:

一个基于时间的窗口,且窗口长度为7。

1分12秒一条数据达到,其实这条数据不属于 [ 12 ,19 ),而是属于 [ 10,20 )。

(2) 计数窗口(Count Window)

计数窗口基于元素的个数来截取数据,到达固定的个数时就触发计算并关闭窗口。每个窗口截取数据的个数,就是窗口的大小。

1.2.2 按照窗口分配数据的规则分类

根据分配数据的规则,窗口的具体实现可以分为4类:滚动窗口(Tumbling Window)、滑动窗口(Sliding Window)、会话窗口(Session Window),以及全局窗口(Global Window)。

(1)  滚动窗口(Tumbling Window)

滚动窗口有固定的大小,是一种对数据的“均匀切分”的划分方式。窗口间不会重叠,也不会产生间隔,每个数据只会属于一个窗口。

滚动窗口可以根据时间和数据个数定义,需要的参数就是窗口大小(window size)。例如可以定义长度为1小时的滚动窗口,则每小时会进行一次统计,也可以定义一个长度为10的滚动计数窗口,则每10个数会进行一次统计。

应用:对每个时间段做聚合统计。

(2) 滑动窗口(Sliding Window)

滑动窗口的大小也是固定的,当窗口间并不一定是无缝连接的,可以错开一定的位置。

定义滑动窗口的参数有两个:除去窗口大小 (window size)之外,还有一个“滑动步长”(window slide)它其实就代表了窗口计算的频率。窗口在结束时间触发计算输出结果,那么滑动步长就代表了计算频率。

当滑动步长小于窗口大小时,滑动窗口就会出现重叠,这时数据也可能会被同时分配到多个窗口中。而具体的个数就由窗口大小和滑动步长的比值 (size/slide) 来决定。 滚动窗口也可以看作是一种特殊的滑动窗口一-窗口大小等于滑动步长 (size =slide)。

滑动窗口适合计算结果更新频率非常高的场景。

同样的,滑动窗口也支持以时间和数据个数来定义。

(3) 会话窗口(Session Window)

会话窗口,是基于“会话” (session) 来来对数据进行分组的。会话窗口只能基于时间来定义。

会话窗口中,最重要的参数就是会话的超时时间,也就是两个会话窗口之间的最小距离。如果两条数据达到的间隔小于定义的会话超时时间,那为保持会话,数据都属于同一个窗口;如果两条数据达到的间隔大于定义的会话超时时间,则为两个不同的会话,数据也就不在一个窗口。

(4) 全局窗口(Global Window) 

这种窗口全局有效,会把相同key的所有数据都分配到同一个窗口中。这种窗口没有结束的时候, 默认是不会做触发计算的。如果希望它能对数据进行计算处理,还需要自定义“触发器” (Tiigger)

1.2.3 四种时间窗口的演示

(1)  滚动窗口

public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

SingleOutputStreamOperator sensorDS = env

.socketTextStream("xxx.xxx.xxx.xxx", 1234)

.map(new MyMapFunctionImpl());

KeyedStream sensorKS = sensorDS.keyBy(WaterSensor::getId);

// 1、指定 窗口分配器 使用滚动窗口,窗口长度为10s,每10s的数据在一个窗口内

WindowedStream sensorWs = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));

SingleOutputStreamOperator process = sensorWs.process(new ProcessWindowFunction() {

/**

*

* @param s The key for which this window is evaluated. 该窗口的 Key

* @param context The context in which the window is being evaluated. 窗口上下文

* @param elements The elements in the window being evaluated. 窗口中所有的数据

* @param out A collector for emitting elements. 采集器

* @throws Exception

*/

@Override

public void process(String s,

ProcessWindowFunction.Context context,

Iterable elements,

Collector out) throws Exception {

long start = context.window().getStart();

long end = context.window().getEnd();

String windowStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss SSS");

String windowEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss SSS");

long count = elements.spliterator().estimateSize();

out.collect("窗口的开始时间:" + windowStart + "--窗口的结束时间:" + windowEnd + " --key 为" + s + "的窗口数据包含[" + elements.toString() + "] " + count + "条数据");

}

});

process.print();

env.execute();

}

输入:

[root@VM-55-24-centos ~]# nc -lk 1234

s1,1,1

s1,2,2

s1,3,3

s1,4,4

s1,5,5

s1,6,6

结果: 

 (2) 滑动窗口

// 2、 滑动窗口,窗口长度为10s,滑动步长为 5s (窗口重叠 5s)

WindowedStream sensorWs = sensorKS

.window(SlidingProcessingTimeWindows.of(Time.seconds(10),Time.seconds(5)));

输入:

[root@VM-55-24-centos ~]# nc -lk 1234

s1,1,1

s1,2,2

s1,3,3

s1,4,4

s1,5,5

s1,6,6

s1,7,7

 输出:

(3) 会话窗口 

// 3、会话窗口,会话超时时间为 10s

WindowedStream sensorWs = sensorKS

.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)));

输入:

[root@VM-55-24-centos ~]# nc -lk 1234

s1,1,1

s1,2,2

s1,3,3

// 等待10s

s1,4,4

s1,5,5

s1,6,6

// 等待十秒

s1,7,7

输出:

 (4) 动态会话窗口

// 4、动态会话窗口,可以动态指定会话超时时间

WindowedStream sensorWs = sensorKS.window(ProcessingTimeSessionWindows.withDynamicGap(

new SessionWindowTimeGapExtractor() {

@Override

public long extract(WaterSensor element) {

// 根据数据中的属性自定义指定会话超时间,会话单位是毫秒

// 以数据中的 vc * 1000 毫秒为会话超时间

return element.getVc() * 1000;

}

}

));

1.2.4 两种计数窗口的演示

(1) 滚动窗口

public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setParallelism(1);

SingleOutputStreamOperator sensorDS = env

.socketTextStream("xxx.xxx.xxx.xxx", 1234)

.map(new MyMapFunctionImpl());

KeyedStream sensorKS = sensorDS.keyBy(WaterSensor::getId);

// 计数窗口

// 滚动窗口:每3条为一个窗口

WindowedStream sensorWs = sensorKS.countWindow(3);

SingleOutputStreamOperator process = sensorWs.process(new ProcessWindowFunction() {

@Override

public void process(String s, ProcessWindowFunction.Context context, Iterable elements, Collector out) throws Exception {

out.collect("窗口包含==>" + elements.toString() + "==>" + elements.spliterator().estimateSize() + "条数据");

}

});

process.print();

env.execute();

}

 输入:

[root@VM-55-24-centos ~]# nc -lk 1234

s1,1,1

s1,2,2

s1,3,3,

s1,4,4

s1,5,5

s1,6,6

输出:

(2) 滑动窗口

// 滑动窗口:窗口长度为3,滑动步长为1

WindowedStream sensorWs = sensorKS

.countWindow(3,1);

输入:

[root@VM-55-24-centos ~]# nc -lk 8877

s1,1,1

s1,2,2

s1,3,3

s1,4,4

输出:

1.3 窗口API概览

(1) 按键分区(Keyed)和非按键分区(Non-Keyed)

在定义窗口操作之前,首先要确定数据流有没有进行 KeyBy 操作。

(1.1) 非按键分区(Non-Keyed Windows)

如果没有进行keyBy,那么原始的DataStream就不会分成多条逻辑流。这时窗口逻辑只能在一个任务(task)上执行,就相当于并行度变成了1。

基于DataStream调用.windowAll()定义窗口

stream.windowAll(...)

(1.2) 按键分区窗口(Keyed Windows)

经过按键分区keyBy操作后,数据流会按照key被分为多条逻辑流(logical streams),这就是KeyedStream。基于KeyedStream进行窗口操作时,窗口计算会在多个并行子任务上同时执行。相同key的数据会被发送到同一个并行子任务,而窗口操作会基于每个key进行单独的处理。所以可以认为,每个key上都定义了一组窗口,各自独立地进行统计计算。

例如有 Key分别为红、黄、蓝的三种数据,需要按照时间分桶,则在1点~2点之间,红、黄、蓝会各自单独创建一个桶,桶与桶之间互不干扰,到下一个时间点,则会各自创建对应的桶。

需要先对DataStream调用.keyBy()进行按键分区,然后再调用.window()定义窗口。

stream.keyBy(...).window(...)

(2) 代码中窗口API的调用

窗口操作主要有两个部分:窗口分配器(Window Assigners)和窗口函数(Window Functions)。

stream.keyBy()

.window()

.aggregate()

其中.window()方法需要传入一个窗口分配器,它指明了窗口的类型;而后面的.aggregate()方法传入一个窗口函数作为参数,它用来定义窗口具体的处理逻辑。窗口分配器有各种形式,而窗口函数的调用方法也不只.aggregate()一种

1.4 窗口分配器

定义窗口分配器(Window Assigners)是构建窗口算子的第一步,它的作用就是定义数据应该被“分配”到哪个窗口。所以可以说,窗口分配器其实就是在指定窗口的类型。

窗口按照驱动类型可以分成时间窗口和计数窗口,而按照具体的分配规则,又有滚动窗口、滑动窗口、会话窗口、全局窗口四种。除去需要自定义的全局窗口外,其他常用的类型Flink中都给出了内置的分配器实现,我们可以方便地调用实现各种需求。

窗口分配器就是根据是否进行了 KeyBy 操作,直接调用 window() / windowAll()。

... sensorKS = sensorDS.keyBy(WaterSensor::getId);

// 1.1 基于时间的窗口

// 滚动窗口,窗口长度为10s,每10s的数据在一个窗口内

sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));

// 滑动窗口,窗口长度为10s,滑动步长为2s(窗口重叠2s)

sensorKS.window(SlidingProcessingTimeWindows.of(Time.seconds(10),Time.seconds(2)));

// 会话窗口,会话间隔为10s

sensorKS.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)));

// 1.2 基于计数的窗口

// 滚动窗口,窗口长度为10个元素(每10个元素在一个窗口内)

sensorKS.countWindow(10);

// 滑动窗口,窗口长度为10个元素,滑动步长为2个元素

sensorKS.countWindow(10,2);

// 全局窗口,计数窗口的底层实现,自定义窗口时使用

sensorKS.window(GlobalWindows.create());

1.5 窗口函数

第一步用窗口分配器将数据收集在窗口中后,则需要定义窗口函数对窗口收集的数据进行计算操作。

窗口函数根据处理的方式可以分为两类:增量聚合函数和全窗口函数。

1.5.1 增量聚合函数(ReduceFunction / AggregateFunction)

窗口将数据收集起来,最基本的处理操作就是进行聚合。每来一条数据,就在之前的结果上聚合一次,这就是“增量聚合”。

典型的增量聚合函数有两个:ReduceFunction和AggregateFunction。

(1) 规约函数(ReduceFunction)

需求案例:读取 Socket 的水位数据,计算每30s中的VC 累加和 , 并在窗口触发时输出结果。

/**

* 窗口函数:增量聚合 Reduce

*/

public class WindowReduceDemo {

public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setParallelism(1);

SingleOutputStreamOperator sensorDS = env

.socketTextStream("xxx.xxx.xxx.xxx", 1234)

.map(new MyMapFunctionImpl());

KeyedStream sensorKS = sensorDS.keyBy(WaterSensor::getId);

// 1、指定 窗口分配器 使用滚动窗口,窗口长度为30s,每30s的数据在一个窗口内

WindowedStream sensorWs = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(30)));

// 2、窗口函数: 增量聚合 Reduce

/**

* 窗口函数的 Reduce :

* 1、相同的 Key 的第一条数据来的时候,不会调用 Reduce 方法

* 2、增量聚合的意思是:数据来一条就基于上次的结果计算一次,但不会输出

* 3、在窗口触发(结束)时,才会输出窗口的最终计算结果

*/

SingleOutputStreamOperator reduce = sensorWs.reduce(new ReduceFunction() {

@Override

public WaterSensor reduce(WaterSensor v1, WaterSensor v2) throws Exception {

System.out.println("调用 Reduce 方法:上一条数据:" + v1 + "-----当前数据:" + v2);

return new WaterSensor(v1.getId(), v1.getTs(), v1.getVc() + v2.getVc());

}

});

// 输出窗口计算结果

reduce.print();

env.execute();

}

}

输入:

/**

* 30s内输入完成

*

* 因为使用的KeyBy,相同Key才会被分配到一个窗口中,所以id要一致

*

*/

[root@VM-55-24-centos ~]# nc -lk 1234

s1,1,1

s1,2,2

s1,3,3

s1,4,4

s1,5,5

s1,6,6

结果:

调用 Reduce 方法:上一条数据:WaterSensor{id='s1', ts=1, vc=1}-----当前数据:WaterSensor{id='s1', ts=2, vc=2}

调用 Reduce 方法:上一条数据:WaterSensor{id='s1', ts=1, vc=3}-----当前数据:WaterSensor{id='s1', ts=3, vc=3}

调用 Reduce 方法:上一条数据:WaterSensor{id='s1', ts=1, vc=6}-----当前数据:WaterSensor{id='s1', ts=4, vc=4}

调用 Reduce 方法:上一条数据:WaterSensor{id='s1', ts=1, vc=10}-----当前数据:WaterSensor{id='s1', ts=5, vc=5}

调用 Reduce 方法:上一条数据:WaterSensor{id='s1', ts=1, vc=15}-----当前数据:WaterSensor{id='s1', ts=6, vc=6}

WaterSensor{id='s1', ts=1, vc=21}

Reduce小结:

相同的 Key 的第一条数据来的时候,不会调用 Reduce 方法 增量聚合的意思是:数据来一条就基于上次的结果计算一次,但不会输出 在窗口触发(结束)时,才会输出窗口的最终计算结果

(2) 聚合函数(AggregateFunction)

ReduceFunction可以解决大多数归约聚合的问题,而AggregateFunction可以看作是ReduceFunction的通用版本,所以AggregateFunction使用更为灵活,在ReduceFunction中,输入类型、中间状态存储类型、输出类型都必要保持类型一致,而AggregateFunction有三种类型:输入类型(IN)、累加器类型(ACC)和输出类型(OUT),三种类型都可以不同。

与上面需求一致:

/**

* 窗口函数:增量聚合 Aggregate

*/

public class WindowAggregateDemo {

public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setParallelism(1);

SingleOutputStreamOperator sensorDS = env

.socketTextStream("xxx.xxx.xxx.xxx", 1234)

.map(new MyMapFunctionImpl());

KeyedStream sensorKS = sensorDS.keyBy(WaterSensor::getId);

// 1、指定 窗口分配器 使用滚动窗口,窗口长度为30s,每30s的数据在一个窗口内

WindowedStream sensorWs = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(30)));

// 2、窗口函数: 增量聚合 Aggregate

/**

* 1、本窗口的第一条数据达到时,创建窗口、初始化累加器

* 2、增量聚合:数据来一条计算一次(调用add方法)

* 3、窗口输出时调用一次getResult方法

* 4、输入类型、累加器、输出类型 三者可以类型不一致

*/

SingleOutputStreamOperator aggregate = sensorWs.aggregate(new AggregateFunction() { // 输入类型, 累加器类型(存储中间计算值), 输出类型

/**

* 初始化累加器

*/

@Override

public Integer createAccumulator() {

System.out.println("初始化累加器");

return 0;

}

/**

* 具体的聚合逻辑

*/

@Override

public Integer add(WaterSensor waterSensor, Integer integer) {

System.out.println("调用add方法 当前数据:" + waterSensor);

return integer + waterSensor.getVc();

}

/**

* 窗口触发时获取最终计算结果并输出

*/

@Override

public String getResult(Integer integer) {

System.out.println("调用getResult方法");

return integer.toString();

}

@Override

public Integer merge(Integer integer, Integer acc1) {

// 只有会话窗口才会调用

System.out.println("调用merge方法");

return null;

}

});

// 输出窗口计算结果

aggregate.print();

env.execute();

}

}

输入:

/**

* 30s内输入完成

*/

[root@VM-55-24-centos ~]# nc -lk 1234

s1,1,1

s1,2,2

s1,3,3

s1,4,4

s1,5,5

s1,6,6

s1,7,7

输出结果:

初始化累加器

调用add方法 当前数据:WaterSensor{id='s1', ts=1, vc=1}

调用add方法 当前数据:WaterSensor{id='s1', ts=2, vc=2}

调用add方法 当前数据:WaterSensor{id='s1', ts=3, vc=3}

调用add方法 当前数据:WaterSensor{id='s1', ts=4, vc=4}

调用add方法 当前数据:WaterSensor{id='s1', ts=5, vc=5}

调用add方法 当前数据:WaterSensor{id='s1', ts=6, vc=6}

调用add方法 当前数据:WaterSensor{id='s1', ts=7, vc=7}

调用getResult方法

28

Aggregate 小结:

1.三个需实现的接口,一个会话窗口需实现的接口      

createAccumulator():创建一个累加器,这就是为聚合创建了一个初始状态,每个聚合任务只会调用一次。

merge():合并两个累加器,并将合并后的状态作为一个累加器返回。(会话窗口使用)

getResult():从累加器中提取聚合的输出结果。

add():将输入的元素添加到累加器中。

2.本窗口的第一条数据达到时,创建窗口、初始化累加器

3.增量聚合:数据来一条计算一次(调用add方法)

4.窗口输出时调用一次getResult方法

5.输入类型、累加器、输出类型 三者可以类型不一致

1.5.2 全窗口函数(full window functions)

全窗口函数与增量聚合函数不同,增量聚合函数是数据来一条处理一条,而全窗口函数是将数据全部收集起来,等到窗口触发时才统一计算。并且全窗口函数的可以获取更多的信息,例如窗口的上下文信息(比如窗口的结束时间)。

在Flink中,全窗口函数也有两种:WindowFunction(不推荐)和ProcessWindowFunction。

(1) 窗口函数(WindowFunction)

WindowFunction 是老版本的通用窗口函数接口,但是没有提供更多的信息,也没有提供高级的功能,所以不推荐使用,可以被ProcessWindowFunction全覆盖。

stream

.keyBy()

.window()

.apply(new MyWindowFunction());

(2)  处理窗口函数(ProcessWindowFunction)

ProcessWindowFunction是Window API中最底层的通用窗口函数接口。除了可以拿到窗口中的所有数据之外,ProcessWindowFunction还可以获取到一个“上下文对象”(Context)。这个上下文对象非常强大,不仅能够获取窗口信息,还可以访问当前的时间和状态信息。这里的时间就包括了处理时间(processing time)和事件时间水位线(event time watermark)。这就使得ProcessWindowFunction更加灵活、功能更加丰富,其实就是一个增强版的WindowFunction。

/**

* 窗口函数:全窗口函数 Process

*/

public class WindowProcessDemo {

public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setParallelism(1);

SingleOutputStreamOperator sensorDS = env

.socketTextStream("xxx.xxx.xxx.xxx", 1234)

.map(new MyMapFunctionImpl());

KeyedStream sensorKS = sensorDS.keyBy(WaterSensor::getId);

// 1、指定 窗口分配器 使用滚动窗口,窗口长度为10s,每10s的数据在一个窗口内

WindowedStream sensorWs = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(30)));

SingleOutputStreamOperator process = sensorWs.process(new ProcessWindowFunction() {

/**

*

* @param s The key for which this window is evaluated. 该窗口的 Key

* @param context The context in which the window is being evaluated. 窗口上下文

* @param elements The elements in the window being evaluated. 窗口中所有的数据

* @param out A collector for emitting elements. 采集器

* @throws Exception

*/

@Override

public void process(String s,

ProcessWindowFunction.Context context,

Iterable elements,

Collector out) throws Exception {

long start = context.window().getStart();

long end = context.window().getEnd();

String windowStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss SSS");

String windowEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss SSS");

long count = elements.spliterator().estimateSize();

out.collect("窗口的开始时间:" + windowStart + "--窗口的结束时间:" + windowEnd + " --key 为" + s + "的窗口数据包含[" + elements.toString() + "] " + count + "条数据");

}

});

process.print();

env.execute();

}

输入:

[root@VM-55-27-centos ~]# nc -lk 1234

s1,1,1

s1,2,2

s1,3,3

s1,4,4

s1,5,5

输出:

窗口的开始时间:2023-11-18 11:13:30 000--窗口的结束时间:2023-11-18 11:13:00 000

--key 为s1的窗口数据包含

[

[WaterSensor{id='s1', ts=1, vc=1},

WaterSensor{id='s1', ts=2, vc=2},

WaterSensor{id='s1', ts=3, vc=3},

WaterSensor{id='s1', ts=4, vc=4},

WaterSensor{id='s1', ts=5, vc=5}]

]

5条数据

1.5.3 增量聚合和全窗口函数的结合使用

增量函数的优点是数据来一条处理一条,只存储中间计算值,所以占用的空间少。而全窗口需要储存窗口内的所有数据,最后再进行统一计算,但可以上下文获取到更多的窗口信息。在实际开发中,则可以结合这两者的优点。

在调用WindowedStream的.reduce()和.aggregate()方法时,不止可以传入一个ReduceFunction或AggregateFunction进行增量聚合,其实还可以传入第二个参数:一个全窗口函数,可以是WindowFunction或者ProcessWindowFunction。

public class WindowAggregateAndProcessDemo {

public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setParallelism(1);

SingleOutputStreamOperator sensorDS = env

.socketTextStream("xxx.xxx.xxx.xxx", 1234)

.map(new MyMapFunctionImpl());

KeyedStream sensorKS = sensorDS.keyBy(WaterSensor::getId);

// 1、指定 窗口分配器 使用滚动窗口,窗口长度为10s,每10s的数据在一个窗口内

WindowedStream sensorWs = sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(30)));

/**

* 增量聚合函数 与 全窗口函数 一起使用

*/

SingleOutputStreamOperator aggregate = sensorWs.aggregate(

new MyAggregateFunc(),

new MyProcessFunc()

);

// 输出窗口计算结果

aggregate.print();

env.execute();

}

private static class MyAggregateFunc implements AggregateFunction{

@Override

public Integer createAccumulator() {

System.out.println("初始化累加器");

return 0;

}

@Override

public Integer add(WaterSensor waterSensor, Integer integer) {

System.out.println("调用add方法 当前数据:" + waterSensor);

return integer + waterSensor.getVc();

}

@Override

public String getResult(Integer integer) {

System.out.println("调用getResult方法");

return integer.toString();

}

@Override

public Integer merge(Integer integer, Integer acc1) {

System.out.println("调用merge方法");

return null;

}

}

private static class MyProcessFunc extends ProcessWindowFunction {

@Override

public void process(String s, ProcessWindowFunction.Context context, Iterable elements, Collector out) throws Exception {

long start = context.window().getStart();

long end = context.window().getEnd();

String windowStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss SSS");

String windowEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss SSS");

long count = elements.spliterator().estimateSize();

out.collect("窗口的开始时间:" + windowStart + "--窗口的结束时间:" + windowEnd + " --key 为" + s + "的窗口数据包含" + elements + " " + count + "条数据");

}

}

}

输入:

[root@VM-55-24-centos ~]# nc -lk 1234

s1,1,1

s1,2,2

s1,3,3

s1,4,4

s1,5,5

输出:

初始化累加器

调用add方法 当前数据:WaterSensor{id='s1', ts=1, vc=1}

调用add方法 当前数据:WaterSensor{id='s1', ts=2, vc=2}

调用add方法 当前数据:WaterSensor{id='s1', ts=3, vc=3}

调用add方法 当前数据:WaterSensor{id='s1', ts=4, vc=4}

调用add方法 当前数据:WaterSensor{id='s1', ts=5, vc=5}

调用getResult方法

窗口的开始时间:2023-11-18 11:51:30 000--窗口的结束时间:2023-11-18 11:52:00 000 --key 为s1的窗口数据包含[15] 1条数据

增量聚合和全窗口函数的结合使用的效果:增量聚合结束后将计算结果(只有一条)发给全窗口函数进行处理。

二.时间语义

2.1 Flink中的时间语义

数据在网络传输中会存在一定的延迟,也意味着数据从生产到Flink真正处理的时间也存在延迟。数据被生产的时刻则被称为“事件时间”,数据被Flink真正处理的时刻被称为“处理时间”,到底以哪一种时间作为衡量标准,就是所谓的“时间语义”。

在实际应用中,事件时间往往会被作为参数传递,例如MySQL数据表的 create_time 字段,或者是前端传来的时间戳。使用事件时间更能保证数据的准确性。

从 Flink 1.12 版本开始,事件时间为默认的时间语义。

三. 水位线(Watermark)

3.1事件时间和窗口

在窗口的处理过程中,我们可以基于数据的时间戳,自定义一个“逻辑时钟”。这个时钟代表的是数据的时间进展,而不会随着系统时间而自动流逝,而是靠新数据的时间戳来推动的,且只会向前推进。

这样的好处在于,在 Flink 的计算过程中可以完全不依赖系统时间,不论何时进行统计处理,都可以靠着逻辑时钟保证窗口计算的正确性。

3.2 什么是水位线

在Flink中,用来衡量事件时间进展的标记,就被称作“水位线”(Watermark)。

具体实现上,水位线其实就是一个时间戳,作为数据流的标记,用来指示当前的事件时间,当某个数据到来之后,就可以从这个数据中提取时间戳,作为当前水位线的时间戳了。

水位线是会被 Flink 存储的。

(1) 有序流中的水位线 

1、理想状态(数据量小):数据可以按照生成顺序进入流中,每条数据产生一个水位线。

2、实际应用中,如果当前数据量非常大,数据间的时间差非常小, 如果也按照每条数据产生一个水位线则会非常影响效率。所以为了提高效率,一般可以每隔一段时间生成一个水位线。        

(2) 乱序流中的水位线 

在分布式系统中,由于网络传输的不确定性,可能导致数据达到的时间并不是有序的,这就是“乱序数据”。

1、乱序流+数据量小

还是靠数据来驱动,来一条数据就提取其时间戳作为水位线插入,不过现在是乱序数据,在生成水位线前,需要先判断当前数据的时间戳是否大于之前的水位线,如果大于才生成新的水位线,否则就不生成新的水位线。也就是说,只有数据的时间戳比当前时钟大,才能推动时钟前进,这时才插入水位线。

2、乱序流+数据量大

数据量大则可以周期性的生成水位线来提升效率。并且保存之前数据的最大时间戳,需要插入水位线时,可以将这个最大时间戳作为水位线插入。

3、乱序流+迟到数据

迟到数据指的是,例如有一个第9秒生产的数据,但是第11秒才到达Flink,那么则会落到[ 10 , 20 ) 的窗口中。

我们无法正确处理“迟到”的数据。为了让窗口能够正确收集到迟到的数据,我们也可以等上一段时间,比如2秒;也就是用当前已有数据的最大时间戳减去2秒,就是要插入的水位线的时间戳。这样的话,9秒的数据到来之后,事件时钟不会直接推进到9秒,而是进展到了7秒。必须等到11秒的数据到来之后,事件时钟才会进展到9秒,这时迟到数据也都已收集齐,0~9秒的窗口就可以正确计算结果了。

现在我们知道,水位线就代表了当前的事件时间时钟,而且可以在数据的时间戳基础上加一些延迟来保证不丢数据,这一点对于乱序流的正确处理非常重要 

3.3 水位线和窗口的工作原理

在 Flink 中,窗口其实并不是一个固定位置的框,而是理解为一个“桶”,在Flink中,窗口可以把流切割成有限大小的多个“存储桶”(对应窗口);每个数据都会分发到对应的桶中,当到达窗口结束时间时,就对每个桶中收集的数据进行计算处理。

3.4 生成水位线

3.4.1 生成水位线的总体原则

完美的水位线是“绝对正确”的,也就是一个水位线一旦出现,就表示这个时间之前的数据已经全部到齐、之后再也不会出现了。不过如果要保证绝对正确,就必须等足够长的时间,这会带来更高的延迟。

如果希望处理更快、实时性更强,则可以将水位线延迟设置得低些,不过这样会导致很多迟到数据被窗口遗漏,计算结果不准确;如果计算结果的准确性有要求,则可以将水位线延迟设置得高些,这样会导致处理延迟增加。

所以Flink中的水位线,其实是流处理中对低延迟和结果正确性的一个权衡机制,而且把控制的权力交给了程序员,我们可以在代码中定义水位线的生成策略。

3.4.2 水位线生成策略

在 Flink 的 DataStream API 中,有一个单独用于生成水位线的方法:.assignTimestampsAndWatermarks(),它主要用来为流中的数据分配时间戳,并生成水位线来指示事件时间。

DataStream stream = env.addSource(new DataSource());

DataStream withTimestampsAndWatermarks =

stream.assignTimestampsAndWatermarks();

说明:WatermarkStrategy作为参数,这就是所谓的“水位线生成策略”。WatermarkStrategy是一个接口,该接口中包含了一个“时间戳分配器”TimestampAssigner和一个“水位线生成器”WatermarkGenerator。

3.4.3 Flink内置水位线

(1) 有序流(时间戳单调递增)中内置水位线设置

对于有序流,主要特点就是时间戳单调增长,而不会出现迟到数据的问题。这是周期性生成水位线的最简单的场景,直接调用WatermarkStrategy.forMonotonousTimestamps()方法就可以实现。

public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setParallelism(1);

SingleOutputStreamOperator sensorDS = env

.socketTextStream("xxx.xxx.xxx.xxx", 1234)

.map(new MyMapFunctionImpl());

// ***定义 WaterMark 策略

WatermarkStrategy waterSensorWatermarkStrategy = WatermarkStrategy

// 单调递增的事件时间,没有延迟时间

.forMonotonousTimestamps()

// 指定 时间戳分配器,从数据中提取

.withTimestampAssigner(new SerializableTimestampAssigner() {

/**

*

* @param waterSensor 当前数据

* @param l

* @return

*/

@Override

public long extractTimestamp(WaterSensor waterSensor, long l) {

System.out.println("当前数据:" + waterSensor + " ==> l:" + l);

// 从数据中返回的时间戳(毫秒))

return waterSensor.getTs() * 1000L;

}

});

// ***指定 watermark策略

SingleOutputStreamOperator sensorWithWaterMark = sensorDS

.assignTimestampsAndWatermarks(waterSensorWatermarkStrategy);

sensorWithWaterMark

.keyBy(WaterSensor::getId)

// ***使用事件时间的窗口,而非处理时间的窗口 10s的滚动窗口

.window(TumblingEventTimeWindows.of(Time.seconds(10)))

.process(new ProcessWindowFunction() {

@Override

public void process(String s,

ProcessWindowFunction.Context context,

Iterable elements,

Collector out) throws Exception {

long start = context.window().getStart();

long end = context.window().getEnd();

String windowStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss SSS");

String windowEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss SSS");

long count = elements.spliterator().estimateSize();

out.collect("窗口的开始时间:" + windowStart + "--窗口的结束时间:" + windowEnd + " --key 为" + s + "的窗口数据包含[" + elements.toString() + "] " + count + "条数据");

}

}).print();

env.execute();

}

输入:

[root@VM-55-24-centos ~]# nc -lk 1234

s1,1,1

s1,2,2

s1,3,3

s1,4,4

s1,7,7

s1,9,9

s1,10,10

s1,15,15

s1,20,20

输出:

当前数据:WaterSensor{id='s1', ts=1, vc=1} ==> l:-9223372036854775808

当前数据:WaterSensor{id='s1', ts=2, vc=2} ==> l:-9223372036854775808

当前数据:WaterSensor{id='s1', ts=3, vc=3} ==> l:-9223372036854775808

当前数据:WaterSensor{id='s1', ts=4, vc=4} ==> l:-9223372036854775808

当前数据:WaterSensor{id='s1', ts=7, vc=7} ==> l:-9223372036854775808

当前数据:WaterSensor{id='s1', ts=9, vc=9} ==> l:-9223372036854775808

当前数据:WaterSensor{id='s1', ts=10, vc=10} ==> l:-9223372036854775808

窗口的开始时间:1970-01-01 08:00:00 000--窗口的结束时间:1970-01-01 08:00:10 000 --key 为s1的窗口数据包含[[WaterSensor{id='s1', ts=1, vc=1}, WaterSensor{id='s1', ts=2, vc=2}, WaterSensor{id='s1', ts=3, vc=3}, WaterSensor{id='s1', ts=4, vc=4}, WaterSensor{id='s1', ts=7, vc=7}, WaterSensor{id='s1', ts=9, vc=9}]] 6条数据

当前数据:WaterSensor{id='s1', ts=15, vc=15} ==> l:-9223372036854775808

当前数据:WaterSensor{id='s1', ts=20, vc=20} ==> l:-9223372036854775808

窗口的开始时间:1970-01-01 08:00:10 000--窗口的结束时间:1970-01-01 08:00:20 000 --key 为s1的窗口数据包含[[WaterSensor{id='s1', ts=10, vc=10}, WaterSensor{id='s1', ts=15, vc=15}]] 2条数据

有序流的水位线设置非常简单,就是把数据中表示事件时间的属性返回。

(2) 乱序流中内置水位线设置

由于乱序流中需要等待迟到数据到齐,所以必须设置一个固定量的延迟时间。这时生成水位线的时间戳,就是当前数据流中最大的时间戳减去延迟时间的结果再-1。调用WatermarkStrategy. forBoundedOutOfOrderness()传入最大延迟时间。

public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setParallelism(1);

SingleOutputStreamOperator sensorDS = env

.socketTextStream("xxx.xxx.xxx.xxx", 1234)

.map(new MyMapFunctionImpl());

// ***定义 WaterMark 策略

WatermarkStrategy waterSensorWatermarkStrategy = WatermarkStrategy

// 乱序的事件时间,需设置最大等待时间(当前窗口水位线 = 当前窗口最大事件时间 - 等待时间)

.forBoundedOutOfOrderness(Duration.ofSeconds(3)) // 设置最大等待时间为3s

// 指定 时间戳分配器,从数据中提取

.withTimestampAssigner(new SerializableTimestampAssigner() {

/**

*

* @param waterSensor 当前数据

* @param l

* @return

*/

@Override

public long extractTimestamp(WaterSensor waterSensor, long l) {

System.out.println("当前数据:" + waterSensor + " ==> l:" + l);

// 返回的时间戳(毫秒))

return waterSensor.getTs() * 1000L;

}

});

// ***指定 watermark策略

SingleOutputStreamOperator sensorWithWaterMark = sensorDS

.assignTimestampsAndWatermarks(waterSensorWatermarkStrategy);

sensorWithWaterMark

.keyBy(WaterSensor::getId)

// ***使用事件时间的窗口,而非处理时间的窗口 10s的滚动窗口

.window(TumblingEventTimeWindows.of(Time.seconds(10)))

.process(new ProcessWindowFunction() {

@Override

public void process(String s,

ProcessWindowFunction.Context context,

Iterable elements,

Collector out) throws Exception {

long start = context.window().getStart();

long end = context.window().getEnd();

String windowStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss SSS");

String windowEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss SSS");

long count = elements.spliterator().estimateSize();

out.collect("窗口的开始时间:" + windowStart + "--窗口的结束时间:" + windowEnd + " --key 为" + s + "的窗口数据包含[" + elements.toString() + "] " + count + "条数据");

}

}).print();

env.execute();

}

输入:

[root@VM-55-24-centos ~]# nc -lk 1234

s1,1,1 // 水位线 = -2

s1,2,2 // 水位线 = -1

s1,6,6 // 水位线 = 3

s1,8,8 // 水位线 = 5

s1,5,5 // 水位线 = 2

s1,9,9 // 水位线 = 6

s1,10,10 // 水位线 = 7

s1,7,7 // 水位线 = 4

s1,12,12 // 水位线 = 9

s1,3,3 // 水位线 = 0

s1,13,13 // 水位线 = 10 窗口触发关闭

输出:

当前数据:WaterSensor{id='s1', ts=1, vc=1} ==> l:-9223372036854775808

当前数据:WaterSensor{id='s1', ts=2, vc=2} ==> l:-9223372036854775808

当前数据:WaterSensor{id='s1', ts=6, vc=6} ==> l:-9223372036854775808

当前数据:WaterSensor{id='s1', ts=8, vc=8} ==> l:-9223372036854775808

当前数据:WaterSensor{id='s1', ts=5, vc=5} ==> l:-9223372036854775808

当前数据:WaterSensor{id='s1', ts=9, vc=9} ==> l:-9223372036854775808

当前数据:WaterSensor{id='s1', ts=10, vc=10} ==> l:-9223372036854775808

当前数据:WaterSensor{id='s1', ts=7, vc=7} ==> l:-9223372036854775808

当前数据:WaterSensor{id='s1', ts=12, vc=12} ==> l:-9223372036854775808

当前数据:WaterSensor{id='s1', ts=3, vc=3} ==> l:-9223372036854775808

当前数据:WaterSensor{id='s1', ts=13, vc=13} ==> l:-9223372036854775808

窗口的开始时间:1970-01-01 08:00:00 000--窗口的结束时间:1970-01-01 08:00:10 000 --key 为s1的窗口数据包含[[WaterSensor{id='s1', ts=1, vc=1}, WaterSensor{id='s1', ts=2, vc=2}, WaterSensor{id='s1', ts=6, vc=6}, WaterSensor{id='s1', ts=8, vc=8}, WaterSensor{id='s1', ts=5, vc=5}, WaterSensor{id='s1', ts=9, vc=9}, WaterSensor{id='s1', ts=7, vc=7}, WaterSensor{id='s1', ts=3, vc=3}]] 8条数据

(3) 内置水位线生成原理

都是周期性生成水位线的,默认200ms有序流水位线生成:当前最大事件时间 - 1ms乱序流水位线生成:当前最大事件时间 - 最大等待时间 - 1ms有序流就是一种特殊的乱序流,最大等待时间为0

3.4.4 自定义水位线生成器 

(1) 周期性水位线生成器(Periodic Generator)

需要自定义周期性水位线生成器则可以实现WatermarkGenerator中的onEvent 记录事件时间、onPeriodicEmit 周期性生成水位线。

public class MyPeriodWatermarkGenerator implements WatermarkGenerator {

private Long maxTs; // 当前最大时间戳

private Long delayTime; // 等待时间

public MyPeriodWatermarkGenerator(Long delayTime) {

this.delayTime = delayTime;

maxTs = Long.MIN_VALUE - this.delayTime - 1 ;

}

/**

* 每条数据来都会调用一次该方法。主要用于记录、更新当前最大的时间戳

* @param t

* @param l

* @param watermarkOutput

*/

@Override

public void onEvent(T t, long l, WatermarkOutput watermarkOutput) {

maxTs = Math.max(maxTs , l);

}

/**

* 周期性调用,主要用于周期性生成 Watermark

* @param watermarkOutput

*/

@Override

public void onPeriodicEmit(WatermarkOutput watermarkOutput) {

watermarkOutput.emitWatermark(new Watermark(maxTs));

}

}

在选择水位线生成器时,则可以使用这个自定义水位线生成器。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

SingleOutputStreamOperator sensorDS = env

.addSource(new DataSource())

.map(new MyMapFunctionImpl());

WatermarkStrategy waterSensorWatermarkStrategy = WatermarkStrategy

.forGenerator(new WatermarkStrategy() {

@Override

public WatermarkGenerator createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {

// 使用自定义水位生成器并设置等待时间

return new MyPeriodWatermarkGenerator<>(3000l);

}

})

.withTimestampAssigner(new SerializableTimestampAssigner() {

// 事件时间提取器

@Override

public long extractTimestamp(WaterSensor waterSensor, long l) {

return waterSensor.getTs() * 1000L;

}

});

(2) 断点式水位线生成器(Punctuated Generator)

断点式生成器会不停地检测onEvent()中的事件,当发现带有水位线信息的事件时,就立即生成水位线。我们把生成水位线的逻辑写在onEvent方法当中即可。

/**

* 断点式水位线

*/

public class MyPuntuatedWatermarkGenerator implements WatermarkGenerator {

private Long maxTs; // 当前最大时间戳

private Long delayTime; // 等待时间

public MyPuntuatedWatermarkGenerator(Long delayTime) {

this.delayTime = delayTime;

maxTs = Long.MIN_VALUE - this.delayTime - 1 ;

}

/**

* 每条数据来都会调用一次该方法。主要用于记录、更新当前最大的时间戳,并且立即更新当前水位线

* @param t

* @param l

* @param watermarkOutput

*/

@Override

public void onEvent(T t, long l, WatermarkOutput watermarkOutput) {

maxTs = Math.max(maxTs , l);

watermarkOutput.emitWatermark(new Watermark(maxTs));

}

/**

* 周期性调用,断点式不需要

* @param watermarkOutput

*/

@Override

public void onPeriodicEmit(WatermarkOutput watermarkOutput) {

watermarkOutput.emitWatermark(new Watermark(maxTs));

}

}

(3) 在数据源中生成水位线

我们可以直接在自定义的数据源中抽取事件时间,然后生成水位线。这里要注意的是,在自定义数据源中发送了水位线以后,就不能再在程序中使用assignTimestampsAndWatermarks方法来生成水位线了。在自定义数据源中生成水位线和在程序中使用assignTimestampsAndWatermarks方法生成水位线二者只能取其一。

public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

FileSource fileSource = FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path("input/words.txt")).build();

// 在数据源中发送水位线 乱序流-延迟3s

env.fromSource(fileSource,WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)),"file").print();

env.execute();

}

3.5  水位线的传递

水位线并不能代表整个程序的处理进度,而是只能代表某个子任务的处理进度,因为水位线会随着数据往下游传递,也就是说不同节点的处理进度是不一样的。

在多并行度下的流处理中,上游任务处理完水位线、时钟改变之后,要把当前的水位线再次发出,广播给所有的下游子任务。而当一个任务接收到多个上游并行任务传递来的水位线时,会以最小的那个作为当前任务的事件时钟,向下游传递。

例子:并行度为2,算子链为:source -> map -> 水位线乱序 延迟3s ->  时间滚动窗口 大小为10

public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setParallelism(2);

SingleOutputStreamOperator sensorDS = env

.socketTextStream("xxx.xxx.xxx.xxx", 1234)

.map(new MyMapFunctionImpl());

// ***定义 WaterMark 策略

WatermarkStrategy waterSensorWatermarkStrategy = WatermarkStrategy

// 乱序的事件时间,需设置最大等待时间(当前窗口水位线 = 当前窗口最大事件时间 - 等待时间)

.forBoundedOutOfOrderness(Duration.ofSeconds(3)) // 设置最大等待时间为3s

// 指定 时间戳分配器,从数据中提取

.withTimestampAssigner(new SerializableTimestampAssigner() {

/**

*

* @param waterSensor 当前数据

* @param l

* @return

*/

@Override

public long extractTimestamp(WaterSensor waterSensor, long l) {

System.out.println("当前数据:" + waterSensor + " ==> l:" + l);

// 返回的时间戳(毫秒))

return waterSensor.getTs() * 1000L;

}

});

// ***指定 watermark策略

SingleOutputStreamOperator sensorWithWaterMark = sensorDS

.assignTimestampsAndWatermarks(waterSensorWatermarkStrategy);

sensorWithWaterMark

.keyBy(WaterSensor::getId)

// ***使用事件时间的窗口,而非处理时间的窗口 10s的滚动窗口

.window(TumblingEventTimeWindows.of(Time.seconds(10)))

.process(new ProcessWindowFunction() {

@Override

public void process(String s,

ProcessWindowFunction.Context context,

Iterable elements,

Collector out) throws Exception {

long start = context.window().getStart();

long end = context.window().getEnd();

String windowStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss SSS");

String windowEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss SSS");

long count = elements.spliterator().estimateSize();

out.collect("窗口的开始时间:" + windowStart + "--窗口的结束时间:" + windowEnd + " --key 为" + s + "的窗口数据包含[" + elements.toString() + "] " + count + "条数据");

}

}).print();

env.execute();

}

 输入:

[root@VM-55-24-centos ~]# nc -lk 1234

s1,1,1

s1,2,2

s1,5,5

s1,7,7

s1,9,9

s1,10,10

s1,11,11

s1,12,12

s1,13,13

s1,14,14

输出:

当前数据:WaterSensor{id='s1', ts=1, vc=1} ==> l:-9223372036854775808

当前数据:WaterSensor{id='s1', ts=2, vc=2} ==> l:-9223372036854775808

当前数据:WaterSensor{id='s1', ts=5, vc=5} ==> l:-9223372036854775808

当前数据:WaterSensor{id='s1', ts=7, vc=7} ==> l:-9223372036854775808

当前数据:WaterSensor{id='s1', ts=9, vc=9} ==> l:-9223372036854775808

当前数据:WaterSensor{id='s1', ts=10, vc=10} ==> l:-9223372036854775808

当前数据:WaterSensor{id='s1', ts=11, vc=11} ==> l:-9223372036854775808

当前数据:WaterSensor{id='s1', ts=12, vc=12} ==> l:-9223372036854775808

当前数据:WaterSensor{id='s1', ts=13, vc=13} ==> l:-9223372036854775808

当前数据:WaterSensor{id='s1', ts=14, vc=14} ==> l:-9223372036854775808

2> 窗口的开始时间:1970-01-01 08:00:00 000--窗口的结束时间:1970-01-01 08:00:10 000 --key 为s1的窗口数据包含[[WaterSensor{id='s1', ts=1, vc=1}, WaterSensor{id='s1', ts=2, vc=2}, WaterSensor{id='s1', ts=5, vc=5}, WaterSensor{id='s1', ts=7, vc=7}, WaterSensor{id='s1', ts=9, vc=9}]] 5条数据

可以看到,在并行度为2下,水位线为10 (13-3) 时,并没有触发窗口输出,这就是因为在多并行度下,一个任务接收到多个上游并行任务传递来的水位线时,会以最小的那个作为当前任务的事件时钟,向下游传递。

分析:

[root@VM-55-24-centos ~]# nc -lk 1234

s1,1,1 // 水位线:-2

s1,2,2 // 水位线: -1,多并行度下,向下传递最小的水位线 -2

s1,5,5 // 水位线: 2, 多并行度下,向下传递最小的水位线 -1

s1,7,7 // 水位线: 4, 多并行度下,向下传递最小的水位线 2

s1,10,10 // 水位线: 7, 多并行度下,向下传递最小的水位线 4

s1,11,11 // 水位线: 8, 多并行度下,向下传递最小的水位线 7

s1,12,12 // 水位线: 9, 多并行度下,向下传递最小的水位线 8

s1,13,13 // 水位线: 10,多并行度下,向下传递最小的水位线 9

s1,14,14 // 水位线: 11,多并行度下,向下传递最小的水位线 10 触发[ 0 , 10 )窗口输出

3.5.1 水位线的空闲等待

在多个上游并行任务中,如果有其中一个没有数据,由于当前Task是以最小的那个作为当前任务的事件时钟,就会导致当前Task的水位线无法推进,就可能导致窗口无法触发。这时候可以设置空闲等待。

例子:将数据以奇偶的规则放在不同的 KeyBy 分区,观察水位线推进

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// !并行度为2

env.setParallelism(2);

SingleOutputStreamOperator socketDs = env

.socketTextStream("xxx.xxx.xxx.xxx", 1234)

// 重分区:当前数据 % 下游算子并行度

.partitionCustom(new MyPartitioner(), r -> r)

// 将输入的字符串转为整型

.map(r -> Integer.parseInt(r))

.assignTimestampsAndWatermarks(

WatermarkStrategy

.forMonotonousTimestamps() // 单调递增的事件时间

.withTimestampAssigner((r, ts) -> r * 1000l) // 水位线提取器,将数据*1000作为水位线

);

socketDs.keyBy( r -> r % 2) // 将数据奇偶划分,在两个不同的分区

.window(TumblingEventTimeWindows.of(Time.seconds(10))) // 开窗:十秒的滑动窗口

.process(new ProcessWindowFunction() {

@Override

public void process(Integer integer, ProcessWindowFunction.Context context, Iterable elements, Collector out) throws Exception {

long start = context.window().getStart();

long end = context.window().getEnd();

String windowStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss SSS");

String windowEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss SSS");

long count = elements.spliterator().estimateSize();

out.collect("窗口的开始时间:" + windowStart + "--窗口的结束时间:" + windowEnd + " --key 为" + integer + "的窗口数据包含[" + elements.toString() + "] " + count + "条数据");

}

}).print();

env.execute();

输入奇数:

[root@VM-55-24-centos ~]# nc -lk 1234

3

5

7

9

11

13

 结果:控制台并无任何输出,也就意味着 [ 0 , 10 )窗口并没有被触发。

这是因为在多个上游并行任务中,当前task会以最小的那个作为当前任务的事件时钟,而将数据分为奇偶,则奇数在一个分区,偶数在一个分区,只输入奇数,那么另一个分区为空,就会导致当前水位线一直是 Long.MIN_VALUE ,从而无法正常推进水位线。

解决这个问题则可以通过设置空闲窗口时间withIdleness:

WatermarkStrategy

.>forBoundedOutOfOrderness(Duration.ofSeconds(20))

// 设置空闲窗口时间为3s 窗口空闲3s则将空窗口的水位线标记为闲置

.withIdleness(Duration.ofSeconds(3));

官方介绍:

There are two places in Flink applications where a WatermarkStrategy can be used: 1) directly on sources and 2) after non-source operation.

The first option is preferable, because it allows sources to exploit knowledge about shards/partitions/splits in the watermarking logic. Sources can usually then track watermarks at a finer level and the overall watermark produced by a source will be more accurate. Specifying a WatermarkStrategy directly on the source usually means you have to use a source specific interface/ Refer to Watermark Strategies and the Kafka Connector for how this works on a Kafka Connector and for more details about how per-partition watermarking works there.

The second option (setting a WatermarkStrategy after arbitrary operations) should only be used if you cannot set a strategy directly on the source:

如果其中一个输入分割/分区/碎片有一段时间不携带事件,这意味着水印生成器也不能获得任何新的信息来为水印做基础。我们称之为空闲输入或空闲源。这是一个问题,因为您的一些分区可能仍然带有事件。在这种情况下,水印将被保留,因为它被计算为所有不同的平行水印的最小值。

为了解决这个问题,你可以使用一个 WatermarkStrategy 来检测闲置状态并将输入标记为闲置。为此,WatermarkStrategy 提供了一个方便的帮助器:

WatermarkStrategy .>forBoundedOutOfOrderness(Duration.ofSeconds(20)) .withIdleness(Duration.ofMinutes(1));

3.6 迟到数据的处理

迟到数据与乱序不同:乱序指的是数据到达Flink时的事件时间并不一定是顺序的;而迟到数据指的是当前数据的事件时间小于当前水位线,例如上一个窗口已经关闭了,属于上一个窗口的数据才达到。

3.6.1 推迟水位线推进

在水位线产生时,设置一个乱序容忍度(延迟时间),推迟系统时间的推进,保证窗口计算被延迟执行,为乱序的数据争取更多的时间进入窗口。

// 水位线生成延时10s

WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10));

3.6.2 设置窗口延迟关闭

 窗口的生命周期是:触发计算 -> 销毁/关闭窗口。设置窗口延迟时间其实就是延长窗口的销毁/关闭时间。

设置窗口延迟时间也就是允许数据迟到。当触发了窗口计算后,会先计算当前的结果,但是此时并不会关闭窗口。以后每来一条迟到数据,就触发一次这条数据所在窗口计算(增量计算)。直到wartermark 超过了窗口结束时间+推迟时间,此时窗口会真正关闭。

未设置窗口延迟关闭:

public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setParallelism(1);

SingleOutputStreamOperator sensorDS = env

.socketTextStream("xxx.xxx.xxx.xxx", 1234)

.map(new MyMapFunctionImpl());

// ***定义 WaterMark 策略

WatermarkStrategy waterSensorWatermarkStrategy = WatermarkStrategy

// 乱序的事件时间,需设置最大等待时间(当前窗口水位线 = 当前窗口最大事件时间 - 等待时间)

.forBoundedOutOfOrderness(Duration.ofSeconds(3)) // 设置最大等待时间为3s

// 指定 时间戳分配器,从数据中提取

.withTimestampAssigner(new SerializableTimestampAssigner() {

/**

*

* @param waterSensor 当前数据

* @param l

* @return

*/

@Override

public long extractTimestamp(WaterSensor waterSensor, long l) {

System.out.println("当前数据:" + waterSensor + " ==> l:" + l);

// 返回的时间戳(毫秒))

return waterSensor.getTs() * 1000L;

}

});

// ***指定 watermark策略

SingleOutputStreamOperator sensorWithWaterMark = sensorDS

.assignTimestampsAndWatermarks(waterSensorWatermarkStrategy);

sensorWithWaterMark

.keyBy(WaterSensor::getId)

// ***使用事件时间的窗口,而非处理时间的窗口 10s的滚动窗口

.window(TumblingEventTimeWindows.of(Time.seconds(10)))

.process(new ProcessWindowFunction() {

@Override

public void process(String s,

ProcessWindowFunction.Context context,

Iterable elements,

Collector out) throws Exception {

long start = context.window().getStart();

long end = context.window().getEnd();

String windowStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss SSS");

String windowEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss SSS");

long count = elements.spliterator().estimateSize();

out.collect("窗口的开始时间:" + windowStart + "--窗口的结束时间:" + windowEnd + " --key 为" + s + "的窗口数据包含[" + elements.toString() + "] " + count + "条数据");

}

}).print();

env.execute();

}

输入:

[root@VM-55-24-centos ~]# nc -lk 1234

s1,1,1

s1,5,5

s1,10,10

s1,12,12

s1,13,13

s1,3,3

 输出:

当前数据:WaterSensor{id='s1', ts=1, vc=1} ==> l:-9223372036854775808

当前数据:WaterSensor{id='s1', ts=5, vc=5} ==> l:-9223372036854775808

当前数据:WaterSensor{id='s1', ts=10, vc=10} ==> l:-9223372036854775808

当前数据:WaterSensor{id='s1', ts=12, vc=12} ==> l:-9223372036854775808

当前数据:WaterSensor{id='s1', ts=13, vc=13} ==> l:-9223372036854775808

窗口的开始时间:1970-01-01 08:00:00 000--窗口的结束时间:1970-01-01 08:00:10 000 --key 为s1的窗口数据包含[[WaterSensor{id='s1', ts=1, vc=1}, WaterSensor{id='s1', ts=5, vc=5}]] 2条数据

当前数据:WaterSensor{id='s1', ts=3, vc=3} ==> l:-9223372036854775808

可以看到,当 s1,13,13 到来时,触发了 [ 0 , 10 ) 的窗口关闭,随后来的 s1,3,3 并不会再被 [ 0 , 10 ) 窗口计算。

设置窗口延迟关闭:

sensorWithWaterMark

.keyBy(WaterSensor::getId)

.window(TumblingEventTimeWindows.of(Time.seconds(10)))

.allowedLateness(Time.seconds(3)) // 允许窗口延迟3s关闭

输入:

[root@VM-55-24-centos ~]# nc -lk 1234

s1,2,2

s1,8,8

s1,13,13

s1,14,14

s1,5,5

输出:

当前数据:WaterSensor{id='s1', ts=2, vc=2} ==> l:-9223372036854775808

当前数据:WaterSensor{id='s1', ts=8, vc=8} ==> l:-9223372036854775808

当前数据:WaterSensor{id='s1', ts=13, vc=13} ==> l:-9223372036854775808

窗口的开始时间:1970-01-01 08:00:00 000--窗口的结束时间:1970-01-01 08:00:10 000 --key 为s1的窗口数据包含[[WaterSensor{id='s1', ts=2, vc=2}, WaterSensor{id='s1', ts=8, vc=8}]] 2条数据

当前数据:WaterSensor{id='s1', ts=14, vc=14} ==> l:-9223372036854775808

当前数据:WaterSensor{id='s1', ts=5, vc=5} ==> l:-9223372036854775808

窗口的开始时间:1970-01-01 08:00:00 000--窗口的结束时间:1970-01-01 08:00:10 000 --key 为s1的窗口数据包含[[WaterSensor{id='s1', ts=2, vc=2}, WaterSensor{id='s1', ts=8, vc=8}, WaterSensor{id='s1', ts=5, vc=5}]] 3条数据

1.窗口允许迟到,则在关窗前,每一条迟到的数据达到,都会被窗口触发计算输出。

2.窗口真正被关闭后,迟到数据则无法再进入窗口。

3.6.3 使用侧流接收迟到的数据

利用之前的推迟水位线推进或设置窗口延迟关闭的方法,对于真正关窗的迟到数据都无法进行处理,Flink 提供了 sideOutputLateData() 将关窗后的迟到数据放入侧输出流。

.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))) .allowedLateness(Time.seconds(3)) .sideOutputLateData(lateWS)

public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setParallelism(1);

SingleOutputStreamOperator sensorDS = env

.socketTextStream("xxx.xxx.xxx.xxx", 1234)

.map(new MyMapFunctionImpl());

// ***定义 WaterMark 策略

WatermarkStrategy waterSensorWatermarkStrategy = WatermarkStrategy

// 乱序的事件时间,需设置最大等待时间(当前窗口水位线 = 当前窗口最大事件时间 - 等待时间)

.forBoundedOutOfOrderness(Duration.ofSeconds(3)) // 设置最大等待时间为3s

// 指定 时间戳分配器,从数据中提取

.withTimestampAssigner(new SerializableTimestampAssigner() {

/**

*

* @param waterSensor 当前数据

* @param l

* @return

*/

@Override

public long extractTimestamp(WaterSensor waterSensor, long l) {

// 返回的时间戳(毫秒))

return waterSensor.getTs() * 1000L;

}

});

// ***指定 watermark策略

SingleOutputStreamOperator sensorWithWaterMark = sensorDS

.assignTimestampsAndWatermarks(waterSensorWatermarkStrategy);

// ***指定侧输出流存放关窗后的迟到数据

OutputTag outputTag = new OutputTag("late-data", Types.POJO(WaterSensor.class));

SingleOutputStreamOperator process = sensorWithWaterMark

.keyBy(WaterSensor::getId)

// ***使用事件时间的窗口,而非处理时间的窗口 10s的滚动窗口

.window(TumblingEventTimeWindows.of(Time.seconds(10)))

.allowedLateness(Time.seconds(3)) // 允许窗口延迟3s关闭

.sideOutputLateData(outputTag)

.process(new ProcessWindowFunction() {

@Override

public void process(String s,

ProcessWindowFunction.Context context,

Iterable elements,

Collector out) throws Exception {

long start = context.window().getStart();

long end = context.window().getEnd();

String windowStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss SSS");

String windowEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss SSS");

long count = elements.spliterator().estimateSize();

out.collect("窗口的开始时间:" + windowStart + "--窗口的结束时间:" + windowEnd + " --key 为" + s + "的窗口数据包含[" + elements.toString() + "] " + count + "条数据");

}

});

// 获取侧输出流

process.getSideOutput(outputTag).printToErr("测输出流中的迟到数据" + outputTag.getTypeInfo());

process.print();

env.execute();

}

输入:

[root@VM-55-24-centos ~]# nc -lk 1234

s1,1,1

s1,5,5

s1,10,10

s1,9,9

s1,13,13

s1,15,15

s1,16,16

s1,8,8

s1.12.12

 输出:

窗口的开始时间:1970-01-01 08:00:00 000--窗口的结束时间:1970-01-01 08:00:10 000 --key 为s1的窗口数据包含[[WaterSensor{id='s1', ts=1, vc=1}, WaterSensor{id='s1', ts=5, vc=5}, WaterSensor{id='s1', ts=9, vc=9}]] 3条数据

·测输出流中的迟到数据PojoType> WaterSensor{id='s1', ts=8, vc=8}

·测输出流中的迟到数据PojoType> WaterSensor{id='s1', ts=12, vc=12}

可以看到,在输入 s1,16,16 时已经关闭了 [ 0 , 10 ) 的窗口,后面迟到的数据放入了侧输出流。 

乱序、迟到的数据处理设置经验:

watermark等待时间不宜设置过大,一般是秒级别,在乱序和延迟之间取舍。设置一定的窗口允许迟到,只考虑大部分迟到数据,不考虑极端小部分的迟到数据。极端小部分迟到数据,放入侧输出流,获取后做处理。

三.基于时间的合流——双流联结(Join)

可以发现,根据某个key合并两条流,与关系型数据库中表的join操作非常相近。事实上,Flink中两条流的connect操作,就可以通过keyBy指定键进行分组后合并,实现了类似于SQL中的join操作;另外connect支持处理函数,可以使用自定义实现各种需求,其实已经能够处理双流join的大多数场景。

不过处理函数是底层接口,所以尽管connect能做的事情多,但在一些具体应用场景下还是显得太过抽象了。比如,如果我们希望统计固定时间内两条流数据的匹配情况,那就需要自定义来实现——其实这完全可以用窗口(window)来表示。为了更方便地实现基于时间的合流操作,Flink的DataStrema API提供了内置的join算子。

3.1 窗口联结(Window Join)

Flink为基于一段时间的双流合并专门提供了一个窗口联结算子,可以定义时间窗口,并将两条流中匹配公共键(key)的数据放在窗口中进行配对处理。

3.1.1 窗口联结的调用

用法:

stream1.join(stream2)

.where() // 指定流1要连接的Key

.equalTo() // 指定流2要连接的Key

.window() // 两条流一起开窗

.apply() // 处理逻辑

例子:两条流,匹配出同一时间内的Key相同的数据。

/**

* Window Join

*/

public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env =

StreamExecutionEnvironment.getExecutionEnvironment();

env.setParallelism(1);

// 数据流 1

SingleOutputStreamOperator> ds1 = env.fromElements(

new Tuple2<>("a", 1),

new Tuple2<>("a", 2),

new Tuple2<>("b", 3),

new Tuple2<>("b", 4),

new Tuple2<>("c", 4)

).assignTimestampsAndWatermarks(

WatermarkStrategy

.>forMonotonousTimestamps() // 单调递增的事件时间

.withTimestampAssigner((v, ts) -> v.f1 * 1000)); // 水位线提取器

// 数据流 1

SingleOutputStreamOperator> ds2 = env.fromElements(

new Tuple3<>("a", 3 , 11),

new Tuple3<>("a", 11 , 22),

new Tuple3<>("b", 6 , 3),

new Tuple3<>("b", 9 , 13),

new Tuple3<>("c", 10 , 12)

).assignTimestampsAndWatermarks(

WatermarkStrategy

.>forMonotonousTimestamps() // 单调递增的事件时间

.withTimestampAssigner((v, ts) -> v.f1 * 1000)); // 水位线提取器

/**

* Window Join

* 1、落在同一时间窗口内的数据才能匹配

* 2、根据数据中的某个Key进行匹配

* 3、只能获取匹配成功的数据

* 4、类似 Inner Join

*/

DataStream join = ds1 // 第一条流

.join(ds2) // join 第二条流

.where(r1 -> r1.f0) // 第一条流中要匹配的key

.equalTo(r2 -> r2.f0) // 第一条流中要匹配的key

.window(TumblingEventTimeWindows.of(Time.seconds(10))) // 十秒滚动窗口

.apply(new JoinFunction, Tuple3, String>() {

@Override

public String join(Tuple2 v1, Tuple3 v2) throws Exception {

return v1 + "<===匹配===>" + v2;

}

});

env.execute();

}

结果:

(a,1)<===匹配===>(a,3,11)

(a,2)<===匹配===>(a,3,11)

(b,3)<===匹配===>(b,6,3)

(b,3)<===匹配===>(b,9,13)

(b,4)<===匹配===>(b,6,3)

(b,4)<===匹配===>(b,9,13)

 只有Key匹配,且落在同一时间窗口的数据才能匹配。

3.1.2 间隔联结(Interval Join)

在有些场景下,我们要处理的时间间隔可能并不是固定的。这时显然不应该用滚动窗口或滑动窗口来处理,因为数据很可能卡在窗口的两侧边缘,例如 a,5 与 a,11 ,虽然Key相匹配,但是a,11属于 [ 10 , 20 ] 的窗口,则无法匹配,显然基于时间的窗口联合无能为力。

为了应对这样的需求,Flink提供了一种叫作“间隔联结”(interval join)的合流操作。顾名思义,间隔联结的思路就是针对一条流的每个数据,开辟出其时间戳前后的一段时间间隔,看这期间是否有来自另一条流的数据匹配。

下方的流a去间隔联结上方的流b,所以基于A的每个数据元素,都可以开辟一个间隔区间。我们这里设置下界为-2毫秒,上界为1毫秒。于是对于时间戳为2的A中元素,它的可匹配区间就是[0, 3],流B中有时间戳为0、1的两个元素落在这个范围内,所以就可以得到匹配数据对(2, 0)和(2, 1)。同样地,A中时间戳为3的元素,可匹配区间为[1, 4],B中只有时间戳为1的一个数据可以匹配,于是得到匹配数据对(3, 1)。

public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setParallelism(1);

// 数据流 1

SingleOutputStreamOperator> ds1 = env

.socketTextStream("xxx.xxx.xxx", 1234)

.map(new MapFunction>() {

@Override

public Tuple2 map(String s) throws Exception {

String[] data = s.split(",");

return Tuple2.of(data[0],Integer.valueOf(data[1]));

}

})

.assignTimestampsAndWatermarks(

WatermarkStrategy

.>forMonotonousTimestamps() // 单调递增的事件时间

.withTimestampAssigner((v, ts) -> v.f1 * 1000)

); ;

// 数据流 2

SingleOutputStreamOperator> ds2 = env

.socketTextStream("xxx.xxx.xxx", 4321)

.map(new MapFunction>() {

@Override

public Tuple3 map(String s) throws Exception {

String[] data = s.split(",");

return Tuple3.of(data[0],Integer.valueOf(data[1]),Integer.valueOf(data[2]));

}

})

.assignTimestampsAndWatermarks(

WatermarkStrategy

.>forMonotonousTimestamps() // 单调递增的事件时间

.withTimestampAssigner((v, ts) -> v.f1 * 1000)

); // 水位线提取器;

// 两条流分别KeyBy,Key就是关联条件

KeyedStream, String> ks1 = ds1.keyBy(k1 -> k1.f0);

KeyedStream, String> ks2 = ds2.keyBy(k2 -> k2.f0);

// 定义两个侧输出流存放左右流的迟到数据

OutputTag> leftLateTag = new OutputTag<>("left-late", Types.TUPLE(Types.STRING, Types.INT));

OutputTag> rightLateTag = new OutputTag<>("right-late", Types.TUPLE(Types.STRING, Types.INT,Types.INT));

// 调用 Interval join 间隔联合

SingleOutputStreamOperator process = ks1.intervalJoin(ks2)

.between(Time.seconds(-2), Time.seconds(2)) // 上界偏移-2s,下界偏移2s

.sideOutputLeftLateData(leftLateTag) // 第一条流的迟到数据放入侧输出流

.sideOutputRightLateData(rightLateTag) // 第二条流的迟到数据放入侧输出流

.process(new ProcessJoinFunction, Tuple3, String>() {

/**

*

* @param left join左边的流

* @param right join右边的流

* @param ctx 上下文信息

* @param out 采集器

* @throws Exception

*/

@Override

public void processElement(Tuple2 left, Tuple3 right, ProcessJoinFunction, Tuple3, String>.Context ctx, Collector out) throws Exception {

out.collect(left + "<---匹配--->" + right);

}

});

process.getSideOutput(leftLateTag).printToErr("左流迟到数据");

process.getSideOutput(rightLateTag).printToErr("右流迟到数据");

process.print();

env.execute();

}

输入:

[root@VM-12-13-centos ~]# nc -lk 1234

1> a,6

3> a,10

[root@VM-12-13-centos ~]# nc -lk 4321

2> a,5,5

4> a,12,12

5> a,4,4

输出:

(a,6)<---匹配--->(a,5,5)

(a,10)<---匹配--->(a,12,12)

右流迟到数据> (a,4,4)

Interval join 1、只支持事件时间 2、指定上界、下界的偏移,负号代表时间往前,正号代表时间往后 3、process中,只能处理匹配上的数据 4、两条流关联后的watermark,以两条流中最小的为准 5、如果 当前数据的事件时间 < 当前的watermark,就是迟到数据, 主流的process不处理         => between后,可以指定将 左流 或 右流 的迟到数据 放入侧输出流

参考阅读

评论可见,请评论后查看内容,谢谢!!!评论后请刷新页面。