日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

RocketMq重试及消息不丢失机制

發布時間:2023/12/4 编程问答 32 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RocketMq重试及消息不丢失机制 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

1、消息重試機制

由于MQ經常處于復雜的分布式系統中,考慮網絡波動、服務宕機、程序異常因素,很有可能出現消息發送或者消費失敗的問題。因此,消息的重試就是所有MQ中間件必須考慮到的一個關鍵點。如果沒有消息重試,就可能產生消息丟失的問題,可能對系統產生很大的影響。所以,秉承寧可多發消息,也不可丟失消息的原則,大部分MQ都對消息重試提供了很好的支持。

RocketMQ為使用者封裝了消息重試的處理流程,無需開發人員手動處理。RocketMQ支持了生產端和消費端兩類重試機制。

1.1 生產端重試

生產端配置的有發送失敗重試次數,默認為2。使用了set方法對外進行暴露,producer客戶端可以改寫這個默認值。

public DefaultMQProducer(String producerGroup, RPCHook rpcHook) {this.createTopicKey = "TBW102";this.defaultTopicQueueNums = 4;this.sendMsgTimeout = 3000;this.compressMsgBodyOverHowmuch = 4096;//發送失敗,重試次數this.retryTimesWhenSendFailed = 2;this.retryAnotherBrokerWhenNotStoreOK = false;this.maxMessageSize = 131072;this.unitMode = false;this.producerGroup = producerGroup;this.defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);}

1.2 消費端重試

消費者消費消息后,需要給Broker返回消費狀態。以MessageListenerConcurrently監聽器為例,Consumer消費完成后需要返回ConsumeConcurrentlyStatus并發消費狀態。查看源碼,ConsumeConcurrentlyStatus是一個枚舉,共有兩種狀態:

public enum ConsumeConcurrentlyStatus {//消費成功ConsumeConcurrentlyStatus,//消費失敗,一段時間后重試RECONSUME_LATER; }

RECONSUME_LATER代表因為某種原因,消費失敗,稍后再試。后續會再次消費
官方文檔介紹如下:

RocketMQ中的消息無法無限次重新消費,當然了,手動修改重試次數是可以的,不介入的話不行。當重試次數超過所有延遲級別之后。消息會進入死信,死信Topic的命名為:%DLQ% + Consumer組名。

進入死信之后的消息肯定不會再投遞了,不過可以通過接口去查詢當前RocketMQ中死信隊列的消息。如果在上層實現自有命令,那么可以將消息從死信中移出并重新投遞。

死信消息具有以下特性:

  • 不會再被消費者正常消費。
  • 有效期與正常消息相同,均為 3 天,3 天后會被自動刪除。因此,請在死信消息產生后的 3 天內及時處理。

2、保證消息不丟失

分別從Producer發送機制、Broker的持久化機制,以及消費者的offSet機制來最大程度保證消息不易丟失

  • 從Producer的視角來看:如果消息未能正確的存儲在MQ中,或者消費者未能正確的消費到這條消息,都是消息丟失。
  • 從Broker的視角來看:如果消息已經存在Broker里面了,如何保證不會丟失呢(宕機、磁盤崩潰)
  • 從Consumer的視角來看:如果消息已經完成持久化了,但是Consumer取了,但是未消費成功且沒有反饋,就是消息丟失

從Producer分析:如何確保消息正確的發送到了Broker?

  • 默認情況下,可以通過同步的方式阻塞式的發送,check SendStatus,狀態是OK,表示消息一定成功的投遞到了Broker,狀態超時或者失敗,則會觸發默認的2次重試。此方法的發送結果,可能Broker存儲成功了,也可能沒成功
  • 采取事務消息的投遞方式,并不能保證消息100%投遞成功到了Broker,但是如果消息發送Ack失敗的話,此消息會存儲在CommitLog當中,但是對ConsumerQueue是不可見的。可以在日志中查看到這條異常的消息,嚴格意義上來講,也并沒有完全丟失
  • RocketMQ支持 日志的索引,如果一條消息發送之后超時,也可以通過查詢日志的API,來check是否在Broker存儲成功

從Broker分析:如果確保接收到的消息不會丟失?

  • 消息支持持久化到Commitlog里面,即使宕機后重啟,未消費的消息也是可以加載出來的
  • Broker自身支持同步刷盤、異步刷盤的策略,可以保證接收到的消息一定存儲在本地的內存中
  • Broker集群支持 1主N從的策略,支持同步復制和異步復制的方式,同步復制可以保證即使Master 磁盤崩潰,消息仍然不會丟失

從Cunmser分析:如何確保拉取到的消息被成功消費?

  • 消費者可以根據自身的策略批量Pull消息
  • Consumer自身維護一個持久化的offset(對應MessageQueue里面的min offset),標記已經成功消費或者已經成功發回到broker的消息下標
  • 如果Consumer消費失敗,那么它會把這個消息發回給Broker,發回成功后,再更新自己的offset
  • 如果Consumer消費失敗,發回給broker時,broker掛掉了,那么Consumer會定時重試這個操作
  • 如果Consumer和broker一起掛了,消息也不會丟失,因為consumer 里面的offset是定時持久化的,重啟之后,繼續拉取offset之前的消息到本地
創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎

總結

以上是生活随笔為你收集整理的RocketMq重试及消息不丢失机制的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。