Flink: Getting Kafka Message Offsets and Writing Them to MySQL
Contents:
Preliminaries
    Environment setup
    Writing a fake-data script with Python Faker
    Data testing
Feature demo
Although Flink's Kafka consumer has a mature checkpoint mechanism, which lets a stopped job resume from its last consumed position, checkpoints can occasionally fail, and managing them is not always flexible enough. Sometimes we would rather maintain the Kafka offsets ourselves.
The catch is that Flink's FlinkKafkaConsumer wrapper does not directly expose the offset of each Kafka message.
Preliminaries
Environment setup
Start ZooKeeper and Kafka.
Check that they started successfully.
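As a quick programmatic check, the sketch below lists the topics visible on the cluster using the Kafka AdminClient; if it returns without error, the brokers are up. The broker addresses are assumptions matching the Flink job configuration later in this post.

import java.util.Properties
import org.apache.kafka.clients.admin.AdminClient

object BrokerCheck {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092")
    val admin = AdminClient.create(props)
    // names().get() blocks until the cluster answers
    println(admin.listTopics().names().get())
    admin.close()
  }
}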
Writing a fake-data script with Python Faker
The script below uses the third-party Faker module to generate fake data, so install it first with pip (along with pykafka and tqdm, which the script also imports):
pip3 install faker pykafka tqdm
# -*- coding=utf8 -*-
import time
import random
import json
from collections import defaultdict

from pykafka import KafkaClient
from faker import Faker
from tqdm import tqdm

# Broker address and topic name (list all of your brokers here)
client = KafkaClient(hosts="192.168.77.100:9092")
topic = client.topics["huawei"]


# Faker factory class
class FakerFactory():
    def __init__(self):
        self.fk = Faker()

    def create_id(self, id):
        return str(id).encode()

    def create_name(self):
        return self.fk.name().encode()

    def create_profile(self):
        return self.fk.profile()

    def create_age(self):
        return str(random.randint(18, 90)).encode()

    def create_score(self):
        return str(random.randint(1, 100)).encode()

    def create_event_time(self):
        return str(int(round(time.time() * 1000))).encode()

    def create_str(self):
        return self.fk.pystr()


class MyEncoder(json.JSONEncoder):
    """Decode bytes values to UTF-8 strings during JSON serialization."""
    def default(self, obj):
        if isinstance(obj, bytes):
            return str(obj, encoding='utf-8')
        return json.JSONEncoder.default(self, obj)


if __name__ == '__main__':
    fakerObj = FakerFactory()
    with topic.get_sync_producer() as producer:
        for i in tqdm(range(1200)):
            data_container = defaultdict(dict)
            id = fakerObj.create_id(i)
            name = fakerObj.create_name()
            age = fakerObj.create_age()
            score = fakerObj.create_score()
            timestamp = fakerObj.create_event_time()
            profile_data = fakerObj.create_profile()
            # These two profile fields are not JSON-serializable, so drop them
            del profile_data["current_location"]
            del profile_data["birthdate"]
            data_container["description"] = fakerObj.create_str()
            data_container["stu_info"]["id"] = id
            data_container["stu_info"]["name"] = name
            data_container["stu_info"]["age"] = age
            data_container["stu_info"]["score"] = score
            data_container["stu_info"]["timestamp"] = timestamp
            data_container["profile_data"] = profile_data
            json_message = json.dumps(data_container, sort_keys=True, separators=(',', ':'), cls=MyEncoder)
            print("===>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n", json_message)
            producer.produce(json_message.encode())
            time.sleep(1)
            data_container.clear()
Sample JSON message:
{
    "description": "fiDuNUBJvGFKvscLMxQP",
    "profile_data": {
        "address": "385 Patrick Ranch\nEast Parker, MS 22995",
        "blood_group": "A-",
        "company": "Mckinney-Curtis",
        "job": "Medical secretary",
        "mail": "christinamurphy@hotmail.com",
        "name": "Philip Michael",
        "residence": "2602 Benson Points Apt. 825\nTammychester, CA 52288",
        "sex": "M",
        "ssn": "792-83-9640",
        "username": "benjaminoliver",
        "website": ["https://savage-perez.com/", "https://www.moore-hampton.com/"]
    },
    "stu_info": {
        "age": "66",
        "id": "164",
        "name": "Cristina Avery",
        "score": "12",
        "timestamp": "1673616141785"
    }
}
Data testing
The messages can be consumed normally.
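As a sanity check, here is a short consumer sketch (assuming a recent kafka-clients on the classpath; broker and topic names follow the rest of this post) that polls a few records and prints their partitions and offsets:

import java.time.Duration
import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer

object ConsumeCheck {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092")
    props.put("group.id", "offset-check")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("auto.offset.reset", "earliest")
    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(Collections.singletonList("huawei"))
    // One poll is enough for a smoke test
    val it = consumer.poll(Duration.ofSeconds(5)).iterator()
    while (it.hasNext) {
      val r = it.next()
      println(s"partition=${r.partition()} offset=${r.offset()} value=${r.value()}")
    }
    consumer.close()
  }
}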
Now consume the messages fetched from Kafka in Flink. Straight to the code; the logic is as follows:
import com.alibaba.fastjson2.{JSON, JSONArray, JSONObject}
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.log4j.{Level, Logger}
import java.util.Properties
import java.sql.{Connection, DriverManager, PreparedStatement}
// Case class for the parsed test records
case class PersonInfo(address: String,
                      job: String,
                      site: String,
                      name: String,
                      age: Int,
                      description: String,
                      eventime: Long)
object ParseJson {
  Logger.getLogger("org").setLevel(Level.WARN)

  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    /**
     * // Checkpointing is disabled by default; enable it explicitly when needed
     * // Start a checkpoint every 5000 ms (the checkpoint interval)
     * env.enableCheckpointing(5000L)
     * // Advanced options:
     * // Set the mode to exactly-once (the default)
     * env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
     * // Ensure at least 1000 ms between checkpoints (minimum checkpoint pause)
     * env.getCheckpointConfig.setMinPauseBetweenCheckpoints(1000L)
     * // A checkpoint must finish within one minute or it is discarded (checkpoint timeout)
     * env.getCheckpointConfig.setCheckpointTimeout(60000L)
     *
     * // Allow only one checkpoint at a time
     * env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
     *
     * // ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION: keep checkpoint data after the
     * //   job is cancelled, so it can later be restored from a chosen checkpoint
     * // ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: delete checkpoint data when the
     * //   job is cancelled; checkpoints survive only if the job fails
     * env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
     *
     * // Set the state backend for checkpoints
     * env.setStateBackend(new FsStateBackend("hdfs://node01:8020/kafka2mysql/checkpoints", true)) // asynchronous snapshots
     *
     * // Time characteristic: ProcessingTime
     * env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) */
    // Kafka configuration
    val topic = "huawei"
    val prop = new Properties()
    prop.setProperty("bootstrap.servers", "node01:9092,node02:9092,node03:9092")
    prop.setProperty("group.id", "flink")
    prop.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    prop.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    prop.setProperty("auto.offset.reset", "latest")
    // Build the FlinkKafkaConsumer
    val kafkaConsumer = new FlinkKafkaConsumer[String](
      topic,
      // Custom Kafka message deserialization: the overridden deserialize method exposes the
      // record's offset and partition; see the MyKafkaDeserializationSchema.scala block below
      new MyKafkaDeserializationSchema(),
      prop)
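    // Optional sketch: once offsets live in an external store (MySQL/Redis), a restart can
    // seed the legacy consumer explicitly via setStartFromSpecificOffsets; the partition and
    // offset below are placeholders, to be replaced with values read back from that store.
    // import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition
    // val specificOffsets = new java.util.HashMap[KafkaTopicPartition, java.lang.Long]()
    // specificOffsets.put(new KafkaTopicPartition(topic, 0), 0L)
    // kafkaConsumer.setStartFromSpecificOffsets(specificOffsets)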
    // Read the topic
    val kafkaSource: DataStream[String] = env.addSource(kafkaConsumer)
    val personInfo: DataStream[PersonInfo] = kafkaSource.map(data => {
      val jsonData: JSONObject = JSON.parseObject(data)
      println(jsonData)
      val description: String = jsonData.getString("description")
      val stuInfo: JSONObject = jsonData.getJSONObject("stu_info")
      val profile_data = jsonData.getJSONObject("profile_data")
      val address: String = profile_data.getString("address")
      val name: String = profile_data.getString("name")
      val job: String = profile_data.getString("job")
      val age: Int = stuInfo.getString("age").toInt
      val timestamp: Long = stuInfo.getString("timestamp").toLong
      val website: JSONArray = profile_data.getJSONArray("website")
      val dataStr = if (website == null) {
        ""
      } else {
        (0 until website.size()).map(website.getString).mkString(",")
      }
      // Assemble the case class
      PersonInfo(address = address, job = job, site = dataStr, name = name, age = age, description = description, eventime = timestamp)
    })
    personInfo.addSink(new MyKafka2DB2)
    env.execute()
  }
}
// Custom sink: MyKafka2DB2
class MyKafka2DB2 extends RichSinkFunction[PersonInfo] {
  var conn: Connection = _
  // Prepared statements
  var insertTmp: PreparedStatement = _
  var updateTmp: PreparedStatement = _

  override def open(parameters: Configuration): Unit = {
    Class.forName("com.mysql.jdbc.Driver")
    // Set up the connection
    conn = DriverManager.getConnection(
      "jdbc:mysql://node03:3306/testDB?useUnicode=true&characterEncoding=UTF-8&useSSL=false",
      "root",
      "123456")
    val insert_sql = "insert into personInfo values(?,?,?,?,?,?,?);"
    val update_sql = "update personInfo set address = ? where name = ?;"
    // Prepare both statements; invoke() binds the actual values
    insertTmp = conn.prepareStatement(insert_sql)
    updateTmp = conn.prepareStatement(update_sql)
    conn.setAutoCommit(false)
  }
  override def invoke(value: PersonInfo, context: SinkFunction.Context): Unit = {
    // Try the update first; if no row was updated, insert instead
    updateTmp.setString(1, value.address)
    updateTmp.setString(2, value.name)
    val updated = updateTmp.executeUpdate()
    if (updated == 0) {
      try {
        insertTmp.setString(1, value.address)
        insertTmp.setString(2, value.job)
        insertTmp.setString(3, value.site)
        insertTmp.setString(4, value.name)
        insertTmp.setInt(5, value.age)
        insertTmp.setString(6, value.description)
        insertTmp.setLong(7, value.eventime)
        insertTmp.executeUpdate()
      } catch {
        case ex: Exception => ex.printStackTrace()
      }
    }
    // Auto-commit is off, so commit on both paths
    conn.commit()
  }
  override def close(): Unit = {
    if (updateTmp != null) {
      updateTmp.close()
    }
    if (insertTmp != null) {
      insertTmp.close()
    }
    if (conn != null) {
      conn.close()
    }
  }
}
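For reference, a table matching the seven-column insert above might look like this sketch; the column types and lengths are assumptions, so adjust them to your data. The statement could be run once against the same JDBC connection opened in open():

// Hypothetical DDL for the personInfo table; column order follows the insert in MyKafka2DB2
val ddl =
  """CREATE TABLE IF NOT EXISTS personInfo (
    |  address     VARCHAR(255),
    |  job         VARCHAR(100),
    |  site        VARCHAR(512),
    |  name        VARCHAR(100),
    |  age         INT,
    |  description VARCHAR(255),
    |  eventime    BIGINT
    |)""".stripMargin
val stmt = conn.createStatement()
stmt.execute(ddl)
stmt.close()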
Note: the custom MyKafkaDeserializationSchema only needs to implement the KafkaDeserializationSchema interface. Because my Kafka messages are already JSON strings, I insert the message's offset and partition fields directly into the JSON. If your Kafka messages are not JSON, you can define your own structure and embed the offset and partition into the value yourself (a sketch of that variant follows the code below).
import com.alibaba.fastjson2.JSONObject
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema
import org.apache.kafka.clients.consumer.ConsumerRecord
class MyKafkaDeserializationSchema extends KafkaDeserializationSchema[String] {
  override def isEndOfStream(nextElement: String): Boolean = {
    false
  }

  override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]]): String = {
    if (record == null || record.value() == null) return ""
    val value = new String(record.value())
    val jsonObject = JSONObject.parseObject(value)
    // Enrich the original JSON with the record's partition and offset
    jsonObject.put("partition", record.partition.toString)
    jsonObject.put("offset", record.offset.toString)
    jsonObject.toString /* the input was JSON, so return JSON as well */
  }

  /* Type information, following the SimpleStringSchema source */
  override def getProducedType: TypeInformation[String] = {
    BasicTypeInfo.STRING_TYPE_INFO
  }
}
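As mentioned above, if the Kafka value is not already JSON, one alternative (a sketch reusing the same imports; the class name is hypothetical) is to wrap the raw value in a small JSON envelope together with the partition and offset:

class RawWithOffsetSchema extends KafkaDeserializationSchema[String] {
  override def isEndOfStream(nextElement: String): Boolean = false

  override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]]): String = {
    val jsonObject = new JSONObject()
    // Keep the raw payload as a plain string field
    jsonObject.put("value", if (record.value() == null) "" else new String(record.value()))
    jsonObject.put("partition", record.partition())
    jsonObject.put("offset", record.offset())
    jsonObject.toString
  }

  override def getProducedType: TypeInformation[String] = BasicTypeInfo.STRING_TYPE_INFO
}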
Feature demo
Run the job and you can see the partition and offset fields added to the original JSON. Done!
Tip: if you later want more flexibility than Flink's built-in checkpoint mechanism provides, you can write the captured partition and offset information to Redis instead, and on every start read back the last consumed partition and offset from Redis, so consumption resumes exactly where it previously left off. Very flexible. A minimal sketch of the idea follows.
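A minimal sketch of that Redis idea, assuming the Jedis client (redis.clients.jedis.Jedis) is on the classpath; the Redis host and the key layout (one hash per topic, field = partition, value = offset) are assumptions:

import com.alibaba.fastjson2.JSON
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import redis.clients.jedis.Jedis

// Sink that records the latest offset per partition in a Redis hash; it consumes
// the enriched JSON produced by MyKafkaDeserializationSchema.
class OffsetToRedis extends RichSinkFunction[String] {
  @transient private var jedis: Jedis = _

  override def open(parameters: Configuration): Unit = {
    jedis = new Jedis("node03", 6379) // hypothetical Redis host/port
  }

  override def invoke(value: String, context: SinkFunction.Context): Unit = {
    val json = JSON.parseObject(value)
    // HSET kafka:offsets:huawei <partition> <offset>
    jedis.hset("kafka:offsets:huawei", json.getString("partition"), json.getString("offset"))
  }

  override def close(): Unit = if (jedis != null) jedis.close()
}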