日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

java实现rabbitMQ延时队列详解以及spring-rabbit整合教程

發布時間:2025/3/17 编程问答 16 豆豆
生活随笔 收集整理的這篇文章主要介紹了 java实现rabbitMQ延时队列详解以及spring-rabbit整合教程 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

?

在實際的業務中我們會遇見生產者產生的消息,不立即消費,而是延時一段時間在消費。RabbitMQ本身沒有直接支持延遲隊列功能,但是我們可以根據其特性Per-Queue Message TTL和?Dead Letter Exchanges實現延時隊列。也可以通過改特性設置消息的優先級。

1.Per-Queue Message TTL
RabbitMQ可以針對消息和隊列設置TTL(過期時間)。隊列中的消息過期時間(Time To Live, TTL)有兩種方法可以設置。第一種方法是通過隊列屬性設置,隊列中所有消息都有相同的過期時間。第二種方法是對消息進行單獨設置,每條消息TTL可以不同。如果上述兩種方法同時使用,則消息的過期時間以兩者之間TTL較小的那個數值為準。消息在隊列的生存時間一旦超過設置的TTL值,就成為dead message,消費者將無法再收到該消息。
2.Dead Letter Exchanges
當消息在一個隊列中變成死信后,它能被重新publish到另一個Exchange。消息變成Dead Letter一向有以下幾種情況:
消息被拒絕(basic.reject or basic.nack)并且requeue=false
消息TTL過期
隊列達到最大長度
實際上就是設置某個隊列的屬性,當這個隊列中有Dead Letter時,RabbitMQ就會自動的將這個消息重新發布到設置的Exchange中去,進而被路由到另一個隊列,publish可以監聽這個隊列中消息做相應的處理,這個特性可以彌補RabbitMQ 3.0.0以前支持的immediate參數中的向publish確認的功能。

雖然 consumer 從來看不到過期的 message ,但是在過期 message 到達 queue 的頭部時確實會被真正的丟棄(或者 dead-lettered )。當對每一個 queue 設置了 TTL 值時不會產生任何問題,因為過期的 message 總是會出現在 queue 的頭部。當對每一條 message 設置了 TTL 時,過期的 message 可能會排隊于未過期 message 的后面,直到這些消息被 consume 到或者過期了。在這種情況下,這些過期的 message 使用的資源將不會被釋放,且會在 queue 統計信息中被計算進去(例如,queue 中存在的 message 的數量)。對于第一種設置隊列TTL屬性的方法,一旦消息過期,就會從隊列中抹去,而第二種方法里,即使消息過期,也不會馬上從隊列中抹去,因為每條消息是否過期時在即將投遞到消費者之前判定的,為什么兩者得處理方法不一致?因為第一種方法里,隊列中已過期的消息肯定在隊列頭部,RabbitMQ只要定期從隊頭開始掃描是否有過期消息即可,而第二種方法里,每條消息的過期時間不同,如果要刪除所有過期消息,勢必要掃描整個隊列,所以不如等到此消息即將被消費時再判定是否過期,如果過期,再進行刪除。

?

一、在隊列上設置TTL

1.建立delay.exchange

這里Internal設置為NO,否則將無法接受dead letter,YES表示這個exchange不可以被client用來推送消息,僅用來進行exchange和exchange之間的綁定。

2.建立延時隊列(delay queue)

如上配置延時5min隊列(x-message-ttl=300000)

x-max-length:最大積壓的消息個數,可以根據自己的實際情況設置,超過限制消息不會丟失,會立即轉向delay.exchange進行投遞

x-dead-letter-exchange:設置為剛剛配置好的delay.exchange,消息過期后會通過delay.exchange進行投遞

這里不需要配置"dead letter routing key"否則會覆蓋掉消息發送時攜帶的routingkey,導致后面無法路由為剛才配置的delay.exchange

3.配置延時路由規則

需要延時的消息到exchange后先路由到指定的延時隊列

1)創建delaysync.exchange通過Routing key將消息路由到延時隊列

?

2.配置delay.exchange 將消息投遞到正常的消費隊列

?

配置完成。

