日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 >

消息队列rabitMq

發布時間:2025/4/16 53 豆豆
生活随笔 收集整理的這篇文章主要介紹了 消息队列rabitMq 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

rabbitmq

MQ全稱為Message Queue,?消息隊列(MQ)是一種應用程序應用程序的通信方法。應用程序通過讀寫出入隊列的消息(針對應用程序的數據)來通信,而無需專用連接來鏈接它們。消息傳遞指的是程序之間通過在消息中發送數據進行通信而不是通過直接調用彼此來通信,直接調用通常是用于諸如遠程過程調用的技術。排隊指的是應用程序通過 隊列來通信。隊列的使用除去了接收和發送應用程序同時執行的要求

使用場景

在項目中,將一些無需即時返回且耗時的操作提取出來,進行了異步處理,而這種異步處理的方式大大的節省了服務器的請求響應時間,從而提高了系統的吞吐量。

含義

RabbitMQ是一個在AMQP基礎上完成的,可復用的企業消息系統。他遵循Mozilla Public License開源協議。

客戶端

import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Send {private final static String QUEUE_NAME = "hello";public static void main(String[] args) throws.IOException{ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);String message = "Hello World!";channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");channel.close();connection.close();} }

消費者端

public class RabbitMQRecv {   public static void main(String avg[]) throws.IOException,java.lang.InterruptedException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");
Connection connection
= factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.
out.println(" [*] Waiting for messages. To exit press CTRL+C");QueueingConsumer consumer = new QueueingConsumer(channel);channel.basicConsume(QUEUE_NAME, true, consumer);
while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());System.out.println(" [x] Received '" + message + "'");}} }

幾個概念

Exchange:交換機,決定了消息路由規則; Queue:消息隊列; Channel:進行消息讀寫的通道; Bind:綁定了Queue和Exchange,意即為符合什么樣路由規則的消息,將會放置入哪一個消息隊列;

RabbitMQ的結構圖如下:

?

?

?

幾個概念說明: Broker:簡單來說就是消息隊列服務器實體。
  Exchange:消息交換機,它指定消息按什么規則,路由到哪個隊列。
  Queue:消息隊列載體,每個消息都會被投入到一個或多個隊列。
  Binding:綁定,它的作用就是把exchange和queue按照路由規則綁定起來。
  Routing Key:路由關鍵字,exchange根據這個關鍵字進行消息投遞。
  vhost:虛擬主機,一個broker里可以開設多個vhost,用作不同用戶的權限分離。
  producer:消息生產者,就是投遞消息的程序。
  consumer:消息消費者,就是接受消息的程序。
  channel:消息通道,在客戶端的每個連接里,可建立多個channel,每個channel代表一個會話任務。 消息隊列的使用過程大概如下: (1)客戶端連接到消息隊列服務器,打開一個channel。
  (2)客戶端聲明一個exchange,并設置相關屬性。
  (3)客戶端聲明一個queue,并設置相關屬性。
  (4)客戶端使用routing key,在exchange和queue之間建立好綁定關系。
  (5)客戶端投遞消息到exchange。 exchange接收到消息后,就根據消息的key和已經設置的binding,進行消息路由,將消息投遞到一個或多個隊列里。 exchange也有幾個類型,完全根據key進行投遞的叫做Direct交換機,例如,綁定時設置了routing key為”abc”,那么客戶端提交的消息,只有設置了key為”abc”的才會投遞到隊列。對key進行模式匹配后進行投遞的叫做Topic交換機,符號”#”匹配一個或多個詞,符號”*”匹配正好一個詞。例如”abc.#”匹配”abc.def.ghi”,”abc.*”只匹配”abc.def”。還有一種不需要key的,叫做Fanout交換機,它采取廣播模式,一個消息進來時,投遞到與該交換機綁定的所有隊列。 RabbitMQ支持消息的持久化,也就是數據寫在磁盤上,為了數據安全考慮,我想大多數用戶都會選擇持久化。消息隊列持久化包括3個部分:
  (1)exchange持久化,在聲明時指定durable => 1
  (2)queue持久化,在聲明時指定durable => 1
  (3)消息持久化,在投遞時指定delivery_mode => 2(1是非持久化) 如果exchange和queue都是持久化的,那么它們之間的binding也是持久化的。如果exchange和queue兩者之間有一個持久化,一個非持久化,就不允許建立綁定。

