interval join 是区间 join: 为了效率, 一般只在设定的时间范围内进行匹配, 超出该范围的数据无法 join, 会被直接丢弃. 对于无法容忍数据丢失的场景, 可以使用 coGroup + 侧输出流 + connect 来解决, 并将最终 join 不上的数据保存到指定介质.

大概思路如下

1. 使用 windowAll 进行数据筛选, 延迟很久的数据利用 sideOutputLateData 也能被下发.

2. 使用 coGroup 将窗口数据保存, 然后处理数据, 输出到主流.

3. 在主流分流: join 上的数据还是放在主流, 而 join 不上的数据输出到侧输出流.

4. 将 join 失败流和延迟流进行 connect, keyBy 分组后, 进行 process 处理.

5. 分组后相同的 key 会在一起, 所以先到达的数据保存到状态, 然后设置定时器; 后到达的数据查找对方状态进行 join; 假如定时器触发时对方的数据还没有到达, 则将该数据保存到指定介质.

流程图

缺点就是需要自己实现join逻辑

简单Demo 如下, flink 版本是1.14的:

package com.kimi.flink.dataStream.demo.api;

import io.netty.handler.codec.DateFormatter;

import lombok.Data;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;

import org.apache.flink.api.common.functions.CoGroupFunction;

import org.apache.flink.api.common.functions.JoinFunction;

import org.apache.flink.api.common.functions.MapFunction;

import org.apache.flink.api.common.state.ValueState;

import org.apache.flink.api.common.state.ValueStateDescriptor;

import org.apache.flink.api.common.typeinfo.TypeHint;

import org.apache.flink.api.common.typeinfo.TypeInformation;

import org.apache.flink.api.java.functions.KeySelector;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.configuration.Configuration;

import org.apache.flink.streaming.api.datastream.*;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.streaming.api.functions.KeyedProcessFunction;

import org.apache.flink.streaming.api.functions.ProcessFunction;

import org.apache.flink.streaming.api.functions.co.CoProcessFunction;

import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;

import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;

import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;

import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;

import org.apache.flink.streaming.api.windowing.time.Time;

import org.apache.flink.streaming.api.windowing.triggers.Trigger;

import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;

import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

import org.apache.flink.streaming.api.windowing.windows.Window;

import org.apache.flink.util.Collector;

import org.apache.flink.util.StringUtils;

import org.apache.flink.util.OutputTag;

import java.io.Serializable;

import java.sql.Timestamp;

import java.time.*;

import java.time.format.DateTimeFormatter;

import java.util.*;

import java.util.concurrent.TimeUnit;

/**
 * Demo of a two-stream join that does not drop unmatched or late records, built from
 * {@code windowAll} + {@code coGroup} + side outputs + {@code connect}.
 *
 * <p>Each input stream first passes through a {@code windowAll} stage that uses the same tumbling
 * window size as the later {@code coGroup}. Records that arrive after their window has closed are
 * emitted through {@code sideOutputLateData} instead of being discarded, so they can be collected
 * and joined by hand further downstream.
 *
 * <p>Pipeline:
 * <ol>
 *   <li>{@code windowAll} on each input; late records go to a per-input side output.</li>
 *   <li>{@code coGroup} of the two windowed streams (full outer join on {@code id}).</li>
 *   <li>Complete records stay on the main stream; incomplete ones go to the join-fail side
 *       output.</li>
 *   <li>The join-fail stream is connected with the union of the late streams, keyed by
 *       {@code id}. The first record to arrive is buffered in state with an event-time timer; a
 *       complementing record arriving later completes it. If the timer fires first, the buffered
 *       record is emitted on the timer-fail side output.</li>
 * </ol>
 *
 * <p>Sample inputs and the expected console output are listed in the comments at the bottom of
 * this file.
 */
public class CoGroupJoinAndConnectExample {

    /** Format of the third CSV field (the record's event time). */
    private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** Socket port whose lines are parsed as {@code id,name,time}. */
    private static final int NAME_STREAM_PORT = 8888;

    /** Socket port whose lines are parsed as {@code id,sex,time}. */
    private static final int SEX_STREAM_PORT = 9999;

    /** Tumbling window size shared by both windowAll stages and the coGroup. */
    private static final int WINDOW_SECONDS = 10;

    /** How long (event time, relative to the record's own log time) a buffered record waits for its counterpart. */
    private static final long PENDING_TIMEOUT_MS = 20_000L;

