日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 运维知识 > windows >内容正文

windows

Rocketmq学习3——消息发送原理源码浅析

發(fā)布時(shí)間:2024/1/21 windows 45 coder
生活随笔 收集整理的這篇文章主要介紹了 Rocketmq学习3——消息发送原理源码浅析 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

一丶概述

RocketMQ 消息發(fā)送的原理流程可以分為以下幾個(gè)步驟:

1. 創(chuàng)建生產(chǎn)者

在發(fā)送消息前,客戶端首先需要?jiǎng)?chuàng)建一個(gè)消息生產(chǎn)者(Producer)實(shí)例,并設(shè)置必要的配置參數(shù),如NameServer地址、生產(chǎn)組名稱、消息發(fā)送失敗的重試次數(shù)等。

2. 啟動(dòng)生產(chǎn)者

創(chuàng)建生產(chǎn)者后,需要調(diào)用啟動(dòng)方法來(lái)初始化生產(chǎn)者實(shí)例。在這個(gè)過(guò)程中,生產(chǎn)者會(huì)與NameServer建立連接,從NameServer獲取到所有Broker的地址信息。

3. 發(fā)送消息

消息發(fā)送分為同步發(fā)送、異步發(fā)送和單向發(fā)送三種方式:

  • 同步發(fā)送(Synchronous): 生產(chǎn)者發(fā)送消息后,會(huì)在發(fā)送線程中等待服務(wù)器的響應(yīng),直到收到消息發(fā)送確認(rèn)。
  • 異步發(fā)送(Asynchronous): 生產(chǎn)者發(fā)送消息后,不會(huì)等待服務(wù)器的響應(yīng),而是通過(guò)回調(diào)接口處理服務(wù)器的響應(yīng)。
  • 單向發(fā)送(One-way): 生產(chǎn)者只負(fù)責(zé)發(fā)送消息,不等待服務(wù)器響應(yīng),也不關(guān)心消息是否到達(dá)服務(wù)器。

無(wú)論采用哪種發(fā)送方式,消息發(fā)送的主要流程如下:

  1. 消息路由: 生產(chǎn)者通過(guò)負(fù)載均衡算法選擇一個(gè)隊(duì)列,通常是根據(jù)topic和隊(duì)列選擇一個(gè)Broker的一個(gè)隊(duì)列來(lái)發(fā)送消息。
  2. 消息發(fā)送: 生產(chǎn)者向選定的Broker發(fā)送消息。消息包含了topic、tags、keys、body等信息。
  3. 消息存儲(chǔ): Broker接收到消息后,會(huì)將消息存儲(chǔ)到CommitLog(消息存儲(chǔ)文件)中。如果配置了消息重試或者高可靠性相關(guān)的配置,Broker可能會(huì)執(zhí)行額外的消息復(fù)制或持久化操作以確保消息的可靠性。
  4. 寫(xiě)入響應(yīng): Broker將消息存儲(chǔ)確認(rèn)響應(yīng)返回給生產(chǎn)者。如果是同步發(fā)送,生產(chǎn)者會(huì)在這一步等待該響應(yīng);如果是異步發(fā)送,生產(chǎn)者會(huì)在回調(diào)函數(shù)中處理該響應(yīng)。

本篇,我們就來(lái)簡(jiǎn)單看下rocketmq從生產(chǎn)者發(fā)送消息,學(xué)習(xí)一下其中優(yōu)秀的設(shè)計(jì)!

二丶生產(chǎn)者消息發(fā)送

生產(chǎn)者消息發(fā)送本質(zhì)是通過(guò)網(wǎng)絡(luò)io將消息發(fā)送到broker中,通常通過(guò)DefaultMQProducer#send(Message)進(jìn)行簡(jiǎn)單的消息發(fā)送,如下是其源碼

可看到如果設(shè)置了autoBatch并且消息本身不是一個(gè)批量消息,那么會(huì)調(diào)用sendByAccumulator(使用消息累計(jì)器進(jìn)行發(fā)送,猜測(cè)會(huì)累計(jì)到內(nèi)存中然后批量進(jìn)行發(fā)送)

