你的消息队列如何保证消息不丢失,且只被消费一次,这篇就教会你
我們將消息隊(duì)列這個(gè)組件加入到了我們的商城系統(tǒng)里,并且通過秒殺這個(gè)實(shí)際的案例進(jìn)行了實(shí)際演練,知道了它對(duì)高并發(fā)寫流量做削峰填谷,對(duì)非關(guān)鍵業(yè)務(wù)邏輯做異步處理,對(duì)不同的業(yè)務(wù)系統(tǒng)做解耦合。
場(chǎng)景:
現(xiàn)在我們的電商系統(tǒng)中上了一個(gè)新產(chǎn)品發(fā)紅包的功能,即當(dāng)用戶在我們商城消費(fèi)了一定的額度之后,我們系統(tǒng)就給用戶發(fā)送一個(gè)現(xiàn)金紅包,用來答謝用戶并且促進(jìn)用戶消費(fèi)。
前面我們說到過,由于這個(gè)發(fā)紅包的動(dòng)作并不屬于當(dāng)前下單的主流程,所以我們就使用消息隊(duì)列來異步處理。這個(gè)時(shí)候,就會(huì)有個(gè)隱藏問題:
我們?cè)谕哆f消息的過程中消息可能會(huì)丟失,那我們的用戶就來打客服電話投訴我們說沒有得到紅包,甚至于有關(guān)部門投訴我們。
另一個(gè)問題,就是如果我們將消息重復(fù)發(fā)送了,那么用戶就會(huì)得到兩個(gè)紅包,這樣會(huì)造成我們公司的損失。
所以,現(xiàn)在我們要確保,系統(tǒng)生產(chǎn)的消息一定要被消費(fèi)到,并且只能被消費(fèi)一次,這個(gè)到底該怎么做呢?接下來,我們就來深入學(xué)習(xí)下。
01 為何消息會(huì)丟失?
要想保證消息只被消費(fèi)一次,那么首先就得要保證消息不丟失。我們先來看看,消息從被寫入消息隊(duì)列,到被消費(fèi)完成,這整個(gè)鏈路上會(huì)有哪些地方可能會(huì)導(dǎo)致消息丟失?我們不難看出,其實(shí)主要有三個(gè)地方:
消息從生產(chǎn)者到消息隊(duì)列的過程。
消息在消息隊(duì)列存儲(chǔ)的過程。
消息在被消費(fèi)的過程。
如上,我們分析了共有 3 個(gè)消息可能丟失的地方,接下來,我們就具體來分析下每一種情況。
1. 消息在寫到消息隊(duì)列的過程中丟失
消息生產(chǎn)者一般就是業(yè)務(wù)系統(tǒng),消息隊(duì)列是單獨(dú)部署了在獨(dú)立的服務(wù)器上的,所以業(yè)務(wù)服務(wù)器和消息隊(duì)列服務(wù)器可能會(huì)出現(xiàn)網(wǎng)絡(luò)抖動(dòng),當(dāng)出現(xiàn)了網(wǎng)絡(luò)抖動(dòng),消息就會(huì)丟失。
一般這種情況,我們可以采用消息重傳的方案,即當(dāng)我們發(fā)現(xiàn)發(fā)送的消息超時(shí)后,我們就重新發(fā)送一次,但是不能一直無限制的重傳消息。按照經(jīng)驗(yàn)來說,如果不是消息隊(duì)列本身故障,或者是網(wǎng)絡(luò)斷開了,一般重試個(gè) 2 到 3 次就行了。
但是,這種方案就有可能造成消息的重復(fù),這樣就會(huì)導(dǎo)致消費(fèi)者消費(fèi)到重復(fù)的消息。
例如,消息發(fā)送到消息隊(duì)列中,但是由于消息隊(duì)列處理消息較慢或者網(wǎng)絡(luò)抖動(dòng),這個(gè)時(shí)候,其實(shí)消息是寫入成功的,但是對(duì)于生產(chǎn)端就認(rèn)為超時(shí)了,那么生產(chǎn)者就會(huì)重傳當(dāng)前消息,則會(huì)出現(xiàn)消息重復(fù)。對(duì)于我們上面案例中,就是用戶會(huì)收到兩個(gè)紅包。
2. 消息在消息隊(duì)列中丟失
即使消息發(fā)送到了消息隊(duì)列,消息也不會(huì)萬無一失,還是會(huì)面臨丟失的風(fēng)險(xiǎn)。
我們以 Kafka 為例,消息在Kafka 中是存儲(chǔ)在本地磁盤上的, 為了減少消息存儲(chǔ)對(duì)磁盤的隨機(jī) I/O,一般我們會(huì)將消息寫入到操作系統(tǒng)的 Page Cache 中,然后在合適的時(shí)間將消息刷新到磁盤上。
例如,Kafka 可以配置當(dāng)達(dá)到某一時(shí)間間隔,或者累積一定的消息數(shù)量的時(shí)候再刷盤,也就是所謂的異步刷盤。
不過,如果發(fā)生機(jī)器掉電或者機(jī)器異常重啟,那么 Page Cache 中還沒有來得及刷盤的消息就會(huì)丟失了。那么怎么解決呢?你可能會(huì)把刷盤的間隔設(shè)置很短,或者設(shè)置累積一條消息就就刷盤。
但這樣頻繁刷盤會(huì)對(duì)性能有比較大的影響,而且從經(jīng)驗(yàn)來看,出現(xiàn)機(jī)器宕機(jī)或者掉電的幾率也不高,所以我不建議你這樣做。
如果你的電商系統(tǒng)對(duì)消息丟失的容忍度很低,那么你可以考慮以集群方式部署 Kafka 服務(wù),通過部署多個(gè)副本備份數(shù)據(jù),保證消息盡量不丟失。
那么它是怎么實(shí)現(xiàn)的呢?Kafka 集群中有一個(gè) Leader 負(fù)責(zé)消息的寫入和消費(fèi),可以有多個(gè) Follower 負(fù)責(zé)數(shù)據(jù)的備份。Follower 中有一個(gè)特殊的集合叫做 ISR(in-sync replicas),當(dāng) Leader 故障時(shí),新選舉出來的 Leader 會(huì)從 ISR 中選擇,默認(rèn) Leader 的數(shù)據(jù)會(huì)異步地復(fù)制給 Follower,這樣在 Leader 發(fā)生掉電或者宕機(jī)時(shí),Kafka 會(huì)從 Follower 中消費(fèi)消息,減少消息丟失的可能。
由于默認(rèn)消息是異步地從 Leader 復(fù)制到 Follower 的,所以一旦 Leader 宕機(jī),那些還沒有來得及復(fù)制到 Follower 的消息還是會(huì)丟失。
為了解決這個(gè)問題,Kafka 為生產(chǎn)者提供一個(gè)選項(xiàng)叫做“acks”,當(dāng)這個(gè)選項(xiàng)被設(shè)置為“all”時(shí),生產(chǎn)者發(fā)送的每一條消息除了發(fā)給 Leader 外還會(huì)發(fā)給所有的 ISR,并且必須得到 Leader 和所有 ISR 的確認(rèn)后才被認(rèn)為發(fā)送成功。這樣,只有 Leader 和所有的 ISR 都掛了,消息才會(huì)丟失。
從上面這張圖來看,當(dāng)設(shè)置“acks=all”時(shí),需要同步執(zhí)行 1,3,4 三個(gè)步驟,對(duì)于消息生產(chǎn)的性能來說也是有比較大的影響的,所以你在實(shí)際應(yīng)用中需要仔細(xì)地權(quán)衡考量。這里建議是:
如果你需要確保消息一條都不能丟失,那么建議不要開啟消息隊(duì)列的同步刷盤,而是需要使用集群的方式來解決,可以配置當(dāng)所有 ISR Follower 都接收到消息才返回成功。
如果對(duì)消息的丟失有一定的容忍度,那么建議不部署集群,即使以集群方式部署,也建議配置只發(fā)送給一個(gè) Follower 就可以返回成功了。
我們的業(yè)務(wù)系統(tǒng)一般對(duì)于消息的丟失有一定的容忍度,比如說以上面的紅包系統(tǒng)為例,如果紅包消息丟失了,我們只要后續(xù)給沒有發(fā)送紅包的用戶補(bǔ)發(fā)紅包就好了。
3. 在消費(fèi)的過程中存在消息丟失的可能
還是以 Kafka 為例來說明。一個(gè)消費(fèi)者消費(fèi)消息的進(jìn)度是記錄在消息隊(duì)列集群中的,而消費(fèi)的過程分為三步:接收消息、處理消息、更新消費(fèi)進(jìn)度。
這里面接收消息和處理消息的過程都可能會(huì)發(fā)生異常或者失敗,比如說,消息接收時(shí)網(wǎng)絡(luò)發(fā)生抖動(dòng),導(dǎo)致消息并沒有被正確的接收到;處理消息時(shí)可能發(fā)生一些業(yè)務(wù)的異常導(dǎo)致處理流程未執(zhí)行完成,這時(shí)如果更新消費(fèi)進(jìn)度,那么這條失敗的消息就永遠(yuǎn)不會(huì)被處理了,也可以認(rèn)為是丟失了。
所以,在這里你需要注意的是,一定要等到消息接收和處理完成后才能更新消費(fèi)進(jìn)度,但是這也會(huì)造成消息重復(fù)的問題,比方說某一條消息在處理之后,消費(fèi)者恰好宕機(jī)了,那么因?yàn)闆]有更新消費(fèi)進(jìn)度,所以當(dāng)這個(gè)消費(fèi)者重啟之后,還會(huì)重復(fù)地消費(fèi)這條消息。
02 如何保證消息只被消費(fèi)一次
從上面的分析中,你能發(fā)現(xiàn),為了避免消息丟失,我們需要付出兩方面的代價(jià):一方面是性能的損耗;一方面可能造成消息重復(fù)消費(fèi)。
性能的損耗我們還可以接受,因?yàn)橐话銟I(yè)務(wù)系統(tǒng)只有在寫請(qǐng)求時(shí)才會(huì)有發(fā)送消息隊(duì)列的操作,而一般系統(tǒng)的寫請(qǐng)求的量級(jí)并不高,但是消息一旦被重復(fù)消費(fèi),就會(huì)造成業(yè)務(wù)邏輯處理的錯(cuò)誤。那么我們要如何避免消息的重復(fù)呢?
想要完全的避免消息重復(fù)的發(fā)生是很難做到的,因?yàn)榫W(wǎng)絡(luò)的抖動(dòng)、機(jī)器的宕機(jī)和處理的異常都是比較難以避免的,在工業(yè)上并沒有成熟的方法,因此我們會(huì)把要求放寬,只要保證即使消費(fèi)到了重復(fù)的消息,從消費(fèi)的最終結(jié)果來看和只消費(fèi)一次是等同的就好了,也就是保證在消息的生產(chǎn)和消費(fèi)的過程是“冪等”的。
1. 什么是冪等
冪等是一個(gè)數(shù)學(xué)上的概念,它的含義是多次執(zhí)行同一個(gè)操作和執(zhí)行一次操作,最終得到的結(jié)果是相同的,說起來可能有些抽象,我給你舉個(gè)例子:
比如,男生和女生吵架,女生抓住一個(gè)點(diǎn)不放,傳遞“你不在乎我了嗎?”(生產(chǎn)消息)的信息。那么當(dāng)多次埋怨“你不在乎我了嗎?”的時(shí)候(多次生產(chǎn)相同消息),她不知道的是,男生的耳朵(消息處理)會(huì)自動(dòng)把 N 多次的信息屏蔽,就像只聽到一次一樣,這就是冪等性。
如果我們消費(fèi)一條消息的時(shí)候,要給現(xiàn)有的庫存數(shù)量減 1,那么如果消費(fèi)兩條相同的消息就會(huì)給庫存數(shù)量減 2,這就不是冪等的。而如果消費(fèi)一條消息后,處理邏輯是將庫存的數(shù)量設(shè)置為 0,或者是如果當(dāng)前庫存數(shù)量是 10 時(shí)則減 1,這樣在消費(fèi)多條消息時(shí),所得到的結(jié)果就是相同的,這就是冪等的。
說白了,你可以這么理解“冪等”:一件事兒無論做多少次都和做一次產(chǎn)生的結(jié)果是一樣的,那么這件事兒就具有冪等性。
2. 在生產(chǎn)、消費(fèi)過程中增加消息冪等性的保證
消息在生產(chǎn)和消費(fèi)的過程中都可能會(huì)產(chǎn)生重復(fù),所以你要做的是,在生產(chǎn)過程和消費(fèi)過程中增加消息冪等性的保證,這樣就可以認(rèn)為從“最終結(jié)果上來看”,消息實(shí)際上是只被消費(fèi)了一次的。
在消息生產(chǎn)過程中,在 Kafka0.11 版本和 Pulsar 中都支持“producer idempotency”的特性,翻譯過來就是生產(chǎn)過程的冪等性,這種特性保證消息雖然可能在生產(chǎn)端產(chǎn)生重復(fù),但是最終在消息隊(duì)列存儲(chǔ)時(shí)只會(huì)存儲(chǔ)一份。
它的做法是給每一個(gè)生產(chǎn)者一個(gè)唯一的 ID,并且為生產(chǎn)的每一條消息賦予一個(gè)唯一 ID,消息隊(duì)列的服務(wù)端會(huì)存儲(chǔ) < 生產(chǎn)者 ID,最后一條消息 ID> 的映射。當(dāng)某一個(gè)生產(chǎn)者產(chǎn)生新的消息時(shí),消息隊(duì)列服務(wù)端會(huì)比對(duì)消息 ID 是否與存儲(chǔ)的最后一條 ID 一致,如果一致,就認(rèn)為是重復(fù)的消息,服務(wù)端會(huì)自動(dòng)丟棄。
而在消費(fèi)端,冪等性的保證會(huì)稍微復(fù)雜一些,你可以從通用層和業(yè)務(wù)層兩個(gè)層面來考慮。
你可以看到,無論是生產(chǎn)端的冪等性保證方式,還是消費(fèi)端通用的冪等性保證方式,它們的共同特點(diǎn)都是為每一個(gè)消息生成一個(gè)唯一的 ID,然后在使用這個(gè)消息的時(shí)候,先比對(duì)這個(gè) ID 是否已經(jīng)存在,如果存在,則認(rèn)為消息已經(jīng)被使用過。
所以這種方式是一種標(biāo)準(zhǔn)的實(shí)現(xiàn)冪等的方式,你在項(xiàng)目之中可以拿來直接使用,它在邏輯上的偽代碼就像下面這樣:
boolean isIDExisted = selectByID(ID); // 判斷ID是否存在 if(isIDExisted) { return; //存在則直接返回 } else { process(message); //不存在,則處理消息 saveID(ID); //存儲(chǔ)ID }不過這樣會(huì)有一個(gè)問題:如果消息在處理之后,還沒有來得及寫入數(shù)據(jù)庫,消費(fèi)者宕機(jī)了重啟之后發(fā)現(xiàn)數(shù)據(jù)庫中并沒有這條消息,還是會(huì)重復(fù)執(zhí)行兩次消費(fèi)邏輯。
這時(shí)你就需要引入事務(wù)機(jī)制,保證消息處理和寫入數(shù)據(jù)庫必須同時(shí)成功或者同時(shí)失敗,但是這樣消息處理的成本就更高了,所以,如果對(duì)于消息重復(fù)沒有特別嚴(yán)格的要求,可以直接使用這種通用的方案,而不考慮引入事務(wù)。
在業(yè)務(wù)層面怎么處理呢?這里有很多種處理方式,其中有一種是增加樂觀鎖的方式。比如,你的消息處理程序需要給一個(gè)人的賬號(hào)加錢,那么你可以通過樂觀鎖的方式來解決。
具體的操作方式是這樣的:你給每個(gè)人的賬號(hào)數(shù)據(jù)中增加一個(gè)版本號(hào)的字段,在生產(chǎn)消息時(shí)先查詢這個(gè)賬戶的版本號(hào),并且將版本號(hào)連同消息一起發(fā)送給消息隊(duì)列。消費(fèi)端在拿到消息和版本號(hào)后,在執(zhí)行更新賬戶金額 SQL 的時(shí)候帶上版本號(hào),類似于執(zhí)行:
update user set amount = amount + 20, version=version+1 where userId=1 and version=1;你看,我們?cè)诟聰?shù)據(jù)時(shí)給數(shù)據(jù)加了樂觀鎖,這樣在消費(fèi)第一條消息時(shí),version 值為 1,SQL 可以執(zhí)行成功,并且同時(shí)把 version 值改為了 2;在執(zhí)行第二條相同的消息時(shí),由于 version 值不再是 1,所以這條 SQL 不能執(zhí)行成功,也就保證了消息的冪等性。
總結(jié),今天我們主要學(xué)習(xí)了在消息隊(duì)列中,消息可能會(huì)發(fā)生丟失的場(chǎng)景,和我們的應(yīng)對(duì)方法,以及在消息重復(fù)的場(chǎng)景下,我們要如何保證,盡量不影響消息最終的處理結(jié)果。
有道無術(shù),術(shù)可成;有術(shù)無道,止于術(shù)
歡迎大家關(guān)注Java之道公眾號(hào)
好文章,我在看??
總結(jié)
以上是生活随笔為你收集整理的你的消息队列如何保证消息不丢失,且只被消费一次,这篇就教会你的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。