RabbitMQ实例教程:主题交换机
前面的例子中,盡管我們使用了direct路由代替fanout路由解決了盲目廣播的問題,但direct路由也有它的缺陷,他不能基于多個標準做路由轉發。
在上面的日志系統中,如果不僅想基于日志等級做訂閱,也想根據日志的發生源做訂閱該怎么處理呢?這時候你可能想到了unix系統工具中的syslog服務,它不僅基于日志等級(info/warn/crit...)進行路由轉發,也會根據操作(auth/cron/kern...)做路由轉發。
如果是那樣的話,日志系統就靈活多了,它不僅能夠監聽來自‘cron’的關鍵錯誤,也能監聽來自'kern'的所有日志。其實主題交換機(topic exchange)就能解決這種問題。
主題交換機(Topic exchange)
主題交換機的路由代碼不能是任意寫的,必須是小樹點分隔開的一組單詞列表。這些單詞可以隨便寫,但通常是與連接消息特征有關的單詞。有效地路由代碼應該是這樣的“stock.usd.nyse”,“nyse.vmw”,“quick.orange.rabbit”。路由代碼可以隨便寫,但是長度限制在255字節。
注意,綁定代碼也必須在同一個表單中。topic交換機與direct交換機類似-具有特定路由代碼的消息會傳送給所有匹配綁定代碼的隊列,但有兩個特殊的綁定代碼:
* :它能替代一個單詞
#:它能替代0或多個單詞
該例子中,我們給所有的動物發送消息,符合由三個單詞(第一個單詞描述速度;第二個單詞描述顏色;第三個單詞描述物種)組成的路由代碼將會發送消息:“<speed>.<colour>.<species>”。
? ?
我們創建了三個綁定:Q1使用“*.orange.*”綁定,Q2使用“*.*.rabbit”和“lazy.#”綁定。這些綁定的意義如下:
??
Q1描述了所有顏色為橙色的動物。
??
Q2描述了是兔子的動物和懶惰的動物。
??
這樣,“quick.orange.rabbit”消息通過路由轉發給Q1、Q2兩個隊列。"lazy.orange.elephant"消息也會轉發給Q1、Q2兩個隊列。“quick.orange.fox”消息只會轉發給Q1隊列,"lazy.brown.fox"也只會轉發給Q2隊列。"lazy.pink.rabbit"會轉發給Q2隊列一次,盡管它匹配兩個綁定。"quick.brown.fox"并不匹配任何一個隊列就會被廢棄。
??
如果我們打破規則,每次只發一個或四個單詞的話,如“orange”或”quick.orange.male.rabbit“,這些消息不匹配任何綁定,就會被廢棄。但如果發送”lazy.orange.male.rabbit“這樣的消息的話,由于它匹配最后的綁定仍會被轉發到Q2隊列中。
? ??
主題交換機是一種非常強大的交換機,當它只綁定”#“時,它會接收所有的消息,與fanout交換機類似。當沒有使用”*“和”#“符號時,主題交換機的作用等同與direct交換機。
源代碼
EmitLogTopic.java
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 | package?com.favccxx.favrabbit; import?com.rabbitmq.client.Channel; import?com.rabbitmq.client.Connection; import?com.rabbitmq.client.ConnectionFactory; public?class?EmitLogTopic?{ ????private?static?final?String?EXCHANGE_NAME?=?"topic_logs"; ????public?static?void?main(String[]?argv)?{ ????????Connection?connection?=?null; ????????Channel?channel?=?null; ????????try?{ ????????????ConnectionFactory?factory?=?new?ConnectionFactory(); ????????????factory.setHost("localhost"); ????????????connection?=?factory.newConnection(); ????????????channel?=?connection.createChannel(); ????????????channel.exchangeDeclare(EXCHANGE_NAME,?"topic"); ????????????String[]?routingKeys?=?{?"fast.orange.duck",?"slow.orange.fish",?"grey.rabbit",?"fast.black.rabbit", ????????????????????"quick.white.rabbit",?"lazy.dog",?"lazy.black.pig"?}; ????????????String[]?messages?=?{?"Hello",?"Guys",?"Girls",?"Babies"?}; ????????????for?(int?i?=?0;?i?<?routingKeys.length;?i++)?{ ????????????????for?(int?j?=?0;?j?<?messages.length;?j++)?{ ????????????????????channel.basicPublish(EXCHANGE_NAME,?routingKeys[i],?null,?messages[j].getBytes("UTF-8")); ????????????????????System.out.println("?[x]?Sent?'"?+?routingKeys[i]?+?"':'"?+?messages[j]?+?"'"); ????????????????} ????????????} ????????}?catch?(Exception?e)?{ ????????????e.printStackTrace(); ????????}?finally?{ ????????????if?(connection?!=?null)?{ ????????????????try?{ ????????????????????connection.close(); ????????????????}?catch?(Exception?ignore)?{ ????????????????} ????????????} ????????} ????} } |
ReceiveLogsTopic.java
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 | package?com.favccxx.favrabbit; import?java.io.IOException; import?com.rabbitmq.client.AMQP; import?com.rabbitmq.client.Channel; import?com.rabbitmq.client.Connection; import?com.rabbitmq.client.ConnectionFactory; import?com.rabbitmq.client.Consumer; import?com.rabbitmq.client.DefaultConsumer; import?com.rabbitmq.client.Envelope; public?class?ReceiveLogsTopic?{ ????private?static?final?String?EXCHANGE_NAME?=?"topic_logs"; ????public?static?void?main(String[]?argv)?throws?Exception?{ ????????ConnectionFactory?factory?=?new?ConnectionFactory(); ????????factory.setHost("localhost"); ????????Connection?connection?=?factory.newConnection(); ????????Channel?channel?=?connection.createChannel(); ????????channel.exchangeDeclare(EXCHANGE_NAME,?"topic"); ????????String?queueName?=?channel.queueDeclare().getQueue(); ????????String[]?bindingKeys?=?{?"*.orange.*",?"*.*.rabbit",?"lazy.#"?}; ????????for?(final?String?bindingKey?:?bindingKeys)?{ ????????????channel.queueBind(queueName,?EXCHANGE_NAME,?bindingKey); ????????????Consumer?consumer?=?new?DefaultConsumer(channel)?{ ????????????????@Override ????????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties, ????????????????????????byte[]?body)?throws?IOException?{ ????????????????????String?message?=?new?String(body,?"UTF-8"); ????????????????????System.out.println("["?+?bindingKey?+?"]?Received?message?:'"?+?message?+?"'?from?routingKey?:?"?+?envelope.getRoutingKey()); ????????????????} ????????????}; ????????????channel.basicConsume(queueName,?true,?consumer); ????????} ????} } |
運行消息發送器,在消息接收平臺輸出內容如下
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 | [*.orange.*]?Received?message?:'Hello'?from?routingKey?:?fast.orange.duck [*.*.rabbit]?Received?message?:'Guys'?from?routingKey?:?fast.orange.duck [lazy.#]?Received?message?:'Girls'?from?routingKey?:?fast.orange.duck [*.orange.*]?Received?message?:'Babies'?from?routingKey?:?fast.orange.duck [*.*.rabbit]?Received?message?:'Hello'?from?routingKey?:?slow.orange.fish [lazy.#]?Received?message?:'Guys'?from?routingKey?:?slow.orange.fish [*.orange.*]?Received?message?:'Girls'?from?routingKey?:?slow.orange.fish [*.*.rabbit]?Received?message?:'Babies'?from?routingKey?:?slow.orange.fish [lazy.#]?Received?message?:'Hello'?from?routingKey?:?fast.black.rabbit [*.orange.*]?Received?message?:'Guys'?from?routingKey?:?fast.black.rabbit [*.*.rabbit]?Received?message?:'Girls'?from?routingKey?:?fast.black.rabbit [lazy.#]?Received?message?:'Babies'?from?routingKey?:?fast.black.rabbit [*.orange.*]?Received?message?:'Hello'?from?routingKey?:?quick.white.rabbit [*.*.rabbit]?Received?message?:'Guys'?from?routingKey?:?quick.white.rabbit [lazy.#]?Received?message?:'Girls'?from?routingKey?:?quick.white.rabbit [*.orange.*]?Received?message?:'Babies'?from?routingKey?:?quick.white.rabbit [*.*.rabbit]?Received?message?:'Hello'?from?routingKey?:?lazy.dog [lazy.#]?Received?message?:'Guys'?from?routingKey?:?lazy.dog [*.orange.*]?Received?message?:'Girls'?from?routingKey?:?lazy.dog [*.*.rabbit]?Received?message?:'Babies'?from?routingKey?:?lazy.dog [lazy.#]?Received?message?:'Hello'?from?routingKey?:?lazy.black.pig [*.orange.*]?Received?message?:'Guys'?from?routingKey?:?lazy.black.pig [*.*.rabbit]?Received?message?:'Girls'?from?routingKey?:?lazy.black.pig [lazy.#]?Received?message?:'Babies'?from?routingKey?:?lazy.black.pig |
本文轉自 genuinecx 51CTO博客,原文鏈接:http://blog.51cto.com/favccxx/1703031,如需轉載請自行聯系原作者
總結
以上是生活随笔為你收集整理的RabbitMQ实例教程:主题交换机的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 使用Laravel Eloquent O
- 下一篇: vbkey常数