RabbitMq的工作模式 介绍+测试代码,以及三种Exchange模式介绍.
RabbitMq的提供了六種模式分別是:簡單模式,工作模式,發(fā)布\訂閱模式,路由模式,通配符模式,RPC遠(yuǎn)程調(diào)用模式
下面將詳細(xì)介紹常用的前五種模式,附上測試代碼.
公共的代碼---連接工具類:
public class ConnectionUtil {public static Connection getConnection() throws Exception {//定義連接工廠ConnectionFactory factory = new ConnectionFactory();//設(shè)置服務(wù)地址factory.setHost("localhost");//端口factory.setPort(5672);//設(shè)置賬號信息,用戶名、密碼、vhostfactory.setVirtualHost("/taotao");factory.setUsername("admin");factory.setPassword("admin");// 通過工程獲取連接Connection connection = factory.newConnection();return connection;}}1.簡單模式Hello World
/** 簡單模式Hello World 功能:一個生產(chǎn)者P發(fā)送消息到隊(duì)列Q,一個消費(fèi)者C接收生產(chǎn)者實(shí)現(xiàn)思路: 創(chuàng)建連接工廠ConnectionFactory,設(shè)置服務(wù)地址127.0.0.1,端口號5672,設(shè)置用戶名、密碼、virtual host, 從連接工廠中獲取連接connection,使用連接創(chuàng)建通道channel,使用通道channel創(chuàng)建隊(duì)列queue, 使用通道channel向隊(duì)列中發(fā)送消息,關(guān)閉通道和連接。消費(fèi)者實(shí)現(xiàn)思路 創(chuàng)建連接工廠ConnectionFactory,設(shè)置服務(wù)地址127.0.0.1,端口號5672,設(shè)置用戶名、密碼、virtual host, 從連接工廠中獲取連接connection,使用連接創(chuàng)建通道channel,使用通道channel創(chuàng)建隊(duì)列queue, 創(chuàng)建消費(fèi)者并監(jiān)聽隊(duì)列,從隊(duì)列中讀取消息。*/ public class Send {private final static String QUEUE_NAME = "test_queue";public static void main(String[] argv) throws Exception {// 獲取到連接以及mq通道Connection connection = ConnectionUtil.getConnection();// 從連接中創(chuàng)建通道Channel channel = connection.createChannel();// 聲明(創(chuàng)建)隊(duì)列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 消息內(nèi)容String message = "Hello World!";channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");//關(guān)閉通道和連接channel.close();connection.close();} } public class Recv {private final static String QUEUE_NAME = "test_queue";public static void main(String[] argv) throws Exception {// 獲取到連接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 聲明隊(duì)列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 定義隊(duì)列的消費(fèi)者QueueingConsumer consumer = new QueueingConsumer(channel);// 監(jiān)聽隊(duì)列channel.basicConsume(QUEUE_NAME, true, consumer);// 獲取消息while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());System.out.println(" [x] Received '" + message + "'");}} }效果:
一個生產(chǎn)者發(fā),一個消費(fèi)者收.
2 工作隊(duì)列模式Work Queue
/** 工作隊(duì)列模式Work Queue 功能:一個生產(chǎn)者,多個消費(fèi)者,每個消費(fèi)者獲取到的消息唯一,多個消費(fèi)者只有一個隊(duì)列任務(wù)隊(duì)列:避免立即做一個資源密集型任務(wù),必須等待它完成,而是把這個任務(wù)安排到稍后再做。 我們將任務(wù)封裝為消息并將其發(fā)送給隊(duì)列。后臺運(yùn)行的工作進(jìn)程將彈出任務(wù)并最終執(zhí)行作業(yè)。 當(dāng)有多個worker同時運(yùn)行時,任務(wù)將在它們之間共享。生產(chǎn)者實(shí)現(xiàn)思路: 創(chuàng)建連接工廠ConnectionFactory,設(shè)置服務(wù)地址127.0.0.1,端口號5672, 設(shè)置用戶名、密碼、virtual host,從連接工廠中獲取連接connection,使用連接創(chuàng)建通道channel, 使用通道channel創(chuàng)建隊(duì)列queue,使用通道channel向隊(duì)列中發(fā)送消息,2條消息之間間隔一定時間,關(guān)閉通道和連接。消費(fèi)者實(shí)現(xiàn)思路: 創(chuàng)建連接工廠ConnectionFactory,設(shè)置服務(wù)地址127.0.0.1,端口號5672, 設(shè)置用戶名、密碼、virtual host,從連接工廠中獲取連接connection,使用連接創(chuàng)建通道channel, 使用通道channel創(chuàng)建隊(duì)列queue,創(chuàng)建消費(fèi)者C1并監(jiān)聽隊(duì)列,獲取消息并暫停10ms, 另外一個消費(fèi)者C2暫停1000ms,由于消費(fèi)者C1消費(fèi)速度快,所以C1可以執(zhí)行更多的任務(wù)。*/ public class Send {private final static String QUEUE_NAME = "test_queue_work";public static void main(String[] argv) throws Exception {// 獲取到連接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 聲明隊(duì)列channel.queueDeclare(QUEUE_NAME, false, false, false, null);for (int i = 0; i < 100; i++) {// 消息內(nèi)容String message = "" + i;channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");Thread.sleep(i * 10);}channel.close();connection.close();} } public class Recv {private final static String QUEUE_NAME = "test_queue_work";public static void main(String[] argv) throws Exception {// 獲取到連接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 聲明隊(duì)列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 同一時刻服務(wù)器只會發(fā)一條消息給消費(fèi)者channel.basicQos(0);// 定義隊(duì)列的消費(fèi)者QueueingConsumer consumer = new QueueingConsumer(channel);// 監(jiān)聽隊(duì)列,手動返回完成channel.basicConsume(QUEUE_NAME, false, consumer);// 獲取消息while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());System.out.println(" [x] Received '" + message + "'");//休眠Thread.sleep(10);// 返回確認(rèn)狀態(tài)channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}} }效果:
生產(chǎn)者分發(fā)的100份,會按照輪詢給消費(fèi)者1和消費(fèi)者2分別消費(fèi)(一奇一偶),即使消費(fèi)者1消費(fèi)速度快,也是按照輪詢分發(fā),你一份我一份!
當(dāng)然了,如果在消費(fèi)期間,消費(fèi)者2停止服務(wù),消費(fèi)者1會消費(fèi)剩下所有的服務(wù)(把2沒干完的也干了)
.
3 發(fā)布/訂閱模式Publish/Subscribe
/**發(fā)布/訂閱模式Publish/Subscribe功能:一個生產(chǎn)者發(fā)送的消息會被多個消費(fèi)者獲取。一個生產(chǎn)者、一個交換機(jī)、多個隊(duì)列、多個消費(fèi)者 生產(chǎn)者:可以將消息發(fā)送到隊(duì)列或者是交換機(jī)。 消費(fèi)者:只能從隊(duì)列中獲取消息。如果消息發(fā)送到?jīng)]有隊(duì)列綁定的交換機(jī)上,那么消息將丟失。 交換機(jī)不能存儲消息,消息存儲在隊(duì)列中生產(chǎn)者實(shí)現(xiàn)思路: 創(chuàng)建連接工廠ConnectionFactory,設(shè)置服務(wù)地址127.0.0.1,端口號5672,設(shè)置用戶名、密碼、virtual host, 從連接工廠中獲取連接connection,使用連接創(chuàng)建通道channel,使用通道channel創(chuàng)建隊(duì)列queue, 使用通道channel創(chuàng)建交換機(jī)并指定交換機(jī)類型為fanout,使用通道向交換機(jī)發(fā)送消息,關(guān)閉通道和連接。消費(fèi)者實(shí)現(xiàn)思路: 創(chuàng)建連接工廠ConnectionFactory,設(shè)置服務(wù)地址127.0.0.1,端口號5672,設(shè)置用戶名、密碼、virtual host, 從連接工廠中獲取連接connection,使用連接創(chuàng)建通道channel,使用通道channel創(chuàng)建隊(duì)列queue,綁定隊(duì)列到交換機(jī), 設(shè)置Qos=1,創(chuàng)建消費(fèi)者并監(jiān)聽隊(duì)列,使用手動方式返回完成。可以有多個隊(duì)列綁定到交換機(jī),多個消費(fèi)者進(jìn)行監(jiān)聽。*/ public class Send {private final static String EXCHANGE_NAME = "test_exchange_fanout";public static void main(String[] argv) throws Exception {// 獲取到連接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 聲明exchangechannel.exchangeDeclare(EXCHANGE_NAME, "fanout");// 消息內(nèi)容String message = "Hello World!";channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");channel.close();connection.close();} }效果:
生產(chǎn)者發(fā)布消息后,所有"訂閱"了該交換機(jī)的消費(fèi)者都會收到一份相同的信息.相當(dāng)于廣播,這里Recv1和Recv2都會同一時間收到消息.
4 路由模式Routing
/** 路由模式Routing說明:生產(chǎn)者發(fā)送消息到交換機(jī)并且要指定路由key,消費(fèi)者將隊(duì)列綁定到交換機(jī)時需要指定路由key生產(chǎn)者實(shí)現(xiàn)思路: 創(chuàng)建連接工廠ConnectionFactory,設(shè)置服務(wù)地址127.0.0.1,端口號5672, 設(shè)置用戶名、密碼、virtual host,從連接工廠中獲取連接connection,使用連接創(chuàng)建通道channel, 使用通道channel創(chuàng)建隊(duì)列queue,使用通道channel創(chuàng)建交換機(jī)并指定交換機(jī)類型為direct, 使用通道向交換機(jī)發(fā)送消息并指定key=b,關(guān)閉通道和連接。消費(fèi)者實(shí)現(xiàn)思路: 創(chuàng)建連接工廠ConnectionFactory,設(shè)置服務(wù)地址127.0.0.1,端口號5672, 設(shè)置用戶名、密碼、virtual host,從連接工廠中獲取連接connection,使用連接創(chuàng)建通道channel, 使用通道channel創(chuàng)建隊(duì)列queue,綁定隊(duì)列到交換機(jī),設(shè)置Qos=1,創(chuàng)建消費(fèi)者并監(jiān)聽隊(duì)列,使用手動方式返回完成。 可以有多個隊(duì)列綁定到交換機(jī),但只要綁定key=b的隊(duì)列key接收到消息,多個消費(fèi)者進(jìn)行監(jiān)聽。*/ public class Send {private final static String EXCHANGE_NAME = "test_exchange_direct";public static void main(String[] argv) throws Exception {// 獲取到連接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 聲明exchangechannel.exchangeDeclare(EXCHANGE_NAME, "direct");// 消息內(nèi)容String message = "Hello World!";channel.basicPublish(EXCHANGE_NAME, "key", null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");channel.close();connection.close();}}public class Recv {private final static String QUEUE_NAME = "test_queue_work";private final static String EXCHANGE_NAME = "test_exchange_direct";public static void main(String[] argv) throws Exception {// 獲取到連接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 聲明隊(duì)列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 綁定隊(duì)列到交換機(jī)channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key");// 同一時刻服務(wù)器只會發(fā)一條消息給消費(fèi)者channel.basicQos(1);// 定義隊(duì)列的消費(fèi)者QueueingConsumer consumer = new QueueingConsumer(channel);// 監(jiān)聽隊(duì)列,手動返回完成channel.basicConsume(QUEUE_NAME, false, consumer);// 獲取消息while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());System.out.println(" [x] Received '" + message + "'");Thread.sleep(10);channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}} }
public class Recv2 {private final static String QUEUE_NAME = "test_queue_work2";private final static String EXCHANGE_NAME = "test_exchange_direct";public static void main(String[] argv) throws Exception {// 獲取到連接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 聲明隊(duì)列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 綁定隊(duì)列到交換機(jī)channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key2");// 同一時刻服務(wù)器只會發(fā)一條消息給消費(fèi)者channel.basicQos(1);// 定義隊(duì)列的消費(fèi)者QueueingConsumer consumer = new QueueingConsumer(channel);// 監(jiān)聽隊(duì)列,手動返回完成channel.basicConsume(QUEUE_NAME, false, consumer);// 獲取消息while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());System.out.println(" [x] Received '" + message + "'");Thread.sleep(10);channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}} }
效果:
生產(chǎn)者發(fā)送消息到交換機(jī)并且要指定路由為key,消費(fèi)者將隊(duì)列綁定到交換機(jī)時需要指定路由key,只有指定路由是key的消費(fèi)者才會獲取到該生產(chǎn)者發(fā)送消息.
5 通配符模式Topics??
/** 通配符模式 說明:生產(chǎn)者P發(fā)送消息到交換機(jī)X,type=topic,交換機(jī)根據(jù)綁定隊(duì)列的routing key的值進(jìn)行通配符匹配;符號#:匹配一個或者多個詞lazy.# 可以匹配lazy.irs或者lazy.irs.cor符號*:只能匹配一個詞lazy.* 可以匹配lazy.irs或者lazy.cor生產(chǎn)者實(shí)現(xiàn)思路: 創(chuàng)建連接工廠ConnectionFactory,設(shè)置服務(wù)地址127.0.0.1,端口號5672,設(shè)置用戶名、密碼、virtual host, 從連接工廠中獲取連接connection,使用連接創(chuàng)建通道channel,使用通道channel創(chuàng)建隊(duì)列queue, 使用通道channel創(chuàng)建交換機(jī)并指定交換機(jī)類型為topic,使用通道向交換機(jī)發(fā)送消息并指定key=key.1,關(guān)閉通道和連接。消費(fèi)者實(shí)現(xiàn)思路: 創(chuàng)建連接工廠ConnectionFactory,設(shè)置服務(wù)地址127.0.0.1,端口號5672,設(shè)置用戶名、密碼、virtual host, 從連接工廠中獲取連接connection,使用連接創(chuàng)建通道channel,使用通道channel創(chuàng)建隊(duì)列queue,綁定隊(duì)列到交換機(jī),設(shè)置Qos=1, 創(chuàng)建消費(fèi)者并監(jiān)聽隊(duì)列,使用手動方式返回完成。可以有多個隊(duì)列綁定到交換機(jī),凡是綁定規(guī)則符合通配符規(guī)則的隊(duì)列均可以接收到消息,比如key.*,key.#,多個消費(fèi)者進(jìn)行監(jiān)聽。*/ public class Send {private final static String EXCHANGE_NAME = "test_exchange_topic";public static void main(String[] argv) throws Exception {// 獲取到連接以及mq通道Connection connection = ConnectionUtil.getConnection();Channel channel = connection.createChannel();// 聲明exchangechannel.exchangeDeclare(EXCHANGE_NAME, "topic");// 消息內(nèi)容String message = "Hello World!";channel.basicPublish(EXCHANGE_NAME, "product.delete", null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");channel.close();connection.close();} }效果:
生產(chǎn)者規(guī)定的路由規(guī)則key為"product.delete",只有消費(fèi)者2匹配,所以只有消費(fèi)者2能收到生產(chǎn)者的消息,消費(fèi)者1不能收到生產(chǎn)者的消息.
總結(jié)
RabbitMQ提供了6種模式,分別是HelloWorld,Work Queue,Publish/Subscribe,Routing,Topics,RPC Request/reply,本文詳細(xì)講述了前5種,并給出代碼實(shí)現(xiàn)和思路。其中Publish/Subscribe,Routing,Topics三種模式可以統(tǒng)一歸為Exchange模式,只是創(chuàng)建時交換機(jī)的類型不一樣,分別是fanout、direct、topic.
三種Exchange模式如下:
Fanout Exchange?– 不處理路由鍵。你只需要簡單的將隊(duì)列綁定到交換機(jī)上。一個發(fā)送到交換機(jī)的消息都會被轉(zhuǎn)發(fā)到與該交換機(jī)綁定的所有隊(duì)列上。很像子網(wǎng)廣播,每臺子網(wǎng)內(nèi)的主機(jī)都獲得了一份復(fù)制的消息。Fanout交換機(jī)轉(zhuǎn)發(fā)消息是最快的。?
Direct Exchange?– 處理路由鍵。需要將一個隊(duì)列綁定到交換機(jī)上,要求該消息與一個特定的路由鍵完全匹配。這是一個完整的匹配。如果一個隊(duì)列綁定到該交換機(jī)上要求路由鍵 “dog”,則只有被標(biāo)記為“dog”的消息才被轉(zhuǎn)發(fā),不會轉(zhuǎn)發(fā)dog.puppy,也不會轉(zhuǎn)發(fā)dog.guard,只會轉(zhuǎn)發(fā)dog。?
Topic Exchange?– 將路由鍵和某模式進(jìn)行匹配。此時隊(duì)列需要綁定要一個模式上。符號“#”匹配一個或多個詞,符號“*”匹配不多不少一個詞。因此“audit.#”能夠匹配到“audit.irs.corporate”,但是“audit.*”只會匹配到“audit.irs”。
總結(jié)
以上是生活随笔為你收集整理的RabbitMq的工作模式 介绍+测试代码,以及三种Exchange模式介绍.的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 奥赛金牌计算机博士中学老师,博士教师从6
- 下一篇: 关于onload的事件权柄以及踩过的坑