日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

【RabbitMQ】 WorkQueues

發(fā)布時間:2023/11/30 编程问答 23 豆豆
生活随笔 收集整理的這篇文章主要介紹了 【RabbitMQ】 WorkQueues 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

消息分發(fā)

在【RabbitMQ】 HelloWorld中我們寫了發(fā)送/接收消息的程序。這次我們將創(chuàng)建一個Work Queue用來在多個消費者之間分配耗時任務(wù)。

Work Queues(又稱為:Task Queues)的主要思想是:盡可能的減少執(zhí)行資源密集型任務(wù)時的等待時間。我們將任務(wù)封裝為消息并發(fā)送到隊列,在后臺的工作進程將彈出任務(wù)并進行作業(yè)。當你運行很多worker,任務(wù)將在他們之間共享。

這個概念在WEB應(yīng)用中尤為有效,因為在一個HTTP請求進行復雜操作是不可能的。

準備

在上一節(jié)我們發(fā)送了一條包含“Hello World”的消息?,F(xiàn)在我們將要發(fā)送代表復雜任務(wù)的字符串。我們沒有真實場景的復雜任務(wù),例如調(diào)整圖片大小或呈現(xiàn)PDF文件,讓我們假裝自己很忙 - 通過Thread.sleep()。我們將根據(jù)字符串中“.”的數(shù)量來衡量任務(wù)復雜度;每一個“.”增加1秒鐘的工作時間。例如:一個“Hello...”將消耗3秒鐘。

稍微修改下上一節(jié)中Send.java的代碼,讓我們可以從命令行參數(shù)中輸入任意字符作為消息。這個程序?qū)⒔o我們的工作隊列安排消息,命名為NewTask.java

String message = getMessage(argv);channel.basicPublish("", "hello", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'");

一些封裝方法來幫助我們從命令行參數(shù)中得到消息(簡單來說就是將所有的命令行參數(shù)當做一條完整消息):

private static String getMessage(String[] strings){if (strings.length < 1)return "Hello World!";return joinStrings(strings, " "); }private static String joinStrings(String[] strings, String delimiter) {int length = strings.length;if (length == 0) return "";StringBuilder words = new StringBuilder(strings[0]);for (int i = 1; i < length; i++) {words.append(delimiter).append(strings[i]);}return words.toString(); }

老的Recv.java程序也需要一些修改:他需要為消息中的每一個“.”偽造1秒鐘的工作時間。稱為Worker.java

final Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println(" [x] Received '" + message + "'");try {doWork(message);} finally {System.out.println(" [x] Done");}} }; boolean autoAck = true; // acknowledgment is covered below 消息確認,在后面會詳細講解 channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);

模擬任務(wù)執(zhí)行:

private static void doWork(String task) throws InterruptedException {for (char ch: task.toCharArray()) {if (ch == '.') Thread.sleep(1000);} }

循環(huán)調(diào)度

使用Task Queue的優(yōu)點之一就是可以輕松的進行并行工作。如果我們正在構(gòu)建一個積壓的工作,我們可以僅僅通過添加更多的workers來解決。

首先,同時運行兩個worker實例,他們都會從隊列中得到消息,但事實上是什么樣的呢?讓我們看一看:

在IDEA中運行兩次Worker.java,然后他們兩個都會處于等待消息狀態(tài)。運行NewTask.java,并攜帶命令行參數(shù),可以在Edit Configurations中設(shè)置Program arguements。下面為官方教程中的命令行版本:

shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar NewTask First message. shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar NewTask Second message.. shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar NewTask Third message... shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar NewTask Fourth message.... shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar NewTask Fifth message.....

主要觀察兩個worker的輸出:

worker1$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Worker[*] Waiting for messages. To exit press CTRL+C[x] Received 'First message.'[x] Received 'Third message...'[x] Received 'Fifth message.....' worker2$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Worker[*] Waiting for messages. To exit press CTRL+C[x] Received 'Second message..'[x] Received 'Fourth message....'

默認的,RabbitMQ將會按照順序,以此發(fā)送每一條消息到每一個消費者。平均每個消費者是可以獲得相同數(shù)量的消息的。這種分發(fā)消息的方式稱為循環(huán)。

消息確認

?完成一個任務(wù)需要消耗一定時間,你可能想知道如果一個消費者開始了一個很長的任務(wù),在僅僅完成了一部分的時候,死掉了,將會發(fā)生什么。在我們當前的代碼中,一旦RabbitMQ分發(fā)一條消息給消費者,立即就會將該條消息從內(nèi)存中刪除。這種情況下,如果你殺掉一個worker,我們將會丟失它正在操作的消息。我們也會失去所有分發(fā)給他的還未處理的消息。

但是我們不想丟失任何消息。如果worker死掉,我們期望這個任務(wù)被重新分發(fā)給另一個worker。

