日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

聊一聊顺序消息(RocketMQ顺序消息的实现机制)

發(fā)布時(shí)間:2023/12/4 编程问答 34 豆豆
生活随笔 收集整理的這篇文章主要介紹了 聊一聊顺序消息(RocketMQ顺序消息的实现机制) 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

本文來(lái)自:https://www.cnblogs.com/hzmark/p/orderly_message.html

當(dāng)我們說(shuō)順序時(shí),我們?cè)谡f(shuō)什么?

日常思維中,順序大部分情況會(huì)和時(shí)間關(guān)聯(lián)起來(lái),即時(shí)間的先后表示事件的順序關(guān)系。

比如事件A發(fā)生在下午3點(diǎn)一刻,而事件B發(fā)生在下午4點(diǎn),那么我們認(rèn)為事件A發(fā)生在事件B之前,他們的順序關(guān)系為先A后B。

上面的例子之所以成立是因?yàn)樗麄冇邢嗤膮⒖枷?#xff0c;即他們的時(shí)間是對(duì)應(yīng)的同一個(gè)物理時(shí)鐘的時(shí)間。如果A發(fā)生的時(shí)間是北京時(shí)間,而B依賴的時(shí)間是東京時(shí)間,那么先A后B的順序關(guān)系還成立嗎?

如果沒(méi)有一個(gè)絕對(duì)的時(shí)間參考,那么A和B之間還有順序嗎,或者說(shuō)怎么斷定A和B的順序?

顯而易見(jiàn)的,如果A、B兩個(gè)事件之間如果是有因果關(guān)系的,那么A一定發(fā)生在B之前(前因后果,有因才有果)。相反,在沒(méi)有一個(gè)絕對(duì)的時(shí)間的參考的情況下,若A、B之間沒(méi)有因果關(guān)系,那么A、B之間就沒(méi)有順序關(guān)系。

那么,我們?cè)谡f(shuō)順序時(shí),其實(shí)說(shuō)的是

  • 有絕對(duì)時(shí)間參考的情況下,事件的發(fā)生時(shí)間的關(guān)系;

  • 和沒(méi)有時(shí)間參考下的,一種由因果關(guān)系推斷出來(lái)的happening before的關(guān)系;

在分布式環(huán)境中討論順序
當(dāng)把順序放到分布式環(huán)境(多線程、多進(jìn)程都可以認(rèn)為是一個(gè)分布式的環(huán)境)中去討論時(shí):

  • 同一線程上的事件順序是確定的,可以認(rèn)為他們有相同的時(shí)間作為參考

  • 不同線程間的順序只能通過(guò)因果關(guān)系去推斷


(點(diǎn)表示事件,波浪線箭頭表示事件間的消息)

上圖中,進(jìn)程P中的事件順序?yàn)閜1->p2->p3->p4(時(shí)間推斷)。而因?yàn)閜1給進(jìn)程Q的q2發(fā)了消息,那么p1一定在q2之前(因果推斷)。但是無(wú)法確定p1和q1之間的順序關(guān)系。

推薦閱讀《Time, Clocks, and the Ordering of Events in a Distributed System》,會(huì)透徹的分析分布式系統(tǒng)中的順序問(wèn)題。

消息中間件中的順序消息

什么是順序消息

有了上述的基礎(chǔ)之后,我們回到本篇文章的主題中,聊一聊消息中間件中的順序消息。

順序消息(FIFO 消息)是 MQ 提供的一種嚴(yán)格按照順序進(jìn)行發(fā)布和消費(fèi)的消息類型。順序消息由兩個(gè)部分組成:順序發(fā)布和順序消費(fèi)。

順序消息包含兩種類型:

分區(qū)順序:一個(gè)Partition內(nèi)所有的消息按照先進(jìn)先出的順序進(jìn)行發(fā)布和消費(fèi)

全局順序:一個(gè)Topic內(nèi)所有的消息按照先進(jìn)先出的順序進(jìn)行發(fā)布和消費(fèi)

這是阿里云上對(duì)順序消息的定義,把順序消息拆分成了順序發(fā)布和順序消費(fèi)。那么多線程中發(fā)送消息算不算順序發(fā)布?

