【RabbitMQ】一文带你搞定RabbitMQ延迟队列
本文口味:魚(yú)香肉絲? ?預(yù)計(jì)閱讀:10分鐘
0|1一、說(shuō)明
在上一篇中,介紹了RabbitMQ中的死信隊(duì)列是什么,何時(shí)使用以及如何使用RabbitMQ的死信隊(duì)列。相信通過(guò)上一篇的學(xué)習(xí),對(duì)于死信隊(duì)列已經(jīng)有了更多的了解,這一篇的內(nèi)容也跟死信隊(duì)列息息相關(guān),如果你還不了解死信隊(duì)列,那么建議你先進(jìn)行上一篇文章的閱讀。
這一篇里,我們將繼續(xù)介紹RabbitMQ的高級(jí)特性,通過(guò)本篇的學(xué)習(xí),你將收獲:
0|1二、本文大綱
以下是本文大綱:
本文閱讀前,需要對(duì)RabbitMQ以及死信隊(duì)列有一個(gè)簡(jiǎn)單的了解。
0|1三、什么是延時(shí)隊(duì)列
延時(shí)隊(duì)列,首先,它是一種隊(duì)列,隊(duì)列意味著內(nèi)部的元素是有序的,元素出隊(duì)和入隊(duì)是有方向性的,元素從一端進(jìn)入,從另一端取出。
其次,延時(shí)隊(duì)列,最重要的特性就體現(xiàn)在它的延時(shí)屬性上,跟普通的隊(duì)列不一樣的是,普通隊(duì)列中的元素總是等著希望被早點(diǎn)取出處理,而延時(shí)隊(duì)列中的元素則是希望被在指定時(shí)間得到取出和處理,所以延時(shí)隊(duì)列中的元素是都是帶時(shí)間屬性的,通常來(lái)說(shuō)是需要被處理的消息或者任務(wù)。
簡(jiǎn)單來(lái)說(shuō),延時(shí)隊(duì)列就是用來(lái)存放需要在指定時(shí)間被處理的元素的隊(duì)列。
0|1四、延時(shí)隊(duì)列使用場(chǎng)景
那么什么時(shí)候需要用延時(shí)隊(duì)列呢?考慮一下以下場(chǎng)景:
這些場(chǎng)景都有一個(gè)特點(diǎn),需要在某個(gè)事件發(fā)生之后或者之前的指定時(shí)間點(diǎn)完成某一項(xiàng)任務(wù),如:發(fā)生訂單生成事件,在十分鐘之后檢查該訂單支付狀態(tài),然后將未支付的訂單進(jìn)行關(guān)閉;發(fā)生店鋪創(chuàng)建事件,十天后檢查該店鋪上新商品數(shù),然后通知上新數(shù)為0的商戶;發(fā)生賬單生成事件,檢查賬單支付狀態(tài),然后自動(dòng)結(jié)算未支付的賬單;發(fā)生新用戶注冊(cè)事件,三天后檢查新注冊(cè)用戶的活動(dòng)數(shù)據(jù),然后通知沒(méi)有任何活動(dòng)記錄的用戶;發(fā)生退款事件,在三天之后檢查該訂單是否已被處理,如仍未被處理,則發(fā)送消息給相關(guān)運(yùn)營(yíng)人員;發(fā)生預(yù)定會(huì)議事件,判斷離會(huì)議開(kāi)始是否只有十分鐘了,如果是,則通知各個(gè)與會(huì)人員。
看起來(lái)似乎使用定時(shí)任務(wù),一直輪詢數(shù)據(jù),每秒查一次,取出需要被處理的數(shù)據(jù),然后處理不就完事了嗎?如果數(shù)據(jù)量比較少,確實(shí)可以這樣做,比如:對(duì)于“如果賬單一周內(nèi)未支付則進(jìn)行自動(dòng)結(jié)算”這樣的需求,如果對(duì)于時(shí)間不是嚴(yán)格限制,而是寬松意義上的一周,那么每天晚上跑個(gè)定時(shí)任務(wù)檢查一下所有未支付的賬單,確實(shí)也是一個(gè)可行的方案。但對(duì)于數(shù)據(jù)量比較大,并且時(shí)效性較強(qiáng)的場(chǎng)景,如:“訂單十分鐘內(nèi)未支付則關(guān)閉“,短期內(nèi)未支付的訂單數(shù)據(jù)可能會(huì)有很多,活動(dòng)期間甚至?xí)_(dá)到百萬(wàn)甚至千萬(wàn)級(jí)別,對(duì)這么龐大的數(shù)據(jù)量仍舊使用輪詢的方式顯然是不可取的,很可能在一秒內(nèi)無(wú)法完成所有訂單的檢查,同時(shí)會(huì)給數(shù)據(jù)庫(kù)帶來(lái)很大壓力,無(wú)法滿足業(yè)務(wù)要求而且性能低下。
更重要的一點(diǎn)是,不!優(yōu)!雅!
沒(méi)錯(cuò),作為一名有追求的程序員,始終應(yīng)該追求更優(yōu)雅的架構(gòu)和更優(yōu)雅的代碼風(fēng)格,寫代碼要像寫詩(shī)一樣優(yōu)美。【滑稽】
這時(shí)候,延時(shí)隊(duì)列就可以閃亮登場(chǎng)了,以上場(chǎng)景,正是延時(shí)隊(duì)列的用武之地。
既然延時(shí)隊(duì)列可以解決很多特定場(chǎng)景下,帶時(shí)間屬性的任務(wù)需求,那么如何構(gòu)造一個(gè)延時(shí)隊(duì)列呢?接下來(lái),本文將介紹如何用RabbitMQ來(lái)實(shí)現(xiàn)延時(shí)隊(duì)列。
0|1五、RabbitMQ中的TTL
在介紹延時(shí)隊(duì)列之前,還需要先介紹一下RabbitMQ中的一個(gè)高級(jí)特性——TTL(Time To Live)。
TTL是什么呢?TTL是RabbitMQ中一個(gè)消息或者隊(duì)列的屬性,表明一條消息或者該隊(duì)列中的所有消息的最大存活時(shí)間,單位是毫秒。換句話說(shuō),如果一條消息設(shè)置了TTL屬性或者進(jìn)入了設(shè)置TTL屬性的隊(duì)列,那么這條消息如果在TTL設(shè)置的時(shí)間內(nèi)沒(méi)有被消費(fèi),則會(huì)成為“死信”(至于什么是死信,請(qǐng)翻看上一篇)。如果同時(shí)配置了隊(duì)列的TTL和消息的TTL,那么較小的那個(gè)值將會(huì)被使用。
那么,如何設(shè)置這個(gè)TTL值呢?有兩種方式,第一種是在創(chuàng)建隊(duì)列的時(shí)候設(shè)置隊(duì)列的“x-message-ttl”屬性,如下:
Map<String, Object> args = new HashMap<String, Object>(); args.put("x-message-ttl", 6000); channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);
這樣所有被投遞到該隊(duì)列的消息都最多不會(huì)存活超過(guò)6s。
另一種方式便是針對(duì)每條消息設(shè)置TTL,代碼如下:
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); builder.expiration("6000"); AMQP.BasicProperties properties = builder.build(); channel.basicPublish(exchangeName, routingKey, mandatory, properties, "msg body".getBytes());
這樣這條消息的過(guò)期時(shí)間也被設(shè)置成了6s。
但這兩種方式是有區(qū)別的,如果設(shè)置了隊(duì)列的TTL屬性,那么一旦消息過(guò)期,就會(huì)被隊(duì)列丟棄,而第二種方式,消息即使過(guò)期,也不一定會(huì)被馬上丟棄,因?yàn)橄⑹欠襁^(guò)期是在即將投遞到消費(fèi)者之前判定的,如果當(dāng)前隊(duì)列有嚴(yán)重的消息積壓情況,則已過(guò)期的消息也許還能存活較長(zhǎng)時(shí)間。
另外,還需要注意的一點(diǎn)是,如果不設(shè)置TTL,表示消息永遠(yuǎn)不會(huì)過(guò)期,如果將TTL設(shè)置為0,則表示除非此時(shí)可以直接投遞該消息到消費(fèi)者,否則該消息將會(huì)被丟棄。
0|1六、如何利用RabbitMQ實(shí)現(xiàn)延時(shí)隊(duì)列
前一篇里介紹了如果設(shè)置死信隊(duì)列,前文中又介紹了TTL,至此,利用RabbitMQ實(shí)現(xiàn)延時(shí)隊(duì)列的兩大要素已經(jīng)集齊,接下來(lái)只需要將它們進(jìn)行調(diào)和,再加入一點(diǎn)點(diǎn)調(diào)味料,延時(shí)隊(duì)列就可以新鮮出爐了。
想想看,延時(shí)隊(duì)列,不就是想要消息延遲多久被處理嗎,TTL則剛好能讓消息在延遲多久之后成為死信,另一方面,成為死信的消息都會(huì)被投遞到死信隊(duì)列里,這樣只需要消費(fèi)者一直消費(fèi)死信隊(duì)列里的消息就萬(wàn)事大吉了,因?yàn)槔锩娴南⒍际窍M涣⒓刺幚淼南ⅰ?/p>
從下圖可以大致看出消息的流向:
生產(chǎn)者生產(chǎn)一條延時(shí)消息,根據(jù)需要延時(shí)時(shí)間的不同,利用不同的routingkey將消息路由到不同的延時(shí)隊(duì)列,每個(gè)隊(duì)列都設(shè)置了不同的TTL屬性,并綁定在同一個(gè)死信交換機(jī)中,消息過(guò)期后,根據(jù)routingkey的不同,又會(huì)被路由到不同的死信隊(duì)列中,消費(fèi)者只需要監(jiān)聽(tīng)對(duì)應(yīng)的死信隊(duì)列進(jìn)行處理即可。
下面來(lái)看代碼:
先聲明交換機(jī)、隊(duì)列以及他們的綁定關(guān)系:
@Configuration public class RabbitMQConfig { public static final String DELAY_EXCHANGE_NAME = "delay.queue.demo.business.exchange"; public static final String DELAY_QUEUEA_NAME = "delay.queue.demo.business.queuea"; public static final String DELAY_QUEUEB_NAME = "delay.queue.demo.business.queueb"; public static final String DELAY_QUEUEA_ROUTING_KEY = "delay.queue.demo.business.queuea.routingkey"; public static final String DELAY_QUEUEB_ROUTING_KEY = "delay.queue.demo.business.queueb.routingkey"; public static final String DEAD_LETTER_EXCHANGE = "delay.queue.demo.deadletter.exchange"; public static final String DEAD_LETTER_QUEUEA_ROUTING_KEY = "delay.queue.demo.deadletter.delay_10s.routingkey"; public static final String DEAD_LETTER_QUEUEB_ROUTING_KEY = "delay.queue.demo.deadletter.delay_60s.routingkey"; public static final String DEAD_LETTER_QUEUEA_NAME = "delay.queue.demo.deadletter.queuea"; public static final String DEAD_LETTER_QUEUEB_NAME = "delay.queue.demo.deadletter.queueb"; // 聲明延時(shí)Exchange @Bean("delayExchange") public DirectExchange delayExchange(){ return new DirectExchange(DELAY_EXCHANGE_NAME); } // 聲明死信Exchange @Bean("deadLetterExchange") public DirectExchange deadLetterExchange(){ return new DirectExchange(DEAD_LETTER_EXCHANGE); } // 聲明延時(shí)隊(duì)列A 延時(shí)10s // 并綁定到對(duì)應(yīng)的死信交換機(jī) @Bean("delayQueueA") public Queue delayQueueA(){ Map<String, Object> args = new HashMap<>(2); // x-dead-letter-exchange 這里聲明當(dāng)前隊(duì)列綁定的死信交換機(jī) args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE); // x-dead-letter-routing-key 這里聲明當(dāng)前隊(duì)列的死信路由key args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEA_ROUTING_KEY); // x-message-ttl 聲明隊(duì)列的TTL args.put("x-message-ttl", 6000); return QueueBuilder.durable(DELAY_QUEUEA_NAME).withArguments(args).build(); } // 聲明延時(shí)隊(duì)列B 延時(shí) 60s // 并綁定到對(duì)應(yīng)的死信交換機(jī) @Bean("delayQueueB") public Queue delayQueueB(){ Map<String, Object> args = new HashMap<>(2); // x-dead-letter-exchange 這里聲明當(dāng)前隊(duì)列綁定的死信交換機(jī) args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE); // x-dead-letter-routing-key 這里聲明當(dāng)前隊(duì)列的死信路由key args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEB_ROUTING_KEY); // x-message-ttl 聲明隊(duì)列的TTL args.put("x-message-ttl", 60000); return QueueBuilder.durable(DELAY_QUEUEB_NAME).withArguments(args).build(); } // 聲明死信隊(duì)列A 用于接收延時(shí)10s處理的消息 @Bean("deadLetterQueueA") public Queue deadLetterQueueA(){ return new Queue(DEAD_LETTER_QUEUEA_NAME); } // 聲明死信隊(duì)列B 用于接收延時(shí)60s處理的消息 @Bean("deadLetterQueueB") public Queue deadLetterQueueB(){ return new Queue(DEAD_LETTER_QUEUEB_NAME); } // 聲明延時(shí)隊(duì)列A綁定關(guān)系 @Bean public Binding delayBindingA(@Qualifier("delayQueueA") Queue queue, @Qualifier("delayExchange") DirectExchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUEA_ROUTING_KEY); } // 聲明業(yè)務(wù)隊(duì)列B綁定關(guān)系 @Bean public Binding delayBindingB(@Qualifier("delayQueueB") Queue queue, @Qualifier("delayExchange") DirectExchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUEB_ROUTING_KEY); } // 聲明死信隊(duì)列A綁定關(guān)系 @Bean public Binding deadLetterBindingA(@Qualifier("deadLetterQueueA") Queue queue, @Qualifier("deadLetterExchange") DirectExchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEA_ROUTING_KEY); } // 聲明死信隊(duì)列B綁定關(guān)系 @Bean public Binding deadLetterBindingB(@Qualifier("deadLetterQueueB") Queue queue, @Qualifier("deadLetterExchange") DirectExchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEB_ROUTING_KEY); } }
接下來(lái),創(chuàng)建兩個(gè)消費(fèi)者,分別對(duì)兩個(gè)死信隊(duì)列的消息進(jìn)行消費(fèi):
@Slf4j @Component public class DeadLetterQueueConsumer { @RabbitListener(queues = DEAD_LETTER_QUEUEA_NAME) public void receiveA(Message message, Channel channel) throws IOException { String msg = new String(message.getBody()); log.info("當(dāng)前時(shí)間:{},死信隊(duì)列A收到消息:{}", new Date().toString(), msg); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } @RabbitListener(queues = DEAD_LETTER_QUEUEB_NAME) public void receiveB(Message message, Channel channel) throws IOException { String msg = new String(message.getBody()); log.info("當(dāng)前時(shí)間:{},死信隊(duì)列B收到消息:{}", new Date().toString(), msg); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } }
然后是消息的生產(chǎn)者:
@Component public class DelayMessageSender { @Autowired private RabbitTemplate rabbitTemplate; public void sendMsg(String msg, DelayTypeEnum type){ switch (type){ case DELAY_10s: rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, DELAY_QUEUEA_ROUTING_KEY, msg); break; case DELAY_60s: rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, DELAY_QUEUEB_ROUTING_KEY, msg); break; } } }
接下來(lái),我們暴露一個(gè)web接口來(lái)生產(chǎn)消息:
@Slf4j @RequestMapping("rabbitmq") @RestController public class RabbitMQMsgController { @Autowired private DelayMessageSender sender; @RequestMapping("sendmsg") public void sendMsg(String msg, Integer delayType){ log.info("當(dāng)前時(shí)間:{},收到請(qǐng)求,msg:{},delayType:{}", new Date(), msg, delayType); sender.sendMsg(msg, Objects.requireNonNull(DelayTypeEnum.getDelayTypeEnumByValue(delayType))); } }
準(zhǔn)備就緒,啟動(dòng)!
打開(kāi)rabbitMQ的管理后臺(tái),可以看到我們剛才創(chuàng)建的交換機(jī)和隊(duì)列信息:
接下來(lái),我們來(lái)發(fā)送幾條消息,http://localhost:8080/rabbitmq/sendmsg?msg=testMsg1&delayType=1?http://localhost:8080/rabbitmq/sendmsg?msg=testMsg2&delayType=2
日志如下:
2019-07-28 16:02:19.813 INFO 3860 --- [nio-8080-exec-9] c.m.d.controller.RabbitMQMsgController : 當(dāng)前時(shí)間:Sun Jul 28 16:02:19 CST 2019,收到請(qǐng)求,msg:testMsg1,delayType:1 2019-07-28 16:02:19.815 INFO 3860 --- [nio-8080-exec-9] .l.DirectReplyToMessageListenerContainer : SimpleConsumer [queue=amq.rabbitmq.reply-to, consumerTag=amq.ctag-o-qPpkWIkRm73DIrOIVhig identity=766339] started 2019-07-28 16:02:25.829 INFO 3860 --- [ntContainer#1-1] c.m.d.mq.DeadLetterQueueConsumer : 當(dāng)前時(shí)間:Sun Jul 28 16:02:25 CST 2019,死信隊(duì)列A收到消息:testMsg1 2019-07-28 16:02:41.326 INFO 3860 --- [nio-8080-exec-1] c.m.d.controller.RabbitMQMsgController : 當(dāng)前時(shí)間:Sun Jul 28 16:02:41 CST 2019,收到請(qǐng)求,msg:testMsg2,delayType:2 2019-07-28 16:03:41.329 INFO 3860 --- [ntContainer#0-1] c.m.d.mq.DeadLetterQueueConsumer : 當(dāng)前時(shí)間:Sun Jul 28 16:03:41 CST 2019,死信隊(duì)列B收到消息:testMsg2
第一條消息在6s后變成了死信消息,然后被消費(fèi)者消費(fèi)掉,第二條消息在60s之后變成了死信消息,然后被消費(fèi)掉,這樣,一個(gè)還算ok的延時(shí)隊(duì)列就打造完成了。
不過(guò),等等,如果這樣使用的話,豈不是每增加一個(gè)新的時(shí)間需求,就要新增一個(gè)隊(duì)列,這里只有6s和60s兩個(gè)時(shí)間選項(xiàng),如果需要一個(gè)小時(shí)后處理,那么就需要增加TTL為一個(gè)小時(shí)的隊(duì)列,如果是預(yù)定會(huì)議室然后提前通知這樣的場(chǎng)景,豈不是要增加無(wú)數(shù)個(gè)隊(duì)列才能滿足需求??
嗯,仔細(xì)想想,事情并不簡(jiǎn)單。
0|1七、RabbitMQ延時(shí)隊(duì)列優(yōu)化
顯然,需要一種更通用的方案才能滿足需求,那么就只能將TTL設(shè)置在消息屬性里了。我們來(lái)試一試。
增加一個(gè)延時(shí)隊(duì)列,用于接收設(shè)置為任意延時(shí)時(shí)長(zhǎng)的消息,增加一個(gè)相應(yīng)的死信隊(duì)列和routingkey:
@Configuration public class RabbitMQConfig { public static final String DELAY_EXCHANGE_NAME = "delay.queue.demo.business.exchange"; public static final String DELAY_QUEUEC_NAME = "delay.queue.demo.business.queuec"; public static final String DELAY_QUEUEC_ROUTING_KEY = "delay.queue.demo.business.queuec.routingkey"; public static final String DEAD_LETTER_EXCHANGE = "delay.queue.demo.deadletter.exchange"; public static final String DEAD_LETTER_QUEUEC_ROUTING_KEY = "delay.queue.demo.deadletter.delay_anytime.routingkey"; public static final String DEAD_LETTER_QUEUEC_NAME = "delay.queue.demo.deadletter.queuec"; // 聲明延時(shí)Exchange @Bean("delayExchange") public DirectExchange delayExchange(){ return new DirectExchange(DELAY_EXCHANGE_NAME); } // 聲明死信Exchange @Bean("deadLetterExchange") public DirectExchange deadLetterExchange(){ return new DirectExchange(DEAD_LETTER_EXCHANGE); } // 聲明延時(shí)隊(duì)列C 不設(shè)置TTL // 并綁定到對(duì)應(yīng)的死信交換機(jī) @Bean("delayQueueC") public Queue delayQueueC(){ Map<String, Object> args = new HashMap<>(3); // x-dead-letter-exchange 這里聲明當(dāng)前隊(duì)列綁定的死信交換機(jī) args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE); // x-dead-letter-routing-key 這里聲明當(dāng)前隊(duì)列的死信路由key args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEC_ROUTING_KEY); return QueueBuilder.durable(DELAY_QUEUEC_NAME).withArguments(args).build(); } // 聲明死信隊(duì)列C 用于接收延時(shí)任意時(shí)長(zhǎng)處理的消息 @Bean("deadLetterQueueC") public Queue deadLetterQueueC(){ return new Queue(DEAD_LETTER_QUEUEC_NAME); } // 聲明延時(shí)列C綁定關(guān)系 @Bean public Binding delayBindingC(@Qualifier("delayQueueC") Queue queue, @Qualifier("delayExchange") DirectExchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUEC_ROUTING_KEY); } // 聲明死信隊(duì)列C綁定關(guān)系 @Bean public Binding deadLetterBindingC(@Qualifier("deadLetterQueueC") Queue queue, @Qualifier("deadLetterExchange") DirectExchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEC_ROUTING_KEY); } }
增加一個(gè)死信隊(duì)列C的消費(fèi)者:
@RabbitListener(queues = DEAD_LETTER_QUEUEC_NAME) public void receiveC(Message message, Channel channel) throws IOException { String msg = new String(message.getBody()); log.info("當(dāng)前時(shí)間:{},死信隊(duì)列C收到消息:{}", new Date().toString(), msg); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); }
再次啟動(dòng)!然后訪問(wèn):http://localhost:8080/rabbitmq/delayMsg?msg=testMsg1delayTime=5000?來(lái)生產(chǎn)消息,注意這里的單位是毫秒。
2019-07-28 16:45:07.033 INFO 31468 --- [nio-8080-exec-4] c.m.d.controller.RabbitMQMsgController : 當(dāng)前時(shí)間:Sun Jul 28 16:45:07 CST 2019,收到請(qǐng)求,msg:testMsg1,delayTime:5000 2019-07-28 16:45:11.694 INFO 31468 --- [nio-8080-exec-5] c.m.d.controller.RabbitMQMsgController : 當(dāng)前時(shí)間:Sun Jul 28 16:45:11 CST 2019,收到請(qǐng)求,msg:testMsg2,delayTime:5000 2019-07-28 16:45:12.048 INFO 31468 --- [ntContainer#1-1] c.m.d.mq.DeadLetterQueueConsumer : 當(dāng)前時(shí)間:Sun Jul 28 16:45:12 CST 2019,死信隊(duì)列C收到消息:testMsg1 2019-07-28 16:45:16.709 INFO 31468 --- [ntContainer#1-1] c.m.d.mq.DeadLetterQueueConsumer : 當(dāng)前時(shí)間:Sun Jul 28 16:45:16 CST 2019,死信隊(duì)列C收到消息:testMsg2
看起來(lái)似乎沒(méi)什么問(wèn)題,但不要高興的太早,在最開(kāi)始的時(shí)候,就介紹過(guò),如果使用在消息屬性上設(shè)置TTL的方式,消息可能并不會(huì)按時(shí)“死亡“,因?yàn)镽abbitMQ只會(huì)檢查第一個(gè)消息是否過(guò)期,如果過(guò)期則丟到死信隊(duì)列,索引如果第一個(gè)消息的延時(shí)時(shí)長(zhǎng)很長(zhǎng),而第二個(gè)消息的延時(shí)時(shí)長(zhǎng)很短,則第二個(gè)消息并不會(huì)優(yōu)先得到執(zhí)行。
實(shí)驗(yàn)一下:
2019-07-28 16:49:02.957 INFO 31468 --- [nio-8080-exec-8] c.m.d.controller.RabbitMQMsgController : 當(dāng)前時(shí)間:Sun Jul 28 16:49:02 CST 2019,收到請(qǐng)求,msg:longDelayedMsg,delayTime:20000 2019-07-28 16:49:10.671 INFO 31468 --- [nio-8080-exec-9] c.m.d.controller.RabbitMQMsgController : 當(dāng)前時(shí)間:Sun Jul 28 16:49:10 CST 2019,收到請(qǐng)求,msg:shortDelayedMsg,delayTime:2000 2019-07-28 16:49:22.969 INFO 31468 --- [ntContainer#1-1] c.m.d.mq.DeadLetterQueueConsumer : 當(dāng)前時(shí)間:Sun Jul 28 16:49:22 CST 2019,死信隊(duì)列C收到消息:longDelayedMsg 2019-07-28 16:49:22.970 INFO 31468 --- [ntContainer#1-1] c.m.d.mq.DeadLetterQueueConsumer : 當(dāng)前時(shí)間:Sun Jul 28 16:49:22 CST 2019,死信隊(duì)列C收到消息:shortDelayedMsg
我們先發(fā)了一個(gè)延時(shí)時(shí)長(zhǎng)為20s的消息,然后發(fā)了一個(gè)延時(shí)時(shí)長(zhǎng)為2s的消息,結(jié)果顯示,第二個(gè)消息會(huì)在等第一個(gè)消息成為死信后才會(huì)“死亡“。
0|1八、利用RabbitMQ插件實(shí)現(xiàn)延遲隊(duì)列
上文中提到的問(wèn)題,確實(shí)是一個(gè)硬傷,如果不能實(shí)現(xiàn)在消息粒度上添加TTL,并使其在設(shè)置的TTL時(shí)間及時(shí)死亡,就無(wú)法設(shè)計(jì)成一個(gè)通用的延時(shí)隊(duì)列。
那如何解決這個(gè)問(wèn)題呢?不要慌,安裝一個(gè)插件即可:Community Plugins — RabbitMQ?,下載rabbitmq_delayed_message_exchange插件,然后解壓放置到RabbitMQ的插件目錄。
接下來(lái),進(jìn)入RabbitMQ的安裝目錄下的sbin目錄,執(zhí)行下面命令讓該插件生效,然后重啟RabbitMQ。
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
然后,我們?cè)俾暶鲙讉€(gè)Bean:
@Configuration public class DelayedRabbitMQConfig { public static final String DELAYED_QUEUE_NAME = "delay.queue.demo.delay.queue"; public static final String DELAYED_EXCHANGE_NAME = "delay.queue.demo.delay.exchange"; public static final String DELAYED_ROUTING_KEY = "delay.queue.demo.delay.routingkey"; @Bean public Queue immediateQueue() { return new Queue(DELAYED_QUEUE_NAME); } @Bean public CustomExchange customExchange() { Map<String, Object> args = new HashMap<>(); args.put("x-delayed-type", "direct"); return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args); } @Bean public Binding bindingNotify(@Qualifier("immediateQueue") Queue queue, @Qualifier("customExchange") CustomExchange customExchange) { return BindingBuilder.bind(queue).to(customExchange).with(DELAYED_ROUTING_KEY).noargs(); } }
controller層再添加一個(gè)入口:
@RequestMapping("delayMsg2") public void delayMsg2(String msg, Integer delayTime) { log.info("當(dāng)前時(shí)間:{},收到請(qǐng)求,msg:{},delayTime:{}", new Date(), msg, delayTime); sender.sendDelayMsg(msg, delayTime); }
消息生產(chǎn)者的代碼也需要修改:
public void sendDelayMsg(String msg, Integer delayTime) { rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, msg, a ->{ a.getMessageProperties().setDelay(delayTime); return a; }); }
最后,再創(chuàng)建一個(gè)消費(fèi)者:
@RabbitListener(queues = DELAYED_QUEUE_NAME) public void receiveD(Message message, Channel channel) throws IOException { String msg = new String(message.getBody()); log.info("當(dāng)前時(shí)間:{},延時(shí)隊(duì)列收到消息:{}", new Date().toString(), msg); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); }
一切準(zhǔn)備就緒,啟動(dòng)!然后分別訪問(wèn)以下鏈接:
http://localhost:8080/rabbitmq/delayMsg2?msg=msg1&delayTime=20000 http://localhost:8080/rabbitmq/delayMsg2?msg=msg2&delayTime=2000
日志如下:
2019-07-28 17:28:13.729 INFO 25804 --- [nio-8080-exec-2] c.m.d.controller.RabbitMQMsgController : 當(dāng)前時(shí)間:Sun Jul 28 17:28:13 CST 2019,收到請(qǐng)求,msg:msg1,delayTime:20000 2019-07-28 17:28:20.607 INFO 25804 --- [nio-8080-exec-1] c.m.d.controller.RabbitMQMsgController : 當(dāng)前時(shí)間:Sun Jul 28 17:28:20 CST 2019,收到請(qǐng)求,msg:msg2,delayTime:2000 2019-07-28 17:28:22.624 INFO 25804 --- [ntContainer#1-1] c.m.d.mq.DeadLetterQueueConsumer : 當(dāng)前時(shí)間:Sun Jul 28 17:28:22 CST 2019,延時(shí)隊(duì)列收到消息:msg2 2019-07-28 17:28:33.751 INFO 25804 --- [ntContainer#1-1] c.m.d.mq.DeadLetterQueueConsumer : 當(dāng)前時(shí)間:Sun Jul 28 17:28:33 CST 2019,延時(shí)隊(duì)列收到消息:msg1
第二個(gè)消息被先消費(fèi)掉了,符合預(yù)期。至此,RabbitMQ實(shí)現(xiàn)延時(shí)隊(duì)列的部分就完結(jié)了。
0|1九、總結(jié)
延時(shí)隊(duì)列在需要延時(shí)處理的場(chǎng)景下非常有用,使用RabbitMQ來(lái)實(shí)現(xiàn)延時(shí)隊(duì)列可以很好的利用RabbitMQ的特性,如:消息可靠發(fā)送、消息可靠投遞、死信隊(duì)列來(lái)保障消息至少被消費(fèi)一次以及未被正確處理的消息不會(huì)被丟棄。另外,通過(guò)RabbitMQ集群的特性,可以很好的解決單點(diǎn)故障問(wèn)題,不會(huì)因?yàn)閱蝹€(gè)節(jié)點(diǎn)掛掉導(dǎo)致延時(shí)隊(duì)列不可用或者消息丟失。
當(dāng)然,延時(shí)隊(duì)列還有很多其它選擇,比如利用Java的DelayQueu,利用Redis的zset,利用Quartz或者利用kafka的時(shí)間輪,這些方式各有特點(diǎn),但就像爐石傳說(shuō)一般,這些知識(shí)就好比手里的卡牌,知道的越多,可以用的卡牌也就越多,遇到問(wèn)題便能游刃有余,所以需要大量的知識(shí)儲(chǔ)備和經(jīng)驗(yàn)積累才能打造出更出色的卡牌組合,讓自己解決問(wèn)題的能力得到更好的提升。
但另一方面,隨著時(shí)間的流逝和閱歷的增長(zhǎng),越來(lái)越感覺(jué)到自己的能力有限,無(wú)法獨(dú)自面對(duì)紛繁復(fù)雜且多變的業(yè)務(wù)需求,在很多方面需要其他人的協(xié)助才能很好的完成任務(wù)。也知道聞道有先后,術(shù)業(yè)有專攻,不會(huì)再狂妄自大,覺(jué)得自己能把所有事情都搞定,也將重心慢慢轉(zhuǎn)移到研究如何有效的進(jìn)行團(tuán)隊(duì)合作上來(lái),我相信一個(gè)高度協(xié)調(diào)的團(tuán)隊(duì)永遠(yuǎn)比一個(gè)人戰(zhàn)斗要更有價(jià)值。
花了一個(gè)周末的時(shí)間完成了這篇文章,文中所有的代碼都上傳到了github,https://github.com/MFrank2016/delayed-queue-demo如有需要可以自行查閱,希望能對(duì)你有幫助,如果有錯(cuò)誤的地方,歡迎指正,也歡迎關(guān)注我的公眾號(hào)進(jìn)行留言交流。
總結(jié)
以上是生活随笔為你收集整理的【RabbitMQ】一文带你搞定RabbitMQ延迟队列的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 别克4s保养一次车多少钱?
- 下一篇: RabbitMQ自学之路(九)——Rab