    public static void main(String[] args) throws Exception {
        // Create the Flink streaming environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Source 1 (names): simulate input with `nc -lk 8888`.
        SingleOutputStreamOperator<Student> stream1 = getSocketStream(env, "localhost", NAME_STREAM_PORT);
        // Source 2 (sexes): simulate input with `nc -lk 9999`.
        SingleOutputStreamOperator<Student> stream2 = getSocketStream(env, "localhost", SEX_STREAM_PORT);

        OutputTag<Student> stream1LateTag =
                new OutputTag<Student>("stream1LateTag", TypeInformation.of(new TypeHint<Student>() {})) {};
        OutputTag<Student> stream2LateTag =
                new OutputTag<Student>("stream2LateTag", TypeInformation.of(new TypeHint<Student>() {})) {};
        OutputTag<Student> joinFailTag =
                new OutputTag<Student>("joinFail", TypeInformation.of(new TypeHint<Student>() {})) {};
        OutputTag<Student> timerFailTag =
                new OutputTag<Student>("timerFail", TypeInformation.of(new TypeHint<Student>() {})) {};

        // Pass each input through a windowAll stage so that late records are captured on a side output.
        SingleOutputStreamOperator<Student> lateStream1WithWindow = forwardThroughWindow(stream1, stream1LateTag);
        SingleOutputStreamOperator<Student> lateStream2WithWindow = forwardThroughWindow(stream2, stream2LateTag);

        DataStream<Student> lateStream1 = lateStream1WithWindow.getSideOutput(stream1LateTag);
        DataStream<Student> lateStream2 = lateStream2WithWindow.getSideOutput(stream2LateTag);
        DataStream<Student> lateStream = lateStream1.union(lateStream2);
        lateStream.print("发现迟到数据: ");

        // Windowed co-group join on id, using the same window size as the windowAll stages.
        DataStream<Student> joinedStream = lateStream1WithWindow
                .coGroup(lateStream2WithWindow)
                .where((KeySelector<Student, String>) Student::getId)
                .equalTo((KeySelector<Student, String>) Student::getId)
                .window(TumblingEventTimeWindows.of(Time.seconds(WINDOW_SECONDS)))
                .apply(new OuterJoinCoGroupFunction());

        // Complete records stay on the main stream; incomplete ones are routed to joinFailTag.
        SingleOutputStreamOperator<Student> processStream = joinedStream
                .process(new JoinResultSplitter(joinFailTag))
                .returns(TypeInformation.of(new TypeHint<Student>() {}));

        DataStream<Student> failStream = processStream.getSideOutput(joinFailTag);

        // Second-chance join: records the window join could not complete vs. records that arrived late.
        SingleOutputStreamOperator<Student> connectStream = failStream.connect(lateStream)
                .keyBy((KeySelector<Student, String>) Student::getId,
                        (KeySelector<Student, String>) Student::getId)
                .process(new PendingJoinFunction(timerFailTag))
                .returns(TypeInformation.of(new TypeHint<Student>() {}));

        DataStream<Student> timerFailStream = connectStream.getSideOutput(timerFailTag);

        // processStream and connectStream could later be unioned and written to one sink.
        processStream.print("主流join成功 ===> ");
        connectStream.print("定时器join成功 ===> ");
        // Every record that ultimately could not be joined ends up here.
        timerFailStream.print("定时器join失败 ===> ");

        env.execute("Flink CoGroup Join + connect Example");
    }

    /**
     * Routes {@code input} through a tumbling event-time {@code windowAll} that re-emits every
     * element unchanged and sends records arriving after their window closed to {@code lateTag}.
     *
     * @param input   stream with timestamps and watermarks already assigned
     * @param lateTag side-output tag that receives the late records
     * @return the windowed stream; late records are available via {@code getSideOutput(lateTag)}
     */
    private static SingleOutputStreamOperator<Student> forwardThroughWindow(
            DataStream<Student> input,
            OutputTag<Student> lateTag) {
        return input
                .windowAll(TumblingEventTimeWindows.of(Time.seconds(WINDOW_SECONDS)))
                .sideOutputLateData(lateTag)
                .apply(new ForwardingWindowFunction());
    }

