日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

kafka 脚本发送_Kafka笔记归纳(第五部分:一致性保证,消息重复消费场景及解决方式)...

發(fā)布時間:2025/3/8 编程问答 14 豆豆
生活随笔 收集整理的這篇文章主要介紹了 kafka 脚本发送_Kafka笔记归纳(第五部分:一致性保证,消息重复消费场景及解决方式)... 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

寫在開頭:

本章是Kafka學(xué)習(xí)歸納第五部分,著重于強調(diào)Kafka的事一致性保證,消息重復(fù)消費場景及解決方式,記錄偏移量的主題,延時隊列的知識點。

文章內(nèi)容輸出來源:拉勾教育大數(shù)據(jù)高薪訓(xùn)練營。

一致性保證

水位標記

水位或水印(watermark)一詞,表示位置信息,即位移(offset)。Kafka源碼中使用的名字是高水位,HW(high watermark)。

LEO和HW

每個分區(qū)副本對象都有兩個重要的屬性:LEO和HW

LEO:即日志末端位移(log end offset),記錄了該副本日志中下一條消息的位移值。如果 LEO=10,那么表示該副本保存了10條消息,位移值范圍是[0, 9]。另外,Leader LEO和 Follower LEO的更新是有區(qū)別的。

HW:即上面提到的水位值。對于同一個副本對象而言,其HW值不會大于LEO值。小于等于 HW值的所有消息都被認為是“已備份”的(replicated)。Leader副本和Follower副本的HW更新不同

上圖中,HW值是7,表示位移是 0~7 的所有消息都已經(jīng)處于“已提交狀態(tài)”(committed),而LEO值是14,8~13的消息就是未完全備份(fully replicated)——為什么沒有14?LEO指向的是下一條消息到來時的位移。

消費者無法消費分區(qū)下Leader副本中位移大于分區(qū)HW的消息

Follower副本何時更新LEO

Follower副本不停地向Leader副本所在的broker發(fā)送FETCH請求,一旦獲取消息后寫入自己的日志中進行備份。那么Follower副本的LEO是何時更新的呢?首先我必須言明,Kafka有兩套Follower副本

LEO:

1. 一套LEO保存在Follower副本所在Broker的副本管理機中;

2. 另一套LEO保存在Leader副本所在Broker的副本管理機中。Leader副本機器上保存了所有的follower副本的LEO。

Kafka使用前者幫助Follower副本更新其HW值;利用后者幫助Leader副本更新其HW。

1. Follower副本的本地LEO何時更新? Follower副本的LEO值就是日志的LEO值,每當(dāng)新寫入一條消息,LEO值就會被更新。當(dāng)Follower發(fā)送FETCH請求后,Leader將數(shù)據(jù)返回給Follower,此時Follower開始Log寫數(shù)據(jù),從而自動更新LEO值。

2. Leader端Follower的LEO何時更新? Leader端的Follower的LEO更新發(fā)生在Leader在處理 Follower FETCH請求時。一旦Leader接收到Follower發(fā)送的FETCH請求,它先從Log中讀取 相應(yīng)的數(shù)據(jù),給Follower返回數(shù)據(jù)前,先更新Follower的LEO。

Follower副本何時更新HW

Follower更新HW發(fā)生在其更新LEO之后,一旦Follower向Log寫完數(shù)據(jù),嘗試更新自己的HW值。

比較當(dāng)前LEO值與FETCH響應(yīng)中Leader的HW值,取兩者的小者作為新的HW值。

即:如果Follower的LEO大于Leader的HW,Follower HW值不會大于Leader的HW值。

Leader副本何時更新LEO

和Follower更新LEO相同,Leader寫Log時自動更新自己的LEO值。

Leader副本何時更新HW值

Leader的HW值就是分區(qū)HW值,直接影響分區(qū)數(shù)據(jù)對消費者的可見性

Leader會嘗試去更新分區(qū)HW的四種情況:

