日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > windows >内容正文

windows

1.5万字 + 25张图盘点RocketMQ 11种消息类型,你知道几种?

發(fā)布時間:2023/12/24 windows 30 coder
生活随笔 收集整理的這篇文章主要介紹了 1.5万字 + 25张图盘点RocketMQ 11种消息类型,你知道几种? 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

大家好,我是三友~~

故事的開頭是這樣的

最近有個兄弟私信了我一張截圖

我一看截圖內容,好家伙,原來是我一年多前立的flag

倒不是我忘了這件事,我后來也的確寫了一篇的關于RocketMQ運行的原理的文章

只不過這篇文章是從上帝的視角去看待RocektMQ一條消息整個生命周期的過程

所以就沒有具體的分析事務和延遲消息的實現原理,也算是留下了一個小小的坑吧

不過,既然現在有兄弟問了,那么今天我這就來把這個坑填上

并且,索性咱就直接把這個坑填得滿滿的,直接盤點RocketMQ支持的11種消息類型以及背后的實現原理

本文是基于RocketMQ 4.9版本講解

前置知識

為了幫助大家更好地理解這些消息底層的實現原理,這里我就通過三個問題來講一講RocketMQ最最基本的原理

如果有什么不解的,可以看看我之前寫的RocketMQ消息短暫而又精彩的一生這篇文章

1、生產者如何發(fā)送消息

在RocketMQ中有兩個重要的角色

  • NameServer:就相當于一個注冊中心
  • Broker:RocketMQ服務端

當RocketMQ服務端,也就是Broker在啟動的時候,會往NameServer注冊自己的信息

這些信息其中就包括

  • 當前Broker所在機器的ip和端口
  • 當前Broker管理的Topic的名稱以及每個Topic有幾個隊列

當生產者和消費者啟動的時候,就會從NameServer拉取這些信息,這樣生產者和消費者就可以通過NameServer中獲取到Broker的ip和端口,跟Broker通信了

而Topic我們也都知道,是消息隊列中一個很重要的概念,代表了一類消息的集合

在RocketMQ中,每個Topic默認都會有4個隊列,并且每個隊列都有一個id,默認從0開始,依次遞增

生產者發(fā)送消息的時候,就會從消息所在Topic的隊列中,根據一定的算法選擇一個,然后攜帶這個隊列的id(queueId),再發(fā)送給Broker

攜帶的隊列的id就代表了這條消息屬于這個隊列的

所以從更細化的來說,消息雖然是在Topic底下,但是真正是分布在不同的隊列上的,每個隊列會有這個Topic下的部分消息。

2、消息存在哪

當消息被Broker接收到的時候,Broker會將消息存到本地的磁盤文件中,保證Broker重啟之后消息也不丟失

RocketMQ給這個存消息的文件起了一個高大上的名字:CommitLog

由于消息會很多,所以為了防止文件過大,CommitLog在物理磁盤文件上被分為多個磁盤文件,每個文件默認的固定大小是1G

消息在寫入到文件時,除了包含消息本身的內容數據,也還會包含其它信息,比如

  • 消息的Topic
  • 消息所在隊列的id,前面提到過
  • 消息生產者的ip和端口
  • ...

這些數據會和消息本身按照一定的順序同時寫到CommitLog文件中

上圖中黃色排列順序和實際的存的內容并非實際情況,我只是舉個例子

3、消費者如何消費消息

消費者是如何拉取消息的

在RocketMQ中,消息的消費單元是以隊列來的

所以RocketMQ為了方便快速的查找和消費消息,會為每個Topic的每個隊列也單獨創(chuàng)建一個文件

RocketMQ給這個文件也起了一個高大上的名字:ConsumeQueue

當消息被存到CommitLog之后,其實還會往這條消息所在隊列的ConsumeQueue文件中插一條數據

每個隊列的ConsumeQueue也是由多個文件組成,每個文件默認是存30萬條數據

插入ConsumeQueue中的每條數據由20個字節(jié)組成,包含3部分信息

  • 消息在CommitLog的起始位置(8個字節(jié))
  • 消息在CommitLog存儲的長度(8個字節(jié))
  • 消息tag的hashCode(4個字節(jié))

每條數據也有自己的編號(offset),默認從0開始,依次遞增

當消費者拉取消息的時候,會告訴服務端自己消費哪個隊列(queueId),哪個位置的消息(offset)的消息

