RabbitMq的工作模式 介绍+测试代码,以及三种Exchange模式介绍.
RabbitMq的提供了六種模式分別是:簡單模式,工作模式,發布\訂閱模式,路由模式,通配符模式,RPC遠程調用模式
下面將詳細介紹常用的前五種模式,附上測試代碼.
公共的代碼---連接工具類:
public class ConnectionUtil {public static Connection getConnection() throws Exception {//定義連接工廠ConnectionFactory factory = new ConnectionFactory();//設置服務地址factory.setHost("localhost");//端口factory.setPort(5672);//設置賬號信息,用戶名、密碼、vhostfactory.setVirtualHost("/taotao");factory.setUsername("admin");factory.setPassword("admin");// 通過工程獲取連接Connection connection = factory.newConnection();return connection;}}1.簡單模式Hello World
/** 簡單模式Hello World 功能:一個生產者P發送消息到隊列Q,一個消費者C接收生產者實現思路: 創建連接工廠ConnectionFactory,設置服務地址127.0.0.1,端口號5672,設置用戶名、密碼、virtual host, 從連接工廠中獲取連接connection,使用連接創建通道channel,使用通道channel創建隊列queue, 使用通道channel向隊列中發送消息,關閉通道和連接。消費者實現思路 創建連接工廠ConnectionFactory,設置服務地址127.0.0.1,端口號5672,設置用戶名、密碼、virtual host, 從連接工廠中獲取連接connection,使用連接創建通道channel,使用通道channel創建隊列queue, 創建消費者并監聽隊列,從隊列中讀取消息。*/ public class Send {private final static String QUEUE_NAME = "test_queue";public static void main(String[] argv) throws Exception {// 獲取到連接以及mq通道Connection connection = ConnectionUtil.getConnection();// 從連接中創建通道Channel channel = connection.createChannel();// 聲明(創建)隊列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 消息內容String message = "Hello World!";channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");//關閉通道和連接channel.close();connection.close();} } public class Recv {private final static String QUEUE_NAME = "test_queue";public static void main(String[] argv) throws Exception {// 獲取到連接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 聲明隊列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 定義隊列的消費者QueueingConsumer consumer = new QueueingConsumer(channel);// 監聽隊列channel.basicConsume(QUEUE_NAME, true, consumer);// 獲取消息while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());System.out.println(" [x] Received '" + message + "'");}} }效果:
一個生產者發,一個消費者收.
2 工作隊列模式Work Queue
/** 工作隊列模式Work Queue 功能:一個生產者,多個消費者,每個消費者獲取到的消息唯一,多個消費者只有一個隊列任務隊列:避免立即做一個資源密集型任務,必須等待它完成,而是把這個任務安排到稍后再做。 我們將任務封裝為消息并將其發送給隊列。后臺運行的工作進程將彈出任務并最終執行作業。 當有多個worker同時運行時,任務將在它們之間共享。生產者實現思路: 創建連接工廠ConnectionFactory,設置服務地址127.0.0.1,端口號5672, 設置用戶名、密碼、virtual host,從連接工廠中獲取連接connection,使用連接創建通道channel, 使用通道channel創建隊列queue,使用通道channel向隊列中發送消息,2條消息之間間隔一定時間,關閉通道和連接。消費者實現思路: 創建連接工廠ConnectionFactory,設置服務地址127.0.0.1,端口號5672, 設置用戶名、密碼、virtual host,從連接工廠中獲取連接connection,使用連接創建通道channel, 使用通道channel創建隊列queue,創建消費者C1并監聽隊列,獲取消息并暫停10ms, 另外一個消費者C2暫停1000ms,由于消費者C1消費速度快,所以C1可以執行更多的任務。*/ public class Send {private final static String QUEUE_NAME = "test_queue_work";public static void main(String[] argv) throws Exception {// 獲取到連接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 聲明隊列channel.queueDeclare(QUEUE_NAME, false, false, false, null);for (int i = 0; i < 100; i++) {// 消息內容String message = "" + i;channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");Thread.sleep(i * 10);}channel.close();connection.close();} } public class Recv {private final static String QUEUE_NAME = "test_queue_work";public static void main(String[] argv) throws Exception {// 獲取到連接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 聲明隊列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 同一時刻服務器只會發一條消息給消費者channel.basicQos(0);// 定義隊列的消費者QueueingConsumer consumer = new QueueingConsumer(channel);// 監聽隊列,手動返回完成channel.basicConsume(QUEUE_NAME, false, consumer);// 獲取消息while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());System.out.println(" [x] Received '" + message + "'");//休眠Thread.sleep(10);// 返回確認狀態channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}} }效果:
生產者分發的100份,會按照輪詢給消費者1和消費者2分別消費(一奇一偶),即使消費者1消費速度快,也是按照輪詢分發,你一份我一份!
當然了,如果在消費期間,消費者2停止服務,消費者1會消費剩下所有的服務(把2沒干完的也干了)
.
3 發布/訂閱模式Publish/Subscribe
/**發布/訂閱模式Publish/Subscribe功能:一個生產者發送的消息會被多個消費者獲取。一個生產者、一個交換機、多個隊列、多個消費者 生產者:可以將消息發送到隊列或者是交換機。 消費者:只能從隊列中獲取消息。如果消息發送到沒有隊列綁定的交換機上,那么消息將丟失。 交換機不能存儲消息,消息存儲在隊列中生產者實現思路: 創建連接工廠ConnectionFactory,設置服務地址127.0.0.1,端口號5672,設置用戶名、密碼、virtual host, 從連接工廠中獲取連接connection,使用連接創建通道channel,使用通道channel創建隊列queue, 使用通道channel創建交換機并指定交換機類型為fanout,使用通道向交換機發送消息,關閉通道和連接。消費者實現思路: 創建連接工廠ConnectionFactory,設置服務地址127.0.0.1,端口號5672,設置用戶名、密碼、virtual host, 從連接工廠中獲取連接connection,使用連接創建通道channel,使用通道channel創建隊列queue,綁定隊列到交換機, 設置Qos=1,創建消費者并監聽隊列,使用手動方式返回完成。可以有多個隊列綁定到交換機,多個消費者進行監聽。*/ public class Send {private final static String EXCHANGE_NAME = "test_exchange_fanout";public static void main(String[] argv) throws Exception {// 獲取到連接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 聲明exchangechannel.exchangeDeclare(EXCHANGE_NAME, "fanout");// 消息內容String message = "Hello World!";channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");channel.close();connection.close();} }效果:
生產者發布消息后,所有"訂閱"了該交換機的消費者都會收到一份相同的信息.相當于廣播,這里Recv1和Recv2都會同一時間收到消息.
4 路由模式Routing
/** 路由模式Routing說明:生產者發送消息到交換機并且要指定路由key,消費者將隊列綁定到交換機時需要指定路由key生產者實現思路: 創建連接工廠ConnectionFactory,設置服務地址127.0.0.1,端口號5672, 設置用戶名、密碼、virtual host,從連接工廠中獲取連接connection,使用連接創建通道channel, 使用通道channel創建隊列queue,使用通道channel創建交換機并指定交換機類型為direct, 使用通道向交換機發送消息并指定key=b,關閉通道和連接。消費者實現思路: 創建連接工廠ConnectionFactory,設置服務地址127.0.0.1,端口號5672, 設置用戶名、密碼、virtual host,從連接工廠中獲取連接connection,使用連接創建通道channel, 使用通道channel創建隊列queue,綁定隊列到交換機,設置Qos=1,創建消費者并監聽隊列,使用手動方式返回完成。 可以有多個隊列綁定到交換機,但只要綁定key=b的隊列key接收到消息,多個消費者進行監聽。*/ public class Send {private final static String EXCHANGE_NAME = "test_exchange_direct";public static void main(String[] argv) throws Exception {// 獲取到連接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 聲明exchangechannel.exchangeDeclare(EXCHANGE_NAME, "direct");// 消息內容String message = "Hello World!";channel.basicPublish(EXCHANGE_NAME, "key", null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");channel.close();connection.close();}}public class Recv {private final static String QUEUE_NAME = "test_queue_work";private final static String EXCHANGE_NAME = "test_exchange_direct";public static void main(String[] argv) throws Exception {// 獲取到連接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 聲明隊列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 綁定隊列到交換機channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key");// 同一時刻服務器只會發一條消息給消費者channel.basicQos(1);// 定義隊列的消費者QueueingConsumer consumer = new QueueingConsumer(channel);// 監聽隊列,手動返回完成channel.basicConsume(QUEUE_NAME, false, consumer);// 獲取消息while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());System.out.println(" [x] Received '" + message + "'");Thread.sleep(10);channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}} }
public class Recv2 {private final static String QUEUE_NAME = "test_queue_work2";private final static String EXCHANGE_NAME = "test_exchange_direct";public static void main(String[] argv) throws Exception {// 獲取到連接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 聲明隊列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 綁定隊列到交換機channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key2");// 同一時刻服務器只會發一條消息給消費者channel.basicQos(1);// 定義隊列的消費者QueueingConsumer consumer = new QueueingConsumer(channel);// 監聽隊列,手動返回完成channel.basicConsume(QUEUE_NAME, false, consumer);// 獲取消息while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());System.out.println(" [x] Received '" + message + "'");Thread.sleep(10);channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}} }
效果:
生產者發送消息到交換機并且要指定路由為key,消費者將隊列綁定到交換機時需要指定路由key,只有指定路由是key的消費者才會獲取到該生產者發送消息.
5 通配符模式Topics??
/** 通配符模式 說明:生產者P發送消息到交換機X,type=topic,交換機根據綁定隊列的routing key的值進行通配符匹配;符號#:匹配一個或者多個詞lazy.# 可以匹配lazy.irs或者lazy.irs.cor符號*:只能匹配一個詞lazy.* 可以匹配lazy.irs或者lazy.cor生產者實現思路: 創建連接工廠ConnectionFactory,設置服務地址127.0.0.1,端口號5672,設置用戶名、密碼、virtual host, 從連接工廠中獲取連接connection,使用連接創建通道channel,使用通道channel創建隊列queue, 使用通道channel創建交換機并指定交換機類型為topic,使用通道向交換機發送消息并指定key=key.1,關閉通道和連接。消費者實現思路: 創建連接工廠ConnectionFactory,設置服務地址127.0.0.1,端口號5672,設置用戶名、密碼、virtual host, 從連接工廠中獲取連接connection,使用連接創建通道channel,使用通道channel創建隊列queue,綁定隊列到交換機,設置Qos=1, 創建消費者并監聽隊列,使用手動方式返回完成。可以有多個隊列綁定到交換機,凡是綁定規則符合通配符規則的隊列均可以接收到消息,比如key.*,key.#,多個消費者進行監聽。*/ public class Send {private final static String EXCHANGE_NAME = "test_exchange_topic";public static void main(String[] argv) throws Exception {// 獲取到連接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 聲明exchangechannel.exchangeDeclare(EXCHANGE_NAME, "topic");// 消息內容String message = "Hello World!";channel.basicPublish(EXCHANGE_NAME, "product.delete", null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");channel.close();connection.close();} }效果:
生產者規定的路由規則key為"product.delete",只有消費者2匹配,所以只有消費者2能收到生產者的消息,消費者1不能收到生產者的消息.
總結
RabbitMQ提供了6種模式,分別是HelloWorld,Work Queue,Publish/Subscribe,Routing,Topics,RPC Request/reply,本文詳細講述了前5種,并給出代碼實現和思路。其中Publish/Subscribe,Routing,Topics三種模式可以統一歸為Exchange模式,只是創建時交換機的類型不一樣,分別是fanout、direct、topic.
三種Exchange模式如下:
Fanout Exchange?– 不處理路由鍵。你只需要簡單的將隊列綁定到交換機上。一個發送到交換機的消息都會被轉發到與該交換機綁定的所有隊列上。很像子網廣播,每臺子網內的主機都獲得了一份復制的消息。Fanout交換機轉發消息是最快的。?
Direct Exchange?– 處理路由鍵。需要將一個隊列綁定到交換機上,要求該消息與一個特定的路由鍵完全匹配。這是一個完整的匹配。如果一個隊列綁定到該交換機上要求路由鍵 “dog”,則只有被標記為“dog”的消息才被轉發,不會轉發dog.puppy,也不會轉發dog.guard,只會轉發dog。?
Topic Exchange?– 將路由鍵和某模式進行匹配。此時隊列需要綁定要一個模式上。符號“#”匹配一個或多個詞,符號“*”匹配不多不少一個詞。因此“audit.#”能夠匹配到“audit.irs.corporate”,但是“audit.*”只會匹配到“audit.irs”。
總結
以上是生活随笔為你收集整理的RabbitMq的工作模式 介绍+测试代码,以及三种Exchange模式介绍.的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 奥赛金牌计算机博士中学老师,博士教师从6
- 下一篇: 关于onload的事件权柄以及踩过的坑