日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

RabbitMQ 基本概念与高级特性

發(fā)布時(shí)間:2025/3/12 编程问答 36 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RabbitMQ 基本概念与高级特性 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

文章目錄

  • 1. 什么是消息隊(duì)列
    • 1.1 消息隊(duì)列概述
    • 1.2 使用消息隊(duì)列的優(yōu)勢(shì)
    • 1.3 使用消息隊(duì)列的劣勢(shì)
    • 1.4 常見的消息隊(duì)列產(chǎn)品對(duì)比
  • 2. RabbitMQ 基本概念
    • 2.1 RabbitMQ 概述
    • 2.2 RabbitMQ 的概念模型
      • 2.2.1 Message
      • 2.2.2 Publisher
      • 2.2.3 Exchange
      • 2.2.4 Binding
      • 2.2.5 Queue
      • 2.2.6 Connection
      • 2.2.7 Channel
      • 2.2.8 Virtual Host
      • 2.2.9 Consumer
      • 2.2.10 Broker
    • 2.3 RabbitMQ 的六種工作模式
      • 2.3.1 工作隊(duì)列模式
      • 2.3.2 發(fā)布/訂閱模式
      • 2.3.3 路由模式
      • 2.3.4 主題模式
      • 2.3.5 RPC 模式
      • 2.3.6 消息頭模式
  • 3. RabbitMQ 高級(jí)特性
    • 3.1 消息投遞時(shí)序圖
    • 3.2 消息的可靠投遞
      • 3.2.1 publishConfirm 機(jī)制
      • 3.2.2 returnCallback 機(jī)制
      • 3.2.3 消息確認(rèn) ACK 機(jī)制
        • 3.2.3.1 消息確認(rèn) ACK 機(jī)制的基本概念
        • 3.2.3.2 Spring AMQP 實(shí)現(xiàn)手動(dòng)確認(rèn) ACK 機(jī)制
    • 3.3 死信隊(duì)列
    • 3.4 延時(shí)隊(duì)列
      • 3.4.1 TTL + 死信隊(duì)列實(shí)現(xiàn)延時(shí)隊(duì)列
      • 3.4.2 x-delay-message-exchange 插件實(shí)現(xiàn)延時(shí)隊(duì)列
  • 4. RabbitMQ 如何解決問題
    • 4.1 使用 RabbitMQ 集群實(shí)現(xiàn)高可用
    • 4.2 如何保證消息不被重復(fù)消費(fèi)
    • 4.3 如何確保消息不丟失
      • 4.3.1 消息從生產(chǎn)者到 broker 之間的防丟失
      • 4.3.2 消息在 broker 中的防丟失
      • 4.3.3 消息從 broker 到消費(fèi)者之間的防丟失
    • 4.4 如何保證消息傳遞的順序性

1. 什么是消息隊(duì)列

1.1 消息隊(duì)列概述

消息隊(duì)列(Message Queue,以下簡(jiǎn)稱 MQ)是在消息的傳輸過程中保存消息的容器,多用于分布式系統(tǒng)服務(wù)間的通信

1.2 使用消息隊(duì)列的優(yōu)勢(shì)

使用消息隊(duì)列主要有解耦、異步、削峰的優(yōu)勢(shì)。

  • 應(yīng)用解耦:可以提高系統(tǒng)的容錯(cuò)性和可維護(hù)性
  • 異步提速:提升用戶體驗(yàn)和系統(tǒng)吞吐量
  • 削峰填谷:提高系統(tǒng)的穩(wěn)定性

1.3 使用消息隊(duì)列的劣勢(shì)

使用消息隊(duì)列主要有系統(tǒng)可用性降低、復(fù)雜度提高的劣勢(shì)。

  • 系統(tǒng)可用性降低:系統(tǒng)引入的外部依賴越多,則系統(tǒng)的穩(wěn)定性越低。一旦 MQ 宕機(jī),則系統(tǒng)中依賴 MQ 實(shí)現(xiàn)的功能全部不可用。所以 MQ 一定要 高可用。
  • 系統(tǒng)的復(fù)雜度提高:引入 MQ 后需要考慮的問題會(huì)比較多且比較棘手,主要有以下幾個(gè)方面的問題:
    • 如何保證消息不被重復(fù)消費(fèi)?
    • 如何處理消息丟失的情況?
    • 如何保證消息傳遞的順序性?

1.4 常見的消息隊(duì)列產(chǎn)品對(duì)比

市面上常見的消息隊(duì)列產(chǎn)品主要有 RabbitMQ 、ActiveMQ、RocketMQ、Kafka。

