rabbitmq-通配符模式
【README】
本文介紹 通配符模式,及代碼示例
【1】intro to rabbitmq通配符模式
0)通配符模式-交換機類型為 Topic;
1)與路由模式相比,相同點是 兩者都可以通過 routingkey 把消息轉發到不同的隊列;
不同點是通配符模式-topic類型的exchange可以讓隊列在綁定routing key的時候使用通配符;
2)通配符模式的routingkey 通常使用多個單詞并用點號連接,如 item.insert ;
3)通配符規則:
# 匹配一個或多個詞;
* 匹配不多不少一個詞; ?
荔枝:
item.# 能夠匹配 item.insert.abc 或 item.insert? ; (可以多層)
item.* 能夠匹配 item.insert ;? (只能一層)
refers2 https://www.rabbitmq.com/tutorials/tutorial-five-java.html?
4)新建隊列
5)把隊列綁定到交換機?
6)生產者發送消息到隊列,路由key 分別是 item.insert , item.update, item.delete ; 如下:
【2】代碼
生產者
/*** 通配符模式-交換機類型為TOPIC*/ public class WildProducer {/* 交換機名稱 */static final String TOPIC_EXCHANGE = "topic_exchange"; /*隊列名稱1*/ static final String TOPIC_QUEUE_1 = "topic_queue_1";/*隊列名稱2*/static final String TOPIC_QUEUE_2 = "topic_queue_2";public static void main(String[] args) throws Exception {/*獲取連接*/Connection conn = RBConnectionUtil.getConn();// 創建頻道 Channel channel = conn.createChannel();/*** 聲明交換機* 參數1-交換機名稱 * 參數2-交換機類型(fanout, topic, direct, headers)*/channel.exchangeDeclare(TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC); /*** routingkey-路由鍵 */String itemInsertRoutingKey = "item.insert"; String itemUpdateRoutingKey = "item.update";String itemDeleteRoutingKey = "item.delete";/* 發送消息-insert */ /*** 參數1 交換機名稱 如果沒有指定則使用默認 default exchange * 參數2 routingkey-路由key, 簡單模式可以傳遞隊列名稱 * 參數3 消息其他屬性* 參數4 消息內容 */String insertMsg = "我是消息,通配符模式,routingkey=" + itemInsertRoutingKey + MyDateUtil.getNow();channel.basicPublish(TOPIC_EXCHANGE, itemInsertRoutingKey, null, insertMsg.getBytes());System.out.println("已發送消息=" + insertMsg); String updMsg = "我是消息,通配符模式,routingkey=" + itemUpdateRoutingKey + MyDateUtil.getNow();channel.basicPublish(TOPIC_EXCHANGE, itemUpdateRoutingKey, null, updMsg.getBytes());System.out.println("已發送消息=" + updMsg);String deleteMsg = "我是消息,通配符模式,routingkey=" + itemDeleteRoutingKey + MyDateUtil.getNow();channel.basicPublish(TOPIC_EXCHANGE, itemDeleteRoutingKey, null, deleteMsg.getBytes());System.out.println("已發送消息=" + deleteMsg);/* 關閉連接和信道 */ channel.close();conn.close(); } }消費者1? topic_queue_1
/*** 通配符模式消費者-routingkey */ public class RouteConsumerWild1 {/* 交換機名稱 */static final String TOPIC_EXCHANGE = "topic_exchange"; /*隊列名稱1*/ static final String TOPIC_QUEUE_1 = "topic_queue_1";public static void main(String[] args) throws Exception {/*創建連接 */Connection conn = RBConnectionUtil.getConn();/*創建隊列*/Channel channel = conn.createChannel(); /*聲明交換機*/channel.exchangeDeclare(TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC);/*** routingkey-路由鍵 */String itemInsertRoutingKey = "item.insert"; String itemUpdateRoutingKey = "item.update";String itemDeleteRoutingKey = "item.delete";/*** 聲明/創建隊列 * 參數1 隊列名稱 * 參數2 是否持久化* 參數3 是否獨占本連接 * 參數4 是否在不使用的時候自動刪除隊列* 參數5 隊列其他參數 */ // channel.queueDeclare(TOPIC_QUEUE_1, true, false, false, null); // ui界面可以創建隊列 /*** 隊列綁定交換機* 參數1 隊列名稱* 參數2 交換機* 參數3 routingkey-路由鍵 */ // channel.queueBind(TOPIC_QUEUE_1, TOPIC_EXCHANGE, "item.#"); // ui界面可以把隊列綁定到交換機 /* 創建消費者,設置消息處理邏輯 */Consumer consumer = new DefaultConsumer(channel) {/*** @param consumerTag 消費者標簽,在 channel.basicConsume 可以指定 * @param envelope 消息包內容,包括消息id,消息routingkey,交換機,消息和重轉標記(收到消息失敗后是否需要重新發送) * @param properties 基本屬性* @param body 消息字節數組 */@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,BasicProperties properties, byte[] body) throws IOException {System.out.println("=== 消費者1 start ===");System.out.println("路由key=" + envelope.getRoutingKey());System.out.println("交換機=" + envelope.getExchange());System.out.println("消息id=" + envelope.getDeliveryTag()); String message = new String(body, "UTF-8");System.out.println(String.format("消費者收到的消息【%s】", message)); System.out.println("=== 消費者1 end ===\n"); } };/*** 監聽消息 * 參數1 隊列名稱 * 參數2 是否自動確認, 設置為true表示消息接收到自動向 mq回復ack;mq收到ack后會刪除消息; 設置為false則需要手動發送ack; * 參數3 消息接收后的回調 */channel.basicConsume(TOPIC_QUEUE_1, true, consumer); } }消費者2 topic_queue_2
/*** 通配符模式消費者-routingkey */ public class RouteConsumerWild2 {/* 交換機名稱 */static final String TOPIC_EXCHANGE = "topic_exchange"; /*隊列名稱1*/ static final String TOPIC_QUEUE_2 = "topic_queue_2";public static void main(String[] args) throws Exception {/*創建連接 */Connection conn = RBConnectionUtil.getConn();/*創建隊列*/Channel channel = conn.createChannel(); /*聲明交換機*/channel.exchangeDeclare(TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC);/*** routingkey-路由鍵 */String itemInsertRoutingKey = "item.insert"; String itemUpdateRoutingKey = "item.update";String itemDeleteRoutingKey = "item.delete";/*** 聲明/創建隊列 * 參數1 隊列名稱 * 參數2 是否持久化* 參數3 是否獨占本連接 * 參數4 是否在不使用的時候自動刪除隊列* 參數5 隊列其他參數 */ // channel.queueDeclare(TOPIC_QUEUE_2, true, false, false, null); // ui界面可以創建隊列 /*** 隊列綁定交換機* 參數1 隊列名稱 * 參數2 交換機* 參數3 routingkey-路由鍵 */ // channel.queueBind(TOPIC_QUEUE_2 TOPIC_EXCHANGE, "*.delete"); // ui界面可以把隊列綁定到交換機 /* 創建消費者,設置消息處理邏輯 */Consumer consumer = new DefaultConsumer(channel) {/** * @param consumerTag 消費者標簽,在 channel.basicConsume 可以指定 * @param envelope 消息包內容,包括消息id,消息routingkey,交換機,消息和重轉標記(收到消息失敗后是否需要重新發送) * @param properties 基本屬性* @param body 消息字節數組 */@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,BasicProperties properties, byte[] body) throws IOException {System.out.println("=== 消費者1 start ===");System.out.println("路由key=" + envelope.getRoutingKey());System.out.println("交換機=" + envelope.getExchange());System.out.println("消息id=" + envelope.getDeliveryTag()); String message = new String(body, "UTF-8");System.out.println(String.format("消費者收到的消息【%s】", message)); System.out.println("=== 消費者1 end ===\n"); } };/*** 監聽消息 * 參數1 隊列名稱 * 參數2 是否自動確認, 設置為true表示消息接收到自動向 mq回復ack;mq收到ack后會刪除消息; 設置為false則需要手動發送ack; * 參數3 消息接收后的回調 */channel.basicConsume(TOPIC_QUEUE_2, true, consumer); } }【3】 rabbitmq 模式總結??
8.1)模式1 簡單模式 helloworld
一個生產者,一個消費者,不需要設置交換機,使用默認交換機;
8.2)模式2 工作隊列模式 work queue
一個生產者,多個消費者(競爭關系),不需要設置交換機(使用默認交換機);
8.3)發布訂閱模式? publish/subscribe
需要設置類型為 fanout-廣播的交換機,并且交換機和隊列進行綁定,當發送消息到交換機后,交換機會將消息發送到綁定的隊列;
8.4)路由模式 routing
需要設置類型為 direct的交換機, 交換機和隊列進行綁定,并且指定routing key,當發送消息到交換機后,交換機會根據routing key 將消息發送到對應隊列;
8.5)通配符模式 topic
需要設置類型為 topic的交換機, 交換機和隊列進行綁定, 并且指定通配符方式的routing key, 當發送消息到交換機后,交換機會根據 routing key將消息發送到對應的隊列;
?
?
總結
以上是生活随笔為你收集整理的rabbitmq-通配符模式的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 哪个安卓模拟器不卡不玩游戏(哪个安卓模拟
- 下一篇: 转: java多线程-ThreadPoo