学习目录

一、基本概念1.什么是SparkStreaming2.快速入门3.DStream 创建(1)RDD队列的方式(2)自定义数据源的方式(3)Kafka数据源的方式

一、基本概念

1.什么是SparkStreaming

定义:

Spark Streaming 用于流式数据的处理。Spark Streaming 支持的数据输入源很多,例如:Kafka、Flume、Twitter、ZeroMQ 和简单的 TCP 套接字等等。数据输入后可以用 Spark 的高度抽象原语如:map、reduce、join、window 等进行运算。而结果也能保存在很多地方,如 HDFS,数据库等。

SparkStreaming 准实时(延迟时间:秒,分钟为单位),微批次(设置一段时间进行处理数据 如:3s、5s等)的数据处理框架。

和Spark 基于 RDD 的概念很相似,Spark Streaming 使用离散化流(discretized stream)作为抽象表示,叫作 DStream。DStream 是随时间推移而收到的数据的序列。在内部,每个时间区间收到的数据都作为 RDD 存在,而 DStream 是由这些 RDD 所组成的序列(因此得名“离散化”)。所以简单来将,DStream 就是对 RDD 在实时数据处理场景的一种封装

Discretized Stream:

Discretized Stream 是 Spark Streaming 的基础抽象,代表持续性的数据流和经过各种 Spark 原语操作后的结果数据流。在内部实现上,DStream 是一系列连续的 RDD 来表示。每个 RDD 含有一段时间间隔内的数据。

SparkStreaming特点:

易用容错易整合到SparkStreaming中

2.快速入门

案例说明:使用 netcat 工具向 9999 端口不断的发送数据,通过 SparkStreaming 读取端口数据并 统计不同单词出现的次数

netcat 工具解压后进入该文件夹,输入cmd,在cmd中输入:nc -lp 9999

环境配置

org.apache.spark

spark-streaming_2.12

3.0.0

代码

package com.bigdata.SparkStreaming

import org.apache.spark.SparkConf

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}

import org.apache.spark.streaming.{Seconds, StreamingContext}

/**

* @author wangbo

* @version 1.0

*/

object SparkStreaming_01_wordcount {

def main(args: Array[String]): Unit = {

//TODO 创建环境对象

//StreamingContext创建时,需要传递两个参数

//第一个参数表示环境配置

val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")

//第二个参数表示批量处理的周期(采集周期)

val ssc = new StreamingContext(sparkConf, Seconds(3)) //这里的时间参数以秒为单位

//TODO 逻辑处理

//获取端口数据

//注意这里需要到 F:\尚硅谷-大数据\spark3.0\2.资料\netcat-win32-1.12 输入 cmd ,然后在cmd中输入 nc -lp 9999

val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)

val wordToCount: DStream[(String, Int)] = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

wordToCount.print()

/*

由于SparkStreaming采集器是长期执行的任务,所以不能直接关闭

如果main方法执行完毕,应用程序也会自动结束,所以不能让mian执行完毕

*/

//启动采集器

ssc.start()

//等待采集器的关闭

ssc.awaitTermination()

}

}

3.DStream 创建

DStream的创建有很多,主要有以下三种方式:

RDD队列的方式自定义数据源Kafka 数据源(面试、开发重点)

(1)RDD队列的方式

用法及其说明:可以通过使用 ssc.queueStream(queueOfRDDs)来创建DStream,每一个推送到这个队列中的 RDD,都会作为一个 DStream 处理

package com.bigdata.SparkStreaming

import org.apache.spark.SparkConf

import org.apache.spark.rdd.RDD

import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable

/**

* @author wangbo

* @version 1.0

*/

object SparkStreaming_02_Queue {

def main(args: Array[String]): Unit = {

//TODO 创建环境对象

//StreamingContext创建时,需要传递两个参数

//第一个参数表示环境配置

val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")

//第二个参数表示批量处理的周期(采集周期)

val ssc = new StreamingContext(sparkConf, Seconds(3)) //这里的时间参数以秒为单位

//3.创建 RDD 队列

val rddQueue = new mutable.Queue[RDD[Int]]()

//4.创建 QueueInputDStream

val inputStream = ssc.queueStream(rddQueue,oneAtATime = false)

//5.处理队列中的 RDD 数据

val mappedStream = inputStream.map((_,1))

val reducedStream = mappedStream.reduceByKey(_ + _)

//6.打印结果

reducedStream.print()

//7.启动采集器

ssc.start()

//8.循环创建并向 RDD 队列中放入 RDD

for (i <- 1 to 5) {

rddQueue += ssc.sparkContext.makeRDD(1 to 300, 10)

Thread.sleep(2000)

}

//8.等待采集器的关闭

ssc.awaitTermination()

}

}