    /**
     * Builds a {@link Student} stream from a text socket. Blank lines, lines without exactly three
     * comma-separated fields and lines with an unparsable timestamp are skipped.
     *
     * @param env      execution environment to attach the source to
     * @param hostname socket host
     * @param port     socket port; {@code 8888} fills {@code name}, {@code 9999} fills {@code sex}
     * @return the parsed stream with event-time timestamps taken from {@code logTime}
     */
    private static SingleOutputStreamOperator<Student> getSocketStream(
            StreamExecutionEnvironment env,
            String hostname,
            int port) {
        return env.socketTextStream(hostname, port)
                .map(new MapFunction<String, Student>() {
                    @Override
                    public Student map(String value) throws Exception {
                        return parseStudent(value, port);
                    }
                })
                // parseStudent signals "skip this line" with null.
                .filter(Objects::nonNull)
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Student>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                                .withTimestampAssigner(new SerializableTimestampAssigner<Student>() {
                                    @Override
                                    public long extractTimestamp(Student element, long recordTimestamp) {
                                        return element.getLogTime().getTime();
                                    }
                                }));
    }

    /**
     * Parses one {@code id,value,yyyy-MM-dd HH:mm:ss} line.
     *
     * @param line raw input line, may be null or blank
     * @param port port the line was read from; decides whether the middle field is name or sex
     * @return the parsed record, or {@code null} if the line is blank or malformed
     */
    private static Student parseStudent(String line, int port) {
        if (StringUtils.isNullOrWhitespaceOnly(line)) {
            return null;
        }
        String[] parts = line.split(",");
        if (parts.length != 3) {
            return null;
        }

        Timestamp logTime;
        try {
            // Trim like the other fields so "1, x, 2022-03-08 15:00:05" is accepted.
            logTime = Timestamp.valueOf(LocalDateTime.parse(parts[2].trim(), formatter));
        } catch (DateTimeException e) {
            // One bad line must not fail the whole job; skip it like other malformed lines.
            System.err.println("Skipping line with unparsable timestamp: " + line);
            return null;
        }

        Student student = new Student();
        student.setId(parts[0].trim());
        if (port == NAME_STREAM_PORT) {
            student.setName(parts[1].trim());
        } else if (port == SEX_STREAM_PORT) {
            student.setSex(parts[1].trim());
        }
        student.setLogTime(logTime);
        return student;
    }

    /** Re-emits every element of a window unchanged. */
    private static final class ForwardingWindowFunction
            implements AllWindowFunction<Student, Student, TimeWindow> {

        @Override
        public void apply(TimeWindow window, Iterable<Student> values, Collector<Student> out) throws Exception {
            for (Student student : values) {
                out.collect(student);
            }
        }
    }

    /**
     * Full outer join of the two inputs for one key and window: matched pairs are emitted with
     * both fields set, unmatched records from either side are emitted as they are.
     */
    private static final class OuterJoinCoGroupFunction
            implements CoGroupFunction<Student, Student, Student> {

        @Override
        public void coGroup(Iterable<Student> stream1,
                            Iterable<Student> stream2,
                            Collector<Student> out) throws Exception {
            boolean stream1HasData = false;
            for (Student student1 : stream1) {
                stream1HasData = true;
                boolean isJoined = false;
                for (Student student2 : stream2) {
                    // 1. Matched: copy sex over and emit.
                    student1.setSex(student2.getSex());
                    out.collect(student1);
                    isJoined = true;
                }
                // 2. No stream2 record for this key: emit the name-only record.
                if (!isJoined) {
                    out.collect(student1);
                }
            }
            // 3. Only stream2 has data for this key: emit the sex-only records.
            if (!stream1HasData) {
                for (Student student : stream2) {
                    out.collect(student);
                }
            }
        }
    }

    /** Keeps complete records on the main output and routes incomplete ones to a side output. */
    private static final class JoinResultSplitter extends ProcessFunction<Student, Student> {

        private final OutputTag<Student> joinFailTag;

        JoinResultSplitter(OutputTag<Student> joinFailTag) {
            this.joinFailTag = joinFailTag;
        }

        @Override
        public void processElement(Student student, Context ctx, Collector<Student> out) throws Exception {
            if (student.getName() != null && student.getSex() != null) {
                // Both fields present: the window join succeeded.
                out.collect(student);
            } else {
                // One field missing: the window join failed for this record.
                ctx.output(joinFailTag, student);
            }
        }
    }

    /**
     * Keyed second-chance join between the join-fail stream (input 1) and the late stream
     * (input 2).
     *
     * <p>An arriving record is merged with the record buffered for the other input if together
     * they yield both name and sex; otherwise it is buffered itself and an event-time timer is
     * registered at {@code logTime + PENDING_TIMEOUT_MS}. When a timer fires, every buffered
     * record whose own deadline has been reached is emitted on {@code timeoutTag}.
     */
    private static final class PendingJoinFunction extends CoProcessFunction<Student, Student, Student> {

        private final OutputTag<Student> timeoutTag;

        private transient ValueState<Student> failStreamState;
        private transient ValueState<Student> lateStreamState;

        PendingJoinFunction(OutputTag<Student> timeoutTag) {
            this.timeoutTag = timeoutTag;
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            failStreamState = getRuntimeContext().getState(new ValueStateDescriptor<>("failStream", Student.class));
            lateStreamState = getRuntimeContext().getState(new ValueStateDescriptor<>("lateStream", Student.class));
        }

        /** Handles a record from the join-fail stream. */
        @Override
        public void processElement1(Student failStreamStudent, Context ctx, Collector<Student> out) throws Exception {
            joinOrBuffer(failStreamStudent, failStreamState, lateStreamState, "failStream注册定时器: ", ctx, out);
        }

        /** Handles a record from the late stream. */
        @Override
        public void processElement2(Student lateStreamStudent, Context ctx, Collector<Student> out) throws Exception {
            joinOrBuffer(lateStreamStudent, lateStreamState, failStreamState, "lateStream注册定时器 ", ctx, out);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<Student> out) throws Exception {
            flushIfExpired(failStreamState, timestamp, ctx);
            flushIfExpired(lateStreamState, timestamp, ctx);
        }

        /**
         * Completes {@code incoming} from the other input's buffered record if possible; otherwise
         * buffers {@code incoming} in its own state and registers its timeout timer.
         */
        private void joinOrBuffer(Student incoming,
                                  ValueState<Student> ownState,
                                  ValueState<Student> otherState,
                                  String logPrefix,
                                  Context ctx,
                                  Collector<Student> out) throws Exception {
            Student counterpart = otherState.value();
            if (counterpart != null && completes(incoming, counterpart)) {
                fillMissing(incoming, counterpart);
                out.collect(incoming);
                // The counterpart's timer is left in place; onTimer ignores it because the state is empty.
                otherState.clear();
                return;
            }

            // Only one record per key and input is buffered. Hand a displaced record to the
            // timeout output instead of silently overwriting it.
            Student displaced = ownState.value();
            if (displaced != null) {
                ctx.output(timeoutTag, displaced);
            }

            ownState.update(incoming);
            System.out.println(logPrefix + incoming);
            ctx.timerService().registerEventTimeTimer(deadlineOf(incoming));
        }

        /** Emits and clears the buffered record if its own deadline is at or before {@code timestamp}. */
        private void flushIfExpired(ValueState<Student> state, long timestamp, Context ctx) throws Exception {
            Student pending = state.value();
            // The deadline check stops a stale timer from an earlier record flushing a newer one.
            if (pending != null && deadlineOf(pending) <= timestamp) {
                ctx.output(timeoutTag, pending);
                state.clear();
            }
        }

        /** Event-time instant at which a buffered record gives up waiting. */
        private static long deadlineOf(Student student) {
            return student.getLogTime().getTime() + PENDING_TIMEOUT_MS;
        }

        /** Whether {@code target} would have both name and sex after being filled from {@code source}. */
        private static boolean completes(Student target, Student source) {
            boolean hasName = target.getName() != null || source.getName() != null;
            boolean hasSex = target.getSex() != null || source.getSex() != null;
            return hasName && hasSex;
        }

        /** Copies name and/or sex from {@code source} into {@code target} where {@code target} lacks them. */
        private static void fillMissing(Student target, Student source) {
            if (target.getSex() == null && source.getSex() != null) {
                target.setSex(source.getSex());
            }
            if (target.getName() == null && source.getName() != null) {
                target.setName(source.getName());
            }
        }
    }

    /**
     * Record flowing through the demo. Stream 1 supplies {@code name}, stream 2 supplies
     * {@code sex}; a record with both set counts as joined.
     */
    @Data
    static class Student implements Serializable {

        /** Join key. */
        private String id;

        /** Set by the name stream; {@code null} until joined for records from the sex stream. */
        private String name;

        /** Set by the sex stream; {@code null} until joined for records from the name stream. */
        private String sex;

        /** Event time of the record; used for watermarks and for the pending-join timeout. */
        private Timestamp logTime;
    }
}

