环境的搭建

Kafka以及相关组件的下载

我们要实现今天的内容,不可避免的要进行对开发环境的配置,Kafka环境的配置比较繁琐,需要配置JDK,Scala,ZoopKeeper和Kafka,这里我们不做赘述,如果大家不知道如何配置环境,这里我们个大家找了一篇博文供大家参考: Windows下安装Kafka(图文记录详细步骤)

sarama包的安装

今天我们所时机的内容需要用到go语言的第三方包sarama,由于1.19版本后添加了ztcd压缩算法,需要用到cgo,这里我们为了方便考虑选择下载sarama v1.19.0,所以这里我们不能直接使用go get'命令来安装第三方包,我们要使用/go mod文件来实现,下面是主要步骤:

在项目中创建文件夹(博主的是Kafkademo)打开终端,输入go mod init,进行go.mod文件的初始化: 我们在.mod文件内指定第三方包及其版本:

module Kafkademo

require (

github.com/Shopify/sarama v1.19

)

go 1.21.6

其实这是已经可以使用命令go mod tidy了,但是博主在做的时候发现,这样会直接清除掉.mod文件里面的内容,所以建议先创建一个producer文件,在文件里面写:

package main

import (

"fmt"

"github.com/Shopify/sarama"

)

func main() {

config := sarama.NewConfig()

config.Producer.RequiredAcks = sarama.WaitForAll

}

这时候再打开终端输入go mod tidy 等待命令运行完毕,打开.mod文件,看到如下内容就OK了:

利用sarama向Kafka发送消息(消息的生产)

代码

package main

import (

"fmt"

"github.com/Shopify/sarama"

)

func main() {

config := sarama.NewConfig() //创建config实例

config.Producer.RequiredAcks = sarama.WaitForAll //发送完数据需要leader和follow都确认

config.Producer.Partitioner = sarama.NewRandomPartitioner //创建随机分区

config.Producer.Return.Successes = true //成功交付的消息将在success channel返回

//创建信息

msg := &sarama.ProducerMessage{}

msg.Topic = "web.log"

msg.Value = sarama.StringEncoder("this is a test log")

//连接KafKa

client, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)

if err != nil {

fmt.Println("producer closed, err:", err)

return

}

defer client.Close()

//发送消息

pid, offset, err := client.SendMessage(msg)

if err != nil {

fmt.Println("send msg failed,err:", err)

return

}

fmt.Printf("pid:%v offset:%v\n", pid, offset)

}

运行过程

首先我们打开终端开起ZooKepper服务zkServer

然后再Kafka所在文件夹下输入命令运行Kafka:

.\bin\windows\kafka-server-start.bat .\config\server.properties

最后运行程序即可,输出结果为:

补充:消息的消费

代码

package main

import (

"fmt"

"github.com/Shopify/sarama"

"time"

)

func main() {

customer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, nil)

if err != nil {

fmt.Println("failed init customer,err:", err)

return

}

partitionlist, err := customer.Partitions("web.log-0") //获取topic的所有分区

if err != nil {

fmt.Println("failed get partition list,err:", err)

return

}

fmt.Println("partitions:", partitionlist)

for partition := range partitionlist { // 遍历所有分区

//根据消费者对象创建一个分区对象

pc, err := customer.ConsumePartition("web.log", int32(partition), sarama.OffsetNewest)

if err != nil {

fmt.Println("failed get partition consumer,err:", err)

return

}

defer pc.Close() // 移动到这里

go func(consumer sarama.PartitionConsumer) {

defer pc.AsyncClose() // 移除这行,因为已经在循环结束时关闭了

for msg := range pc.Messages() {

fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v", msg.Partition, msg.Offset, msg.Key, msg.Value)

}

}(pc)

time.Sleep(time.Second * 10)

}

}

不过这个不能实现直接的消费,后续我们会对这个进行补充,这里仅作介绍。

推荐阅读

评论可见,请评论后查看内容,谢谢!!!评论后请刷新页面。