日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Kafka消费者详解

發布時間:2025/3/20 编程问答 27 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Kafka消费者详解 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

一.基本概念

1.消費者和消費組

Kafka消費者是消費組的一部分,當多個消費者形成一個消費組來消費主題時,每個消費者會收到不同分區的消息。假設有一個T1主題,該主題有4個分區;同時我們有一個消費組G1,這個消費組只有一個消費者C1。那么消費者C1將會收到這4個分區的消息,如下所示:

?

Kafka一個很重要的特性就是,只需寫入一次消息,可以支持任意多的應用讀取這個消息。換句話說,每個應用都可以讀到全量的消息。為了使得每個應用都能讀到全量消息,應用需要有不同的消費組。對于上面的例子,假如我們新增了一個新的消費組G2,而這個消費組有兩個消費者,那么會是這樣的:

?

二.消息接收

1.必要參數設置

/*** Kafka 消費者分析*/ @Slf4j public class KafkaConsumerAnalysis {public static final String brokerList = "192.168.37.129:9092";public static final String topic = "test";public static final String groupId = "group.yfy";public static final AtomicBoolean isRunning = new AtomicBoolean(true); ?public static Properties initConfig() {Properties props = new Properties();// 與KafkaProducer中設置保持一致props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");// 必填參數,該參數和KafkaProducer中的相同,制定連接Kafka集群所需的broker地址清單,可以設置一個或者多個props.put("bootstrap.servers", brokerList);// 消費者隸屬于的消費組,默認為空,如果設置為空,則會拋出異常,這個參數要設置成具有一定業務含義的名稱props.put("group.id", groupId);// 指定KafkaConsumer對應的客戶端ID,默認為空,如果不設置KafkaConsumer會自動生成一個非空字符串props.put("client.id", "consumer.client.id.demo"); ?return props;}public static void main(String[] args) {Properties props = initConfig();KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList(topic)); ?// 正則訂閱主題 // ? ? ? consumer.subscribe(Pattern.compile("test.*")); ?// 指定訂閱的分區 // ? ? ? consumer.assign(Arrays.asList(new TopicPartition("test", 0))); ?try {while (isRunning.get()) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.println("topic = " + record.topic() + ", partition = " + record.partition() + ", offset = " + record.offset());System.out.println("key = " + record.key() + ", value = " + record.value());//do something to process record.}}} catch (Exception e) {log.error("occur exception ", e);} finally {consumer.close();}} }

1)這是 個無限循環。消費者實際上是一個長期運行的應用程序,它通過持續輪詢kafka請求數據。

2)消費者必須持續對 Kafka進行輪詢,否則會被認為己經死亡,分區會被移交給群組里的其他消費者。傳給poll() 方法的參數是一個超時時間,用于控制poll()方法的的阻塞時間。如果該參數被設為 0,poll ()會立即返回 ,否則它會在指定的毫秒數內一直等待 broker 返回數據。

3)poll ()方法能返回一個記錄列表。每條記錄都包含了記錄所屬主題的信息、記錄分區的信息、記錄在分區里的偏移量以及記錄的鍵值對。我們一般會遍歷這個列表 ,逐條處理這些記錄。 poll()方法有一個超時參數,它指定了方法在多久之后可以返回,不管有沒有可用的數據都要返回。 超時時間的設置取決于應用程序對響應速度的要求,比如要在多長時間內把控制權歸還給執行輪詢的線程。4)在退出應用程序之前 close()方怯關閉消費者。網絡連接和 socket 也會隨之關閉,并立即觸發一次再均衡,而不是等待群組協調器發現它不再發送心跳井認定它已死亡,因為那樣需要更長的時間,導致整個群組在一段時間內無法讀取消息

2.訂閱主題和分區

創建完消費者后我們便可以訂閱主題了,只需要通過調用subscribe()方法即可,這個方法接收一個主題列表

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList(topic));

另外,我們也可以使用正則表達式來匹配多個主題,而且訂閱之后如果又有匹配的新主題,那么這個消費組會立即對其進行消費。正則表達式在連接Kafka與其他系統時非常有用。比如訂閱所有的測試主題:

consumer.subscribe(Pattern.compile("test.*"));

指定訂閱的分區

consumer.assign(Arrays.asList(new TopicPartition("test", 0)));

3.反序列化

props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

4.位移提交

