柚子快报邀请码778899分享:kafka系列--消费
public String title; public ConsumerRecords
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "testgroupname");
//默认自动提交
//properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG , "false"); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "ip:port,ip:port"); /** * earliest 当分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费。 * latest 当分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据。 * none 当该topic下所有分区中存在未提交的offset时,抛出异常。 */ properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); /** * consumer向zookeeper提交offset的频率,单位是秒 */ properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); /** * RoundRobin策略有两个前提条件必须满足: * 同一个Consumer Group里面的所有消费者的num.streams必须相等; * 每个消费者订阅的主题必须相同 * * Range 均分 */ //properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "range"); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); String topic = "test"; TopicPartition partition0 = new TopicPartition(topic, 0); TopicPartition partition1 = new TopicPartition(topic, 1); KafkaConsumer
kafkaConsumer.subscribe(Arrays.asList(topic));
//指定分区消费
//kafkaConsumer.assign(Arrays.asList(partition0, partition1)); boolean isRunning = true; //创建一个容量3的线程池 ExecutorService executor = Executors.newFixedThreadPool(3); int index=0; while(isRunning) { ++index; ConsumerRecords
柚子快报邀请码778899分享:kafka系列--消费
精彩文章
发表评论