Flink: Getting Kafka Message Offsets and Writing Them to MySQL

Contents:

- Preliminary preparation
  - Environment setup
  - Writing a fake-data generator with Python Faker
  - Data test
- Demonstration

Flink's Kafka consumer has a mature checkpoint mechanism, so a stopped job can resume from its last consumed position. Still, checkpoints occasionally fail, and managing them is not always flexible enough, so sometimes we want to maintain the Kafka offsets ourselves.

The FlinkKafkaConsumer that Flink provides, however, does not expose the offset of each Kafka message directly.

Preliminary preparation

Environment setup

Start ZooKeeper and Kafka.

Check that both processes started successfully (for example with jps).

Writing a fake-data generator with Python Faker

The script below uses the third-party Faker module to generate fake data, so install it with pip first (the script also imports pykafka and tqdm, so install those as well).

pip3 install faker pykafka tqdm

# -*- coding=utf8 -*-
import time
import random
import json

from pykafka import KafkaClient
from faker import Faker
from tqdm import tqdm
from collections import defaultdict

# Broker addresses and topic name
client = KafkaClient(hosts="192.168.77.100:9092,192.168.77.100:9092,192.168.77.100:9092")
topic = client.topics["huawei"]


# Faker factory class: each method produces one fake field, encoded as bytes
class FakerFactory():

    def __init__(self):
        self.fk = Faker()

    def create_id(self, id):
        return str(id).encode()

    def create_name(self):
        return self.fk.name().encode()

    def create_profile(self):
        return self.fk.profile()

    def create_age(self):
        return str(random.randint(18, 90)).encode()

    def create_score(self):
        return str(random.randint(1, 100)).encode()

    def create_event_time(self):
        return str(int(round(time.time() * 1000))).encode()

    def creaate_str(self):
        return self.fk.pystr()


# JSON encoder that converts bytes values back to utf-8 strings
class MyEncoder(json.JSONEncoder):

    def default(self, obj):
        if isinstance(obj, bytes):
            return str(obj, encoding='utf-8')
        return json.JSONEncoder.default(self, obj)


if __name__ == '__main__':
    fakerObj = FakerFactory()
    with topic.get_sync_producer() as producer:
        for i in tqdm(range(1200)):
            data_continer = defaultdict(dict)
            id = fakerObj.create_id(i)
            name = fakerObj.create_name()
            age = fakerObj.create_age()
            score = fakerObj.create_score()
            timestamp = fakerObj.create_event_time()
            profile_data = fakerObj.create_profile()
            # drop profile fields that json.dumps cannot serialize by default
            del profile_data["current_location"]
            del profile_data["birthdate"]
            # comma-separated form of the record (not used below)
            message = ",".encode().join([id, name, age, score, timestamp])
            data_continer["description"] = fakerObj.creaate_str()
            data_continer["stu_info"]["id"] = id
            data_continer["stu_info"]["name"] = name
            data_continer["stu_info"]["age"] = age
            data_continer["stu_info"]["score"] = score
            data_continer["stu_info"]["timestamp"] = timestamp
            data_continer["profile_data"] = profile_data
            json_massage = json.dumps(data_continer, sort_keys=True, separators=(',', ':'), cls=MyEncoder)
            print("===>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n", json_massage)
            producer.produce(json_massage.encode())
            time.sleep(1)
            data_continer.clear()

Sample JSON record:

{
    "description": "fiDuNUBJvGFKvscLMxQP",
    "profile_data": {
        "address": "385 Patrick Ranch\nEast Parker, MS 22995",
        "blood_group": "A-",
        "company": "Mckinney-Curtis",
        "job": "Medical secretary",
        "mail": "christinamurphy@hotmail.com",
        "name": "Philip Michael",
        "residence": "2602 Benson Points Apt. 825\nTammychester, CA 52288",
        "sex": "M",
        "ssn": "792-83-9640",
        "username": "benjaminoliver",
        "website": ["https://savage-perez.com/", "https://www.moore-hampton.com/"]
    },
    "stu_info": {
        "age": "66",
        "id": "164",
        "name": "Cristina Avery",
        "score": "12",
        "timestamp": "1673616141785"
    }
}

Data test

The generated messages can be consumed from the topic normally.

The Flink job then consumes these records from Kafka. Straight to the code; the logic is as follows.

import com.alibaba.fastjson2.{JSON, JSONArray, JSONObject}
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.log4j.{Level, Logger}

import java.util.Properties
import java.sql.{Connection, DriverManager, PreparedStatement}

// Case class for the parsed records
case class PersonInfo(address: String,
                      job: String,
                      site: String,
                      name: String,
                      age: Int,
                      description: String,
                      eventime: Long)

object ParseJson {

  Logger.getLogger("org").setLevel(Level.WARN)

  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    /**
     * // Checkpointing is disabled by default; enable it before use
     * // Start a checkpoint every 5000 ms (checkpoint interval)
     * env.enableCheckpointing(5000L)
     * // Advanced options:
     * // Set the mode to exactly-once (the default)
     * env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
     * // Keep a minimum pause between checkpoints
     * env.getCheckpointConfig.setMinPauseBetweenCheckpoints(1000L)
     * // A checkpoint must finish within one minute or it is discarded (checkpoint timeout)
     * env.getCheckpointConfig.setCheckpointTimeout(60000L)
     *
     * // Allow only one checkpoint at a time
     * env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
     * // Keep checkpoint data after the job is cancelled, so it can be restored later:
     * // * ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION: keep checkpoint data when the job is cancelled, so the job can be restored from a chosen checkpoint
     * // * ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: delete checkpoint data when the job is cancelled; checkpoints are only kept if the job fails
     * env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
     *
     * // State backend for checkpoints (asynchronous snapshots)
     * env.setStateBackend(new FsStateBackend("hdfs://node01:8020/kafka2mysql/checkpoints", true))
     *
     * // Time characteristic
     * env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
     */

    // Kafka configuration
    val topic = "huawei"
    val prop = new Properties()
    prop.setProperty("bootstrap.servers", "node01:9092,node02:9092,node03:9092")
    prop.setProperty("group.id", "flink")
    prop.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    prop.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    prop.setProperty("auto.offset.reset", "latest")

    // Build the FlinkKafkaConsumer.
    // MyKafkaDeserializationSchema is a custom deserializer whose deserialize method reads the
    // offset and partition of each ConsumerRecord; see the MyKafkaDeserializationSchema.scala block below.
    val kafkaConsumer = new FlinkKafkaConsumer[String](
      topic,
      new MyKafkaDeserializationSchema(),
      prop)

    // Read the topic
    val kafkaSource: DataStream[String] = env.addSource(kafkaConsumer)

    val personInfo: DataStream[PersonInfo] = kafkaSource.map(data => {
      val jsonData: JSONObject = JSON.parseObject(data)
      println(jsonData)
      val description: String = jsonData.getString("description")
      val stuInfo: JSONObject = jsonData.getJSONObject("stu_info")
      val profile_data = jsonData.getJSONObject("profile_data")
      val address: String = profile_data.getString("address")
      val name: String = profile_data.getString("name")
      val job: String = profile_data.getString("job")
      val age: Int = stuInfo.getString("age").toInt
      val timestamp: Long = stuInfo.getString("timestamp").toLong
      val website: JSONArray = profile_data.getJSONArray("website")
      val dataStr = if (website == null) {
        ""
      } else {
        (0 until website.size()).map(website.getString).mkString(",")
      }
      // Assemble the case class
      PersonInfo(address = address, job = job, site = dataStr, name = name, age = age, description = description, eventime = timestamp)
    })

    personInfo.addSink(new MyKafka2DB2)
    env.execute()
  }
}

// Custom sink: MyKafka2DB2
class MyKafka2DB2 extends RichSinkFunction[PersonInfo] {

  var conn: Connection = _
  // Prepared statements for insert and update
  var insertTmp: PreparedStatement = _
  var updateTmp: PreparedStatement = _

  override def open(parameters: Configuration): Unit = {
    Class.forName("com.mysql.jdbc.Driver")
    // Open the connection
    conn = DriverManager.getConnection(
      "jdbc:mysql://node03:3306/testDB?useUnicode=true&characterEncoding=UTF-8&useSSL=false",
      "root",
      "123456")
    val insert_sql = "insert into personInfo values(?,?,?,?,?,?,?);"
    val update_sql = "update personInfo set address = ? where name = ?;"
    // The actual values are bound in invoke
    insertTmp = conn.prepareStatement(insert_sql)
    updateTmp = conn.prepareStatement(update_sql)
    conn.setAutoCommit(false)
  }

  override def invoke(value: PersonInfo, context: SinkFunction.Context): Unit = {
    // Try the update first; if it touches no rows, insert instead
    updateTmp.setString(1, value.address)
    updateTmp.setString(2, value.name)
    updateTmp.executeUpdate()
    if (updateTmp.getUpdateCount == 0) {
      try {
        insertTmp.setString(1, value.address)
        insertTmp.setString(2, value.job)
        insertTmp.setString(3, value.site)
        insertTmp.setString(4, value.name)
        insertTmp.setInt(5, value.age)
        insertTmp.setString(6, value.description)
        insertTmp.setLong(7, value.eventime)
        insertTmp.executeUpdate()
      } catch {
        case ex: Exception => ex.printStackTrace()
      }
    }
    // Auto-commit is off, so commit explicitly on both the update and the insert path
    conn.commit()
  }

  override def close(): Unit = {
    if (updateTmp != null) {
      updateTmp.close()
    }
    if (insertTmp != null) {
      insertTmp.close()
    }
    if (conn != null) {
      conn.close()
    }
  }
}
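The sink assumes a personInfo table whose seven columns match the insert bindings above. If name additionally has a unique index (an assumption on my part, not something the original states), the update-then-insert pair can be collapsed into a single MySQL upsert. A minimal sketch, where the column names and the helper name are my own choices:

import java.sql.{Connection, PreparedStatement}

// Sketch only: assumes personInfo has the seven columns bound below and a unique index on name.
object UpsertSketch {
  // Prepare once (e.g. in open()); bind the seven values and executeUpdate() once per record.
  def upsertFor(conn: Connection): PreparedStatement = {
    val upsertSql =
      """insert into personInfo (address, job, site, name, age, description, eventime)
        |values (?, ?, ?, ?, ?, ?, ?)
        |on duplicate key update address = values(address)""".stripMargin
    conn.prepareStatement(upsertSql)
  }
}

The ON DUPLICATE KEY UPDATE clause only fires when the unique key collides, which is exactly the case the manual update-then-insert logic covers.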

Note in particular: the custom MyKafkaDeserializationSchema only needs to implement the KafkaDeserializationSchema interface. Because my Kafka messages are already JSON strings, I insert the message's offset and partition directly into that JSON. If your Kafka messages are not JSON, you can define your own structure and embed the offset and partition into the value yourself; a sketch of that case follows the code block below.

import com.alibaba.fastjson2.{JSON, JSONObject}
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema
import org.apache.kafka.clients.consumer.ConsumerRecord

class MyKafkaDeserializationSchema extends KafkaDeserializationSchema[String] {

  override def isEndOfStream(nextElement: String): Boolean = {
    false
  }

  override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]]): String = {
    if (record == null || record.value() == null) {
      ""
    } else {
      val value = new String(record.value())
      val offset = record.offset.toString
      val partition = record.partition.toString
      val jsonObject: JSONObject = JSON.parseObject(value)
      jsonObject.put("partition", partition)
      jsonObject.put("offset", offset)
      jsonObject.toString /* the input was JSON, so return JSON as well */
    }
  }

  /* For the type information, see the SimpleStringSchema source */
  override def getProducedType: TypeInformation[String] = {
    BasicTypeInfo.STRING_TYPE_INFO
  }
}
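For the non-JSON case mentioned above, here is a minimal sketch along the same lines (the class name and the envelope field names value/topic/partition/offset are my own choices; the fastjson2 dependency is the same one used above): the raw value is wrapped in a small JSON envelope together with its position in the topic.

import com.alibaba.fastjson2.JSONObject
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema
import org.apache.kafka.clients.consumer.ConsumerRecord

// Sketch: for plain (non-JSON) values, emit a JSON envelope that carries the raw value
// plus its topic/partition/offset, so downstream operators can still read the position.
class EnvelopeDeserializationSchema extends KafkaDeserializationSchema[String] {

  override def isEndOfStream(nextElement: String): Boolean = false

  override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]]): String = {
    if (record == null || record.value() == null) {
      ""
    } else {
      val envelope = new JSONObject()
      envelope.put("value", new String(record.value())) // the original message, untouched
      envelope.put("topic", record.topic())
      envelope.put("partition", record.partition())
      envelope.put("offset", record.offset())
      envelope.toString
    }
  }

  override def getProducedType: TypeInformation[String] = BasicTypeInfo.STRING_TYPE_INFO
}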

Demonstration

Running the job, you can see that each original JSON string now also carries the partition and offset information. Done!
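For illustration only (the values here are made up, and the nested objects are elided), an enriched record carries two extra string fields at the top level:

{
    "description": "...",
    "profile_data": { ... },
    "stu_info": { ... },
    "partition": "0",
    "offset": "164"
}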

Tip: if you later want more flexibility than Flink's built-in checkpoint mechanism provides, you can write the captured partition and offset information to Redis. On each start-up, the consumer reads the last consumed partition/offset from Redis and resumes exactly where the previous run left off, which is very flexible.
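A minimal sketch of that idea, assuming a Jedis client and a Redis hash named kafka:offsets:huawei (the key name, object name, and helper names are all my own assumptions, as is the wiring of saveOffset into the sink): after a record has been written to MySQL, store the next offset to read for its partition; on start-up, seed the consumer with setStartFromSpecificOffsets.

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition
import redis.clients.jedis.Jedis

import java.{lang => jl, util => ju}
import scala.collection.JavaConverters._

// Sketch: keep the next offset to read per partition in a Redis hash and
// seed the consumer with it instead of relying on Flink checkpoints.
object RedisOffsetSketch {

  val offsetKey = "kafka:offsets:huawei" // hypothetical key name

  // Call from the sink after a record is safely in MySQL:
  // store last processed offset + 1, i.e. the next offset to consume.
  def saveOffset(jedis: Jedis, partition: Int, offset: Long): Unit = {
    jedis.hset(offsetKey, partition.toString, (offset + 1).toString)
  }

  // Call once at start-up: read the stored offsets and tell the consumer to begin
  // from them (partitions without an entry fall back to the group offsets behavior).
  def seedConsumer(consumer: FlinkKafkaConsumer[String], jedis: Jedis, topic: String): Unit = {
    val stored: ju.Map[String, String] = jedis.hgetAll(offsetKey)
    if (!stored.isEmpty) {
      val specificOffsets = new ju.HashMap[KafkaTopicPartition, jl.Long]()
      stored.asScala.foreach { case (partition, offset) =>
        specificOffsets.put(new KafkaTopicPartition(topic, partition.toInt), offset.toLong)
      }
      consumer.setStartFromSpecificOffsets(specificOffsets)
    }
  }
}

With this in place the job can run without checkpoints and still resume from the last position recorded in Redis; the guarantee is at-least-once, since a record may be reprocessed if the job dies between the MySQL write and the Redis update.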