特性RabbitMQActiveMQRocketMQKafka
單機(jī)吞吐量萬級(jí)(高于 ActiveMQ)萬級(jí)(次于 RabbitMQ)10 萬級(jí),支持高吞吐10 萬級(jí),支持高吞吐
topic 數(shù)量對(duì)吞吐量的影響topic 可以達(dá)到幾百/幾千的級(jí)別,吞吐量會(huì)有較小幅度的下降,這是 RocketMQ 的一大優(yōu)勢(shì),在同等級(jí)旗下,可以支撐大量的 topictopic 到幾十/幾百的級(jí)別時(shí),吞吐量會(huì)大幅度下降。在同等機(jī)器下,如果要支持大規(guī)模的 topic,需要增加更多的機(jī)器資源
協(xié)議支持AMQP、XMPP、SMTP、STOMPAMQP、OpenWire、STOMP、REST、XMPP自定義自定義協(xié)議,社區(qū)封裝了 HTTP 協(xié)議支持
時(shí)效性微秒(us) 級(jí),RabbitMQ 的最大特點(diǎn),延遲最低毫秒(ms)級(jí)毫秒(ms)級(jí)毫秒(ms)級(jí)內(nèi)
可用性高,基于主從架構(gòu)可實(shí)現(xiàn)高可用高,基于主從架構(gòu)可實(shí)現(xiàn)高可用非常高,分布式架構(gòu)非常高,分布式架構(gòu),一個(gè)數(shù)據(jù)多個(gè)副本,少數(shù)機(jī)器宕機(jī)不會(huì)丟失數(shù)據(jù),不會(huì)導(dǎo)致不可用
消息可靠性基本不丟有較低的概率丟失數(shù)據(jù)經(jīng)過參數(shù)優(yōu)化配置,可以做到 0 丟失經(jīng)過參數(shù)優(yōu)化配置,可以做到 0 丟失
功能支持基于 erlang 開發(fā),并發(fā)能力很強(qiáng),性能極好,延時(shí)最低,社區(qū)活躍,管理界面功能豐富老牌產(chǎn)品,成熟度高,文檔多MQ 功能較為完善,基于分布式架構(gòu),擴(kuò)展性好功能較為簡(jiǎn)單,只支持簡(jiǎn)單的 MQ 功能,在大數(shù)據(jù)領(lǐng)域的實(shí)時(shí)計(jì)算以及日志采集被大規(guī)模使用

2. RabbitMQ 基本概念

2.1 RabbitMQ 概述

RabbitMQ 是實(shí)現(xiàn)了高級(jí)消息隊(duì)列協(xié)議(AMQP)的開源消息中間件。主要的特點(diǎn)包括:

  • 可靠性(Reliability):RabbitMQ使用一些機(jī)制來保證可靠性,如持久化、傳輸確認(rèn)、發(fā)布確認(rèn)
  • 靈活的路由(Flexible Routing):在消息進(jìn)入隊(duì)列之前,通過 Exchange 來路由消息。對(duì)于典型的路由功能,RabbitMQ已經(jīng)提供了一些內(nèi)置的 Exchange 來實(shí)現(xiàn)。針對(duì)更復(fù)雜的路由功能,可以將多個(gè) Exchange 綁定在一起,也可以通過插件機(jī)制實(shí)現(xiàn)自己的 Exchange。
  • 消息集群(Clustering):多個(gè) RabbitMQ服務(wù)器可以組成一個(gè)集群,形成一個(gè)邏輯 Broker。
  • 高可用(Highly Available Queues):隊(duì)列可以在集群中的機(jī)器上進(jìn)行同步,使得在部分節(jié)點(diǎn)出問題的情況下隊(duì)列仍然可用。
  • 多種協(xié)議(Multi-protocol):RabbitMQ 支持多種消息隊(duì)列協(xié)議,比如 STOMP、MQTT。
  • 多語言客戶端(Many Clients):RabbitMQ 幾乎支持所有的常用編程語言。
  • 管理界面(Management UI):RabbitMQ 提供了一個(gè)易用的用戶界面。
  • 跟蹤機(jī)制(Tracing):如果消息異常,RabbitMQ 提供了消息跟蹤機(jī)制,可以方便定位問題。
  • 插件機(jī)制(Plugin System):RabbitMQ 提供插件機(jī)制,可以根據(jù)自己的需求獲得或自己編寫插件以滿足定制化需求。

2.2 RabbitMQ 的概念模型

2.2.1 Message

Message 即消息。消息是不具名的,由**消息頭(header)消息體(payload)**組成。

  • 消息體由二進(jìn)制數(shù)組承載,保證了傳輸?shù)母咝院鸵欢ǖ陌踩浴?/li>
  • 消息頭由一系列的可選屬性組成。包括 路由鍵(routing-key)消息優(yōu)先級(jí)(priority)、**是否需要持久化(delivery-mode)**等組成。

2.2.2 Publisher

Publisher 即消息的生產(chǎn)者。也是一個(gè)向 broker 發(fā)布消息的客戶端應(yīng)用程序。

2.2.3 Exchange

