日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 >

【消息中间件】AMQPRabbitMQ工作模式

發(fā)布時(shí)間:2025/5/22 35 豆豆
生活随笔 收集整理的這篇文章主要介紹了 【消息中间件】AMQPRabbitMQ工作模式 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

AMQP 相關(guān)概念介紹

AMQP 一個(gè)提供統(tǒng)一消息服務(wù)的應(yīng)用層標(biāo)準(zhǔn)高級(jí)消息隊(duì)列協(xié)議,是應(yīng)用層協(xié)議的一個(gè)開放標(biāo)準(zhǔn),為面向消息的中間件設(shè)計(jì)。

AMQP是一個(gè)二進(jìn)制協(xié)議,擁有一些現(xiàn)代化特點(diǎn):多信道、協(xié)商式,異步,安全,擴(kuò)平臺(tái),中立,高效。

RabbitMQ是AMQP協(xié)議的Erlang的實(shí)現(xiàn)。

概念說明
連接Connection一個(gè)網(wǎng)絡(luò)連接,比如TCP/IP套接字連接。
會(huì)話Session端點(diǎn)之間的命名對(duì)話。在一個(gè)會(huì)話上下文中,保證“恰好傳遞一次”。
信道Channel多路復(fù)用連接中的一條獨(dú)立的雙向數(shù)據(jù)流通道。為會(huì)話提供物理傳輸介質(zhì)。
客戶端ClientAMQP連接或者會(huì)話的發(fā)起者。AMQP是非對(duì)稱的,客戶端生產(chǎn)和消費(fèi)消息,服務(wù)器存儲(chǔ)和路由這些消息。
服務(wù)節(jié)點(diǎn)Broker消息中間件的服務(wù)節(jié)點(diǎn);一般情況下可以將一個(gè)RabbitMQ Broker看作一臺(tái)RabbitMQ 服務(wù)器。
端點(diǎn)AMQP對(duì)話的任意一方。一個(gè)AMQP連接包括兩個(gè)端點(diǎn)(一個(gè)是客戶端,一個(gè)是服務(wù)器)。
消費(fèi)者Consumer一個(gè)從消息隊(duì)列里請(qǐng)求消息的客戶端程序。
生產(chǎn)者Producer一個(gè)向交換機(jī)發(fā)布消息的客戶端應(yīng)用程序。

RabbitMQ運(yùn)轉(zhuǎn)流程

在入門案例中:

  • 生產(chǎn)者發(fā)送消息
  • 生產(chǎn)者創(chuàng)建連接(Connection),開啟一個(gè)信道(Channel),連接到RabbitMQ Broker;
  • 聲明隊(duì)列并設(shè)置屬性;如是否排它,是否持久化,是否自動(dòng)刪除;
  • 將路由鍵(空字符串)與隊(duì)列綁定起來;
  • 發(fā)送消息至RabbitMQ Broker;
  • 關(guān)閉信道;
  • 關(guān)閉連接;
  • 消費(fèi)者接收消息
  • 消費(fèi)者創(chuàng)建連接(Connection),開啟一個(gè)信道(Channel),連接到RabbitMQ Broker
  • 向Broker 請(qǐng)求消費(fèi)相應(yīng)隊(duì)列中的消息,設(shè)置相應(yīng)的回調(diào)函數(shù);
  • 等待Broker回應(yīng)閉關(guān)投遞響應(yīng)隊(duì)列中的消息,消費(fèi)者接收消息;
  • 確認(rèn)(ack,自動(dòng)確認(rèn))接收到的消息;
  • RabbitMQ從隊(duì)列中刪除相應(yīng)已經(jīng)被確認(rèn)的消息;
  • 關(guān)閉信道;
  • 關(guān)閉連接;

生產(chǎn)者流轉(zhuǎn)過程說明

  • 客戶端與代理服務(wù)器Broker建立連接。會(huì)調(diào)用newConnection() 方法,這個(gè)方法會(huì)進(jìn)一步封裝Protocol Header 0-9-1 的報(bào)文頭發(fā)送給Broker ,以此通知Broker 本次交互采用的是AMQPO-9-1 協(xié)議,緊接著Broker 返回Connection.Start 來建立連接,在連接的過程中涉及Connection.Start/.Start-OK 、Connection.Tune/.Tune-Ok ,Connection.Open/ .Open-Ok 這6 個(gè)命令的交互。
  • 客戶端調(diào)用connection.createChannel方法。此方法開啟信道,其包裝的channel.open命令發(fā)送給Broker,等待channel.basicPublish方法,對(duì)應(yīng)的AMQP命令為Basic.Publish,這個(gè)命令包含了content Header 和content Body()。content Header 包含了消息體的屬性,例如:投遞模式,優(yōu)先級(jí)等,content Body 包含了消息體本身。
  • 客戶端發(fā)送完消息需要關(guān)閉資源時(shí),涉及到Channel.Close和Channl.Close-Ok 與Connetion.Close和Connection.Close-Ok的命令交互。

