RabbitMQ系列教程之四:路由(Routing)
在上一個(gè)教程中,我們構(gòu)建了一個(gè)簡(jiǎn)單的日志系統(tǒng),我們能夠向許多消息接受者廣播發(fā)送日志消息。
在本教程中,我們將為其添加一項(xiàng)功能 ,這個(gè)功能是我們將只訂閱消息的一個(gè)子集成為可能。?例如,我們可以只將關(guān)鍵的錯(cuò)誤消息輸出到日志文件(以節(jié)省磁盤空間),同時(shí)仍然可以在控制臺(tái)上打印所有日志消息。
1、綁定
在以前的例子中,我們已經(jīng)創(chuàng)建了綁定。 你可能會(huì)記得如下代碼:
【綁定】是【消息交換機(jī)】和【隊(duì)列】之間的關(guān)系紐帶,通過綁定把二者關(guān)聯(lián)起來。 這可以簡(jiǎn)單地理解為:隊(duì)列可以接收來自此【消息交換機(jī)】的消息。
【綁定】可以占用額外的路由選擇參數(shù)。 為了避免與BasicPublish參數(shù)混淆,我們將其稱為【綁定鍵】。 以下代碼就是如何用一個(gè)鍵值來創(chuàng)建一個(gè)綁定:
【綁定鍵】的含義取決于交換類型。 以前我們使用的【Fanout】類型的【消息交換機(jī)】忽略了它的取值。
2、直接交換
?? 在上一個(gè)教程中,我們的日志記錄系統(tǒng)向所有【消費(fèi)者】發(fā)送所有消息。 我們希望將其擴(kuò)展為允許基于其嚴(yán)重性過濾消息。 例如,我們可能希望將寫入磁盤日志消息的腳本僅接受嚴(yán)重錯(cuò)誤,而不會(huì)在警告或信息日志消息上浪費(fèi)磁盤空間。
?? 我們正在使用一個(gè)【Fanout】類型的【消息交換機(jī)】,它不會(huì)給我們帶來很大的靈活性 - 它只能無意識(shí)地發(fā)送。
?? 我們將使用一個(gè)【Direct】類型的【消息交換機(jī)】。 直接轉(zhuǎn)換路由的背后的算法其實(shí)是很簡(jiǎn)單的 - 把消息傳遞到【綁定鍵?binding key】和消息的【路由鍵?routing key】完全匹配的隊(duì)列中。
?? 為了說明,請(qǐng)考慮以下設(shè)置:
? ??
?? 在這個(gè)設(shè)置中,我們可以看到【Direct】類型的【消息交換機(jī)】X與兩個(gè)隊(duì)列相綁定。 第一個(gè)隊(duì)列與【綁定鍵】的值是Orange相綁定的,第二個(gè)隊(duì)列有兩個(gè)綁定,一個(gè)【綁定鍵】的值是black,另一個(gè)【綁定鍵】的值是green。
?? 在這樣的設(shè)置中,發(fā)布到具有【路由鍵】為orange的【消息交換機(jī)】的消息將被路由到隊(duì)列Q1。 具有black或green【路由鍵】的消息將轉(zhuǎn)到Q2。 所有其他消息將被丟棄。
3、多重綁定
? ? ? ?
? 使用相同的【綁定鍵】綁定多個(gè)隊(duì)列是完全合法的。 在我們的示例中,我們可以在X和Q1之間添加【綁定鍵】是black的綁定。 在這種情況下,【direct】類型的【消息交換機(jī)】將表現(xiàn)得像【Fanout】類型的【消息交換機(jī)】,并將消息發(fā)送到所有匹配的隊(duì)列。 具有【路由鍵】是black的消息將傳送到Q1和Q2。
4、發(fā)出日志
?? 我們將發(fā)送消息到【Direct】類型的【消息交換機(jī)】來替換【fanout】類型的【消息交換機(jī)】,在我們現(xiàn)在的日志系統(tǒng)將使用此模型。 我們將提供日志嚴(yán)重性作為【路由鍵】。 這樣接收腳本就能夠選擇想要接收的嚴(yán)重性。 我們首先關(guān)注發(fā)出日志。
?? 像以前一樣,我們首先要建立一個(gè)【消息交換機(jī)】:
??? 現(xiàn)在,我們準(zhǔn)備發(fā)送消息:
var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "direct_logs",routingKey: severity,basicProperties: null,body: body);
??? 為了簡(jiǎn)化事情,我們假設(shè)“嚴(yán)重性”可以是“信息”,“警告”,“錯(cuò)誤”之一。
5、訂閱
?? 接收消息將像上一個(gè)教程一樣工作,除了一個(gè)例外 - 我們將為每個(gè)我們感興趣的嚴(yán)重性創(chuàng)建一個(gè)新的綁定。
6、整合
?????
? 以下是EmitLogDirect.cs類的代碼:
using System;
using System.Linq;
using RabbitMQ.Client;
using System.Text;
class EmitLogDirect
{
? ? public static void Main(string[] args)
? ? {
? ? ? ? var factory = new ConnectionFactory() { HostName = "localhost" };
? ? ? ? using(var connection = factory.CreateConnection())
? ? ? ? using(var channel = connection.CreateModel())
? ? ? ? {
? ? ? ? ? ? channel.ExchangeDeclare(exchange: "direct_logs",
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? type: "direct");
? ? ? ? ? ? var severity = (args.Length > 0) ? args[0] : "info";
? ? ? ? ? ? var message = (args.Length > 1)
? ? ? ? ? ? ? ? ? ? ? ? ? ? string.Join(" ", args.Skip( 1 ).ToArray())
? ? ? ? ? ? ? ? ? ? ? ? ? : "Hello World!";
? ? ? ? ? ? var body = Encoding.UTF8.GetBytes(message);
? ? ? ? ? ? channel.BasicPublish(exchange: "direct_logs",
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?routingKey: severity,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?basicProperties: null,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?body: body);
? ? ? ? ? ? Console.WriteLine(" [x] Sent '{0}':'{1}'", severity, message);
? ? ? ? }
? ? ? ? Console.WriteLine(" Press [enter] to exit.");
? ? ? ? Console.ReadLine();
? ? }
}
以下是ReceiveLogsDirect.cs類的代碼:
using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
class ReceiveLogsDirect
{
? ? public static void Main(string[] args)
? ? {
? ? ? ? var factory = new ConnectionFactory() { HostName = "localhost" };
? ? ? ? using(var connection = factory.CreateConnection())
? ? ? ? using(var channel = connection.CreateModel())
? ? ? ? {
? ? ? ? ? ? channel.ExchangeDeclare(exchange: "direct_logs",
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? type: "direct");
? ? ? ? ? ? var queueName = channel.QueueDeclare().QueueName;
? ? ? ? ? ? if(args.Length < 1)
? ? ? ? ? ? {
? ? ? ? ? ? ? ? Console.Error.WriteLine("Usage: {0} [info] [warning] [error]",
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? Environment.GetCommandLineArgs()[0]);
? ? ? ? ? ? ? ? Console.WriteLine(" Press [enter] to exit.");
? ? ? ? ? ? ? ? Console.ReadLine();
? ? ? ? ? ? ? ? Environment.ExitCode = 1;
? ? ? ? ? ? ? ? return;
? ? ? ? ? ? }
? ? ? ? ? ? foreach(var severity in args)
? ? ? ? ? ? {
? ? ? ? ? ? ? ? channel.QueueBind(queue: queueName,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? exchange: "direct_logs",
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? routingKey: severity);
? ? ? ? ? ? }
? ? ? ? ? ? Console.WriteLine(" [*] Waiting for messages.");
? ? ? ? ? ? var consumer = new EventingBasicConsumer(channel);
? ? ? ? ? ? consumer.Received += (model, ea) =>
? ? ? ? ? ? {
? ? ? ? ? ? ? ? var body = ea.Body;
? ? ? ? ? ? ? ? var message = Encoding.UTF8.GetString(body);
? ? ? ? ? ? ? ? var routingKey = ea.RoutingKey;
? ? ? ? ? ? ? ? Console.WriteLine(" [x] Received '{0}':'{1}'",
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? routingKey, message);
? ? ? ? ? ? };
? ? ? ? ? ? channel.BasicConsume(queue: queueName,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?noAck: true,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?consumer: consumer);
? ? ? ? ? ? Console.WriteLine(" Press [enter] to exit.");
? ? ? ? ? ? Console.ReadLine();
? ? ? ? }
? ? }
}
如果您只想將“警告”和“錯(cuò)誤”(而不是“信息”)保存到文件中,只需打開控制臺(tái)并鍵入:
如果您想查看屏幕上的所有日志消息,請(qǐng)打開一個(gè)新終端,然后執(zhí)行以下操作:
而且,例如,要發(fā)出錯(cuò)誤日志消息,只需鍵入:
以下是原文地址:http://www.rabbitmq.com/tutorials/tutorial-four-dotnet.html
今天這篇文章終于翻譯完了,整個(gè)系列還有幾篇沒翻譯。英文水平有限,錯(cuò)誤在所難免,歡迎大家提出來,共同學(xué)習(xí)。
相關(guān)文章:
RabbitMQ系列教程之一:我們從最簡(jiǎn)單的事情開始!Hello World
RabbitMQ系列教程之二:工作隊(duì)列(Work Queues)
RabbitMQ系列教程之三:發(fā)布/訂閱(Publish/Subscribe)
如何優(yōu)雅的使用RabbitMQ
.NET 使用 RabbitMQ 圖文簡(jiǎn)介
RabbitMQ 高可用集群搭建及電商平臺(tái)使用經(jīng)驗(yàn)總結(jié)
搭建高可用的rabbitmq集群 + Mirror Queue + 使用C#驅(qū)動(dòng)連接
RabbitMQ消息隊(duì)列應(yīng)用
體驗(yàn)Rabbitmq強(qiáng)大的【優(yōu)先級(jí)隊(duì)列】之輕松面對(duì)現(xiàn)實(shí)業(yè)務(wù)場(chǎng)景
原文地址:http://www.cnblogs.com/PatrickLiu/p/7095080.html
.NET社區(qū)新聞,深度好文,微信中搜索dotNET跨平臺(tái)或掃描二維碼關(guān)注
總結(jié)
以上是生活随笔為你收集整理的RabbitMQ系列教程之四:路由(Routing)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 实现自己的.NET Core配置Prov
- 下一篇: DDD理论学习系列(3)-- 限界上下文