日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪(fǎng)問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 前端技术 > javascript >内容正文

javascript

Spring Boot(十四)RabbitMQ延迟队列

發(fā)布時(shí)間:2024/8/26 javascript 29 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spring Boot(十四)RabbitMQ延迟队列 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

一、前言

延遲隊(duì)列的使用場(chǎng)景:1.未按時(shí)支付的訂單,30分鐘過(guò)期之后取消訂單;2.給活躍度比較低的用戶(hù)間隔N天之后推送消息,提高活躍度;3.過(guò)1分鐘給新注冊(cè)會(huì)員的用戶(hù),發(fā)送注冊(cè)郵件等。

實(shí)現(xiàn)延遲隊(duì)列的方式有兩種:

  • 通過(guò)消息過(guò)期后進(jìn)入死信交換器,再由交換器轉(zhuǎn)發(fā)到延遲消費(fèi)隊(duì)列,實(shí)現(xiàn)延遲功能;
  • 使用rabbitmq-delayed-message-exchange插件實(shí)現(xiàn)延遲功能;
  • 注意: 延遲插件rabbitmq-delayed-message-exchange是在RabbitMQ 3.5.7及以上的版本才支持的,依賴(lài)Erlang/OPT 18.0及以上運(yùn)行環(huán)境。

    由于使用死信交換器相對(duì)曲折,本文重點(diǎn)介紹第二種方式,使用rabbitmq-delayed-message-exchange插件完成延遲隊(duì)列的功能。

    二、安裝延遲插件

    1.1 下載插件

    打開(kāi)官網(wǎng)下載:http://www.rabbitmq.com/community-plugins.html

    選擇相應(yīng)的對(duì)應(yīng)的版本“3.7.x”點(diǎn)擊下載。

    注意: 下載的是.zip的安裝包,下載完之后需要手動(dòng)解壓。

    1.2 安裝插件

    拷貝插件到Docker:

    docker cp D:\rabbitmq_delayed_message_exchange-20171201-3.7.x.ez rabbit:/plugins

    RabbitMQ在Docker的安裝,請(qǐng)參照本系列的上一篇文章:http://www.apigo.cn/2018/09/11/springboot13/

    1.3 啟動(dòng)插件

    進(jìn)入docker內(nèi)部:

    docker exec -it rabbit /bin/bash

    開(kāi)啟插件:

    rabbitmq-plugins enable rabbitmq_delayed_message_exchange

    查詢(xún)安裝的所有插件:

    rabbitmq-plugins list

    安裝正常,效果如下圖:

    重啟RabbitMQ,使插件生效

    docker restart rabbit

    三、代碼實(shí)現(xiàn)

    3.1 配置隊(duì)列

    import com.example.rabbitmq.mq.DirectConfig; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map;@Configuration public class DelayedConfig {final static String QUEUE_NAME = "delayed.goods.order";final static String EXCHANGE_NAME = "delayedec";@Beanpublic Queue queue() {return new Queue(DelayedConfig.QUEUE_NAME);}// 配置默認(rèn)的交換機(jī)@BeanCustomExchange customExchange() {Map<String, Object> args = new HashMap<>();args.put("x-delayed-type", "direct");//參數(shù)二為類(lèi)型:必須是x-delayed-messagereturn new CustomExchange(DelayedConfig.EXCHANGE_NAME, "x-delayed-message", true, false, args);}// 綁定隊(duì)列到交換器@BeanBinding binding(Queue queue, CustomExchange exchange) {return BindingBuilder.bind(queue).to(exchange).with(DelayedConfig.QUEUE_NAME).noargs();} }

    3.2 發(fā)送消息

    import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.text.SimpleDateFormat; import java.util.Date;@Component public class DelayedSender {@Autowiredprivate AmqpTemplate rabbitTemplate;public void send(String msg) {SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");System.out.println("發(fā)送時(shí)間:" + sf.format(new Date()));rabbitTemplate.convertAndSend(DelayedConfig.EXCHANGE_NAME, DelayedConfig.QUEUE_NAME, msg, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setHeader("x-delay", 3000);return message;}});} }

    3.3 消費(fèi)消息

    import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.text.SimpleDateFormat; import java.util.Date;@Component @RabbitListener(queues = "delayed.goods.order") public class DelayedReceiver {@RabbitHandlerpublic void process(String msg) {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");System.out.println("接收時(shí)間:" + sdf.format(new Date()));System.out.println("消息內(nèi)容:" + msg);} }

    3.4 測(cè)試隊(duì)列

    import com.example.rabbitmq.RabbitmqApplication; import com.example.rabbitmq.mq.delayed.DelayedSender; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner;import java.text.SimpleDateFormat; import java.util.Date;@RunWith(SpringRunner.class) @SpringBootTest public class DelayedTest {@Autowiredprivate DelayedSender sender;@Testpublic void Test() throws InterruptedException {SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd");sender.send("Hi Admin.");Thread.sleep(5 * 1000); //等待接收程序執(zhí)行之后,再退出測(cè)試} }

    執(zhí)行結(jié)果如下:

    發(fā)送時(shí)間:2018-09-11 20:47:51 接收時(shí)間:2018-09-11 20:47:54 消息內(nèi)容:Hi Admin.

    完整代碼訪(fǎng)問(wèn)我的GitHub:https://github.com/vipstone/springboot-example/tree/master/springboot-rabbitmq

    四、總結(jié)

    到此為止我們已經(jīng)使用“rabbitmq-delayed-message-exchange”插件實(shí)現(xiàn)了延遲功能,但是需要注意的一點(diǎn)是,如果使用命令“rabbitmq-plugins disable rabbitmq_delayed_message_exchange”禁用了延遲插件,那么所有未發(fā)送的延遲消息都將丟失。

    轉(zhuǎn)載于:https://www.cnblogs.com/vipstone/p/9967649.html

    總結(jié)

    以上是生活随笔為你收集整理的Spring Boot(十四)RabbitMQ延迟队列的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

    如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。