日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

RabbitMQ 延迟队列,消息延迟推送

發布時間:2023/12/10 编程问答 23 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RabbitMQ 延迟队列,消息延迟推送 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

應用場景

目前常見的應用軟件都有消息的延遲推送的影子,應用也極為廣泛,例如:

  • 淘寶七天自動確認收貨。在我們簽收商品后,物流系統會在七天后延時發送一個消息給支付系統,通知支付系統將款打給商家,這個過程持續七天,就是使用了消息中間件的延遲推送功能。
  • 12306 購票支付確認頁面。我們在選好票點擊確定跳轉的頁面中往往都會有倒計時,代表著 30 分鐘內訂單不確認的話將會自動取消訂單。其實在下訂單那一刻開始購票業務系統就會發送一個延時消息給訂單系統,延時30分鐘,告訴訂單系統訂單未完成,如果我們在30分鐘內完成了訂單,則可以通過邏輯代碼判斷來忽略掉收到的消息。

在上面兩種場景中,如果我們使用下面兩種傳統解決方案無疑大大降低了系統的整體性能和吞吐量:

  • 使用 redis 給訂單設置過期時間,最后通過判斷 redis 中是否還有該訂單來決定訂單是否已經完成。這種解決方案相較于消息的延遲推送性能較低,因為我們知道 redis 都是存儲于內存中,我們遇到惡意下單或者刷單的將會給內存帶來巨大壓力。
  • 使用傳統的數據庫輪詢來判斷數據庫表中訂單的狀態,這無疑增加了IO次數,性能極低。
  • 使用 jvm 原生的 DelayQueue ,也是大量占用內存,而且沒有持久化策略,系統宕機或者重啟都會丟失訂單信息。

消息延遲推送的實現

首先我們創建交換機和消息隊列

import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;import java.util.HashMap; import java.util.Map;@Configuration public class MQConfig {public static final String LAZY_EXCHANGE = "Ex.LazyExchange";public static final String LAZY_QUEUE = "MQ.LazyQueue";public static final String LAZY_KEY = "lazy.#";@Beanpublic TopicExchange lazyExchange(){//Map<String, Object> pros = new HashMap<>();//設置交換機支持延遲消息推送//pros.put("x-delayed-message", "topic");TopicExchange exchange = new TopicExchange(LAZY_EXCHANGE, true, false, pros);exchange.setDelayed(true);return exchange;}@Beanpublic Queue lazyQueue(){return new Queue(LAZY_QUEUE, true);}@Beanpublic Binding lazyBinding(){return BindingBuilder.bind(lazyQueue()).to(lazyExchange()).with(LAZY_KEY);} }復制代碼

我們在 Exchange 的聲明中可以設置exchange.setDelayed(true)來開啟延遲隊列,也可以設置為以下內容傳入交換機聲明的方法中,因為第一種方式的底層就是通過這種方式來實現的。

//Map<String, Object> pros = new HashMap<>();//設置交換機支持延遲消息推送//pros.put("x-delayed-message", "topic");TopicExchange exchange = new TopicExchange(LAZY_EXCHANGE, true, false, pros);復制代碼

發送消息時我們需要指定延遲推送的時間,我們這里在發送消息的方法中傳入參數 new MessagePostProcessor() 是為了獲得 Message對象,因為需要借助 Message對象的api 來設置延遲時間。

import com.anqi.mq.config.MQConfig; import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageDeliveryMode; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;import java.util.Date;@Component public class MQSender {@Autowiredprivate RabbitTemplate rabbitTemplate;//confirmCallback returnCallback 代碼省略,請參照上一篇public void sendLazy(Object message){rabbitTemplate.setMandatory(true);rabbitTemplate.setConfirmCallback(confirmCallback);rabbitTemplate.setReturnCallback(returnCallback);//id + 時間戳 全局唯一CorrelationData correlationData = new CorrelationData("12345678909"+new Date());//發送消息時指定 header 延遲時間rabbitTemplate.convertAndSend(MQConfig.LAZY_EXCHANGE, "lazy.boot", message,new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {//設置消息持久化message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);//message.getMessageProperties().setHeader("x-delay", "6000");message.getMessageProperties().setDelay(6000);return message;}}, correlationData);} }復制代碼

我們可以觀察 setDelay(Integer i)底層代碼,也是在 header 中設置 x-delay。等同于我們手動設置 header

message.getMessageProperties().setHeader("x-delay", "6000");

/*** Set the x-delay header.* @param delay the delay.* @since 1.6*/ public void setDelay(Integer delay) {if (delay == null || delay < 0) {this.headers.remove(X_DELAY);}else {this.headers.put(X_DELAY, delay);} }復制代碼

消費端進行消費

import com.rabbitmq.client.Channel; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.stereotype.Component;import java.io.IOException; import java.util.Map;@Component public class MQReceiver {@RabbitListener(queues = "MQ.LazyQueue")@RabbitHandlerpublic void onLazyMessage(Message msg, Channel channel) throws IOException{long deliveryTag = msg.getMessageProperties().getDeliveryTag();channel.basicAck(deliveryTag, true);System.out.println("lazy receive " + new String(msg.getBody()));}復制代碼

測試結果

import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner;@SpringBootTest @RunWith(SpringRunner.class) public class MQSenderTest {@Autowiredprivate MQSender mqSender;@Testpublic void sendLazy() throws Exception {String msg = "hello spring boot";mqSender.sendLazy(msg + ":");} }復制代碼

果然在 6 秒后收到了消息 lazy receive hello spring boot:

Java學習、面試;文檔、視頻資源免費獲取

轉載于:https://juejin.im/post/5cf7c2cd51882537465f29bf

總結

以上是生活随笔為你收集整理的RabbitMQ 延迟队列,消息延迟推送的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。