反之會(huì)調(diào)用sendDirect進(jìn)行消息發(fā)送

1.sendByAccumulator 如何累計(jì)消息發(fā)送

rocketmq抽象出ProduceAccumulator進(jìn)行消息的累計(jì)發(fā)送

ProduceAccumulator會(huì)將消息根據(jù)Topic和tag進(jìn)行分組存儲(chǔ),然后包裝為MessageBatch調(diào)用DefaultMQProducer進(jìn)行發(fā)送

2.sendDirect

DefaultMQProducer消息發(fā)送會(huì)委托給DefaultMQProducerImpl進(jìn)行發(fā)送,這兩個(gè)類名稱很像但是DefaultMQProducerImpl不是DefaultMQProducer的實(shí)現(xiàn),二者是不同維度的:

  • DefaultMQProducer是給調(diào)用方使用的,相當(dāng)于門(mén)面
  • DefaultMQProducerImpl:實(shí)現(xiàn)了MQProducerInner,真正實(shí)現(xiàn)消息發(fā)送機(jī)制

  1. 指定SendCallback:當(dāng)異步發(fā)送消息的時(shí)候,可以實(shí)現(xiàn)此接口,實(shí)現(xiàn)消息發(fā)送成功or失敗后的回調(diào)
  2. 指定MessageQueue:MessageQueue是由TopicbrokerqueueId組成,一個(gè)topic可以分布在多個(gè)Broker上(橫向擴(kuò)展),一個(gè)broker上可以由多個(gè)queue(多個(gè)queue并行消費(fèi)提升吞吐量),因此通過(guò)發(fā)送消息指定MessageQueue可以實(shí)現(xiàn)消息的局部有序(消費(fèi)者使用MessageListenerOrderly單線程進(jìn)行消費(fèi))

下面我們來(lái)看看消息發(fā)送的具體實(shí)現(xiàn),這部分代碼在DefaultMQProducerImpl#sendDefaultImpl

1.獲取路由信息

獲取路由信息,即從生產(chǎn)者向 NameServer 查詢特定 Topic 的路由信息。這個(gè)路由信息包括了這個(gè) Topic 有哪些 Broker 持有,以及這些 Broker 上各自的 Queue 數(shù)據(jù)

  • NameServer 是 RocketMQ 中的一個(gè)關(guān)鍵組件,起到了服務(wù)注冊(cè)中心的作用。所有的 Broker 啟動(dòng)時(shí)會(huì)向所有的 NameServer 注冊(cè),包括其 IP 地址、端口、存活狀態(tài)以及所持有的 Topic 信息。NameServer 會(huì)持有整個(gè)消息系統(tǒng)的 Broker 服務(wù)器列表及其路由信息。
  • 當(dāng)生產(chǎn)者啟動(dòng)時(shí),它會(huì)根據(jù)配置好的 NameServer 地址列表與 NameServer 集群建立連接。
  • 生產(chǎn)者會(huì)在本地緩存從 NameServer 獲取到的路由信息,以便快速選擇目標(biāo) Queue 進(jìn)行消息發(fā)送。為了確保路由信息的準(zhǔn)確性,生產(chǎn)者會(huì)定期(如每隔30秒)或在發(fā)送消息時(shí)發(fā)現(xiàn)路由信息不可用時(shí),重新從 NameServer 更新這些信息,并且生產(chǎn)者發(fā)送消息的時(shí)候根據(jù)本地緩存的路由信息選擇一個(gè) Queue 來(lái)發(fā)送消息。

通過(guò)這種方式,RocketMQ 確保生產(chǎn)者能夠及時(shí)獲取和更新路由信息,以及將消息發(fā)送到正確的 Broker 和 Queue。這個(gè)機(jī)制也使得 RocketMQ 能夠在 Broker 或隊(duì)列變化時(shí)動(dòng)態(tài)適應(yīng),保證消息傳輸?shù)母呖捎眯院涂蓴U(kuò)展性。

生產(chǎn)者會(huì)優(yōu)先從ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable中獲取路由,反之使用rpc請(qǐng)求nameServer獲取路由信息

