日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

RocketMQ源码分析之request-reply特性

發布時間:2023/12/3 编程问答 63 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RocketMQ源码分析之request-reply特性 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

1.什么是request-reply?

??RocketMQ4.6.0版本中增加了request-reply新特性,該特性允許producer在發送消息后同步或者異步等待consumer消費完消息并返回響應消息,類似rpc調用效果。
2. 使用場景

  • 快速搭建服務總線,實現rpc框架
  • 調用鏈追蹤分析
  • 跨網絡區域實現系統間同步調用

3.使用方法

  • producer端

??producer端調用request(final Message msg, final long timeout)方法以同步方式等待consumer端消費完消息并返回響應消息;調用request(final Message msg, final RequestCallback requestCallback, final long timeout)方法以異步方式等待consumer端消費完消息并返回響應消息。

同步方式:

public class RequestProducer {public static void main(String[] args) throws MQClientException, InterruptedException {String producerGroup = "RequestTopic0218";String topic = "RequestTopic";long ttl = 300000;DefaultMQProducer producer = new DefaultMQProducer(producerGroup);producer.setNamesrvAddr("127.0.0.1:9876");producer.start();try {Message msg = new Message(topic,"","Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));long begin = System.currentTimeMillis();Message retMsg = producer.request(msg, ttl);long cost = System.currentTimeMillis() - begin;System.out.printf("request to <%s> cost: %d replyMessage: %s %n", topic, cost, retMsg);} catch (Exception e) {e.printStackTrace();}producer.shutdown();} }

異步方式:

public class AsyncRequestProducer {private static final InternalLogger log = ClientLogger.getLog();public static void main(String[] args) throws MQClientException, InterruptedException {String producerGroup = "please_rename_unique_group_name";String topic = "AsynRequestTopic";long ttl = 3000;DefaultMQProducer producer = new DefaultMQProducer(producerGroup);producer.start();try {Message msg = new Message(topic,"","Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));long begin = System.currentTimeMillis();producer.request(msg, new RequestCallback() {@Overridepublic void onSuccess(Message message) {long cost = System.currentTimeMillis() - begin;System.out.printf("request to <%s> cost: %d replyMessage: %s %n", topic, cost, message);}@Overridepublic void onException(Throwable e) {System.err.printf("request to <%s> fail.", topic);}}, ttl);} catch (Exception e) {log.warn("", e);}/* shutdown after your request callback is finished */ // producer.shutdown();} }
  • consumer端

??consumer端程序在原來的基礎上會增加以下內容:

??(1)創建producer用來發送消息

??(2)在消費完消息后調用RocketMQ提供的MessageUtil.createReplyMessage(final Message requestMessage, final byte[] body)方法來構建響應消息

??(3)調用send方法將響應消息發回給生產者

public class ResponseConsumer {public static void main(String[] args) throws InterruptedException, MQClientException {String producerGroup = "ReplyProducer0218";String consumerGroup = "ResponseConsumer0218";String topic = "RequestTopic";// create a producer to send reply messageDefaultMQProducer replyProducer = new DefaultMQProducer(producerGroup);replyProducer.setNamesrvAddr("127.0.0.1:9876");replyProducer.start();// create consumerDefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);// recommend client configsconsumer.setPullTimeDelayMillsWhenException(0L);consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);for (MessageExt msg : msgs) {try {System.out.printf("handle message: %s", msg.toString());String replyTo = MessageUtil.getReplyToClient(msg);byte[] replyContent = "reply message contents.".getBytes();// create reply message with given util, do not create reply message by yourselfMessage replyMessage = MessageUtil.createReplyMessage(msg, replyContent);// send reply message with producerSendResult replyResult = replyProducer.send(replyMessage, 300000);System.out.printf("reply to %s , %s %n", replyTo, replyResult.toString());} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {e.printStackTrace();}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.subscribe(topic, "*");consumer.setNamesrvAddr("127.0.0.1:9876");consumer.start();System.out.printf("Consumer Started.%n");} }
  • 源碼分析
  • 在RocketMQ中producer端可以通過調用以下兩個方法發送消息并等待consumer端返回響應消息:

    • request(final Message msg, final long timeout)
    • request(final Message msg, final RequestCallback requestCallback, final long timeout)

    下面以producer同步等待consumer響應消息為例分析整個request-reply的過程:

    public Message request(Message msg,long timeout) throws RequestTimeoutException, MQClientException, RemotingException, MQBrokerException, InterruptedException {long beginTimestamp = System.currentTimeMillis();prepareSendRequest(msg, timeout);final String correlationId = msg.getProperty(MessageConst.PROPERTY_CORRELATION_ID);try {final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(correlationId, timeout, null);RequestFutureTable.getRequestFutureTable().put(correlationId, requestResponseFuture);long cost = System.currentTimeMillis() - beginTimestamp;this.sendDefaultImpl(msg, CommunicationMode.ASYNC, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {requestResponseFuture.setSendRequestOk(true);}@Overridepublic void onException(Throwable e) {requestResponseFuture.setSendRequestOk(false);requestResponseFuture.putResponseMessage(null);requestResponseFuture.setCause(e);}}, timeout - cost);return waitResponse(msg, timeout, requestResponseFuture, cost);} finally {RequestFutureTable.getRequestFutureTable().remove(correlationId);}}

    (1)獲取系統當前時間,方便后續進行超時判斷

    (2)調用prepareSendRequest(final Message msg, long timeout)函數將待發送給broker的消息進行改造,具體改造如下:

    • 調用CorrelationIdUtil.createCorrelationId()生成該消息的correlationId,并將correlationId添加到消息的擴展屬性CORRELATION_ID
    • 獲取producer的clientId并將其添加到消息的擴展屬性REPLY_TO_CLIENT,該屬性的作用在于后續consumer端發送響應消息時broker知道將消息發送給哪個producer端
    • 將超時時間添加到消息的擴展屬性TTL

    (3)構建RequestResponseFuture對象,這里需要詳細解釋RequestResponseFuture對象,RequestResponseFuture是實現request-reply特性的關鍵,producer發送的每條消息都會new一個RequestResponseFuture對象:

    • correlationId是CorrelationIdUtil.createCorrelationId()方法隨機生成的UUID字符串,correlationId是用來標識從發送每條消息到conumer端發送響應消息的請求
    • requestMsg是consumer端返回的響應消息
    • countDownLatch在消息發送時會阻塞producer線程(調用了await實現阻塞),等到響應消息返回時激活producer線程,最后返回consumer端響應消息,所以雖然在內部實現上是以異步方式發送消息但是結合countDownLatch達到了同步的效果
    • 由于是同步發送所以requestCallback為null
    public class RequestResponseFuture {private final String correlationId;private final RequestCallback requestCallback;private final long beginTimestamp = System.currentTimeMillis();private final Message requestMsg = null;private long timeoutMillis;private CountDownLatch countDownLatch = new CountDownLatch(1);private volatile Message responseMsg = null;private volatile boolean sendRequestOk = true;private volatile Throwable cause = null;

    (4)將<correlationId, requestResponseFuture>添加到requestFutureTable,后續consumer向broker發送RequestCode.SEND_REPLY_MESSAGE_V2請求將響應消息發送到broker,broker在處理這個請求時會調用pushReplyMessage方法發送RequestCode.PUSH_REPLY_MESSAGE_TO_CLIENT請求給producer,此時producer端會根據響應消息中correlationId在requestFutureTable中獲取其對應的requestResponseFuture,并且會將響應消息賦給requestResponseFuture中的responseMsg。

    (5)調用sendDefaultImpl方法以異步的方式發送消息,雖然是以異步方式發送消息但是結合RequestResponseFuture中的countDownLatch到達了同步效果。此時producer發送了RequestCode.SEND_MESSAGE請求給broker,broker后續的處理過程與發送普通消息是一樣的。

    (6)consumer在正常消費完消息后,需要調用MessageUtil.createReplyMessage方法構建響應消息,該方法有兩個參數,分別是producer發送消息和響應消息體內容,該方法會從producer發送的消息的擴展屬性中獲取“CLUSTER”、“REPLY_TO_CLIENT”、“CORRELATION_ID”和“TTL”,并根據這些擴展屬性以及響應消息體內容構建響應消息。這里需要注意,新構建的響應消息的topic是由producer發送的消息的擴展屬性中的CLUSTER與REPLY_TOPIC拼接起來,即“集群名稱_REPLY_TOPIC”,這個是一個系統級別的topic,是由broker自己創建的。

    public static Message createReplyMessage(final Message requestMessage, final byte[] body) throws MQClientException {if (requestMessage != null) {Message replyMessage = new Message();String cluster = requestMessage.getProperty(MessageConst.PROPERTY_CLUSTER);String replyTo = requestMessage.getProperty(MessageConst.PROPERTY_MESSAGE_REPLY_TO_CLIENT);String correlationId = requestMessage.getProperty(MessageConst.PROPERTY_CORRELATION_ID);String ttl = requestMessage.getProperty(MessageConst.PROPERTY_MESSAGE_TTL);replyMessage.setBody(body);if (cluster != null) {String replyTopic = MixAll.getReplyTopic(cluster);replyMessage.setTopic(replyTopic);MessageAccessor.putProperty(replyMessage, MessageConst.PROPERTY_MESSAGE_TYPE, MixAll.REPLY_MESSAGE_FLAG);MessageAccessor.putProperty(replyMessage, MessageConst.PROPERTY_CORRELATION_ID, correlationId);MessageAccessor.putProperty(replyMessage, MessageConst.PROPERTY_MESSAGE_REPLY_TO_CLIENT, replyTo);MessageAccessor.putProperty(replyMessage, MessageConst.PROPERTY_MESSAGE_TTL, ttl);return replyMessage;} else {throw new MQClientException(ClientErrorCode.CREATE_REPLY_MESSAGE_EXCEPTION, "create reply message fail, requestMessage error, property[" + MessageConst.PROPERTY_CLUSTER + "] is null.");}}throw new MQClientException(ClientErrorCode.CREATE_REPLY_MESSAGE_EXCEPTION, "create reply message fail, requestMessage cannot be null.");}

    (7)調用send方法發送響應消息到broker,在發送的過程中會判斷消息的類型,由于該消息是reply類型的,所以向broker發送的請求類型是RequestCode.SEND_REPLY_MESSAGE_V2

    (8)broker處理RequestCode.SEND_REPLY_MESSAGE_V2請求的是ReplyMessageProcessor,具體操作如下:

    • 根據請求中響應消息的topic、queueId、消息體內容、消息標記、消息的擴展屬性、消息產生的時間、消息的來源等信息構建MessageExtBrokerInner對象
    • 調用pushReplyMessage方法構建RequestCode.PUSH_REPLY_MESSAGE_TO_CLIENT請求,然后根據消息擴展屬性REPLY_TO_CLIENT獲取broker與producer連接的channel,最后將請求發送給producer。這里有個問題:RocketMQ如何保證請求原路返回?首先producer產生的消息會發送到broker上,此時broker中存儲的producer產生的消息的擴展屬性中是包含存儲的broker的集群名稱的,接著consumer消息該消息并根據該消息構造出響應消息,在構造響應消息時,其topic是“集群名稱_REPLY_TOPIC”,這樣就保證了consumer在發送響應消息到broker是原路返回,即這里的broker是與producer連接的broker。
    • 判斷broker端的配置文件中storeReplyMessageEnable配置項的值是否為true,如果為true,則會將響應消息存儲在broker端。storeReplyMessageEnable的默認值是true。
    MessageExtBrokerInner msgInner = new MessageExtBrokerInner(); msgInner.setTopic(requestHeader.getTopic()); msgInner.setQueueId(queueIdInt); msgInner.setBody(body); msgInner.setFlag(requestHeader.getFlag()); MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties())); msgInner.setPropertiesString(requestHeader.getProperties()); msgInner.setBornTimestamp(requestHeader.getBornTimestamp()); msgInner.setBornHost(ctx.channel().remoteAddress()); msgInner.setStoreHost(this.getStoreHost()); msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());PushReplyResult pushReplyResult = this.pushReplyMessage(ctx, requestHeader, msgInner); this.handlePushReplyResult(pushReplyResult, response, responseHeader, queueIdInt);if (this.brokerController.getBrokerConfig().isStoreReplyMessageEnable()) {PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);this.handlePutMessageResult(putMessageResult, request, msgInner, responseHeader, sendMessageContext, queueIdInt); }

    (9)producer處理broker發送的RequestCode.PUSH_REPLY_MESSAGE_TO_CLIENT請求的是ClientRemotingProcessor,具體如下:

    • 根據請求還原響應消息MessageExt
    • 獲取響應消息擴展屬性CORRELATION_ID的值correlationId,在producer端的requestFutureTable中根據correlationId獲取該消息對應的requestResponseFuture,然后將響應消息放入到requestResponseFuture中的responseMsg并將countDownLatch的值減一,此時producer端調用request方法的線程就激活了
    • 從requestFutureTable中刪除key為correlationId的數據項
    private void processReplyMessage(MessageExt replyMsg) {final String correlationId = replyMsg.getUserProperty(MessageConst.PROPERTY_CORRELATION_ID);final RequestResponseFuture requestResponseFuture = RequestFutureTable.getRequestFutureTable().get(correlationId);if (requestResponseFuture != null) {requestResponseFuture.putResponseMessage(replyMsg);RequestFutureTable.getRequestFutureTable().remove(correlationId);if (requestResponseFuture.getRequestCallback() != null) {requestResponseFuture.getRequestCallback().onSuccess(replyMsg);} else {requestResponseFuture.putResponseMessage(replyMsg);}} else {String bornHost = replyMsg.getBornHostString();log.warn(String.format("receive reply message, but not matched any request, CorrelationId: %s , reply from host: %s",correlationId, bornHost));}}

    (10)producer端調用request方法線程激活后會調用waitResponse方法返回requestResponseFuture中的responseMsg,這里最終調用的waitResponseMessage方法中帶有一個參數:超時時間,如果到了超時時間后consumer端的響應消息沒有被producer端收到,線程也會被激活,這樣的設置也是防止producer線程一直被阻塞。

    參考資料: 官方視頻鏈接.

    總結

    以上是生活随笔為你收集整理的RocketMQ源码分析之request-reply特性的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。