kafak消费者从头开始消费(消费者组)
生活随笔
收集整理的這篇文章主要介紹了
kafak消费者从头开始消费(消费者组)
小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.
【README】
本文主要用于描述 kafka 消費者如何從頭開始消費;
【1】從頭開始消費
1)從頭開始消費,需要滿足兩個條件, 如下:
- 條件1, 使用一個全新的消費者組id;
- 條件2,指定 auto.offset.reset 為 earliest ;
2)代碼如下:
public static void main(String[] args) {/* 1.創(chuàng)建消費者配置信息 */Properties props = new Properties();/*2.給配置信息賦值*//*2.1連接的集群*/props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "centos201:9092");/*2.2開啟自動提交 */props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);/*2.3 自動提交的延時*/props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");/*2.4 key value的反序列化 */props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());/*2.5 消費者組 */props.put(ConsumerConfig.GROUP_ID_CONFIG, "sichuan2"); // group.id /*2.6 重置消費者的offset */ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 默認(rèn)值是 lastest /* 創(chuàng)建消費者 */KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); /* 訂閱主題 */consumer.subscribe(Arrays.asList("third", "second"));/* 循環(huán)拉取 */ int i =0;while(true) {if (i++ > 10) break; // 只消費10條數(shù)據(jù) /* 消費消息-獲取數(shù)據(jù) */ConsumerRecords<String, String> consumerRds = consumer.poll(100);/* 解析并打印 ConsumerRecords *//* 遍歷 ConsumerRecords*/for(ConsumerRecord<String, String> rd : consumerRds) {System.out.println("[消費者] " + rd.key() + "--" + rd.value()); }} /* 關(guān)閉消費者 */ consumer.close(); }總結(jié)
以上是生活随笔為你收集整理的kafak消费者从头开始消费(消费者组)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: (转)Kafka 消费者 Java 实现
- 下一篇: 转:Kafka事务使用和编程示例/实例