1. Follower副本成為Leader副本時:Kafka會嘗試去更新分區(qū)HW。

2. Broker崩潰導(dǎo)致副本被踢出ISR時:檢查下分區(qū)HW值是否需要更新是有必要的。

3. 生產(chǎn)者向Leader副本寫消息時:因為寫入消息會更新Leader的LEO,有必要檢查HW值是否需要更新

4. Leader處理Follower FETCH請求時:首先從Log讀取數(shù)據(jù),之后嘗試更新分區(qū)HW值

結(jié)論:

當(dāng)Kafka broker都正常工作時,分區(qū)HW值的更新時機有兩個:

1. Leader處理PRODUCE請求時

2. Leader處理FETCH請求時。

Leader如何更新自己的HW值?Leader broker上保存了一套Follower副本的LEO以及自己的LEO。當(dāng)嘗試確定分區(qū)HW時,它會選出所有滿足條件的副本,比較它們的LEO(包括Leader的LEO),并選擇最小的LEO值作為HW值。

需要滿足的條件,(二選一):

1. 處于ISR中

2. 副本LEO落后于Leader LEO的時長不大于 replica.lag.time.max.ms 參數(shù)值(默認是10s)

如果Kafka只判斷第一個條件的話,確定分區(qū)HW值時就不會考慮這些未在ISR中的副本,但這些副本已經(jīng)具備了“立刻進入ISR”的資格,因此就可能出現(xiàn)分區(qū)HW值越過ISR中副本LEO的情況——不允許。因為分區(qū)HW定義就是ISR中所有副本LEO的最小值

消息重復(fù)的場景及解決方案

消息重復(fù)和丟失是kafka中很常見的問題,主要發(fā)生在以下三個階段:

1. 生產(chǎn)者階段

2. broke階段

3. 消費者階段

生產(chǎn)者階段重復(fù)場景

生產(chǎn)發(fā)送的消息沒有收到正確的broke響應(yīng),導(dǎo)致生產(chǎn)者重試。

生產(chǎn)者發(fā)出一條消息,broke落盤以后因為網(wǎng)絡(luò)等種種原因發(fā)送端得到一個發(fā)送失敗的響應(yīng)或者網(wǎng)絡(luò)中斷,然后生產(chǎn)者收到一個可恢復(fù)的Exception重試消息導(dǎo)致消息重復(fù)。

生產(chǎn)者發(fā)送重復(fù)解決方案

啟動kafka的冪等性

要啟動kafka的冪等性,設(shè)置: enable.idempotence=true ,以及 ack=all 以及 retries > 1

ack=0,不重試。 可能會丟消息,適用于吞吐量指標重要性高于數(shù)據(jù)丟失,例如:日志收集。

生產(chǎn)者和broke階段消息丟失場景

ack=0,不重試

生產(chǎn)者發(fā)送消息完,不管結(jié)果了,如果發(fā)送失敗也就丟失了。

ack=1,leader crash

生產(chǎn)者發(fā)送消息完,只等待Leader寫入成功就返回了,Leader分區(qū)丟失了,此時Follower沒來及同步,消息丟失

unclean.leader.election.enable 配置true

允許選舉ISR以外的副本作為leader,會導(dǎo)致數(shù)據(jù)丟失,默認為false。生產(chǎn)者發(fā)送異步消息,只等待Lead寫入成功就返回,Leader分區(qū)丟失,此時ISR中沒有Follower,Leader從OSR中選舉,因為OSR中本來落后于Leader造成消息丟失

解決生產(chǎn)者和broke階段消息丟失

禁用unclean選舉,ack=all

ack=all / -1,tries > 1,unclean.leader.election.enable=false

生產(chǎn)者發(fā)完消息,等待Follower同步完再返回,如果異常則重試。副本的數(shù)量可能影響吞吐量,不超過5個,一般三個。 不允許unclean Leader選舉。

配置:min.insync.replicas > 1

