RabbitMQ(六):回调队列callback queue、关联标识correlation id、实现简单的RPC系统
博客翻譯自:RabbitMQ Tutorials Java版
RabbitMQ(一):Hello World程序
RabbitMQ(二):Work Queues、循環(huán)分發(fā)、消息確認(rèn)、持久化、公平分發(fā)
RabbitMQ(三):Exchange交換器--fanout
RabbitMQ(四):Exchange交換器--direct
RabbitMQ(五):Exchange交換器--topic
RabbitMQ(六):回調(diào)隊(duì)列callback queue、關(guān)聯(lián)標(biāo)識(shí)correlation id、實(shí)現(xiàn)簡(jiǎn)單的RPC系統(tǒng)
RabbitMQ(七):常用方法說(shuō)明 與 學(xué)習(xí)小結(jié)
遠(yuǎn)程過(guò)程調(diào)用(RPC):
在第二篇博客中,我們學(xué)會(huì)了如何使用工作隊(duì)列將耗時(shí)的任務(wù)分發(fā)給多個(gè)工作者。但假如我們想調(diào)用遠(yuǎn)程電腦上的一個(gè)函數(shù)(或方法)并等待函數(shù)執(zhí)行的結(jié)果,這時(shí)候該怎么辦呢?好吧,這是一個(gè)不同的故事。這種模式通常稱為遠(yuǎn)程過(guò)程調(diào)用RPC(Remote Procedure Call)。
在今天的教程中,我們將會(huì)使用RabbitMQ來(lái)建立一個(gè)RPC系統(tǒng):一個(gè)客戶端和一個(gè)可擴(kuò)展的RPC服務(wù)端。因?yàn)槲覀儧](méi)有任何現(xiàn)成的耗時(shí)任務(wù),我們將會(huì)創(chuàng)建一個(gè)假的RPC服務(wù),它將返回斐波那契數(shù)(Fibonacci numbers)。
客戶端接口(Client interface):
為了演示如何使用RPC服務(wù),我們將創(chuàng)建一個(gè)簡(jiǎn)單的客戶端類。它負(fù)責(zé)暴露一個(gè)名為call的方法,該方法將發(fā)送一個(gè)RPC請(qǐng)求并阻塞,直到接收到回答。
FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient(); String result = fibonacciRpc.call("4"); System.out.println( "fib(4) is " + result);回調(diào)隊(duì)列(Callback queue):
使用RabbitMQ來(lái)做RPC很容易。客戶端發(fā)送一個(gè)請(qǐng)求消息,服務(wù)端以一個(gè)響應(yīng)消息回應(yīng)。為了可以接收到響應(yīng),需要與請(qǐng)求(消息)一起,發(fā)送一個(gè)回調(diào)的隊(duì)列。我們使用默認(rèn)的隊(duì)列(Java獨(dú)有的):
callbackQueueName = channel.queueDeclare().getQueue();BasicProperties props = new BasicProperties.Builder().replyTo(callbackQueueName).build();channel.basicPublish("", "rpc_queue", props, message.getBytes());// ... then code to read a response message from the callback_queue ...消息屬性
AMQP 0-9-1協(xié)議預(yù)定義了消息的14種屬性。大部分屬性都很少用到,除了下面的幾種:
- ① deliveryMode:標(biāo)記一個(gè)消息是持久的(值為2)還是短暫的(2以外的任何值),你可能還記得我們的第二個(gè)教程中用到過(guò)這個(gè)屬性。
- ② contentType:描述編碼的mime-type(mime-type of the encoding)。比如最常使用JSON格式,就可以將該屬性設(shè)置為application/json。
- ③?replyTo:通常用來(lái)命名一個(gè)回調(diào)隊(duì)列。
- ④?correlationId:用來(lái)關(guān)聯(lián)RPC的響應(yīng)和請(qǐng)求。
我們需要引入一個(gè)新的類:
import com.rabbitmq.client.AMQP.BasicProperties;關(guān)聯(lián)標(biāo)識(shí)(Correlation Id):
在上面的方法中,我們?yōu)槊恳粋€(gè)RPC請(qǐng)求都創(chuàng)建了一個(gè)新的回調(diào)隊(duì)列。這樣做顯然很低效,但幸好我們有更好的方式:讓我們?yōu)?span style="color:#f33b45;">每一個(gè)客戶端創(chuàng)建一個(gè)回調(diào)隊(duì)列。
這樣做又引入了一個(gè)新的問(wèn)題,在回調(diào)隊(duì)列中收到響應(yīng)后不知道到底是屬于哪個(gè)請(qǐng)求的。這時(shí)候,CorrelationId就可以派上用場(chǎng)了。對(duì)每一個(gè)請(qǐng)求,我們都創(chuàng)建一個(gè)唯一性的值作為CorrelationId。之后,當(dāng)我們從回調(diào)隊(duì)列中收到消息的時(shí)候,就可以查找這個(gè)屬性,基于這一點(diǎn),我們就可以將一個(gè)響應(yīng)和一個(gè)請(qǐng)求進(jìn)行關(guān)聯(lián)。如果我們看到一個(gè)不知道的?CorrelationId值,我們就可以安全地丟棄該消息,因?yàn)樗粚儆谖覀兊恼?qǐng)求。
你可能會(huì)問(wèn),為什么要忽視回調(diào)隊(duì)列中的不知道的消息,而不是直接以一個(gè)錯(cuò)誤失敗(failing with an error)。這是由于服務(wù)端可能存在的競(jìng)爭(zhēng)條件。盡管不會(huì),但這種情況仍有可能發(fā)生:RPC服務(wù)端在發(fā)給我們答案之后就掛掉了,還沒(méi)來(lái)得及為請(qǐng)求發(fā)送一個(gè)確認(rèn)信息。如果發(fā)生這種情況,重啟后的RPC服務(wù)端將會(huì)重新處理該請(qǐng)求(因?yàn)闆](méi)有給RabbitMQ發(fā)送確認(rèn)消息,RabbitMQ會(huì)重新發(fā)送消息給RPC服務(wù))。這就是為什么我們要在客戶端優(yōu)雅地處理重復(fù)響應(yīng),并且理想情況下,RPC服務(wù)要是冪等的。
總結(jié):
我們的RPC系統(tǒng)的工作流程如下:
當(dāng)客戶端啟動(dòng)后,它會(huì)創(chuàng)建一個(gè)異步的獨(dú)特的回調(diào)隊(duì)列。對(duì)于一個(gè)RPC請(qǐng)求,客戶端將會(huì)發(fā)送一個(gè)配置了兩個(gè)屬性的消息:一個(gè)是replyTo屬性,設(shè)置為這個(gè)回調(diào)隊(duì)列;另一個(gè)是correlation id屬性,每一個(gè)請(qǐng)求都會(huì)設(shè)置為一個(gè)具有唯一性的值。這個(gè)請(qǐng)求將會(huì)發(fā)送到rpc_queue隊(duì)列。
RPC工作者(即圖中的server)將會(huì)等待rpc_queue隊(duì)列的請(qǐng)求。當(dāng)有請(qǐng)求到來(lái)時(shí),它就會(huì)開(kāi)始干活(計(jì)算斐波那契數(shù))并將結(jié)果通過(guò)發(fā)送消息來(lái)返回,該返回消息發(fā)送到replyTo指定的隊(duì)列。
客戶端將等待回調(diào)隊(duì)列返回?cái)?shù)據(jù)。當(dāng)返回的消息到達(dá)時(shí),它將檢查correlation id屬性。如果該屬性值和請(qǐng)求匹配,就將響應(yīng)返回給程序。
放在一塊:
計(jì)算斐波那契數(shù)的任務(wù)如下:
private static int fib(int n) {if (n == 0) return 0;if (n == 1) return 1;return fib(n-1) + fib(n-2); }我們定義了斐波那契函數(shù),它假設(shè)只會(huì)輸入正整數(shù)(不要期望該函數(shù)在輸入很大的數(shù)的時(shí)候可以好好工作,它可能是最慢的遞歸實(shí)現(xiàn))。
RPC服務(wù)RPCServer.java的代碼如下:
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Envelope;import java.io.IOException; import java.util.concurrent.TimeoutException;public class RPCServer {private static final String RPC_QUEUE_NAME = "rpc_queue";//模擬的耗時(shí)任務(wù),即計(jì)算斐波那契數(shù)private static int fib(int n) {if (n == 0) return 0;if (n == 1) return 1;return fib(n - 1) + fib(n - 2);}public static void main(String[] argv) {//創(chuàng)建連接和通道ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = null;try {connection = factory.newConnection();final Channel channel = connection.createChannel();//聲明隊(duì)列channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);//一次只從隊(duì)列中取出一個(gè)消息channel.basicQos(1);System.out.println(" [x] Awaiting RPC requests");//監(jiān)聽(tīng)消息(即RPC請(qǐng)求)Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder().correlationId(properties.getCorrelationId()).build();//收到RPC請(qǐng)求后開(kāi)始處理String response = "";try {String message = new String(body, "UTF-8");int n = Integer.parseInt(message);System.out.println(" [.] fib(" + message + ")");response += fib(n);} catch (RuntimeException e) {System.out.println(" [.] " + e.toString());} finally {//處理完之后,返回響應(yīng)(即發(fā)布消息)System.out.println("[server current time] : " + System.currentTimeMillis());channel.basicPublish("", properties.getReplyTo(), replyProps, response.getBytes("UTF-8"));channel.basicAck(envelope.getDeliveryTag(), false);}}};channel.basicConsume(RPC_QUEUE_NAME, false, consumer);//loop to prevent reaching finally blockwhile (true) {try {Thread.sleep(100);} catch (InterruptedException _ignore) {}}} catch (IOException | TimeoutException e) {e.printStackTrace();} finally {if (connection != null)try {connection.close();} catch (IOException _ignore) {}}} }RPC服務(wù)的代碼很直白:
- (1)開(kāi)始先建立連接、通道并聲明隊(duì)列。
- (2)我們可能會(huì)運(yùn)行多個(gè)服務(wù)進(jìn)程,為了負(fù)載均衡我們通過(guò)設(shè)置?prefetchCount =1將任務(wù)分發(fā)給多個(gè)服務(wù)進(jìn)程
- (3)我們使用了basicConsume來(lái)連接隊(duì)列,并通過(guò)一個(gè)DefaultConsumer對(duì)象提供回調(diào)。這個(gè)DefaultConsumer對(duì)象將進(jìn)行工作并返回響應(yīng)。
我們的RPC客戶端RPCClient代碼如下:
package com.maxwell.rabbitdemo;import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Envelope;import java.io.IOException; import java.util.UUID; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeoutException;public class RPCClient {private Connection connection;private Channel channel;private String requestQueueName = "rpc_queue";private String replyQueueName;//定義一個(gè)RPC客戶端public RPCClient() throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");connection = factory.newConnection();channel = connection.createChannel();replyQueueName = channel.queueDeclare().getQueue();}//真正地請(qǐng)求public String call(String message) throws IOException, InterruptedException {final String corrId = UUID.randomUUID().toString();AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName).build();channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {if (properties.getCorrelationId().equals(corrId)) {System.out.println("[client current time] : " + System.currentTimeMillis());response.offer(new String(body, "UTF-8"));}}});return response.take();}//關(guān)閉連接public void close() throws IOException {connection.close();}public static void main(String[] argv) {RPCClient fibonacciRpc = null;String response = null;try {//創(chuàng)建一個(gè)RPC客戶端fibonacciRpc = new RPCClient();System.out.println(" [x] Requesting fib(30)");//RPC客戶端發(fā)送調(diào)用請(qǐng)求,并等待影響,直到接收到response = fibonacciRpc.call("30");System.out.println(" [.] Got '" + response + "'");} catch (IOException | TimeoutException | InterruptedException e) {e.printStackTrace();} finally {if (fibonacciRpc != null) {try {//關(guān)閉RPC客戶的連接fibonacciRpc.close();} catch (IOException _ignore) {}}}} }客戶端代碼看起來(lái)有一些復(fù)雜:
- (1)建立連接和通道,并聲明了一個(gè)獨(dú)特的回調(diào)隊(duì)列。
- (2)訂閱這個(gè)回調(diào)隊(duì)列,所以我們可以接收RPC響應(yīng)。
- (3)call方法執(zhí)行RPC請(qǐng)求。在call方法中,我們首先生成一個(gè)具有唯一性的correlationId值并存在變量corrId中。我們的DefaultConsumer中的實(shí)現(xiàn)方法handleDelivery會(huì)使用這個(gè)值來(lái)獲取爭(zhēng)取的響應(yīng)。然后,我們發(fā)布了這個(gè)請(qǐng)求消息,并設(shè)置了replyTo和correlationId這兩個(gè)屬性。好了,現(xiàn)在我們可以坐下來(lái)耐心等待響應(yīng)到來(lái)了。
- (4)由于我們的消費(fèi)者處理(指handleDelivery方法)是在子線程進(jìn)行的,因此我們需要在響應(yīng)到來(lái)之前暫停主線程(否則主線程結(jié)束了,子線程接收到了影響傳給誰(shuí)啊)。使用BlockingQueue是一種解決方案。在這里我們創(chuàng)建了一個(gè)阻塞隊(duì)列ArrayBlockingQueue并將它的容量設(shè)為1,因?yàn)槲覀冎恍枰邮芤粋€(gè)響應(yīng)就可以啦。handleDelivery方法所做的很簡(jiǎn)單,當(dāng)有響應(yīng)來(lái)的時(shí)候,就檢查是不是和correlationId匹配,匹配的話就放到阻塞隊(duì)列ArrayBlockingQueue中。
- 同時(shí),主線程正等待影響。
- (5)最終將影響返回給用戶了。
現(xiàn)在,可以動(dòng)手實(shí)驗(yàn)了。首先,執(zhí)行RPC服務(wù)端,讓它等待請(qǐng)求的到來(lái)。
[x] Awaiting RPC requests然后,執(zhí)行RPC客戶端,即RPCClient中的main方法,發(fā)起請(qǐng)求:
[x] Requesting fib(30) [client current time] : 1500474305838[.] Got '832040'可以看到,客戶端很快就接受到了請(qǐng)求,回頭看RPC服務(wù)端的時(shí)間:
[.] fib(30) [server current time] : 1500474305835上面這種設(shè)計(jì)并不是RPC服務(wù)端的唯一實(shí)現(xiàn),但是它有以下幾個(gè)重要的優(yōu)勢(shì):
- ① 如果RPC服務(wù)端很慢,你可以通過(guò)運(yùn)行多個(gè)實(shí)例就可以實(shí)現(xiàn)擴(kuò)展。
- ② 在RPC客戶端,RPC要求發(fā)送和接受一個(gè)消息。非同步的方法queueDeclare是必須的。這樣,RPC客戶端只需要為一個(gè)RPC請(qǐng)求只進(jìn)行一次網(wǎng)絡(luò)往返。
但我們的代碼仍然太簡(jiǎn)單,并沒(méi)有處理更復(fù)雜但也非常重要的問(wèn)題,像:
- ① 如果沒(méi)有服務(wù)端在運(yùn)行,客戶端該怎么辦
- ② 客戶端應(yīng)該為一次RPC設(shè)置超時(shí)嗎
- ③ 如果服務(wù)端發(fā)生故障并拋出異常,它還應(yīng)該返回給客戶端嗎?
- ④ 在處理消息前,先通過(guò)邊界檢查、類型判斷等手段過(guò)濾掉無(wú)效的消息等
?
說(shuō)明:
①與原文略有出入,如有疑問(wèn),請(qǐng)參閱原文
②原文均是編譯后通過(guò)javacp命令直接運(yùn)行程序,我是在IDE中進(jìn)行的,相應(yīng)的操作做了修改。
③添加了客戶端和服務(wù)端執(zhí)行時(shí)間。
總結(jié)
以上是生活随笔為你收集整理的RabbitMQ(六):回调队列callback queue、关联标识correlation id、实现简单的RPC系统的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: RabbitMQ(五):Exchange
- 下一篇: ubuntu系统安装Anaconda与使