日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 综合教程 >内容正文

综合教程

【RocketMQ】MQ消息发送总结

發布時間:2023/10/11 综合教程 98 老码农
生活随笔 收集整理的這篇文章主要介紹了 【RocketMQ】MQ消息发送总结 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

RocketMQ是通過DefaultMQProducer進行消息發送的,它實現了MQProducer接口,MQProducer接口中定義了消息發送的方法,方法主要分為三大類:

  1. send同步進行消息發送,向Broker發送消息之后等待響應結果;
  2. send異步進行消息發送,向Broker發送消息之后立刻返回,當消息發送成功/失敗之后觸發回調函數;
  3. sendOneway單向發送,也是異步消息發送,向Broker發送消息之后立刻返回,但是沒有回調函數;
public interface MQProducer extends MQAdmin {

    // 同步發送消息
SendResult send(final Message msg) throws MQClientException, RemotingException, MQBrokerException,
InterruptedException; // 異步發送消息,SendCallback為回調函數
void send(final Message msg, final SendCallback sendCallback) throws MQClientException,
RemotingException, InterruptedException; // 異步發送消息,沒有回調函數
void sendOneway(final Message msg) throws MQClientException, RemotingException,
InterruptedException;
}

接下來以同步發送為例,看下生產者發送消息的過程。

生產者發送消息

Topic主題

一般會為同一業務類型設定一個Topic,將不同的業務類型的數據放到不同的Topic中管理,不過主題只是一個邏輯概念,并不是實際的消息容器。

主題內部由多個消息隊列(MessageQueue)組成,消息隊列是消息存儲的實際容器,消息隊列與Kafka的分區(Partition)類似。

獲取主題的發布信息(TopicPublishInfo)

Broker在啟動時會向NameServer發送注冊信息并定時向NameServer發送心跳包,Broker向NameServer注冊信息中包括了該Broker的IP、Name以及Topic的配置信息,

生產者和消費者默認每30s從NameServer更新一次路由信息,可以知道消息所在的Topic分布在哪些Broker上。

生產者有一個主題路由信息表topicPublishInfoTable,緩存從NameServer拉取到的路由信息,它是ConcurrentMap類型的,KEY為topic主題名稱, value為該Topic的發布信息,是TopicPublishInfo類型。

當生產者向Broker發送消息之前,首先需要知道消息所屬的Topic的路由信息,有了Topic的路由信息才能知道Topic分布在哪個Broker上,生產者往哪個Broker上發,而topicPublishInfoTable中記錄了每個主題的相關信息,可以從topicPublishInfoTable中查找Topic的路由信息。

如果從topicPublishInfoTable中查找成功,就可以繼續后續的步驟,如果查找失敗,此時生產者需要從NameServer中查詢該Topic的路由信息:

  • 如果查詢成功,會判斷路由信息是否發生了變化,如果發生變化,生產者會更新本地緩存的該Topic的路由信息;
  • 如果依舊未查詢到,它會有一個默認的主題,會使用這個默認的主題進行消息發送;

選取消息隊列

前面知道,一個Topic一般由多個消息隊列組成,所以主題的發布信息數據TopicPublishInfo獲取到之后,需要從中選取一個消息隊列,然后獲取此消息隊列所屬的Broker,與Broker通信將消息投遞到對應的消息隊列中。

未啟用故障延遲機制

在每個Topic內部,設置了一個計數器sendWhichQueue用于輪詢從消息隊列集合中選取隊列。

在未啟用故障延遲機制的時候,如果上一次選擇的BrokerName為空,也就是首次發送消息時,處理邏輯如下:

  1. 對計數器增一;
  2. 根據計數器的值對消息隊列列表的長度取余得到下標值pos,從隊列列表中獲取pos位置的元素,以此達到輪詢從消息隊列列表中選擇消息隊列的目的;
  3. 返回第2步中獲取到的消息隊列;
  4. 在調用獲取消息隊列的地方,會記錄本次選擇消息隊列所在的BrokerName;

