日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

【消息中间件】AMQPRabbitMQ工作模式

發布時間:2025/5/22 编程问答 22 豆豆
生活随笔 收集整理的這篇文章主要介紹了 【消息中间件】AMQPRabbitMQ工作模式 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

AMQP 相關概念介紹

AMQP 一個提供統一消息服務的應用層標準高級消息隊列協議,是應用層協議的一個開放標準,為面向消息的中間件設計。

AMQP是一個二進制協議,擁有一些現代化特點:多信道、協商式,異步,安全,擴平臺,中立,高效。

RabbitMQ是AMQP協議的Erlang的實現。

概念說明
連接Connection一個網絡連接,比如TCP/IP套接字連接。
會話Session端點之間的命名對話。在一個會話上下文中,保證“恰好傳遞一次”。
信道Channel多路復用連接中的一條獨立的雙向數據流通道。為會話提供物理傳輸介質。
客戶端ClientAMQP連接或者會話的發起者。AMQP是非對稱的,客戶端生產和消費消息,服務器存儲和路由這些消息。
服務節點Broker消息中間件的服務節點;一般情況下可以將一個RabbitMQ Broker看作一臺RabbitMQ 服務器。
端點AMQP對話的任意一方。一個AMQP連接包括兩個端點(一個是客戶端,一個是服務器)。
消費者Consumer一個從消息隊列里請求消息的客戶端程序。
生產者Producer一個向交換機發布消息的客戶端應用程序。

RabbitMQ運轉流程

在入門案例中:

  • 生產者發送消息
  • 生產者創建連接(Connection),開啟一個信道(Channel),連接到RabbitMQ Broker;
  • 聲明隊列并設置屬性;如是否排它,是否持久化,是否自動刪除;
  • 將路由鍵(空字符串)與隊列綁定起來;
  • 發送消息至RabbitMQ Broker;
  • 關閉信道;
  • 關閉連接;
  • 消費者接收消息
  • 消費者創建連接(Connection),開啟一個信道(Channel),連接到RabbitMQ Broker
  • 向Broker 請求消費相應隊列中的消息,設置相應的回調函數;
  • 等待Broker回應閉關投遞響應隊列中的消息,消費者接收消息;
  • 確認(ack,自動確認)接收到的消息;
  • RabbitMQ從隊列中刪除相應已經被確認的消息;
  • 關閉信道;
  • 關閉連接;

生產者流轉過程說明

  • 客戶端與代理服務器Broker建立連接。會調用newConnection() 方法,這個方法會進一步封裝Protocol Header 0-9-1 的報文頭發送給Broker ,以此通知Broker 本次交互采用的是AMQPO-9-1 協議,緊接著Broker 返回Connection.Start 來建立連接,在連接的過程中涉及Connection.Start/.Start-OK 、Connection.Tune/.Tune-Ok ,Connection.Open/ .Open-Ok 這6 個命令的交互。
  • 客戶端調用connection.createChannel方法。此方法開啟信道,其包裝的channel.open命令發送給Broker,等待channel.basicPublish方法,對應的AMQP命令為Basic.Publish,這個命令包含了content Header 和content Body()。content Header 包含了消息體的屬性,例如:投遞模式,優先級等,content Body 包含了消息體本身。
  • 客戶端發送完消息需要關閉資源時,涉及到Channel.Close和Channl.Close-Ok 與Connetion.Close和Connection.Close-Ok的命令交互。