服務端接收到消息之后,會找到queueId對應的ConsumeQueue,然后找到offset位置的數據,最后根據這條數據到CommitLog文件查找真正的消息內容

所以,從這可以看出,ConsumeQueue其實就相當于是一個索引文件,方便我們快速查找在CommitLog中的消息

所以,記住下面這個非常重要的結論,有助于后面的文章內容的理解

要想查找到某個Topic下的消息,那么一定是先找這個Topic隊列對應的ConsumeQueue,之后再通過ConsumeQueue中的數據去CommitLog文件查找真正的消息內容

消費者組和消費模式

在RocketMQ,消費者是有個消費者組的概念,在啟動消費者的時候會指定該消費者屬于哪個消費者組。

//創(chuàng)建一個消費者,指定消費者組的名稱為sanyouConsumer
DefaultMQPushConsumer?consumer?=?new?DefaultMQPushConsumer("sanyouConsumer");

一個消費者組中可以有多個消費者,不同消費者組之間消費消息是互不干擾的

在同一個消費者組中,消息消費有兩種模式

  • 集群模式
  • 廣播模式

同一條消息在同一個消費者組底下只會被消費一次,這就叫集群模式

集群消費的實現就是將隊列按照一定的算法分配給消費者,默認是按照平均分配的

廣播模式剛好相反,同一條消息能被同一個消費者組底下所有的消費者消費一次

RocketMQ默認是集群模式,如果你想用廣播模式,只需設置一下即可

consumer.setMessageModel(MessageModel.BROADCASTING);

好了,到這就講完了前置知識,這些前置知識后面或多或少都有提到

如果你覺得看的不過癮,更詳細的文章奉上RocketMQ消息短暫而又精彩的一生

普通消息

普通消息其實就很簡單,如下面代碼所示,就是發(fā)送一條普通的消息

public?class?Producer?{
????public?static?void?main(String[]?args)?throws?Exception?{
????????//創(chuàng)建一個生產者,指定生產者組為?sanyouProducer
????????DefaultMQProducer?producer?=?new?DefaultMQProducer("sanyouProducer");
????????//?指定NameServer的地址
????????producer.setNamesrvAddr("192.168.200.143:9876");
????????//?啟動生產者
????????producer.start();

????????//創(chuàng)建一條消息?topic為?sanyouTopic?消息內容為?三友的java日記
????????Message?msg?=?new?Message("sanyouTopic",?"三友的java日記".getBytes(RemotingHelper.DEFAULT_CHARSET));
????????//?發(fā)送消息并得到消息的發(fā)送結果,然后打印
????????SendResult?sendResult?=?producer.send(msg);
????????System.out.printf("%s%n",?sendResult);

????????//?關閉生產者
????????producer.shutdown();
????}

}

構建的消息的topic為sanyouTopic,內容為三友的java日記,這就是一條很普通的消息

批量消息

批量消息從名字也可以看出來,就是將多個消息同時發(fā)過去,減少網絡請求的次數

public?class?Producer?{
????public?static?void?main(String[]?args)?throws?Exception?{
????????//創(chuàng)建一個生產者,指定生產者組為?sanyouProducer
????????DefaultMQProducer?producer?=?new?DefaultMQProducer("sanyouProducer");
????????//?指定NameServer的地址
????????producer.setNamesrvAddr("192.168.200.143:9876");
????????//?啟動生產者
????????producer.start();

????????//用以及集合保存多個消息
????????List<Message>?messages?=?new?ArrayList<>();
????????messages.add(new?Message("sanyouTopic",?"三友的java日記?0".getBytes()));
????????messages.add(new?Message("sanyouTopic",?"三友的java日記?1".getBytes()));
????????messages.add(new?Message("sanyouTopic",?"三友的java日記?2".getBytes()));
????????//?發(fā)送消息并得到消息的發(fā)送結果,然后打印
????????SendResult?sendResult?=?producer.send(messages);
????????System.out.printf("%s%n",?sendResult);

????????//?關閉生產者
????????producer.shutdown();
????}

}

多個普通消息同時發(fā)送,這就是批量消息

不過在使用批量消息的時候,需要注意以下兩點

  • 每條消息的Topic必須都得是一樣的
  • 不支持延遲消息和事務消息

