Reference:

Technical Analysis | Doris Connector with Flink CDC for Exactly-Once Ingestion of Sharded MySQL Databases and Tables - Alibaba Cloud Developer Community

Logical architecture diagram:

1. Flink Environment

https://flink.apache.org/zh/

Download flink-1.15.1:

wget https://dlcdn.apache.org/flink/flink-1.15.1/flink-1.15.1-bin-scala_2.12.tgz

Extract the archive and enter the directory:

tar -zxvf flink-1.15.1-bin-scala_2.12.tgz
cd flink-1.15.1

Modify the configuration: set rest.bind-address to 0.0.0.0 in conf/flink-conf.yaml.

vi conf/flink-conf.yaml
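For reference, a minimal sketch of the relevant line in conf/flink-conf.yaml (all other defaults left unchanged); binding to 0.0.0.0 makes the REST/Web UI endpoint reachable from other hosts:

# conf/flink-conf.yaml
rest.bind-address: 0.0.0.0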

Download the dependency JARs into the lib directory of the Flink installation (run the commands from within lib/, or move the downloaded JARs there):

wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.2.1/flink-sql-connector-mysql-cdc-2.2.1.jar

wget https://repo1.maven.org/maven2/org/apache/doris/flink-doris-connector-1.14_2.12/1.0.3/flink-doris-connector-1.14_2.12-1.0.3.jar

Start Flink:

./bin/start-cluster.sh
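To check that the standalone cluster started, jps can be used; the process names below are what a Flink standalone cluster normally shows (stated as an expectation, not output captured from this setup):

jps
# expect a StandaloneSessionClusterEntrypoint (JobManager) and a TaskManagerRunner (TaskManager) process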

Open the Web UI:

http://192.168.0.158:8081

2. MySQL Tables and Data

Enable the binlog: enter the MySQL container, edit /etc/mysql/mysql.cnf as follows, then restart MySQL.

[mysqld]

log_bin=mysql_bin

binlog-format=Row

server-id=1
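The binlog settings can be verified from the MySQL command line with standard statements:

SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';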

From the MySQL command line, create the database emp and the table employee:

CREATE DATABASE emp;

USE emp;

CREATE TABLE employee (

emp_no INT NOT NULL,

birth_date DATE NOT NULL,

first_name VARCHAR(14) NOT NULL,

last_name VARCHAR(16) NOT NULL,

gender ENUM ('M','F') NOT NULL,

hire_date DATE NOT NULL,

PRIMARY KEY (emp_no)

);

INSERT INTO `employee` VALUES

(10001,'1953-09-02','Georgi','Facello','M','1986-06-26'),

(10002,'1964-06-02','Bezalel','Simmel','F','1985-11-21'),

(10003,'1959-12-03','Parto','Bamford','M','1986-08-28'),

(10004,'1954-05-01','Chirstian','Koblick','M','1986-12-01'),

(10005,'1955-01-21','Kyoichi','Maliniak','M','1989-09-12'),

(10006,'1953-04-20','Anneke','Preusig','F','1989-06-02'),

(10007,'1957-05-23','Tzvetan','Zielinski','F','1989-02-10'),

(10008,'1958-02-19','Saniya','Kalloufi','M','1994-09-15'),

(10009,'1952-04-19','Sumant','Peac','F','1985-02-18'),

(10010,'1963-06-01','Duangkaew','Piveteau','F','1989-08-24'),

(10011,'1953-11-07','Mary','Sluis','F','1990-01-22'),

(10012,'1960-10-04','Patricio','Bridgland','M','1992-12-18'),

(10013,'1963-06-07','Eberhardt','Terkki','M','1985-10-20'),

(10014,'1956-02-12','Berni','Genin','M','1987-03-11'),

(10015,'1959-08-19','Guoxiang','Nooteboom','M','1987-07-02'),

(10016,'1961-05-02','Kazuhito','Cappelletti','M','1995-01-27'),

(10017,'1958-07-06','Cristinel','Bouloucos','F','1993-08-03'),

(10018,'1954-06-19','Kazuhide','Peha','F','1987-04-03'),

(10019,'1953-01-23','Lillian','Haddadi','M','1999-04-30'),

(10020,'1952-12-24','Mayuko','Warwick','M','1991-01-26');

3. Doris Table

Connect to Doris through the MySQL client (Doris speaks the MySQL protocol) and create the database demo and the table employee_info:

CREATE DATABASE demo;

USE demo;

CREATE TABLE employee_info (

emp_no int NOT NULL,

birth_date date,

first_name varchar(20),

last_name varchar(20),

gender char(2),

hire_date date,

database_name varchar(50),

table_name varchar(200)

)

UNIQUE KEY(`emp_no`, `birth_date`)

DISTRIBUTED BY HASH(`birth_date`) BUCKETS 1

PROPERTIES ( "replication_allocation" = "tag.location.default: 1" );
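To confirm the schema, the table definition can be checked from the same MySQL client session connected to Doris:

SHOW CREATE TABLE employee_info;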

4. Flink Tables and Data Synchronization

Start the Flink SQL client:

./bin/sql-client.sh embedded

Enable checkpointing

Flink jobs take checkpoints periodically, recording the current binlog position; when a job fails over, processing resumes from the last recorded binlog position.

For production, an interval of 60 seconds is recommended.

Flink SQL> SET 'execution.checkpointing.interval' = '10s';
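For production, per the 60-second recommendation above, the same statement would be (a sketch; only the interval value changes):

Flink SQL> SET 'execution.checkpointing.interval' = '60s';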

Create the MySQL CDC table

Flink SQL> CREATE TABLE employee_source (

database_name STRING METADATA VIRTUAL,

table_name STRING METADATA VIRTUAL,

emp_no int NOT NULL,

birth_date date,

first_name STRING,

last_name STRING,

gender STRING,

hire_date date,

PRIMARY KEY (`emp_no`) NOT ENFORCED

)

WITH (

'connector' = 'mysql-cdc',

'hostname' = 'localhost',

'port' = '3336',

'username' = 'root',

'password' = '1234.abcd',

'database-name' = 'emp',

'table-name' = 'employee'

);

Query the data:

Flink SQL> select * from employee_source limit 10;

Create the Doris sink table

Flink SQL> CREATE TABLE cdc_doris_sink (

emp_no int ,

birth_date STRING,

first_name STRING,

last_name STRING,

gender STRING,

hire_date STRING,

database_name STRING,

table_name STRING

)

WITH (

'connector' = 'doris',

'fenodes' = 'localhost:8030',

'table.identifier' = 'demo.employee_info',

'username' = 'root',

'password' = '1234.abcd'

);

Parameter description:

connector: specifies that the doris connector is used

fenodes: IP address and HTTP port of the Doris FE node

table.identifier: the target database and table name in Doris

username: Doris username

password: Doris password

Query the data:

Flink SQL> select * from cdc_doris_sink;

Submit the data synchronization job

Flink SQL> insert into cdc_doris_sink (emp_no,birth_date,first_name,last_name,gender,hire_date,database_name,table_name)

select emp_no,cast(birth_date as string) as birth_date ,first_name,last_name,gender,cast(hire_date as string) as hire_date ,database_name,table_name from employee_source;

The running job should now be visible in the Web UI, which confirms the job was submitted successfully.
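The running job can also be listed from the command line in the Flink installation directory:

./bin/flink list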

Check the data in the Doris table:

mysql> select * from employee_info;
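Once the initial snapshot has been synchronized, the row count should match the 20 rows inserted into the MySQL source table; a simple check from the same session:

mysql> select count(*) from employee_info;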

5. Troubleshooting

NoResourceAvailableException: Could not acquire the minimum required resources

Go to the Flink directory and edit conf/flink-conf.yaml, setting taskmanager.numberOfTaskSlots: 4 (typically set to the number of CPU cores), then restart the cluster for the change to take effect.
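A sketch of the relevant line in conf/flink-conf.yaml (4 slots assumed here to match the value above; adjust to the machine's CPU count):

taskmanager.numberOfTaskSlots: 4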
