RocketMQ源码分析之request-reply特性
1.什么是request-reply?
??RocketMQ4.6.0版本中增加了request-reply新特性,該特性允許producer在發送消息后同步或者異步等待consumer消費完消息并返回響應消息,類似rpc調用效果。
2. 使用場景
- 快速搭建服務總線,實現rpc框架
- 調用鏈追蹤分析
- 跨網絡區域實現系統間同步調用
3.使用方法
- producer端
??producer端調用request(final Message msg, final long timeout)方法以同步方式等待consumer端消費完消息并返回響應消息;調用request(final Message msg, final RequestCallback requestCallback, final long timeout)方法以異步方式等待consumer端消費完消息并返回響應消息。
同步方式:
public class RequestProducer {public static void main(String[] args) throws MQClientException, InterruptedException {String producerGroup = "RequestTopic0218";String topic = "RequestTopic";long ttl = 300000;DefaultMQProducer producer = new DefaultMQProducer(producerGroup);producer.setNamesrvAddr("127.0.0.1:9876");producer.start();try {Message msg = new Message(topic,"","Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));long begin = System.currentTimeMillis();Message retMsg = producer.request(msg, ttl);long cost = System.currentTimeMillis() - begin;System.out.printf("request to <%s> cost: %d replyMessage: %s %n", topic, cost, retMsg);} catch (Exception e) {e.printStackTrace();}producer.shutdown();} }異步方式:
public class AsyncRequestProducer {private static final InternalLogger log = ClientLogger.getLog();public static void main(String[] args) throws MQClientException, InterruptedException {String producerGroup = "please_rename_unique_group_name";String topic = "AsynRequestTopic";long ttl = 3000;DefaultMQProducer producer = new DefaultMQProducer(producerGroup);producer.start();try {Message msg = new Message(topic,"","Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));long begin = System.currentTimeMillis();producer.request(msg, new RequestCallback() {@Overridepublic void onSuccess(Message message) {long cost = System.currentTimeMillis() - begin;System.out.printf("request to <%s> cost: %d replyMessage: %s %n", topic, cost, message);}@Overridepublic void onException(Throwable e) {System.err.printf("request to <%s> fail.", topic);}}, ttl);} catch (Exception e) {log.warn("", e);}/* shutdown after your request callback is finished */ // producer.shutdown();} }- consumer端
??consumer端程序在原來的基礎上會增加以下內容:
??(1)創建producer用來發送消息
??(2)在消費完消息后調用RocketMQ提供的MessageUtil.createReplyMessage(final Message requestMessage, final byte[] body)方法來構建響應消息
??(3)調用send方法將響應消息發回給生產者
public class ResponseConsumer {public static void main(String[] args) throws InterruptedException, MQClientException {String producerGroup = "ReplyProducer0218";String consumerGroup = "ResponseConsumer0218";String topic = "RequestTopic";// create a producer to send reply messageDefaultMQProducer replyProducer = new DefaultMQProducer(producerGroup);replyProducer.setNamesrvAddr("127.0.0.1:9876");replyProducer.start();// create consumerDefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);// recommend client configsconsumer.setPullTimeDelayMillsWhenException(0L);consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);for (MessageExt msg : msgs) {try {System.out.printf("handle message: %s", msg.toString());String replyTo = MessageUtil.getReplyToClient(msg);byte[] replyContent = "reply message contents.".getBytes();// create reply message with given util, do not create reply message by yourselfMessage replyMessage = MessageUtil.createReplyMessage(msg, replyContent);// send reply message with producerSendResult replyResult = replyProducer.send(replyMessage, 300000);System.out.printf("reply to %s , %s %n", replyTo, replyResult.toString());} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {e.printStackTrace();}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.subscribe(topic, "*");consumer.setNamesrvAddr("127.0.0.1:9876");consumer.start();System.out.printf("Consumer Started.%n");} }在RocketMQ中producer端可以通過調用以下兩個方法發送消息并等待consumer端返回響應消息:
- request(final Message msg, final long timeout)
- request(final Message msg, final RequestCallback requestCallback, final long timeout)
下面以producer同步等待consumer響應消息為例分析整個request-reply的過程:
public Message request(Message msg,long timeout) throws RequestTimeoutException, MQClientException, RemotingException, MQBrokerException, InterruptedException {long beginTimestamp = System.currentTimeMillis();prepareSendRequest(msg, timeout);final String correlationId = msg.getProperty(MessageConst.PROPERTY_CORRELATION_ID);try {final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(correlationId, timeout, null);RequestFutureTable.getRequestFutureTable().put(correlationId, requestResponseFuture);long cost = System.currentTimeMillis() - beginTimestamp;this.sendDefaultImpl(msg, CommunicationMode.ASYNC, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {requestResponseFuture.setSendRequestOk(true);}@Overridepublic void onException(Throwable e) {requestResponseFuture.setSendRequestOk(false);requestResponseFuture.putResponseMessage(null);requestResponseFuture.setCause(e);}}, timeout - cost);return waitResponse(msg, timeout, requestResponseFuture, cost);} finally {RequestFutureTable.getRequestFutureTable().remove(correlationId);}}(1)獲取系統當前時間,方便后續進行超時判斷
(2)調用prepareSendRequest(final Message msg, long timeout)函數將待發送給broker的消息進行改造,具體改造如下:
- 調用CorrelationIdUtil.createCorrelationId()生成該消息的correlationId,并將correlationId添加到消息的擴展屬性CORRELATION_ID
- 獲取producer的clientId并將其添加到消息的擴展屬性REPLY_TO_CLIENT,該屬性的作用在于后續consumer端發送響應消息時broker知道將消息發送給哪個producer端
- 將超時時間添加到消息的擴展屬性TTL
(3)構建RequestResponseFuture對象,這里需要詳細解釋RequestResponseFuture對象,RequestResponseFuture是實現request-reply特性的關鍵,producer發送的每條消息都會new一個RequestResponseFuture對象:
- correlationId是CorrelationIdUtil.createCorrelationId()方法隨機生成的UUID字符串,correlationId是用來標識從發送每條消息到conumer端發送響應消息的請求
- requestMsg是consumer端返回的響應消息
- countDownLatch在消息發送時會阻塞producer線程(調用了await實現阻塞),等到響應消息返回時激活producer線程,最后返回consumer端響應消息,所以雖然在內部實現上是以異步方式發送消息但是結合countDownLatch達到了同步的效果
- 由于是同步發送所以requestCallback為null
(4)將<correlationId, requestResponseFuture>添加到requestFutureTable,后續consumer向broker發送RequestCode.SEND_REPLY_MESSAGE_V2請求將響應消息發送到broker,broker在處理這個請求時會調用pushReplyMessage方法發送RequestCode.PUSH_REPLY_MESSAGE_TO_CLIENT請求給producer,此時producer端會根據響應消息中correlationId在requestFutureTable中獲取其對應的requestResponseFuture,并且會將響應消息賦給requestResponseFuture中的responseMsg。
(5)調用sendDefaultImpl方法以異步的方式發送消息,雖然是以異步方式發送消息但是結合RequestResponseFuture中的countDownLatch到達了同步效果。此時producer發送了RequestCode.SEND_MESSAGE請求給broker,broker后續的處理過程與發送普通消息是一樣的。
(6)consumer在正常消費完消息后,需要調用MessageUtil.createReplyMessage方法構建響應消息,該方法有兩個參數,分別是producer發送消息和響應消息體內容,該方法會從producer發送的消息的擴展屬性中獲取“CLUSTER”、“REPLY_TO_CLIENT”、“CORRELATION_ID”和“TTL”,并根據這些擴展屬性以及響應消息體內容構建響應消息。這里需要注意,新構建的響應消息的topic是由producer發送的消息的擴展屬性中的CLUSTER與REPLY_TOPIC拼接起來,即“集群名稱_REPLY_TOPIC”,這個是一個系統級別的topic,是由broker自己創建的。
public static Message createReplyMessage(final Message requestMessage, final byte[] body) throws MQClientException {if (requestMessage != null) {Message replyMessage = new Message();String cluster = requestMessage.getProperty(MessageConst.PROPERTY_CLUSTER);String replyTo = requestMessage.getProperty(MessageConst.PROPERTY_MESSAGE_REPLY_TO_CLIENT);String correlationId = requestMessage.getProperty(MessageConst.PROPERTY_CORRELATION_ID);String ttl = requestMessage.getProperty(MessageConst.PROPERTY_MESSAGE_TTL);replyMessage.setBody(body);if (cluster != null) {String replyTopic = MixAll.getReplyTopic(cluster);replyMessage.setTopic(replyTopic);MessageAccessor.putProperty(replyMessage, MessageConst.PROPERTY_MESSAGE_TYPE, MixAll.REPLY_MESSAGE_FLAG);MessageAccessor.putProperty(replyMessage, MessageConst.PROPERTY_CORRELATION_ID, correlationId);MessageAccessor.putProperty(replyMessage, MessageConst.PROPERTY_MESSAGE_REPLY_TO_CLIENT, replyTo);MessageAccessor.putProperty(replyMessage, MessageConst.PROPERTY_MESSAGE_TTL, ttl);return replyMessage;} else {throw new MQClientException(ClientErrorCode.CREATE_REPLY_MESSAGE_EXCEPTION, "create reply message fail, requestMessage error, property[" + MessageConst.PROPERTY_CLUSTER + "] is null.");}}throw new MQClientException(ClientErrorCode.CREATE_REPLY_MESSAGE_EXCEPTION, "create reply message fail, requestMessage cannot be null.");}(7)調用send方法發送響應消息到broker,在發送的過程中會判斷消息的類型,由于該消息是reply類型的,所以向broker發送的請求類型是RequestCode.SEND_REPLY_MESSAGE_V2
(8)broker處理RequestCode.SEND_REPLY_MESSAGE_V2請求的是ReplyMessageProcessor,具體操作如下:
- 根據請求中響應消息的topic、queueId、消息體內容、消息標記、消息的擴展屬性、消息產生的時間、消息的來源等信息構建MessageExtBrokerInner對象
- 調用pushReplyMessage方法構建RequestCode.PUSH_REPLY_MESSAGE_TO_CLIENT請求,然后根據消息擴展屬性REPLY_TO_CLIENT獲取broker與producer連接的channel,最后將請求發送給producer。這里有個問題:RocketMQ如何保證請求原路返回?首先producer產生的消息會發送到broker上,此時broker中存儲的producer產生的消息的擴展屬性中是包含存儲的broker的集群名稱的,接著consumer消息該消息并根據該消息構造出響應消息,在構造響應消息時,其topic是“集群名稱_REPLY_TOPIC”,這樣就保證了consumer在發送響應消息到broker是原路返回,即這里的broker是與producer連接的broker。
- 判斷broker端的配置文件中storeReplyMessageEnable配置項的值是否為true,如果為true,則會將響應消息存儲在broker端。storeReplyMessageEnable的默認值是true。
(9)producer處理broker發送的RequestCode.PUSH_REPLY_MESSAGE_TO_CLIENT請求的是ClientRemotingProcessor,具體如下:
- 根據請求還原響應消息MessageExt
- 獲取響應消息擴展屬性CORRELATION_ID的值correlationId,在producer端的requestFutureTable中根據correlationId獲取該消息對應的requestResponseFuture,然后將響應消息放入到requestResponseFuture中的responseMsg并將countDownLatch的值減一,此時producer端調用request方法的線程就激活了
- 從requestFutureTable中刪除key為correlationId的數據項
(10)producer端調用request方法線程激活后會調用waitResponse方法返回requestResponseFuture中的responseMsg,這里最終調用的waitResponseMessage方法中帶有一個參數:超時時間,如果到了超時時間后consumer端的響應消息沒有被producer端收到,線程也會被激活,這樣的設置也是防止producer線程一直被阻塞。
參考資料: 官方視頻鏈接.
總結
以上是生活随笔為你收集整理的RocketMQ源码分析之request-reply特性的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Linux traceroute no
- 下一篇: 十大诱人垃圾食物