消費(fèi)者流轉(zhuǎn)過程說明

  • 消費(fèi)者客戶端與代理服務(wù)器Broker建立連接。會(huì)調(diào)用newConnection() 方法,這個(gè)方法會(huì)進(jìn)一步封裝Protocol Header 0-9-1 的報(bào)文頭發(fā)送給Broker ,以此通知Broker 本次交互采用的是AMQPO-9-1 協(xié)議,緊接著Broker 返回Connection.Start 來建立連接,在連接的過程中涉及Connection.Start/.Start-OK 、Connection.Tune/.Tune-Ok ,Connection.Open/ .Open-Ok 這6 個(gè)命令的交互。
  • 消費(fèi)者客戶端調(diào)用connection.createChannel方法。和生產(chǎn)者客戶端一樣,協(xié)議涉及Channel . Open/Open-Ok命令。
  • 在真正消費(fèi)之前,消費(fèi)者客戶端需要向Broker 發(fā)送Basic.Consume 命令(即調(diào)用channel.basicConsume 方法〉將Channel 置為接收模式,之后Broker 回執(zhí)Basic . Consume - Ok 以告訴消費(fèi)者客戶端準(zhǔn)備好消費(fèi)消息。
  • Broker 向消費(fèi)者客戶端推送(Push) 消息,即Basic.Deliver 命令,這個(gè)命令和Basic.Publish 命令一樣會(huì)攜帶Content Header 和Content Body。
  • 消費(fèi)者接收到消息并正確消費(fèi)之后,向Broker 發(fā)送確認(rèn),即Basic.Ack 命令。
  • 客戶端發(fā)送完消息需要關(guān)閉資源時(shí),涉及到Channel.Close和Channl.Close-Ok 與Connetion.Close和Connection.Close-Ok的命令交互。

RabbitMQ工作模式

Work queues工作隊(duì)列模式


Work Queues與入門程序的簡單模式相比,多了一個(gè)或一些消費(fèi)端,多個(gè)消費(fèi)端共同消費(fèi)同一個(gè)隊(duì)列中的消息。

應(yīng)用場景:對(duì)于 任務(wù)過重或任務(wù)較多情況使用工作隊(duì)列可以提高任務(wù)處理的速度。

代碼

Work Queues與入門程序的簡單模式的代碼是幾乎一樣的;可以完全復(fù)制,并復(fù)制多一個(gè)消費(fèi)者進(jìn)行多個(gè)消費(fèi)者同時(shí)消費(fèi)消息的測試。

1)生產(chǎn)者

