日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) >

使用EasyNetQ组件操作RabbitMQ消息队列服务

發(fā)布時(shí)間:2023/12/13 37 豆豆
生活随笔 收集整理的這篇文章主要介紹了 使用EasyNetQ组件操作RabbitMQ消息队列服务 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

RabbitMQ是一個(gè)由erlang開(kāi)發(fā)的AMQP(Advanved Message Queue)的開(kāi)源實(shí)現(xiàn),是實(shí)現(xiàn)消息隊(duì)列應(yīng)用的一個(gè)中間件,消息隊(duì)列中間件是分布式系統(tǒng)中重要的組件,主要解決應(yīng)用耦合,異步消息,流量削鋒等問(wèn)題。實(shí)現(xiàn)高性能,高可用,可伸縮和最終一致性架構(gòu)。是大型分布式系統(tǒng)不可缺少的中間件。EasyNetQ則是基于官方.NET組件RabbitMQ.Client 的又一層封裝,使用起來(lái)更加方便。本篇隨筆主要大概介紹下RabbitMQ的基礎(chǔ)知識(shí)和環(huán)境的準(zhǔn)備,以及使用EasyNetQ的相關(guān)開(kāi)發(fā)調(diào)用。

1、RabbitMQ基礎(chǔ)知識(shí)

AMQP,即Advanced Message Queuing Protocol,高級(jí)消息隊(duì)列協(xié)議,是應(yīng)用層協(xié)議的一個(gè)開(kāi)放標(biāo)準(zhǔn),為面向消息的中間件設(shè)計(jì)。消息中間件主要用于組件之間的解耦,消息的發(fā)送者無(wú)需知道消息使用者的存在,反之亦然。
AMQP的主要特征是面向消息、隊(duì)列、路由(包括點(diǎn)對(duì)點(diǎn)和發(fā)布/訂閱)、可靠性、安全。
RabbitMQ?是一個(gè)開(kāi)源的AMQP實(shí)現(xiàn),服務(wù)器端用Erlang語(yǔ)言編寫(xiě),支持多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系統(tǒng)中存儲(chǔ)轉(zhuǎn)發(fā)消息,在易用性、擴(kuò)展性、高可用性等方面表現(xiàn)不俗。

RabbitMQ的特點(diǎn)強(qiáng)大的應(yīng)用程序消息傳遞;使用方便;運(yùn)行在所有主要操作系統(tǒng)上;支持大量開(kāi)發(fā)人員平臺(tái);開(kāi)源商業(yè)支持。消息隊(duì)列的模式有兩種模式:P2P(Point to Point),P2P模式包含三個(gè)角色:消息隊(duì)列(Queue),發(fā)送者(Sender),接收者(Receiver)。每個(gè)消息都被發(fā)送到一個(gè)特定的隊(duì)列,接收者從隊(duì)列中獲取消息。隊(duì)列保留著消息,直到他們被消費(fèi)或超時(shí)。Publish/Subscribe(Pub/Sub),包含三個(gè)角色主題(Topic),發(fā)布者(Publisher),訂閱者(Subscriber) 。多個(gè)發(fā)布者將消息發(fā)送到Topic,系統(tǒng)將這些消息傳遞給多個(gè)訂閱者。

?EasyNetQ?的目標(biāo)是提供一個(gè)使.NET中的RabbitMQ盡可能簡(jiǎn)單的庫(kù)。在EasyNetQ中消息應(yīng)由.NET類型表示,消息應(yīng)通過(guò)其.NET類型進(jìn)行路由。EasyNetQ按消息類型進(jìn)行路由。發(fā)布消息時(shí),EasyNetQ會(huì)檢查其類型,并根據(jù)類型名稱,命名空間和裝配體給出一個(gè)路由密鑰。在消費(fèi)方面,用戶訂閱類型。訂閱類型后,該類型的消息將路由到訂戶。默認(rèn)情況下,EasyNetQ使用Newtonsoft.Json庫(kù)將.NET類型序列化為JSON。這具有消息是人類可讀的優(yōu)點(diǎn),因此您可以使用RabbitMQ管理應(yīng)用程序等工具來(lái)調(diào)試消息問(wèn)題。

