文章目录

前言一、实验平台二、实验内容一、Kafka与MySQL的组合使用1.实验要求2.在MySQL中操作3.安装Kafka4.使用Kafka5.在PyCharm中操作二、消费者手动提交1.实验要求2.在PyCharm中操作三、Kafka消费者订阅分区1.实验要求2.在终端操作3.在PyCharm中操作

三、实验小bug总结

前言

Kafka 是由 Apache 软件基金会开发的一个开源消息队列平台,它是一种高性能、可扩展、分布式的发布-订阅消息系统。Kafka 的架构被设计为高效、低延迟,并具有高吞吐量、持久性和可靠性。

在 Kafka 中,生产者将消息发布到主题(topic)中,消费者则从主题中消费消息,使用者可以将其看作一个 highly scalable 分布式 commit log 或者消息系统 (Messaging system),每个消息包含一个 key,一个 value 和一个额外的 timestamp。消息保留时间通过配置进行控制,当时间或空间满了的时候就根据策略来清除老数据,默认情况下老数据只保存 7 天。

特点: 1.高吞吐量:Kafka 在发布-订阅消息方面具有非常高的性能。它可以几乎实时地处理高速流入的大量数据。 实时处理:Kafka 能够处理高达数以百万计的消息,并准确地将消息排序和在群组内进行调度。 2.持久性和可靠性:与传统的消息系统不同,Kafka 具有持久性和可靠性。客户端自己提交当前偏移量,避免了可能出现的重复读取问题。 3.可扩展性:Kafka 可以在不繁琐的配置或修改信息格式等环节就能进行扩展。 4.多样化数据类型和来源:通过使用支持多种编程语言和操作系统的 API,Kafka 可以连接到许多各种来源的应用程序。

总之,Kafka 具有高性能、低时延,适合处理大规模物联网设备、日志、报警信息、传感器数据、消息等。

所以今天就来写一份关于熟悉Kafka的基本使用方法的实验,希望可以与小伙伴们一起探讨~~

一、实验平台

(1)操作系统:Windows7及以上(我用的是Windows 11) (2)Kafka版本:kafka_2.12-2.4.0 (3)MySQL版本:8.0

二、实验内容

一、Kafka与MySQL的组合使用

1.实验要求

假设有一个学生表student,如下表所示,编写Python程序完成如下操作。 (1)读取student表的数据内容,将其转换为JSON格式,发送给Kafka (2)从Kafka中获取JSON格式数据,打印出来

snosnamessexsage95001JohnM2395002TomM23

2.在MySQL中操作

(1)打开MySQL 方式一: 方式二:

可以通过 DOS 命令启动 MySQL 服务,windows+R,在搜索框中输入cmd,进去之后再输入services.msc,就进去服务系统里了,再启动就行 进去以后输入密码就可以开始执行mysql语句了 (2)创建数据库

create database school001;

(3)查看数据库

show databases;

发现数据库已经被创建完成 (4)使用该数据库

use school001;

(5)在该数据库中创建student表

create table student(sno varchar(10),sname varchar(20),ssex char(2),sage int(5));

(6)查询该数据库中的student表

show tables;

(7)向student表中插入值

insert into student values("95001","John","M",23);

insert into student values("95002","Tom","M",23);

(8)查询student表中的数据

select * from student;

查询结果: (到这里我们的student表就创建成功了!)

3.安装Kafka

简单介绍: Kafka 的运行需要 Java 环境的支持,因此,安装 Kafka 前需要在 Windows 操作系统中安装 JDK 访问 Kafka 官网,下载 Kafka 2.4.0的安装文件 kafka 2.12-2.4.0.1gz,解压缩到" C : \ "目录下(也可以放到D盘,不过最好放在D盘根目录下,不然后续代码容易报错,我试过) 因为 Katka 的运行依赖于 Zookeeper ,因此,还需要下载并安装 Zookeeper 。当然, Kafka 也内置了 Zookeeper 服务,因此,也可以不额外安装 Zookeeper ,直接使用内置的Zookeeper 服务。为简单起见,这里直接使用内置的Zookeeper 服务。