普通消息和批量消息比較簡單,沒有復雜的邏輯,就是將消息發(fā)送過去,在ConsumeQueue和CommitLog存上對應的數據就可以了

順序消息

所謂的順序消息就是指

生產者發(fā)送消息的順序跟消費者消費消息的順序是一致的

RocketMQ可以保證同一個隊列的消息絕對順序,先進入隊列的消息會先被消費者拉取到,但是無法保證一個Topic內消息的絕對順序

所以要想通過RocketMQ實現順序消費,需要保證兩點

  • 生產者將需要保證順序的消息發(fā)送到同一個隊列
  • 消費者按照順序消費拉取到的消息

那么,第一個問題,如何消息發(fā)送到同一個隊列

前面有提到,RocketMQ發(fā)送消息的時候會選擇一個隊列進行發(fā)送

而RocketMQ默認是通過輪詢算法來選擇隊列的,這就無法保證需要順序消費的消息會存到同一個隊列底下

所以,默認情況下是不行了,我們需要自定義隊列的選擇算法,才能保證消息都在同一個隊列中

RocketMQ提供了自定義隊列選擇的接口MessageQueueSelector

比如我們可以實現這個接口,保證相同訂單id的消息都選擇同一個隊列,在消息發(fā)送的時候指定一下就可以了

SendResult?sendResult?=?producer.send(msg,?new?MessageQueueSelector()?{
????@Override
????public?MessageQueue?select(List<MessageQueue>?mqs,?Message?msg,?Object?arg)?{
????????//可以根據業(yè)務的id從mqs中選擇一個隊列
????????return?null;
????}
},?new?Object());

保證消息順序發(fā)送之后,第二個問題,消費者怎么按照順序消費拉取到的消息?

這個問題RocketMQ已經考慮到了,看看RocketMQ多么地貼心

RocketMQ在消費消息的時候,提供了兩種方式:

  • 并發(fā)消費
  • 順序消費

并發(fā)消費,多個線程同時處理同一個隊列拉取到的消息

順序消費,同一時間只有一個線程會處理同一個隊列拉取到的消息

至于是并發(fā)消費還是順序消費,需要我們自己去指定

對于順序處理,只需要實現MessageListenerOrderly接口,處理消息就可以了

public?class?Consumer?{
????public?static?void?main(String[]?args)?throws?InterruptedException,?MQClientException?{

????????//?創(chuàng)建一個消費者
????????DefaultMQPushConsumer?consumer?=?new?DefaultMQPushConsumer("sanyouConsumer");
????????//?指定NameServer的地址
????????consumer.setNamesrvAddr("192.168.200.143:9876");

????????//?訂閱sanyouTopic這個topic下的所有的消息
????????consumer.subscribe("sanyouTopic",?"*");
????????//?注冊一個消費的監(jiān)聽器,當有消息的時候,會回調這個監(jiān)聽器來消費消息
????????consumer.registerMessageListener(new?MessageListenerOrderly()?{
????????????@Override
????????????public?ConsumeOrderlyStatus?consumeMessage(List<MessageExt>?msgs,?ConsumeOrderlyContext?context)?{
????????????????for?(MessageExt?msg?:?msgs)?{
????????????????????System.out.printf("消費消息:%s",?new?String(msg.getBody())?+?"\n");
????????????????}

????????????????return?ConsumeOrderlyStatus.SUCCESS;
????????????}
????????});

????????//?啟動消費者
????????consumer.start();

????????System.out.printf("Consumer?Started.%n");
????}
}

如果想并發(fā)消費,換成實現MessageListenerConcurrently即可

到這你可能會有一個疑問

并發(fā)消費和順序消費跟前面提到的集群消費和廣播消費有什么區(qū)別?

集群消費和廣播消費指的是一個消費者組里的每個消費者是去拉取全部隊列的消息還是部分隊列的消息,也就是選擇需要拉取的隊列

而并發(fā)和順序消費的意思是,是對已經拉到的同一個隊列的消息,是并發(fā)處理還是按照消息的順序去處理

延遲消息

延遲消息就是指生產者發(fā)送消息之后,消息不會立馬被消費,而是等待一定的時間之后再被消息

RocketMQ的延遲消息用起來非常簡單,只需要在創(chuàng)建消息的時候指定延遲級別,之后這條消息就成為延遲消息了

