日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 运维知识 > windows >内容正文

windows

Rocketmq学习2——Rocketmq消息过滤&事务消息&延迟消息原理源码浅析

發(fā)布時(shí)間:2024/1/16 windows 34 coder
生活随笔 收集整理的這篇文章主要介紹了 Rocketmq学习2——Rocketmq消息过滤&事务消息&延迟消息原理源码浅析 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

系列文章目錄和關(guān)于我

零丶引入

在《Rocketmq學(xué)習(xí)1——Rocketmq架構(gòu)&消息存儲(chǔ)&刷盤(pán)機(jī)制》中我們學(xué)習(xí)了rocketmq的架構(gòu),以及消息存儲(chǔ)設(shè)計(jì),在此消息存儲(chǔ)設(shè)計(jì)之上,rocketmq提供了諸如:延時(shí)消息、事務(wù)消息、消息過(guò)濾、消息回溯等高級(jí)特性。這一篇將對(duì)這些高級(jí)特性的原理進(jìn)行淺顯地學(xué)習(xí)。

這一篇不會(huì)展示這些高級(jí)特性怎么使用,如何使用可用查看rocketmq-example源碼

一丶消息過(guò)濾

RocketMQ分布式消息隊(duì)列的消息過(guò)濾方式有別于其它MQ中間件,在kafka中,如果想實(shí)現(xiàn)消息過(guò)濾,需要消費(fèi)者拿到消息后,反序列化消息識(shí)別其中的tag進(jìn)行過(guò)濾。

但是RocketMQ是在Consumer端訂閱消息時(shí)再做消息過(guò)濾的。RocketMQ這么做是在于其Producer端寫(xiě)入消息和Consumer端訂閱消息采用分離存儲(chǔ)的機(jī)制來(lái)實(shí)現(xiàn)的,Consumer端訂閱消息是需要通過(guò)ConsumeQueue這個(gè)消息消費(fèi)的邏輯隊(duì)列拿到一個(gè)索引,然后再?gòu)腃ommitLog里面讀取真正的消息實(shí)體內(nèi)容,所以說(shuō)到底也是還繞不開(kāi)其存儲(chǔ)結(jié)構(gòu)。其ConsumeQueue的存儲(chǔ)結(jié)構(gòu)如下,可以看到其中有8個(gè)字節(jié)存儲(chǔ)的Message Tag的哈希值,基于Tag的消息過(guò)濾正是基于這個(gè)字段值的。

主要支持如下2種的過(guò)濾方式
(1) Tag過(guò)濾方式:Consumer端在訂閱消息時(shí)除了指定Topic還可以指定TAG,如果一個(gè)消息有多個(gè)TAG,可以用||分隔。其中,Consumer端會(huì)將這個(gè)訂閱請(qǐng)求構(gòu)建成一個(gè) SubscriptionData,發(fā)送一個(gè)Pull消息的請(qǐng)求給Broker端。Broker端從RocketMQ的文件存儲(chǔ)層—Store讀取數(shù)據(jù)之前,會(huì)用這些數(shù)據(jù)先構(gòu)建一個(gè)MessageFilter,然后傳給Store。Store從 ConsumeQueue讀取到一條記錄后,會(huì)用它記錄的消息tag hash值去做過(guò)濾,由于在服務(wù)端只是根據(jù)hashcode進(jìn)行判斷,無(wú)法精確對(duì)tag原始字符串進(jìn)行過(guò)濾,故在消息消費(fèi)端拉取到消息后,還需要對(duì)消息的原始tag字符串進(jìn)行比對(duì),如果不同,則丟棄該消息,不進(jìn)行消息消費(fèi)

如上是tag消息過(guò)濾的大致邏輯,可用看到最終還是從commitLog中根據(jù)偏移量獲取消息,那么為什么rocketmq不解析一下消息內(nèi)容,再次根據(jù)tag字符串進(jìn)行比較昵?

這是因?yàn)檫@里使用了MappedByteBuffer避免將整個(gè)CommitLog讀取到內(nèi)存中,如果試圖將消息讀取到內(nèi)存中,比較tag的話,maybe出現(xiàn)磁盤(pán)IO和內(nèi)核態(tài)和用戶(hù)態(tài)的切換(如果這個(gè)消息沒(méi)有被預(yù)先加載到物理內(nèi)存中,操作系統(tǒng)會(huì)觸發(fā)一個(gè)缺頁(yè)中斷,這時(shí)候會(huì)從用戶(hù)態(tài)切換到內(nèi)核態(tài),從磁盤(pán)上讀取消息,然后加載到物理內(nèi)容,然后再?gòu)膬?nèi)核態(tài)切換到用戶(hù)態(tài))

(2) SQL92的過(guò)濾方式:這種方式的大致做法和上面的Tag過(guò)濾方式一樣,只是在Store層的具體過(guò)濾過(guò)程不太一樣,真正的 SQL expression 的構(gòu)建和執(zhí)行由rocketmq-filter模塊負(fù)責(zé)的。每次過(guò)濾都去執(zhí)行SQL表達(dá)式會(huì)影響效率,所以RocketMQ使用了BloomFilter避免了每次都去執(zhí)行。SQL92的表達(dá)式上下文為消息的屬性。