? ?EasyNetQ是在RabbitMQ.Client庫(kù)之上提供服務(wù)的組件集合。這些操作可以像序列化,錯(cuò)誤處理,線程編組,連接管理等。它們由mini-IoC容器組成。您可以輕松地用自己的實(shí)現(xiàn)替換任何組件。因此,如果您希望XML序列化而不是內(nèi)置的JSON,只需編寫(xiě)一個(gè)ISerializer的實(shí)現(xiàn)并將其注冊(cè)到容器。以下是官方提供的一個(gè)結(jié)構(gòu)圖,這個(gè)結(jié)構(gòu)圖可以很好的解析該組件的結(jié)構(gòu):

?

2、RabbitMQ的環(huán)境準(zhǔn)備

本處主要介紹在Windows系統(tǒng)中安裝RabbitMQ。

?1. 下載安裝erlang?

? ? ? 下載地址?http://www.erlang.org/downloads(根據(jù)操作系統(tǒng)選擇32還64位)??

? 2. 下載安裝rabbitmq-server

? ? ?下載地址?http://www.rabbitmq.com/install-windows.html

下載后獲得兩個(gè)安裝文件,按照順序安裝即可

?安裝erlang環(huán)境后,一般會(huì)添加了一個(gè)ERLANG_HOME的系統(tǒng)變量,指向erlang的安裝目錄路徑,如下所示(一般都添加了,確認(rèn)下

?

安裝RabbitMQ后,在程序里面可以看到

?我們使用它的命令行來(lái)啟動(dòng)RabbitMQ的服務(wù)

查看安裝是否成功命令 :rabbitmqctl status

安裝成功,在瀏覽器中輸入?http://127.0.0.1:15672/,可以看到如下界面,使用默認(rèn)的賬號(hào)密碼均為guest登陸進(jìn)行管理

?guest 賬號(hào)是管理員賬號(hào),可以添加Exchanges,Queues,Admin。但我們一般不使用guest賬號(hào),可以選擇用命令來(lái)添加賬號(hào)和權(quán)限,也可以使用管理界面進(jìn)行添加相應(yīng)的內(nèi)容。

例如我添加相應(yīng)的用戶賬號(hào)

一般我們還需要添加虛擬機(jī),默認(rèn)的虛擬機(jī)為/,我這里添加了一個(gè)虛擬機(jī)myvhost。

然后綁定賬號(hào)到虛擬機(jī)上即可。

?

?3、EasyNetQ組件的使用

EasyNetQ組件的使用方式比較簡(jiǎn)單,跟很多組件都類似,例如:建立連接,進(jìn)行操作做等等,對(duì)于EasyNetQ組件也是如此。

在.NET中使用EasyNetQ,要求至少基于 .NET4.5的框架基礎(chǔ)上進(jìn)行開(kāi)發(fā),可以直接在VS項(xiàng)目上使用NuGet的程序包進(jìn)行添加EasyNetQ的引用。

一般添加引用后,至少包含了下面圖示的幾個(gè)引用DLL。

?

??1)創(chuàng)建連接:

使用EasyNetQ連接RabbitMQ,是在應(yīng)用程序啟動(dòng)時(shí)創(chuàng)建一個(gè)IBus對(duì)象,并且,在應(yīng)用程序關(guān)閉時(shí)釋放該對(duì)象。

RabbitMQ連接是基于IBus接口的,當(dāng)IBus中的方法被調(diào)用,連接才會(huì)開(kāi)啟。創(chuàng)建一個(gè)IBus對(duì)象的方法如下:

var bus = RabbitHutch.CreateBus(“host=myServer;virtualHost=myVirtualHost;username=admin;password=123456”);

與RabbitMQ服務(wù)器的延遲連接由IBus接口表示,創(chuàng)建連接的方式連接字符串由格式為key = value的鍵/值對(duì)組成,每一個(gè)用分號(hào)(;)分隔。

  • host,host=localhost 或者h(yuǎn)ost =192.168.1.102或者h(yuǎn)ost=my.rabbitmq.com,如果用到集群配置的話,那么可以用逗號(hào)將服務(wù)地址隔開(kāi),例如host=a.com,b.com,c.com
  • virtualHost,虛擬主機(jī),默認(rèn)為'/'
  • username,用戶登錄名
  • password,用戶登錄密碼
  • requestedHeartbeat,心跳設(shè)置,默認(rèn)是10秒
  • prefetchcount,默認(rèn)是50
  • pubisherConfirms,默認(rèn)為false
  • persistentMessages,消息持久化,默認(rèn)為true
  • product,產(chǎn)品名
  • platform,平臺(tái)
  • timeout,默認(rèn)為10秒