Message?message?=?new?Message("sanyouTopic",?"三友的java日記?0".getBytes());
//延遲級別
message.setDelayTimeLevel(1);

雖然用起來簡單,但是背后的實現原理還是有點意思,我們接著往下看

RocketMQ延遲消息的延遲時間默認有18個級別,不同的延遲級別對應的延遲時間不同

RocketMQ內部有一個Topic,專門用來表示是延遲消息的,叫SCHEDULE_TOPIC_XXXX,XXXX不是占位符,就是XXXX

RocketMQ會根據延遲級別的個數為SCHEDULE_TOPIC_XXXX這個Topic創(chuàng)建相對應數量的隊列

比如默認延遲級別是18,那么SCHEDULE_TOPIC_XXXX就有18個隊列,隊列的id從0開始,所以延遲級別為1時,對應的隊列id就是0,為2時對應的就是1,依次類推

SCHEDULE_TOPIC_XXXX這個Topic有什么作用呢?

這就得從消息存儲時的一波偷梁換柱的騷操作了說起了

當服務端接收到消息的時候,判斷延遲級別大于0的時候,說明是延遲消息,此時會干下面三件事:

  • 將消息的Topic改成SCHEDULE_TOPIC_XXXX
  • 將消息的隊列id設置為延遲級別對應的隊列id
  • 將消息真正的Topic和隊列id存到前面提到的消息存儲時的額外信息中

之后消息就按照正常存儲的步驟存到CommitLog文件中

由于消息存到的是SCHEDULE_TOPIC_XXXX這個Topic中,而不是消息真正的目標Topic中,所以消費者此時是消費不到消息的

舉個例子,比如有條消息,Topic為sanyou,所在的隊列id = 1,延遲級別 = 1,那么偷梁換柱之后的結果如下圖所示

代碼如下

所以從上分析可以得出一個結論

所有RocketMQ的延遲消息,最終都會存儲到SCHEDULE_TOPIC_XXXX這個Topic中,并且同一個延遲級別的消息在同一個隊列中

在存消息偷梁換柱之后,實現延遲消費的最關鍵的一個步驟來了

BocketMQ在啟動的時候,除了為每個延遲級別創(chuàng)建一個隊列之后,還會為每個延遲級別創(chuàng)建一個延遲任務,也就相當于一個定時任務,每隔100ms執(zhí)行一次

這個延遲任務會去檢查這個隊列中的消息有沒有到達延遲時間,也就是不是可以消費了

前面的結論,每個隊列都有一個ConsumeQueue文件,可以通過ConsumeQueue找到這個隊列中的消息

一旦發(fā)現到達延遲時間,可以消費了,此時就會從這條消息額外存儲的消息中拿到真正的Topic和隊列id,重新構建一條新的消息,將新的消息的Topic和隊列id設置成真正的Topic和隊列id,內容還是原來消息的內容

之后再一次將新構建的消息存儲到CommitLog中

由于新消息的Topic變成消息真正的Topic了,所以之后消費者就能夠消費到這條消息了

所以,從整體來說,RocketMQ延遲消息的實現本質上就是最開始消息是存在SCHEDULE_TOPIC_XXXX這個中轉的Topic中

然后會有一個類似定時任務的東西,不停地去找到這個Topic中的消息

一旦發(fā)現這個消息達到了延遲任務,說明可以消費了,那么就重新構建一條消息,這條消息的Topic和隊列id都是實際上的Topic和隊列id,然后存到CommitLog

之后消費者就能夠在目標的Topic獲取到消息了

事務消息

事務消息用起來也比較簡單,如下所示:

public?class?TransactionMessageDemo?{

????public?static?void?main(String[]?args)?throws?Exception?{
????????TransactionMQProducer?transactionMQProducer?=?new?TransactionMQProducer("sanyouProducer");
????????transactionMQProducer.setNamesrvAddr("192.168.200.143:9876");

????????//設置事務監(jiān)聽器
????????transactionMQProducer.setTransactionListener(new?TransactionListener()?{

????????????@Override
????????????public?LocalTransactionState?executeLocalTransaction(Message?msg,?Object?arg)?{
????????????????//處理本次事務
????????????????return?LocalTransactionState.COMMIT_MESSAGE;
????????????}

????????????@Override
????????????public?LocalTransactionState?checkLocalTransaction(MessageExt?msg)?{
????????????????//檢查本地事務
????????????????return?LocalTransactionState.COMMIT_MESSAGE;
????????????}
????????});

????????transactionMQProducer.start();

????????Message?message?=?new?Message("sanyouTopic",?"三友的java日記".getBytes());

????????//發(fā)送消息
????????transactionMQProducer.sendMessageInTransaction(message,?new?Object());
????}

}

