RabbitMQ指南(上)
RabbitMQ是一個消息中間件,在一些需要異步處理、發(fā)布/訂閱等場景的時候,使用RabbitMQ可以完成我們的需求。 下面是我在學習RabbitMQ的過程中的一些記錄,內容主要翻譯自RabbitMQ官網的Tutorials, 再加上我的一些個人理解。我將會用三篇文章來從RabbitMQ的Hello World介紹起,到最后的通過RabbitMQ實現RPC調用, 相信看完這三篇文章大家應該會對RabbitMQ的基本概念和使用有一定的了解。
說明:
Hello World
首先需要安裝RabbitMQ,關于RabbitMQ的安裝這里就不贅述了,可以到RabbitMQ的官網去看相應的OS的安裝方法。 安裝完成后使用rabbitmq-server即可啟動RabbitMQ,RabbitMQ還提供了一個UI管理界面,本地默認的地址為localhost:15672, 用戶名和密碼均為guest。
安裝完成之后,按照慣例,先來完成一個簡單的Hello World的例子。 最簡單的一種消息發(fā)送的模型為一個消息發(fā)送者(Producer)將消息發(fā)送到Queue中,另一端的消息接受者(Consumer)從Queue中接受消息, 大致模型如下圖所示:
先來看發(fā)送的代碼,新建一個類命名為Send.java,代碼的第一步為連接server
| 1 2 3 4 | ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); |
connection抽象了socket的連接,并且為我們處理了協議版本的協商、權限認證等等。這里我們連接的是本地的中間件, 也就是localhost,接下來我們創(chuàng)建一個channel,這是大多數API完成任務的所在,也就是說我們的API操作基本都是通過channel來完成的。
| 1 2 3 4 | channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); |
首先是通過channel來聲明一個queue,并且聲明queue的操作是冪等的,也即是說只有在這個queue不存在的情況下才會新創(chuàng)建一個queue。 這里發(fā)送一個Hello World!的消息,實際傳遞的消息內容為字節(jié)數組。
| 1 2 | channel.close(); connection.close(); |
最后關閉channel和connection的連接,注意關閉的順序,是先關閉channel的連接,再關閉connection的連接。
完整的Send.java代碼
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | public class Send { ????private static final String QUEUE_NAME = "hello"; ????public static void main(String[] args) { ????????ConnectionFactory factory = new ConnectionFactory(); ????????factory.setHost("localhost"); ????????Connection connection = factory.newConnection(); ????????Channel channel = connection.createChannel(); ????????channel.queueDeclare(QUEUE_NAME, false, false, false, null); ????????String message = "Hello World!"; ????????channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); ????????System.out.println(" [x] Sent '" + message + "'"); ????????channel.close(); ????????connection.close(); ????} } |
完成發(fā)送的代碼之后是接受消息的代碼,新建一個類為Recv.java
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 | public class Recv { ????private final static String QUEUE_NAME = "hello"; ????public static void main(String[] argv) throws Exception { ??????ConnectionFactory factory = new ConnectionFactory(); ??????factory.setHost("localhost"); ??????Connection connection = factory.newConnection(); ??????Channel channel = connection.createChannel(); ??????channel.queueDeclare(QUEUE_NAME, false, false, false, null); ??????System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); ??????Consumer consumer = new DefaultConsumer(channel) { ????????@Override ????????public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) ????????????throws IOException { ??????????String message = new String(body, "UTF-8"); ??????????System.out.println(" [x] Received '" + message + "'"); ????????} ??????}; ??????channel.basicConsume(QUEUE_NAME, true, consumer); ????} } |
可以發(fā)現一開始的連接部分的代碼是相同的,在接收的時候我們也要聲明一個queue,注意這里queue的名稱和之前發(fā)送消息聲明的queue的名稱必須是相同的, 否則就收不到消息了。
DefaultConsumer類實現了Consumer接口,由于發(fā)送消息是異步的,因此在這里提供了一個callback來緩沖消息, 直到我們準備使用這些消息,最后分別運行Send.java和Recv.java,就能看到Hello World!消息了。
Work Queues
在第一部分的Hello World中通過一個命名的queue來傳遞消息,在這一部分,我們會創(chuàng)建Work Queue來將耗時的任務分發(fā)至多個worker。 假設一個消息就是一個耗時的任務,比如文件I/O等等,那么可以通過幾個worker來共同完成這些工作。
在Web應用中這是非常有用的,因為在一次非常短的HTTP請求窗口中完成一個非常復雜的任務是很困難的。
準備
這一部分是建立在上一部分Hello World的基礎之上的,我們將發(fā)送字符串來表示一些復雜的任務, 由于并沒有一些真實的復雜的工作,因此使用Thread.sleep()來模擬這是一個很耗時的任務, 并且在發(fā)送的字符串當中含有一個點號就表示這個任務需要耗時1秒,比如發(fā)送Hello...表示將要耗時3秒。
在前一部分的Send.java的基礎上做一些修改,得到一個新的類稱為NewTask.java。
| 1 2 3 4 | String message = getMessage(argv); channel.basicPublish("", "hello", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); |
getMessage方法為從命令行中獲取參數
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | private static String getMessage(String[] strings){ ????if (strings.length < 1) ????????return "Hello World!"; ????return joinStrings(strings, " "); } private static String joinStrings(String[] strings, String delimiter) { ????int length = strings.length; ????if (length == 0) return ""; ????StringBuilder words = new StringBuilder(strings[0]); ????for (int i = 1; i < length; i++) { ????????words.append(delimiter).append(strings[i]); ????} ????return words.toString(); } |
我們之前的Recv.java也需要做一些變化,它需要模擬一些耗時的任務,消息內容中一個.表示1秒,并且它會處理消息, 我們稱它為Worker.java
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | final Consumer consumer = new DefaultConsumer(channel) { ??@Override ??public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { ????String message = new String(body, "UTF-8"); ????System.out.println(" [x] Received '" + message + "'"); ????try { ??????doWork(message); ????} finally { ??????System.out.println(" [x] Done"); ????} ??} }; boolean autoAck = true; // acknowledgment is covered below channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer); |
這里有一個autoAck變量的作用在后面會提到。doWork方法就是模擬的耗時任務
| 1 2 3 4 5 | private static void doWork(String task) throws InterruptedException { ????for (char ch: task.toCharArray()) { ????????if (ch == '.') Thread.sleep(1000); ????} } |
循環(huán)發(fā)送
使用任務隊列其中的一個好處是可以非常方便的并行處理這些任務。如果我們在處理一些積壓的工作, 只需要增加更多的worker即可,非常容易擴展。
首先,來試試兩個worker實例的情況。很顯然,兩個worker都會接受到消息,但是具體的情況是怎么樣的呢? 我們在控制臺啟動兩個實例,C1和C2表示兩個consumer,然后使用Producer來發(fā)送消息,一共發(fā)送五條消息,來看看具體的情況。
首先是第一個worker打印出的消息
| 1 2 3 4 | [*] Waiting for messages. To exit press CTRL+C [x] Received 'First message.' [x] Received 'Third message...' [x] Received 'Fifth message.....' |
第二個worker打印出的消息
| 1 2 3 | [*] Waiting for messages. To exit press CTRL+C [x] Received 'Second message..' [x] Received 'Fourth message....' |
默認的,RabbitMQ會順序的把消息發(fā)送到下一個Consumer,上面打印出的消息也印證了這一點。 平均來說每個Consumer接收到的消息數量是相同的,這種發(fā)送消息的方式稱為循環(huán)發(fā)送(round-robin), 思考下有三個或者更多Worker的情況。
消息接收(Message acknowledgment)
處理一個任務需要耗費幾秒鐘的時間。你也許想知道如果一個consumer在處理一個任務的時候只處理了一部分就掛了會出現什么情況。 在我們現在的代碼下,一旦RabbitMQ將一個消息傳遞到consumer,它馬上會從內存中刪除這條消息, 也就是說如果殺掉了一個正在處理任務的worker,那么將會失去所有的這個worker正在處理的所有消息, 同樣也會失去發(fā)送給這個worker但是還未處理的消息。
一般情況下,我們不希望丟失消息,如果某個worker掛了,能將消息發(fā)送給另一個worker來處理。 為了確保消息不會丟失,RabbitMQ支持消息接收(message acknowledgments)。 當consumer確認收到某個消息,并且已經處理完成,RabbitMQ可以刪除它時,consumer會向RabbitMQ發(fā)送一個ack(nowledgement)。
如果一個consumer掛了(channel關閉了、connection關閉了或者TCP連接斷了)而沒有發(fā)送ack,RabbitMQ就會知道這個消息沒有被完全處理, 將會對這條消息做re-queue處理。如果此時有另一個consumer連接,消息會被重新發(fā)送至另一個consumer。 使用這種方式可以保證消息不會丟失。
消息不會超時;RabbitMQ會在consumer掛了之后重新發(fā)送消息。即使處理消息耗時非常長也是沒有問題的。
消息接收是默認開啟的,在之前的例子中我們通過autoAck=true標志顯式的關閉了它,true則表示自動接收,不需要發(fā)送ack。 現在是時候來開啟ack。當consumer處理完成之后,向rabbitMQ發(fā)送ack。
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 | channel.basicQos(1); // accept only one unack-ed message at a time (see below) final Consumer consumer = new DefaultConsumer(channel) { ??@Override ??public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { ????String message = new String(body, "UTF-8"); ????System.out.println(" [x] Received '" + message + "'"); ????try { ??????doWork(message); ????} finally { ??????System.out.println(" [x] Done"); ??????channel.basicAck(envelope.getDeliveryTag(), false); ????} ??} }; boolean autoAck = false; channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer); |
使用以上代碼能夠保證即使在一個worker處使消息的時候用CTRL+C來殺掉這個worker,也不會丟失消息。 在這個worker掛掉之后所有未接收(ack)的消息將被重新發(fā)送。
消息持久化
我們學習了如何在worker掛掉的情況下不丟消息,但是在RabbitMQ server停止之后消息還是會丟失。 如果不進行任何配置,在RabbitMQ退出或崩潰的時候,將會失去所有的queue和消息。 要保證在這種情況下消息不丟失需要做兩件事情:需要同時標志queue和message是持久化的。
首先,需要確保RabbitMQ不會丟失queue
| 1 2 | boolean durable = true; channel.queueDeclare("task_queue", durable, false, false, null); |
我們重新聲明一個queue(不能修改已經聲明為不持久化的queue為持久化),名字為task_queue, 第二個布爾參數表示是否持久化的意思,這里設置為true, 包括consumer和producer聲明queue的時候都需要聲明durable為true。現在,即使重啟RabbitMQ,task_queue這個queue也不會丟失了。
接下來我們將消息做持久化配置處理,通過設置MessageProperties(實現了BasicProperties)中的PERSISTENT_TEXT_PLAIN屬性。
| 1 2 3 | channel.basicPublish("", "task_queue", ????????????MessageProperties.PERSISTENT_TEXT_PLAIN, ????????????message.getBytes()); |
公平分發(fā)(Fair dispatch)
在某種場景下有兩個worker,當所有奇數的消息處理起來都比較耗時,而偶數的消息處理起來都比較快, 這就會發(fā)生一個worker總是處于busy狀態(tài),而另一個worker則總是處于空閑狀態(tài),RabbitMQ并不知道這個情況, 仍然只是正常的發(fā)送消息。
出現這種情況的原因在于當消息在queue中的時候RabbitMQ只是發(fā)送這些消息而已,它不會去關注某個consumer未ack的消息的數量, 它只是盲目的將某個消息發(fā)送到某個consumer。
為了處理這種情況我們可以使用basicQos方法來設置prefetchCount = 1。 這告訴RabbitMQy一次只給worker一條消息,換句話來說,就是直到worker發(fā)回ack,然后再向這個worker發(fā)送下一條消息。
| 1 2 | int prefetchCount = 1; channel.basicQos(prefetchCount); |
完整的NewTask.java代碼
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 | public class NewTask { ??private static final String TASK_QUEUE_NAME = "task_queue"; ??public static void main(String[] argv) ??????????????????????throws java.io.IOException { ????ConnectionFactory factory = new ConnectionFactory(); ????factory.setHost("localhost"); ????Connection connection = factory.newConnection(); ????Channel channel = connection.createChannel(); ????channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); ????String message = getMessage(argv); ????channel.basicPublish( "", TASK_QUEUE_NAME, ????????????MessageProperties.PERSISTENT_TEXT_PLAIN, ????????????message.getBytes()); ????System.out.println(" [x] Sent '" + message + "'"); ????channel.close(); ????connection.close(); ??}????? ??//... } |
Worker.java
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 | public class Worker { ??private static final String TASK_QUEUE_NAME = "task_queue"; ??public static void main(String[] argv) throws Exception { ????ConnectionFactory factory = new ConnectionFactory(); ????factory.setHost("localhost"); ????final Connection connection = factory.newConnection(); ????final Channel channel = connection.createChannel(); ????channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); ????System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); ????channel.basicQos(1); ????final Consumer consumer = new DefaultConsumer(channel) { ??????@Override ??????public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { ????????String message = new String(body, "UTF-8"); ????????System.out.println(" [x] Received '" + message + "'"); ????????try { ??????????doWork(message); ????????} finally { ??????????System.out.println(" [x] Done"); ??????????channel.basicAck(envelope.getDeliveryTag(), false); ????????} ??????} ????}; ????boolean autoAck = false; ????channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer); ??} ??private static void doWork(String task) { ????for (char ch : task.toCharArray()) { ??????if (ch == '.') { ????????try { ??????????Thread.sleep(1000); ????????} catch (InterruptedException _ignored) { ??????????Thread.currentThread().interrupt(); ????????} ??????} ????} ??} } |
可以運行上面兩個類來驗證這一小節(jié)的所有內容。
from:?http://www.importnew.com/24319.html
總結
以上是生活随笔為你收集整理的RabbitMQ指南(上)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Comparable与Comparato
- 下一篇: RabbitMQ指南(中)