public class Producer {static final String QUEUE_NAME = "work_queue";public static void main(String[] args) throws Exception {//創(chuàng)建連接Connection connection = ConnectionUtil.getConnection();// 創(chuàng)建頻道Channel channel = connection.createChannel();// 聲明(創(chuàng)建)隊(duì)列/*** 參數(shù)1:隊(duì)列名稱* 參數(shù)2:是否定義持久化隊(duì)列* 參數(shù)3:是否獨(dú)占本次連接* 參數(shù)4:是否在不使用的時(shí)候自動(dòng)刪除隊(duì)列* 參數(shù)5:隊(duì)列其它參數(shù)*/channel.queueDeclare(QUEUE_NAME, true, false, false, null);for (int i = 1; i <= 30; i++) {// 發(fā)送信息String message = "你好;小兔子!work模式--" + i;/*** 參數(shù)1:交換機(jī)名稱,如果沒有指定則使用默認(rèn)Default Exchage* 參數(shù)2:路由key,簡單模式可以傳遞隊(duì)列名稱* 參數(shù)3:消息其它屬性* 參數(shù)4:消息內(nèi)容*/channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println("已發(fā)送消息:" + message);}// 關(guān)閉資源channel.close();connection.close();}}

2)消費(fèi)者1

public class Consumer1 {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();// 創(chuàng)建頻道Channel channel = connection.createChannel();// 聲明(創(chuàng)建)隊(duì)列/*** 參數(shù)1:隊(duì)列名稱* 參數(shù)2:是否定義持久化隊(duì)列* 參數(shù)3:是否獨(dú)占本次連接* 參數(shù)4:是否在不使用的時(shí)候自動(dòng)刪除隊(duì)列* 參數(shù)5:隊(duì)列其它參數(shù)*/channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null);//一次只能接收并處理一個(gè)消息channel.basicQos(1);//創(chuàng)建消費(fèi)者;并設(shè)置消息處理DefaultConsumer consumer = new DefaultConsumer(channel){@Override/*** consumerTag 消息者標(biāo)簽,在channel.basicConsume時(shí)候可以指定* envelope 消息包的內(nèi)容,可從中獲取消息id,消息routingkey,交換機(jī),消息和重傳標(biāo)志(收到消息失敗后是否需要重新發(fā)送)* properties 屬性信息* body 消息*/public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {try {//路由keySystem.out.println("路由key為:" + envelope.getRoutingKey());//交換機(jī)System.out.println("交換機(jī)為:" + envelope.getExchange());//消息idSystem.out.println("消息id為:" + envelope.getDeliveryTag());//收到的消息System.out.println("消費(fèi)者1-接收到的消息為:" + new String(body, "utf-8"));Thread.sleep(1000);//確認(rèn)消息channel.basicAck(envelope.getDeliveryTag(), false);} catch (InterruptedException e) {e.printStackTrace();}}};//監(jiān)聽消息/*** 參數(shù)1:隊(duì)列名稱* 參數(shù)2:是否自動(dòng)確認(rèn),設(shè)置為true為表示消息接收到自動(dòng)向mq回復(fù)接收到了,mq接收到回復(fù)會(huì)刪除消息,設(shè)置為false則需要手動(dòng)確認(rèn)* 參數(shù)3:消息接收到后回調(diào)*/channel.basicConsume(Producer.QUEUE_NAME, false, consumer);}}

3)消費(fèi)者2

public class Consumer2 {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();// 創(chuàng)建頻道Channel channel = connection.createChannel();// 聲明(創(chuàng)建)隊(duì)列/*** 參數(shù)1:隊(duì)列名稱* 參數(shù)2:是否定義持久化隊(duì)列* 參數(shù)3:是否獨(dú)占本次連接* 參數(shù)4:是否在不使用的時(shí)候自動(dòng)刪除隊(duì)列* 參數(shù)5:隊(duì)列其它參數(shù)*/channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null);//一次只能接收并處理一個(gè)消息channel.basicQos(1);//創(chuàng)建消費(fèi)者;并設(shè)置消息處理DefaultConsumer consumer = new DefaultConsumer(channel){@Override/*** consumerTag 消息者標(biāo)簽,在channel.basicConsume時(shí)候可以指定* envelope 消息包的內(nèi)容,可從中獲取消息id,消息routingkey,交換機(jī),消息和重傳標(biāo)志(收到消息失敗后是否需要重新發(fā)送)* properties 屬性信息* body 消息*/public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {try {//路由keySystem.out.println("路由key為:" + envelope.getRoutingKey());//交換機(jī)System.out.println("交換機(jī)為:" + envelope.getExchange());//消息idSystem.out.println("消息id為:" + envelope.getDeliveryTag());//收到的消息System.out.println("消費(fèi)者2-接收到的消息為:" + new String(body, "utf-8"));Thread.sleep(1000);//確認(rèn)消息channel.basicAck(envelope.getDeliveryTag(), false);} catch (InterruptedException e) {e.printStackTrace();}}};//監(jiān)聽消息/*** 參數(shù)1:隊(duì)列名稱* 參數(shù)2:是否自動(dòng)確認(rèn),設(shè)置為true為表示消息接收到自動(dòng)向mq回復(fù)接收到了,mq接收到回復(fù)會(huì)刪除消息,設(shè)置為false則需要手動(dòng)確認(rèn)* 參數(shù)3:消息接收到后回調(diào)*/channel.basicConsume(Producer.QUEUE_NAME, false, consumer);}}

啟動(dòng)兩個(gè)消費(fèi)者,然后再啟動(dòng)生產(chǎn)者發(fā)送消息;到IDEA的兩個(gè)消費(fèi)者對(duì)應(yīng)的控制臺(tái)查看是競爭性的接收到消息。

在一個(gè)隊(duì)列中如果有多個(gè)消費(fèi)者,那么消費(fèi)者之間對(duì)于同一個(gè)消息的關(guān)系是競爭的關(guān)系。

訂閱模式類型

訂閱模式示例圖:

在訂閱模型中,多了一個(gè)exchange角色,而且過程略有變化:

  • P:生產(chǎn)者,也就是要發(fā)送消息的程序,但是不再發(fā)送到隊(duì)列中,而是發(fā)給X(交換機(jī))
  • C:消費(fèi)者,消息的接受者,會(huì)一直等待消息到來。
  • Queue:消息隊(duì)列,接收消息、緩存消息。
  • Exchange:交換機(jī),圖中的X。一方面,接收生產(chǎn)者發(fā)送的消息。另一方面,知道如何處理消息,例如遞交給某個(gè)特別隊(duì)列、遞交給所有隊(duì)列、或是將消息丟棄。到底如何操作,取決于Exchange的類型。Exchange有常見以下3種類型:
    • Fanout:廣播,將消息交給所有綁定到交換機(jī)的隊(duì)列
    • Direct:定向,把消息交給符合指定routing key 的隊(duì)列
    • Topic:通配符,把消息交給符合routing pattern(路由模式) 的隊(duì)列

Exchange(交換機(jī))只負(fù)責(zé)轉(zhuǎn)發(fā)消息,不具備存儲(chǔ)消息的能力,因此如果沒有任何隊(duì)列與Exchange綁定,或者沒有符合路由規(guī)則的隊(duì)列,那么消息會(huì)丟失!

Publish/Subscribe發(fā)布與訂閱模式

發(fā)布訂閱模式:
1、每個(gè)消費(fèi)者監(jiān)聽自己的隊(duì)列。
2、生產(chǎn)者將消息發(fā)給broker,由交換機(jī)將消息轉(zhuǎn)發(fā)到綁定此交換機(jī)的每個(gè)隊(duì)列,每個(gè)綁定交換機(jī)的隊(duì)列都將接收到消息

代碼

1)生產(chǎn)者

/*** 發(fā)布與訂閱使用的交換機(jī)類型為:fanout*/public class Producer {//交換機(jī)名稱static final String FANOUT_EXCHAGE = "fanout_exchange";//隊(duì)列名稱static final String FANOUT_QUEUE_1 = "fanout_queue_1";//隊(duì)列名稱static final String FANOUT_QUEUE_2 = "fanout_queue_2";public static void main(String[] args) throws Exception {//創(chuàng)建連接Connection connection = ConnectionUtil.getConnection();// 創(chuàng)建頻道Channel channel = connection.createChannel();/*** 聲明交換機(jī)* 參數(shù)1:交換機(jī)名稱* 參數(shù)2:交換機(jī)類型,fanout、topic、direct、headers*/channel.exchangeDeclare(FANOUT_EXCHAGE, BuiltinExchangeType.FANOUT);// 聲明(創(chuàng)建)隊(duì)列/*** 參數(shù)1:隊(duì)列名稱* 參數(shù)2:是否定義持久化隊(duì)列* 參數(shù)3:是否獨(dú)占本次連接* 參數(shù)4:是否在不使用的時(shí)候自動(dòng)刪除隊(duì)列* 參數(shù)5:隊(duì)列其它參數(shù)*/channel.queueDeclare(FANOUT_QUEUE_1, true, false, false, null);channel.queueDeclare(FANOUT_QUEUE_2, true, false, false, null);//隊(duì)列綁定交換機(jī)channel.queueBind(FANOUT_QUEUE_1, FANOUT_EXCHAGE, "");channel.queueBind(FANOUT_QUEUE_2, FANOUT_EXCHAGE, "");for (int i = 1; i <= 10; i++) {// 發(fā)送信息String message = "你好;小兔子!發(fā)布訂閱模式--" + i;/*** 參數(shù)1:交換機(jī)名稱,如果沒有指定則使用默認(rèn)Default Exchage* 參數(shù)2:路由key,簡單模式可以傳遞隊(duì)列名稱* 參數(shù)3:消息其它屬性* 參數(shù)4:消息內(nèi)容*/channel.basicPublish(FANOUT_EXCHAGE, "", null, message.getBytes());System.out.println("已發(fā)送消息:" + message);}// 關(guān)閉資源channel.close();connection.close();}}

2)消費(fèi)者1

public class Consumer1 {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();// 創(chuàng)建頻道Channel channel = connection.createChannel();//聲明交換機(jī)channel.exchangeDeclare(Producer.FANOUT_EXCHAGE, BuiltinExchangeType.FANOUT);// 聲明(創(chuàng)建)隊(duì)列/*** 參數(shù)1:隊(duì)列名稱* 參數(shù)2:是否定義持久化隊(duì)列* 參數(shù)3:是否獨(dú)占本次連接* 參數(shù)4:是否在不使用的時(shí)候自動(dòng)刪除隊(duì)列* 參數(shù)5:隊(duì)列其它參數(shù)*/channel.queueDeclare(Producer.FANOUT_QUEUE_1, true, false, false, null);//隊(duì)列綁定交換機(jī)channel.queueBind(Producer.FANOUT_QUEUE_1, Producer.FANOUT_EXCHAGE, "");//創(chuàng)建消費(fèi)者;并設(shè)置消息處理DefaultConsumer consumer = new DefaultConsumer(channel){@Override/*** consumerTag 消息者標(biāo)簽,在channel.basicConsume時(shí)候可以指定* envelope 消息包的內(nèi)容,可從中獲取消息id,消息routingkey,交換機(jī),消息和重傳標(biāo)志(收到消息失敗后是否需要重新發(fā)送)* properties 屬性信息* body 消息*/public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//路由keySystem.out.println("路由key為:" + envelope.getRoutingKey());//交換機(jī)System.out.println("交換機(jī)為:" + envelope.getExchange());//消息idSystem.out.println("消息id為:" + envelope.getDeliveryTag());//收到的消息System.out.println("消費(fèi)者1-接收到的消息為:" + new String(body, "utf-8"));}};//監(jiān)聽消息/*** 參數(shù)1:隊(duì)列名稱* 參數(shù)2:是否自動(dòng)確認(rèn),設(shè)置為true為表示消息接收到自動(dòng)向mq回復(fù)接收到了,mq接收到回復(fù)會(huì)刪除消息,設(shè)置為false則需要手動(dòng)確認(rèn)* 參數(shù)3:消息接收到后回調(diào)*/channel.basicConsume(Producer.FANOUT_QUEUE_1, true, consumer);}}

3)消費(fèi)者2

public class Consumer2 {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();// 創(chuàng)建頻道Channel channel = connection.createChannel();//聲明交換機(jī)channel.exchangeDeclare(Producer.FANOUT_EXCHAGE, BuiltinExchangeType.FANOUT);// 聲明(創(chuàng)建)隊(duì)列/*** 參數(shù)1:隊(duì)列名稱* 參數(shù)2:是否定義持久化隊(duì)列* 參數(shù)3:是否獨(dú)占本次連接* 參數(shù)4:是否在不使用的時(shí)候自動(dòng)刪除隊(duì)列* 參數(shù)5:隊(duì)列其它參數(shù)*/channel.queueDeclare(Producer.FANOUT_QUEUE_2, true, false, false, null);//隊(duì)列綁定交換機(jī)channel.queueBind(Producer.FANOUT_QUEUE_2, Producer.FANOUT_EXCHAGE, "");//創(chuàng)建消費(fèi)者;并設(shè)置消息處理DefaultConsumer consumer = new DefaultConsumer(channel){@Override/*** consumerTag 消息者標(biāo)簽,在channel.basicConsume時(shí)候可以指定* envelope 消息包的內(nèi)容,可從中獲取消息id,消息routingkey,交換機(jī),消息和重傳標(biāo)志(收到消息失敗后是否需要重新發(fā)送)* properties 屬性信息* body 消息*/public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//路由keySystem.out.println("路由key為:" + envelope.getRoutingKey());//交換機(jī)System.out.println("交換機(jī)為:" + envelope.getExchange());//消息idSystem.out.println("消息id為:" + envelope.getDeliveryTag());//收到的消息System.out.println("消費(fèi)者2-接收到的消息為:" + new String(body, "utf-8"));}};//監(jiān)聽消息/*** 參數(shù)1:隊(duì)列名稱* 參數(shù)2:是否自動(dòng)確認(rèn),設(shè)置為true為表示消息接收到自動(dòng)向mq回復(fù)接收到了,mq接收到回復(fù)會(huì)刪除消息,設(shè)置為false則需要手動(dòng)確認(rèn)* 參數(shù)3:消息接收到后回調(diào)*/channel.basicConsume(Producer.FANOUT_QUEUE_2, true, consumer);}}

交換機(jī)需要與隊(duì)列進(jìn)行綁定,綁定之后;一個(gè)消息可以被多個(gè)消費(fèi)者都收到。

發(fā)布訂閱模式與工作隊(duì)列模式的區(qū)別

1、工作隊(duì)列模式不用定義交換機(jī),而發(fā)布/訂閱模式需要定義交換機(jī)。

2、發(fā)布/訂閱模式的生產(chǎn)方是面向交換機(jī)發(fā)送消息,工作隊(duì)列模式的生產(chǎn)方是面向隊(duì)列發(fā)送消息(底層使用默認(rèn)交換機(jī))。

3、發(fā)布/訂閱模式需要設(shè)置隊(duì)列和交換機(jī)的綁定,工作隊(duì)列模式不需要設(shè)置,實(shí)際上工作隊(duì)列模式會(huì)將隊(duì)列綁 定到默認(rèn)的交換機(jī) 。

Routing路由模式

路由模式特點(diǎn):