下面使用代碼測試一下:

生產者:

?

  • package cn.slimsmart.study.rabbitmq.delayqueue.queue;
  • import java.io.IOException;
  • import com.rabbitmq.client.Channel;
  • import com.rabbitmq.client.Connection;
  • import com.rabbitmq.client.ConnectionFactory;
  • public class Producer {
  • private static String queue_name = "test.queue";
  • public static void main(String[] args) throws IOException {
  • ConnectionFactory factory = new ConnectionFactory();
  • factory.setHost("10.1.199.169");
  • factory.setUsername("admin");
  • factory.setPassword("123456");
  • Connection connection = factory.newConnection();
  • Channel channel = connection.createChannel();
  • // 聲明隊列
  • channel.queueDeclare(queue_name, true, false, false, null);
  • String message = "hello world!" + System.currentTimeMillis();
  • channel.basicPublish("delaysync.exchange", "deal.message", null, message.getBytes());
  • System.out.println("sent message: " + message + ",date:" + System.currentTimeMillis());
  • // 關閉頻道和連接
  • channel.close();
  • connection.close();
  • }
  • }

  • ?消費者: ?

  • package cn.slimsmart.study.rabbitmq.delayqueue.queue;
  • import com.rabbitmq.client.Channel;
  • import com.rabbitmq.client.Connection;
  • import com.rabbitmq.client.ConnectionFactory;
  • import com.rabbitmq.client.QueueingConsumer;
  • public class Consumer {
  • private static String queue_name = "test.queue";
  • public static void main(String[] args) throws Exception {
  • ConnectionFactory factory = new ConnectionFactory();
  • factory.setHost("10.1.199.169");
  • factory.setUsername("admin");
  • factory.setPassword("123456");
  • Connection connection = factory.newConnection();
  • Channel channel = connection.createChannel();
  • // 聲明隊列
  • channel.queueDeclare(queue_name, true, false, false, null);
  • QueueingConsumer consumer = new QueueingConsumer(channel);
  • // 指定消費隊列
  • channel.basicConsume(queue_name, true, consumer);
  • while (true) {
  • // nextDelivery是一個阻塞方法(內部實現其實是阻塞隊列的take方法)
  • QueueingConsumer.Delivery delivery = consumer.nextDelivery();
  • String message = new String(delivery.getBody());
  • System.out.println("received message:" + message + ",date:" + System.currentTimeMillis());
  • }
  • }
  • }
  • 二、在消息上設置TTL

    ?

    實現代碼:

    生產者:?

    ?

  • package cn.slimsmart.study.rabbitmq.delayqueue.message;
  • import java.io.IOException;
  • import java.util.HashMap;
  • import com.rabbitmq.client.AMQP;
  • import com.rabbitmq.client.Channel;
  • import com.rabbitmq.client.Connection;
  • import com.rabbitmq.client.ConnectionFactory;
  • public class Producer {
  • private static String queue_name = "message_ttl_queue";
  • public static void main(String[] args) throws IOException {
  • ConnectionFactory factory = new ConnectionFactory();
  • factory.setHost("10.1.199.169");
  • factory.setUsername("admin");
  • factory.setPassword("123456");
  • Connection connection = factory.newConnection();
  • Channel channel = connection.createChannel();
  • HashMap<String, Object> arguments = new HashMap<String, Object>();
  • arguments.put("x-dead-letter-exchange", "amq.direct");
  • arguments.put("x-dead-letter-routing-key", "message_ttl_routingKey");
  • channel.queueDeclare("delay_queue", true, false, false, arguments);
  • // 聲明隊列
  • channel.queueDeclare(queue_name, true, false, false, null);
  • // 綁定路由
  • channel.queueBind(queue_name, "amq.direct", "message_ttl_routingKey");
  • String message = "hello world!" + System.currentTimeMillis();
  • // 設置延時屬性
  • AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
  • // 持久性 non-persistent (1) or persistent (2)
  • AMQP.BasicProperties properties = builder.expiration("300000").deliveryMode(2).build();
  • // routingKey =delay_queue 進行轉發
  • channel.basicPublish("", "delay_queue", properties, message.getBytes());
  • System.out.println("sent message: " + message + ",date:" + System.currentTimeMillis());
  • // 關閉頻道和連接
  • channel.close();
  • connection.close();
  • }
  • }

  • ?

    消費者:

    ?

  • package cn.slimsmart.study.rabbitmq.delayqueue.message;
  • import java.util.HashMap;
  • import com.rabbitmq.client.Channel;
  • import com.rabbitmq.client.Connection;
  • import com.rabbitmq.client.ConnectionFactory;
  • import com.rabbitmq.client.QueueingConsumer;
  • public class Consumer {
  • private static String queue_name = "message_ttl_queue";
  • public static void main(String[] args) throws Exception {
  • ConnectionFactory factory = new ConnectionFactory();
  • factory.setHost("10.1.199.169");
  • factory.setUsername("admin");
  • factory.setPassword("123456");
  • Connection connection = factory.newConnection();
  • Channel channel = connection.createChannel();
  • HashMap<String, Object> arguments = new HashMap<String, Object>();
  • arguments.put("x-dead-letter-exchange", "amq.direct");
  • arguments.put("x-dead-letter-routing-key", "message_ttl_routingKey");
  • channel.queueDeclare("delay_queue", true, false, false, arguments);
  • // 聲明隊列
  • channel.queueDeclare(queue_name, true, false, false, null);
  • // 綁定路由
  • channel.queueBind(queue_name, "amq.direct", "message_ttl_routingKey");
  • QueueingConsumer consumer = new QueueingConsumer(channel);
  • // 指定消費隊列
  • channel.basicConsume(queue_name, true, consumer);
  • while (true) {
  • // nextDelivery是一個阻塞方法(內部實現其實是阻塞隊列的take方法)
  • QueueingConsumer.Delivery delivery = consumer.nextDelivery();
  • String message = new String(delivery.getBody());
  • System.out.println("received message:" + message + ",date:" + System.currentTimeMillis());
  • }
  • }
  • }


  • ?

    spring-rabbit整合教程

    maven依賴:

    ?

  • <dependency>
  • <groupId>org.springframework.amqp</groupId>
  • <artifactId>spring-rabbit</artifactId>
  • <version>1.4.6.RELEASE</version>
  • </dependency>
  • spring配置文件(在文件頭部引入rabbit的命名空間和約束文件):
  • <?xml version="1.0" encoding="UTF-8"?>
  • <beans xmlns="http://www.springframework.org/schema/beans"
  • xmlns:aop="http://www.springframework.org/schema/aop"
  • xmlns:context="http://www.springframework.org/schema/context"
  • xmlns:mvc="http://www.springframework.org/schema/mvc"
  • xmlns:task="http://www.springframework.org/schema/task"
  • xmlns:tx="http://www.springframework.org/schema/tx"
  • xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  • xmlns:rabbit="http://www.springframework.org/schema/rabbit"
  • xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
  • http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
  • http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc.xsd
  • http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd
  • http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd
  • http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
  • http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.0.xsd">
  • <!-- 定義Rabbit,指定連接工廠 -->
  • <rabbit:connection-factory id="connectionFactory" host="你的rabbitMQ服務的ip" virtual-host="/vhost名稱" username="用戶名" password="密碼" port="5672" />
  • <!-- MQ的管理,包括隊列、交換器等 -->
  • <rabbit:admin connection-factory="connectionFactory"/>
  • <!-- 定義Rabbit模板,指定連接工廠以及定義exchange -->
  • <rabbit:template id="amqpTemplate" exchange="my_exchange" connection-factory="connectionFactory" />
  • <!-- queue 隊列聲明 -->
  • <!-- durable 是否持久化 ,exclusive 僅創建者可以使用的私有隊列,斷開后自動刪除 ,auto-delete 當所有消費端連接斷開后,是否自動刪除隊列 -->
  • <rabbit:queue name="my_queue" durable="true" auto-delete="false" exclusive="false"/>
  • <!-- 交換機定義 -->
  • <!-- direct-exchange 模式:消息與一個特定的路由器完全匹配,才會轉發; topic-exchange 模式:按規則轉發消息,最靈活 -->
  • <rabbit:topic-exchange name="my_exchange" durable="true" auto-delete="false">
  • <rabbit:bindings>
  • <!-- 設置消息Queue匹配的pattern (direct模式為key) -->
  • <rabbit:binding queue="my_queue" pattern="my_patt"/>
  • </rabbit:bindings>
  • </rabbit:topic-exchange>
  • <!-- 引入消費者 -->
  • <bean id="rabbitmqService" class="com.group.service.RabbitmqService" />
  • <!-- 配置監聽 消費者 acknowledeg = manual,auto,none -->
  • <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" >
  • <!-- queues 監聽隊列,多個用逗號分隔; ref 監聽器 -->
  • <rabbit:listener queue-names="my_queue" ref="rabbitmqService" method="test"/>
  • </rabbit:listener-container>
  • </beans>

  • 那么在項目中裝配amqpTemplate中就可以發送消息了

    ?

    ?

    總結

    以上是生活随笔為你收集整理的java实现rabbitMQ延时队列详解以及spring-rabbit整合教程的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

    主站蜘蛛池模板: 色爱av| 日本r级电影在线观看 | heyzo北岛玲在线播放 | 色姐 | 日韩精品免费视频 | 日本黄色特级片 | 日韩视频福利 | 国产96视频 | 香蕉在线看| 综合激情在线 | 国产日韩成人 | 天堂色区 | 天天操天天干天天舔 | 中文字幕久久久 | 久草福利免费 | 成人人伦一区二区三区 | 2019日韩中文字幕mv | 波多野结衣50连登视频 | 日韩在线不卡 | 日本高清有码视频 | 调教丰满的已婚少妇在线观看 | 成人午夜小视频 | 中文字幕在线观看一区二区 | 91一区二区在线观看 | 少妇久久久久久久 | 偷拍xxxx| 91亚洲国产成人精品一区二区三 | 日韩经典中文字幕 | 中文字幕在线观看亚洲 | 一区小视频| 老色批永久免费网站www | 欧美性生活一级 | 亚洲欧美色图片 | 亚洲另类色图 | 欧美xxxx少妇| 日韩a级黄色片 | 无码人妻精品一区二区三区9厂 | www.中文字幕在线观看 | 52av在线| 亚洲黄色免费在线观看 | 美女露出粉嫩尿囗让男人桶 | 中文字幕系列 | 成人性生交免费看 | 国产精品99久久久久久久 | 欧美日韩久久婷婷 | 国产一级二级在线观看 | 91二区| 米奇影视第四色 | 西方av在线| 日日摸夜夜添狠狠添久久精品成人 | 亚洲人一区 | 一区二区三区视频免费视 | 国产123区| 国产在线视频你懂的 | 黄色一区二区三区 | 西野翔之公侵犯中文字幕 | 69视频一区二区 | 久久成人综合 | 国产成人久久精品流白浆 | 日韩精品视频免费播放 | 欧美美女性视频 | 日韩视频国产 | 澳门一级黄色片 | 欧美一a| 男人天堂黄色 | 波多野结衣久久久久 | aaa国产精品 | 在线观看免费高清视频 | 美女赤身免费网站 | 99在线看| 天天艹日日艹 | 日韩欧美一二三 | 影音先锋久久久 | 99这里 | 欧美精品1区 | 黄色片成年人 | 国产精品性 | 国产l精品国产亚洲区久久 午夜青青草 | 黄色一级片欧美 | 谁有毛片网址 | 超碰2019| 五月综合激情日本mⅴ | 亚洲九九视频 | 国产精品9191 | 国产偷人 | 亚洲精品97久久中文字幕无码 | 综合五月激情 | 国产精品久久久久电影 | 色人阁婷婷 | 中文字幕在线成人 | 香蕉视频污视频 | 欧美色炮 | 91口爆一区二区三区在线 | 国产99久久九九精品无码 | 三年中文在线观看免费观看 | 尤物视频在线观看免费 | 亚洲一区二区小说 | 国产深喉视频一区二区 | 夜夜看 |