日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

RocketMQ简介

發布時間:2025/3/21 编程问答 32 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RocketMQ简介 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

RocketMQ作為一款純java、分布式、隊列模型的開源消息中間件,支持事務消息、順序消息、批量消息、定時消息、消息回溯等。
RocketMQ優點

1 RocketMQ去除對zk的依賴

2 RocketMQ支持異步和同步兩種方式刷磁盤

3 RocketMQ單機支持的隊列或者topic數量是5w

4 RocketMQ支持消息重試

5 RocketMQ支持嚴格按照一定的順序發送消息

6 RocketMQ支持定時發送消息

7 RocketMQ支持根據消息ID來進行查詢

8 RocketMQ支持根據某個時間點進行消息的回溯

9 RocketMQ支持對消息服務端的過濾

10 RocketMQ消費并行度:順序消費 取決于queue數量,亂序消費 取決于consumer數量
RocketMQ架構原理

RocketMQ專業名詞
Producer 生產者角色—投遞消息給mq。
Producer Group 生產者組 —
Consumer 消費者 采用拉取/mq推送方式 獲取消息offset
Consumer Group 消費者組----在同一個組中,是不允許多個不同的消費者消費同一條消息。
多個消費者消費同一條消息呢? 兩個分組 多個不同的分組中可以允許有不同分組中消費者消費同一條消息的。----
以組的名義關聯該組消費的offset位置—
Topic
業務隊列存放消息
異步發送短信
異步發送郵件
郵件Topic
短信Topic
業務區分不同的隊列消息
Queue
會將一個topic主題中的消息存放在多個不同的Queue 與kafka分區模型是一樣
中。

Message
生產者投遞消息會自動對給消息生成一個全局消息id,后期的可以根據該消息全局id實現業務的防止重復執行------冪等性概念。

Tag
–區分 過濾

Broker
Mq服務器端
Name Server
與zk相同思想,作為rocketmq注冊中心 存放生產者消費者 topic主題信息。

Producer
消息生產者,位于用戶的進程內,Producer通過NameServer獲取所有Broker的路由信息,根據負載均衡策略選擇將消息發到哪個Broker,然后調用Broker接口提交消息。

Producer Group
生產者組,簡單來說就是多個發送同一類消息的生產者稱之為一個生產者組。

Consumer
消息消費者,位于用戶進程內。Consumer通過NameServer獲取所有broker的路由信息后,向Broker發送Pull請求來獲取消息數據。Consumer可以以兩種模式啟動,廣播(Broadcast)和集群(Cluster),廣播模式下,一條消息會發送給所有Consumer,集群模式下消息只會發送給一個Consumer。

Consumer Group
消費者組,和生產者類似,消費同一類消息的多個 Consumer 實例組成一個消費者組。

Topic
Topic用于將消息按主題做劃分,Producer將消息發往指定的Topic,Consumer訂閱該Topic就可以收到這條消息。Topic跟發送方和消費方都沒有強關聯關系,發送方可以同時往多個Topic投放消息,消費方也可以訂閱多個Topic的消息。在RocketMQ中,Topic是一個上邏輯概念。消息存儲不會按Topic分開。

Message
代表一條消息,使用MessageId唯一識別,用戶在發送時可以設置messageKey,便于之后查詢和跟蹤。一個 Message 必須指定 Topic,相當于寄信的地址。Message 還有一個可選的 Tag 設置,以便消費端可以基于 Tag 進行過濾消息。也可以添加額外的鍵值對,例如你需要一個業務 key 來查找 Broker 上的消息,方便在開發過程中診斷問題。

Tag
標簽可以被認為是對 Topic 進一步細化。一般在相同業務模塊中通過引入標簽來標記不同用途的消息。

Broker
Broker是RocketMQ的核心模塊,負責接收并存儲消息,同時提供Push/Pull接口來將消息發送給Consumer。Consumer可選擇從Master或者Slave讀取數據。多個主/從組成Broker集群,集群內的Master節點之間不做數據交互。Broker同時提供消息查詢的功能,可以通過MessageID和MessageKey來查詢消息。Borker會將自己的Topic配置信息實時同步到NameServer。

Queue
Topic和Queue是1對多的關系,一個Topic下可以包含多個Queue,主要用于負載均衡。發送消息時,用戶只指定Topic,Producer會根據Topic的路由信息選擇具體發到哪個Queue上。Consumer訂閱消息時,會根據負載均衡策略決定訂閱哪些Queue的消息。

Offset
RocketMQ在存儲消息時會為每個Topic下的每個Queue生成一個消息的索引文件,每個Queue都對應一個Offset記錄當前Queue中消息條數。

NameServer
NameServer可以看作是RocketMQ的注冊中心,它管理兩部分數據:集群的Topic-Queue的路由配置;Broker的實時配置信息。其它模塊通過Nameserv提供的接口獲取最新的Topic配置和路由信息。

Producer/Consumer :通過查詢接口獲取Topic對應的Broker的地址信息
Broker : 注冊配置信息到NameServer, 實時更新Topic信息到NameServe

RocketMQ環境搭建

