rabbitmq-路由模式-routingkey
生活随笔
收集整理的這篇文章主要介紹了
rabbitmq-路由模式-routingkey
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
【README】
本文po出 rabbitmq路由模式;
?
【1】intro to 路由模式
特點1)隊列與交換機的綁定,不能是任意綁定, 而是指定一個路由key-routingkey;
特點2)消息的發送方向在向 exchange-交換機發送消息時,也必須指定消息的routingkey;
特點3)exchange-交換機不再把消息發送給每一個綁定的隊列,而是根據消息的routingkey發送到對應的隊列;
與發布訂閱模式不同,路由模式的交換機類型是 Direct,還有隊列綁定交換機的時候需要指定routingkey;?
【2】代碼
生產者
/*** 路由模式生產者*/ public class RouteProducer {/* 交換機名稱 */static final String DIRECT_EXCHANGE = "direct_exchange"; /*隊列名稱1*/ static final String ROUTE_QUEUE_INSERT = "route_queue_insert";/*隊列名稱2*/static final String ROUTE_QUEUE_UPDATE = "route_queue_update";public static void main(String[] args) throws Exception {/*獲取連接*/Connection conn = RBConnectionUtil.getConn();// 創建頻道 Channel channel = conn.createChannel();/*** 聲明交換機* 參數1-交換機名稱 * 參數2-交換機類型(fanout, topic, direct, headers)*/channel.exchangeDeclare(DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT); /*** 聲明隊列* 參數1 隊列名稱 * 參數2 是否定義持久化隊列 * 參數3 是否獨占本次連接 * 參數4 是否在不使用的時候自動刪除隊列* 參數5 隊列其他參數 */channel.queueDeclare(ROUTE_QUEUE_INSERT, true, false, false, null);channel.queueDeclare(ROUTE_QUEUE_UPDATE, true, false, false, null);/*** routingkey-路由鍵*/String insertRoutingKey = "insert";String updateRoutingKey = "update";/*** 隊列綁定交換機* 參數1 隊列名稱 * 參數2 交換機 * 參數3 routingkey-路由鍵 */channel.queueBind(ROUTE_QUEUE_INSERT, DIRECT_EXCHANGE, insertRoutingKey);channel.queueBind(ROUTE_QUEUE_UPDATE, DIRECT_EXCHANGE, updateRoutingKey);/* 發送消息-insert */ String insertMsg = "我是消息,路由模式routingkey=" + insertRoutingKey + MyDateUtil.getNow();/*** 參數1 交換機名稱 如果沒有指定則使用默認 default exchange * 參數2 routingkey-路由key, 簡單模式可以傳遞隊列名稱 * 參數3 消息其他屬性* 參數4 消息內容 */channel.basicPublish(DIRECT_EXCHANGE, insertRoutingKey, null, insertMsg.getBytes());System.out.println("已發送消息=" + insertMsg); /* 發送消息-update */String updateMsg = "我是消息,路由模式routingkey=" + updateRoutingKey + MyDateUtil.getNow();channel.basicPublish(DIRECT_EXCHANGE, updateRoutingKey, null, updateMsg.getBytes());System.out.println("已發送消息=" + updateMsg); /* 關閉連接和信道 */ channel.close();conn.close(); } }消費者-insert
/*** 路由模式消費者-routingkey */ public class RouteConsumerInsert {/* 交換機名稱 */static final String DIRECT_EXCHANGE = "direct_exchange"; /*隊列名稱1*/ static final String ROUTE_QUEUE_INSERT = "route_queue_insert";/*隊列名稱2*/static final String ROUTE_QUEUE_UPDATE = "route_queue_update";public static void main(String[] args) throws Exception {/*創建連接 */Connection conn = RBConnectionUtil.getConn();/*創建隊列*/Channel channel = conn.createChannel(); /*聲明交換機*/channel.exchangeDeclare(DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT);/*** routingkey-路由鍵*/String insertRoutingKey = "insert";String updateRoutingKey = "update";/*** 聲明/創建隊列 * 參數1 隊列名稱 * 參數2 是否持久化* 參數3 是否獨占本連接 * 參數4 是否在不使用的時候自動刪除隊列* 參數5 隊列其他參數 */channel.queueDeclare(ROUTE_QUEUE_INSERT, true, false, false, null);/*** 隊列綁定交換機* 參數1 隊列名稱* 參數2 交換機* 參數3 routingkey-路由鍵 */channel.queueBind(ROUTE_QUEUE_INSERT, DIRECT_EXCHANGE, insertRoutingKey);/* 創建消費者,設置消息處理邏輯 */Consumer consumer = new DefaultConsumer(channel) {/*** @param consumerTag 消費者標簽,在 channel.basicConsume 可以指定 * @param envelope 消息包內容,包括消息id,消息routingkey,交換機,消息和重轉標記(收到消息失敗后是否需要重新發送) * @param properties 基本屬性* @param body 消息字節數組 */@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,BasicProperties properties, byte[] body) throws IOException {System.out.println("=== 消費者1 start ===");System.out.println("路由key=" + envelope.getRoutingKey());System.out.println("交換機=" + envelope.getExchange());System.out.println("消息id=" + envelope.getDeliveryTag()); String message = new String(body, "UTF-8");System.out.println(String.format("消費者收到的消息【%s】", message)); System.out.println("=== 消費者1 end ===\n"); } };/*** 監聽消息 * 參數1 隊列名稱 * 參數2 是否自動確認, 設置為true表示消息接收到自動向 mq回復ack;mq收到ack后會刪除消息; 設置為false則需要手動發送ack; * 參數3 消息接收后的回調 */channel.basicConsume(ROUTE_QUEUE_INSERT, true, consumer); } }消費者-update
/*** 路由模式消費者-routingkey */ public class RouteConsumerUpdate {/* 交換機名稱 */static final String DIRECT_EXCHANGE = "topic_exchange"; /*隊列名稱1*/ static final String ROUTE_QUEUE_INSERT = "route_queue_insert";/*隊列名稱2*/static final String ROUTE_QUEUE_UPDATE = "route_queue_update";public static void main(String[] args) throws Exception {/*創建連接 */Connection conn = RBConnectionUtil.getConn();/*創建隊列*/Channel channel = conn.createChannel(); /*聲明交換機*/channel.exchangeDeclare(DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT);/*** routingkey-路由鍵*/String updateRoutingKey = "update";/*** 聲明/創建隊列 * 參數1 隊列名稱 * 參數2 是否持久化* 參數3 是否獨占本連接 * 參數4 是否在不使用的時候自動刪除隊列* 參數5 隊列其他參數 */channel.queueDeclare(ROUTE_QUEUE_UPDATE, true, false, false, null);/*** 隊列綁定交換機* 參數1 隊列名稱* 參數2 交換機* 參數3 routingkey-路由鍵 */channel.queueBind(ROUTE_QUEUE_UPDATE, DIRECT_EXCHANGE, updateRoutingKey);/* 創建消費者,設置消息處理邏輯 */Consumer consumer = new DefaultConsumer(channel) {/*** @param consumerTag 消費者標簽,在 channel.basicConsume 可以指定 * @param envelope 消息包內容,包括消息id,消息routingkey,交換機,消息和重轉標記(收到消息失敗后是否需要重新發送) * @param properties 基本屬性* @param body 消息字節數組 */@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,BasicProperties properties, byte[] body) throws IOException {System.out.println("=== 消費者1 start ===");System.out.println("路由key=" + envelope.getRoutingKey());System.out.println("交換機=" + envelope.getExchange());System.out.println("消息id=" + envelope.getDeliveryTag()); String message = new String(body, "UTF-8");System.out.println(String.format("消費者收到的消息【%s】", message)); System.out.println("=== 消費者1 end ===\n"); } };/*** 監聽消息 * 參數1 隊列名稱 * 參數2 是否自動確認, 設置為true表示消息接收到自動向 mq回復ack;mq收到ack后會刪除消息; 設置為false則需要手動發送ack; * 參數3 消息接收后的回調 */channel.basicConsume(ROUTE_QUEUE_UPDATE, true, consumer); } }?
?
總結
以上是生活随笔為你收集整理的rabbitmq-路由模式-routingkey的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: rabbitmq-发布订阅模式
- 下一篇: rabbitmq-通配符模式