日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

轻松搞定RabbitMQ(二)——工作队列之消息分发机制

發布時間:2025/3/15 编程问答 28 豆豆
生活随笔 收集整理的這篇文章主要介紹了 轻松搞定RabbitMQ(二)——工作队列之消息分发机制 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

? ? ? ?上一篇博文中簡單介紹了一下RabbitMQ的基礎知識,并寫了一個經典語言入門程序——HelloWorld。本篇博文中我們將會創建一個工作隊列用來在工作者(consumer)間分發耗時任務。同樣是翻譯的官網實例。

工作隊列


? ? ? ?在前一篇博文中,我們完成了一個簡單的對聲明的隊列進行發送和接受消息程序。下面我們將創建一個工作隊列,來向多個工作者(consumer)分發耗時任務。

? ? ? ?工作隊列(又名:任務隊列)的主要任務是為了避免立即做一個資源密集型的卻又必須等待完成的任務。相反的,我們進行任務調度:將任務封裝為消息并發給隊列。在后臺運行的工作者(consumer)將其取出,然后最終執行。當你運行多個工作者(consumer),隊列中的任務被工作進行共享執行。

? ? ? ?這樣的概念對于在一個HTTP短鏈接的請求窗口中處理復雜任務的web應用程序,是非常有用的。

準備

? ? ? ?使用Thread.Sleep()方法來模擬耗時。采用小數點的數量來表示任務的復雜性。每一個點將住哪用1s的“工作”。例如,Hello... 處理完需要3s的時間。

? ? ? ?發送端(生產者):NewTask.java

