卡夫卡–一次语义学
在分布式環(huán)境中,故障是很常見(jiàn)的情況,可以隨時(shí)發(fā)生。 在Kafka環(huán)境中,代理可能會(huì)崩潰,網(wǎng)絡(luò)故障,處理故障,發(fā)布消息時(shí)失敗或無(wú)法使用消息等。這些不同的場(chǎng)景引入了不同類(lèi)型的數(shù)據(jù)丟失和重復(fù)。
失敗場(chǎng)景
A(確認(rèn)失敗):生產(chǎn)者成功發(fā)布了消息,重試> 1,但由于失敗而未收到確認(rèn)。 在這種情況下,生產(chǎn)者將重試相同的消息,可能會(huì)引入重復(fù)消息。
B(生產(chǎn)者進(jìn)程在批處理消息中失敗):生產(chǎn)者發(fā)送了一批失敗的消息,但發(fā)布的成功很少。 在這種情況下,一旦生產(chǎn)者重新啟動(dòng),它將再次批量重新發(fā)布所有消息,這將在Kafka中引入重復(fù)消息。
C(觸發(fā)并忘記失敗)生產(chǎn)者發(fā)布的消息,重試= 0(觸發(fā)并忘記)。 如果失敗,發(fā)布的消息將不知道并發(fā)送下一條消息,這將導(dǎo)致消息丟失。
D(批處理消息中的消費(fèi)者失敗)消費(fèi)者從Kafka接收到一批消息,并手動(dòng)提交其偏移量(enable.auto.commit = false)。 如果消費(fèi)者在提交給Kafka之前失敗,則下次消費(fèi)者將再次使用相同的記錄,這些記錄將在消費(fèi)者端復(fù)制副本。
精確一次語(yǔ)義
在這種情況下,即使生產(chǎn)者嘗試重新發(fā)送消息,它也導(dǎo)致消息將被消費(fèi)者發(fā)布和消費(fèi)一次。
為了在Kafka中實(shí)現(xiàn)Exactly-Once語(yǔ)義,它使用以下3個(gè)屬性
啟用冪等(enable.idempotence = true)
冪等傳遞使生產(chǎn)者可以在單個(gè)生產(chǎn)者的生命周期內(nèi),將消息僅一次寫(xiě)入Kafka到主題的特定分區(qū),而不會(huì)造成數(shù)據(jù)丟失和每個(gè)分區(qū)的訂單。
“請(qǐng)注意,啟用冪等性要求MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION小于或等于5,RETRIES_CONFIG大于0且ACKS_CONFIG為'all'。 如果用戶(hù)未明確設(shè)置這些值,則將選擇合適的值。 如果設(shè)置了不兼容的值,將拋出ConfigException”
為了實(shí)現(xiàn)冪等性,Kafka在生成消息時(shí)使用唯一的ID(稱(chēng)為產(chǎn)品ID或PID和序列號(hào))。 生產(chǎn)者在發(fā)布的每個(gè)消息上保持遞增的序列號(hào),這些消息具有唯一的PID。 代理始終將當(dāng)前序列號(hào)與前一個(gè)序列號(hào)進(jìn)行比較,如果新序列號(hào)不比上一個(gè)序列號(hào)大+1,則它會(huì)拒絕,這會(huì)避免重復(fù);如果消息中丟失了更大的序列號(hào),則會(huì)拒絕同時(shí)顯示
在失敗的情況下,代理將序列號(hào)與先前的序列號(hào)進(jìn)行比較,如果序列不增加,+ 1將拒絕該消息。
交易(隔離級(jí)別)
事務(wù)使我們能夠自動(dòng)更新多個(gè)主題分區(qū)中的數(shù)據(jù)。 事務(wù)中包含的所有記錄都將被成功保存,或者沒(méi)有保存成功,它允許您將同一個(gè)事務(wù)中的消費(fèi)者補(bǔ)償與已處理的數(shù)據(jù)一起提交,從而允許端到端的一次精確語(yǔ)義。
生產(chǎn)者不等待將消息寫(xiě)到kafka上,生產(chǎn)者使用beginTransaction,commitTransaction和abortTransaction(如果發(fā)生故障)消費(fèi)者使用isolate.level級(jí)別,無(wú)論是read_committed還是read_uncommitted
- read_committed:使用者將始終僅讀取已提交的數(shù)據(jù)。
- read_uncommitted:以偏移順序讀取所有消息,而無(wú)需等待事務(wù)提交
如果具有Isolation.level = read_committed的使用者到達(dá)尚未完成的事務(wù)的控制消息,它將不會(huì)再?gòu)脑摲謪^(qū)傳遞任何消息,直到生產(chǎn)者提交或中止該事務(wù)或發(fā)生事務(wù)超時(shí)。 事務(wù)超時(shí)由生產(chǎn)者使用配置transaction.timeout.ms(默認(rèn)為1分鐘)確定。
生產(chǎn)者和消費(fèi)者中的確切時(shí)間
在正常情況下,生產(chǎn)者和消費(fèi)者是分開(kāi)的。 生產(chǎn)者必須具有冪等性并同時(shí)管理事務(wù),以便消費(fèi)者可以使用isolation.level讀取read_committed以使整個(gè)過(guò)程成為原子操作。 這樣可以確保生產(chǎn)者將始終與源系統(tǒng)同步。 即使生產(chǎn)者崩潰或事務(wù)中止,它也始終是一致的,并且一次將消息或一批消息發(fā)布為一個(gè)單元。
同一用戶(hù)一次將收到消息或一批消息。
在Exactly-Once中,語(yǔ)義生產(chǎn)者與消費(fèi)者一起將作為原子操作出現(xiàn),它將作為一個(gè)單元進(jìn)行操作。 要么發(fā)布一次就被消耗掉,要么中止。
在Kafka Stream中恰好一次
Kafka Stream消耗來(lái)自主題A的消息,處理消息并將其發(fā)布到主題B,并在發(fā)布后使??用commit(commit主要在后臺(tái)運(yùn)行)將所有狀態(tài)存儲(chǔ)數(shù)據(jù)刷新到磁盤(pán)。
Kafka Stream中的“一次”是“讀取-處理-寫(xiě)入”模式,可確保將這些操作視為原子操作。 由于Kafka Stream可以滿(mǎn)足生產(chǎn)者,消費(fèi)者和交易的需求,因此Kafka Stream帶有特殊的參數(shù)processing.guarantee,它可以完全地_once或at_least_once使得不單獨(dú)處理所有參數(shù)變得容易。
Kafka Streams原子地更新使用者偏移量,本地狀態(tài)存儲(chǔ),狀態(tài)存儲(chǔ)changelog主題和生產(chǎn)以一起輸出所有主題。 如果這些步驟中的任何一個(gè)失敗,則所有更改都將回滾。
processing.guarantee:確切地提供一次以下參數(shù),您無(wú)需明確設(shè)置
翻譯自: https://www.javacodegeeks.com/2020/05/kafka-exactly-once-semantics.html
總結(jié)
- 上一篇: linux端口开启命令(linux 端口
- 下一篇: 清洁单元测试