Exchange 即交換機(jī),用來接收生產(chǎn)者發(fā)送的消息并將這些消息路由給與其綁定的隊(duì)列。
Exchange 分發(fā)消息根據(jù)類型不同,分發(fā)策略隨之不同,一共由四種類型:

  • direct:只有消息中的路由鍵(Routing Key)和綁定關(guān)系(Binding)完全匹配時(shí),交換機(jī)才會(huì)將消息分發(fā)到對(duì)應(yīng)的隊(duì)列中。
  • fanout:每條發(fā)送到 fanout 類型的交換機(jī)上的消息都會(huì)被分發(fā)到所有綁定到這個(gè)交換機(jī)的隊(duì)列上去。fanout 類型的交換機(jī)不處理路由鍵的匹配邏輯,只是簡(jiǎn)單地將消息分發(fā)到所有綁定的隊(duì)列上去。由于在分發(fā)過程中少處理了路由鍵匹配的邏輯,fanout 類型的交換機(jī)轉(zhuǎn)發(fā)消息效率是最高的。
  • topic:在分發(fā)過程中需要處理路由鍵匹配的邏輯,匹配邏輯如下:
    • 單詞與單詞之間用 . 隔開
    • * 匹配一個(gè)單詞
    • # 匹配 0 個(gè)或多個(gè)單詞
    • *、# 只能單獨(dú)用作占位符,不能與單詞貼著使用
  • headers:在分發(fā)過程中不需要處理路由鍵匹配的邏輯,而使用**綁定參數(shù)(Arguments)**來進(jìn)行匹配,匹配邏輯如下:
    • Routing Key 不參與路由邏輯
    • 如果綁定關(guān)系中 Arguments 中的 x-match = all 時(shí),表示發(fā)送的消息屬性中的 header 中的所有鍵值對(duì)與綁定關(guān)系中聲明的 Arguments 中的所有鍵值對(duì)都相等才能匹配,可以理解為消息屬性中的 header 中的所有鍵值對(duì)包含綁定關(guān)系中聲明的 Arguments 中的所有鍵值對(duì)。
    • 如果綁定關(guān)系中 Arguments 中的 x-match = any 時(shí),表示發(fā)送的消息屬性中的 header 中任一鍵值對(duì)與 Arguments 中相等就能匹配。

2.2.4 Binding

Binding 即隊(duì)列與交換機(jī)的綁定關(guān)系,可以理解為交換機(jī)和隊(duì)列的關(guān)系中間表。Binding 包括以下兩部分:

  • Routing Key:即路由鍵,可以理解為消息的路由關(guān)鍵字。在交換機(jī)類型為 direct 或 topic 時(shí),將參與路由邏輯。
  • Arguments:即綁定參數(shù),可以理解為消息的路由參數(shù),鍵值對(duì)的形式。在交換機(jī)類型為 Headers 時(shí),將參與路由邏輯。

2.2.5 Queue

Queue 即隊(duì)列,用來保存消息直到發(fā)送給消費(fèi)者。它是存放消息的容器,一個(gè)消息可以被投入到一個(gè)或多個(gè)隊(duì)列。消息將一直呆在隊(duì)列中,等待消費(fèi)者將其從隊(duì)列中取走。

it's essentially a large message buffer. —— 本質(zhì)上隊(duì)列就是一個(gè)巨大的消息緩沖區(qū)。

RabbitMQ 官網(wǎng)上如是介紹 Queue。


隊(duì)列有幾個(gè)非常重要的屬性:

  • TTL:即 Time-To-Live 屬性,隊(duì)列的最大存活時(shí)間。通過在聲明隊(duì)列時(shí),設(shè)置 x-message-ttl 參數(shù)(單位:ms)來完成。隊(duì)列中的所有消息都會(huì)遵守這個(gè)時(shí)間的限制。
  • 最大長(zhǎng)度限制:顧名思義,即隊(duì)列的大小。這個(gè)最大長(zhǎng)度可以是針對(duì)隊(duì)列中就緒狀態(tài)消息數(shù)量的限制,也可以是針對(duì)隊(duì)列中所有就緒狀態(tài)消息的總字節(jié)數(shù)的限制,且兩個(gè)限制可以同時(shí)存在。(注意,這里說的大小都是針對(duì)隊(duì)列中就緒狀態(tài)的消息來進(jìn)行統(tǒng)計(jì)的。)
    • 消息數(shù)量限制:通過設(shè)置 x-max-length 來實(shí)現(xiàn)
    • 消息總字節(jié)數(shù)限制:通過設(shè)置 x-max-length-bytes 來實(shí)現(xiàn)(單位:字節(jié),只計(jì)算消息頭的字節(jié)數(shù),不計(jì)算消息頭、消息屬性占用的字節(jié)數(shù))
  • 隊(duì)列溢出行為:當(dāng)隊(duì)列中的就緒狀態(tài)消息超出了消息的最大長(zhǎng)度限制后,將會(huì)有隊(duì)列溢出行為發(fā)生,由 x-overflow 進(jìn)行設(shè)置
    • drop-head:刪除隊(duì)列頭部的消息
    • reject-publish:丟掉最近接收的消息
    • reject-publish-dlx:將消息發(fā)往死信隊(duì)列

2.2.6 Connection

Connection 即網(wǎng)絡(luò)連接

2.2.7 Channel

Channel 即信道,多路復(fù)用連接中的一條獨(dú)立的雙向數(shù)據(jù)流通道。信道是建立在真是的 TCP 連接內(nèi)的虛擬連接,AMQP 命令都是通過信道發(fā)出去的,不管是發(fā)布消息,訂閱隊(duì)列還是接收消息,這些動(dòng)作都是通過信道完成的。因?yàn)閷?duì)于 OS 來說建立和銷毀 TCP 都是非常昂貴的開銷,所以引入了信道的概念,以復(fù)用 TCP 連接。