  • 隊(duì)列與交換機(jī)的綁定,不能是任意綁定了,而是要指定一個(gè)RoutingKey(路由key)
  • 消息的發(fā)送方在 向 Exchange發(fā)送消息時(shí),也必須指定消息的 RoutingKey。
  • Exchange不再把消息交給每一個(gè)綁定的隊(duì)列,而是根據(jù)消息的Routing Key進(jìn)行判斷,只有隊(duì)列的Routingkey與消息的 Routing key完全一致,才會(huì)接收到消息


圖解:

  • P:生產(chǎn)者,向Exchange發(fā)送消息,發(fā)送消息時(shí),會(huì)指定一個(gè)routing key。
  • X:Exchange(交換機(jī)),接收生產(chǎn)者的消息,然后把消息遞交給 與routing key完全匹配的隊(duì)列
  • C1:消費(fèi)者,其所在隊(duì)列指定了需要routing key 為 error 的消息
  • C2:消費(fèi)者,其所在隊(duì)列指定了需要routing key 為 info、error、warning 的消息
代碼

在編碼上與 Publish/Subscribe發(fā)布與訂閱模式 的區(qū)別是交換機(jī)的類型為:Direct,還有隊(duì)列綁定交換機(jī)的時(shí)候需要指定routing key。

1)生產(chǎn)者

/*** 路由模式的交換機(jī)類型為:direct*/public class Producer {//交換機(jī)名稱static final String DIRECT_EXCHAGE = "direct_exchange";//隊(duì)列名稱static final String DIRECT_QUEUE_INSERT = "direct_queue_insert";//隊(duì)列名稱static final String DIRECT_QUEUE_UPDATE = "direct_queue_update";public static void main(String[] args) throws Exception {//創(chuàng)建連接Connection connection = ConnectionUtil.getConnection();// 創(chuàng)建頻道Channel channel = connection.createChannel();/*** 聲明交換機(jī)* 參數(shù)1:交換機(jī)名稱* 參數(shù)2:交換機(jī)類型,fanout、topic、direct、headers*/channel.exchangeDeclare(DIRECT_EXCHAGE, BuiltinExchangeType.DIRECT);// 聲明(創(chuàng)建)隊(duì)列/*** 參數(shù)1:隊(duì)列名稱* 參數(shù)2:是否定義持久化隊(duì)列* 參數(shù)3:是否獨(dú)占本次連接* 參數(shù)4:是否在不使用的時(shí)候自動(dòng)刪除隊(duì)列* 參數(shù)5:隊(duì)列其它參數(shù)*/channel.queueDeclare(DIRECT_QUEUE_INSERT, true, false, false, null);channel.queueDeclare(DIRECT_QUEUE_UPDATE, true, false, false, null);//隊(duì)列綁定交換機(jī)channel.queueBind(DIRECT_QUEUE_INSERT, DIRECT_EXCHAGE, "insert");channel.queueBind(DIRECT_QUEUE_UPDATE, DIRECT_EXCHAGE, "update");// 發(fā)送信息String message = "新增了商品。路由模式;routing key 為 insert " ;/*** 參數(shù)1:交換機(jī)名稱,如果沒有指定則使用默認(rèn)Default Exchage* 參數(shù)2:路由key,簡單模式可以傳遞隊(duì)列名稱* 參數(shù)3:消息其它屬性* 參數(shù)4:消息內(nèi)容*/channel.basicPublish(DIRECT_EXCHAGE, "insert", null, message.getBytes());System.out.println("已發(fā)送消息:" + message);// 發(fā)送信息message = "修改了商品。路由模式;routing key 為 update" ;/*** 參數(shù)1:交換機(jī)名稱,如果沒有指定則使用默認(rèn)Default Exchage* 參數(shù)2:路由key,簡單模式可以傳遞隊(duì)列名稱* 參數(shù)3:消息其它屬性* 參數(shù)4:消息內(nèi)容*/channel.basicPublish(DIRECT_EXCHAGE, "update", null, message.getBytes());System.out.println("已發(fā)送消息:" + message);// 關(guān)閉資源channel.close();connection.close();}}

2)消費(fèi)者1

public class Consumer1 {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();// 創(chuàng)建頻道Channel channel = connection.createChannel();//聲明交換機(jī)channel.exchangeDeclare(Producer.DIRECT_EXCHAGE, BuiltinExchangeType.DIRECT);// 聲明(創(chuàng)建)隊(duì)列/*** 參數(shù)1:隊(duì)列名稱* 參數(shù)2:是否定義持久化隊(duì)列* 參數(shù)3:是否獨(dú)占本次連接* 參數(shù)4:是否在不使用的時(shí)候自動(dòng)刪除隊(duì)列* 參數(shù)5:隊(duì)列其它參數(shù)*/channel.queueDeclare(Producer.DIRECT_QUEUE_INSERT, true, false, false, null);//隊(duì)列綁定交換機(jī)channel.queueBind(Producer.DIRECT_QUEUE_INSERT, Producer.DIRECT_EXCHAGE, "insert");//創(chuàng)建消費(fèi)者;并設(shè)置消息處理DefaultConsumer consumer = new DefaultConsumer(channel){@Override/*** consumerTag 消息者標(biāo)簽,在channel.basicConsume時(shí)候可以指定* envelope 消息包的內(nèi)容,可從中獲取消息id,消息routingkey,交換機(jī),消息和重傳標(biāo)志(收到消息失敗后是否需要重新發(fā)送)* properties 屬性信息* body 消息*/public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//路由keySystem.out.println("路由key為:" + envelope.getRoutingKey());//交換機(jī)System.out.println("交換機(jī)為:" + envelope.getExchange());//消息idSystem.out.println("消息id為:" + envelope.getDeliveryTag());//收到的消息System.out.println("消費(fèi)者1-接收到的消息為:" + new String(body, "utf-8"));}};//監(jiān)聽消息/*** 參數(shù)1:隊(duì)列名稱* 參數(shù)2:是否自動(dòng)確認(rèn),設(shè)置為true為表示消息接收到自動(dòng)向mq回復(fù)接收到了,mq接收到回復(fù)會(huì)刪除消息,設(shè)置為false則需要手動(dòng)確認(rèn)* 參數(shù)3:消息接收到后回調(diào)*/channel.basicConsume(Producer.DIRECT_QUEUE_INSERT, true, consumer);}}

3)消費(fèi)者2

public class Consumer2 {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();// 創(chuàng)建頻道Channel channel = connection.createChannel();//聲明交換機(jī)channel.exchangeDeclare(Producer.DIRECT_EXCHAGE, BuiltinExchangeType.DIRECT);// 聲明(創(chuàng)建)隊(duì)列/*** 參數(shù)1:隊(duì)列名稱* 參數(shù)2:是否定義持久化隊(duì)列* 參數(shù)3:是否獨(dú)占本次連接* 參數(shù)4:是否在不使用的時(shí)候自動(dòng)刪除隊(duì)列* 參數(shù)5:隊(duì)列其它參數(shù)*/channel.queueDeclare(Producer.DIRECT_QUEUE_UPDATE, true, false, false, null);//隊(duì)列綁定交換機(jī)channel.queueBind(Producer.DIRECT_QUEUE_UPDATE, Producer.DIRECT_EXCHAGE, "update");//創(chuàng)建消費(fèi)者;并設(shè)置消息處理DefaultConsumer consumer = new DefaultConsumer(channel){@Override/*** consumerTag 消息者標(biāo)簽,在channel.basicConsume時(shí)候可以指定* envelope 消息包的內(nèi)容,可從中獲取消息id,消息routingkey,交換機(jī),消息和重傳標(biāo)志(收到消息失敗后是否需要重新發(fā)送)* properties 屬性信息* body 消息*/public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//路由keySystem.out.println("路由key為:" + envelope.getRoutingKey());//交換機(jī)System.out.println("交換機(jī)為:" + envelope.getExchange());//消息idSystem.out.println("消息id為:" + envelope.getDeliveryTag());//收到的消息System.out.println("消費(fèi)者2-接收到的消息為:" + new String(body, "utf-8"));}};//監(jiān)聽消息/*** 參數(shù)1:隊(duì)列名稱* 參數(shù)2:是否自動(dòng)確認(rèn),設(shè)置為true為表示消息接收到自動(dòng)向mq回復(fù)接收到了,mq接收到回復(fù)會(huì)刪除消息,設(shè)置為false則需要手動(dòng)確認(rèn)* 參數(shù)3:消息接收到后回調(diào)*/channel.basicConsume(Producer.DIRECT_QUEUE_UPDATE, true, consumer);}}

Routing模式要求隊(duì)列在綁定交換機(jī)時(shí)要指定routing key,消息會(huì)轉(zhuǎn)發(fā)到符合routing key的隊(duì)列。

Topics通配符模式

4.5.1. 模式說明

Topic類型與Direct相比,都是可以根據(jù)RoutingKey把消息路由到不同的隊(duì)列。只不過Topic類型Exchange可以讓隊(duì)列在綁定Routing key 的時(shí)候使用通配符!

Routingkey 一般都是有一個(gè)或多個(gè)單詞組成,多個(gè)單詞之間以”.”分割,例如: item.insert

通配符規(guī)則:

#:匹配一個(gè)或多個(gè)詞

*:匹配不多不少恰好1個(gè)詞

舉例:

cs.#:能夠匹配cs.insert.abc 或者 cs.insert

cs.*:只能匹配cs.insert


圖解:

  • 紅色Queue:綁定的是usa.# ,因此凡是以 usa.開頭的routing key 都會(huì)被匹配到
  • 黃色Queue:綁定的是#.news ,因此凡是以 .news結(jié)尾的 routing key 都會(huì)被匹配
代碼

1)生產(chǎn)者

使用topic類型的Exchange,發(fā)送消息的routing key有3種: item.insert、item.update、item.delete:

/*** 通配符Topic的交換機(jī)類型為:topic*/public class Producer {//交換機(jī)名稱static final String TOPIC_EXCHAGE = "topic_exchange";//隊(duì)列名稱static final String TOPIC_QUEUE_1 = "topic_queue_1";//隊(duì)列名稱static final String TOPIC_QUEUE_2 = "topic_queue_2";public static void main(String[] args) throws Exception {//創(chuàng)建連接Connection connection = ConnectionUtil.getConnection();// 創(chuàng)建頻道Channel channel = connection.createChannel();/*** 聲明交換機(jī)* 參數(shù)1:交換機(jī)名稱* 參數(shù)2:交換機(jī)類型,fanout、topic、topic、headers*/channel.exchangeDeclare(TOPIC_EXCHAGE, BuiltinExchangeType.TOPIC);// 發(fā)送信息String message = "新增了商品。Topic模式;routing key 為 item.insert " ;channel.basicPublish(TOPIC_EXCHAGE, "item.insert", null, message.getBytes());System.out.println("已發(fā)送消息:" + message);// 發(fā)送信息message = "修改了商品。Topic模式;routing key 為 item.update" ;channel.basicPublish(TOPIC_EXCHAGE, "item.update", null, message.getBytes());System.out.println("已發(fā)送消息:" + message);// 發(fā)送信息message = "刪除了商品。Topic模式;routing key 為 item.delete" ;channel.basicPublish(TOPIC_EXCHAGE, "item.delete", null, message.getBytes());System.out.println("已發(fā)送消息:" + message);// 關(guān)閉資源channel.close();connection.close();}}

2)消費(fèi)者1

接收兩種類型的消息:更新商品和刪除商品

public class Consumer1 {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();// 創(chuàng)建頻道Channel channel = connection.createChannel();//聲明交換機(jī)channel.exchangeDeclare(Producer.TOPIC_EXCHAGE, BuiltinExchangeType.TOPIC);// 聲明(創(chuàng)建)隊(duì)列/*** 參數(shù)1:隊(duì)列名稱* 參數(shù)2:是否定義持久化隊(duì)列* 參數(shù)3:是否獨(dú)占本次連接* 參數(shù)4:是否在不使用的時(shí)候自動(dòng)刪除隊(duì)列* 參數(shù)5:隊(duì)列其它參數(shù)*/channel.queueDeclare(Producer.TOPIC_QUEUE_1, true, false, false, null);//隊(duì)列綁定交換機(jī)channel.queueBind(Producer.TOPIC_QUEUE_1, Producer.TOPIC_EXCHAGE, "item.update");channel.queueBind(Producer.TOPIC_QUEUE_1, Producer.TOPIC_EXCHAGE, "item.delete");//創(chuàng)建消費(fèi)者;并設(shè)置消息處理DefaultConsumer consumer = new DefaultConsumer(channel){@Override/*** consumerTag 消息者標(biāo)簽,在channel.basicConsume時(shí)候可以指定* envelope 消息包的內(nèi)容,可從中獲取消息id,消息routingkey,交換機(jī),消息和重傳標(biāo)志(收到消息失敗后是否需要重新發(fā)送)* properties 屬性信息* body 消息*/public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//路由keySystem.out.println("路由key為:" + envelope.getRoutingKey());//交換機(jī)System.out.println("交換機(jī)為:" + envelope.getExchange());//消息idSystem.out.println("消息id為:" + envelope.getDeliveryTag());//收到的消息System.out.println("消費(fèi)者1-接收到的消息為:" + new String(body, "utf-8"));}};//監(jiān)聽消息/*** 參數(shù)1:隊(duì)列名稱* 參數(shù)2:是否自動(dòng)確認(rèn),設(shè)置為true為表示消息接收到自動(dòng)向mq回復(fù)接收到了,mq接收到回復(fù)會(huì)刪除消息,設(shè)置為false則需要手動(dòng)確認(rèn)* 參數(shù)3:消息接收到后回調(diào)*/channel.basicConsume(Producer.TOPIC_QUEUE_1, true, consumer);}}

3)消費(fèi)者2

接收所有類型的消息:新增商品,更新商品和刪除商品。

public class Consumer2 {public static void main(String[] args) throws Exception {Connection connection = ConnectionUtil.getConnection();// 創(chuàng)建頻道Channel channel = connection.createChannel();//聲明交換機(jī)channel.exchangeDeclare(Producer.TOPIC_EXCHAGE, BuiltinExchangeType.TOPIC);// 聲明(創(chuàng)建)隊(duì)列/*** 參數(shù)1:隊(duì)列名稱* 參數(shù)2:是否定義持久化隊(duì)列* 參數(shù)3:是否獨(dú)占本次連接* 參數(shù)4:是否在不使用的時(shí)候自動(dòng)刪除隊(duì)列* 參數(shù)5:隊(duì)列其它參數(shù)*/channel.queueDeclare(Producer.TOPIC_QUEUE_2, true, false, false, null);//隊(duì)列綁定交換機(jī)channel.queueBind(Producer.TOPIC_QUEUE_2, Producer.TOPIC_EXCHAGE, "item.*");//創(chuàng)建消費(fèi)者;并設(shè)置消息處理DefaultConsumer consumer = new DefaultConsumer(channel){@Override/*** consumerTag 消息者標(biāo)簽,在channel.basicConsume時(shí)候可以指定* envelope 消息包的內(nèi)容,可從中獲取消息id,消息routingkey,交換機(jī),消息和重傳標(biāo)志(收到消息失敗后是否需要重新發(fā)送)* properties 屬性信息* body 消息*/public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//路由keySystem.out.println("路由key為:" + envelope.getRoutingKey());//交換機(jī)System.out.println("交換機(jī)為:" + envelope.getExchange());//消息idSystem.out.println("消息id為:" + envelope.getDeliveryTag());//收到的消息System.out.println("消費(fèi)者2-接收到的消息為:" + new String(body, "utf-8"));}};//監(jiān)聽消息/*** 參數(shù)1:隊(duì)列名稱* 參數(shù)2:是否自動(dòng)確認(rèn),設(shè)置為true為表示消息接收到自動(dòng)向mq回復(fù)接收到了,mq接收到回復(fù)會(huì)刪除消息,設(shè)置為false則需要手動(dòng)確認(rèn)* 參數(shù)3:消息接收到后回調(diào)*/channel.basicConsume(Producer.TOPIC_QUEUE_2, true, consumer);}}

Topic主題模式可以實(shí)現(xiàn) Publish/Subscribe發(fā)布與訂閱模式 和 Routing路由模式 的功能;只是Topic在配置routing key 的時(shí)候可以使用通配符,顯得更加靈活。

模式總結(jié)

RabbitMQ工作模式:

  • 簡單模式 HelloWorld
    一個(gè)生產(chǎn)者、一個(gè)消費(fèi)者,不需要設(shè)置交換機(jī)(使用默認(rèn)的交換機(jī))

  • 工作隊(duì)列模式 Work Queue
    一個(gè)生產(chǎn)者、多個(gè)消費(fèi)者(競爭關(guān)系),不需要設(shè)置交換機(jī)(使用默認(rèn)的交換機(jī))

  • 發(fā)布訂閱模式 Publish/subscribe
    需要設(shè)置類型為fanout的交換機(jī),并且交換機(jī)和隊(duì)列進(jìn)行綁定,當(dāng)發(fā)送消息到交換機(jī)后,交換機(jī)會(huì)將消息發(fā)送到綁定的隊(duì)列

  • 路由模式 Routing
    需要設(shè)置類型為direct的交換機(jī),交換機(jī)和隊(duì)列進(jìn)行綁定,并且指定routing key,當(dāng)發(fā)送消息到交換機(jī)后,交換機(jī)會(huì)根據(jù)routing key將消息發(fā)送到對(duì)應(yīng)的隊(duì)列

  • 通配符模式 Topic
    需要設(shè)置類型為topic的交換機(jī),交換機(jī)和隊(duì)列進(jìn)行綁定,并且指定通配符方式的routing key,當(dāng)發(fā)送消息到交換機(jī)后,交換機(jī)會(huì)根據(jù)routing key將消息發(fā)送到對(duì)應(yīng)的隊(duì)列

總結(jié)

以上是生活随笔為你收集整理的【消息中间件】AMQPRabbitMQ工作模式的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。