事務消息發(fā)送相對于前面的例子主要有以下不同:

  • 將前面的DefaultMQProducer換成TransactionMQProducer
  • 需要設置事務的監(jiān)聽器TransactionListener,來執(zhí)行本地事務
  • 發(fā)送方法改成 sendMessageInTransaction

為什么要這么改,接下來我們來講講背后的實現原理

上一節(jié)在說延遲消息的時候提到,RocketMQ使用到了SCHEDULE_TOPIC_XXXX這個中轉Topic,來偷梁換柱實現延遲消息

不僅僅是延遲消息,事務消息其實也是這么干的,它也會進行偷梁換柱,將消息先存在RMQ_SYS_TRANS_HALF_TOPIC這個Topic下,同時也會將消息真正的Topic和隊列id存到額外信息中,操作都是一樣滴

由于消息不在真正目標的Topic下,所以這條消息消費者也是消費不到滴

當消息成功存儲之后,服務端會向生產者響應,告訴生產者我消息存儲成功了,你可以執(zhí)行本地事務了

之后生產者就會執(zhí)行本地執(zhí)行事務,也就是執(zhí)行如下方法

TransactionListener#executeLocalTransaction

當本地事務執(zhí)行完之后,會將執(zhí)行的結果發(fā)送給服務端

服務端會根據事務的執(zhí)行狀態(tài)來執(zhí)行對應的處理結果

  • commit:提交事務消息,跟延遲消息一樣,重新構建一條消息,Topic和隊列id都設置成消息真正的Topic和隊列id,然后重新存到CommitLog文件,這樣消費者就可以消費到消息了
  • rollback:回滾消息,其實并沒有實際的操作,因為消息本身就不在真正的Topic下,所以消費者壓根就消費不到,什么都不做就可以了
  • unknown:本地事務執(zhí)行異常時就是這個狀態(tài),這個狀態(tài)下會干一些事,咱們后面再說

所以在正常情況下,事務消息整個運行流程如下圖所示

既然有正常情況下,那么就有非正常情況下

比如前面提到的拋異常導致unknown,又或者什么亂七八糟的原因,導致無法正常提交本地事務的執(zhí)行狀態(tài),那么此時該怎么辦呢?

RocketMQ當然也想到了,他有自己的一套補償機制

RocketMQ內部會起動一個線程,默認每隔1分鐘去檢查沒有被commit或者rollback的事務消息

RocketMQ內部有一套機制,可以找出哪些事務消息沒有commit或者rollback,這里就不細說了

當發(fā)現這條消息超過6s沒有提交事務狀態(tài),那么此時就會向生產者發(fā)送一個請求,讓生產者去檢查一下本地的事務執(zhí)行的狀態(tài),就是執(zhí)行下面這行代碼

TransactionListener#checkLocalTransaction

之后會將這個方法返回的事務狀態(tài)提交給服務端,服務端就可以知道事務的執(zhí)行狀態(tài)了

這里有一個細節(jié)需要注意,事務消息檢查次數不是無限的,默認最大為15次,一旦超過15次,那么就不會再被檢查了,而是會直接把這個消息存到TRANS_CHECK_MAX_TIME_TOPIC

所以你可以從這個Topic讀取那些無法正常提交事務的消息

這就是RocketMQ事務消息的原理

小總結

RocketMQ事務消息的實現主要是先將消息存到RMQ_SYS_TRANS_HALF_TOPIC這個中間Topic,有些資料會把這個消息稱為半消息(half消息),這是因為這個消息不能被消費

之后會執(zhí)行本地的事務,提交本地事務的執(zhí)行狀態(tài)

RocketMQ會根據事務的執(zhí)行狀態(tài)去判斷commit或者是rollback消息,也就是是不是可以讓消費者消費這條消息的意思

在一些異常情況下,生產者無法及時正確提交事務執(zhí)行狀態(tài)

RocketMQ會向生產者發(fā)送消息,讓生產者去檢查本地的事務,之后再提交事務狀態(tài)

