Flink Study Notes

Preface: today is day 19 of my Flink studies! I worked through windows in Flink SQL, including tumbling windows, sliding windows, session windows, and cumulative windows. I learned how to compute running totals (similar to the cumulative play-count requirement in the mid-length video program), multi-dimensional data analysis, and other hot big-data topics. I have written down a lot of my own understanding and ideas, and I hope to discuss them with you and that they help you!

Tips: "Sharing is the source of happiness. In my blog you will find not only an ocean of knowledge but also plenty of positive energy. Come and share this happiness with me! If you like my blog, remember to leave a like ❤️ and a follow! Your support keeps me writing!"

Table of Contents

VI. FlinkSQL Windows
1. Window table-valued functions (TVFs)
2. Window functions and aggregations
   2.1 Tumbling Windows (Tumble Windows)
   2.2 Sliding Windows (Hop Windows)
   2.3 Session Windows (Window TVF not yet supported)
   2.4 Cumulative Windows (Cumulate Windows, new in Flink 1.13)
3. Multi-dimensional data analysis
   3.1 GROUPING SETS
   3.2 ROLLUP
   3.3 CUBE
   3.4 GROUPING and GROUPING_ID
       3.4.1 The GROUPING function
       3.4.2 GROUPING_ID (Hive compatibility)
   3.5 Window Top-N
4. Over Windows
   4.1 ROWS OVER WINDOW
   4.2 RANGE OVER WINDOW
5. Defining windows with the Table API
   5.1.1 Tumbling windows
   5.1.2 Sliding windows
   5.1.3 Session windows

VI. FlinkSQL Windows

1. Window Table-Valued Functions (TVFs)

Windows turn a stream into a special kind of "batch". The commonly used windows are:

sliding windows
tumbling windows
session windows (window TVF support planned for Flink 1.14)
cumulative windows (new in Flink 1.13)

Before Flink 1.13, windowing was expressed with special group window functions:

SELECT
    TUMBLE_START(bidtime, INTERVAL '10' MINUTE),
    TUMBLE_END(bidtime, INTERVAL '10' MINUTE),
    TUMBLE_ROWTIME(bidtime, INTERVAL '10' MINUTE),
    SUM(price)
FROM MyTable
GROUP BY TUMBLE(bidtime, INTERVAL '10' MINUTE);

From Flink 1.13 on, the syntax is standardized with table-valued functions (window TVFs):

SELECT window_start, window_end, window_time, SUM(price)
FROM TABLE(
    TUMBLE(TABLE MyTable, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end;

2. Window Functions and Aggregations

2.1 Tumbling Windows (Tumble Windows)

Syntax:

TUMBLE(TABLE data, DESCRIPTOR(timecol), size)

data: a table name.

timecol: a column descriptor indicating which time attribute column of the data should be mapped to tumbling windows.

size: a duration specifying the width of the tumbling windows.

Data:

2021-04-15 08:05:00,4.00,C
2021-04-15 08:07:00,2.00,A
2021-04-15 08:09:00,5.00,D
2021-04-15 08:11:00,3.00,B
2021-04-15 08:13:00,1.00,E
2021-04-15 08:17:00,6.00,F

Requirement: a real-time dashboard needs the total GMV of every 10-minute window.

package cn.itcast.day02.Window;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * @author lql
 * @time 2024-03-16 17:33:47
 * @description Tumbling-window (TUMBLE TVF) aggregation example
 */
public class GroupWindowsSqlTumbleExample {
    public static void main(String[] args) throws Exception {
        // TODO 1) build the Flink streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // TODO 2) set the parallelism
        env.setParallelism(1);
        // TODO 3) build the Flink table environment
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env, settings);

        String filePath = GroupWindowsSqlTumbleExample.class.getClassLoader().getResource("bid.csv").getPath();

        // register the source table; the watermark makes bidtime an event-time attribute
        tabEnv.executeSql("create table Bid(" +
                "bidtime TIMESTAMP(3)," +
                "price DECIMAL(10, 2), " +
                "item string," +
                "watermark for bidtime as bidtime - interval '1' second) " +
                "with("
                + "'connector' = 'filesystem',"
                + "'path' = 'file:///" + filePath + "',"
                + "'format' = 'csv'"
                + ")");

        // 10-minute tumbling windows via the TUMBLE table-valued function
        Table table = tabEnv.sqlQuery("" +
                "select window_start,window_end,sum(price) as sum_price " +
                " from table(" +
                " tumble(table Bid, DESCRIPTOR(bidtime), interval '10' MINUTES))" +
                " group by window_start,window_end");

        tabEnv.toAppendStream(table, Row.class).print();
        env.execute();
    }
}

Result:

+I[2021-04-15T08:00, 2021-04-15T08:10, 11.00]
+I[2021-04-15T08:10, 2021-04-15T08:20, 10.00]

2.2 Sliding Windows (Hop Windows)

Syntax:

HOP(TABLE data, DESCRIPTOR(timecol), slide, size [, offset ])

data: a table name.

timecol: a column descriptor indicating which time attribute column of the data should be mapped to hopping windows.

slide: a duration specifying the gap between the starts of consecutive hopping windows.

size: a duration specifying the width of the hopping windows.
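
For reference, the window query used in the program below, written as a standalone statement against the same Bid table; note that the slide (5 minutes) comes before the window size (10 minutes) in the argument list:

SELECT window_start, window_end, SUM(price) AS sum_price
FROM TABLE(
    HOP(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES))
GROUP BY window_start, window_end;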

Requirement: every 5 minutes, aggregate the data of the last 10 minutes.

package cn.itcast.day02.Window;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * @author lql
 * @time 2024-03-16 19:28:30
 * @description Sliding-window (HOP TVF) aggregation example
 */
public class GroupWindowsSqlHopExample {
    public static void main(String[] args) throws Exception {
        // TODO 1) build the Flink streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // TODO 2) set the parallelism
        env.setParallelism(1);
        // TODO 3) build the Flink table environment
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env, settings);

        String filePath = GroupWindowsSqlHopExample.class.getClassLoader().getResource("bid.csv").getPath();

        // register the source table; the watermark makes bidtime an event-time attribute
        tabEnv.executeSql("create table Bid(" +
                "bidtime TIMESTAMP(3)," +
                "price DECIMAL(10, 2), " +
                "item string," +
                "watermark for bidtime as bidtime - interval '1' second) " +
                "with("
                + "'connector' = 'filesystem',"
                + "'path' = 'file:///" + filePath + "',"
                + "'format' = 'csv'"
                + ")");

        // 10-minute windows sliding every 5 minutes via the HOP table-valued function
        Table table = tabEnv.sqlQuery("" +
                "select window_start,window_end,sum(price) as sum_price " +
                " from table(" +
                " hop(table Bid, DESCRIPTOR(bidtime), interval '5' MINUTES, interval '10' MINUTES))" +
                " group by window_start,window_end");

        tabEnv.toAppendStream(table, Row.class).print();
        env.execute();
    }
}

Result:

+I[2021-04-15T08:00, 2021-04-15T08:10, 11.00]
+I[2021-04-15T08:05, 2021-04-15T08:15, 15.00]
+I[2021-04-15T08:10, 2021-04-15T08:20, 10.00]
+I[2021-04-15T08:15, 2021-04-15T08:25, 6.00]

2.3 Session Windows (Window TVF not yet supported)

Flink 1.13 does not support session windows as a window TVF; at the time of writing, support was expected in Flink 1.14.
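
For comparison, here is a sketch of the TVF-style session syntax as it appears in newer Flink releases (an illustration only; it will not run on Flink 1.13, so check the documentation of the version you actually use):

SELECT window_start, window_end, SUM(price) AS sum_price
FROM TABLE(
    SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '3' MINUTES))
GROUP BY window_start, window_end;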

Requirement: implement it with the legacy syntax and a session gap of 3 minutes: if no new record arrives within 3 minutes after the last record of a window, that window closes, and later records fall into the next window.

package cn.itcast.day02.Window;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @author lql
 * @time 2024-03-16 19:37:20
 * @description Session-window aggregation example (legacy group window syntax)
 */
public class GroupWindowsSqlSessionExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        String filePath = GroupWindowsSqlSessionExample.class.getClassLoader().getResource("bid.csv").getPath();

        // the event-time column must be of TIMESTAMP type
        tEnv.executeSql("create table Bid(" +
                "bidtime TIMESTAMP(3)," +
                "price DECIMAL(10, 2), " +
                "item string," +
                "watermark for bidtime as bidtime - interval '1' second) " +
                "with("
                + "'connector' = 'filesystem',"
                + "'path' = 'file:///" + filePath + "',"
                + "'format' = 'csv'"
                + ")");

        // legacy SESSION group window with a 3-minute gap
        tEnv.sqlQuery("SELECT " +
                " SESSION_START(bidtime, INTERVAL '3' minute) as wStart, " +
                " SESSION_END(bidtime, INTERVAL '3' minute) as wEnd, " +
                " SUM(price) sum_price " +
                "FROM Bid " +
                "GROUP BY SESSION(bidtime, INTERVAL '3' minute)"
        )
                .execute()
                .print();
    }
}

Result:

+----+-------------------------+-------------------------+-----------+
| op |                  wStart |                    wEnd | sum_price |
+----+-------------------------+-------------------------+-----------+
| +I | 2021-04-15 08:05:00.000 | 2021-04-15 08:16:00.000 |     15.00 |
| +I | 2021-04-15 08:17:00.000 | 2021-04-15 08:20:00.000 |      6.00 |
+----+-------------------------+-------------------------+-----------+
2 rows in set

2.4 Cumulative Windows (Cumulate Windows, new in Flink 1.13)

Syntax:

CUMULATE(TABLE data, DESCRIPTOR(timecol), step, size)

data: the table name.

timecol: the time attribute column used for windowing.

step: the increment by which successive cumulative windows grow.

size: the maximum window width.
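
For intuition: with step = 2 minutes and size = 10 minutes, CUMULATE emits the growing slices [08:00, 08:02), [08:00, 08:04), ..., [08:00, 08:10), then starts a new cycle at 08:10. The standalone form of the query used in the program below is:

SELECT window_start, window_end, SUM(price) AS sum_price
FROM TABLE(
    CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))
GROUP BY window_start, window_end;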

Requirement: use a 10-minute window and report the cumulative total every two minutes (much like the perfectly accumulating play-count curve of the mid-length video program!).

package cn.itcast.day02.Window;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @author lql
 * @time 2024-03-16 19:45:02
 * @description Cumulative-window (CUMULATE TVF) aggregation example
 */
public class GroupWindowsSqlCumulateExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        String filePath = GroupWindowsSqlCumulateExample.class.getClassLoader().getResource("bid.csv").getPath();

        // the event-time column must be of TIMESTAMP type
        tEnv.executeSql("create table Bid(" +
                "bidtime TIMESTAMP(3)," +
                "price DECIMAL(10, 2), " +
                "item string," +
                "watermark for bidtime as bidtime - interval '1' second) " +
                "with("
                + "'connector' = 'filesystem',"
                + "'path' = 'file:///" + filePath + "',"
                + "'format' = 'csv'"
                + ")");

        // 10-minute cumulative window that grows in 2-minute steps
        tEnv.sqlQuery("SELECT window_start, window_end, SUM(price) as sum_price\n" +
                " FROM TABLE(\n" +
                "   CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))\n" +
                " GROUP BY window_start, window_end"
        )
                .execute()
                .print();
    }
}

Result:

+----+-------------------------+-------------------------+-----------+
| op |            window_start |              window_end | sum_price |
+----+-------------------------+-------------------------+-----------+
| +I | 2021-04-15 08:00:00.000 | 2021-04-15 08:06:00.000 |      4.00 |
| +I | 2021-04-15 08:00:00.000 | 2021-04-15 08:08:00.000 |      6.00 |
| +I | 2021-04-15 08:00:00.000 | 2021-04-15 08:10:00.000 |     11.00 |
| +I | 2021-04-15 08:10:00.000 | 2021-04-15 08:12:00.000 |      3.00 |
| +I | 2021-04-15 08:10:00.000 | 2021-04-15 08:14:00.000 |      4.00 |
| +I | 2021-04-15 08:10:00.000 | 2021-04-15 08:16:00.000 |      4.00 |
| +I | 2021-04-15 08:10:00.000 | 2021-04-15 08:18:00.000 |     10.00 |
| +I | 2021-04-15 08:10:00.000 | 2021-04-15 08:20:00.000 |     10.00 |
+----+-------------------------+-------------------------+-----------+
8 rows in set

3. Multi-Dimensional Data Analysis

3.1 GROUPING SETS

With GROUPING SETS:

SELECT window_start,
       window_end,
       userId,
       category,
       sum(price) as sum_price
FROM TABLE(
    TUMBLE(TABLE orders, DESCRIPTOR(t), INTERVAL '5' SECONDS))
GROUP BY window_start, window_end, GROUPING SETS((userId, category), (userId), ())

The same result written the old way, with UNION ALL:

-- ()
SELECT window_start, window_end, 'NULL' as userId, 'NULL' as category, sum(price) as sum_price
FROM TABLE(
    TUMBLE(TABLE orders, DESCRIPTOR(t), INTERVAL '5' SECONDS))
GROUP BY window_start, window_end
UNION ALL
-- (userId)
SELECT window_start, window_end, userId as userId, 'NULL' as category, sum(price) as sum_price
FROM TABLE(
    TUMBLE(TABLE orders, DESCRIPTOR(t), INTERVAL '5' SECONDS))
GROUP BY window_start, window_end, userId
UNION ALL
-- (userId, category)
SELECT window_start, window_end, userId, category, sum(price) as sum_price
FROM TABLE(
    TUMBLE(TABLE orders, DESCRIPTOR(t), INTERVAL '5' SECONDS))
GROUP BY window_start, window_end, userId, category

3.2 ROLLUP

Mnemonic: drop columns from right to left, from the most detailed grouping down to the grand total!

GROUP BY ROLLUP(a, b, c)
-- is equivalent to:
GROUPING SETS((a,b,c),(a,b),(a), ())

GROUP BY ROLLUP ( a, (b, c), d )
-- is equivalent to:
GROUPING SETS (
    ( a, b, c, d ),
    ( a, b, c ),
    ( a ),
    ( )
)

3.3 CUBE

Mnemonic: all combinations!

GROUP BY CUBE(a, b, c)
-- is equivalent to:
GROUPING SETS((a,b,c),(a,b),(a,c),(b,c),(a),(b),(c),())

GROUP BY CUBE ( (a, b), (c, d) )
-- is equivalent to:
GROUPING SETS (
    ( a, b, c, d ),
    ( a, b ),
    ( c, d ),
    ( )
)

-- Combining plain GROUP BY columns, CUBE, and GROUPING SETS: cross-combine the parts and prepend the fixed columns
GROUP BY a, CUBE (b, c), GROUPING SETS ((d), (e))
-- is equivalent to:
GROUP BY GROUPING SETS (
    (a, b, c, d), (a, b, c, e),
    (a, b, d), (a, b, e),
    (a, c, d), (a, c, e),
    (a, d), (a, e)
)

3.4 GROUPING and GROUPING_ID

Background: GROUPING SETS uses NULL as a placeholder in the result, so a placeholder NULL cannot be told apart from a NULL that really occurs in the data.

3.4.1 The GROUPING function

GROUPING takes a column name as its argument:
it returns 0 when the column takes part in the current grouping, i.e. any NULL in it comes from the input data itself;
it returns 1 when the NULL is a placeholder produced by GROUPING SETS.

Example:

SELECT window_start, window_end, userId, category,
       GROUPING(category) as categoryFlag,
       sum(price) as sum_price,
       IF(GROUPING(category) = 0, category, 'ALL') as `all`
FROM TABLE(
    TUMBLE(TABLE orders, DESCRIPTOR(t), INTERVAL '5' SECONDS))
GROUP BY window_start, window_end, GROUPING SETS((userId, category), (userId))

Result:

window_start            | window_end              | userId   | category | sum_price | flag | all
2021-05-23 05:16:35.000 | 2021-05-23 05:16:40.000 | NULL     | NULL     | 10.1      | 1    | ALL
2021-05-23 05:16:40.000 | 2021-05-23 05:16:45.000 | NULL     | NULL     | 96.6      | 1    | ALL
2021-05-23 05:16:45.000 | 2021-05-23 05:16:50.000 | NULL     | NULL     | 15.6      | 1    | ALL
2021-05-23 05:16:35.000 | 2021-05-23 05:16:40.000 | user_001 | 电脑     | 10.1      | 0    | 电脑
2021-05-23 05:16:40.000 | 2021-05-23 05:16:45.000 | user_001 | 手机     | 14.1      | 0    | 手机
2021-05-23 05:16:40.000 | 2021-05-23 05:16:45.000 | user_002 | 手机     | 82.5      | 0    | 手机
2021-05-23 05:16:45.000 | 2021-05-23 05:16:50.000 | user_001 | 电脑     | 15.6      | 0    | 电脑
2021-05-23 05:16:35.000 | 2021-05-23 05:16:40.000 | user_001 | NULL     | 10.1      | 1    | ALL
2021-05-23 05:16:40.000 | 2021-05-23 05:16:45.000 | user_001 | NULL     | 14.1      | 1    | ALL
2021-05-23 05:16:40.000 | 2021-05-23 05:16:45.000 | user_002 | NULL     | 82.5      | 1    | ALL
2021-05-23 05:16:45.000 | 2021-05-23 05:16:50.000 | user_001 | NULL     | 15.6      | 1    | ALL

3.4.2 GROUPING_ID (Hive compatibility)

MaxCompute also provides the parameterless GROUPING__ID function for compatibility with Hive queries.

Its result packs the GROUPING values of the grouped columns into a single integer, bitmap style.

MaxCompute and Hive 2.3.0 or later agree on this function; Hive versions below 2.3.0 produce different output, so the function is not recommended there.

SELECT
    a, b, c,
    COUNT(*),
    GROUPING_ID
FROM VALUES (1,2,3) as t(a,b,c)
GROUP BY a, b, c GROUPING SETS ((a,b,c), (a));

GROUPING_ID here takes no arguments and no parentheses. In MaxCompute this form is equivalent to GROUPING_ID(a, b, c), with the arguments in the same order as the GROUP BY columns.
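
To make the bitmap idea concrete, here is a small worked sketch of my own (it assumes the common convention in which the first GROUP BY column maps to the most significant bit; the exact bit order differs between engines, which is precisely why old Hive versions disagree):

-- GROUP BY a, b, c GROUPING SETS ((a, b, c), (a))
-- row from the (a, b, c) set: GROUPING(a)=0, GROUPING(b)=0, GROUPING(c)=0 -> bits 000 -> id 0
-- row from the (a) set:       GROUPING(a)=0, GROUPING(b)=1, GROUPING(c)=1 -> bits 011 -> id 3
SELECT a, b, c, COUNT(*) AS cnt, GROUPING_ID(a, b, c) AS gid
FROM VALUES (1, 2, 3) AS t(a, b, c)
GROUP BY a, b, c GROUPING SETS ((a, b, c), (a));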

3.5 Window Top-N

Template: for every 10-minute window, find the three suppliers with the highest sales.

SELECT *
FROM (
    SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY price DESC) as rownum
    FROM (
        SELECT window_start, window_end, supplier_id, SUM(price) as price, COUNT(*) as cnt
        FROM TABLE(
            TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
        GROUP BY window_start, window_end, supplier_id
    )
) WHERE rownum <= 3;

Approach: first aggregate into 10-minute tumbling windows, grouping by window and supplier_id to sum the price; then rank with ROW_NUMBER per window and keep the top three.

4. Over Windows

4.1 ROWS OVER WINDOW

The bounds are defined by row count: ROWS BETWEEN (UNBOUNDED | rowCount) PRECEDING AND CURRENT ROW

Note: without a rowCount (UNBOUNDED) the window runs from the first row up to the current row; with a rowCount it covers the previous n rows plus the current row (see the bounded example after the result table below).

Source data:

itemID  | itemType   | onSellTime              | price
ITEM001 | Electronic | 2021-05-11 10:01:00.000 | 20
ITEM002 | Electronic | 2021-05-11 10:02:00.000 | 50
ITEM003 | Electronic | 2021-05-11 10:03:00.000 | 30
ITEM004 | Electronic | 2021-05-11 10:03:00.000 | 60
ITEM005 | Electronic | 2021-05-11 10:05:00.000 | 40
ITEM006 | Electronic | 2021-05-11 10:06:00.000 | 20
ITEM007 | Electronic | 2021-05-11 10:07:00.000 | 70
ITEM008 | Clothes    | 2021-05-11 10:08:00.000 | 20
ITEM009 | Clothes    | 2021-05-11 10:09:00.000 | 40
ITEM010 | Clothes    | 2021-05-11 10:11:00.000 | 30

Example: partition by itemType, order by onSellTime ascending, and compute the running total from the first row to the current row.

select
    itemID,
    itemType,
    onSellTime,
    price,
    sum(price) over w as sumPrice
from tmall_item
WINDOW w AS (
    PARTITION BY itemType
    ORDER BY onSellTime
    ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW
)

Result:

itemID  | itemType   | onSellTime              | price | sumPrice
ITEM001 | Electronic | 2021-05-11 10:01:00.000 | 20.0  | 20.0
ITEM002 | Electronic | 2021-05-11 10:02:00.000 | 50.0  | 70.0
ITEM003 | Electronic | 2021-05-11 10:03:00.000 | 30.0  | 100.0
ITEM004 | Electronic | 2021-05-11 10:03:00.000 | 60.0  | 160.0
ITEM005 | Electronic | 2021-05-11 10:05:00.000 | 40.0  | 200.0
ITEM006 | Electronic | 2021-05-11 10:06:00.000 | 20.0  | 220.0
ITEM007 | Electronic | 2021-05-11 10:07:00.000 | 70.0  | 290.0
ITEM008 | Clothes    | 2021-05-11 10:08:00.000 | 20.0  | 20.0
ITEM009 | Clothes    | 2021-05-11 10:09:00.000 | 40.0  | 60.0
ITEM010 | Clothes    | 2021-05-11 10:11:00.000 | 30.0  | 90.0
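
To illustrate the rowCount form mentioned above, here is a hypothetical variant of the same query (a sketch only; its output is not part of the original notes): it sums the current row and the two rows before it within each itemType.

select
    itemID,
    itemType,
    onSellTime,
    price,
    sum(price) over w as sumPrice
from tmall_item
WINDOW w AS (
    PARTITION BY itemType
    ORDER BY onSellTime
    ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
)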

4.2 RANGE OVER WINDOW

The bounds are defined by time: RANGE BETWEEN (UNBOUNDED | timeInterval) PRECEDING AND CURRENT ROW

Example: a running sum of the amounts within the last two minutes.

select
    itemID,
    itemType,
    onSellTime,
    price,
    sum(price) over w as sumPrice
from tmall_item
WINDOW w AS (
    PARTITION BY itemType
    ORDER BY onSellTime
    RANGE BETWEEN INTERVAL '2' MINUTE preceding AND CURRENT ROW
)

5. Defining Windows with the Table API

5.1.1 Tumbling windows

Methods of the Tumble class:

over: the window length
on: the time field used for grouping (by time interval) or ordering (by row count)
as: an alias, which must appear in the following groupBy

Example: every 5 seconds, compute the total sales amount per product category.

import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Tumble;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;

public class GroupWindowsTableApiTumbleExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<OrderInfo> dataStream = env
                .fromElements(
                        new OrderInfo("电脑", 1000L, 100D),
                        new OrderInfo("手机", 2000L, 200D),
                        new OrderInfo("电脑", 3000L, 300D),
                        new OrderInfo("手机", 4000L, 400D),
                        new OrderInfo("手机", 5000L, 500D),
                        new OrderInfo("电脑", 6000L, 600D))
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<OrderInfo>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                                .withTimestampAssigner((element, recordTimestamp) -> element.getTimestamp())
                );

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        Table table = tableEnv
                .fromDataStream(dataStream, $("category"), $("timestamp").rowtime(), $("money"));

        table
                .window(Tumble.over(lit(5).second())
                        .on($("timestamp")).as("w")) // define the tumbling window and give it an alias
                .groupBy($("category"),
                        $("w")) // the window alias must appear among the groupBy fields
                .select($("category"),
                        $("w").start().as("window_start"),
                        $("w").end().as("window_end"),
                        $("money").sum().as("total_money"))
                .execute()
                .print();

        env.execute();
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class OrderInfo {
        private String category;
        private Long timestamp;
        private Double money;
    }
}

5.1.2 Sliding windows

Methods of the Slide class:

over: the window length
every: the slide step
on: the time field used for grouping (by time interval) or ordering (by row count)
as: an alias, which must appear in the following groupBy

Example: every 5 seconds, compute the total sales amount of the last 10 seconds per product category.

import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Slide;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;

public class GroupWindowsTableApiSlideExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<OrderInfo> dataStream = env
                .fromElements(
                        new OrderInfo("电脑", 1000L, 100D),
                        new OrderInfo("手机", 2000L, 200D),
                        new OrderInfo("电脑", 3000L, 300D),
                        new OrderInfo("手机", 4000L, 400D),
                        new OrderInfo("手机", 5000L, 500D),
                        new OrderInfo("电脑", 6000L, 600D))
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<OrderInfo>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                                .withTimestampAssigner((element, recordTimestamp) -> element.getTimestamp())
                );

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        Table table = tableEnv
                .fromDataStream(dataStream, $("category"), $("timestamp").rowtime(), $("money"));

        table
                .window(Slide.over(lit(10).second())
                        .every(lit(5).second())
                        .on($("timestamp"))
                        .as("w")) // define the sliding window and give it an alias
                .groupBy($("category"),
                        $("w")) // the window alias must appear among the groupBy fields
                .select($("category"),
                        $("w").start().as("window_start"),
                        $("w").end().as("window_end"),
                        $("money").sum().as("total_money"))
                .execute()
                .print();

        env.execute();
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class OrderInfo {
        private String category;
        private Long timestamp;
        private Double money;
    }
}

5.1.3 Session windows

Methods of the Session class:

withGap: the session gap
on: the time field used for grouping (by time interval) or ordering (by row count)
as: an alias, which must appear in the following groupBy

Example: with a gap of 6 seconds, if no new order event arrives within 6 seconds of the previous one, the window closes and the orders inside it are aggregated.

import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Session;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;

public class GroupWindowsTableApiSessionExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<OrderInfo> dataStream = env
                .fromElements(
                        new OrderInfo("电脑", 1000L, 100D),
                        new OrderInfo("手机", 2000L, 200D),
                        new OrderInfo("电脑", 3000L, 300D),
                        new OrderInfo("手机", 4000L, 400D),
                        new OrderInfo("手机", 5000L, 500D),
                        new OrderInfo("电脑", 6000L, 600D))
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<OrderInfo>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                                .withTimestampAssigner((element, recordTimestamp) -> element.getTimestamp())
                );

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        Table table = tableEnv
                .fromDataStream(dataStream, $("category"), $("timestamp").rowtime(), $("money"));

        table
                .window(Session.withGap(lit(6).second())
                        .on($("timestamp"))
                        .as("w")) // define the session window and give it an alias
                .groupBy($("category"),
                        $("w")) // the window alias must appear among the groupBy fields
                .select($("category"),
                        $("w").start().as("window_start"),
                        $("w").end().as("window_end"),
                        $("money").sum().as("total_money"))
                .execute()
                .print();

        env.execute();
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class OrderInfo {
        private String category;
        private Long timestamp;
        private Double money;
    }
}
