[译]RabbitMQ教程C#版 - 发布订阅
先決條件
本教程假定RabbitMQ已經(jīng)安裝,并運(yùn)行在localhost標(biāo)準(zhǔn)端口(5672)。如果你使用不同的主機(jī)、端口或證書,則需要調(diào)整連接設(shè)置。
從哪里獲得幫助
如果您在閱讀本教程時(shí)遇到困難,可以通過郵件列表聯(lián)系我們。
1.發(fā)布/訂閱
(使用.NET客戶端)
在教程[2]中,我們創(chuàng)建了一個(gè)工作隊(duì)列,假設(shè)在工作隊(duì)列中的每一個(gè)任務(wù)都只被分發(fā)給一個(gè)Worker。那么在這一章節(jié),我們要做與之完全不同的事,那就是我們將要把一條消息分發(fā)給多個(gè)消費(fèi)者。這種模式被稱為“發(fā)布/訂閱”。
為了說明、體現(xiàn)這種模式,我們將會(huì)建一個(gè)簡(jiǎn)單的日志系統(tǒng)。它將會(huì)包含兩個(gè)程序 - 第一個(gè)用來發(fā)送日志消息,第二個(gè)用來接收并打印它們。
在我們建立的日志系統(tǒng)中,每個(gè)接收程序的運(yùn)行副本都會(huì)收到消息。這樣我們就可以運(yùn)行一個(gè)接收程序接收消息并將日志寫入磁盤;同時(shí)運(yùn)行另外一個(gè)接收程序接收消息并將日志打印到屏幕上。
實(shí)質(zhì)上,發(fā)布的日志消息將會(huì)被廣播給所有的接收者。
2.交換器
在教程的前幾部分,我們是發(fā)送消息到隊(duì)列并從隊(duì)列中接收消息。現(xiàn)在是時(shí)候介紹Rabbit中完整的消息傳遞模型了。
讓我們快速回顧一下前面教程中的內(nèi)容:
生產(chǎn)者是發(fā)送消息的用戶應(yīng)用程序。
隊(duì)列是存儲(chǔ)消息的緩沖區(qū)。
消費(fèi)者是接收消息的用戶應(yīng)用程序。
在RabbitMQ中,消息傳遞模型的核心理念是生產(chǎn)者從來不會(huì)把任何消息直接發(fā)送到隊(duì)列,其實(shí),通常生產(chǎn)者甚至不知道消息是否會(huì)被分發(fā)到任何隊(duì)列中。
然而,生產(chǎn)者只能把消息發(fā)送給交換器。交換器非常簡(jiǎn)單,一方面它接收來自生產(chǎn)者的消息,另一方面又會(huì)把接收的消息推送到隊(duì)列中。交換器必須明確知道該如何處理收到的消息,應(yīng)該追加到一個(gè)特定隊(duì)列中?還是應(yīng)該追加到多個(gè)隊(duì)列中?或者應(yīng)該把它丟棄?這些規(guī)則都被定義在交換器類型中。
目前有這幾種的交換器類型可用:direct,topic,headers和fanout。我們重點(diǎn)關(guān)注最后一個(gè) -- fanout,讓我們來創(chuàng)建一個(gè)這種類型的交換器,將其命名為logs:
channel.ExchangeDeclare("logs", "fanout");fanout類型交換器非常簡(jiǎn)單。正如您可能從名字中猜出的那樣,它會(huì)把收到的所有消息廣播到它已知的所有隊(duì)列中。這恰巧是我們的日志系統(tǒng)所需要的。
列舉交換器
要列舉出服務(wù)器上的交換器,您可以使用非常有用的rabbitmqctl命令行工具:
執(zhí)行上述命令后,出現(xiàn)的列表中將會(huì)有一些amq.*交換器和默認(rèn)(未命名)交換器。這些是默認(rèn)創(chuàng)建的,不過目前您可能用不到它們。
默認(rèn)交換器
在教程的前些部分,我們對(duì)交換器這一概念還一無所知,但仍然可以把消息發(fā)送到隊(duì)列。之所以這樣,是因?yàn)槲覀兪褂昧艘粋€(gè)用空字符串("")標(biāo)識(shí)的默認(rèn)交換器。
回顧一下我們之前如何發(fā)布消息:
var message = GetMessage(args);var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "",routingKey: "hello",basicProperties: null,body: body);第一個(gè)參數(shù)就是交換器的名稱,空字符串表示默認(rèn)或匿名交換器:將消息路由到routingKey指定的隊(duì)列(如果存在)中。
現(xiàn)在,我們可以把消息發(fā)布到我們指定的交換器:
var message = GetMessage(args);var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "logs",routingKey: "",basicProperties: null,body: body);3.臨時(shí)隊(duì)列
您是否還記得之前我們使用過的隊(duì)列,它們都有一個(gè)特定的名稱(記得應(yīng)該是hello和task_queue吧)。給隊(duì)列命名對(duì)我們來說是至關(guān)重要的 -- 因?yàn)槲覀兛赡苄枰鄠€(gè)Worker指向同一個(gè)隊(duì)列;當(dāng)您想要在生產(chǎn)者和消費(fèi)者之間共享隊(duì)列時(shí),給隊(duì)列一個(gè)名稱也是非常重要的。
但是,我們創(chuàng)建的日志系統(tǒng)并不希望如此。我們希望監(jiān)聽所有的日志消息,而不僅僅是其中一部分。我們也只對(duì)目前流動(dòng)的消息感興趣,而不是舊消息。為解決這個(gè)問題,我們需要做好兩件事。
首先,我們無論何時(shí)連接Rabbit,都需要一個(gè)新的、空的隊(duì)列。要做到這一點(diǎn),我們可以使用隨機(jī)名稱來創(chuàng)建隊(duì)列,或許,甚至更好的方案是讓服務(wù)器為我們選擇一個(gè)隨機(jī)隊(duì)列名稱。
其次,一旦我們與消費(fèi)者斷開連接,與之相關(guān)的隊(duì)列應(yīng)該被自動(dòng)刪除。
在.NET客戶端中,如果不向QueueDeclare()方法提供任何參數(shù),實(shí)際上就是創(chuàng)建了一個(gè)非持久化、獨(dú)占、且自動(dòng)刪除的隨機(jī)命名隊(duì)列:
var queueName = channel.QueueDeclare().QueueName;您可以在隊(duì)列指南中了解更多關(guān)于exclusive參數(shù)和其他隊(duì)列屬性的信息。
此時(shí),queueName包含一個(gè)隨機(jī)隊(duì)列名稱。例如,它看起來可能像amq.gen-JzTY20BRgKO-HjmUJj0wLg。
4.綁定
我們已經(jīng)創(chuàng)建好了一個(gè)fanout交換器和一個(gè)隊(duì)列。現(xiàn)在我們需要告訴交換器把消息發(fā)送到我們的隊(duì)列。而交換器和隊(duì)列之間的關(guān)系就稱之為綁定。
// 把一個(gè)隊(duì)列綁定到指定交換器。channel.QueueBind(queue: queueName, ? ? ? ? ? ? ? ? ?exchange: "logs", ? ? ? ? ? ? ? ? ?routingKey: "");從現(xiàn)在起,logs交換器會(huì)把消息追加到我們的隊(duì)列中。
列舉綁定
您可以使用(您或許已經(jīng)猜到了),列舉出現(xiàn)有的綁定。
5.組合在一起
生產(chǎn)者程序負(fù)責(zé)分發(fā)消息,這與之前的教程看起來沒有太大區(qū)別。
最重要的變化是我們現(xiàn)在想把消息發(fā)布到我們的logs交換器,而不是匿名交換器。在發(fā)送時(shí)我們需要提供一個(gè)路由鍵routingKey,但是對(duì)于fanout交換器,它的值可以被忽略。這里是EmitLog.cs文件的代碼:
(EmitLog.cs源碼)
如你所見,在建立連接后,我們聲明了交換器。這一步非常有必要,因?yàn)榘l(fā)布消息到一個(gè)不存在的交換器,這種情況是被禁止的。
如果沒有隊(duì)列綁定到交換器上,消息將會(huì)丟失,但這對(duì)我們來說并沒有什么沒問題;如果沒有消費(fèi)者正在監(jiān)聽,我們是可以放心地把消息丟棄的。
ReceiveLogs.cs的代碼:
(ReceiveLogs.cs源碼)
按照教程[1]中的設(shè)置說明生成EmitLogs和ReceiveLogs項(xiàng)目。
如果您想把日志保存到文件中,只需打開一個(gè)控制臺(tái)并輸入:
cd ReceiveLogs dotnet run > logs_from_rabbit.log如果你想在屏幕上看到日志,我可以新開一個(gè)終端并運(yùn)行:
cd ReceiveLogs dotnet run當(dāng)然,分發(fā)日志需要輸入:
cd EmitLog dotnet run使用rabbitmqctl list_bindings命令,您可以驗(yàn)證代碼是否真正創(chuàng)建了我們想要的綁定和隊(duì)列。當(dāng)有兩個(gè)ReceiveLogs.cs程序運(yùn)行時(shí),您應(yīng)該看到如下所示的內(nèi)容:
sudo rabbitmqctl list_bindings# => Listing bindings ...# => logs ? ?exchange ? ? ? ?amq.gen-JzTY20BRgKO-HjmUJj0wLg ?queue ? ? ? ? ? []# => logs ? ?exchange ? ? ? ?amq.gen-vso0PVvyiRIL2WoV3i48Yg ?queue ? ? ? ? ? []# => ...done.對(duì)執(zhí)行結(jié)果的解釋簡(jiǎn)潔明了:來自logs交換器的數(shù)據(jù)轉(zhuǎn)發(fā)到了兩個(gè)由服務(wù)器隨機(jī)分配名稱的隊(duì)列。這正是我們期待的結(jié)果。
想要了解如何監(jiān)聽消息的這一塊內(nèi)容,讓我們繼續(xù)閱讀教程[4]。
6.寫在最后
本文翻譯自RabbitMQ官方教程C#版本。本文介紹如與官方有所出入,請(qǐng)以官方最新內(nèi)容為準(zhǔn)。
水平有限,翻譯的不好請(qǐng)見諒,如有翻譯錯(cuò)誤還請(qǐng)指正。
原文鏈接:RabbitMQ tutorial - Publish/Subscribe
實(shí)驗(yàn)環(huán)境:RabbitMQ 3.7.4 、.NET Core 2.1.3、Visual Studio Code
最后更新:2018-06-11
作者:Esofar
出處:http://www.cnblogs.com/esofar/p/rabbitmq-publish-subscribe.html
.NET社區(qū)新聞,深度好文,歡迎訪問公眾號(hào)文章匯總 http://www.csharpkit.com
總結(jié)
以上是生活随笔為你收集整理的[译]RabbitMQ教程C#版 - 发布订阅的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: .NET Core微服务之基于Ocelo
- 下一篇: c# char unsigned_dll