一、问题描述

在使用使用kafka-consumer-group.sh查看消息消费情况,消息都已经消费完了,但是CONSUMER-ID,HOST,CLIENT-ID字段的信息不显示,而且,消费实例也在运行中,却出现了Consumer group 'manage.group1' has no active members.,如下图所示:消费者的代码如下: public class OffsetConsumer {

public static void main(String[] args) {

//设置配置信息

Map config = new HashMap<>();

config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");

config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);

config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

config.put(ConsumerConfig.GROUP_ID_CONFIG, "manage.group1");

// 设置kafka中没有初始偏移量,或初始偏移量在kafka中不存在的处理方式

config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

KafkaConsumer consumer = new KafkaConsumer<>(config);

// 手动设置消费者消费的主题分区为2

consumer.assign(Arrays.asList(new TopicPartition("tp_demo_01", 0),

new TopicPartition("tp_demo_01", 1),

new TopicPartition("tp_demo_01", 2)));

while (true) {

ConsumerRecords records = consumer.poll(Duration.ofSeconds(3));

records.forEach(new Consumer>() {

@Override

public void accept(ConsumerRecord record) {

System.out.println(record.key() + "\t" + record.value() + "\t" + record.partition());

}

});

}

}

}

 二、问题分析

之所以出现上面的的问题,是因为使用了消费组的手动分区,也就是consumer.assign()方式,如果使用了手动分区,则分区的自动管理方式不会再起作用,而且如果消费组成员变更或主题的元数据等信息改变,将不会触发再平衡机制。

三、解决办法

使用kafka的消息订阅方式,即consumer.subscribe()方法,分区的分配等方式,让kafka集群自己去管理,不再人为干预。修改源码如下: public class OffsetConsumer {

public static void main(String[] args) {

//设置配置信息

Map config = new HashMap<>();

config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");

config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);

config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

config.put(ConsumerConfig.GROUP_ID_CONFIG, "manage.group1");

// 设置kafka中没有初始偏移量,或初始偏移量在kafka中不存在的处理方式

config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

KafkaConsumer consumer = new KafkaConsumer<>(config);

//订阅主题

consumer.subscribe(Arrays.asList("tp_demo_01"));

while (true) {

ConsumerRecords records = consumer.poll(Duration.ofSeconds(3));

records.forEach(new Consumer>() {

@Override

public void accept(ConsumerRecord record) {

System.out.println(record.key() + "\t" + record.value() + "\t" + record.partition());

}

});

}

}

}

 四、结果显示

 

参考链接

评论可见,请评论后查看内容,谢谢!!!评论后请刷新页面。