日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

RocketMQ-初体验RocketMQ(07)-使用API操作RocketMQ_顺序消息 ordermessage

發布時間:2025/3/21 编程问答 16 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RocketMQ-初体验RocketMQ(07)-使用API操作RocketMQ_顺序消息 ordermessage 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

文章目錄

  • 順序消息的概念
  • 順序消費的原理
  • 消費狀態
  • 演示
    • Producer
    • Consumer
  • 代碼


順序消息的概念

消息有序指的是可以按照消息的發送順序來消費(FIFO)。

RocketMQ可以嚴格的保證消息有序,可以分為分區有序或者全局有序


順序消費的原理

在默認的情況下消息發送會采取Round Robin輪詢方式把消息發送到不同的queue(分區隊列);

而消費消息的時候從多個queue上拉取消息,這種情況發送和消費是不能保證順序。

但是如果控制發送的順序消息只依次發送到同一個queue中,消費的時候只從這個queue上依次拉取,則就保證了順序。

當發送和消費參與的queue只有一個,則是全局有序;如果多個queue參與,則為分區有序,即相對每個queue,消息都是有序的。


消費狀態

package org.apache.rocketmq.client.consumer.listener;public enum ConsumeOrderlyStatus {/*** Success consumption*/SUCCESS,/*** Suspend current queue a moment* 不能跳過消息,等待一下*/SUSPEND_CURRENT_QUEUE_A_MOMENT; }

演示

下面用訂單進行分區有序的示例。一個訂單的順序流程是:創建、付款、推送、完成。訂單號相同的消息會被先后發送到同一個隊列中,消費時,同一個OrderId獲取到的肯定是同一個隊列。

Producer

package com.artisan.rocketmq.order;import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.MessageQueueSelector; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.remoting.common.RemotingHelper;import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Date; import java.util.List;/*** @author 小工匠* @version v1.0* @create 2019-11-10 16:38* @motto show me the code ,change the word* @blog https://artisan.blog.csdn.net/* @description**/public class OrderedProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("ordered_group_name");producer.setNamesrvAddr("192.168.18.130:9876;192.168.18.131:9876");producer.start();String[] tags = new String[]{"TagA", "TagC", "TagD"};// 訂單列表List<OrderStep> orderList = buildOrders();Date date = new Date();SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");String dateStr = sdf.format(date);for (int i = 0; i < 10; i++) {// 加個時間前綴String body = dateStr + " Hello RocketMQ "+ i + " " + orderList.get(i);Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i,body.getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Long id = (Long) arg; //根據訂單id選擇發送queuelong index = id % mqs.size();return mqs.get((int) index);}}, orderList.get(i).getOrderId());//訂單idSystem.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",sendResult.getSendStatus(),sendResult.getMessageQueue().getQueueId(),body));}producer.shutdown();}/*** 生成模擬訂單數據*/private static List<OrderStep> buildOrders() {List<OrderStep> orderList = new ArrayList<OrderStep>();OrderStep orderDemo = new OrderStep();orderDemo.setOrderId(15103111039L);orderDemo.setDesc("創建");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111065L);orderDemo.setDesc("創建");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111039L);orderDemo.setDesc("付款");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103117235L);orderDemo.setDesc("創建");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111065L);orderDemo.setDesc("付款");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103117235L);orderDemo.setDesc("付款");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111065L);orderDemo.setDesc("完成");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111039L);orderDemo.setDesc("推送");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103117235L);orderDemo.setDesc("完成");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(15103111039L);orderDemo.setDesc("購物車");orderList.add(orderDemo);return orderList;}/*** 訂單的步驟*/private static class OrderStep {private long orderId;private String desc;public long getOrderId() {return orderId;}public void setOrderId(long orderId) {this.orderId = orderId;}public String getDesc() {return desc;}public void setDesc(String desc) {this.desc = desc;}@Overridepublic String toString() {return "OrderStep{" +"orderId=" + orderId +", desc='" + desc + '\'' +'}';}} }

日志


Consumer

package com.artisan.rocketmq.order;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.remoting.common.RemotingHelper;import java.io.UnsupportedEncodingException; import java.util.List; import java.util.Random; import java.util.concurrent.TimeUnit;/*** @author 小工匠* @version v1.0* @create 2019-11-10 16:38* @motto show me the code ,change the word* @blog https://artisan.blog.csdn.net/* @description**/public class OrderedConsumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ordered_group_name");consumer.setNamesrvAddr("192.168.18.130:9876;192.168.18.131:9876");/*** 設置消費位置*/consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.subscribe("TopicTest", "*");// 有序消費 MessageListenerOrderlyconsumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,ConsumeOrderlyContext context) {context.setAutoCommit(true);Random random = new Random();for (MessageExt msg : msgs) {// 可以看到每個queue有唯一的consume來消費, 訂單對每個queue(分區)有序try {System.out.println("consumeThread=" + Thread.currentThread().getName()+ ", queueId=" + msg.getQueueId()+ ", content:" + new String(msg.getBody(),RemotingHelper.DEFAULT_CHARSET));} catch (UnsupportedEncodingException e) {e.printStackTrace();}}try {//模擬業務邏輯處理中...TimeUnit.SECONDS.sleep(random.nextInt(10));} catch (Exception e) {e.printStackTrace();}return ConsumeOrderlyStatus.SUCCESS;}});consumer.start();System.out.printf("Consumer Started.%n");} }

運行日志


代碼

請移步:https://github.com/yangshangwei/rocketmqMaster


總結

以上是生活随笔為你收集整理的RocketMQ-初体验RocketMQ(07)-使用API操作RocketMQ_顺序消息 ordermessage的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。