日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

RabbitMQ消息发送和接收

發布時間:2025/4/16 编程问答 29 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RabbitMQ消息发送和接收 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

1.RabbitMQ的消息發送和接受機制

所有 MQ 產品從模型抽象上來說都是一樣的過程:

消費者(consumer)訂閱某個隊列。生產者(producer)創建消息,然后發布到隊列(queue)中,最后將消息發送到監聽的消費者。

上面是MQ的基本抽象模型,但是不同的MQ產品有有者不同的機制,RabbitMQ實際基于AMQP協議的一個開源實現,因此RabbitMQ內部也是AMQP的基本概念。

RabbitMQ的內部接收如下:

1、Message
消息,消息是不具體的,它由消息頭和消息體組成。消息體是不透明的,而消息頭則由一系列的可選屬性組成,這些屬性包括routing-key(路由鍵)、priority(相對于其他消息的優先權)、delivery-mode(指出該消息可能需要持久性存儲)等。
2、Publisher
消息的生產者,也是一個向交換器發布消息的客戶端應用程序。
3、Exchange
交換器,用來接收生產者發送的消息并將這些消息路由給服務器中的隊列。
4、Binding
綁定,用于消息隊列和交換器之間的關聯。一個綁定就是基于路由鍵將交換器和消息隊列連接起來的路由規則,所以可以將交換器理解成一個由綁定構成的路由表。
5、Queue
消息隊列,用來保存消息直到發送給消費者。它是消息的容器,也是消息的終點。一個消息可投入一個或多個隊列。消息一直在隊列里面,等待消費者連接到這個隊列將其取走。
6、Connection
網絡連接,比如一個TCP連接。
7、Channel
信道,多路復用連接中的一條獨立的雙向數據流通道。信道是建立在真實的TCP連接內地虛擬連接,AMQP 命令都是通過信道發出去的,不管是發布消息、訂閱隊列還是接收消息,這些動作都是通過信道完成。因為對于操作系統來說建立和銷毀 TCP 都是非常昂貴的開銷,所以引入了信道的概念,以復用一條 TCP 連接。
8、Consumer
消息的消費者,表示一個從消息隊列中取得消息的客戶端應用程序。
9、Virtual Host
虛擬主機,表示一批交換器、消息隊列和相關對象。虛擬主機是共享相同的身份認證和加密環境的獨立服務器域。每個 vhost 本質上就是一個 mini 版的 RabbitMQ 服務器,擁有自己的隊列、交換器、綁定和權限機制。vhost 是 AMQP 概念的基礎,必須在連接時指定,RabbitMQ 默認的 vhost 是 / 。
10、Broker
表示消息隊列服務器實體。

2.AMQP中的消息路由

AMQP 中消息的路由過程和 Java 開發者熟悉的 JMS 存在一些差別,AMQP 中增加了 Exchange 和 Binding 的角色。生產者把消息發布到 Exchange 上,消息最終到達隊列并被消費者接收,而 Binding 決定交換器的消息應該發送到那個隊列

3.Exchange類型

Exchange分發消息時根據類型的不同分發策略有區別,目前共四種類型:direct、fanout、topic、headers 。headers 匹配 AMQP 消息的 header 而不是路由鍵,此外 headers 交換器和 direct 交換器完全一致,但性能差很多,目前幾乎用不到了,所以直接看另外三種類型

1、direct

消息中的路由鍵(routing key)如果和 Binding 中的 binding key 一致, 交換器就將消息發到對應的隊列中。路由鍵與隊列名完全匹配,如果一個隊列綁定到交換機要求路由鍵為“dog”,則只轉發 routing key 標記為“dog”的消息,不會轉發“dog.puppy”,也不會轉發“dog.guard”等等。它是完全匹配、單播的模式。

2、fanout
每個發到 fanout 類型交換器的消息都會分到所有綁定的隊列上去。fanout 交換器不處理路由鍵,只是簡單的將隊列綁定到交換器上,每個發送到交換器的消息都會被轉發到與該交換器綁定的所有隊列上。很像子網廣播,每臺子網內的主機都獲得了一份復制的消息。fanout 類型轉發消息是最快的。