為了確保消息從來沒有丟失,RabbitMQ支持消息確認(acknowledgments)。一個確認是從消費者處發(fā)送以告訴RabbitMQ指定的消息收到了,處理完成了,RabbitMQ可以刪除它了。

如果一個消費者宕機(channel關(guān)閉,connection關(guān)閉,TCP連接丟失等),沒有發(fā)送ack,RabbitMQ將會知道這條消息沒有處理完成,將會重新排隊。如果此時存在其它消費者,將會迅速轉(zhuǎn)發(fā)給其它消費者。這樣你就可以確保消息不會丟失,即使進程偶爾宕機。

這里不存在消息超時,RabbitMQ在消費者宕機后會重發(fā)消息。即使處理數(shù)據(jù)用了很長很長的時間這也是沒有問題的。

默認的消息確認是被打開的。上面的例子中我們通過autoAck=true明確關(guān)閉了它。下面我們打開它,每當處理完一個任務(wù),就發(fā)送回一個適當?shù)拇_認消息。

channel.basicQos(1); // accept only one unack-ed message at a time (see below) 每次接收一個未處理消息final Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println(" [x] Received '" + message + "'");try {doWork(message);} finally {System.out.println(" [x] Done");channel.basicAck(envelope.getDeliveryTag(), false);}} }; boolean autoAck = false; channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);

使用現(xiàn)在的代碼,我們可以保證即使在操作消息的時候通過CTRL+C關(guān)閉了一個消費者,也不會丟失消息。不久后,所有未處理完成的消息都會被重新發(fā)送。

Forgotten acknowledgment

忘記設(shè)置basicAck是很普通的事情,但是結(jié)果卻很嚴重。當客戶端退出(這可能聽起來像隨機分發(fā))消息會被重新發(fā)送,但是RabbitMQ會吃掉越來越多的內(nèi)容,因為它不會釋放任何沒有被確認的消息。

調(diào)試這種錯誤的使用rabbitmqctl來打印messages_unacknowledged的部分:

$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged Listing queues ... hello 0 0 ...done.

消息持久化

我們學習了如何在消費者宕機的情況下保證數(shù)據(jù)不丟失。但是在RabbitMQ服務(wù)器宕機的情況下,數(shù)據(jù)依然是會丟失的。

當RabbitMQ退出或崩潰,它會忘記所有的隊列和消息,除非你告訴它不要。兩件事情來確保消息未丟失:我們需要標記隊列和消息為持久化的。

首先,我們需要確保RabbitMQ從來不會丟失隊列。因此我們需要聲明隊列為持久化的:

boolean durable = true; channel.queueDeclare("hello", durable, false, false, null);

這行代碼是沒有問題的,但是在我們的環(huán)境下是錯誤的。這是因為我們已經(jīng)定義了一個叫做hello的非持久化隊列。RabbitMQ不允許重新定義已經(jīng)存在的隊列(使用不用參數(shù))。這里有一個快速的方法 - 定義一個不同名字的隊列,如task_queue:

boolean durable = true; channel.queueDeclare("task_queue", durable, false, false, null);

這個queueDeclare需要同時更改生產(chǎn)者和消費者的代碼。

現(xiàn)在我們確保了task_queue在RabbitMQ重啟的狀態(tài)下也不會丟失?,F(xiàn)在我們需要去標記我們的消息為持久化的 - 通過設(shè)置MessageProperties(實現(xiàn)了BasicProperties)的常量值:PERSISTENT_TEXT_PLAIN。

import com.rabbitmq.client.MessageProperties;channel.basicPublish("", "task_queue",MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());

注意消息持久化:

標記了消息為持久化也不能完全保證消息不會丟失。盡管告訴了RabbitMQ將消息保存在磁盤中,RabbitMQ剛剛接收數(shù)據(jù),還沒有保存的時候,這個時間區(qū)間是無法持久化的。同事,RabbitMQ沒有對每條消息都進行fsync(2) -- 也許僅僅保存在緩存中并沒有真正寫入硬盤。持久化并不健壯,但是對于處理簡單的任務(wù)隊列已經(jīng)足夠了。如果你需要更加強健的保證可以使用publisher confirms

公平分發(fā)

你可能注意到有時候分發(fā)還是無法解決我們的某些問題。例如在某種情況下,有兩個消費者,當所有的奇數(shù)消息非常大,偶數(shù)消息很小,一個消費者將會持續(xù)不斷的工作,另一個消費者基本不工作。但是RabbitMQ并不知道這種情況,依然是依次分發(fā)。

這是因為RabbitMQ在消息進入隊列是進行分發(fā)。并不探查消息的數(shù)量。僅僅是發(fā)送第n條消息給第n個消費者。

為了解決這個問題,我們可以使用basicQos方法,設(shè)置參數(shù)為prefetchCount = 1。這會告訴RabbitMQ每次只給一個消費者一條消息?;蛘哒f,不要在消費者正在處理和確認消息的時候發(fā)送新的消息給他們。相反,它將分發(fā)消息給下一個不忙的消費者。