如上一部分介紹的,多線程中若沒(méi)有因果關(guān)系則沒(méi)有順序。那么用戶在多線程中去發(fā)消息就意味著用戶不關(guān)心那些在不同線程中被發(fā)送的消息的順序。即多線程發(fā)送的消息,不同線程間的消息不是順序發(fā)布的,同一線程的消息是順序發(fā)布的。這是需要用戶自己去保障的。

而對(duì)于順序消費(fèi),則需要保證哪些來(lái)自同一個(gè)發(fā)送線程的消息在消費(fèi)時(shí)是按照相同的順序被處理的(為什么不說(shuō)他們應(yīng)該在一個(gè)線程中被消費(fèi)呢?)。

全局順序其實(shí)是分區(qū)順序的一個(gè)特例,即使Topic只有一個(gè)分區(qū)(以下不在討論全局順序,因?yàn)槿猪樞驅(qū)⒚媾R性能的問(wèn)題,而且絕大多數(shù)場(chǎng)景都不需要全局順序)。

如何保證順序

在MQ的模型中,順序需要由3個(gè)階段去保障:

  • 消息被發(fā)送時(shí)保持順序

  • 消息被存儲(chǔ)時(shí)保持和發(fā)送的順序一致

  • 消息被消費(fèi)時(shí)保持和存儲(chǔ)的順序一致

  • 發(fā)送時(shí)保持順序意味著對(duì)于有順序要求的消息,用戶應(yīng)該在同一個(gè)線程中采用同步的方式發(fā)送。存儲(chǔ)保持和發(fā)送的順序一致則要求在同一線程中被發(fā)送出來(lái)的消息A和B,存儲(chǔ)時(shí)在空間上A一定在B之前。而消費(fèi)保持和存儲(chǔ)一致則要求消息A、B到達(dá)Consumer之后必須按照先A后B的順序被處理。

    如下圖所示:

    對(duì)于兩個(gè)訂單的消息的原始數(shù)據(jù):a1、b1、b2、a2、a3、b3(絕對(duì)時(shí)間下發(fā)生的順序):

    • 在發(fā)送時(shí),a訂單的消息需要保持a1、a2、a3的順序,b訂單的消息也相同,但是a、b訂單之間的消息沒(méi)有順序關(guān)系,這意味著a、b訂單的消息可以在不同的線程中被發(fā)送出去

    • 在存儲(chǔ)時(shí),需要分別保證a、b訂單的消息的順序,但是a、b訂單之間的消息的順序可以不保證

      • a1、b1、b2、a2、a3、b3是可以接受的

      • a1、a2、b1、b2、a3、b3也是可以接受的

      • a1、a3、b1、b2、a2、b3是不能接受的

    • 消費(fèi)時(shí)保證順序的簡(jiǎn)單方式就是“什么都不做”,不對(duì)收到的消息的順序進(jìn)行調(diào)整,即只要一個(gè)分區(qū)的消息只由一個(gè)線程處理即可;當(dāng)然,如果a、b在一個(gè)分區(qū)中,在收到消息后也可以將他們拆分到不同線程中處理,不過(guò)要權(quán)衡一下收益

    開源RocketMQ中順序的實(shí)現(xiàn)


    上圖是RocketMQ順序消息原理的介紹,將不同訂單的消息路由到不同的分區(qū)中。文檔只是給出了Producer順序的處理,Consumer消費(fèi)時(shí)通過(guò)一個(gè)分區(qū)只能有一個(gè)線程消費(fèi)的方式來(lái)保證消息順序,具體實(shí)現(xiàn)如下。

    Producer端

    Producer端確保消息順序唯一要做的事情就是將消息路由到特定的分區(qū),在RocketMQ中,通過(guò)MessageQueueSelector來(lái)實(shí)現(xiàn)分區(qū)的選擇。

    • List mqs:消息要發(fā)送的Topic下所有的分區(qū)

    • Message msg:消息對(duì)象

    • 額外的參數(shù):用戶可以傳遞自己的參數(shù)

    比如如下實(shí)現(xiàn)就可以保證相同的訂單的消息被路由到相同的分區(qū):

    long orderId = ((Order) object).getOrderId; return mqs.get(orderId % mqs.size());

    Consumer端

    RocketMQ消費(fèi)端有兩種類型:MQPullConsumer和MQPushConsumer。

    MQPullConsumer由用戶控制線程,主動(dòng)從服務(wù)端獲取消息,每次獲取到的是一個(gè)MessageQueue中的消息。PullResult中的List msgFoundList自然和存儲(chǔ)順序一致,用戶需要再拿到這批消息后自己保證消費(fèi)的順序。

    對(duì)于PushConsumer,由用戶注冊(cè)MessageListener來(lái)消費(fèi)消息,在客戶端中需要保證調(diào)用MessageListener時(shí)消息的順序性。RocketMQ中的實(shí)現(xiàn)如下:

  • PullMessageService單線程的從Broker獲取消息

  • PullMessageService將消息添加到ProcessQueue中(ProcessMessage是一個(gè)消息的緩存),之后提交一個(gè)消費(fèi)任務(wù)到ConsumeMessageOrderService

  • ConsumeMessageOrderService多線程執(zhí)行,每個(gè)線程在消費(fèi)消息時(shí)需要拿到MessageQueue的鎖

  • 拿到鎖之后從ProcessQueue中獲取消息

  • 保證消費(fèi)順序的核心思想是:

    • 獲取到消息后添加到ProcessQueue中,單線程執(zhí)行,所以ProcessQueue中的消息是順序的

    • 提交的消費(fèi)任務(wù)時(shí)提交的是“對(duì)某個(gè)MQ進(jìn)行一次消費(fèi)”,這次消費(fèi)請(qǐng)求是從ProcessQueue中獲取消息消費(fèi),所以也是順序的(無(wú)論哪個(gè)線程獲取到鎖,都是按照ProcessQueue中消息的順序進(jìn)行消費(fèi))

    順序和異常的關(guān)系

    順序消息需要Producer和Consumer都保證順序。Producer需要保證消息被路由到正確的分區(qū),消息需要保證每個(gè)分區(qū)的數(shù)據(jù)只有一個(gè)線程消息,那么就會(huì)有一些缺陷:

    • 發(fā)送順序消息無(wú)法利用集群的Failover特性,因?yàn)椴荒芨鼡QMessageQueue進(jìn)行重試

    • 因?yàn)榘l(fā)送的路由策略導(dǎo)致的熱點(diǎn)問(wèn)題,可能某一些MessageQueue的數(shù)據(jù)量特別大

    • 消費(fèi)的并行讀依賴于分區(qū)數(shù)量

    • 消費(fèi)失敗時(shí)無(wú)法跳過(guò)

    不能更換MessageQueue重試就需要MessageQueue有自己的副本,通過(guò)Raft、Paxos之類的算法保證有可用的副本,或者通過(guò)其他高可用的存儲(chǔ)設(shè)備來(lái)存儲(chǔ)MessageQueue。

    熱點(diǎn)問(wèn)題好像沒(méi)有什么好的解決辦法,只能通過(guò)拆分MessageQueue和優(yōu)化路由方法來(lái)盡量均衡的將消息分配到不同的MessageQueue。

    消費(fèi)并行度理論上不會(huì)有太大問(wèn)題,因?yàn)镸essageQueue的數(shù)量可以調(diào)整。

    消費(fèi)失敗的無(wú)法跳過(guò)是不可避免的,因?yàn)樘^(guò)可能導(dǎo)致后續(xù)的數(shù)據(jù)處理都是錯(cuò)誤的。不過(guò)可以提供一些策略,由用戶根據(jù)錯(cuò)誤類型來(lái)決定是否跳過(guò),并且提供重試隊(duì)列之類的功能,在跳過(guò)之后用戶可以在“其他”地方重新消費(fèi)到這條消息。

    總結(jié)

    以上是生活随笔為你收集整理的聊一聊顺序消息(RocketMQ顺序消息的实现机制)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

    如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。