// 这里演示每条流分别只有一条数据进行join,stream1寻找stream2补充`性别`数据

// 或者stream2寻找stream1补充`名字`数据

// 情况一:

// 8888>输入stream1数据: 1,婷婷,2022-03-08 15:00:05

// 9999>输入stream2数据: 1,女,2022-03-08 15:00:05

// 接着再下发数据, 以触发窗口计算

// 8888>输入stream1数据: 1,婷婷,2022-03-08 15:00:15

// 9999>输入stream2数据: 1,女,2022-03-08 15:00:15

// 控制台输出:

// 主流join成功 ===> > CoGroupJoinAndConnectExample.Student(id=1, name=婷婷, sex=女, logTime=2022-03-08 15:00:05.0)

//---------------------------------------------------

// 情况二, 模拟数据短暂延迟, 在定时器join成功:

// 8888>输入stream1数据: 1,婷婷,2022-03-08 15:00:05

// 接着再下发数据

// 8888>输入stream1数据: 1,婷婷,2022-03-08 15:00:15

// 9999>输入stream2数据: 1,女,2022-03-08 15:00:15

// 9999>输入stream2数据: 1,女,2022-03-08 15:00:05

// 因为(1,女,2022-03-08 15:00:05) 延迟, 所以(1,婷婷,2022-03-08 15:00:05)下发,注册定时器

