日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

java客户端作为kafka消费者测试

發(fā)布時(shí)間:2023/12/3 编程问答 34 豆豆
生活随笔 收集整理的這篇文章主要介紹了 java客户端作为kafka消费者测试 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

【README】

本文主要對(duì) java客戶端作為kafka 消費(fèi)者進(jìn)行測(cè)試, 生產(chǎn)者由 kafka客戶端扮演;?

?

【1】普通消費(fèi)者

設(shè)置消費(fèi)者組;

重置消費(fèi)者的offset, 即每次都從最頭開(kāi)始消費(fèi)(默認(rèn)僅保持7天內(nèi)數(shù)據(jù)) ;

類似于 命令行 --from-beginning

kafka-console-consumer.sh --topic first --zookeeper centos201:2181 --from-beginning

小結(jié):從頭開(kāi)始消費(fèi),必須滿足2個(gè)條件;

條件1: 必須重新?lián)Q組, 如本文中的消費(fèi)者組 從 sichuan 更新為 sichuan1 ; 條件2: 需要設(shè)置offset, 修改為 earliest, 默認(rèn)值是 lastest;

/*** 普通消費(fèi)者*/ public class MyConsumer {public static void main(String[] args) {/* 1.創(chuàng)建消費(fèi)者配置信息 */Properties props = new Properties();/*2.給配置信息賦值*//*2.1連接的集群*/props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "centos201:9092");/*2.2開(kāi)啟自動(dòng)提交 */props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);/*2.3 自動(dòng)提交的延時(shí)*/props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");/*2.4 key value的反序列化 */props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());/*2.5 消費(fèi)者組 */props.put(ConsumerConfig.GROUP_ID_CONFIG, "sichuan1"); /*2.6 重置消費(fèi)者的offset */ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 默認(rèn)值是 lastest /* 創(chuàng)建消費(fèi)者 */KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); /* 訂閱主題 */consumer.subscribe(Arrays.asList("first", "second"));/* 循環(huán)拉取 */ while(true) {/* 消費(fèi)消息-獲取數(shù)據(jù) */ConsumerRecords<String, String> consumerRds = consumer.poll(100);/* 解析并打印 ConsumerRecords *//* 遍歷 ConsumerRecords*/for(ConsumerRecord<String, String> rd : consumerRds) {System.out.println("[消費(fèi)者] " + rd.key() + "--" + rd.value()); }} /* 關(guān)閉消費(fèi)者 */ // consumer.close(); } }

?從官網(wǎng)可以找到以上配置值;?https://kafka.apache.org/0110/documentation.html#configuration

?

【2】kafka消費(fèi)者-手動(dòng)提交offset?

手動(dòng)提交offset有3種方式:

  • 方式1:同步手動(dòng)提交;
  • 方式2:異步手動(dòng)提交;?
  • 方式3:自定義手動(dòng)提交策略;

0)為啥需要手動(dòng)提交?

kafka自動(dòng)提交是在kafka拉取到數(shù)據(jù)之后就直接提交,這樣很容易丟失數(shù)據(jù),尤其是在需要事物控制的時(shí)候。 很多情況下我們需要從kafka成功拉取數(shù)據(jù)之后,對(duì)數(shù)據(jù)進(jìn)行相應(yīng)的處理之后再進(jìn)行提交。如拉取數(shù)據(jù)之后進(jìn)行寫入mysql這種 , 所以這時(shí)我們就需要進(jìn)行手動(dòng)提交kafka的offset下標(biāo)。這里順便說(shuō)下offset具體是什么。 offset:指的是kafka的topic中的每個(gè)消費(fèi)組消費(fèi)的下標(biāo)。 簡(jiǎn)單的來(lái)說(shuō)就是一條消息對(duì)應(yīng)一個(gè)offset下標(biāo),每次消費(fèi)數(shù)據(jù)的時(shí)候如果提交offset,那么下次消費(fèi)就會(huì)從提交的offset加一那里開(kāi)始消費(fèi)。 比如一個(gè)topic中有100條數(shù)據(jù),我消費(fèi)了50條并且提交了,那么此時(shí)的kafka服務(wù)端記錄提交的offset就是49(offset從0開(kāi)始),那么下次消費(fèi)的時(shí)候offset就從50開(kāi)始消費(fèi)。