注意:一定要配置rocketmq 環境變量 不然啟動 mqnamesrv.cmd
報錯: Please set the ROCKETMQ_HOME variable in your environment!
啟動mqnamesrv

  • 下載rocketmq安裝包
  • 解壓rocketmq安裝包
  • 配置rocketmq環境變量
    系統環境變量配置
    變量名:ROCKETMQ_HOME
    變量值:MQ解壓路徑\MQ文件夾名
    eg、ROCKETMQ_HOME=D:\rocketmq-all-4.3.0-bin-release
    4.啟動 mqnamesrv.cmd
  • 啟動mqBroker
    Cmd命令框執行進入至‘MQ文件夾\bin’下,然后執行‘start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true’,啟動BROKER。成功后會彈出提示框,此框勿關閉。
    啟動Rocketmq-console
    1.下載rocketmq-externals-master
    ,進入‘rocketmq-externals\rocketmq-console\src\main\resources’文件夾,打開‘application.properties’進行配置。
    2. 新增:rocketmq.config.namesrvAddr=127.0.0.1:9876
    3. 執行
    用CMD進入‘\rocketmq-externals\rocketmq-console’文件夾,執行‘mvn clean package -Dmaven.test.skip=true’,編譯生成。
    編譯成功之后,Cmd進入‘target’文件夾,執行‘java -jar rocketmq-console-ng-2.0.0.jar’,啟動‘rocketmq-console-ng-1.0.0.jar’。
    4.瀏覽器中輸入‘127.0.0.1:配置端口’,成功后即可查看。
    eg:http://127.0.0.1:8088

    RocketMQ集群環境
    1.集群支持:
      RocketMQ天生對集群的支持非常友好
    2.單Master:
      優點:除了配置簡單沒什么優點
      缺點:不可靠,該機器重啟或宕機,將導致整個服務不可用
    3.多Master:
      優點:配置簡單,性能最高
    缺點:可能會有少量消息丟失(配置相關),單臺機器重啟或宕機期間,該機器下未被消費的消息在機器恢復前不可訂閱,影響消息實時性
    4.多Master多Slave異步模式:
      每個Master配一個Slave,有多對Master-Slave,集群采用異步復制方式,主備有短消息延遲,毫秒級
      優點:性能同多Master幾乎一樣,實時性高,主備間切換對應用透明,不需人工干預
      缺點:Master宕機或磁盤損壞時會有少量消息丟失

    Rocketmq偽集群搭建

    集群:Master節點、Slave節點

    Master節點:讀寫 生產者可以投遞消息到Master節點、消費者讀取該Master節點消費消息。
    Slave節點:只能讀 不能夠寫 消費者只能夠讀,生產者不能夠將消息投遞到Slave節點中。

  • 新增兩個broker-a.conf broker-b.conf
    broker-a.conf 配置
    brokerClusterName=DefaultCluster
    #從節點的brokerName必須和主節點一樣
    brokerName=broker-a
    #0表示是一個主節點, >0表示Slave
    brokerId=0
    deleteWhen=04
    fileReservedTime=48
    brokerRole=SLAVE
    flushDiskType=ASYNC_FLUSH
    #nameServer地址,分號分割
    namesrvAddr=127.0.0.1:9876
    #允許自動創建主題
    autoCreateTopicEnable=true
    #注意需改端口,并且要和默認的10911相差5以上
    listenPort=10911
    #存儲路徑
    storePathRootDir=D:\rocketmq\store_master
    #commitLog存儲路徑
    storePathCommitLog=D:\rocketmq\store_master\commitLog
    #消費隊列存儲路徑
    storePathConsumerQueue=D:\rocketmq\store_master\consumerqueue
    #消息索引存儲路徑
    storePathIndex=D:\rocketmq\store_master\index
    #checkpoint 文件存儲路徑
    storeCheckpoint=D:\rocketmq\store_master\checkpoint
    #abort 文件存儲路徑
    abortFile=D:\rocketmq\store_master\abort
  • broker-b.conf配置
    brokerClusterName=DefaultCluster
    #從節點的brokerName必須和主節點一樣
    brokerName=broker-a
    #0表示是一個主節點, >0表示Slave
    brokerId=1
    deleteWhen=04
    fileReservedTime=48
    brokerRole=SLAVE
    flushDiskType=ASYNC_FLUSH
    #nameServer地址,分號分割
    namesrvAddr=127.0.0.1:9876
    #允許自動創建主題
    autoCreateTopicEnable=true
    #注意需改端口,并且要和默認的10911相差5以上
    listenPort=10921
    #存儲路徑
    storePathRootDir=D:\rocketmq\store_slave
    #commitLog存儲路徑
    storePathCommitLog=D:\rocketmq\store_slave\commitLog
    #消費隊列存儲路徑
    storePathConsumerQueue=D:\rocketmq\store_slave\consumerqueue
    #消息索引存儲路徑
    storePathIndex=D:\rocketmq\store_slave\index
    #checkpoint 文件存儲路徑
    storeCheckpoint=D:\rocketmq\store_slave\checkpoint
    #abort 文件存儲路徑
    abortFile=D:\rocketmq\store_slave\abort

    啟動broker集群

    啟動broker-a—
    F:\path\alibabamq\rocketmq-all-4.3.2-bin-release\bin>mqbroker.cmd -c F:\path\ali
    babamq\rocketmq-all-4.3.2-bin-release\conf\cluster\broker-a.conf
    The broker[broker-a, 192.168.31.1:10911] boot success. serializeType=JSON and na
    me server is 127.0.0.1:9876

    啟動broker-b—
    F:\path\alibabamq\rocketmq-all-4.3.2-bin-release\bin>mqbroker.cmd -c F:\path\ali
    babamq\rocketmq-all-4.3.2-bin-release\conf\cluster\broker-b.conf
    The broker[broker-a, 192.168.31.1:10921] boot success. serializeType=JSON and na
    me server is 127.0.0.1:9876

    展示界面

    Rocketmq集群同步方案

    在RocketMQ中 Broker需要實現集群保證高可用(HA)

  • 構建環境為 一主和一從
    生產者投遞消息都是存放在主的Broker中
    從Broker每次定時同步主Brokercommitlog日志文件。
  • 消費者獲取消息— 利弊:
    A.如果實現一主一從讀寫分離消費模型----
    消費者訂閱從節點消費消息,可能會存在延遲問題。(網絡數據傳輸過程中,延遲必然)
    B. 消費者直接訂閱我們主Broker消費消息 延遲概率比較低
    默認的情況下,消費者:訂閱我們主的Broker消費消息,如果主的Broker節點物理內存占用達到40%,開始采用訂閱從節點實現消費,可以提高讀寫性能。----讀寫分離消費模型架構物理內存占用達到40%----消息堆積—
  • 如果主Broker宕機的情況下,生產者是無法投遞消息,而我們消費者可以
    訂閱我們從節點實現數據的消費。

    消費者 消費進度同步問題

    在每個Broker服務器中, 在C:\Users\Administrator\store\config

    consumerOffset.json 記錄每個分組消費主題消息隊列 消費進度。

    “topic_2020_mayikt@mayikt-group23”:{0:2,1:1,2:3,3:0

    topic_2020_mayikt 主題名稱
    mayikt-group23消費者分組名稱

    0:2,1:1,
    0:隊列id 消費進度 offset為2的位置
    1:隊列id 消費者進度 offset為1的位置

    1.從節點定時的形式同步主節點消費記錄信息,不管消費者訂閱主節點還是從節點
    最終都會優先的將消費記錄結果給主節點,如果節點真的宕機的情況下,先記錄在
    從節點。
    2.如果主節點現在突然存活的情況下,從哪個位置開始消費呢?
    如果我們消費者內存中有緩存消費進度的情況下,連接到主節點修改最新消費者進度記錄。
    如果消費者內存沒有緩存消費進度的情況下,可能會發生重復消費的問題。

    在RocketMQ中 Broker需要實現集群保證高可用(HA)

  • 在RocketMQ Broker集群中 分為 Master、Slave
  • Master做寫操作Slave(備份主節點commitlog日志文件)數據
  • ----Slave會同步Master節點中所有commitlog文件數據。
    如果當Master節點宕機了,這時候Master節點就無法寫入數據(生產者無法投遞數據),
    但是消費者可以根據Slave(備份)節點消費數據,注意的是 從Slave(備份)節點無法
    寫入數據。

    設置該參數:slaveReadEnable 從服務器不允許讀;

    RocketMQ讀寫分離機制:

  • 默認的情況下,RocketMQ優先從主Master節點拉取數據信息;
  • 如果主服務器的消息堆積過多,占用物理內存40%后,開始建議使用從節點消費,實現
    讀寫分離消費模型,提高IO讀寫的性能。
  • RocketMQ消費者進度如何同步:

  • 當主節點宕機之后,消費者會使用從服務器提交消費記錄,每次消費的記錄會保存在當前Broker存儲目錄:D:\rocketmq\store_slave\config consumerOffset.json
  • 從服務器會開啟一個定時任務向主服務器發送同步消費進度, 實現主從消費進度同步、不管是在主消費還是在從消費,消費者會優先將該消費的進度匯報給主的服務器,而且我們消費者將消費的進度保存在內存中,當主節點宕機之后,有恢復的話,發送最新消費者進度給主的服務器,這時候就避免了重復消費進度的問題。
  • Springboot整合方式

    注意springboot整合rocketmq server端 版本一定要與rocketmq 不然可能啟動報錯

    Maven依賴

    org.springframework.boot
    spring-boot-starter-parent
    2.2.4.RELEASE





    org.springframework.boot
    spring-boot-starter-web


    org.projectlombok
    lombok


    org.apache.rocketmq
    rocketmq-spring-boot-starter
    2.0.1

    生產者
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**

    • 普通消息投遞 單向發送
      */
      @GetMapping("/sendMsg")
      public String sendMsg() {
      MsgEntity msg = new MsgEntity(“mayikt” + UUID.randomUUID().toString(), 1234);
      rocketMQTemplate.convertAndSend(RocketMQConfig.TOPIC_NAME, msg);
      return “投遞消息 => " + msg.toString() + " => 成功”;
      }

    消費者
    /**

    • @ClassName RocketMQConsumer

    • @Author 螞蟻課堂余勝軍 QQ644064779 www.mayikt.com

    • @Version V1.0
      **/
      @Service
      @Slf4j
      @RocketMQMessageListener(consumerGroup = “mayikt-group5”, topic = “topic_meite”)
      public class RocketMQConsumer implements RocketMQListener {
      @Override
      public void onMessage(MsgEntity msgEntity) {

      log.info("消費者監聽到消息:<msg:{}>", msgEntity);

      }
      }

    配置文件
    spring:
    application:
    name: mayikt-rocketmq
    server:
    port: 8000
    rocketmq:

    rocketmq地址

    name-server: 127.0.0.1:9876
    producer:
    # 必須填寫 group
    group: mayikt-group

    Rocketmq配置文件詳解
    所屬集群名字
    brokerClusterName=rocketmq-cluster
    此處需手動更改
    broker名字,注意此處不同的配置文件填寫的不一樣
    附加:按配置文件文件名來匹配
    brokerName=broker-a
    0 表示Master, > 0 表示slave
    brokerId=0
    此處許手動更改
    (此處nameserver跟host配置相匹配,9876為默認rk服務默認端口)nameServer 地址,分號分割
    附加:broker啟動時會跟nameserver建一個長連接,broker通過長連接才會向nameserver發新建的topic主題,然后java的客戶端才能跟nameserver端發起長連接,向nameserver索取topic,找到topic主題之后,判斷其所屬的broker,建立長連接進行通訊,這是一個至關重要的路由的概念,重點,也是區別于其它版本的一個重要特性
    namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
    在發送消息時,自動創建服務器不存在的Topic,默認創建的隊列數
    defaultTopicQueueNums=4
    是否允許Broker 自動創建Topic,建議線下開啟,線上關閉
    autoCreateTopicEnable=true
    是否允許Broker自動創建訂閱組,建議線下開啟,線上關閉
    autoCreateSubscriptionGroup=true
    Broker 對外服務的監聽端口
    listenPort=10911
    刪除文件時間點,默認是凌晨4點
    deleteWhen=04
    文件保留時間,默認48小時
    fileReservedTime=120
    commitLog每個文件的大小默認1G
    附加:消息實際存儲位置,和ConsumeQueue是mq的核心存儲概念,之前搭建2m環境的時候創建在store下面,用于數據存儲,consumequeue是一個邏輯的概念,消息過來之后,consumequeue并不是把消息所有保存起來,而是記錄一個數據的位置,記錄好之后再把消息存到commitlog文件里
    mapedFileSizeCommitLog=1073741824
    ConsumeQueue每個文件默認存30W條,根據業務情況調整
    mapedFileSizeConsumeQueue=300000
    destroyMapedFileIntervalForcibly=120000
    redeleteHangedFileInterval=120000
    檢測物理文件磁盤空間
    diskMaxUsedSpaceRatio=88
    存儲路徑
    storePathRootDir=/usr/local/rocketmq/store
    commitLog存儲路徑
    storePathCommitLog=/usr/local/rocketmq/store/commitlog
    消費隊列存儲路徑
    storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
    消息索引存儲路徑
    storePathIndex=/usr/local/rocketmq/store/index
    checkpoint 文件存儲路徑
    storeCheckpoint=/usr/local/rocketmq/store/checkpoint
    abort 文件存儲路徑
    abortFile=/usr/local/rocketmq/store/abort
    限制的消息大小
    maxMessageSize=65536
    flushCommitLogLeastPages=4
    flushConsumeQueueLeastPages=2
    flushCommitLogThoroughInterval=10000
    flushConsumeQueueThoroughInterval=60000
    Broker 的角色
    ? ASYNC_MASTER 異步復制Master
    ? SYNC_MASTER 同步雙寫Master
    ? SLAVE
    brokerRote=ASYNC_MASTER
    刷盤方式
    ? ASYNC_FLUSH 異步刷盤
    ? SYNC_FLUSH 同步刷盤
    flushDiskType=ASYNC_FLUSH
    checkTransactionMessageEnable=false
    發消息線程池數量
    sendMessageTreadPoolNums=128
    拉消息線程池數量
    pullMessageTreadPoolNums=128

    Rocketmq隊列分區模型

    Rocketmq 底層存儲結構中 將一個主題分成n多個不同的
    隊列來實現存放消息。
    創建了一個主題:需要指定隊列個數 默認是4和16
    寫隊列數量: 對我們生產者寫投遞隊列數量 16
    讀隊列數量: 對我們消費者獲取消息隊列數量 16

    在rocketmq中,如果一個topic只有一個隊列的情況下支持的并行能力比較弱,所以會將一個topic分成分成n多個不同的隊列queue來實現存放, 類似于Kafka的分區模型概念。

    writeQueueNums:寫隊列數,表示producer發送到的MessageQueue的隊列個數
    readQueueNums:讀隊列數,表示Consumer讀取消息的MessageQueue隊列個數
    注:這兩個值需要相等,在集群模式下如果不相等,writeQueueNums=6,readQueueNums=3, 那么每個broker上會有3個queue的消息是無法消費的。
    perm 隊列的權限:2表示w、4表示r 6表示rw
    架構模擬圖

    注意我們rocketmq與kafka設計思想原理相同,消息存儲在mq服務器端中,消費者不管消費失敗還是成功,最終消息還是會存在mq服務器端,可以通過日志策略清理消息。

    RocketMQ解決方案

    如果現在消費者獲取消息、處理器都是同一個線程的情況下,
    有可能會影響到我們消費者速率—
    消費者獲取消息10毫秒—處理消息5s 5.001s
    Rocketmq中 消費者消費的消息,采用多線程的形式,需要注意消息順序一致性的問題。

    與kafka思想是一樣的,消費者不管消費成功還是失敗,最終消息不會立即被刪除。
    后期都是通過日志刪除策略,定時刪除消息。

    NameServer原理

    NameServer注冊中心 類似于zookeeper、nacos 存儲 服務節點信息。

    Rocketmq 不使用Zookeeper而使用Nameserver實現注冊中心呢?
    Cp(每個節點副本必須要保證數據一致性) ap(每個節點副本數據不一致
    ,保證可用性。)
    Kafka:副本機制 副本選舉依賴 控制器 選舉依賴Zookeeper—選舉過程

    在Rocketmq中 人為在配置文件中指定那個Broker 為主節點 ,而在kafka中依賴
    與zookeeper實現選舉的。

    NameServer 每個節點相互之間沒有數據過程、都是獨立的 (ap模式)
    保證可用性、不保證每個NameServer 數據的一致性問題。

    NameServer 與Broker 、生產者、消費者關系。

    NameServer 中存放那些信息呢?

  • NameServer 存儲Broker ip和端口號信息
  • 存放topic主題信息
  • NameServer 與Broker 關系

  • 在Broker配置文件中配置了多個不同的NameServer 集群地址
  • Broker 啟動的時候,讀取到該多個不同的NameServer 地址
  • Broker會與所有的NameServer 建立長連接
  • 會將自己的信息注冊到每個NameServer 上存儲起來。
  • nameServer如何知道Broker 宕機呢?

  • 續命的設計:Broker 與每個NameServer 建立長連接之后,每隔30s的時間
    發送一個心跳續約包給Broker 告訴我還在存活狀態
  • nameServer 定時器每隔10s的時間檢測 故障Broker ,如果發生故障Broker 會直接剔除。
    與nacos、eureka 實現服務注冊中心原理是一樣的。
  • 生產者:

  • 在生產者客戶端配置文件中,配置了多個連接nameserver地址
  • 采用輪詢算法 連接nameserver 如果能夠獲取到BrokerIP和端口信息
    發送請求
  • 獲取到BrokerIP 發送請求存儲該消息。
  • Broker 與 nameserver心跳續約間隔30s

    如果生產者獲取到一個故障的Broker地址,實現發送消息如果失敗的情況下
    如何處理
    答案:

  • 生產者會默認的情況下 會重試三次
  • 重試多次還是失敗的情況下,就從新連接nameserver 獲取下一條Broker地址
    實現調用 存儲消息。
  • 生產者ack模式 ,生產者必須要將消息落地存放在硬盤中 才認為消息投遞成功。
  • NameServer類似于zookeeper實現服務注冊中心

    為什么RocketMQ不使用zookeeper?而使用NameServer作為注冊中心呢?
    Zookeeper實現注冊中心 模式CP模式 保證數據的一致性
    NameServer 保證AP模式 可用性

    特點:
    NameServer集群之間不需要數據同步

    注冊分析:
    Broker啟動的時候,會讀取Broker.config 配置文件,獲取到多個不同的nameServer集群的地址列表,會將Broker信息注冊給所有的NameServer存儲與所有的NameServer保持長連接。
    注冊信息:
    Broker 的IP和端口號 主題信息、集群信息 過濾器等。

    NameSever如何知道我們Broker宕機呢?
    續約機制:
    Broker:
    Rocketmq底層基于netty實現的,每個Broker在默認每隔30
    S時間給nameServer發送一個心跳續約包,告訴我Broker還在存活。

    Name 每隔10s時間檢測,故障Broker節點,則剔除。

    Broker關系
    每個NameServer相互之間都是獨立,不會做任何數據同步,采用ap模式思想

    Broker信息注冊:Broker啟動的時候,會根據配置文件讀取到多個NameServer地址,輪詢的將Broker信息注冊到每個NameServer上,這樣每個NameServer都有該Broker信息。

    剔除:Broker需要每隔30s時間給每個NameServer發送一個心跳續約,如果沒有發送心跳續約的話,NameServer 會有一個定時器每隔每10s中掃描一次,檢測故障的Broker,則會剔除。
    最終每個Broker與NameSserver保持長連接。

    生產者關系

    如果是每隔30s發送續約,也就是意味著30s后 才能夠剔除該Broker
    真好在這時候 生產者發送到該故障節點如何處理?

    使用ack模式 確認刷盤成功 才屬于生產者消息投遞成功。

    相關疑問

    生產者發送消息三種模式
    Producer發送消息有三種方式:同步、異步和單向
    三種發送方式

  • 單向 生產者投遞消息到mq中,不需要返回結果。
    優點:延遲概率比較低
    缺點:丟失消息數據
    投遞消息過程比較耗時時間5毫秒

  • 異步 生產者投遞消息到mq中,使用回調形式返回。
    投遞消息過程比較耗時時間5毫秒
    補償----

  • 同步
    生產者投遞消息到mq中,采用同步的形式獲取到返回消息是否有
    投遞成功的結果,導致接口延遲概率比較大。
    投遞消息過程比較耗時時間10毫秒

  • 發送請求 基于請求與響應

    1.同步發送:發送請求模式屬于同步的,發送該條消息不需等待該條消息發送成功之后,才可以繼續發送下一條。
    2.異步發送:采用異步的發送模式,不需要同步阻塞等待,通過回調的形式監聽生產者消息投遞結果
    3單向發送:只負責發送消息給mq,不管是否有發送成功。

    同步發送

    /**

    • 同步發送
    • @throws Exception
      */
      @GetMapping("/sync")
      public void sync() {
      MsgEntity msg = new MsgEntity(“mayikt” + UUID.randomUUID().toString(), 1234);
      SendResult sendResult = rocketMQTemplate.syncSend(RocketMQConfig.TOPIC_NAME, msg);
      log.info(“同步發送字符串{}, 發送結果{}”, msg.toString(), sendResult);
      }

    異步發送
    /**

    • 異步發送

    • @throws Exception
      */
      @GetMapping(“async”)
      public void async() {
      MsgEntity msg = new MsgEntity(“mayikt” + UUID.randomUUID().toString(), 1234);
      log.info(">msg:<<" + msg);
      rocketMQTemplate.asyncSend(RocketMQConfig.TOPIC_NAME, msg.toString(), new SendCallback() {
      @Override
      public void onSuccess(SendResult var1) {
      log.info(“異步發送成功{}”, var1);
      }

      @Overridepublic void onException(Throwable var1) {log.info("異步發送失敗{}", var1);}

      });
      }

    單向發送

    /**

    • 普通消息投遞 單向發送
      */
      @GetMapping("/sendMsg")
      public String sendMsg() {
      MsgEntity msg = new MsgEntity(“mayikt” + UUID.randomUUID().toString(), 1234);
      rocketMQTemplate.convertAndSend(RocketMQConfig.TOPIC_NAME, msg);
      return “投遞消息 => " + msg.toString() + " => 成功”;
      }

    順序消息

    Rocketmq中,消費者處理消息業務邏輯的時候 是采用多線程。

    如何解決消息順序一致性的問題?

  • 生產者投遞消息根據key 投遞到同一個隊列中存放
  • 消費者應該訂閱到同一個隊列實現消費
  • 最終應該使用同一個線程去消費消息(不能夠實現多線程消費。)
  • 實際上做業務邏輯開發中,很少有需要保證消息順序一致性問題。

    1.在rocketmq 消費者默認是多線程異步消費的,開發者需要設定指定保證消息順序一致性問,也就是同一個隊列消息最終被同一個線程實現消費。

  • 生產者指定相同的消息key,根據hashKey運算投遞到同一個隊列中,最終被同一個消費者消費。
  • 相關代碼

    生產者

    String uuid = UUID.randomUUID().toString();
    SendResult result1 = rocketMQTemplate.syncSendOrderly(RocketMQConfig.TOPIC_SEQUENTIAL, “insert”, uuid);
    log.info(“insert:” + result1.toString());
    SendResult result2 = rocketMQTemplate.syncSendOrderly(RocketMQConfig.TOPIC_SEQUENTIAL, “update”, uuid);
    log.info(“update:” + result2.toString());
    SendResult result3 = rocketMQTemplate.syncSendOrderly(RocketMQConfig.TOPIC_SEQUENTIAL, “delete”, uuid);
    log.info(“delete:” + result3.toString());

    消費者

    @Service
    @Slf4j
    @RocketMQMessageListener(consumerGroup = “mayikt-group20”, topic = “topic_seq”, consumeMode = ConsumeMode.ORDERLY
    )
    public class RocketMQConsumer01 implements RocketMQListener {
    @Override
    public void onMessage(String msg) {
    try {
    Random r = new Random(100);
    int i = r.nextInt(500);
    Thread.sleep(i);
    } catch (Exception e) {

    }log.info("消費者監聽到消息:<msg:{}>", msg); }

    }

    結果

    消息存儲結構
    在win的安裝rocketmq,消息物理存放在
    C:\Users\Administrator\store

    commitlog:消息的存儲目錄
    config:運行期間一些配置信息
    consumequeue:消息消費隊列存儲目錄
    index:消息索引文件存儲目錄
    abort:如果存在abort文件說明Broker非正常關閉,該文件默認啟動時創建,正常退出時刪除
    checkpoint:文件檢測點。存儲commitlog文件最后一次刷盤時間戳、consumequeue最后一次刷盤時間、index索引文件最后一次刷盤時間戳。

    學習kafka 的時候,topic主題消息分成n多個不同的partition 分區存放,
    而在我們的rocketmq中,將一個topic主題的消息分成多個不同的Queue存放。

    前提條件:commitlog日志文件沒有滿的情況下:
    在rocketmq中所有topic主題對應的隊列的消息都會存放在同一個commitlog日志文件中,

    消費者在消費消息的時候 不會直接與commitlog日志文件打交道。
    Rocketmq 提供消費隊列 (邏輯概念)

    Commitlog日志文件 存放消息內容主體----Commitlogoffset

    ConsumeQueue 不存放消息主體,只存放消息的Commitlogoffset、msgsize、msgtag

    Queue offset與Commitlogoffset 之間區別?
    Queue offset—消息存放在ConsumeQueue 消費偏移量的位置
    Commitlogoffset ----消息物理存放位置

    [queueId=2, storeSize=242, queueOffset=25, sysFlag=0, bornTimestamp=1611665447721, bornHost=/192.168.31.1:55706, storeTimestamp=1611665447722, storeHost=/192.168.31.1:10911, msgId=C0A81F0100002A9F0000000000039330, commitLogOffset=234288, bodyCRC=336380854, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic=‘topic_ide_mayikt’, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=26,

    生產者如何投遞消息:

  • 生產者在投遞消息到mq服務器端,會將該消息存放在commitlog日志文件中(順序寫)、
  • Mq后臺就會開啟一個異步的線程將該commitlogoffset實現分配存放到不同隊列中。
  • 消費者如何消費:

  • 消費者消費消息的時候訂閱到隊列(consumequeue),根據queueoffset 獲取到該commitlogoffset
  • 在根據commitlogoffset 去commitlog日志文件中查找到該消息主體返回給客戶端。
  • 消息的commitlogoffset 如何存放在不同的consumequeue中。
    Consumequeue==16
    投遞消息 消息key
    消息key%16=1
    Consumequeue 中 Consumeoffset 對應 一條消息(沒有對應消息主體)—commitlogoffset

    消費者消費我們消息 在(kafka、rocketmq)中 消費成功或者失敗都不會立即將該消息刪除,日志清理策略刪除。

    Rocketmq消息存儲結構
    1.(commitlog日志文件沒有滿的情況下)在rocketmq中所有的消息日志,都存放在同一個commitlog日志文件中,(默認是1GB來存儲 好處1零拷貝映射 好處2 分段 非常好方式管理清理日志文件 commitlog命名就是上一個commitlog日志文件中最后一個commitlog-offset值)
    ,ConsumeQueue類似于在kafka中的 partition 分區模型, 而ConsumeQueue存放的是commitlog開始的commitLogoffset、msgsize、tag。
    2.每次消費者讀取消息的時候,先讀取ConsumeQueue中獲取到commitLogoffset,在根據該
    commitLogoffset查找commitLog日志文件獲取到消息體返回給消費者客戶端。

    3.與kafka的設計不同
    根據阿里巴巴消息中間件團隊的測試,如果每個topic中的partition 分區存儲的消息過多
    ,可能會影響到磁盤io的讀寫性能,所以采用ConsumeQueue存放少量的數據,消息讀取還是通過commitlog文件中查找。

  • 官方:為什么rocketmq commitlog日志文件大小最多只能有一個GB.
  • 分段存儲 后期日志清理比較方便;
  • 零拷貝
    Rocketmq讀寫使用:MappedByteBuffer 零拷貝
    因為rocketmq 中日志文件存儲采用文件映射機制+mmap 減少用戶態與內核態來回拷貝的次數,從而可以提高性能
    MappedByteBuffer 文件存儲映射,只能映射用戶態1.5GB-2GB 所以RocketMQ日志文件存儲commitlog默認為1G大小。
  • commitLog文件
    commitlog文件的存儲地址:KaTeX parse error: Undefined control sequence: \store at position 5: HOME\?s?t?o?r?e?\commitlog{fileName},每個文件的大小默認1G =102410241024,commitlog的文件名fileName,名字長度為20位,左邊補零,剩余為起始偏移量;比如00000000000000000000代表了第一個文件,起始偏移量為0,文件大小為1G=1073741824;當這個文件滿了,第二個文件名字為00000000001073741824,起始偏移量為1073741824,以此類推,第三個文件名字為00000000002147483648,起始偏移量為2147483648 ,消息存儲的時候會順序寫入文件,當文件滿了,寫入下一個文件。

    冪等問題

    整合方式

    消費者冪等問題

    當消費者消費消息失敗的時候,可以返回繼續重復消費,這時候rocketmq會給
    該消費者不斷的重試,重試的過程中需要注意冪等性問題。
    在rocketmq中,根據返回ack 狀態:
    ConsumeConcurrentlyStatus
    CONSUME_SUCCESS --消費成功
    RECONSUME_LATER— 消費失敗繼續重試消費。

    Messageid

    對我們每條消息生成一個messageid------根據該id 實現消費者冪等性問題

    Messageid 生成規則:關聯存放在那個mq服務器端、commitlog-offset值。

    Messageid --C0A812E0495C18B4AAC276054FC20005 轉碼
    獲取到commitlog-offset —查找物理對應消息位置。

    Rocketmq中的 messageid 長度一共占用16個字節,其中包含消息存儲的ip和端口號,
    消息commitLogOffset,客戶端發送請求傳遞messageid 給mq服務器端,mq服務器端
    根據該commitlogoffset值查找對應的消息返回給客戶端。
    消息過濾

    1.Tag就是相當于給消息打一個標簽,比如我的文章 屬于那些標簽下,用戶如果訂閱了該標簽就可能刷到該文章。
    2. 一個消息可能打上了多個tag標簽,消費者訂閱該主題和tag標簽,標簽如果一致的情況下就可以獲取到該消息。
    原理:在我們ConsumeQueue中,存儲結構為 commitoffset、size、messagetag 會根據消費者傳遞的tag hash值與隊列中每條消息的tag hash 做比較,如果相等的情況則獲取該消息,如果不相等的情況下,則不會獲取該消息。

    消費者組
    與kafka基本原理相同,同一個消費者中,只能有一個消費者消費消息,
    多個不同的消費者組可以消費同一條消息。
    消費成功之后,提交offset。
    分布式事務
    消息的可用性

    刷盤:將數據從pagecache中刷盤到硬盤中存儲
    ,避免數據的丟失。
    刷盤模式:
    1.同步刷盤:
    生產者必須等待該消息 從pagecache刷盤到硬盤中,在返回
    Ack給生產者。
    優點:可以保證消息不丟失
    缺點:影響到整體接口吞吐量
    應用場景:金融支付類 msg

    2.異步刷盤

    生產者不需要等待pagecache刷盤到硬盤中結果,完全采用異步的形式
    優點:
    提高整體接口吞吐量
    缺點:
    有可能消息會丟失 (概率不是很大)

    應用場景:行為分析---- 100 -12 98%

    刷盤策略

    RocketMQ,默認會將消息持久化存放在硬盤,首先會
    寫入到系統PageCahe中,讓后再刷盤到硬盤,這樣就可以保證
    硬盤與PageCache中數據完全一致性。

    同步刷盤:

    消息真正刷盤到磁盤中,才會返回給生產者,只要磁盤沒有壞,這樣做
    可以保證消息不丟失,但是可能會影響整體的吞吐量。

    異步刷盤:

    讀寫充分利用pagecache,消息寫入到pagecache成功之后,采用異步的形式
    刷盤到硬盤中,可以提高系統的吞吐量,但是可能消息會丟失。

    RocketMQ解決分布式事務問題

    RocketMQ解決分布式事務思想與原理

    RocketMQ在其消息定義的基礎上,對事務消息擴展了兩個相關的概念:
    1.Half(Prepare) Message——半消息(預處理消息)
    半消息是一種特殊的消息類型,該狀態的消息暫時不能被Consumer消費。當一條事務消息被成功投遞到Broker上,但是Broker并沒有接收到Producer發出的二次確認時,該事務消息就處于"暫時不可被消費"狀態,該狀態的事務消息被稱為半消息。
    2.Message Status Check——消息狀態回查
    由于網絡抖動、Producer重啟等原因,可能導致Producer向Broker發送的二次確認消息沒有成功送達。如果Broker檢測到某條事務消息長時間處于半消息狀態,則會主動向Producer端發起回查操作,查詢該事務消息在Producer端的事務狀態(Commit 或 Rollback)??梢钥闯?#xff0c;Message Status Check主要用來解決分布式事務中的超時問題。

    RocketMQ的事務消息是基于兩階段提交實現的,也就是說消息有兩個狀態,prepared和commited。當消息執行完send方法后,進入的prepared狀態,進入prepared狀態以后,就要執行executeLocalTransaction方法,這個方法的返回值有3個,也決定著這個消息的命運,

    COMMIT_MESSAGE:提交消息,這個消息由prepared狀態進入到commited狀態,消費者可以消費這個消息;
    ROLLBACK_MESSAGE:回滾,這個消息將被刪除,消費者不能消費這個消息;
    UNKNOW:未知,這個狀態有點意思,如果返回這個狀態,這個消息既不提交,也不回滾,還是保持prepared狀態,而最終決定這個消息命運的,是checkLocalTransaction這個方法。

    RocketMQ實現分布式事務的原理:

  • 生產者投遞一個半消息給我們RocketMQ服務器端存放,該消息暫時無法被我們消費者
    消費。
  • RocketMQ將該消息落地存放硬盤中,RocketMQ發送ACK給生產者。
  • 生產者收到事件監聽之后,開始執行生產者本地事務的操作;
  • 如果生產者執行本地的事務操作,如果成功的情況下,則發送一個提交通知給RocketMQ
    服務器端,RocketMQ服務器端將該消息,推送給消費者消費。
  • 如果生產者執行本地事務操作,如果失敗的情況下,則發送一個回滾通知給rocketmq服務器端,rocketmq服務器端在從本地將該消息刪除,不會給消費者消費。
    核心思想:確保生產者一定將消息投遞到mq服務器端,生產者必須先一定執行完成,在執行消費者。
  • RocketMQLocalTransactionState.ROLLBACK;—回滾
    RocketMQLocalTransactionState.COMMIT—提交
    RocketMQLocalTransactionState.UNKNOWN— 不是提交也不是回滾。

    存在的一些問題:

  • 如果生產者將該消息投遞成功之后,但是生產者如果執行本地事務如果失敗的情況下,
    發送ack給rocketmq 回滾該消息即可,不會被消費者消費。
  • 如果生產者將該消息投遞成功之后,本地事務執行成功呢,但是不返回狀態給rocketmq,如何處理呢?
  • Rocketmq服務器端 在默認的情況下 每隔60s 檢查本地事務是否已經執行過,如果執行過的情況下,則提交該提交,如果沒有執行該事務的情況下,則回滾。

    常見錯誤
    Lock failed,MQ already started

    將 broker 的 master 和 slave 節點放在同一臺機器上,配置的storePath相同導致的,修改配置文件,改為不同的路徑即可解決。

    總結

    以上是生活随笔為你收集整理的RocketMQ简介的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。