日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

QMQ顺序消息设计与实现

發(fā)布時(shí)間:2025/3/21 编程问答 30 豆豆
生活随笔 收集整理的這篇文章主要介紹了 QMQ顺序消息设计与实现 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

背景

在MQ里,順序消息的意思是消費(fèi)消息的順序和消息發(fā)送時(shí)(單機(jī)發(fā)送)的順序保持一致。比如ProducerA按照順序發(fā)送msga, msgb, msgc三條消息,那么consumer消費(fèi)的時(shí)候也應(yīng)該按照msga, msgb, msgc來消費(fèi)。

對于順序消息,在我們實(shí)際使用中發(fā)現(xiàn),大部分業(yè)務(wù)系統(tǒng)并不需要或者并不依賴MQ提供的順序機(jī)制,這些業(yè)務(wù)本身往往就能處理無序的消息,比如很多系統(tǒng)中都有狀態(tài)機(jī),是否消費(fèi)消息必須根據(jù)狀態(tài)機(jī)當(dāng)前的狀態(tài)。

但是在一些場景中順序消息也有其必要性:比如日志收集和依賴binlog同步驅(qū)動(dòng)業(yè)務(wù)等。就這兩個(gè)場景而言,同樣是順序消息但對順序的需求卻不一定一樣:比如日志收集中我們一般認(rèn)為對順序的要求比較弱,即絕大多數(shù)時(shí)是有序即可,遇到一些極端情況,比如Server宕機(jī),容量調(diào)整的時(shí)候我們可以暫時(shí)容忍一些無序。但是對于一個(gè)依賴MySQL binlog同步來驅(qū)動(dòng)的業(yè)務(wù),短暫的無序都將會(huì)導(dǎo)致整個(gè)業(yè)務(wù)的錯(cuò)亂。

分析現(xiàn)有的一些MQ后發(fā)現(xiàn),它們并不能在所有情況下提供可靠的順序支持?,F(xiàn)在市面上的MQ基本上都是以partition - based模型來提供順序支持。我們以Kafka為例:topic分為一個(gè)或多個(gè)partition,partition可以理解為一個(gè)順序文件,producer發(fā)送消息的時(shí)候,按照一定的策略選擇partition,比如partition = hash(key) % partition num來選擇該消息發(fā)送給哪個(gè)partition,那么具有相同key的消息就會(huì)落到相同的partition上,而consumer消費(fèi)的時(shí)候一個(gè)consumer獨(dú)占地綁定在一個(gè)partition上。這樣一來,消息就是順序消費(fèi)的了:

但是這種模型存在一些問題:

  • partition的個(gè)數(shù)就是消費(fèi)的并行度,那么如果現(xiàn)在consumer處理不過來需要增加consumer則需要對應(yīng)地增加partition。而根據(jù)上面的描述partition的個(gè)數(shù)一旦改變,則順序?qū)o法保證(partition = hash(key) % partition num 公式里partition num發(fā)生了改變,則選擇的partition也會(huì)發(fā)生變化)。

    所以我們一般在業(yè)務(wù)上線之前,就要做出合理的容量規(guī)劃,預(yù)先創(chuàng)建出足夠的partition,但有的時(shí)候容量規(guī)劃是困難的,實(shí)踐中往往是預(yù)先分配大量的partition,比如幾百甚至幾千,然而大量的partition對性能以及運(yùn)維都帶來麻煩。

  • 擴(kuò)容partition后,如果高峰期已過,想進(jìn)行縮容則基本上不可行(比如Kafka就不允許減少partition),除了縮容帶來順序變化外,還有一點(diǎn)是怎么保證被縮容的partition上的消息已經(jīng)完全消費(fèi)完成了呢?

  • partition的移動(dòng)問題,partition如果分配在某臺broker上之后再移動(dòng)就很麻煩,一旦這臺broker容量不足,需要進(jìn)行負(fù)載均衡就很困難了,這可能需要在不同的機(jī)器上傳輸大量的數(shù)據(jù)。

  • 對可用性的挑戰(zhàn),順序發(fā)送的時(shí)候某個(gè)key的消息必須總是發(fā)送給指定的partition的,如果一旦某臺server掛掉,或者正常的停機(jī)維護(hù),那么位于這臺server的partition就不能收消息了,但是也不能發(fā)送給其他partition,否則順序就會(huì)錯(cuò)亂。

    雖然我們可以通過多副本機(jī)制(Replication)來確保即使該partition所在機(jī)器出現(xiàn)故障時(shí)候仍然有其他副本提供服務(wù),但是一般選舉出一個(gè)新的副本通常需要花費(fèi)幾秒到幾分鐘不等(比如早期的Kafka版本Leader遷移是串行執(zhí)行的,在分區(qū)特別多的時(shí)候,選舉出新的leader可能需要分鐘級時(shí)間),在此期間發(fā)送到該partition的所有消息都無法發(fā)送。

  • 堆積問題,如果預(yù)分配時(shí)候的partition過少,這個(gè)時(shí)候堆積了大量的消息,那么即使擴(kuò)容也沒有辦法了:

  • 所以我們認(rèn)為現(xiàn)有的一些所謂順序消息機(jī)制并不是簡單可依賴的。你以為MQ給你提供了順序保障,但實(shí)際上在一些時(shí)候并不是這樣,那么這個(gè)時(shí)候使用方為了應(yīng)對這種異常情況就需要做出各種應(yīng)對措施,增加了使用的復(fù)雜度。而我們希望提供一種簡單可依賴的順序消息,也就是使用方可以放心的將順序保證交給MQ。

    方案設(shè)計(jì)

    首先我們來分析無法保證順序的根源是什么。我們選擇partition所使用的公式是 partition = hash(key) % partition num。正是因?yàn)閜artition num發(fā)生了變化導(dǎo)致公式的結(jié)果發(fā)生了變化,進(jìn)而打破了順序保證。

    其實(shí)對于這個(gè)公式我們可能并不陌生,除了在MQ中使用,我們在數(shù)據(jù)庫分庫分表中往往也有這種套路。

    在數(shù)據(jù)庫分庫分表中我們會(huì)通過一個(gè)分區(qū)鍵計(jì)算其分區(qū),然后得到表名或庫名(如下偽代碼所示,user_id是分表鍵,總共分為100張表):

    而且在分庫分表中前期因?yàn)闃I(yè)務(wù)量不大,我們往往不會(huì)分很多庫(或者我們也分了多個(gè)庫,但是這些庫都落在相同的機(jī)器上),但是為了后期添加分庫方便(擴(kuò)容)我們會(huì)預(yù)先分出很多表。比如我們前期分成100張表,但是這100張表都在相同的庫里,待到業(yè)務(wù)增長之后,單庫無法支撐,我們會(huì)將100張表劃分到不同的DB里。

    比如我們將表0 - 50落在DB1, 50 - 100落到DB2,這樣我們的處理能力就翻倍了,但是因?yàn)槌绦蚶镞€是按照100進(jìn)行分表的,所以對應(yīng)用沒有感知。

    這種機(jī)制相當(dāng)于引入了一個(gè)中間層,程序面對的是的分表,最后這個(gè)表是落在什么DB上通過中間層進(jìn)行映射過去就可以了。

    那么其實(shí)我們是可以借鑒這種思路應(yīng)用在MQ的擴(kuò)容縮容中的。為此我們引入了logic partition的概念。也就是Producer發(fā)送消息的時(shí)候,我們并不決定它發(fā)送到哪個(gè)具體的Server上的具體的partition里(后文將其稱之為物理partition, physical partition)。我們只是先得到logic partition,使用這個(gè)計(jì)算公式: logic partition = hash(key) % logic partition num。而logic partition num我們會(huì)固定住,永不改變。比如我們將logic partition num固定為1000。但是這里跟分庫分表中的分1000張表不同,logic partition僅僅是邏輯上的,不存在任何存儲實(shí)體,所以即使分配的再大也沒有性能上的開銷。計(jì)算得到logic partition后,我們根據(jù)logic partition的映射再來決定該消息應(yīng)該落到具體哪個(gè)physical partition上。我們會(huì)根據(jù)logic partition的范圍進(jìn)行映射,比如logic partition 0 - 500 映射到 physcial partition 1上,500 - 1000 映射到physcial partition 2上。

    接下來我們來看看這種措施如何應(yīng)對本文開頭所提出的一序列問題呢:

  • 擴(kuò)容 在這里擴(kuò)容其實(shí)就是對physical partition的分裂過程。比如開始時(shí)我們創(chuàng)建了兩個(gè)分區(qū): physical partition 1, physical partition 2,因?yàn)橄M(fèi)不過來,我們要將physcial partiton 1擴(kuò)容,那么我們將會(huì)得到 logic partiton 0 - 250 映射到physical partition 3,logic partition 250 - 500 映射到physical partition 4(注:范圍的分裂不一定是平均的,比如我們也可以按照[0 - 200)和[200 - 500)進(jìn)行劃分 )。

  • 縮容 縮容其實(shí)就是對physical partition的合并過程,我們將physical partiton 3和physical partition 4合并得到physical partition 5。那么現(xiàn)在logic partiton 0 - 500就映射到physical partition 5。

  • 負(fù)載均衡 負(fù)載均衡其實(shí)就是logic partition到physical partiton的重新映射過程。也就是原來0 - 500 映射到 physical partition 5,現(xiàn)在我們將其映射到physical partition 6,而physical partition 6可以分配在一臺空閑的Server上。不僅如此,重新映射也可以解決可用性問題:一臺server停機(jī)維護(hù)時(shí)將落在上面的logic partition進(jìn)行重新映射,分配到另外一臺Server上即可,這樣我們就可以打造Always writtable ordered message queue。

  • 這里借鑒分庫分表中的預(yù)先分表的方法,提出logic partition的抽象層解決物理partition擴(kuò)容縮容時(shí)無法保證順序的問題。但是實(shí)際實(shí)現(xiàn)時(shí)候我們會(huì)發(fā)現(xiàn)MQ的這種logic partition分法要比數(shù)據(jù)庫中分表復(fù)雜得多。因?yàn)镸Q是的消費(fèi)是持續(xù)性的,也就是我可以讀取歷史數(shù)據(jù)。數(shù)據(jù)庫中分庫分表一旦調(diào)整之后,那么它呈現(xiàn)的就是最終視圖,而MQ里昨天我們可能還只有一個(gè)physical partition,今天我們劃分為兩個(gè),那么我們消費(fèi)昨天的數(shù)據(jù)和今天的數(shù)據(jù)的時(shí)候如何進(jìn)行無縫的切換呢?

    我們先簡單總結(jié)一下上面對擴(kuò)容縮容移動(dòng)的描述:

    • 擴(kuò)容即對physical partition按照logic partition的范圍進(jìn)行分裂的過程

    • 縮容即按照logic partition的范圍對physical partition進(jìn)行合并的過程

    • 移動(dòng)即改變logic partition與physical partition的映射的過程

    雖然我們從Database的分庫分表思想中學(xué)習(xí)到了logic partition,但是Message Queue和Database究竟是兩種不同的模型。在DB里,reader是無狀態(tài)的,也就是每次讀取傳入的查詢條件都是獨(dú)立的。而MQ的reader(consumer)當(dāng)前的讀取位置(offset)是依賴上次的讀取位置,一旦partition發(fā)生改變,則這個(gè)offset將無法繼續(xù)保持,那消費(fèi)就會(huì)錯(cuò)亂了,順序也無從談起。另外因?yàn)閿?shù)據(jù)量太大,我們在執(zhí)行擴(kuò)容縮容移動(dòng)的時(shí)候并不想對數(shù)據(jù)進(jìn)行移動(dòng)。

    接下來以實(shí)際的例子來進(jìn)行說明,下面是一個(gè)擴(kuò)容的實(shí)例。order.changed這個(gè)主題,原來分配了P1, P2兩個(gè)分區(qū),現(xiàn)在因?yàn)槿萘坎粔?#xff0c;需要對P2進(jìn)行擴(kuò)容(分裂)。也就是將physical partition P2進(jìn)行分裂,分裂成P3, P4兩個(gè)分區(qū)。分裂的原則是按照logic partition的范圍進(jìn)行,logic partition [500, 1000)原來映射到P1,現(xiàn)在logic partition [500, 750)映射到P3, [750, 1000)映射到P4。也就是分裂以后producer發(fā)送新的消息就會(huì)按照新的映射關(guān)系將消息append到P1, P3或P4,P2不再接收新的消息了。

    接下來具體描述一下實(shí)現(xiàn)步驟。在QMQ里有個(gè)metaserver的組件,它管理所有元數(shù)據(jù)信息,比如某topic分配到哪些partition上(我們將其稱之為路由):

    metaserver還管理partition分配在哪些server上,以及l(fā)ogic partition與physical partition的映射關(guān)系。

    在需要對P2進(jìn)行分裂的時(shí)候,metaserver會(huì)發(fā)送一條消息給P2所在的server,這條消息會(huì)被append到P2上,該消息稱之為指令消息(command message),對客戶端不可見,也就是業(yè)務(wù)代碼不會(huì)消費(fèi)到這條消息。P2收到這條指令消息后將不再接收新的消息了,所有業(yè)務(wù)消息均被拒絕,那么這條指令消息就是P2上的最后一條消息,相當(dāng)將P2關(guān)閉了。

    metaserver發(fā)送完指令消息后會(huì)變更對應(yīng)topic的路由信息:

    注意看上面的表格的特點(diǎn),這個(gè)路由信息表與眾不同的地方在于它有一個(gè)version字段。對于producer而言它總是獲取最新版本的路由信息,也就是路由發(fā)生變更后,producer就會(huì)獲得更高版本的路由信息,然后向這些分區(qū)上發(fā)送消息。

    但是對于consumer來講,它必須將前面的消息消費(fèi)完成才能消費(fèi)后面的,否則順序就亂了。比如前面分裂的示例,P2分裂為P3, P4了,這個(gè)時(shí)候P3, P4并不是立即對consumer可見的(只要對consumer不可見,就沒有consumer來消費(fèi)它)。只有當(dāng)consumer消費(fèi)到指令消息時(shí),才會(huì)觸發(fā)consumer的路由變更。并且指令消息里攜帶了路由的版本信息,假設(shè)路由已經(jīng)發(fā)生了多次變更,consumer消費(fèi)到某個(gè)指令消息的時(shí)候,只會(huì)將consumer的路由變更到該指令的下一個(gè)版本,而不會(huì)跳到其他版本,這里觸發(fā)路由變更的時(shí)候會(huì)使用樂觀鎖去更新版本(偽代碼):

    總結(jié)起來就是producer總是使用最新版本的路由,而consumer使用指定版本的路由,路由的版本由指令消息進(jìn)行同步。

    其實(shí)這個(gè)流程中最有趣的不是擴(kuò)容(分裂)和縮容(合并),而是移動(dòng)。比如我們現(xiàn)在發(fā)現(xiàn)P4分區(qū)所在機(jī)器負(fù)載比較高或磁盤就要滿了,現(xiàn)在給集群加了幾臺機(jī)器,怎么做能在繼續(xù)保持順序的基礎(chǔ)上又能將負(fù)載分散過去呢?那么只需要發(fā)送一個(gè)移動(dòng)的指令消息給P4,然后P4就會(huì)關(guān)閉,然后變更路由,order.changed的路由現(xiàn)在是P1, P3, P5,這次路由變更分區(qū)的個(gè)數(shù)沒有發(fā)生改變,改變的只是logic partition和physical partition的映射關(guān)系:

    因?yàn)镻5是新分區(qū),所以他可以分配在新機(jī)器上了。而且這個(gè)特性可以用在提高順序消息的可用性上,比如需要對某臺server停機(jī),那么我們只需要對其上面所有分區(qū)發(fā)送移動(dòng)指令即可。

    另外,在實(shí)現(xiàn)的時(shí)候我們還增加了如下約束條件:

    • 版本必須是連續(xù)遞增的

    • 每次只能執(zhí)行一項(xiàng)變更,比如只能對一個(gè)partition分裂,不能對多個(gè)partition進(jìn)行分裂

    • 對logic partition范圍的每次操作必須是連續(xù)的,比如合并的時(shí)候只能將[0, 100) 與[100, 200)合并,而不能將[0, 100)與[200, 300)合并

    • 路由變更必須是本次變更分區(qū)所有的消費(fèi)者都確認(rèn)執(zhí)行到指令消息才能觸發(fā)。比如將多個(gè)分區(qū)合并的時(shí)候,必須是這幾個(gè)分區(qū)都消費(fèi)到了指令消息的時(shí)候觸發(fā)。

    總結(jié)

    上面以示例的方式描述了QMQ如何進(jìn)行擴(kuò)容(分裂),那么只需要按照這個(gè)步驟進(jìn)行,consumer在沒有將更早的消息消費(fèi)完成的情況下就不會(huì)拿到更新的路由。

    至于如何確保順序的消費(fèi)這些分區(qū)的消息那就跟其他MQ一樣了,只需要將分區(qū)分配給指定的consumer實(shí)例,只允許指定的實(shí)例獨(dú)占消費(fèi)該分區(qū)即可。

    QMQ是去哪兒網(wǎng)開源的分布式消息中間件,在去哪兒網(wǎng)內(nèi)部應(yīng)用十分廣泛,提供了很獨(dú)特的存儲模型,延時(shí)消息,事務(wù)消息等。點(diǎn)擊原文鏈接就會(huì)跳到github地址(https://github.com/qunarcorp/qmq),歡迎給我們提交PR, Star。

    總結(jié)

    以上是生活随笔為你收集整理的QMQ顺序消息设计与实现的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。