當(dāng)生產(chǎn)者將 acks 設(shè)置為 all (或 -1 )時, min.insync.replicas>1 。指定確認消息寫成功需要的最小副本數(shù)量。達不到這個最小值,生產(chǎn)者將引發(fā)一個異常(要么是NotEnoughReplicas,要么是NotEnoughReplicasAfterAppend)。

當(dāng)一起使用時, min.insync.replicas 和 ack 允許執(zhí)行更大的持久性保證。一個典型的場景是創(chuàng)建一個復(fù)制因子為3的主題,設(shè)置min.insync復(fù)制到2個,用 all 配置發(fā)送。將確保如果大多數(shù)副本沒有收到寫操作,則生產(chǎn)者將引發(fā)異常。

失敗的offset單獨記錄

生產(chǎn)者發(fā)送消息,會自動重試,遇到不可恢復(fù)異常會拋出,這時可以捕獲異常記錄到數(shù)據(jù)庫或緩存,進行單獨處理。

消費者數(shù)據(jù)重復(fù)場景及解決方案

數(shù)據(jù)消費完沒有及時提交offset到broker。

消息消費端在消費過程中掛掉沒有及時提交offset到broke,另一個消費端啟動拿之前記錄的offset開始消費,由于offset的滯后性可能會導(dǎo)致新啟動的客戶端有少量重復(fù)消費。

解決方案

取消自動提交

每次消費完或者程序退出時手動提交。這可能也沒法保證一條重復(fù)。

下游做冪等

一般是讓下游做冪等或者盡量每消費一條消息都記錄offset,對于少數(shù)嚴格的場景可能需要把 offset或唯一ID(例如訂單ID)和下游狀態(tài)更新放在同一個數(shù)據(jù)庫里面做事務(wù)來保證精確的一次更新或者在下游數(shù)據(jù)表里面同時記錄消費offset,然后更新下游數(shù)據(jù)的時候用消費位移做樂觀鎖拒絕舊位移的數(shù)據(jù)更新。

__consumer_offsets

Kafka 1.0.2將consumer的位移信息保存在Kafka內(nèi)部的topic中,即__consumer_offsets主題,并且默認提供了kafka_consumer_groups.sh腳本供用戶查看consumer信息。

創(chuàng)建topic “tp_test_01”

kafka-topics.sh --zookeeper node1:2181/myKafka --create -- topic tp_test_01 --partitions 5 --replication-factor 1

使用kafka-console-producer.sh腳本生產(chǎn)消息

[root@node1 ~]# for i in `seq 60`; do echo "hello lagou $i" >> messages.txt; done [root@node1 ~]# kafka-console-producer.sh --broker-list node1:9092 --topic tp_test_01 < messages.txt

由于默認沒有指定key,所以根據(jù)round-robin方式,消息分布到不同的分區(qū)上。 (本例中生產(chǎn)了60條消息)

驗證消息生產(chǎn)成功

kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list node1:9092 --topic tp_test_01 --time -1

創(chuàng)建一個console consumer group

kafka-console-consumer.sh --bootstrap-server linux121:9092 --topic tp_test_01 --from-beginning

獲取該consumer group的group id(后面需要根據(jù)該id查詢它的位移信息)

kafka-consumer-groups.sh --bootstrap-server linux121:9092 --list

查詢__consumer_offsets topic所有內(nèi)容

注意:運行下面命令前先要在consumer.properties中設(shè)置exclude.internal.topics=false

kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server node1:9092 --formatter "kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning

默認情況下__consumer_offsets有50個分區(qū),如果你的系統(tǒng)中consumer group也很多的話,那么這個命令的輸出結(jié)果會很多。

計算指定consumer group在__consumer_offsets topic中分區(qū)信息

這時候就用到了group.id :console-consumer-77682

Kafka會使用下面公式計算該group位移保存在__consumer_offsets的哪個分區(qū)上:

Math.abs(groupID.hashCode()) % numPartitions

__consumer_offsets的分區(qū)41保存了這個consumer group的位移信息。