win+r—>输入cmd然后回车

输入命令pip install kafka-python安装python-kafka模块

查看我们安装的模块的版本信息(出现kafka-python2.0.2表示我们安装模块成功)

具体怎么安装可参考:kafka安装部署

4.使用Kafka

在实验中要用到Kafka就要先启动它的Zookeeper服务和Kafka,且在实验过程中,千万不可以将其关闭,一旦关闭,服务就会停止 在 Windows 操作系统中 打开第1个 cmd 命令行窗口,启动 Zookeeper 服务:

cd D:\kafka_2.12-2.4.0(这个是你安装kafka的路径)

.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.Properties

注意,执行上面的命令以后, cmd 命令行窗口中会返回一堆信息,然后停住不动,没有回到命令提示符状态。这时,不要误以为是死机,这表示 Zookeeper 服务器已经启动,正处于服务状态。所以,不要关闭这个 cmd 命令行窗口,一旦关闭, Zookeeper 服务就会停止 如图: 打开第2个 cmd 命令行窗口,然后输入如下命令启动 Kafka 服务:

cd D:\kafka_2.12-2.4.0(这个是你安装kafka的路径)

.\bin\windows\kafka-server-start.bat .\config\server.Properties

同样地,执行上面的命令以后, cmd 命令行窗口中会返回一堆信息,然后停住不动,没有回到命令提示符状态。这时,不要误以为是死机,这表示 Kafka 服务器已经启动,正处于服务状态。所以,不要关闭这个 cmd 命令行窗口,一旦关闭, Kafka 服务就会停止 若执行上面的命令以后,如果启动失败,并且出现提示信息"此时不应有\QuickTime\QTSstem\QTJava.zip ",则需要把环境变量 CLASSPATH 的相关信息删除。具体方法是,

右键单击"计算机",再单击"属性"一"高级系统设置"一"环境变量",然后,找到变量 CLASSPATH ,把类似下面的信息删除: C : Program Files (x86) QuickTime\QTSystem QTJava . zip

然后重新启动计算机,让配置修改生效。重新启动计算机以后,再次按照上面的方法启动Zookeeper和Kafka

为了测试 Kafka ,这里创建一个主题,名称为" topic_test ",其包含一个分区,只有一个副本。在第3个 cmd 命令行窗口中执行如下命令:

cd D:\kafka_2.12-2.4.0(这个是你安装kafka的路径)

.\bin\windows\kafka-topics.bat -- create -- zookeeper localhost:2181-- replication -

factor 1-- partitions 1-- topic topic_test

可以继续执行如下命令,查看 topic _ test 是否创建成功:

.\bin\windows\kafka-topics.bat -- list -- zookeeper localhost:2181

如果创建成功,就可以在执行结果中看到 topic _ test 继续在第3个 cmd 命令行窗口中执行如下命令,创建一个生产者来产生消息:

.\bin\windows\kafka-console-producer.bat -- broker-list localhost :9092 -topic topic_test

该命令执行以后,屏幕上的光标会持续闪烁,这时,可以用键盘输入一些内容,例如: I love Kafka Kafka is good 新建第4个 cmd 命令行窗口,执行如下命令来消费消息:

cd D:\kafka_2.12-2.4.0(这个是你安装kafka的路径)

.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic_test --from-beginning

该命令执行以后,屏幕上显示刚才输入的语句" I love Kafka “和” Kafka is good "

5.在PyCharm中操作

创建一个.py文件,写入以下代码,用于实现读取student表的数据内容,将其转换为JSON格式,发送给Kafka的功能

# 运行前先在win上启动zookeap和kafka

# 导入相关模块

from kafka import KafkaProducer

import json

# 连接kafka json.dumps(v).encode('utf-8')将json格式的数抠转挨为字节类型,然后使用ut了-8进行编码

producer = KafkaProducer(bootstrap_servers='localhost:9092', value_serializer=lambda v: json.dumps(v).encode('utf-8'))

