日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

RabbitMQ详解及集群搭建

發布時間:2023/12/31 编程问答 36 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RabbitMQ详解及集群搭建 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

1.什么是RabbitMQ

1.1 MQ(Message Queue)消息隊列

  • 消息隊列中間件,是分布式系統中的重要組件
  • 主要解決,異步處理,應用解耦,流量削峰等問題
  • 從而實現高性能,高可用,可伸縮和最終一致性的架構
  • 使用較多的消息隊列產品:RabbitMQ,RocketMQ,ActiveMQ,ZeroMQ,Kafka等

1.1.1 異步處理

  • 用戶注冊后,需要發送驗證郵箱和手機驗證碼;
  • 將注冊信息寫入數據庫,發送驗證郵件,發送手機,三個步驟全部完成后,返回給客戶端

1.1.2 應用解耦

  • ?場景:訂單系統需要通知庫存系統
  • 如果庫存系統異常,則訂單調用庫存失敗,導致下單失敗
    ? ? ? ?原因:訂單系統和庫存系統耦合度太高

  • ?訂單系統:用戶下單后,訂單系統完成持久化處理,將消息寫入消息隊列,返回用戶,下單成功;
  • 庫存系統:訂閱下單的消息,獲取下單信息,庫存系統根據下單信息,再進行庫存操作;
  • 假如:下單的時候,庫存系統不能正常運行,也不會影響下單,因為下單后,訂單系統寫入消息隊 列就不再關心其他的后續操作了,實現了訂單系統和庫存系統的應用解耦;
  • 所以說,消息隊列是典型的:生產者消費者模型
  • 生產者不斷的向消息隊列中生產消息,消費者不斷的從隊列中獲取消息
  • 因為消息的生產和消費都是異步的,而且只關心消息的發送和接收,沒有業務邏輯的入侵,這樣就 實現了生產者和消費者的解耦

1.1.3 流量削峰

  • 搶購,秒殺等業務,針對高并發的場景
  • 因為流量過大,暴增會導致應用掛掉,為解決這個問題,在前端加入消息隊列

  • ?用戶的請求,服務器接收后,首先寫入消息隊列,如果超過隊列的長度,就拋棄,甩一個秒殺結束 的頁面!
  • 說白了,秒殺成功的就是進入隊列的用戶;

1.2 背景知識介紹

1.2.1 AMQP高級消息隊列協議

  • Advanced Message Queuing Protocol,一個提供統一消息服務的應用層標準高級消息隊列協議
  • 協議:數據在傳輸的過程中必須要遵守的規則
  • 基于此協議的客戶端可以與消息中間件傳遞消息
  • 并不受產品、開發語言等條件的限制

1.2.2 JMS

  • Java Message Server,Java消息服務應用程序接口, 一種規范,和JDBC擔任的角色類似
  • 是一個Java平臺中關于面向消息中間件的API,用于在兩個應用程序之間,或分布式系統中發送消 息,進行異步通信

1.2.3 二者的聯系

  • JMS是定義了統一接口,統一消息操作;AMQP通過協議統一數據交互格式
  • JMS必須是java語言;AMQP只是協議,與語言無關