2.2.8 Virtual Host

Virtual Host 即虛擬主機(jī),可以理解為虛擬消息服務(wù)器。每個(gè) Virtual Host 相當(dāng)于一個(gè)獨(dú)立的 RabbitMQ 服務(wù)器,每個(gè) Virtual Host 之間的數(shù)據(jù)是相互隔離的,不能互通。

可以將 Virtual Host 類比為 MySQL 中的 DataBase 的概念。

2.2.9 Consumer

Consumer 即消息的消費(fèi)者,表示一個(gè)從 broker 中獲取消息的客戶端應(yīng)用程序。

2.2.10 Broker

Broker 即消息隊(duì)列服務(wù)實(shí)體。

2.3 RabbitMQ 的六種工作模式

2.3.1 工作隊(duì)列模式

工作隊(duì)列模式

工作隊(duì)列模式實(shí)現(xiàn):

  • 生產(chǎn)者和消費(fèi)者需要指定同一個(gè)交換機(jī)
  • 生產(chǎn)者發(fā)送時(shí)指定路由名稱為隊(duì)列名稱
  • 消費(fèi)者將隊(duì)列綁定至交換機(jī)上,無需指定路由鍵

工作隊(duì)列模式下,多個(gè)消費(fèi)者監(jiān)聽同一個(gè)隊(duì)列的時(shí)候,一條消息只會(huì)有一個(gè)消費(fèi)者接收到。

2.3.2 發(fā)布/訂閱模式

發(fā)布訂閱模式

發(fā)布訂閱模式實(shí)現(xiàn):

  • 生產(chǎn)者和所有消費(fèi)者需要指定同一個(gè)交換機(jī)
  • 指定交換機(jī)類型為 fanout
  • 生產(chǎn)者發(fā)送時(shí)無需指定 Routing Key
  • 消費(fèi)者綁定各自隊(duì)列到交換機(jī)上

發(fā)布訂閱模式下,所有綁定了交換機(jī)的隊(duì)列都會(huì)收到消息;但如果多個(gè)消費(fèi)者監(jiān)聽了同一個(gè)隊(duì)列,一條消息還是只會(huì)有一個(gè)消費(fèi)者收到。

2.3.3 路由模式

路由模式

路由模式實(shí)現(xiàn):

  • 生產(chǎn)者和所有消費(fèi)者需要指定同一個(gè)交換機(jī)
  • 指定交換機(jī)類型為 direct
  • 生產(chǎn)者發(fā)送時(shí)指定 Routing Key
  • 消費(fèi)者將各自需要監(jiān)聽的隊(duì)列通過自定義 Routing Key 綁定到交換機(jī)上

路由模式下,只有與生產(chǎn)者指定了相同的 Routing Key 的隊(duì)列才會(huì)收到消息;如果兩個(gè)隊(duì)列綁定的 Routing Key 相同,則都會(huì)收到消息。

2.3.4 主題模式

主題模式

主題模式實(shí)現(xiàn):

  • 生產(chǎn)者和所有消費(fèi)者需要指定同一個(gè)交換機(jī)
  • 指定交換機(jī)類型為 topic
  • 生產(chǎn)者發(fā)送時(shí)指定 Routing Key
  • 消費(fèi)者將各自需要監(jiān)聽的隊(duì)列通過自定義 Routing Key 匹配格式綁定到交換機(jī)上。
    • 多個(gè)單詞用 . 隔開,否則視為一個(gè)單詞
    • # 匹配零個(gè)或多個(gè)單詞,* 只能匹配一個(gè)單詞

主題模式下,只有生產(chǎn)者的 Routing Key 與隊(duì)列綁定的 Routing Key 格式匹配成功的隊(duì)列才能收到消息。如果幾個(gè)隊(duì)列的 Routing Key 格式都能匹配上,則都會(huì)收到消息。

2.3.5 RPC 模式

RPC 模式

RPC 模式 Spring AMQP 實(shí)現(xiàn)

  • 客戶端和服務(wù)端需要指定同一個(gè)交換機(jī)
  • 指定交換機(jī)類型為 direct
  • 客戶端發(fā)送請(qǐng)求時(shí)指定 Routing Key ,且使用 RabbitMQ.convertAndSendAndRecive 方法將請(qǐng)求入?yún)魅?#xff0c;此方法將返回一個(gè) Object 對(duì)象,即為 RPC 的調(diào)用結(jié)果(在未獲取結(jié)果之前,發(fā)送的線程將一直處于阻塞狀態(tài))。
  • 服務(wù)端將需要監(jiān)聽的隊(duì)列通過指定 Routing Key 綁定到交換機(jī)上,且將 @RabbitListener 標(biāo)注的方法的返回參數(shù)設(shè)置為 RPC 調(diào)用的返回結(jié)果。