// 等待(1,女,2022-03-08 15:00:05)到来, 会在定时器join上

// 控制台输出:

// failStream注册定时器: CoGroupJoinAndConnectExample.Student(id=1, name=婷婷, sex=null, logTime=2022-03-08 15:00:05.0)

// 发现迟到数据: > CoGroupJoinAndConnectExample.Student(id=1, name=null, sex=女, logTime=2022-03-08 15:00:05.0)

// 定时器join成功 ===> > CoGroupJoinAndConnectExample.Student(id=1, name=婷婷, sex=女, logTime=2022-03-08 15:00:05.0)

//------------------------------------------------------

// 情况三, 模拟数据延迟很久, 之前早到的数据已经在定时器join失败下发了:

// 8888>输入stream1数据: 1,婷婷,2022-03-08 15:00:05

// 接着再下发数据

// 8888>输入stream1数据: 1,婷婷,2022-03-08 15:00:15

// 9999>输入stream2数据: 1,女,2022-03-08 15:00:15

//

// 控制台输出:

// failStream注册定时器: CoGroupJoinAndConnectExample.Student(id=1, name=婷婷, sex=null, logTime=2022-03-08 15:00:05.0)

//

// 接着再下发数据

// 8888>输入stream1数据: 1,婷婷,2022-03-08 15:00:25

// 9999>输入stream2数据: 1,女,2022-03-08 15:00:25

// 控制台输出:

// 主流join成功 ===> > CoGroupJoinAndConnectExample.Student(id=1, name=婷婷, sex=女, logTime=2022-03-08 15:00:15.0)

//

// 接着再下发数据

// 8888>输入stream1数据: 1,婷婷,2022-03-08 15:00:35

// 9999>输入stream2数据: 1,女,2022-03-08 15:00:35

// 控制台输出:

// 主流join成功 ===> > CoGroupJoinAndConnectExample.Student(id=1, name=婷婷, sex=女, logTime=2022-03-08 15:00:25.0)

// 定时器join失败 ===> > CoGroupJoinAndConnectExample.Student(id=1, name=婷婷, sex=null, logTime=2022-03-08 15:00:05.0)

//

// 接着再下发数据

// 9999>输入stream2数据: 1,女,2022-03-08 15:00:05

//

// 控制台输出:

// lateStream注册定时器 CoGroupJoinAndConnectExample.Student(id=1, name=null, sex=女, logTime=2022-03-08 15:00:05.0)

// 发现迟到数据: > CoGroupJoinAndConnectExample.Student(id=1, name=null, sex=女, logTime=2022-03-08 15:00:05.0)

//

// 接着再下发数据

// 8888>输入stream1数据: 1,婷婷,2022-03-08 15:00:55

// 9999>输入stream2数据: 1,女,2022-03-08 15:00:55

// 控制台输出:

// 主流join成功 ===> > CoGroupJoinAndConnectExample.Student(id=1, name=婷婷, sex=女, logTime=2022-03-08 15:00:35.0)

// 定时器join失败 ===> > CoGroupJoinAndConnectExample.Student(id=1, name=null, sex=女, logTime=2022-03-08 15:00:05.0)

//

推荐链接

评论可见,请评论后查看内容,谢谢!!!评论后请刷新页面。