1.2.4 Erlang語言

  • Erlang(['?:l??])是一種通用的面向并發的編程語言,它由瑞典電信設備制造商愛立信所轄的CSLab開發,目的是創造一種可以應對大規模并發活動的編程語言和運行環境
  • 最初是由愛立信專門為通信應用設計的,比如控制交換機或者變換協議等,因此非常適合構建分布 式,實時軟并行計算系統
  • Erlang運行時環境是一個虛擬機,有點像Java的虛擬機,這樣代碼一經編譯,同樣可以隨處運行

1.3 為什么選擇RabbitMQ

  • 我們開篇說消息隊列產品那么多,為什么偏偏選擇RabbitMQ呢?
  • 先看命名:兔子行動非常迅速而且繁殖起來也非常瘋狂,所以就把Rabbit用作這個分布式軟件的 命名(就是這么簡單)
  • Erlang開發,AMQP的最佳搭檔,安裝部署簡單,上手門檻低
  • 企業級消息隊列,經過大量實踐考驗的高可靠,大量成功的應用案例,例如阿里、網易等一線大廠 都有使用
  • 有強大的WEB管理頁面
  • 強大的社區支持,為技術進步提供動力
  • 支持消息持久化、支持消息確認機制、靈活的任務分發機制等,支持功能非常豐富
  • 集群擴展很容易,并且可以通過增加節點實現成倍的性能提升
  • 總結:如果你希望使用一個可靠性高、功能強大、易于管理的消息隊列系統那么就選擇 RabbitMQ,如果你想用一個性能高,但偶爾丟點數據不是很在乎可以使用kafka或者zeroMQ
  • kafka和zeroMQ的性能爆表,絕對可以壓RabbitMQ一頭!

1.4 RabbitMQ各組件功能

Broker:消息隊列服務器實體

Virtual Host:虛擬主機

  • 標識一批交換機、消息隊列和相關對象,形成的整體
  • 虛擬主機是共享相同的身份認證和加密環境的獨立服務器域
  • 每個vhost本質上就是一個mini版的RabbitMQ服務器,擁有自己的隊列、交換器、綁定和權 限機制
  • vhost是AMQP概念的基礎,RabbitMQ默認的vhost是 /,必須在鏈接時指定

Exchange:交換器(路由)

  • 用來接收生產者發送的消息并將這些消息路由給服務器中的隊列

Queue:消息隊列

  • 用來保存消息直到發送給消費者。
  • 它是消息的容器,也是消息的終點。
  • 一個消息可投入一個或多個隊列。
  • 消息一直在隊列里面,等待消費者連接到這個隊列將其取走。

Banding:綁定,用于消息隊列和交換機之間的關聯。

Channel:通道(信道)

  • 多路復用連接中的一條獨立的雙向數據流通道。
  • 信道是建立在真實的TCP連接內的 虛擬鏈接
  • AMQP命令都是通過信道發出去的,不管是發布消息、訂閱隊列還是接收消息,都是通過信 道完成的
  • 因為對于操作系統來說,建立和銷毀TCP連接都是非常昂貴的開銷,所以引入了信道的概 念,用來復用TCP連接。

Connection:網絡連接,比如一個TCP連接。

Publisher:消息的生產者,也是一個向交換器發布消息的客戶端應用程序。

Consumer:消息的消費者,表示一個從消息隊列中取得消息的客戶端應用程序。

Message:消息

  • 消息是不具名的,它是由消息頭和消息體組成。
  • 消息體是不透明的,而消息頭則是由一系列的可選屬性組成,這些屬性包括routing-key(路由 鍵)、priority(優先級)、delivery-mode(消息可能需要持久性存儲[消息的路由模式])等。

2.怎么用RabbitMQ

  • 想要安裝RabbitMQ,必須先安裝erlang語言環境,類似安裝tomcat,必須先安裝JDK
  • 查看匹配的版本:https://www.rabbitmq.com/which-erlang.html

?2.1 RabbitMQ安裝啟動

erlang下載:https://dl.bintray.com/rabbitmq-erlang/rpm/erlang

socat 下載:http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm

RabbitMQ 下載:https://www.rabbitmq.com/install-rpm.html#downloads

2.1.1 安裝

[root@localhost opt]# rpm -ivh erlang-21.3.8.16-1.el7.x86_64.rpm [root@localhost opt]# rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm [root@localhost opt]# rpm -ivh rabbitmq-server-3.8.6-1.el7.noarch.rpm

2.1.2 啟動后臺管理插件

[root@localhost opt]# rabbitmq-plugins enable rabbitmq_management

2.1.3 啟動RabbitMQ

[root@localhost opt]# systemctl start rabbitmq-server.service [root@localhost opt]# systemctl status rabbitmq-server.service [root@localhost opt]# systemctl restart rabbitmq-server.service [root@localhost opt]# systemctl stop rabbitmq-server.service

2.1.4 查看進程

[root@localhost opt]# ps -ef | grep rabbitmq

2.1.5 測試

1. 關閉防火墻: systemctl stop firewalld

2. 瀏覽器輸入:http://ip:15672

3. 默認帳號密碼:guest,guest用戶默認不允許遠程連接

? ? ? ? 1.創建賬號

[root@localhost opt]# rabbitmqctl add_user panghl panghl

????????2. 設置用戶角色

[root@localhost opt]# rabbitmqctl set_user_tags panghl administrator

????????3. 設置用戶權限

[root@localhost opt]# rabbitmqctl set_permissions -p "/" panghl ".*" ".*" ".*"

????????4. 查看當前用戶和角色

[root@localhost opt]# rabbitmqctl list_users

????????5. 修改用戶的密碼

[root@localhost opt]# rabbitmqctl change_password panghl panghl

管理界面介紹

overview:概覽

connections:查看鏈接情況

channels:信道(通道)情況

Exchanges:交換機(路由)情況,默認4類7個

Queues:消息隊列情況

Admin:管理員列表

端口:

  • 5672:RabbitMQ提供給編程語言客戶端鏈接的端口
  • 15672:RabbitMQ管理界面的端口
  • 25672:RabbitMQ集群的端口

2.2 RabbitMQ快速入門

2.2.1 依賴

<dependencies><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.7.3</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.25</version><scope>compile</scope></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId><version>3.9</version></dependency></dependencies>

2.2.2 日志依賴log4j(可選項)

log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.Target=System.out log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %m%n log4j.appender.file=org.apache.log4j.FileAppender log4j.appender.file.File=rebbitmq.log log4j.appender.file.layout=org.apache.log4j.PatternLayout log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %l %m%n log4j.rootLogger=debug, stdout,file

2.2.2 創建連接

先創建好虛擬主機

package util;import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;import java.io.IOException; import java.util.concurrent.TimeoutException;/*** @Author panghl* @Date 2021/8/8 12:26* @Description 專門與RabbitMQ獲得連接**/ public class ConnectionUtil {public static Connection getConnection() throws IOException, TimeoutException {//1.創建連接工廠ConnectionFactory factory = new ConnectionFactory();//2.在工廠對象中設置MQ的連接信息(ip,port,vhost,username,password)factory.setHost("192.168.40.100");factory.setPort(5672);factory.setVirtualHost("/lagou");factory.setUsername("panghl");factory.setPassword("panghl");//3.通過工廠獲得與MQ的連接Connection connection = factory.newConnection();return connection;}public static void main(String[] args) throws IOException, TimeoutException {Connection connection = getConnection();System.out.println("connection:->"+connection);//amqp://panghl@192.168.40.100:5672//lagouconnection.close();} }

2.3 RabbitMQ模式

RabbitMQ提供了6種消息模型,但是第6種其實是RPC,并不是MQ,因此我們只學習前5種

在線手冊:https://www.rabbitmq.com/getstarted.html

5種消息模型,大體分為兩類:

????????1和2屬于點對點

????????3、4、5屬于發布訂閱模式(一對多)

點對點模式:P2P(point to point)模式包含三個角色:

????????消息隊列(queue),發送者(sender),接收者(receiver)

????????每個消息發送到一個特定的隊列中,接收者從中獲得消息

????????隊列中保留這些消息,直到他們被消費或超時

????????特點:

????????????????1. 每個消息只有一個消費者,一旦消費,消息就不在隊列中了

????????????????2. 發送者和接收者之間沒有依賴性,發送者發送完成,不管接收者是否運行,都不會?影?響 消息發送到隊列中(我給你發微信,不管你看不看手機,反正我發完了)

????????????????3. 接收者成功接收消息之后需向對象應答成功(確認)

? ? ? ? ? ? ? ? 4.?如果希望發送的每個消息都會被成功處理,那需要P2P

發布訂閱模式:publish(Pub)/subscribe(Sub)

  • pub/sub模式包含三個角色:交換機(exchange),發布者(publisher),訂閱者 (subcriber)
  • 多個發布者將消息發送交換機,系統將這些消息傳遞給多個訂閱者
  • 特點:
    1.每個消息可以有多個訂閱者
    2.發布者和訂閱者之間在時間上有依賴,對于某個交換機的訂閱者,必須創建一個訂閱 后,才能消費發布者的消息
    3.為了消費消息,訂閱者必須保持運行狀態;類似于,看電視直播。
  • ?如果希望發送的消息被多個消費者處理,可采用本模式

2.3.1 簡單模式

RabbitMQ本身只是接收,存儲和轉發消息,并不會對信息進行處理!

類似郵局,處理信件的應該是收件人而不是郵局!

?2.3.1.1 生產者P

/*** @Author panghl* @Date 2021/8/10 21:32* @Description 簡單模式-生產者P**/ public class MessageSender {public static void main(String[] args) throws IOException, TimeoutException {String msg = "panghl: Hello Java Rab";//1.獲得連接Connection connection = ConnectionUtil.getConnection();//2.在連接中創建通道(信道)Channel channel = connection.createChannel();//3.創建消息隊列(1,2,3,4,5)/** 參數1:隊列的名稱* 參數2:隊列中的數據是否持久化* 參數3:是否排外(是否支持擴展,當前隊列只能自己用,不能給別人用)* 參數4:是否自動刪除(當隊列的連接數為0時,隊列會銷毀,不管隊列是否還保存數據)* 參數5:隊列參數(沒有參數為null)*/channel.queueDeclare("queue1",false,false,false,null);//4.向指定的隊列發送消息/** 參數1:交換機名稱,當前是簡單模式,也就是P2P模式,沒有交換機,所以名稱為""* 參數2: 目標隊列的名稱* 參數3:設置消息的屬性(沒有屬性則為null)* 參數4:消息的內容 (只接受字節數組)*/channel.basicPublish("","queue1",null,msg.getBytes());//5.釋放資源channel.close();connection.close();} }

2.3.1.2 消費者C

/*** @Author panghl* @Date 2021/8/10 21:32* @Description 簡單模式-接收者P**/ public class MessageRecer {public static void main(String[] args) throws IOException, TimeoutException {//1.獲得連接Connection connection = ConnectionUtil.getConnection();//2.獲得通道(信道)Channel channel = connection.createChannel();//3.從信道中獲得消息(1,2,3,4,5)DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {@Override //交付處理(收件人信息,包裹上的快遞標簽,協議的配置,消息)public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//body就是從隊列中獲取的消息String s = new String(body);System.out.println("接收 = " + s);//todo.... 業務代碼//手動確認(收件人信息,是否同時確認多個消息)channel.basicAck(envelope.getDeliveryTag(),false);}};//4.監聽隊列 true:自動消息確認 自動ACKchannel.basicConsume("queue1", false, defaultConsumer);} }

啟動消費者,前往管理端查看隊列中的信息,所有信息都已經處理和確認,顯示0

2.3.1.3 消息確認機制ACK

  • 通過剛才的案例可以看出,消息一旦被消費,消息就會立刻從隊列中移除
  • RabbitMQ如何得知消息被消費者接收?
    1、如果消費者接收消息后,還沒執行操作就拋異常宕機導致消費失敗,但是RabbitMQ無從得 知,這樣消息就丟失了
    2、因此,RabbitMQ有一個ACK機制,當消費者獲取消息后,會向RabbitMQ發送回執ACK,告 知消息已經被接收
    3、ACK:(Acknowledge character)即是確認字符,在數據通信中,接收站發給發送站的一種 傳輸類控制字符。表示發來的數據已確認接收無誤我們在使用http請求時,http的狀態碼200 就是告訴我們服務器執行成功
    4、整個過程就想快遞員將包裹送到你手里,并且需要你的簽字,并拍照回執
    5、不過這種回執ACK分為兩種情況:
    自動ACK:消息接收后,消費者立刻自動發送ACK(快遞放在快遞柜)
    手動ACK:消息接收后,不會發送ACK,需要手動調用(快遞必須本人簽收)
    6、兩種情況如何選擇,需要看消息的重要性:
    如果消息不太重要,丟失也沒有影響,自動ACK會比較方便
    如果消息非常重要,最好消費完成手動ACK,如果自動ACK消費后,RabbitMQ就會把 消息從隊列中刪除,如果此時消費者拋異常宕機,那么消息就永久丟失了
    ?
  • 修改手動消息確認 // false:手動消息確認channel.basicConsume("queue1", false, consumer);
  • 結果如下:
  • 解決問題
    ? /*** @Author panghl* @Date 2021/8/10 21:32* @Description 簡單模式-接收者P**/ public class MessageRecer {public static void main(String[] args) throws IOException, TimeoutException {//1.獲得連接Connection connection = ConnectionUtil.getConnection();//2.獲得通道(信道)Channel channel = connection.createChannel();//3.從信道中獲得消息(1,2,3,4,5)DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {@Override //交付處理(收件人信息,包裹上的快遞標簽,協議的配置,消息)public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//body就是從隊列中獲取的消息String s = new String(body);System.out.println("接收 = " + s);//todo.... 業務代碼//手動確認(收件人信息,是否同時確認多個消息)channel.basicAck(envelope.getDeliveryTag(),false);}};//4.監聽隊列 true:自動消息確認 自動ACKchannel.basicConsume("queue1", false, defaultConsumer);} }

?2.3.2 工作隊列模式

  • 之前我們學習的簡單模式,一個消費者來處理消息,如果生產者生產消息過快過多,而消費者的能 力有限,就會產生消息在隊列中堆積(生活中的滯銷)
    ?
  • 一個燒烤師傅,一次烤50支羊肉串,就一個人吃的話,烤好的肉串會越來越多,怎么處理?
  • 多招攬客人進行消費即可。當我們運行許多消費者程序時,消息隊列中的任務會被眾多消費者共 享,但其中某一個消息只會被一個消費者獲取(100支肉串20個人吃,但是其中的某支肉串只能被 一個人吃)

2.3.2.1 生產者P

/*** @Author panghl* @Date 2021/8/10 22:06* @Description 工作隊列--發送者**/ public class Sender {public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();channel.queueDeclare("test_work_queue", false, false, false, null);for (int i = 1; i <= 100; i++) {String msg = "羊肉串--->" + i;channel.basicPublish("", "test_work_queue", null, msg.getBytes());System.out.println("新鮮出爐:--》" + msg);}channel.close();connection.close();} }

2.3.2.2 消費者1

/*** @Author panghl* @Date 2021/8/10 22:10* @Description 工作隊列-消費者**/ public class Recer {static int i = 1;public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtil.getConnection();final Channel channel = connection.createChannel();channel.queueDeclare("test_work_queue", false, false, false, null);//queueDeclare() 此方法有雙重作用,如果隊列不存在,再創建;如果隊列存在,則獲取channel.basicQos(1);DefaultConsumer defaultConsumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String s = new String(body);System.out.println("【顧客1】吃掉"+s+"!總共吃【"+i+++"】串!");//模擬網絡延遲try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}channel.basicAck(envelope.getDeliveryTag(),false);}};channel.basicConsume("test_work_queue",false,defaultConsumer);} }

2.3.2.3 消費者2

/*** @Author panghl* @Date 2021/8/10 22:10* @Description 工作隊列-消費者**/ public class Recer2 {static int i = 1;public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtil.getConnection();final Channel channel = connection.createChannel();channel.queueDeclare("test_work_queue", false, false, false, null);//queueDeclare() 此方法有雙重作用,如果隊列不存在,再創建;如果隊列存在,則獲取channel.basicQos(2);DefaultConsumer defaultConsumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String s = new String(body);System.out.println("【顧客2】吃掉"+s+"!總共吃【"+i+++"】串!");//模擬網絡延遲try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}channel.basicAck(envelope.getDeliveryTag(),false);}};channel.basicConsume("test_work_queue",false,defaultConsumer);} }
  • 先運行2個消費者,排隊等候消費(取餐),再運行生產者開始生產消息(烤肉串)
  • 雖然兩個消費者的消費速度不一致(線程休眠時間),但是消費的數量卻是一致的,各消費50個 消息
    ? ? ? ?例如:工作中,A同學編碼速率高,B同學編碼速率低,兩個人同時開發一個項目,A10天完 成,B30天完成,A完成自己的編碼部分,就無所事事了,等著B完成就可以了,這樣是不可 以的,應該遵循“能者多勞”;效率高的多干點,效率低的少干點
  • 看下面官網是如何給出解決思路的:


?

公平的分配

您可能已經注意到分派仍然不能完全按照我們的要求工作。例如,如果有兩個員工,當所有 奇怪的消息都很重,甚至消息都很輕時,一個員工會一直很忙,而另一個人幾乎什么工作都 不做。好吧,RabbitMQ對此一無所知,它仍然會均勻地分派消息。

????????這是因為RabbitMQ只在消息進入隊列時發送消息。它不查看用戶未確認消息的數量。它 只是盲目地將每條第n個消息分派給第n個消費者。

????????為了克服這個問題,我們可以使用設置為prefetchCount = 1的basicQos方法。這告訴 RabbitMQ一次不要給一個worker發送一條以上的消息。或者,換句話說,在worker處理并 確認前一個消息之前,不要向它發送新消息。相反,它將把它分派到下一個不繁忙的 worker。

// 聲明隊列(此處為消費者,不是聲明創建隊列,而且獲取,二者代碼相同)出餐口排隊 channel.queueDeclare("test_work_queue",false,false,false,null); // 可以理解為:快遞一個一個送,送完一個再送下一個,速度快的送件就多 channel.basicQos(1);

能者多勞必須要配合手動的ACK機制才生效

2.3.2.4 面試題:避免消息堆積?

1. workqueue,多個消費者監聽同一個隊列

2. 接收到消息后,通過線程池,異步消費

2.3.3 發布訂閱模式

Publish/Subscribe

????????In the previous tutorial we created a work queue. The assumption behind a work queue is that each task is delivered to exactly one worker. In this part we'll do something completely different -- we'll deliver a message to multiple consumers. This pattern is known as "publish/subscribe".

????????To illustrate the pattern, we're going to build a simple logging system. It will consist of two programs -- the first will emit log messages and the second will receive and print them. ????????In our logging system every running copy of the receiver program will get the messages. That way we'll be able to run one receiver and direct the logs to disk; and at the same time we'll be able to run another receiver and see the logs on the screen. ????????Essentially, published log messages are going to be broadcast to all the receivers.

發布-訂閱

????????在上一篇教程中,我們創建了一個工作隊列。工作隊列背后的假設是,每個任務都被準確地交 付給一個工作者。在這一部分中,我們將做一些完全不同的事情——將消息傳遞給多個消費者。 此模式稱為“發布/訂閱”。

????????為了演示這個模式,我們將構建一個簡單的日志記錄系統。它將由兩個程序組成——第一個將 發送日志消息,第二個將接收和打印它們。

????????在我們的日志系統中,接收程序的每一個正在運行的副本都將獲得消息。這樣我們就可以運行 一個接收器并將日志指向磁盤;與此同時,我們可以運行另一個接收器并在屏幕上看到日志。

????????基本上,發布的日志消息將廣播到所有接收方。

生活中的案例:就是玩抖音快手,眾多粉絲關注一個視頻主,視頻主發布視頻,所有粉絲都可以得到視 頻通知

  • ?上圖中,X就是視頻主,紅色的隊列就是粉絲。binding是綁定的意思(關注)
  • P生產者發送信息給X路由,X將信息轉發給綁定X的隊列

  • ?X隊列將信息通過信道發送給消費者,從而進行消費
  • 整個過程,必須先創建路由
    1、路由在生產者程序中創建
    2、因為路由沒有存儲消息的能力,當生產者將信息發送給路由后,消費者還沒有運行,所以沒 有隊列,路由并不知道將信息發送給誰
    3、運行程序的順序: 1. MessageSender 2. MessageReceiver1和MessageReceiver2 3. MessageSender

2.3.3.1 生產者

/*** @Author panghl* @Date 2021/8/10 22:37* @Description 發布訂閱-生產者**/ public class Sender {public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();//聲明路由(路由名,路由類型)// fanout: 不處理路由鍵(只需要將隊列綁定到路由上,發送到路由的消息都會被轉發到該路由綁定的所有隊列上)channel.exchangeDeclare("test_exchange_fanout", "fanout");String msg = "hello,panghl!!";// 不要寫隊列的名稱,此時你并不知道 誰會綁定路由channel.basicPublish("test_exchange_fanout", "", null, msg.getBytes());System.out.println("生產者:=》" + msg);channel.close();connection.close();} }

2.3.3.2 消費者1

/*** @Author panghl* @Date 2021/8/10 22:43* @Description 發布訂閱-消費者1**/ public class Recer1 {public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();//聲明隊列channel.queueDeclare("test_exchange_fanout_queue_1",false,false,false,null);//綁定路由(隊列名,路由名, routing key)channel.queueBind("test_exchange_fanout_queue_1","test_exchange_fanout","");DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String s = new String(body);System.out.println("【粉絲1】==>"+s);}};channel.basicConsume("test_exchange_fanout_queue_1",true,consumer);} }

?2.3.3.3 消費者2

/*** @Author panghl* @Date 2021/8/10 22:43* @Description 發布訂閱-消費者2**/ public class Recer2 {public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();//聲明隊列channel.queueDeclare("test_exchange_fanout_queue_2",false,false,false,null);//綁定路由(隊列名,路由名,)channel.queueBind("test_exchange_fanout_queue_2","test_exchange_fanout","");DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String s = new String(body);System.out.println("【粉絲2】==>"+s);}};channel.basicConsume("test_exchange_fanout_queue_2",true,consumer);} }

2.3.4 路由模式

?2.3.4.1 生產者

/*** @Author panghl* @Date 2021/8/10 23:19* @Description 路由模式-定向分發-生產者**/ public class Sender {public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();//聲明路由(路由名,路由類型)channel.exchangeDeclare("test_exchange_direct", "direct");// String msg = "用戶注冊,[userid=1]";String msg = "用戶查詢,[userid=1]"; // channel.basicPublish("test_exchange_direct", "insert", null, msg.getBytes());channel.basicPublish("test_exchange_direct", "select", null, msg.getBytes());System.out.println("【用戶系統】:"+msg);channel.close();connection.close();} }

2.3.4.2 消費者1

/*** @Author panghl* @Date 2021/8/10 23:22* @Description 路由模式-定向分發-消費者1**/ public class Recer1 {public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();channel.queueDeclare("test_exchange_direct_queue_2",false,false,false,null);//綁定路由channel.queueBind("test_exchange_direct_queue_2","test_exchange_direct","select");DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String s = new String(body);System.out.println("[消費者1]="+s);}};channel.basicConsume("test_exchange_direct_queue_2",true,consumer);} }

2.3.4.3 消費者2

/*** @Author panghl* @Date 2021/8/10 23:22* @Description 路由模式-定向分發-消費者2**/ public class Recer2 {public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();channel.queueDeclare("test_exchange_direct_queue_1",false,false,false,null);//綁定路由(如果路由鍵的類型是 添加、刪除、修改的話,綁定到這個隊列上)channel.queueBind("test_exchange_direct_queue_1","test_exchange_direct","insert");channel.queueBind("test_exchange_direct_queue_1","test_exchange_direct","update");channel.queueBind("test_exchange_direct_queue_1","test_exchange_direct","delete");DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String s = new String(body);System.out.println("[消費者1]="+s);}};channel.basicConsume("test_exchange_direct_queue_1",true,consumer);} }

1. 記住運行程序的順序,先運行一次sender(創建路由器),

2. 有了路由器之后,在創建兩個Recer1和Recer2,進行隊列綁定

3. 再次運行sender,發出消息

2.3.5 通配符模式

?和路由模式90%是一樣的。

唯獨的區別就是路由鍵支持模糊匹配

匹配符號

????????*:只能匹配一個詞(正好一個詞,多一個不行,少一個也不行)

????????#:匹配0個或更多個詞

看一下官網案例:

????????Q1綁定了路由鍵 *.orange.*? ? ? ? ?Q2綁定了路由鍵 *.*.rabbit 和 lazy.#

????????下面生產者的消息會被發送給哪個隊列?


quick.orange.rabbit? ? ? ? ? ? ? ? ? ? ?# Q1 Q2

lazy.orange.elephant? ? ? ? ? ? ? ? ? # Q1 Q2

quick.orange.fox? ? ? ? ? ? ? ? ? # Q1

lazy.brown.fox? ? ? ? ? ? ? ? ? ? ? # Q2

lazy.pink.rabbit? ? ? ? ? ? ? ? ? ? # Q2

quick.brown.fox ? ? ? ? ? ? ? ? ?# 無

orange ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? # 無

quick.orange.male.rabbit? ?# 無

2.3.5.1 生產者

/*** @Author panghl* @Date 2021/8/10 23:32* @Description 通配符模式-生產者**/ public class Sender {public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();//topic : 模糊匹配的定向分發channel.exchangeDeclare("test_exchange_topic","topic",true);String msg = "hello,msg - topic"; // channel.basicPublish("test_exchange_topic","user.insert",null,msg.getBytes());channel.basicPublish("test_exchange_topic","product.down", MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes());System.out.println("[消息中心-->]"+msg);channel.close();connection.close();} }

2.3.5.2 消費者1

/*** @Author panghl* @Date 2021/8/10 23:41* @Description 通配符模式-消費者**/ public class Recer1 {public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();//聲明隊列 true 隊列持久化channel.queueDeclare("test_exchange_topic_queue_1",true,false,false,null);//綁定路由(隊列名,路由名,路由key) 綁定用戶相關的消息channel.queueBind("test_exchange_topic_queue_1","test_exchange_topic","user.#");DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String s = new String(body);System.out.println("【消費者1】==>"+s);}};channel.basicConsume("test_exchange_topic_queue_1",true,consumer);} }

2.3.5.3 消費者2

/*** @Author panghl* @Date 2021/8/10 23:41* @Description 通配符模式-消費者**/ public class Recer2 {public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();//聲明隊列channel.queueDeclare("test_exchange_topic_queue_2",true,false,false,null);//綁定路由(隊列名,路由名,路由key) 綁定商品和訂單相關的消息channel.queueBind("test_exchange_topic_queue_2","test_exchange_topic","product.#");channel.queueBind("test_exchange_topic_queue_2","test_exchange_topic","order.#");DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String s = new String(body);System.out.println("【消費者2】==>"+s);}};channel.basicConsume("test_exchange_topic_queue_2",true,consumer);} }

2.4 持久化

  • 消息的可靠性是RabbitMQ的一大特色,那么RabbitMQ是如何避免消息丟失?

    ? ? ? ? 1、消費者的ACK確認機制,可以防止消費者丟失消息

    ? ? ? ? 2、萬一在消費者消費之前,RabbitMQ服務器宕機了,那消息也會丟失

  • 想要將消息持久化,那么 路由和隊列都要持久化 才可以

2.4.1 生產者

/*** @Author panghl* @Date 2021/8/10 23:32* @Description 通配符模式-生產者**/ public class Sender {public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();//topic : 模糊匹配的定向分發channel.exchangeDeclare("test_exchange_topic","topic",true);String msg = "hello,msg - topic"; // channel.basicPublish("test_exchange_topic","user.insert",null,msg.getBytes());channel.basicPublish("test_exchange_topic","product.down", MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes());System.out.println("[消息中心-->]"+msg);channel.close();connection.close();} }

2.4.2 消費者

/*** @Author panghl* @Date 2021/8/10 23:41* @Description 通配符模式-消費者**/ public class Recer1 {public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();//聲明隊列 true 隊列持久化channel.queueDeclare("test_exchange_topic_queue_1",true,false,false,null);//綁定路由(隊列名,路由名,路由key) 綁定用戶相關的消息channel.queueBind("test_exchange_topic_queue_1","test_exchange_topic","user.#");DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String s = new String(body);System.out.println("【消費者1】==>"+s);}};channel.basicConsume("test_exchange_topic_queue_1",true,consumer);} }

2.4 Spring整合RabbitMQ

  • 五種消息模型,在企業中應用最廣泛的就是最后一種:定向匹配topic
  • Spring AMQP 是基于 Spring 框架的AMQP消息解決方案,提供模板化的發送和接收消息的抽象 層,提供基于消息驅動的 POJO的消息監聽等,簡化了我們對于RabbitMQ相關程序的開發。

2.4.1 生產端工程

1、依賴

<dependencies><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit</artifactId><version>2.0.1.RELEASE</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.25</version><scope>compile</scope></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId><version>3.9</version></dependency></dependencies>

2、spring-rabbitmq-producer.xml

<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:rabbit="http://www.springframework.org/schema/rabbit"xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/rabbithttp://www.springframework.org/schema/rabbit/spring-rabbit.xsd"><!-- 1.配置連接 --> <rabbit:connection-factory id="connectionFactory" host="192.168.40.100" port="5672" username="panghl" password="panghl" virtual-host="/lagou" /> <!-- 2.配置隊列 --> <rabbit:queue name="test_spring_queue_1"/> <!-- 3.配置rabbitAdmin:主要用于在Java代碼中對理隊和隊列進行管理,用于創建、綁定、刪 除隊列與交換機,發送消息等 --> <rabbit:admin connection-factory="connectionFactory"/> <!-- 4.配置topic類型exchange;隊列綁定到交換機 --> <rabbit:topic-exchange name="spring_topic_exchange"> <rabbit:bindings> <rabbit:binding queue="test_spring_queue_1" pattern="msg.#"/> </rabbit:bindings> </rabbit:topic-exchange> <!-- 5. 配置消息對象json轉換類 --> <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/ > <!-- 6. 配置RabbitTemplate(消息生產者) --> <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" exchange="spring_topic_exchange" message-converter="jsonMessageConverter" /></beans>

3、發消息

/*** @Author panghl* @Date 2021/8/11 21:15* @Description 生產者**/ public class Sender {public static void main(String[] args) {//1.創建spring容器ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/spring-rabbitmq-producer.xml");//2.從容器中獲得rabbit模板對象RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);//3.發消息Map<String, String> map = new HashMap<>();map.put("name", "erguo2");map.put("email", "phl0425@qq2.com");// rabbitTemplate.convertAndSend("lalalal","msg.user",map); // for (int i = 0; i < 10; i++) {rabbitTemplate.convertAndSend("msg.user", map);System.out.println("消息已經發布。。。"); // }context.close();} }

2.4.2 消費端工程

1、依賴與生產者一致

2、spring-rabbitmq-consumer.xml

<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:rabbit="http://www.springframework.org/schema/rabbit"xmlns:context="http://www.springframework.org/schema/context"xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/rabbithttp://www.springframework.org/schema/rabbit/spring-rabbit.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd"><!-- 1. 配置連接 --> <rabbit:connection-factory id="connectionFactory" host="192.168.40.100" port="5672" username="panghl" password="panghl" virtual-host="/lagou" /> <!-- 2. 配置隊列 --> <rabbit:queue name="test_spring_queue_1"/> <!-- 3.配置rabbitAdmin --> <rabbit:admin connection-factory="connectionFactory"/> <!-- 4.springIOC注解掃描包--> <context:component-scan base-package="listener"/> <!-- 5.配置監聽 --> <rabbit:listener-container connection-factory="connectionFactory"> <rabbit:listener ref="consumerListener" queuenames="test_spring_queue_1" /> </rabbit:listener-container></beans>

消費者

????????MessageListener接口用于spring容器接收到消息后處理消息

????????如果需要使用自己定義的類型 來實現 處理消息時,必須實現該接口,并重寫onMessage()方 法

????????當spring容器接收消息后,會自動交由onMessage進行處理

@Component public class ConsumerListener implements MessageListener { // jackson提供序列化和反序列中使用最多的類,用來轉換json的 private static final ObjectMapper MAPPER = new ObjectMapper(); public void onMessage(Message message) { try { // 將message對象轉換成json JsonNode jsonNode = MAPPER.readTree(message.getBody()); String name = jsonNode.get("name").asText(); String email = jsonNode.get("email").asText(); System.out.println("從隊列中獲取:【"+name+"的郵箱是:"+email+"】"); } catch (Exception e){ e.printStackTrace(); } } }

啟動項目

/*** @Author panghl* @Date 2021/8/11 21:43* @Description TODO**/ public class TestRunner {public static void main(String[] args) throws IOException {//1.創建spring容器ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/spring-rabbitmq-consumer.xml");//讓程序一直運行System.in.read();} }

2.5 消息成功確認機制

在實際場景下,有的生產者發送的消息是必須保證成功發送到消息隊列中,那么如何保證成功投遞呢?

  • 事務機制
  • 發布確認機制

2.5.1 事務機制

  • AMQP協議提供的一種保證消息成功投遞的方式,通過信道開啟 transactional 模式
  • 并利用信道 的三個方法來實現以事務方式 發送消息,若發送失敗,通過異常處理回滾事務,確保 消息成功投遞

channel.txSelect(): 開啟事務

channel.txCommit() :提交事務

channel.txRollback() :回滾事務

  • Spring已經對上面三個方法進行了封裝,所以我們只能使用原始的代碼演示

2.5.1.1 生產者

/*** @Author panghl* @Date 2021/8/10 23:32* @Description 通配符模式-生產者**/ public class Sender {public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();//topic : 模糊匹配的定向分發channel.exchangeDeclare("test_transaction_exchange_topic", "topic");channel.txSelect();String msg = "商品降價";try {channel.basicPublish("test_transaction_exchange_topic", "product.price", null, "商品1降價".getBytes()); // System.out.println(1/0);channel.basicPublish("test_transaction_exchange_topic", "product.price", null, "商品2降價".getBytes());System.out.println("[生產者-->消息已發送]");channel.txCommit();} catch (Exception e) {System.out.println("[生產者-->消息已全部撤銷]");channel.txRollback();}finally {channel.close();connection.close();}} }

2.5.1.2 消費者

/*** @Author panghl* @Date 2021/8/10 23:41* @Description 通配符模式-消費者**/ public class Recer1 {public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();//聲明隊列 true 隊列持久化channel.queueDeclare("test_tx_exchange_topic_queue_1",false,false,false,null);//綁定路由(隊列名,路由名,路由key) 綁定用戶相關的消息channel.queueBind("test_tx_exchange_topic_queue_1","test_transaction_exchange_topic","product.#");DefaultConsumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String s = new String(body);System.out.println("【消費者】==>"+s);}};channel.basicConsume("test_tx_exchange_topic_queue_1",true,consumer);} }

2.5.2 Confirm發布確認機制

  • RabbitMQ為了保證消息的成功投遞,采用通過AMQP協議層面為我們提供事務機制的方案,但是 采用事務會大大降低消息的吞吐量
  • 我本機SSD硬盤測試結果10w條消息未開啟事務,大約8s發送完畢;而開啟了事務后,需要將 近310s,差了30多倍。
  • 接著翻閱官網,發現官網中已標注

  • ?那么有沒有更加高效的解決方式呢?答案就是采用Confirm模式。
  • 事務效率為什么會這么低呢?試想一下:10條消息,前9條成功,如果第10條失敗,那么9條消息 要全部撤銷回滾。太太太浪費
  • 而confirm模式則采用補發第10條的措施來完成10條消息的送達

2.5.2.1 在spring中應用

spring-rabbitmq-producer.xml

<!--1.配置連接 ,啟動生產者確認機制,publisher-confirms="true"--><rabbit:connection-factory id="connectionFactory"host="192.168.40.100"port="5672"username="panghl" password="panghl"virtual-host="/lagou"publisher-confirms="true"/>//省略....<!--6.配置rabbitmq的模板,添加確認回調處理類:confirm-callback="msgSendConfirmCallback"--><rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"exchange="spring_topic_exchange"message-converter="jsonMessageConverter"confirm-callback="messageConfirm"/><!--7.確認機制的處理類--><bean id="messageConfirm" class="confirm.MessageConfirm"/>

消息確認處理類

/*** @Author panghl* @Date 2021/8/11 22:14* @Description 確認機制,消息確認處理**/ public class MessageConfirm implements RabbitTemplate.ConfirmCallback {/**** @param correlationData 消息相關的數據對象(封裝了消息的唯一id)* @param b 消息是否確認成功* @param s 異常信息*/@Overridepublic void confirm(CorrelationData correlationData, boolean b, String s) {if (b){System.out.println("消息確認成功!correlationData-->"+correlationData+"s->"+s);}else {System.out.println("消息確認失敗! correlationData-->"+correlationData+"s->"+s);//如果本條消息一定要發送到隊列中,例如下訂單消息,我們可以采用補發//1.采用遞歸(限制遞歸的次數)//2.redis+定時任務(jdk的timer,或者定時任務框架)}} }

log4j.properties

log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.Target=System.out log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %m%n log4j.appender.file=org.apache.log4j.FileAppender log4j.appender.file.File=rabbitmq.log log4j.appender.file.layout=org.apache.log4j.PatternLayout log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %l %m%n log4j.rootLogger=debug, stdout,file

發送消息

/*** @Author panghl* @Date 2021/8/11 21:15* @Description 生產者**/ public class Sender {public static void main(String[] args) {//1.創建spring容器ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/spring-rabbitmq-producer.xml");//2.從容器中獲得rabbit模板對象RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);//3.發消息Map<String, String> map = new HashMap<>();map.put("name", "erguo2");map.put("email", "phl0425@qq2.com"); // 第一個參數是路由名稱, // 不寫,則使用spring容器中創建的路由 // 亂寫一個,因為路由名錯誤導致報錯,則進入消息確認失敗流程rabbitTemplate.convertAndSend("lalalal","msg.user",map);context.close();} }

2.6 消費端限流

  • ?生產者使用循環發出多條消息
for (int i = 0; i < 10; i++) {rabbitTemplate.convertAndSend("msg.user", map);System.out.println("消息已經發布。。。");}
  • 生產10條堆積未處理的消息

  • 消費者進行限流處理
<!--5.配置監聽--><!-- prefetch="3" 一次性消費的消息數量。會告訴 RabbitMQ 不要同時給一個消費者推送多于 N 個消息,一旦有 N 個消息還沒有ack,則該 consumer 將阻塞,直到消息被ack--><!-- acknowledge-mode: manual 手動確認--><rabbit:listener-containerconnection-factory="connectionFactory"prefetch="3" acknowledge="manual"><rabbit:listener ref="consumerListener" queue-names="test_spring_queue_1"/></rabbit:listener-container>

package listener;import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageListener; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.listener.adapter.AbstractAdaptableMessageListener; import org.springframework.stereotype.Component;import java.io.IOException; import java.util.concurrent.TimeUnit;/*** @Author panghl* @Date 2021/8/11 21:33* @Description 消費者監聽隊列 -* AbstractAdaptableMessageListener用于在spring容器接收到消息后用于處理消息的抽象* 基類**/ @Component public class ConsumerListener extends AbstractAdaptableMessageListener implements MessageListener {// jackson提供序列化和反序列中使用最多的類,用來轉換json的private static final ObjectMapper MAPPER = new ObjectMapper();// @Override // public void onMessage(Message message) { // System.out.println("收到消息了....."); // String body = new String(message.getBody()); // //將message對象轉換成json // try { // JsonNode jsonNode = MAPPER.readTree(body); // String name = jsonNode.get("name").asText(); // String email = jsonNode.get("email").asText(); // System.out.println("從隊列中獲取【"+name+"的郵箱是"+email+"】"); // // System.out.println("休息三秒然后再接收消息"); // } catch (IOException e) { // e.printStackTrace(); // } // }@Overridepublic void onMessage(Message message, Channel channel) throws Exception {System.out.println("收到消息了.....");String body = new String(message.getBody());//將message對象轉換成jsontry { // JsonNode jsonNode = MAPPER.readTree(body); // String name = jsonNode.get("name").asText(); // String email = jsonNode.get("email").asText(); // System.out.println("從隊列中獲取【" + name + "的郵箱是" + email + "】");System.out.println("body-->"+body);long msgId =message.getMessageProperties().getDeliveryTag(); //確認收到(參數1,參數2) /* 參數1:RabbitMQ 向該 Channel 投遞的這條消息的唯一標識 ID,是一個單調遞 增的正整數,delivery_tag 的范圍僅限于 Channel 參數2:為了減少網絡流量,手動確認可以被批處理,當該參數為 true 時,則可以 一次性確認 msgId 小于等于傳入值的所有消息 */channel.basicAck(msgId, true);TimeUnit.SECONDS.sleep(3);System.out.println("休息三秒然后再接收消息");} catch (IOException e) {e.printStackTrace();}} }
  • 每次確認接收3條消息

2.7 過期時間TTL

  • Time To Live:生存時間、還能活多久,單位毫秒
  • 在這個周期內,消息可以被消費者正常消費,超過這個時間,則自動刪除(其實是被稱為dead message并投入到死信隊列,無法消費該消息)
  • RabbitMQ可以對消息和隊列設置TTL
    1、通過隊列設置,隊列中所有消息都有相同的過期時間
    2、對消息單獨設置,每條消息的TTL可以不同(更顆粒化)

2.7.1 設置隊列TTL

spring-rabbitmq-producer.xml

<!--2.重新配置一個隊列,同時,對隊列中的消息設置過期時間--> <rabbit:queue name="test_spring_queue_ttl" auto-declare="true"><rabbit:queue-arguments><entry key="x-message-ttl" value-type="long" value="5000"></entry></rabbit:queue-arguments> </rabbit:queue>

2.7.1 設置消息TTL

  • 設置某條消息的ttl,只需要在創建發送消息時指定即可
<!--2.配置隊列--> <rabbit:queue name="test_spring_queue_ttl_2"> /*** @Author panghl* @Date 2021/8/11 21:15* @Description 生產者**/ public class Sender2 {public static void main(String[] args) throws IOException {//1.創建spring容器ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/spring-rabbitmq-producer.xml");//2.從容器中獲得rabbit模板對象RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);//3.發消息Map<String, String> map = new HashMap<>();map.put("name", "erguo2");map.put("email", "phl0425@qq2.com");//創建消息的配置對象MessageProperties properties = new MessageProperties();//設置過期時間3sproperties.setExpiration("3000");//創建消息ByteArrayOutputStream os = new ByteArrayOutputStream();ObjectOutputStream oos = new ObjectOutputStream(os);oos.writeObject(map);Message message = new Message(os.toByteArray(),properties);rabbitTemplate.convertAndSend("msg.user", message);System.out.println("消息已經發布。。。");context.close();} }

  • 如果同時設置了queue和message的TTL值,則二者中較小的才會起作用

2.8 死信隊列

1、DLX(Dead Letter Exchanges)死信交換機/死信郵箱,當消息在隊列中由于某些原因沒有被及時 消費而變成死信(dead message)后,這些消息就會被分發到DLX交換機中,而綁定DLX交換機 的隊列,稱之為:“死信隊列”

2、消息沒有被及時消費的原因:

????????消息被拒絕(basic.reject/ basic.nack)并且不再重新投遞 requeue=false

????????消息超時未消費

????????達到最大隊列長度

?spring-rabbitmq-producer-dlx.xml

<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:rabbit="http://www.springframework.org/schema/rabbit"xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/rabbithttp://www.springframework.org/schema/rabbit/spring-rabbit.xsd"><!--1.配置連接 ,啟動生產者確認機制,publisher-confirms="true" 192.168.40.66 keepalived集群后的 虛擬ip--><rabbit:connection-factory id="connectionFactory"host="192.168.40.100"port="5672"username="panghl" password="panghl"virtual-host="/lagou"publisher-confirms="true"/><!--3.配置rabbitAdmin:主要用于在java代碼中隊隊列的管理,用來創建,綁定,刪除隊列與交換機、發送消息--><rabbit:admin connection-factory="connectionFactory"/><!--6.配置rabbitmq的模板,添加確認回調處理類:confirm-callback="msgSendConfirmCallback"--><rabbit:template id="rabbitTemplate"connection-factory="connectionFactory"exchange="my_exchange"/><!-- ############################################################################ ##########################################--><!--死信隊列--><rabbit:queue name="dlx_queue"/><!--定向死信交換機--><rabbit:direct-exchange name="dlx_exchange"><rabbit:bindings><rabbit:binding key="dlx_ttl" queue="dlx_queue" /><rabbit:binding key="dlx_max" queue="dlx_queue" /></rabbit:bindings></rabbit:direct-exchange><!--聲明定向的測試消息的交換機--><rabbit:direct-exchange name="my_exchange"><rabbit:bindings><rabbit:binding key="dlx_ttl" queue="test_ttl_queue"></rabbit:binding><rabbit:binding key="dlx_max" queue="test_max_queue"></rabbit:binding></rabbit:bindings></rabbit:direct-exchange><!--聲明 測試過期的消息隊列--><rabbit:queue name="test_ttl_queue"><rabbit:queue-arguments><!--設置隊列的過期時間TTL--><entry key="x-message-ttl" value-type="long" value="20000"/><!--消息超時 投遞給死信交換機--><entry key="x-dead-letter-exchange" value="dlx_exchange"/></rabbit:queue-arguments></rabbit:queue><!--聲明 測試超過長度的消息隊列--><rabbit:queue name="test_max_queue"><rabbit:queue-arguments><!--設置隊列的額定長度(本隊列最多裝兩個消息)--><entry key="x-max-length" value-type="long" value="2" /><!--消息如果超出長度 投遞給死信交換機--><entry key="x-dead-letter-exchange" value="dlx_exchange"/></rabbit:queue-arguments></rabbit:queue></beans>

3、發消息進行測試

/*** @Author panghl* @Date 2021/8/11 21:15* @Description 生產者-死信**/ public class SenderDLX {public static void main(String[] args) throws IOException {//1.創建spring容器ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/spring-rabbitmq-producer-dlx.xml");//2.從容器中獲得rabbit模板對象RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);//3.發消息 // rabbitTemplate.convertAndSend("dlx_ttl", "超時,關閉訂單".getBytes());rabbitTemplate.convertAndSend("dlx_max", "測試長度1".getBytes());rabbitTemplate.convertAndSend("dlx_max", "測試長度2".getBytes());rabbitTemplate.convertAndSend("dlx_max", "測試長度3".getBytes());System.out.println("消息已經發布。。。");context.close();} }

2.9 延遲隊列

  • 延遲隊列:TTL + 死信隊列的合體
  • 死信隊列只是一種特殊的隊列,里面的消息仍然可以消費
  • 在電商開發部分中,都會涉及到延時關閉訂單,此時延遲隊列正好可以解決這個問題

2.9.1 生產者

沿用上面死信隊列案例的超時測試,超時時間改為訂單關閉時間即可

2.9.2 消費者

spring-rabbitmq-consumer.xml

<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:rabbit="http://www.springframework.org/schema/rabbit"xmlns:context="http://www.springframework.org/schema/context"xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/rabbithttp://www.springframework.org/schema/rabbit/spring-rabbit.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd"><!--1.配置連接--><rabbit:connection-factory id="connectionFactory"host="192.168.40.100"port="5672"username="panghl" password="panghl"virtual-host="/lagou"/><!--2.配置隊列--><rabbit:queue name="test_spring_queue_ttl" /><!--3.配置rabbitAdmin:主要用于在java代碼中隊隊列的管理,用來創建,綁定,刪除隊列與交換機、發送消息--><rabbit:admin connection-factory="connectionFactory"/><!--4.注解掃描包(SpringIOC)--><context:component-scan base-package="listener"/><!--5.配置監聽--><!-- prefetch="3" 一次性消費的消息數量。會告訴 RabbitMQ 不要同時給一個消費者推送多于 N 個消息,一旦有 N 個消息還沒有ack,則該 consumer 將阻塞,直到消息被ack--><!-- acknowledge-mode: manual 手動確認--><rabbit:listener-containerconnection-factory="connectionFactory"prefetch="3" acknowledge="manual"><rabbit:listener ref="consumerListener" queue-names="test_spring_queue_ttl"/></rabbit:listener-container><!-- 監聽死信隊列 --><rabbit:listener-container connection-factory="connectionFactory" prefetch="3"acknowledge="manual"><rabbit:listener ref="consumerListener" queue-names="dlx_queue" /></rabbit:listener-container></beans>

3、集群搭建

由于篇幅過長,請移步:https://blog.csdn.net/qq_45441466/article/details/119699104

總結

以上是生活随笔為你收集整理的RabbitMQ详解及集群搭建的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。