對于Kafka中的分區而言,它的每條消息都有唯一的offset,用來表示消息在分區中的位置。

當我們調用poll()時,該方法會返回我們沒有消費的消息。當消息從broker返回消費者時,broker并不跟蹤這些消息是否被消費者接收到;Kafka讓消費者自身來管理消費的位移,并向消費者提供更新位移的接口,這種更新位移方式稱為提交(commit)。

自動提交

這種方式讓消費者來管理位移,應用本身不需要顯式操作。當我們將enable.auto.commit設置為true,那么消費者會在poll方法調用后每隔5秒(由auto.commit.interval.ms指定)提交一次位移。和很多其他操作一樣,自動提交也是由poll()方法來驅動的;在調用poll()時,消費者判斷是否到達提交時間,如果是則提交上一次poll返回的最大位移。

需要注意到,這種方式可能會導致消息重復消費。假如,某個消費者poll消息后,應用正在處理消息,在3秒后Kafka進行了重平衡,那么由于沒有更新位移導致重平衡后這部分消息重復消費。

同步提交

public class CheckOffsetAndCommit {public static final String brokerList = "192.168.37.129:9092";public static final String topic = "test";public static final String groupId = "group.yfy";private static AtomicBoolean running = new AtomicBoolean(true); ?public static Properties initConfig() {Properties props = new Properties();props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); ?// 手動提交開啟props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);return props;} ?public static void main(String[] args) {Properties props = initConfig();KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); ? ?TopicPartition tp = new TopicPartition(topic, 0);consumer.assign(Arrays.asList(tp));long lastConsumedOffset = -1;while (true) {ConsumerRecords<String, String> records = consumer.poll(1000);if (records.isEmpty()) {break;}List<ConsumerRecord<String, String>> partitionRecords = records.records(tp);lastConsumedOffset = partitionRecords.get(partitionRecords.size() - 1).offset();consumer.commitSync();//同步提交消費位移}System.out.println("comsumed offset is " + lastConsumedOffset);OffsetAndMetadata offsetAndMetadata = consumer.committed(tp);System.out.println("commited offset is " + offsetAndMetadata.offset());long posititon = consumer.position(tp);System.out.println("the offset of the next record is " + posititon);} }

異步提交

手動提交有一個缺點,那就是當發起提交調用時應用會阻塞。當然我們可以減少手動提交的頻率,但這個會增加消息重復的概率(和自動提交一樣)。另外一個解決辦法是,使用異步提交的API。

public class OffsetCommitAsyncCallback extends ConsumerClientConfig { ?private static AtomicBoolean running = new AtomicBoolean(true); ?public static void main(String[] args) {Properties props = initConfig();KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList(topic)); ?try {while (running.get()) {ConsumerRecords<String, String> records = consumer.poll(1000);for (ConsumerRecord<String, String> record : records) {//do some logical processing.}// 異步回調consumer.commitAsync(new OffsetCommitCallback() {@Overridepublic void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,Exception exception) {if (exception == null) {System.out.println(offsets);} else {log.error("fail to commit offsets {}", offsets, exception);}}});}} finally {consumer.close();} ?try {while (running.get()) {consumer.commitAsync();}} finally {try {consumer.commitSync();} finally {consumer.close();}}} }

但是異步提交也有個缺點,那就是如果服務器返回提交失敗,異步提交不會進行重試。相比較起來,同步提交會進行重試直到成功或者最后拋出異常給應用。異步提交沒有實現重試是因為,如果同時存在多個異步提交,進行重試可能會導致位移覆蓋。舉個例子,假如我們發起了一個異步提交commitA,此時的提交位移為2000,隨后又發起了一個異步提交commitB且位移為3000;commitA提交失敗但commitB提交成功,此時commitA進行重試并成功的話,會將實際上將已經提交的位移從3000回滾到2000,導致消息重復消費。

同步和異步組合提交

一般情況下,針對偶爾出現的提交失敗,我們可以使用同步和異步提交結合的方式提交。在消費者關閉前commitSync()提交一下。

? ? ? ?try {while (running.get()) {ConsumerRecords<String, String> records = consumer.poll(1000);consumer.commitAsync();}} finally {try {consumer.commitSync();} finally {consumer.close();}}
  • 如果一切正常,我們使用commitAsync()方法來提交。這樣速度更快,而且即使這次提交失敗,下一次提交很可能會成功。

