一、写在前面

在实际的生产环境中,我们经常会把Flink处理的数据写入MySQL、Doris等数据库中,下面以MySQL为例,使用JDBC的方式将Flink的数据实时数据写入MySQL。

二、代码示例

2.1 版本说明

1.14.6

2.4.3

2.8.5

1.4.9

2.3.5

1.8

2.11.8

8.0.22

2.11

2.2 导入相关依赖

org.apache.flink

flink-connector-jdbc_2.11

${flink.version}

mysql

mysql-connector-java

8.0.22

2.3 连接数据库,创建表

mysql> CREATE TABLE `ws` (

`id` varchar(100) NOT NULL

,`ts` bigint(20) DEFAULT NULL

,`vc` int(11) DEFAULT NULL, PRIMARY KEY (`id`)

) ENGINE=InnoDB DEFAULT CHARSET=utf8

2.4 创建POJO类

package com.flink.POJOs;

import java.util.Objects;

/**

* TODO POJO类的特点

* 类是公有(public)的

* 有一个无参的构造方法

* 所有属性都是公有(public)的

* 所有属性的类型都是可以序列化的

*/

public class WaterSensor {

//类的公共属性

public String id;

public Long ts;

public Integer vc;

//无参构造方法

public WaterSensor() {

//System.out.println("调用了无参数的构造方法");

}

public WaterSensor(String id, Long ts, Integer vc) {

this.id = id;

this.ts = ts;

this.vc = vc;

}

//生成get和set方法

public void setId(String id) {

this.id = id;

}

public void setTs(Long ts) {

this.ts = ts;

}

public void setVc(Integer vc) {

this.vc = vc;

}

public String getId() {

return id;

}

public Long getTs() {

return ts;

}

public Integer getVc() {

return vc;

}

//重写toString方法

@Override

public String toString() {

return "WaterSensor{" +

"id='" + id + '\'' +

", ts=" + ts +

", vc=" + vc +

'}';

}

//重写equals和hasCode方法

@Override

public boolean equals(Object o) {

if (this == o) return true;

if (o == null || getClass() != o.getClass()) return false;

WaterSensor that = (WaterSensor) o;

return id.equals(that.id) && ts.equals(that.ts) && vc.equals(that.vc);

}

@Override

public int hashCode() {

return Objects.hash(id, ts, vc);

}

}

//scala的case类?

2.5 自定义map函数

package com.flink.POJOs;

import org.apache.flink.api.common.functions.MapFunction;

public class WaterSensorMapFunction implements MapFunction {

@Override

public WaterSensor map(String value) throws Exception {

String[] datas = value.split(",");

return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));

}

}

2.5 Flink2MySQL

package com.flink.DataStream.Sink;

import com.flink.POJOs.WaterSensor;

import com.flink.POJOs.WaterSensorMapFunction;

import org.apache.flink.connector.jdbc.JdbcConnectionOptions;

import org.apache.flink.connector.jdbc.JdbcExecutionOptions;

import org.apache.flink.connector.jdbc.JdbcSink;

import org.apache.flink.connector.jdbc.JdbcStatementBuilder;

import org.apache.flink.streaming.api.datastream.DataStreamSource;

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.sql.PreparedStatement;

import java.sql.SQLException;

/**

* Flink 输出到 MySQL(JDBC)

*/

public class flinkSinkJdbc {

public static void main(String[] args) throws Exception {

//TODO 创建Flink上下文执行环境

StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

streamExecutionEnvironment.setParallelism(1);

//TODO Source

DataStreamSource dataStreamSource = streamExecutionEnvironment.socketTextStream("localhost", 8888);

//TODO Transfer

SingleOutputStreamOperator waterSensorSingleOutputStreamOperator = dataStreamSource.map(new WaterSensorMapFunction());

/**TODO 写入 mysql

* 1、只能用老的 sink 写法

* 2、JDBCSink 的 4 个参数:

* 第一个参数: 执行的 sql,一般就是 insert into

* 第二个参数: 预编译 sql, 对占位符填充值

* 第三个参数: 执行选项 ---->攒批、重试

* 第四个参数: 连接选项---->url、用户名、密码

*/

SinkFunction sinkFunction = JdbcSink.sink("insert into ws values(?,?,?)",

new JdbcStatementBuilder() {

@Override

public void accept(PreparedStatement preparedStatement, WaterSensor waterSensor) throws SQLException {

preparedStatement.setString(1, waterSensor.getId());

preparedStatement.setLong(2, waterSensor.getTs());

preparedStatement.setInt(3, waterSensor.getVc());

System.out.println("数据写入成功:"+'('+waterSensor.getId()+","+waterSensor.getTs()+","+waterSensor.getVc()+")");

}

}

, JdbcExecutionOptions

.builder()

.withMaxRetries(3) // 重试次数

.withBatchSize(100) // 批次的大小:条数

.withBatchIntervalMs(3000) // 批次的时间

.build(),

new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()

.withUrl("jdbc:mysql://localhost:3306/dw?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8")

.withUsername("root")

.withPassword("********")

.withConnectionCheckTimeoutSeconds(60) // 重试的超时时间

.build()

);

//TODO 写入到Mysql

waterSensorSingleOutputStreamOperator.addSink(sinkFunction);

streamExecutionEnvironment.execute();

}

}

2.6 启动necat、Flink,观察数据库写入情况

nc -lk 9999 #启动necat、并监听8888端口,写入数据

启动Flink程序 查看数据库写入是否正常

相关阅读

评论可见,请评论后查看内容,谢谢!!!评论后请刷新页面。