當然,這個檢查次數默認不超過15次,如果超過15次還未成功提交事務狀態(tài),RocketMQ就會直接把這個消息存到TRANS_CHECK_MAX_TIME_TOPIC

請求-應答消息

這個消息類型比較有意思,類似一種RPC的模式

生產者發(fā)送消息之后可以阻塞等待消費者消費這個消息的之后返回的結果

生產者通過過調用request方法發(fā)送消息,接收回復消息

public?class?Producer?{
????public?static?void?main(String[]?args)?throws?Exception?{
????????//創(chuàng)建一個生產者,指定生產者組為?sanyouProducer
????????DefaultMQProducer?producer?=?new?DefaultMQProducer("sanyouProducer");
????????//?指定NameServer的地址
????????producer.setNamesrvAddr("192.168.200.143:9876");
????????//?啟動生產者
????????producer.start();


????????Message?message?=?new?Message("sanyouTopic",?"三友的java日記".getBytes());
????????
????????//發(fā)送消息,拿到響應結果,?3000代表超時時間,3s內未拿到響應結果,就超時,會拋出RequestTimeoutException異常
????????Message?result?=?producer.request(message,?3000);
????????System.out.println("接收到響應消息:"?+?result);

????????//?關閉生產者
????????producer.shutdown();
????}

}

而對于消費者來著,當消費完消息之后,也要作為生產者,將響應的消息發(fā)送出去

public?class?Consumer?{
????public?static?void?main(String[]?args)?throws?InterruptedException,?MQClientException?{

????????//創(chuàng)建一個生產者,指定生產者組為?sanyouProducer
????????DefaultMQProducer?producer?=?new?DefaultMQProducer("sanyouProducer");
????????//?指定NameServer的地址
????????producer.setNamesrvAddr("192.168.200.143:9876");
????????//?啟動生產者
????????producer.start();


????????//?通過push模式消費消息,指定消費者組
????????DefaultMQPushConsumer?consumer?=?new?DefaultMQPushConsumer("sanyouConsumer");
????????//?指定NameServer的地址
????????consumer.setNamesrvAddr("192.168.200.143:9876");

????????//?訂閱這個topic下的所有的消息
????????consumer.subscribe("sanyouTopic",?"*");
????????//?注冊一個消費的監(jiān)聽器,當有消息的時候,會回調這個監(jiān)聽器來消費消息
????????consumer.registerMessageListener(new?MessageListenerConcurrently()?{

????????????@Override
????????????public?ConsumeConcurrentlyStatus?consumeMessage(List<MessageExt>?msgs,
????????????????????????????????????????????????????????????ConsumeConcurrentlyContext?context)
?
{
????????????????for?(MessageExt?msg?:?msgs)?{
????????????????????System.out.printf("消費消息:%s",?new?String(msg.getBody())?+?"\n");

????????????????????try?{
????????????????????????//?用RocketMQ自帶的工具類創(chuàng)建響應消息
????????????????????????Message?replyMessage?=?MessageUtil.createReplyMessage(msg,?"這是響應消息內容".getBytes(StandardCharsets.UTF_8));
????????????????????????//?將響應消息發(fā)送出去,拿到發(fā)送結果
????????????????????????SendResult?replyResult?=?producer.send(replyMessage,?3000);
????????????????????????System.out.println("響應消息的結果?=?"?+?replyResult);
????????????????????}?catch?(Exception?e)?{
????????????????????????e.printStackTrace();
????????????????????}

????????????????}

????????????????return?ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
????????????}
????????});

????????//?啟動消費者
????????consumer.start();

????????System.out.printf("Consumer?Started.%n");
????}
}

這種請求-應答消息實現原理也比較簡單,如下圖所示

生產者和消費者,會跟RocketMQ服務端進行網絡連接

所以他們都是通過這個連接來發(fā)送和拉取消息的

當服務端接收到回復消息之后,有個專門處理回復消息的類

這個類就會直接找到發(fā)送消息的生產者的連接,之后會通過這個連接將回復消息發(fā)送給生產者

RocketMQ底層是基于Netty通信的,所以如果你有用過Netty的話,應該都知道,就是通過Channel來發(fā)送的

重試消息

重試消息并不是我們業(yè)務中主動發(fā)送的,而是指當消費者消費消息失敗之后,會間隔一段時間之后再次消費這條消息