int prefetchCount = 1; channel.basicQos(prefetchCount);

注意隊列的大小

如果所有的消費者都處于繁忙狀態(tài),隊列會填滿??梢蕴砑痈嗟南M者或者其它方案。

Putting it all together

NewTask.java

import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties;public class NewTask {private static final String TASK_QUEUE_NAME = "task_queue";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);String message = getMessage(argv);channel.basicPublish("", TASK_QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));System.out.println(" [x] Sent '" + message + "'");channel.close();connection.close();}private static String getMessage(String[] strings) {if (strings.length < 1)return "Hello World!";return joinStrings(strings, " ");}private static String joinStrings(String[] strings, String delimiter) {int length = strings.length;if (length == 0) return "";StringBuilder words = new StringBuilder(strings[0]);for (int i = 1; i < length; i++) {words.append(delimiter).append(strings[i]);}return words.toString();} }

Worker.java

import com.rabbitmq.client.*;import java.io.IOException;public class Worker {private static final String TASK_QUEUE_NAME = "task_queue";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");final Connection connection = factory.newConnection();final Channel channel = connection.createChannel();channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");channel.basicQos(1);final Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println(" [x] Received '" + message + "'");try {doWork(message);} finally {System.out.println(" [x] Done");channel.basicAck(envelope.getDeliveryTag(), false);}}};channel.basicConsume(TASK_QUEUE_NAME, false, consumer);}private static void doWork(String task) {for (char ch : task.toCharArray()) {if (ch == '.') {try {Thread.sleep(1000);} catch (InterruptedException _ignored) {Thread.currentThread().interrupt();}}}} }

?

轉(zhuǎn)載于:https://www.cnblogs.com/shiyu404/p/6251773.html

總結(jié)

以上是生活随笔為你收集整理的【RabbitMQ】 WorkQueues的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 成人免费看类便视频 | 中文字幕 欧美日韩 | 99热黄色 | 精品国产一区二区三区四区精华 | 91福利在线导航 | 久草色在线 | 欧美在线观看一区二区三区 | 国产精品99久久久久久久女警 | 五月激情婷婷丁香 | 欧美一区二区三区免费观看 | 4虎最新网址| 久久国产精品久久精品国产 | 波多野结衣二区三区 | 国产chinese中国hdxxxx | 美国少妇性做爰 | 免费黄色小网站 | 看特级毛片 | 成人影视在线看 | 一区不卡在线 | 无码av天堂一区二区三区 | 一级成人毛片 | 久久精品综合视频 | 亚洲一区二区国产精品 | 久久青娱乐 | www黄色com | 99久国产 | www.啪啪| 一区二区在线视频免费观看 | 国产成人精品无码免费看夜聊软件 | 欧洲久久久 | 在线免费观看高清视频 | 波多野结衣激情视频 | 美女十八毛片 | 狂野欧美性猛交xxxx巴西 | 欧美国产精品一二三 | 性调教学院高h学校 | 午夜视频观看 | 麻豆影视在线播放 | 国产毛片久久久久久 | 激情视频在线免费观看 | 美女视频在线免费观看 | 久久er99热精品一区二区介绍 | 91网入口 | 国内成人自拍视频 | 91超碰国产在线 | 91午夜视频| 亚洲av永久一区二区三区蜜桃 | h视频网站在线观看 | 国产精品福利片 | 成人做受黄大片 | 天堂在线成人 | 亚洲一区你懂的 | 国产成人久久777777 | 台湾佬美性中文娱乐网 | 九色影视 | 狠狠干in| 国产精品丝袜一区 | 国产大学生视频 | 欧日韩不卡在线视频 | 亚欧乱色 | av专区在线| 一卡二卡三卡四卡五卡 | 操人小视频 | 久久深夜 | 亚洲AV成人无码网站天堂久久 | 欧美日韩国产在线一区 | 日日爱99| japan高清日本乱xxxxx | 国产微拍精品一区 | 国产一区二区三区免费观看视频 | 美女毛片视频 | 国产精品99久久久久久宅男 | 亚洲av无码一区二区三区四区 | 日韩大片免费在线观看 | 免费99精品国产自在在线 | 99热国产在线 | 国精产品99永久一区一区 | 亚洲精品一区二三区 | 久久精品电影 | 亚洲国产精品影院 | 日韩福利视频一区 | 18视频在线观看网站 | 明日花绮罗高潮无打码 | 人妻少妇精品无码专区 | 毛片网站免费观看 | 办公室荡乳欲伦交换bd电影 | 亚洲AV无码精品黑人黑人 | 欧美成人乱码一二三四区免费 | 亚洲97色| 不卡日本 | 国产精品综合视频 | 性猛交娇小69hd | 91久久久久 | 亚洲综合在线一区 | 亚洲精品黄色 | 污片网址 | 精品亚洲永久免费精品 | 国产jzjzjz丝袜老师水多 | 欧美视频xxxx |