日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

如何优雅的使用RabbitMQ?

發(fā)布時(shí)間:2024/9/20 编程问答 45 豆豆
生活随笔 收集整理的這篇文章主要介紹了 如何优雅的使用RabbitMQ? 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

RabbitMQ無(wú)疑是目前最流行的消息隊(duì)列之一,對(duì)各種語(yǔ)言環(huán)境的支持也很豐富,作為一個(gè).NET developer有必要學(xué)習(xí)和了解這一工具。消息隊(duì)列的使用場(chǎng)景

大概有3種:

1、系統(tǒng)集成,分布式系統(tǒng)的設(shè)計(jì)。各種子系統(tǒng)通過(guò)消息來(lái)對(duì)接,這種解決方案也逐步發(fā)展成一種架構(gòu)風(fēng)格,即“通過(guò)消息傳遞的架構(gòu)”。

2、當(dāng)系統(tǒng)中的同步處理方式嚴(yán)重影響了吞吐量,比如日志記錄。假如需要記錄系統(tǒng)中所有的用戶行為日志,如果通過(guò)同步的方式記錄日志勢(shì)必會(huì)影響系統(tǒng)的響應(yīng)速度,當(dāng)我們將日志消息發(fā)送到消息隊(duì)列,記錄日志的子系統(tǒng)就會(huì)通過(guò)異步的方式去消費(fèi)日志消息。

3、系統(tǒng)的高可用性,比如電商的秒殺場(chǎng)景。當(dāng)某一時(shí)刻應(yīng)用服務(wù)器或數(shù)據(jù)庫(kù)服務(wù)器收到大量請(qǐng)求,將會(huì)出現(xiàn)系統(tǒng)宕機(jī)。如果能夠?qū)⒄?qǐng)求轉(zhuǎn)發(fā)到消息隊(duì)列,再由服務(wù)器去消費(fèi)這些消息將會(huì)使得請(qǐng)求變得平穩(wěn),提高系統(tǒng)的可用性。

一、開(kāi)始使用RabbitMQ

RabbitMQ官網(wǎng)提供了詳細(xì)的安裝步驟,另外官網(wǎng)還提供了RabbitMQ在六種場(chǎng)景的使用教程。其中教程1、3、6將覆蓋99%的使用場(chǎng)景,所以正常來(lái)說(shuō)只需要搞清楚這3個(gè)教程即可快速上手。

二、簡(jiǎn)單分析

我們以官方提供的教程1做個(gè)簡(jiǎn)單梳理:該教程展示了Producer如何向一個(gè)消息隊(duì)列(message queue)發(fā)送一個(gè)消息(message),消息消費(fèi)者(Consumer)收到該消息后消費(fèi)該消息。

1、producer端:

var factory = new ConnectionFactory() { HostName = "localhost" };

?using (var connection = factory.CreateConnection())

?{

?????while (Console.ReadLine() != null)

?????{

?????????using (var channel = connection.CreateModel())

?????????{

?????????????//創(chuàng)建一個(gè)名叫"hello"的消息隊(duì)列

?????????????channel.QueueDeclare(queue: "hello",

?????????????????durable: false,

?????????????????exclusive: false,

?????????????????autoDelete: false,

?????????????????arguments: null);

?

?????????????var message = "Hello World!";

?????????????var body = Encoding.UTF8.GetBytes(message);

?

?????????????//向該消息隊(duì)列發(fā)送消息message

?????????????channel.BasicPublish(exchange: "",

?????????????????routingKey: "hello",

?????????????????basicProperties: null,

?????????????????body: body);

?????????????Console.WriteLine(" [x] Sent {0}", message);

?????????}

?????}

?}

該段代碼非常簡(jiǎn)單,幾乎到了無(wú)法精簡(jiǎn)的地步:創(chuàng)建了一個(gè)信道(channel)->創(chuàng)建一個(gè)隊(duì)列->向該隊(duì)列發(fā)送消息。

2、Consumer端

var factory = new ConnectionFactory() { HostName = "localhost" };

?using (var connection = factory.CreateConnection())

