java客户端作为kafka消费者测试
生活随笔
收集整理的這篇文章主要介紹了
java客户端作为kafka消费者测试
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
【README】
本文主要對 java客戶端作為kafka 消費者進行測試, 生產者由 kafka客戶端扮演;?
?
【1】普通消費者
設置消費者組;
重置消費者的offset, 即每次都從最頭開始消費(默認僅保持7天內數據) ;
類似于 命令行 --from-beginning
kafka-console-consumer.sh --topic first --zookeeper centos201:2181 --from-beginning
小結:從頭開始消費,必須滿足2個條件;
條件1: 必須重新換組, 如本文中的消費者組 從 sichuan 更新為 sichuan1 ; 條件2: 需要設置offset, 修改為 earliest, 默認值是 lastest; /*** 普通消費者*/ public class MyConsumer {public static void main(String[] args) {/* 1.創建消費者配置信息 */Properties props = new Properties();/*2.給配置信息賦值*//*2.1連接的集群*/props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "centos201:9092");/*2.2開啟自動提交 */props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);/*2.3 自動提交的延時*/props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");/*2.4 key value的反序列化 */props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());/*2.5 消費者組 */props.put(ConsumerConfig.GROUP_ID_CONFIG, "sichuan1"); /*2.6 重置消費者的offset */ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 默認值是 lastest /* 創建消費者 */KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); /* 訂閱主題 */consumer.subscribe(Arrays.asList("first", "second"));/* 循環拉取 */ while(true) {/* 消費消息-獲取數據 */ConsumerRecords<String, String> consumerRds = consumer.poll(100);/* 解析并打印 ConsumerRecords *//* 遍歷 ConsumerRecords*/for(ConsumerRecord<String, String> rd : consumerRds) {System.out.println("[消費者] " + rd.key() + "--" + rd.value()); }} /* 關閉消費者 */ // consumer.close(); } }?從官網可以找到以上配置值;?https://kafka.apache.org/0110/documentation.html#configuration
?
【2】kafka消費者-手動提交offset?
手動提交offset有3種方式:
- 方式1:同步手動提交;
- 方式2:異步手動提交;?
- 方式3:自定義手動提交策略;
0)為啥需要手動提交?
kafka自動提交是在kafka拉取到數據之后就直接提交,這樣很容易丟失數據,尤其是在需要事物控制的時候。 很多情況下我們需要從kafka成功拉取數據之后,對數據進行相應的處理之后再進行提交。如拉取數據之后進行寫入mysql這種 , 所以這時我們就需要進行手動提交kafka的offset下標。這里順便說下offset具體是什么。 offset:指的是kafka的topic中的每個消費組消費的下標。 簡單的來說就是一條消息對應一個offset下標,每次消費數據的時候如果提交offset,那么下次消費就會從提交的offset加一那里開始消費。 比如一個topic中有100條數據,我消費了50條并且提交了,那么此時的kafka服務端記錄提交的offset就是49(offset從0開始),那么下次消費的時候offset就從50開始消費。1)關閉自動提交(默認為true)
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);第一次啟動 consumer 從 90 開始消費;
第2次啟動相同 consumer ,還是從90開始消費;
2) 如何使用手動提交?
kafka提供了手動提交offset的api; 方法1:commitSync 同步提交: ; 方法2:commitAsync 異步提交; 兩者相同點:都會將本次 poll 的一批數據最高的偏移量提交; 不同點是, commitSync 阻塞當前線程,一直到提交成功, 并且會自動失敗重試; 而 commitAsync 沒有失敗重試機制, 可能提交失敗;3)同步手動提交offset
/*** 手動同步提交offset */ public class ManSyncCommitOffsetConsumer {public static void main(String[] args) {/* 1.創建消費者配置信息 */Properties props = new Properties();/*2.給配置信息賦值*//*2.1連接的集群*/props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "centos201:9092");/*2.2 關閉自動提交(默認為true) */ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); /*2.3 自動提交的延時*/props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");/*2.4 key value的反序列化 */props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());/*2.5 消費者組 */props.put(ConsumerConfig.GROUP_ID_CONFIG, "sichuan1"); /*2.6 重置消費者的offset */ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 默認值是 lastest /* 創建消費者 */KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); /* 訂閱主題 */consumer.subscribe(Arrays.asList("first", "second"));/* 循環拉取 */ while(true) {/* 消費消息-獲取數據 */ConsumerRecords<String, String> consumerRds = consumer.poll(100);/* 解析并打印 ConsumerRecords *//* 遍歷 ConsumerRecords*/for(ConsumerRecord<String, String> rd : consumerRds) {System.out.println("[消費者] [partition]" + rd.partition() + " [offset]" + rd.offset() + rd.key() + "--" + rd.value()); }/* 【同步提交】,當前線程會阻塞直到 offset提交成功 */ consumer.commitSync();} /* 關閉消費者 */ // consumer.close(); } }4)異步手動提交offset?
/*** 異步手動提交offset */ public class ManASyncCommitOffsetConsumer {public static void main(String[] args) {/* 1.創建消費者配置信息 */Properties props = new Properties();/*2.給配置信息賦值*//*2.1連接的集群*/props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "centos201:9092");/*2.2 關閉自動提交(默認為true) */ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); /*2.3 自動提交的延時*/props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");/*2.4 key value的反序列化 */props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());/*2.5 消費者組 */props.put(ConsumerConfig.GROUP_ID_CONFIG, "sichuan1"); /*2.6 重置消費者的offset */ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 默認值是 lastest /* 創建消費者 */KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); /* 訂閱主題 */consumer.subscribe(Arrays.asList("first", "second"));/* 循環拉取 */ while(true) {/* 消費消息-獲取數據 */ConsumerRecords<String, String> consumerRds = consumer.poll(100);/* 解析并打印 ConsumerRecords *//* 遍歷 ConsumerRecords*/for(ConsumerRecord<String, String> rd : consumerRds) {System.out.println("[消費者] [partition]" + rd.partition() + " [offset]" + rd.offset() + rd.key() + "--" + rd.value()); }/* 【異步提交】 當前線程會阻塞直到 offset提交成功 */ consumer.commitAsync(new OffsetCommitCallback() {@Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,Exception exception) {if (exception !=null) {System.out.println("異步提交失敗");} else {System.out.println("異步提交成功"); }}}); } /* 關閉消費者 */ // consumer.close(); } }5)自定義手動提交offset策略
5.0)為啥需要自定義?
因為異步提交有一些問題,如下: 先消費數據,后提交offset, 可能導致數據重復消費; 先提交offset, 后走業務邏輯,可能會丟數據;5.1)應用場景:
把 offset 存儲到本地庫 和 消息消費邏輯 在同一個數據庫事務里面;
5.2)如何實現?需要實現 ConsumerRebalanceListener 來實現。
/*** 自定義手動提交offset策略 */ public class DiyCommitOffsetConsumer {public static void main(String[] args) {/* 1.創建消費者配置信息 */Properties props = new Properties();/*2.給配置信息賦值*//*2.1連接的集群*/props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "centos201:9092");/*2.2 關閉自動提交(默認為true) */ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); /*2.3 自動提交的延時*/props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");/*2.4 key value的反序列化 */props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());/*2.5 消費者組 */props.put(ConsumerConfig.GROUP_ID_CONFIG, "sichuan1"); /*2.6 重置消費者的offset */ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 默認值是 lastest /* 創建消費者 */KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); /* 訂閱主題 */consumer.subscribe(Arrays.asList("first", "second"), new ConsumerRebalanceListener() {@Overridepublic void onPartitionsRevoked(Collection<TopicPartition> partitions) { // 在 rebalance方法【前】調用}@Overridepublic void onPartitionsAssigned(Collection<TopicPartition> partitions) { // 在 rebalance方法【后】調用 /* 分區分配方法 */for (TopicPartition partition : partitions) { /*定位到某個 offset*/consumer.seek(partition, 1); // TODO: 這里需要輸入1 }} });/* 循環拉取 */ while(true) {/* 消費消息-獲取數據 */ConsumerRecords<String, String> consumerRds = consumer.poll(100);/* 解析并打印 ConsumerRecords *//* 遍歷 ConsumerRecords*/for(ConsumerRecord<String, String> rd : consumerRds) {System.out.println("[消費者] [partition]" + rd.partition() + " [offset]" + rd.offset() + rd.key() + "--" + rd.value()); }/* 【同步提交】,當前線程會阻塞直到 offset提交成功 */ consumer.commitSync();} /* 關閉消費者 */ // consumer.close(); } }補充: 消費者rebalance 是什么?
消費者 rebalance, 什么時候觸發 rebalance? 如 同一個消費者組下的 某個消費者機器宕機,或新增一個消費者機器,都會觸發 rebalance,即重新分配 kafka分區數據與 消費者的對應關系;?
?
創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎總結
以上是生活随笔為你收集整理的java客户端作为kafka消费者测试的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: java客户端作为kafka生产者测试
- 下一篇: java生产者实现kafka拦截器