日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

RabbitMQ(四):Exchange交换器--direct

發布時間:2024/9/30 编程问答 37 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RabbitMQ(四):Exchange交换器--direct 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

內容翻譯自:RabbitMQ Tutorials Java版


RabbitMQ(一):Hello World程序

RabbitMQ(二):Work Queues、循環分發、消息確認、持久化、公平分發

RabbitMQ(三):Exchange交換器--fanout

RabbitMQ(四):Exchange交換器--direct

RabbitMQ(五):Exchange交換器--topic

RabbitMQ(六):回調隊列callback queue、關聯標識correlation id、實現簡單的RPC系統

RabbitMQ(七):常用方法說明 與 學習小結


Routing:

在上一篇博客中,我們創建了一個簡單的日志系統。我們可以將日志消息廣播給所有的接收者(消費者)。

在這個教程中,我們將為我們的日志系統添加一個功能:僅僅訂閱一部分消息。比如,我們可以直接將關鍵的錯誤類型日志消息保存到日志文件中,還可以同時將所有的日志消息打印到控制臺。


綁定(Bindings):

在之前的例子中,我們已經創建了綁定:

channel.queueBind(queueName, EXCHANGE_NAME, "");

一個綁定是建立在一個隊列和一個路由器之間的關系,可以解讀為:該隊列對這個路由器中的消息感興趣。

綁定可以設置另外的參數:路由鍵routingKey。為了避免和void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)中的routingKey混淆,我們將這里的key稱為綁定鍵binding key,下面的代碼展示了如何使用綁定鍵來創建一個綁定關系:

channel.queueBind(queueName, EXCHANGE_NAME, "black");

綁定鍵的含義取決于路由器的類型,我們之前使用的fanout類型路由器會忽略該值。


直接路由器 (Direct Exchange):

我們之前的日志系統會將所有消息廣播給所有消費者?,F在我們想根據日志的嚴重程度來過濾日志。比如,我們想要一個程序來將error日志寫到磁盤文件中,而不要將warning或info日志寫到磁盤中,以免浪費磁盤空間。

我們之前使用的fanout路由器缺少靈活性,它只是沒頭腦地廣播消息。所以,我們用direct路由器來替換它。direct路由器背后的路由算法很簡單:只有當消息的路由鍵routing key與隊列的綁定鍵binding key完全匹配時,該消息才會進入該隊列。

為了演示上面拗口的表述中的意思,考慮下面的設置:

上圖中,直接路由器X與兩個隊列綁定。第一個隊列以綁定鍵orange來綁定,第二個隊列以兩個綁定鍵black和green和路由器綁定。

按照這種設置,路由鍵為orange的消息在發布給路由器后,將會被路由到隊列Q1,路由鍵為black或者green的消息將會路由到隊列Q2。


多重綁定(Multiple bindings):

多個隊列以相同的綁定鍵binding key綁定到同一個Exchange上,是完全可以的。按照這種方式設置的話,直接路由器就會像fanout路由器一樣,將消息廣播給所有符合路由規則的隊列。一個路由鍵為black的消息將會發布到隊列Q1和Q2。


發布消息:

在這個教程中,我們使用direct路由器來代替上個教程中的fanout路由器。同時,我們為日志設置嚴重級別,并將此作為路由鍵。這樣,接收者(消費者)就可以選擇性地接收日志消息。

首先,創建一個路由器:

channel.exchangeDeclare(EXCHANGE_NAME, "direct");

接著,發送一個消息:

channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());

簡單起見,我們假設severity只能是info、warning 、error中的一種。


消息訂閱:

接收消息將會和之前的教程類似,只是我們會為每一個級別的消息來創建不同的綁定:

String queueName = channel.queueDeclare().getQueue(); for(String severity : argv){channel.queueBind(queueName, EXCHANGE_NAME, severity); }

放在一塊:

生產者EmitLogDirect.java的完整代碼:

import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel;public class EmitLogDirect {private static final String EXCHANGE_NAME = "direct_logs";public static void main(String[] argv) throws Exception {//創建連接ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();//聲明路由器和路由器的類型channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);String severity = "info";String message = ".........i am msg.........";//發布消息channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));System.out.println(" [x] Sent '" + severity + "':'" + message + "'");channel.close();connection.close();}}

消費者ReceiveLogsDirect.java的完整代碼如下:

import com.rabbitmq.client.*; import java.io.IOException;public class ReceiveLogsDirect {private static final String EXCHANGE_NAME = "direct_logs";public static void main(String[] argv) throws Exception {//建立連接和通道ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();//聲明路由器和類型channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);//聲明隊列String queueName = channel.queueDeclare().getQueue();//定義要監聽的級別String[] severities = {"info", "warning", "error"};//根據綁定鍵綁定for (String severity : severities) {channel.queueBind(queueName, EXCHANGE_NAME, severity);}System.out.println(" [*] Waiting for messages. To exit press CTRL+C");Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");}};channel.basicConsume(queueName, true, consumer);} }

現在可以進行測試了。首先,啟動一個消費者實例(ReceiveLogsDirect.java),然后將其中的要監聽的級別改為String[] severities = {"error"};,再啟動另一個消費者實例。此時,這兩個消費者都開始監聽了,一個監聽所有級別的日志消息,另一個監聽error日志消息。
然后,啟動生產者(EmitLogDirect.java),之后將String severity = "info";中的info,分別改為warning、error后運行。
這樣,就可以在控制臺看到如下輸出:

//生產者 [x] Sent 'warning':'.........i am msg.........' [x] Sent 'info':'.........i am msg.........' [x] Sent 'error':'.........i am msg.........' //消費者1[*] Waiting for messages. To exit press CTRL+C[x] Received 'info':'.........i am msg.........'[x] Received 'error':'.........i am msg.........'[x] Received 'warning':'.........i am msg.........' //消費者2[*] Waiting for messages. To exit press CTRL+C[x] Received 'error':'.........i am msg.........'

?


說明:

①與原文略有出入,如有疑問,請參閱原文

②原文均是編譯后通過javacp命令直接運行程序,我是在IDE中進行的,相應的操作做了修改。

總結

以上是生活随笔為你收集整理的RabbitMQ(四):Exchange交换器--direct的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。