日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

关于Kafka 的 consumer 消费者手动提交详解

發布時間:2025/5/22 编程问答 21 豆豆
生活随笔 收集整理的這篇文章主要介紹了 关于Kafka 的 consumer 消费者手动提交详解 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

前言

在上一篇 Kafka使用Java實現數據的生產和消費demo 中介紹如何簡單的使用kafka進行數據傳輸。本篇則重點介紹kafka中的 consumer 消費者的講解。

應用場景

在上一篇kafka的consumer消費者,我們使用的是自動提交offset下標。
但是offset下標自動提交其實在很多場景都不適用,因為自動提交是在kafka拉取到數據之后就直接提交,這樣很容易丟失數據,尤其是在需要事物控制的時候。
很多情況下我們需要從kafka成功拉取數據之后,對數據進行相應的處理之后再進行提交。如拉取數據之后進行寫入mysql這種 , 所以這時我們就需要進行手動提交kafka的offset下標。

這里順便說下offset具體是什么。
offset:指的是kafka的topic中的每個消費組消費的下標。
簡單的來說就是一條消息對應一個offset下標,每次消費數據的時候如果提交offset,那么下次消費就會從提交的offset加一那里開始消費。
比如一個topic中有100條數據,我消費了50條并且提交了,那么此時的kafka服務端記錄提交的offset就是49(offset從0開始),那么下次消費的時候offset就從50開始消費。

測試

說了這么,那么我們開始進行手動提交測試。
首先,使用kafka 的producer 程序往kafka集群發送了100條測試數據。

程序打印中已經成功發送了,這里我們在kafka服務器使用命令中來查看是否成功發送.
命令如下:

kafka-console-consumer.sh --zookeeper master:2181 --topic KAFKA_TEST2 --from-beginning

注:
1.master 是我在linux中做了IP映射的關系,實際可以換成IP。
2.因為kafka是集群,所以也可以在集群的其他機器進行消費。

可以看到已經成功發送了100條。

成功發送消息之后,我們再使用kafka的consumer 進行數據消費。

因為是用來測試手動提交
所以 將 enable.auto.commit 改成 false 進行手動提交
并且設置每次拉取最大10條

props.put("enable.auto.commit", "false"); props.put("max.poll.records", 10);

將提交方式改成false之后
需要手動提交只需加上這段代碼

consumer.commitSync();

那么首先嘗試消費不提交,測試能不能重復消費。
右鍵運行main方法進行消費,不提交offset下標。

成功消費之后,結束程序,再次運行main方法進行消費,也不提交offset下標。

并未手動進行提交,而且并未更改消費組名,但是可以看到已經重復消費了!

接下來,開始測試手動提交。

  • 測試目的:
    1.測試手動提交之后的offset,能不能再次消費。
    2.測試未提交的offset,能不能再次進行消費。
  • 測試方法: 當消費到50條的時候,進行手動提交,然后剩下的50條不進行提交。
  • 希望達成的目的: 手動提交的offset不能再次消費,未提交的可以再次進行消費。
  • 為了達到上述目的,我們測試只需添加如下代碼即可:

    if(list.size()==50){consumer.commitSync(); }

    更改代碼之后,開始運行程序
    測試示例圖如下:

    簡單的一看,和之前未提交的一樣,貌似沒有什么問題。
    但是正常來說,未提交的下標不應該重復進行消費,直到它提交為止嗎?
    因為要進行重復消費,但是messageNo 會一直累加,只會手動的提交前50條offset,
    后面的50條offset會一直無法消費,所以打印的條數不應該是100,而是應該一直打印。

    那么測試的結果和預想的為什么不一致呢?
    之前不是已經測試過可以重復消費未提交的offset嗎?
    其實這點可以根據兩次啟動方式的不同而得出結論。
    開始測試未提交重復消費的時候,實際我是啟動-暫停-啟動,那么本地的consumer實際是被初始化過兩次。
    而剛剛測試的實際consumer只有初始化一次。
    至于為什么初始化一次就不行呢?
    因為kafka的offset下標的記錄實際會有兩份,服務端會自己記錄一份,本地的消費者客戶端也會記錄一份,提交的offset會告訴服務端已經消費到這了,但是本地的并不會因此而改變offset進行再次消費。

    簡單的來說假如有10條數據,在第5條的時候進行提交了offset下標,那么服務端就知道該組消費的下標到第5條了,如果同組其他的consumer進行消費的時候就會從第6條開始進行消費。但是本地的消費者客戶端并不會因此而改變,它還是會繼續消費下去,并不會再次從第6條開始消費,所以會出現上圖情況。

    但是項目中運行之后,是不會因此而重啟的,所以這時我們可以換一種思路。
    就是如果觸發某個條件,所以導致offset未提交,我們就可以關閉之前的consumer,然后新new一個consumer,這樣就可以再次進行消費了! 當然配置要和之前的一樣。

    那么將之前的提交代碼更改如下:

    if(list.size()==50){consumer.commitSync(); }else if(list.size()>50){consumer.close();init();list.clear();list2.clear(); }

    注:這里因為是測試,為了簡單明了,所以條件我寫的很簡單。實際情況請根據個人的為準。

    示例圖如下:

    說明:
    1.因為每次是拉取10條,所以在60條的時候kafka的配置初始化了,然后又從新拉取了50-60條的數據,但是沒有提交,所以并不會影響實際結果。
    2.這里為了方便截圖展示,所以打印條件改了,但是不影響程序!

    從測試結果中,我們達到了之前想要測試的目的,未提交的offset可以重復進行消費。
    這種做法一般也可以滿足大部分需求。
    例如從kafka獲取數據入庫,如果一批數據入庫成功,就提交offset,否則不提交,然后再次拉取。
    但是這種做法并不能最大的保證數據的完整性。比如在運行的時候,程序掛了之類的。
    所以還有一種方法是手動的指定offset下標進行獲取數據,直到kafka的數據處理成功之后,將offset記錄下來,比如寫在數據庫中。那么這種做法,等到下一篇再進行嘗試吧!

    該項目我放在github上了,有興趣的可以看看!
    地址:https://github.com/xuwujing/kafka

    到此,本文結束,謝謝閱讀!

    轉載于:https://www.cnblogs.com/xuwujing/p/8432984.html

    總結

    以上是生活随笔為你收集整理的关于Kafka 的 consumer 消费者手动提交详解的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。