消費者流轉過程說明

  • 消費者客戶端與代理服務器Broker建立連接。會調用newConnection() 方法,這個方法會進一步封裝Protocol Header 0-9-1 的報文頭發送給Broker ,以此通知Broker 本次交互采用的是AMQPO-9-1 協議,緊接著Broker 返回Connection.Start 來建立連接,在連接的過程中涉及Connection.Start/.Start-OK 、Connection.Tune/.Tune-Ok ,Connection.Open/ .Open-Ok 這6 個命令的交互。
  • 消費者客戶端調用connection.createChannel方法。和生產者客戶端一樣,協議涉及Channel . Open/Open-Ok命令。
  • 在真正消費之前,消費者客戶端需要向Broker 發送Basic.Consume 命令(即調用channel.basicConsume 方法〉將Channel 置為接收模式,之后Broker 回執Basic . Consume - Ok 以告訴消費者客戶端準備好消費消息。
  • Broker 向消費者客戶端推送(Push) 消息,即Basic.Deliver 命令,這個命令和Basic.Publish 命令一樣會攜帶Content Header 和Content Body。
  • 消費者接收到消息并正確消費之后,向Broker 發送確認,即Basic.Ack 命令。
  • 客戶端發送完消息需要關閉資源時,涉及到Channel.Close和Channl.Close-Ok 與Connetion.Close和Connection.Close-Ok的命令交互。

RabbitMQ工作模式

Work queues工作隊列模式


Work Queues與入門程序的簡單模式相比,多了一個或一些消費端,多個消費端共同消費同一個隊列中的消息。

應用場景:對于 任務過重或任務較多情況使用工作隊列可以提高任務處理的速度。

代碼

Work Queues與入門程序的簡單模式的代碼是幾乎一樣的;可以完全復制,并復制多一個消費者進行多個消費者同時消費消息的測試。

1)生產者

public class Producer {static final String QUEUE_NAME = "work_queue";public static void main(String[] args) throws Exception {//創建連接Connection connection = ConnectionUtil.getConnection();// 創建頻道Channel channel = connection.createChannel();// 聲明(創建)隊列/*** 參數1:隊列名稱* 參數2:是否定義持久化隊列* 參數3:是否獨占本次連接* 參數4:是否在不使用的時候自動刪除隊列* 參數5:隊列其它參數*/channel.queueDeclare(QUEUE_NAME, true, false, false, null);for (int i = 1; i <= 30; i++) {// 發送信息String message = "你好;小兔子!work模式--" + i;/*** 參數1:交換機名稱,如果沒有指定則使用默認Default Exchage* 參數2:路由key,簡單模式可以傳遞隊列名稱* 參數3:消息其它屬性* 參數4:消息內容*/channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println("已發送消息:" + message);}// 關閉資源channel.close();connection.close();}}

2)消費者1

public class Consumer1 {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();// 創建頻道Channel channel = connection.createChannel();// 聲明(創建)隊列/*** 參數1:隊列名稱* 參數2:是否定義持久化隊列* 參數3:是否獨占本次連接* 參數4:是否在不使用的時候自動刪除隊列* 參數5:隊列其它參數*/channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null);//一次只能接收并處理一個消息channel.basicQos(1);//創建消費者;并設置消息處理DefaultConsumer consumer = new DefaultConsumer(channel){@Override/*** consumerTag 消息者標簽,在channel.basicConsume時候可以指定* envelope 消息包的內容,可從中獲取消息id,消息routingkey,交換機,消息和重傳標志(收到消息失敗后是否需要重新發送)* properties 屬性信息* body 消息*/public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {try {//路由keySystem.out.println("路由key為:" + envelope.getRoutingKey());//交換機System.out.println("交換機為:" + envelope.getExchange());//消息idSystem.out.println("消息id為:" + envelope.getDeliveryTag());//收到的消息System.out.println("消費者1-接收到的消息為:" + new String(body, "utf-8"));Thread.sleep(1000);//確認消息channel.basicAck(envelope.getDeliveryTag(), false);} catch (InterruptedException e) {e.printStackTrace();}}};//監聽消息/*** 參數1:隊列名稱* 參數2:是否自動確認,設置為true為表示消息接收到自動向mq回復接收到了,mq接收到回復會刪除消息,設置為false則需要手動確認* 參數3:消息接收到后回調*/channel.basicConsume(Producer.QUEUE_NAME, false, consumer);}}

3)消費者2

public class Consumer2 {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();// 創建頻道Channel channel = connection.createChannel();// 聲明(創建)隊列/*** 參數1:隊列名稱* 參數2:是否定義持久化隊列* 參數3:是否獨占本次連接* 參數4:是否在不使用的時候自動刪除隊列* 參數5:隊列其它參數*/channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null);//一次只能接收并處理一個消息channel.basicQos(1);//創建消費者;并設置消息處理DefaultConsumer consumer = new DefaultConsumer(channel){@Override/*** consumerTag 消息者標簽,在channel.basicConsume時候可以指定* envelope 消息包的內容,可從中獲取消息id,消息routingkey,交換機,消息和重傳標志(收到消息失敗后是否需要重新發送)* properties 屬性信息* body 消息*/public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {try {//路由keySystem.out.println("路由key為:" + envelope.getRoutingKey());//交換機System.out.println("交換機為:" + envelope.getExchange());//消息idSystem.out.println("消息id為:" + envelope.getDeliveryTag());//收到的消息System.out.println("消費者2-接收到的消息為:" + new String(body, "utf-8"));Thread.sleep(1000);//確認消息channel.basicAck(envelope.getDeliveryTag(), false);} catch (InterruptedException e) {e.printStackTrace();}}};//監聽消息/*** 參數1:隊列名稱* 參數2:是否自動確認,設置為true為表示消息接收到自動向mq回復接收到了,mq接收到回復會刪除消息,設置為false則需要手動確認* 參數3:消息接收到后回調*/channel.basicConsume(Producer.QUEUE_NAME, false, consumer);}}

啟動兩個消費者,然后再啟動生產者發送消息;到IDEA的兩個消費者對應的控制臺查看是競爭性的接收到消息。

在一個隊列中如果有多個消費者,那么消費者之間對于同一個消息的關系是競爭的關系。

訂閱模式類型

訂閱模式示例圖:

在訂閱模型中,多了一個exchange角色,而且過程略有變化:

  • P:生產者,也就是要發送消息的程序,但是不再發送到隊列中,而是發給X(交換機)
  • C:消費者,消息的接受者,會一直等待消息到來。
  • Queue:消息隊列,接收消息、緩存消息。
  • Exchange:交換機,圖中的X。一方面,接收生產者發送的消息。另一方面,知道如何處理消息,例如遞交給某個特別隊列、遞交給所有隊列、或是將消息丟棄。到底如何操作,取決于Exchange的類型。Exchange有常見以下3種類型:
    • Fanout:廣播,將消息交給所有綁定到交換機的隊列
    • Direct:定向,把消息交給符合指定routing key 的隊列
    • Topic:通配符,把消息交給符合routing pattern(路由模式) 的隊列

Exchange(交換機)只負責轉發消息,不具備存儲消息的能力,因此如果沒有任何隊列與Exchange綁定,或者沒有符合路由規則的隊列,那么消息會丟失!

Publish/Subscribe發布與訂閱模式

發布訂閱模式:
1、每個消費者監聽自己的隊列。
2、生產者將消息發給broker,由交換機將消息轉發到綁定此交換機的每個隊列,每個綁定交換機的隊列都將接收到消息

代碼

1)生產者

/*** 發布與訂閱使用的交換機類型為:fanout*/public class Producer {//交換機名稱static final String FANOUT_EXCHAGE = "fanout_exchange";//隊列名稱static final String FANOUT_QUEUE_1 = "fanout_queue_1";//隊列名稱static final String FANOUT_QUEUE_2 = "fanout_queue_2";public static void main(String[] args) throws Exception {//創建連接Connection connection = ConnectionUtil.getConnection();// 創建頻道Channel channel = connection.createChannel();/*** 聲明交換機* 參數1:交換機名稱* 參數2:交換機類型,fanout、topic、direct、headers*/channel.exchangeDeclare(FANOUT_EXCHAGE, BuiltinExchangeType.FANOUT);// 聲明(創建)隊列/*** 參數1:隊列名稱* 參數2:是否定義持久化隊列* 參數3:是否獨占本次連接* 參數4:是否在不使用的時候自動刪除隊列* 參數5:隊列其它參數*/channel.queueDeclare(FANOUT_QUEUE_1, true, false, false, null);channel.queueDeclare(FANOUT_QUEUE_2, true, false, false, null);//隊列綁定交換機channel.queueBind(FANOUT_QUEUE_1, FANOUT_EXCHAGE, "");channel.queueBind(FANOUT_QUEUE_2, FANOUT_EXCHAGE, "");for (int i = 1; i <= 10; i++) {// 發送信息String message = "你好;小兔子!發布訂閱模式--" + i;/*** 參數1:交換機名稱,如果沒有指定則使用默認Default Exchage* 參數2:路由key,簡單模式可以傳遞隊列名稱* 參數3:消息其它屬性* 參數4:消息內容*/channel.basicPublish(FANOUT_EXCHAGE, "", null, message.getBytes());System.out.println("已發送消息:" + message);}// 關閉資源channel.close();connection.close();}}

2)消費者1

public class Consumer1 {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();// 創建頻道Channel channel = connection.createChannel();//聲明交換機channel.exchangeDeclare(Producer.FANOUT_EXCHAGE, BuiltinExchangeType.FANOUT);// 聲明(創建)隊列/*** 參數1:隊列名稱* 參數2:是否定義持久化隊列* 參數3:是否獨占本次連接* 參數4:是否在不使用的時候自動刪除隊列* 參數5:隊列其它參數*/channel.queueDeclare(Producer.FANOUT_QUEUE_1, true, false, false, null);//隊列綁定交換機channel.queueBind(Producer.FANOUT_QUEUE_1, Producer.FANOUT_EXCHAGE, "");//創建消費者;并設置消息處理DefaultConsumer consumer = new DefaultConsumer(channel){@Override/*** consumerTag 消息者標簽,在channel.basicConsume時候可以指定* envelope 消息包的內容,可從中獲取消息id,消息routingkey,交換機,消息和重傳標志(收到消息失敗后是否需要重新發送)* properties 屬性信息* body 消息*/public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//路由keySystem.out.println("路由key為:" + envelope.getRoutingKey());//交換機System.out.println("交換機為:" + envelope.getExchange());//消息idSystem.out.println("消息id為:" + envelope.getDeliveryTag());//收到的消息System.out.println("消費者1-接收到的消息為:" + new String(body, "utf-8"));}};//監聽消息/*** 參數1:隊列名稱* 參數2:是否自動確認,設置為true為表示消息接收到自動向mq回復接收到了,mq接收到回復會刪除消息,設置為false則需要手動確認* 參數3:消息接收到后回調*/channel.basicConsume(Producer.FANOUT_QUEUE_1, true, consumer);}}

3)消費者2

public class Consumer2 {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();// 創建頻道Channel channel = connection.createChannel();//聲明交換機channel.exchangeDeclare(Producer.FANOUT_EXCHAGE, BuiltinExchangeType.FANOUT);// 聲明(創建)隊列/*** 參數1:隊列名稱* 參數2:是否定義持久化隊列* 參數3:是否獨占本次連接* 參數4:是否在不使用的時候自動刪除隊列* 參數5:隊列其它參數*/channel.queueDeclare(Producer.FANOUT_QUEUE_2, true, false, false, null);//隊列綁定交換機channel.queueBind(Producer.FANOUT_QUEUE_2, Producer.FANOUT_EXCHAGE, "");//創建消費者;并設置消息處理DefaultConsumer consumer = new DefaultConsumer(channel){@Override/*** consumerTag 消息者標簽,在channel.basicConsume時候可以指定* envelope 消息包的內容,可從中獲取消息id,消息routingkey,交換機,消息和重傳標志(收到消息失敗后是否需要重新發送)* properties 屬性信息* body 消息*/public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//路由keySystem.out.println("路由key為:" + envelope.getRoutingKey());//交換機System.out.println("交換機為:" + envelope.getExchange());//消息idSystem.out.println("消息id為:" + envelope.getDeliveryTag());//收到的消息System.out.println("消費者2-接收到的消息為:" + new String(body, "utf-8"));}};//監聽消息/*** 參數1:隊列名稱* 參數2:是否自動確認,設置為true為表示消息接收到自動向mq回復接收到了,mq接收到回復會刪除消息,設置為false則需要手動確認* 參數3:消息接收到后回調*/channel.basicConsume(Producer.FANOUT_QUEUE_2, true, consumer);}}

交換機需要與隊列進行綁定,綁定之后;一個消息可以被多個消費者都收到。

發布訂閱模式與工作隊列模式的區別

1、工作隊列模式不用定義交換機,而發布/訂閱模式需要定義交換機。

2、發布/訂閱模式的生產方是面向交換機發送消息,工作隊列模式的生產方是面向隊列發送消息(底層使用默認交換機)。

3、發布/訂閱模式需要設置隊列和交換機的綁定,工作隊列模式不需要設置,實際上工作隊列模式會將隊列綁 定到默認的交換機 。

Routing路由模式

路由模式特點:

  • 隊列與交換機的綁定,不能是任意綁定了,而是要指定一個RoutingKey(路由key)
  • 消息的發送方在 向 Exchange發送消息時,也必須指定消息的 RoutingKey。
  • Exchange不再把消息交給每一個綁定的隊列,而是根據消息的Routing Key進行判斷,只有隊列的Routingkey與消息的 Routing key完全一致,才會接收到消息


圖解:

  • P:生產者,向Exchange發送消息,發送消息時,會指定一個routing key。
  • X:Exchange(交換機),接收生產者的消息,然后把消息遞交給 與routing key完全匹配的隊列
  • C1:消費者,其所在隊列指定了需要routing key 為 error 的消息
  • C2:消費者,其所在隊列指定了需要routing key 為 info、error、warning 的消息
代碼

在編碼上與 Publish/Subscribe發布與訂閱模式 的區別是交換機的類型為:Direct,還有隊列綁定交換機的時候需要指定routing key。

1)生產者

/*** 路由模式的交換機類型為:direct*/public class Producer {//交換機名稱static final String DIRECT_EXCHAGE = "direct_exchange";//隊列名稱static final String DIRECT_QUEUE_INSERT = "direct_queue_insert";//隊列名稱static final String DIRECT_QUEUE_UPDATE = "direct_queue_update";public static void main(String[] args) throws Exception {//創建連接Connection connection = ConnectionUtil.getConnection();// 創建頻道Channel channel = connection.createChannel();/*** 聲明交換機* 參數1:交換機名稱* 參數2:交換機類型,fanout、topic、direct、headers*/channel.exchangeDeclare(DIRECT_EXCHAGE, BuiltinExchangeType.DIRECT);// 聲明(創建)隊列/*** 參數1:隊列名稱* 參數2:是否定義持久化隊列* 參數3:是否獨占本次連接* 參數4:是否在不使用的時候自動刪除隊列* 參數5:隊列其它參數*/channel.queueDeclare(DIRECT_QUEUE_INSERT, true, false, false, null);channel.queueDeclare(DIRECT_QUEUE_UPDATE, true, false, false, null);//隊列綁定交換機channel.queueBind(DIRECT_QUEUE_INSERT, DIRECT_EXCHAGE, "insert");channel.queueBind(DIRECT_QUEUE_UPDATE, DIRECT_EXCHAGE, "update");// 發送信息String message = "新增了商品。路由模式;routing key 為 insert " ;/*** 參數1:交換機名稱,如果沒有指定則使用默認Default Exchage* 參數2:路由key,簡單模式可以傳遞隊列名稱* 參數3:消息其它屬性* 參數4:消息內容*/channel.basicPublish(DIRECT_EXCHAGE, "insert", null, message.getBytes());System.out.println("已發送消息:" + message);// 發送信息message = "修改了商品。路由模式;routing key 為 update" ;/*** 參數1:交換機名稱,如果沒有指定則使用默認Default Exchage* 參數2:路由key,簡單模式可以傳遞隊列名稱* 參數3:消息其它屬性* 參數4:消息內容*/channel.basicPublish(DIRECT_EXCHAGE, "update", null, message.getBytes());System.out.println("已發送消息:" + message);// 關閉資源channel.close();connection.close();}}

2)消費者1

public class Consumer1 {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();// 創建頻道Channel channel = connection.createChannel();//聲明交換機channel.exchangeDeclare(Producer.DIRECT_EXCHAGE, BuiltinExchangeType.DIRECT);// 聲明(創建)隊列/*** 參數1:隊列名稱* 參數2:是否定義持久化隊列* 參數3:是否獨占本次連接* 參數4:是否在不使用的時候自動刪除隊列* 參數5:隊列其它參數*/channel.queueDeclare(Producer.DIRECT_QUEUE_INSERT, true, false, false, null);//隊列綁定交換機channel.queueBind(Producer.DIRECT_QUEUE_INSERT, Producer.DIRECT_EXCHAGE, "insert");//創建消費者;并設置消息處理DefaultConsumer consumer = new DefaultConsumer(channel){@Override/*** consumerTag 消息者標簽,在channel.basicConsume時候可以指定* envelope 消息包的內容,可從中獲取消息id,消息routingkey,交換機,消息和重傳標志(收到消息失敗后是否需要重新發送)* properties 屬性信息* body 消息*/public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//路由keySystem.out.println("路由key為:" + envelope.getRoutingKey());//交換機System.out.println("交換機為:" + envelope.getExchange());//消息idSystem.out.println("消息id為:" + envelope.getDeliveryTag());//收到的消息System.out.println("消費者1-接收到的消息為:" + new String(body, "utf-8"));}};//監聽消息/*** 參數1:隊列名稱* 參數2:是否自動確認,設置為true為表示消息接收到自動向mq回復接收到了,mq接收到回復會刪除消息,設置為false則需要手動確認* 參數3:消息接收到后回調*/channel.basicConsume(Producer.DIRECT_QUEUE_INSERT, true, consumer);}}

3)消費者2

public class Consumer2 {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();// 創建頻道Channel channel = connection.createChannel();//聲明交換機channel.exchangeDeclare(Producer.DIRECT_EXCHAGE, BuiltinExchangeType.DIRECT);// 聲明(創建)隊列/*** 參數1:隊列名稱* 參數2:是否定義持久化隊列* 參數3:是否獨占本次連接* 參數4:是否在不使用的時候自動刪除隊列* 參數5:隊列其它參數*/channel.queueDeclare(Producer.DIRECT_QUEUE_UPDATE, true, false, false, null);//隊列綁定交換機channel.queueBind(Producer.DIRECT_QUEUE_UPDATE, Producer.DIRECT_EXCHAGE, "update");//創建消費者;并設置消息處理DefaultConsumer consumer = new DefaultConsumer(channel){@Override/*** consumerTag 消息者標簽,在channel.basicConsume時候可以指定* envelope 消息包的內容,可從中獲取消息id,消息routingkey,交換機,消息和重傳標志(收到消息失敗后是否需要重新發送)* properties 屬性信息* body 消息*/public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//路由keySystem.out.println("路由key為:" + envelope.getRoutingKey());//交換機System.out.println("交換機為:" + envelope.getExchange());//消息idSystem.out.println("消息id為:" + envelope.getDeliveryTag());//收到的消息System.out.println("消費者2-接收到的消息為:" + new String(body, "utf-8"));}};//監聽消息/*** 參數1:隊列名稱* 參數2:是否自動確認,設置為true為表示消息接收到自動向mq回復接收到了,mq接收到回復會刪除消息,設置為false則需要手動確認* 參數3:消息接收到后回調*/channel.basicConsume(Producer.DIRECT_QUEUE_UPDATE, true, consumer);}}

Routing模式要求隊列在綁定交換機時要指定routing key,消息會轉發到符合routing key的隊列。

Topics通配符模式

4.5.1. 模式說明

Topic類型與Direct相比,都是可以根據RoutingKey把消息路由到不同的隊列。只不過Topic類型Exchange可以讓隊列在綁定Routing key 的時候使用通配符!

Routingkey 一般都是有一個或多個單詞組成,多個單詞之間以”.”分割,例如: item.insert

通配符規則:

#:匹配一個或多個詞

*:匹配不多不少恰好1個詞

舉例:

cs.#:能夠匹配cs.insert.abc 或者 cs.insert

cs.*:只能匹配cs.insert


圖解:

  • 紅色Queue:綁定的是usa.# ,因此凡是以 usa.開頭的routing key 都會被匹配到
  • 黃色Queue:綁定的是#.news ,因此凡是以 .news結尾的 routing key 都會被匹配
代碼

1)生產者

使用topic類型的Exchange,發送消息的routing key有3種: item.insert、item.update、item.delete:

/*** 通配符Topic的交換機類型為:topic*/public class Producer {//交換機名稱static final String TOPIC_EXCHAGE = "topic_exchange";//隊列名稱static final String TOPIC_QUEUE_1 = "topic_queue_1";//隊列名稱static final String TOPIC_QUEUE_2 = "topic_queue_2";public static void main(String[] args) throws Exception {//創建連接Connection connection = ConnectionUtil.getConnection();// 創建頻道Channel channel = connection.createChannel();/*** 聲明交換機* 參數1:交換機名稱* 參數2:交換機類型,fanout、topic、topic、headers*/channel.exchangeDeclare(TOPIC_EXCHAGE, BuiltinExchangeType.TOPIC);// 發送信息String message = "新增了商品。Topic模式;routing key 為 item.insert " ;channel.basicPublish(TOPIC_EXCHAGE, "item.insert", null, message.getBytes());System.out.println("已發送消息:" + message);// 發送信息message = "修改了商品。Topic模式;routing key 為 item.update" ;channel.basicPublish(TOPIC_EXCHAGE, "item.update", null, message.getBytes());System.out.println("已發送消息:" + message);// 發送信息message = "刪除了商品。Topic模式;routing key 為 item.delete" ;channel.basicPublish(TOPIC_EXCHAGE, "item.delete", null, message.getBytes());System.out.println("已發送消息:" + message);// 關閉資源channel.close();connection.close();}}

2)消費者1

接收兩種類型的消息:更新商品和刪除商品

public class Consumer1 {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();// 創建頻道Channel channel = connection.createChannel();//聲明交換機channel.exchangeDeclare(Producer.TOPIC_EXCHAGE, BuiltinExchangeType.TOPIC);// 聲明(創建)隊列/*** 參數1:隊列名稱* 參數2:是否定義持久化隊列* 參數3:是否獨占本次連接* 參數4:是否在不使用的時候自動刪除隊列* 參數5:隊列其它參數*/channel.queueDeclare(Producer.TOPIC_QUEUE_1, true, false, false, null);//隊列綁定交換機channel.queueBind(Producer.TOPIC_QUEUE_1, Producer.TOPIC_EXCHAGE, "item.update");channel.queueBind(Producer.TOPIC_QUEUE_1, Producer.TOPIC_EXCHAGE, "item.delete");//創建消費者;并設置消息處理DefaultConsumer consumer = new DefaultConsumer(channel){@Override/*** consumerTag 消息者標簽,在channel.basicConsume時候可以指定* envelope 消息包的內容,可從中獲取消息id,消息routingkey,交換機,消息和重傳標志(收到消息失敗后是否需要重新發送)* properties 屬性信息* body 消息*/public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//路由keySystem.out.println("路由key為:" + envelope.getRoutingKey());//交換機System.out.println("交換機為:" + envelope.getExchange());//消息idSystem.out.println("消息id為:" + envelope.getDeliveryTag());//收到的消息System.out.println("消費者1-接收到的消息為:" + new String(body, "utf-8"));}};//監聽消息/*** 參數1:隊列名稱* 參數2:是否自動確認,設置為true為表示消息接收到自動向mq回復接收到了,mq接收到回復會刪除消息,設置為false則需要手動確認* 參數3:消息接收到后回調*/channel.basicConsume(Producer.TOPIC_QUEUE_1, true, consumer);}}

3)消費者2

接收所有類型的消息:新增商品,更新商品和刪除商品。

public class Consumer2 {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();// 創建頻道Channel channel = connection.createChannel();//聲明交換機channel.exchangeDeclare(Producer.TOPIC_EXCHAGE, BuiltinExchangeType.TOPIC);// 聲明(創建)隊列/*** 參數1:隊列名稱* 參數2:是否定義持久化隊列* 參數3:是否獨占本次連接* 參數4:是否在不使用的時候自動刪除隊列* 參數5:隊列其它參數*/channel.queueDeclare(Producer.TOPIC_QUEUE_2, true, false, false, null);//隊列綁定交換機channel.queueBind(Producer.TOPIC_QUEUE_2, Producer.TOPIC_EXCHAGE, "item.*");//創建消費者;并設置消息處理DefaultConsumer consumer = new DefaultConsumer(channel){@Override/*** consumerTag 消息者標簽,在channel.basicConsume時候可以指定* envelope 消息包的內容,可從中獲取消息id,消息routingkey,交換機,消息和重傳標志(收到消息失敗后是否需要重新發送)* properties 屬性信息* body 消息*/public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//路由keySystem.out.println("路由key為:" + envelope.getRoutingKey());//交換機System.out.println("交換機為:" + envelope.getExchange());//消息idSystem.out.println("消息id為:" + envelope.getDeliveryTag());//收到的消息System.out.println("消費者2-接收到的消息為:" + new String(body, "utf-8"));}};//監聽消息/*** 參數1:隊列名稱* 參數2:是否自動確認,設置為true為表示消息接收到自動向mq回復接收到了,mq接收到回復會刪除消息,設置為false則需要手動確認* 參數3:消息接收到后回調*/channel.basicConsume(Producer.TOPIC_QUEUE_2, true, consumer);}}

Topic主題模式可以實現 Publish/Subscribe發布與訂閱模式 和 Routing路由模式 的功能;只是Topic在配置routing key 的時候可以使用通配符,顯得更加靈活。

模式總結

RabbitMQ工作模式:

  • 簡單模式 HelloWorld
    一個生產者、一個消費者,不需要設置交換機(使用默認的交換機)

  • 工作隊列模式 Work Queue
    一個生產者、多個消費者(競爭關系),不需要設置交換機(使用默認的交換機)

  • 發布訂閱模式 Publish/subscribe
    需要設置類型為fanout的交換機,并且交換機和隊列進行綁定,當發送消息到交換機后,交換機會將消息發送到綁定的隊列

  • 路由模式 Routing
    需要設置類型為direct的交換機,交換機和隊列進行綁定,并且指定routing key,當發送消息到交換機后,交換機會根據routing key將消息發送到對應的隊列

  • 通配符模式 Topic
    需要設置類型為topic的交換機,交換機和隊列進行綁定,并且指定通配符方式的routing key,當發送消息到交換機后,交換機會根據routing key將消息發送到對應的隊列

總結

以上是生活随笔為你收集整理的【消息中间件】AMQPRabbitMQ工作模式的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。