日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

RabbitMQ保姆级教程

發布時間:2025/3/12 编程问答 24 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RabbitMQ保姆级教程 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

文章目錄

  • 前言
  • 一、MQ是什么?
    • 1.1 AMQP
  • 二、在Linux安裝RabbitMQ
    • 2.1 安裝
    • 2.2 RabbitMQ啟動命令
    • 2.3 開啟RabbitMQ 后臺管理界面
      • 2.3.1 登錄rabbitMQ UI界面
    • 2.3 Docker啟動RabbitMQ
    • 2.4 常見消息模型
    • 2.5 生產者(Producer) / 消費者(Consumer)
    • 2.6 工作隊列模式(Work Queues)
    • 2.7 參數細節
    • 2.8 實現能者多勞
      • 2.8.1 Ack手動應答防止數據丟失和消息拒收后重新發送
      • 2.8.2 預取值
    • 2.9 Publish/Subscribe 發布/訂閱
    • 2.10 Routing(路由) - Direct
    • 2.11 Routing(路由)- Topic
  • 三、進階篇 高級特性
    • 3.1 死信隊列
      • 3.1.1 死信隊列實戰:消息TTL過期
      • 3.1.2 死信隊列實戰:隊列達到最大長度 設置正常隊列最大長度
      • 3.1.3 死信隊列實戰:消息被拒
    • 3.2 基于SpringBoot實現延遲隊列
    • 3.3 發布確認 高級特性
      • 3.3.1 可靠性投遞confirm模式
      • 3.3.2 可靠性投遞return模式
    • 3.4 優先級隊列
    • 3.5 消費端限流

前言

提示:RaabitMQ消息隊列的學習。


一、MQ是什么?

  • MQ全稱 Message Queue(消息隊列),是在消息的傳輸過程中保存消息的容器。多用于分布式系統
    之間進行通信。
  • RabbitMQ 是一個消息中間件:它接受并轉發消息。你可以把它當做一個快遞站點,當你要發送一個包
    裹時,你把你的包裹放到快遞站,快遞員最終會把你的快遞送到收件人那里,按照這種邏輯 RabbitMQ 是
    一個快遞站,一個快遞員幫你傳遞快件。RabbitMQ 與快遞站的主要區別在于,它不處理快件而是接收,
    存儲和轉發消息數據。
  • 工作原理

1.1 AMQP

  • AMQP,即 Advanced Message Queuing Protocol(高級消息隊列協議),是一個網絡協議,是應用
    層協議的一個開放標準,為面向消息的中間件設計。基于此協議的客戶端與消息中間件可傳遞消息,遵
    循此協議,不收客戶端和中間件產品和開發語言限制。2006年,AMQP 規范發布。類比HTTP。

二、在Linux安裝RabbitMQ

2.1 安裝

1. 我們把erlang環境與rabbitMQ 安裝包解壓到Linux2. rpm -ivh erlang安裝包3. yum install socat -y 安裝依賴 / rpm -ivh socat依賴包 --force --nodeps4. rpm -ivh rabbitmq安裝包

2.2 RabbitMQ啟動命令

1. 開啟服務 /sbin/service rabbitmq-server start / service rabbitmq-server start 2. 停止服務 service rabbitmq-server stop 3. 重啟服務 service rabbitmq-server restart

2.3 開啟RabbitMQ 后臺管理界面

1. rabbitmq-plugins enable rabbitmq_management
  • 添加一個新的用戶
1. 創建rabbitMQ賬號rabbitmqctl add_user 用戶名 密碼2. 設置用戶角色rabbitmqctl set_user_tags 用戶名 administrator #設置用戶名為超級管理員3. 設置用戶權限rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"4. 查看rabbitmq的用戶和角色rabbitmqctl list_users5. 登錄rabbitMQ 界面: Linux虛擬機ip:15672 即可

2.3.1 登錄rabbitMQ UI界面

記得開放15672端口訪問 Linux虛擬機ip:15672 即可

輸入賬戶密碼后看到這個界面代表成功

2.3 Docker啟動RabbitMQ

Docker安裝

1. docker pull rabbitmq:3-management2. 開啟rabbitMQdocker run \-e RABBITMQ_DEFAULT_USER=root \-e RABBITMQ_DEFAULT_PASS=123456 \--name mq \--hostname mq1 \-p 15672:15672 \-p 5672:5672 \-d \rabbitmq:3-management

2.4 常見消息模型

  • channel:操作MQ的工具
  • exchange:路由消息到隊列中
  • queue:緩存消息
  • virtual host:虛擬主機,是對queue、exchange等資源的邏輯分組

2.5 生產者(Producer) / 消費者(Consumer)

  • 所需依賴
<dependencies><!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client --><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.7.3</version></dependency><!-- https://mvnrepository.com/artifact/commons-io/commons-io --><dependency><groupId>commons-io</groupId><artifactId>commons-io</artifactId><version>2.4</version></dependency></dependencies> 1234567891011121314

  • 生產者代碼