RPC 模式可以實(shí)現(xiàn) RPC 調(diào)用的功能。但在未使用 Spring AMQP 的情況下,需要手動(dòng)指定 RPC 的回調(diào)隊(duì)列,且需要手動(dòng)監(jiān)聽回調(diào)隊(duì)列,并從中獲取指定 correlationId 的返回結(jié)果,Spring AMPQ 已經(jīng)自動(dòng)實(shí)現(xiàn)了這些功能。

2.3.6 消息頭模式

消息頭模式與路由模式類似,不過消息頭模式并不使用路由鍵來進(jìn)行匹配,而是使用綁定參數(shù)來進(jìn)行匹配。匹配規(guī)則見Exchange 類型之 header

3. RabbitMQ 高級(jí)特性

3.1 消息投遞時(shí)序圖

RabbitMQ 的消息投遞時(shí)序圖如下所示:

3.2 消息的可靠投遞

在使用消息隊(duì)列時(shí),由于消息鏈路上涉及到三個(gè)終端,則如何避免消息丟失或者投遞失敗是必須要考慮的問題。RabbitMQ 提供了三種機(jī)制來控制消息的投遞可靠性:

  • 生產(chǎn)者發(fā)送確認(rèn)機(jī)制:發(fā)送確認(rèn)分為兩部分:
    • 確認(rèn)生產(chǎn)者發(fā)送的消息是否到達(dá) broker(或者說是否到達(dá)交換機(jī),默認(rèn)只要到達(dá)了 broker 就一定會(huì)到達(dá)交換機(jī)),即 publishConfirm 機(jī)制
    • 確認(rèn)交換機(jī)的消息是否由路由規(guī)則到達(dá)了隊(duì)列,即 returnCallback 機(jī)制
  • 消費(fèi)者接收確認(rèn)機(jī)制:確認(rèn)消費(fèi)者是否成功消費(fèi)了隊(duì)列中的消息,即消息確認(rèn) ACK 機(jī)制

3.2.1 publishConfirm 機(jī)制

當(dāng)消息到達(dá) broker 之后,會(huì)觸發(fā) confirmCallback 回調(diào),確認(rèn)消息已經(jīng)到達(dá) broker,默認(rèn)只要到達(dá)了 broker 就一定會(huì)到達(dá)交換機(jī)
在 SpringBoot 應(yīng)用中,需要開啟配置:

spring:rabbitmq:# 高版本寫法publisherConfirmType: CORRELATED# 低版本寫法:publisher-confirms: true

然后在 RabbitTemplate 中調(diào)用 setConfirmCallback 設(shè)置具體回調(diào)邏輯。

3.2.2 returnCallback 機(jī)制

當(dāng)交換機(jī)按照路由規(guī)則投遞消息至隊(duì)列失敗時(shí),會(huì)觸發(fā) returnCallback 回調(diào),將消息退回或者直接丟棄
在 SpringBoot 應(yīng)用中,需要開啟配置:

spring:rabbitmq:publisher-returns: true

然后在 RabbitTemplate 中調(diào)用 setReturnsCallback 設(shè)置具體回調(diào)邏輯。
注意,在 AMPQ 協(xié)議中,mandatory 標(biāo)識(shí)位用于控制消息的退回邏輯,在 SpringBoot 應(yīng)用中對(duì)應(yīng):

  • 當(dāng) RabbitTemplate 中的 mandatory 屬性為 true 時(shí),將會(huì)將消息退回給生產(chǎn)者
  • 當(dāng) RabbitTemplate 中的 mandatory 屬性為 false時(shí),將直接把消息丟棄

3.2.3 消息確認(rèn) ACK 機(jī)制

3.2.3.1 消息確認(rèn) ACK 機(jī)制的基本概念

消息確認(rèn) ACK 機(jī)制,即消費(fèi)者接收到 broker 投遞的消息后,與 broker 的一個(gè)應(yīng)答機(jī)制。消費(fèi)者接收到 broker 投遞的消息后,會(huì)出現(xiàn)以下幾種情況:

  • 消費(fèi)者在接收到消息后,正常返回給 broker 一個(gè) ACK 的應(yīng)答,broker 在接收到這個(gè)應(yīng)答后,將消息從隊(duì)列中刪除
  • 消費(fèi)者自身出現(xiàn)了宕機(jī)/邏輯出錯(cuò)導(dǎo)致沒有正常返回 broker 一個(gè)正常的應(yīng)答,或者使用了 nack 或者 reject 應(yīng)答,則根據(jù) requeue 參數(shù)的不同執(zhí)行不同的邏輯(reject 應(yīng)答對(duì)應(yīng)的 requeue 參數(shù)值為 true)
    • 如果 requeue 參數(shù)值為 false,則 broker 會(huì)直接丟棄這條消息
    • 如果 requeue 參數(shù)值為 true,則會(huì)將這條消息發(fā)送給正在監(jiān)聽這個(gè)隊(duì)列的其他消費(fèi)者,如果只有當(dāng)前消費(fèi)者,則會(huì)繼續(xù)把這條消息投遞給當(dāng)前消費(fèi)者。

