日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > C# >内容正文

C#

RabbitMQ教程C#版 - 工作队列

發(fā)布時間:2023/12/4 C# 36 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RabbitMQ教程C#版 - 工作队列 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

先決條件
本教程假定RabbitMQ已經(jīng)安裝,并運行在localhost標準端口(5672)。如果你使用不同的主機、端口或證書,則需要調(diào)整連接設(shè)置。

從哪里獲得幫助
如果您在閱讀本教程時遇到困難,可以通過郵件列表聯(lián)系我們。

1.工作隊列

(使用.NET客戶端)

在第一篇教程RabbitMQ教程C#版 “Hello World”中,我們編寫了兩個程序,用于從一個指定的隊列發(fā)送和接收消息。在本文中,我們將創(chuàng)建一個工作隊列,用于在多個工作線程間分發(fā)耗時的任務(wù)。

工作隊列(又名:任務(wù)隊列)背后的主要想法是避免立即執(zhí)行資源密集型、且必須等待其完成的任務(wù)。相反的,我們把這些任務(wù)安排在稍后完成。我們可以將任務(wù)封裝為消息并把它發(fā)送到隊列中,在后臺運行的工作進程將從隊列中取出任務(wù)并最終執(zhí)行。當您運行多個工作線程,這些任務(wù)將在這些工作線程之間共享。

這個概念在Web應(yīng)用程序中特別有用,因為在一個HTTP請求窗口中無法處理復(fù)雜的任務(wù)。

2.準備

我們將略微修改上一個示例中的Send程序,以其可以在命令行發(fā)送任意消息。
這個程序?qū)⒄{(diào)度任務(wù)到我們的工作隊列中,所以讓我們把它命名為NewTask:

像教程[1],我們需要生成兩個項目:

dotnet new console --name NewTask

mv NewTask/Program.cs NewTask/NewTask.cs


dotnet new console --name Worker

mv Worker/Program.cs Worker/Worker.cs


cd NewTask

dotnet add package RabbitMQ.Client

dotnet restore


cd ../Worker

dotnet add package RabbitMQ.Client

dotnet restore


var message = GetMessage(args);

var body = Encoding.UTF8.GetBytes(message);


var properties = channel.CreateBasicProperties();

properties.Persistent = true;


channel.BasicPublish(exchange: "",

? ? ? ? ? ? ? ? ? ? ?routingKey: "task_queue",

? ? ? ? ? ? ? ? ? ? ?basicProperties: properties,

? ? ? ? ? ? ? ? ? ? ?body: body);

從命令行參數(shù)獲取消息的幫助方法:

private static string GetMessage(string[] args)

{

? ? return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");

}

我們舊的Receive.cs腳本也需要進行一些更改:它需要為消息體中的每個點模擬一秒種的時間消耗。它將處理由RabbitMQ發(fā)布的消息,并執(zhí)行任務(wù),因此我們把它復(fù)制到Worker項目并修改:

// 構(gòu)建消費者實例。

var consumer = new EventingBasicConsumer(channel);


// 綁定消息接收事件。

consumer.Received += (model, ea) =>

{

? ? var body = ea.Body;

? ? var message = Encoding.UTF8.GetString(body);

? ? Console.WriteLine(" [x] Received {0}", message);


? ? // 模擬耗時操作。

? ? int dots = message.Split('.').Length - 1;

? ? Thread.Sleep(dots * 1000);


? ? Console.WriteLine(" [x] Done");

};


channel.BasicConsume(queue: "task_queue", autoAck: true, consumer: consumer);

模擬虛擬任務(wù)的執(zhí)行時間:

int dots = message.Split('.').Length - 1;

Thread.Sleep(dots * 1000);

3.循環(huán)調(diào)度

使用任務(wù)隊列的優(yōu)點之一是能夠輕松地并行工作。如果我們正在積累積壓的工作,我們僅要增加更多的工作者,并以此方式可以輕松擴展。

首先,我們嘗試同時運行兩個Worker實例。他們都會從隊列中獲取消息,但究竟如何?讓我們來看看。

您需要打開三個控制臺,兩個運行Worker程序,這些控制臺作為我們的兩個消費者 - C1和C2。

# shell 1

cd Worker

dotnet run

# => [*] Waiting for messages. To exit press CTRL+C

# shell 2

cd Worker

dotnet run

# => [*] Waiting for messages. To exit press CTRL+C

在第三個控制臺中,我們將發(fā)布一些新的任務(wù)。一旦你已經(jīng)運行了消費者,你可以嘗試發(fā)布幾條消息:

# shell 3

cd NewTask

dotnet run "First message."

dotnet run "Second message.."

dotnet run "Third message..."

dotnet run "Fourth message...."

dotnet run "Fifth message....."

讓我們看看有什么發(fā)送到了我們的Worker程序:

# shell 1

# => [*] Waiting for messages. To exit press CTRL+C

# => [x] Received 'First message.'

# => [x] Received 'Third message...'

# => [x] Received 'Fifth message.....'

# shell 2

# => [*] Waiting for messages. To exit press CTRL+C

# => [x] Received 'Second message..'

# => [x] Received 'Fourth message....'

默認情況下,RabbitMQ會按順序?qū)⒚織l消息發(fā)送給下一個消費者。消費者數(shù)量平均的情況下,每個消費者將會獲得相同數(shù)量的消息。這種分配消息的方式稱為循環(huán)(Round-Robin)。請嘗試開啟三個或更多的Worker程序來驗證。

