陶运道 QQ:275931339
题目:
环境说明:
Flink任务在Yarn上用per job模式(即Job分离模式,不采用Session模式),方便Yarn回收资源。
注:与订单金额计算相关使用order_money字段,同一个订单无需多次重复计算,需要考虑退款或者取消的订单。
编写Scala代码,使用Flink消费Kafka中Topic为ods_mall_log和ods_mall_data的数据并进行相应的数据统计计算(使用ProcessTime)。
使用Flink消费Kafka中topic为ods_mall_data的数据,根据数据中不同的表将数据分别分发至kafka的DWD层的fact_order_master、fact_order_detail的Topic中(只获取data的内容,具体的内容格式考生请自查),其他的表则无需处理;
一、读题分析
涉及组件:Scala,Flink,Kafka,json
涉及知识点:Flink处理数据,json文件的处理
本题重点要搞清楚ods_mall_data的数据格式,本例是json格式
1.数据按行组织,每行一条记录包括表名:data
{"table":"ods_mall_log", "data":[{"wp_web_sk":"60","wp_web_page_id":"AAAAAAA","start_date":"2001-09-03"}]}
{"table":"order_master", "data":[{"wp_web__sk":"60","wp_web_page_id":"AAAAAAAAK","start_date":"2001-09-03"}]}
{"table":"order_detail", "data":[{"wp_web__sk":"60","wp_web_page_id":"AAAAAAAA","start_date":"2001-09-03"}]}
{"table":"order_detail", "data":[{"wp_web__sk":"60","wp_web_page_id":"AAAAAA","start_date":"2001-09-03"}]}
{"table":"order_detail", "data":[{"order_detail_id":"60","name":"dtAAAA","id":"1"}]}
{"table":"order_detail", "data":[{"order_detail_id":"60","name":"dtBBBB","id":"2"}]}
{"table":"order_master", "data":[{"order_master_id":"60","name":"mtAAA","id ":"1"}]}
{"table":"order_master", "data":[{"order_master_id":"60","name":"mtBBBB","id ":"2"}]}
2.由于主题行数据很多,读入数据包含指定表数据到f中
val f=env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks[String], "kafka source") .filter(line => line.contains("order_master_id") || line.contains("order_detail_id")) .map(line => { import com.google.gson.JsonParser val jsonobj = new JsonParser().parse(line).getAsJsonObject // jsonobj.getAsJsonObject("data").toString jsonobj.get("data").toString })
f是按data中是否包含字符串order_master_id与order_detail_id条件筛选出来的行
假设原数据为
执行程序后就变成
{"order_detail_id":"60","name":"dtAAAA","id":"1"}]
[{"order_detail_id":"60","name":"dtBBBB","id":"2"}]
[{"order_master_id":"60","name":"mtAAA","id ":"1"}]
[{"order_master_id":"60","name":"mtBBBB","id ":"2"}]
注意取data数据,不能理解的是,不能根据表中作为选择条件。只能根据data中行中字段选择
3.利用skin将 f数据写到指定的主题中
这段程序关键要理解f与t关系 (上段程序获得行数据集 f就放到此处t),
val kafkaSink = KafkaSink.builder[String] .setBootstrapServers("master:9092") .setKafkaProducerConfig(properties) .setRecordSerializer(KafkaRecordSerializationSchema.builder[String] //.builder() .setTopicSelector(new TopicSelector[String] { override def apply(t: String): String = { //t与f都是data数据 // println(t) if (t.contains("order_master_id")) "fact_order_master" else if (t.contains("order_detail_id")) "fact_order_detail" else null } }) .setValueSerializationSchema(new SimpleStringSchema) .build()).build() 二、处理过程
注意 1.数据在主题test中,而不是ods_mall_data中,以方便阅读
2.不必事先建立主题fact_order_detail,fact_order_master
import org.apache.flink.api.common.eventtime.WatermarkStrategy import org.apache.flink.api.common.serialization.SimpleStringSchema import org.apache.flink.api.scala._ import org.apache.flink.connector.kafka.sink.{KafkaRecordSerializationSchema, KafkaSink, TopicSelector} import org.apache.flink.connector.kafka.source.KafkaSource import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import java.util.Properties object scala_1 { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment // 设置流执行环境 import org.apache.flink.streaming.api.TimeCharacteristic env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) //设置使用处理时间 env.setParallelism(1) // 设置并行度 env.enableCheckpointing(5000) // 启用检查点 // kafka source val kafkaSource = KafkaSource.builder[String] .setBootstrapServers("master:9092") .setTopics("test") //从主题aa // .setGroupId("group-test") .setStartingOffsets(OffsetsInitializer.earliest) .setValueOnlyDeserializer(new SimpleStringSchema) .build() // kafka sink val properties = new Properties() properties.setProperty("trans.timeout.ms", "7200000") // 2 hours // KafkaSink 允许将记录流写入一个或多个 Kafka 主题。 val kafkaSink = KafkaSink.builder[String] .setBootstrapServers("master:9092") .setKafkaProducerConfig(properties) .setRecordSerializer(KafkaRecordSerializationSchema.builder[String] //.builder() .setTopicSelector(new TopicSelector[String] { override def apply(t: String): String = { //t与f都是data数据 // println(t) if (t.contains("order_master_id")) "fact_order_master" else if (t.contains("order_detail_id")) "fact_order_detail" else null } }) .setValueSerializationSchema(new SimpleStringSchema) .build()).build() val f=env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks[String], "kafka source") .filter(line => line.contains("order_master") || line.contains("order_detail")) .map(line => { import com.google.gson.JsonParser val jsonobj = new JsonParser().parse(line).getAsJsonObject // jsonobj.getAsJsonObject("data").toString jsonobj.get("data").toString }) //取出包含order_master_id、order_detail_id每行data数据,即f有这两个表数据 f.sinkTo(kafkaSink) //发至不同主题 env.execute("Task1") } }
三、重难点分析
1.主题是以行为内容的json格式,如下
例:{
"username":"YWRtaW4=","password":"bGlhblNoaTIwMjA="
}
以下代码取出json指定字段的值
val jsonStr = "{\"username\":\"YWRtaW4=\",\"password\":\"bGlhblNoaTIwMjA=\"}"; val jsonobj = new JsonParser().parse(jsonStr).getAsJsonObject println(jsonobj) val a=jsonobj.get("password") //取字段的值 println(a)
2. 取出每行data值代码
(2)val f = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks[String], "kafka source") .filter(line => line.contains("order_detail") || line.contains("order_master")) 只取这两种行 .map(x => { import com.google.gson.JsonParser val jsonobj = new JsonParser().parse(x).getAsJsonObject jsonobj.get("data") //取每行中data值 })
3.熟练使用kafka以下命令
kafka-topics.sh --list --bootstrap-server master:9092 列主题
kafka-topics.sh --delete --bootstrap-server master:9092 --topic test 删除主题
kafka-topics .sh -- create --bootstrap-server master:9092 --topic topic
kafka-console-producer.sh --broker-list master:9092 --topic testc 生产消息
kafka-console-consumer.sh --bootstrap-server master:9092 --from-beginning --topic test
精彩链接
发表评论