?{

?????using (var channel = connection.CreateModel())

?????{

?????????//創(chuàng)建一個(gè)名為"hello"的隊(duì)列,防止producer端沒(méi)有創(chuàng)建該隊(duì)列

?????????channel.QueueDeclare(queue: "hello",

??????????????????????????????durable: false,

??????????????????????????????exclusive: false,

??????????????????????????????autoDelete: false,

??????????????????????????????arguments: null);

?

?????????//回調(diào),當(dāng)consumer收到消息后會(huì)執(zhí)行該函數(shù)

?????????var consumer = new EventingBasicConsumer(channel);

?????????consumer.Received += (model, ea) =>

?????????{

?????????????var body = ea.Body;

?????????????var message = Encoding.UTF8.GetString(body);

?????????????Console.WriteLine(" [x] Received {0}", message);

?????????};

?

?????????//消費(fèi)隊(duì)列"hello"中的消息

?????????channel.BasicConsume(queue: "hello",

??????????????????????????????noAck: true,

??????????????????????????????consumer: consumer);

?

?????????Console.WriteLine(" Press [enter] to exit.");

?????????Console.ReadLine();

?????}

?}

該段代碼可以理解為:創(chuàng)建信道->創(chuàng)建隊(duì)列->定義回調(diào)函數(shù)->消費(fèi)消息。

該實(shí)例描述了Send/Receive模式,可以簡(jiǎn)單理解為1(producer) VS 1(consumer)的場(chǎng)景;

實(shí)例3則描述了Publish/Subscriber模式,即1(producer) VS 多個(gè)(consumer);

在以上兩個(gè)示例中,producer只需要發(fā)送消息即可,并不關(guān)心consumer的返回結(jié)果。實(shí)例6則描述了一個(gè)RPC調(diào)用場(chǎng)景,producer發(fā)送消息后還要接收consumer的返回結(jié)果,這一場(chǎng)景看起來(lái)跟使用消息隊(duì)列的目的有點(diǎn)相悖。因?yàn)槭褂孟㈥?duì)列的目的之一就是要異步,但是這一場(chǎng)景似乎又將異步變成了同步,不過(guò)這一場(chǎng)景也很有用,比如一個(gè)用戶操作產(chǎn)生了一個(gè)消息,應(yīng)用服務(wù)收到該消息后執(zhí)行了一些邏輯并使得數(shù)據(jù)庫(kù)發(fā)生了變化,UI會(huì)一直等待應(yīng)用服務(wù)的返回結(jié)果才刷新頁(yè)面。

三、 發(fā)現(xiàn)抽象

我桌子上放著一本RabbitMQ in Action,另外官網(wǎng)提供的文檔也很詳細(xì),我感覺(jué)在一個(gè)月內(nèi)我就能精通RabbitMQ,到時(shí)候簡(jiǎn)歷上又可以寫上“精通…”,感覺(jué)有點(diǎn)小得意呢... ,但是我知道這并不是使用RabbitMQ的最佳方式。

我們知道合理的抽象可以幫我們隱藏掉一些技術(shù)細(xì)節(jié),讓我們將重心放在核心業(yè)務(wù)上,比如一個(gè)人問(wèn)你:“大雁塔如何走?”你的回答可能是“小寨往東,一直走兩站,右手邊”,如果你回答:“右轉(zhuǎn)45度,向前走100米,再轉(zhuǎn)90度…”,對(duì)方就會(huì)迷失在這些細(xì)節(jié)中。

消息隊(duì)列的使用過(guò)程中實(shí)際隱藏著一種抽象——服務(wù)總線(Service Bus)。

我們?cè)诨仡^看第一個(gè)例子,這個(gè)例子隱含的業(yè)務(wù)是:ClientA發(fā)送一個(gè)指令,ClientB收到該指令后做出反應(yīng)。如果是這樣,我們?yōu)槭裁匆P(guān)心如何創(chuàng)建channel,如何創(chuàng)建一個(gè)queue? 我僅僅是要發(fā)送一個(gè)消息而已。另外這個(gè)例子寫的其實(shí)不夠健壯:

沒(méi)有重試機(jī)制:如果ClientB第一次沒(méi)有執(zhí)行成功如何對(duì)該消息處理?