4.消息確認

處理一項任務(wù)可能會需要幾秒鐘的時間。如果其中一個消費者開啟了一項長期的任務(wù)并且只完成了部分就掛掉了,您可能想知道會發(fā)生什么?在我們當前的代碼中,一旦RabbitMQ把消息分發(fā)給了消費者,它會立即將這條消息標記為刪除。在這種情況下,如果您停掉某一個Worker,我們將會丟失這條正在處理的消息,也將丟失所有分發(fā)到該Worker但尚未處理的消息。

但是我們不想丟失任何一個任務(wù)。如果一個Worker掛掉了,我們希望這個任務(wù)能被重新分發(fā)給其他Worker。

為了確保消息永遠不會丟失,RabbitMQ支持消息確認機制。消費者回發(fā)一個確認信號Ack(nowledgement)給RabbitMQ,告訴它某個消息已經(jīng)被接收、處理并且可以自由刪除它。

如果一個消費者在還沒有回發(fā)確認信號之前就掛了(其通道關(guān)閉,連接關(guān)閉或者TCP連接丟失),RabbitMQ會認為該消息未被完全處理,并將其重新排隊。如果有其他消費者同時在線,該消息將會被會迅速重新分發(fā)給其他消費者。這樣,即便Worker意外掛掉,也可以確保消息不會丟失。

沒有任何消息會超時;當消費者死亡時,RabbitMQ將會重新分發(fā)消息。即使處理消息需要非常非常長的時間也沒關(guān)系。

默認情況下,手動消息確認模式是開啟的。在前面的例子中,我們通過將autoAck(“自動確認模式”)參數(shù)設(shè)置為true來明確地關(guān)閉手動消息確認模式。一旦完成任務(wù),是時候刪除這個標志并且從Worker手動發(fā)送一個恰當?shù)拇_認信號給RabbitMQ。

// 構(gòu)建消費者實例。

var consumer = new EventingBasicConsumer(channel);


// 綁定消息接收事件。

consumer.Received += (model, ea) =>

{

? ? var body = ea.Body;

? ? var message = Encoding.UTF8.GetString(body);

? ? Console.WriteLine(" [x] Received {0}", message);

? ??

? ? // 模擬耗時操作。

? ? int dots = message.Split('.').Length - 1;

? ? Thread.Sleep(dots * 1000);


? ? Console.WriteLine(" [x] Done");

? ??

? ? // 手動發(fā)送消息確認信號。

? ? channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);

};


// autoAck:false - 關(guān)閉自動消息確認,調(diào)用`BasicAck`方法進行手動消息確認。

// autoAck:true? - 開啟自動消息確認,當消費者接收到消息后就自動發(fā)送ack信號,無論消息是否正確處理完畢。

channel.BasicConsume(queue: "task_queue", autoAck: false, consumer: consumer);

使用上面這段代碼,我們可以確定的是,即使一個Worker在處理消息時,我們通過使用CTRL + C來終止它,也不會丟失任何消息。Worker掛掉不久,所有未確認的消息將會被重新分發(fā)。

忘記確認
遺漏BasicAck是一個常見的錯誤。這是一個很簡單的錯誤,但導(dǎo)致的后果卻是嚴重的。當客戶端退出時(看起來像是隨機分發(fā)的),消息將會被重新分發(fā),但是RabbitMQ會吃掉越來越多的內(nèi)存,因為它不能釋放未確認的消息。
為了調(diào)試這種錯誤,您可以使用rabbitmqctl來打印messages_unacknowledged字段:

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

在Windows上,刪除sudo:

rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

5.消息持久化

