日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

1_RabbitMQ初入门入门Hello消费者+生产者

發(fā)布時間:2024/7/19 编程问答 44 豆豆
生活随笔 收集整理的這篇文章主要介紹了 1_RabbitMQ初入门入门Hello消费者+生产者 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

文章目錄

  • 1_RabbitMQ初入門
    • 1.RabbitMQ的介紹
      • 1.工作原理&發(fā)送/接收消息的流程
      • 2. 為什么要使用消息隊列?
      • 3.使用了消息隊列有什么缺點?
    • 2.安裝RabbitMQ
    • 3.入門程序Hello_消費者&生產(chǎn)者_
      • 1.導(dǎo)入依賴
      • 2.生產(chǎn)者
        • 1.設(shè)置連接信息
        • 2.獲取connection(連接),channel(信道)
        • 3.channel(信道)綁定隊列&實現(xiàn)消費方法
        • 4.接收消息
        • 完整代碼:
      • 3.消費者
        • 1.設(shè)置連接信息
        • 2.獲取connection(連接),channel(信道)
        • 3.channel(信道)綁定隊列
        • 4.發(fā)送消息
        • 完整代碼:

1_RabbitMQ初入門

1.RabbitMQ的介紹

1.工作原理&發(fā)送/接收消息的流程

  • Broker:消息隊列服務(wù)進程,此進程包括兩個部分:Exchange和Queue。

  • Exchange:消息隊列交換機,按一定的規(guī)則將消息路由轉(zhuǎn)發(fā)到某個隊列,對消息進行過慮。

  • Queue:消息隊列,存儲消息的隊列,消息到達隊列并轉(zhuǎn)發(fā)給指定的消費方。

  • Producer:消息生產(chǎn)者,即生產(chǎn)方客戶端,生產(chǎn)方客戶端將消息發(fā)送到MQ。

  • Consumer:消息消費者,即消費方客戶端,接收MQ轉(zhuǎn)發(fā)的消息。

    消息發(fā)布接收流程:

-----發(fā)送消息----- 1、生產(chǎn)者和Broker建立TCP連接。 2、生產(chǎn)者和Broker建立通道。 3、生產(chǎn)者通過通道消息發(fā)送給Broker,由Exchange將消息進行轉(zhuǎn)發(fā)。 4、Exchange將消息轉(zhuǎn)發(fā)到指定的Queue(隊列)----接收消息----- 1、消費者和Broker建立TCP連接 2、消費者和Broker建立通道 3、消費者監(jiān)聽指定的Queue(隊列) 4、當有消息到達Queue時Broker默認將消息推送給消費者。 5、消費者接收到消息。

2. 為什么要使用消息隊列?

解耦,異步,削峰

3.使用了消息隊列有什么缺點?

分析:一個使用了MQ的項目,如果連這個問題都沒有考慮過,就把MQ引進去了,那就給自己的項目帶來了風險。我們引入一個技術(shù),要對這個技術(shù)的弊端有充分的認識,才能做好防御。

系統(tǒng)的可用性降低:如果消息隊列掛了,那么系統(tǒng)也會受到影響

系統(tǒng)的復(fù)雜性增加:要多考慮很多方面的問題,比如一致性問題、如何保證消息不被重復(fù)消費,如何保證消息可靠傳輸。因此,需要考慮的東更多,系統(tǒng)的復(fù)雜性增大。

2.安裝RabbitMQ

詳情請參考此鏈接

3.入門程序Hello_消費者&生產(chǎn)者_

1.導(dǎo)入依賴

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId> </dependency><!-- 這個是日志包的依賴 --> <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-logging</artifactId> </dependency>

2.生產(chǎn)者

1.設(shè)置連接信息

通過ConnectionFactory獲取連接信息

//通過連接工廠創(chuàng)建新的連接和mq建立連接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672);//端口 connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); //設(shè)置虛擬機,一個mq服務(wù)可以設(shè)置多個虛擬機,每個虛擬機就相當于一個獨立的mq connectionFactory.setVirtualHost("/");

2.獲取connection(連接),channel(信道)

Connection connection = null; Channel channel = null; //建立新連接 connection = connectionFactory.newConnection(); //創(chuàng)建會話通道,生產(chǎn)者和mq服務(wù)所有通信都在channel通道中完成 channel = connection.createChannel();

3.channel(信道)綁定隊列&實現(xiàn)消費方法

