1_RabbitMQ初入门入门Hello消费者+生产者
文章目錄
- 1_RabbitMQ初入門(mén)
- 1.RabbitMQ的介紹
- 1.工作原理&發(fā)送/接收消息的流程
- 2. 為什么要使用消息隊(duì)列?
- 3.使用了消息隊(duì)列有什么缺點(diǎn)?
- 2.安裝RabbitMQ
- 3.入門(mén)程序Hello_消費(fèi)者&生產(chǎn)者_(dá)
- 1.導(dǎo)入依賴
- 2.生產(chǎn)者
- 1.設(shè)置連接信息
- 2.獲取connection(連接),channel(信道)
- 3.channel(信道)綁定隊(duì)列&實(shí)現(xiàn)消費(fèi)方法
- 4.接收消息
- 完整代碼:
- 3.消費(fèi)者
- 1.設(shè)置連接信息
- 2.獲取connection(連接),channel(信道)
- 3.channel(信道)綁定隊(duì)列
- 4.發(fā)送消息
- 完整代碼:
1_RabbitMQ初入門(mén)
1.RabbitMQ的介紹
1.工作原理&發(fā)送/接收消息的流程
-
Broker:消息隊(duì)列服務(wù)進(jìn)程,此進(jìn)程包括兩個(gè)部分:Exchange和Queue。
-
Exchange:消息隊(duì)列交換機(jī),按一定的規(guī)則將消息路由轉(zhuǎn)發(fā)到某個(gè)隊(duì)列,對(duì)消息進(jìn)行過(guò)慮。
-
Queue:消息隊(duì)列,存儲(chǔ)消息的隊(duì)列,消息到達(dá)隊(duì)列并轉(zhuǎn)發(fā)給指定的消費(fèi)方。
-
Producer:消息生產(chǎn)者,即生產(chǎn)方客戶端,生產(chǎn)方客戶端將消息發(fā)送到MQ。
-
Consumer:消息消費(fèi)者,即消費(fèi)方客戶端,接收MQ轉(zhuǎn)發(fā)的消息。
消息發(fā)布接收流程:
2. 為什么要使用消息隊(duì)列?
解耦,異步,削峰
3.使用了消息隊(duì)列有什么缺點(diǎn)?
分析:一個(gè)使用了MQ的項(xiàng)目,如果連這個(gè)問(wèn)題都沒(méi)有考慮過(guò),就把MQ引進(jìn)去了,那就給自己的項(xiàng)目帶來(lái)了風(fēng)險(xiǎn)。我們引入一個(gè)技術(shù),要對(duì)這個(gè)技術(shù)的弊端有充分的認(rèn)識(shí),才能做好防御。
系統(tǒng)的可用性降低:如果消息隊(duì)列掛了,那么系統(tǒng)也會(huì)受到影響
系統(tǒng)的復(fù)雜性增加:要多考慮很多方面的問(wèn)題,比如一致性問(wèn)題、如何保證消息不被重復(fù)消費(fèi),如何保證消息可靠傳輸。因此,需要考慮的東更多,系統(tǒng)的復(fù)雜性增大。
2.安裝RabbitMQ
詳情請(qǐng)參考此鏈接
3.入門(mén)程序Hello_消費(fèi)者&生產(chǎn)者_(dá)
1.導(dǎo)入依賴
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId> </dependency><!-- 這個(gè)是日志包的依賴 --> <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-logging</artifactId> </dependency>2.生產(chǎn)者
1.設(shè)置連接信息
通過(guò)ConnectionFactory獲取連接信息
//通過(guò)連接工廠創(chuàng)建新的連接和mq建立連接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672);//端口 connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); //設(shè)置虛擬機(jī),一個(gè)mq服務(wù)可以設(shè)置多個(gè)虛擬機(jī),每個(gè)虛擬機(jī)就相當(dāng)于一個(gè)獨(dú)立的mq connectionFactory.setVirtualHost("/");2.獲取connection(連接),channel(信道)
Connection connection = null; Channel channel = null; //建立新連接 connection = connectionFactory.newConnection(); //創(chuàng)建會(huì)話通道,生產(chǎn)者和mq服務(wù)所有通信都在channel通道中完成 channel = connection.createChannel();3.channel(信道)綁定隊(duì)列&實(shí)現(xiàn)消費(fèi)方法
//隊(duì)列 private static final String QUEUE = "helloworld";//監(jiān)聽(tīng)隊(duì)列 //聲明隊(duì)列,如果隊(duì)列在mq 中沒(méi)有則要?jiǎng)?chuàng)建 //參數(shù):String queue, boolean durable, boolean exclusive, boolean autoDelete, M /*** 參數(shù)明細(xì)* 1、queue 隊(duì)列名稱* 2、durable 是否持久化,如果持久化,mq重啟后隊(duì)列還在* 3、exclusive 是否獨(dú)占連接,隊(duì)列只允許在該連接中訪問(wèn),如果connection連接關(guān)閉隊(duì)列則自動(dòng)刪除,如果將此參數(shù)設(shè)置true可用于臨時(shí)* 4、autoDelete 自動(dòng)刪除,隊(duì)列不再使用時(shí)是否自動(dòng)刪除此隊(duì)列,如果將此參數(shù)和exclusive參數(shù)設(shè)置為true就可以實(shí)現(xiàn)臨時(shí)隊(duì)列(隊(duì)列不* 5、arguments 參數(shù),可以設(shè)置一個(gè)隊(duì)列的擴(kuò)展參數(shù),比如:可設(shè)置存活時(shí)間*/ channel.queueDeclare(QUEUE,true,false,false,null);//實(shí)現(xiàn)消費(fèi)方法 DefaultConsumer defaultConsumer = new DefaultConsumer(channel){/*** 當(dāng)接收到消息后此方法將被調(diào)用* @param consumerTag 消費(fèi)者標(biāo)簽,用來(lái)標(biāo)識(shí)消費(fèi)者的,在監(jiān)聽(tīng)隊(duì)列時(shí)設(shè)置channel.basicConsume* @param envelope 信封,通過(guò)envelope* @param properties 消息屬性* @param body 消息內(nèi)容* @throws IOException*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.B//交換機(jī)String exchange = envelope.getExchange();//消息id,mq在channel中用來(lái)標(biāo)識(shí)消息的id,可用于確認(rèn)消息已接收long deliveryTag = envelope.getDeliveryTag();//消息內(nèi)容String message= new String(body,"utf-8");System.out.println("receive message:"+message);} };4.接收消息
//監(jiān)聽(tīng)隊(duì)列//參數(shù):String queue, boolean autoAck, Consumer callback/*** 參數(shù)明細(xì):* 1、queue 隊(duì)列名稱* 2、autoAck 自動(dòng)回復(fù),當(dāng)消費(fèi)者接收到消息后要告訴mq消息已接收,如果將此參數(shù)設(shè)置為tru表示會(huì)自動(dòng)回復(fù)mq,如果設(shè)置為false要通過(guò)編程實(shí)現(xiàn)回復(fù)* 3、callback,消費(fèi)方法,當(dāng)消費(fèi)者接收到消息要執(zhí)行的方法*/channel.basicConsume(QUEUE,true,defaultConsumer);結(jié)果截圖:
完整代碼:
package com.xuecheng.test.rabbitmq;import com.rabbitmq.client.*;import java.io.IOException; import java.util.concurrent.TimeoutException;public class Consumer01 {//隊(duì)列private static final String QUEUE = "helloworld";public static void main(String[] args) throws IOException, TimeoutException {//通過(guò)連接工廠創(chuàng)建新的連接和mq建立連接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);//端口connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");//設(shè)置虛擬機(jī),一個(gè)mq服務(wù)可以設(shè)置多個(gè)虛擬機(jī),每個(gè)虛擬機(jī)就相當(dāng)于一個(gè)獨(dú)立的mqconnectionFactory.setVirtualHost("/");//建立新連接Connection connection = connectionFactory.newConnection();//創(chuàng)建會(huì)話通道,生產(chǎn)者和mq服務(wù)所有通信都在channel通道中完成Channel channel = connection.createChannel();//監(jiān)聽(tīng)隊(duì)列//聲明隊(duì)列,如果隊(duì)列在mq 中沒(méi)有則要?jiǎng)?chuàng)建//參數(shù):String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments/*** 參數(shù)明細(xì)* 1、queue 隊(duì)列名稱* 2、durable 是否持久化,如果持久化,mq重啟后隊(duì)列還在* 3、exclusive 是否獨(dú)占連接,隊(duì)列只允許在該連接中訪問(wèn),如果connection連接關(guān)閉隊(duì)列則自動(dòng)刪除,如果將此參數(shù)設(shè)置true可用于臨時(shí)隊(duì)列的創(chuàng)建* 4、autoDelete 自動(dòng)刪除,隊(duì)列不再使用時(shí)是否自動(dòng)刪除此隊(duì)列,如果將此參數(shù)和exclusive參數(shù)設(shè)置為true就可以實(shí)現(xiàn)臨時(shí)隊(duì)列(隊(duì)列不用了就自動(dòng)刪除)* 5、arguments 參數(shù),可以設(shè)置一個(gè)隊(duì)列的擴(kuò)展參數(shù),比如:可設(shè)置存活時(shí)間*/channel.queueDeclare(QUEUE,true,false,false,null);//實(shí)現(xiàn)消費(fèi)方法DefaultConsumer defaultConsumer = new DefaultConsumer(channel){/*** 當(dāng)接收到消息后此方法將被調(diào)用* @param consumerTag 消費(fèi)者標(biāo)簽,用來(lái)標(biāo)識(shí)消費(fèi)者的,在監(jiān)聽(tīng)隊(duì)列時(shí)設(shè)置channel.basicConsume* @param envelope 信封,通過(guò)envelope* @param properties 消息屬性* @param body 消息內(nèi)容* @throws IOException*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//交換機(jī)String exchange = envelope.getExchange();//消息id,mq在channel中用來(lái)標(biāo)識(shí)消息的id,可用于確認(rèn)消息已接收long deliveryTag = envelope.getDeliveryTag();//消息內(nèi)容String message= new String(body,"utf-8");System.out.println("receive message:"+message);}};//監(jiān)聽(tīng)隊(duì)列//參數(shù):String queue, boolean autoAck, Consumer callback/*** 參數(shù)明細(xì):* 1、queue 隊(duì)列名稱* 2、autoAck 自動(dòng)回復(fù),當(dāng)消費(fèi)者接收到消息后要告訴mq消息已接收,如果將此參數(shù)設(shè)置為tru表示會(huì)自動(dòng)回復(fù)mq,如果設(shè)置為false要通過(guò)編程實(shí)現(xiàn)回復(fù)* 3、callback,消費(fèi)方法,當(dāng)消費(fèi)者接收到消息要執(zhí)行的方法*/channel.basicConsume(QUEUE,true,defaultConsumer);} }3.消費(fèi)者
1.設(shè)置連接信息
通過(guò)ConnectionFactory獲取連接信息
//通過(guò)連接工廠創(chuàng)建新的連接和mq建立連接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672);//端口 connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); //設(shè)置虛擬機(jī),一個(gè)mq服務(wù)可以設(shè)置多個(gè)虛擬機(jī),每個(gè)虛擬機(jī)就相當(dāng)于一個(gè)獨(dú)立的mq connectionFactory.setVirtualHost("/");2.獲取connection(連接),channel(信道)
Connection connection = null; Channel channel = null; //建立新連接 connection = connectionFactory.newConnection(); //創(chuàng)建會(huì)話通道,生產(chǎn)者和mq服務(wù)所有通信都在channel通道中完成 channel = connection.createChannel();3.channel(信道)綁定隊(duì)列
//隊(duì)列 private static final String QUEUE = "helloworld";//聲明隊(duì)列,如果隊(duì)列在mq 中沒(méi)有則要?jiǎng)?chuàng)建 //參數(shù):String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments /*** 參數(shù)明細(xì)* 1、queue 隊(duì)列名稱* 2、durable 是否持久化,如果持久化,mq重啟后隊(duì)列還在* 3、exclusive 是否獨(dú)占連接,隊(duì)列只允許在該連接中訪問(wèn),如果connection連接關(guān)閉隊(duì)列則自動(dòng)刪除,如果將此參數(shù)設(shè)置true可用于臨時(shí)隊(duì)列的創(chuàng)建* 4、autoDelete 自動(dòng)刪除,隊(duì)列不再使用時(shí)是否自動(dòng)刪除此隊(duì)列,如果將此參數(shù)和exclusive參數(shù)設(shè)置為true就可以實(shí)現(xiàn)臨時(shí)隊(duì)列(隊(duì)列不用了就自動(dòng)刪除)* 5、arguments 參數(shù),可以設(shè)置一個(gè)隊(duì)列的擴(kuò)展參數(shù),比如:可設(shè)置存活時(shí)間*/ channel.queueDeclare(QUEUE,true,false,false,null);4.發(fā)送消息
//參數(shù):String exchange, String routingKey, BasicProperties props, byte[] bod /*** 參數(shù)明細(xì):* 1、exchange,交換機(jī),如果不指定將使用mq的默認(rèn)交換機(jī)(設(shè)置為"")* 2、routingKey,路由key,交換機(jī)根據(jù)路由key來(lái)將消息轉(zhuǎn)發(fā)到指定的隊(duì)列,如果使用默認(rèn)交換機(jī),routingKey設(shè)置為隊(duì)列的名稱* 3、props,消息的屬性* 4、body,消息內(nèi)容*/ //消息內(nèi)容 String message = "hello world 黑馬程序員"; channel.basicPublish("",QUEUE,null,message.getBytes()); System.out.println("send to mq "+message);結(jié)果截圖:
完整代碼:
package com.xuecheng.test.rabbitmq;import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;import java.io.IOException; import java.util.concurrent.TimeoutException;public class Producer01 {//隊(duì)列private static final String QUEUE = "helloworld";public static void main(String[] args) {//通過(guò)連接工廠創(chuàng)建新的連接和mq建立連接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);//端口connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");//設(shè)置虛擬機(jī),一個(gè)mq服務(wù)可以設(shè)置多個(gè)虛擬機(jī),每個(gè)虛擬機(jī)就相當(dāng)于一個(gè)獨(dú)立的mqconnectionFactory.setVirtualHost("/");Connection connection = null;Channel channel = null;try {//建立新連接connection = connectionFactory.newConnection();//創(chuàng)建會(huì)話通道,生產(chǎn)者和mq服務(wù)所有通信都在channel通道中完成channel = connection.createChannel();//聲明隊(duì)列,如果隊(duì)列在mq 中沒(méi)有則要?jiǎng)?chuàng)建//參數(shù):String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments/*** 參數(shù)明細(xì)* 1、queue 隊(duì)列名稱* 2、durable 是否持久化,如果持久化,mq重啟后隊(duì)列還在* 3、exclusive 是否獨(dú)占連接,隊(duì)列只允許在該連接中訪問(wèn),如果connection連接關(guān)閉隊(duì)列則自動(dòng)刪除,如果將此參數(shù)設(shè)置true可用于臨時(shí)隊(duì)列的創(chuàng)建* 4、autoDelete 自動(dòng)刪除,隊(duì)列不再使用時(shí)是否自動(dòng)刪除此隊(duì)列,如果將此參數(shù)和exclusive參數(shù)設(shè)置為true就可以實(shí)現(xiàn)臨時(shí)隊(duì)列(隊(duì)列不用了就自動(dòng)刪除)* 5、arguments 參數(shù),可以設(shè)置一個(gè)隊(duì)列的擴(kuò)展參數(shù),比如:可設(shè)置存活時(shí)間*/channel.queueDeclare(QUEUE,true,false,false,null);//發(fā)送消息//參數(shù):String exchange, String routingKey, BasicProperties props, byte[] body/*** 參數(shù)明細(xì):* 1、exchange,交換機(jī),如果不指定將使用mq的默認(rèn)交換機(jī)(設(shè)置為"")* 2、routingKey,路由key,交換機(jī)根據(jù)路由key來(lái)將消息轉(zhuǎn)發(fā)到指定的隊(duì)列,如果使用默認(rèn)交換機(jī),routingKey設(shè)置為隊(duì)列的名稱* 3、props,消息的屬性* 4、body,消息內(nèi)容*///消息內(nèi)容String message = "hello world 黑馬程序員";channel.basicPublish("",QUEUE,null,message.getBytes());System.out.println("send to mq "+message);} catch (Exception e) {e.printStackTrace();} finally {//關(guān)閉連接//先關(guān)閉通道try {channel.close();} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}try {connection.close();} catch (IOException e) {e.printStackTrace();}}} }總結(jié)
以上是生活随笔為你收集整理的1_RabbitMQ初入门入门Hello消费者+生产者的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: C++学习——set与map
- 下一篇: 3.1_ 5_动态分区分配算法