陶运道  QQ:275931339

题目:

环境说明:

Flink任务在Yarn上用per job模式(即Job分离模式,不采用Session模式),方便Yarn回收资源。

注:与订单金额计算相关使用order_money字段,同一个订单无需多次重复计算,需要考虑退款或者取消的订单。

编写Scala代码,使用Flink消费Kafka中Topic为ods_mall_log和ods_mall_data的数据并进行相应的数据统计计算(使用ProcessTime)。

使用Flink消费Kafka中topic为ods_mall_data的数据,根据数据中不同的表将数据分别分发至kafka的DWD层的fact_order_master、fact_order_detail的Topic中(只获取data的内容,具体的内容格式考生请自查),其他的表则无需处理;

一、读题分析

涉及组件:Scala,Flink,Kafka,json

涉及知识点:Flink处理数据,json文件的处理

本题重点要搞清楚ods_mall_data的数据格式,本例是json格式

1.数据按行组织,每行一条记录包括表名:data

{"table":"ods_mall_log", "data":[{"wp_web_sk":"60","wp_web_page_id":"AAAAAAA","start_date":"2001-09-03"}]}

{"table":"order_master", "data":[{"wp_web__sk":"60","wp_web_page_id":"AAAAAAAAK","start_date":"2001-09-03"}]}

{"table":"order_detail", "data":[{"wp_web__sk":"60","wp_web_page_id":"AAAAAAAA","start_date":"2001-09-03"}]} 

{"table":"order_detail", "data":[{"wp_web__sk":"60","wp_web_page_id":"AAAAAA","start_date":"2001-09-03"}]}

{"table":"order_detail", "data":[{"order_detail_id":"60","name":"dtAAAA","id":"1"}]}

{"table":"order_detail", "data":[{"order_detail_id":"60","name":"dtBBBB","id":"2"}]}                 

{"table":"order_master", "data":[{"order_master_id":"60","name":"mtAAA","id ":"1"}]}

{"table":"order_master", "data":[{"order_master_id":"60","name":"mtBBBB","id ":"2"}]}

2.由于主题行数据很多,读入数据包含指定表数据到f中

val f=env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks[String], "kafka source")       .filter(line => line.contains("order_master_id") || line.contains("order_detail_id"))       .map(line => {         import com.google.gson.JsonParser         val jsonobj = new JsonParser().parse(line).getAsJsonObject        // jsonobj.getAsJsonObject("data").toString         jsonobj.get("data").toString       })

  f是按data中是否包含字符串order_master_id与order_detail_id条件筛选出来的行

 假设原数据为

执行程序后就变成

{"order_detail_id":"60","name":"dtAAAA","id":"1"}]

[{"order_detail_id":"60","name":"dtBBBB","id":"2"}]

[{"order_master_id":"60","name":"mtAAA","id ":"1"}]

[{"order_master_id":"60","name":"mtBBBB","id ":"2"}]

注意取data数据,不能理解的是,不能根据表中作为选择条件。只能根据data中行中字段选择

3.利用skin将 f数据写到指定的主题中

这段程序关键要理解f与t关系 (上段程序获得行数据集 f就放到此处t),

val kafkaSink = KafkaSink.builder[String]       .setBootstrapServers("master:9092")       .setKafkaProducerConfig(properties)       .setRecordSerializer(KafkaRecordSerializationSchema.builder[String] //.builder()         .setTopicSelector(new TopicSelector[String] {           override def apply(t: String): String = {  //t与f都是data数据           // println(t)             if (t.contains("order_master_id")) "fact_order_master"             else if (t.contains("order_detail_id")) "fact_order_detail"             else null           }         })         .setValueSerializationSchema(new SimpleStringSchema)         .build()).build() 二、处理过程

  注意 1.数据在主题test中,而不是ods_mall_data中,以方便阅读

       2.不必事先建立主题fact_order_detail,fact_order_master

import org.apache.flink.api.common.eventtime.WatermarkStrategy import org.apache.flink.api.common.serialization.SimpleStringSchema import org.apache.flink.api.scala._ import org.apache.flink.connector.kafka.sink.{KafkaRecordSerializationSchema, KafkaSink, TopicSelector} import org.apache.flink.connector.kafka.source.KafkaSource import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import java.util.Properties object scala_1 {   def main(args: Array[String]): Unit = {     val env = StreamExecutionEnvironment.getExecutionEnvironment // 设置流执行环境     import org.apache.flink.streaming.api.TimeCharacteristic     env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) //设置使用处理时间     env.setParallelism(1) // 设置并行度     env.enableCheckpointing(5000)   // 启用检查点     // kafka source     val kafkaSource = KafkaSource.builder[String]       .setBootstrapServers("master:9092")       .setTopics("test")   //从主题aa      // .setGroupId("group-test")       .setStartingOffsets(OffsetsInitializer.earliest)       .setValueOnlyDeserializer(new SimpleStringSchema)       .build()      // kafka sink     val properties = new Properties()     properties.setProperty("trans.timeout.ms", "7200000") // 2 hours     // KafkaSink 允许将记录流写入一个或多个 Kafka 主题。     val kafkaSink = KafkaSink.builder[String]       .setBootstrapServers("master:9092")       .setKafkaProducerConfig(properties)       .setRecordSerializer(KafkaRecordSerializationSchema.builder[String] //.builder()         .setTopicSelector(new TopicSelector[String] {           override def apply(t: String): String = {  //t与f都是data数据           // println(t)             if (t.contains("order_master_id")) "fact_order_master"             else if (t.contains("order_detail_id")) "fact_order_detail"             else null           }         })         .setValueSerializationSchema(new SimpleStringSchema)         .build()).build()     val f=env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks[String], "kafka source")       .filter(line => line.contains("order_master") || line.contains("order_detail"))       .map(line => {         import com.google.gson.JsonParser         val jsonobj = new JsonParser().parse(line).getAsJsonObject        // jsonobj.getAsJsonObject("data").toString         jsonobj.get("data").toString       }) //取出包含order_master_id、order_detail_id每行data数据,即f有这两个表数据       f.sinkTo(kafkaSink) //发至不同主题       env.execute("Task1")   } }

三、重难点分析

1.主题是以行为内容的json格式,如下

例:{

     "username":"YWRtaW4=","password":"bGlhblNoaTIwMjA="

}

以下代码取出json指定字段的值

val jsonStr = "{\"username\":\"YWRtaW4=\",\"password\":\"bGlhblNoaTIwMjA=\"}";  val jsonobj = new JsonParser().parse(jsonStr).getAsJsonObject println(jsonobj) val a=jsonobj.get("password") //取字段的值 println(a)

2. 取出每行data值代码

(2)val f = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks[String], "kafka source")   .filter(line => line.contains("order_detail") || line.contains("order_master")) 只取这两种行    .map(x => {    import com.google.gson.JsonParser    val jsonobj = new JsonParser().parse(x).getAsJsonObject    jsonobj.get("data")  //取每行中data值  })

3.熟练使用kafka以下命令

kafka-topics.sh --list --bootstrap-server  master:9092   列主题

kafka-topics.sh --delete --bootstrap-server master:9092  --topic test  删除主题

kafka-topics .sh -- create  --bootstrap-server  master:9092   --topic topic

kafka-console-producer.sh --broker-list master:9092 --topic testc  生产消息

kafka-console-consumer.sh --bootstrap-server master:9092 --from-beginning --topic test

精彩链接

评论可见,请评论后查看内容,谢谢!!!评论后请刷新页面。