//隊列 private static final String QUEUE = "helloworld";//監(jiān)聽隊列 //聲明隊列,如果隊列在mq 中沒有則要創(chuàng)建 //參數(shù):String queue, boolean durable, boolean exclusive, boolean autoDelete, M /*** 參數(shù)明細* 1、queue 隊列名稱* 2、durable 是否持久化,如果持久化,mq重啟后隊列還在* 3、exclusive 是否獨占連接,隊列只允許在該連接中訪問,如果connection連接關(guān)閉隊列則自動刪除,如果將此參數(shù)設(shè)置true可用于臨時* 4、autoDelete 自動刪除,隊列不再使用時是否自動刪除此隊列,如果將此參數(shù)和exclusive參數(shù)設(shè)置為true就可以實現(xiàn)臨時隊列(隊列不* 5、arguments 參數(shù),可以設(shè)置一個隊列的擴展參數(shù),比如:可設(shè)置存活時間*/ channel.queueDeclare(QUEUE,true,false,false,null);//實現(xiàn)消費方法 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){/*** 當接收到消息后此方法將被調(diào)用* @param consumerTag 消費者標簽,用來標識消費者的,在監(jiān)聽隊列時設(shè)置channel.basicConsume* @param envelope 信封,通過envelope* @param properties 消息屬性* @param body 消息內(nèi)容* @throws IOException*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.B//交換機String exchange = envelope.getExchange();//消息id,mq在channel中用來標識消息的id,可用于確認消息已接收long deliveryTag = envelope.getDeliveryTag();//消息內(nèi)容String message= new String(body,"utf-8");System.out.println("receive message:"+message);} };

4.接收消息

//監(jiān)聽隊列//參數(shù):String queue, boolean autoAck, Consumer callback/*** 參數(shù)明細:* 1、queue 隊列名稱* 2、autoAck 自動回復(fù),當消費者接收到消息后要告訴mq消息已接收,如果將此參數(shù)設(shè)置為tru表示會自動回復(fù)mq,如果設(shè)置為false要通過編程實現(xiàn)回復(fù)* 3、callback,消費方法,當消費者接收到消息要執(zhí)行的方法*/channel.basicConsume(QUEUE,true,defaultConsumer);

結(jié)果截圖:

完整代碼:

package com.xuecheng.test.rabbitmq;import com.rabbitmq.client.*;import java.io.IOException; import java.util.concurrent.TimeoutException;public class Consumer01 {//隊列private static final String QUEUE = "helloworld";public static void main(String[] args) throws IOException, TimeoutException {//通過連接工廠創(chuàng)建新的連接和mq建立連接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);//端口connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");//設(shè)置虛擬機,一個mq服務(wù)可以設(shè)置多個虛擬機,每個虛擬機就相當于一個獨立的mqconnectionFactory.setVirtualHost("/");//建立新連接Connection connection = connectionFactory.newConnection();//創(chuàng)建會話通道,生產(chǎn)者和mq服務(wù)所有通信都在channel通道中完成Channel channel = connection.createChannel();//監(jiān)聽隊列//聲明隊列,如果隊列在mq 中沒有則要創(chuàng)建//參數(shù):String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments/*** 參數(shù)明細* 1、queue 隊列名稱* 2、durable 是否持久化,如果持久化,mq重啟后隊列還在* 3、exclusive 是否獨占連接,隊列只允許在該連接中訪問,如果connection連接關(guān)閉隊列則自動刪除,如果將此參數(shù)設(shè)置true可用于臨時隊列的創(chuàng)建* 4、autoDelete 自動刪除,隊列不再使用時是否自動刪除此隊列,如果將此參數(shù)和exclusive參數(shù)設(shè)置為true就可以實現(xiàn)臨時隊列(隊列不用了就自動刪除)* 5、arguments 參數(shù),可以設(shè)置一個隊列的擴展參數(shù),比如:可設(shè)置存活時間*/channel.queueDeclare(QUEUE,true,false,false,null);//實現(xiàn)消費方法DefaultConsumer defaultConsumer = new DefaultConsumer(channel){/*** 當接收到消息后此方法將被調(diào)用* @param consumerTag 消費者標簽,用來標識消費者的,在監(jiān)聽隊列時設(shè)置channel.basicConsume* @param envelope 信封,通過envelope* @param properties 消息屬性* @param body 消息內(nèi)容* @throws IOException*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//交換機String exchange = envelope.getExchange();//消息id,mq在channel中用來標識消息的id,可用于確認消息已接收long deliveryTag = envelope.getDeliveryTag();//消息內(nèi)容String message= new String(body,"utf-8");System.out.println("receive message:"+message);}};//監(jiān)聽隊列//參數(shù):String queue, boolean autoAck, Consumer callback/*** 參數(shù)明細:* 1、queue 隊列名稱* 2、autoAck 自動回復(fù),當消費者接收到消息后要告訴mq消息已接收,如果將此參數(shù)設(shè)置為tru表示會自動回復(fù)mq,如果設(shè)置為false要通過編程實現(xiàn)回復(fù)* 3、callback,消費方法,當消費者接收到消息要執(zhí)行的方法*/channel.basicConsume(QUEUE,true,defaultConsumer);} }

3.消費者

1.設(shè)置連接信息

通過ConnectionFactory獲取連接信息

//通過連接工廠創(chuàng)建新的連接和mq建立連接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672);//端口 connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); //設(shè)置虛擬機,一個mq服務(wù)可以設(shè)置多個虛擬機,每個虛擬機就相當于一個獨立的mq connectionFactory.setVirtualHost("/");

2.獲取connection(連接),channel(信道)

Connection connection = null; Channel channel = null; //建立新連接 connection = connectionFactory.newConnection(); //創(chuàng)建會話通道,生產(chǎn)者和mq服務(wù)所有通信都在channel通道中完成 channel = connection.createChannel();

3.channel(信道)綁定隊列

//隊列 private static final String QUEUE = "helloworld";//聲明隊列,如果隊列在mq 中沒有則要創(chuàng)建 //參數(shù):String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments /*** 參數(shù)明細* 1、queue 隊列名稱* 2、durable 是否持久化,如果持久化,mq重啟后隊列還在* 3、exclusive 是否獨占連接,隊列只允許在該連接中訪問,如果connection連接關(guān)閉隊列則自動刪除,如果將此參數(shù)設(shè)置true可用于臨時隊列的創(chuàng)建* 4、autoDelete 自動刪除,隊列不再使用時是否自動刪除此隊列,如果將此參數(shù)和exclusive參數(shù)設(shè)置為true就可以實現(xiàn)臨時隊列(隊列不用了就自動刪除)* 5、arguments 參數(shù),可以設(shè)置一個隊列的擴展參數(shù),比如:可設(shè)置存活時間*/ channel.queueDeclare(QUEUE,true,false,false,null);

4.發(fā)送消息

//參數(shù):String exchange, String routingKey, BasicProperties props, byte[] bod /*** 參數(shù)明細:* 1、exchange,交換機,如果不指定將使用mq的默認交換機(設(shè)置為"")* 2、routingKey,路由key,交換機根據(jù)路由key來將消息轉(zhuǎn)發(fā)到指定的隊列,如果使用默認交換機,routingKey設(shè)置為隊列的名稱* 3、props,消息的屬性* 4、body,消息內(nèi)容*/ //消息內(nèi)容 String message = "hello world 黑馬程序員"; channel.basicPublish("",QUEUE,null,message.getBytes()); System.out.println("send to mq "+message);

結(jié)果截圖:

完整代碼:

package com.xuecheng.test.rabbitmq;import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;import java.io.IOException; import java.util.concurrent.TimeoutException;public class Producer01 {//隊列private static final String QUEUE = "helloworld";public static void main(String[] args) {//通過連接工廠創(chuàng)建新的連接和mq建立連接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);//端口connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");//設(shè)置虛擬機,一個mq服務(wù)可以設(shè)置多個虛擬機,每個虛擬機就相當于一個獨立的mqconnectionFactory.setVirtualHost("/");Connection connection = null;Channel channel = null;try {//建立新連接connection = connectionFactory.newConnection();//創(chuàng)建會話通道,生產(chǎn)者和mq服務(wù)所有通信都在channel通道中完成channel = connection.createChannel();//聲明隊列,如果隊列在mq 中沒有則要創(chuàng)建//參數(shù):String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments/*** 參數(shù)明細* 1、queue 隊列名稱* 2、durable 是否持久化,如果持久化,mq重啟后隊列還在* 3、exclusive 是否獨占連接,隊列只允許在該連接中訪問,如果connection連接關(guān)閉隊列則自動刪除,如果將此參數(shù)設(shè)置true可用于臨時隊列的創(chuàng)建* 4、autoDelete 自動刪除,隊列不再使用時是否自動刪除此隊列,如果將此參數(shù)和exclusive參數(shù)設(shè)置為true就可以實現(xiàn)臨時隊列(隊列不用了就自動刪除)* 5、arguments 參數(shù),可以設(shè)置一個隊列的擴展參數(shù),比如:可設(shè)置存活時間*/channel.queueDeclare(QUEUE,true,false,false,null);//發(fā)送消息//參數(shù):String exchange, String routingKey, BasicProperties props, byte[] body/*** 參數(shù)明細:* 1、exchange,交換機,如果不指定將使用mq的默認交換機(設(shè)置為"")* 2、routingKey,路由key,交換機根據(jù)路由key來將消息轉(zhuǎn)發(fā)到指定的隊列,如果使用默認交換機,routingKey設(shè)置為隊列的名稱* 3、props,消息的屬性* 4、body,消息內(nèi)容*///消息內(nèi)容String message = "hello world 黑馬程序員";channel.basicPublish("",QUEUE,null,message.getBytes());System.out.println("send to mq "+message);} catch (Exception e) {e.printStackTrace();} finally {//關(guān)閉連接//先關(guān)閉通道try {channel.close();} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}try {connection.close();} catch (IOException e) {e.printStackTrace();}}} }

總結(jié)

以上是生活随笔為你收集整理的1_RabbitMQ初入门入门Hello消费者+生产者的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。