日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

rockemq 发送延迟消息_RocketMQ系列(五)广播与延迟消息

發布時間:2024/9/18 编程问答 40 豆豆
生活随笔 收集整理的這篇文章主要介紹了 rockemq 发送延迟消息_RocketMQ系列(五)广播与延迟消息 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

今天要給大家介紹RocketMQ中的兩個功能,一個是“廣播”,這個功能是比較基礎的,幾乎所有的mq產品都是支持這個功能的;另外一個是“延遲消費”,這個應該算是RocketMQ的特色功能之一了吧。接下來,我們就分別看一下這兩個功能。

廣播

廣播是把消息發送給訂閱了這個主題的所有消費者。這個定義很清楚,但是這里邊的知識點你都掌握了嗎?咱們接著說“廣播”的機會,把消費者這端的內容好好和大家說說。

首先,消費者端的概念中,最大的應該是消費者組,一個消費者組中可以有多個消費者,這些消費者必須訂閱同一個Topic。

那么什么算是一個消費者呢?我們在寫消費端程序時,看到了setConsumeThreadMax這個方法,設置消費者的線程數,難道一個線程就是一個消費者?錯!這里的一個消費者是一個進程,你可以理解為ip+端口。如果在同一個應用中,你實例化了兩個消費者,這兩個消費者配置了相同的消費者組名稱,那么應用程序啟動時會報錯的,這里不給大家演示了,感興趣的小伙伴私下里試一下吧。

同一個消息,可以被不同的消費者組同時消費。假設,我有兩個消費者組cg-1和cg-2,這兩個消費者組訂閱了同一個Topic,那么這個Topic的消息會被cg-1和cg-2同時消費。那這是不是廣播呢?錯!當然不是廣播,廣播是同一個消費者組中的多個消費者都消費這個消息。如果配置的不是廣播,像前幾個章節中的那樣,一個消息只能被一個消費者組消費一次。

好了,說了這么多,我們實驗一下吧,先把消費者配置成廣播,如下:

@Bean(name = "broadcast", initMethod = "start",destroyMethod = "shutdown")

public DefaultMQPushConsumer broadcast() throws MQClientException {

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("broadcast");

consumer.setNamesrvAddr("192.168.73.130:9876;192.168.73.131:9876;192.168.73.132:9876;");

consumer.subscribe("cluster-topic","*");

consumer.setMessageModel(MessageModel.BROADCASTING);

consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {

for (MessageExt msg : msgs) {

System.out.println(new String(msg.getBody()));

}

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

});

return consumer;

}

其中,NameServer,訂閱的Topic都沒有變化。

注意其中consumer.setMessageModel(MessageModel.BROADCASTING);這段代碼,設置消費者為廣播。咱們可以看一下,MessageModel枚舉中只有兩個值,BROADCASTING和CLUSTERING,默認為CLUSTERING。

因為要測試廣播,所以我們要啟動多個消費者,還記得什么是消費者嗎?對了,一個ip+端口算是一個消費者,在這里我們啟動兩個應用,端口分別是8080和8081。發送端的程序不變,如下:

@Test

public void producerTest() throws Exception {

for (int i = 0;i<5;i++) {

MessageExt message = new MessageExt();

message.setTopic("cluster-topic");

message.setKeys("key-"+i);

message.setBody(("this is simpleMQ,my NO is "+i+"---"+new Date()).getBytes());

SendResult sendResult = defaultMQProducer.send(message);

System.out.println("i=" + i);

System.out.println("BrokerName:" + sendResult.getMessageQueue().getBrokerName());

}

}

我們執行一下發送端的程序,日志如下:

i=0

BrokerName:broker-a

i=1

BrokerName:broker-a

i=2

BrokerName:broker-b

i=3

BrokerName:broker-b

i=4

BrokerName:broker-b

再來看看8080端口的應用后臺打印出來的日志:

消費了5個消息,再看看8081的后臺打印的日志,

也消費了5個。兩個消費者同時消費了消息,這就是廣播。有的小伙伴可能會有疑問了,如果不設置廣播,會怎么樣呢?私下里實驗一下吧,上面的程序中,只要把設置廣播的那段代碼注釋掉就可以了。運行的結果當然是只有一個消費者可以消費消息。

延遲消息

延遲消息是指消費者過了一個指定的時間后,才去消費這個消息。大家想象一個電商中場景,一個訂單超過30分鐘未支付,將自動取消。這個功能怎么實現呢?一般情況下,都是寫一個定時任務,一分鐘掃描一下超過30分鐘未支付的訂單,如果有則被取消。這種方式由于每分鐘查詢一下訂單,一是時間不精確,二是查庫效率比較低。這個場景使用RocketMQ的延遲消息最合適不過了,我們看看怎么發送延遲消息吧,發送端代碼如下:

@Test

public void producerTest() throws Exception {

for (int i = 0;i<1;i++) {

MessageExt message = new MessageExt();

message.setTopic("cluster-topic");

message.setKeys("key-"+i);

message.setBody(("this is simpleMQ,my NO is "+i+"---"+new Date()).getBytes());

message.setDelayTimeLevel(2);

SendResult sendResult = defaultMQProducer.send(message);

System.out.println("i=" + i);

System.out.println("BrokerName:" + sendResult.getMessageQueue().getBrokerName());

}

}

我們只是增加了一句message.setDelayTimeLevel(2);

為了方便,這次我們只發送一個消息。

setDelayTimeLevel是什么意思,設置的是2,難道是2s后消費嗎?怎么參數也沒有時間單位呢?如果我要自定義延遲時間怎么辦?我相信很多小伙伴都有這樣的疑問,我也是帶著這樣的疑問查了很多資料,最后在RocketMQ的Github官網上看到了說明,

