Flink(八)CDC

一.简介二.DataStream方式1.MySQL binlog开启2.相关依赖3.编写代码4.打包5.测试

三.自定义反序列化

一.简介

CDC 是 Change Data Capture(变更数据获取)的简称。核心思想是,监测并捕获数据库的变动(包括数据或数据表的插入、更新以及删除等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费

CDC的种类

Flink-CDC

Flink 社区开发了 flink-cdc-connectors 组件,这是一个可以直接从 MySQL、PostgreSQL 等数据库直接读取全量数据和增量变更数据的 source 组件。目前也已开源

二.DataStream方式

1.MySQL binlog开启

修改/etc/my.cnf 文件 sudo vim /etc/my.cnf

server-id = 1

log-bin=mysql-bin

binlog_format=row

binlog-do-db=database-name // 数据库名字

binlog-do-db 根据自己的情况进行修改,指定具体要同步的数据库

重启 MySQL 使配置生效

sudo systemctl restart mysqld

2.相关依赖

org.apache.flink

flink-java

1.13.0

org.apache.flink

flink-streaming-java_2.12

1.13.0

org.apache.flink

flink-clients_2.12

1.13.0

org.apache.hadoop

hadoop-client

3.1.3

mysql

mysql-connector-java

5.1.49

com.alibaba.ververica

flink-connector-mysql-cdc

1.3.0

com.alibaba

fastjson

1.2.75

3.编写代码

public class FlinkCDC {

public static void main(String[] args) throws Exception {

// 1.获取执行环境

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setParallelism(1);

// 开启状态后端 1.管理状态,提供状态的查询和访问2.状态的保存位置是内存

env.setStateBackend(new HashMapStateBackend());

// 开启检查点 5秒一次

env.enableCheckpointing(5000);

// 设置检查点的保存位置

env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8020/flink-cdc");

// 检查点为精准一次

env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

env.getCheckpointConfig().setCheckpointTimeout(10000L);

// 2.通过Flink CDC 构建MySQL SOURCE

DebeziumSourceFunction sourceFunction = MySQLSource.builder()

.hostname("175.178.154.194")

.port(3306)

.username("root")

.password("xxxx")

.databaseList("rtdw-flink") // 库名

.tableList("rtdw-flink.base_category1") // 表名,可以有多个,因此用库名.表名来规定表名,防止不同库之间有相同的表名

.deserializer(new StringDebeziumDeserializationSchema()) // 数据输出格式(反序列化),这里暂时选择默认

.startupOptions(StartupOptions.initial()) // 读取方式,initial表示从头开始读取,还有一种latest方式,表示从启动CDC程序后,读取此时刻以后数据库变化的数据

.build();

DataStreamSource streamSource = env.addSource(sourceFunction);

// 3.打印数据

streamSource.print();

// 4.启动任务

env.execute("CDC TASK ");

}

}

4.打包

下面的打包插件可以将依赖一起打包

maven-compiler-plugin

3.6.1

1.8

1.8

maven-assembly-plugin

jar-with-dependencies

make-assembly

package

single

5.测试

开启flink集群

bin/start-cluster.sh

输入web页面

http://hadoop102:8081

添加jar包

指定启动类提交即可

可以看到输出

复制任务ID

执行命令 将该任务的检查点保存到hdfs(自定义目录)

bin/flink savepoint 224f7761b07bcee76bcaa74a5248a0e3 hdfs://hadoop102:8020/savepoint

取消任务 去MySQL修改表

找到保存点文件 实现了断点续传

三.自定义反序列化

我们可以看到,我们看到的数据格式十分复杂,不利于我们后期对数据的使用,因此我们需要自定义反序列化器,来实现数据的可用性和可读性

public class CusDeserialization implements DebeziumDeserializationSchema {

@Override

public void deserialize(SourceRecord sourceRecord, Collector collector) throws Exception {

// 创建JSON对象

JSONObject res = new JSONObject();

// 获取topic 里面有数据库名和表名

String topic = sourceRecord.topic();

String[] fields = topic.split("\\.");

// 1.获取表名和数据库名

String database = fields[1];

String tableName = fields[2];

// 2.获取value 里面会有before(修改数据会有)和after

Struct value = (Struct)sourceRecord.value();

// 获取before的结构

Struct beforeStruct = value.getStruct("before");

// 将before对象放到JSON对象

JSONObject beforeJSON = new JSONObject();

// 非修改数据是不会有before的,所以要判断

if(beforeStruct != null){

// 获取元数据

Schema beforeSchema = beforeStruct.schema();

// 通过元数据获取字段名

List fieldList = beforeSchema.fields();

for (Field field : fieldList) {

// 获取字段值

Object beforeValue = beforeStruct.get(field);

// 放入JSON对象

beforeJSON.put(field.name(),beforeValue);

}

}

// 3.获取after

Struct after = value.getStruct("after");

JSONObject afterJSON = new JSONObject();

if(after != null){

Schema afterSchema = after.schema();

List fieldList = afterSchema.fields();

for (Field field : fieldList) {

Object afterValue = after.get(field);

afterJSON.put(field.name(),afterValue);

}

}

// 4.获取操作类型

Envelope.Operation operation = Envelope.operationFor(sourceRecord);

String type = operation.toString().toLowerCase();

if("create".equals(type)){ // 类型crud的全称

type = "insert";

}

// 5.数据字段写入JSON

res.put("database",database);

res.put("tableName",tableName);

res.put("before",beforeJSON);

res.put("after",afterJSON);

res.put("type",type);

// 6.发送数据至下游

collector.collect(res.toJSONString());

}

@Override

public TypeInformation getProducedType() {

return BasicTypeInfo.STRING_TYPE_INFO;

}

}

更改反序列化器

public class FlinkCDC {

public static void main(String[] args) throws Exception {

// 1.获取执行环境

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setParallelism(1);

// 2.通过Flink CDC 构建MySQL SOURCE

DebeziumSourceFunction sourceFunction = MySQLSource.builder()

.hostname("175.178.154.194")

.port(3306)

.username("root")

.password("zks123456")

.databaseList("rtdw-flink")

.tableList("rtdw-flink.base_sale_attr")

.deserializer(new CusDes())

.startupOptions(StartupOptions.initial())

.build();

DataStreamSource streamSource = env.addSource(sourceFunction);

// 3.打印数据

streamSource.print();

// 4.启动任务

env.execute("CDC TASK ");

}

}

相关阅读

评论可见,请评论后查看内容,谢谢!!!评论后请刷新页面。