獲取指定consumer group的位移信息

kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 41 --broker-list linux121:9092 --formatter "kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter"

可以看到__consumer_offsets topic的每一日志項的格式都是:

[Group, Topic, Partition]::[OffsetMetadata[Offset, Metadata], CommitTime, ExpirationTime]

延時隊列

兩個follower副本都已經(jīng)拉取到了leader副本的最新位置,此時又向leader副本發(fā)送拉取請求,而leader副本并沒有新的消息寫入,那么此時leader副本該如何處理呢?可以直接返回空的拉取結(jié)果給follower副本,不過在leader副本一直沒有新消息寫入的情況下,follower副本會一直發(fā)送拉取請求,并且總收到空的拉取結(jié)果,消耗資源。

Kafka在處理拉取請求時,會先讀取一次日志文件,如果收集不到足夠多(fetchMinBytes,由參數(shù)fetch.min.bytes配置,默認值為1)的消息,那么就會創(chuàng)建一個延時拉取操作(DelayedFetch)以等待拉取到足夠數(shù)量的消息。當(dāng)延時拉取操作執(zhí)行時,會再讀取一次日志文件,然后將拉取結(jié)果返回給follower副本。

延遲操作不只是拉取消息時的特有操作,在Kafka中有多種延時操作,比如延時數(shù)據(jù)刪除、延時生產(chǎn)等。

對于延時生產(chǎn)(消息)而言,如果在使用生產(chǎn)者客戶端發(fā)送消息的時候?qū)cks參數(shù)設(shè)置為-1,那么就意味著需要等待ISR集合中的所有副本都確認收到消息之后才能正確地收到響應(yīng)的結(jié)果,或者捕獲超時異常。

假設(shè)某個分區(qū)有3個副本:leader、follower1和follower2,它們都在分區(qū)的ISR集合中。不考慮ISR變動的情況,Kafka在收到客戶端的生產(chǎn)請求后,將消息3和消息4寫入leader副本的本地日志文件。

由于客戶端設(shè)置了acks為-1,那么需要等到follower1和follower2兩個副本都收到消息3和消息4后才能告知客戶端正確地接收了所發(fā)送的消息。如果在一定的時間內(nèi),follower1副本或follower2副本沒能夠完全拉取到消息3和消息4,那么就需要返回超時異常給客戶端。生產(chǎn)請求的超時時間由參數(shù)request.timeout.ms配置,默認值為30000,即30s。

那么這里等待消息3和消息4寫入follower1副本和follower2副本,并返回相應(yīng)的響應(yīng)結(jié)果給客戶端的動作是由誰來執(zhí)行的呢?在將消息寫入leader副本的本地日志文件之后,Kafka會創(chuàng)建一個延時的生產(chǎn)操作(DelayedProduce),用來處理消息正常寫入所有副本或超時的情況,以返回相應(yīng)的響應(yīng)結(jié)果給客戶端。

延時操作需要延時返回響應(yīng)的結(jié)果,首先它必須有一個超時時間(delayMs),如果在這個超時時間內(nèi)沒有完成既定的任務(wù),那么就需要強制完成以返回響應(yīng)結(jié)果給客戶端。其次,延時操作不同于定時操作,定時操作是指在特定時間之后執(zhí)行的操作,而延時操作可以在所設(shè)定的超時時間之前完成,所以延時操作能夠支持外部事件的觸發(fā)。

就延時生產(chǎn)操作而言,它的外部事件是所要寫入消息的某個分區(qū)的HW(高水位)發(fā)生增長。也就是說,隨著follower副本不斷地與leader副本進行消息同步,進而促使HW進一步增長,HW每增長一次都會檢測是否能夠完成此次延時生產(chǎn)操作,如果可以就執(zhí)行以此返回響應(yīng)結(jié)果給客戶端;如果在超時時間內(nèi)始終無法完成,則強制執(zhí)行。