public class NewTask {private final static String QUEUE_NAME = "hello";public static void main(String[] args) throws IOException {/*** 創建連接連接到MabbitMQ*/ConnectionFactory factory = new ConnectionFactory();// 設置MabbitMQ所在主機ip或者主機名factory.setHost("127.0.0.1");// 創建一個連接Connection connection = factory.newConnection();// 創建一個頻道Channel channel = connection.createChannel();// 指定一個隊列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 發送的消息String message = "Hello World...";// 往隊列中發出一條消息channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");// 關閉頻道和連接channel.close();connection.close();} }

? ? ? ?工作者(消費者)Worker.java

public class Worker {private final static String QUEUE_NAME = "hello";public static void main(String[] argv) throws IOException, InterruptedException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");// 打開連接和創建頻道,與發送端一樣Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 聲明隊列,主要為了防止消息接收者先運行此程序,隊列還不存在時創建隊列。channel.queueDeclare(QUEUE_NAME, false, false, false, null);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");// 創建隊列消費者final Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println(" [x] Received '" + message + "'");System.out.println(" [x] Proccessing... at " +new Date().toLocaleString());try {for (char ch: message.toCharArray()) {if (ch == '.') {Thread.sleep(1000);}}} catch (InterruptedException e) {} finally {System.out.println(" [x] Done! at " +new Date().toLocaleString());}}};channel.basicConsume(QUEUE_NAME, true, consumer);} } ? ? ? ?運行結果如下:



任務分發機制

? ? ? ?正主來了。。。下面開始介紹各種任務分發機制。


Round-robin(輪詢分發)

? ? ? ?使用任務隊列的優點之一就是可以輕易的并行工作。如果我們積壓了好多工作,我們可以通過增加工作者(消費者)來解決這一問題,使得系統的伸縮性更加容易。

修改一下NewTask,使用for循環模擬多次發送消息的過程:

for (int i = 0; i < 5; i++) {// 發送的消息String message = "Hello World"+Strings.repeat(".", i);// 往隊列中發出一條消息channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");}

? ? ? ?我們先啟動1個生產者實例,2個工作者實例,看一下如何執行:


? ? ? ?從上述的結果中,我們可以得知,在默認情況下,RabbitMQ將逐個發送消息到在序列中的下一個消費者(而不考慮每個任務的時長等等,且是提前一次性分配,并非一個一個分配)。平均每個消費者獲得相同數量的消息。這種方式分發消息機制稱為Round-Robin(輪詢)。


Fair dispatch(公平分發)

? ? ? ?您可能已經注意到,任務分發仍然沒有完全按照我們想要的那樣。比如:現在有2個消費者,所有的奇數的消息都是繁忙的,而偶數則是輕松的。按照輪詢的方式,奇數的任務交給了第一個消費者,所以一直在忙個不停。偶數的任務交給另一個消費者,則立即完成任務,然后閑得不行。而RabbitMQ則是不了解這些的。

? ? ? ?這是因為當消息進入隊列,RabbitMQ就會分派消息。它不看消費者為應答的數目,只是盲目的將第n條消息發給第n個消費者。


? ? ? ?為了解決這個問題,我們使用basicQos(?prefetchCount = 1)方法,來限制RabbitMQ只發不超過1條的消息給同一個消費者。當消息處理完畢后,有了反饋,才會進行第二次發送。

int prefetchCount = 1; channel.basicQos(prefetchCount); ? ? ? ?注:如果所有的工作者都處于繁忙狀態,你的隊列有可能被填充滿。你可能會觀察隊列的使用情況,然后增加工作者,或者使用別的什么策略。
? ? ? ?還有一點需要注意,使用公平分發,必須關閉自動應答,改為手動應答。這些內容會在下篇博文中講述。

? ? ? ?整體代碼如下:生產者NewTask.java

public class NewTask {private final static String QUEUE_NAME = "hello";public static void main(String[] args) throws IOException {/*** 創建連接連接到MabbitMQ*/ConnectionFactory factory = new ConnectionFactory();// 設置MabbitMQ所在主機ip或者主機名factory.setHost("127.0.0.1");// 創建一個連接Connection connection = factory.newConnection();// 創建一個頻道Channel channel = connection.createChannel();// 指定一個隊列channel.queueDeclare(QUEUE_NAME, false, false, false, null);int prefetchCount = 1;//限制發給同一個消費者不得超過1條消息channel.basicQos(prefetchCount);for (int i = 0; i < 5; i++) {// 發送的消息String message = "Hello World"+Strings.repeat(".",5-i)+(5-i);// 往隊列中發出一條消息channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");}// 關閉頻道和連接channel.close();connection.close();} }

? ? ? ?消費者Worker.java

public class Worker {private final static String QUEUE_NAME = "hello";public static void main(String[] argv) throws IOException, InterruptedException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");// 打開連接和創建頻道,與發送端一樣Connection connection = factory.newConnection();final Channel channel = connection.createChannel();// 聲明隊列,主要為了防止消息接收者先運行此程序,隊列還不存在時創建隊列。channel.queueDeclare(QUEUE_NAME, false, false, false, null);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");channel.basicQos(1);//保證一次只分發一個// 創建隊列消費者final Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println(" [x] Received '" + message + "'");try {for (char ch: message.toCharArray()) {if (ch == '.') {Thread.sleep(1000);}}} catch (InterruptedException e) {} finally {System.out.println(" [x] Done! at " +new Date().toLocaleString());channel.basicAck(envelope.getDeliveryTag(), false); }}};channel.basicConsume(QUEUE_NAME, false, consumer);} } ? ? ? ?運行結果如下:




總結

以上是生活随笔為你收集整理的轻松搞定RabbitMQ(二)——工作队列之消息分发机制的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 国产精品偷伦视频免费观看了 | 在线看福利影 | 狠狠爱综合网 | 伊人二区 | 黄色在线观看www | 国产视频三级 | 狠狠躁日日躁夜夜躁2022麻豆 | 亚洲成人资源 | 青青操操 | 久久久久婷 | 日韩激情一区二区三区 | 美日韩成人av | 亚洲欧美中文字幕5发布 | 色www.| 精品国产一区二区三 | 日韩精品人妻一区二区三区免费 | 最好看的2019年中文视频 | 加勒比一区二区 | 亚洲视频在线免费 | 成人国产精品蜜柚视频 | 少妇一区二区三区四区 | 一级黄色在线 | www日韩欧美| 神马午夜在线观看 | 国产日韩二区 | 国产原创一区 | 国产69精品久久久久久久 | 三级特黄视频 | 在线播放波多野结衣 | 欧美精品观看 | 久久精品国产亚洲av麻豆蜜芽 | 国内精品一区二区 | 在线观看污网站 | 性猛╳xxx乱大交 | h视频网站在线观看 | 国产无遮掩| 色香影视 | 性爱视频在线免费 | 午夜福利三级理论电影 | 古代玷污糟蹋np高辣h文 | 久久五| 国产香蕉尹人视频在线 | 日本在线观看一区 | 无码人妻av一区二区三区波多野 | 男人的天堂色偷偷 | chinese hd av| 日韩二区在线观看 | 日韩精品中文字幕一区二区三区 | 精品一区欧美 | 亚洲欧洲一区二区 | 91精品国产色综合久久不卡98 | 裸体黄色片 | 欧美粗暴jizz性欧美20 | 中文在线资源天堂 | 久久精品欧美 | 久久精品视频在线免费观看 | 男女啪啪国产 | 国产毛片久久久久久久 | 日本精品黄色 | www.在线观看麻豆 | 成人免费一区二区 | 亚洲色图婷婷 | 国产精品久久久久久免费免熟 | 青青草免费观看视频 | 999免费视频 | 日韩国产网站 | 人日人视频 | 欧美成人黄色小视频 | 十八岁世界在线观看高清免费韩剧 | 日本99视频| 性少妇mdms丰满hdfilm | 在线观看h视频 | 国产51页 | 精品一区二区在线观看 | 亚洲免费国产 | 老版水浒传83版免费播放 | 日韩三级欧美 | 成人亚洲精品777777ww | 美女又爽又黄免费视频 | 亚洲av无码精品色午夜 | 国产日韩在线视频 | 99精品在线播放 | 在线观看久草 | 日韩在线精品视频一区二区涩爱 | 国产香蕉在线观看 | 风间ゆみ大战黑人 | 男女黄色录像 | 四虎黄色影院 | 国产成人激情 | 亚洲va久久久噜噜噜久久天堂 | 亚洲香蕉在线观看 | 成年人在线观看视频免费 | 探花视频在线免费观看 | 亚洲视频福利 | 亚洲天堂资源网 | 日韩精品免费一区二区三区竹菊 | 亚洲天堂资源网 | 影音av资源 | 少妇人禽zoz0伦视频 |