什么是MQ?

?????? MQ全稱為Message Queue,?消息隊列(MQ)是一種應用程序對應用程序的通信方法。MQ是消費-生產者模型的一個典型的代表,一端往消息隊列中不斷寫入消息,而另一端則可以讀取隊列中的消息。

????? RabbitMQ是MQ的一種。下面詳細介紹一下RabbitMQ的基本概念。

1、隊列、生產者、消費者

?? ?? 隊列是RabbitMQ的內部對象,用于存儲消息。生產者(下圖中的P)生產消息并投遞到隊列中,消費者(下圖中的C)可以從隊列中獲取消息并消費。

?????

????? 多個消費者可以訂閱同一個隊列,這時隊列中的消息會被平均分攤給多個消費者進行處理,而不是每個消費者都收到所有的消息并處理。

?????

2、Exchange、Binding

????? 剛才我們看到生產者將消息投遞到隊列中,實際上這在RabbitMQ中這種事情永遠都不會發生。實際的情況是,生產者將消息發送到Exchange(交換器,下圖中的X),再通過Binding將Exchange與Queue關聯起來。

?????

3、Exchange Type、Bingding key、routing key

????? 在綁定(Binding)Exchange與Queue的同時,一般會指定一個binding key。在綁定多個Queue到同一個Exchange的時候,這些Binding允許使用相同的binding key。

????? 生產者在將消息發送給Exchange的時候,一般會指定一個routing key,來指定這個消息的路由規則,生產者就可以在發送消息給Exchange時,通過指定routing key來決定消息流向哪里。

????? RabbitMQ常用的Exchange Type有三種:fanout、direct、topic。

????? fanout:把所有發送到該Exchange的消息投遞到所有與它綁定的隊列中。

????? direct:把消息投遞到那些binding key與routing key完全匹配的隊列中。

????? topic:將消息路由到binding key與routing key模式匹配的隊列中。

????? 附上一張RabbitMQ的結構圖:

?????

????

最后來具體解析一下幾個問題:

1、可以自動創建隊列,也可以手動創建隊列,如果自動創建隊列,那么是誰負責創建隊列呢?是生產者?還是消費者??

????? 如果隊列不存在,當然消費者不會收到任何的消息。但是如果隊列不存在,那么生產者發送的消息就會丟失。所以,為了數據不丟失,消費者和生產者都可以創建隊列。那么如果創建一個已經存在的隊列呢?那么不會有任何的影響。需要注意的是沒有任何的影響,也就是說第二次創建如果參數和第一次不一樣,那么該操作雖然成功,但是隊列屬性并不會改變。

??????隊列對于負載均衡的處理是完美的。對于多個消費者來說,RabbitMQ使用輪詢的方式均衡的發送給不同的消費者。

2、RabbitMQ的消息確認機制

????? 默認情況下,如果消息已經被某個消費者正確的接收到了,那么該消息就會被從隊列中移除。當然也可以讓同一個消息發送到很多的消費者。

?? ?? 如果一個隊列沒有消費者,那么,如果這個隊列有數據到達,那么這個數據會被緩存,不會被丟棄。當有消費者時,這個數據會被立即發送到這個消費者,這個數據被消費者正確收到時,這個數據就被從隊列中刪除。

???? 那么什么是正確收到呢?通過ack。每個消息都要被acknowledged(確認,ack)。我們可以顯示的在程序中去ack,也可以自動的ack。如果有數據沒有被ack,那么:

???? RabbitMQ Server會把這個信息發送到下一個消費者。

?? ? 如果這個app有bug,忘記了ack,那么RabbitMQServer不會再發送數據給它,因為Server認為這個消費者處理能力有限。

??? 而且ack的機制可以起到限流的作用(Benefitto throttling):在消費者處理完成數據后發送ack,甚至在額外的延時后發送ack,將有效的均衡消費者的負載。

?

?二:代碼示例

2.1:首先引入rabbitMQ jar包

<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>3.6.5</version></dependency>

2.2:創建消費者Producer

