RabbitMQ教程大全看这一篇就够了-java版本
目錄
什么是RabbitMQ?
RabbitMQ 核心概念
Docker 安裝 RabbitMQ?
RabbitMQ 控制臺頁面介紹
RabbitMQ 交換機(jī)?Exchange 介紹
Direct Exchange 定向、直連交換機(jī)
Fanout Exchange 發(fā)布/訂閱、廣播、扇形交換機(jī)
Topic Exchange 主題、通配符交換機(jī)
Headers Exchanges(少用)
RabbitMQ 代碼 Java版
1:簡單隊列
2:工作隊列?Work Queues?
3:Fanout 發(fā)布/訂閱 交換機(jī)模式
4:Routing 路由 交換機(jī)模式
5:Topic?主題 交換機(jī)模式
SpringBoot 整合 RabbitMQ
RabbitMQ 消息可靠性投遞
生產(chǎn)者到交換機(jī) 開啟ACK確認(rèn)可靠消息投遞
交換機(jī)到隊列 可靠消息投遞
RabbitMQ 消息確認(rèn)消費ACK
消息的可靠消費
SpringBoot 配置 RabbitMQ 廣播 發(fā)布/訂閱
RabbitMQ TTL死信隊列
什么是 TTL?
什么是死信隊列?
什么是死信交換機(jī)?
消息什么情況下會成為死信消息?
如何設(shè)置消息的TTL存活時間?
RabbitMQ 控制臺 操作 死信隊列綁定死信交換機(jī)
RabbitMQ 延遲隊列
什么是延遲隊列?
定時消息的使用場景
RabbitMQ 實現(xiàn)延遲消息
SpringBoot 實現(xiàn)延遲隊列
RabbitMQ 的集群環(huán)境
RabbitMQ 普通集群模式的介紹
RabbitMQ 搭建普通集群環(huán)境
1:準(zhǔn)備節(jié)點環(huán)境
2:節(jié)點配置成集群
3:SpringBoot 整合 RabbitMQ 普通集群
RabbitMQ 鏡像集群模式的介紹(推薦)
策略policy介紹
官網(wǎng):https://www.rabbitmq.com/
什么是RabbitMQ?
RabbitMQ是一個開源的AMQP實現(xiàn),服務(wù)器端用Erlang語言編寫,支持多種客戶端,如:Python、Ruby、.NET、Java、C、用于在分布式系統(tǒng)中存儲轉(zhuǎn)發(fā)消息,在易用性、擴(kuò)展性、高可用性等方面表現(xiàn)不錯,與SpringAMQP完美的整合、API豐富易用
RabbitMQ 核心概念
- Broker
RabbitMQ的服務(wù)端程序,可以認(rèn)為一個mq節(jié)點就是一個broker。
- Producer 生產(chǎn)者
創(chuàng)建消息Message,然后發(fā)布到RabbitMQ隊列中
- Consumer 消費者
消費隊列中的消息
- Message 消息
生產(chǎn)消費的內(nèi)容,有消息頭和消息體,也包括多個屬性配置,比如routingKey路由鍵
- Queue 隊列
是RabbitMQ的內(nèi)部對象,用于存儲消息,消息都只能存儲在隊列中
- Channel 信道
一條支持多路復(fù)用的通道,獨立的雙向數(shù)據(jù)流通道,可以發(fā)布、訂閱、接收消息。信道是建立在真實的TCP連接內(nèi)的虛擬連接,復(fù)用TCP連接的通道
- Connection 連接
是RabbitMQ的socket鏈接,它封裝了socket協(xié)議相關(guān)部分邏輯,一個連接上可以有多個channel進(jìn)行通信
- Exchange 交換器
生產(chǎn)者將消息發(fā)送到Exchange,交換器將消息路由到一個或者多個隊列中,隊列和交換機(jī)是多對多的關(guān)系。
- RoutingKey 路由鍵
生產(chǎn)者將消息發(fā)給交換器的時候,一般會指定一個RoutingKey,用來指定這個消息的路由規(guī)則
最大長度255 字節(jié)
- Binding 綁定
通過綁定將交換器與隊列關(guān)聯(lián)起來,在綁定的時候一般會指定一個綁定鍵 (BindingKey),這樣 RabbitMQ 就知道如何正確地將消息路由到隊列了
- Virtual host 虛擬主機(jī)
用于不同業(yè)務(wù)模塊的邏輯隔離,一個Virtual Host里面可以有若干個Exchange和Queue,同一個VirtualHost里面不能有相同名稱的Exchange或Queue
默認(rèn)是 / ,可以使用 /dev /test /pro
Docker 安裝 RabbitMQ?
使用源碼安裝需要的依賴多、且版本和維護(hù)相對復(fù)雜,需要erlang環(huán)境、版本也有要求。
linux 上安裝 docker
https://github.com/docker-library/docs/tree/master/rabbitmq
docker pull rabbitmq:management? ? ? ? ? ? ?
// 拉取遠(yuǎn)程鏡像,management 帶 后臺管理頁面的版本
docker images? ? ? ? ? ? ? ? ? ? ? ? ????????????????
// 查看本機(jī)存在的鏡像
docker run -d -h rabbitmq_1 --name rabbitmq -p 15672:15672 -p 5672:5672 rabbitmq:management?
// 運行 docker 鏡像
參數(shù)說明:
- run -d :? run 運行鏡像 -d 后臺運行
- -h :自定義容器的主機(jī)名,它會被寫到容器內(nèi)的 /etc/hostname 和 /etc/hosts,作為容器主機(jī)IP的別名,并且將顯示在容器的bash中
- --name:自定義容器名稱
- -p 15672:15672 :management 界面管理訪問端口
- -p 5672:5672? ? ?:amqp 訪問端口
- -e rabbitma參數(shù)
rabbitmq 訪問地址:http://ip:15672? ? ? ? ? ? ? ? // 如果訪問不了,請查看防火墻端口是否開放
rabbitmq 默認(rèn)登錄賬號和密碼:guest/guest
開機(jī)自動啟動 rabbitmq
docker update 容器ID?--restart=always
rabbitma 的主要端口
4369 ????????erlang 發(fā)現(xiàn)口
5672 ????????client 端通信口
15672? ? ? ?管理界面 ui 端口
25672 ??????server 間內(nèi)部通信口
RabbitMQ 控制臺頁面介紹
RabbitMQ控制面板介紹 - 簡書
RabbitMQ 交換機(jī)?Exchange 介紹
- 生產(chǎn)者將消息發(fā)送到 Exchange,交換器將消息路由到一個或者多個隊列中,交換機(jī)有多個類型,隊列和交換機(jī)是多對多的關(guān)系。
- 交換機(jī)只負(fù)責(zé)轉(zhuǎn)發(fā)消息,不具備存儲消息的能力,如果沒有隊列和exchange綁定,或者沒有符合的路由規(guī)則,則消息會被丟失。
- RabbitMQ有四種交換機(jī)類型,分別是Direct exchange、Fanout exchange、Topic exchange、Headers exchange,最后的不常用。
-
Direct Exchange 定向、直連交換機(jī)
- 將一個隊列綁定到交換機(jī)上,要求該消息與一個特定的路由鍵完全匹配
- 例子:如果一個隊列綁定到該交換機(jī)上要求路由鍵 “aabb”,則只有被標(biāo)記為“aabb”的消息才被轉(zhuǎn)發(fā),不會轉(zhuǎn)發(fā)aabb.cc,也不會轉(zhuǎn)發(fā)gg.aabb,只會轉(zhuǎn)發(fā)aabb
- 處理路由健
-
Fanout Exchange 發(fā)布/訂閱、廣播、扇形交換機(jī)
- 只需要簡單的將隊列綁定到交換機(jī)上,一個發(fā)送到交換機(jī)的消息都會被轉(zhuǎn)發(fā)到與該交換機(jī)綁定的所有隊列上。很像子網(wǎng)廣播,每臺子網(wǎng)內(nèi)的主機(jī)都獲得了一份復(fù)制的消息
- Fanout交換機(jī)轉(zhuǎn)發(fā)消息是最快的,用于發(fā)布訂閱,廣播形式,中文是扇形
- 不處理路由健
-
Topic Exchange 主題、通配符交換機(jī)
- 主題交換機(jī)是一種發(fā)布/訂閱的模式,結(jié)合了直連交換機(jī)與扇形交換機(jī)的特點
- 將路由鍵和某模式進(jìn)行匹配。此時隊列需要綁定在一個模式上
- 符號“#”匹配一個或多個詞,符號“*”匹配只匹配一個詞
- 例子:因此“abc.#”能夠匹配到“abc.def.ghi”,但是“abc.*” 只會匹配到“abc.def”
-
Headers Exchanges(少用)
- 根據(jù)發(fā)送的消息內(nèi)容中的headers屬性進(jìn)行匹配, 在綁定Queue與Exchange時指定一組鍵值對
- 當(dāng)消息發(fā)送到RabbitMQ時會取到該消息的headers與Exchange綁定時指定的鍵值對進(jìn)行匹配
- 如果完全匹配則消息會路由到該隊列,否則不會路由到該隊列
- 不處理路由鍵
RabbitMQ 代碼 Java版
maven項目中依賴rabbitmq的包
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.16.0</version> </dependency>1:簡單隊列
官網(wǎng)教程:RabbitMQ tutorial - "Hello World!" — RabbitMQ
- 發(fā)送消息
執(zhí)行代碼,可以在 rabbitmq 控制臺上看到隊列已經(jīng)被創(chuàng)建了,并且有一條未被消費的消息
- 消費消息
?執(zhí)行代碼,可以在 rabbitmq 控制臺上看到隊列的消息已經(jīng)被消費了,并且可以看到 連接信息
2:工作隊列?Work Queues?
官網(wǎng)教程:RabbitMQ tutorial - Work Queues — RabbitMQ
例如:生產(chǎn)者一秒可以生產(chǎn) 一萬個消息,消費者一秒可以消費 一千個消息,這種情況如果只有一個消費者,消息就會堆積在隊列中。這時就需要部署多個消費者節(jié)點。
多個消費者負(fù)載均衡策略是 輪詢。
- 兩個消費者
- 一個生產(chǎn)者
先啟動2個消費者監(jiān)聽隊列,再啟動生產(chǎn)者生產(chǎn)消息。可以看到消息被輪詢消費
設(shè)置 多節(jié)點消費者負(fù)載均衡策略為:公平策略 (能者多勞)
Channel channel = connection.createChannel(); // 消費者設(shè)置 qos為 1, 一個消費完后繼續(xù)消費 channel.basicQos(1);3:Fanout 發(fā)布/訂閱 交換機(jī)模式
官網(wǎng)教程:https://www.rabbitmq.com/tutorials/tutorial-three-python.html
作用:生產(chǎn)者發(fā)布消息后,所有監(jiān)聽廣播類型指定交換機(jī)的的消費者都可以消費此消息。
- 2個或多個消費者
- 一個生產(chǎn)者
4:Routing 路由 交換機(jī)模式
官網(wǎng)教程:https://www.rabbitmq.com/tutorials/tutorial-four-java.html
- 隊列和交換機(jī)(Direct)綁定,需要指定一個routingKey(也叫Bingding Key)
- 消息生產(chǎn)者發(fā)送消息給交換機(jī),需要指定routingKey
- 交換機(jī)根據(jù)消息的routingKey,轉(zhuǎn)發(fā)給對應(yīng)的隊列
示例:日志收集系統(tǒng)
- 一個隊列收集 error 日志
- 一個隊列收集 全部 日志
- 2個消費者,一個消費 error 消息,一個消費 全部 消息
- ?一個生產(chǎn)者
5:Topic?主題 交換機(jī)模式
官網(wǎng)教程:RabbitMQ tutorial - Topics — RabbitMQ
- Topic?可以實現(xiàn)發(fā)布訂閱模式Fanout 和 路由模式Direct 的功能,更加靈活,支持模式匹配,通配符等。
- 交換機(jī)通過通配符進(jìn)行轉(zhuǎn)發(fā)到對應(yīng)的隊列,* 代表一個詞,#代表1個或多個詞,一般用#作為通配符居多,比如 #.order, 會匹配 info.order 、sys.error.order, 而 *.order ,只會匹配 info.order, 使用.進(jìn)行分割多個詞。
- 注意:
- 交換機(jī)和隊列綁定時用的binding使用通配符的路由健
- 生產(chǎn)者發(fā)送消息時需要使用具體的路由健
示例:日志收集系統(tǒng)
- 一個隊列收集 error 日志
- 一個隊列收集 全部 日志
- 2個消費者,一個消費 error 消息,一個消費 全部 消息
- ?一個生產(chǎn)者
SpringBoot 整合 RabbitMQ
pom 文件中添加依賴
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId><version>2.7.5</version> </dependency>application.yml 文件配置 rabbitmq
注意:1:guest 賬號只能連本機(jī)的mq服務(wù),實際開發(fā)的時候請創(chuàng)建一個新的賬號。2:rabbitmq集成在maven聚合組件中,然后這個組件被其他服務(wù)依賴以此達(dá)到整合mq的方式的時候,application 文件的后綴名要是 .properties (rabbitmq 讀取不到 yml 后綴的配置)
spring:rabbitmq:host: 192.168.31.71port: 5672username: guestpassword: guestvirtual-host: /dev配置 交換機(jī)和隊列綁定的 Bean
import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component;@Component public class RabbitmqConfig {// 自定義交換機(jī)名稱public static final String EXCHANGE_NAME = "order_exchange";// 自定義隊列名稱public static final String QUEUE_NAME = "order_queue";/*** 創(chuàng)建 topic 交換機(jī)*/@Bean(EXCHANGE_NAME) // 多個交換機(jī)時要指定交換機(jī)的Bean名稱public Exchange orderExchange() {return ExchangeBuilder// 指定 主題類型的交換機(jī) 名稱.topicExchange(EXCHANGE_NAME)// 是否持久化.durable(true).build();}/*** 創(chuàng)建持久化隊列*/@Bean(QUEUE_NAME) // 多個隊列時要指定隊里的Bean名稱public Queue orderQueue() {return QueueBuilder.durable(QUEUE_NAME).build();}/*** 隊列和交換機(jī)綁定*/@Beanpublic Binding orderBinding(@Qualifier(QUEUE_NAME) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange) {return BindingBuilder// 綁定的隊列.bind(queue)// 隊列綁定到 指定的交換機(jī).to(exchange)// 綁定的 routingKey.with("order.#")// 沒有其他參數(shù).noargs();} }發(fā)送消息
import com.lxx.rabbitmq.config.RabbitmqConfig; import org.junit.jupiter.api.Test; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest class RabbitmqTests {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void test() {/** 發(fā)送消息* 參數(shù)1:要發(fā)送的交換機(jī)* 參數(shù)2:指定匹配的 routingKey* 參數(shù)3:要發(fā)送的消息*/rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_NAME, "order.error", "訂單 error消息");} }消費者監(jiān)聽隊列
import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;@Component @RabbitListener(queues = "order_queue") // 消費者監(jiān)聽這個隊列 public class OrderMqListener {/*** 監(jiān)聽到隊列有消息后,RabbitHandler 處理指定類型的消息** @param body 消息*/@RabbitHandlerpublic void messageHandler(String body, Message message) {System.err.println(message.getMessageProperties().getMessageId());System.err.println(message.getMessageProperties().getDeliveryTag());System.err.println(message.toString());System.err.println(" X 字符串消費者:" + body);}/*** 監(jiān)聽到隊列有消息后,RabbitHandler 處理指定類型的消息** @param body 消息*/@RabbitHandlerpublic void messageHandler(Integer body, Message message) {System.err.println(" X 數(shù)字消費者:" + body);} }RabbitMQ 消息可靠性投遞
什么是消息的可靠投遞?
保證消息百分百發(fā)送到消息隊列中
如果確保消息的可靠投遞
消息生產(chǎn)者 需要接受到mq服務(wù)端 接受到消息的確認(rèn)應(yīng)答
完善的消息補(bǔ)償機(jī)制,發(fā)送失敗的消息可以再感知并二次處理
RabbitMQ消息投遞路徑:生產(chǎn)者->交換機(jī)->隊列->消費者
通過兩個的點控制消息的可靠性投遞
- 生產(chǎn)者到交換機(jī)
- 通過confirmCallback
- 交換機(jī)到隊列
- 通過returnCallback
-
生產(chǎn)者到交換機(jī) 開啟ACK確認(rèn)可靠消息投遞
appliction.yml 配置
spring:rabbitmq:# 開啟消息 confirm 二次確認(rèn)publisher-confirm-type: correlated消息監(jiān)聽代碼沒變化
發(fā)送消息,代碼如下:
import org.junit.jupiter.api.Test; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest class RabbitmqDemoApplicationTests {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void test() {rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.err.println("ConfirmCallback ================");System.err.println(" ================ correlationData = " + correlationData);System.err.println(" ================ ack = " + ack);System.err.println(" ================ cause = " + cause);if (ack) {System.out.println("發(fā)送成功");// 更新數(shù)據(jù)庫消息狀態(tài)為 成功} else {System.err.println("發(fā)送失敗");// 更新數(shù)據(jù)庫消息狀態(tài)為 失敗}}});// 發(fā)送消息之前 ,數(shù)據(jù)庫新增一條消息記錄狀態(tài),狀態(tài)是 發(fā)送 TODO// 發(fā)送消息rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_NAME, "order.error", "訂單 ConfirmCallback 消息");// 模擬投遞失敗 // rabbitTemplate.convertAndSend("不存在的交換機(jī)", "order.error", "訂單 ConfirmCallback消息");} }-
交換機(jī)到隊列 可靠消息投遞
appliction.yml 配置
spring:rabbitmq:# 開啟 交換機(jī)到 隊列publisher-returns: truetemplate:# 為true,則交換機(jī)處理消息到路由失敗后,則會返回給生產(chǎn)者。 或者代碼 rabbitTemplate.setMandatory(true) 是一樣的效果mandatory: true消息監(jiān)聽代碼沒變化
發(fā)送消息,代碼如下:
package com.lxx.rabbitmq;import com.lxx.rabbitmq.config.RabbitmqConfig; import org.junit.jupiter.api.Test; import org.springframework.amqp.core.ReturnedMessage; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest class RabbitmqDemoApplicationTests {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void test() {rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {int replyCode = returned.getReplyCode();System.err.println("ReturnsCallback ================");System.err.println(" ================ code = " + replyCode);System.err.println(" ================ returned = " + returned.toString());}});// 發(fā)送消息rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_NAME, "order.error", "訂單 ReturnsCallback 消息");// 模擬投遞失敗 // rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_NAME, "不存在的routingKey", "訂單 ReturnsCallback 消息");}}RabbitMQ 消息確認(rèn)消費ACK
消費者從broker中監(jiān)聽到消息,要確保消息被正常處理。
RabbitMQ 消費者ACK介紹
- 消費者從RabbitMQ收到消息并處理完成后,反饋給RabbitMQ,RabbitMQ收到反饋后才將此消息從隊列中刪除
- 消費者在處理消息出現(xiàn)了網(wǎng)絡(luò)不穩(wěn)定、服務(wù)器異常等現(xiàn)象,那么就不會有ACK反饋,RabbitMQ會認(rèn)為這個消息沒有正常消費,會將消息重新放入隊列中Ready
- 只有當(dāng)消費者正確發(fā)送ACK反饋,RabbitMQ確認(rèn)收到后,消息才會從RabbitMQ服務(wù)器的數(shù)據(jù)中刪除。
- 消息的ACK確認(rèn)機(jī)制默認(rèn)是開啟狀態(tài)自動ACK,消息如未被進(jìn)行ACK的消息確認(rèn)機(jī)制,這條消息被鎖定Unacked
-
消息的可靠消費
appliction.yml 配置
spring:rabbitmq:listener:simple:#開啟手動確認(rèn)消息,如果消息重新入隊,進(jìn)行重試acknowledge-mode: manualretry:enabled: true #是否開啟消費者重試max-attempts: 5 #最大重試次數(shù)initial-interval: 5000ms #重試間隔時間(單位毫秒)max-interval: 1200000ms #重試最大時間間隔(單位毫秒)multiplier: 2 #間隔時間乘子,間隔時間*乘子=下一次的間隔時間,最大不能超過設(shè)置的最大間隔時間發(fā)送消息代碼沒變化
消息監(jiān)聽,代碼如下:
import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;import java.io.IOException;@Component @RabbitListener(queues = "order_queue") public class OrderMqListener {@RabbitHandlerpublic void messageHandler(String body, Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.err.println(deliveryTag);System.err.println(message.toString());System.err.println(" X 字符串消費者:" + body);// 告訴 broker 消息被正常消費 確認(rèn)ACKchannel.basicAck(deliveryTag, false);/** 告訴 broker,消息被消費后 拒絕確認(rèn)ACK* 參數(shù)一:deliveryTag,消息被投遞的次數(shù)* 參數(shù)二:是否批量 拒絕ACK,false 一條一條的拒絕ack* 參數(shù)上:是否重新投遞到隊列中*///channel.basicNack(deliveryTag, false, true); // 一次可以拒絕接收0個或多個//channel.basicReject(deliveryTag, true); // 一次只能拒絕接收一個消息} }SpringBoot 配置 RabbitMQ 廣播 發(fā)布/訂閱
一個消息生產(chǎn)者,多個消費者節(jié)點,共同消費同一條消息
配置 廣播 交換機(jī)和隊列綁定的 Bean
import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;@Configuration public class RabbitmqConfig {// 自定義交換機(jī)名稱public static final String EXCHANGE_NAME = "order_exchange";// 自定義隊列名稱public static final String QUEUE_NAME = "order_queue";/*** 創(chuàng)建 廣播 交換機(jī)*/@Bean(EXCHANGE_NAME)public FanoutExchange orderExchange() {return ExchangeBuilder.fanoutExchange(EXCHANGE_NAME).durable(true).build();}/*** 創(chuàng)建持久化隊列*/@Bean(QUEUE_NAME)public Queue orderQueue() {return QueueBuilder.durable(QUEUE_NAME).build();}/*** 隊列和交換機(jī)綁定*/@Beanpublic Binding orderBinding() {return BindingBuilder.bind(orderQueue()).to(orderExchange());} }發(fā)送消息
import org.junit.jupiter.api.Test; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest class RabbitmqTests {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void test() {/** 發(fā)送消息* 參數(shù)1:要發(fā)送的交換機(jī)* 參數(shù)2:廣播不要指定路由key* 參數(shù)3:要發(fā)送的消息*/rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_NAME, "", "廣播消息");} }消費者監(jiān)聽,消費者可以多節(jié)點/集群部署,多節(jié)點可以消費同一條消息
import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.stereotype.Component;@Component public class OrderMqListener {/*** 監(jiān)聽到隊列有消息后,RabbitHandler 處理指定類型的消息** @param body 消息*/@RabbitHandler@RabbitListener(bindings = @QueueBinding(value = @Queue(), // 注意:此處不能指定隊列名稱。 如果指定隊列只能被一個消費者節(jié)點消費exchange = @Exchange(value = RabbitmqConfig.EXCHANGE_NAME, type = ExchangeTypes.FANOUT)))public void messageHandler(String body) {System.err.println(" X 消息 :" + body);} }RabbitMQ TTL死信隊列
什么是 TTL?
- time to live 消息存活時間的意思。
- 如果消息在存活時間內(nèi)未被消費,則會被清除。
- RabbitMQ支持兩種ttl設(shè)置
- 整個隊列進(jìn)行配置ttl(居多)
- 單獨消息進(jìn)行配置ttl
什么是死信隊列?
用來存放 在存活時間內(nèi)未被消費消息 的隊列? ? ? ? // 過期消息不清楚,存放在此隊列?
什么是死信交換機(jī)?
Dead Letter Exchange(死信交換機(jī),縮寫:DLX)當(dāng)消息成為死信后,會被重新發(fā)送到另一個交換機(jī),這個交換機(jī)就是DLX死信交換機(jī)。
注意:死信隊列和死信交換機(jī) 與 普通隊列普通交換機(jī)沒區(qū)別。
消息什么情況下會成為死信消息?
- 消費者拒收消息(basic.reject/basic.nack),并且沒有重新入隊 requeue=false
- 消息在隊列中未被消費,且超過隊列或者消息本身的過期時間TTL(time-to-live)
- 隊列的消息長度達(dá)到極限
成為死信的結(jié)果:如果該隊列綁定了死信交換機(jī),則消息會被死信交換機(jī)重新路由到死信隊列
如何設(shè)置消息的TTL存活時間?
方式一:隊列過期,對整個隊列消息設(shè)置統(tǒng)一過期時間
x-message-ttl? ? ? 單位:ms毫秒
方式二:消息過期,對單個消息進(jìn)行設(shè)置
expiration??? ? ? ? ?單位:ms毫秒
注意:兩者都配置的話,時間短的先觸發(fā)。
RabbitMQ 控制臺 操作 死信隊列綁定死信交換機(jī)
-- 代碼操作和普通操作沒有不同,這里學(xué)習(xí)控制面板的操作
創(chuàng)建死信交換機(jī)
創(chuàng)建死信隊列
死信隊列和死信交換機(jī)綁定
新建普通隊列,設(shè)置隊列的過期時間。指定普通隊列對應(yīng)的死信交換機(jī)
向普通隊列 里發(fā)送消息,過期后,消息路由到 死信隊列
RabbitMQ 延遲隊列
什么是延遲隊列?
一種帶有延遲功能的消息隊列,Producer 將消息發(fā)送到消息隊列 服務(wù)端,但并不期望這條消息立馬投遞,而是推遲到在當(dāng)前時間點之后的某一個時間投遞到 Consumer 進(jìn)行消費,該消息即定時消息。
定時消息的使用場景
- 通過消息觸發(fā)一些定時任務(wù),比如在某一固定時間點向用戶發(fā)送提醒消息
- 用戶登錄之后5分鐘給用戶做分類推送、用戶多少天未登錄給用戶做召回推送;
- 消息生產(chǎn)和消費有時間窗口要求:比如在天貓電商交易中超時未支付關(guān)閉訂單的場景,在訂單創(chuàng)建時會發(fā)送一條 延時消息。這條消息將會在 30 分鐘以后投遞給消費者,消費者收到此消息后需要判斷對應(yīng)的訂單是否已完成支付。 如支付未完成,則關(guān)閉訂單。如已完成支付則忽略
RabbitMQ 實現(xiàn)延遲消息
RabbitMQ本身是不支持延遲隊列的。需要結(jié)合死信隊列的特性,達(dá)到延遲消息的目的。
- 消息生產(chǎn)者
- 消息投遞到普通的交換機(jī)
- 消息過期,進(jìn)入死信隊列
- 消費消費者
- 消費者監(jiān)聽死信交換機(jī)的隊列
SpringBoot 實現(xiàn)延遲隊列
配置 死信交換機(jī)和死信隊列,配置普通交換機(jī)和普通隊列,配置普通隊列綁定到死信交換機(jī)
import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component;@Component public class OrderTimeoutCloseConfig {// ==================================================死信隊列 start========================================================/*** 死信交換機(jī),訂單超時關(guān)閉*/public static final String ORDER_TIMEOUT_CLOSE_DEAD_EXCHANGE = "order_timeout_close_dead_exchange";/*** 死信隊列,訂單超時關(guān)閉*/public static final String ORDER_TIMEOUT_CLOSE_DEAD_QUEUE = "order_timeout_close_dead_queue";/*** 進(jìn)入死信隊列的路由key,訂單超時關(guān)閉*/public static final String ORDER_TIMEOUT_CLOSE_ROUTING_KEY = "order_timeout_close_routing_key";/*** 創(chuàng)建 死信 交換機(jī)*/@Bean(ORDER_TIMEOUT_CLOSE_DEAD_EXCHANGE)public TopicExchange orderTimeoutCloseDeadExchange() {return ExchangeBuilder.topicExchange(ORDER_TIMEOUT_CLOSE_DEAD_EXCHANGE).durable(true).build();}/*** 創(chuàng)建 死信 隊列*/@Bean(ORDER_TIMEOUT_CLOSE_DEAD_QUEUE)public Queue orderTimeoutCloseDeadQueue() {return QueueBuilder.durable(ORDER_TIMEOUT_CLOSE_DEAD_QUEUE).build();}/*** 死信隊列和死信交換機(jī)綁定*/@Beanpublic Binding deadOrderTimeoutBinding(@Qualifier(ORDER_TIMEOUT_CLOSE_DEAD_QUEUE) Queue queue, @Qualifier(ORDER_TIMEOUT_CLOSE_DEAD_EXCHANGE) Exchange exchange) {return BindingBuilder.bind(queue).to(exchange).with(ORDER_TIMEOUT_CLOSE_ROUTING_KEY).noargs();}/** 死信隊列和死信交換機(jī)綁定,方式二*/ // @Bean // public Binding deadOrderTimeoutBinding() { // return new Binding( // ORDER_TIMEOUT_CLOSE_DEAD_QUEUE, // Binding.DestinationType.QUEUE, // ORDER_TIMEOUT_CLOSE_DEAD_EXCHANGE, // ORDER_TIMEOUT_CLOSE_ROUTING_KEY, // null // ); // }// ==================================================死信隊列 end========================================================// ==================================================普通隊列 start========================================================/*** 普通交換機(jī),訂單超時,用于進(jìn)入死信隊列*/public static final String ORDER_TIMEOUT_INTO_DEAD_EXCHANGE = "order_timeout_into_dead_exchange";/*** 普通隊列,訂單超時,用于進(jìn)入死信隊列*/public static final String ORDER_TIMEOUT_INTO_DEAD_QUEUE = "order_timeout_into_dead_queue";/*** 進(jìn)入普通隊列的路由key,訂單超時關(guān)閉*/public static final String ORDER_TIMEOUT_INTO_ROUTING_KEY = "order_timeout_into_routing_key";/*** 創(chuàng)建 普通 交換機(jī)*/@Bean(ORDER_TIMEOUT_INTO_DEAD_EXCHANGE)public TopicExchange orderTimeoutIntoDeadExchange() {return ExchangeBuilder.topicExchange(ORDER_TIMEOUT_INTO_DEAD_EXCHANGE).durable(true).build();}/*** 創(chuàng)建 普通 隊列,普通隊列和死信隊列進(jìn)行綁定*/@Bean(ORDER_TIMEOUT_INTO_DEAD_QUEUE)public Queue orderTimeoutIntoDeadQueue() {/* // 方式一Map<String, Object> args = new HashMap<>(3);args.put("x-dead-letter-exchange", ORDER_TIMEOUT_CLOSE_DEAD_EXCHANGE); // 要綁定的死信交換機(jī)args.put("x-dead-letter-routing-key", ORDER_TIMEOUT_CLOSE_ROUTING_KEY); // 要綁定的死信 binding keyargs.put("x-message-ttl", 10000); // 普通隊列的消息過期時間,過期后 消息進(jìn)入死信隊列,單位:ms毫秒return QueueBuilder.durable(ORDER_TIMEOUT_INTO_DEAD_QUEUE).withArguments(args).build();*/// 方式二return QueueBuilder.durable(ORDER_TIMEOUT_INTO_DEAD_QUEUE)// 要綁定的死信交換機(jī).deadLetterExchange(ORDER_TIMEOUT_CLOSE_DEAD_EXCHANGE)// 要綁定的死信 binding key.deadLetterRoutingKey(ORDER_TIMEOUT_CLOSE_ROUTING_KEY)// 普通隊列的消息過期時間,過期后 消息進(jìn)入死信隊列,單位:ms毫秒.ttl(10000) // 這里測試指定10秒,正式情況可以指定30分鐘.build();}/** 普通隊列和普通交換機(jī)綁定*/@Beanpublic Binding orderTimeoutBinding() {return new Binding(ORDER_TIMEOUT_INTO_DEAD_QUEUE,Binding.DestinationType.QUEUE,ORDER_TIMEOUT_INTO_DEAD_EXCHANGE,ORDER_TIMEOUT_INTO_ROUTING_KEY,null);}// ==================================================普通隊列 end======================================================== }消費者,監(jiān)聽死信隊列
import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;import java.io.IOException;@Component public class OrderTimeoutCloseMQListener {@RabbitHandler@RabbitListener(queues = OrderTimeoutCloseConfig.ORDER_TIMEOUT_CLOSE_DEAD_QUEUE) // 監(jiān)聽死信隊列public void messageHandler(String body, Message message, Channel channel) throws IOException {// 1:監(jiān)聽到 訂單消息,拿到訂單idSystem.err.println(" X 監(jiān)聽死信隊列收到消息 body = " + body);// 2:用訂單id,查詢數(shù)據(jù)庫訂單信息,如果訂單狀態(tài)是 已支付,這里不做操作// 3:如果訂單狀態(tài)是 未支付,把訂單狀態(tài)設(shè)置成 超時未支付channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} }生產(chǎn)者,向普通隊列發(fā)送消息
import net.minidev.json.JSONObject; import org.junit.jupiter.api.Test; import org.springframework.amqp.core.ReturnedMessage; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest;import java.util.HashMap; import java.util.Map;@SpringBootTest class RabbitmqDemoApplicationTests {@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 模擬下單成功*/@Testpublic void testBuy() {// 1:用戶下單把訂單信息存入數(shù)據(jù)庫,返回訂單id// 2:發(fā)送訂單id到 普通消息隊列Map<String, String> map = new HashMap<>();map.put("orderId", "123456789");rabbitTemplate.convertAndSend(OrderTimeoutCloseConfig.ORDER_TIMEOUT_INTO_DEAD_EXCHANGE, OrderTimeoutCloseConfig.ORDER_TIMEOUT_INTO_ROUTING_KEY, JSONObject.toJSONString(map));} }RabbitMQ 的集群環(huán)境
RabbitMQ 普通集群模式的介紹
????????集群有 3 個節(jié)點,node1、node2、node3,三個節(jié)點有相同的元數(shù)據(jù)(交換機(jī)、隊列結(jié)構(gòu)),一個消息只存在一個節(jié)點上,不在其他節(jié)點同時存在。
例如:
????????A消息,存在node1節(jié)點上。A消息在node2、node3節(jié)點上不存在。
消費者監(jiān)聽node1節(jié)點可以直接消費到 A消息。假如消費者監(jiān)聽的是node2節(jié)點,那么rabbitmq 會把A消息被消費的時候才從 node1 節(jié)點取出放入到node2節(jié)點,然后node2節(jié)點再把消息轉(zhuǎn)發(fā)給消費者。
問題:
? ? ? ? 1:假如node1節(jié)點故障,那么node2無法獲取node1節(jié)點上未被消費的消息。
? ? ? ? 2:如果node1持久化后發(fā)生故障,消息需要等到node1恢復(fù)正常后才可以正常消費。
? ? ? ? 3:如果node1未做持久化發(fā)生故障,那么node1節(jié)點上的消息將會丟失。
應(yīng)用場景:
? ? ? ? 該模式適用于消息無需持久化的場景,例如日志傳輸隊列。
注意:集群需要保證每個節(jié)點有相同的token令牌。
消息持久化
RabbitMQ 搭建普通集群環(huán)境
1:準(zhǔn)備節(jié)點環(huán)境
3個節(jié)點的訪問 web控制臺訪問端口分別是:15671、15672、15673
準(zhǔn)備3個目錄,用于放 3個節(jié)點
/usr/local/rabbitmq/1 /usr/local/rabbitmq/2 /usr/local/rabbitmq/3創(chuàng)建 節(jié)點1
sudo docker run -d \ --name rabbitmq_1 \ -h rabbitmq_host1 \ -p 4361:4369 \ -p 5671:5672 \ -p 15671:15672 \ -p 25671:25672 \ -e RABBITMQ_NODENAME='rabbit' \ -e RABBITMQ_ERLANG_COOKIE='rabbitmq_cookie_lxx' \ --privileged=true \ -v /usr/local/rabbitmq/1/lib:/var/lib/rabbitmq/ \ rabbitmq:management創(chuàng)建 節(jié)點2
sudo docker run -d \ --name rabbitmq_2 \ -h rabbitmq_host2 \ -p 4362:4369 \ -p 5672:5672 \ -p 15672:15672 \ -p 25672:25672 \ -e RABBITMQ_NODENAME='rabbit' \ -e RABBITMQ_ERLANG_COOKIE='rabbitmq_cookie_lxx' \ --link rabbitmq_1:rabbitmq_host1 \ --privileged=true \ -v /usr/local/rabbitmq/2/lib:/var/lib/rabbitmq/ \ rabbitmq:management創(chuàng)建 節(jié)點3
sudo docker run -d \ --name rabbitmq_3 \ -h rabbitmq_host3 \ -p 4363:4369 \ -p 5673:5672 \ -p 15673:15672 \ -p 25673:25672 \ -e RABBITMQ_NODENAME='rabbit' \ -e RABBITMQ_ERLANG_COOKIE='rabbitmq_cookie_lxx' \ --link rabbitmq_1:rabbitmq_host1 \ --link rabbitmq_2:rabbitmq_host2 \ --privileged=true \ -v /usr/local/rabbitmq/3/lib:/var/lib/rabbitmq/ \ rabbitmq:management參數(shù)說明:
- -e RABBITMQ_ERLANG_COOKIE:指定集群節(jié)點的cookie,節(jié)點的cookie要配置相同。?
- --link:容器互聯(lián),讓容器之前可以相互ping通
- --privileged:讓容器內(nèi)部的用戶有root權(quán)限,不然用戶對容器內(nèi)部的文件沒有操作權(quán)限permission denied
- -v :讓物理機(jī)路徑與容器里的路徑映射,容器里的路徑的數(shù)據(jù)會存儲在物理機(jī)上
節(jié)點完成后,可以訪問 http://ip:端口,查看2個節(jié)點是否創(chuàng)建成功。
如果容器啟動失敗,可以使用 docker logs 容器id 命令查看啟動日志。
2:節(jié)點配置成集群
配置之前查看節(jié)點狀態(tài),進(jìn)入容器內(nèi),使用命令:rabbitmqctl cluster_status
配置節(jié)點1
docker exec -it 節(jié)點1的容器名稱 /bin/bash? // 進(jìn)入啟動的docker容器內(nèi) rabbitmqctl stop_app? ? ? ? ? ? ? ? ? ? ? // 停止 rabbitmq 服務(wù),rabbitmqctl是rabbitmq的操作命令? rabbitmqctl reset // 重置 rabbitmq rabbitmqctl start_app // 啟動 rabbitmq 服務(wù) exit // 退出 docker 容器配置節(jié)點2,加入集群
docker exec -it 節(jié)點2的容器名稱 /bin/bash? ? ? ? // 進(jìn)入啟動的docker容器內(nèi) rabbitmqctl stop_app? ? ? ? ? ? ? ? ? ? ? // 停止 rabbitmq 服務(wù),rabbitmqctl是rabbitmq的操作命令? rabbitmqctl reset // 重置 rabbitmq rabbitmqctl join_cluster --ram 節(jié)點1的hostname // 加入集群 --ram 參數(shù)是以內(nèi)存的方式加入,不帶此參數(shù)默認(rèn)是磁盤的方式,節(jié)點1的hostname是:rabbit@rabbitmq_host1 rabbitmqctl start_app // 啟動 rabbitmq 服務(wù) exit // 退出 docker 容器配置節(jié)點3,加入集群
docker exec -it 節(jié)點3的容器名稱 /bin/bash? ? ? ? // 進(jìn)入啟動的docker容器內(nèi) rabbitmqctl stop_app? ? ? ? ? ? ? ? ? ? ? // 停止 rabbitmq 服務(wù),rabbitmqctl是rabbitmq的操作命令? rabbitmqctl reset // 重置 rabbitmq rabbitmqctl join_cluster --ram 節(jié)點1的hostname // 加入集群 --ram 參數(shù)是以內(nèi)存的方式加入,不帶此參數(shù)默認(rèn)是磁盤的方式,節(jié)點1的hostname是:rabbit@rabbitmq_host1 rabbitmqctl start_app // 啟動 rabbitmq 服務(wù) exit // 退出 docker 容器配置完成之后,可以在容器內(nèi)使用命令:rabbitmqctl cluster_status,查看集群狀態(tài),可以看到集群現(xiàn)在有3個節(jié)點在運行。1個磁盤節(jié)點,2個內(nèi)存節(jié)點
訪問網(wǎng)頁,可以看到有3個節(jié)點
消息隊列和交換機(jī)在所有節(jié)點上存在。消息只在自己的節(jié)點上存在,當(dāng)一個節(jié)點宕機(jī)后,宕機(jī)節(jié)點上的消息無法被消費(消息不可用)
3:SpringBoot 整合 RabbitMQ 普通集群
application.yml 文件配置 rabbitmq 集群地址,其他配置不變
spring:rabbitmq:listener:simple:acknowledge-mode: manualpublisher-returns: truetemplate:mandatory: truepublisher-confirm-type: correlated # host: 192.168.189.75 # port: 5672virtual-host: /devpassword: guestusername: guest### 配置節(jié)點地址addresses: 192.168.189.75:5671,192.168.189.75:5672,192.168.189.75:5673代碼操作,和上面的單節(jié)點的一樣正常的生產(chǎn)監(jiān)聽消息就行了,這里就不重復(fù)貼代碼了
RabbitMQ 鏡像集群模式的介紹(推薦)
????????隊列做成鏡像隊列,鏡像隊列中的消息在各個節(jié)點之間同步(A消息在各個節(jié)點中都存在)。
好處:
?? ?實現(xiàn)了高可用,部分節(jié)點宕機(jī)后,不影響消息的正常消費。
?? ?鏡像集群模式可以保證消息100%不丟失。適用于高可用要求高的需求,例如訂單服務(wù)。
缺點:
?? ?消息數(shù)量過多,大量的消息同步會加大網(wǎng)絡(luò)寬帶的消耗。節(jié)點越多服務(wù)器性能受影響越大
?? ?
注意:集群需要保證每個節(jié)點有相同的token令牌。
策略policy介紹
policy是用來控制和修改集群的vhost隊列和Exchange復(fù)制行為。哪些Exchange或者queue的數(shù)據(jù)需要復(fù)制、同步,以及如何復(fù)制同步。
- 創(chuàng)建一個policy策略
路徑:進(jìn)入rabbitmq控制臺 -> Admin -> Policies -> Add / update a policy
參數(shù)介紹:
Name:自定義策略名稱,建議不要使用空格
Pattern:用于匹配隊列/交換機(jī)的正則表達(dá)式,^ 符號,表示匹配所有
Apply to:應(yīng)用到交換機(jī)和隊列
Priority:優(yōu)先級。一個隊列/交換機(jī)只會有一個生效的 Policy,如果匹配多個 Policy,則優(yōu)先級數(shù)值最大的 Policy 生效。
Definition:JSON格式的一組鍵值對,表示設(shè)置的屬性,會被注入匹配隊列/交換機(jī)
- ha-mode:
- all:表示在集群中所有的節(jié)點上進(jìn)行鏡像同步(一般都用這個參數(shù))
- exactly:表示在指定個數(shù)的節(jié)點上進(jìn)行鏡像同步,節(jié)點的個數(shù)由ha-params指定
- nodes:表示在指定的節(jié)點上進(jìn)行鏡像同步,節(jié)點名稱通過ha-params指定
- ha-sync-mode:鏡像消息同步方式
- automatic: 自動(默認(rèn))
- manually:手動
policy策略創(chuàng)建完成后,鏡像隊列就配置成功了。可以看到隊列發(fā)生了如下變化
鏡像集群注意點:
- 集群啟動順序:先啟動磁盤節(jié)點 => 再啟動內(nèi)存節(jié)點
- 集群關(guān)閉順序:先關(guān)閉內(nèi)存節(jié)點 => 再關(guān)閉磁盤節(jié)點
- 最后關(guān)閉必須是磁盤節(jié)點,否則容易造成集群啟動失敗、數(shù)據(jù)丟失等異常情況
可以看到節(jié)點宕掉一個后,消息還是存在的
總結(jié)
以上是生活随笔為你收集整理的RabbitMQ教程大全看这一篇就够了-java版本的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 一分钟让你了解人脸识别套件中的双目、单目
- 下一篇: mediapipe bazel 编译问题