日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

RabbitMQ指南(上)

發(fā)布時間:2025/3/21 编程问答 16 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RabbitMQ指南(上) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
原文出處:?Listen

RabbitMQ是一個消息中間件,在一些需要異步處理、發(fā)布/訂閱等場景的時候,使用RabbitMQ可以完成我們的需求。 下面是我在學習RabbitMQ的過程中的一些記錄,內容主要翻譯自RabbitMQ官網的Tutorials, 再加上我的一些個人理解。我將會用三篇文章來從RabbitMQ的Hello World介紹起,到最后的通過RabbitMQ實現RPC調用, 相信看完這三篇文章大家應該會對RabbitMQ的基本概念和使用有一定的了解。

說明:

  • 由于RabbitMQ支持許多種語言的client,在這里我使用的是Java語言的client。
  • 所有的圖片均來自RabbitMQ官網。
  • Hello World

    首先需要安裝RabbitMQ,關于RabbitMQ的安裝這里就不贅述了,可以到RabbitMQ的官網去看相應的OS的安裝方法。 安裝完成后使用rabbitmq-server即可啟動RabbitMQ,RabbitMQ還提供了一個UI管理界面,本地默認的地址為localhost:15672, 用戶名和密碼均為guest。

    安裝完成之后,按照慣例,先來完成一個簡單的Hello World的例子。 最簡單的一種消息發(fā)送的模型為一個消息發(fā)送者(Producer)將消息發(fā)送到Queue中,另一端的消息接受者(Consumer)從Queue中接受消息, 大致模型如下圖所示:

    先來看發(fā)送的代碼,新建一個類命名為Send.java,代碼的第一步為連接server

    1 2 3 4 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel();

    connection抽象了socket的連接,并且為我們處理了協議版本的協商、權限認證等等。這里我們連接的是本地的中間件, 也就是localhost,接下來我們創(chuàng)建一個channel,這是大多數API完成任務的所在,也就是說我們的API操作基本都是通過channel來完成的。

    1 2 3 4 channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'");

    首先是通過channel來聲明一個queue,并且聲明queue的操作是冪等的,也即是說只有在這個queue不存在的情況下才會新創(chuàng)建一個queue。 這里發(fā)送一個Hello World!的消息,實際傳遞的消息內容為字節(jié)數組。

    1 2 channel.close(); connection.close();

    最后關閉channel和connection的連接,注意關閉的順序,是先關閉channel的連接,再關閉connection的連接。

    完整的Send.java代碼

    1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public class Send { ????private static final String QUEUE_NAME = "hello"; ????public static void main(String[] args) { ????????ConnectionFactory factory = new ConnectionFactory(); ????????factory.setHost("localhost"); ????????Connection connection = factory.newConnection(); ????????Channel channel = connection.createChannel(); ????????channel.queueDeclare(QUEUE_NAME, false, false, false, null); ????????String message = "Hello World!"; ????????channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); ????????System.out.println(" [x] Sent '" + message + "'"); ????????channel.close(); ????????connection.close(); ????} }

    完成發(fā)送的代碼之后是接受消息的代碼,新建一個類為Recv.java

    1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public class Recv { ????private final static String QUEUE_NAME = "hello"; ????public static void main(String[] argv) throws Exception { ??????ConnectionFactory factory = new ConnectionFactory(); ??????factory.setHost("localhost"); ??????Connection connection = factory.newConnection(); ??????Channel channel = connection.createChannel(); ??????channel.queueDeclare(QUEUE_NAME, false, false, false, null); ??????System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); ??????Consumer consumer = new DefaultConsumer(channel) { ????????@Override ????????public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) ????????????throws IOException { ??????????String message = new String(body, "UTF-8"); ??????????System.out.println(" [x] Received '" + message + "'"); ????????} ??????}; ??????channel.basicConsume(QUEUE_NAME, true, consumer); ????} }

    可以發(fā)現一開始的連接部分的代碼是相同的,在接收的時候我們也要聲明一個queue,注意這里queue的名稱和之前發(fā)送消息聲明的queue的名稱必須是相同的, 否則就收不到消息了。

    DefaultConsumer類實現了Consumer接口,由于發(fā)送消息是異步的,因此在這里提供了一個callback來緩沖消息, 直到我們準備使用這些消息,最后分別運行Send.java和Recv.java,就能看到Hello World!消息了。

    Work Queues

    在第一部分的Hello World中通過一個命名的queue來傳遞消息,在這一部分,我們會創(chuàng)建Work Queue來將耗時的任務分發(fā)至多個worker。 假設一個消息就是一個耗時的任務,比如文件I/O等等,那么可以通過幾個worker來共同完成這些工作。

    在Web應用中這是非常有用的,因為在一次非常短的HTTP請求窗口中完成一個非常復雜的任務是很困難的。

    準備

    這一部分是建立在上一部分Hello World的基礎之上的,我們將發(fā)送字符串來表示一些復雜的任務, 由于并沒有一些真實的復雜的工作,因此使用Thread.sleep()來模擬這是一個很耗時的任務, 并且在發(fā)送的字符串當中含有一個點號就表示這個任務需要耗時1秒,比如發(fā)送Hello...表示將要耗時3秒。

    在前一部分的Send.java的基礎上做一些修改,得到一個新的類稱為NewTask.java。

    1 2 3 4 String message = getMessage(argv); channel.basicPublish("", "hello", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'");

    getMessage方法為從命令行中獲取參數

    1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 private static String getMessage(String[] strings){ ????if (strings.length < 1) ????????return "Hello World!"; ????return joinStrings(strings, " "); } private static String joinStrings(String[] strings, String delimiter) { ????int length = strings.length; ????if (length == 0) return ""; ????StringBuilder words = new StringBuilder(strings[0]); ????for (int i = 1; i < length; i++) { ????????words.append(delimiter).append(strings[i]); ????} ????return words.toString(); }

    我們之前的Recv.java也需要做一些變化,它需要模擬一些耗時的任務,消息內容中一個.表示1秒,并且它會處理消息, 我們稱它為Worker.java

    1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 final Consumer consumer = new DefaultConsumer(channel) { ??@Override ??public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { ????String message = new String(body, "UTF-8"); ????System.out.println(" [x] Received '" + message + "'"); ????try { ??????doWork(message); ????} finally { ??????System.out.println(" [x] Done"); ????} ??} }; boolean autoAck = true; // acknowledgment is covered below channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);

    這里有一個autoAck變量的作用在后面會提到。doWork方法就是模擬的耗時任務

    1 2 3 4 5 private static void doWork(String task) throws InterruptedException { ????for (char ch: task.toCharArray()) { ????????if (ch == '.') Thread.sleep(1000); ????} }

    循環(huán)發(fā)送

    使用任務隊列其中的一個好處是可以非常方便的并行處理這些任務。如果我們在處理一些積壓的工作, 只需要增加更多的worker即可,非常容易擴展。

    首先,來試試兩個worker實例的情況。很顯然,兩個worker都會接受到消息,但是具體的情況是怎么樣的呢? 我們在控制臺啟動兩個實例,C1和C2表示兩個consumer,然后使用Producer來發(fā)送消息,一共發(fā)送五條消息,來看看具體的情況。

    首先是第一個worker打印出的消息

    1 2 3 4 [*] Waiting for messages. To exit press CTRL+C [x] Received 'First message.' [x] Received 'Third message...' [x] Received 'Fifth message.....'

    第二個worker打印出的消息

    1 2 3 [*] Waiting for messages. To exit press CTRL+C [x] Received 'Second message..' [x] Received 'Fourth message....'

    默認的,RabbitMQ會順序的把消息發(fā)送到下一個Consumer,上面打印出的消息也印證了這一點。 平均來說每個Consumer接收到的消息數量是相同的,這種發(fā)送消息的方式稱為循環(huán)發(fā)送(round-robin), 思考下有三個或者更多Worker的情況。

    消息接收(Message acknowledgment)

    處理一個任務需要耗費幾秒鐘的時間。你也許想知道如果一個consumer在處理一個任務的時候只處理了一部分就掛了會出現什么情況。 在我們現在的代碼下,一旦RabbitMQ將一個消息傳遞到consumer,它馬上會從內存中刪除這條消息, 也就是說如果殺掉了一個正在處理任務的worker,那么將會失去所有的這個worker正在處理的所有消息, 同樣也會失去發(fā)送給這個worker但是還未處理的消息。

    一般情況下,我們不希望丟失消息,如果某個worker掛了,能將消息發(fā)送給另一個worker來處理。 為了確保消息不會丟失,RabbitMQ支持消息接收(message acknowledgments)。 當consumer確認收到某個消息,并且已經處理完成,RabbitMQ可以刪除它時,consumer會向RabbitMQ發(fā)送一個ack(nowledgement)。

    如果一個consumer掛了(channel關閉了、connection關閉了或者TCP連接斷了)而沒有發(fā)送ack,RabbitMQ就會知道這個消息沒有被完全處理, 將會對這條消息做re-queue處理。如果此時有另一個consumer連接,消息會被重新發(fā)送至另一個consumer。 使用這種方式可以保證消息不會丟失。

    消息不會超時;RabbitMQ會在consumer掛了之后重新發(fā)送消息。即使處理消息耗時非常長也是沒有問題的。

    消息接收是默認開啟的,在之前的例子中我們通過autoAck=true標志顯式的關閉了它,true則表示自動接收,不需要發(fā)送ack。 現在是時候來開啟ack。當consumer處理完成之后,向rabbitMQ發(fā)送ack。

    1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 channel.basicQos(1); // accept only one unack-ed message at a time (see below) final Consumer consumer = new DefaultConsumer(channel) { ??@Override ??public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { ????String message = new String(body, "UTF-8"); ????System.out.println(" [x] Received '" + message + "'"); ????try { ??????doWork(message); ????} finally { ??????System.out.println(" [x] Done"); ??????channel.basicAck(envelope.getDeliveryTag(), false); ????} ??} }; boolean autoAck = false; channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);

    使用以上代碼能夠保證即使在一個worker處使消息的時候用CTRL+C來殺掉這個worker,也不會丟失消息。 在這個worker掛掉之后所有未接收(ack)的消息將被重新發(fā)送。

    消息持久化

    我們學習了如何在worker掛掉的情況下不丟消息,但是在RabbitMQ server停止之后消息還是會丟失。 如果不進行任何配置,在RabbitMQ退出或崩潰的時候,將會失去所有的queue和消息。 要保證在這種情況下消息不丟失需要做兩件事情:需要同時標志queue和message是持久化的。

    首先,需要確保RabbitMQ不會丟失queue

    1 2 boolean durable = true; channel.queueDeclare("task_queue", durable, false, false, null);

    我們重新聲明一個queue(不能修改已經聲明為不持久化的queue為持久化),名字為task_queue, 第二個布爾參數表示是否持久化的意思,這里設置為true, 包括consumer和producer聲明queue的時候都需要聲明durable為true。現在,即使重啟RabbitMQ,task_queue這個queue也不會丟失了。

    接下來我們將消息做持久化配置處理,通過設置MessageProperties(實現了BasicProperties)中的PERSISTENT_TEXT_PLAIN屬性。

    1 2 3 channel.basicPublish("", "task_queue", ????????????MessageProperties.PERSISTENT_TEXT_PLAIN, ????????????message.getBytes());

    公平分發(fā)(Fair dispatch)

    在某種場景下有兩個worker,當所有奇數的消息處理起來都比較耗時,而偶數的消息處理起來都比較快, 這就會發(fā)生一個worker總是處于busy狀態(tài),而另一個worker則總是處于空閑狀態(tài),RabbitMQ并不知道這個情況, 仍然只是正常的發(fā)送消息。

    出現這種情況的原因在于當消息在queue中的時候RabbitMQ只是發(fā)送這些消息而已,它不會去關注某個consumer未ack的消息的數量, 它只是盲目的將某個消息發(fā)送到某個consumer。

    為了處理這種情況我們可以使用basicQos方法來設置prefetchCount = 1。 這告訴RabbitMQy一次只給worker一條消息,換句話來說,就是直到worker發(fā)回ack,然后再向這個worker發(fā)送下一條消息。

    1 2 int prefetchCount = 1; channel.basicQos(prefetchCount);

    完整的NewTask.java代碼

    1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public class NewTask { ??private static final String TASK_QUEUE_NAME = "task_queue"; ??public static void main(String[] argv) ??????????????????????throws java.io.IOException { ????ConnectionFactory factory = new ConnectionFactory(); ????factory.setHost("localhost"); ????Connection connection = factory.newConnection(); ????Channel channel = connection.createChannel(); ????channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); ????String message = getMessage(argv); ????channel.basicPublish( "", TASK_QUEUE_NAME, ????????????MessageProperties.PERSISTENT_TEXT_PLAIN, ????????????message.getBytes()); ????System.out.println(" [x] Sent '" + message + "'"); ????channel.close(); ????connection.close(); ??}????? ??//... }

    Worker.java

    1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 public class Worker { ??private static final String TASK_QUEUE_NAME = "task_queue"; ??public static void main(String[] argv) throws Exception { ????ConnectionFactory factory = new ConnectionFactory(); ????factory.setHost("localhost"); ????final Connection connection = factory.newConnection(); ????final Channel channel = connection.createChannel(); ????channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); ????System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); ????channel.basicQos(1); ????final Consumer consumer = new DefaultConsumer(channel) { ??????@Override ??????public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { ????????String message = new String(body, "UTF-8"); ????????System.out.println(" [x] Received '" + message + "'"); ????????try { ??????????doWork(message); ????????} finally { ??????????System.out.println(" [x] Done"); ??????????channel.basicAck(envelope.getDeliveryTag(), false); ????????} ??????} ????}; ????boolean autoAck = false; ????channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer); ??} ??private static void doWork(String task) { ????for (char ch : task.toCharArray()) { ??????if (ch == '.') { ????????try { ??????????Thread.sleep(1000); ????????} catch (InterruptedException _ignored) { ??????????Thread.currentThread().interrupt(); ????????} ??????} ????} ??} }

    可以運行上面兩個類來驗證這一小節(jié)的所有內容。


    from:?http://www.importnew.com/24319.html

    總結

    以上是生活随笔為你收集整理的RabbitMQ指南(上)的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。