在RocketMQ的源碼中,有一個MessageStoreConfig類,這個類中定義了延遲的時間,我們看一下,

// org/apache/rocketmq/store/config/MessageStoreConfig.java

private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

我們在程序中設置的是2,那么這個消息將在5s以后被消費。

目前RocketMQ還不支持自定義延遲時間,延遲時間只能從上面的時間中選。如果你非要定義一個時間怎么辦呢?RocketMQ是開源的,下載代碼,把上面的時間改一下,再打包部署,就OK了。

再看看消費端的代碼,

@Bean(name = "broadcast", initMethod = "start",destroyMethod = "shutdown")

public DefaultMQPushConsumer broadcast() throws MQClientException {

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("broadcast");

consumer.setNamesrvAddr("192.168.73.130:9876;192.168.73.131:9876;192.168.73.132:9876;");

consumer.subscribe("cluster-topic","*");

consumer.setMessageModel(MessageModel.BROADCASTING);

consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {

for (MessageExt msg : msgs) {

Date now = new Date();

System.out.println("消費時間:"+now);

Date msgTime = new Date();

msgTime.setTime(msg.getBornTimestamp());

System.out.println("消息生成時間:"+msgTime);

System.out.println(new String(msg.getBody()));

}

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

});

return consumer;

}

我們還是使用廣播的模式,沒有變。

打印出了當前的時間,這個時間就是消費的時間。

通過msg.getBornTimestamp()方法,獲得了消息的生成時間,也打印出來,看看是不是延遲5s。

啟動兩個消費者8080和8081,發送消息,再看看消費者的后臺日志,

消費時間:Thu Jun 11 14:45:53 CST 2020

消息生成時間:Thu Jun 11 14:45:48 CST 2020

this is simpleMQ,my NO is 0---Thu Jun 11 14:45:47 CST 2020

我們看到消費時間比生成時間晚5s,符合我們的預期。這個功能還是比較實用的,如果能夠自定義延遲時間就更好了。

總結

RocketMQ的這兩個知識點還是比較簡單的,大家要分清楚什么是消費者組,什么是消費者,什么是消費者線程。另外就是延遲消息是不支持自定義的,大家可以在Github上看一下源碼。好了~今天就到這里了。

總結

以上是生活随笔為你收集整理的rockemq 发送延迟消息_RocketMQ系列(五)广播与延迟消息的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: av网址免费在线观看 | 97视频免费看 | 精品国产一区二区在线 | 日韩最新中文字幕 | 美女黄色一级视频 | 一区二区三区在线免费观看视频 | 精品中文字幕在线 | 那里有毛片看 | 玖玖热在线视频 | 精品一区二区三区视频 | 欧美在线一二三区 | 在线观看黄色网 | 国模私拍xvideos私拍 | 欧美大片在线看 | 69人妻精品久久无人专区 | 成人美女免费网站视频 | www.国产黄色 | 午夜在线 | 澳门三级 | 中文幕无线码中文字蜜桃 | 91在线日本 | 亚洲美女av在线 | 日韩一区高清 | 色综合av综合无码综合网站 | 波多野结衣一区二区三区 | 91在线精品播放 | 91av视频在线播放 | 久久久久五月 | 性欧美bb| 天堂欧美城网站网址 | 亚洲第一二三区 | 成人综合社区 | 日韩中文字幕影院 | 中文字幕在线观看视频网站 | 麻豆传媒一区二区 | 亚洲四区| 西西4444www大胆无视频 | 在线免费观看日韩 | 欧美性在线视频 | 国产 欧美 日韩 一区 | 欧美精彩视频 | 黄色片成人| 午夜男人天堂 | 国产日韩在线视频 | 国产精品国色综合久久 | 91一区视频| 日本亚洲一区二区 | 精品日本视频 | 久久欧美精品 | www.youjizz.com国产 | 性猛交xxxx乱大交孕妇印度 | 涩涩网站入口 | 四虎国产在线观看 | 国产亚洲欧美在线 | 国产精品av在线播放 | 99久久99久久精品免费看蜜桃 | 成人高潮片免费视频 | 可以看黄色的网站 | 日本美女一级片 | 亚洲区小说区 | 国产美女菊爆在线播放APP | 星铁乱淫h侵犯h文 | 中文字幕Av日韩精品 | 色综合五月 | 欧美一级爱爱 | 免费成人av在线播放 | 欧美精品二区三区四区免费看视频 | 欧美日韩视频在线播放 | 射综合网 | 一本色道久久综合亚洲精品 | 国内精品久久99人妻无码 | 久久久国产片 | 黄色大片在线 | 性感美女一级片 | 亚洲乱色熟女一区二区三区 | 337p粉嫩大胆噜噜噜亚瑟影院 | 国产毛片毛片毛片毛片毛片毛片 | 亚洲精品嫩草 | 黄色污在线观看 | caoporn成人 | 欧美aaaaaaa | 精品综合久久久久 | 三级av | 特级毛片爽www免费版 | 亚洲一区二区三区久久 | 午夜精品国产精品大乳美女 | 竹菊影视一区二区三区 | 扒开伸进免费视频 | 中文字幕天堂在线 | 国产又黄又硬又粗 | 涩涩涩涩涩涩涩涩涩涩 | 天堂va欧美ⅴa亚洲va一国产 | 欧美激情伊人 | 大乳护士喂奶hd | 粗大的内捧猛烈进出 | 国产一级网站 | 中文字幕成人 | va在线播放 | 亚洲区一区 |