重試的機制在并發(fā)消費模式和順序消費模式下實現的原理并不相同

并發(fā)消費模式重試實現原理

RocetMQ會為每個消費者組創(chuàng)建一個重試消息所在的Topic,名字格式為

%RETRY% + 消費者組名稱

舉個例子,假設消費者組為sanyouConsumer,那么重試Topic的名稱為:%RETRY%sanyouConsumer

當消息消費失敗后,RocketMQ會把消息存到這個Topic底下

消費者在啟動的時候會主動去訂閱這個Topic,那么自然而然就能消費到消費失敗的消息了

為什么要為每個消費者組創(chuàng)建一個重試Topic呢?

其實我前面已經說過,每個消費者組的消費是隔離的,互不影響

所以,每個消費者組消費失敗的消息可能就不一樣,自然要放到不同的Topic下了

重試消息是如何實現間隔一段時間來消費呢?

說到間隔一段時間消費,你有沒有覺得似曾相識?

不錯,間隔一段時間消費說白了不就是延遲消費么!

所以,并發(fā)消費模式下間隔一段時間底層就是使用的延遲消息來實現的

RocetMQ會為重試消息設置一個延遲級別

并且延遲級別與重試次數的關系為

delayLevel = 3 + 已經重試次數

比如第一次消費失敗,那么已經重試次數就是0,那么此時延遲級別就是3

對應的默認的延遲時間就是10s,也就是一次消息重試消費間隔時間是10s

隨著重試次數越多,延遲級別也越來越高,重試的間隔也就越來越長,但是最大也是最大延遲級別的時間

不過需要注意的是,在并發(fā)消費模式下,只有集群消費才支持消息重試,對于廣播消費模式來說,是不支持消息重試的,消費失敗就失敗了,不會管

順序消費模式重試實現原理

順序消費模式下重試就比較簡單了

當消費失敗的時候,他并不會將消息發(fā)送到服務端,而是直接在本地等1s鐘之后重試

在這個等待的期間其它消息是不能被消費的

這是因為保證消息消費的順序性,即使前面的消息消費失敗了,它也需要等待前面的消息處理完畢才能處理后面的消息

順序消費模式下,并發(fā)消費和集群消費均支持重試消息

死信消息

死信消息就是指如果消息最終無法被正常消費,那么這條消息就會成為死信消息

RocketMQ中,消息會變成死信消息有兩種情況

第一種就是消息重試次數已經達到了最大重試次數

最大重試次數取決于并發(fā)消費還是順序消費

  • 順序消費,默認最大重試次數就是 Integer.MAX_VALUE,基本上就是無限次重試,所以默認情況下順序消費的消息幾乎不可能成為死信消息
  • 并發(fā)消費的話,那么最大重試次數默認就是16次

當然可以通過如下的方法來設置最大重試次數

DefaultMQPushConsumer#setMaxReconsumeTimes

除了上面的情況之外,當在并發(fā)消費模式下,你可以在消息消費失敗之后手動指定,直接讓消息變成死信消息

在并發(fā)消費消息的模式下,處理消息的方法有這么一個參數

ConsumeConcurrentlyContext

這個類中有這么一個屬性

這個參數值有三種情況,注釋也有寫:

  • 小于0,那么直接會把消息放到死信隊列,成為死信消息。注釋寫的是=-1,其實只要小于0就可以成為死信消息,不一定非得是-1
  • 0,默認就是0,這個代表消息重試消費,并且重試的時間間隔(也就是延遲級別)由服務端決定,也即是前面重試消息提到的 delayLevel = 3 + 已經重試次數
  • 大于0,此時就表示客戶端指定消息重試的時間間隔,是幾就代表延遲級別為幾,比如設置成1,那么延遲級別就為1

所以,在并發(fā)消費模式下,可以通過設置這個參數值為-1,直接讓處理失敗的消息成為死信消息

當消息成為死信消息之后,消息并不會丟失

RocketMQ會將死信消息保存在死信Topic底下,Topic格式為

%DLQ% + 消費者組名稱

跟重試Topic的格式有點像,只是將%RETRY%換成了%DLQ%

如果你想知道有哪些死信消息,只需要訂閱這個Topic即可獲得

小總結

