消息队列rabitMq
rabbitmq
MQ全稱為Message Queue,?消息隊(duì)列(MQ)是一種應(yīng)用程序對應(yīng)用程序的通信方法。應(yīng)用程序通過讀寫出入隊(duì)列的消息(針對應(yīng)用程序的數(shù)據(jù))來通信,而無需專用連接來鏈接它們。消息傳遞指的是程序之間通過在消息中發(fā)送數(shù)據(jù)進(jìn)行通信,而不是通過直接調(diào)用彼此來通信,直接調(diào)用通常是用于諸如遠(yuǎn)程過程調(diào)用的技術(shù)。排隊(duì)指的是應(yīng)用程序通過 隊(duì)列來通信。隊(duì)列的使用除去了接收和發(fā)送應(yīng)用程序同時執(zhí)行的要求。
使用場景
在項(xiàng)目中,將一些無需即時返回且耗時的操作提取出來,進(jìn)行了異步處理,而這種異步處理的方式大大的節(jié)省了服務(wù)器的請求響應(yīng)時間,從而提高了系統(tǒng)的吞吐量。
含義
RabbitMQ是一個在AMQP基礎(chǔ)上完成的,可復(fù)用的企業(yè)消息系統(tǒng)。他遵循Mozilla Public License開源協(xié)議。
客戶端
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Send {private final static String QUEUE_NAME = "hello";public static void main(String[] args) throws.IOException{ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);String message = "Hello World!";channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");channel.close();connection.close();} }消費(fèi)者端
public class RabbitMQRecv { public static void main(String avg[]) throws.IOException,java.lang.InterruptedException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");QueueingConsumer consumer = new QueueingConsumer(channel);channel.basicConsume(QUEUE_NAME, true, consumer);
while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());System.out.println(" [x] Received '" + message + "'");}} }
幾個概念
Exchange:交換機(jī),決定了消息路由規(guī)則; Queue:消息隊(duì)列; Channel:進(jìn)行消息讀寫的通道; Bind:綁定了Queue和Exchange,意即為符合什么樣路由規(guī)則的消息,將會放置入哪一個消息隊(duì)列;RabbitMQ的結(jié)構(gòu)圖如下:
?
?
?
幾個概念說明: Broker:簡單來說就是消息隊(duì)列服務(wù)器實(shí)體。Exchange:消息交換機(jī),它指定消息按什么規(guī)則,路由到哪個隊(duì)列。
Queue:消息隊(duì)列載體,每個消息都會被投入到一個或多個隊(duì)列。
Binding:綁定,它的作用就是把exchange和queue按照路由規(guī)則綁定起來。
Routing Key:路由關(guān)鍵字,exchange根據(jù)這個關(guān)鍵字進(jìn)行消息投遞。
vhost:虛擬主機(jī),一個broker里可以開設(shè)多個vhost,用作不同用戶的權(quán)限分離。
producer:消息生產(chǎn)者,就是投遞消息的程序。
consumer:消息消費(fèi)者,就是接受消息的程序。
channel:消息通道,在客戶端的每個連接里,可建立多個channel,每個channel代表一個會話任務(wù)。 消息隊(duì)列的使用過程大概如下: (1)客戶端連接到消息隊(duì)列服務(wù)器,打開一個channel。
(2)客戶端聲明一個exchange,并設(shè)置相關(guān)屬性。
(3)客戶端聲明一個queue,并設(shè)置相關(guān)屬性。
(4)客戶端使用routing key,在exchange和queue之間建立好綁定關(guān)系。
(5)客戶端投遞消息到exchange。 exchange接收到消息后,就根據(jù)消息的key和已經(jīng)設(shè)置的binding,進(jìn)行消息路由,將消息投遞到一個或多個隊(duì)列里。 exchange也有幾個類型,完全根據(jù)key進(jìn)行投遞的叫做Direct交換機(jī),例如,綁定時設(shè)置了routing key為”abc”,那么客戶端提交的消息,只有設(shè)置了key為”abc”的才會投遞到隊(duì)列。對key進(jìn)行模式匹配后進(jìn)行投遞的叫做Topic交換機(jī),符號”#”匹配一個或多個詞,符號”*”匹配正好一個詞。例如”abc.#”匹配”abc.def.ghi”,”abc.*”只匹配”abc.def”。還有一種不需要key的,叫做Fanout交換機(jī),它采取廣播模式,一個消息進(jìn)來時,投遞到與該交換機(jī)綁定的所有隊(duì)列。 RabbitMQ支持消息的持久化,也就是數(shù)據(jù)寫在磁盤上,為了數(shù)據(jù)安全考慮,我想大多數(shù)用戶都會選擇持久化。消息隊(duì)列持久化包括3個部分:
(1)exchange持久化,在聲明時指定durable => 1
(2)queue持久化,在聲明時指定durable => 1
(3)消息持久化,在投遞時指定delivery_mode => 2(1是非持久化) 如果exchange和queue都是持久化的,那么它們之間的binding也是持久化的。如果exchange和queue兩者之間有一個持久化,一個非持久化,就不允許建立綁定。
什么是MQ?
?????? MQ全稱為Message Queue,?消息隊(duì)列(MQ)是一種應(yīng)用程序?qū)?yīng)用程序的通信方法。MQ是消費(fèi)-生產(chǎn)者模型的一個典型的代表,一端往消息隊(duì)列中不斷寫入消息,而另一端則可以讀取隊(duì)列中的消息。
????? RabbitMQ是MQ的一種。下面詳細(xì)介紹一下RabbitMQ的基本概念。
1、隊(duì)列、生產(chǎn)者、消費(fèi)者
?? ?? 隊(duì)列是RabbitMQ的內(nèi)部對象,用于存儲消息。生產(chǎn)者(下圖中的P)生產(chǎn)消息并投遞到隊(duì)列中,消費(fèi)者(下圖中的C)可以從隊(duì)列中獲取消息并消費(fèi)。
?????
????? 多個消費(fèi)者可以訂閱同一個隊(duì)列,這時隊(duì)列中的消息會被平均分?jǐn)偨o多個消費(fèi)者進(jìn)行處理,而不是每個消費(fèi)者都收到所有的消息并處理。
?????
2、Exchange、Binding
????? 剛才我們看到生產(chǎn)者將消息投遞到隊(duì)列中,實(shí)際上這在RabbitMQ中這種事情永遠(yuǎn)都不會發(fā)生。實(shí)際的情況是,生產(chǎn)者將消息發(fā)送到Exchange(交換器,下圖中的X),再通過Binding將Exchange與Queue關(guān)聯(lián)起來。
?????3、Exchange Type、Bingding key、routing key
????? 在綁定(Binding)Exchange與Queue的同時,一般會指定一個binding key。在綁定多個Queue到同一個Exchange的時候,這些Binding允許使用相同的binding key。
????? 生產(chǎn)者在將消息發(fā)送給Exchange的時候,一般會指定一個routing key,來指定這個消息的路由規(guī)則,生產(chǎn)者就可以在發(fā)送消息給Exchange時,通過指定routing key來決定消息流向哪里。
????? RabbitMQ常用的Exchange Type有三種:fanout、direct、topic。
????? fanout:把所有發(fā)送到該Exchange的消息投遞到所有與它綁定的隊(duì)列中。
????? direct:把消息投遞到那些binding key與routing key完全匹配的隊(duì)列中。
????? topic:將消息路由到binding key與routing key模式匹配的隊(duì)列中。
????? 附上一張RabbitMQ的結(jié)構(gòu)圖:
?????
????
最后來具體解析一下幾個問題:
1、可以自動創(chuàng)建隊(duì)列,也可以手動創(chuàng)建隊(duì)列,如果自動創(chuàng)建隊(duì)列,那么是誰負(fù)責(zé)創(chuàng)建隊(duì)列呢?是生產(chǎn)者?還是消費(fèi)者??
????? 如果隊(duì)列不存在,當(dāng)然消費(fèi)者不會收到任何的消息。但是如果隊(duì)列不存在,那么生產(chǎn)者發(fā)送的消息就會丟失。所以,為了數(shù)據(jù)不丟失,消費(fèi)者和生產(chǎn)者都可以創(chuàng)建隊(duì)列。那么如果創(chuàng)建一個已經(jīng)存在的隊(duì)列呢?那么不會有任何的影響。需要注意的是沒有任何的影響,也就是說第二次創(chuàng)建如果參數(shù)和第一次不一樣,那么該操作雖然成功,但是隊(duì)列屬性并不會改變。
??????隊(duì)列對于負(fù)載均衡的處理是完美的。對于多個消費(fèi)者來說,RabbitMQ使用輪詢的方式均衡的發(fā)送給不同的消費(fèi)者。
2、RabbitMQ的消息確認(rèn)機(jī)制
????? 默認(rèn)情況下,如果消息已經(jīng)被某個消費(fèi)者正確的接收到了,那么該消息就會被從隊(duì)列中移除。當(dāng)然也可以讓同一個消息發(fā)送到很多的消費(fèi)者。
?? ?? 如果一個隊(duì)列沒有消費(fèi)者,那么,如果這個隊(duì)列有數(shù)據(jù)到達(dá),那么這個數(shù)據(jù)會被緩存,不會被丟棄。當(dāng)有消費(fèi)者時,這個數(shù)據(jù)會被立即發(fā)送到這個消費(fèi)者,這個數(shù)據(jù)被消費(fèi)者正確收到時,這個數(shù)據(jù)就被從隊(duì)列中刪除。
???? 那么什么是正確收到呢?通過ack。每個消息都要被acknowledged(確認(rèn),ack)。我們可以顯示的在程序中去ack,也可以自動的ack。如果有數(shù)據(jù)沒有被ack,那么:
???? RabbitMQ Server會把這個信息發(fā)送到下一個消費(fèi)者。
?? ? 如果這個app有bug,忘記了ack,那么RabbitMQServer不會再發(fā)送數(shù)據(jù)給它,因?yàn)镾erver認(rèn)為這個消費(fèi)者處理能力有限。
??? 而且ack的機(jī)制可以起到限流的作用(Benefitto throttling):在消費(fèi)者處理完成數(shù)據(jù)后發(fā)送ack,甚至在額外的延時后發(fā)送ack,將有效的均衡消費(fèi)者的負(fù)載。
?
?二:代碼示例
2.1:首先引入rabbitMQ jar包
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>3.6.5</version></dependency>2.2:創(chuàng)建消費(fèi)者Producer
/*** 消息生成者*/ public class Producer {public final static String QUEUE_NAME="rabbitMQ.test";public static void main(String[] args) throws IOException, TimeoutException {//創(chuàng)建連接工廠ConnectionFactory factory = new ConnectionFactory();//設(shè)置RabbitMQ相關(guān)信息factory.setHost("localhost");//factory.setUsername("lp");//factory.setPassword("");// factory.setPort(2088);//創(chuàng)建一個新的連接Connection connection = factory.newConnection();//創(chuàng)建一個通道Channel channel = connection.createChannel();// 聲明一個隊(duì)列 channel.queueDeclare(QUEUE_NAME, false, false, false, null);String message = "Hello RabbitMQ";//發(fā)送消息到隊(duì)列中channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));System.out.println("Producer Send +'" + message + "'");//關(guān)閉通道和連接 channel.close();connection.close();} }注1:queueDeclare第一個參數(shù)表示隊(duì)列名稱、第二個參數(shù)為是否持久化(true表示是,隊(duì)列將在服務(wù)器重啟時生存)、第三個參數(shù)為是否是獨(dú)占隊(duì)列(創(chuàng)建者可以使用的私有隊(duì)列,斷開后自動刪除)、第四個參數(shù)為當(dāng)所有消費(fèi)者客戶端連接斷開時是否自動刪除隊(duì)列、第五個參數(shù)為隊(duì)列的其他參數(shù)
注2:basicPublish第一個參數(shù)為交換機(jī)名稱、第二個參數(shù)為隊(duì)列映射的路由key、第三個參數(shù)為消息的其他屬性、第四個參數(shù)為發(fā)送信息的主體
2.3:創(chuàng)建消費(fèi)者
?
public class Customer {private final static String QUEUE_NAME = "rabbitMQ.test";public static void main(String[] args) throws IOException, TimeoutException {// 創(chuàng)建連接工廠ConnectionFactory factory = new ConnectionFactory();//設(shè)置RabbitMQ地址factory.setHost("localhost");//創(chuàng)建一個新的連接Connection connection = factory.newConnection();//創(chuàng)建一個通道Channel channel = connection.createChannel();//聲明要關(guān)注的隊(duì)列channel.queueDeclare(QUEUE_NAME, false, false, true, null);System.out.println("Customer Waiting Received messages");//DefaultConsumer類實(shí)現(xiàn)了Consumer接口,通過傳入一個頻道,// 告訴服務(wù)器我們需要那個頻道的消息,如果頻道中有消息,就會執(zhí)行回調(diào)函數(shù)handleDeliveryConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body)throws IOException {String message = new String(body, "UTF-8");System.out.println("Customer Received '" + message + "'");}};//自動回復(fù)隊(duì)列應(yīng)答 -- RabbitMQ中的消息確認(rèn)機(jī)制channel.basicConsume(QUEUE_NAME, true, consumer);}前面代碼我們可以看出和生成者一樣的,后面的是獲取生產(chǎn)者發(fā)送的信息,其中envelope主要存放生產(chǎn)者相關(guān)信息(比如交換機(jī)、路由key等)body是消息實(shí)體。
2.4:運(yùn)行結(jié)果
生產(chǎn)者:
?
消費(fèi)者:
?三:實(shí)現(xiàn)任務(wù)分發(fā)
工作隊(duì)列
一個隊(duì)列的優(yōu)點(diǎn)就是很容易處理并行化的工作能力,但是如果我們積累了大量的工作,我們就需要更多的工作者來處理,這里就要采用分布機(jī)制了。
我們新創(chuàng)建一個生產(chǎn)者NewTask
public class NewTask {private static final String TASK_QUEUE_NAME="task_queue";public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory=new ConnectionFactory();factory.setHost("localhost");Connection connection=factory.newConnection();Channel channel=connection.createChannel();channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null);//分發(fā)信息for (int i=0;i<10;i++){String message="Hello RabbitMQ"+i;channel.basicPublish("",TASK_QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());System.out.println("NewTask send '"+message+"'");}channel.close();connection.close();} }然后創(chuàng)建2個工作者Work1和Work2代碼一樣
public class Work1 {private static final String TASK_QUEUE_NAME = "task_queue";public static void main(String[] args) throws IOException, TimeoutException {final ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();final Channel channel = connection.createChannel();channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);System.out.println("Worker1 Waiting for messages");//每次從隊(duì)列獲取的數(shù)量channel.basicQos(1);final Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println("Worker1 Received '" + message + "'");try {throw new Exception();//doWork(message);}catch (Exception e){channel.abort();}finally {System.out.println("Worker1 Done");channel.basicAck(envelope.getDeliveryTag(),false);}}};boolean autoAck=false;//消息消費(fèi)完成確認(rèn)channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);}private static void doWork(String task) {try {Thread.sleep(1000); // 暫停1秒鐘} catch (InterruptedException _ignored) {Thread.currentThread().interrupt();}} }注:channel.basicQos(1);保證一次只分發(fā)一個 。autoAck是否自動回復(fù),如果為true的話,每次生產(chǎn)者只要發(fā)送信息就會從內(nèi)存中刪除,那么如果消費(fèi)者程序異常退出,那么就無法獲取數(shù)據(jù),我們當(dāng)然是不希望出現(xiàn)這樣的情況,所以才去手動回復(fù),每當(dāng)消費(fèi)者收到并處理信息然后在通知生成者。最后從隊(duì)列中刪除這條信息。如果消費(fèi)者異常退出,如果還有其他消費(fèi)者,那么就會把隊(duì)列中的消息發(fā)送給其他消費(fèi)者,如果沒有,等消費(fèi)者啟動時候再次發(fā)送。
?
?
參考:https://www.cnblogs.com/LipeiNet/p/5977028.html
轉(zhuǎn)載于:https://www.cnblogs.com/UncleWang001/p/9734651.html
總結(jié)
以上是生活随笔為你收集整理的消息队列rabitMq的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: T1330最少步数(#Ⅱ- 8)(广度优
- 下一篇: 转MQTT SERVER 性能测试报告