日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

MQ延迟队列实现延迟消息

發(fā)布時(shí)間:2023/12/20 编程问答 47 豆豆
生活随笔 收集整理的這篇文章主要介紹了 MQ延迟队列实现延迟消息 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

在開(kāi)發(fā)中經(jīng)常會(huì)遇到延時(shí)任務(wù)的需求,例如在12306購(gòu)買車票,若生成訂單30分鐘未支付則自動(dòng)取消;還有在線商城完成訂單后48小時(shí)不評(píng)價(jià) ,自動(dòng)5星好評(píng)。像這類在某事件觸發(fā)后一段時(shí)間內(nèi)執(zhí)行的需求任務(wù)我們稱之為?延時(shí)任務(wù)。

那么如何實(shí)現(xiàn)延遲任務(wù)呢?

第一反應(yīng)是利用cron方案來(lái)實(shí)現(xiàn):

啟動(dòng)一個(gè)cron定時(shí)任務(wù),每隔一段時(shí)間執(zhí)行一次,比如30分鐘,找到那些超時(shí)的數(shù)據(jù),直接更新?tīng)顟B(tài),或者拿出來(lái)執(zhí)行一些操作。如果數(shù)據(jù)量比較大,需要分頁(yè)查詢,分頁(yè)update,這將是一個(gè)for循環(huán)更新操作。

cron方案是很常見(jiàn)的一種方案,但是常見(jiàn)的不一定是最好的,主要有以下幾個(gè)問(wèn)題:

  • 當(dāng)數(shù)據(jù)量大的時(shí)候輪詢效率低;
  • 時(shí)效性不夠好,如果每小時(shí)輪詢一次,最差的情況時(shí)間誤差會(huì)達(dá)到1小時(shí);
  • 如果通過(guò)增加cron輪詢頻率來(lái)減少時(shí)間誤差,則會(huì)出現(xiàn)輪詢低效和重復(fù)計(jì)算的問(wèn)題;

既然cron方案不是很理想,那就請(qǐng)出我們今天的主角,使用RocketMQ的延時(shí)消息解決。在創(chuàng)建訂單的時(shí)候發(fā)送一條延時(shí)消息到RocketMQ,30分鐘后消費(fèi)者消費(fèi)消息去檢查訂單的狀態(tài),如果發(fā)現(xiàn)訂單未支付則取消訂單釋放庫(kù)存。

實(shí)現(xiàn)

RocketMQ延遲隊(duì)列的核心思路是:所有的延遲消息由producer發(fā)出之后,都會(huì)存放到同一個(gè)topic(SCHEDULE_TOPIC_XXXX)下,不同的延遲級(jí)別會(huì)對(duì)應(yīng)不同的隊(duì)列序號(hào),當(dāng)延遲時(shí)間到之后,由定時(shí)線程讀取轉(zhuǎn)換為普通的消息存的真實(shí)指定的topic下,此時(shí)對(duì)于consumer端此消息才可見(jiàn),從而被consumer消費(fèi)。

注意:?RocketMQ不支持任意時(shí)間的延時(shí),只支持以下幾個(gè)固定的延時(shí)等級(jí)

private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

下面我們結(jié)合SprintBoot利用RocketMQ發(fā)送延時(shí)消息

  • 引入RocketMQ組件
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId></dependency>
  • 增加RocketMQ的配置

rocketmq:??

????????name-server:?172.31.0.44:9876??

????????producer:????

????????group:?delay-group

  • 編寫(xiě)生產(chǎn)者
@Component @Slf4j public?class?DelayProduce?{???@Autowired????private?RocketMQTemplate?rocketMQTemplatet;??public?void?sendDelayMessage(String?topic,String?message,int?delayLevel{???????SendResult?sendResult?=?rocketMQTemplatet.syncSend(topic,?MessageBuilder.withPayload(message).build(),?2000,?delayLevel);????????log.info("sendtime?is?{}",?DateTimeFormatter.ofPattern("yyyy年MM月dd 日?HH:mm:ss").format(LocalDateTime.now()));????????log.info("sendResult?is{}",sendResult);???? } }
  • 編寫(xiě)消費(fèi)者
@Slf4j@Component@RocketMQMessageListener(????????topic?=?"delay-topic",????????consumerGroup?=?"delay-group")public?class?DelayConsumer?implements?RocketMQListener<String>?{????@Override????public?void?onMessage(String?message)?{????????log.info("received?message?time?is?{}",?DateTimeFormatter.ofPattern("yyyy年MM月dd日?HH:mm:ss").format(LocalDateTime.now()));????????log.info("received?message?is?{}",message);????}}
  • 測(cè)試
@RunWith(SpringRunner.class) @SpringBootTest public?class?DelayProduceTest?{???? @Autowired???? private?DelayProduce?delayProduce;???? @Test???? public?void?sendDelayMessage()?{???????? delayProduce.sendDelayMessage("delay-topic","Hello,JAVA日知錄",5);???? }}

這里delayLevel設(shè)置成5,對(duì)應(yīng)RocketMQ的延時(shí)等級(jí)就是1分鐘后投遞消息。

  • 運(yùn)行結(jié)果

發(fā)送時(shí)間

消費(fèi)時(shí)間

修改延時(shí)級(jí)別

RocketMQ的延遲等級(jí)可以進(jìn)行修改,以滿足自己的業(yè)務(wù)需求,可以修改/添加新的level。例如:你想支持1天的延遲,修改最后一個(gè)level的值為1d,這個(gè)時(shí)候依然是18個(gè)level;也可以增加一個(gè)1d,這個(gè)時(shí)候總共就有19個(gè)level。

  • 打開(kāi)RocketMQ的配置文件,修改messageDelayLevel?屬性

brokerClusterName = DefaultClusterbrokerName = broker-abrokerId = 0deleteWhen = 04fileReservedTime = 48brokerRole = ASYNC_MASTERflushDiskType = ASYNC_FLUSHstorePathRootDir = /app/rocketmq/datamessageDelayLevel=90s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

這次將延時(shí)等級(jí)1修改成了90s,生產(chǎn)者發(fā)送消息后需要90s后再進(jìn)行消息投遞。修改完成后重啟RocketMQ。

nohup sh mqbroker -n localhost:9876 -c ../conf/broker.conf &

  • 使用延時(shí)等級(jí)1發(fā)送消息

public?void?sendDelayMessage()?{?delayProduce.sendDelayMessage("delay-topic","Hello,JAVA日知錄",1);}

  • 測(cè)試

發(fā)送時(shí)間

消費(fèi)時(shí)間

通過(guò)比對(duì)發(fā)送時(shí)間與消費(fèi)時(shí)間證明延時(shí)等級(jí)修改生效。

總結(jié)

以上是生活随笔為你收集整理的MQ延迟队列实现延迟消息的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。