如果上一次選擇的BrokerName不為空,表示上次發送消息時就發送給了此Broker,此時的處理邏輯與上面的不同點在第3步,通過從隊列列表中獲取pos位置的元素之后,并沒有直接把選取到的消息隊列返回,而是再增加一個判斷,判斷當前選取到的Broker是否與上次選擇的Broker名稱一致,如果一致會繼續循環,輪詢選擇下一個消息隊列,如果不一致則直接返回:

  1. 對計數器增一;
  2. 根據計數器的值對消息隊列列表的長度取余得到下標值pos,從隊列列表中獲取pos位置的元素;
  3. 對第2步獲取到的消息隊列進行判斷:
    • 如果本次選取到的隊列與上次發送消息的Broker一致,回到第1步繼續選擇下一個隊列,如果一直未選出滿足要求的消息隊列,則不作判斷,使用上面的方式輪詢選擇一個隊列返回;
    • 如果本次選取到的隊列與上次發送消息的Broker不一致,返回當前的隊列;
  4. 在調用獲取消息隊列的地方,會記錄本次選擇消息隊列所在的BrokerName;

總結

在未啟用故障延遲機制時,從該消息所屬的Topic下的所有消息隊列集合中,輪詢選擇消息隊列進行發送,如果上一次選擇了某個Broker發送消息,本次將不會再選擇這個Broker,當然如果最后仍未找到滿足要求的消息隊列,將會跳過這個判斷,直接從隊列中輪詢獲取消息隊列返回。

開啟故障延遲機制

在生產者進行發送消息的時候,無論消息是否發送成功與否都會記錄向每個Broker的發送消息的條目信息FaultItem,有一個失敗條目表faultItemTable,faultItemTable記錄了每個Broker對應的失敗條目FaultItem,FaultItem中主要有以下信息:

  • name:Broker的名稱;
  • currentLatency:延遲時間,可以理解為是本次向該Broker發送消息耗時時間:發送消息結束時間 - 消息發送開始時間;
  • startTimestamp:規避故障時間,一般為當前時間 + 不可用的持續時間,不可用的持續時間有兩種情況,分別為30000ms或者使用currentLatency延遲時間(也就是上次發送消息所用的時間),一般在出現異常的時候,會將不可用的持續時間設置為30000ms,消息正常發送的時候使用currentLatency延遲時間。

設置規避故障時間主要是為了在某個時間段內規避某個Broker,假設向某個Broker發送失敗/或者向此Broker發生消息的耗時比較長,生產者認為此Broker可能暫時處于異常狀態/或者該時間段內此Broker的性能不高,在下次發送消息時盡量規避這個Broker,避免向此Broker上投遞消息。

每次消息發送之后會更新該Broker的失敗條目的處理邏輯如下:

  1. 根據Broker名稱從faultItemTable獲取對應的FaultItem對象;
  2. 如果上一步獲取為空,說明之前沒有記錄過該Broker的信息,需要新建對應FaultItem對象,此時需要設置name、currentLatency延遲時間、startTimestamp規避故障時間;
  3. 如果第1步中獲取到該Broker對應的FaultItem對象,直接更新里面的currentLatency延遲時間、startTimestamp規避故障時間即可;

接下來看如何使用FaultItem中記錄的信息,來實現故障規避。

