RabbitMQ(五):Exchange交换器--topic
內(nèi)容翻譯自:RabbitMQ Tutorials Java版
RabbitMQ(一):Hello World程序
RabbitMQ(二):Work Queues、循環(huán)分發(fā)、消息確認、持久化、公平分發(fā)
RabbitMQ(三):Exchange交換器--fanout
RabbitMQ(四):Exchange交換器--direct
RabbitMQ(五):Exchange交換器--topic
RabbitMQ(六):回調(diào)隊列callback queue、關(guān)聯(lián)標(biāo)識correlation id、實現(xiàn)簡單的RPC系統(tǒng)
RabbitMQ(七):常用方法說明 與 學(xué)習(xí)小結(jié)
Topics:
在上一篇博客中我們改進了我們的日志系統(tǒng):使用direct路由器替代了fanout路由器,從而可以選擇性地接收日志。
盡管使用direct路由器給我們的日志系統(tǒng)帶來了改進,但仍然有一些限制:不能基于多種標(biāo)準(zhǔn)進行路由。
在我們的日志系統(tǒng)中,我們可能不僅需要根據(jù)日志的嚴重級別來接收日志,而且有時想基于日志來源進行路由。如果你知道syslog這個Unix工具,你可能了解這個概念,sysylog會基于日志嚴重級別(info/warn/crit...)和設(shè)備(auth/cron/kern...)進行日志分發(fā)。
如果我們可以監(jiān)聽來自corn的錯誤日志,同時也監(jiān)聽kern的所有日志,那么我們的日志系統(tǒng)就會更加靈活。
為了實現(xiàn)這個功能,我們需要了解一個復(fù)雜的路由器:topic路由器。
主題路由器(Topic Exchange):
發(fā)送到topic路由器的消息的路由鍵routing_key不能任意給定:它必須是一些單詞的集合,中間用點號.分割。這些單詞可以是任意的,但通常會體現(xiàn)出消息的特征。一些有效的路由鍵示例:stock.usd.nyse,nyse.vmw,quick.orange.rabbit。這些路由鍵可以包含很多單詞,但路由鍵總長度不能超過255個字節(jié)。
綁定鍵binding key也必須是這種形式。topic路由器背后的邏輯與direct路由器類似:以特定路由鍵發(fā)送的消息將會發(fā)送到所有綁定鍵與之匹配的隊列中。但綁定鍵有兩種特殊的情況:
(1)*(星號)僅代表一個單詞
(2)#(井號)代表任意個單詞
下圖可以很好地解釋這兩個符號的含義:
對于上圖的例子,我們將會發(fā)送描述動物的消息。這些消息將會以由三個單詞組成的路由鍵發(fā)送。路由鍵中的第一個單詞描述了速度,第二個描述了顏色,第三個描述了物種:<speed>.<colour>.<species>。
我們創(chuàng)建了三個綁定,Q1的綁定鍵為*.orange.*,Q2的綁定鍵有兩個,分別是*.*.rabbit和lazy.#。
上述綁定關(guān)系可以描述為:
(1)Q1關(guān)注所有顏色為orange的動物。
(2)Q2關(guān)注所有的rabbit,以及所有的lazy的動物。
如果一個消息的路由鍵是quick.orange.rabbit,那么Q1和Q2都可以接收到,路由鍵是lazy.orange.elephant的消息同樣如此。但是,路由鍵是quick.orange.fox的消息只會到達Q1,路由鍵是lazy.brown.fox的消息只會到達Q2。注意,路由鍵為lazy.pink.rabbit的消息只會到達Q2一次,盡管它匹配了兩個綁定鍵。路由鍵為quick.brown.fox的消息因為不和任意的綁定鍵匹配,所以將會被丟棄。
假如我們不按常理出牌:發(fā)送一個路由鍵只有一個單詞或者四個單詞的消息,像orange或者quick.orange.male.rabbit,這樣的話,這些消息因為不和任意綁定鍵匹配,都將會丟棄。但是,lazy.orange.male.rabbit消息因為和lazy.#匹配,所以會到達Q2,盡管它包含四個單詞。
Topic exchange::
Topic exchange非常強大,可以實現(xiàn)其他任意路由器的功能。
當(dāng)一個隊列以綁定鍵#綁定,它將會接收到所有的消息,而無視路由鍵(實際是綁定鍵#匹配了任意的路由鍵)。----這和fanout路由器一樣了。
當(dāng)*和#這兩個特殊的字符不出現(xiàn)在綁定鍵中,Topic exchange就會和direct exchange類似了。
放在一塊:
我們將會在我們的日志系統(tǒng)中使用主題路由器Topic exchange,并假設(shè)所有的日志消息以兩個單詞<facility>.<severity>為路由鍵。
代碼和上個教程幾乎一樣。
生產(chǎn)者EmitLogTopic.java:
import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel;public class EmitLogTopic {private static final String EXCHANGE_NAME = "topic_logs";public static void main(String[] argv) {Connection connection = null;Channel channel = null;try {//建立連接和通道ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");connection = factory.newConnection();channel = connection.createChannel();//聲明路由器和路由器類型channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);//定義路由鍵和消息String routingKey = "";String message = "msg.....";//發(fā)布消息channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");} catch (Exception e) {e.printStackTrace();} finally {if (connection != null) {try {connection.close();} catch (Exception ignore) {}}}} }消費者ReceiveLogsTopic.java:
import com.rabbitmq.client.*;import java.io.IOException;public class ReceiveLogsTopic {private static final String EXCHANGE_NAME = "topic_logs";public static void main(String[] argv) throws Exception {//建立連接和通道ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();//聲明路由器和路由器類型channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);String queueName = channel.queueDeclare().getQueue();//String bingingKeys[] = {""};for (String bindingKey : bingingKeys) {channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);}System.out.println(" [*] Waiting for messages. To exit press CTRL+C");//監(jiān)聽消息Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");}};channel.basicConsume(queueName, true, consumer);} }現(xiàn)在,可以動手實驗了。
開頭提到的:日志嚴重級別info/warn/crit...和設(shè)備auth/cron/kern...。
消費者:
將String bingingKeys[] = {""}改為String bingingKeys[] = {"#"},啟動第一個消費者;
再改為String bingingKeys[] = {"kern.*"},啟動第二個消費者;
再改為String bingingKeys[] = {"*.critical"},啟動第三個消費者;
再改為String bingingKeys[] = {"kern.*", "*.critical"},啟動第四個消費者。
生產(chǎn)者,發(fā)送多個消息,如:
路由鍵為kern.critical 的消息:A critical kernel error;
路由鍵為kern.info 的消息:A kernel info;
路由鍵為kern.warn 的消息:A kernel warning;
路由鍵為auth.critical 的消息:A critical auth error;
路由鍵為cron.warn 的消息:A cron waning;
路由鍵為cron.critical 的消息:A critical cron error;
試試最后的結(jié)果:第一個消費者將會接收到所有的消息,第二個消費者將會kern的所有嚴重級別的日志,第三個消費者將會接收到所有設(shè)備的critical消息,第四個消費者將會接收到kern設(shè)備的所有消息和所有critical消息。
下篇博客中,我們將會學(xué)習(xí)如何讓消息往返,以此來作為一個遠程過程調(diào)用(RPC)。
?
說明
①與原文略有出入,如有疑問,請參閱原文
②原文均是編譯后通過javacp命令直接運行程序,我是在IDE中進行的,相應(yīng)的操作做了修改。
總結(jié)
以上是生活随笔為你收集整理的RabbitMQ(五):Exchange交换器--topic的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: RabbitMQ(四):Exchange
- 下一篇: RabbitMQ(七):常用方法说明 与