日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

RabbitMQ系列教程之二:工作队列(Work Queues)

發布時間:2023/12/4 编程问答 33 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RabbitMQ系列教程之二:工作队列(Work Queues) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

今天開始RabbitMQ教程的第二講,廢話不多說,直接進入話題。?? (使用.NET 客戶端 進行事例演示)

??

? ?在第一個教程中,我們編寫了一個從命名隊列中發送和接收消息的程序。在本教程中,我們將創建一個工作隊列,這個隊列將用于在多個工人之間分配耗時的任務。


??? 工作隊列【又名:任務隊列】背后主要的思想是避免立刻執行耗時的工作任務,并且一直要等到它結束為止。相反,我們規劃任務并晚些執行。我們封裝一個任務作為消息發送到一個命名的消息隊列中,后臺運行的工作線程將獲取任務并且最終執行該任務。當你運行很多的任務的時候他們會  共享工作線程和隊列。

??? 這個概念在Web應用程序中是尤其有用的,異步執行可以在短時間內處理一個復雜Http請求。

1、準備工作

??? 在本系列教程的前一個教程中,我們發送了一個包含“Hello World!”的消息,現在我們發送一個代表復雜任務的字符串。我們不會創建一個真實的任務,比如對圖像文件進行處理或PDF文件的渲染,因此讓我們假裝我們很忙-通過采用Thread.Sleep()功能來實現復雜和繁忙。我們將根據字符串中的點的數量作為它的復雜性,每一個點將占一秒鐘的“工作”。例如,一個假的任務描述Hello…,有三個點,我們就需要三秒。

??? 我們將稍微修改一下我們以前的例子中Send 程序的代碼,允許從命令行發送任意消息。這個程序將把任務發送到我們的消息隊列中,所以我們叫它NewTask:

?? 像教程一,我們需要生成兩個項目。

dotnet new console --name NewTask

? ? mv NewTask/Program.cs NewTask/NewTask.cs

? ? dotnet new console --name Worker

? ? mv Worker/Program.cs Worker/Worker.cs

? ? cd NewTask

? ?dotnet add package RabbitMQ.Client

? ?dotnet restore

? ?cd ../Worker

? ?dotnet add package RabbitMQ.Client

? ?dotnet restore



? ?var message = GetMessage(args);

? ?var body = Encoding.UTF8.GetBytes(message);


? ?var properties = channel.CreateBasicProperties();

? ?properties.Persistent = true;


? ?channel.BasicPublish(exchange: "",

? ? ? ? ? ? ? ? ? ? ?routingKey: "task_queue",

? ? ? ? ? ? ? ? ? ? ?basicProperties: properties,

? ? ? ? ? ? ? ? ? ? ?body: body);

信息數據我們可以從命令行的參數獲得:

private static string GetMessage(string[] args) { ?
?
return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!"); }


??? 我們的舊Receive.cs代碼也需要一些修改:需要為消息體中每個點都需要消耗一秒鐘的工作,先要計算出消息體內有幾個點號,然后在乘以1000,就是這個復雜消息所消耗的時間,同時表示這是一個復雜任務。RabbitMQ將處理和發送理消息,并且執行這個任務,讓我們拷貝以下代碼黏貼到Worker的項目中,并進行相應的修改:?

var consumer = new EventingBasicConsumer(channel);

? ?consumer.Received += (model, ea) =>

? ?{

? ? ? ?var body = ea.Body;

? ? ? ?var message = Encoding.UTF8.GetString(body);

? ? ? ?Console.WriteLine(" [x] Received {0}", message);


? ? ? ?int dots = message.Split('.').Length - 1;

? ? ? ?Thread.Sleep(dots * 1000);


? ? ? ?Console.WriteLine(" [x] Done");

? ?};

? ?channel.BasicConsume(queue: "task_queue", noAck: true, consumer: consumer);


?我們自己假設的任務的模擬執行時間就是:

int dots = message.Split('.').Length - 1;Thread.Sleep(dots * 1000);


2、輪詢調度

??? 我們使用任務隊列的好處之一就是使任務可以并行化,增加系統的并行處理能力。如果我們正在建立一個積壓的工作,我們可以緊緊增加更多的Worker實例就可以完成大量工作的處理,修改和維護就很容易。

??? 首先,讓我們同時運行兩個Worker實例。他們都會從隊列中得到消息,但具體如何?讓我想想。

??? 你需要打開三個控制臺的應用程序。兩個控制臺程序將運行Wroker程序。這些控制臺程序將是我們的兩個消費者C1和C2。

# shell 1

? ? cd Worker

? ? dotnet run

? ? # => [*] Waiting for messages. To exit press CTRL+C


? ? # shell 2

? ? cd Worker

? ? dotnet run

? ? # => [*] Waiting for messages. To exit press CTRL+C

? ?在第三個控制臺應用程序中我們將發布新的任務。只要你已經啟動了消費者程序,你可以看到一些發布的信息:

# shell 3

? ?cd NewTask

? ?dotnet run "First message."

? ?dotnet run "Second message.."

? ?dotnet run "Third message..."

? ?dotnet run "Fourth message...."

? ?dotnet run "Fifth message....."

?讓我們看看交付了什么東西在Workers:

# shell 1

? ?# => [*] Waiting for messages. To exit press CTRL+C

? ?# => [x] Received 'First message.'

? ?# => [x] Received 'Third message...'

? ?# => [x] Received 'Fifth message.....'


? ?# shell 2

? ?# => [*] Waiting for messages. To exit press CTRL+C

? ?# => [x] Received 'Second message..'

? ?# => [x] Received 'Fourth message....'

默認情況下,RabbitMQ將會發送每一條消息給序列中每一個消費者。每個消費者都會得到相同數量的信息。這種分發消息的方式叫做輪詢。我們嘗試這三個或更多的Workers。?


3、消息確認

???? 處理一個任務可能需要幾秒鐘。如果有一個消費者開始了一個長期的任務,并且只做了一部分就發生了異常,你可能想知道到底發生了什么。我們目前的代碼,一旦RabbitMQ發送一個消息給客戶立即從內存中移除。在這種情況下,如果你關掉了一個Worker,我們將失去它正在處理的信息。我們也將丟失發送給該特定員工但尚未處理的所有信息。

??? 但我們不想失去任何任務。如果一個Worker出現了問題,我們希望把這個任務交給另一個Woker。

??? 為了確保消息不會丟失,RabbitMQ支持消息確認機制。ACK(nowledgement)確認消息是從【消息使用者】發送回來告訴RabbitMQ結果的一種特殊消息,確認消息告訴RabbitMQ指定的接受者已經收到、處理,并且RabbitMQ你可以自由刪除它。

??? 如果一個【消費者Consumer】死亡(其通道關閉,連接被關閉,或TCP連接丟失)不會發送ACK,RabbitMQ將會知道這個消息并沒有完全處理,將它重新排隊。如果有其他用戶同時在線,它就會快速地傳遞到另一個【消費者】。這樣你就可以肯定,沒有消息丟失,即使【Worker】偶爾死了或者出現問題。

??? 在沒有任何消息超時;當【消費者】死亡的時候RabbitMQ會重新發送消息。只要是正常的,即使處理消息需要很長很長的時間也會重發消息給【消費者】。

?? 消息確認的機制默認是打開的。在以前的例子中,我們明確地把它們關閉設置noAck(“沒有手動確認”)參數為true。是時候刪除這個標志了,并且從Worker發送一個適當確認消息,一旦我們完成了工作任務。

var consumer = new EventingBasicConsumer(channel);

? ?consumer.Received += (model, ea) =>

? ?{

? ? ? ?var body = ea.Body;

? ? ? ?var message = Encoding.UTF8.GetString(body);

? ? ? ?Console.WriteLine(" [x] Received {0}", message);


? ? ? ?int dots = message.Split('.').Length - 1;

? ? ? ?Thread.Sleep(dots * 1000);


? ? ? ?Console.WriteLine(" [x] Done");


? ? ? ?channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);

? ?};

? ?channel.BasicConsume(queue: "task_queue", noAck: false, consumer: consumer);


? 使用這個代碼,我們可以肯定的是,即使你使用Ctrl + C關掉一個正在處理消息的Worker,也不會丟失任何東西。【Worker】被殺死后,未被確認的消息很快就會被退回。

