1.准备环境

JDK1.8 MySQL Zookeeper Kakfa Maxweill IDEA

2.实操

2.1开启mysql的binlog

查看binlog 状态,是否开启

show variables like 'log_%'

如果log_bin显示为ON,则代表已开启。如果是OFF 说明还没开启。

[Linux] 编辑 /etc/my.cnf 文件,在[mysqld]后面增加

server-id=1

log-bin=mysql-bin

binlog_format=row

#如果不加此参数,默认所有库开启binlog

binlog-do-db=gmall_20230424

重启mysql 服务

service mysqld restart

再次查看binlog 状态

[Windows] 编辑 mysql安装目录 下 my.ini 文件,在[mysqld]后面增加 如上 linux 一样

2.2 Zookeeper 、 Kafka

2.2.1启动 ZK

bin/zkServer.sh start

2.2.2启动 Kakfa

#常规模式启动

bin/kafka-server-start.sh config/server.properties

#进程守护模式启动

nohup bin/kafka-server-start.sh config/server.properties >/dev/null 2>&1 &

2.2.3创建 kafka-topic

bin/kafka-topics.sh --bootstrap-server 192.168.221.100:9092 --create --topic flink_test01 --partitions 1 --replication-factor 1

测试kafka-topic

#消费

bin/kafka-console-consumer.sh --bootstrap-server 192.168.221.100:9092 --topic flink_test01 --from-beginning

#生产

bin/kafka-console-producer.sh --broker-list 192.168.221.100:9092 --topic flink_t

2.3配置Maxwell

2.3.1创建Maxwell 所需要的 数据库 和 用户

1)创建数据库

CREATE DATABASE maxwell;

2)调整MySQL数据库密码级别

set global validate_password_policy=0;

set global validate_password_length=4;

3)创建Maxwell用户并赋予其必要权限

CREATE USER 'maxwell'@'%' IDENTIFIED BY 'maxwell';

GRANT ALL ON maxwell.* TO 'maxwell'@'%';

GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';

2.3.2配置Maxwell

在Maxwell安装包解压目录下,复制 并 编辑 config.properties.example

mv config.properties.example config.properties

vim config.properties

producer=kafka

kafka.bootstrap.servers=192.168.221.100:9092

#目标Kafka topic,可静态配置,例如:maxwell,也可动态配置,例如:%{database}_%{table} kafka_topic=flink_test01

# mysql login info host=192.168.221.100

user=maxwell

password=maxwell

2.3.3 启动Maxwell

1)启动Maxwell

bin/maxwell --config config.properties

2)停止Maxwell

ps -ef | grep maxwell | grep -v grep | grep maxwell | awk '{print $2}' | xargs kill -9

2.4测试maxwell、mysql、kafka 正常使用

2.4.1查看Maxwell、kafka、zookeeper 进程

jps

2.4.2 mysql添加、修改、删除数据

查看 kafka 消费者

有消费 说明 流程是通畅 的

2.5 idea 编写程序

2.5.1 idea 创建 maven 项目

2.5.2 pom.xml 依赖

org.slf4j

slf4j-api

1.7.21

org.slf4j

slf4j-log4j12

1.7.21

org.apache.flink

flink-streaming-java_2.11

1.13.0

org.apache.flink

flink-java

1.13.0

org.apache.flink

flink-clients_2.11

1.13.0

org.apache.hadoop

hadoop-client

3.1.3

mysql

mysql-connector-java

8.0.13

org.apache.flink

flink-table-planner-blink_2.11

1.13.0

com.ververica

flink-connector-mysql-cdc

2.2.0

org.apache.flink

flink-connector-kafka_2.12

1.12.7

com.alibaba

fastjson

2.0.29

org.apache.kafka

kafka-clients

2.1.1

org.apache.flink

flink-runtime-web_2.11

1.13.0

compile

org.apache.flink

flink-connector-jdbc_2.11

1.12.0

org.apache.flink

flink-jdbc_2.11

1.9.1

org.apache.kafka