/*** 生產者:發消息*/ public class Producer {//隊列名稱public static final String QUEUE_NAME="hello";//發消息public static void main(String[] args) throws Exception{//創建一個連接工廠ConnectionFactory factory = new ConnectionFactory();//工廠IP連接rabbitMQ隊列factory.setHost("ip地址");//設置用戶名密碼factory.setUsername("RabbitMQ登錄用戶名");factory.setPassword("RabbitMQ登錄密碼");//創建連接Connection connection = factory.newConnection();//通過連接來獲取 信道來發消息Channel channel = connection.createChannel();/*** 生成一個隊列* 1.隊列名稱* 2.隊列里面的信息是否持久化 默認false 信息存儲在內存中* 3.該列隊是否只供一個消費者進行消費,是否進行消息共享* true:可以多個消費者消費* false:只能一個消費者消費* 4.是否自動刪除,最后一個消費者斷開連接后,該隊列是否自動刪除* true:自動刪除* false:不自動刪除* 5.其他參數*/channel.queueDeclare(QUEUE_NAME,false,false,false,null);//發消息String message="hello rabbitMQ";/*** 發送一個消息* 1.發送到哪個交換機* 2.路由的KEY值是哪個? 指的是本次隊列的名稱* 3.其他參數信息* 4.發送的消息體*/channel.basicPublish("",QUEUE_NAME,null,message.getBytes());System.out.println("消息發送完畢");channel.close();connection.close();} }
  • 消費者
/*** 消費者:接收消息*/ public class Consumer {//隊列名稱,接收此隊列的消息public static final String QUEUE_NAME="hello";public static void main(String[] args) throws Exception{//創建連接工程ConnectionFactory factory = new ConnectionFactory();//工廠IP連接rabbitMQ隊列factory.setHost("ip地址");//設置用戶名密碼factory.setUsername("RabbitMQ登錄用戶名");factory.setPassword("RabbitMQ登錄密碼");//創建連接Connection connection = factory.newConnection();//通過連接來獲取 信道來收消息Channel channel = connection.createChannel();//聲明 接收消息的回調DeliverCallback deliverCallback=(consumerTag, message)-> {//message:包含消息頭和消息體,我們只想拿到消息體//若不進行轉換,直接輸出message我們拿到的則是地址String data = new String(message.getBody());System.out.println(new String(message.getBody()));};//聲明 取消消費的回調CancelCallback cancelCallback=consumerTag->{System.out.println("消費消息被中斷");};/*** 消費者消費消息* 1.消費哪個隊列* 2.消費成功之后是否要自動答應* true:代表自動應答* false:手動應答* 3.消費成者成功消費的回調* 4.消費者取消消費的回調*/channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);} }

2.6 工作隊列模式(Work Queues)

  • 模式說明
  • Work Queues:與入門程序的簡單模式相比,多了一個或一些消費端,多個消費端共同消費同一個隊列中的消費,采用的是 輪詢機制
  • 應用場景:對于任務過重或任務較多情況使用工作隊列可以提高任務處理的速度
  • 工作模式:生產者
public class ProducerWorkQueue {//隊列名稱public static final String QUEUE_NAME="hello";//發消息public static void main(String[] args) throws Exception{//創建一個連接工廠ConnectionFactory factory = new ConnectionFactory();//工廠IP連接rabbitMQ隊列factory.setHost("ip地址");//設置用戶名密碼factory.setUsername("RabbitMQ登錄用戶名");factory.setPassword("RabbitMQ登錄密碼");//創建連接Connection connection = factory.newConnection();//通過連接來獲取 信道來發消息Channel channel = connection.createChannel();/*** 生成一個隊列* 1.隊列名稱* 2.隊列里面的信息是否持久化 默認false 信息存儲在內存中* 3.該列隊是否只供一個消費者進行消費,是否進行消息共享* true:可以多個消費者消費* false:只能一個消費者消費* 4.是否自動刪除,最后一個消費者斷開連接后,該隊列是否自動刪除* true:自動刪除* false:不自動刪除* 5.其他參數*/channel.queueDeclare(QUEUE_NAME,false,false,false,null);for (int i = 1; i <= 10; i++) {//發消息String message=i+"hello rabbitMQ";/*** 發送一個消息* 1.發送到哪個交換機* 2.路由的KEY值是哪個? 指的是本次隊列的名稱* 3.其他參數信息* 4.發送的消息體*/channel.basicPublish("",QUEUE_NAME,null,message.getBytes());System.out.println("消息發送完畢");}channel.close();connection.close();} }
  • 工作模式:兩個消費者
/*** 消費者:接收消息*/ public class ConsumerWorkQueues1 {//隊列名稱,接收此隊列的消息public static final String QUEUE_NAME="hello";public static void main(String[] args) throws Exception{//創建連接工程ConnectionFactory factory = new ConnectionFactory();//工廠IP連接rabbitMQ隊列factory.setHost("ip地址");//設置用戶名密碼factory.setUsername("RabbitMQ登錄用戶名");factory.setPassword("RabbitMQ登錄密碼");//創建連接Connection connection = factory.newConnection();//通過連接來獲取 信道來收消息Channel channel = connection.createChannel();//聲明 接收消息的回調DeliverCallback deliverCallback=(consumerTag, message)-> {//message:包含消息頭和消息體,我們只想拿到消息體//若不進行轉換,直接輸出message我們拿到的則是地址String data = new String(message.getBody());System.out.println(new String(message.getBody()));};//聲明 取消消費的回調CancelCallback cancelCallback=consumerTag->{System.out.println("消費消息被中斷");};/*** 消費者消費消息* 1.消費哪個隊列* 2.消費成功之后是否要自動答應* true:代表自動應答* false:手動應答* 3.消費成者成功消費的回調* 4.消費者取消消費的回調*/channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);} } /*** 消費者:接收消息*/ public class ConsumerWorkQueues2 {//隊列名稱,接收此隊列的消息public static final String QUEUE_NAME="hello";public static void main(String[] args) throws Exception{//創建連接工程ConnectionFactory factory = new ConnectionFactory();//工廠IP連接rabbitMQ隊列factory.setHost("ip地址");//設置用戶名密碼factory.setUsername("RabbitMQ登錄用戶名");factory.setPassword("RabbitMQ登錄密碼");//創建連接Connection connection = factory.newConnection();//通過連接來獲取 信道來收消息Channel channel = connection.createChannel();//聲明 接收消息的回調DeliverCallback deliverCallback=(consumerTag, message)-> {//message:包含消息頭和消息體,我們只想拿到消息體//若不進行轉換,直接輸出message我們拿到的則是地址String data = new String(message.getBody());System.out.println(new String(message.getBody()));};//聲明 取消消費的回調CancelCallback cancelCallback=consumerTag->{System.out.println("消費消息被中斷");};/*** 消費者消費消息* 1.消費哪個隊列* 2.消費成功之后是否要自動答應* true:代表自動應答* false:手動應答* 3.消費成者成功消費的回調* 4.消費者取消消費的回調*/channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);} }
  • 結果:各執行五次,也驗證了 我們上面所說的 輪詢機制

  • 小結:
    一個消息只能有一個接收者,但是可以有多個接收者

2.7 參數細節

  • durable:是否進行持久化,當前隊列如果進行持久化,我們重啟rabbitMQ后當前隊列依舊存在
//消費者生成的隊列channel.queueDeclare(QUEUE_NAME,(durable)true/false,false,false,null);
  • props :隊列中的信息是否持久化,若消息持久化,我們重啟rabbitMQ后當前隊列依舊存在
//MessageProperties.PERSISTENT_TEXT_PLAIN:將消息進行持久化channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
  • autoDelete:是否自動刪除,最后一個消費者斷開連接后,該隊列是否自動刪除
channel.queueDeclare(QUEUE_NAME,false,false,(autoDelete的參數位置)false,null);
  • autoAck:自動應答
若開啟了自動應答,rabbitMQ消息隊列分配給消費者10個數據,只要消費者拿到消息隊列的數據時,就會告訴消息隊列,數據處理完畢。若當我們處理到第5個數據時,消費者出現了宕機,死掉了,則會出現數據丟失 channel.basicConsume(QUEUE_NAME,(autoAck是否自動應答)false,deliverCallback,cancelCallback);

2.8 實現能者多勞

  • 業務場景:

    當我們的兩個消費者執行業務時,a消費者執行速度快,b消費者執行速度慢,我們想讓執行速度快的多執行,應當如何實現呢?

  • 開啟不公平分發,能者多勞 channel.basicQos(1); 0:輪詢機制 1:能者多勞
  • 開啟手動確認
  • 消費者a

/*** 消費者:接收消息*/ public class ConsumerWorkQueues1 {//隊列名稱,接收此隊列的消息public static final String QUEUE_NAME="hello";public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();factory.setHost("ip地址");factory.setUsername("RabbitMQ登錄用戶名");factory.setPassword("RabbitMQ登錄密碼");Connection connection = factory.newConnection();Channel channel = connection.createChannel();//開啟不公平分發,能者多勞channel.basicQos(1);DeliverCallback deliverCallback=(consumerTag, message)-> {String data = new String(message.getBody());System.out.println(new String(message.getBody()));//參數1:確認隊列中那個具體的消息:// 可以獲取消息的id // 消息routingkey// 交換機 exchange// 消息和重傳標志//參數2:是否開啟多個消息同時確認channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};CancelCallback cancelCallback=consumerTag->{System.out.println("消費消息被中斷");};/*** 消費者消費消息* 1.消費哪個隊列* 2.消費成功之后是否要自動答應* true:代表自動應答* false:手動應答* 3.消費成者成功消費的回調* 4.消費者取消消費的回調*///關閉自動應答 falsechannel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);} }
  • 消費者b:消費消息時然消費者b休眠100毫秒
public class ConsumerWorkQueues2 {//隊列名稱,接收此隊列的消息public static final String QUEUE_NAME="hello";public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();factory.setHost("ip地址");factory.setUsername("RabbitMQ登錄用戶名");factory.setPassword("RabbitMQ登錄密碼");Connection connection = factory.newConnection();Channel channel = connection.createChannel();//開啟不公平分發,能者多勞channel.basicQos(1);//聲明 接收消息的回調DeliverCallback deliverCallback=(consumerTag, message)-> {try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(new String(message.getBody()));//手動確認消息://參數1:確認隊列中那個具體的消息 參數2:是否開啟多個消息同時確認channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};//聲明 取消消費的回調CancelCallback cancelCallback=consumerTag->{System.out.println("消費消息被中斷");};/*** 消費者消費消息* 1.消費哪個隊列* 2.消費成功之后是否要自動答應* true:代表自動應答* false:手動應答* 3.消費成者成功消費的回調* 4.消費者取消消費的回調*///關閉自動應答 falsechannel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);}
  • 執行結果:
    消費者a執行

    消費者b執行

2.8.1 Ack手動應答防止數據丟失和消息拒收后重新發送

  • 應用場景:兩個消費者每次都從隊列中來獲取消息,若消費者a正常執行,消費者b在執行過程中出現了宕機,掛掉了那么我們未被消費的消息會被重新放回到隊列中,防止消息丟失。
  • 生產者
  • public class ProducerWorkQueue {//隊列名稱public static final String QUEUE_NAME="hello";//發消息public static void main(String[] args) throws Exception{//創建一個連接工廠ConnectionFactory factory = new ConnectionFactory();factory.setHost("ip地址");factory.setUsername("RabbitMQ登錄用戶名");factory.setPassword("RabbitMQ登錄密碼");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME,false,false,false,null);Scanner scanner = new Scanner(System.in);while (true){String msg = scanner.nextLine();channel.basicPublish("",QUEUE_NAME, null,msg.getBytes());System.out.println("消息發送完畢");}} }
  • 消費者a
  • public class ConsumerWorkQueues1 {//隊列名稱,接收此隊列的消息public static final String QUEUE_NAME="hello";public static void main(String[] args) throws Exception{//創建連接工程ConnectionFactory factory = new ConnectionFactory();factory.setHost("ip地址");factory.setUsername("RabbitMQ登錄用戶名");factory.setPassword("RabbitMQ登錄密碼");//創建連接Connection connection = factory.newConnection();//通過連接來獲取 信道來收消息Channel channel = connection.createChannel();//聲明 接收消息的回調DeliverCallback deliverCallback=(consumerTag, message)-> {//message:包含消息頭和消息體,我們只想拿到消息體//若不進行轉換,直接輸出message我們拿到的則是地址String data = new String(message.getBody());System.out.println("消費者1===>"+new String(message.getBody()));try {int i=3/0;//模擬業務發生異常channel.basicAck(message.getEnvelope().getDeliveryTag(),false);}catch (Exception e){System.out.println("拒收消息發生了異常");//拒收消息//參數一:表示投遞的消息標簽//參數二:是否開啟多個消息同時確認//參數三:是否重新給隊列發送channel.basicNack(message.getEnvelope().getDeliveryTag(),false,true);}};//聲明 取消消費的回調CancelCallback cancelCallback=consumerTag->{System.out.println("消費消息被中斷");};/*** 消費者消費消息* 1.消費哪個隊列* 2.消費成功之后是否要自動答應* true:代表自動應答* false:手動應答* 3.消費成者成功消費的回調* 4.消費者取消消費的回調*///關閉自動應答 falsechannel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);} }
  • 消費者b
  • public class ConsumerWorkQueues2 {//隊列名稱,接收此隊列的消息public static final String QUEUE_NAME="hello";public static void main(String[] args) throws Exception{//創建連接工程ConnectionFactory factory = new ConnectionFactory();factory.setHost("ip地址");factory.setUsername("RabbitMQ登錄用戶名");factory.setPassword("RabbitMQ登錄密碼");//創建連接Connection connection = factory.newConnection();//通過連接來獲取 信道來收消息Channel channel = connection.createChannel();//聲明 接收消息的回調DeliverCallback deliverCallback=(consumerTag, message)-> {//message:包含消息頭和消息體,我們只想拿到消息體//若不進行轉換,直接輸出message我們拿到的則是地址System.out.println("睡10秒");try {Thread.sleep(1000*10);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(new String(message.getBody()));//手動確認消息://參數1:確認隊列中那個具體的消息 參數2:是否開啟多個消息同時確認channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};//聲明 取消消費的回調CancelCallback cancelCallback=consumerTag->{System.out.println("消費消息被中斷");};/*** 消費者消費消息* 1.消費哪個隊列* 2.消費成功之后是否要自動答應* true:代表自動應答* false:手動應答* 3.消費成者成功消費的回調* 4.消費者取消消費的回調*///關閉自動應答 falsechannel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);} }
    • 當消費者b在消費消息時,我們讓消費者b睡眠10秒模擬業務流程,在這10秒內我們手動關掉消費者b
  • 發送 aa 消費者a接收
  • 發送bb消費者b接收,在消費者b睡眠過程中我們停止消費者b,來看看手動應答的結果

    此時我們查看消費者a,出現了本應該是消費者b消費的消息bb
  • 2.8.2 預取值

    channel.basicQos(1); 0:輪詢機制 1:能者多勞 若值>1代表當前隊列的預取值,代表當前隊列大概會拿到多少值

    2.9 Publish/Subscribe 發布/訂閱

    • 也可以叫 廣播模式,當我們的P消費者發送了消息,交給了X(交換機),所有綁定了這個X(交換機)的隊列都可以接收到P消費者發送的消息
    • 代碼實現生產者
    public class Provider {public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();factory.setHost("ip地址");factory.setUsername("RabbitMQ登錄用戶名");factory.setPassword("RabbitMQ登錄密碼");Connection connection = factory.newConnection();Channel channel = connection.createChannel();//將通道聲明指定交換機, 參數一:交換機名稱 參數二:交換機類型 fanout廣播類型 //參數2:交換機類型也可使用 BuiltinExchangeType. 的方式來查看選擇channel.exchangeDeclare("order", "fanout");channel.basicPublish("order","",null,"fanout type message".getBytes());channel.close();connection.close();} }
    • 代碼實現消費者
    public class Consumer {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("ip地址");factory.setUsername("RabbitMQ登錄用戶名");factory.setPassword("RabbitMQ登錄密碼");Connection connection = factory.newConnection();Channel channel = connection.createChannel();//通道綁定交換機channel.exchangeDeclare("order","fanout");//獲取臨時隊列名稱String queueName = channel.queueDeclare().getQueue();//綁定交換機和隊列channel.queueBind(queueName,"order","");channel.basicConsume(queueName,true,(consumerTag,message)->{System.out.println("消費者1===>"+new String(message.getBody()));},consumerTag -> System.out.println("取消消費消息"));} }

    2.10 Routing(路由) - Direct

    routing值訂閱模型-Direct(直連)

    • 在上面廣播模式中,一條消息,會被所有訂閱的隊列都消費。但是在某些場景下,我們希望不同的消息被不同的隊列消費。這時就要用到Direct類型的Exchange

      在Direct模型下:

    • 隊列與交換機的綁定,不能是任意綁定了,而是要指定一個RoutingKey(路由key)
    • 消息的發送方在Exchange發送消息時,也必須指定消息的RoutingKey
    • Exchange不再把消息交給每一個綁定的隊列,而是根據消息的Routing Key進行判斷,只有隊列的RoutingKey與消息的Routing Key完全一致,才會接受到消息
    • 生產者

    public class Provider {public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();factory.setHost("ip地址");factory.setUsername("RabbitMQ登錄用戶名");factory.setPassword("RabbitMQ登錄密碼");Connection connection = factory.newConnection();Channel channel = connection.createChannel();//通過信道聲明交換機, 參數一:交換機名稱 參數二:direct 路由模式channel.exchangeDeclare("logsExchange","direct");//發送消息 參數一:發送信息到的交換機名稱// 參數二:綁定路由 發送給隊列的那個路由key,//只有當隊列的路由key與交換機的路由key相對應時,隊列才會接受到消息channel.basicPublish("logsExchange","msgRouting",null,"routing logs direct info 發送了消息".getBytes());channel.close();connection.close();} }
    • 消費者
    public class Consumer1 {public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();factory.setHost("ip地址");factory.setUsername("RabbitMQ登錄用戶名");factory.setPassword("RabbitMQ登錄密碼");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare("logs","direct");//獲取臨時隊列名String queueName = channel.queueDeclare().getQueue();//綁定隊列:參數一:臨時隊列名稱 參數二:綁定的交換機名稱 參數三:路由key,若消費者的路由key與生產者的路由key相同則可以收到消息channel.queueBind(queueName,"logsExchange","infoRouting");channel.queueBind(queueName,"logsExchange","msgRouting");channel.basicConsume(queueName,true,(consumerTag, message) -> System.out.println(new String(message.getBody())),consumerTag -> System.out.println(1));} }
    • 消費者2
    public class Consumer2 {public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();factory.setHost("ip地址");factory.setUsername("RabbitMQ登錄用戶名");factory.setPassword("RabbitMQ登錄密碼");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare("logs","direct");//獲取臨時隊列名String queueName = channel.queueDeclare().getQueue();channel.queueBind(queueName,"logs","error");channel.queueBind(queueName,"logs","msg");channel.basicConsume(queueName,true,(consumerTag, message) -> System.out.println(new String(message.getBody())),consumerTag -> System.out.println(1));} }

    2.11 Routing(路由)- Topic

    • Topic類型的Exchange與Direct相比,都是可以根據RoutingKey把消息路由到不同的隊列。
    • 只不過Topic類型Exchange可以讓隊列在綁定RoutingKey的時候使用通配符!
    #通配符* (star) can substitute for exactly one word :匹配一個詞# (hash) can substitute for zero or more words :匹配一個或多個詞
    • 生產者
    public class Provider {public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();factory.setHost("ip地址");factory.setUsername("RabbitMQ登錄用戶名");factory.setPassword("RabbitMQ登錄密碼");Connection connection = factory.newConnection();Channel channel = connection.createChannel();//通過信道聲明交換機, 參數一:交換機名稱 參數二:topic 動態路由channel.exchangeDeclare("order","topic");String routingKey="user.order";//發送消息 參數一:發送信息到的交換機名稱 參數二:綁定路由 發送給隊列的那個路由keychannel.basicPublish("order",routingKey,null,("routing logs topic發送了消息"+routingKey).getBytes());channel.close();connection.close();} }
    • 消費者1
    public class Consumer1 {public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();factory.setHost("ip地址");factory.setUsername("RabbitMQ登錄用戶名");factory.setPassword("RabbitMQ登錄密碼");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare("order","topic");//獲取臨時隊列名String queueName = channel.queueDeclare().getQueue();//綁定隊列:參數一:臨時隊列名稱 參數二:綁定的交換機名稱 參數三:動態通配符路由keychannel.queueBind(queueName,"order","user.*");channel.basicConsume(queueName,true,(consumerTag, message) -> System.out.println(new String(message.getBody())),consumerTag -> System.out.println(1));} }
    • 消費者2
    public class Consumer2 {public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();factory.setHost("ip地址");factory.setUsername("RabbitMQ登錄用戶名");factory.setPassword("RabbitMQ登錄密碼");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare("order","topic");//獲取臨時隊列名String queueName = channel.queueDeclare().getQueue();//綁定隊列:參數一:臨時隊列名稱 參數二:綁定的交換機名稱 參數三:動態通配符路由keychannel.queueBind(queueName,"order","user.#");channel.basicConsume(queueName,true,(consumerTag, message) -> System.out.println(new String(message.getBody())),consumerTag -> System.out.println(1));} }

    三、進階篇 高級特性

    3.1 死信隊列

    死信,顧名思義就是無法被消費的信息,字面意思可以這樣理解,一般來說,producer將消息投遞到queue里,consumer從queue取出消息進行消費,但某些時候由于特定的原因導致queue中的某些消息無法被消費,這樣的消息如果沒有后續的處理,就變成了死信,自然就有了死信隊列
    • 應用場景
    為了保證訂單業務的消息數據不丟失,需要使用到RabbitMQ的死信隊列機制,當消息消費發生異常時,將消息投入死信隊列中。比如說:用戶在商城下單成功并點擊去支付后在指定時間未支付時自動失效
    • 生產者:給正產的消息隊列發送消息,并且設置消息過期時間為10S,超過10S消息未被消費,則消息進入死信隊列
    public class TTLProvider {//普通交換機名稱public static final String NORMAL_EXCHANGE="normal_exchange";public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();factory.setHost("ip");factory.setUsername("賬戶");factory.setPassword("密碼");Connection connection = factory.newConnection();Channel channel = connection.createChannel();//發送死信 設置TTL過期時間AMQP.BasicProperties properties=new AMQP.BasicProperties().builder().expiration("10000").build();for (int i = 1; i <= 10; i++) {String msg=""+i;channel.basicPublish(NORMAL_EXCHANGE,"normal",properties,msg.getBytes());}System.out.println("結束發送");} }
    • 正常隊列消費者
    public class TTLConsumer1 {//普通交換機名稱public static final String NORMAL_EXCHANGE="normal_exchange";//死信交換機名稱public static final String DEAD_EXCHANGE="dead_exchange";//普通隊列名稱public static final String NORMAL_QUEUE="normal_queue";//死信隊列名稱public static final String DEAD_QUEUE="dead_queue";public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();factory.setHost("ip");factory.setUsername("賬戶");factory.setPassword("密碼");Connection connection = factory.newConnection();Channel channel = connection.createChannel();//聲明普通交換機和死信交換機channel.exchangeDeclare(NORMAL_EXCHANGE,"direct");channel.exchangeDeclare(DEAD_EXCHANGE,"direct");//聲明普通隊列HashMap<String, Object> map = new HashMap<>();//當消息被拒絕接受/未被消費 會將消息轉發到死信隊列//正常隊列設置死信交換機map.put("x-dead-letter-exchange",DEAD_EXCHANGE);//設置死信隊列的routingKeymap.put("x-dead-letter-routing-key","dead");channel.queueDeclare(NORMAL_QUEUE,false,false,false,map);//聲明死信隊列channel.queueDeclare(DEAD_QUEUE,false,false,false,null);//綁定普通交換機與普通隊列channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"normal");//綁定死信交換機與死信隊列channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"dead");DeliverCallback deliverCallback=( consumerTag, message)->{System.out.println("Consumer1接收消息===>"+new String(message.getBody(),"UTF-8"));};CancelCallback cancelCallback=(consumerTag)-> System.out.println(consumerTag);channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,cancelCallback);} }
    • 死信隊列消費者
    public class TTLConsumer2 {//死信隊列名稱public static final String DEAD_QUEUE="dead_queue";public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();factory.setHost("ip");factory.setUsername("賬戶");factory.setPassword("密碼");Connection connection = factory.newConnection();Channel channel = connection.createChannel();DeliverCallback deliverCallback=( consumerTag, message)->{System.out.println("Consumer1接收消息===>"+new String(message.getBody(),"UTF-8"));};CancelCallback cancelCallback=(consumerTag)-> System.out.println(consumerTag);channel.basicConsume(DEAD_QUEUE,true,deliverCallback,cancelCallback);} }
    • 結果:當設置了死信隊列,和TTL過期時間,若超過了過期時間消息未被消費,則消息會轉發到死信隊列
      死信隊列產生三大原因
    • 消息被拒接
    • 消息TTL過期
    • 隊列達到最大長度

    3.1.1 死信隊列實戰:消息TTL過期

    • 配置類
    @Configuration public class RabbitMQConfiguration {//普通交換機public static final String X_EXCHANGE="X";//死信交換機public static final String Y_DEAD_LETTER_EXCHANGE="Y";//普通隊列public static final String QUEUE_A="QA";public static final String QUEUE_B="QB";//死信隊列public static final String DEAD_QUEUE_D="QD";//聲明普通x交換機@Beanpublic DirectExchange xExchange(){return new DirectExchange(X_EXCHANGE);}//聲明死信交換機@Beanpublic DirectExchange yExchange(){return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);}//聲明普通隊列A TTL:10S@Beanpublic Queue queueA(){Map<String,Object> arg=new HashMap<>();//設置死信交換機arg.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);//設置死信routingKeyarg.put("x-dead-letter-routing-key","YD");//設置TTL過期時間arg.put("x-message-ttl",10000);return QueueBuilder.durable(QUEUE_A).withArguments(arg).build();}//聲明普通隊列B TTL:40S@Beanpublic Queue queueB(){Map<String,Object> arg=new HashMap<>();//設置死信交換機arg.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);//設置死信routingKeyarg.put("x-dead-letter-routing-key","YD");//設置TTL過期時間arg.put("x-message-ttl",40000);return QueueBuilder.durable(QUEUE_B).withArguments(arg).build();}//死信隊列@Beanpublic Queue queueD(){return QueueBuilder.durable(DEAD_QUEUE_D).build();}@Beanpublic Binding queueABindingX(@Qualifier("queueA") Queue queueA,@Qualifier("xExchange") DirectExchange xExchange){return BindingBuilder.bind(queueA).to(xExchange).with("XA");}@Beanpublic Binding queueBBindingX(@Qualifier("queueB") Queue queueB,@Qualifier("xExchange") DirectExchange xExchange){return BindingBuilder.bind(queueB).to(xExchange).with("XB");}@Beanpublic Binding queueDBindingY(@Qualifier("queueD") Queue queueD,@Qualifier("yExchange") DirectExchange yExchange){return BindingBuilder.bind(queueD).to(yExchange).with("YD");} }
    • TTL生產者
    @RestController @RequestMapping("/ttl") @Slf4j public class TTLProvider {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/{msg}")public void sendMsg(@PathVariable("msg") String msg){log.info("當前發送時間:{}發送了一條消息",new Date().toString());rabbitTemplate.convertAndSend("X","XA","TTL消息延遲為10S,消息為===>"+msg);rabbitTemplate.convertAndSend("X","XB","TTL消息延遲為40S,消息為===>"+msg);} }
    • 死信隊列消費者
    @Component @Slf4j public class DeadLetterConsumer {@RabbitListener(queues = "QD")public void t1(Message message, Channel channel)throws Exception{log.info("收到死信隊列的消息{},時間為{}",new String(message.getBody(),"UTF-8"),new Date().toString());} }
    • 死信隊列-TTL過期時間測試結果

    3.1.2 死信隊列實戰:隊列達到最大長度 設置正常隊列最大長度

  • 生產者
  • public class Producer {//普通交換機名稱public static final String NORMAL_EXCHANGE="normal_exchange";public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();factory.setHost("ip地址");factory.setUsername("RabbitMQ登錄用戶名");factory.setPassword("RabbitMQ登錄密碼");Connection connection = factory.newConnection();Channel channel = connection.createChannel();for (int i = 1; i <= 10; i++) {String msg=""+i;channel.basicPublish(NORMAL_EXCHANGE,"normal",null,msg.getBytes());}} }
  • 消費者a
    //設置當前正常隊列的長度限制超過長度,后面的消息會進入到死信隊列
    map.put(“x-max-length”,6);
  • public class Consumer01 {//普通交換機名稱public static final String NORMAL_EXCHANGE="normal_exchange";//死信交換機名稱public static final String DEAD_EXCHANGE="dead_exchange";//普通隊列名稱public static final String NORMAL_QUEUE="normal_queue";//死信隊列名稱public static final String DEAD_QUEUE="dead_queue";public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();factory.setHost("ip地址");factory.setUsername("RabbitMQ登錄用戶名");factory.setPassword("RabbitMQ登錄密碼");Connection connection = factory.newConnection();Channel channel = connection.createChannel();//聲明普通交換機和死信交換機channel.exchangeDeclare(NORMAL_EXCHANGE,"direct");channel.exchangeDeclare(DEAD_EXCHANGE,"direct");//聲明普通隊列HashMap<String, Object> map = new HashMap<>();//正常隊列設置死信交換機map.put("x-dead-letter-exchange",DEAD_EXCHANGE);//設置死信隊列的routingKeymap.put("x-dead-letter-routing-key","dead");//設置當前正常隊列的長度限制超過長度,后面的消息會進入到死信隊列map.put("x-max-length",6);channel.queueDeclare(NORMAL_QUEUE,false,false,false,map);//聲明死信隊列channel.queueDeclare(DEAD_QUEUE,false,false,false,null);//綁定普通交換機與普通隊列channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"normal");//綁定死信交換機與死信隊列channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"dead");DeliverCallback deliverCallback=( consumerTag, message)->{System.out.println("Consumer1接收消息===>"+new String(message.getBody(),"UTF-8"));};CancelCallback cancelCallback=(consumerTag)-> System.out.println(consumerTag);channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,cancelCallback);} }
  • 消費者b
  • public class Consumer02 {//死信隊列名稱public static final String DEAD_QUEUE="dead_queue";public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();factory.setHost("ip地址");factory.setUsername("RabbitMQ登錄用戶名");factory.setPassword("RabbitMQ登錄密碼");Connection connection = factory.newConnection();Channel channel = connection.createChannel();DeliverCallback deliverCallback=( consumerTag, message)->{System.out.println("Consumer1接收消息===>"+new String(message.getBody(),"UTF-8"));};CancelCallback cancelCallback=(consumerTag)-> System.out.println(consumerTag);channel.basicConsume(DEAD_QUEUE,true,deliverCallback,cancelCallback);} }

    3.1.3 死信隊列實戰:消息被拒

  • 生產者
  • public class Producer {//普通交換機名稱public static final String NORMAL_EXCHANGE="normal_exchange";public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();factory.setHost("ip地址");factory.setUsername("RabbitMQ登錄用戶名");factory.setPassword("RabbitMQ登錄密碼");Connection connection = factory.newConnection();Channel channel = connection.createChannel();for (int i = 1; i <= 10; i++) {String msg="info"+i;channel.basicPublish(NORMAL_EXCHANGE,"normal",null,msg.getBytes());}} }
  • 消費者a
    • 此消息被拒接,是否重新放回正常隊列, false:不放回 則會放到死信隊列
    • 1.channel.basicReject(message.getEnvelope().getDeliveryTag(),false);
    • 2.并且開啟手動應答
    public class Consumer01 {//普通交換機名稱public static final String NORMAL_EXCHANGE="normal_exchange";//死信交換機名稱public static final String DEAD_EXCHANGE="dead_exchange";//普通隊列名稱public static final String NORMAL_QUEUE="normal_queue";//死信隊列名稱public static final String DEAD_QUEUE="dead_queue";public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();factory.setHost("ip");factory.setUsername("登錄賬戶");factory.setPassword("登錄密碼");Connection connection = factory.newConnection();Channel channel = connection.createChannel();//聲明普通交換機和死信交換機channel.exchangeDeclare(NORMAL_EXCHANGE,"direct");channel.exchangeDeclare(DEAD_EXCHANGE,"direct");//聲明普通隊列HashMap<String, Object> map = new HashMap<>();//正常隊列設置死信交換機map.put("x-dead-letter-exchange",DEAD_EXCHANGE);//設置死信隊列的routingKeymap.put("x-dead-letter-routing-key","dead");channel.queueDeclare(NORMAL_QUEUE,false,false,false,map);//聲明死信隊列channel.queueDeclare(DEAD_QUEUE,false,false,false,null);//綁定普通交換機與普通隊列channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"normal");//綁定死信交換機與死信隊列channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"dead");DeliverCallback deliverCallback=( consumerTag, message)->{String msg=new String(message.getBody());if("info5".equals(msg)){System.out.println("Consumer1接收消息===>"+msg+"此消息被拒絕");//此消息被拒接,是否重新放回正常隊列, false:不放回 則會放到死信隊列channel.basicReject(message.getEnvelope().getDeliveryTag(),false);}else {System.out.println("Consumer1接收消息===>"+new String(message.getBody(),"UTF-8"));channel.basicAck(message.getEnvelope().getDeliveryTag(),false);}};CancelCallback cancelCallback=(consumerTag)-> System.out.println(consumerTag);//開啟手動應答channel.basicConsume(NORMAL_QUEUE,false,deliverCallback,cancelCallback);} }
  • 消費者b
  • public class Consumer02 {//死信隊列名稱public static final String DEAD_QUEUE="dead_queue";public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();factory.setHost("ip地址");factory.setUsername("RabbitMQ登錄用戶名");factory.setPassword("RabbitMQ登錄密碼");Connection connection = factory.newConnection();Channel channel = connection.createChannel();DeliverCallback deliverCallback=( consumerTag, message)->{System.out.println("Consumer1接收消息===>"+new String(message.getBody(),"UTF-8"));};CancelCallback cancelCallback=(consumerTag)-> System.out.println(consumerTag);channel.basicConsume(DEAD_QUEUE,true,deliverCallback,cancelCallback);} }

    3.2 基于SpringBoot實現延遲隊列

  • 配置隊列交換機
  • @Configuration public class QueueConfig {@Bean("exchange")public DirectExchange exchange(){return new DirectExchange("msg");}@Bean("simpleQue")public Queue simpleQue(){HashMap<String, Object> map = new HashMap<>();//設置死信交換機map.put("x-dead-letter-exchange","dead");//設置死信路由map.put("x-dead-letter-routing-key","deadKey");//消息失效時間map.put("x-message-ttl",10000);return new Queue("simple",false,false,false,map);}@Beanpublic Binding simpleQueueBandingExchange(@Qualifier("simpleQue") Queue simple,@Qualifier("exchange") DirectExchange msg)throws Exception{return BindingBuilder.bind(simple).to(msg).with("info");}@Bean("deadExchange")public DirectExchange exchange1(){return new DirectExchange("dead");}@Bean("deadQueue")public Queue deadQ(){return new Queue("deadQue",false,false,false,null);}@Beanpublic Binding deadKeyBindingDeadExchange(@Qualifier("deadQueue")Queue queue,@Qualifier("deadExchange")DirectExchange dead){//綁定死信隊列到死信交換機通過路由return BindingBuilder.bind(queue).to(dead).with("deadKey");} }
  • 生產者
  • @RestController public class Provider {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("/ttl/{message}")public void t1(@PathVariable String message){String queueName="simple";Date date = new Date();System.out.println(date);rabbitTemplate.convertAndSend("msg","info",message);} }
  • 消費者
  • @Component public class Consumer {@RabbitListener(queues = "deadQue")public void hello(Message msg, Channel channel)throws Exception{System.out.println("接收到消息"+new String(msg.getBody()));Date date1 = new Date();System.out.println(date1);} }
    • 我們看到消息每隔十秒更新一次

    3.3 發布確認 高級特性

    3.3.1 可靠性投遞confirm模式

    • 場景:在生產環境中由于一些不明原因,導致rabbitmq重啟,在rabbitmq重啟期間的生產者消息投遞失敗,導致消息丟失,需要手動處理和恢復。-可靠性投遞confirm模式
    • 需要在application核心配置文件中設置發布確認類型
    • spring-rabbitmq-publisher-confirm-type: correlated
    • 類型1:none:禁用發布確認模式,是默認值
    • 類型2:correlated:發布消息成功到交換機后出發回調方法
    • 類型3:simple:和correlated效果一樣,但是如果回調返回的是false,會關閉信道,接下來無法發送消息
  • 配置類
  • @Component public class confirmConfig {public static final String CONFIRM_EXCHANGE_NAME="confirm.exchange";public static final String CONFIRM_QUEUE="confirm.queue";public static final String CONFIRM_ROUTING_KEY="confirm";@Bean("confirmExchange")public DirectExchange confirmExchange(){return new DirectExchange(CONFIRM_EXCHANGE_NAME);}@Bean("confirmQueue")public Queue confirmQueue(){return new Queue(CONFIRM_QUEUE);}@Beanpublic Binding confirmQueueBindingConfirmExchange(@Qualifier("confirmExchange")DirectExchange confirmExchange,@Qualifier("confirmQueue")Queue confirmQueue){return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);} }
    • 當生產者發送給交換機消息時,交換機的名字錯了,或者交換機掛掉了,會導致消息的丟失,那么我們需要實現回調接口,當交換機收到消息后會給生產者發送回調消息
  • 實現回調接口:實現 RabbitTemplate.ConfirmCallback接口的confirm方法并且將其注入到rabbit模板的內部類中
  • @Component @Slf4j public class ConfirmCallback implements RabbitTemplate.ConfirmCallback {@Autowiredprivate RabbitTemplate rabbitTemplate;//注入@PostConstruct //當所有注解執行完后,再執行這個注解public void init(){rabbitTemplate.setConfirmCallback(this);}/*** 交換機確認回調方法* 發消息,交換機接收到了,回調* 參數* 1. correlationData:保存消息的ID及相關信息,這個消息是我們生產者手動傳入的* 2. 交換機收到消息 true* 3. null*//*** 交換機確認回調方法* 發消息,交換機接收失敗,回調* 參數* 1. correlationData:保存消息的ID及相關信息* 2. 交換機收到消息 false* 3. cause:失敗的原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean b, String s) {String id=correlationData!=null?correlationData.getId():"";if(b){log.info("交換機已經收到了ID為{}的消息",id);}else {log.info("交換機為收到了ID為{}的消息,原因是:{}",id,s);}} }
  • 生產者
  • @RestController public class ConfirmProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/sendMsg/{msg}")public void t1(@PathVariable String msg){CorrelationData correlationData = new CorrelationData();correlationData.setId("1");rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,ConfirmConfig.CONFIRM_ROUTING_KEY,"嘿嘿嘿".getBytes(),correlationData);} }
  • 消費者
  • @Component public class ConfirmConsumer {@RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE)public void consumer(Message message){System.out.println("高級特性確認發布消費者收到了消息===>"+new String(message.getBody()));} }
    • 測試:當我們正常發送消息
    • 測試:當我們把交換機名字換掉

    3.3.2 可靠性投遞return模式

    • 場景:若交換機收到消息,隊列沒有收到消息,應該如何解決?
    • 需要在application核心配置文件中設置是否回退消息,當消息路由不到消費者
    • spring-rabbitmq-publisher-returns=true 開啟回退消息
    @Component @Slf4j public class ConfirmCallback implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback{@Autowiredprivate RabbitTemplate rabbitTemplate;//注入@PostConstruct //當所有注解執行完后,再執行這個注解public void init(){rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnCallback(this);}/*** 交換機確認回調方法* 發消息,交換機接收到了,回調* 參數* 1. correlationData:保存消息的ID及相關信息* 2. 交換機收到消息 true* 3. null*//*** 交換機確認回調方法* 發消息,交換機接收失敗,回調* 參數* 1. correlationData:保存消息的ID及相關信息* 2. 交換機收到消息 false* 3. cause:失敗的原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean b, String s) {String id=correlationData!=null?correlationData.getId():"";if(b){log.info("交換機已經收到了ID為{}的消息",id);}else {log.info("交換機未收到了ID為{}的消息,原因是:{}",id,s);}}/*** 消息傳遞過程中 不可達 消費者的隊列時將消息返回給生產者* 只有當消息 不可達 目的地的時候 才進行回調* 參數1:消息體* 參數2:回復代碼* 參數3:回復原因* 參數4:交換機* 參數5:路由key*/@Overridepublic void returnedMessage(Message message, int i, String s, String s1, String s2) {log.info("消息{},被交換機{}退回,原因是{},路由是{}",new String(message.getBody()),s1,s,s2);}}

    3.4 優先級隊列

    • 優先級越高,消息先被消費者消費
    • 官方設置最大優先級 0-255 超出優先級則報錯 自己使用時數字不必設置很大,會浪費CPU效率
  • 生產者
  • public class PriorityProducer {public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();factory.setHost("ip地址");factory.setUsername("RabbitMQ登錄用戶名");factory.setPassword("RabbitMQ登錄密碼");Connection connection = factory.newConnection();Channel channel = connection.createChannel();//設置優先級參數AMQP.BasicProperties build = new AMQP.BasicProperties.Builder().priority(10).build();for (int i = 1; i <= 10; i++) {String msg="info"+i;if(i==5){channel.basicPublish("","hi",build,msg.getBytes());}else {channel.basicPublish("","hi",null,msg.getBytes());}}} }
  • 消費者
  • public class PriorityConsumer {public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();factory.setHost("ip地址");factory.setUsername("RabbitMQ登錄用戶名");factory.setPassword("RabbitMQ登錄密碼");Connection connection = factory.newConnection();Channel channel = connection.createChannel();HashMap<String, Object> map = new HashMap<>();//設置當前隊列為優先級隊列map.put("x-max-priority",10);channel.queueDeclare("hi",false,false,false,map);channel.basicConsume("hi",true,(consumerTag,message)->{System.out.println("優先級隊列接收消息順序===>"+new String(message.getBody()));},(consumerTag) -> System.out.println("取消回調"));} }
    • 測試結果:我們定義的是消息5優先級最高,其他消息為默認優先級

    3.5 消費端限流

    • 參數一:prefetchSize:預先載入的大小 0表示不限制大小
    • 參數二:prefetchCount:預先載入的消息條數
    • 參數三:global:false
    • 注意:autoAck手動應答一定要為false
    //設置每次確定一個消息channel.basicQos(0,1,false); 12
    • 生產者
    public class AckProvider {//隊列名稱public static final String QUEUE_NAME="hello_Ack";//發消息public static void main(String[] args) throws Exception{//創建一個連接工廠ConnectionFactory factory = new ConnectionFactory();factory.setHost("ip");factory.setUsername("用戶");factory.setPassword("密碼");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME,false,false,false,null);Scanner scanner = new Scanner(System.in);while (true){String msg = scanner.nextLine();channel.basicPublish("",QUEUE_NAME, null,msg.getBytes());System.out.println("消息發送完畢");}} }
    • 消費者
    public class AckConsumer2 {//隊列名稱,接收此隊列的消息public static final String QUEUE_NAME="hello_Ack";public static void main(String[] args) throws Exception{//創建連接工程ConnectionFactory factory = new ConnectionFactory();factory.setHost("ip");factory.setUsername("用戶");factory.setPassword("密碼");//創建連接Connection connection = factory.newConnection();//通過連接來獲取 信道來收消息Channel channel = connection.createChannel();//聲明 接收消息的回調DeliverCallback deliverCallback=(consumerTag, message)-> {//message:包含消息頭和消息體,我們只想拿到消息體//若不進行轉換,直接輸出message我們拿到的則是地址System.out.println(new String(message.getBody()));try {Thread.sleep(1000*5);} catch (InterruptedException e) {e.printStackTrace();}//手動確認消息://參數1:確認隊列中那個具體的消息 參數2:是否開啟多個消息同時確認channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};//聲明 取消消費的回調CancelCallback cancelCallback=consumerTag->{System.out.println("消費消息被中斷");};//每次只消費一個channel.basicQos(0,1,false);/*** 消費者消費消息* 1.消費哪個隊列* 2.消費成功之后是否要自動答應* true:代表自動應答* false:手動應答* 3.消費成者成功消費的回調* 4.消費者取消消費的回調*///關閉自動應答 falsechannel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);} }

    總結

    以上是生活随笔為你收集整理的RabbitMQ保姆级教程的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。