4、忘記確認

??? 忘記調用BasicAck這是一個常見的錯誤。雖然這是一個簡單的錯誤,但后果是嚴重的。消息會被退回時,你的客戶退出(這可能看起來像是隨機的)但是RabbitMQ將會使用更多的內存保存這些任何延遲確認消息。
??? 為了調試這種錯誤,你可以使用rabbitmqctl打印messages_unacknowledged字段值:sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
??? 如果是在Window環境下,刪除掉sudo字符就可以:rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
5、持久性的消息

??? 我們已經學會了如何確保即使【消費者】死亡,任務也不會丟失。但是如果RabbitMQ服務器停止了,我們的任務仍然會丟失的。
??? 當RabbitMQ退出或死機會清空隊列和消息,除非你告訴它即使宕機也不能丟失任何東西。要確保消息不會丟失,有兩件事情我們是必需要做的:我們需要將隊列和消息都標記為持久的。
??? 首先,我們需要確保我們RabbitMQ從來都不會損失我們的的隊列。為了做到這一點,我們需要聲明我們的隊列為持久化的:

channel.QueueDeclare(queue: "hello",durable: true,exclusive: false,autoDelete: false,arguments: null);

??? 雖然這個命令本身是正確的,它不會起作用在我們目前的設置中。這是因為我們已經定義了一個叫hello的隊列,它不是持久化的。RabbitMQ不允許你使用不同的參數重新定義一個已經存在的隊列,在任何程序代碼中,都試圖返回一個錯誤。但有一個快速的解決方法-讓我們聲明一個名稱不同的隊列,例如task_queue:

channel.QueueDeclare(queue: "task_queue",durable: true,exclusive: false,autoDelete: false,arguments: null);

??? 這行代碼QueueDeclare表示隊列的聲明,創建并打開隊列,這個段代碼需要應用到【生產者】和【消費者】中。

??? 在這一點上,我們相信,task_queue隊列不會丟失任何東西即使RabbitMQ重啟了。現在我們要通過設置IbasicProperties.SetPersistent屬性值為true來標記我們的消息持久化的。

var properties = channel.CreateBasicProperties();
properties.Persistent
= true;

?????關于消息持久性的注意

? ? ?將消息標記為持久性并不能完全保證消息不會丟失。雖然該設置告訴RabbitMQ時時刻刻把保存消息到磁盤上,但是這個時間間隔還是有的,當RabbitMQ已經接受信息但并沒有保存它,此時還有可能丟失。另外,RabbitMQ不會為每個消息調用fsync(2)--它可能只是保存到緩存并沒有真正寫入到磁盤。雖然他的持久性保證不強,但它我們簡單的任務隊列已經足夠用了。如果您需要更強的保證,那么您可以使用Publisher Comfirms。

6、公平調度

?? 你可能已經注意到,調度仍然沒有像我們期望的那樣的工作。例如,在兩個Workers的情況下,當所有的奇數消息是沉重的,甚至消息是輕的,一個Worker忙個不停,而另一個Worker幾乎沒事可做。哎,RabbitMQ對上述情況一無所知,仍將消息均勻發送。

?? 發生這種情況是因為當有消息進入隊列的時候RabbitMQ才僅僅調度了消息。它根本不看【消費者】未確認消息的數量,它只是盲目的把第N個消息發送給第N個【消費者】。?

?? 為了避免上述情況的發生,我們可以使用prefetchcount = 1的設置來調用BasicQos方法。這個方法告訴RabbitMQ在同一時間不要發送多余一個消息的數據給某個【Worker】。或者,換句話說,當某個消息處理完畢,并且已經收到了消息確認之后,才可以繼續發送消息給那個【Worker】。相反,它將把消息分配給給下一個不忙的【Worker】。

channel.BasicQos(0, 1, false);

?? 注意隊列大小

?? 如果所有的工人都很忙,你的隊列可以填滿。你要留意這一點,也許會增加更多的【Worker】,或者有其他的策略。


7、把所有的代碼放在一起

NewTask.cs類最終的代碼是:

using System;

using RabbitMQ.Client;

using System.Text;


class NewTask

