RabbitMQ 学习一 了解+点对点模式
消息隊列
1、了解
RabbitMQ:
RabbitMQ是使用Erlang語言開發(fā)的開源消息隊列系統(tǒng),基于AMQP協(xié)議來實現(xiàn)。AMQP的主要特征是面向消息、隊列、路由(包括點對點和發(fā)布/訂閱)、可靠性、安全。AMQP協(xié)議更多用在企業(yè)系統(tǒng)內(nèi)對數(shù)據(jù)一致性、穩(wěn)定性和可靠性要求很高的場景,對性能和吞吐量的要求還在其次。
消息中間件 用于消息的接收 基于AMPQ協(xié)議,具有高度的一致性,安全性,跨平臺性等。
生產(chǎn)者,RabbitMQ,消費者:
生產(chǎn)者將消息放在消息中間件的交換機(jī)中,交換機(jī)對應(yīng)著消息隊列,消費者從消息隊列中取出生產(chǎn)者所存放的消息。實現(xiàn)消息的分發(fā)。消費者與消息隊列綁定。
點對點的模式中是將消息放在默認(rèn)的交換機(jī)中。
2、安裝
盡量不要安裝在windows中,安裝在虛擬機(jī)中,可以直接下載安裝包,也可以使用docker拉取。
docker拉取:先查詢是否有該鏡像
docker search rabbitmq:management
有則可以拉取
docker pull rabbitmq:management
然后創(chuàng)建容器并發(fā)布
docker run -d --name rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:management
命令中的【RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin】是web管理平臺的用戶名和密碼
【 -p 15672:15672】 是控制平臺docker映射到系統(tǒng)的對應(yīng)端口
【 -p 5672:5672】 是應(yīng)用程序的訪問端口
http://自己的虛擬機(jī)ip:15672
3、使用說明:
首先要新建一個虛擬主機(jī),然后給虛擬主機(jī)綁定用戶,通過生產(chǎn)者發(fā)送消息時,要連接到server,連接到server中對應(yīng)的某一個虛機(jī)主機(jī),通過具體的用戶名密碼,緊接著才可以把消息發(fā)布給交換機(jī)或者消息隊列中,進(jìn)而消費者才能去消息隊列中消費消息(消費者也需要連接到rabbitMQ中的server,以及它對應(yīng)的虛擬主機(jī)),當(dāng)生產(chǎn)者發(fā)送完消息后,它就可以進(jìn)行別的工作了,它與消費者是完全解耦的,消費者只需要去消息隊列中查是否有對應(yīng)的消息即可,當(dāng)需要再次發(fā)送消息時,是需要重新建立連接即可。
虛擬主機(jī)就類似于數(shù)據(jù)庫中的庫。
學(xué)習(xí)時,生產(chǎn)者可以使用Test環(huán)境進(jìn)行測試,而消費不行,因為消費者需要一直監(jiān)聽隊列中的信息,若使用Test,那么或許當(dāng)消費崗剛消費了這個消息時,線程控制權(quán)已經(jīng)失去了,那么它便沒有可能去對body做處理。
所以要放在main函數(shù)中。
點對點模式“HelloWorld”
引入依賴
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.7.3</version> </dependency>
需要一個消息提供者,一個消息消費者和RabbitMQ
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer {
/**
* 點對點 只能有一個消費者 業(yè)務(wù)場景 用戶注冊進(jìn)行短信驗證時,短信驗證部分可以交由消費這邊的系統(tǒng)去完成
* @param args
* @throws IOException
* @throws TimeoutException
*/
//消費消息的代碼
public static void main(String[] args) throws IOException, TimeoutException {
//獲取虛擬主機(jī)的連接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.235.130");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/ems");
connectionFactory.setUsername("ems");
connectionFactory.setPassword("ems");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("hello",false,false,false,null);
channel.basicConsume("hello", true, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body:"+new String(body)); } } ); } }
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Provider {
//生產(chǎn)消息的代碼
@Test
public void testSendMessage() throws IOException, TimeoutException {
//創(chuàng)建連接工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
//設(shè)置連接rabbitmq的主機(jī)
connectionFactory.setHost("192.168.235.130");
//設(shè)置端口號
connectionFactory.setPort(5672);
//設(shè)置連接哪個虛擬主機(jī)
connectionFactory.setVirtualHost("/ems");
//設(shè)置訪問虛擬主機(jī)的用戶名密碼
connectionFactory.setUsername("ems");
connectionFactory.setPassword("ems");
//獲取連接對象
Connection connection = connectionFactory.newConnection();
//通過連接獲取連接中的通道對象
Channel channel = connection.createChannel();
//通道綁定對應(yīng)的消息隊列
//參數(shù)一 隊列的名稱,不存在會自動創(chuàng)建
//參數(shù)二 用來定義隊列的特性 是否持久化 true持久化(不管是否重啟Rabbitmq,隊列都會存在,當(dāng)RabbitMQ關(guān)閉時他會把隊列存在磁盤中)
//參數(shù)三 是否獨占隊列 true獨占 false可以被其他連接使用
//參數(shù)四 是否在消費完成后自動刪除隊列 true自動刪除 false 不會自動刪除
//參數(shù)五 附加參數(shù)
channel.queueDeclare("hello",false,false,false,null);
//發(fā)布消息
//參數(shù)一 交換機(jī)名稱
//參數(shù)二 隊列名稱
//參數(shù)三 傳遞消息的額外設(shè)置
//參數(shù)四 傳遞的消息
channel.basicPublish("","hello",null,"hello rabbitmq".getBytes());
//關(guān)閉通道和連接
channel.close();
connection.close();
}
}
為了方便后續(xù)學(xué)習(xí),可以將共同的代碼抽取出來
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.Objects;
public class RabbitMQConnection {
//工廠是重量級的 每次都創(chuàng)建耗費資源 所以做一個靜態(tài)的成員 只創(chuàng)建一次
private static ConnectionFactory connectionFactory;
//具體賦值是在類加載時執(zhí)行,只執(zhí)行一次
static{
connectionFactory = new ConnectionFactory();
//屬性的賦值一旦確定下來很少變 所以也放在static中
connectionFactory.setHost("192.168.235.130");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/ems");
connectionFactory.setUsername("ems");
connectionFactory.setPassword("ems");
}
//定義提供連接對象的方法
public static Connection getConnection(){
try {
return connectionFactory.newConnection();
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
//關(guān)閉通道和關(guān)閉連接的方法
public static void closeConnectionAndChanel(Channel channel,Connection connection){
try {
if(Objects.nonNull(channel))
{channel.close();}
if (Objects.nonNull(connection))
{connection.close();}
} catch (Exception e) {
e.printStackTrace();
}
}
}
總結(jié)
以上是生活随笔為你收集整理的RabbitMQ 学习一 了解+点对点模式的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: phpmyadmin出现1862错误怎么
- 下一篇: yii1.1怎么查询数据库