聊一聊顺序消息(RocketMQ顺序消息的实现机制)
本文來(lái)自:https://www.cnblogs.com/hzmark/p/orderly_message.html
當(dāng)我們說(shuō)順序時(shí),我們?cè)谡f(shuō)什么?
日常思維中,順序大部分情況會(huì)和時(shí)間關(guān)聯(lián)起來(lái),即時(shí)間的先后表示事件的順序關(guān)系。
比如事件A發(fā)生在下午3點(diǎn)一刻,而事件B發(fā)生在下午4點(diǎn),那么我們認(rèn)為事件A發(fā)生在事件B之前,他們的順序關(guān)系為先A后B。
上面的例子之所以成立是因?yàn)樗麄冇邢嗤膮⒖枷?#xff0c;即他們的時(shí)間是對(duì)應(yīng)的同一個(gè)物理時(shí)鐘的時(shí)間。如果A發(fā)生的時(shí)間是北京時(shí)間,而B依賴的時(shí)間是東京時(shí)間,那么先A后B的順序關(guān)系還成立嗎?
如果沒(méi)有一個(gè)絕對(duì)的時(shí)間參考,那么A和B之間還有順序嗎,或者說(shuō)怎么斷定A和B的順序?
顯而易見(jiàn)的,如果A、B兩個(gè)事件之間如果是有因果關(guān)系的,那么A一定發(fā)生在B之前(前因后果,有因才有果)。相反,在沒(méi)有一個(gè)絕對(duì)的時(shí)間的參考的情況下,若A、B之間沒(méi)有因果關(guān)系,那么A、B之間就沒(méi)有順序關(guān)系。
那么,我們?cè)谡f(shuō)順序時(shí),其實(shí)說(shuō)的是:
-
有絕對(duì)時(shí)間參考的情況下,事件的發(fā)生時(shí)間的關(guān)系;
-
和沒(méi)有時(shí)間參考下的,一種由因果關(guān)系推斷出來(lái)的happening before的關(guān)系;
在分布式環(huán)境中討論順序
當(dāng)把順序放到分布式環(huán)境(多線程、多進(jìn)程都可以認(rèn)為是一個(gè)分布式的環(huán)境)中去討論時(shí):
-
同一線程上的事件順序是確定的,可以認(rèn)為他們有相同的時(shí)間作為參考
-
不同線程間的順序只能通過(guò)因果關(guān)系去推斷
(點(diǎn)表示事件,波浪線箭頭表示事件間的消息)
上圖中,進(jìn)程P中的事件順序?yàn)閜1->p2->p3->p4(時(shí)間推斷)。而因?yàn)閜1給進(jìn)程Q的q2發(fā)了消息,那么p1一定在q2之前(因果推斷)。但是無(wú)法確定p1和q1之間的順序關(guān)系。
推薦閱讀《Time, Clocks, and the Ordering of Events in a Distributed System》,會(huì)透徹的分析分布式系統(tǒng)中的順序問(wèn)題。
消息中間件中的順序消息
什么是順序消息
有了上述的基礎(chǔ)之后,我們回到本篇文章的主題中,聊一聊消息中間件中的順序消息。
順序消息(FIFO 消息)是 MQ 提供的一種嚴(yán)格按照順序進(jìn)行發(fā)布和消費(fèi)的消息類型。順序消息由兩個(gè)部分組成:順序發(fā)布和順序消費(fèi)。
順序消息包含兩種類型:
分區(qū)順序:一個(gè)Partition內(nèi)所有的消息按照先進(jìn)先出的順序進(jìn)行發(fā)布和消費(fèi)
全局順序:一個(gè)Topic內(nèi)所有的消息按照先進(jìn)先出的順序進(jìn)行發(fā)布和消費(fèi)
這是阿里云上對(duì)順序消息的定義,把順序消息拆分成了順序發(fā)布和順序消費(fèi)。那么多線程中發(fā)送消息算不算順序發(fā)布?
如上一部分介紹的,多線程中若沒(méi)有因果關(guān)系則沒(méi)有順序。那么用戶在多線程中去發(fā)消息就意味著用戶不關(guān)心那些在不同線程中被發(fā)送的消息的順序。即多線程發(fā)送的消息,不同線程間的消息不是順序發(fā)布的,同一線程的消息是順序發(fā)布的。這是需要用戶自己去保障的。
而對(duì)于順序消費(fèi),則需要保證哪些來(lái)自同一個(gè)發(fā)送線程的消息在消費(fèi)時(shí)是按照相同的順序被處理的(為什么不說(shuō)他們應(yīng)該在一個(gè)線程中被消費(fèi)呢?)。
全局順序其實(shí)是分區(qū)順序的一個(gè)特例,即使Topic只有一個(gè)分區(qū)(以下不在討論全局順序,因?yàn)槿猪樞驅(qū)⒚媾R性能的問(wèn)題,而且絕大多數(shù)場(chǎng)景都不需要全局順序)。
如何保證順序
在MQ的模型中,順序需要由3個(gè)階段去保障:
消息被發(fā)送時(shí)保持順序
消息被存儲(chǔ)時(shí)保持和發(fā)送的順序一致
消息被消費(fèi)時(shí)保持和存儲(chǔ)的順序一致
發(fā)送時(shí)保持順序意味著對(duì)于有順序要求的消息,用戶應(yīng)該在同一個(gè)線程中采用同步的方式發(fā)送。存儲(chǔ)保持和發(fā)送的順序一致則要求在同一線程中被發(fā)送出來(lái)的消息A和B,存儲(chǔ)時(shí)在空間上A一定在B之前。而消費(fèi)保持和存儲(chǔ)一致則要求消息A、B到達(dá)Consumer之后必須按照先A后B的順序被處理。
如下圖所示:
對(duì)于兩個(gè)訂單的消息的原始數(shù)據(jù):a1、b1、b2、a2、a3、b3(絕對(duì)時(shí)間下發(fā)生的順序):
-
在發(fā)送時(shí),a訂單的消息需要保持a1、a2、a3的順序,b訂單的消息也相同,但是a、b訂單之間的消息沒(méi)有順序關(guān)系,這意味著a、b訂單的消息可以在不同的線程中被發(fā)送出去
-
在存儲(chǔ)時(shí),需要分別保證a、b訂單的消息的順序,但是a、b訂單之間的消息的順序可以不保證
-
a1、b1、b2、a2、a3、b3是可以接受的
-
a1、a2、b1、b2、a3、b3也是可以接受的
-
a1、a3、b1、b2、a2、b3是不能接受的
-
-
消費(fèi)時(shí)保證順序的簡(jiǎn)單方式就是“什么都不做”,不對(duì)收到的消息的順序進(jìn)行調(diào)整,即只要一個(gè)分區(qū)的消息只由一個(gè)線程處理即可;當(dāng)然,如果a、b在一個(gè)分區(qū)中,在收到消息后也可以將他們拆分到不同線程中處理,不過(guò)要權(quán)衡一下收益
開源RocketMQ中順序的實(shí)現(xiàn)
上圖是RocketMQ順序消息原理的介紹,將不同訂單的消息路由到不同的分區(qū)中。文檔只是給出了Producer順序的處理,Consumer消費(fèi)時(shí)通過(guò)一個(gè)分區(qū)只能有一個(gè)線程消費(fèi)的方式來(lái)保證消息順序,具體實(shí)現(xiàn)如下。
Producer端
Producer端確保消息順序唯一要做的事情就是將消息路由到特定的分區(qū),在RocketMQ中,通過(guò)MessageQueueSelector來(lái)實(shí)現(xiàn)分區(qū)的選擇。
-
List mqs:消息要發(fā)送的Topic下所有的分區(qū)
-
Message msg:消息對(duì)象
-
額外的參數(shù):用戶可以傳遞自己的參數(shù)
比如如下實(shí)現(xiàn)就可以保證相同的訂單的消息被路由到相同的分區(qū):
long orderId = ((Order) object).getOrderId; return mqs.get(orderId % mqs.size());Consumer端
RocketMQ消費(fèi)端有兩種類型:MQPullConsumer和MQPushConsumer。
MQPullConsumer由用戶控制線程,主動(dòng)從服務(wù)端獲取消息,每次獲取到的是一個(gè)MessageQueue中的消息。PullResult中的List msgFoundList自然和存儲(chǔ)順序一致,用戶需要再拿到這批消息后自己保證消費(fèi)的順序。
對(duì)于PushConsumer,由用戶注冊(cè)MessageListener來(lái)消費(fèi)消息,在客戶端中需要保證調(diào)用MessageListener時(shí)消息的順序性。RocketMQ中的實(shí)現(xiàn)如下:
PullMessageService單線程的從Broker獲取消息
PullMessageService將消息添加到ProcessQueue中(ProcessMessage是一個(gè)消息的緩存),之后提交一個(gè)消費(fèi)任務(wù)到ConsumeMessageOrderService
ConsumeMessageOrderService多線程執(zhí)行,每個(gè)線程在消費(fèi)消息時(shí)需要拿到MessageQueue的鎖
拿到鎖之后從ProcessQueue中獲取消息
保證消費(fèi)順序的核心思想是:
-
獲取到消息后添加到ProcessQueue中,單線程執(zhí)行,所以ProcessQueue中的消息是順序的
-
提交的消費(fèi)任務(wù)時(shí)提交的是“對(duì)某個(gè)MQ進(jìn)行一次消費(fèi)”,這次消費(fèi)請(qǐng)求是從ProcessQueue中獲取消息消費(fèi),所以也是順序的(無(wú)論哪個(gè)線程獲取到鎖,都是按照ProcessQueue中消息的順序進(jìn)行消費(fèi))
順序和異常的關(guān)系
順序消息需要Producer和Consumer都保證順序。Producer需要保證消息被路由到正確的分區(qū),消息需要保證每個(gè)分區(qū)的數(shù)據(jù)只有一個(gè)線程消息,那么就會(huì)有一些缺陷:
-
發(fā)送順序消息無(wú)法利用集群的Failover特性,因?yàn)椴荒芨鼡QMessageQueue進(jìn)行重試
-
因?yàn)榘l(fā)送的路由策略導(dǎo)致的熱點(diǎn)問(wèn)題,可能某一些MessageQueue的數(shù)據(jù)量特別大
-
消費(fèi)的并行讀依賴于分區(qū)數(shù)量
-
消費(fèi)失敗時(shí)無(wú)法跳過(guò)
不能更換MessageQueue重試就需要MessageQueue有自己的副本,通過(guò)Raft、Paxos之類的算法保證有可用的副本,或者通過(guò)其他高可用的存儲(chǔ)設(shè)備來(lái)存儲(chǔ)MessageQueue。
熱點(diǎn)問(wèn)題好像沒(méi)有什么好的解決辦法,只能通過(guò)拆分MessageQueue和優(yōu)化路由方法來(lái)盡量均衡的將消息分配到不同的MessageQueue。
消費(fèi)并行度理論上不會(huì)有太大問(wèn)題,因?yàn)镸essageQueue的數(shù)量可以調(diào)整。
消費(fèi)失敗的無(wú)法跳過(guò)是不可避免的,因?yàn)樘^(guò)可能導(dǎo)致后續(xù)的數(shù)據(jù)處理都是錯(cuò)誤的。不過(guò)可以提供一些策略,由用戶根據(jù)錯(cuò)誤類型來(lái)決定是否跳過(guò),并且提供重試隊(duì)列之類的功能,在跳過(guò)之后用戶可以在“其他”地方重新消費(fèi)到這條消息。
總結(jié)
以上是生活随笔為你收集整理的聊一聊顺序消息(RocketMQ顺序消息的实现机制)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 速效救心丸能随便吃吗
- 下一篇: RocketMq重试及消息不丢失机制