大致原理是,根據(jù)消息屬性中獲取序列化的布隆過(guò)濾器數(shù)據(jù),如果布隆過(guò)濾器表示不符合那么肯定是不符合,如果符合那么需要進(jìn)一步進(jìn)行過(guò)濾。

二丶事務(wù)消息

1.事務(wù)消息大致流程

RocketMQ采用了2PC的思想來(lái)實(shí)現(xiàn)了提交事務(wù)消息,同時(shí)增加一個(gè)補(bǔ)償邏輯來(lái)處理二階段超時(shí)或者失敗的消息,如下圖所示。

上圖說(shuō)明了事務(wù)消息的大致方案,其中分為兩個(gè)流程:正常事務(wù)消息的發(fā)送及提交、事務(wù)消息的補(bǔ)償流程。

  1. 事務(wù)消息發(fā)送及提交:

    1. 發(fā)送消息(half消息):這一階段的消息對(duì)消費(fèi)者來(lái)說(shuō)是不可見(jiàn)的,RocketMQ事務(wù)消息是這樣實(shí)現(xiàn)half消息不可見(jiàn)的:

      如果消息是half消息,將備份原消息的主題與消息消費(fèi)隊(duì)列,然后改變主題為RMQ_SYS_TRANS_HALF_TOPIC。由于消費(fèi)組未訂閱該主題,故消費(fèi)端無(wú)法消費(fèi)half類(lèi)型的消息,然后RocketMQ會(huì)開(kāi)啟一個(gè)定時(shí)任務(wù),從Topic為RMQ_SYS_TRANS_HALF_TOPIC中拉取消息進(jìn)行消費(fèi),根據(jù)生產(chǎn)者組獲取一個(gè)服務(wù)提供者發(fā)送回查事務(wù)狀態(tài)請(qǐng)求,根據(jù)事務(wù)狀態(tài)來(lái)決定是提交或回滾消息。

      這里可看到生產(chǎn)者組的作用:如果生產(chǎn)者服務(wù)器A和B是一個(gè)生產(chǎn)者組,生產(chǎn)者A掛了,rocketmq會(huì)請(qǐng)求生產(chǎn)者B來(lái)回程事務(wù)提交狀態(tài)

    2. 服務(wù)端響應(yīng)消息寫(xiě)入結(jié)果。

    3. 根據(jù)發(fā)送結(jié)果執(zhí)行本地事務(wù)(如果寫(xiě)入失敗,此時(shí)half消息對(duì)業(yè)務(wù)不可見(jiàn),本地邏輯不執(zhí)行)。

    4. 根據(jù)本地事務(wù)狀態(tài)執(zhí)行Commit或者Rollback(Commit操作生成消息索引,消息對(duì)消費(fèi)者可見(jiàn))

  2. 補(bǔ)償流程:補(bǔ)償階段用于解決消息Commit或者Rollback發(fā)生超時(shí)或者失敗的情況

    1. 對(duì)沒(méi)有Commit/Rollback的事務(wù)消息(pending狀態(tài)的消息),從服務(wù)端發(fā)起一次“回查”
    2. Producer收到回查消息,檢查回查消息對(duì)應(yīng)的本地事務(wù)的狀態(tài)
    3. 根據(jù)本地事務(wù)狀態(tài),重新Commit或者Rollback

可用看到rocketmq通過(guò)主動(dòng)會(huì)查實(shí)現(xiàn)最終一致性,但是不會(huì)無(wú)限制的重試下去,默認(rèn)回查15次,如果15次回查還 是無(wú)法得知事務(wù)狀態(tài),rocketmq默認(rèn)回滾該消息。

如下如果發(fā)送事務(wù)消息,那么會(huì)在消息中標(biāo)記是一個(gè)事務(wù)消息

在Broker端,如果根據(jù)此字段可得知是否時(shí)事務(wù)消息,如果是,那么會(huì)有存儲(chǔ)為half消息

如上,可看到如果是事務(wù)消息會(huì)備份原topic,然后替換為事務(wù)topic,然后使用Store進(jìn)行存儲(chǔ)。

2.Commit和Rollback操作以及Op消息的引入

在完成一階段寫(xiě)入一條對(duì)用戶(hù)不可見(jiàn)的消息后,二階段如果是Commit操作,則需要讓消息對(duì)用戶(hù)可見(jiàn);如果是Rollback則需要撤銷(xiāo)一階段的消息。先說(shuō)Rollback的情況。對(duì)于Rollback,本身一階段的消息對(duì)用戶(hù)是不可見(jiàn)的,其實(shí)不需要真正撤銷(xiāo)消息(實(shí)際上RocketMQ也無(wú)法去真正的刪除一條消息,因?yàn)槭琼樞驅(qū)懳募模5菂^(qū)別于這條消息沒(méi)有確定狀態(tài)(Pending狀態(tài),事務(wù)懸而未決),需要一個(gè)操作來(lái)標(biāo)識(shí)這條消息的最終狀態(tài)。RocketMQ事務(wù)消息方案中引入了Op消息的概念用Op消息標(biāo)識(shí)事務(wù)消息已經(jīng)確定的狀態(tài)(Commit或者Rollback)。如果一條事務(wù)消息沒(méi)有對(duì)應(yīng)的Op消息,說(shuō)明這個(gè)事務(wù)的狀態(tài)還無(wú)法確定(可能是二階段失敗了)。引入Op消息后,事務(wù)消息無(wú)論是Commit或者Rollback都會(huì)記錄一個(gè)Op操作。Commit相對(duì)于Rollback只是在寫(xiě)入Op消息前創(chuàng)建Half消息的索引。

