日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程语言 > C# >内容正文

C#

万字长文:从 C# 入门学会 RabbitMQ 消息队列编程

發(fā)布時(shí)間:2023/11/18 C# 87 coder
生活随笔 收集整理的這篇文章主要介紹了 万字长文:从 C# 入门学会 RabbitMQ 消息队列编程 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

RabbitMQ 教程

目錄
  • RabbitMQ 教程
    • RabbitMQ 簡(jiǎn)介
    • 安裝與配置
      • 安裝 RabbitMQ
    • 發(fā)布與訂閱模型
      • 生產(chǎn)者、消費(fèi)者、交換器、隊(duì)列
      • 多工作隊(duì)列
      • 交換器類型
        • Direct
        • Fanout
        • Topic
        • 交換器綁定交換器
    • 消費(fèi)者、消息屬性
      • Qos 、拒絕接收
      • 消息確認(rèn)模式
      • 消息持久化
      • 消息 TTL 時(shí)間
      • 隊(duì)列 TTL 時(shí)間
      • DLX 死信交換器
      • 延遲隊(duì)列
      • 消息優(yōu)先級(jí)
      • 事務(wù)機(jī)制
      • 發(fā)送方確認(rèn)機(jī)制

本文已推送到 github :https://github.com/whuanle/learnrabbitmq
如果文章排版不方便閱讀,可以到倉(cāng)庫(kù)下載原版 markdown 文件閱讀。

RabbitMQ 簡(jiǎn)介

RabbitMQ 是一個(gè)實(shí)現(xiàn)了 AMQP 協(xié)議的消息隊(duì)列,AMQP 被定義為作為消息傳遞中間件的開放標(biāo)準(zhǔn)的應(yīng)用層協(xié)議。它代表高級(jí)消息隊(duì)列協(xié)議,具有消息定位、路由、隊(duì)列、安全性和可靠性等特點(diǎn)。

目前社區(qū)上比較流行的消息隊(duì)列有 kafka、ActiveMQ、Pulsar、RabbitMQ、RabbitMQ 等。

筆者也編寫了 一系列的 Kafka 教程,歡迎閱讀:https://kafka.whuanle.cn/

RabbitMQ 的優(yōu)點(diǎn)、用途等,大概是可靠性高、靈活的路由規(guī)則配置、支持分布式部署、遵守 AMQP 協(xié)議等??梢杂糜诋惒酵ㄓ?、日志收集(日志收集還是 Kafka 比較好)、事件驅(qū)動(dòng)架構(gòu)系統(tǒng)、應(yīng)用通訊解耦等。

RabbitMQ 社區(qū)版本的特點(diǎn)如下:

  • 支持多種消息傳遞協(xié)議、消息隊(duì)列、傳遞確認(rèn)、靈活的隊(duì)列路由、多種交換類型(交換器)。

  • 支持 Kubernetes 等分布式部署,提供多種語言的 SDK,如 Java、Go、C#。

  • 可插入的身份驗(yàn)證、授權(quán),支持 TLS 和 LDAP。

  • 支持持續(xù)集成、操作度量和與其他企業(yè)系統(tǒng)集成的各種工具和插件。

  • 提供一套用于管理和監(jiān)視 RabbitMQ 的 HTTP-API、命令行工具和 UI。

RabbitMQ 的基本對(duì)象有以下幾點(diǎn),但是讀者現(xiàn)在并不需要記住,在后面的章節(jié)中,筆者將會(huì)逐個(gè)介紹。

  • 生產(chǎn)者(Producer):推送消息到 RabbitMQ 的程序。
  • 消費(fèi)者(Consumer):從 RabbitMQ 消費(fèi)消息的程序。
  • 隊(duì)列(Queue):RabbitMQ 存儲(chǔ)消息的地方,消費(fèi)者可以從隊(duì)列中獲取消息。
  • 交換器(Exchange):接收來自生產(chǎn)者的消息,并將消息路由到一個(gè)或多個(gè)隊(duì)列中。
  • 綁定(Binding):將隊(duì)列和交換器關(guān)聯(lián)起來,當(dāng)生產(chǎn)者推送消息時(shí),交換器將消息路由到隊(duì)列中。
  • 路由鍵(Routing Key):用于交換器將消息路由到特定隊(duì)列的匹配規(guī)則。

RabbitMQ 的技術(shù)知識(shí)點(diǎn)大概分為:

  • 用戶和權(quán)限:配置用戶、角色和其對(duì)應(yīng)的權(quán)限。
  • Virtual Hosts:配置虛擬主機(jī),用于分隔不同的消息隊(duì)列環(huán)境。
  • Exchange 和 Queue 的屬性:配置交換器和隊(duì)列的屬性,比如持久化、自動(dòng)刪除等。
  • Policies:定義策略來自動(dòng)設(shè)置隊(duì)列、交換器和鏈接的參數(shù)。
  • 連接和通道:配置連接和通道的屬性,如心跳間隔、最大幀大小等。
  • 插件:?jiǎn)⒂煤团渲酶鞣N插件,如管理插件、STOMP 插件等。
  • 集群和高可用性:配置集群和鏡像隊(duì)列,以提供高可用性。
  • 日志和監(jiān)控:配置日志級(jí)別、目標(biāo)和監(jiān)控插件。
  • 安全性:配置 SSL/TLS 選項(xiàng)、認(rèn)證后端等安全相關(guān)的設(shè)置。

由于筆者技術(shù)有限以及篇幅限制,本文只講解與 C# 編程相關(guān)的技術(shù)細(xì)節(jié),從中了解 RabbitMQ 的編碼技巧和運(yùn)作機(jī)制。

安裝與配置

安裝 RabbitMQ

讀者可以在 RabbitMQ 官方文檔中找到完整的安裝教程:https://www.rabbitmq.com/download.html

本文使用 Docker 的方式部署。

RabbitMQ 社區(qū)鏡像列表:https://hub.docker.com/_/rabbitmq

創(chuàng)建目錄用于映射存儲(chǔ)卷:

mkdir -p /opt/lib/rabbitmq

部署容器:

docker run -itd --name rabbitmq -p 5672:5672 -p 15672:15672 \
-v /opt/lib/rabbitmq:/var/lib/rabbitmq \
rabbitmq:3.12.8-management

部署時(shí)占用兩個(gè)端口。5672 是 MQ 通訊端口,15672 是 Management UI 工具端口。

打開 15672 端口,會(huì)進(jìn)入 Web 登錄頁面,默認(rèn)賬號(hào)密碼都是 guest。

關(guān)于 RabbitMQ Management UI 的使用方法,后續(xù)再介紹。

打開管理界面后會(huì),在 Exchanges 菜單中,可以看到如下圖表格。這些是默認(rèn)的交換器?,F(xiàn)在可以不需要了解這些東西,后面會(huì)有介紹。

Virtual host Name Type Features
/ (AMQP default) direct D
/ amq.direct direct D
/ amq.fanout fanout D
/ amq.headers headers D
/ amq.match headers D
/ amq.rabbitmq.trace topic D I
/ amq.topic topic D

發(fā)布與訂閱模型

使用 C# 開發(fā) RabbitMQ,需要使用 nuget 引入 RabbitMQ.Client,官網(wǎng)文檔地址:.NET/C# RabbitMQ Client Library — RabbitMQ

在繼續(xù)閱讀文章之前,請(qǐng)先創(chuàng)建一個(gè)控制臺(tái)程序。

生產(chǎn)者、消費(fèi)者、交換器、隊(duì)列

為了便于理解,本文制作了幾十張圖片,約定一些圖形表示的含義:

對(duì)應(yīng)生產(chǎn)者,使用如下圖表示:

對(duì)于消費(fèi)者,使用如下圖表示:

對(duì)于消息隊(duì)列,使用如下圖表示:

對(duì)于交換器,使用如下圖表示:

在 RabbitMQ 中,生產(chǎn)者發(fā)布的消息是不會(huì)直接進(jìn)入到隊(duì)列中,而是經(jīng)過交換器(Exchange) 分發(fā)到各個(gè)隊(duì)列中。前面提到,部署 RabbitMQ 后,默認(rèn)有 七個(gè)交換器,如 (AMQP default)amq.direct 等。

當(dāng)然,對(duì)于現(xiàn)在來說,我們不需要了解交換器,所以,在本節(jié)的教程中,會(huì)使用默認(rèn)交換器完成實(shí)驗(yàn)。

忽略交換器存在的情況下,我們可以將生產(chǎn)和消費(fèi)的流程簡(jiǎn)化如下圖所示:

請(qǐng)一定要注意,圖中省略了交換器的存在,因?yàn)槭褂玫氖悄J(rèn)的交換器。但是生產(chǎn)者推送消息必須是推送到交換器,而不是隊(duì)列這一句一定要弄清楚。

對(duì)于消費(fèi)者來說,要使用隊(duì)列,必須確保隊(duì)列已經(jīng)存在。

使用 C# 聲明(創(chuàng)建)一個(gè)隊(duì)列的代碼和參數(shù)如下所示:

// 聲明一個(gè)隊(duì)列
channel.QueueDeclare(
	// 隊(duì)列名稱
	queue: "myqueue",

	// 持久化配置,隊(duì)列是否能夠在 broker 重啟后存活
	durable: false,

	// 連接關(guān)閉時(shí)被刪除該隊(duì)列
	exclusive: false,

	// 當(dāng)最后一個(gè)消費(fèi)者(如果有的話)退訂時(shí),是否應(yīng)該自動(dòng)刪除這個(gè)隊(duì)列
	autoDelete: false,

	// 額外的參數(shù)配置
	arguments: null
	);

完整代碼示例:


ConnectionFactory factory = new ConnectionFactory
{
	HostName = "localhost"
};

// 連接
using IConnection connection = factory.CreateConnection();

// 通道
using IModel channel = connection.CreateModel();

channel.QueueDeclare(
	// 隊(duì)列名稱
	queue: "myqueue",

	// 持久化配置,隊(duì)列是否能夠在 broker 重啟后存活
	durable: false,

	// 連接關(guān)閉時(shí)被刪除該隊(duì)列
	exclusive: false,

	// 當(dāng)最后一個(gè)消費(fèi)者(如果有的話)退訂時(shí),是否應(yīng)該自動(dòng)刪除這個(gè)隊(duì)列
	autoDelete: false,

	// 額外的參數(shù)配置
	arguments: null
	);
  • queue:隊(duì)列的名稱。

  • durable:設(shè)置是否持久化。持久化的隊(duì)列會(huì)存盤,在服務(wù)器重啟的時(shí)候可以保證不丟失相關(guān)信息。

  • exclusive 設(shè)置是否排他。如果一個(gè)隊(duì)列被聲明為排他隊(duì)列,該隊(duì)列僅對(duì)首次聲明它的連接可見,并在連接斷開時(shí)自動(dòng)刪除。

  • 該配置是基于 IConnection 的,同一個(gè) IConnection 創(chuàng)建的不同通道 (IModel) ,也會(huì)遵守此規(guī)則。

  • autoDelete:設(shè)置是否自動(dòng)刪除。自動(dòng)刪除的前提是至少有一個(gè)消費(fèi)者連接到這個(gè)隊(duì)列,之后所有與這個(gè)隊(duì)列連接的消費(fèi)者都斷開時(shí),才會(huì)自動(dòng)刪除。

  • argurnents: 設(shè)置隊(duì)列的其他一些參數(shù),如隊(duì)列的消息過期時(shí)間等。

如果隊(duì)列已經(jīng)存在,不需要再執(zhí)行 QueueDeclare()。重復(fù)調(diào)用 QueueDeclare(),如果參數(shù)相同,不會(huì)出現(xiàn)副作用,已經(jīng)推送的消息也不會(huì)出問題。

但是,如果 QueueDeclare() 參數(shù)如果跟已存在的隊(duì)列配置有差異,則可能會(huì)報(bào)錯(cuò)。

一般情況下,為了合理架構(gòu)和可靠性,會(huì)由架構(gòu)師等在消息隊(duì)列中提前創(chuàng)建好交換器、隊(duì)列,然后客戶端直接使用即可。一般不讓程序啟動(dòng)時(shí)設(shè)置,這樣會(huì)帶來很大的不確定性和副作用。

生產(chǎn)者發(fā)送消息時(shí)的代碼也很簡(jiǎn)單,指定要發(fā)送到哪個(gè)交換器或路由中即可。

請(qǐng)一定要注意,RabbitMQ 生產(chǎn)者發(fā)送消息,推送到的是交換器,而不是直接推送到隊(duì)列!

channel.BasicPublish(

	// 使用默認(rèn)交換器
	exchange: string.Empty,

	// 推送到哪個(gè)隊(duì)列中
	routingKey: "myqueue",

	// 隊(duì)列屬性
	basicProperties: null,

	// 要發(fā)送的消息需要先轉(zhuǎn)換為 byte[]
	body: Encoding.UTF8.GetBytes("測(cè)試")
	);

BasicPublish 有三個(gè)重載:

BasicPublish(
    PublicationAddress addr, 
    IBasicProperties basicProperties, 
    ReadOnlyMemory<byte> body)
BasicPublish(string exchange, 
             string routingKey, 
             IBasicProperties basicProperties, 
             ReadOnlyMemory<byte> body)
BasicPublish(string exchange, 
             string routingKey, 
             bool mandatory = false, 
             IBasicProperties basicProperties = null, 
             ReadOnlyMemory<byte> body = default)
  • exchange: 交換器的名稱,如果留空則會(huì)推送到默認(rèn)交換器。
  • routingKey: 路由鍵,交換器根據(jù)路由鍵將消息存儲(chǔ)到相應(yīng)的隊(duì)列之中。
  • basicProperties:消息屬性,如過期時(shí)間等。
  • mandatory:值為 false 時(shí),如果交換器沒有綁定合適的隊(duì)列,則該消息會(huì)丟失。值為 true 時(shí),如果交換器沒有綁定合適的隊(duì)列,則會(huì)觸發(fā)IModel.BasicReturn 事件。

IBasicProperties basicProperties 參數(shù)是接口,我們可以使用 IModel.CreateBasicProperties() 創(chuàng)建一個(gè)接口對(duì)象。

IBasicProperties 接口中封裝了很多屬性,使得我們不需要使用字符串的顯示傳遞配置。

IBasicProperties 其完整屬性如下:

// 標(biāo)識(shí)應(yīng)用程序的 ID
public String AppId { set; get; }

// 標(biāo)識(shí)集群的 ID
public String ClusterId { set; get; }

// 指定消息內(nèi)容的編碼方式,例如 "utf-8"
public String ContentEncoding { set; get; }

// 指定消息內(nèi)容的 MIME 類型,例如 "application/json"
public String ContentType { set; get; }

// 用于關(guān)聯(lián)消息之間的關(guān)系,通常用于 RPC(遠(yuǎn)程過程調(diào)用)場(chǎng)景
public String CorrelationId { set; get; }

// 指定消息的持久化方式,值 1:不持久化,值 2:持久化
public Byte DeliveryMode { set; get; }

// 單位毫秒,指定該消息的過期時(shí)間
public String Expiration { set; get; }

