1. 消费者总体工作流程

1)不同消费者组之间的消费者互相独立,可以消费相同的分区或者多个不同的分区;同一个消费者组内的消费者只能消费互不相同的分区。

2)使用offset记录消费者消费到哪儿了,保存在系统主题(__consumer_offsets)中,持久化到硬盘中。

2. 消费者组

Consumer Group(CG):消费者组,有多个consumer组成。形成一个消费者组的条件,是所有消费者的groupid相同。消费者组内的消费者负责消费不同的分区,一个分区只能由组内一个消费者消费。消费者组之间的消费者互不影响,所有的消费者必须有groupid,都属于某个消费者组。 消费者组是逻辑上的一个订阅者。

在消费者组初始化的过程中,coordinator辅助实现初始化和分区分配的过程。每个broker都有一个coordinator,到底选择哪一个coordinator辅助实现消费者组的初始化和分区分配呢?coordinator的选择 = groupid的哈希值 % 50,50是__consumer_offsets的分区数量, 即groupid的哈希值 % 50对应__consumer_offsets分区在哪个broker上,就使用哪个broker的coordinator。未来消费者组提交的offset也存储在__consumer_offsets的这个分区中。

具体的初始化流程如下:每个消费者都向这个coordinator发送JoinGroup请求,然后coordinator会选出一个consumer作为Leader,将需要消费的topic信息发送给Leader消费者,Leader消费者会制定一个消费方案(即各个消费者各自消费哪个分区)并发送给coordinator,coordinator将这个消费方案分发给各个消费者。

每个消费者都会和coordinator保持心跳通信(3s),如果超过一定时间(45s)未保持心跳通信,则会将消费者从组内移出,并触发再平衡策略;消费者处理数据时间过长(超过5min),也会触发再平衡。

消费者组消费数据详细流程:创建消费者网络客户端(ConsumerNetworkClient),用于和Kafka集群进行交互,里面有这些配置参数:fetch.min.byte,每批次最小抓取字节数,默认1字节;fetch.max.wait.ms, 一批数据最小值未到达超时时间,默认500ms;fetch.max.byte,每批次最大抓取字节数,默认50M。然后发送send方法发送请求拉取数据,通过回调方法onSuccess将数据保存在队列中,消费者默认从队列中一次拉取500条(max.poll.records)数据,拉取的数据还需要经过反序列化(parseRecord)和拦截器(Interceptors)。

3. 消费者API

 单个消费者消费所有分区数据(即一个消费者组里只有一个消费者):

// 配置

Properties properties = new Properties();

properties.put(ConsumerConfig.BOOTSTRAP_SERVER_CONFIG, "hadoop102:9092,hadoop103:9092");

properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");

// 创建消费者

KafkaConsumer kafkaConsumer = new KafkaConsumer<>(properties)

// 订阅主题

List topics = new ArryaList<>();

topics.add("first")

kafkaConsumer.subscribe(topics);

// 消费数据

while (true) {

ConsumerRecords consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));

for (ConsumerRecords consumerRecord : consumerRecords) {

System.out.println(consumerRecord);

}

}

消费特定分区数据:

// 配置

Properties properties = new Properties();

properties.put(ConsumerConfig.BOOTSTRAP_SERVER_CONFIG, "hadoop102:9092,hadoop103:9092");

properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");

// 创建消费者

KafkaConsumer kafkaConsumer = new KafkaConsumer<>(properties)

// 订阅主题的分区

List topicPartitions = new ArryaList<>();

topicPartitions.add(new TopicPartition("first", 0));

kafkaConsumer.assign(topicPartitions);

// 消费数据

while (true) {

ConsumerRecords consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));

for (ConsumerRecords consumerRecord : consumerRecords) {

System.out.println(consumerRecord);

}

}

4. 分区的分配以及再平衡

四种分区分配策略:Range、RoundRobin、Sticky。CooperativeSticky。通过参数parititon.assignment.strategy可以修改分区分配策略。默认是使用Range+CooperativeSticky。

Range:对同一个topic里的分区按照序号排序,也会对消费者进行排序,通过partitions/consumers决定每个消费者消费几个分区,如果除不尽,则前面几个消费者会多消费1个分区。

Range分配策略会产生数据倾斜,因为前面几个消费者会多消费1个分区,这只是对于1个topic而言,如果topic多了,则前面几个消费者会明显多消费很多分区。

如果将消费者0干掉,并且在触发再平衡(重新给消费者分配消费的分区)前,那么消费者0本来该消费的所有任务就会全部交给某个消费者进行消费。触发再平衡之后,消费者1会消费0、1、2、3四个分区,消费者2会消费4、5、6三个分区。

RoundRobin:针对所有topic而言,将所有的partition和consumer都列出来,按照hashcode进行排序,通过轮询将partition分给消费者。

