RocketMQ-初体验RocketMQ(07)-使用API操作RocketMQ_顺序消息 ordermessage
生活随笔
收集整理的這篇文章主要介紹了
RocketMQ-初体验RocketMQ(07)-使用API操作RocketMQ_顺序消息 ordermessage
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
文章目錄
- 順序消息的概念
- 順序消費的原理
- 消費狀態
- 演示
- Producer
- Consumer
- 代碼
順序消息的概念
消息有序指的是可以按照消息的發送順序來消費(FIFO)。
RocketMQ可以嚴格的保證消息有序,可以分為分區有序或者全局有序。
順序消費的原理
在默認的情況下消息發送會采取Round Robin輪詢方式把消息發送到不同的queue(分區隊列);
而消費消息的時候從多個queue上拉取消息,這種情況發送和消費是不能保證順序。
但是如果控制發送的順序消息只依次發送到同一個queue中,消費的時候只從這個queue上依次拉取,則就保證了順序。
當發送和消費參與的queue只有一個,則是全局有序;如果多個queue參與,則為分區有序,即相對每個queue,消息都是有序的。
消費狀態
package org.apache.rocketmq.client.consumer.listener;public enum ConsumeOrderlyStatus {/*** Success consumption*/SUCCESS,/*** Suspend current queue a moment* 不能跳過消息,等待一下*/SUSPEND_CURRENT_QUEUE_A_MOMENT; }演示
下面用訂單進行分區有序的示例。一個訂單的順序流程是:創建、付款、推送、完成。訂單號相同的消息會被先后發送到同一個隊列中,消費時,同一個OrderId獲取到的肯定是同一個隊列。
Producer
package com.artisan.rocketmq.order;import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.MessageQueueSelector; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.remoting.common.RemotingHelper;import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Date; import java.util.List;/*** @author 小工匠* @version v1.0* @create 2019-11-10 16:38* @motto show me the code ,change the word* @blog https://artisan.blog.csdn.net/* @description**/public class OrderedProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("ordered_group_name");producer.setNamesrvAddr("192.168.18.130:9876;192.168.18.131:9876");producer.start();String[] tags = new String[]{"TagA", "TagC", "TagD"};// 訂單列表List<OrderStep> orderList = buildOrders();Date date = new Date();SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");String dateStr = sdf.format(date);for (int i = 0; i < 10; i++) {// 加個時間前綴String body = dateStr + " Hello RocketMQ "+ i + " " + orderList.get(i);Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i,body.getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Long id = (Long) arg; //根據訂單id選擇發送queuelong index = id % mqs.size();return mqs.get((int) index);}}, orderList.get(i).getOrderId());//訂單idSystem.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",sendResult.getSendStatus(),sendResult.getMessageQueue().getQueueId(),body));}producer.shutdown();}/*** 生成模擬訂單數據*/private static List<OrderStep> buildOrders() {List<OrderStep> orderList = new ArrayList<OrderStep>();OrderStep orderDemo = new OrderStep();orderDemo.setOrderId(15103111039L);orderDemo.setDesc("創建");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111065L);orderDemo.setDesc("創建");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111039L);orderDemo.setDesc("付款");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103117235L);orderDemo.setDesc("創建");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111065L);orderDemo.setDesc("付款");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103117235L);orderDemo.setDesc("付款");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111065L);orderDemo.setDesc("完成");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111039L);orderDemo.setDesc("推送");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103117235L);orderDemo.setDesc("完成");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111039L);orderDemo.setDesc("購物車");orderList.add(orderDemo);return orderList;}/*** 訂單的步驟*/private static class OrderStep {private long orderId;private String desc;public long getOrderId() {return orderId;}public void setOrderId(long orderId) {this.orderId = orderId;}public String getDesc() {return desc;}public void setDesc(String desc) {this.desc = desc;}@Overridepublic String toString() {return "OrderStep{" +"orderId=" + orderId +", desc='" + desc + '\'' +'}';}} }日志
Consumer
package com.artisan.rocketmq.order;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.remoting.common.RemotingHelper;import java.io.UnsupportedEncodingException; import java.util.List; import java.util.Random; import java.util.concurrent.TimeUnit;/*** @author 小工匠* @version v1.0* @create 2019-11-10 16:38* @motto show me the code ,change the word* @blog https://artisan.blog.csdn.net/* @description**/public class OrderedConsumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ordered_group_name");consumer.setNamesrvAddr("192.168.18.130:9876;192.168.18.131:9876");/*** 設置消費位置*/consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.subscribe("TopicTest", "*");// 有序消費 MessageListenerOrderlyconsumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,ConsumeOrderlyContext context) {context.setAutoCommit(true);Random random = new Random();for (MessageExt msg : msgs) {// 可以看到每個queue有唯一的consume來消費, 訂單對每個queue(分區)有序try {System.out.println("consumeThread=" + Thread.currentThread().getName()+ ", queueId=" + msg.getQueueId()+ ", content:" + new String(msg.getBody(),RemotingHelper.DEFAULT_CHARSET));} catch (UnsupportedEncodingException e) {e.printStackTrace();}}try {//模擬業務邏輯處理中...TimeUnit.SECONDS.sleep(random.nextInt(10));} catch (Exception e) {e.printStackTrace();}return ConsumeOrderlyStatus.SUCCESS;}});consumer.start();System.out.printf("Consumer Started.%n");} }運行日志
代碼
請移步:https://github.com/yangshangwei/rocketmqMaster
總結
以上是生活随笔為你收集整理的RocketMQ-初体验RocketMQ(07)-使用API操作RocketMQ_顺序消息 ordermessage的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: IDEA-使用IDEA创建maven多模
- 下一篇: RocketMQ-初体验RocketMQ