/*** 消息生成者*/ public class Producer {public final static String QUEUE_NAME="rabbitMQ.test";public static void main(String[] args) throws IOException, TimeoutException {//創建連接工廠ConnectionFactory factory = new ConnectionFactory();//設置RabbitMQ相關信息factory.setHost("localhost");//factory.setUsername("lp");//factory.setPassword("");// factory.setPort(2088);//創建一個新的連接Connection connection = factory.newConnection();//創建一個通道Channel channel = connection.createChannel();// 聲明一個隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null);String message = "Hello RabbitMQ";//發送消息到隊列中channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));System.out.println("Producer Send +'" + message + "'");//關閉通道和連接 channel.close();connection.close();} }

注1:queueDeclare第一個參數表示隊列名稱、第二個參數為是否持久化(true表示是,隊列將在服務器重啟時生存)、第三個參數為是否是獨占隊列(創建者可以使用的私有隊列,斷開后自動刪除)、第四個參數為當所有消費者客戶端連接斷開時是否自動刪除隊列、第五個參數為隊列的其他參數

注2:basicPublish第一個參數為交換機名稱、第二個參數為隊列映射的路由key、第三個參數為消息的其他屬性、第四個參數為發送信息的主體

2.3:創建消費者

?

public class Customer {private final static String QUEUE_NAME = "rabbitMQ.test";public static void main(String[] args) throws IOException, TimeoutException {// 創建連接工廠ConnectionFactory factory = new ConnectionFactory();//設置RabbitMQ地址factory.setHost("localhost");//創建一個新的連接Connection connection = factory.newConnection();//創建一個通道Channel channel = connection.createChannel();//聲明要關注的隊列channel.queueDeclare(QUEUE_NAME, false, false, true, null);System.out.println("Customer Waiting Received messages");//DefaultConsumer類實現了Consumer接口,通過傳入一個頻道,// 告訴服務器我們需要那個頻道的消息,如果頻道中有消息,就會執行回調函數handleDeliveryConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body)throws IOException {String message = new String(body, "UTF-8");System.out.println("Customer Received '" + message + "'");}};//自動回復隊列應答 -- RabbitMQ中的消息確認機制channel.basicConsume(QUEUE_NAME, true, consumer);}

前面代碼我們可以看出和生成者一樣的,后面的是獲取生產者發送的信息,其中envelope主要存放生產者相關信息(比如交換機、路由key等)body是消息實體。

2.4:運行結果

生產者:

?

消費者:

?三:實現任務分發

工作隊列

一個隊列的優點就是很容易處理并行化的工作能力,但是如果我們積累了大量的工作,我們就需要更多的工作者來處理,這里就要采用分布機制了。

我們新創建一個生產者NewTask

public class NewTask {private static final String TASK_QUEUE_NAME="task_queue";public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory=new ConnectionFactory();factory.setHost("localhost");Connection connection=factory.newConnection();Channel channel=connection.createChannel();channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null);//分發信息for (int i=0;i<10;i++){String message="Hello RabbitMQ"+i;channel.basicPublish("",TASK_QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());System.out.println("NewTask send '"+message+"'");}channel.close();connection.close();} }

然后創建2個工作者Work1和Work2代碼一樣

public class Work1 {private static final String TASK_QUEUE_NAME = "task_queue";public static void main(String[] args) throws IOException, TimeoutException {final ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();final Channel channel = connection.createChannel();channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);System.out.println("Worker1 Waiting for messages");//每次從隊列獲取的數量channel.basicQos(1);final Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println("Worker1 Received '" + message + "'");try {throw new Exception();//doWork(message);}catch (Exception e){channel.abort();}finally {System.out.println("Worker1 Done");channel.basicAck(envelope.getDeliveryTag(),false);}}};boolean autoAck=false;//消息消費完成確認channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);}private static void doWork(String task) {try {Thread.sleep(1000); // 暫停1秒鐘} catch (InterruptedException _ignored) {Thread.currentThread().interrupt();}} }

注:channel.basicQos(1);保證一次只分發一個 。autoAck是否自動回復,如果為true的話,每次生產者只要發送信息就會從內存中刪除,那么如果消費者程序異常退出,那么就無法獲取數據,我們當然是不希望出現這樣的情況,所以才去手動回復,每當消費者收到并處理信息然后在通知生成者。最后從隊列中刪除這條信息。如果消費者異常退出,如果還有其他消費者,那么就會把隊列中的消息發送給其他消費者,如果沒有,等消費者啟動時候再次發送。

?

?

參考:https://www.cnblogs.com/LipeiNet/p/5977028.html

轉載于:https://www.cnblogs.com/UncleWang001/p/9734651.html

總結

以上是生活随笔為你收集整理的消息队列rabitMq的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。