一、安装Kafka

1.执行以下命令完成Kafka的安装:

cd ~ //默认压缩包放在根目录

sudo tar -zxf kafka_2.12-2.6.0.tgz -C /usr/local

cd /usr/local

sudo mv kafka_2.12-2.6.0 kafka-2.6.0

sudo chown -R qiangzi ./kafka-2.6.0

二、启动Kafaka

1.首先需要启动Kafka,打开一个终端,输入下面命令启动Zookeeper服务:

cd /usr/local/kafka-2.6.0

./bin/zookeeper-server-start.sh config/zookeeper.properties

注意:以上现象是Zookeeper服务器已经启动,正在处于服务状态。不要关闭!

2.打开第二个终端,输入下面命令启动Kafka服务:

cd /usr/local/kafka-2.6.0

./bin/kafka-server-start.sh config/server.properties

//加了“&”的命令,Kafka就会在后台运行,即使关闭了这个终端,Kafka也会一直在后台运行。

bin/kafka-server-start.sh config/server.properties &

注意:同样不要误以为死机了,而是Kafka服务器已经启动,正在处于服务状态。

三、创建Topic

1.再打开第三个终端,然后输入下面命令创建一个自定义名称为“wordsender”的Topic:

cd /usr/local/kafka-2.6.0

./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic wordsender

2.然后,可以执行如下命令,查看名称为“wordsender”的Topic是否已经成功创建:

./bin/kafka-topics.sh --list --zookeeper localhost:2181

3.再新开一个终端(记作“监控输入终端”),执行如下命令监控Kafka收到的文本:

cd /usr/local/kafka-2.6.0

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic wordsender

注意,所有这些终端窗口都不要关闭,要继续留着后面使用。

四、Spark准备工作

Kafka和Flume等高级输入源,需要依赖独立的库(jar文件),因此,需要为Spark添加相关jar包。访问MVNREPOSITORY官网(http://mvnrepository.com),下载spark-streaming-kafka-0-10_2.12-3.5.1.jar和spark-token-provider-kafka-0-10_2.12-3.5.1.jar文件,其中,2.12表示Scala的版本号,3.5.1表示Spark版本号。然后,把这两个文件复制到Spark目录的jars目录下(即“/usr/local/spark-3.5.1/jars”目录)。此外,还需要把“/usr/local/kafka-2.6.0/libs”目录下的kafka-clients-2.6.0.jar文件复制到Spark目录的jars目录下。

cd ~ .jar文件默认放在根目录

sudo mv ./spark-streaming-kafka-0-10_2.12-3.5.1.jar /usr/local/spark-3.5.1/jars/

sudo mv ./spark-token-provider-kafka-0-10_2.12-3.5.1.jar /usr/local/spark-3.5.1/jars/

sudo cp /usr/local/kafka-2.6.0/libs/kafka-clients-2.6.0.jar /usr/local/spark-3.5.1/jars/

spark-streaming-kafka-0-10_2.12-3.5.1.jar的下载页面:

Maven Repository: org.apache.spark » spark-streaming-kafka-0-10_2.12 » 3.5.1 (mvnrepository.com)

spark-streaming-kafka-0-10_2.12-3.5.1.jar的下载页面:

Maven Repository: org.apache.spark » spark-token-provider-kafka-0-10_2.12 » 3.5.1 (mvnrepository.com)

进入下载页面以后,如下图所示,点击红色方框内的“jar”,就可以下载JAR包了。

五、编写Spark Streaming程序使用Kafka数据源

1.编写生产者(Producer)程序

(1)新打开一个终端,然后,执行如下命令创建代码目录和代码文件:

cd /usr/local/spark-3.5.1

mkdir mycode

cd ./mycode

mkdir kafka

mkdir -p kafka/src/main/scala

vi kafka/src/main/scala/KafkaWordProducer.scala

(2)使用vi编辑器新建了KafkaWordProducer.scala

它是用来产生一系列字符串的程序,会产生随机的整数序列,每个整数被当作一个单词,提供给KafkaWordCount程序去进行词频统计。请在KafkaWordProducer.scala中输入以下代码:

import java.util.HashMap

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

import org.apache.spark.SparkConf

import org.apache.spark.streaming._

import org.apache.spark.streaming.kafka010._

object KafkaWordProducer {

def main(args: Array[String]) {

if (args.length < 4) {

System.err.println("Usage: KafkaWordProducer " +

" ")

System.exit(1)

}

val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args

// Zookeeper connection properties

val props = new HashMap[String, Object]()

props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)

props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,

"org.apache.kafka.common.serialization.StringSerializer")

props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,

"org.apache.kafka.common.serialization.StringSerializer")

val producer = new KafkaProducer[String, String](props)

// Send some messages

while(true) {

(1 to messagesPerSec.toInt).foreach { messageNum =>

val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).

toString)

.mkString(" ")

print(str)

println()

val message = new ProducerRecord[String, String](topic, null, str)

producer.send(message)

}

Thread.sleep(1000)

}

}

}

2.编写消费者(Consumer)程序

在“/usr/local/spark-3.5.1/mycode/kafka/src/main/scala”目录下创建文件KafkaWordCount.scala,用于单词词频统计,它会把KafkaWordProducer发送过来的单词进行词频统计,代码内容如下:

cd /usr/local/spark-3.5.1/mycode

vi kafka/src/main/scala/KafkaWordCount.scala

import org.apache.spark._

import org.apache.spark.SparkConf

import org.apache.spark.rdd.RDD

import org.apache.spark.streaming._

import org.apache.spark.streaming.kafka010._

import org.apache.spark.streaming.StreamingContext._

import org.apache.spark.streaming.kafka010.KafkaUtils

import org.apache.kafka.common.serialization.StringDeserializer

import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