nack 和 reject 的主要區(qū)別在于,reject 只能拒絕單個(gè)消息,但 nack 可以拒絕小于指定 deliveryTag 的所有消息。deliveryTag 是 AMQP 中針對(duì)一個(gè)消息的序號(hào)值,同一消息每次從 broker 發(fā)出后,該序號(hào)值都會(huì) +1。

注意,如果在消費(fèi)者的消費(fèi)邏輯中沒有捕獲異常,或者在手動(dòng)確認(rèn)模式下一直沒有回復(fù) ACK ,那么消息將會(huì)不停地重新分發(fā),將極大地影響 broker 的運(yùn)行穩(wěn)定性

3.2.3.2 Spring AMQP 實(shí)現(xiàn)手動(dòng)確認(rèn) ACK 機(jī)制

Spring AMQP 中,ACK 一共有三種確認(rèn)模式

  • AcknowledgeMode.AUTO:自動(dòng)確認(rèn)
  • AcknowledgeMode.MANUAL:手動(dòng)確認(rèn)
  • AcknowledgeMode.NONE:不確認(rèn)
    一般來說,我們?cè)趯?shí)現(xiàn)消息可靠性投遞時(shí),最好將確認(rèn)模式修改為手動(dòng)確認(rèn)模式,手動(dòng)處理 ACK 。配置文件如下:
spring:rabbitmq:listener:# spring amqp 的監(jiān)聽容器有兩種,simple 和 direct,可以根據(jù)不同的場(chǎng)景選擇不同的監(jiān)聽容器simple:acknowledge-mode: manual

在完成上面的設(shè)置之后,需要在監(jiān)聽方法入?yún)⒅刑钊?Channel 對(duì)象:

import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.messaging.handler.annotation.Headers; import org.springframework.messaging.handler.annotation.Payload;@RabbitListener(//在這里完善你的綁定關(guān)系)/*** <p>消息監(jiān)聽方法</p>** @param header 消息頭鍵值對(duì)* @param message 消息體* @param channel 信道*/public void onListening(@Headers Map<String, Object> header, @Payload Message message, Channel channel) {// do something here//消息處理完成后調(diào)用 ack 方法//這里的 deliveryTag 從消息中獲得channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);// 或者可以使用 nack 方法,第二個(gè)參數(shù)是是否批量處理,第三個(gè)參數(shù)即為 requeue 參數(shù)//channel.basicNack(message.getMessageProperties().getDeliveryTag(), true, true);}

3.3 死信隊(duì)列

首先需要理解一下什么叫死信。正常的消息可以按照消息投遞時(shí)序圖走完整個(gè)流程,但是出現(xiàn)以下幾種情況后,正常消息就變成了死信

  • 消息被拒絕(basicReject 或 basicNack),且 requeue 值為 false
  • 超過了隊(duì)列指定的 x-message-ttl 屬性值,或者超過了消息設(shè)定的 expiration 值時(shí)
  • 隊(duì)列超過了最大長(zhǎng)度限制,且隊(duì)列溢出行為參數(shù)為 reject-publish-dlx 時(shí)

死信隊(duì)列,實(shí)際上就是一個(gè)接收死信的隊(duì)列,其與正常的交換機(jī),隊(duì)列完全一致。使用方法:

  • 在隊(duì)列屬性上添加 x-dead-letter-exchange 和 x-dead-letter-routing-key 參數(shù),指定死信出現(xiàn)時(shí),將死信轉(zhuǎn)發(fā)過去的目標(biāo)死信隊(duì)列
  • 在隊(duì)列屬性上,添加 x-overflow 參數(shù),并指定值為 reject-publish-dlx

死信隊(duì)列應(yīng)用場(chǎng)景:

  • 當(dāng)發(fā)生消費(fèi)異常時(shí),可以方便地將出錯(cuò)的消息發(fā)送至死信隊(duì)列,隨后調(diào)用業(yè)務(wù)邏輯將當(dāng)時(shí)出錯(cuò)的數(shù)據(jù)持久化到數(shù)據(jù)庫(kù)(或者執(zhí)行其他的操作)
  • 通過這樣的方式,不用去翻看應(yīng)用日志就可以知道找出出錯(cuò)的消息,也方便重新投遞,有助于排查線上問題。

3.4 延時(shí)隊(duì)列

有時(shí)候我們會(huì)延時(shí)執(zhí)行任務(wù)的需求,即在過一段時(shí)間之后,再執(zhí)行任務(wù)
AMQP 協(xié)議,RabbitMQ 原生并沒有直接支持延時(shí)隊(duì)列的功能,目前使用 RabbitMQ 來實(shí)現(xiàn)延時(shí)隊(duì)列有兩種方式:

  • 通過**TTL 和死信隊(duì)列**結(jié)合來實(shí)現(xiàn)延時(shí)隊(duì)列
  • 通過 RabbitMQ 的**x-delay-message-exchange 插件**來實(shí)現(xiàn)延時(shí)隊(duì)列

3.4.1 TTL + 死信隊(duì)列實(shí)現(xiàn)延時(shí)隊(duì)列

通過**TTL 和死信隊(duì)列**結(jié)合來實(shí)現(xiàn)延時(shí)隊(duì)列

  • 首先聲明一個(gè)隊(duì)列,隊(duì)列上綁定死信隊(duì)列,并設(shè)置隊(duì)列的超時(shí)時(shí)間
  • 不設(shè)置任何消費(fèi)者監(jiān)聽此隊(duì)列
  • 生產(chǎn)者將消息發(fā)送到隊(duì)列上
  • 由于沒有消費(fèi)者監(jiān)聽隊(duì)列,則發(fā)送到隊(duì)列中的消息經(jīng)過超時(shí)時(shí)間之后,就會(huì)轉(zhuǎn)發(fā)到死信隊(duì)列上,由死信隊(duì)列完成相應(yīng)的延時(shí)處理業(yè)務(wù)邏輯

此種方法存在時(shí)序性的弊端,因?yàn)殛?duì)列的特殊性(FIFO),在同一個(gè)隊(duì)列消息過期時(shí)間不同的情況下,可能會(huì)出現(xiàn)先到期的后被執(zhí)行延時(shí)邏輯的可能性