另外在生產(chǎn)者啟動(dòng)的時(shí)候,會(huì)觸發(fā)MQClientInstance的start,其中會(huì)使用juc調(diào)度線程池進(jìn)行路由信息的定期更新(默認(rèn)30秒一次)。

這里居然沒(méi)有使用長(zhǎng)輪詢,理論上長(zhǎng)輪詢相比于這種周期請(qǐng)求有更好的及時(shí)性,rocketmq可能是考慮到

  • 長(zhǎng)輪詢的方式需nameServer維護(hù)連接狀態(tài),而周期輪詢對(duì)于nameServer負(fù)擔(dān)更小
  • 周期請(qǐng)求可以讓生產(chǎn)者設(shè)置周期頻率
  • 流量更加均勻:長(zhǎng)輪詢?cè)诼酚尚畔l(fā)生變化的時(shí)候,nameServer需要立馬將變化后的信息發(fā)送給hang住的producer,不如周期輪詢來(lái)得流量均衡。
  • 大部分情況下,路由信息不會(huì)頻繁變化的,定期輪詢可滿足需要,不像配置中心配置變更是比較頻繁的,并且配置中心對(duì)于配置變更及時(shí)性有比較高的要求。

2.負(fù)載均衡的選擇一個(gè)MessageQueue

如下,如果是同步發(fā)送消息,一般會(huì)嘗試3次,在獲取到路由信息后會(huì)負(fù)載均衡的選擇一個(gè)MessageQueue進(jìn)行發(fā)送。

RocketMq支持三種選擇MessageQueue的方式

  • 發(fā)送消息的時(shí)候,傳入MessageQueueSelector的實(shí)現(xiàn)選擇隊(duì)列;

  • 未開(kāi)啟Broker故障延遲機(jī)制(sendLatencyFaultEnable:false),會(huì)采用默認(rèn)輪訓(xùn)機(jī)制(默認(rèn)是此種實(shí)現(xiàn)方式)

  • 開(kāi)啟Broker故障延遲機(jī)制(sendLatencyFaultEnable:true),會(huì)根據(jù)brokerName的可用性選擇隊(duì)列發(fā)送(當(dāng)需要順序消息的時(shí)候不建議打開(kāi),會(huì)影響到消息的順序性)

    其中是否可用,是否可達(dá),依賴LatencyFaultTolerance進(jìn)行實(shí)現(xiàn):

    LatencyFaultTolerance 實(shí)現(xiàn)了一個(gè)基于延遲的容錯(cuò)策略。它記錄了每個(gè) Broker 的歷史網(wǎng)絡(luò)延遲記錄和可用性狀態(tài),并根據(jù)這些信息智能選擇最佳的 Broker 進(jìn)行消息發(fā)送。原理包括以下幾個(gè)關(guān)鍵點(diǎn):

    • 延遲記錄:每次發(fā)送消息時(shí),LatencyFaultTolerance 都會(huì)記錄下發(fā)送操作的延遲時(shí)間。如果發(fā)送成功,那么這次操作的延遲時(shí)間就會(huì)被記錄下來(lái)。
    • 故障切換:如果發(fā)送消息時(shí)發(fā)生超時(shí)或異常,LatencyFaultTolerance 會(huì)將該 Broker 標(biāo)記為不可用,并計(jì)算一個(gè)“不可用時(shí)長(zhǎng)”。在該時(shí)長(zhǎng)內(nèi),Broker 將不會(huì)被選中發(fā)送消息。
    • 動(dòng)態(tài)容錯(cuò):
      LatencyFaultTolerance 會(huì)根據(jù)之前記錄的延遲時(shí)間,動(dòng)態(tài)計(jì)算每個(gè) Broker 的權(quán)重,并選擇權(quán)重最小(表示網(wǎng)絡(luò)狀態(tài)最好)的 Broker 進(jìn)行消息發(fā)送。
    • 自動(dòng)恢復(fù):
      被標(biāo)記為不可用的 Broker 不是永久性的。隨著時(shí)間的推移,Broker 的狀態(tài)可以從不可用恢復(fù)到可用,這通常是通過(guò)“不可用時(shí)長(zhǎng)”來(lái)確定的。一旦超過(guò)這個(gè)時(shí)長(zhǎng),Broker 將重新參與到Broker選擇過(guò)程中。
    • Broker選擇:
      生產(chǎn)者在發(fā)送消息前會(huì)從 LatencyFaultTolerance 中獲取一個(gè)推薦的 Broker。選擇過(guò)程排除了不可用的 Broker,并考慮了網(wǎng)絡(luò)延遲和Broker的歷史表現(xiàn)。