{

? ? public static void Main(string[] args)

? ? {

? ? ? ? var factory = new ConnectionFactory() { HostName = "localhost" };

? ? ? ? using(var connection = factory.CreateConnection())

? ? ? ? using(var channel = connection.CreateModel())

? ? ? ? {

? ? ? ? ? ? channel.QueueDeclare(queue: "task_queue",

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?durable: true,

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?exclusive: false,

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?autoDelete: false,

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?arguments: null);


? ? ? ? ? ? var message = GetMessage(args);

? ? ? ? ? ? var body = Encoding.UTF8.GetBytes(message);


? ? ? ? ? ? var properties = channel.CreateBasicProperties();

? ? ? ? ? ? properties.Persistent = true;


? ? ? ? ? ? channel.BasicPublish(exchange: "",

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?routingKey: "task_queue",

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?basicProperties: properties,

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?body: body);

? ? ? ? ? ? Console.WriteLine(" [x] Sent {0}", message);

? ? ? ? }


? ? ? ? Console.WriteLine(" Press [enter] to exit.");

? ? ? ? Console.ReadLine();

? ? }


? ? private static string GetMessage(string[] args)

? ? {

? ? ? ? return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");

? ? }

}

Worker.cs完整源碼如下

using System;

using RabbitMQ.Client;

using RabbitMQ.Client.Events;

using System.Text;

using System.Threading;


class Worker

{

? ? public static void Main()

? ? {

? ? ? ? var factory = new ConnectionFactory() { HostName = "localhost" };

? ? ? ? using(var connection = factory.CreateConnection())

? ? ? ? using(var channel = connection.CreateModel())

? ? ? ? {

? ? ? ? ? ? channel.QueueDeclare(queue: "task_queue",

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?durable: true,

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?exclusive: false,

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?autoDelete: false,

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?arguments: null);


? ? ? ? ? ? channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);


? ? ? ? ? ? Console.WriteLine(" [*] Waiting for messages.");


? ? ? ? ? ? var consumer = new EventingBasicConsumer(channel);

? ? ? ? ? ? consumer.Received += (model, ea) =>

? ? ? ? ? ? {

? ? ? ? ? ? ? ? var body = ea.Body;

? ? ? ? ? ? ? ? var message = Encoding.UTF8.GetString(body);

? ? ? ? ? ? ? ? Console.WriteLine(" [x] Received {0}", message);


? ? ? ? ? ? ? ? int dots = message.Split('.').Length - 1;

? ? ? ? ? ? ? ? Thread.Sleep(dots * 1000);


? ? ? ? ? ? ? ? Console.WriteLine(" [x] Done");


? ? ? ? ? ? ? ? channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);

? ? ? ? ? ? };

? ? ? ? ? ? channel.BasicConsume(queue: "task_queue",

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?noAck: false,

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?consumer: consumer);


? ? ? ? ? ? Console.WriteLine(" Press [enter] to exit.");

? ? ? ? ? ? Console.ReadLine();

? ? ? ? }

? ? }

}

使用消息確認和BasicQos方法可以建立一個工作隊列。持久化的選項可以讓我們的任務隊列保持存活即使RabbitMQ重啟。 ?好了,寫完了,翻譯的不好,大家見諒。 ?原文地址如下:http://www.rabbitmq.com/tutorials/tutorial-two-dotnet.html

相關文章:

  • RabbitMQ系列教程之一:我們從最簡單的事情開始!Hello World

  • 如何優雅的使用RabbitMQ

  • .NET 使用 RabbitMQ 圖文簡介

  • RabbitMQ 高可用集群搭建及電商平臺使用經驗總結

  • 搭建高可用的rabbitmq集群 + Mirror Queue + 使用C#驅動連接

  • RabbitMQ消息隊列應用

  • 體驗Rabbitmq強大的【優先級隊列】之輕松面對現實業務場景

原文地址:http://www.cnblogs.com/PatrickLiu/p/6943830.html


.NET社區新聞,深度好文,微信中搜索dotNET跨平臺或掃描二維碼關注

創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎

總結

以上是生活随笔為你收集整理的RabbitMQ系列教程之二:工作队列(Work Queues)的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。