使用故障規避,需要啟用故障延遲機制,此時從隊列集合中選擇消息隊列的處理邏輯如下:

  1. 對計數器增一;

  2. 根據計數器的值對消息隊列列表的長度取余得到下標值pos,從隊列列表中獲取pos位置的元素,依舊輪詢從消息隊列列表中選擇消息隊列,這兩步與未開啟故障時邏輯一致;

  3. 選擇出消息隊列之后,會獲取該隊列所在的Broker名稱,上面說到,生產者每次與Broker通信發送消息時,會記錄消息發發送情況,此時可以根據Broker的名稱,從失敗條目表faultItemTable中獲取該Broker的FaultItem,用來判斷當前選擇的消息隊列是否可用,FaultItem中有一個規避故障時間,來看兩種情況:

    • 情況一:上次向此Broker發送消息失敗,那么這個時間的值為發送消息失敗時的時間 + 30000ms,判斷當前時間有沒有超過故障規避設置的時間,如果超過了當前選擇的消息隊列可用,那么就會返回當前選擇的這個消息隊列,如果未超過表示該Broker暫時不可用所以不能使用當前選擇的消息隊列,需要回到第1步繼續選擇下一個隊列;
    • 情況二:上次向此Broker發送消息成功,那么這個時間的值為發送消息失敗時的時間 + 上次發送消息的耗時時間,判斷當前時間有沒有超過故障規避設置的時間,這個依賴于上次發送消息的耗時時間的長短,如果耗時比較長,可能還未超過規避時間,本次就不能選擇向此Broker發送消息同樣需要回到第1步選擇下一個隊列,如果耗時比較短,可能現在已經過了規避時間,那么就可以選擇當前的消息隊列返回;
  4. 如果進行到這一步,以上步驟沒有選擇到可用的消息隊列,此時需要通過以下方式再次選擇消息隊列:

    (1)遍歷faultItemTable失敗條目表,將每一個Broker對應的FaultItem加入一個LinkedList鏈表;

    (2)對鏈表進行排序,FaultItem實現Comparable就是為了在這里進行排序,值小的排在鏈表前面,值的大小判斷規則如下:

    • 對比是否有超過規避時間的Broker(調用isAvailable可以判斷),如果有表示值比較小,會排在前面,之后被優先選擇,如果所有的Broker都為超過規避時間,進入下一個對比條件;
    • 對比currentLatency的值,值越小排序的時候越靠前,也就是盡量選擇發送消息耗時短的那個Broker,如果值相等進入下一個對比條件;
    • 對比startTimestamp的值,同樣值越小排序的時候越靠前,盡量選擇規避時間較短的那個Broker;

    (3)經過以上的規則進行排序后,會根據鏈表的總大小,計算一個中間值:

    • 如果half值小于等于0,取鏈表中的第一個元素;
    • 如果half值大于0,從前half個元素中輪詢選擇元素;

    (4)在鏈表中越靠前的元素,表示發送消息的延遲越低,在選擇時優先級就越高,如果half值小于等于0的時候,取鏈表中的第一個元素,half值大于0的時候,處于鏈表前half個的Broker,延遲都是相對較低的,此時輪詢從前haft個Broker中選擇一個Broker,總之經過這么多處理就是為了選擇一個延遲相對較低的Broker

    (5)獲取上一步選取到的那個Broker,獲取Broker可寫的隊列數量:

    • 如果數量小于0表示該Broker不可用,需要移除然后進入下一步;
    • 如果數量大于0,表示該Broker可用,然后重新輪詢從消息隊列列表中選取一個隊列,將本次選取到的消息隊列所屬的Broker設置為第(4)步中選取到的那個Broker,也就是將這個消息隊列及Topic重置到新的Broker中(認為原本所屬的Broker不可用,需要設置一個新的Broker),然后返回當前選取的消息隊列;
  5. 如果經過第4步依舊未選出可用的消息隊列,那么就跳過故障延遲機制,直接從該Topic的所有隊列中輪詢選擇一個返回;

總結

故障延遲機制指的是在發送消息時記錄每個Broker的耗時時間,如果某個Broker發生故障,但是生產者還未感知(NameServer 30s檢測一次心跳,有可能Broker已經發生故障但未到檢測時間,所以會有一定的延遲),用耗時時間做為一個故障規避時間(也可以是30000ms),此時消息會發送失敗,在重試或者下次選擇消息隊列的時候,如果在規避時間內,可以避免再次選擇到此Broker,以此達到故障規避的目的。

如果某個Topic所在的所有Broker都處于不可用狀態,此時盡量選擇延遲時間最短、規避時間最短(排序后的失敗條目中靠前的元素)的Broker作為此次發送消息消息的Broker。

對應的相關源碼可參考:

【RocketMQ】【源碼】MQ消息發送

參考

RocketMQ官方文檔

總結

以上是生活随笔為你收集整理的【RocketMQ】MQ消息发送总结的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。