1)關(guān)閉自動(dòng)提交(默認(rèn)為true)

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

第一次啟動(dòng) consumer 從 90 開(kāi)始消費(fèi);
第2次啟動(dòng)相同 consumer ,還是從90開(kāi)始消費(fèi);

2) 如何使用手動(dòng)提交?

kafka提供了手動(dòng)提交offset的api; 方法1:commitSync 同步提交: ; 方法2:commitAsync 異步提交; 兩者相同點(diǎn):都會(huì)將本次 poll 的一批數(shù)據(jù)最高的偏移量提交; 不同點(diǎn)是, commitSync 阻塞當(dāng)前線程,一直到提交成功, 并且會(huì)自動(dòng)失敗重試; 而 commitAsync 沒(méi)有失敗重試機(jī)制, 可能提交失敗;

3)同步手動(dòng)提交offset

/*** 手動(dòng)同步提交offset */ public class ManSyncCommitOffsetConsumer {public static void main(String[] args) {/* 1.創(chuàng)建消費(fèi)者配置信息 */Properties props = new Properties();/*2.給配置信息賦值*//*2.1連接的集群*/props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "centos201:9092");/*2.2 關(guān)閉自動(dòng)提交(默認(rèn)為true) */ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); /*2.3 自動(dòng)提交的延時(shí)*/props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");/*2.4 key value的反序列化 */props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());/*2.5 消費(fèi)者組 */props.put(ConsumerConfig.GROUP_ID_CONFIG, "sichuan1"); /*2.6 重置消費(fèi)者的offset */ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 默認(rèn)值是 lastest /* 創(chuàng)建消費(fèi)者 */KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); /* 訂閱主題 */consumer.subscribe(Arrays.asList("first", "second"));/* 循環(huán)拉取 */ while(true) {/* 消費(fèi)消息-獲取數(shù)據(jù) */ConsumerRecords<String, String> consumerRds = consumer.poll(100);/* 解析并打印 ConsumerRecords *//* 遍歷 ConsumerRecords*/for(ConsumerRecord<String, String> rd : consumerRds) {System.out.println("[消費(fèi)者] [partition]" + rd.partition() + " [offset]" + rd.offset() + rd.key() + "--" + rd.value()); }/* 【同步提交】,當(dāng)前線程會(huì)阻塞直到 offset提交成功 */ consumer.commitSync();} /* 關(guān)閉消費(fèi)者 */ // consumer.close(); } }

4)異步手動(dòng)提交offset?

/*** 異步手動(dòng)提交offset */ public class ManASyncCommitOffsetConsumer {public static void main(String[] args) {/* 1.創(chuàng)建消費(fèi)者配置信息 */Properties props = new Properties();/*2.給配置信息賦值*//*2.1連接的集群*/props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "centos201:9092");/*2.2 關(guān)閉自動(dòng)提交(默認(rèn)為true) */ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); /*2.3 自動(dòng)提交的延時(shí)*/props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");/*2.4 key value的反序列化 */props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());/*2.5 消費(fèi)者組 */props.put(ConsumerConfig.GROUP_ID_CONFIG, "sichuan1"); /*2.6 重置消費(fèi)者的offset */ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 默認(rèn)值是 lastest /* 創(chuàng)建消費(fèi)者 */KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); /* 訂閱主題 */consumer.subscribe(Arrays.asList("first", "second"));/* 循環(huán)拉取 */ while(true) {/* 消費(fèi)消息-獲取數(shù)據(jù) */ConsumerRecords<String, String> consumerRds = consumer.poll(100);/* 解析并打印 ConsumerRecords *//* 遍歷 ConsumerRecords*/for(ConsumerRecord<String, String> rd : consumerRds) {System.out.println("[消費(fèi)者] [partition]" + rd.partition() + " [offset]" + rd.offset() + rd.key() + "--" + rd.value()); }/* 【異步提交】 當(dāng)前線程會(huì)阻塞直到 offset提交成功 */ consumer.commitAsync(new OffsetCommitCallback() {@Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,Exception exception) {if (exception !=null) {System.out.println("異步提交失敗");} else {System.out.println("異步提交成功"); }}}); } /* 關(guān)閉消費(fèi)者 */ // consumer.close(); } }

