中间件-RabbitMQ学习笔记
目錄
- RabbitMQ概述
- RabbitMQ安裝Docker版
- RabbitMQ安裝集群版
- 集群搭建步驟
- 搭建鏡像隊列
- Haproxy+Keepalive 實現高可用負載均衡
- RabbitMQ消息確認機制-可靠抵達
- 可靠抵達-服務端確認(confirmCallback 、returnCallback )
- 可靠抵達-消費端確認(ack)
- RabbitMQ延時隊列
- 延時隊列實戰
- RabbitMQ死信隊列
- 死信隊列實戰
RabbitMQ概述
RabbitMQ 是一個消息中間件:它接受并轉發消息。你可以把它當做一個快遞站點,當你要發送一個包 裹時,你把你的包裹放到快遞站,快遞員最終會把你的快遞送到收件人那里,按照這種邏輯 RabbitMQ 是 一個快遞站,一個快遞員幫你傳遞快件。RabbitMQ 與快遞站的主要區別在于,它不處理快件而是接收, 存儲和轉發消息數據。
四大核心概念
生產者
產生數據發送消息的程序是生產者
交換機
交換機是 RabbitMQ 非常重要的一個部件,一方面它接收來自生產者的消息,另一方面它將消息 推送到隊列中。交換機必須確切知道如何處理它接收到的消息,是將這些消息推送到特定隊列還是推 送到多個隊列,亦或者是把消息丟棄,這個得有交換機類型決定
隊列
隊列是 RabbitMQ 內部使用的一種數據結構,盡管消息流經 RabbitMQ 和應用程序,但它們只能存 儲在隊列中。隊列僅受主機的內存和磁盤限制的約束,本質上是一個大的消息緩沖區。許多生產者可 以將消息發送到一個隊列,許多消費者可以嘗試從一個隊列接收數據。這就是我們使用隊列的方式
消費者
消費與接收具有相似的含義。消費者大多時候是一個等待接收消息的程序。請注意生產者,消費 者和消息中間件很多時候并不在同一機器上。同一個應用程序既可以是生產者又是可以是消費者。
RabbitMQ工作原理
Broker:接收和分發消息的應用,RabbitMQ Server就是Message Broker
Virtual host:出于多租戶和安全因素設計的,把AMQP的基本組件劃分到一個虛擬的分組中,類似于網絡中的namespace概念。當多個不同的用戶使用同一個 RabbitMQ Server提供的服務時,可以劃分出多個vhost,每個用戶在自己的vhost創建exchange/queue等
Connection:publisher/consumer和 broker之間的TCP連接
Channel:如果每一次訪問RabbitMQ都建立一個Connection,在消息量大的時候建立TCP Connection的開銷將是巨大的,效率也較低。Channel是在connection 內部建立的邏輯連接,如果應用程序支持多線程,通常每個thread創建單獨的channel進行通訊,AMQP method包含了channel id 幫助客戶端和message broker識別channel,所以channel之間是完全隔離的。Channel作為輕量級的Connection極大減少了操作系統建立TCP connection的開銷
Exchange:message到達broker的第一站,根據分發規則,匹配查詢表中的routing key,分發消息到queue中去。常用的類型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
Queue:消息最終被送到這里等待consumer取走
Binding:exchange和queue之間的虛擬連接,binding中可以包含routing key,Binding信息被保存到exchange中的查詢表中,用于message的分發依據
RabbitMQ安裝Docker版
下載RabbitMQ并啟動
docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management設置開機啟動RabbitMQ
docker update rabbitmq --restart=always訪問IP:15672端口,默認的登錄賬號密碼為guest
SHELL方式添加添加新用戶
WEB界面方式添加新用戶
RabbitMQ安裝集群版
如果RabbitMQ服務器遇到內存崩潰、機器掉電或者主板故障等情況,該怎么辦?單臺 RabbitMQ 服務器可以滿足每秒1000條消息的吞吐量,那么如果應用需要 RabbitMQ 服務滿足每秒10萬條消息的吞吐量呢?購買昂貴的服務器來增強單機RabbitMQ務的性能顯得捉襟見肘,搭建一個RabbitMQ集群才是解決實際問題的關鍵
集群搭建步驟
修改 3 臺機器的主機名稱,并重啟
vim /etc/hostname配置各個節點的 hosts 文件,讓各個節點都能互相識別對方
vim /etc/hosts 10.211.55.74 node1 10.211.55.75 node2 10.211.55.76 node3確保各個節點的 cookie 文件使用的是同一個值,在node1上執行遠程操作命令,將node1的cookie復制給node2和node3
scp /var/lib/rabbitmq/.erlang.cookie root@node2:/var/lib/rabbitmq/.erlang.cookie scp /var/lib/rabbitmq/.erlang.cookie root@node3:/var/lib/rabbitmq/.erlang.cookie啟動 RabbitMQ 服務,順帶啟動 Erlang 虛擬機和 RbbitMQ 應用服務(在三臺節點上分別執行以下命令)
rabbitmq-server -detached在節點 2 執行
rabbitmqctl stop_app (rabbitmqctl stop 會將Erlang 虛擬機關閉,rabbitmqctl stop_app 只關閉 RabbitMQ 服務) rabbitmqctl reset rabbitmqctl join_cluster rabbit@node1 rabbitmqctl start_app(只啟動應用服務)在節點 3 執行
rabbitmqctl stop_app rabbitmqctl reset rabbitmqctl join_cluster rabbit@node2 (rabbitmqctl join_cluster rabbit@主機名稱 加哪個主機就指定哪個主機的名稱) rabbitmqctl start_app集群狀態
rabbitmqctl cluster_status重新設置超級管理員用戶,在一臺機器上運行即可,并使用新賬號訪問登錄
創建賬號 rabbitmqctl add_user admin admin 設置用戶角色 rabbitmqctl set_user_tags admin administrator 設置用戶權限 rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"解除集群節點(node2 和 node3 機器分別執行)
rabbitmqctl stop_app rabbitmqctl reset rabbitmqctl start_app rabbitmqctl cluster_status忘記集群
rabbitmqctl forget_cluster_node rabbit@node2(node1 機器上執行)搭建鏡像隊列
使用鏡像隊列的原因
如果RabbitMQ集群中只有一個Broker節點,那么該節點的失效將導致整體服務的臨時性不可用,并且也可能會導致消息的丟失。可以將所有消息都設置為持久化,并且對應隊列的durable屬性也設置為true,但是這樣仍然無法避免由于緩存導致的問題:因為消息在發送之后和被寫入磁盤井執行刷盤動作之間存在一個短暫卻會產生問題的時間窗。通過publisher confirm機制能夠確保客戶端知道哪些消息己經存入磁盤,盡管如此,一般不希望遇到因單點故障導致的服務不可用。引入鏡像隊列的機制,可以將隊列鏡像到集群中的其他Broker節點之上,如果集群中的一個節點失效了,隊列能自動地切換到鏡像中的另一個節點上以保證服務的可用性。
搭建步驟
啟動三臺集群節點
隨便找一個節點添加 policy策略
- Name:策略名稱,隨便起
- Pattern:正則規則,只有滿足該正則規則的隊列,才會起作用
- Apply to:應用于交換機還是隊列還是都應用
在 node1 上創建一個隊列發送一條消息,隊列存在鏡像隊列
停掉 node1 之后發現 node2 成為鏡像隊列
就算整個集群只剩下一臺機器了 依然能消費隊列里面的消息,說明隊列里面的消息被鏡像隊列傳遞到相應機器里面了
Haproxy+Keepalive 實現高可用負載均衡
HAProxy 提供高可用性、負載均衡及基于TCPHTTP 應用的代理,支持虛擬主機,它是免費、快速并 且可靠的一種解決方案,包括 Twitter,Reddit,StackOverflow,GitHub 在內的多家知名互聯網公司在使用。 HAProxy 實現了一種事件驅動、單一進程模型,此模型支持非常大的井發連接數。
高可用負載均衡搭建步驟
下載 haproxy(在 node1 和 node2)
yum -y install haproxy修改 node1 和 node2 的 haproxy.cfg
vim /etc/haproxy/haproxy.cfg需要修改紅色 IP 為當前機器 IP
在兩臺節點啟動 haproxy
haproxy -f /etc/haproxy/haproxy.cfg ps -ef | grep haproxy訪問地址http://10.211.55.71:8888/stats
RabbitMQ消息確認機制-可靠抵達
- 保證消息不丟失,可靠抵達,可以使用事務消息,性能下降250倍,為此引入確認機制
- publisher confirmCallback確認模式(觸發時機:服務端將消息發送給RabbitMQ所在的服務器)
- publisher returnCallback未投遞到queue退出模式(觸發時機:RabbitmQ所在的服務器調用交換機投遞給對應隊列)
- consumer ack機制(觸發機制:消費端成功獲取到消息隊列的消息)
可靠抵達-服務端確認(confirmCallback 、returnCallback )
開啟發送端確認
- NONE:禁用發布確認模式,默認模式
- CORRELATED:發布消息成功到交換器后會觸發回調方法
- SIMPLE
開啟發送端消息抵達隊列的確認
# 開啟發送端消息抵達隊列的確認 spring.rabbitmq.publisher-returns=true # 只要發送端消息抵達隊列,以異步方式優先回調這個returnConfirm(綁定一起使用) spring.rabbitmq.template.mandatory=true定制RabbitTemplate自定義confirmCallback 、returnCallback 觸發方法
/*** 定制RabbitTemplate* 1. MQ服務器收到消息就回調* 1. spring.rabbitmq.publisher-confirms=true* 2. 設置回調確認confirmCallback * 2. 消息正確抵達隊列進行回調* 1. spring.rabbitmq.publisher-returns=true* 2. spring.rabbitmq.template.mandatory=true* 3. 設置回調確認returnCallback */// PostConstruct: 當MyRabbitConfig對象創建完再執行該方法@PostConstructpublic void initRabbitTemplate() {// 設置MQ服務器收到消息回調rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {/*** 只要消息抵達MQ服務器ack就為true* @param correlationData:當前消息的唯一關聯數據(這個是消息的唯一id)即發送時傳的CorrelationData參數* @param b:ack,消息是否成功還是失敗* @param s:失敗的原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean b, String s) {System.out.println("correlationData: " + correlationData);System.out.println("ack: " + b);System.out.println("s: " + s);}});// 設置消息抵達隊列回調rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {/*** 只要消息沒有投遞給指定的隊列,就觸發這個失敗回調* @param message:投遞失敗的消息詳細信息* @param i:回復的狀態碼* @param s:回復的文本內容* @param s1:當時這個消息發送給哪個交換機* @param s2:當時這個消息發送給哪個路由鍵*/@Overridepublic void returnedMessage(Message message, int i, String s, String s1, String s2) {System.out.println("fail message: " + message);System.out.println("i: " + i);System.out.println("s: " + s);System.out.println("s1: " + s1);System.out.println("s2: " + s2);}});}可靠抵達-消費端確認(ack)
保證每個消息被正確消費,此時才可以MQ刪除這個消息
RabbitMQ延時隊列
概念
延時隊列,隊列內部是有序的,最重要的特性就體現在它的延時屬性上,延時隊列中的元素是希望在指定時間到了以后或之前取出和處理,簡單來說,延時隊列就是用來存放需要在指定時間被處理的元素的隊列。
使用場景
- 訂單在十分鐘之內未支付則自動取消
- 新創建的店鋪,如果在十天內都沒有上傳過商品,則自動發送消息提醒。
- 用戶注冊成功后,如果三天內沒有登陸則進行短信提醒。
- 用戶發起退款,如果三天內沒有得到處理則通知相關運營人員。
- 預定會議后,需要在預定的時間點前十分鐘通知各個與會人員參加會議
延時TTL
TTL 是 RabbitMQ 中一個消息或者隊列的屬性,表明一條消息或者該隊列中的所有 消息的最大存活時間,單位是毫秒。換句話說,如果一條消息設置了 TTL 屬性或者進入了設置TTL 屬性的隊列,那么這 條消息如果在TTL 設置的時間內沒有被消費,則會成為"死信"。如果同時配置了隊列的TTL 和消息的 TTL,那么較小的那個值將會被使用,有兩種方式設置 TTL。
消息設置TTL
針對每條消息設置TTL
rabbitTemplate.convertAndSend("exchange", "route-key", "消息", new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {// 設置消息10秒過期message.getMessageProperties().setExpiration("10000");return message;} });隊列設置TTL
在創建隊列的時候設置隊列的x-message-ttl屬性
@Bean("queue") public Queue queueB() {Map<String, Object> args = new HashMap<>(3);//聲明隊列的 TTLargs.put("x-message-ttl", 40000);return QueueBuilder.durable("queue").withArguments(args).build(); }兩者的區別
如果設置了隊列的TTL 屬性,那么一旦消息過期,就會被隊列丟棄(如果配置了死信隊列被丟到死信隊列中),而第二種方式,消息即使過期,也不一定會被馬上丟棄,因為消息是否過期是在即將投遞到消費者之前判定的,如果當前隊列有嚴重的消息積壓情況,則已過期的消息也許還能存活較長時間;另外,還需 要注意的一點是,如果不設置 TTL,表示消息永遠不會過期,如果將 TTL 設置為 0,則表示除非此時可以 直接投遞該消息到消費者,否則該消息將會被丟棄。
延時隊列實戰
創建兩個隊列 QA 和 QB,兩者隊列 TTL 分別設置為 10S 和 40S,然后在創建一個交換機 X 和死信交 換機 Y,它們的類型都是direct,創建一個死信隊列QD,它們的綁定關系如下:
延時隊列架構代碼
@Configuration public class TtlQueueConfig {// 普通交換機名稱public static final String X_EXCHANGE = "X";// 死信交換機名稱public static final String Y_DEAD_LETTER_EXCHANGE = "Y";// 普通隊列名稱public static final String QUEUE_A = "QA";public static final String QUEUE_B = "QB";// 死信隊列名稱public static final String DEAD_LETTER_QUEUE = "QD";// 通用隊列名稱public static final String QUEUE_C = "QC";// 聲明 xExchange@Bean("xExchange")public DirectExchange xExchange() {return new DirectExchange(X_EXCHANGE);}// 聲明 yExchange@Bean("yExchange")public DirectExchange yExchange() {return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);}//聲明隊列 A ttl 為 10s 并綁定到對應的死信交換機@Bean("queueA")public Queue queueA() {Map<String, Object> args = new HashMap<>(3);//聲明當前隊列綁定的死信交換機args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);//聲明當前隊列的死信路由 keyargs.put("x-dead-letter-routing-key", "YD");//聲明隊列的 TTLargs.put("x-message-ttl", 10000);return QueueBuilder.durable(QUEUE_A).withArguments(args).build();}// 聲明隊列 A 綁定 X 交換機@Beanpublic Binding queueaBindingX(@Qualifier("queueA") Queue queueA,@Qualifier("xExchange") DirectExchange xExchange) {return BindingBuilder.bind(queueA).to(xExchange).with("XA");}//聲明隊列 B ttl 為 40s 并綁定到對應的死信交換機@Bean("queueB")public Queue queueB() {Map<String, Object> args = new HashMap<>(3);//聲明當前隊列綁定的死信交換機args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);//聲明當前隊列的死信路由 keyargs.put("x-dead-letter-routing-key", "YD");//聲明隊列的 TTLargs.put("x-message-ttl", 40000);return QueueBuilder.durable(QUEUE_B).withArguments(args).build();}//聲明隊列 B 綁定 X 交換機@Beanpublic Binding queuebBindingX(@Qualifier("queueB") Queue queue1B,@Qualifier("xExchange") DirectExchange xExchange) {return BindingBuilder.bind(queue1B).to(xExchange).with("XB");}//聲明死信隊列 QD@Bean("queueD")public Queue queueD() {return new Queue(DEAD_LETTER_QUEUE);}//聲明死信隊列 QD 綁定關系@Beanpublic Binding deadLetterBindingQAD(@Qualifier("queueD") Queue queueD,@Qualifier("yExchange") DirectExchange yExchange) {return BindingBuilder.bind(queueD).to(yExchange).with("YD");}//聲明隊列 C 死信交換機@Bean("queueC")public Queue queueC() {Map<String, Object> args = new HashMap<>(3);//聲明當前隊列綁定的死信交換機args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);//聲明當前隊列的死信路由 keyargs.put("x-dead-letter-routing-key", "YD");//沒有聲明 TTL 屬性return QueueBuilder.durable(QUEUE_C).withArguments(args).build();}//聲明隊列 C 綁定 X 交換機@Beanpublic Binding queueCBindingX(@Qualifier("queueC") Queue queueC,@Qualifier("xExchange") DirectExchange xExchange) {return BindingBuilder.bind(queueC).to(xExchange).with("XC");}}生產者代碼
// 發送延時消息 @GetMapping("sendMsg/{msg}") public R sendMsg(@PathVariable("msg") String msg) {rabbitTemplate.convertAndSend("X", "XA", "消息為10秒" + msg);rabbitTemplate.convertAndSend("X", "XB", "消息為40秒" + msg);return R.ok(); }// 發送指定延時時間的消息 @GetMapping("sendTtlMsg/{ttl}/{msg}") public R sendTtlMsg(@PathVariable("ttl") String ttl, @PathVariable("msg") String msg) {rabbitTemplate.convertAndSend("X", "XC", msg, message -> {// 發送消息時候的延時時長message.getMessageProperties().setExpiration(ttl);return message;});return R.ok(); }消費者代碼
// 監聽延時隊列 @RabbitListener(queues = {"QD"}) public void receiveD(Message message, String content, Channel channel) {System.out.println("接受消息: " + content); }實現效果
第一條消息在10S后變成了死信消息然后被消費者消費掉,第二條消息在40S之后變成了死信消息然后被消費掉,這樣一個延時隊列就打造完成了。
RabbitMQ死信隊列
死信概念
死信,顧名思義就是無法被消費的消息,一般來說,producer將消息投遞到broker或者直接到queue 里了,consumer 從 queue 取出消息 進行消費,但某些時候由于特定的原因導致queue中的某些消息無法被消費,這樣的消息如果沒有 后續的處理,就變成了死信,有死信自然就有了死信隊列。
應用場景:為了保證訂單業務的消息數據不丟失,需要使用到RabbitMQ的死信隊列機制,當消息消費發生異常時,將消息投入死信隊列中,還有比如說用戶在商城下單成功并點擊去支付后在指定時間未支付時自動失效
死信的來源
消息 TTL 過期
隊列達到最大長度(隊列滿了,無法再添加數據到 mq 中)
消息被拒絕(basic.reject 或 basic.nack)并且 requeue=false
死信隊列實戰
死信架構代碼
@Configuration public class DeadQueueConfig {//普通交換機名稱private static final String NORMAL_EXCHANGE = "normal_exchange";//死信交換機名稱private static final String DEAD_EXCHANGE = "dead_exchange";// 聲明死信交換機@Bean(DEAD_EXCHANGE)public DirectExchange deadExchange() {return new DirectExchange(DEAD_EXCHANGE);}// 聲明死信隊列@Bean("dead_queue")public Queue deadQueue() {return new Queue("dead_queue");}// 聲明死信隊列與死信交換機的綁定關系@Bean("deadBinding")public Binding deadBinding(@Qualifier("dead_queue") Queue deadQueue,@Qualifier(DEAD_EXCHANGE) DirectExchange deadExchange) {return BindingBuilder.bind(deadQueue).to(deadExchange).with("lisi");}// 聲明普通交換機@Bean(NORMAL_EXCHANGE)public DirectExchange normalExchange() {return new DirectExchange(NORMAL_EXCHANGE);}// 聲明普通隊列@Bean("normal_queue")public Queue normalQueue() {Map<String, Object> args = new HashMap<>(3);//聲明當前隊列綁定的死信交換機args.put("x-dead-letter-exchange", DEAD_EXCHANGE);//聲明當前隊列的死信路由 keyargs.put("x-dead-letter-routing-key", "lisi");return QueueBuilder.durable("normal_queue").withArguments(args).build();}// 聲明普通隊列與普通交換機的綁定關系@Bean("normalBinding")public Binding normalBinding(@Qualifier("normal_queue") Queue normalQueue,@Qualifier(NORMAL_EXCHANGE) DirectExchange normalExchange) {return BindingBuilder.bind(normalQueue).to(normalExchange).with("zhansan");}}生產者代碼,設置消息過期TTL
@GetMapping("product") public R product() {rabbitTemplate.convertAndSend("normal_exchange", "zhansan", "消息", new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {// 設置消息10秒過期message.getMessageProperties().setExpiration("10000");return message;}});return R.ok(); }消費者代碼(關閉正常接收隊列,模擬接收不到消息,進入死信)
// 正常接受消息隊列 // @RabbitListener(queues = {"normal_queue"}) // public void consumer(String content) { // System.out.println("正常隊列接受消息:" + content); // }// 死信接受消息隊列@RabbitListener(queues = {"dead_queue"})public void dead(String content) {System.out.println("死信隊列接受消息:" + content);}消費者代碼(模擬隊列達到最大長度,進入死信)
args.put("x-max-length", );
// 聲明普通隊列 @Bean("normal_queue") public Queue normalQueue() {Map<String, Object> args = new HashMap<>(3);//聲明當前隊列綁定的死信交換機args.put("x-dead-letter-exchange", DEAD_EXCHANGE);//聲明當前隊列的死信路由 keyargs.put("x-dead-letter-routing-key", "lisi");// 設置隊列長度args.put("x-max-length", 6);return QueueBuilder.durable("normal_queue").withArguments(args).build(); }消費者代碼(模擬消息被拒,進入死信)
總結
以上是生活随笔為你收集整理的中间件-RabbitMQ学习笔记的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 115道Java面试题及答案分享,jav
- 下一篇: Toast拓展--自定义显示时间和动画