我們已經(jīng)學(xué)習(xí)了如何確保即使消費者掛掉,任務(wù)也不會丟失。但是如果RabbitMQ服務(wù)器停止,我們的任務(wù)還是會丟失。

當RabbitMQ退出或崩潰時,它會忘記已存在的隊列和消息,除非告訴它不要這樣做。為了確保消息不會丟失,有兩件事是必須的:我們需要將隊列和消息標記為持久

首先,我們需要確保RabbitMQ永遠不會丟失我們的隊列。為了做到這一點,我們需要把隊列聲明是持久的(Durable):

// 聲明隊列,通過指定durable參數(shù)為`true`,對消息進行持久化處理。?

channel.QueueDeclare(queue: "hello",

? ? ? ? ? ? ? ? ? ? ?durable: true,

? ? ? ? ? ? ? ? ? ? ?exclusive: false,

? ? ? ? ? ? ? ? ? ? ?autoDelete: false,

? ? ? ? ? ? ? ? ? ? ?arguments: null);

雖然這個命令本身是正確的,但是它在當前設(shè)置中不會起作用。那是因為我們已經(jīng)定義過一個名為hello的隊列,并且這個隊列不是持久化的。RabbitMQ不允許使用不同的參數(shù)重新定義已經(jīng)存在的隊列,并會向嘗試執(zhí)行該操作的程序返回一個錯誤。但有一個快速的解決辦法 - 讓我們用不同的名稱聲明一個隊列,例如task_queue:

channel.QueueDeclare(queue: "task_queue",

? ? ? ? ? ? ? ? ? ? ?durable: true,

? ? ? ? ? ? ? ? ? ? ?exclusive: false,

? ? ? ? ? ? ? ? ? ? ?autoDelete: false,

? ? ? ? ? ? ? ? ? ? ?arguments: null);


注意,該聲明隊列QueueDeclare方法的更改需要同時應(yīng)用于生產(chǎn)者和消費者代碼。

此時,我們可以確定的是,即使RabbitMQ重新啟動,task_queue隊列也不會丟失。現(xiàn)在我們需要將我們的消息標記為持久的(Persistent)?- 通過將IBasicProperties.Persistent設(shè)置為true。

// 將消息標記為持久性。

var properties = channel.CreateBasicProperties();

properties.Persistent = true;

關(guān)于消息持久性的說明
將消息標記為Persistent并不能完全保證消息不會丟失。盡管它告訴RabbitMQ將消息保存到磁盤,但當RabbitMQ接收到消息并且尚未保存消息時仍有一段時間間隔。此外,RabbitMQ不會為每條消息執(zhí)行fsync(2)?- 它可能只是保存到緩存中,并沒有真正寫入磁盤。消息的持久化保證并不健壯,但對于簡單的任務(wù)隊列來說已經(jīng)足夠了。如果您需要一個更加健壯的保證,可以使用發(fā)布者確認。

6.公平調(diào)度

您可能已經(jīng)注意到調(diào)度仍然無法完全按照我們期望的方式工作。例如,在有兩個Worker的情況下,假設(shè)所有奇數(shù)消息都很龐大、偶數(shù)消息都很輕量,那么一個Worker將會一直忙碌,而另一個Worker幾乎不做任何工作。是的,RabbitMQ并不知道存在這種情況,它仍然會平均地分發(fā)消息。

發(fā)生這種情況是因為RabbitMQ只是在消息進入隊列后就將其分發(fā)。它不會去檢查每個消費者所擁有的未確認消息的數(shù)量。它只是盲目地將第n條消息分發(fā)給第n位消費者。

為了改變上述這種行為,我們可以使用參數(shù)設(shè)置prefetchCount = 1的basicQos方法。

這就告訴RabbitMQ同一時間不要給一個Worker發(fā)送多條消息。或者換句話說,不要向一個Worker發(fā)送新的消息,直到它處理并確認了前一個消息。
相反,它會這個消息調(diào)度給下一個不忙碌的Worker。

channel.BasicQos(0, 1, false);

關(guān)于隊列大小的說明
如果所有的Worker都很忙,您的隊列可能會被填滿。請留意這一點,可以嘗試添加更多的Worker,或者使用其他策略。

7.組合在一起

我們NewTask.cs類的最終代碼:

using System;

using RabbitMQ.Client;

using System.Text;


class NewTask

