日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

RabbitMQ指南(下)

發(fā)布時間:2025/3/21 编程问答 20 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RabbitMQ指南(下) 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.
原文出處:?Listen

在上一小節(jié)中我們改進(jìn)了log系統(tǒng),由于使用fanout類型的exchange只能進(jìn)行全局的廣播,因此我們使用direct類型的exchange做了代替, 使得我們可以選擇性的接收消息。盡管使用fanout exchange改進(jìn)了log系統(tǒng),但它仍然有限制——不能基于多個條件做路由。

Topics

在log系統(tǒng)中可能不只是基于不同的日志級別作訂閱,也可能會基于日志的來源。你也許聽過Unix下名為syslog的工具, 它把日志按照嚴(yán)重級別(info/warn/crit…)和設(shè)備(auth/cron/ker…)進(jìn)行路由。

這會給我們許多的靈活性,也許我們只想監(jiān)聽’cron’中的’critical’級別的錯誤日志,以及所有’kern’中的日志。 為了實現(xiàn)這種日志系統(tǒng),我們需要學(xué)習(xí)一個更復(fù)雜的topic類型的exchange。

Topic exchange

發(fā)送到topic exchange中的消息不能有一個任意的routing_key——它必須是一個使用點分隔的單詞列表。單詞可以是任意的, 但是通常會指定消息的一些特定。一些有效的routing key例子:”stock.usd.nyse”,”nyse.vmw”,”quick.orange.rabbit”。 routing key的長度限制為255個字節(jié)數(shù)。