3.4.2 x-delay-message-exchange 插件實(shí)現(xiàn)延時(shí)隊(duì)列

通過 RabbitMQ 的**x-delay-message-exchange 插件**來實(shí)現(xiàn)延時(shí)隊(duì)列

  • broker 集成 x-delay-message-exchange 插件,使用 Docker 進(jìn)行安裝:
docker pull feeld/x-delayed-rabbitmq
  • 聲明延時(shí)隊(duì)列交換機(jī),并指定交換機(jī)類型為 x-delayed-message,可以通過指定 x-delayed-type 來指定實(shí)際交換機(jī)的類型
  • 將隊(duì)列綁定到該延時(shí)隊(duì)列交換機(jī)上
  • 發(fā)送消息時(shí),通過設(shè)置消息頭參數(shù) x-delay 或者 MessageProperties.setDelay() 方法可以設(shè)置延時(shí)時(shí)間(單位:ms),如果兩者都設(shè)置,則以最后一次設(shè)置的時(shí)間為準(zhǔn)

4. RabbitMQ 如何解決問題

這里說的問題,即前面提到的使用消息隊(duì)列的劣勢(shì)中舉例的一些問題

4.1 使用 RabbitMQ 集群實(shí)現(xiàn)高可用

RabbitMQ 有兩種集群模式:

  • 普通集群模式
  • 鏡像集群模式

4.2 如何保證消息不被重復(fù)消費(fèi)

消息重復(fù)消費(fèi),即同一條消息被消費(fèi)者的消費(fèi)邏輯處理了多遍,即消息重復(fù)重復(fù)消費(fèi)的問題。可能出現(xiàn)的原因如下:

  • 消息路由方式選擇錯(cuò)誤:錯(cuò)誤地使用了消息分發(fā)路由模式,使本該路由給單個(gè)消費(fèi)者的消息分發(fā)給了多個(gè)消費(fèi)者,每個(gè)消費(fèi)者都執(zhí)行了一次消息消費(fèi)的邏輯。
  • 消息確認(rèn)機(jī)制沒有正常工作:當(dāng)出現(xiàn)網(wǎng)絡(luò)波動(dòng)或其他情況,使生產(chǎn)者/消費(fèi)者與 broker 之間的消息確認(rèn)機(jī)制沒有正常工作時(shí),則可能出現(xiàn)消息重復(fù)消費(fèi)的問題。

這里我們只討論消息確認(rèn)機(jī)制沒有正常工作的處理方法:

  • 在消息中增加一個(gè) ID 字段用來標(biāo)識(shí)消息的唯一性
  • 在消費(fèi)者消費(fèi)消息后,將收到的消息的 ID 在存儲(chǔ)層中進(jìn)行查詢
    • 如果能查詢出來,則說明被處理過,即消息已經(jīng)被消費(fèi)過,執(zhí)行比較更新或者其他邏輯
    • 如果查詢不出來,則說明沒有被處理過,即消息還沒有被消費(fèi)過,則執(zhí)行正常的處理邏輯,并將這條消費(fèi)記錄存在存儲(chǔ)層中

4.3 如何確保消息不丟失

從消息投遞時(shí)序圖我們可以知道,消息的整個(gè)生命周期會(huì)經(jīng)歷生產(chǎn)者,broker,消費(fèi)者三個(gè)終端,則消息的防丟失需要從以下幾個(gè)方面來進(jìn)行考慮:

  • 消息從生產(chǎn)者到 broker 之間的防丟失
  • 消息在 broker 中的防丟失
  • 消息從 broker 到消費(fèi)者之間的防丟失

4.3.1 消息從生產(chǎn)者到 broker 之間的防丟失

生產(chǎn)者將消息發(fā)送到 broker 時(shí),因?yàn)榫W(wǎng)絡(luò)等原因,消息可能會(huì)發(fā)送失敗,即出現(xiàn)了消息丟失。一般有兩種方式來解決:

  • RabbitMQ 提供的事務(wù)機(jī)制,在 Spring AMQP 中可以通過設(shè)置