# 定义一个json格式的数第,json格式以键值对形式保存数掂,每个键值对之间使用逗号隔开

data = {

'sno': '95001',

'sname': 'John',

'ssex': 'M',

'sage': 23

}

# 发送数据

producer.send('test001', data)

# 关闭资源

producer.close()

运行结果如下图所示:

创建一个.py文件,写入以下代码,用于实现从Kafka中获取JSON格式数据,打印出来的功能

# 运行前先在win上启动mysql

# 导入消费模块

import json

# 导入kafka的消费模块

from kafka import KafkaConsumer

import json

import pymysql.cursors

# 连接kafka

consumer = KafkaConsumer('test001', bootstrap_servers='localhost:9092', group_id=None, auto_offset_reset='earliest')

# 对获取的数据进行解析

for msg in consumer:

# 转换为字符串类型

msg1 = str(msg.value, encoding=('utf-8'))

# 将字符串的数据加载为字典

dict = json.loads(msg1)

# 连接数据库

connect = pymysql.Connect(

host='localhost',

port=3306,

user='root',

passwd='xxxxxxxx',#这是你MySQL数据库的密码

db='school001',

charset='utf8'

)

# 获取操作数抠库的对象<游标>

cursor = connect.cursor()

# 将数抠织存到mysqL(插入数掷)

# 定义sql语句

sql = "select * from student;"

# 将数据作为参数传速给sqL,保存到hrgsql

cursor.execute(sql)

# 提交

connect.commit()

for row in cursor.fetchall():

print("sno:%s\tsname:%s\tssex:%s\tsage:%d" % row)

print("共查询出", cursor.rowcount, '条数据')

connect.close()

运行结果如下图所示:

二、消费者手动提交

1.实验要求

生成一个data.json文件,内容如下: data = [ {“name”: “Tony”, “age”: 21, “hobbies”: [“basketball”, “tennis”]}, {“name”: “Lisa”, “age”: 20, “hobbies”: [“sing”, “dance”]}, ] 根据上面给出的data.json文件,执行如下操作。 (1)编写生产者程序,将JSON文件数据发送给Kafka。 (2)编写消费者程序,读取Kafka的JSON格式数据,并手动提交偏移量。

2.在PyCharm中操作

创建一个Test写入以下代码,来实现生成data.json文件的功能:

import json

data = [

{"name": "Tony", "age": 21, "hobbies": ["basketball", "tennis"]},

{"name": "Lisa", "age": 20, "hobbies": ["sing", "dance"]},

]

with open('../../data.json', 'w') as f:

json.dump(data, f)

创建一个.py文件,编写生产者程序,来实现将JSON文件数据发送给Kafka的功能

# 可以使用 Python 的 json 模块读取 data.json 文件,并将数据转换为字符串后发送给 Kafka

from kafka import KafkaProducer

import json

data = [

{

"name": "Tony",

"age": 21,

"hobbies": ["basketball", "tennis"]

},

{

"name": "Lisa",

"age": 20,

"hobbies": ["sing", "dance"]

}

]

producer = KafkaProducer(bootstrap_servers='localhost:9092')

for item in data:

# 将数据转换为字符串格式并发送给 Kafka 主题 test

message = json.dumps(item).encode('utf-8')

producer.send('test', value=message)

producer.close()

运行结果如下图所示:

创建一个.py文件,编写消费者程序,来实现读取Kafka的JSON格式数据,并手动提交偏移量的功能

# 我们可以使用 Kafka 消费者 API 进行数据消费,并在处理完每个消息后手动提交偏移量。

from kafka import KafkaConsumer

import json

# 配置 Kafka 消费者,指定主题和分组等信息

consumer = KafkaConsumer(

'test',

bootstrap_servers=['localhost:9092'],

auto_offset_reset='earliest',

enable_auto_commit=False, # 禁止自动提交偏移量

group_id='my-group')

# 循环消费 Kafka 消息

for message in consumer:

# 将传入的二进制消息内容解码为 JSON 格式的字符串

item = json.loads(message.value.decode('utf-8'))

print(item)

# 手动提交偏移量,确保下次消费时从正确的位置开始

consumer.commit()

运行结果如下图所示:

三、Kafka消费者订阅分区

1.实验要求

在命令行窗口中启动Kafka后,手动创建主题 “assign_topic” ,分区数量为2。具体命令如下:

.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic assign_topic

根据上面给出的主题,完成如下操作。 (1)编写生产者程序,以通过唯一标识符UUID作为消息,发送给主题 “ assign_topic” 。 (2)编写消费者程序1,订阅主题的分区0,只消费分区0数据。 (3)编写消费者程序2,订阅主题的分区1,只消费分区1数据。

2.在终端操作

首先要完成主题以及分区的创建才能编写程序,不然程序会报错 步骤:

使用windows+r,在弹窗中输入cmd打开终端在终端中输入命令,创建主题和分区:

cd D:\kafka_2.12-2.4.0(这个是你安装kafka的路径)

.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic assign_topic

结果如下图(这是我之前已经创建好的结果图):

3.在PyCharm中操作

创建一个.py文件,写入以下代码,用于实现编写生产者程序,以通过唯一标识符UUID作为消息,发送给主题 “ assign_topic的功能:

from kafka import KafkaProducer

import uuid

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

for i in range(5):

message = str(uuid.uuid4()).encode('utf-8')

producer.send('assign_topic', value=message)

producer.close()

运行结果如下图所示:

创建一个.py文件,写入以下代码,用于实现订阅主题的分区0,只消费分区0数据的功能:

from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(

bootstrap_servers=['localhost:9092'],

auto_offset_reset='earliest',

enable_auto_commit=False,

consumer_timeout_ms=1000

)

consumer.assign([TopicPartition('assign_topic', 0)])

for message in consumer:

print("Partition 0 - Message value: {}".format(message.value))

consumer.close()

运行结果如下图所示:

创建一个.py文件,写入以下代码,用于实现订阅主题的分区1,只消费分区1数据的功能:

from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(

bootstrap_servers=['localhost:9092'],

auto_offset_reset='earliest',

enable_auto_commit=False,

consumer_timeout_ms=1000

)

consumer.assign([TopicPartition('assign_topic', 1)])

for message in consumer:

print("Partition 1 - Message value: {}".format(message.value))

consumer.close()

运行结果如下图所示:

三、实验小bug

1. Kafka连接报错:kafka.errors.NoBrokersAvailable: NoBrokersAvailable 是什么原因:? 答:是因为程序运行了多次的原因 把tmp文件和logs文件里面的东西都删掉,就可以解决了 2. 为什么消费者程序1中有东西输出而消费者程序2中什么却什么也没输出?

消费者程序1和消费者程序2是对同一个主题的两个消费者应用程序。可以针对以下情况进行分析。 在主题 assign_topic 中,Kafka有多个分区,可用于并行处理消息。在这里,被消费的消息都来自此主题的第一个分区(即分区 0)。 消费者程序1使用了 .subscribe() 方法来订阅主题,这将导致消费者加入到消费组中,然后通过负载均衡策略从所有分区接收消息。因此,消费者程序1输出打印了分区 0 中的消息。 消费者程序2使用了 .assign() 方法手动分配消费者处理的分区,而且只分配了主题 assign_topic 的第一个分区(即分区 0)。但是,由于该程序没有运行足够长的时间,并且没有消费到任何未提交的偏移量,所以当应用程序终止时不会向Kafka服务器发送任何提交请求,这就可能导致在下一次启动时重复消费确认过的消息。因此,在生产环境中,请务必根据具体情况定期地提交所消费的分区的偏移量。

总结

以上就是对Kafka的基本使用方法的实验啦,有不明白的地方可以留言哦,希望能共同进步~~

推荐链接

评论可见,请评论后查看内容,谢谢!!!评论后请刷新页面。