【消息中间件】RabbitMQ 高级特性与应用问题
消息的可靠投遞
在使用 RabbitMQ 的時候,作為消息發送方希望杜絕任何消息丟失或者投遞失敗場景。RabbitMQ 為我們提供了兩種方式用來控制消息的投遞可靠性模式。
- confirm 確認模式
- return 退回模式
rabbitmq 整個消息投遞的路徑為:
producer—>rabbitmq broker—>exchange—>queue—>consumer
- 消息從 producer 到 exchange 則會返回一個 confirmCallback 。
- 消息從 exchange–>queue 投遞失敗則會返回一個 returnCallback 。
我們將利用這兩個 callback 控制消息的可靠性投遞
-
設置ConnectionFactory的publisher-confirms=“true” 開啟 確認模式。
-
使用rabbitTemplate.setConfirmCallback設置回調函數。當消息發送到exchange后回調confirm方法。在方法中判斷ack,如果為true,則發送成功,如果為false,則發送失敗,需要處理。
-
設置ConnectionFactory的publisher-returns=“true” 開啟 退回模式。
-
使用rabbitTemplate.setReturnCallback設置退回函數,當消息從exchange路由到queue失敗后,如果設置了rabbitTemplate.setMandatory(true)參數,則會將消息退回給producer。并執行回調函數returnedMessage。
-
在RabbitMQ中也提供了事務機制,但是性能較差
使用channel下列方法,完成事務控制:
- txSelect(), 用于將當前channel設置成transaction模式
- txCommit(),用于提交事務
- txRollback(),用于回滾事務
Consumer Ack
ack指Acknowledge,確認。 表示消費端收到消息后的確認方式。
有三種確認方式:
- 自動確認:acknowledge=“none”
- 手動確認:acknowledge=“manual”
- 根據異常情況確認:acknowledge=“auto”
其中自動確認是指,當消息一旦被Consumer接收到,則自動確認收到,并將相應 message 從 RabbitMQ 的消息緩存中移除。
但是在實際業務處理中,很可能消息接收到,業務處理出現異常,那么該消息就會丟失
如果設置了手動確認方式,則需要在業務處理成功后,調用channel.basicAck(),手動簽收
如果出現異常,則調用channel.basicNack()方法,讓其自動重新發送消息
- 在rabbit:listener-container標簽中設置acknowledge屬性,設置ack方式 none:自動確認,manual:手動確認
- 如果在消費端沒有出現異常,則調用channel.basicAck(deliveryTag,false);方法確認簽收消息
- 如果出現異常,則在catch中調用 basicNack或 basicReject,拒絕消息,讓MQ重新發送消息。
消費端限流
- 在rabbit:listener-container 中配置 prefetch屬性設置消費端一次拉取多少消息
- 消費端的確認模式一定為手動確認。acknowledge=“manual”
TTL
TTL 全稱 Time To Live(存活時間/過期時間)。當消息到達存活時間后,還沒有被消費,會被自動清除。RabbitMQ可以對消息設置過期時間,也可以對整個隊列(Queue)設置過期時間。
- 設置隊列過期時間使用參數:x-message-ttl,單位:ms(毫秒),會對整個隊列消息統一過期。
- 設置消息過期時間使用參數:expiration。單位:ms(毫秒),當該消息在隊列頭部時(消費時),會單獨判斷這一消息是否過期。
- 如果兩者都進行了設置,以時間短的為準。
死信隊列
死信隊列,英文縮寫:DLX 。Dead Letter Exchange(死信交換機),當消息成為Dead message后,可以被重新發送到另一個交換機,這個交換機就是DLX
消息成為死信的三種情況:
1. 隊列消息長度到達限制;
2. 消費者拒接消費消息,basicNack/basicReject,并且不把消息重新放入原目標隊列,requeue=false;
3. 原隊列存在消息過期設置,消息到達超時時間未被消費;
隊列綁定死信交換機:
給隊列設置參數: x-dead-letter-exchange 死信交換機名稱和 x-dead-letter-routing-key 發送給死信交換機的routingkey
1. 死信交換機和死信隊列和普通的沒有區別
2. 當消息成為死信后,如果該隊列綁定了死信交換機,則消息會被死信交換機重新路由到死信隊列
延遲隊列
延遲隊列,即消息進入隊列后不會立即被消費,只有到達指定時間后,才會被消費。
很可惜,在RabbitMQ中并未提供延遲隊列功能。
但是可以使用:TTL+死信隊列 組合實現延遲隊列的效果。
日志與監控
RabbitMQ日志
RabbitMQ默認日志存放路徑: /var/log/rabbitmq/rabbit@xxx.log
日志包含了RabbitMQ的版本號、Erlang的版本號、RabbitMQ服務節點名稱、cookie的hash值、RabbitMQ配置文件地址、內存限制、磁盤限制、默認賬戶guest的創建以及權限配置等等。
web管控臺監控
rabbitmqctl管理和監控查看隊列# rabbitmqctl list_queues查看exchanges# rabbitmqctl list_exchanges查看用戶# rabbitmqctl list_users查看連接# rabbitmqctl list_connections查看消費者信息# rabbitmqctl list_consumers查看環境變量# rabbitmqctl environment查看未被確認的隊列# rabbitmqctl list_queues name messages_unacknowledged查看單個隊列的內存使用# rabbitmqctl list_queues name memory查看準備就緒的隊列# rabbitmqctl list_queues name messages_ready消息追蹤
在使用任何消息中間件的過程中,難免會出現某條消息異常丟失的情況。對于RabbitMQ而言,可能是因為生產者或消費者與RabbitMQ斷開了連接,而它們與RabbitMQ又采用了不同的確認機制;也有可能是因為交換器與隊列之間不同的轉發策略;甚至是交換器并沒有與任何隊列進行綁定,生產者又不感知或者沒有采取相應的措施;另外RabbitMQ本身的集群策略也可能導致消息的丟失。這個時候就需要有一個較好的機制跟蹤記錄消息的投遞過程,以此協助開發和運維人員進行問題的定位。
在RabbitMQ中可以使用Firehose和rabbitmq_tracing插件功能來實現消息追蹤。
消息追蹤-Firehose
firehose的機制是將生產者投遞給rabbitmq的消息,rabbitmq投遞給消費者的消息按照指定的格式發送到默認的exchange上。這個默認的exchange的名稱為amq.rabbitmq.trace,它是一個topic類型的exchange。發送到這個exchange上的消息的routing key為 publish.exchangename 和deliver.queuename。其中exchangename和queuename為實際exchange和queue的名稱,分別對應生產者投遞到exchange的消息,和消費者從queue上獲取的消息。
注意:打開 trace 會影響消息寫入功能,適當打開后請關閉。
rabbitmqctl trace_on:開啟Firehose命令
rabbitmqctl trace_off:關閉Firehose命令
消息追蹤-rabbitmq_tracing
rabbitmq_tracing和Firehose在實現上如出一轍,只不過rabbitmq_tracing的方式比Firehose多了一層GUI的包裝,更容易使用和管理。
啟用插件:rabbitmq-plugins enable rabbitmq_tracing
RabbitMQ 應用問題
消息可靠性保障
100%確保消息發送成功
消息補償
消息冪等性保障
冪等性指一次和多次請求某一個資源,對于資源本身應該具有同樣的結果。也就是說,其任
意多次執行對資源本身所產生的影響均與一次執行的影響相同。
在MQ中指,消費多條相同的消息,得到與消費該消息一次相同的結果。
總結
以上是生活随笔為你收集整理的【消息中间件】RabbitMQ 高级特性与应用问题的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 【消息中间件】Spring Boot整合
- 下一篇: 【JVM调优】JVM指令集大全