Introduction

Flink CDC:

Flink CDC solves the pain points of traditional real-time database synchronization. It dispenses with third-party middleware such as Kafka: it reads both the full snapshot and the incremental (binlog) data of a MySQL master node in real time, captures every data change, stays fully decoupled from the business logic, and is very easy to operate. For details see: Flink_CDC搭建及简单使用 and flink-cdc-connectors.

Apache Doris:

Apache Doris is a modern MPP analytical database. It returns query results within sub-second response times, which makes it well suited to real-time data analytics. Its distributed architecture is simple and easy to operate, and it scales to datasets of more than 10 PB.

Apache Doris covers a wide range of analytical workloads, such as fixed historical reports, real-time analytics, interactive analysis, and exploratory analysis, making data analysis simpler and more efficient. Doris is now an Apache top-level project and a popular choice for real-time analytical data warehouses at many large companies. For details see: Apache Doris.

Technology selection

This article uses Flink CDC together with flink-doris-connector, via the Flink DataStream API, to synchronize MySQL full and incremental data into the analytical data warehouse Doris.

Version compatibility:

Flink     Flink-doris-connector    Flink-CDC                        Doris
1.11.x    1.11.6-2.12-xx           1.0.0 / 1.1.0                    0.13.+
1.12.x    1.12.7-2.12-xx           1.2.0 / 1.3.0                    0.13.+
1.13.x    1.13.5-2.12-xx           1.4.0 / 2.0.* / 2.1.* / 2.2.*    0.13.+
1.14.x    1.14.4-2.12-xx           2.2.*                            0.13.+

Note: how to read a Flink-doris-connector version number: for example, 1.13.5-2.12-1.0.1 means Flink version 1.13.5, Scala version 2.12, and connector version 1.0.1.
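The three segments of the version string can be pulled apart mechanically. The following is a throwaway sketch (the class and method names are made up for illustration, not part of any Flink API); it works because neither the Flink nor the Scala version contains a hyphen:

```java
public class ConnectorVersion {
    /** Splits e.g. "1.13.5-2.12-1.0.1" into {flink, scala, connector}. */
    public static String[] parse(String version) {
        // Limit of 3 keeps any hyphens inside the trailing connector
        // segment intact (none occur today, but it is the safe choice).
        return version.split("-", 3);
    }

    public static void main(String[] args) {
        String[] parts = parse("1.13.5-2.12-1.0.1");
        System.out.println("flink=" + parts[0]
                + " scala=" + parts[1]
                + " connector=" + parts[2]);
        // → flink=1.13.5 scala=2.12 connector=1.0.1
    }
}
```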

Implementation

The versions chosen for this build are:

Flink     Flink-doris-connector    Flink-CDC    Doris
1.13.6    1.0.3                    2.2.0        0.15.13

Import the dependencies:

<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>2.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.13.6</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.12</artifactId>
    <version>1.13.6</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.13.6</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.12</artifactId>
    <version>1.13.6</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.12</artifactId>
    <version>1.13.6</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.doris</groupId>
    <artifactId>flink-doris-connector-1.13_2.12</artifactId>
    <version>1.0.3</version>
</dependency>

Code implementation:

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Enable checkpointing so the CDC source can recover without losing data.
    env.enableCheckpointing(3000);

    MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
            .hostname("127.0.0.1")
            .port(3306)
            .databaseList("test")        // set captured database
            .tableList("test.test_cdc")  // set captured table
            .username("bigdata")
            .password("123456")
            .deserializer(new JsonDebeziumDeserializationSchema())
            .build();

    // Stream Load properties: ship records to Doris as a JSON array.
    Properties pro = new Properties();
    pro.setProperty("format", "json");
    pro.setProperty("strip_outer_array", "true");

    DataStreamSource<String> dataStreamSource =
            env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "mysql source");
    dataStreamSource.setParallelism(1).print();

    /**
     * The JSON record formats Flink CDC emits when synchronizing data:
     * insert: {"before":null,"after":{"id":30,"name":"wangsan","age":27,"address":"北京"},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1652844463000,"snapshot":"false","db":"test","sequence":null,"table":"test_cdc","server_id":1,"gtid":null,"file":"mysql-bin.000001","pos":10525,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1652844463895,"transaction":null}
     * update: {"before":{"id":30,"name":"wangsan","age":27,"address":"北京"},"after":{"id":30,"name":"wangsan","age":27,"address":"上海"},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1652844511000,"snapshot":"false","db":"test","sequence":null,"table":"test_cdc","server_id":1,"gtid":null,"file":"mysql-bin.000001","pos":10812,"row":0,"thread":null,"query":null},"op":"u","ts_ms":1652844511617,"transaction":null}
     * delete: {"before":{"id":25,"name":"wanger","age":26,"address":"西安"},"after":null,"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1652844336000,"snapshot":"false","db":"test","sequence":null,"table":"test_cdc","server_id":1,"gtid":null,"file":"mysql-bin.000001","pos":10239,"row":0,"thread":null,"query":null},"op":"d","ts_ms":1652844336733,"transaction":null}
     *
     * Doris:
     * Doris cannot delete rows by non-primary-key columns, so when a delete
     * happens in MySQL we perform a logical (soft) delete instead, marking the
     * row as deleted; that data can simply be filtered out during later analysis.
     * is_delete: logical-delete flag, 0 = normal, 1 = deleted
     * updateStamp: the time the row was written to Doris
     */
    env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "mysql source")
            .map(line -> {
                JSONObject lineObj = JSONObject.parseObject(line);
                JSONObject data;
                if ("d".equals(lineObj.getString("op"))) {
                    // Delete: take the "before" image and set is_delete to 1.
                    data = JSONObject.parseObject(lineObj.getString("before"));
                    data.put("is_delete", 1);
                } else {
                    // Insert/update: take the "after" image and set is_delete to 0.
                    data = JSONObject.parseObject(lineObj.getString("after"));
                    data.put("is_delete", 0);
                }
                data.put("updateStamp",
                        new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
                return data.toJSONString();
            })
            .addSink(DorisSink.sink(
                    DorisExecutionOptions.builder()
                            .setBatchSize(3)
                            .setBatchIntervalMs(10L)
                            .setMaxRetries(3)
                            .setStreamLoadProp(pro)
                            .setEnableDelete(true)
                            .build(),
                    DorisOptions.builder()
                            .setFenodes("127.0.0.1:8030")
                            .setTableIdentifier("test_lit.test_cdc_sink")
                            .setUsername("root")
                            .setPassword("")
                            .build()));

    env.execute("Print MySQL Snapshot + Binlog");
}
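The logical-delete routing inside map() can be pulled out into a small pure function, which makes it easy to unit-test without a running Flink job. The sketch below models the Debezium row images as plain maps instead of fastjson objects; LogicalDelete and route are hypothetical names introduced only for illustration, not part of the connector API:

```java
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;

public class LogicalDelete {
    /**
     * Chooses which row image to keep for a change event and stamps it:
     * delete ("d")          -> keep the "before" image, is_delete = 1
     * insert/update ("c"/"u") -> keep the "after" image, is_delete = 0
     */
    public static Map<String, Object> route(String op,
                                            Map<String, Object> before,
                                            Map<String, Object> after) {
        boolean deleted = "d".equals(op);
        Map<String, Object> row = new HashMap<>(deleted ? before : after);
        row.put("is_delete", deleted ? 1 : 0);
        row.put("updateStamp", LocalDateTime.now().toString());
        return row;
    }

    public static void main(String[] args) {
        Map<String, Object> del = route("d", Map.of("id", 25, "name", "wanger"), null);
        System.out.println(del.get("is_delete")); // prints 1
        Map<String, Object> upd = route("u", null, Map.of("id", 30, "name", "wangsan"));
        System.out.println(upd.get("is_delete")); // prints 0
    }
}
```

Because the function never mutates its inputs and takes only the op code plus the two row images, assertions on its output cover the whole soft-delete decision in a few lines.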

Summary:

With Flink CDC and the flink-doris-connector, full or incremental MySQL data can be synchronized into Doris very simply. The pipeline is short and low-latency, it streamlines the traditional approach to data synchronization, and it is easy to maintain.
