日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

【学习笔记】rabbitmq设置队列ttl和使用延迟插件的代码示例

發布時間:2024/9/30 编程问答 49 豆豆
生活随笔 收集整理的這篇文章主要介紹了 【学习笔记】rabbitmq设置队列ttl和使用延迟插件的代码示例 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

文章目錄

  • 設置隊列ttl
    • 配置文件
    • 生產者
    • 消費者
  • 設置消息ttl
  • 延遲插件的使用
    • 修改配置文件
    • 修改生產者
    • 修改消費者


設置隊列ttl

代碼架構:

創建兩個隊列QA和QB,兩者隊列TTL分別設置為10S和40S,然后在創建一個交換機X和死信交換機Y,它們的類型都是direct,創建一個死信隊列QD

配置文件

spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.virtual-host=/ @Configuration public class Rabbitmqconfig {public static final String X_EXCHANGE = "X";public static final String QUEUE_A = "QA";public static final String QUEUE_B = "QB"; public static final String Y_DEAD_LETTER_EXCHANGE = "Y"; public static final String DEAD_LETTER_QUEUE = "QD";// 聲明xExchange@Bean("xExchange")public DirectExchange xExchange(){return new DirectExchange(X_EXCHANGE); } // 聲明xExchange@Bean("yExchange")public DirectExchange yExchange(){return new DirectExchange(Y_DEAD_LETTER_EXCHANGE); }//聲明隊列A ttl為10s并綁定到對應的死信交換機@Bean("queueA")public Queue queueA(){Map<String,Object> args = new HashMap<String,Object>();args.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);//聲明當前隊列綁定的死信交換機args.put("x-dead-letter-routing-key","YD");args.put("x-message-ttl",10000);return QueueBuilder.durable(QUEUE_A).withArguments(args).build();}@Beanpublic Binding queueaBingX(@Qualifier("queueA")Queue queueA,@Qualifier("xExchange")DirectExchange exchange){return BindingBuilder.bind(queueA).to(exchange).with("XA");//通過XA路由鍵讓交換機與隊列A綁定}//聲明隊列A ttl為40s并綁定到對應的死信交換機@Bean("queueB")public Queue queueB(){Map<String,Object> args = new HashMap<String,Object>();args.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);//聲明當前隊列綁定的死信交換機args.put("x-dead-letter-routing-key","YD");args.put("x-message-ttl",40000);return QueueBuilder.durable(QUEUE_B).withArguments(args).build();}@Beanpublic Binding queuebBingX(@Qualifier("queueB")Queue queueA,@Qualifier("xExchange")DirectExchange exchange){return BindingBuilder.bind(queueA).to(exchange).with("XB");//通過XB路由鍵讓交換機與隊列B綁定//這里隊列A和隊列B綁定的是同一個交換機}@Bean("queueD")public Queue queueD(){return new Queue(DEAD_LETTER_QUEUE);//聲明死信隊列}@Bean//死信隊列與交換機通過yd路由鍵綁定 這里隊列綁定的是y交換機而不是x交換機,上面兩個隊列綁定的是x交換機public Binding deadLetterBindingQAD(@Qualifier("queueD") Queue queueD,@Qualifier("yExchange") DirectExchange yExchange){return BindingBuilder.bind(queueD).to(yExchange).with("YD");} }

生產者

@RestController public class Producer { @Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("sendMsg/{message}")public void sendMsg(@PathVariable String message){System.out.println("當前時間"+new Date().toString()+" 發送的消息:"+message);rabbitTemplate.convertAndSend("X","XA","消息來自ttl為10的隊列"+message);rabbitTemplate.convertAndSend("X","XB","消息來自ttl為40的隊列"+message);} }

消費者

@Component public class DeadLetterQueueConsumer {@RabbitListener(queues = "QD")//配置文件已經聲明了死信隊列 Queue("QD");//聲明死信隊列public void receiveD(Message message, Channel channel) throws IOException {String msg = new String(message.getBody());System.out.println("當前時間:" +new Date().toString()+",收到死信隊列信息"+msg); } }

啟動項目:


控制臺:

設置消息ttl

上面是對隊列屬性設置了過期時間,但如果有很多數據需要設置不同的過期時間則需要很多隊列,這樣明顯浪費不必要的內存,這里也可以對消息設置不同過期時間:
再定義一個新隊列,這里隊列不再設置ttl屬性:

@Bean("queueC")public Queue queueC(){Map<String, Object> args = new HashMap<>();//聲明當前隊列綁定的死信交換機args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); //聲明當前隊列的死信路由keyargs.put("x-dead-letter-routing-key", "YD"); //沒有聲明TTL屬性return QueueBuilder.durable(QUEUE_C).withArguments(args).build();}//聲明隊列B綁定X交換機@Bean public Binding queuecBindingX(@Qualifier("queueC") Queue queueC,@Qualifier("xExchange") DirectExchange xExchange){return BindingBuilder.bind(queueC).to(xExchange).with("XC"); }

修改生產者:

@GetMapping("sendExpirationMsg/{message}/{ttlTime}")public void sendMsg(@PathVariable String message,@PathVariable String ttlTime){ rabbitTemplate.convertAndSend("X", "XC", message, correlationData ->{correlationData.getMessageProperties().setExpiration(ttlTime); return correlationData; });System.out.println("當前時間:{}"+ new Date().toString()+"發送一條時長"+ttlTime+"毫秒TTL信息給隊列C:"+ message); }

看起來似乎沒什么問題,但是在最開始的時候,就介紹過如果使用在消息屬性上設置TTL的方式,消息可能并不會按時“死亡“,因為RabbitMQ只會檢查第一個消息是否過期,如果過期則丟到死信隊列,如果第一個消息的延時時長很長,而第二個消息的延時時長很短,第二個消息并不會優先得到執行。

延遲插件的使用

官網https://www.rabbitmq.com/community-plugins.html
下載:

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

允許使用插件:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange


修改綁定關系:

修改配置文件

在我們自定義的交換機中,這是一種新的交換類型,該類型消息支持延遲投遞機制 消息傳遞后并不會立即投遞到目標隊列中,而是存儲在mnesia(一個分布式數據系統)表中,當達到投遞時間時,才投遞到目標隊列中。

@Configuration public class DelayedQueueConfig {//自定義交換機 我們在這里定義的是一個延遲交換機@Bean public CustomExchange delayedExchange() { Map<String, Object> args = new HashMap<>(); //自定義交換機的類型 args.put("x-delayed-type", "direct"); return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args); } @Bean public Binding bindingDelayedQueue(@Qualifier("delayedQueue") Queue queue, @Qualifier("delayedExchange") CustomExchange delayedExchange) {return BindingBuilder.bind(queue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();} }

修改生產者

public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";@GetMapping("sendDelayMsg/{message}/{delayTime}")public void sendMsg(@PathVariable String message, @PathVariable Integer delayTime) {rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, message, correlationData -> {correlationData.getMessageProperties().setDelay(delayTime);return correlationData;});}

修改消費者

public static final String DELAYED_QUEUE_NAME = "delayed.queue";@RabbitListener(queues = DELAYED_QUEUE_NAME) public void receiveDelayedQueue(Message message){ String msg = new String(message.getBody()); log.info("當前時間:{},收到延時隊列的消息:{}", new Date().toString(), msg); }

小結:
延時隊列在需要延時處理的場景下非常有用,使用RabbitMQ來實現延時隊列可以很好的利用RabbitMQ的特性,如:消息可靠發送、消息可靠投遞、死信隊列來保障消息至少被消費一次以及未被正確處理的消息不會被丟棄。另外,通過RabbitMQ集群的特性,可以很好的解決單點故障問題,不會因為單個節點掛掉導致延時隊列不可用或者消息丟失。
當然,延時隊列還有很多其它選擇,比如利用Java的DelayQueue,利用Redis的zset,利用Quartz或者利用kafka的時間輪,這些方式各有特點,看需要適用的場景

總結

以上是生活随笔為你收集整理的【学习笔记】rabbitmq设置队列ttl和使用延迟插件的代码示例的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 亚洲欧洲在线看 | 久久精品5 | 国产精品日韩欧美一区二区三区 | 久久久久久香蕉 | 99久久人妻无码中文字幕系列 | 亚洲成人偷拍 | 日本高清www免费视频 | 欧美精品99| 欧美日韩a级片 | 国产精品高潮呻吟 | 免费污视频 | 日韩av三级在线观看 | 国产成人精品a视频 | 久久9966 | 添女人荫蒂视频 | 国产69精品久久久久999小说 | 欧美色乱 | 古装做爰无遮挡三级聊斋艳谭 | 日本精品久久久久久 | 亚洲美女综合 | 潘金莲一级淫片aaaaa | www.色就是色 | 一区二区三区伦理片 | 素人一区二区三区 | 少妇偷人精品无码人妻 | 国产精品国语自产拍在线观看 | 日本日皮视频 | 日韩大片在线免费观看 | 成人在线免费视频 | 91精品欧美一区二区三区 | 亚洲成人高清在线 | 伊人草草| 2019中文字幕在线观看 | 福利91 | 美丽的姑娘观看在线播放 | 99热这里精品 | 久久免费黄色 | 在线观看成人免费 | 日本丰满少妇裸体自慰 | 国产农村av| 爱福利视频网 | 久久久免费高清视频 | 伊人久久久久久久久久久久久 | 精品熟女一区 | 骚虎免费视频 | 精品综合 | 久草视频在线播放 | 综合在线观看 | 久久视频在线免费观看 | 麻豆成人久久精品一区二区三区 | av伊人久久 | 992av| www.chengren| a级黄色片 | 强行挺进白丝老师里呻吟 | 免费无码国产v片在线观看 三级全黄做爰在线观看 | 亚洲国产在 | 亚洲国产精品久久精品怡红院 | 国产精品精 | 噼里啪啦免费看 | 精品欧美一区二区精品久久 | 午夜黄色av | 午夜色大片| 无码精品一区二区三区AV | 免费av不卡| 污到下面流水的视频 | 91精产国品一二三区在线观看 | 欧洲精品码一区二区三区免费看 | 法国空姐在线观看完整版 | 99热99这里只有精品 | 国产精品 色 | 国产香蕉尹人视频在线 | 美日韩精品一区二区 | 成人3d动漫一区二区三区91 | 韩国三级免费 | 一区二区视频在线看 | 日本大尺度吃奶做爰久久久绯色 | 四虎影院www| 99香蕉视频 | 一区二区三区免费看 | 深夜视频在线播放 | 6699嫩草久久久精品影院 | 精品人妻互换一区二区三区 | 日本成人在线不卡 | 国产一区999| 9i看片成人免费 | 国产一区二区三区久久 | 亚洲熟女乱色一区二区三区 | 超碰在线香蕉 | 自拍超碰在线 | 国产精品一区二区三区免费 | 亚洲最大成人在线 | 国产成人久久精品77777综合 | 国产欧美在线观看 | 欧美偷拍一区二区三区 | 一区二区三区四区影院 | 日本乱轮视频 | 天天摸天天碰 | 日本三级视频网站 |