所以總的來說,兩種情況會讓消息成為死信消息:

  • 消息重試次數超過最大次數,跟消息的處理方式有關,默認情況下順序處理最大次數是幾乎是無限次,也就是幾乎不可能成為死信消息;并發(fā)處理的情況下,最大重試次數默認就是16次。最大重試次數是可以設置的。
  • 在并發(fā)處理的情況下,通過ConsumeConcurrentlyContextdelayLevelWhenNextConsume屬性設置成-1,讓消息直接變成死信消息

當消息成為死信消息的時候,會被存到%DLQ% + 消費者組名稱這個Topic下

用戶可以通過這個Topic獲取到死信消息,手動干預處理這些消息

同步消息

同步消息是指,當生產者發(fā)送消息的時候,需要阻塞等待服務端響應消息存儲的結果

同步消息跟前面提到的消息類型并不是互斥的

比如前面說的普通消息時舉的例子,他就是同步發(fā)送的,那么它也是一個同步消息

這種模式用于對數據一致性要求較高的場景中,但是等待也會消耗一定的時間

異步消息

既然有了同步消息,那么相對應的就有異步消息

異步消息就是指生產者發(fā)送消息后,不需要阻塞等待服務端存儲消息的結果

所以異步消息的好處就是可以減少等待響應過程消耗的時間

如果你想知道有沒有發(fā)送成功,可以在發(fā)送消息的時候傳個回調的接口SendCallback的實現

Message?message?=?new?Message("sanyouTopic",?"三友的java日記".getBytes());

//異步發(fā)送消息
producer.send(message,?new?SendCallback()?{
????????????@Override
????????????public?void?onSuccess(SendResult?sendResult)?{
????????????????System.out.println("消息發(fā)送結果?=?"?+?sendResult);
????????????}

????????????@Override
????????????public?void?onException(Throwable?e)?{
????????????????System.out.println("消息發(fā)送異常?=?"?+?e.getMessage());
????????????}
????????}
);

當消息發(fā)送之后收到發(fā)送結果或者出現異常的時候,RocektMQ就會回調這個SendCallback實現類,你就可以知道消息發(fā)送的結果了

單向消息

所謂的單向消息就是指,生產者發(fā)送消息給服務端之后,就直接不管了

所以對于生產者來說,他是不會去care消息發(fā)送的結果了,即使發(fā)送失敗了,對于生產者來說也是無所謂的

所以這種方式的主要應用于那種能夠忍受丟消息的操作場景

比如像日志收集就比較適合使用這種方式

單向消息的發(fā)送是通過sendOneway來調用的

Message?message?=?new?Message("sanyouTopic",?"三友的java日記".getBytes());

//發(fā)送單向消息
producer.sendOneway(message);

總的來說,同步消息、異步消息、單向消息代表的是消息的發(fā)送方式,主要是針對消息的發(fā)送方來說,對消息的存儲之類是的沒有任何影響的

最后

ok,到這本文就結束了

本文又又是一篇非常非常肝的文章,不知道你是否堅持看到這里

我在寫的過程中也是不斷地死磕源碼,盡可能避免出現錯誤的內容

同時也在嘗試爭取把我所看到的源碼以一種最簡單的方式說出來

所以如果你堅持看到這里,并覺得文章內容還不錯,歡迎點贊、在看、收藏、轉發(fā)分享給其他需要的人

你的支持就是我更新文章最大的動力,非常地感謝!

哦,最后差點忘了,如果有對RocketMQ源碼感興趣的小伙伴可以從下面這個倉庫fork一下源碼,我在源碼中加了中文注釋,并且后面我還會持續(xù)更新注釋

https://github.com/sanyou3/rocketmq.git

往期熱門文章推薦

如何去閱讀源碼,我總結了18條心法

如何寫出漂亮代碼,我總結了45個小技巧

三萬字盤點Spring/Boot的那些常用擴展點

三萬字盤點Spring 9大核心基礎功能

萬字+20張圖剖析Spring啟動時12個核心步驟

1.5萬字+30張圖盤點索引常見的11個知識點

兩萬字盤點那些被玩爛了的設計模式

掃碼或者搜索關注公眾號 三友的java日記 ,及時干貨不錯過,公眾號致力于通過畫圖加上通俗易懂的語言講解技術,讓技術更加容易學習,回復 面試 即可獲得一套面試真題。

總結

以上是生活随笔為你收集整理的1.5万字 + 25张图盘点RocketMQ 11种消息类型,你知道几种?的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。