{

? ? public static void Main(string[] args)

? ? {

? ? ? ? // 實例化連接工廠。

? ? ? ? var factory = new ConnectionFactory() { HostName = "localhost" };

? ? ? ??

? ? ? ? // 創(chuàng)建連接、信道。

? ? ? ? using(var connection = factory.CreateConnection())

? ? ? ? using(var channel = connection.CreateModel())

? ? ? ? {

? ? ? ? ? ? // 聲明隊列,標記為持久性。

? ? ? ? ? ? channel.QueueDeclare(queue: "task_queue",

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?durable: true,

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?exclusive: false,

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?autoDelete: false,

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?arguments: null);

? ? ? ? ? ??

? ? ? ? ? ? // 獲取發(fā)送消息。

? ? ? ? ? ? var message = GetMessage(args);

? ? ? ? ? ? var body = Encoding.UTF8.GetBytes(message);

? ? ? ? ? ??

? ? ? ? ? ? // 將消息標記為持久性。

? ? ? ? ? ? var properties = channel.CreateBasicProperties();

? ? ? ? ? ? properties.Persistent = true;

? ? ? ? ? ??

? ? ? ? ? ? // 發(fā)送數(shù)據(jù)包。

? ? ? ? ? ? channel.BasicPublish(exchange: "",

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?routingKey: "task_queue",

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?basicProperties: properties,

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?body: body);

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?

? ? ? ? ? ? Console.WriteLine(" [x] Sent {0}", message);

? ? ? ? }


? ? ? ? Console.WriteLine(" Press [enter] to exit.");

? ? ? ? Console.ReadLine();

? ? }


? ? private static string GetMessage(string[] args)

? ? {

? ? ? ? return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");

? ? }

}

(NewTask.cs源碼)

還有我們的Worker.cs:

using System;

using RabbitMQ.Client;

using RabbitMQ.Client.Events;

using System.Text;

using System.Threading;


class Worker

{

? ? public static void Main()

? ? {

? ? ? ? // 實例化連接工廠。

? ? ? ? var factory = new ConnectionFactory() { HostName = "localhost" };

? ? ? ??

? ? ? ? ?// 創(chuàng)建連接、信道。

? ? ? ? using(var connection = factory.CreateConnection())

? ? ? ? using(var channel = connection.CreateModel())

? ? ? ? {

? ? ? ? ? ? // 聲明隊列,標記為持久性。

? ? ? ? ? ? channel.QueueDeclare(queue: "task_queue",

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?durable: true,

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?exclusive: false,

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?autoDelete: false,

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?arguments: null);

? ? ? ? ? ??

? ? ? ? ? ? // 告知RabbitMQ,在未收到當前Worker的消息確認信號時,不再分發(fā)給消息,確保公平調(diào)度。

? ? ? ? ? ? channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);


? ? ? ? ? ? Console.WriteLine(" [*] Waiting for messages.");


? ? ? ? ? ? // 構(gòu)建消費者實例。

? ? ? ? ? ? var consumer = new EventingBasicConsumer(channel);

? ? ? ? ? ??

? ? ? ? ? ? // 綁定消息接收事件。

? ? ? ? ? ? consumer.Received += (model, ea) =>

? ? ? ? ? ? {

? ? ? ? ? ? ? ? var body = ea.Body;

? ? ? ? ? ? ? ? var message = Encoding.UTF8.GetString(body);

? ? ? ? ? ? ? ? Console.WriteLine(" [x] Received {0}", message);


? ? ? ? ? ? ? ? // 模擬耗時操作。

? ? ? ? ? ? ? ? int dots = message.Split('.').Length - 1;

? ? ? ? ? ? ? ? Thread.Sleep(dots * 1000);


? ? ? ? ? ? ? ? Console.WriteLine(" [x] Done");


? ? ? ? ? ? ? ? // 手動發(fā)送消息確認信號。

? ? ? ? ? ? ? ? channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);

? ? ? ? ? ? };

? ? ? ? ? ??

? ? ? ? ? ? channel.BasicConsume(queue: "task_queue",

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?autoAck: false,

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?consumer: consumer);


? ? ? ? ? ? Console.WriteLine(" Press [enter] to exit.");

? ? ? ? ? ? Console.ReadLine();

? ? ? ? }

? ? }

}

(Worker.cs源碼)

使用消息確認機制和BasicQ您可以創(chuàng)建一個工作隊列。即使RabbitMQ重新啟動,通過持久性選項也可讓任務(wù)繼續(xù)存在。

有關(guān)IModel方法和IBasicProperties的更多信息,您可以在線瀏覽RabbitMQ .NET客戶端API參考。

現(xiàn)在,我們可以繼續(xù)閱讀教程[3],學(xué)習(xí)如何向多個消費者發(fā)送相同的消息。

8.寫在最后

本文翻譯自RabbitMQ官方教程C#版本。本文介紹如與官方有所出入,請以官方最新內(nèi)容為準。

