万字长文:从 C# 入门学会 RabbitMQ 消息队列编程
RabbitMQ 教程
目錄-
RabbitMQ 教程
- RabbitMQ 簡(jiǎn)介
-
安裝與配置
- 安裝 RabbitMQ
-
發(fā)布與訂閱模型
- 生產(chǎn)者、消費(fèi)者、交換器、隊(duì)列
- 多工作隊(duì)列
-
交換器類型
- Direct
- Fanout
- Topic
- 交換器綁定交換器
-
消費(fèi)者、消息屬性
- Qos 、拒絕接收
- 消息確認(rèn)模式
- 消息持久化
- 消息 TTL 時(shí)間
- 隊(duì)列 TTL 時(shí)間
- DLX 死信交換器
- 延遲隊(duì)列
- 消息優(yōu)先級(jí)
- 事務(wù)機(jī)制
- 發(fā)送方確認(rèn)機(jī)制
本文已推送到 github :https://github.com/whuanle/learnrabbitmq
如果文章排版不方便閱讀,可以到倉(cāng)庫(kù)下載原版 markdown 文件閱讀。
RabbitMQ 簡(jiǎn)介
RabbitMQ 是一個(gè)實(shí)現(xiàn)了 AMQP 協(xié)議的消息隊(duì)列,AMQP 被定義為作為消息傳遞中間件的開放標(biāo)準(zhǔn)的應(yīng)用層協(xié)議。它代表高級(jí)消息隊(duì)列協(xié)議,具有消息定位、路由、隊(duì)列、安全性和可靠性等特點(diǎn)。
目前社區(qū)上比較流行的消息隊(duì)列有 kafka、ActiveMQ、Pulsar、RabbitMQ、RabbitMQ 等。
筆者也編寫了 一系列的 Kafka 教程,歡迎閱讀:https://kafka.whuanle.cn/
RabbitMQ 的優(yōu)點(diǎn)、用途等,大概是可靠性高、靈活的路由規(guī)則配置、支持分布式部署、遵守 AMQP 協(xié)議等??梢杂糜诋惒酵ㄓ?、日志收集(日志收集還是 Kafka 比較好)、事件驅(qū)動(dòng)架構(gòu)系統(tǒng)、應(yīng)用通訊解耦等。
RabbitMQ 社區(qū)版本的特點(diǎn)如下:
-
支持多種消息傳遞協(xié)議、消息隊(duì)列、傳遞確認(rèn)、靈活的隊(duì)列路由、多種交換類型(交換器)。
-
支持 Kubernetes 等分布式部署,提供多種語言的 SDK,如 Java、Go、C#。
-
可插入的身份驗(yàn)證、授權(quán),支持 TLS 和 LDAP。
-
支持持續(xù)集成、操作度量和與其他企業(yè)系統(tǒng)集成的各種工具和插件。
-
提供一套用于管理和監(jiān)視 RabbitMQ 的 HTTP-API、命令行工具和 UI。
RabbitMQ 的基本對(duì)象有以下幾點(diǎn),但是讀者現(xiàn)在并不需要記住,在后面的章節(jié)中,筆者將會(huì)逐個(gè)介紹。
- 生產(chǎn)者(Producer):推送消息到 RabbitMQ 的程序。
- 消費(fèi)者(Consumer):從 RabbitMQ 消費(fèi)消息的程序。
- 隊(duì)列(Queue):RabbitMQ 存儲(chǔ)消息的地方,消費(fèi)者可以從隊(duì)列中獲取消息。
- 交換器(Exchange):接收來自生產(chǎn)者的消息,并將消息路由到一個(gè)或多個(gè)隊(duì)列中。
- 綁定(Binding):將隊(duì)列和交換器關(guān)聯(lián)起來,當(dāng)生產(chǎn)者推送消息時(shí),交換器將消息路由到隊(duì)列中。
- 路由鍵(Routing Key):用于交換器將消息路由到特定隊(duì)列的匹配規(guī)則。
RabbitMQ 的技術(shù)知識(shí)點(diǎn)大概分為:
- 用戶和權(quán)限:配置用戶、角色和其對(duì)應(yīng)的權(quán)限。
- Virtual Hosts:配置虛擬主機(jī),用于分隔不同的消息隊(duì)列環(huán)境。
- Exchange 和 Queue 的屬性:配置交換器和隊(duì)列的屬性,比如持久化、自動(dòng)刪除等。
- Policies:定義策略來自動(dòng)設(shè)置隊(duì)列、交換器和鏈接的參數(shù)。
- 連接和通道:配置連接和通道的屬性,如心跳間隔、最大幀大小等。
- 插件:?jiǎn)⒂煤团渲酶鞣N插件,如管理插件、STOMP 插件等。
- 集群和高可用性:配置集群和鏡像隊(duì)列,以提供高可用性。
- 日志和監(jiān)控:配置日志級(jí)別、目標(biāo)和監(jiān)控插件。
- 安全性:配置 SSL/TLS 選項(xiàng)、認(rèn)證后端等安全相關(guān)的設(shè)置。
由于筆者技術(shù)有限以及篇幅限制,本文只講解與 C# 編程相關(guān)的技術(shù)細(xì)節(jié),從中了解 RabbitMQ 的編碼技巧和運(yùn)作機(jī)制。
安裝與配置
安裝 RabbitMQ
讀者可以在 RabbitMQ 官方文檔中找到完整的安裝教程:https://www.rabbitmq.com/download.html
本文使用 Docker 的方式部署。
RabbitMQ 社區(qū)鏡像列表:https://hub.docker.com/_/rabbitmq
創(chuàng)建目錄用于映射存儲(chǔ)卷:
mkdir -p /opt/lib/rabbitmq
部署容器:
docker run -itd --name rabbitmq -p 5672:5672 -p 15672:15672 \
-v /opt/lib/rabbitmq:/var/lib/rabbitmq \
rabbitmq:3.12.8-management
部署時(shí)占用兩個(gè)端口。5672 是 MQ 通訊端口,15672 是 Management UI 工具端口。
打開 15672 端口,會(huì)進(jìn)入 Web 登錄頁面,默認(rèn)賬號(hào)密碼都是 guest。
關(guān)于 RabbitMQ Management UI 的使用方法,后續(xù)再介紹。
打開管理界面后會(huì),在 Exchanges 菜單中,可以看到如下圖表格。這些是默認(rèn)的交換器?,F(xiàn)在可以不需要了解這些東西,后面會(huì)有介紹。
| Virtual host | Name | Type | Features |
|---|---|---|---|
| / | (AMQP default) | direct | D |
| / | amq.direct | direct | D |
| / | amq.fanout | fanout | D |
| / | amq.headers | headers | D |
| / | amq.match | headers | D |
| / | amq.rabbitmq.trace | topic | D I |
| / | amq.topic | topic | D |
發(fā)布與訂閱模型
使用 C# 開發(fā) RabbitMQ,需要使用 nuget 引入 RabbitMQ.Client,官網(wǎng)文檔地址:.NET/C# RabbitMQ Client Library — RabbitMQ
在繼續(xù)閱讀文章之前,請(qǐng)先創(chuàng)建一個(gè)控制臺(tái)程序。
生產(chǎn)者、消費(fèi)者、交換器、隊(duì)列
為了便于理解,本文制作了幾十張圖片,約定一些圖形表示的含義:
對(duì)應(yīng)生產(chǎn)者,使用如下圖表示:
對(duì)于消費(fèi)者,使用如下圖表示:
對(duì)于消息隊(duì)列,使用如下圖表示:
對(duì)于交換器,使用如下圖表示:
在 RabbitMQ 中,生產(chǎn)者發(fā)布的消息是不會(huì)直接進(jìn)入到隊(duì)列中,而是經(jīng)過交換器(Exchange) 分發(fā)到各個(gè)隊(duì)列中。前面提到,部署 RabbitMQ 后,默認(rèn)有 七個(gè)交換器,如 (AMQP default)、amq.direct 等。
當(dāng)然,對(duì)于現(xiàn)在來說,我們不需要了解交換器,所以,在本節(jié)的教程中,會(huì)使用默認(rèn)交換器完成實(shí)驗(yàn)。
在忽略交換器存在的情況下,我們可以將生產(chǎn)和消費(fèi)的流程簡(jiǎn)化如下圖所示:
請(qǐng)一定要注意,圖中省略了交換器的存在,因?yàn)槭褂玫氖悄J(rèn)的交換器。但是生產(chǎn)者推送消息必須是推送到交換器,而不是隊(duì)列,這一句一定要弄清楚。
對(duì)于消費(fèi)者來說,要使用隊(duì)列,必須確保隊(duì)列已經(jīng)存在。
使用 C# 聲明(創(chuàng)建)一個(gè)隊(duì)列的代碼和參數(shù)如下所示:
// 聲明一個(gè)隊(duì)列
channel.QueueDeclare(
// 隊(duì)列名稱
queue: "myqueue",
// 持久化配置,隊(duì)列是否能夠在 broker 重啟后存活
durable: false,
// 連接關(guān)閉時(shí)被刪除該隊(duì)列
exclusive: false,
// 當(dāng)最后一個(gè)消費(fèi)者(如果有的話)退訂時(shí),是否應(yīng)該自動(dòng)刪除這個(gè)隊(duì)列
autoDelete: false,
// 額外的參數(shù)配置
arguments: null
);
完整代碼示例:
ConnectionFactory factory = new ConnectionFactory
{
HostName = "localhost"
};
// 連接
using IConnection connection = factory.CreateConnection();
// 通道
using IModel channel = connection.CreateModel();
channel.QueueDeclare(
// 隊(duì)列名稱
queue: "myqueue",
// 持久化配置,隊(duì)列是否能夠在 broker 重啟后存活
durable: false,
// 連接關(guān)閉時(shí)被刪除該隊(duì)列
exclusive: false,
// 當(dāng)最后一個(gè)消費(fèi)者(如果有的話)退訂時(shí),是否應(yīng)該自動(dòng)刪除這個(gè)隊(duì)列
autoDelete: false,
// 額外的參數(shù)配置
arguments: null
);
-
queue:隊(duì)列的名稱。 -
durable:設(shè)置是否持久化。持久化的隊(duì)列會(huì)存盤,在服務(wù)器重啟的時(shí)候可以保證不丟失相關(guān)信息。 -
exclusive設(shè)置是否排他。如果一個(gè)隊(duì)列被聲明為排他隊(duì)列,該隊(duì)列僅對(duì)首次聲明它的連接可見,并在連接斷開時(shí)自動(dòng)刪除。 -
該配置是基于 IConnection 的,同一個(gè) IConnection 創(chuàng)建的不同通道 (IModel) ,也會(huì)遵守此規(guī)則。
-
autoDelete:設(shè)置是否自動(dòng)刪除。自動(dòng)刪除的前提是至少有一個(gè)消費(fèi)者連接到這個(gè)隊(duì)列,之后所有與這個(gè)隊(duì)列連接的消費(fèi)者都斷開時(shí),才會(huì)自動(dòng)刪除。 -
argurnents: 設(shè)置隊(duì)列的其他一些參數(shù),如隊(duì)列的消息過期時(shí)間等。
如果隊(duì)列已經(jīng)存在,不需要再執(zhí)行 QueueDeclare()。重復(fù)調(diào)用 QueueDeclare(),如果參數(shù)相同,不會(huì)出現(xiàn)副作用,已經(jīng)推送的消息也不會(huì)出問題。
但是,如果 QueueDeclare() 參數(shù)如果跟已存在的隊(duì)列配置有差異,則可能會(huì)報(bào)錯(cuò)。
一般情況下,為了合理架構(gòu)和可靠性,會(huì)由架構(gòu)師等在消息隊(duì)列中提前創(chuàng)建好交換器、隊(duì)列,然后客戶端直接使用即可。一般不讓程序啟動(dòng)時(shí)設(shè)置,這樣會(huì)帶來很大的不確定性和副作用。
生產(chǎn)者發(fā)送消息時(shí)的代碼也很簡(jiǎn)單,指定要發(fā)送到哪個(gè)交換器或路由中即可。
請(qǐng)一定要注意,RabbitMQ 生產(chǎn)者發(fā)送消息,推送到的是交換器,而不是直接推送到隊(duì)列!
channel.BasicPublish(
// 使用默認(rèn)交換器
exchange: string.Empty,
// 推送到哪個(gè)隊(duì)列中
routingKey: "myqueue",
// 隊(duì)列屬性
basicProperties: null,
// 要發(fā)送的消息需要先轉(zhuǎn)換為 byte[]
body: Encoding.UTF8.GetBytes("測(cè)試")
);
BasicPublish 有三個(gè)重載:
BasicPublish(
PublicationAddress addr,
IBasicProperties basicProperties,
ReadOnlyMemory<byte> body)
BasicPublish(string exchange,
string routingKey,
IBasicProperties basicProperties,
ReadOnlyMemory<byte> body)
BasicPublish(string exchange,
string routingKey,
bool mandatory = false,
IBasicProperties basicProperties = null,
ReadOnlyMemory<byte> body = default)
-
exchange: 交換器的名稱,如果留空則會(huì)推送到默認(rèn)交換器。 -
routingKey: 路由鍵,交換器根據(jù)路由鍵將消息存儲(chǔ)到相應(yīng)的隊(duì)列之中。 -
basicProperties:消息屬性,如過期時(shí)間等。 -
mandatory:值為 false 時(shí),如果交換器沒有綁定合適的隊(duì)列,則該消息會(huì)丟失。值為 true 時(shí),如果交換器沒有綁定合適的隊(duì)列,則會(huì)觸發(fā)IModel.BasicReturn事件。
IBasicProperties basicProperties 參數(shù)是接口,我們可以使用 IModel.CreateBasicProperties() 創(chuàng)建一個(gè)接口對(duì)象。
IBasicProperties 接口中封裝了很多屬性,使得我們不需要使用字符串的顯示傳遞配置。
IBasicProperties 其完整屬性如下:
// 標(biāo)識(shí)應(yīng)用程序的 ID
public String AppId { set; get; }
// 標(biāo)識(shí)集群的 ID
public String ClusterId { set; get; }
// 指定消息內(nèi)容的編碼方式,例如 "utf-8"
public String ContentEncoding { set; get; }
// 指定消息內(nèi)容的 MIME 類型,例如 "application/json"
public String ContentType { set; get; }
// 用于關(guān)聯(lián)消息之間的關(guān)系,通常用于 RPC(遠(yuǎn)程過程調(diào)用)場(chǎng)景
public String CorrelationId { set; get; }
// 指定消息的持久化方式,值 1:不持久化,值 2:持久化
public Byte DeliveryMode { set; get; }
// 單位毫秒,指定該消息的過期時(shí)間
public String Expiration { set; get; }
// 自定義消息的頭部信息
public IDictionary`2 Headers { set; get; }
// 指定消息的唯一標(biāo)識(shí)符
public String MessageId { set; get; }
// 是否持久化
public Boolean Persistent { set; get; }
// 指定消息的優(yōu)先級(jí),范圍從 0 到 9
public Byte Priority { set; get; }
// 指定用于回復(fù)消息的隊(duì)列名稱
public String ReplyTo { set; get; }
// 指定用于回復(fù)消息的地址信息
public PublicationAddress ReplyToAddress { set; get; }
// 指定消息的時(shí)間戳
public AmqpTimestamp Timestamp { set; get; }
// 消息的類型
public String Type { set; get; }
// 標(biāo)識(shí)用戶的 ID
public String UserId { set; get; }
推送消息時(shí),可以對(duì)單個(gè)消息細(xì)粒度地設(shè)置 IBasicProperties :
using IConnection connection = factory.CreateConnection();
using IModel channel = connection.CreateModel();
// 創(chuàng)建兩個(gè)隊(duì)列
channel.QueueDeclare(queue: "q1", durable: false, exclusive: false, autoDelete: false);
var properties = channel.CreateBasicProperties();
// 示例 1:
properties.Persistent = true;
properties.ContentType = "application/json";
properties.ContentEncoding = "UTF-8";
// 示例 2:
//properties.Persistent = true;
//properties.ContentEncoding = "gzip";
//properties.Headers = new Dictionary<string, object>();
channel.BasicPublish(
exchange: string.Empty,
routingKey: "q1",
basicProperties: properties,
body: Encoding.UTF8.GetBytes($"測(cè)試{i}")
);
對(duì)于 IBasicProperties 的使用,文章后面會(huì)有更加詳細(xì)的介紹。
現(xiàn)在,我們推送了 10 條消息到隊(duì)列中,然后在 Management UI 中觀察。
int i = 0;
while (i < 10)
{
channel.BasicPublish(
exchange: string.Empty,
routingKey: "myqueue",
basicProperties: null,
body: Encoding.UTF8.GetBytes($"測(cè)試{i}")
);
i++;
}
我們可以在 UI 的 Queues and Streams 中看到當(dāng)前所有的隊(duì)列。
可以看到當(dāng)前隊(duì)列中的
Ready狀態(tài)Unacked狀態(tài)的消息數(shù),分別對(duì)應(yīng)上文中的等待投遞給消費(fèi)者的消息數(shù)和己經(jīng)投遞給消費(fèi)者但是未收到確認(rèn)信號(hào)的消息數(shù)
點(diǎn)擊該隊(duì)列后,會(huì)打開如下圖所示的界面。
首先看 Overview。
Ready 指還沒有被消費(fèi)的消息數(shù)量。
Unacked 指消費(fèi)但是沒有 ack 的消息數(shù)量。
另一個(gè) Message rates 圖表,指的是發(fā)布、消費(fèi)消息的速度,因?yàn)椴恢匾?,因此這里不說明。
在 Bindings 中,可以看到該隊(duì)列綁定了默認(rèn)的交換器。
然后編寫一個(gè)消費(fèi)者,消費(fèi)該隊(duì)列中的消息,其完整代碼如下:
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
ConnectionFactory factory = new ConnectionFactory
{
HostName = "localhost"
};
using IConnection connection = factory.CreateConnection();
using IModel channel = connection.CreateModel();
channel.QueueDeclare(
// 隊(duì)列名稱
queue: "myqueue",
// 持久化配置,隊(duì)列是否能夠在 broker 重啟后存活
durable: false,
// 連接關(guān)閉時(shí)被刪除該隊(duì)列
exclusive: false,
// 當(dāng)最后一個(gè)消費(fèi)者(如果有的話)退訂時(shí),是否應(yīng)該自動(dòng)刪除這個(gè)隊(duì)列
autoDelete: false,
// 額外的參數(shù)配置
arguments: null
);
// 定義消費(fèi)者
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body.Span);
Console.WriteLine($" [x] Received {message}");
};
// 開始消費(fèi)
channel.BasicConsume(queue: "myqueue",
autoAck: true,
consumer: consumer);
Console.ReadLine();
注意,如果填寫了一個(gè)不存在的隊(duì)列,那么程序會(huì)報(bào)異常。
在消費(fèi)者程序未退出前,即 IConnection 未被 Dispose() 之前,可以在 Consumers 中看到消費(fèi)者客戶端程序信息。
那么,如果我們只消費(fèi),不設(shè)置自動(dòng) ack 呢?
將消費(fèi)者代碼改成:
channel.BasicConsume(queue: "myqueue",
autoAck: false,
consumer: consumer);
完整代碼如下:
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
ConnectionFactory factory = new ConnectionFactory
{
HostName = "localhost"
};
using IConnection connection = factory.CreateConnection();
using IModel channel = connection.CreateModel();
channel.QueueDeclare(
queue: "myqueue",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null
);
int i = 0;
while (i < 10)
{
channel.BasicPublish(
exchange: string.Empty,
routingKey: "myqueue",
basicProperties: null,
body: Encoding.UTF8.GetBytes($"測(cè)試{i}")
);
i++;
}
// 定義消費(fèi)者
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body.Span);
Console.WriteLine($" [x] Received {message}");
};
// 開始消費(fèi)
channel.BasicConsume(queue: "myqueue",
autoAck: false,
consumer: consumer);
Console.ReadLine();
此時(shí)會(huì)發(fā)現(xiàn),所有的消息都已經(jīng)讀了,但是 Unacked 為 10。
如下圖所示,autoAck: false 之后,如果重新啟動(dòng)程序(只消費(fèi),不推送消息),那么程序會(huì)繼續(xù)重新消費(fèi)一遍。
對(duì)于未 ack 的消息,消費(fèi)者重新連接后,RabbitMQ 會(huì)再次推送。
與 Kafka 不同的是,Kafka 如果沒有 ack 當(dāng)前消息,則服務(wù)器會(huì)自動(dòng)重新發(fā)送該條消息給消費(fèi)者,如果該條消息未完成,則會(huì)一直堵塞在這里。而對(duì)于 RabbitMQ,未被 ack 的消息會(huì)被暫時(shí)忽略,自動(dòng)消費(fèi)下一條。所以基于這一點(diǎn),默認(rèn)情況下,RabbitMQ 是不能保證消息順序性。
當(dāng)然, RabbitMQ 是很靈活的,我們可以選擇性地消費(fèi)部分消息,避免當(dāng)前消息阻塞導(dǎo)致程序不能往下消費(fèi):
// 定義消費(fèi)者
int i = 0;
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body.Span);
Console.WriteLine($" [x] Received {message}");
i++;
// 確認(rèn)該消息被正確消費(fèi)
if (i % 2 == 0)
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
// 開始消費(fèi)
channel.BasicConsume(queue: "myqueue",
autoAck: false,
consumer: consumer);
在某些場(chǎng)景下,這個(gè)特性很有用,我們可以將多次執(zhí)行失敗的消息先放一放,轉(zhuǎn)而消費(fèi)下一條消息,從而避免消息堆積。
多工作隊(duì)列
如果同一個(gè)隊(duì)列的不同客戶端綁定到交換器中,多個(gè)消費(fèi)者一起工作的話,那么會(huì)發(fā)生什么情況?
對(duì)于第一種情況,RabbitMQ 會(huì)將消息平均分發(fā)給每個(gè)客戶端。
該條件成立的基礎(chǔ)是,兩個(gè)消費(fèi)者是不同的消費(fèi)者,如果在同一個(gè)程序里面參加不同的實(shí)例去消費(fèi),但是因?yàn)槠浔蛔R(shí)別為同一個(gè)消費(fèi)者,則規(guī)則無效。
但是,RabbitMQ 并不會(huì)看未確認(rèn)的消息數(shù)量,它只是盲目地將第 n 個(gè)消息發(fā)送給第 n 個(gè)消費(fèi)者。
另外在指定交換器名稱的情況下,我們可以將 routingKey 設(shè)置為空,這樣發(fā)布的消息會(huì)由交換器轉(zhuǎn)發(fā)到對(duì)應(yīng)的隊(duì)列中。
channel.BasicPublish(
exchange: "logs",
routingKey: string.Empty,
basicProperties: null,
body: Encoding.UTF8.GetBytes($"測(cè)試{i}")
);
而多隊(duì)列對(duì)應(yīng)一個(gè)交換器的情況比較復(fù)雜,后面的章節(jié)會(huì)提到。
生產(chǎn)者和消費(fèi)者都能夠使用 QueueDeclare() 來聲明一個(gè)隊(duì)列。所謂的聲明,實(shí)際上是對(duì) RabbitMQ Broker 請(qǐng)求創(chuàng)建一個(gè)隊(duì)列,因此誰來創(chuàng)建都是一樣的。
跟聲明隊(duì)列相關(guān)的,還有兩個(gè)函數(shù):
// 無論創(chuàng)建失敗與否,都不理會(huì)
channel.QueueDeclareNoWait();
// 判斷隊(duì)列是否存在,如果不存在則彈出異常,存在則什么也不會(huì)發(fā)生
channel.QueueDeclarePassive();
此外,我們還可以刪除隊(duì)列:
// ifUnused: 隊(duì)列沒有被使用時(shí)
// ifEmpty: 隊(duì)列中沒有堆積的消息時(shí)
channel.QueueDelete(queue: "aaa", ifUnused: true, ifEmpty: true);
交換器類型
生產(chǎn)者只能向交換器推送消息,而不能向隊(duì)列推送消息。
推送消息時(shí),可以指定交換器名稱和路由鍵。
如下面代碼所示:
channel.BasicPublish(
exchange: string.Empty,
routingKey: "myqueue",
basicProperties: null,
body: Encoding.UTF8.GetBytes($"測(cè)試{i}")
);
ExchangeType 中定義了幾種交換器類型的名稱。
public static class ExchangeType
{
public const string Direct = "direct";
public const string Fanout = "fanout";
public const string Headers = "headers";
public const string Topic = "topic";
private static readonly string[] s_all = {Fanout, Direct, Topic, Headers};
}
在使用一個(gè)交換器之前,需要先聲明一個(gè)交換器:
channel.ExchangeDeclare("logs", ExchangeType.Fanout);
如果交換器已存在,重復(fù)執(zhí)行聲明代碼,只要配置跟現(xiàn)存的交換器配置區(qū)配,則 RabbitMQ 啥也不干,不會(huì)出現(xiàn)副作用。
但是,不能出現(xiàn)不一樣的配置,例如已存在的交換器是 Fanout 類型,但是重新執(zhí)行代碼聲明隊(duì)列為 Direct 類型。
ExchangeDeclare 函數(shù)的定義如下:
ExchangeDeclare(string exchange,
string type,
bool durable = false,
bool autoDelete = false,
IDictionary<string, object> arguments = null)
-
exchange: 交換器的名稱。 -
type交換器的類型,如 fanout、direct、topic。 -
durable: 設(shè)置是否持久 durab ,如果值為 true,則服務(wù)器重啟后也不會(huì)丟失。 -
autoDelete:設(shè)置是否自動(dòng)刪除。 -
argument:其他一些結(jié)構(gòu)化參數(shù)。
當(dāng)然,交換器也可以被刪除。
// ifUnused 只有在隊(duì)列未被使用的情況下,才會(huì)刪除
channel.ExchangeDelete(exchange: "log", ifUnused: true);
還有一個(gè) NotWait 方法。
channel.ExchangeDeclareNoWait("logs", ExchangeType.Direct);
//channel.ExchangeDeclareNoWait(...);
即使重新聲明交換器和刪除時(shí)有問題,由于其返回 void,因此操作失敗也不會(huì)報(bào)異常。
也有個(gè)判斷交換器是否存在的方法。如果交換器不存在,則會(huì)拋出異常,如果交換器存在,則什么也不會(huì)發(fā)生。
channel.ExchangeDeclarePassive("logs")
創(chuàng)建多個(gè)隊(duì)列后,還需要將隊(duì)列和交換器綁定起來。
如下代碼所示,其交換器綁定了兩個(gè)隊(duì)列,生產(chǎn)者推送消息到交換器時(shí),兩個(gè)隊(duì)列都會(huì)收到相同的消息。
ConnectionFactory factory = new ConnectionFactory
{
HostName = "localhost"
};
using IConnection connection = factory.CreateConnection();
using IModel channel = connection.CreateModel();
// 創(chuàng)建交換器
channel.ExchangeDeclare("logs", ExchangeType.Fanout);
// 創(chuàng)建兩個(gè)隊(duì)列
channel.QueueDeclare(
queue: "myqueue1",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null
);
channel.QueueDeclare(
queue: "myqueue2",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null
);
channel.QueueBind(queue: "myqueue1", exchange: "logs", routingKey: string.Empty);
channel.QueueBind(queue: "myqueue2", exchange: "logs", routingKey: string.Empty);
int i = 0;
while (i < 10)
{
channel.BasicPublish(
exchange: "logs",
routingKey: string.Empty,
basicProperties: null,
body: Encoding.UTF8.GetBytes($"測(cè)試{i}")
);
i++;
}
推送消息后,每個(gè)綁定了 logs 交換器的隊(duì)列都會(huì)收到相同的消息。
注意,由于交換器不會(huì)存儲(chǔ)消息,因此,再創(chuàng)建一個(gè) myqueue3 的消息隊(duì)列綁定 logs 交換器時(shí),myqueue3 只會(huì)接收到綁定之后推送的消息,不能得到更早之前的消息。
交換器有以下類型:
- direct:根據(jù) routingKey 將消息傳遞到隊(duì)列。
- topic:有點(diǎn)復(fù)雜。根據(jù)消息路由鍵與用于將隊(duì)列綁定到交換器的模式之間的匹配將消息路由到一個(gè)或多個(gè)隊(duì)列。
- headers:本文不講,所以不做解釋。
- fanout:只要綁定即可,不需要理會(huì)路由。
Direct
direct 是根據(jù) routingKey 將消息推送到不同的隊(duì)列中。
首先,創(chuàng)建多個(gè)隊(duì)列。
// 創(chuàng)建兩個(gè)隊(duì)列
channel.QueueDeclare(queue: "direct1");
channel.QueueDeclare(queue: "direct2");
然后將隊(duì)列綁定交換器時(shí),綁定關(guān)系需要設(shè)置 routingKey。
// 使用 routingKey 綁定交換器
channel.QueueBind(exchange: "logs", queue: "direct1", routingKey: "debug");
channel.QueueBind(exchange: "logs", queue: "direct2", routingKey: "info");
最后,推送消息時(shí),需要指定交換器名稱,以及 routingKey。
// 發(fā)送消息時(shí),需要指定 routingKey
channel.BasicPublish(
exchange: "logs",
routingKey: "debug",
basicProperties: null,
body: Encoding.UTF8.GetBytes($"測(cè)試")
);
當(dāng)消息推送到 logs 交換器時(shí),交換器會(huì)根據(jù) routingKey 將消息轉(zhuǎn)發(fā)到對(duì)應(yīng)的隊(duì)列中。
完整的代碼示例如下:
// 創(chuàng)建交換器
channel.ExchangeDeclare("logs", ExchangeType.Direct);
// 創(chuàng)建兩個(gè)隊(duì)列
channel.QueueDeclare(queue: "direct1");
channel.QueueDeclare(queue: "direct2");
// 使用 routingKey 綁定交換器
channel.QueueBind(exchange: "logs", queue: "direct1", routingKey: "debug");
channel.QueueBind(exchange: "logs", queue: "direct2", routingKey: "info");
// 發(fā)送消息時(shí),需要指定 routingKey
channel.BasicPublish(
exchange: "logs",
routingKey: "debug",
basicProperties: null,
body: Encoding.UTF8.GetBytes($"測(cè)試")
);
啟動(dòng)后,發(fā)現(xiàn)只有 direct1 隊(duì)列可以收到消息,因?yàn)檫@是根據(jù)綁定時(shí)使用的 routingKey=debug 決定的。
Fanout
只要隊(duì)列綁定了交換器,則每個(gè)交換器都會(huì)收到一樣的消息,F(xiàn)anout 會(huì)忽略 routingKey。
如下代碼所示:
// 創(chuàng)建交換器
channel.ExchangeDeclare("logs1", ExchangeType.Fanout);
// 創(chuàng)建兩個(gè)隊(duì)列
channel.QueueDeclare(queue: "fanout1");
channel.QueueDeclare(queue: "fanout2");
// 使用 routingKey 綁定交換器
channel.QueueBind(exchange: "logs1", queue: "fanout1", routingKey: "debug");
channel.QueueBind(exchange: "logs1", queue: "fanout2", routingKey: "info");
// 發(fā)送消息時(shí),需要指定 routingKey
channel.BasicPublish(
exchange: "logs1",
routingKey: "debug",
basicProperties: null,
body: Encoding.UTF8.GetBytes($"測(cè)試")
);
Topic
Topic 會(huì)根據(jù) routingKey 查找符合條件的隊(duì)列,隊(duì)列可以使用 .、#、* 三種符號(hào)進(jìn)行區(qū)配,Topic 的區(qū)配規(guī)則比較靈活,
在創(chuàng)建隊(duì)列之后,綁定交換器時(shí),routingKey 使用表達(dá)式。
// 使用 routingKey 綁定交換器
channel.QueueBind(exchange: "logs3", queue: "topic1", routingKey: "red.#");
channel.QueueBind(exchange: "logs3", queue: "topic2", routingKey: "red.yellow.#");
推送消息時(shí),routingKey 需要設(shè)置完整的名稱。
// 發(fā)送消息
channel.BasicPublish(
exchange: "logs3",
routingKey: "red.green",
basicProperties: null,
body: Encoding.UTF8.GetBytes($"測(cè)試")
);
首先,routingKey 會(huì)根據(jù) . 符號(hào)進(jìn)行劃分。
比如 red.yellow.green 會(huì)被拆成 [red,yellow,green] 三個(gè)部分。
如果想模糊區(qū)配一個(gè)部分,則可以使用 *。比如 red.*.green ,可以區(qū)配到 red.aaa.green、red.666.green。
* 可以在任何一部分使用,比如 *.yellow.*、*.*.green。
# 可以區(qū)配多個(gè)部分,比如 red.# 可以區(qū)配到 red.a、red.a.a、red.a.a.a。
完整的代碼示例如下:
// 創(chuàng)建交換器
channel.ExchangeDeclare("logs3", ExchangeType.Topic);
// 創(chuàng)建兩個(gè)隊(duì)列
channel.QueueDeclare(queue: "topic1");
channel.QueueDeclare(queue: "topic2");
// 使用 routingKey 綁定交換器
channel.QueueBind(exchange: "logs3", queue: "topic1", routingKey: "red.#");
channel.QueueBind(exchange: "logs3", queue: "topic2", routingKey: "red.yellow.#");
// 發(fā)送消息
channel.BasicPublish(
exchange: "logs3",
routingKey: "red.green",
basicProperties: null,
body: Encoding.UTF8.GetBytes($"測(cè)試")
);
channel.BasicPublish(
exchange: "logs3",
routingKey: "red.yellow.green",
basicProperties: null,
body: Encoding.UTF8.GetBytes($"測(cè)試")
);
上面推送了兩條消息到 logs 交換器中,其中 routingKey=red.green 的消息,被 red.# 區(qū)配到,因此會(huì)被轉(zhuǎn)發(fā)到 topic1 隊(duì)列中。
而 routingKey=red.yellow.green 的消息,可以被兩個(gè)隊(duì)列區(qū)配,因此 topic1 和 topic 2 都可以接收到。
交換器綁定交換器
交換器除了可以綁定隊(duì)列,也可以綁定交換器。
示例:
將 b2 綁定到 b1 中,b2 可以得到 b1 的消息。
channel.ExchangeBind(destination: "b2", source: "b1", routingKey: string.Empty);
綁定之后,推送到 b1 交換器的消息,會(huì)被轉(zhuǎn)發(fā)到 b2 交換器。
完整示例代碼如下:
ConnectionFactory factory = new ConnectionFactory
{
HostName = "localhost"
};
using IConnection connection = factory.CreateConnection();
using IModel channel = connection.CreateModel();
channel.ExchangeDeclare(exchange: "b1", ExchangeType.Fanout);
channel.ExchangeDeclare(exchange: "b2", ExchangeType.Fanout);
// 因?yàn)閮烧叨际?ExchangeType.Fanout,
// 所以 routingKey 使用 string.Empty
channel.ExchangeBind(destination: "b2", source: "b1", routingKey: string.Empty);
// 創(chuàng)建隊(duì)列
channel.QueueDeclare(queue: "q1", durable: false, exclusive: false, autoDelete: false);
channel.QueueBind(queue: "q1", exchange: "b2", routingKey: string.Empty);
int i = 0;
while (i < 10)
{
channel.BasicPublish(
exchange: "b1",
routingKey: string.Empty,
basicProperties: null,
body: Encoding.UTF8.GetBytes($"測(cè)試{i}")
);
i++;
}
當(dāng)然,可以將交換器、隊(duì)列同時(shí)綁定到 b1 交換器中。
另外,兩個(gè)交換器的類型可以不同。不過這樣會(huì)導(dǎo)致區(qū)配規(guī)則有點(diǎn)復(fù)雜。
channel.ExchangeDeclare(exchange: "b1", ExchangeType.Direct);
channel.ExchangeDeclare(exchange: "b2", ExchangeType.Fanout);
我們可以理解成在交換器綁定時(shí),b2 相對(duì)于一個(gè)隊(duì)列。當(dāng) b1 設(shè)置成 Direct 交換器時(shí),綁定交換器時(shí)還需要指定 routingKey。
channel.ExchangeBind(destination: "b2", source: "b1", routingKey: "demo");
而 b2 交換器和 q2 隊(duì)列,依然是 Fanout 關(guān)系,不受影響。
意思是說,b1、b2 是一個(gè)關(guān)系,它們的映射關(guān)系不會(huì)影響到別人,也不會(huì)影響到下一層。
完整代碼示例如下:
using RabbitMQ.Client;
using System.Text;
ConnectionFactory factory = new ConnectionFactory
{
HostName = "localhost"
};
using IConnection connection = factory.CreateConnection();
using IModel channel = connection.CreateModel();
channel.ExchangeDeclare(exchange: "b1", ExchangeType.Direct);
channel.ExchangeDeclare(exchange: "b2", ExchangeType.Fanout);
// 因?yàn)閮烧叨际?ExchangeType.Fanout,
// 所以 routingKey 使用 string.Empty
channel.ExchangeBind(destination: "b2", source: "b1", routingKey: "demo");
// 創(chuàng)建兩個(gè)隊(duì)列
channel.QueueDeclare(queue: "q1", durable: false, exclusive: false, autoDelete: false);
channel.QueueBind(queue: "q1", exchange: "b2", routingKey: string.Empty);
int i = 0;
while (i < 10)
{
channel.BasicPublish(
exchange: "b1",
routingKey: "demo",
basicProperties: null,
body: Encoding.UTF8.GetBytes($"測(cè)試{i}")
);
i++;
}
消費(fèi)者、消息屬性
消費(fèi)者 BasicConsume 函數(shù)定義如下:
BasicConsume(string queue,
bool autoAck,
string consumerTag,
IDictionary<string, object> arguments,
IBasicConsumer consumer)
不同的消費(fèi)訂閱采用不同消費(fèi)者標(biāo)簽 (consumerTag) 來區(qū)分彼 ,在同一個(gè)通道(IModel)中的消費(fèi)者 需要通過消費(fèi)者標(biāo)簽作區(qū)分,默認(rèn)情況下不需要設(shè)置。
-
queue:隊(duì)列的名稱。 -
autoAck:設(shè)置是否自動(dòng)確認(rèn)。 -
consumerTag: 消費(fèi)者標(biāo)簽,用來區(qū)分多個(gè)消費(fèi)者。 -
arguments:設(shè)置消費(fèi)者的其他參數(shù)。
前面,我們使用了 EventingBasicConsumer 創(chuàng)建 IBasicConsumer 接口的消費(fèi)者程序,其中,EventingBasicConsumer 包含了以下事件:
public event EventHandler<BasicDeliverEventArgs> Received;
public event EventHandler<ConsumerEventArgs> Registered;
public event EventHandler<ShutdownEventArgs> Shutdown;
public event EventHandler<ConsumerEventArgs> Unregistered;
這些事件會(huì)在消息處理的不同階段被觸發(fā)。
消費(fèi)者程序有推、拉兩種消費(fèi)模式,前面所提到的代碼都是推模式,即出現(xiàn)新的消息時(shí),RabbitMQ 會(huì)自動(dòng)推送到消費(fèi)者程序中。
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body.Span);
Console.WriteLine($" [x] Received {message}");
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
// 開始消費(fèi)
channel.BasicConsume(queue: "myqueue5",
autoAck: false,
consumer: consumer,
consumerTag: "demo");
如果使用拉模式(BasicGet() 函數(shù)),那么在 RabbitMQ Broker 的隊(duì)列中沒有消息時(shí),會(huì)返回 null。
// 開始消費(fèi)
while (true)
{
var result = channel.BasicGet(queue: "q1", autoAck: false);
// 如果沒有拉到消息時(shí)
if (result == null)
{
// 沒有消息時(shí),避免無限拉取
Thread.Sleep(100);
continue;
}
Console.WriteLine(Encoding.UTF8.GetString(result.Body.Span));
channel.BasicAck(deliveryTag: result.DeliveryTag, multiple: false);
}
當(dāng)使用 BasicGet() 手動(dòng)拉取消息時(shí),該程序不會(huì)作為消費(fèi)者程序存在,也就是 RabbitMQ 的 Consumer 中看不到。
兩種推拉模式之下,ack 消息時(shí),均有一個(gè) multiple 參數(shù)。
- 如果將
multiple設(shè)為false,則只確認(rèn)指定deliveryTag的一條消息。 - 如果將
multiple設(shè)為true,則會(huì)確認(rèn)所有比指定deliveryTag小的并且未被確認(rèn)的消息。
消息的 deliveryTag 屬性是 ulong 類型,表示消息的偏移量,從
1....開始算起。
在大批量接收消息并進(jìn)行處理時(shí),可以使用 multiple 來確認(rèn)一組消息,而不必逐條確認(rèn),這樣可以提高效率。
Qos 、拒絕接收
消費(fèi)者程序可以設(shè)置 Qos。
channel.BasicQos(prefetchSize: 10, prefetchCount: 10, global: false);
prefetchSize:這個(gè)參數(shù)表示消費(fèi)者所能接收未確認(rèn)消息的總體大小的上限,設(shè)置為 0 則表示沒有上限。
prefetchCount: 的方法來設(shè)置消費(fèi)者客戶端最大能接收的未確認(rèn)的消息數(shù)。這個(gè)配置跟滑動(dòng)窗口數(shù)量意思差不多。
global 則有些特殊。
當(dāng) global 為 false 時(shí),只有新的消費(fèi)者需要遵守規(guī)則。
如果是 global 為 true 時(shí),同一個(gè) IConnection 中的消費(fèi)者均會(huì)被修改配置。
// 不受影響
// var result = channel.BasicConsume(queue: "q1", autoAck: false,... ...);
channel.BasicQos(prefetchSize: 0, prefetchCount: 10, global: false);
// 新的消費(fèi)者受影響
// var result = channel.BasicConsume(queue: "q1", autoAck: false,... ...);
當(dāng)收到消息時(shí),如果需要明確拒絕該消息,可以使用 BasicReject,RabbitMQ 會(huì)將該消息從隊(duì)列中移除。
BasicReject()會(huì)觸發(fā)消息死信。
while (true)
{
var result = channel.BasicGet(queue: "q1", autoAck: false);
if (result == null) continue;
Console.WriteLine(Encoding.UTF8.GetString(result.Body.Span));
channel.BasicReject(deliveryTag: result.DeliveryTag, requeue: true);
}
如果 requeue 參數(shù)設(shè)置為 true ,則 RabbitMQ 會(huì)重新將這條消息存入隊(duì)列,以便可以發(fā)送給下個(gè)訂閱的消費(fèi)者,或者說該程序重啟后可以重新接收。
如果 requeue 參數(shù)設(shè)置為 false ,則 RabbitMQ立即會(huì)把消息從隊(duì)列中移除,而不會(huì)把它發(fā)送給新的消費(fèi)者。
如果想批量拒絕消息。
channel.BasicNack(deliveryTag: result.DeliveryTag, multiple: true, requeue: true);
multiple 為 true 時(shí),則表示拒絕 deliveryTag 編號(hào)之前所有未被當(dāng)前消費(fèi)者確認(rèn)的消息。
BasicRecover() 方法用來從 RabbitMQ 重新獲取還未被確認(rèn)的消息
當(dāng) requeue=true 時(shí),未被確認(rèn)的消息會(huì)被重新加入到隊(duì)列中,對(duì)于同一條消息來說,其會(huì)被分配給給其它消費(fèi)者。
當(dāng) requeue=false,同條消息會(huì)被分配給與之前相同的消費(fèi)者。
channel.BasicRecover(requeue: true);
// 異步
channel.BasicRecoverAsync(requeue: true);
消息確認(rèn)模式
前面提到,當(dāng) autoAck=false 時(shí),消息雖然沒有 ack,但是 RabbitMQ 還是會(huì)跳到下一個(gè)消息。
為了保證消息的順序性,在未將當(dāng)前消息消費(fèi)完成的情況下,不允許自動(dòng)消費(fèi)下一個(gè)消息。
只需要使用 BasicQos 配置即可:
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
ConnectionFactory factory = new ConnectionFactory
{
HostName = "localhost"
};
using IConnection connection = factory.CreateConnection();
using IModel channel = connection.CreateModel();
// 創(chuàng)建交換器
channel.ExchangeDeclare("acktest", ExchangeType.Fanout);
// 創(chuàng)建兩個(gè)隊(duì)列
channel.QueueDeclare(queue: "myqueue5");
// 使用 routingKey 綁定交換器
channel.QueueBind(exchange: "acktest", queue: "myqueue5", routingKey: string.Empty);
int i = 0;
while (i < 10)
{
// 發(fā)送消息
channel.BasicPublish(
exchange: "acktest",
routingKey: string.Empty,
basicProperties: null,
body: Encoding.UTF8.GetBytes($"測(cè)試")
);
i++;
}
// 未 ack 之前,不能消費(fèi)下一個(gè)
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body.Span);
Console.WriteLine($" [x] Received {message}");
// channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
// 開始消費(fèi)
channel.BasicConsume(queue: "myqueue5",
autoAck: false,
consumer: consumer);
之前這段代碼后,你會(huì)發(fā)現(xiàn),第一條消息未被 ack 時(shí),程序不會(huì)自動(dòng)讀取下一條消息,也不會(huì)重新拉取未被 ack 的消息。
如果我們想重新讀取未被 ack 的消息,可以重新啟動(dòng)程序,或使用 BasicRecover() 讓服務(wù)器重新推送。
消息持久化
前面提到了 BasicPublish 函數(shù)的定義:
BasicPublish(string exchange,
string routingKey,
bool mandatory = false,
IBasicProperties basicProperties = null,
ReadOnlyMemory<byte> body = default)
當(dāng)設(shè)置 mandatory = true 時(shí),如果交換器無法根據(jù)自身的類型和路由鍵找到一個(gè)符合條件的隊(duì)列,那么 RabbitMQ 觸發(fā)客戶端的 IModel.BasicReturn 事件, 將消息返回給生產(chǎn)者 。
從設(shè)計(jì)上看,一個(gè) IConnection 雖然可以創(chuàng)建多個(gè) IModel(通道),但是只建議編寫一個(gè)消費(fèi)者程序或生產(chǎn)者程序,不建議混合多用。
因?yàn)楦黝愂录完?duì)列配置,是針對(duì)一個(gè) IModel(通道) 來設(shè)置的。
using IConnection connection = factory.CreateConnection();
using IModel channel = connection.CreateModel();
channel.BasicReturn += (object sender, BasicReturnEventArgs e) =>
{
};
當(dāng)設(shè)置了 mandatory = true 時(shí),如果該消息找不到隊(duì)列存儲(chǔ)消息,那么就會(huì)觸發(fā)客戶端的 BasicReturn 事件接收 BasicPublish 失敗的消息。
完整示例代碼如下:
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Runtime;
using System.Text;
ConnectionFactory factory = new ConnectionFactory
{
HostName = "localhost"
};
using IConnection connection = factory.CreateConnection();
using IModel channel = connection.CreateModel();
channel.ExchangeDeclare(exchange: "e2", type: ExchangeType.Fanout, durable: false, autoDelete: false);
channel.BasicReturn += (object? s, BasicReturnEventArgs e) =>
{
Console.WriteLine($"無效消息:{Encoding.UTF8.GetString(e.Body.Span)}");
};
int i = 0;
while (i < 10)
{
channel.BasicPublish(
exchange: "e2",
routingKey: string.Empty,
// mandatory=true,當(dāng)沒有隊(duì)列接收消息時(shí),會(huì)觸發(fā) BasicReturn 事件
mandatory: true,
basicProperties: null,
body: Encoding.UTF8.GetBytes($"測(cè)試{i}")
);
i++;
}
Console.ReadLine();
在實(shí)際開發(fā)中,當(dāng) mandatory=false 時(shí),如果一條消息推送到交換器,但是卻沒有綁定隊(duì)列,那么該條消息就會(huì)丟失,可能會(huì)導(dǎo)致嚴(yán)重的后果。
而在 RabbitMQ 中,提供了一種被稱為備胎交換器的方案,這是通過在定義交換器時(shí)添加 alternate-exchange 參數(shù)來實(shí)現(xiàn)。其作用是當(dāng) A 交換器無法找到隊(duì)列轉(zhuǎn)發(fā)消息時(shí),就會(huì)將消息轉(zhuǎn)發(fā)到 B 隊(duì)列中。
完整代碼示例如下:
首先創(chuàng)建 e3_bak 隊(duì)列,接著創(chuàng)建 e3 隊(duì)列時(shí)設(shè)置其備胎交換器為 e3_bak。
然后,e3_bak 需要綁定一個(gè)隊(duì)列消費(fèi)消息。
ConnectionFactory factory = new ConnectionFactory
{
HostName = "localhost"
};
using IConnection connection = factory.CreateConnection();
using IModel channel = connection.CreateModel();
channel.ExchangeDeclare(
exchange: "e3_bak",
type: ExchangeType.Fanout,
durable: false,
autoDelete: false
);
// 聲明 e3 交換器,當(dāng) e3 交換器沒有綁定隊(duì)列時(shí),消息將會(huì)被轉(zhuǎn)發(fā)到 e3_bak 交換器
channel.ExchangeDeclare(
exchange: "e3",
type: ExchangeType.Fanout,
durable: false,
autoDelete: false,
arguments: new Dictionary<string, object> {
{ "alternate-exchange", "e3_bak" }
}
);
channel.QueueDeclare(queue: "q3", durable: false, exclusive: false, autoDelete: false);
channel.QueueBind(queue: "q3", "e3_bak", routingKey: string.Empty);
// 因?yàn)橐呀?jīng)設(shè)置了 e3 的備用交換器,所以不會(huì)觸發(fā) BasicReturn
channel.BasicReturn += (object? s, BasicReturnEventArgs e) =>
{
Console.WriteLine($"無效消息:{Encoding.UTF8.GetString(e.Body.Span)}");
};
int i = 0;
while (i < 10)
{
channel.BasicPublish(
exchange: "e3",
routingKey: string.Empty,
// 因?yàn)橐呀?jīng)設(shè)置了 e3 的備用交換器,所以開啟這個(gè)不會(huì)觸發(fā) BasicReturn
mandatory: true,
basicProperties: null,
body: Encoding.UTF8.GetBytes($"測(cè)試{i}")
);
i++;
}
Console.ReadLine();
注意,如果備胎交換器有沒有綁定合適隊(duì)列的話,那么該消息就會(huì)丟失。
如果 e3 是 Direct,e3_bak 也是 Direct,那么需要兩者具有相同的 routingKey,如果 e3 中有個(gè) routingKey = cat,但是 e3_bak 中不存在對(duì)應(yīng)的 routingKey,那么該消息還是會(huì)丟失的。還有其它一些情況,這里不再贅述。
推送消息時(shí),有一個(gè) IBasicProperties basicProperties 屬性,前面的小節(jié)中已經(jīng)介紹過該接口的屬性,當(dāng) IBasicProperties.DeliveryMode=2 時(shí),消息將被標(biāo)記為持久化,即使 RabbitMQ 服務(wù)器重啟,消息也不會(huì)丟失。
相對(duì)來說,通過前面的實(shí)驗(yàn),你可以觀察到客戶端把隊(duì)列的消息都消費(fèi)完畢后,隊(duì)列中的消息都會(huì)消失。而對(duì)應(yīng) Kafka 來說,一個(gè) topic 中的消息被消費(fèi),其依然會(huì)被保留。這一點(diǎn)要注意,使用 RabbitMQ 時(shí),需要提前設(shè)置好隊(duì)列消息的持久化,避免消費(fèi)或未成功消費(fèi)時(shí),消息丟失。
生產(chǎn)者在推送消息時(shí),可以使用 IBasicProperties.DeliveryMode=2 將該消息設(shè)置為持久化。
var ps = channel.CreateBasicProperties();
ps.DeliveryMode = 2;
channel.BasicPublish(
exchange: "e3",
routingKey: string.Empty,
mandatory: false,
basicProperties: ps,
body: Encoding.UTF8.GetBytes($"測(cè)試{i}")
);
消息 TTL 時(shí)間
設(shè)置消息 TTL 時(shí)間后,該消息如果在一定時(shí)間內(nèi)沒有被消費(fèi),那么該消息就成為了死信消息。對(duì)于這種消息,會(huì)有大概這么兩個(gè)處理情況。
第一種,如果隊(duì)列設(shè)置了 "x-dead-letter-exchange" ,那么該消息會(huì)被從隊(duì)列轉(zhuǎn)發(fā)到另一個(gè)交換器中。這種方法在死信交換器一節(jié)中會(huì)介紹。
第二種,消息被丟棄。
目前有兩種方法可以設(shè)置消息的 TTL 。
第一種方法是通過隊(duì)列屬性設(shè)置,這樣一來隊(duì)列中所有消息都有相同的過期時(shí)間。
第二種方法是對(duì)單條消息進(jìn)行單獨(dú)設(shè)置,每條消息的 TTL 可以不同。
如果兩種設(shè)置一起使用,則消息的 TTL 以兩者之間較小的那個(gè)數(shù)值為準(zhǔn)。消息在隊(duì)列中的生存時(shí)一旦超過設(shè)置 TTL 值時(shí),消費(fèi)者將無法再收到該消息,所以最好設(shè)置死信交換器。
第一種,對(duì)隊(duì)列設(shè)置:
channel.QueueDeclare(queue: "q4",
durable: false,
exclusive: false,
autoDelete: false,
arguments: new Dictionary<string, object>() { { "x-message-ttl", 6000 } });
第二種通過設(shè)置屬性配置消息過期時(shí)間。
var ps = channel.CreateBasicProperties();
// 單位毫秒
ps.Expiration = "6000";
對(duì)于第一種設(shè)置隊(duì)列屬性的方法,一旦消息過期就會(huì)從隊(duì)列中抹去(如果設(shè)置了死信交換器,會(huì)被轉(zhuǎn)發(fā)到死信交換器中)。而在第二種方法中,即使消息過期,也不會(huì)馬上從隊(duì)列中抹去,因?yàn)樵摋l消息在即將投遞到消費(fèi)者之前,才會(huì)檢查消息是否過期。對(duì)于第二種情況,當(dāng)隊(duì)列進(jìn)行任何一次輪詢操作時(shí),才會(huì)被真正移除。
對(duì)于第二種情況,雖然是在被輪詢時(shí),過期了才會(huì)被真正移除,但是一旦過期,就會(huì)被轉(zhuǎn)發(fā)到死信隊(duì)列中,只是不會(huì)立即移除。
隊(duì)列 TTL 時(shí)間
當(dāng)對(duì)一個(gè)隊(duì)列設(shè)置 TTL 時(shí),如果該隊(duì)列在規(guī)定時(shí)間內(nèi)沒被使用,那么該隊(duì)列就會(huì)被刪除。這個(gè)約束包括一段時(shí)間內(nèi)沒有被消費(fèi)消息(包括 BasicGet() 方式消費(fèi)的)、沒有被重新聲明、沒有消費(fèi)者連接,否則被刪除的倒計(jì)時(shí)間會(huì)被重置。
channel.QueueDeclare(queue: "q6",
durable: false,
exclusive: false,
autoDelete: false,
arguments: new Dictionary<string, object>
{
// 單位是毫秒,設(shè)置 隊(duì)列過期時(shí)間是 1 小時(shí)
{"x-expires",1*3600*1000}
});
DLX 死信交換器
DLX(Dead-Letter-Exchange) 死信交換器,消息在一個(gè)隊(duì)列 A 中變成死信之后,它能被重新被發(fā)送到另一個(gè) B 交換器中。其中 A 隊(duì)列綁定了死信交換器,那么在Management UI 界面會(huì)看到 DLX 標(biāo)識(shí),而 B 交換器就是一個(gè)普通的交換器,無需配置。
消息變成死信 般是由于以下幾種情況:
- 消息被消費(fèi)者拒絕,
BasicReject()、BasicNack()兩個(gè)函數(shù)可以拒絕消息。 - 消息過期。
- 隊(duì)列達(dá)到最大長(zhǎng)度。
當(dāng)這個(gè)隊(duì)列 A 中存在死信消息時(shí),RabbitMQ 就會(huì)自動(dòng)地將這個(gè)消息重新發(fā)布到設(shè)置的交換器 B 中。一般會(huì)專門給重要的隊(duì)列設(shè)置死信交換器 B,而交換器 B 也需要綁定一個(gè)隊(duì)列 C 才行,不然消息也會(huì)丟失。
設(shè)置隊(duì)列出現(xiàn)死信消息時(shí),將消息轉(zhuǎn)發(fā)到哪個(gè)交換器中:
channel.QueueDeclare(queue: "q7", durable: false, exclusive: false, autoDelete: false,
arguments: new Dictionary<string, object> {
{ "x-dead-letter-exchange", "e7_bak" } });
完整示例代碼如下所示:
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
ConnectionFactory factory = new ConnectionFactory
{
HostName = "localhost"
};
using IConnection connection = factory.CreateConnection();
using IModel channel = connection.CreateModel();
channel.ExchangeDeclare(
exchange: "e7_bak",
type: ExchangeType.Fanout,
durable: false,
autoDelete: false
);
channel.QueueDeclare(queue: "q7_bak", durable: false, exclusive: false, autoDelete: false);
channel.QueueBind(queue: "q7_bak", "e7_bak", routingKey: string.Empty);
channel.ExchangeDeclare(
exchange: "e7",
type: ExchangeType.Fanout,
durable: false,
autoDelete: false
);
channel.QueueDeclare(queue: "q7", durable: false, exclusive: false, autoDelete: false,
arguments: new Dictionary<string, object> {
{ "x-dead-letter-exchange", "e7_bak" } });
channel.QueueBind(queue: "q7", "e7", routingKey: string.Empty);
int i = 0;
while (i < 10)
{
channel.BasicPublish(
exchange: "e7",
routingKey: string.Empty,
mandatory: false,
basicProperties: null,
body: Encoding.UTF8.GetBytes($"測(cè)試{i}"));
i++;
}
Thread.Sleep(1000);
int y = 0;
// 定義消費(fèi)者
channel.BasicQos(0, prefetchCount: 1, true);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body.Span);
Console.WriteLine($" [x] Received {message}");
if (y % 2 == 0)
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
// requeue 要設(shè)置為 false 才行,
// 否則此消息被拒絕后還會(huì)被放回隊(duì)列。
else
channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: false);
Interlocked.Add(ref y, 1);
};
// 開始消費(fèi)
channel.BasicConsume(queue: "q7",
autoAck: false,
consumer: consumer);
Console.ReadLine();
延遲隊(duì)列
RabbitMQ 本身沒有直接支持延遲隊(duì)列的功能。
那么為什么會(huì)出現(xiàn)延遲隊(duì)列這種東西呢?
主要是因?yàn)橄⑼扑秃螅幌肓⒓幢幌M(fèi)。比如說,用戶下單后,如果 10 分鐘內(nèi)沒有支付,那么該訂單會(huì)被自動(dòng)取消。所以需要做一個(gè)消息被延遲消費(fèi)的功能。
所以說,實(shí)際需求是,該消息在一定時(shí)間之后才能被消費(fèi)者消費(fèi)。
在 RabbitMQ 中做這個(gè)功能,需要使用兩個(gè)交換器,以及至少兩個(gè)隊(duì)列。
思路是定義兩個(gè)交換器 e8、e9 和兩個(gè)隊(duì)列 q8、q9,交換器 e8 和隊(duì)列 q8 綁定、交換器 e9 和 q9 綁定。
最重要的一點(diǎn)來了,q9 設(shè)置了死信隊(duì)列,當(dāng)消息 TTL 時(shí)間到時(shí),轉(zhuǎn)發(fā)到 e9 交換器中。所以,e9 交換器 - q9 隊(duì)列 接收到的都是到期(或者說過期)的消息。
在發(fā)送消息到 e8 交換器時(shí),設(shè)置 TTL 時(shí)間。當(dāng) q8 隊(duì)列中的消息過期時(shí),消息會(huì)被轉(zhuǎn)發(fā)到 e9 交換器,然后存入 q9 隊(duì)列。
消費(fèi)者只需要訂閱 q9 隊(duì)列,即可消費(fèi)到期后的消息。
全部完整代碼示例如下:
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
ConnectionFactory factory = new ConnectionFactory
{
HostName = "localhost"
};
using IConnection connection = factory.CreateConnection();
using IModel channel = connection.CreateModel();
channel.ExchangeDeclare(
exchange: "e8",
type: ExchangeType.Fanout,
durable: false,
autoDelete: false
);
channel.ExchangeDeclare(
exchange: "e9",
type: ExchangeType.Fanout,
durable: false,
autoDelete: false
);
channel.QueueDeclare(queue: "q9", durable: false, exclusive: false, autoDelete: false);
channel.QueueBind(queue: "q9", "e9", routingKey: string.Empty);
channel.QueueDeclare(queue: "q8", durable: false, exclusive: false, autoDelete: false,
arguments: new Dictionary<string, object> {
{ "x-dead-letter-exchange", "e9" } });
channel.QueueBind(queue: "q8", "e8", routingKey: string.Empty);
int i = 0;
while (i < 10)
{
var ps = channel.CreateBasicProperties();
ps.Expiration = "6000";
channel.BasicPublish(
exchange: "e8",
routingKey: string.Empty,
mandatory: false,
basicProperties: ps,
body: Encoding.UTF8.GetBytes($"測(cè)試{i}")
);
i++;
}
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body.Span);
Console.WriteLine($" [x] 已到期消息 {message}");
};
// 開始消費(fèi)
channel.BasicConsume(queue: "q9",
autoAck: true,
consumer: consumer);
Console.ReadLine();
消息優(yōu)先級(jí)
消息優(yōu)先級(jí)越高,就會(huì)越快被消費(fèi)者消費(fèi)。
代碼示例如下:
var ps = channel.CreateBasicProperties();
// 優(yōu)先級(jí) 0-9
ps.Priority = 9;
channel.BasicPublish(
exchange: "e8",
routingKey: string.Empty,
mandatory: false,
basicProperties: ps,
body: Encoding.UTF8.GetBytes($"測(cè)試{i}")
);
所以說,RabbitMQ 不一定可以保證消息的順序性,這一點(diǎn)跟 Kafka 是有區(qū)別的。
事務(wù)機(jī)制
事務(wù)機(jī)制是,發(fā)布者確定消息一定推送到 RabbitMQ Broker 中,往往會(huì)跟業(yè)務(wù)代碼一起使用。
比如說,用戶成功支付之后,推送一個(gè)通知到 RabbitMQ 隊(duì)列中。
數(shù)據(jù)庫(kù)當(dāng)然要做事務(wù),這樣在支付失敗后修改的數(shù)據(jù)會(huì)被回滾。但是問題來了,如果消息已經(jīng)推送了,但是數(shù)據(jù)庫(kù)卻回滾了。
這個(gè)時(shí)候會(huì)涉及到一致性,可以使用 RabbitMQ 的事務(wù)機(jī)制來處理,其思路跟數(shù)據(jù)庫(kù)事務(wù)過程差不多,也是有提交和回滾操作。
其目的是確保消息成功推送到 RabbitMQ Broker 以及跟客戶端其它代碼保持?jǐn)?shù)據(jù)一致,推送消息跟代碼操作同時(shí)成功或同時(shí)回滾。
其完整的代碼示例如下:
ConnectionFactory factory = new ConnectionFactory
{
HostName = "localhost"
};
using IConnection connection = factory.CreateConnection();
using IModel channel = connection.CreateModel();
// 客戶端發(fā)送 Tx.Select.將信道置為事務(wù)模式;
channel.TxSelect();
try
{
// 發(fā)送消息
channel.QueueDeclare(queue: "transaction_queue",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
string message = "Hello, RabbitMQ!";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "",
routingKey: "transaction_queue",
basicProperties: null,
body: body);
// 執(zhí)行一系列操作
// 提交事務(wù)
channel.TxCommit();
Console.WriteLine(" [x] Sent '{0}'", message);
}
catch (Exception e)
{
// 回滾事務(wù)
channel.TxRollback();
Console.WriteLine("An error occurred: " + e.Message);
}
Console.ReadLine();
發(fā)送方確認(rèn)機(jī)制
發(fā)送方確認(rèn)機(jī)制,是保證消息一定推送到 RabbitMQ 的方案。
而事務(wù)機(jī)制,一般是為了保證一致性,推送消息和其它操作同時(shí)成功或同時(shí)失敗,不能出現(xiàn)兩者不一致的情況。
其完整代碼示例如下:
using IConnection connection = factory.CreateConnection();
using IModel channel = connection.CreateModel();
// 開啟發(fā)送方確認(rèn)模式
channel.ConfirmSelect();
string exchangeName = "exchange_name";
string routingKey = "routing_key";
// 定義交換器
channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Direct);
// 發(fā)送消息
string message = "Hello, RabbitMQ!";
var body = Encoding.UTF8.GetBytes(message);
// 發(fā)布消息
channel.BasicPublish(exchange: exchangeName,
routingKey: routingKey,
basicProperties: null,
body: body);
// 等待確認(rèn)已推送到 RabbitMQ
if (channel.WaitForConfirms())
{
Console.WriteLine(" [x] Sent '{0}'", message);
}
else
{
Console.WriteLine("Message delivery failed.");
}
Console.ReadLine();
文章寫到這里,恰好一萬詞。
對(duì)于 RabbitMQ 集群、運(yùn)維等技術(shù),本文不再贅述。
總結(jié)
以上是生活随笔為你收集整理的万字长文:从 C# 入门学会 RabbitMQ 消息队列编程的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 2023.11.11 模拟赛
- 下一篇: C# 12 Blazor入门教程