Interval Join 是区间 join, 为了效率一般会设置一定的时间范围, 但是超出该范围的数据就会 join 不上而丢失. 对于无法容忍数据丢失的场景, 可以使用 coGroup + 侧输出流 + connect 来解决, 并将最终 join 不到的数据保存到指定介质.
大概思路如下
1. 使用 windowAll 进行数据筛选, 延迟很久的数据利用 sideOutputLateData 也能被下发;
2. 使用 coGroup 将窗口数据保存, 然后处理数据, 输出到主流;
3. 在主流分流: join 上的数据还是放在主流, 而 join 不上的数据输出到侧输出流;
4. 将 join 失败流和延迟流进行 connect, keyBy 分组后进行 process 处理. 分组后相同的 key 会在一起, 所以先到达的数据保存到状态, 然后设置定时器, 后到达的数据查找对方状态进行 join; 假如定时器触发时后来的数据还没有到达, 就将先到的数据输出, 后续保存到指定介质.
流程图
缺点就是需要自己实现join逻辑
简单 Demo 如下 (Flink 版本为 1.14):
package com.kimi.flink.dataStream.demo.api;
import io.netty.handler.codec.DateFormatter;
import lombok.Data;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.StringUtils;

import java.io.Serializable;
import java.sql.Timestamp;
import java.time.*;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.*;
import java.util.concurrent.TimeUnit;
/**
* 这个demo核心点是在每个流前面 开一个windowAll窗口, 利用windowAll窗口和coGroup共同窗口大小的特性,
* 将延迟很久的数据(窗口已经结束)通过sideOutputLateData能够下发, 自定义收集起来.
*
* 数据模拟输入情况在最下面有演示
*
*/
public class CoGroupJoinAndConnectExample {
private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
public static void main(String[] args) throws Exception {
// 创建 Flink 流式处理环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 创建数据源1 使用 nv -lk 8888 模拟数据
SingleOutputStreamOperator
// 创建数据源2
SingleOutputStreamOperator
OutputTag
OutputTag
OutputTag
OutputTag
int windowSize = 10;
SingleOutputStreamOperator
.windowAll(TumblingEventTimeWindows.of(Time.seconds(windowSize)))
.sideOutputLateData(stream1LateTag)
.apply(new AllWindowFunction
@Override
public void apply(TimeWindow window, Iterable
for (Student student : stream1values) {
// System.out.println("windowAll下发: " + student);
out.collect(student);
}
}
});
SingleOutputStreamOperator
.windowAll(TumblingEventTimeWindows.of(Time.seconds(windowSize)))
.sideOutputLateData(stream2LateTag)
.apply(new AllWindowFunction
@Override
public void apply(TimeWindow window, Iterable
for (Student student : stream2values) {
// System.out.println("windowAll下发: " + student);
out.collect(student);
}
}
});
DataStream
DataStream
DataStream
lateStream.print("发现迟到数据: ");
// 执行 co-group join 操作
DataStream
.coGroup(lateStream2WithWindow)
.where((KeySelector
.equalTo((KeySelector
// 设置 join 窗口时间为 10 秒
.window(TumblingEventTimeWindows.of(Time.seconds(windowSize)))
.apply(new CoGroupFunction
@Override
public void coGroup(Iterable
Iterable
Collector
boolean stream1IfEmpty = false;
for (Student student1 : stream1) {
stream1IfEmpty = true;
boolean isJoined = false;
for (Student student2 : stream2) {
// 1. join上了补上 sex, 发送到主流
student1.setSex(student2.getSex());
out.collect(student1);
isJoined = true;
}
// 2. stream1的sex 没有join上, 发送到主流
if (!isJoined) {
out.collect(student1);
}
}
// stream1 没有数据, 那么只有stream2有
if (!stream1IfEmpty) {
// 3. stream2的name 没有join上, 发送到主流
for (Student student : stream2) {
out.collect(student);
}
}
}
}
);
SingleOutputStreamOperator
@Override
public void processElement(Student student, Context ctx, Collector
String name = student.getName();
String sex = student.getSex();
// name sex 不为null是join上数据, 继续发送主流
if (name != null && sex != null) {
out.collect(student);
} else {
// 否则某一个为null是join不上的数据, 发送到侧输出流
ctx.output(joinFailTag, student);
}
}
}).returns(TypeInformation.of(new TypeHint
}));
DataStream
SingleOutputStreamOperator
.keyBy((KeySelector
(KeySelector
.process(new CoProcessFunction
private ValueState
private ValueState
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
failStreamState = getRuntimeContext().getState(new ValueStateDescriptor<>("failStream", Student.class));
lateStreamState = getRuntimeContext().getState(new ValueStateDescriptor<>("lateStream", Student.class));
}
/**
* 处理失败流数据
* @param failStreamStudent
* @param ctx
* @param out
* @throws Exception
*/
@Override
public void processElement1(Student failStreamStudent, Context ctx, Collector
if (lateStreamState.value() != null) {
if (failStreamStudent.getSex() == null && lateStreamState.value().getSex() != null) {
failStreamStudent.setSex(lateStreamState.value().getSex());
} else if (failStreamStudent.getName() == null && lateStreamState.value().getName() != null) {
failStreamStudent.setName(lateStreamState.value().getName());
}
// 补充完数据, 输出到主流
out.collect(failStreamStudent);
lateStreamState.clear();
} else {
failStreamState.update(failStreamStudent);
System.out.println("failStream注册定时器: " + failStreamStudent);
ctx.timerService().registerEventTimeTimer(failStreamState.value().getLogTime().getTime() + 20000);
}
}
/**
* 处理迟到流数据
* @param lateStreamStudent
* @param ctx
* @param out
* @throws Exception
*/
@Override
public void processElement2(Student lateStreamStudent, Context ctx, Collector
if (failStreamState.value() != null) {
if (lateStreamStudent.getSex() == null && failStreamState.value().getSex() != null) {
lateStreamStudent.setSex(failStreamState.value().getSex());
} else if (lateStreamStudent.getName() == null && failStreamState.value().getName() != null) {
lateStreamStudent.setName(failStreamState.value().getName());
}
// 补充完数据, 输出到主流
out.collect(lateStreamStudent);
failStreamState.clear();
} else {
lateStreamState.update(lateStreamStudent);
System.out.println("lateStream注册定时器 " + lateStreamStudent);
ctx.timerService().registerEventTimeTimer(lateStreamState.value().getLogTime().getTime() + 20000);
}
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector
super.onTimer(timestamp, ctx, out);
if (failStreamState.value() != null) {
// 发送到失败流
ctx.output(timerFailTag, failStreamState.value());
}
if (lateStreamState.value() != null) {
// 发送到失败流
ctx.output(timerFailTag, lateStreamState.value());
}
failStreamState.clear();
lateStreamState.clear();
}
}).returns(TypeInformation.of(new TypeHint
}));
DataStream
// 后续可以将processStream union connectStream 一起发送到某介质
processStream.print("主流join成功 ===> ");
connectStream.print("定时器join成功 ===> ");
// 后续所有join失败的数据都会走这里
timerFailStream.print("定时器join失败 ===> ");
env.execute("Flink CoGroup Join + connect Example");
}
/**
* @param env
* @param hostname
* @param port
* @return
*/
private static SingleOutputStreamOperator
StreamExecutionEnvironment env,
String hostname,
int port) {
return env.socketTextStream(hostname, port)
.map(new MapFunction
@Override
public Student map(String value) throws Exception {
if (!StringUtils.isNullOrWhitespaceOnly(value)) {
String[] parts = value.split(",");
if (parts.length == 3) {
Student student = new Student();
student.setId(parts[0].trim());
if (port == 8888) {
student.setName(parts[1].trim());
} else if (port == 9999) {
student.setSex(parts[1].trim());
}
Timestamp timestamp = Timestamp.valueOf(LocalDateTime.parse(parts[2], formatter));
student.setLogTime(timestamp);
// System.out.println("getSocketStream: " + student);
return student;
}
}
return null;
}
})
.filter(Objects::nonNull)
.assignTimestampsAndWatermarks(WatermarkStrategy.
Duration.ofSeconds(0))
.withTimestampAssigner(new SerializableTimestampAssigner
@Override
public long extractTimestamp(Student element, long recordTimestamp) {
return element.getLogTime().getTime();
}
}));
}
@Data
static class Student implements Serializable {
private String id;
private String name;
private String sex;
private Timestamp logTime;
}
}
// 这里演示每条流分别只有一条数据进行join,stream1寻找stream2补充`性别`数据
// 或者stream2寻找stream1补充`名字`数据
// 情况一:
// 8888>输入stream1数据: 1,婷婷,2022-03-08 15:00:05
// 9999>输入stream2数据: 1,女,2022-03-08 15:00:05
// 接着再下发数据, 以触发窗口计算
// 8888>输入stream1数据: 1,婷婷,2022-03-08 15:00:15
// 9999>输入stream2数据: 1,女,2022-03-08 15:00:15
// 控制台输出:
// 主流join成功 ===> > CoGroupJoinAndConnectExample.Student(id=1, name=婷婷, sex=女, logTime=2022-03-08 15:00:05.0)
//---------------------------------------------------
// 情况二, 模拟数据短暂延迟, 在定时器join成功:
// 8888>输入stream1数据: 1,婷婷,2022-03-08 15:00:05
// 接着再下发数据
// 8888>输入stream1数据: 1,婷婷,2022-03-08 15:00:15
// 9999>输入stream2数据: 1,女,2022-03-08 15:00:15
// 9999>输入stream2数据: 1,女,2022-03-08 15:00:05
// 因为(1,女,2022-03-08 15:00:05) 延迟, 所以(1,婷婷,2022-03-08 15:00:05)下发,注册定时器
// 等待(1,女,2022-03-08 15:00:05)到来, 会在定时器join上
// 控制台输出:
// failStream注册定时器: CoGroupJoinAndConnectExample.Student(id=1, name=婷婷, sex=null, logTime=2022-03-08 15:00:05.0)
// 发现迟到数据: > CoGroupJoinAndConnectExample.Student(id=1, name=null, sex=女, logTime=2022-03-08 15:00:05.0)
// 定时器join成功 ===> > CoGroupJoinAndConnectExample.Student(id=1, name=婷婷, sex=女, logTime=2022-03-08 15:00:05.0)
//------------------------------------------------------
// 情况三, 模拟数据延迟很久, 之前早到的数据已经在定时器join失败下发了:
// 8888>输入stream1数据: 1,婷婷,2022-03-08 15:00:05
// 接着再下发数据
// 8888>输入stream1数据: 1,婷婷,2022-03-08 15:00:15
// 9999>输入stream2数据: 1,女,2022-03-08 15:00:15
//
// 控制台输出:
// failStream注册定时器: CoGroupJoinAndConnectExample.Student(id=1, name=婷婷, sex=null, logTime=2022-03-08 15:00:05.0)
//
// 接着再下发数据
// 8888>输入stream1数据: 1,婷婷,2022-03-08 15:00:25
// 9999>输入stream2数据: 1,女,2022-03-08 15:00:25
// 控制台输出:
// 主流join成功 ===> > CoGroupJoinAndConnectExample.Student(id=1, name=婷婷, sex=女, logTime=2022-03-08 15:00:15.0)
//
// 接着再下发数据
// 8888>输入stream1数据: 1,婷婷,2022-03-08 15:00:35
// 9999>输入stream2数据: 1,女,2022-03-08 15:00:35
// 控制台输出:
// 主流join成功 ===> > CoGroupJoinAndConnectExample.Student(id=1, name=婷婷, sex=女, logTime=2022-03-08 15:00:25.0)
// 定时器join失败 ===> > CoGroupJoinAndConnectExample.Student(id=1, name=婷婷, sex=null, logTime=2022-03-08 15:00:05.0)
//
// 接着再下发数据
// 9999>输入stream2数据: 1,女,2022-03-08 15:00:05
//
// 控制台输出:
// lateStream注册定时器 CoGroupJoinAndConnectExample.Student(id=1, name=null, sex=女, logTime=2022-03-08 15:00:05.0)
// 发现迟到数据: > CoGroupJoinAndConnectExample.Student(id=1, name=null, sex=女, logTime=2022-03-08 15:00:05.0)
//
// 接着再下发数据
// 8888>输入stream1数据: 1,婷婷,2022-03-08 15:00:55
// 9999>输入stream2数据: 1,女,2022-03-08 15:00:55
// 控制台输出:
// 主流join成功 ===> > CoGroupJoinAndConnectExample.Student(id=1, name=婷婷, sex=女, logTime=2022-03-08 15:00:35.0)
// 定时器join失败 ===> > CoGroupJoinAndConnectExample.Student(id=1, name=null, sex=女, logTime=2022-03-08 15:00:05.0)
//
推荐链接
发表评论