延時拉取操作,是由超時觸發(fā)或外部事件觸發(fā)而被執(zhí)行的。超時觸發(fā)很好理解,就是等到超時時間之后觸發(fā)第二次讀取日志文件的操作。外部事件觸發(fā)就稍復(fù)雜了一些,因為拉取請求不單單由follower副本發(fā)起,也可以由消費者客戶端發(fā)起,兩種情況所對應(yīng)的外部事件也是不同的。如果是follower副本的延時拉取,它的外部事件就是消息追加到了leader副本的本地日志文件中;如果是消費者客戶端的延時拉取,它的外部事件可以簡單地理解為HW的增長。

總結(jié)

以上是生活随笔為你收集整理的kafka 脚本发送_Kafka笔记归纳(第五部分:一致性保证,消息重复消费场景及解决方式)...的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 欧美黄色录像 | 国产v亚洲v天堂无码久久久 | 国产精品美女 | www.天天操.com| 久久国产精品久久久久久电车 | 97视频一区 | 天天看夜夜爽 | 国内精品久久久久久久久 | 夜间福利在线 | 日韩黄色一级片 | 熟女高潮一区二区三区视频 | 经典杯子蛋糕日剧在线观看免费 | 丝袜ol美脚秘书在线播放 | 中国女人性猛交 | 一区二区三区四区在线视频 | 日韩成人av免费在线观看 | 国产.com | 69er小视频 | 秋霞av影院| 自拍偷拍第八页 | 伊人视屏 | 日日鲁鲁鲁夜夜爽爽狠狠视频97 | 重口h文| 波多野结衣高清视频 | 国产精品99久久久久久久 | 久久久免费电影 | 9色视频在线观看 | 日韩免费一级片 | 神马影院午夜伦理片 | 黄色正能量网站 | 一本色道久久综合亚洲精品 | 又大又长粗又爽又黄少妇视频 | 韩国一区二区在线播放 | 九九久久视频 | 69国产视频 | 国产老妇视频 | 99久久精品免费看国产四区 | 放荡的少妇2欧美版 | 久久人妻少妇嫩草av蜜桃 | 少妇视频一区二区三区 | 亚洲精品免费在线视频 | 日韩久久av | 亚洲精品久久夜色撩人男男小说 | 91精品国产高清一区二区三密臀 | 中国黄色在线视频 | 91久久久久久久久久久久 | 在线免费观看国产视频 | 91se在线| 国产精品视频你懂的 | 少妇喷白浆 | 九九九九九精品 | 久久91av| 色一情一交一乱一区二区三区 | 日本一本在线观看 | mm1313亚洲国产精品无码试看 | 久久久久久蜜桃 | 国产com| 黄色国产一区二区 | 成人777| 中国无码人妻丰满熟妇啪啪软件 | 国产豆花视频 | 欧美色图俺去了 | 青青操视频在线播放 | 亚洲国语 | 91碰碰| 亚洲熟女乱综合一区二区三区 | 成人区一区二区 | 日本美女性生活视频 | 欧美亚洲综合一区 | 超碰在线c | 96日本xxxxxⅹxxx70| 国产精品77 | 一区一区三区产品乱码 | 久久久久久久极品 | 亚洲69视频 | 日本a级片免费 | 中日韩黄色大片 | 国产伦精品一区二区三区四区 | 99毛片| 日本一区二区免费电影 | 成人免费视频观看 | 日日爽天天 | 亚洲天堂一级片 | 青娱乐在线视频观看 | 视频精品久久 | 黄色成人免费网站 | 亚洲欧美日本一区二区三区 | 亚洲精品aaaa | 国产精品久久久久久av | 欧美日韩一区二区在线播放 | 日本公妇乱淫免费视频一区三区 | 亚洲欧美大片 | 国产精品天干天干 | 添女人荫蒂视频 | 亚洲国产精品久久久久久久 | 香蕉久久精品日日躁夜夜躁 | 精品乱码一区二区三四区视频 | av免费看在线| 手机在线成人 |