日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 >

kafka异步推送设置重试_一篇文章了解 Kafka 幂等性的原理及实践

發布時間:2023/11/27 33 豆豆
生活随笔 收集整理的這篇文章主要介紹了 kafka异步推送设置重试_一篇文章了解 Kafka 幂等性的原理及实践 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

01 冪等性如此重要

Kafka作為分布式MQ,大量用于分布式系統中,如消息推送系統、業務平臺系統(如結算平臺),就拿結算來說,業務方作為上游把數據打到結算平臺,如果一份數據被計算、處理了多次,產生的后果將會特別嚴重。

02 哪些因素影響冪等性

使用Kafka時,需要保證exactly-once語義。要知道在分布式系統中,出現網絡分區是不可避免的,如果kafka broker 在回復ack時,出現網絡故障或者是full gc導致ack timeout,producer將會重發,如何保證producer重試時不造成重復or亂序?又或者producer 掛了,新的producer并沒有old producer的狀態數據,這個時候如何保證冪等?即使Kafka 發送消息滿足了冪等,consumer拉取到消息后,把消息交給線程池workers,workers線程對message的處理可能包含異步操作,又會出現以下情況:

  • 先commit,再執行業務邏輯:提交成功,處理失敗 。造成丟失
  • 先執行業務邏輯,再commit:提交失敗,執行成功。造成重復執行
  • 先執行業務邏輯,再commit:提交成功,異步執行fail。造成丟失

本文將針對以上問題作出討論

03 Kafka保證發送冪等性

針對以上的問題,kafka在0.11版新增了冪等型producer和事務型producer。前者解決了單會話冪等性等問題,后者解決了多會話冪等性。

單會話冪等性

為解決producer重試引起的亂序和重復。Kafka增加了pid和seq。Producer中每個RecordBatch都有一個單調遞增的seq; Broker上每個tp也會維護pid-seq的映射,并且每Commit都會更新lastSeq。這樣recordBatch到來時,broker會先檢查RecordBatch再保存數據:如果batch中 baseSeq(第一條消息的seq)比Broker維護的序號(lastSeq)大1,則保存數據,否則不保存(inSequence方法)。

ProducerStateManager.scala

ne throw mew UnknownProducerIdException(s"Local producer state matches expected epoch $producerEpoch " +

引申:Kafka producer 對有序性做了哪些處理

假設我們有5個請求,batch1、batch2、batch3、batch4、batch5;如果只有batch2 ack failed,3、4、5都保存了,那2將會隨下次batch重發而造成重復。我們可以設置 max.in.flight.requests.per.connection =1(客戶端在單個連接上能夠發送的未響應請求的個數)來解決亂序,但降低了系統吞吐。

新版本kafka設置enable.idempotence=true后能夠動態調整max-in-flight-request。正常情況下 max.in.flight.requests.per.connection 大于1。 當重試請求到來且時,batch 會根據 seq重新添加到隊列的合適位置,并把 max.in.flight.requests.per.connection設為1, 這樣它 前面的 batch序號都比它小,只有前面的都發完了,它才能發。

多會話冪等性

在單會話冪等性中介紹,kafka通過引入pid和seq來實現單會話冪等性,但正是引入了pid,當應用重啟時,新的producer并沒有old producer的狀態數據。可能重復保存。

Kafka事務通過隔離機制來實現多會話冪等性

kafka事務引入了transactionId 和Epoch,設置transactional.id后,一個transactionId只對應一個pid, 且Server 端會記錄最新的 Epoch 值。這樣有新的producer初始化時,會向TransactionCoordinator發送InitPIDRequest請求, TransactionCoordinator 已經有了這個 transactionId對應的 meta,會返回之前分配的 PID,并把 Epoch 自增 1 返回,這樣當old producer恢復過來請求操作時,將被認為是無效producer拋出異常。 如果沒有開啟事務,TransactionCoordinator會為新的producer返回new pid,這樣就起不到隔離效果,因此無法實現多會話冪等。

04 Consumer端冪等性

如上所述,consumer拉取到消息后,把消息交給線程池workers,workers對message的handle可能包含異步操作,又會出現以下情況:

  • 先commit,再執行業務邏輯:提交成功,處理失敗 。造成丟失
  • 先執行業務邏輯,再commit:提交失敗,執行成功。造成重復執行
  • 先執行業務邏輯,再commit:提交成功,異步執行fail。造成丟失

對此我們常用的方法時,works取到消息后先執行如下code

end:如果你覺得本文對你有幫助的話,記得關注點贊轉發,你的支持就是我更新動力。

總結

以上是生活随笔為你收集整理的kafka异步推送设置重试_一篇文章了解 Kafka 幂等性的原理及实践的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。