沒(méi)有錯(cuò)誤處理機(jī)制:如果ClientB在重試了N次之后還是異常如何處理該消息?

沒(méi)有熔斷機(jī)制;

如何對(duì)ClientA做一個(gè)schedule(計(jì)劃安排),比如定時(shí)發(fā)送等;

沒(méi)有消息審計(jì)機(jī)制;

無(wú)法對(duì)消息的各個(gè)狀態(tài)做追蹤;

事物處理等。

服務(wù)總線正是這種場(chǎng)景的抽象,并且為我們提供了這些機(jī)制,讓我們趕快來(lái)看個(gè)究竟吧。

四、初識(shí)MassTransit

MassTransit是.NET平臺(tái)下的一款開(kāi)源免費(fèi)的ESB產(chǎn)品,官網(wǎng):http://masstransit-project.com/,GitHub 700 star,500 Fork,類似的產(chǎn)品還有NServiceBus,之所以要選用MassTransit是因?yàn)樗萅ServiceBus輕量級(jí),另外在MassTransit開(kāi)發(fā)之初就選用了RabbitMQ作為消息傳輸組建;同時(shí)我想拿他跟NServiceBus做個(gè)比較,看看他們到底有哪些側(cè)重點(diǎn)。

1、新建控制臺(tái)應(yīng)用程序:Masstransit.RabbitMQ.GreetingClient

使用MassTransit可以從Nuget中安裝:

1

Install-Package MassTransit.RabbitMQ

2、創(chuàng)建服務(wù)總線,發(fā)送一個(gè)命令

static void Main(string[] args)

{

????Console.WriteLine("Press 'Enter' to send a message.To exit, Ctrl + C");

?

????var bus = BusCreator.CreateBus();

????var sendToUri = new Uri($"{RabbitMqConstants.RabbitMqUri}{RabbitMqConstants.GreetingQueue}");

?

????while (Console.ReadLine()!=null)

????{

????????Task.Run(() => SendCommand(bus, sendToUri)).Wait();

????}

?

????Console.ReadLine();

}

?

private static async void SendCommand(IBusControl bus,Uri sendToUri)

{

????var endPoint =await bus.GetSendEndpoint(sendToUri);

????var command = new GreetingCommand()

????{

????????Id = Guid.NewGuid(),

????????DateTime = DateTime.Now

????};

?

????await endPoint.Send(command);

?

????Console.WriteLine($"send command:id={command.Id},{command.DateTime}");

}

這一段代碼隱藏了眾多關(guān)于消息隊(duì)列的細(xì)節(jié),將我們的注意力集中在發(fā)送消息上,同時(shí)ServiceBus提供的API也更接近業(yè)務(wù),我們雖然發(fā)送的是一個(gè)消息,但是在這種場(chǎng)景下體現(xiàn)出來(lái)是一個(gè)命令,Send(command)這一API描述了我們的意圖。

3、服務(wù)端接收這一命令

新建一個(gè)命令臺(tái)控制程序:Masstransit.RabbitMQ.GreetingServer

var bus = BusCreator.CreateBus((cfg, host) =>

{

????cfg.ReceiveEndpoint(host, RabbitMqConstants.GreetingQueue, e =>

????{

????????e.Consumer<GreetingConsumer>();

?

????});

});

這一代碼可以理解為服務(wù)端在監(jiān)聽(tīng)消息,我們?cè)诜?wù)端注冊(cè)了一個(gè)名為“GreetingConsumer”的消費(fèi)者,GreetingConsumer的定義:

public class GreetingConsumer :IConsumer<GreetingCommand>

{

????public async Task Consume(ConsumeContext<GreetingCommand> context)

????{

?

????????await Console.Out.WriteLineAsync($"receive greeting commmand: {context.Message.Id},{context.Message.DateTime}");

????}

}

該consumer可以消費(fèi)類型為GreetingCommand的消息。這一實(shí)例幾乎隱藏了有關(guān)RabbitMQ的技術(shù)細(xì)節(jié),將代碼中心放在了業(yè)務(wù)中,將這兩個(gè)控制臺(tái)應(yīng)用跑起來(lái)試試:

五、實(shí)現(xiàn)Publish/Subscribe模式

發(fā)布/訂閱模式使得基于消息傳遞的軟件架構(gòu)成為可能,這一能力表現(xiàn)為ClientA發(fā)送消息X,ClientB和ClientC都可以訂閱消息X。

1、我們?cè)谏厦娴睦又懈脑煲幌?#xff0c;當(dāng)GreetingConsumer收到GreetingCommand后發(fā)送一個(gè)GreetingEvent:

var greetingEvent = new GreetingEvent()

?{

?????Id = context.Message.Id,

?????DateTime = DateTime.Now

?};

?

?await context.Publish(greetingEvent);

2、新建控制臺(tái)程序Masstransit.RabbitMQ.GreetingEvent.SubscriberA用來(lái)訂閱GreetingEvent消息:

var bus = BusCreator.CreateBus((cfg, host) =>

?{

?????cfg.ReceiveEndpoint(host, RabbitMqConstants.GreetingEventSubscriberAQueue, e =>

?????{

?????????e.Consumer<GreetingEventConsumer>();

?????});

?});

?

?bus.Start();

定義GreetingEventConsumer:

public class GreetingEventConsumer:IConsumer<Greeting.Message.GreetingEvent>

?{

?????public async Task Consume(ConsumeContext<Greeting.Message.GreetingEvent> context)

?????{

?????????await Console.Out.WriteLineAsync($"receive greeting event: id {context.Message.Id}");

?????}

?}

這一代碼跟Masstransit.RabbitMQ.GreetingServer接受一個(gè)命令幾乎一模一樣,唯一的區(qū)別在于:

在Send/Receive模式中Client首先要獲得對(duì)方(Server)的終結(jié)點(diǎn)(endpoint),直接向該終結(jié)點(diǎn)發(fā)送命令。Server方監(jiān)聽(tīng)自己的終結(jié)點(diǎn)并消費(fèi)命令。

而Publish/Subscribe模式中Client publish一個(gè)事件,SubscriberA在自己的終結(jié)點(diǎn)(endpointA)監(jiān)聽(tīng)事件,SubscriberB在自己的終結(jié)點(diǎn)(endpointB)監(jiān)聽(tīng)事件。

3、根據(jù)上面的分析再定義一個(gè)Masstransit.RabbitMQ.GreetingEvent.SubscriberB

4、將4個(gè)控制臺(tái)應(yīng)用程序跑起來(lái)看看

六、實(shí)現(xiàn)RPC模式

這一模式在Masstransit中被稱作Request/Response模式,通過(guò)IRequestClient<IRequest, IResponse> 接口來(lái)實(shí)現(xiàn)相關(guān)操作。一個(gè)相關(guān)的例子在官方的github。

?

結(jié)束語(yǔ):本篇文章分析了如何使用Masstransit來(lái)抽象業(yè)務(wù),避免直接使用具體的消息隊(duì)列,當(dāng)然本文提到的眾多服務(wù)總線機(jī)制,如“重試、熔斷等”并沒(méi)有在該文中出現(xiàn),需要大家進(jìn)一步去了解該項(xiàng)目。

通過(guò)對(duì)Masstransit的一些試用和NServiceBus的對(duì)比,Masstransit在實(shí)際項(xiàng)目中很容易上手并且免費(fèi),各種API定義的也非常清晰,但是官方的文檔有點(diǎn)過(guò)于簡(jiǎn)單,實(shí)際使用中還需要去做深入的研究。作為.NET平臺(tái)下為數(shù)不多的ESB開(kāi)源產(chǎn)品,其關(guān)注程度還是不夠,期待大家為開(kāi)源項(xiàng)目做出貢獻(xiàn)。

來(lái)源:https://jingyan.baidu.com/article/e75057f2eaa4c6ebc91a893b.html 與50位技術(shù)專家面對(duì)面20年技術(shù)見(jiàn)證,附贈(zèng)技術(shù)全景圖

總結(jié)

以上是生活随笔為你收集整理的如何优雅的使用RabbitMQ?的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。