binding key也必須是相同的形式。topic exchange背后的邏輯類似于direct——一條使用特定的routing key發(fā)送的消息將會被傳遞至所有使用與該routing key相同的binding key進(jìn)行綁定的隊列中。 然而,對binding key來說有兩種特殊的情況:

  • *(star)可以代替任意一個單詞
  • #(hash)可以代替0個或多個單詞
  • 使用一張圖可以很簡單地來說明:

    在圖中,我們將要發(fā)送被描述的動物的消息。消息的routing key將由三個單詞組成(通過兩個點分隔)。routing key中的第一個單詞將描述速度, 第二個是顏色,第三個是物種:"<speed>.<colour>.<species>"。

    我們創(chuàng)建三個綁定:Q1使用binding key"*.orange.*"來綁定,Q2使用"*.*.rabbit"以及l(fā)azy.#綁定。

    這些綁定可以被總結(jié)為:

    • Q1對所有橘色的的動物感興趣
    • Q2想要接收所有關(guān)于兔子的消息以及所有關(guān)于lazy的動物的消息

    一條使用routing key"quick.orange.rabbit"發(fā)送的消息將被同時傳遞到兩個隊列中。消息"lazy.orange.elephant"同樣如此。 另一方面,"quick.orange.fox"只會被第一個queue接收,"lazy.brown.fox"只會被第二個queue接收。?"lazy.pink.rabbit"只會被傳遞到Q2一次,即使它對兩個binding key都匹配。"quick.brown.fox"與兩個queue的binding key都不匹配, 因此將被丟棄。

    如果打破我們的約定,使用一個單詞或者四個單詞的routing key例如"orange","quick.orange.male.rabbit"發(fā)送消息將會發(fā)生什么? 這些消息不會匹配任何綁定,因此會丟失。

    但是對于"lazy.orange.male.rabbit",即使它有四個單詞,但是它與第二個queue的binding key匹配,因此將會被發(fā)送到第二個queue中。

    當(dāng)一個queue使用"#"(hash)作為binding key,那么它將會接收所有的消息,忽略routing key,就好像使用了fanout exchange。 當(dāng)特殊字符”*“(star)和”#“(hash)在綁定中沒有用到,topic exchange將會與direct exchange的行為相同。

    了解了topic exchange之后,我們將它用在我們的log系統(tǒng)中,我們定義的routing key將會有兩個單詞組成:"<facility>.<severity>"。

    完成的EmitLogTopic.java:

    1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public class EmitLogTopic { ????private static final String EXCHANGE_NAME = "topic_logs"; ????public static void main(String[] argv) ??????????????????throws Exception { ????????ConnectionFactory factory = new ConnectionFactory(); ????????factory.setHost("localhost"); ????????Connection connection = factory.newConnection(); ????????Channel channel = connection.createChannel(); ????????channel.exchangeDeclare(EXCHANGE_NAME, "topic"); ????????String routingKey = getRouting(argv); ????????String message = getMessage(argv); ????????channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes()); ????????System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'"); ????????connection.close(); ????} ????//... }

    完整的ReceiveLogsTopic.java:

    1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 public class ReceiveLogsTopic { ??private static final String EXCHANGE_NAME = "topic_logs"; ??public static void main(String[] argv) throws Exception { ????ConnectionFactory factory = new ConnectionFactory(); ????factory.setHost("localhost"); ????Connection connection = factory.newConnection(); ????Channel channel = connection.createChannel(); ????channel.exchangeDeclare(EXCHANGE_NAME, "topic"); ????String queueName = channel.queueDeclare().getQueue(); ????if (argv.length < 1) { ??????System.err.println("Usage: ReceiveLogsTopic [binding_key]..."); ??????System.exit(1); ????} ????for (String bindingKey : argv) { ??????channel.queueBind(queueName, EXCHANGE_NAME, bindingKey); ????} ????System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); ????Consumer consumer = new DefaultConsumer(channel) { ??????@Override ??????public void handleDelivery(String consumerTag, Envelope envelope, ?????????????????????????????????AMQP.BasicProperties properties, byte[] body) throws IOException { ????????String message = new String(body, "UTF-8"); ????????System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'"); ??????} ????}; ????channel.basicConsume(queueName, true, consumer); ??} }

    運行的時候從命令行中輸入binding key來進(jìn)行綁定,接收不同的消息。

    Remote procedure call (RPC)

    在第二小節(jié)中我們學(xué)習(xí)了如何使用Work Queues來在多個workers中分發(fā)耗時的任務(wù)。但是如果我們需要調(diào)用遠(yuǎn)程計算機(jī)上的一個函數(shù)并等待結(jié)果返回呢? 這就是另外一個故事了。這種模式通常稱為遠(yuǎn)程過程調(diào)用或RPC。

    在這一小節(jié)我們將使用RabbitMQ來構(gòu)建一個RPC系統(tǒng):一個客戶端和一個可擴(kuò)展的RPC服務(wù)器。由于我們沒有實際的耗時任務(wù)用來分發(fā), 因此我們將創(chuàng)建一個虛擬的RPC服務(wù)返回Fibonacci數(shù)。

    Client interface

    為了說明RPC服務(wù)是如何使用的,我們將創(chuàng)建一個簡單的客戶端類。它將暴露一個名為call的方法發(fā)送一次RPC請求并且阻塞直到結(jié)果返回:

    1 2 3 FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient(); String result = fibonacciRpc.call("4"); System.out.println( "fib(4) is " + result);

    Callback queue

    使用RabbitMQ來進(jìn)行RPC是非常簡單的。客戶端發(fā)送一個請求到服務(wù)端,服務(wù)端接收后返回響應(yīng)的消息。為了接收到響應(yīng)的消息,我們需要在請求中發(fā)送一個callback 的queue地址。我們可以使用默認(rèn)的queue(在Java的client中它是exclusive的)。

    1 2 3 4 5 6 7 8 9 10 callbackQueueName = channel.queueDeclare().getQueue(); BasicProperties props = new BasicProperties ????????????????????????????.Builder() ????????????????????????????.replyTo(callbackQueueName) ????????????????????????????.build(); channel.basicPublish("", "rpc_queue", props, message.getBytes()); // ... then code to read a response message from the callback_queue ...

    Message properties

    AMQP協(xié)議預(yù)定義了消息的14種屬性。大部分的都很少使用,除了以下這些:

    • deliveryMode:標(biāo)記一條消息是持久化的(使用值2)還是非持久化的(使用其它值)。在第二節(jié)中有過介紹。
    • contentType:用來描述mime類型的編碼。例如使用JSON的話就這樣設(shè)置屬性:application/json。
    • replyTo:一般用來命名一個回調(diào)queue。
    • correlationId:用來關(guān)聯(lián)RPC的請求和響應(yīng)。

    我們需要導(dǎo)入新的類:

    1 import com.rabbitmq.client.AMQP.BasicProperties;

    Correlation Id

    在之前的方法中我們建議為每個RPC請求創(chuàng)建一個回調(diào)queue。這顯得有點影響性能,幸運的是有一種更好的方式——每個客戶端只創(chuàng)建一個回調(diào)queue。 但這產(chǎn)生了一個新問題,無法將相應(yīng)的Response和Request對應(yīng)起來。這個時候就需要用到correlationId屬性。對于每個請求它都將有一個唯一的值。 當(dāng)我們在回調(diào)queue中接收到消息之后,檢查該屬性,看是否與Request匹配。如果是一個未知的correlationId值,那么我們可以安全的忽略這條消息, 因為它不屬于我們的請求。

    你也許會問,為什么我們應(yīng)該忽略回調(diào)queue中未知的消息而不是拋出異常?這是因為服務(wù)端可能會出現(xiàn)競爭條件。盡管不太常見,但是也有可能RPC server在發(fā)送響應(yīng)后掛了, 并且也沒有接收到客戶端發(fā)送的ack。如果發(fā)生了這種情況,RPC server在重啟后將會重新處理這個請求。這就是為什么在客戶端我們需要優(yōu)雅的處理重復(fù)的響應(yīng), RPC應(yīng)該是冪等的。

    Summary

    我們的RPC整個過程是這樣的:

  • 當(dāng)客戶端啟動,它創(chuàng)建一個匿名的并且是exclusive的回調(diào)queue。
  • 在一次RPC請求中,客戶端發(fā)送的消息有兩個屬性:replyTo,放置的是回調(diào)queue的信息。correlationId,放置的是每個請求唯一的值。
  • 請求被發(fā)送到一個rpc_queue中。
  • RPC服務(wù)端在queue的另一端等待請求。當(dāng)請求到來時,它處理任務(wù)并將消息的結(jié)果發(fā)送回客戶端,使用replyTo中設(shè)置的queue。
  • 客戶端在回調(diào)queue中等待響應(yīng)的數(shù)據(jù),當(dāng)消息出現(xiàn)時,它先檢查correlationId屬性。如果匹配的話就將結(jié)果返回到應(yīng)用中。
  • 最后來看一下完整的代碼實現(xiàn)。

    Fibonacci函數(shù):

    1 2 3 4 5 private static int fib(int n) throws Exception { ????if (n == 0) return 0; ????if (n == 1) return 1; ????return fib(n-1) + fib(n-2); }

    完整的RPCServer.java代碼

    1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 private static final String RPC_QUEUE_NAME = "rpc_queue"; ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null); channel.basicQos(1); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(RPC_QUEUE_NAME, false, consumer); System.out.println(" [x] Awaiting RPC requests"); while (true) { ????QueueingConsumer.Delivery delivery = consumer.nextDelivery(); ????BasicProperties props = delivery.getProperties(); ????BasicProperties replyProps = new BasicProperties ?????????????????????????????????????.Builder() ?????????????????????????????????????.correlationId(props.getCorrelationId()) ?????????????????????????????????????.build(); ????String message = new String(delivery.getBody()); ????int n = Integer.parseInt(message); ????System.out.println(" [.] fib(" + message + ")"); ????String response = "" + fib(n); ????channel.basicPublish( "", props.getReplyTo(), replyProps, response.getBytes()); ????channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); }

    server端的代碼非常直觀:

    • 首先創(chuàng)建一個連接、channel和聲明一個queue。
    • 我們也許想要運行不止一個服務(wù)端進(jìn)程。為了在多個server間做到負(fù)載均衡,通過channel.basicQos設(shè)置prefetchCount。
    • 我們使用basicConsume來進(jìn)入queue。然后使用無限循環(huán)來等待請求的消息,處理之后再返回響應(yīng)。

    完整的RPCClient.java代碼

    1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 private Connection connection; private Channel channel; private String requestQueueName = "rpc_queue"; private String replyQueueName; private QueueingConsumer consumer; public RPCClient() throws Exception { ????ConnectionFactory factory = new ConnectionFactory(); ????factory.setHost("localhost"); ????connection = factory.newConnection(); ????channel = connection.createChannel(); ????replyQueueName = channel.queueDeclare().getQueue(); ????consumer = new QueueingConsumer(channel); ????channel.basicConsume(replyQueueName, true, consumer); } public String call(String message) throws Exception { ????String response = null; ????String corrId = java.util.UUID.randomUUID().toString(); ????BasicProperties props = new BasicProperties ????????????????????????????????.Builder() ????????????????????????????????.correlationId(corrId) ????????????????????????????????.replyTo(replyQueueName) ????????????????????????????????.build(); ????channel.basicPublish("", requestQueueName, props, message.getBytes()); ????while (true) { ????????QueueingConsumer.Delivery delivery = consumer.nextDelivery(); ????????if (delivery.getProperties().getCorrelationId().equals(corrId)) { ????????????response = new String(delivery.getBody()); ????????????break; ????????} ????} ????return response; } public void close() throws Exception { ????connection.close(); }

    客戶端代碼有一點點的復(fù)雜:

    • 我們創(chuàng)建連接和channel,以及聲明一個exclusive的回調(diào)queue用來接收響應(yīng)的消息。
    • 訂閱回調(diào)queue,這樣就可以接收到RPC服務(wù)端響應(yīng)的消息。
    • call方法發(fā)出一個RPC請求。
    • 我們首先生成一個唯一的correlationId數(shù)字并且保存它——在while循環(huán)中使用它來匹配相應(yīng)的response。
    • 下一步,發(fā)送請求的消息,使用兩個屬性:replyTo和correlationId。
    • 之后就是等待響應(yīng)的消息返回。
    • 在while循環(huán)中做了一些簡單的工作,檢查響應(yīng)的消息的correlationId是否與Request相匹配。如果是的話,則保存響應(yīng)。
    • 最終向用戶返回響應(yīng)。

    發(fā)送客戶端請求:

    1 2 3 4 5 6 7 RPCClient fibonacciRpc = new RPCClient(); System.out.println(" [x] Requesting fib(30)"); String response = fibonacciRpc.call("30"); System.out.println(" [.] Got '" + response + "'"); fibonacciRpc.close();

    這樣就通過RabbitMQ簡單的實現(xiàn)了RPC的通信。


    from:?http://www.importnew.com/24329.html

    總結(jié)

    以上是生活随笔為你收集整理的RabbitMQ指南(下)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。