3.Op消息的存儲(chǔ)和對(duì)應(yīng)關(guān)系

RocketMQ將Op消息寫(xiě)入到全局一個(gè)特定的Topic中通過(guò)源碼中的方法—TransactionalMessageUtil.buildOpTopic();這個(gè)Topic是一個(gè)內(nèi)部的Topic(像Half消息的Topic一樣),不會(huì)被用戶(hù)消費(fèi)。Op消息的內(nèi)容為對(duì)應(yīng)的Half消息的存儲(chǔ)的Offset,這樣通過(guò)Op消息能索引到Half消息進(jìn)行后續(xù)的回查操作。

4.Half消息的索引構(gòu)建

在執(zhí)行二階段Commit操作時(shí),需要構(gòu)建出Half消息的索引。一階段的Half消息由于是寫(xiě)到一個(gè)特殊的Topic,所以二階段構(gòu)建索引時(shí)需要讀取出Half消息,并將Topic和Queue替換成真正的目標(biāo)的Topic和Queue,之后通過(guò)一次普通消息的寫(xiě)入操作來(lái)生成一條對(duì)用戶(hù)可見(jiàn)的消息。所以RocketMQ事務(wù)消息二階段其實(shí)是利用了一階段存儲(chǔ)的消息的內(nèi)容,在二階段時(shí)恢復(fù)出一條完整的普通消息,然后走一遍消息寫(xiě)入流程。

5.如何處理二階段失敗的消息?

如果在RocketMQ事務(wù)消息的二階段過(guò)程中失敗了,例如在做Commit操作時(shí),出現(xiàn)網(wǎng)絡(luò)問(wèn)題導(dǎo)致Commit失敗,那么需要通過(guò)一定的策略使這條消息最終被Commit。RocketMQ采用了一種補(bǔ)償機(jī)制,稱(chēng)為“回查”。Broker端對(duì)未確定狀態(tài)的消息發(fā)起回查,將消息發(fā)送到對(duì)應(yīng)的Producer端(同一個(gè)Group的Producer),由Producer根據(jù)消息來(lái)檢查本地事務(wù)的狀態(tài),進(jìn)而執(zhí)行Commit或者Rollback。Broker端通過(guò)對(duì)比Half消息和Op消息進(jìn)行事務(wù)消息的回查并且推進(jìn)CheckPoint(記錄那些事務(wù)消息的狀態(tài)是確定的)。

值得注意的是,rocketmq并不會(huì)無(wú)休止的的信息事務(wù)狀態(tài)回查,默認(rèn)回查15次,如果15次回查還是無(wú)法得知事務(wù)狀態(tài),rocketmq默認(rèn)回滾該消息。

三丶延遲消息

定時(shí)消息(延遲隊(duì)列)是指消息發(fā)送到broker后,不會(huì)立即被消費(fèi),等待特定時(shí)間投遞給真正的topic。基本實(shí)現(xiàn)方式和事務(wù)消息類(lèi)似

broker有配置項(xiàng)messageDelayLevel,默認(rèn)值為“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,18個(gè)level。可以配置自定義messageDelayLevel。注意, messageDelayLevel是broker的屬性,不屬于某個(gè)topic。發(fā)消息時(shí),設(shè)置delayLevel等級(jí)即可: msg.setDelayLevel(level)。level有以下三種情況:

  • level == 0,消息為非延遲消息
  • 1<=level<=maxLevel,消息延遲特定時(shí)間,例如level==1,延遲1s
  • level > maxLevel,則level== maxLevel,例如level==20,延遲2h

定時(shí)消息會(huì)暫存在名為SCHEDULE_TOPIC_XXXX的topic中,并根據(jù)delayTimeLevel存入特定的queue,queueId = delayTimeLevel – 1,即一個(gè)queue只存相同延遲的消息,保證具有相同發(fā)送延遲 的消息能夠順序消費(fèi)。broker會(huì)調(diào)度地消費(fèi)SCHEDULE_TOPIC_XXXX,將消息寫(xiě)入真實(shí)的topic。

如下是rocketmq基于調(diào)度線程池,實(shí)現(xiàn)定時(shí)任務(wù)處理延遲消息

總結(jié)

以上是生活随笔為你收集整理的Rocketmq学习2——Rocketmq消息过滤&事务消息&延迟消息原理源码浅析的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。