3.消息發(fā)送

至此,我們以及選擇了一個(gè)MessageQueue接下來(lái)就是發(fā)送消息了。

在發(fā)生之前會(huì)從路由信息中獲取發(fā)送的地址,這里只會(huì)選擇master角色的broker進(jìn)行發(fā)送

接下來(lái)會(huì)回調(diào)一些擴(kuò)展性的鉤子,如CheckForbiddenHook,SendMessageHook。
然后調(diào)用MQClientAPIImpl#sendMessage進(jìn)行發(fā)送,最終調(diào)用RemotingClient進(jìn)行消息發(fā)送,RemotingClient是rocketmq對(duì)網(wǎng)絡(luò)通信的實(shí)現(xiàn)

無(wú)論是單向,還是異步,還是同步,最終都是使用tcp協(xié)議進(jìn)行發(fā)送,這里rocketmq使用了netty提供高效的網(wǎng)絡(luò)通信。源碼如下:

netty的部分,不做過(guò)多贅述,詳細(xì)學(xué)習(xí):Netty源碼學(xué)習(xí)7——netty是如何發(fā)送數(shù)據(jù)的 - Cuzzz - 博客園 (cnblogs.com)

三丶總結(jié)

感覺(jué)學(xué)到了什么,又感覺(jué)什么都沒(méi)學(xué)到

  • NameServer:實(shí)現(xiàn)producor,broker,consumer的解耦合,互相不需要感知彼此的村子,本質(zhì)是一個(gè)注冊(cè)中心。
  • 路由信息使用定期輪詢,而不是長(zhǎng)輪詢
    • 長(zhǎng)輪詢的方式需nameServer維護(hù)連接狀態(tài),而周期輪詢對(duì)于nameServer負(fù)擔(dān)更小
    • 周期請(qǐng)求可以讓生產(chǎn)者設(shè)置周期頻率
    • 流量更加均勻:長(zhǎng)輪詢?cè)诼酚尚畔l(fā)生變化的時(shí)候,nameServer需要立馬將變化后的信息發(fā)送給hang住的producer,不如周期輪詢來(lái)得流量均衡。
    • 大部分情況下,路由信息不會(huì)頻繁變化的,定期輪詢可滿足需要,不像配置中心配置變更是比較頻繁的,并且配置中心對(duì)于配置變更及時(shí)性有比較高的要求。
  • 負(fù)載均衡:
    • rocketmq的負(fù)載均衡,大多實(shí)在客戶端做的,在消息發(fā)送中的體現(xiàn)就是,producer自己實(shí)現(xiàn)負(fù)載均衡,而不是由一個(gè)中心化的網(wǎng)關(guān)實(shí)現(xiàn),這樣去中心化的設(shè)計(jì),利于producer的橫向擴(kuò)展!
    • 默認(rèn)情況下使用輪詢,而且使用ThreadLocal記錄輪詢到的index,一定程度上減少大量消息發(fā)送時(shí)候的鎖競(jìng)爭(zhēng)
    • LatencyFaultTolerance:基于每一次發(fā)送消息的統(tǒng)計(jì)信息,如果發(fā)送消息時(shí)發(fā)生超時(shí)或異常,LatencyFaultTolerance 會(huì)將該 Broker 標(biāo)記為不可用,并計(jì)算一個(gè)“不可用時(shí)長(zhǎng)”。在該時(shí)長(zhǎng)內(nèi),Broker 將不會(huì)被選中發(fā)送消息

總結(jié)

以上是生活随笔為你收集整理的Rocketmq学习3——消息发送原理源码浅析的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。