  • 如果直接關閉消費者,就沒有所謂的“下一次提交”了。使用 commitSync()方法也會一直重試,直到提交成功或發生無法恢復的錯誤。

5.指定位移消費

到目前為止,我們知道消息的拉取是根據poll()方法中的邏輯來處理的,但是這個方法對于普通開發人員來說就是個黑盒處理,無法精確掌握其消費的起始位置。

seek()方法正好提供了這個功能,讓我們得以追蹤以前的消費或者回溯消費

public class SeekDemo extends ConsumerClientConfig { ? ?public static void main(String[] args) {Properties props = initConfig();KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList(topic));// timeout參數設置多少合適?太短會使分區分配失敗,太長又有可能造成一些不必要的等待consumer.poll(Duration.ofMillis(2000));// 獲取消費者所分配到的分區Set<TopicPartition> assignment = consumer.assignment();System.out.println(assignment);for (TopicPartition tp : assignment) {// 參數partition表示分區,offset表示指定從分區的哪個位置開始消費consumer.seek(tp, 10);} // ? ? ? consumer.seek(new TopicPartition(topic,0),10);while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));//consume the record.for (ConsumerRecord<String, String> record : records) {System.out.println(record.offset() + ":" + record.value());}}} ? }

增加判斷是否分配到了分區

public class SeekDemoAssignment extends ConsumerClientConfig { ? ?public static void main(String[] args) {Properties props = initConfig();KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList(topic));long start = System.currentTimeMillis();Set<TopicPartition> assignment = new HashSet<>();while (assignment.size() == 0) {consumer.poll(Duration.ofMillis(100));assignment = consumer.assignment();}long end = System.currentTimeMillis();System.out.println(end - start);System.out.println(assignment);for (TopicPartition tp : assignment) {consumer.seek(tp, 10);}while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));//consume the record.for (ConsumerRecord<String, String> record : records) {System.out.println(record.offset() + ":" + record.value());}}} } ?

指定從分區末尾開始消費

? ? ? ?// 指定從分區末尾開始消費Map<TopicPartition, Long> offsets = consumer.endOffsets(assignment);for (TopicPartition tp : assignment) {consumer.seek(tp, offsets.get(tp));}

演示位移越界操作,修改代碼如下:

? ? ? ?// 指定從分區末尾開始消費Map<TopicPartition, Long> offsets = consumer.endOffsets(assignment);for (TopicPartition tp : assignment) {consumer.seek(tp, offsets.get(tp)+1);}

6.消費者攔截器

消費者攔截器主要是在消費到消息或者在提交消費位移時進行的一些定制化的操作。

public class ConsumerInterceptorTTL implements ConsumerInterceptor<String, String> {private static final long EXPIRE_INTERVAL = 10 * 1000; ?@Overridepublic ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {System.out.println("before:" + records);long now = System.currentTimeMillis();Map<TopicPartition, List<ConsumerRecord<String, String>>> newRecords= new HashMap<>();for (TopicPartition tp : records.partitions()) {List<ConsumerRecord<String, String>> tpRecords = records.records(tp);List<ConsumerRecord<String, String>> newTpRecords = new ArrayList<>();for (ConsumerRecord<String, String> record : tpRecords) {if (now - record.timestamp() < EXPIRE_INTERVAL) {newTpRecords.add(record);}}if (!newTpRecords.isEmpty()) {newRecords.put(tp, newTpRecords);}}return new ConsumerRecords<>(newRecords);} ?@Overridepublic void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {offsets.forEach((tp, offset) ->System.out.println(tp + ":" + offset.offset()));} ?@Overridepublic void close() {} ?@Overridepublic void configure(Map<String, ?> configs) {} }

實現自定義攔截器之后,需要在KafkaConsumer中配置指定這個攔截器,如下

props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,ConsumerInterceptorTTL.class.getName());

發送端同時發送兩條消息,其中一條修改timestamp的值來使其變得超時,如下:

? ? ? ?ProducerRecord<String, String> record = new ProducerRecord<>(topic, "Kafka-demo-001", "hello, Kafka!");ProducerRecord<String, String> record2 = new ProducerRecord<>(topic, 0, System.currentTimeMillis() - 10 * 1000, "Kafka-demo-001", "hello, Kafka!->超時");

啟動消費端運行如下,只收到了未超時的消息:

before:org.apache.kafka.clients.consumer.ConsumerRecords@7adda9cc topic = test, partition = 1, offset = 18 key = Kafka-demo-001, value = hello, Kafka!

三.其它消費者參數

fetch.min.bytes

這個參數可以減少broker和消費者的壓力,因為減少了往返的時間。而對于有大量消費者的主題來說,則可以明顯減輕broker壓力。

2.fetch.max.wait.ms

上面的fetch.min.bytes參數指定了消費者讀取的最小數據量,而這個參數則指定了消費者讀取時最長等待時間,從而避免長時間阻塞。這個參數默認為500ms。

3.max.partition.fetch.bytes

這個參數指定了每個分區返回的最多字節數,默認為1M。也就是說,KafkaConsumer.poll()返回記錄列表時,每個分區的記錄字節數最多為1M。如果一個主題有20個分區,同時有5個消費者,那么每個消費者需要4M的空間來處理消息。實際情況中,我們需要設置更多的空間,這樣當存在消費者宕機時,其他消費者可以承擔更多的分區。

4.max.poll.records

這個參數控制一個poll()調用返回的記錄數,這個可以用來控制應用在拉取循環中的處理數據量。

總結

以上是生活随笔為你收集整理的Kafka消费者详解的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 特黄特色大片免费播放器使用方法 | 萌白酱在线观看 | 久久色资源网 | 亚洲高清无码久久久 | 日韩视频一区在线观看 | 久久久久夜夜夜精品国产 | 日本高清视频免费看 | 午夜亚洲一区 | 国偷自拍第113页 | 禁漫天堂黄漫画无遮挡观看 | 91久久久久久久久久久久久 | 二区影院| 中文字幕第22页 | 免费二区 | 韩国精品视频在线观看 | 天堂av2021 | 天堂网在线中文 | 亚洲精品天堂在线 | 国产精品久久久久久久久久辛辛 | 天天看a | 91区国产 | www.色欧美 | 日韩综合一区二区 | 天天插综合 | 国产成人无码精品久在线观看 | 蛇女欲潮性三级 | 欧美黄色性视频 | 影音先锋国产精品 | 伊人色在线 | 中国老太婆性做爰 | 久久成人18免费观看 | 国产一区二区91 | 蜜桃久久久aaaa成人网一区 | 深爱五月激情五月 | 一本久草 | av成人免费观看 | 欧美h视频在线观看 | 免费观看的av | av成人在线观看 | 国产精品无 | 亚洲激情午夜 | 放荡闺蜜高h苏桃情事h | 中文字幕av一区二区三区谷原希美 | 欧美日韩一二区 | 亚洲www视频 | 日韩成人av免费在线观看 | 少妇2做爰交换朴银狐 | 国产精品图片 | 国产免费叼嘿网站免费 | 国产精品1区2区3区4区 | 日韩一级片免费 | 青青青在线视频观看 | 色多多黄色 | 亚洲激情五月婷婷 | 国产手机精品视频 | 青青草在线观看视频 | 善良的女朋友在线观看 | 中文字幕日韩在线观看 | 亚洲欧美日韩中文字幕在线观看 | 久久96视频 | 精品人妻无码专区视频 | 欧美人与性囗牲恔配 | 久久亚洲免费 | 影音先锋亚洲成aⅴ人在 | 天天爽天天 | 日韩福利在线观看 | 国产每日更新 | 深夜国产福利 | 一级片在线免费观看 | 成人不卡av | 不卡av免费观看 | 久久a毛片| 国产一级做a爰片久久毛片男男 | 成人91网站 | 午夜电影网一区 | 欧美男人天堂 | 日本欧美视频 | 图片区视频区小说区 | 日韩精品av一区二区三区 | 久久久久性色av无码一区二区 | 精品亚洲一区二区三区四区五区高 | 婷婷一区二区三区四区 | 亚洲国产精品成人va在线观看 | 在线视频久久 | 久草电影在线 | 亚洲一区日韩精品 | 日韩色av | 在线免费日韩av | 中文字幕日韩人妻在线视频 | 嫦娥性艳史bd | 久久久久久av无码免费网站 | 香蕉视频免费看 | 天堂伊人网 | 女同在线视频 | 亚洲免费色图 | 天堂色区 | 午夜在线精品 | 有色影院| 中文字幕蜜桃 |