一般我們?cè)诖a里面測(cè)試的話,簡(jiǎn)化連接代碼如下所示。

//初始化bus對(duì)象bus = RabbitHutch.CreateBus("host=localhost");

?

? ?2關(guān)閉連接:

bus.Dispose();

? ?要關(guān)閉連接,只需簡(jiǎn)單地處理總線,這將關(guān)閉EasyNetQ使用的連接,渠道,消費(fèi)者和所有其他資源。

如果我們?cè)赪inform窗體里面初始化一個(gè)IBus對(duì)象,那么在窗體關(guān)閉的時(shí)候,關(guān)閉這個(gè)接口即可。

private void FrmPublisher_FormClosed(object sender, FormClosedEventArgs e){//關(guān)閉IBus接口if(bus != null){bus.Dispose();}}

?

???3發(fā)布消息:

EasyNetQ支持最簡(jiǎn)單的消息模式是發(fā)布和訂閱。發(fā)布消息后,任意消費(fèi)者可以訂閱該消息,也可以多個(gè)消費(fèi)者訂閱。并且不需要額外配置。首先,如上文中需要先創(chuàng)建一個(gè)IBus對(duì)象,然后,在創(chuàng)建一個(gè)可序列化的.NET對(duì)象。調(diào)用Publish方法即可。

var message = new MyMessage { Text = "Hello Rabbit" }; bus.Publish(message);

?

?4訂閱消息:

EasyNetQ提供了消息訂閱,當(dāng)調(diào)用Subscribe方法時(shí)候,EasyNetQ會(huì)創(chuàng)建一個(gè)用于接收消息的隊(duì)列,不過(guò)與消息發(fā)布不同的是,消息訂閱增加了一個(gè)參數(shù),subscribe_id.代碼如下:

bus.Subscribe<MyMessage>("my_subscription_id", msg => Console.WriteLine(msg.Text));

第一個(gè)參數(shù)是訂閱id,另外一個(gè)是delegate參數(shù),用于處理接收到的消息。這里要注意的是,subscribe_id參數(shù)很重要,假如開(kāi)發(fā)者用同一個(gè)subscribeid訂閱了同一種消息類型兩次或者多次,RabbitMQ會(huì)以輪訓(xùn)的方式給每個(gè)訂閱的隊(duì)列發(fā)送消息。接收到之后,其他隊(duì)列就接收不到該消息。如果用不同的subscribeid訂閱同一種消息類型,那么生成的每一個(gè)隊(duì)列都會(huì)收到該消息。

需要注意的是,在收到消息處理消息時(shí)候,不要占用太多的時(shí)間,會(huì)影響消息的處理效率,所以,遇到占用長(zhǎng)時(shí)間的處理方法,最好用異步處理。

為了測(cè)試發(fā)布和訂閱消息,我們可以建立幾個(gè)不同的項(xiàng)目來(lái)進(jìn)行測(cè)試,如發(fā)布放在一個(gè)Winform項(xiàng)目,訂閱放在一個(gè)Winform項(xiàng)目,另外一個(gè)項(xiàng)目放置共享的消息對(duì)象定義,如下所示。

定義消息對(duì)象類如下所示。

/// <summary>/// 定義的MQ消息類型/// </summary>public class TextMessage{public string Text { get; set; }}

然后在發(fā)布消息的Winform項(xiàng)目上創(chuàng)建一個(gè)處理的窗體,并添加如下代碼。

namespace MyRabbitMQ.Publisher {/// <summary>/// 測(cè)試RabbitMQ消息隊(duì)列的發(fā)布/// </summary>public partial class FrmPublisher : DevExpress.XtraEditors.XtraForm{//構(gòu)建一個(gè)IBus公用接口對(duì)象private IBus bus = null;public FrmPublisher(){InitializeComponent();//初始化bus對(duì)象bus = RabbitHutch.CreateBus("host=localhost");//對(duì)指定消息類型進(jìn)行回應(yīng)bus.Respond<MyRequest, MyResponse>(request => new MyResponse { Text = "Responding to: "+ request.Text});//收到消息后輸出到控制臺(tái)上顯示bus.Receive("my.queue", x => x.Add<MyMessage>(message => Console.WriteLine(message.ToJson())).Add<MyOtherMessage>(message => Console.WriteLine(message.ToJson())));}

發(fā)布消息的處理代碼,如下代碼所示。

private void btnSend_Click(object sender, EventArgs e){if (bus != null){bus.Publish(new TextMessage{Text = this.txtContent.Text});}}

然后在創(chuàng)建一個(gè)類似窗體,用來(lái)訂閱消息的處理窗體,如下所示代碼和窗體。

namespace MyRabbitMQ.Subcriber { /// <summary>/// 測(cè)試RabbitMQ消息隊(duì)列的訂閱/// </summary>public partial class FrmSubcriber : DevExpress.XtraEditors.XtraForm{//構(gòu)建一個(gè)IBus公用接口對(duì)象private IBus bus = null;public FrmSubcriber(){InitializeComponent();//初始化bus對(duì)象bus = RabbitHutch.CreateBus("host=localhost");if(bus != null){//訂閱一個(gè)消息,并對(duì)接收到的消息進(jìn)行處理,展示在控件上bus.Subscribe<TextMessage>("test", (msg) =>{StringBuilder sb = new StringBuilder();sb.AppendLine(msg.Text + "," + DateTime.Now.ToString());sb.AppendLine(this.txtContent.Text);this.txtContent.Invoke(new MethodInvoker(delegate(){this.txtContent.Text = sb.ToString();}));});}//使用消息發(fā)送接口發(fā)送消息bus.Send("my.queue", new MyMessage { Text = "Hello Widgets!" });bus.Send("my.queue", new MyOtherMessage { Text = "Hello wuhuacong!" });}

發(fā)送請(qǐng)求獲取響應(yīng)的代碼如下所示。

private void btnRequest_Click(object sender, EventArgs e){//定義請(qǐng)求消息的對(duì)象var request = new MyRequest(){Text = string.Format("請(qǐng)求消息,{0}", DateTime.Now)};//異步獲取請(qǐng)求消息的結(jié)果并進(jìn)行處理,展示應(yīng)答消息在窗體中的var task = bus.RequestAsync<MyRequest, MyResponse>(request);task.ContinueWith(response =>{StringBuilder sb = new StringBuilder();sb.AppendLine(response.Result.Text);sb.AppendLine(this.txtContent.Text);this.txtContent.Invoke(new MethodInvoker(delegate(){this.txtContent.Text = sb.ToString();}));});}

?

兩個(gè)項(xiàng)目聯(lián)合進(jìn)行測(cè)試如下界面所示。

?

發(fā)布者多次發(fā)送消息的情況下,訂閱者中,會(huì)進(jìn)行消息的輪訓(xùn)處理,也就是進(jìn)行均勻分配。

?

??5)消息發(fā)送(Send)和接收(Receive)

與Publish/Subscribe略有不同的是,Send/Receive 可以自己定義隊(duì)列名稱。

//發(fā)送端代碼 bus.Send("my.queue", new MyMessage{ Text = "Hello Widgets!" });//接收端代碼 bus.Receive<MyMessage>("my.queue", message => Console.WriteLine("MyMessage: {0}", message.Text));

并且,也可以在同一個(gè)隊(duì)列上發(fā)送不同的消息類型,Receive方法可以這么寫(xiě):

bus.Receive("my.queue", x => x.Add<MyMessage>(message => deliveredMyMessage = message).Add<MyOtherMessage>(message => deliveredMyOtherMessage = message));

如果消息到達(dá)隊(duì)列,但是沒(méi)有發(fā)現(xiàn)相應(yīng)消息類型的處理時(shí),EasyNetQ會(huì)發(fā)送一條消息到error隊(duì)列,并且,帶上一個(gè)異常信息:No handler found for message type <message type>。與Subscribe類型,如果在同一個(gè)隊(duì)列,同一個(gè)消息類型,多次調(diào)用Receive方法時(shí),消息會(huì)通過(guò)輪詢的形式發(fā)送給每個(gè)Receive端。

?

? ?6)遠(yuǎn)程過(guò)程調(diào)用:

var request = new TestRequestMessage {Text = "Hello from the client! "}; bus.Request<TestRequestMessage, TestResponseMessage>(request, response => Console.WriteLine("Got response: '{0}'", response.Text));

???7RPC服務(wù)器:

bus.Respond<TestRequestMessage, TestResponseMessage>(request => new TestResponseMessage{ Text = request.Text + " all done!" });

???8記錄器:

var logger = new MyLogger() ; var bus = RabbitHutch.CreateBus(“my connection string”, x => x.Register<IEasyNetQLogger>(_ => logger));

???9路由:

Publish方法,可以加一個(gè)topic參數(shù)。

bus.Publish(message, "X.A");

 消息訂閱方可以通過(guò)路由來(lái)過(guò)濾相應(yīng)的消息。

  * 匹配一個(gè)字符

  #匹配0個(gè)或者多個(gè)字符

  所以?X.A.2 會(huì)匹配到 "#", "X.#", "*.A.*" 但不會(huì)匹配 "X.B.*" 或者 "A". 當(dāng)消息訂閱需要用到topic時(shí)候,需要調(diào)用Subscribe的重載方法

bus.Subscribe("my_id", handlerOfXDotStar, x => x.WithTopic("X.*")); bus.Subscribe("my_id", handlerOfStarDotB, x => x.WithTopic("*.B"));

上述這種方式,會(huì)將消息輪詢發(fā)送給兩個(gè)訂閱者,如果只需要一個(gè)訂閱者的話,可以這么調(diào)用:

bus.Subscribe("my_id", handler, x => x.WithTopic("X.*").WithTopic("*.B"));

RabbitMQ具有非常好的功能,基于主題的路由,允許訂閱者基于多個(gè)標(biāo)準(zhǔn)過(guò)濾消息。*(星號(hào))匹配一個(gè)字。#(哈希)匹配為零個(gè)或多個(gè)單詞。

?RabbitMQ的應(yīng)用場(chǎng)景,一般在快速處理訂單,以及異步的多任務(wù)處理中可以得到很好的體現(xiàn),下面是幾個(gè)應(yīng)用場(chǎng)景。

郵件和短消息的處理

訂單的解耦處理

RabbitMQ的服務(wù)器架構(gòu)

?

3、RabbitMQ查詢狀態(tài)出現(xiàn)錯(cuò)誤的處理

安裝成功之后使用rabbitmqctl status命令之后出現(xiàn)如下錯(cuò)誤。

Status of node rabbit@WUHUACONG ... Error: unable to perform an operation on node 'rabbit@WUHUACONG'. Please see diagnostics information and suggestions below.Most common reasons for this are:* Target node is unreachable (e.g. due to hostname resolution, TCP connection or firewall issues)* CLI tool fails to authenticate with the server (e.g. due to CLI tool's Erlang cookie not matching that of the server)* Target node is not runningIn addition to the diagnostics info below:* See the CLI, clustering and networking guides on http://rabbitmq.com/documentation.html to learn more* Consult server logs on node rabbit@WUHUACONGDIAGNOSTICS ===========attempted to contact: [rabbit@WUHUACONG]rabbit@WUHUACONG:* connected to epmd (port 4369) on WUHUACONG* epmd reports node 'rabbit' uses port 25672 for inter-node and CLI tool traffic* TCP connection succeeded but Erlang distribution failed* Authentication failed (rejected by the remote node), please check the Erlang cookieCurrent node details:* node name: rabbitmqcli100@WUHUACONG* effective user's home directory: C:\Users\Administrator* Erlang cookie hash: RgaUM2coc+rxIhJrfLS7Jw==

這個(gè)問(wèn)題出現(xiàn)比較常見(jiàn),主要原因是兩個(gè)目錄的.erlang.cookie文件內(nèi)容不一樣。

要確保.erlang.cookie文件的一致性,不知道什么原因?qū)е铝薈:\Users\{UserName}\.erlang.cookie和默認(rèn)情況下C:\WINDOWS\System32\config\systemprofile\.erlang.cookie不一致了,將Windows目錄下的拷貝到用戶目錄下就可以了。

反正無(wú)論如何,兩個(gè)地址的Cookie內(nèi)容一致就可以了,然后重啟下RabbitMQ服務(wù)器即可正常運(yùn)行,并可以正常獲取它的狀態(tài)信息。

?

轉(zhuǎn)載于:https://www.cnblogs.com/wuhuacong/p/8927096.html

總結(jié)

以上是生活随笔為你收集整理的使用EasyNetQ组件操作RabbitMQ消息队列服务的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。