object KafkaWordCount{

def main(args:Array[String]){

val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")

val sc = new SparkContext(sparkConf)

sc.setLogLevel("ERROR")

val ssc = new StreamingContext(sc,Seconds(10))

ssc.checkpoint("file:///usr/local/spark-3.5.1/mycode/kafka/checkpoint") //设置检查点,如果存放在HDFS上面,则写成类似ssc.checkpoint("/user/hadoop/checkpoint")这种形式,但是,要启动Hadoop

val kafkaParams = Map[String, Object](

"bootstrap.servers" -> "localhost:9092",

"key.deserializer" -> classOf[StringDeserializer],

"value.deserializer" -> classOf[StringDeserializer],

"group.id" -> "use_a_separate_group_id_for_each_stream",

"auto.offset.reset" -> "latest",

"enable.auto.commit" -> (true: java.lang.Boolean)

)

val topics = Array("wordsender")

val stream = KafkaUtils.createDirectStream[String, String](

ssc,

PreferConsistent,

Subscribe[String, String](topics, kafkaParams)

)

stream.foreachRDD(rdd => {

val offsetRange = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

val maped: RDD[(String, String)] = rdd.map(record => (record.key,record.value))

val lines = maped.map(_._2)

val words = lines.flatMap(_.split(" "))

val pair = words.map(x => (x,1))

val wordCounts = pair.reduceByKey(_+_)

wordCounts.foreach(println)

})

ssc.start

ssc.awaitTermination

}

}

3.在路径“file:///usr/local/spark/mycode/kafka/”下创建“checkpoint”目录作为预写式日志的存放路径。

cd ./kafka

mkdir checkpoint

4.继续在当前目录下创建StreamingExamples.scala代码文件,用于设置log4j:

cd /usr/local/spark-3.5.1/mycode/

vi kafka/src/main/scala/StreamingExamples.scala

/*StreamingExamples.scala*/

package org.apache.spark.examples.streaming

import org.apache.spark.internal.Logging

import org.apache.log4j.{Level, Logger} /** Utility functions for Spark Streaming examples. */

object StreamingExamples extends Logging {

/** Set reasonable logging levels for streaming if the user has not configured log4j. */

def setStreamingLogLevels() {

val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements

if (!log4jInitialized) {

// We first log something to initialize Spark's default logging, then we override the

// logging level.

logInfo("Setting log level to [WARN] for streaming example." +

" To override add a custom log4j.properties to the classpath.")

Logger.getRootLogger.setLevel(Level.WARN)

} } }

5.编译打包程序

现在在“/usr/local/spark-3.5.1/mycode/kafka/src/main/scala”目录下,就有了如下3个scala文件:

然后,执行下面命令新建一个simple.sbt文件:

cd /usr/local/spark-3.5.1/mycode/kafka/

vim simple.sbt

在simple.sbt中输入以下代码:

name := "Simple Project"

version := "1.0"

scalaVersion := "2.12.18"

libraryDependencies += "org.apache.spark" %% "spark-core" % "3.5.1"

libraryDependencies += "org.apache.spark" %% "spark-streaming" % "3.5.1" % "provided"

libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "3.5.1"

libraryDependencies += "org.apache.kafka" % "kafka-clients" % "2.6.0"

然后执行下面命令,进行编译打包:

cd /usr/local/spark-3.5.1/mycode/kafka/

/usr/local/sbt-1.9.0/sbt/sbt package

打包成功界面

6. 运行程序

首先,启动Hadoop,因为如果前面KafkaWordCount.scala代码文件中采用了ssc.checkpoint ("/user/hadoop/checkpoint")这种形式,这时的检查点是被写入HDFS,因此需要启动Hadoop。启动Hadoop的命令如下:

cd /usr/local/hadoop-2.10.1

./sbin/start-dfs.sh

或者

start-dfs.sh

start-yarn.sh

启动Hadoop成功以后,就可以测试刚才生成的词频统计程序了。 要注意,之前已经启动了Zookeeper服务和Kafka服务,因为之前那些终端窗口都没有关闭,所以,这些服务一直都在运行。如果不小心关闭了之前的终端窗口,那就参照前面的内容,再次启动Zookeeper服务,启动Kafka服务。 然后,新打开一个终端,执行如下命令,运行“KafkaWordProducer”程序,生成一些单词(是一堆整数形式的单词):

cd /usr/local/spark-3.5.1/mycode/kafka/

/usr/local/spark-3.5.1/bin/spark-submit --class "KafkaWordProducer" ./target/scala-2.12/sime-project_2.12-1.0.jar localhost:9092 wordsender 3 5

注意,上面命令中,“localhost:9092 wordsender 3 5”是提供给KafkaWordProducer程序的4个输入参数,第1个参数“localhost:9092”是Kafka的Broker的地址,第2个参数“wordsender”是Topic的名称,我们在KafkaWordCount.scala代码中已经把Topic名称写死掉,所以,KafkaWordCount程序只能接收名称为“wordsender”的Topic。第3个参数“3”表示每秒发送3条消息,第4个参数“5”表示每条消息包含5个单词(实际上就是5个整数)。 执行上面命令后,屏幕上会不断滚动出现类似如下的新单词:

不要关闭这个终端窗口,让它一直不断发送单词。然后,再打开一个终端,执行下面命令,运行KafkaWordCount程序,执行词频统计:

cd /usr/local/spark-3.5.1/mycode/kafka/

/usr/local/spark-3.5.1/bin/spark-submit --class "KafkaWordCount" ./target/scala-2.12/simple-oject_2.12-1.0.jar

运行上面命令以后,就启动了词频统计功能,屏幕上就会显示如下类似信息:

参考阅读

评论可见,请评论后查看内容,谢谢!!!评论后请刷新页面。