3、topic
topic 交換器通過模式匹配分配消息的路由鍵屬性,將路由鍵和某個模式進行匹配,此時隊列需要綁定到一個模式上。它將路由鍵和綁定鍵的字符串切分成單詞,這些單詞之間用點隔開。它同樣也會識別兩個通配符:符號“#”和符號“”。#匹配0個或多個單詞,“”匹配不多不少一個單詞。

4.Java發送和接收Queue的消息

4.1創建Maven工程01-rabbitmq-send-java添加Maven依賴

<dependencies> <dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.1.1</version> </dependency> </dependencies>

4.2 編寫消息發送類
在01-rabbitmq-send-java項目中創建,com.xxxx.rabbitmq.queue.Send類

public class Send{public static void main(String[] args) throws IOException, TimeoutException {//創建鏈接工廠對象 ConnectionFactory factory=new ConnectionFactory(); factory.setHost("192.168.222.128");//設置RabbitMQ的主機IP factory.setPort(5672);//設置RabbitMQ的端口號 factory.setUsername("root");//設置訪問用戶名 factory.setPassword("root");//設置訪問密碼 Connection connection=null;//定義鏈接對象 Channel channel=null;//定義通道對象 connection=factory.newConnection();//實例化鏈接對象 channel=connection.createChannel();//實例化通道對象 String message ="Hello World!3"; //創建隊列 ,名字為myQueue channel.queueDeclare("myQueue", true, false, false, null); //發送消息到指定隊列 channel.basicPublish("","myQueue",null,message.getBytes("UTF-8")); System.out.println("消息發送成功: "+message); channel.close(); connection.close();} }

以運行Send類觀看管控臺的變化

4.3 創建Maven工程01-rabbitmq-receive-java添加Maven依賴

<dependencies> <dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.1.1</version> </dependency> </dependencies>

4.4 編寫消息接收類
在01-rabbitmq-receive-java項目中創建,com.xxxx.rabbitmq.queue.Receive類

public class Receive {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setUsername("root");factory.setPassword("root");factory.setHost("192.168.222.128");//建立到代理服務器到連接Connection conn = factory.newConnection();//獲得信道final Channel channel = conn.createChannel();//聲明隊列channel.queueDeclare("myQueue", true, false, false, null);//消費消息boolean autoAck = true;String consumerTag = "";//接收消息//參數1 隊列名稱//參數2 是否自動確認消息 true表示自動確認 false表示手動確認//參數3 為消息標簽 用來區分不同的消費者這里暫時為""// 參數4 消費者回調方法用于編寫處理消息的具體代碼(例如打印或將消息寫入數據庫)channel.basicConsume("myQueue", autoAck, consumerTag, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException {//獲取消息數據String bodyStr = new String(body, "UTF-8");System.out.println(bodyStr);}}); // channel.close(); // conn.close();} }

注意:
1、Queue的消息只能被同一個消費者消費,如果沒有消費監聽隊列那么消息會存放到隊列中持久化保存,直到有消費者來消費這個消息,如果以有消費者監聽隊列則立即消費發送到隊列中的消息
2、Queue的消息可以保證每個消息都一定能被消費

5.Java綁定Exchange發送和接收消息

AMQP 協議中的核心思想就是生產者和消費者的解耦,生產者從不直接將消息發送給隊列。生產者通常不知道是否一個消息會被發送到隊列中,只是將消息發送到一個交換機。先由 Exchange 來接收,然后 Exchange 按照特定的策略轉發到 Queue 進行存儲。Exchange 就類似于一個交換機,將各個消息分發到相應的隊列中。

在實際應用中我們只需要定義好 Exchange 的路由策略,而生產者則不需要關心消息會發送到哪個 Queue 或被哪些 Consumer 消費。在這種模式下生產者只面向 Exchange 發布消息,消費者只面向 Queue 消費消息,Exchange 定義了消息路由到 Queue 的規則,將各個層面的消息傳遞隔離開,使每一層只需要關心自己面向的下一層,降低了整體的耦合度。

5.1 Exchange的direct消息綁定

5.1.1 編寫direct消息發送類
在01-rabbitmq-send-java項目中創建,com.xxxx.rabbitmq.direct.Send類

public class Send {public static void main(String[] args) throws IOException, TimeoutException {//創建鏈接工廠對象ConnectionFactory factory=new ConnectionFactory();factory.setHost("192.168.222.128");//設置RabbitMQ的主機IPfactory.setPort(5672);//設置RabbitMQ的端口號factory.setUsername("root");//設置訪問用戶名factory.setPassword("root");//設置訪問密碼Connection connection=null;//定義鏈接對象Channel channel=null;//定義通道對象connection=factory.newConnection();//實例化鏈接對象channel=connection.createChannel();//實例化通道對象String message ="Hello World!3";String exchangeName="myExchange"; channel.queueDeclare("myQueueDirect", true, false, false, null);//指定Exchange的類型//參數1為 交換機名稱//參數2為交換機類型取值為 direct、queue、topic、headers//參數3 為是否為持久化消息 true表示持久化消息 false表示非持久化channel.exchangeDeclare(exchangeName, "direct", true);//發送消息到RabbitMQ//參數1 我們自定義的交換機名稱//參數2 自定義的RoutingKey值//參數3 設置消息的屬性,可以通過消息屬性設置消息是否是持久化的//參數4 具體要發送的消息信息channel.basicPublish(exchangeName,"myRoutingKey",null,message.getBytes("UTF-8"));System.out.println("消息發送成功: "+message); // channel.close(); // connection.close();} }

注意:使用direct消息模式時必須要指定RoutingKey(路由鍵),將指定的消息綁定到指定的路由鍵上

5.1.2 編寫direct消息接收類
在01-rabbitmq-Receive-java項目中創建,com.xxxx.rabbitmq.direct.Receive類

public static void main(String[] args) throws IOException, TimeoutException {//創建鏈接工廠對象ConnectionFactory factory=new ConnectionFactory();factory.setHost("192.168.222.128");//設置RabbitMQ的主機IPfactory.setPort(5672);//設置RabbitMQ的端口號factory.setUsername("root");//設置訪問用戶名factory.setPassword("root");//設置訪問密碼Connection connection=null;//定義鏈接對象Channel channel=null;//定義通道對象connection=factory.newConnection();//實例化鏈接對象channel=connection.createChannel();//實例化通道對象String message ="Hello World!3111"; channel.queueDeclare("myQueueDirect", true, false, false, null);String exchangeName="myExchange";//指定Exchange的類型//參數1為 交換機名稱//參數2為交換機類型取值為 direct、queue、topic、headers//參數3 為是否為持久化消息 true表示持久化消息 false表示非持久化channel.exchangeDeclare(exchangeName, "direct", true);channel.queueDeclare("myQueueDirect", true, false, false, null);channel.basicConsume("myQueueDirect ", autoAck, consumerTag, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException {//獲取消息數據String bodyStr = new String(body, "UTF-8");System.out.println(bodyStr);}}); // channel.close(); // conn.close();}}

注意:
1、使用Exchange的direct模式時接收者的RoutingKey必須要與發送時的RoutingKey完全一致否則無法獲取消息
2、接收消息時隊列名也必須要發送消息時的完全一致

5.2 Exchange的fanout消息綁定

5.2.1 編寫fanout消息發送類
在01-rabbitmq-send-java項目中創建,com.xxxx.rabbitmq.fanout.Send類

public static void main(String[] args) throws IOException, TimeoutException {//創建鏈接工廠對象ConnectionFactory factory=new ConnectionFactory();factory.setHost("192.168.222.128");//設置RabbitMQ的主機IPfactory.setPort(5672);//設置RabbitMQ的端口號factory.setUsername("root");//設置訪問用戶名factory.setPassword("root");//設置訪問密碼Connection connection=null;//定義鏈接對象Channel channel=null;//定義通道對象connection=factory.newConnection();//實例化鏈接對象channel=connection.createChannel();//實例化通道對象String message ="Hello World!3111";String exchangeName="myExchangeFanout";//指定Exchange的類型//參數1為 交換機名稱//參數2為交換機類型取值為 direct、queue、topic、headers//參數3 為是否為持久化消息 true表示持久化消息 false表示非持久化channel.exchangeDeclare(exchangeName, "fanout", true);//接收消息//參數1 隊列名稱//參數2 是否自動確認消息 true表示自動確認 false表示手動確認//參數3 為消息標簽 用來區分不同的消費者這列暫時為""// 參數4 消費者回調方法用于編寫處理消息的具體代碼(例如打印或將消息寫入數據庫)System.out.println(queueName);channel.basicConsume(queueName,autoAck,consumerTag,new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException {//獲取消息數String bodyStr = new String(body, "UTF-8");System.out.println(bodyStr);}});}

注意:
fanout模式的消息需要將一個消息同時綁定到多個隊列中因此這里不能創建并指定某個隊列

5.2.2 編寫fanout消息接收類

在01-rabbitmq-receive-java項目中創建,com.xxxx.rabbitmq.fanout.Receive類

public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setUsername("root");factory.setPassword("root");factory.setHost("192.168.222.128");//建立到代理服務器到連接Connection conn = factory.newConnection();//獲得信道final Channel channel = conn.createChannel();//聲明交換器String exchangeName = "myExchangeFanout";channel.exchangeDeclare(exchangeName, "fanout", true);//聲明隊列String queueName = channel.queueDeclare().getQueue();String routingKey = "";//綁定隊列,通過鍵 hola 將隊列和交換器綁定起來channel.queueBind(queueName, exchangeName, routingKey);//消費消息boolean autoAck = true;String consumerTag = "";//接收消息//參數1 隊列名稱//參數2 是否自動確認消息 true表示自動確認 false表示手動確認//參數3 為消息標簽 用來區分不同的消費者這列暫時為""// 參數4 消費者回調方法用于編寫處理消息的具體代碼(例如打印或將消息寫入數據庫)System.out.println(queueName);channel.basicConsume(queueName,autoAck,consumerTag,new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException {//獲取消息數String bodyStr = new String(body, "UTF-8");System.out.println(bodyStr);}});}

注意:
1、使用fanout模式獲取消息時不需要綁定特定的隊列名稱,只需使用channel.queueDeclare().getQueue();獲取一個隨機的隊列名稱,然后綁定到指定的Exchange即可獲取消息。
2、這種模式中可以同時啟動多個接收者只要都綁定到同一個Exchang即可讓所有接收者同時接收同一個消息是一種廣播的消息機制

5.3 Exchange的topic消息綁定

5.3.1編寫topic消息發送類

在01-rabbitmq-send-java項目中創建,com.xxxx.rabbitmq.topic.Send類

public static void main(String[] args) throws IOException, TimeoutException {//創建鏈接工廠對象ConnectionFactory factory=new ConnectionFactory();factory.setHost("192.168.222.128");//設置RabbitMQ的主機IPfactory.setPort(5672);//設置RabbitMQ的端口號factory.setUsername("root");//設置訪問用戶名factory.setPassword("root");//設置訪問密碼Connection connection=null;//定義鏈接對象Channel channel=null;//定義通道對象connection=factory.newConnection();//實例化鏈接對象channel=connection.createChannel();//實例化通道對象String message ="Hello World!";String exchangeName="myExchangeTopic";//指定Exchange的類型//參數1為 交換機名稱//參數2為交換機類型取值為 direct、queue、topic、headers//參數3 為是否為持久化消息 true表示持久化消息 false表示非持久化channel.exchangeDeclare(exchangeName, "topic", true);//發送消息到RabbitMQ//參數1 我們自定義的交換機名稱//參數2 自定義的RoutingKey值//參數3 設置消息的屬性,可以通過消息屬性設置消息是否是持久化的//參數4 具體要發送的消息信息channel.basicPublish(exchangeName,"test.myRoutingKey",null,message.getBytes("UTF-8"));System.out.println("消息發送成功: "+message);channel.close();connection.close(); }

注意:
1、在topic模式中必須要指定Routingkey,并且可以同時指定多層的RoutingKey,每個層次之間使用 點分隔即可 例如test.myRoutingKey

5.3.2編寫topic的消息接收類

在01-rabbitmq-receive-java項目中創建,com.xxxx.rabbitmq.topic.Receive類

public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setUsername("root");factory.setPassword("root");factory.setHost("192.168.222.128");//建立到代理服務器到連接Connection conn = factory.newConnection();//獲得信道final Channel channel = conn.createChannel();//聲明交換器String exchangeName = "myExchangeTopic";channel.exchangeDeclare(exchangeName, "topic", true);//聲明隊列String queueName = channel.queueDeclare().getQueue();String routingKey = "test.#";//綁定隊列,通過鍵 將隊列和交換器綁定起來channel.queueBind(queueName, exchangeName, routingKey);//消費消息boolean autoAck = true;String consumerTag = "";//接收消息//參數1 隊列名稱//參數2 是否自動確認消息 true表示自動確認 false表示手動確認//參數3 為消息標簽 用來區分不同的消費者這列暫時為""// 參數4 消費者回調方法用于編寫處理消息的具體代碼(例如打印或將消息寫入數據庫)channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException {//獲取消息數據String bodyStr = new String(body, "UTF-8");System.out.println("test.#----"+bodyStr);}}); }

注意:
1、Topic模式的消息接收時必須要指定RoutingKey并且可以使用# 和 *來做統配符號,#表示通配任意一個單詞 *表示通配任意多個單詞,例如消費者的RoutingKey為test.#或#.myRoutingKey都可以獲取RoutingKey為test.myRoutingKey發送者發送的消息

5.4 事務消息

事務消息與數據庫的事務類似,只是MQ中的消息是要保證消息是否會全部發送成功,防止丟失消息的一種策略。

RabbitMQ有兩種方式來解決這個問題:

  • 通過AMQP提供的事務機制實現;
  • 使用發送者確認模式實現;
  • 5.4.1 事務使用

    事務的實現主要是對信道(Channel)的設置,主要的方法有三個:

  • channel.txSelect()聲明啟動事務模式;
  • channel.txCommint()提交事務;
  • channel.txRollback()回滾事務;
  • 5.4.2 編寫消息發送類

    在01-rabbitmq-send-java項目中創建,com.xxxx.rabbitmq.transaction.Send類

    public class Send{public static void main(String[] args) throws IOException, TimeoutException {//創建鏈接工廠對象ConnectionFactory factory=new ConnectionFactory();factory.setHost("192.168.171.143");//設置RabbitMQ的主機IPfactory.setPort(5672);//設置RabbitMQ的端口號factory.setUsername("root");//設置訪問用戶名factory.setPassword("root");//設置訪問密碼Connection connection=null;//定義鏈接對象Channel channel=null;//定義通道對象connection=factory.newConnection();//實例化鏈接對象channel=connection.createChannel();//實例化通道對象String message ="Hello World!"; String exchangeName="myExchangeTransaction"; //指定Exchange的類型 //參數1為 交換機名稱 //參數2為交換機類型取值為 direct、fanout、topic、headers //參數3 為是否為持久化消息 true表示持久化消息 false表示非持久化 channel.exchangeDeclare(exchangeName, "direct", true);// 聲明事務 channel.txSelect();//發送消息到RabbitMQ//參數1 我們自定義的交換機名稱//參數2 自定義的RoutingKey值//參數3 設置消息的屬性,可以通過消息屬性設置消息是否是持久化的//參數4 具體要發送的消息信息channel.basicPublish(exchangeName,"myRoutingKeyTransaction",null,message.getBytes("UTF-8")); // 提交事務channel.txCommit();System.out.println("消息發送成功: "+message);channel.close();connection.close();} }

    5.1.3編寫消息接收類

    在01-rabbitmq-receive-java項目中創建,com.xxxx.rabbitmq.transaction.Receive類

    public class Receive{public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setUsername("root");factory.setPassword("root");factory.setHost("192.168.171.143");//建立到代理服務器到連接Connection conn = factory.newConnection();//獲得信道final Channel channel = conn.createChannel();//聲明交換器String exchangeName = "myExchangeTransaction";channel.exchangeDeclare(exchangeName, "direct", true);//聲明隊列String queueName = channel.queueDeclare().getQueue();String routingKey = "myRoutingKeyTransaction";//綁定隊列,通過鍵 hola 將隊列和交換器綁定起來channel.queueBind(queueName, exchangeName, routingKey);//消費消息boolean autoAck = true;String consumerTag = "";//接收消息 //參數1 隊列名稱//參數2 是否自動確認消息 true表示自動確認 false表示手動確認//參數3 為消息標簽 用來區分不同的消費者這列暫時為""// 參數4 消費者回調方法用于編寫處理消息的具體代碼(例如打印或將消息寫入數據庫) channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException {//獲取消息數據String bodyStr = new String(body, "UTF-8");System.out.println(bodyStr);}}); channel.close();conn.close();} }

    5.5 消息的發送者確認模式

    Confirm發送方確認模式使用和事務類似,也是通過設置Channel進行發送方確認的,最終達到確保所有的消息全部發送成功

    Confirm的三種實現方式:

    方式一:channel.waitForConfirms()普通發送方確認模式;

    public class Send {public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//創建鏈接工廠對象ConnectionFactory factory=new ConnectionFactory();factory.setHost("192.168.222.128");//設置RabbitMQ的主機IPfactory.setPort(5672);//設置RabbitMQ的端口號factory.setUsername("root");//設置訪問用戶名factory.setPassword("root");//設置訪問密碼Connection connection=null;//定義鏈接對象Channel channel=null;//定義通道對象connection=factory.newConnection();//實例化鏈接對象channel=connection.createChannel();//實例化通道對象String message ="Hello World!3";//創建隊列 ,名字為myQueuechannel.queueDeclare("myQueue", true, false, false, null);// 開啟發送方確認模式channel.confirmSelect();long time=System.currentTimeMillis();//發送消息到指定隊列for(int i=0;i<10000;i++){message="Hello World!"+i;channel.basicPublish("","myQueue",null,message.getBytes("UTF-8"));}channel.waitForConfirms();System.out.println(System.currentTimeMillis()-time);System.out.println("消息發送成功: "+message);channel.close();connection.close();} }

    方式二:channel.waitForConfirmsOrDie()批量確認模式;

    public class Send {public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//創建鏈接工廠對象ConnectionFactory factory=new ConnectionFactory();factory.setHost("192.168.222.128");//設置RabbitMQ的主機IPfactory.setPort(5672);//設置RabbitMQ的端口號factory.setUsername("root");//設置訪問用戶名factory.setPassword("root");//設置訪問密碼Connection connection=null;//定義鏈接對象Channel channel=null;//定義通道對象connection=factory.newConnection();//實例化鏈接對象channel=connection.createChannel();//實例化通道對象String message ="Hello World!3";//創建隊列 ,名字為myQueuechannel.queueDeclare("myQueue", true, false, false, null);// 開啟發送方確認模式channel.confirmSelect();long time=System.currentTimeMillis();//發送消息到指定隊列for(int i=0;i<10000;i++){message="Hello World!"+i;channel.basicPublish("","myQueue",null,message.getBytes("UTF-8"));}channel.waitForConfirmsOrDie();System.out.println(System.currentTimeMillis()-time);System.out.println("消息發送成功: "+message);channel.close();connection.close();} }

    方式三:channel.addConfirmListener()異步監聽發送方確認模式

    public class Send {public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//創建鏈接工廠對象ConnectionFactory factory=new ConnectionFactory();factory.setHost("192.168.222.128");//設置RabbitMQ的主機IPfactory.setPort(5672);//設置RabbitMQ的端口號factory.setUsername("root");//設置訪問用戶名factory.setPassword("root");//設置訪問密碼Connection connection=null;//定義鏈接對象Channel channel=null;//定義通道對象connection=factory.newConnection();//實例化鏈接對象channel=connection.createChannel();//實例化通道對象String message ="Hello World!3";//創建隊列 ,名字為myQueuechannel.queueDeclare("myQueue", true, false, false, null);// 開啟發送方確認模式channel.confirmSelect();long time=System.currentTimeMillis();//發送消息到指定隊列for(int i=0;i<10000;i++){message="Hello World!"+i;channel.basicPublish("","myQueue",null,message.getBytes("UTF-8"));}channel.addConfirmListener(new ConfirmListener() {public void handleAck(long deliveryTag, boolean multiple) throws IOException {System.out.println("未確認消息,標識:" + deliveryTag+"----"+multiple);}public void handleNack(long deliveryTag, boolean multiple) throws IOException {System.out.println("已確認消息,標識:"+deliveryTag+" ---多個消息:"+multiple);}});System.out.println(System.currentTimeMillis()-time);System.out.println("消息發送成功: "+message);channel.close();connection.close();} }

    5.6 消息的消費者確認模式

    為了保證消息從隊列可靠地到達消費者,RabbitMQ提供消息確認機制(message acknowledgment)。消費者在聲明隊列時,可以指定noAck參數,當noAck=false時,RabbitMQ會等待消費者顯式發回ack信號后才從內存(和磁盤,如果是持久化消息的話)中移去消息。否則,RabbitMQ會在隊列中消息被消費后立即刪除它。
    在Consumer中Confirm模式中分為手動確認和自動確認。

    手動確認主要并使用以下方法:

    basicAck(): 用于肯定確認,multiple參數用于多個消息確認。

    basicRecover():是路由不成功的消息可以使用recovery重新發送到隊列中。

    basicReject():是接收端告訴服務器這個消息我拒絕接收,不處理,可以設置是否放回到隊列中還是丟掉,而且只能一次拒絕一個消息,官網中有明確說明不能批量拒絕消息,為解決批量拒絕消息才有了basicNack。

    basicNack():可以一次拒絕N條消息,客戶端可以設置basicNack方法的multiple參數為true。

    在01-rabbitmq-send-java項目中創建,
    com.xxxx.rabbitmq.ack.Send類

    public class Send {public static void main(String[] args) throws IOException, TimeoutException {//創建鏈接工廠對象ConnectionFactory factory=new ConnectionFactory();factory.setHost("192.168.222.128");//設置RabbitMQ的主機IPfactory.setPort(5672);//設置RabbitMQ的端口號factory.setUsername("root");//設置訪問用戶名factory.setPassword("root");//設置訪問密碼Connection connection=null;//定義鏈接對象Channel channel=null;//定義通道對象connection=factory.newConnection();//實例化鏈接對象channel=connection.createChannel();//實例化通道對象String message ="Hello World!3111222";String exchangeName="myExchange";channel.queueDeclare("myQueueDirect", true, false, false, null);//指定Exchange的類型//參數1為 交換機名稱//參數2為交換機類型取值為 direct、queue、topic、headers//參數3 為是否為持久化消息 true表示持久化消息 false表示非持久化channel.exchangeDeclare(exchangeName, "direct", true);//發送消息到RabbitMQ//參數1 我們自定義的交換機名稱//參數2 自定義的RoutingKey值//參數3 設置消息的屬性,可以通過消息屬性設置消息是否是持久化的//參數4 具體要發送的消息信息channel.basicPublish(exchangeName,"myRoutingKeyDirect",null,message.getBytes("UTF-8"));System.out.println("消息發送成功: "+message); // channel.close(); // connection.close();} }

    在01-rabbitmq-receive-java項目中創建,
    com.xxxx.rabbitmq.ack.Receive類

    public class Receive {public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setUsername("root");factory.setPassword("root");factory.setHost("192.168.222.128");//建立到代理服務器到連接Connection conn = factory.newConnection();//獲得信道final Channel channel = conn.createChannel();//聲明交換器String exchangeName = "myExchange";String queueName = "myQueueDirect";channel.queueDeclare(queueName, true, false, false, null);channel.exchangeDeclare(exchangeName, "direct", true);//聲明隊列String routingKey = "myRoutingKeyDirect";//綁定隊列,通過鍵 hola 將隊列和交換器綁定起來channel.queueBind(queueName, exchangeName, routingKey);//消費消息boolean autoAck = false;String consumerTag = "";//接收消息//參數1 隊列名稱//參數2 是否自動確認消息 true表示自動確認 false表示手動確認//參數3 為消息標簽 用來區分不同的消費者這列暫時為""// 參數4 消費者回調方法用于編寫處理消息的具體代碼(例如打印或將消息寫入數據庫)System.out.println(queueName); //開啟事務channel.txSelect();channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException {//獲取消息數據String bodyStr = new String(body, "UTF-8");System.out.println(bodyStr);//獲取當前消息的序列號long deliveryTag = envelope.getDeliveryTag();//確認消息//參數 1 用于確定確認那條消息//參數 2 false 表示確認這條消息, true表示確認小于這個值的所有消息channel.basicAck(deliveryTag, false);}}); //開始提交事務 channel.txCommit() //回滾事務 // channel.txRollback(); // channel.close(); // conn.close();} }

    注意:
    1、如果開啟了事務手動提交以后再開始事務,如果事務執行了回滾操作那么即使手動確認了消息那么消息也不會從隊列中移除,除非使用事務執行提交以后才會移除。

    總結

    以上是生活随笔為你收集整理的RabbitMQ消息发送和接收的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。