一、问题描述
在使用使用kafka-consumer-group.sh查看消息消费情况,消息都已经消费完了,但是CONSUMER-ID,HOST,CLIENT-ID字段的信息不显示,而且,消费实例也在运行中,却出现了Consumer group 'manage.group1' has no active members.,如下图所示:消费者的代码如下: public class OffsetConsumer {
public static void main(String[] args) {
//设置配置信息
Map
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.GROUP_ID_CONFIG, "manage.group1");
// 设置kafka中没有初始偏移量,或初始偏移量在kafka中不存在的处理方式
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KafkaConsumer
// 手动设置消费者消费的主题分区为2
consumer.assign(Arrays.asList(new TopicPartition("tp_demo_01", 0),
new TopicPartition("tp_demo_01", 1),
new TopicPartition("tp_demo_01", 2)));
while (true) {
ConsumerRecords
records.forEach(new Consumer
@Override
public void accept(ConsumerRecord
System.out.println(record.key() + "\t" + record.value() + "\t" + record.partition());
}
});
}
}
}
二、问题分析
之所以出现上面的的问题,是因为使用了消费组的手动分区,也就是consumer.assign()方式,如果使用了手动分区,则分区的自动管理方式不会再起作用,而且如果消费组成员变更或主题的元数据等信息改变,将不会触发再平衡机制。
三、解决办法
使用kafka的消息订阅方式,即consumer.subscribe()方法,分区的分配等方式,让kafka集群自己去管理,不再人为干预。修改源码如下: public class OffsetConsumer {
public static void main(String[] args) {
//设置配置信息
Map
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.GROUP_ID_CONFIG, "manage.group1");
// 设置kafka中没有初始偏移量,或初始偏移量在kafka中不存在的处理方式
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KafkaConsumer
//订阅主题
consumer.subscribe(Arrays.asList("tp_demo_01"));
while (true) {
ConsumerRecords
records.forEach(new Consumer
@Override
public void accept(ConsumerRecord
System.out.println(record.key() + "\t" + record.value() + "\t" + record.partition());
}
});
}
}
}
四、结果显示
参考链接
发表评论