rabbitTemplate.setChannelTransacted(true);

來啟用事務(wù)。當(dāng)消息沒有正常到達(dá) broker 時(shí),會(huì)拋出異常,生產(chǎn)者可以捕獲異常來感知到消息并沒有正常到達(dá) broker ,從而執(zhí)行重發(fā)等邏輯。但是缺點(diǎn)是開啟事務(wù)后,每個(gè) channel 會(huì)采用同步阻塞的方式來等待事務(wù)的完成,導(dǎo)致** broker 的吞吐量和性能會(huì)大幅下降**。

  • publishConfirm 機(jī)制:
    發(fā)布確認(rèn)模式:
一旦使用了 publishConfirm 模式,所有發(fā)布的消息都會(huì)被指派一個(gè)唯一的 ID,一旦消息到達(dá)了 broker 之后,broker 就會(huì)發(fā)送一個(gè) ack 消息給生產(chǎn)者,標(biāo)識(shí)這個(gè)消息已經(jīng)正常到達(dá)了 broker;如果消息是持久化的,那么確認(rèn)消息會(huì)在消息寫入磁盤之后發(fā)出。

Spring AMQP 的相關(guān)配置見 publishConfirm 機(jī)制。相比于事務(wù)模式,發(fā)布確認(rèn)模式的性能比較優(yōu)秀。

4.3.2 消息在 broker 中的防丟失

消息、交換機(jī)、隊(duì)列的都設(shè)置為持久化,則可以避免消息在 broker 中的丟失。

  • 消息持久化:生產(chǎn)者發(fā)送消息時(shí)設(shè)置 deliveryMode 屬性為 2 ,即可將消息設(shè)置為持久化,在 Spring AMQP 中設(shè)置:
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);

進(jìn)行消息持久化的設(shè)置

  • 交換機(jī)/隊(duì)列持久化:在聲明交換機(jī)/隊(duì)列時(shí),設(shè)置 durable 為 true 即可。

4.3.3 消息從 broker 到消費(fèi)者之間的防丟失

消息從 broker 到消費(fèi)者之間的防丟失,需要考慮以下兩個(gè)方面:

  • 消息沒有路由到隊(duì)列:由于消息不能匹配任何的隊(duì)列路由邏輯,在生產(chǎn)者設(shè)置了 mandatory 為 true 且開啟了 returnCallback 配置時(shí),broker 將會(huì)把消息退回給生產(chǎn)者
  • 消息沒有從隊(duì)列到達(dá)消費(fèi)者
    • 在開啟了隊(duì)列持久化和消息持久化后,如果消息和隊(duì)列沒設(shè)置過期時(shí)間,即使 broker 宕機(jī),消息也不會(huì)丟失,消息最終還是會(huì)發(fā)送到消費(fèi)者手中
    • 在開啟了隊(duì)列持久化和消息持久化后,如果消息和隊(duì)列存在過期時(shí)間,則在隊(duì)列上配置死信隊(duì)列,將過期的消息發(fā)送到死信隊(duì)列中進(jìn)行處理
  • 消息到達(dá)了消費(fèi)者,但是還沒有被消費(fèi)邏輯處理
    • 將消息 ACK 改為手動(dòng)確認(rèn)模式,在消息完成處理邏輯后,手動(dòng)發(fā)送 ACK 確認(rèn)消費(fèi)者收到消息
    • 如果消息處理邏輯中出現(xiàn)異常,則將消息發(fā)往死信隊(duì)列,或者設(shè)置 requeue 屬性為 true,將消息發(fā)往其他隊(duì)列

4.4 如何保證消息傳遞的順序性

當(dāng)消息有順序狀態(tài)時(shí),由于存在多個(gè)消費(fèi)者,則可能會(huì)出現(xiàn)消息最終處理順序錯(cuò)亂的問題。

舉例說明:進(jìn)入隊(duì)列的消息是 1,2,3,如果有多個(gè)消費(fèi)者監(jiān)聽了這個(gè)隊(duì)列,那么由于網(wǎng)絡(luò)或者自身處理速度等因素影響之后,可能會(huì)出現(xiàn)消費(fèi)者的處理順序?yàn)?3,2,1 的情況

解決辦法:

  • 當(dāng)消息需要強(qiáng)一致順序時(shí),將具有順序性的多條消息打包成一條消息發(fā)往 broker
  • 當(dāng)消息不需要強(qiáng)一致順序時(shí),可以在生產(chǎn)端給每一個(gè)消息增加一個(gè)自增 ID,或者一個(gè)消息產(chǎn)生的時(shí)間戳,用作消息順序標(biāo)記字段,在消息消費(fèi)的處理邏輯上進(jìn)行消息順序標(biāo)記字段的判斷
    • 如果接收到的消息的順序字段比當(dāng)前處理過的消息當(dāng)前順序要大,則正常消費(fèi)
    • 如果接收到的消息的順序字段比當(dāng)前處理過的消息當(dāng)前順序要小,則直接丟棄或者做一些其他邏輯處理

總結(jié)

以上是生活随笔為你收集整理的RabbitMQ 基本概念与高级特性的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。