一种低延迟的超时中心实现方式
簡(jiǎn)介:?在很多產(chǎn)品中都存在生命周期相關(guān)的設(shè)計(jì),時(shí)間節(jié)點(diǎn)到了之后需要做對(duì)應(yīng)的事情。超時(shí)中心(TimeOutCenter,TOC)負(fù)責(zé)存儲(chǔ)和調(diào)度生命周期節(jié)點(diǎn)上面的超時(shí)任務(wù),當(dāng)超時(shí)任務(wù)設(shè)置的超時(shí)時(shí)間到期后,超時(shí)中心需要立即調(diào)度處理這些超時(shí)任務(wù)。對(duì)于一些需要低延遲的超時(shí)場(chǎng)景,超時(shí)中心調(diào)度延遲會(huì)給產(chǎn)品帶來(lái)不可估量的影響。
作者 | 默達(dá)
來(lái)源 | 阿里技術(shù)公眾號(hào)
一 背景
在很多產(chǎn)品中都存在生命周期相關(guān)的設(shè)計(jì),時(shí)間節(jié)點(diǎn)到了之后需要做對(duì)應(yīng)的事情。
超時(shí)中心(TimeOutCenter,TOC)負(fù)責(zé)存儲(chǔ)和調(diào)度生命周期節(jié)點(diǎn)上面的超時(shí)任務(wù),當(dāng)超時(shí)任務(wù)設(shè)置的超時(shí)時(shí)間到期后,超時(shí)中心需要立即調(diào)度處理這些超時(shí)任務(wù)。對(duì)于一些需要低延遲的超時(shí)場(chǎng)景,超時(shí)中心調(diào)度延遲會(huì)給產(chǎn)品帶來(lái)不可估量的影響。
因此本文提出一種低延遲的超時(shí)中心實(shí)現(xiàn)方式,首先介紹傳統(tǒng)的超時(shí)中心的實(shí)現(xiàn)方案,以及傳統(tǒng)方案中的缺點(diǎn),然后介紹低延遲的方案,說(shuō)明如何解決傳統(tǒng)方案中的延遲問(wèn)題。
二 傳統(tǒng)高延遲方案
1 整體框架
傳統(tǒng)的超時(shí)中心整體框架如下所示,任務(wù)輸入后存儲(chǔ)在超時(shí)任務(wù)庫(kù)中,定時(shí)器觸發(fā)運(yùn)行數(shù)據(jù)庫(kù)掃描器,數(shù)據(jù)庫(kù)掃描器從超時(shí)任務(wù)庫(kù)中掃描已經(jīng)到達(dá)超時(shí)時(shí)間的任務(wù),已經(jīng)到達(dá)超時(shí)時(shí)間的任務(wù)存儲(chǔ)在機(jī)器的內(nèi)存隊(duì)列中,等待交給業(yè)務(wù)處理器進(jìn)行處理,業(yè)務(wù)處理器處理完成后更新任務(wù)狀態(tài)。
在大數(shù)據(jù)時(shí)代,超時(shí)任務(wù)數(shù)量肯定是很大的,傳統(tǒng)的超時(shí)中心通過(guò)分庫(kù)分表支持存儲(chǔ)海量的超時(shí)任務(wù),定時(shí)器觸發(fā)也需要做相應(yīng)的改變,需要充分利用集群的能力,下面分別從超時(shí)任務(wù)庫(kù)和定時(shí)器觸發(fā)兩方面詳細(xì)介紹。
2 任務(wù)庫(kù)設(shè)計(jì)
任務(wù)庫(kù)數(shù)據(jù)模型如下所示,采用分庫(kù)分表存儲(chǔ),一般可設(shè)計(jì)為8個(gè)庫(kù)1024個(gè)表,具體可以根據(jù)業(yè)務(wù)需求調(diào)整。biz_id為分表鍵,job_id為全局唯一的任務(wù)ID,status為超時(shí)任務(wù)的狀態(tài),action_time為任務(wù)的執(zhí)行時(shí)間,attribute存儲(chǔ)額外的數(shù)據(jù)。只有當(dāng)action_time小于當(dāng)前時(shí)間且status為待處理時(shí),任務(wù)才能被掃描器加載到內(nèi)存隊(duì)列。任務(wù)被處理完成后,任務(wù)的狀態(tài)被更新成已處理。
job_id bigint unsigned 超時(shí)任務(wù)的ID,全局唯一 gmt_create datetime 創(chuàng)建時(shí)間 gmt_modified datetime 修改時(shí)間 biz_id bigint unsigned 業(yè)務(wù)id,一般為關(guān)聯(lián)的主訂單或子訂單id biz_type bigint unsigned 業(yè)務(wù)類型 status tinyint 超時(shí)任務(wù)狀態(tài)(0待處理,2已處理,3取消) action_time datetime 超時(shí)任務(wù)執(zhí)行時(shí)間 attribute varchar 額外數(shù)據(jù)3 定時(shí)調(diào)度設(shè)計(jì)
定時(shí)調(diào)度流程圖如下所示,定時(shí)器每間隔10秒觸發(fā)一次調(diào)度,從集群configserver中獲取集群ip列表并為當(dāng)前機(jī)器編號(hào),然后給所有ip分配表。分配表時(shí)需要考慮好幾件事:一張表只屬于一臺(tái)機(jī)器,不會(huì)出現(xiàn)重復(fù)掃描;機(jī)器上線下線需要重新分配表。當(dāng)前機(jī)器從所分配的表中掃描出所有狀態(tài)為待處理的超時(shí)任務(wù),遍歷掃描出的待處理超時(shí)任務(wù)。對(duì)于每個(gè)超時(shí)任務(wù),當(dāng)內(nèi)存隊(duì)列不存在該任務(wù)且內(nèi)存隊(duì)列未滿時(shí),超時(shí)任務(wù)才加入內(nèi)存隊(duì)列,否則循環(huán)檢查等待。
4 缺點(diǎn)
- 需要定時(shí)器定時(shí)調(diào)度,定時(shí)器調(diào)度間隔時(shí)間加長(zhǎng)了超時(shí)任務(wù)處理的延遲時(shí)間;
- 數(shù)據(jù)庫(kù)掃描器為避免重復(fù)掃描數(shù)據(jù),一張表只能屬于一臺(tái)機(jī)器,任務(wù)庫(kù)分表的數(shù)量就是任務(wù)處理的并發(fā)度,并發(fā)度受限制;
- 當(dāng)單表數(shù)據(jù)量龐大時(shí),即使從單張表中掃描所有待處理的超時(shí)任務(wù)也需要花費(fèi)很長(zhǎng)的時(shí)間;
- 本方案總體處理步驟為:先掃描出所有超時(shí)任務(wù),再對(duì)單個(gè)超時(shí)任務(wù)進(jìn)行處理;超時(shí)任務(wù)處理延遲時(shí)間需要加上超時(shí)任務(wù)掃描時(shí)間;
- 本方案處理超時(shí)任務(wù)的最小延遲為定時(shí)器的定時(shí)間隔時(shí)間,在任務(wù)數(shù)量龐大的情況下,本方案可能存在較大延遲。
三 低延遲方案
1 整體框架
任務(wù)輸入后分為兩個(gè)步驟。第一個(gè)步驟是將任務(wù)存儲(chǔ)到任務(wù)庫(kù),本方案的任務(wù)庫(kù)模型設(shè)計(jì)和上面方案中的任務(wù)庫(kù)模型設(shè)計(jì)一樣;第二步驟是任務(wù)定時(shí),將任務(wù)的jobId和actionTime以一定方式設(shè)置到Redis集群中,當(dāng)定時(shí)任務(wù)的超時(shí)時(shí)間到了之后,從Redis集群pop超時(shí)任務(wù)的jobId,根據(jù)jobId從任務(wù)庫(kù)中查詢?cè)敿?xì)的任務(wù)信息交給業(yè)務(wù)處理器進(jìn)行處理,最后更新任務(wù)庫(kù)中任務(wù)的狀態(tài)。
本方案與上述方案最大的不同點(diǎn)就是超時(shí)任務(wù)的獲取部分,上述方案采用定時(shí)調(diào)度掃描任務(wù)庫(kù),本方案采用基于Redis的任務(wù)定時(shí)系統(tǒng),接下來(lái)將具體講解任務(wù)定時(shí)的設(shè)計(jì)。
2 Redis存儲(chǔ)設(shè)計(jì)
Topic的設(shè)計(jì)
Topic的定義有三部分組成,topic表示主題名稱,slotAmount表示消息存儲(chǔ)劃分的槽數(shù)量,topicType表示消息的類型。主題名稱是一個(gè)Topic的唯一標(biāo)示,相同主題名稱Topic的slotAmount和topicType一定是一樣的。消息存儲(chǔ)采用Redis的Sorted Set結(jié)構(gòu),為了支持大量消息的堆積,需要把消息分散存儲(chǔ)到很多個(gè)槽中,slotAmount表示該Topic消息存儲(chǔ)共使用的槽數(shù)量,槽數(shù)量一定需要是2的n次冪。在消息存儲(chǔ)的時(shí)候,采用對(duì)指定數(shù)據(jù)或者消息體哈希求余得到槽位置。
StoreQueue的設(shè)計(jì)
上圖中topic劃分了8個(gè)槽位,編號(hào)0-7。計(jì)算消息體對(duì)應(yīng)的CRC32值,CRC32值對(duì)槽數(shù)量進(jìn)行取模得到槽序號(hào),SlotKey設(shè)計(jì)為#{topic}_#{index}(也即Redis的鍵),其中#{}表示占位符。
StoreQueue結(jié)構(gòu)采用Redis的Sorted Set,Redis的Sorted Set中的數(shù)據(jù)按照分?jǐn)?shù)排序,實(shí)現(xiàn)定時(shí)消息的關(guān)鍵就在于如何利用分?jǐn)?shù)、如何添加消息到Sorted Set、如何從Sorted Set中彈出消息。定時(shí)消息將時(shí)間戳作為分?jǐn)?shù),消費(fèi)時(shí)每次彈出分?jǐn)?shù)大于當(dāng)前時(shí)間戳的一個(gè)消息。
PrepareQueue的設(shè)計(jì)
為了保障每條消息至少消費(fèi)一次,消費(fèi)者不是直接pop有序集合中的元素,而是將元素從StoreQueue移動(dòng)到PrepareQueue并返回消息給消費(fèi)者,等消費(fèi)成功后再?gòu)腜repareQueue從刪除,或者消費(fèi)失敗后從PreapreQueue重新移動(dòng)到StoreQueue,這便是根據(jù)二階段提交的思想實(shí)現(xiàn)的二階段消費(fèi)。
在后面將會(huì)詳細(xì)介紹二階段消費(fèi)的實(shí)現(xiàn)思路,這里重點(diǎn)介紹下PrepareQueue的存儲(chǔ)設(shè)計(jì)。StoreQueue中每一個(gè)Slot對(duì)應(yīng)PrepareQueue中的Slot,PrepareQueue的SlotKey設(shè)計(jì)為prepare_{#{topic}#{index}}。PrepareQueue采用Sorted Set作為存儲(chǔ),消息移動(dòng)到PrepareQueue時(shí)刻對(duì)應(yīng)的(秒級(jí)時(shí)間戳*1000+重試次數(shù))作為分?jǐn)?shù),字符串存儲(chǔ)的是消息體內(nèi)容。這里分?jǐn)?shù)的設(shè)計(jì)與重試次數(shù)的設(shè)計(jì)密切相關(guān),所以在重試次數(shù)設(shè)計(jì)章節(jié)詳細(xì)介紹。
PrepareQueue的SlotKey設(shè)計(jì)中需要注意的一點(diǎn),由于消息從StoreQueue移動(dòng)到PrepareQueue是通過(guò)Lua腳本操作的,因此需要保證Lua腳本操作的Slot在同一個(gè)Redis節(jié)點(diǎn)上,如何保證PrepareQueue的SlotKey和對(duì)應(yīng)的StoreQueue的SlotKey被hash到同一個(gè)Redis槽中呢。Redis的hash tag功能可以指定SlotKey中只有某一部分參與計(jì)算hash,這一部分采用{}包括,因此PrepareQueue的SlotKey中采用{}包括了StoreQueue的SlotKey。
DeadQueue的設(shè)計(jì)
消息重試消費(fèi)16次后,消息將進(jìn)入DeadQueue。DeadQueue的SlotKey設(shè)計(jì)為prepare{#{topic}#{index}},這里同樣采用hash tag功能保證DeadQueue的SlotKey與對(duì)應(yīng)StoreQueue的SlotKey存儲(chǔ)在同一Redis節(jié)點(diǎn)。
定時(shí)消息生產(chǎn)
生產(chǎn)者的任務(wù)就是將消息添加到StoreQueue中。首先,需要計(jì)算出消息添加到Redis的SlotKey,如果發(fā)送方指定了消息的slotBasis(否則采用content代替),則計(jì)算slotBasis的CRC32值,CRC32值對(duì)槽數(shù)量進(jìn)行取模得到槽序號(hào),SlotKey設(shè)計(jì)為#{topic}_#{index},其中#{}表示占位符。發(fā)送定時(shí)消息時(shí)需要設(shè)置actionTime,actionTime必須大于當(dāng)前時(shí)間,表示消費(fèi)時(shí)間戳,當(dāng)前時(shí)間大于該消費(fèi)時(shí)間戳的時(shí)候,消息才會(huì)被消費(fèi)。因此在存儲(chǔ)該類型消息的時(shí)候,采用actionTime作為分?jǐn)?shù),采用命令zadd添加到Redis。
超時(shí)消息消費(fèi)
每臺(tái)機(jī)器將啟動(dòng)多個(gè)Woker進(jìn)行超時(shí)消息消費(fèi),Woker即表示線程,定時(shí)消息被存儲(chǔ)到Redis的多個(gè)Slot中,因此需要zookeeper維護(hù)集群中Woker與slot的關(guān)系,一個(gè)Slot只分配給一個(gè)Woker進(jìn)行消費(fèi),一個(gè)Woker可以消費(fèi)多個(gè)Slot。Woker與Slot的關(guān)系在每臺(tái)機(jī)器啟動(dòng)與停止時(shí)重新分配,超時(shí)消息消費(fèi)集群監(jiān)聽(tīng)了zookeeper節(jié)點(diǎn)的變化。
Woker與Slot關(guān)系確定后,Woker則循環(huán)不斷地從Redis拉取訂閱的Slot中的超時(shí)消息。在StoreQueue存儲(chǔ)設(shè)計(jì)中說(shuō)明了定時(shí)消息存儲(chǔ)時(shí)采用Sorted Set結(jié)構(gòu),采用定時(shí)時(shí)間actionTime作為分?jǐn)?shù),因此定時(shí)消息按照時(shí)間大小存儲(chǔ)在Sorted Set中。因此在拉取超時(shí)消息進(jìn)行只需采用Redis命令ZRANGEBYSCORE彈出分?jǐn)?shù)小于當(dāng)前時(shí)間戳的一條消息。
為了保證系統(tǒng)的可用性,還需要考慮保證定時(shí)消息至少被消費(fèi)一次以及消費(fèi)的重試次數(shù),下面將具體介紹如何保證至少消費(fèi)一次和消費(fèi)重試次數(shù)控制。
?
?
至少消費(fèi)一次
至少消費(fèi)一次的問(wèn)題比較類似銀行轉(zhuǎn)賬問(wèn)題,A向B賬戶轉(zhuǎn)賬100元,如何保障A賬戶扣減100同時(shí)B賬戶增加100,因此我們可以想到二階段提交的思想。第一個(gè)準(zhǔn)備階段,A、B分別進(jìn)行資源凍結(jié)并持久化undo和redo日志,A、B分別告訴協(xié)調(diào)者已經(jīng)準(zhǔn)備好;第二個(gè)提交階段,協(xié)調(diào)者告訴A、B進(jìn)行提交,A、B分別提交事務(wù)。本方案基于二階段提交的思想來(lái)實(shí)現(xiàn)至少消費(fèi)一次。
Redis存儲(chǔ)設(shè)計(jì)中PrepareQueue的作用就是用來(lái)凍結(jié)資源并記錄事務(wù)日志,消費(fèi)者端即是參與者也是協(xié)調(diào)者。第一個(gè)準(zhǔn)備階段,消費(fèi)者端通過(guò)執(zhí)行Lua腳本從StoreQueue中Pop消息并存儲(chǔ)到PrepareQueue,同時(shí)消息傳輸?shù)较M(fèi)者端,消費(fèi)者端消費(fèi)該消息;第二個(gè)提交階段,消費(fèi)者端根據(jù)消費(fèi)結(jié)果是否成功協(xié)調(diào)消息隊(duì)列服務(wù)是提交還是回滾,如果消費(fèi)成功則提交事務(wù),該消息從PrepareQueue中刪除,如果消費(fèi)失敗則回滾事務(wù),消費(fèi)者端將該消息從PrepareQueue移動(dòng)到StoreQueue,如果因?yàn)楦鞣N異常導(dǎo)致PrepareQueue中消息滯留超時(shí),超時(shí)后將自動(dòng)執(zhí)行回滾操作。二階段消費(fèi)的流程圖如下所示。
?
?
消費(fèi)重試次數(shù)控制
采用二階段消費(fèi)方式,需要將消息在StoreQueue和PrepareQueue之間移動(dòng),如何實(shí)現(xiàn)重試次數(shù)控制呢,其關(guān)鍵在StoreQueue和PrepareQueue的分?jǐn)?shù)設(shè)計(jì)。
PrepareQueue的分?jǐn)?shù)需要與時(shí)間相關(guān),正常情況下,消費(fèi)者不管消費(fèi)失敗還是消費(fèi)成功,都會(huì)從PrepareQueue刪除消息,當(dāng)消費(fèi)者系統(tǒng)發(fā)生異常或者宕機(jī)的時(shí)候,消息就無(wú)法從PrepareQueue中刪除,我們也不知道消費(fèi)者是否消費(fèi)成功,為保障消息至少被消費(fèi)一次,我們需要做到超時(shí)回滾,因此分?jǐn)?shù)需要與消費(fèi)時(shí)間相關(guān)。當(dāng)PrepareQueue中的消息發(fā)生超時(shí)的時(shí)候,將消息從PrepareQueue移動(dòng)到StoreQueue。
因此PrepareQueue的分?jǐn)?shù)設(shè)計(jì)為:秒級(jí)時(shí)間戳*1000+重試次數(shù)。定時(shí)消息首次存儲(chǔ)到StoreQueue中的分?jǐn)?shù)表示消費(fèi)時(shí)間戳,如果消息消費(fèi)失敗,消息從PrepareQueue回滾到StoreQueue,定時(shí)消息存儲(chǔ)時(shí)的分?jǐn)?shù)都表示剩余重試次數(shù),剩余重試次數(shù)從16次不斷降低最后為0,消息進(jìn)入死信隊(duì)列。消息在StoreQueue和PrepareQueue之間移動(dòng)流程如下:
5 優(yōu)點(diǎn)
- 消費(fèi)低延遲:采用基于Redis的定時(shí)方案直接從Redis中pop超時(shí)任務(wù),避免掃描任務(wù)庫(kù),大大減少了延遲時(shí)間。
- 可控并發(fā)度:并發(fā)度取決于消息存儲(chǔ)的Slot數(shù)量以及集群Worker數(shù)量,這兩個(gè)數(shù)量都可以根據(jù)業(yè)務(wù)需要進(jìn)行調(diào)控,傳統(tǒng)方案中并發(fā)度為分庫(kù)分表的數(shù)量。
- 高性能:Redis單機(jī)的QPS可以達(dá)到10w,Redis集群的QPS可以達(dá)到更高的水平,本方案沒(méi)有復(fù)雜查詢,消費(fèi)過(guò)程中從Redis拉取超時(shí)消息的時(shí)間復(fù)雜度為O(1)。
- 高可用:至少消費(fèi)一次保障了定時(shí)消息一定被消費(fèi),重試次數(shù)控制保證消費(fèi)不被阻塞。
原文鏈接
本文為阿里云原創(chuàng)內(nèi)容,未經(jīng)允許不得轉(zhuǎn)載。
總結(jié)
以上是生活随笔為你收集整理的一种低延迟的超时中心实现方式的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 排查指南 | 当 mPaaS 小程序提示
- 下一篇: 为了让盲人也能追剧,优酷做了哪些努力?