日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

【RabbitMQ】一文带你搞定RabbitMQ延迟队列

發(fā)布時(shí)間:2024/9/20 编程问答 23 豆豆
生活随笔 收集整理的這篇文章主要介紹了 【RabbitMQ】一文带你搞定RabbitMQ延迟队列 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

本文口味:魚(yú)香肉絲? ?預(yù)計(jì)閱讀:10分鐘

0|1一、說(shuō)明

在上一篇中,介紹了RabbitMQ中的死信隊(duì)列是什么,何時(shí)使用以及如何使用RabbitMQ的死信隊(duì)列。相信通過(guò)上一篇的學(xué)習(xí),對(duì)于死信隊(duì)列已經(jīng)有了更多的了解,這一篇的內(nèi)容也跟死信隊(duì)列息息相關(guān),如果你還不了解死信隊(duì)列,那么建議你先進(jìn)行上一篇文章的閱讀。

這一篇里,我們將繼續(xù)介紹RabbitMQ的高級(jí)特性,通過(guò)本篇的學(xué)習(xí),你將收獲:

  • 什么是延時(shí)隊(duì)列
  • 延時(shí)隊(duì)列使用場(chǎng)景
  • RabbitMQ中的TTL
  • 如何利用RabbitMQ來(lái)實(shí)現(xiàn)延時(shí)隊(duì)列
  • 0|1二、本文大綱

    以下是本文大綱:

    本文閱讀前,需要對(duì)RabbitMQ以及死信隊(duì)列有一個(gè)簡(jiǎn)單的了解。

    0|1三、什么是延時(shí)隊(duì)列

    延時(shí)隊(duì)列,首先,它是一種隊(duì)列,隊(duì)列意味著內(nèi)部的元素是有序的,元素出隊(duì)和入隊(duì)是有方向性的,元素從一端進(jìn)入,從另一端取出。

    其次,延時(shí)隊(duì)列,最重要的特性就體現(xiàn)在它的延時(shí)屬性上,跟普通的隊(duì)列不一樣的是,普通隊(duì)列中的元素總是等著希望被早點(diǎn)取出處理,而延時(shí)隊(duì)列中的元素則是希望被在指定時(shí)間得到取出和處理,所以延時(shí)隊(duì)列中的元素是都是帶時(shí)間屬性的,通常來(lái)說(shuō)是需要被處理的消息或者任務(wù)。

    簡(jiǎn)單來(lái)說(shuō),延時(shí)隊(duì)列就是用來(lái)存放需要在指定時(shí)間被處理的元素的隊(duì)列。

    0|1四、延時(shí)隊(duì)列使用場(chǎng)景

    那么什么時(shí)候需要用延時(shí)隊(duì)列呢?考慮一下以下場(chǎng)景:

  • 訂單在十分鐘之內(nèi)未支付則自動(dòng)取消。
  • 新創(chuàng)建的店鋪,如果在十天內(nèi)都沒(méi)有上傳過(guò)商品,則自動(dòng)發(fā)送消息提醒。
  • 賬單在一周內(nèi)未支付,則自動(dòng)結(jié)算。
  • 用戶注冊(cè)成功后,如果三天內(nèi)沒(méi)有登陸則進(jìn)行短信提醒。
  • 用戶發(fā)起退款,如果三天內(nèi)沒(méi)有得到處理則通知相關(guān)運(yùn)營(yíng)人員。
  • 預(yù)定會(huì)議后,需要在預(yù)定的時(shí)間點(diǎn)前十分鐘通知各個(gè)與會(huì)人員參加會(huì)議。
  • 這些場(chǎng)景都有一個(gè)特點(diǎn),需要在某個(gè)事件發(fā)生之后或者之前的指定時(shí)間點(diǎn)完成某一項(xiàng)任務(wù),如:發(fā)生訂單生成事件,在十分鐘之后檢查該訂單支付狀態(tài),然后將未支付的訂單進(jìn)行關(guān)閉;發(fā)生店鋪創(chuàng)建事件,十天后檢查該店鋪上新商品數(shù),然后通知上新數(shù)為0的商戶;發(fā)生賬單生成事件,檢查賬單支付狀態(tài),然后自動(dòng)結(jié)算未支付的賬單;發(fā)生新用戶注冊(cè)事件,三天后檢查新注冊(cè)用戶的活動(dòng)數(shù)據(jù),然后通知沒(méi)有任何活動(dòng)記錄的用戶;發(fā)生退款事件,在三天之后檢查該訂單是否已被處理,如仍未被處理,則發(fā)送消息給相關(guān)運(yùn)營(yíng)人員;發(fā)生預(yù)定會(huì)議事件,判斷離會(huì)議開(kāi)始是否只有十分鐘了,如果是,則通知各個(gè)與會(huì)人員。

    看起來(lái)似乎使用定時(shí)任務(wù),一直輪詢數(shù)據(jù),每秒查一次,取出需要被處理的數(shù)據(jù),然后處理不就完事了嗎?如果數(shù)據(jù)量比較少,確實(shí)可以這樣做,比如:對(duì)于“如果賬單一周內(nèi)未支付則進(jìn)行自動(dòng)結(jié)算”這樣的需求,如果對(duì)于時(shí)間不是嚴(yán)格限制,而是寬松意義上的一周,那么每天晚上跑個(gè)定時(shí)任務(wù)檢查一下所有未支付的賬單,確實(shí)也是一個(gè)可行的方案。但對(duì)于數(shù)據(jù)量比較大,并且時(shí)效性較強(qiáng)的場(chǎng)景,如:“訂單十分鐘內(nèi)未支付則關(guān)閉“,短期內(nèi)未支付的訂單數(shù)據(jù)可能會(huì)有很多,活動(dòng)期間甚至?xí)_(dá)到百萬(wàn)甚至千萬(wàn)級(jí)別,對(duì)這么龐大的數(shù)據(jù)量仍舊使用輪詢的方式顯然是不可取的,很可能在一秒內(nèi)無(wú)法完成所有訂單的檢查,同時(shí)會(huì)給數(shù)據(jù)庫(kù)帶來(lái)很大壓力,無(wú)法滿足業(yè)務(wù)要求而且性能低下。

    更重要的一點(diǎn)是,不!優(yōu)!雅!

    沒(méi)錯(cuò),作為一名有追求的程序員,始終應(yīng)該追求更優(yōu)雅的架構(gòu)和更優(yōu)雅的代碼風(fēng)格,寫代碼要像寫詩(shī)一樣優(yōu)美。【滑稽】

    這時(shí)候,延時(shí)隊(duì)列就可以閃亮登場(chǎng)了,以上場(chǎng)景,正是延時(shí)隊(duì)列的用武之地。

    既然延時(shí)隊(duì)列可以解決很多特定場(chǎng)景下,帶時(shí)間屬性的任務(wù)需求,那么如何構(gòu)造一個(gè)延時(shí)隊(duì)列呢?接下來(lái),本文將介紹如何用RabbitMQ來(lái)實(shí)現(xiàn)延時(shí)隊(duì)列。

    0|1五、RabbitMQ中的TTL

    在介紹延時(shí)隊(duì)列之前,還需要先介紹一下RabbitMQ中的一個(gè)高級(jí)特性——TTL(Time To Live)。

    TTL是什么呢?TTL是RabbitMQ中一個(gè)消息或者隊(duì)列的屬性,表明一條消息或者該隊(duì)列中的所有消息的最大存活時(shí)間,單位是毫秒。換句話說(shuō),如果一條消息設(shè)置了TTL屬性或者進(jìn)入了設(shè)置TTL屬性的隊(duì)列,那么這條消息如果在TTL設(shè)置的時(shí)間內(nèi)沒(méi)有被消費(fèi),則會(huì)成為“死信”(至于什么是死信,請(qǐng)翻看上一篇)。如果同時(shí)配置了隊(duì)列的TTL和消息的TTL,那么較小的那個(gè)值將會(huì)被使用。

    那么,如何設(shè)置這個(gè)TTL值呢?有兩種方式,第一種是在創(chuàng)建隊(duì)列的時(shí)候設(shè)置隊(duì)列的“x-message-ttl”屬性,如下:

    Map<String, Object> args = new HashMap<String, Object>(); args.put("x-message-ttl", 6000); channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);

    這樣所有被投遞到該隊(duì)列的消息都最多不會(huì)存活超過(guò)6s。

    另一種方式便是針對(duì)每條消息設(shè)置TTL,代碼如下:

    AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); builder.expiration("6000"); AMQP.BasicProperties properties = builder.build(); channel.basicPublish(exchangeName, routingKey, mandatory, properties, "msg body".getBytes());

    這樣這條消息的過(guò)期時(shí)間也被設(shè)置成了6s。

    但這兩種方式是有區(qū)別的,如果設(shè)置了隊(duì)列的TTL屬性,那么一旦消息過(guò)期,就會(huì)被隊(duì)列丟棄,而第二種方式,消息即使過(guò)期,也不一定會(huì)被馬上丟棄,因?yàn)橄⑹欠襁^(guò)期是在即將投遞到消費(fèi)者之前判定的,如果當(dāng)前隊(duì)列有嚴(yán)重的消息積壓情況,則已過(guò)期的消息也許還能存活較長(zhǎng)時(shí)間。

    另外,還需要注意的一點(diǎn)是,如果不設(shè)置TTL,表示消息永遠(yuǎn)不會(huì)過(guò)期,如果將TTL設(shè)置為0,則表示除非此時(shí)可以直接投遞該消息到消費(fèi)者,否則該消息將會(huì)被丟棄。

    0|1六、如何利用RabbitMQ實(shí)現(xiàn)延時(shí)隊(duì)列

    前一篇里介紹了如果設(shè)置死信隊(duì)列,前文中又介紹了TTL,至此,利用RabbitMQ實(shí)現(xiàn)延時(shí)隊(duì)列的兩大要素已經(jīng)集齊,接下來(lái)只需要將它們進(jìn)行調(diào)和,再加入一點(diǎn)點(diǎn)調(diào)味料,延時(shí)隊(duì)列就可以新鮮出爐了。

    想想看,延時(shí)隊(duì)列,不就是想要消息延遲多久被處理嗎,TTL則剛好能讓消息在延遲多久之后成為死信,另一方面,成為死信的消息都會(huì)被投遞到死信隊(duì)列里,這樣只需要消費(fèi)者一直消費(fèi)死信隊(duì)列里的消息就萬(wàn)事大吉了,因?yàn)槔锩娴南⒍际窍M涣⒓刺幚淼南ⅰ?/p>

    從下圖可以大致看出消息的流向:

    生產(chǎn)者生產(chǎn)一條延時(shí)消息,根據(jù)需要延時(shí)時(shí)間的不同,利用不同的routingkey將消息路由到不同的延時(shí)隊(duì)列,每個(gè)隊(duì)列都設(shè)置了不同的TTL屬性,并綁定在同一個(gè)死信交換機(jī)中,消息過(guò)期后,根據(jù)routingkey的不同,又會(huì)被路由到不同的死信隊(duì)列中,消費(fèi)者只需要監(jiān)聽(tīng)對(duì)應(yīng)的死信隊(duì)列進(jìn)行處理即可。

    下面來(lái)看代碼:

    先聲明交換機(jī)、隊(duì)列以及他們的綁定關(guān)系:

    @Configuration public class RabbitMQConfig { public static final String DELAY_EXCHANGE_NAME = "delay.queue.demo.business.exchange"; public static final String DELAY_QUEUEA_NAME = "delay.queue.demo.business.queuea"; public static final String DELAY_QUEUEB_NAME = "delay.queue.demo.business.queueb"; public static final String DELAY_QUEUEA_ROUTING_KEY = "delay.queue.demo.business.queuea.routingkey"; public static final String DELAY_QUEUEB_ROUTING_KEY = "delay.queue.demo.business.queueb.routingkey"; public static final String DEAD_LETTER_EXCHANGE = "delay.queue.demo.deadletter.exchange"; public static final String DEAD_LETTER_QUEUEA_ROUTING_KEY = "delay.queue.demo.deadletter.delay_10s.routingkey"; public static final String DEAD_LETTER_QUEUEB_ROUTING_KEY = "delay.queue.demo.deadletter.delay_60s.routingkey"; public static final String DEAD_LETTER_QUEUEA_NAME = "delay.queue.demo.deadletter.queuea"; public static final String DEAD_LETTER_QUEUEB_NAME = "delay.queue.demo.deadletter.queueb"; // 聲明延時(shí)Exchange @Bean("delayExchange") public DirectExchange delayExchange(){ return new DirectExchange(DELAY_EXCHANGE_NAME); } // 聲明死信Exchange @Bean("deadLetterExchange") public DirectExchange deadLetterExchange(){ return new DirectExchange(DEAD_LETTER_EXCHANGE); } // 聲明延時(shí)隊(duì)列A 延時(shí)10s // 并綁定到對(duì)應(yīng)的死信交換機(jī) @Bean("delayQueueA") public Queue delayQueueA(){ Map<String, Object> args = new HashMap<>(2); // x-dead-letter-exchange 這里聲明當(dāng)前隊(duì)列綁定的死信交換機(jī) args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE); // x-dead-letter-routing-key 這里聲明當(dāng)前隊(duì)列的死信路由key args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEA_ROUTING_KEY); // x-message-ttl 聲明隊(duì)列的TTL args.put("x-message-ttl", 6000); return QueueBuilder.durable(DELAY_QUEUEA_NAME).withArguments(args).build(); } // 聲明延時(shí)隊(duì)列B 延時(shí) 60s // 并綁定到對(duì)應(yīng)的死信交換機(jī) @Bean("delayQueueB") public Queue delayQueueB(){ Map<String, Object> args = new HashMap<>(2); // x-dead-letter-exchange 這里聲明當(dāng)前隊(duì)列綁定的死信交換機(jī) args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE); // x-dead-letter-routing-key 這里聲明當(dāng)前隊(duì)列的死信路由key args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEB_ROUTING_KEY); // x-message-ttl 聲明隊(duì)列的TTL args.put("x-message-ttl", 60000); return QueueBuilder.durable(DELAY_QUEUEB_NAME).withArguments(args).build(); } // 聲明死信隊(duì)列A 用于接收延時(shí)10s處理的消息 @Bean("deadLetterQueueA") public Queue deadLetterQueueA(){ return new Queue(DEAD_LETTER_QUEUEA_NAME); } // 聲明死信隊(duì)列B 用于接收延時(shí)60s處理的消息 @Bean("deadLetterQueueB") public Queue deadLetterQueueB(){ return new Queue(DEAD_LETTER_QUEUEB_NAME); } // 聲明延時(shí)隊(duì)列A綁定關(guān)系 @Bean public Binding delayBindingA(@Qualifier("delayQueueA") Queue queue, @Qualifier("delayExchange") DirectExchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUEA_ROUTING_KEY); } // 聲明業(yè)務(wù)隊(duì)列B綁定關(guān)系 @Bean public Binding delayBindingB(@Qualifier("delayQueueB") Queue queue, @Qualifier("delayExchange") DirectExchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUEB_ROUTING_KEY); } // 聲明死信隊(duì)列A綁定關(guān)系 @Bean public Binding deadLetterBindingA(@Qualifier("deadLetterQueueA") Queue queue, @Qualifier("deadLetterExchange") DirectExchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEA_ROUTING_KEY); } // 聲明死信隊(duì)列B綁定關(guān)系 @Bean public Binding deadLetterBindingB(@Qualifier("deadLetterQueueB") Queue queue, @Qualifier("deadLetterExchange") DirectExchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEB_ROUTING_KEY); } }

    接下來(lái),創(chuàng)建兩個(gè)消費(fèi)者,分別對(duì)兩個(gè)死信隊(duì)列的消息進(jìn)行消費(fèi):

    @Slf4j @Component public class DeadLetterQueueConsumer { @RabbitListener(queues = DEAD_LETTER_QUEUEA_NAME) public void receiveA(Message message, Channel channel) throws IOException { String msg = new String(message.getBody()); log.info("當(dāng)前時(shí)間:{},死信隊(duì)列A收到消息:{}", new Date().toString(), msg); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } @RabbitListener(queues = DEAD_LETTER_QUEUEB_NAME) public void receiveB(Message message, Channel channel) throws IOException { String msg = new String(message.getBody()); log.info("當(dāng)前時(shí)間:{},死信隊(duì)列B收到消息:{}", new Date().toString(), msg); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } }

    然后是消息的生產(chǎn)者:

    @Component public class DelayMessageSender { @Autowired private RabbitTemplate rabbitTemplate; public void sendMsg(String msg, DelayTypeEnum type){ switch (type){ case DELAY_10s: rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, DELAY_QUEUEA_ROUTING_KEY, msg); break; case DELAY_60s: rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, DELAY_QUEUEB_ROUTING_KEY, msg); break; } } }

    接下來(lái),我們暴露一個(gè)web接口來(lái)生產(chǎn)消息:

    @Slf4j @RequestMapping("rabbitmq") @RestController public class RabbitMQMsgController { @Autowired private DelayMessageSender sender; @RequestMapping("sendmsg") public void sendMsg(String msg, Integer delayType){ log.info("當(dāng)前時(shí)間:{},收到請(qǐng)求,msg:{},delayType:{}", new Date(), msg, delayType); sender.sendMsg(msg, Objects.requireNonNull(DelayTypeEnum.getDelayTypeEnumByValue(delayType))); } }

    準(zhǔn)備就緒,啟動(dòng)!

    打開(kāi)rabbitMQ的管理后臺(tái),可以看到我們剛才創(chuàng)建的交換機(jī)和隊(duì)列信息:

    接下來(lái),我們來(lái)發(fā)送幾條消息,http://localhost:8080/rabbitmq/sendmsg?msg=testMsg1&delayType=1?http://localhost:8080/rabbitmq/sendmsg?msg=testMsg2&delayType=2

    日志如下:

    2019-07-28 16:02:19.813 INFO 3860 --- [nio-8080-exec-9] c.m.d.controller.RabbitMQMsgController : 當(dāng)前時(shí)間:Sun Jul 28 16:02:19 CST 2019,收到請(qǐng)求,msg:testMsg1,delayType:1 2019-07-28 16:02:19.815 INFO 3860 --- [nio-8080-exec-9] .l.DirectReplyToMessageListenerContainer : SimpleConsumer [queue=amq.rabbitmq.reply-to, consumerTag=amq.ctag-o-qPpkWIkRm73DIrOIVhig identity=766339] started 2019-07-28 16:02:25.829 INFO 3860 --- [ntContainer#1-1] c.m.d.mq.DeadLetterQueueConsumer : 當(dāng)前時(shí)間:Sun Jul 28 16:02:25 CST 2019,死信隊(duì)列A收到消息:testMsg1 2019-07-28 16:02:41.326 INFO 3860 --- [nio-8080-exec-1] c.m.d.controller.RabbitMQMsgController : 當(dāng)前時(shí)間:Sun Jul 28 16:02:41 CST 2019,收到請(qǐng)求,msg:testMsg2,delayType:2 2019-07-28 16:03:41.329 INFO 3860 --- [ntContainer#0-1] c.m.d.mq.DeadLetterQueueConsumer : 當(dāng)前時(shí)間:Sun Jul 28 16:03:41 CST 2019,死信隊(duì)列B收到消息:testMsg2

    第一條消息在6s后變成了死信消息,然后被消費(fèi)者消費(fèi)掉,第二條消息在60s之后變成了死信消息,然后被消費(fèi)掉,這樣,一個(gè)還算ok的延時(shí)隊(duì)列就打造完成了。

    不過(guò),等等,如果這樣使用的話,豈不是每增加一個(gè)新的時(shí)間需求,就要新增一個(gè)隊(duì)列,這里只有6s和60s兩個(gè)時(shí)間選項(xiàng),如果需要一個(gè)小時(shí)后處理,那么就需要增加TTL為一個(gè)小時(shí)的隊(duì)列,如果是預(yù)定會(huì)議室然后提前通知這樣的場(chǎng)景,豈不是要增加無(wú)數(shù)個(gè)隊(duì)列才能滿足需求??

    嗯,仔細(xì)想想,事情并不簡(jiǎn)單。

    0|1七、RabbitMQ延時(shí)隊(duì)列優(yōu)化

    顯然,需要一種更通用的方案才能滿足需求,那么就只能將TTL設(shè)置在消息屬性里了。我們來(lái)試一試。

    增加一個(gè)延時(shí)隊(duì)列,用于接收設(shè)置為任意延時(shí)時(shí)長(zhǎng)的消息,增加一個(gè)相應(yīng)的死信隊(duì)列和routingkey:

    @Configuration public class RabbitMQConfig { public static final String DELAY_EXCHANGE_NAME = "delay.queue.demo.business.exchange"; public static final String DELAY_QUEUEC_NAME = "delay.queue.demo.business.queuec"; public static final String DELAY_QUEUEC_ROUTING_KEY = "delay.queue.demo.business.queuec.routingkey"; public static final String DEAD_LETTER_EXCHANGE = "delay.queue.demo.deadletter.exchange"; public static final String DEAD_LETTER_QUEUEC_ROUTING_KEY = "delay.queue.demo.deadletter.delay_anytime.routingkey"; public static final String DEAD_LETTER_QUEUEC_NAME = "delay.queue.demo.deadletter.queuec"; // 聲明延時(shí)Exchange @Bean("delayExchange") public DirectExchange delayExchange(){ return new DirectExchange(DELAY_EXCHANGE_NAME); } // 聲明死信Exchange @Bean("deadLetterExchange") public DirectExchange deadLetterExchange(){ return new DirectExchange(DEAD_LETTER_EXCHANGE); } // 聲明延時(shí)隊(duì)列C 不設(shè)置TTL // 并綁定到對(duì)應(yīng)的死信交換機(jī) @Bean("delayQueueC") public Queue delayQueueC(){ Map<String, Object> args = new HashMap<>(3); // x-dead-letter-exchange 這里聲明當(dāng)前隊(duì)列綁定的死信交換機(jī) args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE); // x-dead-letter-routing-key 這里聲明當(dāng)前隊(duì)列的死信路由key args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEC_ROUTING_KEY); return QueueBuilder.durable(DELAY_QUEUEC_NAME).withArguments(args).build(); } // 聲明死信隊(duì)列C 用于接收延時(shí)任意時(shí)長(zhǎng)處理的消息 @Bean("deadLetterQueueC") public Queue deadLetterQueueC(){ return new Queue(DEAD_LETTER_QUEUEC_NAME); } // 聲明延時(shí)列C綁定關(guān)系 @Bean public Binding delayBindingC(@Qualifier("delayQueueC") Queue queue, @Qualifier("delayExchange") DirectExchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUEC_ROUTING_KEY); } // 聲明死信隊(duì)列C綁定關(guān)系 @Bean public Binding deadLetterBindingC(@Qualifier("deadLetterQueueC") Queue queue, @Qualifier("deadLetterExchange") DirectExchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEC_ROUTING_KEY); } }

    增加一個(gè)死信隊(duì)列C的消費(fèi)者:

    @RabbitListener(queues = DEAD_LETTER_QUEUEC_NAME) public void receiveC(Message message, Channel channel) throws IOException { String msg = new String(message.getBody()); log.info("當(dāng)前時(shí)間:{},死信隊(duì)列C收到消息:{}", new Date().toString(), msg); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); }

    再次啟動(dòng)!然后訪問(wèn):http://localhost:8080/rabbitmq/delayMsg?msg=testMsg1delayTime=5000?來(lái)生產(chǎn)消息,注意這里的單位是毫秒。

    2019-07-28 16:45:07.033 INFO 31468 --- [nio-8080-exec-4] c.m.d.controller.RabbitMQMsgController : 當(dāng)前時(shí)間:Sun Jul 28 16:45:07 CST 2019,收到請(qǐng)求,msg:testMsg1,delayTime:5000 2019-07-28 16:45:11.694 INFO 31468 --- [nio-8080-exec-5] c.m.d.controller.RabbitMQMsgController : 當(dāng)前時(shí)間:Sun Jul 28 16:45:11 CST 2019,收到請(qǐng)求,msg:testMsg2,delayTime:5000 2019-07-28 16:45:12.048 INFO 31468 --- [ntContainer#1-1] c.m.d.mq.DeadLetterQueueConsumer : 當(dāng)前時(shí)間:Sun Jul 28 16:45:12 CST 2019,死信隊(duì)列C收到消息:testMsg1 2019-07-28 16:45:16.709 INFO 31468 --- [ntContainer#1-1] c.m.d.mq.DeadLetterQueueConsumer : 當(dāng)前時(shí)間:Sun Jul 28 16:45:16 CST 2019,死信隊(duì)列C收到消息:testMsg2

    看起來(lái)似乎沒(méi)什么問(wèn)題,但不要高興的太早,在最開(kāi)始的時(shí)候,就介紹過(guò),如果使用在消息屬性上設(shè)置TTL的方式,消息可能并不會(huì)按時(shí)“死亡“,因?yàn)镽abbitMQ只會(huì)檢查第一個(gè)消息是否過(guò)期,如果過(guò)期則丟到死信隊(duì)列,索引如果第一個(gè)消息的延時(shí)時(shí)長(zhǎng)很長(zhǎng),而第二個(gè)消息的延時(shí)時(shí)長(zhǎng)很短,則第二個(gè)消息并不會(huì)優(yōu)先得到執(zhí)行。

    實(shí)驗(yàn)一下:

    2019-07-28 16:49:02.957 INFO 31468 --- [nio-8080-exec-8] c.m.d.controller.RabbitMQMsgController : 當(dāng)前時(shí)間:Sun Jul 28 16:49:02 CST 2019,收到請(qǐng)求,msg:longDelayedMsg,delayTime:20000 2019-07-28 16:49:10.671 INFO 31468 --- [nio-8080-exec-9] c.m.d.controller.RabbitMQMsgController : 當(dāng)前時(shí)間:Sun Jul 28 16:49:10 CST 2019,收到請(qǐng)求,msg:shortDelayedMsg,delayTime:2000 2019-07-28 16:49:22.969 INFO 31468 --- [ntContainer#1-1] c.m.d.mq.DeadLetterQueueConsumer : 當(dāng)前時(shí)間:Sun Jul 28 16:49:22 CST 2019,死信隊(duì)列C收到消息:longDelayedMsg 2019-07-28 16:49:22.970 INFO 31468 --- [ntContainer#1-1] c.m.d.mq.DeadLetterQueueConsumer : 當(dāng)前時(shí)間:Sun Jul 28 16:49:22 CST 2019,死信隊(duì)列C收到消息:shortDelayedMsg

    我們先發(fā)了一個(gè)延時(shí)時(shí)長(zhǎng)為20s的消息,然后發(fā)了一個(gè)延時(shí)時(shí)長(zhǎng)為2s的消息,結(jié)果顯示,第二個(gè)消息會(huì)在等第一個(gè)消息成為死信后才會(huì)“死亡“。

    0|1八、利用RabbitMQ插件實(shí)現(xiàn)延遲隊(duì)列

    上文中提到的問(wèn)題,確實(shí)是一個(gè)硬傷,如果不能實(shí)現(xiàn)在消息粒度上添加TTL,并使其在設(shè)置的TTL時(shí)間及時(shí)死亡,就無(wú)法設(shè)計(jì)成一個(gè)通用的延時(shí)隊(duì)列。

    那如何解決這個(gè)問(wèn)題呢?不要慌,安裝一個(gè)插件即可:Community Plugins — RabbitMQ?,下載rabbitmq_delayed_message_exchange插件,然后解壓放置到RabbitMQ的插件目錄。

    接下來(lái),進(jìn)入RabbitMQ的安裝目錄下的sbin目錄,執(zhí)行下面命令讓該插件生效,然后重啟RabbitMQ。

    rabbitmq-plugins enable rabbitmq_delayed_message_exchange

    然后,我們?cè)俾暶鲙讉€(gè)Bean:

    @Configuration public class DelayedRabbitMQConfig { public static final String DELAYED_QUEUE_NAME = "delay.queue.demo.delay.queue"; public static final String DELAYED_EXCHANGE_NAME = "delay.queue.demo.delay.exchange"; public static final String DELAYED_ROUTING_KEY = "delay.queue.demo.delay.routingkey"; @Bean public Queue immediateQueue() { return new Queue(DELAYED_QUEUE_NAME); } @Bean public CustomExchange customExchange() { Map<String, Object> args = new HashMap<>(); args.put("x-delayed-type", "direct"); return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args); } @Bean public Binding bindingNotify(@Qualifier("immediateQueue") Queue queue, @Qualifier("customExchange") CustomExchange customExchange) { return BindingBuilder.bind(queue).to(customExchange).with(DELAYED_ROUTING_KEY).noargs(); } }

    controller層再添加一個(gè)入口:

    @RequestMapping("delayMsg2") public void delayMsg2(String msg, Integer delayTime) { log.info("當(dāng)前時(shí)間:{},收到請(qǐng)求,msg:{},delayTime:{}", new Date(), msg, delayTime); sender.sendDelayMsg(msg, delayTime); }

    消息生產(chǎn)者的代碼也需要修改:

    public void sendDelayMsg(String msg, Integer delayTime) { rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, msg, a ->{ a.getMessageProperties().setDelay(delayTime); return a; }); }

    最后,再創(chuàng)建一個(gè)消費(fèi)者:

    @RabbitListener(queues = DELAYED_QUEUE_NAME) public void receiveD(Message message, Channel channel) throws IOException { String msg = new String(message.getBody()); log.info("當(dāng)前時(shí)間:{},延時(shí)隊(duì)列收到消息:{}", new Date().toString(), msg); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); }

    一切準(zhǔn)備就緒,啟動(dòng)!然后分別訪問(wèn)以下鏈接:

    http://localhost:8080/rabbitmq/delayMsg2?msg=msg1&delayTime=20000 http://localhost:8080/rabbitmq/delayMsg2?msg=msg2&delayTime=2000

    日志如下:

    2019-07-28 17:28:13.729 INFO 25804 --- [nio-8080-exec-2] c.m.d.controller.RabbitMQMsgController : 當(dāng)前時(shí)間:Sun Jul 28 17:28:13 CST 2019,收到請(qǐng)求,msg:msg1,delayTime:20000 2019-07-28 17:28:20.607 INFO 25804 --- [nio-8080-exec-1] c.m.d.controller.RabbitMQMsgController : 當(dāng)前時(shí)間:Sun Jul 28 17:28:20 CST 2019,收到請(qǐng)求,msg:msg2,delayTime:2000 2019-07-28 17:28:22.624 INFO 25804 --- [ntContainer#1-1] c.m.d.mq.DeadLetterQueueConsumer : 當(dāng)前時(shí)間:Sun Jul 28 17:28:22 CST 2019,延時(shí)隊(duì)列收到消息:msg2 2019-07-28 17:28:33.751 INFO 25804 --- [ntContainer#1-1] c.m.d.mq.DeadLetterQueueConsumer : 當(dāng)前時(shí)間:Sun Jul 28 17:28:33 CST 2019,延時(shí)隊(duì)列收到消息:msg1

    第二個(gè)消息被先消費(fèi)掉了,符合預(yù)期。至此,RabbitMQ實(shí)現(xiàn)延時(shí)隊(duì)列的部分就完結(jié)了。

    0|1九、總結(jié)

    延時(shí)隊(duì)列在需要延時(shí)處理的場(chǎng)景下非常有用,使用RabbitMQ來(lái)實(shí)現(xiàn)延時(shí)隊(duì)列可以很好的利用RabbitMQ的特性,如:消息可靠發(fā)送、消息可靠投遞、死信隊(duì)列來(lái)保障消息至少被消費(fèi)一次以及未被正確處理的消息不會(huì)被丟棄。另外,通過(guò)RabbitMQ集群的特性,可以很好的解決單點(diǎn)故障問(wèn)題,不會(huì)因?yàn)閱蝹€(gè)節(jié)點(diǎn)掛掉導(dǎo)致延時(shí)隊(duì)列不可用或者消息丟失。

    當(dāng)然,延時(shí)隊(duì)列還有很多其它選擇,比如利用Java的DelayQueu,利用Redis的zset,利用Quartz或者利用kafka的時(shí)間輪,這些方式各有特點(diǎn),但就像爐石傳說(shuō)一般,這些知識(shí)就好比手里的卡牌,知道的越多,可以用的卡牌也就越多,遇到問(wèn)題便能游刃有余,所以需要大量的知識(shí)儲(chǔ)備和經(jīng)驗(yàn)積累才能打造出更出色的卡牌組合,讓自己解決問(wèn)題的能力得到更好的提升。

    但另一方面,隨著時(shí)間的流逝和閱歷的增長(zhǎng),越來(lái)越感覺(jué)到自己的能力有限,無(wú)法獨(dú)自面對(duì)紛繁復(fù)雜且多變的業(yè)務(wù)需求,在很多方面需要其他人的協(xié)助才能很好的完成任務(wù)。也知道聞道有先后,術(shù)業(yè)有專攻,不會(huì)再狂妄自大,覺(jué)得自己能把所有事情都搞定,也將重心慢慢轉(zhuǎn)移到研究如何有效的進(jìn)行團(tuán)隊(duì)合作上來(lái),我相信一個(gè)高度協(xié)調(diào)的團(tuán)隊(duì)永遠(yuǎn)比一個(gè)人戰(zhàn)斗要更有價(jià)值。

    花了一個(gè)周末的時(shí)間完成了這篇文章,文中所有的代碼都上傳到了github,https://github.com/MFrank2016/delayed-queue-demo如有需要可以自行查閱,希望能對(duì)你有幫助,如果有錯(cuò)誤的地方,歡迎指正,也歡迎關(guān)注我的公眾號(hào)進(jìn)行留言交流。

    總結(jié)

    以上是生活随笔為你收集整理的【RabbitMQ】一文带你搞定RabbitMQ延迟队列的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

    如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。

    主站蜘蛛池模板: 免费成人深夜在线观看 | 91精品综合久久久久久五月天 | 96国产在线| 2022av在线| 亚洲欧美一 | 欧美日韩国产图片 | 91激情视频在线观看 | 欧美无吗 | 精品人妻一区二区免费视频 | www麻豆| 日日噜| xxx在线播放 | 国模小黎自慰gogo人体 | 女久久| 日本综合色 | 永久免费看黄网站 | 国产欧美精品一区二区 | 免费在线视频观看 | 欧美精品一二三 | 精品少妇一区二区三区免费观 | 中文字幕不卡在线观看 | 好男人www日本 | 日日操狠狠干 | 亚洲精品视频导航 | 中文字幕一区二区在线观看视频 | 国产成人精品aa毛片 | 麻豆高清 | 黄色香蕉软件 | 天堂综合在线 | 日韩精品一区二区在线视频 | 热99这里只有精品 | 91porn破解版 | 国产日韩欧美成人 | 亚洲精品国产精品国自 | 中文字幕av久久爽 | 天天操天天看 | 成人aⅴ视频 | 淫片在线| 久久久久久久国产精品美女 | 无遮挡又爽又刺激的视频 | 国产婷婷色一区二区在线观看 | 大咪咪av | 四虎永久地址 | 一区二区三区四区av | 偷拍网亚洲| 在线视频欧美日韩 | 精品人妻互换一区二区三区 | 久久成人在线视频 | 国产激情视频一区二区 | 欧美日p视频 | 69天堂| 一级伦理农村妇女愉情 | 韩国一二三区 | 91社区视频| 精品国产三级a∨在线 | 久免费一级suv好看的国产 | 一区二区三区视频在线播放 | 欧美aaaaa | 91丨porny丨对白 | 亚洲人成色777777精品音频 | 激情在线观看视频 | 天天爽夜夜爽 | 日韩欧美精品久久 | 欧美性猛交xxxx乱大交3 | 亚洲午夜无码av毛片久久 | 邻居校草天天肉我h1v1 | 中文字幕av久久爽 | av在线免费观看不卡 | 碰碰97 | 天天综合天天 | 激情图片在线观看 | 亚洲情人网| 91青草视频| 老熟妇一区二区三区 | 中文字幕精品久久久久人妻红杏ⅰ | 国产精品久久久久aaaa | 伊人av网 | 精品无码人妻一区二区三区 | 婷婷综合另类小说色区 | 日本全黄裸体片 | av制服丝袜 | 亚洲风情亚aⅴ在线发布 | 成人国产精品免费 | 色四虎| 18禁网站免费无遮挡无码中文 | 国产亚洲精品美女 | 欧美大尺度视频 | 欧美精品在线看 | 日韩精品短片 | 国产av天堂无码一区二区三区 | 伊人福利在线 | 国产偷拍一区二区三区 | 狠狠爱五月婷婷 | 欧美视频四区 | 国产69精品久久久久777 | 寡妇高潮一级视频免费看 | 五月婷婷影院 | 欧美一卡二卡三卡四卡 | 一本—道久久a久久精品蜜桃 |