rocketmq发送顺序消息(四)
生活随笔
收集整理的這篇文章主要介紹了
rocketmq发送顺序消息(四)
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
rocketmq怎么發送消息可參考我的上一篇博客:rocketmq發送第一條消息。此處我們講解如何發送rocketmq順序消息
producer
public class ProducerOrder {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("testGrp");// 設置nameserver地址 nameserver具備路由功能(發現服務,有點注冊中心的意思),讓其分配合理的broker來進行消息發送producer.setNamesrvAddr("192.168.52.11:9876");try {producer.start();} catch (MQClientException e) {e.printStackTrace();}for (int i = 0; i < 20; i++) {Message message = new Message("monkeyOrderMsgTopic", ("這是順序消息:" + i).getBytes());producer.send(message,// 自定義選擇Queuenew MessageQueueSelector() {/**** @param list 當前topic里所有的queue* @param message 要發送的消息* @param o 對應到 send() 里的 args參數* @return*/@Overridepublic MessageQueue select(List<MessageQueue> list, Message message, Object o) {// 根據傳入的參數決定QueueMessageQueue messageQueue = list.get((Integer)o);return messageQueue;}}, 0, 3000);}System.out.println("發送完成");} }?consumer
public class ConsumerOrder {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testConsumerOrder");consumer.setNamesrvAddr("192.168.52.11:9876");consumer.subscribe("monkeyOrderMsgTopic","*");consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {for (MessageExt messageExt : list) {System.out.println(new String(messageExt.getBody()) + "current Thread:" + Thread.currentThread().getName());}return ConsumeOrderlyStatus.SUCCESS;}});consumer.start();System.out.println("consumer start .....");}}測試結果:?
?總結:
你們應該如何保證消息的順序?
-
同一topic
-
同一個QUEUE
-
發消息的時候一個線程去發送消息
-
消費的時候 一個線程 消費一個queue里的消息或者使用MessageListenerOrderly
-
多個queue 只能保證單個queue里的順序
總結
以上是生活随笔為你收集整理的rocketmq发送顺序消息(四)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: rocketmq发送第一条消息(三)
- 下一篇: Linux安装最新Redis