(2)自定义数据源的方式

用法及其说明:需要继承 Receiver,并实现 onStart、onStop 方法来自定义数据源采集。

需求:自定义数据源,实现监控某个端口号,获取该端口号内容

package com.bigdata.SparkStreaming

import org.apache.spark.SparkConf

import org.apache.spark.storage.StorageLevel

import org.apache.spark.streaming.dstream.ReceiverInputDStream

import org.apache.spark.streaming.receiver.Receiver

import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable

import scala.util.Random

/**

* @author wangbo

* @version 1.0

*/

/**

* 自定义数据源的方式创建DStream

*/

object SparkStreaming_02_DIY {

def main(args: Array[String]): Unit = {

//TODO 创建环境对象

//StreamingContext创建时,需要传递两个参数

//第一个参数表示环境配置

val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")

//第二个参数表示批量处理的周期(采集周期)

val ssc = new StreamingContext(sparkConf, Seconds(3)) //这里的时间参数以秒为单位

val messageDStream: ReceiverInputDStream[String] = ssc.receiverStream(new MyReceiver)

messageDStream.print()

//7.启动采集器

ssc.start()

//8.等待采集器的关闭

ssc.awaitTermination()

}

/*

自定义数据采集器

1. 继承Receiver,定义泛型,传递参数

2. 重写方法

*/

class MyReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY){

private var flag = true

override def onStart(): Unit = {

new Thread(new Runnable { //随机生成数

override def run(): Unit = {

while (true){

val message: String = "采集的数据为:" + new Random().nextInt(10).toString

store(message)

Thread.sleep(500)

}

}

})

}

override def onStop(): Unit = {

flag = false

}

}

}

(3)Kafka数据源的方式

版本:DirectAPI 是由计算的 Executor 来主动消费 Kafka 的数据,速度由自身控制。

需求:通过 SparkStreaming 从 Kafka 读取数据,并将读取过来的数据做简单计算,最终打印到控制台,Kafka 0-10 Direct 模式

注意:zookeeper和kafka要启动

添加依赖

org.apache.spark

spark-streaming-kafka-0-10_2.12

3.0.0

com.fasterxml.jackson.core

jackson-core

2.10.1

代码部分

package com.bigdata.SparkStreaming

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}

import org.apache.spark.SparkConf

import org.apache.spark.streaming.dstream.InputDStream

import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.util.Random

/**

* @author wangbo

* @version 1.0

*/

/**

* 自定义数据源的方式创建DStream

*/

object SparkStreaming_04_Kafka {

def main(args: Array[String]): Unit = {

//TODO 创建环境对象

//第一个参数表示环境配置

val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")

//第二个参数表示批量处理的周期(采集周期)

val ssc = new StreamingContext(sparkConf, Seconds(3)) //这里的时间参数以秒为单位

//定义kafka参数

val kafkaPara: Map[String, Object] = Map[String, Object](

ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG ->

"hadoop100:9092,hadoop102:9092,hadoop103:9092",

ConsumerConfig.GROUP_ID_CONFIG -> "kafka",

"key.deserializer" ->

"org.apache.kafka.common.serialization.StringDeserializer",

"value.deserializer" ->

"org.apache.kafka.common.serialization.StringDeserializer"

)

val kafkaData: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](

ssc,

LocationStrategies.PreferConsistent,

ConsumerStrategies.Subscribe[String, String](Set("kafka"), kafkaPara) //Set("kafka")主题为kafka,kafkaPara为kafka的配置

)

kafkaData.map(_.value()).print()

//7.启动采集器

ssc.start()

//8.等待采集器的关闭

ssc.awaitTermination()

}

}

查看原文