水平有限,翻譯的不好請見諒,如有翻譯錯誤還請指正。

  • 原文鏈接:RabbitMQ tutorial - Work Queues

  • 實驗環(huán)境:RabbitMQ 3.7.4 、.NET Core 2.1.3、Visual Studio Code

  • 最后更新:2018-04-03


相關(guān)文章:?

  • .net core 使用Redis的發(fā)布訂閱

  • RabbitMQ知多少

  • RabbitMQ系列教程之四:路由(Routing)

  • RabbitMQ系列教程之三:發(fā)布/訂閱(Publish/Subscribe)

  • RabbitMQ系列教程之二:工作隊列(Work Queues)

  • 如何優(yōu)雅的使用RabbitMQ

  • .NET 使用 RabbitMQ 圖文簡介

  • RabbitMQ 高可用集群搭建及電商平臺使用經(jīng)驗總結(jié)

  • .NET Core 使用RabbitMQ

  • ASP.NET Core Web API下事件驅(qū)動型架構(gòu)的實現(xiàn)(三):基于RabbitMQ的事件總線

  • RabbitMQ教程C#版 “Hello World”

原文地址:https://www.cnblogs.com/esofar/p/rabbitmq-work-queues.html?


.NET社區(qū)新聞,深度好文,歡迎訪問公眾號文章匯總 http://www.csharpkit.com

總結(jié)

以上是生活随笔為你收集整理的RabbitMQ教程C#版 - 工作队列的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 精品一区二区三区不卡 | 香蕉尹人网 | 奶波霸巨乳一二三区乳 | 亚洲专区一 | 不卡一区二区在线视频 | 欧美性爱视频久久 | 91们嫩草伦理 | 片黄在线观看 | 91九色在线播放 | 国产女人在线视频 | 婷婷在线免费 | 特级西西人体444www | 欧美五月婷婷 | 天天色视频 | 偷偷操不一样的久久 | 蜜臀99久久精品久久久久久软件 | 求免费黄色网址 | 精品99久久久久成人网站免费 | wwyoujizzcom| 国产在线拍揄自揄拍无码 | 青草福利 | 一本色道久久亚洲综合精品蜜桃 | 中字幕一区二区三区乱码 | 午夜精品久久久久久久99老熟妇 | 久久精品国产av一区二区三区 | 97超碰导航 | 少妇毛片一区二区三区粉嫩av | 黄色大片aaa | 少妇又色又紧又黄又刺激免费 | 爱操视频| 午夜av在线免费观看 | 在线无遮挡 | 蜜臀av一区二区三区激情综合 | sese亚洲| 夜av | 香蕉污视频在线观看 | 亚洲色图二区 | 91免费精品 | たちの熟人妻av一区二区 | 亚洲精品在线视频观看 | 欧美一级xxx | 欧美成人另类 | 亚洲欧美日韩在线播放 | 欧美精品xxxxx | 修女也疯狂3免费观看完整版 | 欧美国产在线一区 | 女攻总攻大胸奶汁(高h) | 中文字幕9 | 久久精品视频8 | 国精产品乱码一区一区三区四区 | 久国产视频 | 天天干天天色综合 | 欧美在线黄 | 乌克兰少妇性做爰 | 成人午夜免费在线 | 激情九九| 免费久草视频 | 国产精品区一区二 | 深夜在线免费视频 | 亚洲色图在线观看视频 | 午夜888| 耳光调教vk | 越南a级片 | 丰满少妇一级片 | 99视频久| 疯狂少妇 | 寂寞人妻瑜伽被教练日 | 亚洲天堂网一区二区 | 国产日韩欧美视频在线 | 久久午夜国产精品 | 免费黄色大片网站 | av有声小说一区二区三区 | 在线免费观看视频黄 | 国产精品国产三级国产专区51区 | 美女网站免费黄 | 一级片免费在线观看 | 国产美女自拍视频 | 中国黄色一级毛片 | 亚洲一区二区三区午夜 | 中文字幕人妻一区二区在线视频 | 99在线免费视频 | 99青草| 我的好妈妈在线观看 | 精品人妻一区二区三区四区在线 | 手机看片91| 麻豆91茄子在线观看 | 日韩av无码一区二区三区 | 91羞羞网站 | 亚洲美免无码中文字幕在线 | 国产人成精品 | 国产一级片 | 五月天激情电影 | 清纯唯美亚洲色图 | 蜜桃视频无码区在线观看 | 麻豆精品视频免费观看 | 911看片 | 内射合集对白在线 | 韩国美女啪啪 | av在线浏览 |