RabbitMQ入门教程——发布/订阅
什么是發(fā)布訂閱
發(fā)布訂閱是一種設(shè)計(jì)模式定義了一對(duì)多的依賴關(guān)系,讓多個(gè)訂閱者對(duì)象同時(shí)監(jiān)聽某一個(gè)主題對(duì)象。這個(gè)主題對(duì)象在自身狀態(tài)變化時(shí),會(huì)通知所有的訂閱者對(duì)象,使他們能夠自動(dòng)更新自己的狀態(tài)。
為了描述這種模式,我們將會(huì)構(gòu)建一個(gè)簡(jiǎn)單的日志系統(tǒng)。它包括兩個(gè)程序——第一個(gè)程序負(fù)責(zé)發(fā)送日志消息,第二個(gè)程序負(fù)責(zé)獲取消息并輸出內(nèi)容。在我們的這個(gè)日志系統(tǒng)中,所有正在運(yùn)行的接收方程序都會(huì)接受消息。我們用其中一個(gè)接收者(receiver)把日志寫入硬盤中,另外一個(gè)接受者(receiver)把日志輸出到屏幕上。最終,日志消息被廣播給所有的接受者(receivers)。
Exchanges
RabbitMQ消息模型的核心理念是生產(chǎn)者永遠(yuǎn)不會(huì)直接發(fā)送任何消息給隊(duì)列,生產(chǎn)者只能發(fā)送消息給到exchange,exchange比較簡(jiǎn)單,一邊從生產(chǎn)者就收消息,一邊把消息推送到隊(duì)列中。exchange必須清楚的知道消息應(yīng)該按照什么規(guī)則路由到對(duì)應(yīng)的隊(duì)列中,而具體使用那種路由算法是由exchange type決定的。AMQP協(xié)議提供了四種交換機(jī)類型:
Name(交換機(jī)類型) | Default pre-declared names(預(yù)聲明的默認(rèn)名稱) |
Direct exchange(直連交換機(jī)) | (Empty string) and amq.direct |
Fanout exchange(扇型交換機(jī)) | amq.fanout |
Topic exchange(主題交換機(jī)) | amq.topic |
Headers exchange(頭交換機(jī)) | amq.match (and amq.headers in RabbitMQ) |
除交換機(jī)類型外,在聲明交換機(jī)時(shí)還可以附帶許多其他的屬性,其中最重要的幾個(gè)分別是:
- Name
- Durability (消息代理重啟后,交換機(jī)是否還存在)
- Auto-delete (當(dāng)所有與之綁定的消息隊(duì)列都完成了對(duì)此交換機(jī)的使用后,刪掉它)
- Arguments(依賴代理本身)
交換機(jī)可以有兩個(gè)狀態(tài):持久(durable)、暫存(transient)。持久化的交換機(jī)會(huì)在消息代理(broker)重啟后依舊存在,而暫存的交換機(jī)則不會(huì)(它們需要在代理再次上線后重新被聲明)。然而并不是所有的應(yīng)用場(chǎng)景都需要持久化的交換機(jī)。
本文中具體講解下以下兩種交換機(jī):直連交換機(jī)(前面幾個(gè)例子中使用的交換機(jī)類型),扇形交換機(jī)(本文中要使用的交換機(jī)類型)
直連交換機(jī)
直連交換機(jī)(direct exchange)可以使用消息攜帶的路由鍵(routing key)將消息投遞給對(duì)應(yīng)的隊(duì)列中。用來(lái)處理消息的單播路由(unicast routing),也可以處理多播路由。
那么它具體是如何工作的呢
- 將一個(gè)隊(duì)列綁定到某個(gè)交換機(jī)上,同時(shí)給該綁定指定一個(gè)路由鍵(routing key)
- 當(dāng)一個(gè)攜帶路由鍵為R的消息被發(fā)送到直連交換機(jī)時(shí),交換機(jī)會(huì)把它路由給綁定值同樣為R的隊(duì)列。
直連交換機(jī)經(jīng)常用來(lái)循環(huán)分發(fā)任務(wù)給多個(gè)工作者,當(dāng)這樣做時(shí),一定要明白,這時(shí)消息的負(fù)載均衡是發(fā)生在消費(fèi)者(consumer)之間的,而不是隊(duì)列(queue)中。
直連交換機(jī)圖例:
扇形交換機(jī)
扇形交換機(jī)(funout exchange)將消息路由給綁定到它身上的所有隊(duì)列,不關(guān)心所綁定的路由鍵(routing key)。扇形交換機(jī)用來(lái)處理消息的廣播路由(broadcast routing)。
由于扇形交換機(jī)投遞消息到所有綁定他的隊(duì)列,以下幾個(gè)場(chǎng)景比較適合使用扇形交換機(jī):
- 大規(guī)模多用戶在線(MMO)游戲可以使用它來(lái)處理排行榜更新等全局事件
- 體育新聞網(wǎng)站可以用它來(lái)近乎實(shí)時(shí)地將比分更新分發(fā)給移動(dòng)客戶端
- 分發(fā)系統(tǒng)使用它來(lái)廣播各種狀態(tài)和配置更新
- 在群聊的時(shí)候,它被用來(lái)分發(fā)消息給參與群聊的用戶。(AMQP沒有內(nèi)置presence的概念,因此XMPP可能會(huì)是個(gè)更好的選擇)
扇形交換機(jī)圖例
創(chuàng)建exchange
?
??????????????????? channel.ExchangeDeclare(exchange: "log_exchange", //exchange 名稱
??????????????????????? type: ExchangeType.Fanout, //exchange 類型
??????????????????????? durable: false,
??????????????????????? autoDelete: false,
??????????????????????? arguments: null);
?
臨時(shí)隊(duì)列
之前的幾個(gè)示例中我們?cè)跒槊恳粋€(gè)聲名的隊(duì)列都指定了一個(gè)名字,因?yàn)槲覀兿M鹀onsumer指向正確的隊(duì)列。當(dāng)我們希望在生產(chǎn)者和消費(fèi)者之間共享隊(duì)列時(shí),為隊(duì)列命名就非常的重要了。
不過(guò)我們要實(shí)現(xiàn)的日志系統(tǒng)只是想要得到所有的消息,而且只對(duì)當(dāng)前正在傳遞的消息感興趣,并不關(guān)心隊(duì)列的名稱,所以為了滿足我們的需求,要做兩件事情:
無(wú)論什么時(shí)間連接到RabbitMQ我們都需要一個(gè)新的空的隊(duì)列。為了達(dá)到目的我們可以使用隨機(jī)數(shù)創(chuàng)建隊(duì)列,或讓服務(wù)器給我們提供一個(gè)隨機(jī)的名稱。
一旦消費(fèi)者與RabbitMQ斷開,消費(fèi)者所接受的隊(duì)列都應(yīng)該被自動(dòng)刪除。
創(chuàng)建臨時(shí)隊(duì)列
?
??????????????????? //創(chuàng)建一個(gè)未命名的新的消息隊(duì)列,
??????????????????? QueueDeclareOk queue = channel.QueueDeclare(queue: "", //隊(duì)列名稱,為空時(shí)有系統(tǒng)自動(dòng)分配
??????????????????????? durable: false,
??????????????????????? exclusive: false,
??????????????????????? autoDelete: true,//自動(dòng)刪除,如果該隊(duì)列沒有任何訂閱的消費(fèi)者的話,該隊(duì)列會(huì)被自動(dòng)刪除。這種隊(duì)列適用于臨時(shí)隊(duì)列。
??????????????????????? arguments: null);
??????????????????? //或
??????????????????? //queue = channel.QueueDeclare();
?
綁定
我們已經(jīng)創(chuàng)建了一個(gè)扇型交換機(jī)(fanout)和一個(gè)隊(duì)列。現(xiàn)在我們需要告訴交換機(jī)如何發(fā)送消息給我們的隊(duì)列。交換器和隊(duì)列之間的聯(lián)系我們稱之為綁定(binding)
創(chuàng)建交換機(jī)與隊(duì)列的關(guān)系
?
//扇形交換機(jī)(funout exchange)將消息路由給綁定到它身上的所有隊(duì)列,不關(guān)心所綁定的路由鍵(routing key)
??????????????????? //fanout exchange不需要指定routing key 指定了也沒用
??????????????????? //通過(guò)綁定告訴exchange 需要發(fā)送消息到哪些消息隊(duì)列
??????????????????? channel.QueueBind(queue: queueName, exchange: EXCHANGE_NAME, routingKey: ROUTING_KEY, arguments: null);
?
完整代碼:
生產(chǎn)者??Pub_SubProducer.cs
?
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using RabbitMQ.Client;
?
namespace RabbitMQProducer
{
??? public class Pub_SubProducer
??? {
??????? const string EXCHANGE_NAME = "log_exchange";
??????? const string ROUTING_KEY = "";
?
??????? //直接發(fā)送消息到交換機(jī)
??????? public static void Publish()
??????? {
??????????? var factory = new ConnectionFactory()
??????????? {
??????????????? HostName = "127.0.0.1"
??????????? };
??????????? using (var connection = factory.CreateConnection())
??????????? {
??????????????? using (IModel channel = connection.CreateModel())
??????????????? {
??????????????????? channel.ExchangeDeclare(exchange: EXCHANGE_NAME, //exchange 名稱
??????????????????????? type: ExchangeType.Fanout, //exchange 類型
??????????????????????? durable: false,
??????????????????????? autoDelete: false,
??????????????????????? arguments: null);
?
??????????????????? Parallel.For(1, 100, item =>
??????????????????? {
??????????????????????? string message = $"日志內(nèi)容{DateTime.Now.ToString()}";
??????????????????????? channel.BasicPublish(exchange: EXCHANGE_NAME, routingKey: ROUTING_KEY, basicProperties: null, body: Encoding.UTF8.GetBytes(message));
??????????????????????? Console.WriteLine(message);
??????????????????? });
?
??????????????????? Console.WriteLine(" Press [enter] to exit.");
??????????????????? Console.ReadLine();
??????????????? }
??????????? }
??????? }
??? }
}
?
消費(fèi)者?Pub_SubConsumer.cs
?
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using RabbitMQ.Client.Events;
using System.IO;
?
namespace RabbitMQConsumer
{
??? public class Pub_SubConsumer
??? {
??????? const string EXCHANGE_NAME = "log_exchange";
??????? const string ROUTING_KEY = "";
??????? //輸出到屏幕
??????? public static void Subscribe()
??????? {
??????????? var factory = new ConnectionFactory()
??????????? {
??????????????? HostName = "127.0.0.1"
??????????? };
??????????? using (var connection = factory.CreateConnection())
??????????? {
??????????????? using (IModel channel = connection.CreateModel())
??????????????? {
??????????????????? channel.ExchangeDeclare(exchange: EXCHANGE_NAME, //exchange 名稱
??????????????????????? type: ExchangeType.Fanout, //exchange 類型
??????????????????????? durable: false,
??????????????????????? autoDelete: false,
??????????????????????? arguments: null);
?
??????????????????? //創(chuàng)建一個(gè)未命名的新的消息隊(duì)列,
??????????????????? QueueDeclareOk queue = channel.QueueDeclare(queue: "", //隊(duì)列名稱,為空時(shí)有系統(tǒng)自動(dòng)分配
??????????????????????? durable: false,
??????????????????????? exclusive: false,
??????????????????????? autoDelete: true,//自動(dòng)刪除,如果該隊(duì)列沒有任何訂閱的消費(fèi)者的話,該隊(duì)列會(huì)被自動(dòng)刪除。這種隊(duì)列適用于臨時(shí)隊(duì)列。
??????????????????????? arguments: null);
??????????????????? //或
??????????????????? //queue = channel.QueueDeclare();
?
??????????????????? string queueName = queue.QueueName;
??????????????????? //扇形交換機(jī)(funout exchange)將消息路由給綁定到它身上的所有隊(duì)列,不關(guān)心所綁定的路由鍵(routing key)
??????????????????? //fanout exchange不需要指定routing key 指定了也沒用
??????????????????? //通過(guò)綁定告訴exchange 需要發(fā)送消息到哪些消息隊(duì)列
??????????????????? channel.QueueBind(queue: queueName, exchange: EXCHANGE_NAME, routingKey: ROUTING_KEY, arguments: null);
?
??????????????????? EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
??????????????????? consumer.Received += (sender, args) =>
??????????????????? {
??????????????????????? string message = Encoding.UTF8.GetString(args.Body);
??????????????????????? Console.WriteLine(message);
??????????????????? };
?
??????????????????? channel.BasicConsume(queue: queueName, noAck: true, consumer: consumer);
?
??????????????????? Console.WriteLine(" Press [enter] to exit.");
??????????????????? Console.ReadLine();
??????????????? }
??????????? }
??????? }
?
??????? /// <summary>
??????? /// 輸出到文件
??????? /// </summary>
??????? public static void SubscribeFile()
??????? {
??????????? var factory = new ConnectionFactory()
??????????? {
??????????????? HostName = "127.0.0.1"
??????????? };
??????????? using (var connection = factory.CreateConnection())
??????????? {
??????????????? using (IModel channel = connection.CreateModel())
??????????????? {
??????????????????? channel.ExchangeDeclare(exchange: EXCHANGE_NAME, //exchange 名稱
??????????????????????? type: ExchangeType.Fanout, //exchange 類型
??????????????????????? durable: false,
??????????????????????? autoDelete: false,
??????????????????????? arguments: null);
?
??????????????????? //創(chuàng)建一個(gè)未命名的新的消息隊(duì)列,
??????????????????? QueueDeclareOk queue = channel.QueueDeclare(queue: "", //隊(duì)列名稱,為空時(shí)有系統(tǒng)自動(dòng)分配
??????????????????????? durable: false,
??????????????????????? exclusive: false,
??????????????????????? autoDelete: true,//自動(dòng)刪除,如果該隊(duì)列沒有任何訂閱的消費(fèi)者的話,該隊(duì)列會(huì)被自動(dòng)刪除。這種隊(duì)列適用于臨時(shí)隊(duì)列。
??????????????????????? arguments: null);
??????????????????? //或
??????????????????? //queue = channel.QueueDeclare();
?
??????????????????? string queueName = queue.QueueName;
??????????????????? //扇形交換機(jī)(funout exchange)將消息路由給綁定到它身上的所有隊(duì)列,不關(guān)心所綁定的路由鍵(routing key)
??????????????????? //fanout exchange不需要指定routing key 指定了也沒用
??????????????????? //通過(guò)綁定告訴exchange 需要發(fā)送消息到哪些消息隊(duì)列
??????????????????? channel.QueueBind(queue: queueName, exchange: EXCHANGE_NAME, routingKey: ROUTING_KEY, arguments: null);
?
??????????????????? EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
??????????????????? consumer.Received += (sender, args) =>
??????????????????? {
??????????????????????? string message = Encoding.UTF8.GetString(args.Body);
?
??????????????????????? //寫入日志到txt文件
??????????????????????? using (StreamWriter writer = new StreamWriter(@"c:\log\log.txt", true, Encoding.UTF8))
??????????????????????? {
??????????????????????????? writer.WriteLine(message);
??????????????????????????? writer.Close();
??????????????????????? }
?
??????????????????????? Console.WriteLine(message);
??????????????????? };
?
??????????????????? channel.BasicConsume(queue: queueName, noAck: true, consumer: consumer);
?
??????????????????? Console.WriteLine(" Press [enter] to exit.");
??????????????????? Console.ReadLine();
??????????????? }
??????????? }
??????? }
??? }
}
?
運(yùn)行以上實(shí)例代碼發(fā)現(xiàn),每個(gè)訂閱者實(shí)例 都能得到相同的內(nèi)容。
轉(zhuǎn)載于:https://www.cnblogs.com/AlvinLee/p/6150680.html
創(chuàng)作挑戰(zhàn)賽新人創(chuàng)作獎(jiǎng)勵(lì)來(lái)咯,堅(jiān)持創(chuàng)作打卡瓜分現(xiàn)金大獎(jiǎng)總結(jié)
以上是生活随笔為你收集整理的RabbitMQ入门教程——发布/订阅的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: Java重载遇到泛型
- 下一篇: 用户(三次)登录--作业小编完成