卡夫卡–一次语义学
在分布式環(huán)境中,故障是很常見的情況,可以隨時發(fā)生。 在Kafka環(huán)境中,代理可能會崩潰,網(wǎng)絡故障,處理故障,發(fā)布消息時失敗或無法使用消息等。這些不同的場景引入了不同類型的數(shù)據(jù)丟失和重復。
失敗場景
A(確認失敗):生產(chǎn)者成功發(fā)布了消息,重試> 1,但由于失敗而未收到確認。 在這種情況下,生產(chǎn)者將重試相同的消息,可能會引入重復消息。
B(生產(chǎn)者進程在批處理消息中失敗):生產(chǎn)者發(fā)送了一批失敗的消息,但發(fā)布的成功很少。 在這種情況下,一旦生產(chǎn)者重新啟動,它將再次批量重新發(fā)布所有消息,這將在Kafka中引入重復消息。
C(觸發(fā)并忘記失敗)生產(chǎn)者發(fā)布的消息,重試= 0(觸發(fā)并忘記)。 如果失敗,發(fā)布的消息將不知道并發(fā)送下一條消息,這將導致消息丟失。
D(批處理消息中的消費者失敗)消費者從Kafka接收到一批消息,并手動提交其偏移量(enable.auto.commit = false)。 如果消費者在提交給Kafka之前失敗,則下次消費者將再次使用相同的記錄,這些記錄將在消費者端復制副本。
精確一次語義
在這種情況下,即使生產(chǎn)者嘗試重新發(fā)送消息,它也導致消息將被消費者發(fā)布和消費一次。
為了在Kafka中實現(xiàn)Exactly-Once語義,它使用以下3個屬性
啟用冪等(enable.idempotence = true)
冪等傳遞使生產(chǎn)者可以在單個生產(chǎn)者的生命周期內(nèi),將消息僅一次寫入Kafka到主題的特定分區(qū),而不會造成數(shù)據(jù)丟失和每個分區(qū)的訂單。
“請注意,啟用冪等性要求MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION小于或等于5,RETRIES_CONFIG大于0且ACKS_CONFIG為'all'。 如果用戶未明確設(shè)置這些值,則將選擇合適的值。 如果設(shè)置了不兼容的值,將拋出ConfigException”
為了實現(xiàn)冪等性,Kafka在生成消息時使用唯一的ID(稱為產(chǎn)品ID或PID和序列號)。 生產(chǎn)者在發(fā)布的每個消息上保持遞增的序列號,這些消息具有唯一的PID。 代理始終將當前序列號與前一個序列號進行比較,如果新序列號不比上一個序列號大+1,則它會拒絕,這會避免重復;如果消息中丟失了更大的序列號,則會拒絕同時顯示
在失敗的情況下,代理將序列號與先前的序列號進行比較,如果序列不增加,+ 1將拒絕該消息。
交易(隔離級別)
事務使我們能夠自動更新多個主題分區(qū)中的數(shù)據(jù)。 事務中包含的所有記錄都將被成功保存,或者沒有保存成功,它允許您將同一個事務中的消費者補償與已處理的數(shù)據(jù)一起提交,從而允許端到端的一次精確語義。
生產(chǎn)者不等待將消息寫到kafka上,生產(chǎn)者使用beginTransaction,commitTransaction和abortTransaction(如果發(fā)生故障)消費者使用isolate.level級別,無論是read_committed還是read_uncommitted
- read_committed:使用者將始終僅讀取已提交的數(shù)據(jù)。
- read_uncommitted:以偏移順序讀取所有消息,而無需等待事務提交
如果具有Isolation.level = read_committed的使用者到達尚未完成的事務的控制消息,它將不會再從該分區(qū)傳遞任何消息,直到生產(chǎn)者提交或中止該事務或發(fā)生事務超時。 事務超時由生產(chǎn)者使用配置transaction.timeout.ms(默認為1分鐘)確定。
生產(chǎn)者和消費者中的確切時間
在正常情況下,生產(chǎn)者和消費者是分開的。 生產(chǎn)者必須具有冪等性并同時管理事務,以便消費者可以使用isolation.level讀取read_committed以使整個過程成為原子操作。 這樣可以確保生產(chǎn)者將始終與源系統(tǒng)同步。 即使生產(chǎn)者崩潰或事務中止,它也始終是一致的,并且一次將消息或一批消息發(fā)布為一個單元。
同一用戶一次將收到消息或一批消息。
在Exactly-Once中,語義生產(chǎn)者與消費者一起將作為原子操作出現(xiàn),它將作為一個單元進行操作。 要么發(fā)布一次就被消耗掉,要么中止。
在Kafka Stream中恰好一次
Kafka Stream消耗來自主題A的消息,處理消息并將其發(fā)布到主題B,并在發(fā)布后使??用commit(commit主要在后臺運行)將所有狀態(tài)存儲數(shù)據(jù)刷新到磁盤。
Kafka Stream中的“一次”是“讀取-處理-寫入”模式,可確保將這些操作視為原子操作。 由于Kafka Stream可以滿足生產(chǎn)者,消費者和交易的需求,因此Kafka Stream帶有特殊的參數(shù)processing.guarantee,它可以完全地_once或at_least_once使得不單獨處理所有參數(shù)變得容易。
Kafka Streams原子地更新使用者偏移量,本地狀態(tài)存儲,狀態(tài)存儲changelog主題和生產(chǎn)以一起輸出所有主題。 如果這些步驟中的任何一個失敗,則所有更改都將回滾。
processing.guarantee:確切地提供一次以下參數(shù),您無需明確設(shè)置
翻譯自: https://www.javacodegeeks.com/2020/05/kafka-exactly-once-semantics.html
總結(jié)
- 上一篇: linux端口开启命令(linux 端口
- 下一篇: 清洁单元测试