5)自定義手動(dòng)提交offset策略

5.0)為啥需要自定義?

因?yàn)楫惒教峤挥幸恍﹩?wèn)題,如下: 先消費(fèi)數(shù)據(jù),后提交offset, 可能導(dǎo)致數(shù)據(jù)重復(fù)消費(fèi); 先提交offset, 后走業(yè)務(wù)邏輯,可能會(huì)丟數(shù)據(jù);

5.1)應(yīng)用場(chǎng)景:

把 offset 存儲(chǔ)到本地庫(kù) 和 消息消費(fèi)邏輯 在同一個(gè)數(shù)據(jù)庫(kù)事務(wù)里面;

5.2)如何實(shí)現(xiàn)?需要實(shí)現(xiàn) ConsumerRebalanceListener 來(lái)實(shí)現(xiàn)。

/*** 自定義手動(dòng)提交offset策略 */ public class DiyCommitOffsetConsumer {public static void main(String[] args) {/* 1.創(chuàng)建消費(fèi)者配置信息 */Properties props = new Properties();/*2.給配置信息賦值*//*2.1連接的集群*/props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "centos201:9092");/*2.2 關(guān)閉自動(dòng)提交(默認(rèn)為true) */ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); /*2.3 自動(dòng)提交的延時(shí)*/props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");/*2.4 key value的反序列化 */props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());/*2.5 消費(fèi)者組 */props.put(ConsumerConfig.GROUP_ID_CONFIG, "sichuan1"); /*2.6 重置消費(fèi)者的offset */ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 默認(rèn)值是 lastest /* 創(chuàng)建消費(fèi)者 */KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); /* 訂閱主題 */consumer.subscribe(Arrays.asList("first", "second"), new ConsumerRebalanceListener() {@Overridepublic void onPartitionsRevoked(Collection<TopicPartition> partitions) { // 在 rebalance方法【前】調(diào)用}@Overridepublic void onPartitionsAssigned(Collection<TopicPartition> partitions) { // 在 rebalance方法【后】調(diào)用 /* 分區(qū)分配方法 */for (TopicPartition partition : partitions) { /*定位到某個(gè) offset*/consumer.seek(partition, 1); // TODO: 這里需要輸入1 }} });/* 循環(huán)拉取 */ while(true) {/* 消費(fèi)消息-獲取數(shù)據(jù) */ConsumerRecords<String, String> consumerRds = consumer.poll(100);/* 解析并打印 ConsumerRecords *//* 遍歷 ConsumerRecords*/for(ConsumerRecord<String, String> rd : consumerRds) {System.out.println("[消費(fèi)者] [partition]" + rd.partition() + " [offset]" + rd.offset() + rd.key() + "--" + rd.value()); }/* 【同步提交】,當(dāng)前線程會(huì)阻塞直到 offset提交成功 */ consumer.commitSync();} /* 關(guān)閉消費(fèi)者 */ // consumer.close(); } }

補(bǔ)充: 消費(fèi)者rebalance 是什么?

消費(fèi)者 rebalance, 什么時(shí)候觸發(fā) rebalance? 如 同一個(gè)消費(fèi)者組下的 某個(gè)消費(fèi)者機(jī)器宕機(jī),或新增一個(gè)消費(fèi)者機(jī)器,都會(huì)觸發(fā) rebalance,即重新分配 kafka分區(qū)數(shù)據(jù)與 消費(fèi)者的對(duì)應(yīng)關(guān)系;

?

?

創(chuàng)作挑戰(zhàn)賽新人創(chuàng)作獎(jiǎng)勵(lì)來(lái)咯,堅(jiān)持創(chuàng)作打卡瓜分現(xiàn)金大獎(jiǎng)

總結(jié)

以上是生活随笔為你收集整理的java客户端作为kafka消费者测试的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。