修改分区分配策略:

properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor");

如果将消费者0干掉,并且在触发再平衡(重新给消费者分配消费的分区)前,那么消费者0本来该消费的所有任务会通过轮询依次交给其他消费者进行消费。触发再平衡之后,消费者1会消费0、2、4、6四个分区,消费者2会消费1、3、5三个分区。 

Sticky:尽量均匀且随机分配分区给消费者。与Range策略的区别就是随机分配。

修改分区分配策略:

properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.StickyAssignor");

如果将其中某个消费者干掉,并且在触发再平衡(重新给消费者分配消费的分区)前,那么本来该消费的任务会随机分给其他消费者进行消费。

5. offset

消费者将消费的位置/偏移offset保存在系统主题中:__consumer_offsets,__consumer_offsets主题采用key-value的形式进行存储,key是groupid+topic+分区号,value就是offset值。每隔一段时间,Kafka会对这个topic进行compact,也就是每个groupid+topic+分区号保留最新数据。

要想查看系统主题中的offset值,首先需要在config/consumer.properties中添加配置:exclude.internal.topics=false。查看系统主题__consumer_offsets:

bin/kafka-console-consumer.sh --topic ​__consumer_offsets --bootstrap-server hadoop102:9092 --consumer.config config/consumer.properties --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning

Kafka默认自动提交offset,相关参数:enable.auto.commit,是否开启自动提交,默认为true;auto.commit.interval.ms,自动提交的时间间隔,默认为5s。相关配置代码: 

properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);

properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);

手动提交offset:每次消费完数据之后由消费者自己提交offset信息,分为同步提交(阻塞当前线程,提交offset成功后才消费下一波数据,并且会有失败重试)和异步提交(发送了提交请求之后不管成功,直接消费下一波数据)。相关配置代码: 

properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

手动提交代码:

// 消费数据

while (true) {

ConsumerRecords consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));

for (ConsumerRecords consumerRecord : consumerRecords) {

System.out.println(consumerRecord);

}

kafkaConsumer.commitSync(); // 同步提交

kafkaConsumer.commitAsync(); // 异步提交

}

当Kafka中没有初始偏移量(消费者组第一次消费)或者不存在当前偏移量时,该怎么办。Kafka提供了三中策略,分别对应三种参数配置auto.offset.reset:

1)earliest:自动将偏移量置为最早的偏移量,--from-beginning

2)latest(默认值): 自动将偏移量置为最新的偏移量

3)none:抛异常

如果想从指定offset开始消费,使用seek方法:

// 拿到分区信息

Set assignment = kafkaConsumer.assignment();

for (TopicPartition topicPartition : assignment) {

// 指定offset

kafkaConsumer.seek(topicPartition, 100);

}

上述代码可能收不到数据,因为消费者组的初始化是需要时间的,获得的assignment可能为空,为保证分区分配方案是初始化完成的,加上如下代码:

while (assignment.size() == 0) {

kafkaConsumer.poll(Duration.ofSeconds(1));

assignment = kafkaConsumer.assignment();

}

 6. 按照指定时间消费

上面实现了消费指定offset的数据,如果需要消费指定时间之后的数据(比如需要从一天前开始消费),那么需要想办法将时间转化为offset。

// 拿到分区信息

Set assignment = kafkaConsumer.assignment();

Map topicPartitionMap = new HashMap<>();

for (TopicPartition topicPartition : assignment) {

topicPartitionMap.put(topicPartition, System.currentTimeMillis() - 1 * 24 * 3600);

}

Map topicPartitionOffsetMap = kafkaConsumer.offsetsForTimes(topicPartitionMap);

for (TopicPartition topicPartition : assignment) {

OffsetAndTimestamp offsetAndTimestamp = topicPartitionOffsetMap.get(topicPartition);

// 指定offset

kafkaConsumer.seek(topicPartition, offsetAndTimestamp.offset());

}

7. 漏消费和重复消费

自动提交offset场景下,如果提交offset后,消费者又继续消费了后面的数据,然后挂了,此时下一个offset还未自动提交,那么消费者恢复后,会从原来的offset的位置开始消费,于是出现了重复消费的问题。 

手动提交offset场景下,已经提交offset但是数据还在内存中并未完成落盘,此时消费者挂了,于是offset已经提交,数据并未真正处理完,出现了漏消费的问题。

要解决漏消费和重复消费的问题,即实现消费者的精确一次性消费,那么必须采用事务,即Kafka消费者将提交offset和消费数据这两个过程做原子绑定。另外,Kafka的下游消费者(比如Mysql)也必须支持事务。

8. 数据积压的解决方法

参考阅读

评论可见,请评论后查看内容,谢谢!!!评论后请刷新页面。