// 自定義消息的頭部信息
public IDictionary`2 Headers { set; get; }

// 指定消息的唯一標(biāo)識(shí)符
public String MessageId { set; get; }

// 是否持久化
public Boolean Persistent { set; get; }

// 指定消息的優(yōu)先級(jí),范圍從 0 到 9
public Byte Priority { set; get; }

// 指定用于回復(fù)消息的隊(duì)列名稱
public String ReplyTo { set; get; }

// 指定用于回復(fù)消息的地址信息
public PublicationAddress ReplyToAddress { set; get; }

// 指定消息的時(shí)間戳
public AmqpTimestamp Timestamp { set; get; }

// 消息的類型
public String Type { set; get; }

// 標(biāo)識(shí)用戶的 ID
public String UserId { set; get; }

推送消息時(shí),可以對(duì)單個(gè)消息細(xì)粒度地設(shè)置 IBasicProperties :

using IConnection connection = factory.CreateConnection();
using IModel channel = connection.CreateModel();

// 創(chuàng)建兩個(gè)隊(duì)列
channel.QueueDeclare(queue: "q1", durable: false, exclusive: false, autoDelete: false);

var properties = channel.CreateBasicProperties();
// 示例 1:
properties.Persistent = true;
properties.ContentType = "application/json";
properties.ContentEncoding = "UTF-8";

// 示例 2:
//properties.Persistent = true;
//properties.ContentEncoding = "gzip";
//properties.Headers = new Dictionary<string, object>();

channel.BasicPublish(
	exchange: string.Empty,
	routingKey: "q1",
	basicProperties: properties,
	body: Encoding.UTF8.GetBytes($"測(cè)試{i}")
);

對(duì)于 IBasicProperties 的使用,文章后面會(huì)有更加詳細(xì)的介紹。

現(xiàn)在,我們推送了 10 條消息到隊(duì)列中,然后在 Management UI 中觀察。

int i = 0;
while (i < 10)
{
	channel.BasicPublish(
	exchange: string.Empty,
	routingKey: "myqueue",
	basicProperties: null,
	body: Encoding.UTF8.GetBytes($"測(cè)試{i}")
	);
	i++;
}

我們可以在 UI 的 Queues and Streams 中看到當(dāng)前所有的隊(duì)列。

可以看到當(dāng)前隊(duì)列中的 Ready 狀態(tài) Unacked 狀態(tài)的消息數(shù),分別對(duì)應(yīng)上文中的等待投遞給消費(fèi)者的消息數(shù)和己經(jīng)投遞給消費(fèi)者但是未收到確認(rèn)信號(hào)的消息數(shù)

點(diǎn)擊該隊(duì)列后,會(huì)打開如下圖所示的界面。

首先看 Overview。

Ready 指還沒有被消費(fèi)的消息數(shù)量。

Unacked 指消費(fèi)但是沒有 ack 的消息數(shù)量。

另一個(gè) Message rates 圖表,指的是發(fā)布、消費(fèi)消息的速度,因?yàn)椴恢匾?,因此這里不說明。

在 Bindings 中,可以看到該隊(duì)列綁定了默認(rèn)的交換器。

然后編寫一個(gè)消費(fèi)者,消費(fèi)該隊(duì)列中的消息,其完整代碼如下:

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;

ConnectionFactory factory = new ConnectionFactory
{
	HostName = "localhost"
};

using IConnection connection = factory.CreateConnection();
using IModel channel = connection.CreateModel();

channel.QueueDeclare(
	// 隊(duì)列名稱
	queue: "myqueue",

	// 持久化配置,隊(duì)列是否能夠在 broker 重啟后存活
	durable: false,

	// 連接關(guān)閉時(shí)被刪除該隊(duì)列
	exclusive: false,

	// 當(dāng)最后一個(gè)消費(fèi)者(如果有的話)退訂時(shí),是否應(yīng)該自動(dòng)刪除這個(gè)隊(duì)列
	autoDelete: false,

	// 額外的參數(shù)配置
	arguments: null
	);

// 定義消費(fèi)者
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
	var message = Encoding.UTF8.GetString(ea.Body.Span);
	Console.WriteLine($" [x] Received {message}");
};

// 開始消費(fèi)
channel.BasicConsume(queue: "myqueue",
					 autoAck: true,
					 consumer: consumer);

Console.ReadLine();

注意,如果填寫了一個(gè)不存在的隊(duì)列,那么程序會(huì)報(bào)異常。

在消費(fèi)者程序未退出前,即 IConnection 未被 Dispose() 之前,可以在 Consumers 中看到消費(fèi)者客戶端程序信息。

那么,如果我們只消費(fèi),不設(shè)置自動(dòng) ack 呢?

將消費(fèi)者代碼改成:

channel.BasicConsume(queue: "myqueue",
					 autoAck: false,
					 consumer: consumer);

完整代碼如下:

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;

ConnectionFactory factory = new ConnectionFactory
{
	HostName = "localhost"
};

using IConnection connection = factory.CreateConnection();
using IModel channel = connection.CreateModel();

channel.QueueDeclare(
	queue: "myqueue",
	durable: false,
	exclusive: false,
	autoDelete: false,
	arguments: null
	);

int i = 0;
while (i < 10)
{
	channel.BasicPublish(
	exchange: string.Empty,
	routingKey: "myqueue",
	basicProperties: null,
	body: Encoding.UTF8.GetBytes($"測(cè)試{i}")
	);
	i++;
}

// 定義消費(fèi)者
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
	var message = Encoding.UTF8.GetString(ea.Body.Span);
	Console.WriteLine($" [x] Received {message}");
};

// 開始消費(fèi)
channel.BasicConsume(queue: "myqueue",
					 autoAck: false,
					 consumer: consumer);

Console.ReadLine();

此時(shí)會(huì)發(fā)現(xiàn),所有的消息都已經(jīng)讀了,但是 Unacked 為 10。

如下圖所示,autoAck: false 之后,如果重新啟動(dòng)程序(只消費(fèi),不推送消息),那么程序會(huì)繼續(xù)重新消費(fèi)一遍。

對(duì)于未 ack 的消息,消費(fèi)者重新連接后,RabbitMQ 會(huì)再次推送。

與 Kafka 不同的是,Kafka 如果沒有 ack 當(dāng)前消息,則服務(wù)器會(huì)自動(dòng)重新發(fā)送該條消息給消費(fèi)者,如果該條消息未完成,則會(huì)一直堵塞在這里。而對(duì)于 RabbitMQ,未被 ack 的消息會(huì)被暫時(shí)忽略,自動(dòng)消費(fèi)下一條。所以基于這一點(diǎn),默認(rèn)情況下,RabbitMQ 是不能保證消息順序性。

當(dāng)然, RabbitMQ 是很靈活的,我們可以選擇性地消費(fèi)部分消息,避免當(dāng)前消息阻塞導(dǎo)致程序不能往下消費(fèi):

	// 定義消費(fèi)者
	int i = 0;
	var consumer = new EventingBasicConsumer(channel);
	consumer.Received += (model, ea) =>
	{
		var message = Encoding.UTF8.GetString(ea.Body.Span);
		Console.WriteLine($" [x] Received {message}");
		i++;
        // 確認(rèn)該消息被正確消費(fèi)
		if (i % 2 == 0)
			channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
	};

	// 開始消費(fèi)
	channel.BasicConsume(queue: "myqueue",
						 autoAck: false,
						 consumer: consumer);

在某些場(chǎng)景下,這個(gè)特性很有用,我們可以將多次執(zhí)行失敗的消息先放一放,轉(zhuǎn)而消費(fèi)下一條消息,從而避免消息堆積。

多工作隊(duì)列

如果同一個(gè)隊(duì)列的不同客戶端綁定到交換器中,多個(gè)消費(fèi)者一起工作的話,那么會(huì)發(fā)生什么情況?

對(duì)于第一種情況,RabbitMQ 會(huì)將消息平均分發(fā)給每個(gè)客戶端。

該條件成立的基礎(chǔ)是,兩個(gè)消費(fèi)者是不同的消費(fèi)者,如果在同一個(gè)程序里面參加不同的實(shí)例去消費(fèi),但是因?yàn)槠浔蛔R(shí)別為同一個(gè)消費(fèi)者,則規(guī)則無效。

但是,RabbitMQ 并不會(huì)看未確認(rèn)的消息數(shù)量,它只是盲目地將第 n 個(gè)消息發(fā)送給第 n 個(gè)消費(fèi)者。

另外在指定交換器名稱的情況下,我們可以將 routingKey 設(shè)置為空,這樣發(fā)布的消息會(huì)由交換器轉(zhuǎn)發(fā)到對(duì)應(yīng)的隊(duì)列中。

	channel.BasicPublish(
	exchange: "logs",
	routingKey: string.Empty,
	basicProperties: null,
	body: Encoding.UTF8.GetBytes($"測(cè)試{i}")
	);

而多隊(duì)列對(duì)應(yīng)一個(gè)交換器的情況比較復(fù)雜,后面的章節(jié)會(huì)提到。

生產(chǎn)者和消費(fèi)者都能夠使用 QueueDeclare() 來聲明一個(gè)隊(duì)列。所謂的聲明,實(shí)際上是對(duì) RabbitMQ Broker 請(qǐng)求創(chuàng)建一個(gè)隊(duì)列,因此誰來創(chuàng)建都是一樣的。

跟聲明隊(duì)列相關(guān)的,還有兩個(gè)函數(shù):

// 無論創(chuàng)建失敗與否,都不理會(huì)
channel.QueueDeclareNoWait();
// 判斷隊(duì)列是否存在,如果不存在則彈出異常,存在則什么也不會(huì)發(fā)生
channel.QueueDeclarePassive();

此外,我們還可以刪除隊(duì)列:

// ifUnused: 隊(duì)列沒有被使用時(shí)
// ifEmpty: 隊(duì)列中沒有堆積的消息時(shí)
channel.QueueDelete(queue: "aaa", ifUnused: true, ifEmpty: true);

交換器類型

生產(chǎn)者只能向交換器推送消息,而不能向隊(duì)列推送消息。

推送消息時(shí),可以指定交換器名稱和路由鍵。

如下面代碼所示:

	channel.BasicPublish(
	exchange: string.Empty,
	routingKey: "myqueue",
	basicProperties: null,
	body: Encoding.UTF8.GetBytes($"測(cè)試{i}")
	);

ExchangeType 中定義了幾種交換器類型的名稱。

    public static class ExchangeType
    {
        public const string Direct = "direct";
        public const string Fanout = "fanout";
        public const string Headers = "headers";
        public const string Topic = "topic";
        private static readonly string[] s_all = {Fanout, Direct, Topic, Headers};
    }

在使用一個(gè)交換器之前,需要先聲明一個(gè)交換器:

channel.ExchangeDeclare("logs", ExchangeType.Fanout);

如果交換器已存在,重復(fù)執(zhí)行聲明代碼,只要配置跟現(xiàn)存的交換器配置區(qū)配,則 RabbitMQ 啥也不干,不會(huì)出現(xiàn)副作用。

但是,不能出現(xiàn)不一樣的配置,例如已存在的交換器是 Fanout 類型,但是重新執(zhí)行代碼聲明隊(duì)列為 Direct 類型。

ExchangeDeclare 函數(shù)的定義如下:

ExchangeDeclare(string exchange, 
                string type, 
                bool durable = false, 
                bool autoDelete = false,
                IDictionary<string, object> arguments = null)
  • exchange: 交換器的名稱。
  • type 交換器的類型,如 fanout、direct、topic。
  • durable: 設(shè)置是否持久 durab ,如果值為 true,則服務(wù)器重啟后也不會(huì)丟失。
  • autoDelete:設(shè)置是否自動(dòng)刪除。
  • argument:其他一些結(jié)構(gòu)化參數(shù)。

當(dāng)然,交換器也可以被刪除。

// ifUnused 只有在隊(duì)列未被使用的情況下,才會(huì)刪除
channel.ExchangeDelete(exchange: "log", ifUnused: true);

還有一個(gè) NotWait 方法。

channel.ExchangeDeclareNoWait("logs", ExchangeType.Direct);
//channel.ExchangeDeclareNoWait(...);

即使重新聲明交換器和刪除時(shí)有問題,由于其返回 void,因此操作失敗也不會(huì)報(bào)異常。

也有個(gè)判斷交換器是否存在的方法。如果交換器不存在,則會(huì)拋出異常,如果交換器存在,則什么也不會(huì)發(fā)生。

channel.ExchangeDeclarePassive("logs")

創(chuàng)建多個(gè)隊(duì)列后,還需要將隊(duì)列和交換器綁定起來。

如下代碼所示,其交換器綁定了兩個(gè)隊(duì)列,生產(chǎn)者推送消息到交換器時(shí),兩個(gè)隊(duì)列都會(huì)收到相同的消息。

ConnectionFactory factory = new ConnectionFactory
{
	HostName = "localhost"
};

using IConnection connection = factory.CreateConnection();
using IModel channel = connection.CreateModel();

// 創(chuàng)建交換器
channel.ExchangeDeclare("logs", ExchangeType.Fanout);

// 創(chuàng)建兩個(gè)隊(duì)列
channel.QueueDeclare(
	queue: "myqueue1",
	durable: false,
	exclusive: false,
	autoDelete: false,
	arguments: null
	);
channel.QueueDeclare(
	queue: "myqueue2",
	durable: false,
	exclusive: false,
	autoDelete: false,
	arguments: null
	);

channel.QueueBind(queue: "myqueue1", exchange: "logs", routingKey: string.Empty);
channel.QueueBind(queue: "myqueue2", exchange: "logs", routingKey: string.Empty);

int i = 0;
while (i < 10)
{
	channel.BasicPublish(
	exchange: "logs",
	routingKey: string.Empty,
	basicProperties: null,
	body: Encoding.UTF8.GetBytes($"測(cè)試{i}")
	);
	i++;
}

推送消息后,每個(gè)綁定了 logs 交換器的隊(duì)列都會(huì)收到相同的消息。

注意,由于交換器不會(huì)存儲(chǔ)消息,因此,再創(chuàng)建一個(gè) myqueue3 的消息隊(duì)列綁定 logs 交換器時(shí),myqueue3 只會(huì)接收到綁定之后推送的消息,不能得到更早之前的消息。

交換器有以下類型:

  • direct:根據(jù) routingKey 將消息傳遞到隊(duì)列。
  • topic:有點(diǎn)復(fù)雜。根據(jù)消息路由鍵與用于將隊(duì)列綁定到交換器的模式之間的匹配將消息路由到一個(gè)或多個(gè)隊(duì)列。
  • headers:本文不講,所以不做解釋。
  • fanout:只要綁定即可,不需要理會(huì)路由。

Direct

direct 是根據(jù) routingKey 將消息推送到不同的隊(duì)列中。

首先,創(chuàng)建多個(gè)隊(duì)列。

// 創(chuàng)建兩個(gè)隊(duì)列
channel.QueueDeclare(queue: "direct1");
channel.QueueDeclare(queue: "direct2");

然后將隊(duì)列綁定交換器時(shí),綁定關(guān)系需要設(shè)置 routingKey。

// 使用 routingKey 綁定交換器
channel.QueueBind(exchange: "logs", queue: "direct1", routingKey: "debug");
channel.QueueBind(exchange: "logs", queue: "direct2", routingKey: "info");

最后,推送消息時(shí),需要指定交換器名稱,以及 routingKey。

// 發(fā)送消息時(shí),需要指定 routingKey
channel.BasicPublish(
exchange: "logs",
routingKey: "debug",
basicProperties: null,
body: Encoding.UTF8.GetBytes($"測(cè)試")
);

當(dāng)消息推送到 logs 交換器時(shí),交換器會(huì)根據(jù) routingKey 將消息轉(zhuǎn)發(fā)到對(duì)應(yīng)的隊(duì)列中。

完整的代碼示例如下:

// 創(chuàng)建交換器
channel.ExchangeDeclare("logs", ExchangeType.Direct);

// 創(chuàng)建兩個(gè)隊(duì)列
channel.QueueDeclare(queue: "direct1");
channel.QueueDeclare(queue: "direct2");

// 使用 routingKey 綁定交換器
channel.QueueBind(exchange: "logs", queue: "direct1", routingKey: "debug");
channel.QueueBind(exchange: "logs", queue: "direct2", routingKey: "info");

// 發(fā)送消息時(shí),需要指定 routingKey
channel.BasicPublish(
exchange: "logs",
routingKey: "debug",
basicProperties: null,
body: Encoding.UTF8.GetBytes($"測(cè)試")
);

啟動(dòng)后,發(fā)現(xiàn)只有 direct1 隊(duì)列可以收到消息,因?yàn)檫@是根據(jù)綁定時(shí)使用的 routingKey=debug 決定的。

Fanout

只要隊(duì)列綁定了交換器,則每個(gè)交換器都會(huì)收到一樣的消息,F(xiàn)anout 會(huì)忽略 routingKey。

如下代碼所示:

// 創(chuàng)建交換器
channel.ExchangeDeclare("logs1", ExchangeType.Fanout);

// 創(chuàng)建兩個(gè)隊(duì)列
channel.QueueDeclare(queue: "fanout1");
channel.QueueDeclare(queue: "fanout2");

// 使用 routingKey 綁定交換器
channel.QueueBind(exchange: "logs1", queue: "fanout1", routingKey: "debug");
channel.QueueBind(exchange: "logs1", queue: "fanout2", routingKey: "info");

// 發(fā)送消息時(shí),需要指定 routingKey
channel.BasicPublish(
exchange: "logs1",
routingKey: "debug",
basicProperties: null,
body: Encoding.UTF8.GetBytes($"測(cè)試")
);

Topic

Topic 會(huì)根據(jù) routingKey 查找符合條件的隊(duì)列,隊(duì)列可以使用 .#、* 三種符號(hào)進(jìn)行區(qū)配,Topic 的區(qū)配規(guī)則比較靈活,

在創(chuàng)建隊(duì)列之后,綁定交換器時(shí),routingKey 使用表達(dá)式。

// 使用 routingKey 綁定交換器
channel.QueueBind(exchange: "logs3", queue: "topic1", routingKey: "red.#");
channel.QueueBind(exchange: "logs3", queue: "topic2", routingKey: "red.yellow.#");

推送消息時(shí),routingKey 需要設(shè)置完整的名稱。

// 發(fā)送消息
channel.BasicPublish(
	exchange: "logs3",
	routingKey: "red.green",
	basicProperties: null,
	body: Encoding.UTF8.GetBytes($"測(cè)試")
);

首先,routingKey 會(huì)根據(jù) . 符號(hào)進(jìn)行劃分。

比如 red.yellow.green 會(huì)被拆成 [red,yellow,green] 三個(gè)部分。

如果想模糊區(qū)配一個(gè)部分,則可以使用 *。比如 red.*.green ,可以區(qū)配到 red.aaa.green、red.666.green

* 可以在任何一部分使用,比如 *.yellow.*、*.*.green

# 可以區(qū)配多個(gè)部分,比如 red.# 可以區(qū)配到 red.a、red.a.a、red.a.a.a。

完整的代碼示例如下:

// 創(chuàng)建交換器
channel.ExchangeDeclare("logs3", ExchangeType.Topic);

// 創(chuàng)建兩個(gè)隊(duì)列
channel.QueueDeclare(queue: "topic1");
channel.QueueDeclare(queue: "topic2");

// 使用 routingKey 綁定交換器
channel.QueueBind(exchange: "logs3", queue: "topic1", routingKey: "red.#");
channel.QueueBind(exchange: "logs3", queue: "topic2", routingKey: "red.yellow.#");

// 發(fā)送消息
channel.BasicPublish(
	exchange: "logs3",
	routingKey: "red.green",
	basicProperties: null,
	body: Encoding.UTF8.GetBytes($"測(cè)試")
);

channel.BasicPublish(
	exchange: "logs3",
	routingKey: "red.yellow.green",
	basicProperties: null,
	body: Encoding.UTF8.GetBytes($"測(cè)試")
);

上面推送了兩條消息到 logs 交換器中,其中 routingKey=red.green 的消息,被 red.# 區(qū)配到,因此會(huì)被轉(zhuǎn)發(fā)到 topic1 隊(duì)列中。

routingKey=red.yellow.green 的消息,可以被兩個(gè)隊(duì)列區(qū)配,因此 topic1 和 topic 2 都可以接收到。

交換器綁定交換器

交換器除了可以綁定隊(duì)列,也可以綁定交換器。

示例:

將 b2 綁定到 b1 中,b2 可以得到 b1 的消息。

channel.ExchangeBind(destination: "b2", source: "b1", routingKey: string.Empty);

綁定之后,推送到 b1 交換器的消息,會(huì)被轉(zhuǎn)發(fā)到 b2 交換器。

完整示例代碼如下:

ConnectionFactory factory = new ConnectionFactory
{
	HostName = "localhost"
};

using IConnection connection = factory.CreateConnection();
using IModel channel = connection.CreateModel();
channel.ExchangeDeclare(exchange: "b1", ExchangeType.Fanout);
channel.ExchangeDeclare(exchange: "b2", ExchangeType.Fanout);

// 因?yàn)閮烧叨际?ExchangeType.Fanout,
// 所以 routingKey 使用 string.Empty
channel.ExchangeBind(destination: "b2", source: "b1", routingKey: string.Empty);


// 創(chuàng)建隊(duì)列
channel.QueueDeclare(queue: "q1", durable: false, exclusive: false, autoDelete: false);
channel.QueueBind(queue: "q1", exchange: "b2", routingKey: string.Empty);

int i = 0;
while (i < 10)
{
	channel.BasicPublish(
	exchange: "b1",
	routingKey: string.Empty,
	basicProperties: null,
	body: Encoding.UTF8.GetBytes($"測(cè)試{i}")
	);
	i++;
}

當(dāng)然,可以將交換器、隊(duì)列同時(shí)綁定到 b1 交換器中。

另外,兩個(gè)交換器的類型可以不同。不過這樣會(huì)導(dǎo)致區(qū)配規(guī)則有點(diǎn)復(fù)雜。

channel.ExchangeDeclare(exchange: "b1", ExchangeType.Direct);
channel.ExchangeDeclare(exchange: "b2", ExchangeType.Fanout);

我們可以理解成在交換器綁定時(shí),b2 相對(duì)于一個(gè)隊(duì)列。當(dāng) b1 設(shè)置成 Direct 交換器時(shí),綁定交換器時(shí)還需要指定 routingKey。

channel.ExchangeBind(destination: "b2", source: "b1", routingKey: "demo");

而 b2 交換器和 q2 隊(duì)列,依然是 Fanout 關(guān)系,不受影響。

意思是說,b1、b2 是一個(gè)關(guān)系,它們的映射關(guān)系不會(huì)影響到別人,也不會(huì)影響到下一層。

完整代碼示例如下:


using RabbitMQ.Client;
using System.Text;

ConnectionFactory factory = new ConnectionFactory
{
	HostName = "localhost"
};

using IConnection connection = factory.CreateConnection();
using IModel channel = connection.CreateModel();
channel.ExchangeDeclare(exchange: "b1", ExchangeType.Direct);
channel.ExchangeDeclare(exchange: "b2", ExchangeType.Fanout);

// 因?yàn)閮烧叨际?ExchangeType.Fanout,
// 所以 routingKey 使用 string.Empty
channel.ExchangeBind(destination: "b2", source: "b1", routingKey: "demo");


// 創(chuàng)建兩個(gè)隊(duì)列
channel.QueueDeclare(queue: "q1", durable: false, exclusive: false, autoDelete: false);
channel.QueueBind(queue: "q1", exchange: "b2", routingKey: string.Empty);

int i = 0;
while (i < 10)
{
	channel.BasicPublish(
	exchange: "b1",
	routingKey: "demo",
	basicProperties: null,
	body: Encoding.UTF8.GetBytes($"測(cè)試{i}")
	);
	i++;
}

消費(fèi)者、消息屬性

消費(fèi)者 BasicConsume 函數(shù)定義如下:

BasicConsume(string queue,
            bool autoAck,
            string consumerTag,
            IDictionary<string, object> arguments,
            IBasicConsumer consumer)

不同的消費(fèi)訂閱采用不同消費(fèi)者標(biāo)簽 (consumerTag) 來區(qū)分彼 ,在同一個(gè)通道(IModel)中的消費(fèi)者 需要通過消費(fèi)者標(biāo)簽作區(qū)分,默認(rèn)情況下不需要設(shè)置。

  • queue:隊(duì)列的名稱。
  • autoAck:設(shè)置是否自動(dòng)確認(rèn)。
  • consumerTag: 消費(fèi)者標(biāo)簽,用來區(qū)分多個(gè)消費(fèi)者。
  • arguments:設(shè)置消費(fèi)者的其他參數(shù)。

前面,我們使用了 EventingBasicConsumer 創(chuàng)建 IBasicConsumer 接口的消費(fèi)者程序,其中,EventingBasicConsumer 包含了以下事件:

public event EventHandler<BasicDeliverEventArgs> Received;
public event EventHandler<ConsumerEventArgs> Registered;
public event EventHandler<ShutdownEventArgs> Shutdown;
public event EventHandler<ConsumerEventArgs> Unregistered;

這些事件會(huì)在消息處理的不同階段被觸發(fā)。

消費(fèi)者程序有推、拉兩種消費(fèi)模式,前面所提到的代碼都是推模式,即出現(xiàn)新的消息時(shí),RabbitMQ 會(huì)自動(dòng)推送到消費(fèi)者程序中。

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
	var message = Encoding.UTF8.GetString(ea.Body.Span);
	Console.WriteLine($" [x] Received {message}");
	channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};

// 開始消費(fèi)
channel.BasicConsume(queue: "myqueue5",
					 autoAck: false,
					 consumer: consumer,
					 consumerTag: "demo");

如果使用拉模式(BasicGet() 函數(shù)),那么在 RabbitMQ Broker 的隊(duì)列中沒有消息時(shí),會(huì)返回 null。

// 開始消費(fèi)
while (true)
{
	var result = channel.BasicGet(queue: "q1", autoAck: false);

	// 如果沒有拉到消息時(shí)
	if (result == null) 
    {
      // 沒有消息時(shí),避免無限拉取
      Thread.Sleep(100);
      continue;   
    }

	Console.WriteLine(Encoding.UTF8.GetString(result.Body.Span));
	channel.BasicAck(deliveryTag: result.DeliveryTag, multiple: false);
}

當(dāng)使用 BasicGet() 手動(dòng)拉取消息時(shí),該程序不會(huì)作為消費(fèi)者程序存在,也就是 RabbitMQ 的 Consumer 中看不到。

兩種推拉模式之下,ack 消息時(shí),均有一個(gè) multiple 參數(shù)。

  • 如果將 multiple 設(shè)為 false,則只確認(rèn)指定 deliveryTag 的一條消息。
  • 如果將 multiple 設(shè)為 true,則會(huì)確認(rèn)所有比指定 deliveryTag 小的并且未被確認(rèn)的消息。

消息的 deliveryTag 屬性是 ulong 類型,表示消息的偏移量,從 1.... 開始算起。

在大批量接收消息并進(jìn)行處理時(shí),可以使用 multiple 來確認(rèn)一組消息,而不必逐條確認(rèn),這樣可以提高效率。

Qos 、拒絕接收

消費(fèi)者程序可以設(shè)置 Qos。

channel.BasicQos(prefetchSize: 10, prefetchCount: 10, global: false);

prefetchSize:這個(gè)參數(shù)表示消費(fèi)者所能接收未確認(rèn)消息的總體大小的上限,設(shè)置為 0 則表示沒有上限。

prefetchCount: 的方法來設(shè)置消費(fèi)者客戶端最大能接收的未確認(rèn)的消息數(shù)。這個(gè)配置跟滑動(dòng)窗口數(shù)量意思差不多。

global 則有些特殊。

當(dāng) global 為 false 時(shí),只有新的消費(fèi)者需要遵守規(guī)則。

如果是 global 為 true 時(shí),同一個(gè) IConnection 中的消費(fèi)者均會(huì)被修改配置。

// 不受影響
// 	var result = channel.BasicConsume(queue: "q1", autoAck: false,... ...);

channel.BasicQos(prefetchSize: 0, prefetchCount: 10, global: false);

// 新的消費(fèi)者受影響
// 	var result = channel.BasicConsume(queue: "q1", autoAck: false,... ...);

當(dāng)收到消息時(shí),如果需要明確拒絕該消息,可以使用 BasicReject,RabbitMQ 會(huì)將該消息從隊(duì)列中移除。

BasicReject() 會(huì)觸發(fā)消息死信。

while (true)
{
	var result = channel.BasicGet(queue: "q1", autoAck: false);
	if (result == null) continue;

	Console.WriteLine(Encoding.UTF8.GetString(result.Body.Span));
	channel.BasicReject(deliveryTag: result.DeliveryTag, requeue: true);
}

如果 requeue 參數(shù)設(shè)置為 true ,則 RabbitMQ 會(huì)重新將這條消息存入隊(duì)列,以便可以發(fā)送給下個(gè)訂閱的消費(fèi)者,或者說該程序重啟后可以重新接收。

如果 requeue 參數(shù)設(shè)置為 false ,則 RabbitMQ立即會(huì)把消息從隊(duì)列中移除,而不會(huì)把它發(fā)送給新的消費(fèi)者。

如果想批量拒絕消息。

channel.BasicNack(deliveryTag: result.DeliveryTag, multiple: true, requeue: true);

multiple 為 true 時(shí),則表示拒絕 deliveryTag 編號(hào)之前所有未被當(dāng)前消費(fèi)者確認(rèn)的消息。

BasicRecover() 方法用來從 RabbitMQ 重新獲取還未被確認(rèn)的消息

當(dāng) requeue=true 時(shí),未被確認(rèn)的消息會(huì)被重新加入到隊(duì)列中,對(duì)于同一條消息來說,其會(huì)被分配給給其它消費(fèi)者。

當(dāng) requeue=false,同條消息會(huì)被分配給與之前相同的消費(fèi)者。

channel.BasicRecover(requeue: true);
// 異步
channel.BasicRecoverAsync(requeue: true);

消息確認(rèn)模式

前面提到,當(dāng) autoAck=false 時(shí),消息雖然沒有 ack,但是 RabbitMQ 還是會(huì)跳到下一個(gè)消息。

為了保證消息的順序性,在未將當(dāng)前消息消費(fèi)完成的情況下,不允許自動(dòng)消費(fèi)下一個(gè)消息。

只需要使用 BasicQos 配置即可:

channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
ConnectionFactory factory = new ConnectionFactory
{
	HostName = "localhost"
};

using IConnection connection = factory.CreateConnection();
using IModel channel = connection.CreateModel();

// 創(chuàng)建交換器
channel.ExchangeDeclare("acktest", ExchangeType.Fanout);

// 創(chuàng)建兩個(gè)隊(duì)列
channel.QueueDeclare(queue: "myqueue5");

// 使用 routingKey 綁定交換器
channel.QueueBind(exchange: "acktest", queue: "myqueue5", routingKey: string.Empty);

int i = 0;
while (i < 10)
{
	// 發(fā)送消息
	channel.BasicPublish(
	exchange: "acktest",
	routingKey: string.Empty,
	basicProperties: null,
	body: Encoding.UTF8.GetBytes($"測(cè)試")
	);
	i++;
}

// 未 ack 之前,不能消費(fèi)下一個(gè)
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
	var message = Encoding.UTF8.GetString(ea.Body.Span);
	Console.WriteLine($" [x] Received {message}");
	// channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};

// 開始消費(fèi)
channel.BasicConsume(queue: "myqueue5",
					 autoAck: false,
					 consumer: consumer);

之前這段代碼后,你會(huì)發(fā)現(xiàn),第一條消息未被 ack 時(shí),程序不會(huì)自動(dòng)讀取下一條消息,也不會(huì)重新拉取未被 ack 的消息。

如果我們想重新讀取未被 ack 的消息,可以重新啟動(dòng)程序,或使用 BasicRecover() 讓服務(wù)器重新推送。

消息持久化

前面提到了 BasicPublish 函數(shù)的定義:

BasicPublish(string exchange, 
             string routingKey, 
             bool mandatory = false, 
             IBasicProperties basicProperties = null, 
             ReadOnlyMemory<byte> body = default)

當(dāng)設(shè)置 mandatory = true 時(shí),如果交換器無法根據(jù)自身的類型和路由鍵找到一個(gè)符合條件的隊(duì)列,那么 RabbitMQ 觸發(fā)客戶端的 IModel.BasicReturn 事件, 將消息返回給生產(chǎn)者 。

從設(shè)計(jì)上看,一個(gè) IConnection 雖然可以創(chuàng)建多個(gè) IModel(通道),但是只建議編寫一個(gè)消費(fèi)者程序或生產(chǎn)者程序,不建議混合多用。

因?yàn)楦黝愂录完?duì)列配置,是針對(duì)一個(gè) IModel(通道) 來設(shè)置的。

using IConnection connection = factory.CreateConnection();
using IModel channel = connection.CreateModel();
channel.BasicReturn += (object sender, BasicReturnEventArgs e) =>
{

};

當(dāng)設(shè)置了 mandatory = true 時(shí),如果該消息找不到隊(duì)列存儲(chǔ)消息,那么就會(huì)觸發(fā)客戶端的 BasicReturn 事件接收 BasicPublish 失敗的消息。

完整示例代碼如下:

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Runtime;
using System.Text;

ConnectionFactory factory = new ConnectionFactory
{
	HostName = "localhost"
};

using IConnection connection = factory.CreateConnection();
using IModel channel = connection.CreateModel();

channel.ExchangeDeclare(exchange: "e2", type: ExchangeType.Fanout, durable: false, autoDelete: false);


channel.BasicReturn += (object? s, BasicReturnEventArgs e) =>
{
	Console.WriteLine($"無效消息:{Encoding.UTF8.GetString(e.Body.Span)}");
};


int i = 0;
while (i < 10)
{
	channel.BasicPublish(
	exchange: "e2",
	routingKey: string.Empty,

	// mandatory=true,當(dāng)沒有隊(duì)列接收消息時(shí),會(huì)觸發(fā) BasicReturn 事件
	mandatory: true,
	basicProperties: null,
	body: Encoding.UTF8.GetBytes($"測(cè)試{i}")
	);
	i++;
}


Console.ReadLine();

在實(shí)際開發(fā)中,當(dāng) mandatory=false 時(shí),如果一條消息推送到交換器,但是卻沒有綁定隊(duì)列,那么該條消息就會(huì)丟失,可能會(huì)導(dǎo)致嚴(yán)重的后果。

而在 RabbitMQ 中,提供了一種被稱為備胎交換器的方案,這是通過在定義交換器時(shí)添加 alternate-exchange 參數(shù)來實(shí)現(xiàn)。其作用是當(dāng) A 交換器無法找到隊(duì)列轉(zhuǎn)發(fā)消息時(shí),就會(huì)將消息轉(zhuǎn)發(fā)到 B 隊(duì)列中。

完整代碼示例如下:

首先創(chuàng)建 e3_bak 隊(duì)列,接著創(chuàng)建 e3 隊(duì)列時(shí)設(shè)置其備胎交換器為 e3_bak。

然后,e3_bak 需要綁定一個(gè)隊(duì)列消費(fèi)消息。

ConnectionFactory factory = new ConnectionFactory
{
	HostName = "localhost"
};

using IConnection connection = factory.CreateConnection();
using IModel channel = connection.CreateModel();

channel.ExchangeDeclare(
	exchange: "e3_bak",
	type: ExchangeType.Fanout,
	durable: false,
	autoDelete: false
	);

// 聲明 e3 交換器,當(dāng) e3 交換器沒有綁定隊(duì)列時(shí),消息將會(huì)被轉(zhuǎn)發(fā)到 e3_bak 交換器
channel.ExchangeDeclare(
	exchange: "e3",
	type: ExchangeType.Fanout,
	durable: false,
	autoDelete: false,
	arguments: new Dictionary<string, object> {
		{ "alternate-exchange", "e3_bak" }
	}
	);

channel.QueueDeclare(queue: "q3", durable: false, exclusive: false, autoDelete: false);
channel.QueueBind(queue: "q3", "e3_bak", routingKey: string.Empty);

// 因?yàn)橐呀?jīng)設(shè)置了 e3 的備用交換器,所以不會(huì)觸發(fā) BasicReturn
channel.BasicReturn += (object? s, BasicReturnEventArgs e) =>
{
	Console.WriteLine($"無效消息:{Encoding.UTF8.GetString(e.Body.Span)}");
};


int i = 0;
while (i < 10)
{
	channel.BasicPublish(
	exchange: "e3",
	routingKey: string.Empty,
	// 因?yàn)橐呀?jīng)設(shè)置了 e3 的備用交換器,所以開啟這個(gè)不會(huì)觸發(fā) BasicReturn
	mandatory: true,
	basicProperties: null,
	body: Encoding.UTF8.GetBytes($"測(cè)試{i}")
	);
	i++;
}

Console.ReadLine();

注意,如果備胎交換器有沒有綁定合適隊(duì)列的話,那么該消息就會(huì)丟失。

如果 e3 是 Direct,e3_bak 也是 Direct,那么需要兩者具有相同的 routingKey,如果 e3 中有個(gè) routingKey = cat,但是 e3_bak 中不存在對(duì)應(yīng)的 routingKey,那么該消息還是會(huì)丟失的。還有其它一些情況,這里不再贅述。

推送消息時(shí),有一個(gè) IBasicProperties basicProperties 屬性,前面的小節(jié)中已經(jīng)介紹過該接口的屬性,當(dāng) IBasicProperties.DeliveryMode=2 時(shí),消息將被標(biāo)記為持久化,即使 RabbitMQ 服務(wù)器重啟,消息也不會(huì)丟失。

相對(duì)來說,通過前面的實(shí)驗(yàn),你可以觀察到客戶端把隊(duì)列的消息都消費(fèi)完畢后,隊(duì)列中的消息都會(huì)消失。而對(duì)應(yīng) Kafka 來說,一個(gè) topic 中的消息被消費(fèi),其依然會(huì)被保留。這一點(diǎn)要注意,使用 RabbitMQ 時(shí),需要提前設(shè)置好隊(duì)列消息的持久化,避免消費(fèi)或未成功消費(fèi)時(shí),消息丟失。

生產(chǎn)者在推送消息時(shí),可以使用 IBasicProperties.DeliveryMode=2 將該消息設(shè)置為持久化。

	var ps = channel.CreateBasicProperties();
	ps.DeliveryMode = 2;

	channel.BasicPublish(
	exchange: "e3",
	routingKey: string.Empty,
	mandatory: false,
	basicProperties: ps,
	body: Encoding.UTF8.GetBytes($"測(cè)試{i}")
	);

消息 TTL 時(shí)間

設(shè)置消息 TTL 時(shí)間后,該消息如果在一定時(shí)間內(nèi)沒有被消費(fèi),那么該消息就成為了死信消息。對(duì)于這種消息,會(huì)有大概這么兩個(gè)處理情況。

第一種,如果隊(duì)列設(shè)置了 "x-dead-letter-exchange" ,那么該消息會(huì)被從隊(duì)列轉(zhuǎn)發(fā)到另一個(gè)交換器中。這種方法在死信交換器一節(jié)中會(huì)介紹。

第二種,消息被丟棄。

目前有兩種方法可以設(shè)置消息的 TTL 。

第一種方法是通過隊(duì)列屬性設(shè)置,這樣一來隊(duì)列中所有消息都有相同的過期時(shí)間。

第二種方法是對(duì)單條消息進(jìn)行單獨(dú)設(shè)置,每條消息的 TTL 可以不同。

如果兩種設(shè)置一起使用,則消息的 TTL 以兩者之間較小的那個(gè)數(shù)值為準(zhǔn)。消息在隊(duì)列中的生存時(shí)一旦超過設(shè)置 TTL 值時(shí),消費(fèi)者將無法再收到該消息,所以最好設(shè)置死信交換器。

第一種,對(duì)隊(duì)列設(shè)置:

channel.QueueDeclare(queue: "q4",
	durable: false,
	exclusive: false,
	autoDelete: false,
	arguments: new Dictionary<string, object>() { { "x-message-ttl", 6000 } });

第二種通過設(shè)置屬性配置消息過期時(shí)間。

var ps = channel.CreateBasicProperties();
// 單位毫秒
ps.Expiration = "6000";

對(duì)于第一種設(shè)置隊(duì)列屬性的方法,一旦消息過期就會(huì)從隊(duì)列中抹去(如果設(shè)置了死信交換器,會(huì)被轉(zhuǎn)發(fā)到死信交換器中)。而在第二種方法中,即使消息過期,也不會(huì)馬上從隊(duì)列中抹去,因?yàn)樵摋l消息在即將投遞到消費(fèi)者之前,才會(huì)檢查消息是否過期。對(duì)于第二種情況,當(dāng)隊(duì)列進(jìn)行任何一次輪詢操作時(shí),才會(huì)被真正移除。

對(duì)于第二種情況,雖然是在被輪詢時(shí),過期了才會(huì)被真正移除,但是一旦過期,就會(huì)被轉(zhuǎn)發(fā)到死信隊(duì)列中,只是不會(huì)立即移除。

隊(duì)列 TTL 時(shí)間

當(dāng)對(duì)一個(gè)隊(duì)列設(shè)置 TTL 時(shí),如果該隊(duì)列在規(guī)定時(shí)間內(nèi)沒被使用,那么該隊(duì)列就會(huì)被刪除。這個(gè)約束包括一段時(shí)間內(nèi)沒有被消費(fèi)消息(包括 BasicGet() 方式消費(fèi)的)、沒有被重新聲明、沒有消費(fèi)者連接,否則被刪除的倒計(jì)時(shí)間會(huì)被重置。

channel.QueueDeclare(queue: "q6",
	durable: false,
	exclusive: false,
	autoDelete: false,
	arguments: new Dictionary<string, object>
	{
		// 單位是毫秒,設(shè)置 隊(duì)列過期時(shí)間是 1 小時(shí)
		{"x-expires",1*3600*1000}
	});

DLX 死信交換器

DLX(Dead-Letter-Exchange) 死信交換器,消息在一個(gè)隊(duì)列 A 中變成死信之后,它能被重新被發(fā)送到另一個(gè) B 交換器中。其中 A 隊(duì)列綁定了死信交換器,那么在Management UI 界面會(huì)看到 DLX 標(biāo)識(shí),而 B 交換器就是一個(gè)普通的交換器,無需配置。

消息變成死信 般是由于以下幾種情況:

  • 消息被消費(fèi)者拒絕,BasicReject()BasicNack() 兩個(gè)函數(shù)可以拒絕消息。
  • 消息過期。
  • 隊(duì)列達(dá)到最大長(zhǎng)度。

當(dāng)這個(gè)隊(duì)列 A 中存在死信消息時(shí),RabbitMQ 就會(huì)自動(dòng)地將這個(gè)消息重新發(fā)布到設(shè)置的交換器 B 中。一般會(huì)專門給重要的隊(duì)列設(shè)置死信交換器 B,而交換器 B 也需要綁定一個(gè)隊(duì)列 C 才行,不然消息也會(huì)丟失。

設(shè)置隊(duì)列出現(xiàn)死信消息時(shí),將消息轉(zhuǎn)發(fā)到哪個(gè)交換器中:

channel.QueueDeclare(queue: "q7", durable: false, exclusive: false, autoDelete: false,
		arguments: new Dictionary<string, object> {
		{ "x-dead-letter-exchange", "e7_bak" } });

完整示例代碼如下所示:


using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;

ConnectionFactory factory = new ConnectionFactory
{
	HostName = "localhost"
};

using IConnection connection = factory.CreateConnection();
using IModel channel = connection.CreateModel();

channel.ExchangeDeclare(
	exchange: "e7_bak",
	type: ExchangeType.Fanout,
	durable: false,
	autoDelete: false
	);

channel.QueueDeclare(queue: "q7_bak", durable: false, exclusive: false, autoDelete: false);
channel.QueueBind(queue: "q7_bak", "e7_bak", routingKey: string.Empty);

channel.ExchangeDeclare(
	exchange: "e7",
	type: ExchangeType.Fanout,
	durable: false,
	autoDelete: false
	);

channel.QueueDeclare(queue: "q7", durable: false, exclusive: false, autoDelete: false,
		arguments: new Dictionary<string, object> {
		{ "x-dead-letter-exchange", "e7_bak" } });

channel.QueueBind(queue: "q7", "e7", routingKey: string.Empty);

int i = 0;
while (i < 10)
{
	channel.BasicPublish(
	exchange: "e7",
	routingKey: string.Empty,
	mandatory: false,
	basicProperties: null,
	body: Encoding.UTF8.GetBytes($"測(cè)試{i}"));
	i++;
}

Thread.Sleep(1000);

int y = 0;
// 定義消費(fèi)者
channel.BasicQos(0, prefetchCount: 1, true);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
	var message = Encoding.UTF8.GetString(ea.Body.Span);
	Console.WriteLine($" [x] Received {message}");

	if (y % 2 == 0)
		channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);

	// requeue 要設(shè)置為 false 才行,
	// 否則此消息被拒絕后還會(huì)被放回隊(duì)列。
	else
		channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: false);
	Interlocked.Add(ref y, 1);
};

// 開始消費(fèi)
channel.BasicConsume(queue: "q7",
					 autoAck: false,
					 consumer: consumer);

Console.ReadLine();

延遲隊(duì)列

RabbitMQ 本身沒有直接支持延遲隊(duì)列的功能。

那么為什么會(huì)出現(xiàn)延遲隊(duì)列這種東西呢?

主要是因?yàn)橄⑼扑秃螅幌肓⒓幢幌M(fèi)。比如說,用戶下單后,如果 10 分鐘內(nèi)沒有支付,那么該訂單會(huì)被自動(dòng)取消。所以需要做一個(gè)消息被延遲消費(fèi)的功能。

所以說,實(shí)際需求是,該消息在一定時(shí)間之后才能被消費(fèi)者消費(fèi)。

在 RabbitMQ 中做這個(gè)功能,需要使用兩個(gè)交換器,以及至少兩個(gè)隊(duì)列。

思路是定義兩個(gè)交換器 e8、e9 和兩個(gè)隊(duì)列 q8、q9,交換器 e8 和隊(duì)列 q8 綁定、交換器 e9 和 q9 綁定。

最重要的一點(diǎn)來了,q9 設(shè)置了死信隊(duì)列,當(dāng)消息 TTL 時(shí)間到時(shí),轉(zhuǎn)發(fā)到 e9 交換器中。所以,e9 交換器 - q9 隊(duì)列 接收到的都是到期(或者說過期)的消息。

在發(fā)送消息到 e8 交換器時(shí),設(shè)置 TTL 時(shí)間。當(dāng) q8 隊(duì)列中的消息過期時(shí),消息會(huì)被轉(zhuǎn)發(fā)到 e9 交換器,然后存入 q9 隊(duì)列。

消費(fèi)者只需要訂閱 q9 隊(duì)列,即可消費(fèi)到期后的消息。

全部完整代碼示例如下:

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;

ConnectionFactory factory = new ConnectionFactory
{
	HostName = "localhost"
};

using IConnection connection = factory.CreateConnection();
using IModel channel = connection.CreateModel();

channel.ExchangeDeclare(
	exchange: "e8",
	type: ExchangeType.Fanout,
	durable: false,
	autoDelete: false
	);

channel.ExchangeDeclare(
	exchange: "e9",
	type: ExchangeType.Fanout,
	durable: false,
	autoDelete: false
	);

channel.QueueDeclare(queue: "q9", durable: false, exclusive: false, autoDelete: false);
channel.QueueBind(queue: "q9", "e9", routingKey: string.Empty);

channel.QueueDeclare(queue: "q8", durable: false, exclusive: false, autoDelete: false,
		arguments: new Dictionary<string, object> {
		{ "x-dead-letter-exchange", "e9" } });

channel.QueueBind(queue: "q8", "e8", routingKey: string.Empty);

int i = 0;
while (i < 10)
{
	var ps = channel.CreateBasicProperties();
	ps.Expiration = "6000";

	channel.BasicPublish(
	exchange: "e8",
	routingKey: string.Empty,
	mandatory: false,
	basicProperties: ps,
	body: Encoding.UTF8.GetBytes($"測(cè)試{i}")
	);
	i++;
}


var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
	var message = Encoding.UTF8.GetString(ea.Body.Span);
	Console.WriteLine($" [x] 已到期消息 {message}");
};

// 開始消費(fèi)
channel.BasicConsume(queue: "q9",
					 autoAck: true,
					 consumer: consumer);

Console.ReadLine();

消息優(yōu)先級(jí)

消息優(yōu)先級(jí)越高,就會(huì)越快被消費(fèi)者消費(fèi)。

代碼示例如下:

var ps = channel.CreateBasicProperties();
// 優(yōu)先級(jí) 0-9 
ps.Priority = 9;

	channel.BasicPublish(
	exchange: "e8",
	routingKey: string.Empty,
	mandatory: false,
	basicProperties: ps,
	body: Encoding.UTF8.GetBytes($"測(cè)試{i}")
	);

所以說,RabbitMQ 不一定可以保證消息的順序性,這一點(diǎn)跟 Kafka 是有區(qū)別的。

事務(wù)機(jī)制

事務(wù)機(jī)制是,發(fā)布者確定消息一定推送到 RabbitMQ Broker 中,往往會(huì)跟業(yè)務(wù)代碼一起使用。

比如說,用戶成功支付之后,推送一個(gè)通知到 RabbitMQ 隊(duì)列中。

數(shù)據(jù)庫(kù)當(dāng)然要做事務(wù),這樣在支付失敗后修改的數(shù)據(jù)會(huì)被回滾。但是問題來了,如果消息已經(jīng)推送了,但是數(shù)據(jù)庫(kù)卻回滾了。

這個(gè)時(shí)候會(huì)涉及到一致性,可以使用 RabbitMQ 的事務(wù)機(jī)制來處理,其思路跟數(shù)據(jù)庫(kù)事務(wù)過程差不多,也是有提交和回滾操作。

其目的是確保消息成功推送到 RabbitMQ Broker 以及跟客戶端其它代碼保持?jǐn)?shù)據(jù)一致,推送消息跟代碼操作同時(shí)成功或同時(shí)回滾。

其完整的代碼示例如下:

ConnectionFactory factory = new ConnectionFactory
{
	HostName = "localhost"
};

using IConnection connection = factory.CreateConnection();
using IModel channel = connection.CreateModel();

// 客戶端發(fā)送 Tx.Select.將信道置為事務(wù)模式;
channel.TxSelect();

try
{
	// 發(fā)送消息
	channel.QueueDeclare(queue: "transaction_queue",
						 durable: false,
						 exclusive: false,
						 autoDelete: false,
						 arguments: null);

	string message = "Hello, RabbitMQ!";
	var body = Encoding.UTF8.GetBytes(message);

	channel.BasicPublish(exchange: "",
						 routingKey: "transaction_queue",
						 basicProperties: null,
						 body: body);


	// 執(zhí)行一系列操作

	// 提交事務(wù)
	channel.TxCommit();
	Console.WriteLine(" [x] Sent '{0}'", message);
}
catch (Exception e)
{
	// 回滾事務(wù)
	channel.TxRollback();
	Console.WriteLine("An error occurred: " + e.Message);
}

Console.ReadLine();

發(fā)送方確認(rèn)機(jī)制

發(fā)送方確認(rèn)機(jī)制,是保證消息一定推送到 RabbitMQ 的方案。

而事務(wù)機(jī)制,一般是為了保證一致性,推送消息和其它操作同時(shí)成功或同時(shí)失敗,不能出現(xiàn)兩者不一致的情況。

其完整代碼示例如下:

using IConnection connection = factory.CreateConnection();
using IModel channel = connection.CreateModel();

// 開啟發(fā)送方確認(rèn)模式
channel.ConfirmSelect();

string exchangeName = "exchange_name";
string routingKey = "routing_key";

// 定義交換器
channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Direct);

// 發(fā)送消息
string message = "Hello, RabbitMQ!";
var body = Encoding.UTF8.GetBytes(message);

// 發(fā)布消息
channel.BasicPublish(exchange: exchangeName,
					 routingKey: routingKey,
					 basicProperties: null,
					 body: body);

// 等待確認(rèn)已推送到 RabbitMQ
if (channel.WaitForConfirms())
{
	Console.WriteLine(" [x] Sent '{0}'", message);
}
else
{
	Console.WriteLine("Message delivery failed.");
}

Console.ReadLine();

文章寫到這里,恰好一萬詞。

對(duì)于 RabbitMQ 集群、運(yùn)維等技術(shù),本文不再贅述。

總結(jié)

以上是生活随笔為你收集整理的万字长文:从 C# 入门学会 RabbitMQ 消息队列编程的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。