kafka-clients

3.0.0

org.apache.flink

flink-table-planner_2.11

1.10.1

2.5.3 编写测试代码

import com.alibaba.fastjson.JSON;

import com.alibaba.fastjson.JSONObject;

import com.qiyu.dim.KafkaUtil;

import org.apache.flink.api.common.functions.FlatMapFunction;

import org.apache.flink.api.common.functions.MapFunction;

import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;

import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.types.Row;

import org.apache.flink.util.Collector;

/**

* @Author liujian

* @Date 2023/4/24 9:40

* @Version 1.0

*/

public class Flink_kafka {

public static void main(String[] args) throws Exception {

// todo 1.获取执行环境

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// todo 2.将并行度设为1, (生产环境中,kafka中 topic有几个分区 设为几)

env.setParallelism(1);

// todo 3.读取 maxwell kafka 数据流

DataStream dataStreamSource=

env.addSource(KafkaUtil.getKafkaConsumer("flink_test01","192.168.221.100:9092"));

// todo 4.取kafka中的数据流 有效数据,获取 emp_user 表中的 新增、修改、初始化数据 。脏数据直接打印控制台,不处理

DataStream data = dataStreamSource.flatMap(new FlatMapFunction() {

@Override

public void flatMap(String s, Collector collector) throws Exception {

try {

// 将 数据流中 类型 转换 String >> JsonObject

JSONObject json = JSON.parseObject(s);

//取 emp_user 表数据

if (json.getString("table").equals("emp_user")) {

//取新增、修改数据

if (json.getString("type").equals("insert") || json.getString("type").equals("update")) {

System.out.println(json.getJSONObject("data"));

collector.collect(json.getJSONObject("data"));

}

}

} catch (Exception e) {

System.out.println("脏数据:" + s);

}

}

});

// todo 5. 将 有效数据转换 为 Row 类型 。JDBCOutputFormat只能处理Row,而Row是对prepared statement的参数的一个包装类

DataStream map = data.map(new MapFunction() {

@Override

public Row map(JSONObject jsonObject) throws Exception {

Row row = new Row(4);

row.setField(0, jsonObject.getString("id"));

row.setField(1, jsonObject.getString("name"));

row.setField(2, jsonObject.getString("age"));

row.setField(3, jsonObject.getString("sex"));

return row;

}

});

// todo 6. 将 数据存储 到 mysql 当中,同主键 数据 就修改, 无 就新增

String query =

"INSERT INTO gmall_20230424.emp_user_copy (id,name,age,sex) " +

"VALUES (?, ?,?,?) " +

"ON DUPLICATE KEY UPDATE name = VALUES(name) , age = VALUES(age) , sex = VALUES(sex)";

JDBCOutputFormat finish = JDBCOutputFormat.buildJDBCOutputFormat()

.setDrivername("com.mysql.jdbc.Driver")

.setDBUrl("jdbc:mysql://192.168.221.100:3306/gmall_20230424?user=root&password=000000")

.setQuery(query)

.setBatchInterval(1)

.finish();

//todo 7.提交存储任务

map.writeUsingOutputFormat(finish);

//todo 8.提交flink 任务

env.execute();

}

}

2.5.4 启动测试代码

2.5.5 测试

如 2.4.2 一样 ,在测试表 emp_user 中 进行 新增 、修改

查看 是否写入 emp_user_copy表中

INSERT into emp_user VALUES ("1","zhangsan",22,"F");

INSERT into emp_user VALUES ("2","lisi",22,"M");

INSERT into emp_user VALUES ("3","wangwu",22,"F");

INSERT into emp_user VALUES ("4","jia",22,"M");

INSERT into emp_user VALUES ("5","yi",22,"F");

UPDATE emp_user set age=23 where id ="4";

INSERT into emp_user VALUES ("6","666",22,"F");

新增 id为 4的数据时:age=22,但后面做了次 update age=23

参考链接

评论可见,请评论后查看内容,谢谢!!!评论后请刷新页面。