日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

RocketMQ-初体验RocketMQ(09)-广播消息、延时消息、批量消息

發(fā)布時(shí)間:2025/3/21 编程问答 24 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RocketMQ-初体验RocketMQ(09)-广播消息、延时消息、批量消息 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

文章目錄

  • 廣播消息
    • 廣播消息概述
    • 演示步驟
  • 延時(shí)消息
    • 概述
    • 使用場景
    • 延時(shí)機(jī)制
    • 實(shí)現(xiàn)原理
    • 示例
  • 批量消息
    • 批量消息概述
    • 示例
  • 代碼


廣播消息

廣播消息概述

廣播消息就是向所有用戶發(fā)送消息。 如果我們希望所有訂閱者都能收到有關(guān)某個(gè)主題的消息,可以使用廣播消息。

舉個(gè)例子 生產(chǎn)者發(fā)送10條消息,有2個(gè)訂閱者,則這兩個(gè)訂閱者會(huì)分別收到10條消息, 而與廣播模式相對(duì)應(yīng)的集群模式這是 2個(gè)訂閱者一共收到10條消息。

Rocketmq 消費(fèi)者默認(rèn)是集群的方式消費(fèi)的,使用廣播模式進(jìn)行消費(fèi)需要顯示設(shè)置

核心:消費(fèi)端設(shè)置消息模型 consumer.setMessageModel(MessageModel.BROADCASTING);


演示步驟

  • 啟動(dòng)2個(gè)或者2個(gè)以上的消費(fèi)者
  • 啟動(dòng)生產(chǎn)者發(fā)送消息
  • 觀察2個(gè)消費(fèi)者的消息接收情況 :兩個(gè)Consumer收到了同樣的消息,OK.

生產(chǎn)者:

package com.artisan.rocketmq.broadcast;import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper;/*** @author 小工匠* @version v1.0* @create 2019-11-10 19:22* @motto show me the code ,change the word* @blog https://artisan.blog.csdn.net/* @description**/public class BroadcastProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("consumer_model_group");producer.setNamesrvAddr("192.168.18.130:9876;192.168.18.131:9876");producer.start();for (int i = 0; i < 4; i++){Message msg = new Message("TopicTest","TagA","OrderID188",("Hello world"+i).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);}producer.shutdown();} }

消費(fèi)者:

package com.artisan.rocketmq.broadcast;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;import java.util.List;/*** @author 小工匠* @version v1.0* @create 2019-11-10 19:27* @motto show me the code ,change the word* @blog https://artisan.blog.csdn.net/* @description**/public class BroadcastConsumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_model_group");consumer.setNamesrvAddr("192.168.18.130:9876;192.168.18.131:9876");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);//廣播,全量消費(fèi)consumer.setMessageModel(MessageModel.BROADCASTING);consumer.subscribe("TopicTest", "TagA || TagC || TagD");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {for (MessageExt ext : msgs){System.out.printf(Thread.currentThread().getName() + " Receive New Message: " + new String(ext.getBody()) + "%n");}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.printf("Broadcast Consumer Started.%n");} }

測試結(jié)果:

生產(chǎn)者:

消費(fèi)者1:

消費(fèi)者2:


延時(shí)消息

概述

定時(shí)消息是指消息發(fā)到 Broker 后,不能立刻被 Consumer 消費(fèi),要到特定的時(shí)間點(diǎn)或者等待特定的時(shí)間后才能被消費(fèi)。


使用場景

舉個(gè)例子: 電商系統(tǒng),提交了一個(gè)訂單就可以發(fā)送一個(gè)延時(shí)消息,1h后去檢查這個(gè)訂單的狀態(tài),如果還是未付款就取消訂單釋放庫存。


延時(shí)機(jī)制

org.apache.rocketmq.store.config.MessageStoreConfig#messageDelayLevel

當(dāng)前支持的延遲時(shí)間

1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

分別對(duì)應(yīng)級(jí)別

1 2 3....................

設(shè)置消息時(shí)延

Message message = new Message; message.setDelayTimeLevel(3)

現(xiàn)在RocketMq并不支持任意時(shí)間的延時(shí),需要設(shè)置幾個(gè)固定的延時(shí)等級(jí),從1s到2h分別對(duì)應(yīng)著等級(jí)1到18 消息消費(fèi)失敗會(huì)進(jìn)入延時(shí)消息隊(duì)列,消息發(fā)送時(shí)間與設(shè)置的延時(shí)等級(jí)和重試次數(shù)有關(guān)。


實(shí)現(xiàn)原理

延遲隊(duì)列的核心思路: 【利用中間隊(duì)列臨時(shí)存儲(chǔ)】—>所有的延遲消息由producer消息發(fā)憷之后,都會(huì)存放在一個(gè)topic下 (SHCEDULE_TOPIC_XXXX), 不同的延遲級(jí)別對(duì)應(yīng)不同的隊(duì)列序號(hào),當(dāng)延遲時(shí)間到了之后,由定時(shí)線程讀取轉(zhuǎn)換為普通的消息存到真實(shí)指定的topic下,此時(shí)對(duì)于consumer端此消息才可見,從而被consumer消費(fèi)。


示例

生產(chǎn)者:

package com.artisan.rocketmq.schedule;import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.common.message.Message;import java.util.Date;/*** @author 小工匠* @version v1.0* @create 2019-11-10 17:23* @motto show me the code ,change the word* @blog https://artisan.blog.csdn.net/* @description**/public class ScheduledMessageProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("ExampleConsumer");producer.setNamesrvAddr("192.168.18.130:9876;192.168.18.131:9876");producer.start();int totalMessagesToSend = 3;for (int i = 0; i < totalMessagesToSend; i++) {Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());//延時(shí)消費(fèi) 6-->2分鐘message.setDelayTimeLevel(6);// Send the messageproducer.send(message);}System.out.printf("message send is completed .%n" + new Date());producer.shutdown();} }

消費(fèi)者:

package com.artisan.rocketmq.schedule;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.message.MessageExt;import java.util.Date; import java.util.List;/*** @author 小工匠* @version v1.0* @create 2019-11-10 17:23* @motto show me the code ,change the word* @blog https://artisan.blog.csdn.net/* @description**/public class ScheduledMessageConsumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");consumer.setNamesrvAddr("192.168.18.130:9876;192.168.18.131:9876");consumer.subscribe("TestTopic", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {for (MessageExt message : messages) {// Print approximate delay time periodSystem.out.println(new Date() + "Receive message[msgId=" + message.getMsgId() + "] "+ "message content is :" + new String(message.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();//System.out.printf("Consumer Started.%n");} }

設(shè)置的延遲level為6 ,對(duì)應(yīng)的時(shí)間間隔是兩分鐘,OK。


批量消息

批量消息概述

批量發(fā)送消息能顯著提高傳遞小消息的性能。限制是這些批量消息應(yīng)該有相同的topic,相同的waitStoreMsgOK,而且不能是延時(shí)消息

此外,這一批消息的總大小不應(yīng)超過4MB。rocketmq建議每次批量消息大小大概在1MB。當(dāng)消息大小超過4MB時(shí),需要將消息進(jìn)行分割

示例

生產(chǎn)者

package com.artisan.rocketmq.batch;import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.common.message.Message;import java.util.ArrayList; import java.util.List;/*** @author 小工匠* @version v1.0* @create 2019-11-10 21:27* @motto show me the code ,change the word* @blog https://artisan.blog.csdn.net/* @description**/ public class BatchProducer {public static void main(String[] args) throws Exception {/*** rocketMq 支持消息批量發(fā)送* 同一批次的消息應(yīng)具有:相同的主題,相同的waitStoreMsgOK,并且不支持定時(shí)任務(wù)。* <strong> 同一批次消息建議大小不超過~1M </strong>,消息最大不能超過4M,需要* 對(duì)msg進(jìn)行拆分*/DefaultMQProducer producer = new DefaultMQProducer("batch_group");producer.setNamesrvAddr("192.168.18.130:9876;192.168.18.131:9876");producer.start();String topic = "BatchTest";List<Message> messages = new ArrayList<>();messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));ListSplitter splitter = new ListSplitter(messages);/*** 對(duì)批量消息進(jìn)行拆分*/while (splitter.hasNext()) {try {List<Message> listItem = splitter.next();producer.send(listItem);} catch (Exception e) {e.printStackTrace();}}producer.shutdown();}}

消息拆分

package com.artisan.rocketmq.batch;import org.apache.rocketmq.common.message.Message;import java.util.Iterator; import java.util.List; import java.util.Map;/*** @author 小工匠* @version v1.0* @create 2019-11-10 21:35* @motto show me the code ,change the word* @blog https://artisan.blog.csdn.net/* @description**/ public class ListSplitter implements Iterator<List<Message>> {private final int SIZE_LIMIT = 1000 * 1000 * 1;//1MBprivate final List<Message> messages;private int currIndex;public ListSplitter(List<Message> messages) {this.messages = messages;}@Overridepublic boolean hasNext() {return currIndex < messages.size();}@Overridepublic List<Message> next() {int nextIndex = currIndex;int totalSize = 0;//遍歷消息準(zhǔn)備拆分for (; nextIndex < messages.size(); nextIndex++) {Message message = messages.get(nextIndex);int tmpSize = message.getTopic().length() + message.getBody().length;Map<String, String> properties = message.getProperties();for (Map.Entry<String, String> entry : properties.entrySet()) {tmpSize += entry.getKey().length() + entry.getValue().length();}tmpSize = tmpSize + 20; //for log overheadif (tmpSize > SIZE_LIMIT) {if (nextIndex - currIndex == 0) {nextIndex++;}break;}if (tmpSize + totalSize > SIZE_LIMIT) {break;} else {totalSize += tmpSize;}}List<Message> subList = messages.subList(currIndex, nextIndex);currIndex = nextIndex;return subList;} }

消費(fèi)者

package com.artisan.rocketmq.batch;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt;import java.util.List;/*** @author 小工匠* @version v1.0* @create 2019-11-10 21:38* @motto show me the code ,change the word* @blog https://artisan.blog.csdn.net/* @description**/ public class Consumer {public static void main(String[] args) throws InterruptedException, MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("batch_group");consumer.setNamesrvAddr("192.168.18.130:9876;192.168.18.131:9876");consumer.subscribe("BatchTest", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs){System.out.println("queueId=" + msg.getQueueId() + "," + new String(msg.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.printf("Consumer Started.%n");} }

代碼

請移步: https://github.com/yangshangwei/rocketmqMaster

總結(jié)

以上是生活随笔為你收集整理的RocketMQ-初体验RocketMQ(09)-广播消息、延时消息、批量消息的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。