日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

RabbitMQ中7种消息队列和保姆级代码演示!

發布時間:2025/3/11 编程问答 18 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RabbitMQ中7种消息队列和保姆级代码演示! 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

blog.csdn.net/qq_32828253/article/details/110450249

七種模式介紹與應用場景

簡單模式(Hello World)

做最簡單的事情,一個生產者對應一個消費者,RabbitMQ相當于一個消息代理,負責將A的消息轉發給B

應用場景: 將發送的電子郵件放到消息隊列,然后郵件服務在隊列中獲取郵件并發送給收件人

工作隊列模式(Work queues)

在多個消費者之間分配任務(競爭的消費者模式),一個生產者對應多個消費者,一般適用于執行資源密集型任務,單個消費者處理不過來,需要多個消費者進行處理

應用場景: 一個訂單的處理需要10s,有多個訂單可以同時放到消息隊列,然后讓多個消費者同時處理,這樣就是并行了,而不是單個消費者的串行情況

訂閱模式(Publish/Subscribe)

一次向許多消費者發送消息,一個生產者發送的消息會被多個消費者獲取,也就是將消息將廣播到所有的消費者中。

應用場景: 更新商品庫存后需要通知多個緩存和多個數據庫,這里的結構應該是:

  • 一個fanout類型交換機扇出兩個個消息隊列,分別為緩存消息隊列、數據庫消息隊列

  • 一個緩存消息隊列對應著多個緩存消費者

  • 一個數據庫消息隊列對應著多個數據庫消費者

路由模式(Routing)

有選擇地(Routing key)接收消息,發送消息到交換機并且要指定路由key ,消費者將隊列綁定到交換機時需要指定路由key,僅消費指定路由key的消息

應用場景: 如在商品庫存中增加了1臺iphone12,iphone12促銷活動消費者指定routing key為iphone12,只有此促銷活動會接收到消息,其它促銷活動不關心也不會消費此routing key的消息

主題模式(Topics)

根據主題(Topics)來接收消息,將路由key和某模式進行匹配,此時隊列需要綁定在一個模式上,#匹配一個詞或多個詞,*只匹配一個詞。

應用場景: 同上,iphone促銷活動可以接收主題為iphone的消息,如iphone12、iphone13等

遠程過程調用(RPC)

如果我們需要在遠程計算機上運行功能并等待結果就可以使用RPC,具體流程可以看圖。應用場景:需要等待接口返回數據,如訂單支付

發布者確認(Publisher Confirms)

與發布者進行可靠的發布確認,發布者確認是RabbitMQ擴展,可以實現可靠的發布。在通道上啟用發布者確認后,RabbitMQ將異步確認發送者發布的消息,這意味著它們已在服務器端處理。

應用場景: 對于消息可靠性要求較高,比如錢包扣款

代碼演示

代碼中沒有對后面兩種模式演示,有興趣可以自己研究

簡單模式

import?com.rabbitmq.client.Channel; import?com.rabbitmq.client.Connection; import?com.rabbitmq.client.ConnectionFactory;import?java.io.IOException; import?java.util.concurrent.TimeoutException;public?class?Sender?{private?final?static?String?QUEUE_NAME?=?"simple_queue";public?static?void?main(String[]?args)?throws?IOException,?TimeoutException?{ConnectionFactory?factory?=?new?ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);Connection?connection?=?factory.newConnection();Channel?channel?=?connection.createChannel();//?聲明隊列// queue:隊列名// durable:是否持久化// exclusive:是否排外??即只允許該channel訪問該隊列???一般等于true的話用于一個隊列只能有一個消費者來消費的場景// autoDelete:是否自動刪除??消費完刪除// arguments:其他屬性channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);//消息內容String?message?=?"simplest?mode?message";channel.basicPublish("",?QUEUE_NAME,?null,?message.getBytes());System.out.println("[x]Sent?'"?+?message?+?"'");//最后關閉通關和連接channel.close();connection.close();} } import?com.rabbitmq.client.Channel; import?com.rabbitmq.client.Connection; import?com.rabbitmq.client.ConnectionFactory; import?com.rabbitmq.client.DeliverCallback;import?java.io.IOException; import?java.util.concurrent.TimeoutException;public?class?Receiver?{private?final?static?String?QUEUE_NAME?=?"simplest_queue";public?static?void?main(String[]?args)?throws?IOException,?InterruptedException,?TimeoutException?{//?獲取連接ConnectionFactory?factory?=?new?ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);Connection?connection?=?factory.newConnection();Channel?channel?=?connection.createChannel();channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);DeliverCallback?deliverCallback?=?(consumerTag,?delivery)?->?{String?message?=?new?String(delivery.getBody(),?"UTF-8");System.out.println("?[x]?Received?'"?+delivery.getEnvelope().getRoutingKey()?+?"':'"?+?message?+?"'");};channel.basicConsume(QUEUE_NAME,?true,?deliverCallback,?consumerTag?->?{});} }

工作隊列模式

import?com.rabbitmq.client.Channel; import?com.rabbitmq.client.Connection; import?com.rabbitmq.client.ConnectionFactory; import?com.rabbitmq.client.DeliverCallback;import?java.io.IOException; import?java.util.concurrent.TimeoutException;public?class?Receiver1?{private?final?static?String?QUEUE_NAME?=?"queue_work";public?static?void?main(String[]?args)?throws?IOException,?InterruptedException,?TimeoutException?{ConnectionFactory?factory?=?new?ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);Connection?connection?=?factory.newConnection();Channel?channel?=?connection.createChannel();channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);//?同一時刻服務器只會發送一條消息給消費者channel.basicQos(1);DeliverCallback?deliverCallback?=?(consumerTag,?delivery)?->?{String?message?=?new?String(delivery.getBody(),?"UTF-8");System.out.println("?[x]?Received?'"?+delivery.getEnvelope().getRoutingKey()?+?"':'"?+?message?+?"'");};channel.basicConsume(QUEUE_NAME,?true,?deliverCallback,?consumerTag?->?{});} } import?com.rabbitmq.client.Channel; import?com.rabbitmq.client.Connection; import?com.rabbitmq.client.ConnectionFactory; import?com.rabbitmq.client.DeliverCallback;import?java.io.IOException; import?java.util.concurrent.TimeoutException;public?class?Receiver2?{private?final?static?String?QUEUE_NAME?=?"queue_work";public?static?void?main(String[]?args)?throws?IOException,?InterruptedException,?TimeoutException?{ConnectionFactory?factory?=?new?ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);Connection?connection?=?factory.newConnection();Channel?channel?=?connection.createChannel();channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);//?同一時刻服務器只會發送一條消息給消費者channel.basicQos(1);DeliverCallback?deliverCallback?=?(consumerTag,?delivery)?->?{String?message?=?new?String(delivery.getBody(),?"UTF-8");System.out.println("?[x]?Received?'"?+delivery.getEnvelope().getRoutingKey()?+?"':'"?+?message?+?"'");};channel.basicConsume(QUEUE_NAME,?true,?deliverCallback,?consumerTag?->?{});} } import?com.rabbitmq.client.Channel; import?com.rabbitmq.client.Connection; import?com.rabbitmq.client.ConnectionFactory;import?java.io.IOException; import?java.util.concurrent.TimeoutException;public?class?Sender?{private?final?static?String?QUEUE_NAME?=?"queue_work";public?static?void?main(String[]?args)?throws?IOException,?InterruptedException,?TimeoutException?{ConnectionFactory?factory?=?new?ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);Connection?connection?=?factory.newConnection();Channel?channel?=?connection.createChannel();//?聲明隊列channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);for?(int?i?=?0;?i?<?100;?i++)?{String?message?=?"work?mode?message"?+?i;channel.basicPublish("",?QUEUE_NAME,?null,?message.getBytes());System.out.println("[x]?Sent?'"?+?message?+?"'");Thread.sleep(i?*?10);}channel.close();connection.close();} }

發布訂閱模式

import?com.rabbitmq.client.Channel; import?com.rabbitmq.client.Connection; import?com.rabbitmq.client.ConnectionFactory; import?com.rabbitmq.client.DeliverCallback;public?class?Receive1?{private?static?final?String?EXCHANGE_NAME?=?"logs";public?static?void?main(String[]?argv)?throws?Exception?{ConnectionFactory?factory?=?new?ConnectionFactory();factory.setHost("localhost");Connection?connection?=?factory.newConnection();Channel?channel?=?connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME,?"fanout");String?queueName?=?channel.queueDeclare().getQueue();channel.queueBind(queueName,?EXCHANGE_NAME,?"");System.out.println("?[*]?Waiting?for?messages.?To?exit?press?CTRL+C");//?訂閱消息的回調函數DeliverCallback?deliverCallback?=?(consumerTag,?delivery)?->?{String?message?=?new?String(delivery.getBody(),?"UTF-8");System.out.println("?[x]?Received?'"?+?message?+?"'");};//?消費者,有消息時出發訂閱回調函數channel.basicConsume(queueName,?true,?deliverCallback,?consumerTag?->?{});} } import?com.rabbitmq.client.Channel; import?com.rabbitmq.client.Connection; import?com.rabbitmq.client.ConnectionFactory; import?com.rabbitmq.client.DeliverCallback;public?class?Receive2?{private?static?final?String?EXCHANGE_NAME?=?"logs";public?static?void?main(String[]?argv)?throws?Exception?{ConnectionFactory?factory?=?new?ConnectionFactory();factory.setHost("localhost");Connection?connection?=?factory.newConnection();Channel?channel?=?connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME,?"fanout");String?queueName?=?channel.queueDeclare().getQueue();channel.queueBind(queueName,?EXCHANGE_NAME,?"");System.out.println("?[*]?Waiting?for?messages.?To?exit?press?CTRL+C");//?訂閱消息的回調函數DeliverCallback?deliverCallback?=?(consumerTag,?delivery)?->?{String?message?=?new?String(delivery.getBody(),?"UTF-8");System.out.println("?[x]?Received2?'"?+?message?+?"'");};//?消費者,有消息時出發訂閱回調函數channel.basicConsume(queueName,?true,?deliverCallback,?consumerTag?->?{});} } import?com.rabbitmq.client.Channel; import?com.rabbitmq.client.Connection; import?com.rabbitmq.client.ConnectionFactory;public?class?Sender?{private?static?final?String?EXCHANGE_NAME?=?"logs";public?static?void?main(String[]?argv)?throws?Exception?{ConnectionFactory?factory?=?new?ConnectionFactory();factory.setHost("localhost");Connection?connection?=?factory.newConnection();Channel?channel?=?connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME,?"fanout");String?message?=?"publish?subscribe?message";channel.basicPublish(EXCHANGE_NAME,?"",?null,?message.getBytes("UTF-8"));System.out.println("?[x]?Sent?'"?+?message?+?"'");channel.close();connection.close();} }

路由模式

import?com.rabbitmq.client.Channel; import?com.rabbitmq.client.Connection; import?com.rabbitmq.client.ConnectionFactory; import?com.rabbitmq.client.DeliverCallback;import?java.io.IOException; import?java.util.concurrent.TimeoutException;public?class?Receiver1?{private?final?static?String?QUEUE_NAME?=?"queue_routing";private?final?static?String?EXCHANGE_NAME?=?"exchange_direct";public?static?void?main(String[]?args)?throws?IOException,?TimeoutException?{ConnectionFactory?factory?=?new?ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);Connection?connection?=?factory.newConnection();Channel?channel?=?connection.createChannel();channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);//?指定路由的key,接收key和key2channel.queueBind(QUEUE_NAME,?EXCHANGE_NAME,?"key");channel.queueBind(QUEUE_NAME,?EXCHANGE_NAME,?"key2");channel.basicQos(1);DeliverCallback?deliverCallback?=?(consumerTag,?delivery)?->?{String?message?=?new?String(delivery.getBody(),?"UTF-8");System.out.println("?[x]?Received?'"?+delivery.getEnvelope().getRoutingKey()?+?"':'"?+?message?+?"'");};channel.basicConsume(QUEUE_NAME,?true,?deliverCallback,?consumerTag?->?{});}} import?com.rabbitmq.client.Channel; import?com.rabbitmq.client.Connection; import?com.rabbitmq.client.ConnectionFactory; import?com.rabbitmq.client.DeliverCallback;import?java.io.IOException; import?java.util.concurrent.TimeoutException;public?class?Receiver2?{private?final?static?String?QUEUE_NAME?=?"queue_routing2";private?final?static?String?EXCHANGE_NAME?=?"exchange_direct";public?static?void?main(String[]?args)?throws?IOException,?TimeoutException?{ConnectionFactory?factory?=?new?ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);Connection?connection?=?factory.newConnection();Channel?channel?=?connection.createChannel();channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);//?僅接收key2channel.queueBind(QUEUE_NAME,?EXCHANGE_NAME,?"key2");channel.basicQos(1);DeliverCallback?deliverCallback?=?(consumerTag,?delivery)?->?{String?message?=?new?String(delivery.getBody(),?"UTF-8");System.out.println("?[x]?Received?'"?+delivery.getEnvelope().getRoutingKey()?+?"':'"?+?message?+?"'");};channel.basicConsume(QUEUE_NAME,?true,?deliverCallback,?consumerTag?->?{});} } import?com.rabbitmq.client.Channel; import?com.rabbitmq.client.Connection; import?com.rabbitmq.client.ConnectionFactory;import?java.io.IOException; import?java.util.concurrent.TimeoutException;public?class?Sender?{private?final?static?String?EXCHANGE_NAME?=?"exchange_direct";private?final?static?String?EXCHANGE_TYPE?=?"direct";public?static?void?main(String[]?args)?throws?IOException,?TimeoutException?{ConnectionFactory?factory?=?new?ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);Connection?connection?=?factory.newConnection();Channel?channel?=?connection.createChannel();//?交換機聲明channel.exchangeDeclare(EXCHANGE_NAME,?EXCHANGE_TYPE);//?只有routingKey相同的才會消費String?message?=?"routing?mode?message";channel.basicPublish(EXCHANGE_NAME,?"key2",?null,?message.getBytes());System.out.println("[x]?Sent?'"?+?message?+?"'"); //????????channel.basicPublish(EXCHANGE_NAME,?"key",?null,?message.getBytes()); //????????System.out.println("[x]?Sent?'"?+?message?+?"'");channel.close();connection.close();} }

主題模式

import?com.rabbitmq.client.Channel; import?com.rabbitmq.client.Connection; import?com.rabbitmq.client.ConnectionFactory; import?com.rabbitmq.client.DeliverCallback;import?java.io.IOException; import?java.util.concurrent.TimeoutException;public?class?Receiver1?{private?final?static?String?QUEUE_NAME?=?"queue_topic";private?final?static?String?EXCHANGE_NAME?=?"exchange_topic";public?static?void?main(String[]?args)?throws?IOException,?InterruptedException,?TimeoutException?{ConnectionFactory?factory?=?new?ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);Connection?connection?=?factory.newConnection();Channel?channel?=?connection.createChannel();channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);//?可以接收key.1channel.queueBind(QUEUE_NAME,?EXCHANGE_NAME,?"key.*");channel.basicQos(1);DeliverCallback?deliverCallback?=?(consumerTag,?delivery)?->?{String?message?=?new?String(delivery.getBody(),?"UTF-8");System.out.println("?[x]?Received?'"?+delivery.getEnvelope().getRoutingKey()?+?"':'"?+?message?+?"'");};channel.basicConsume(QUEUE_NAME,?true,?deliverCallback,?consumerTag?->?{});} } import?com.rabbitmq.client.Channel; import?com.rabbitmq.client.Connection; import?com.rabbitmq.client.ConnectionFactory; import?com.rabbitmq.client.DeliverCallback;import?java.io.IOException; import?java.util.concurrent.TimeoutException;public?class?Receiver2?{private?final?static?String?QUEUE_NAME?=?"queue_topic2";private?final?static?String?EXCHANGE_NAME?=?"exchange_topic";private?final?static?String?EXCHANGE_TYPE?=?"topic";public?static?void?main(String[]?args)?throws?IOException,?InterruptedException,?TimeoutException?{ConnectionFactory?factory?=?new?ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);Connection?connection?=?factory.newConnection();Channel?channel?=?connection.createChannel();channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);//?*號代表單個單詞,可以接收key.1channel.queueBind(QUEUE_NAME,?EXCHANGE_NAME,?"*.*");//?#號代表多個單詞,可以接收key.1.2channel.queueBind(QUEUE_NAME,?EXCHANGE_NAME,?"*.#");channel.basicQos(1);DeliverCallback?deliverCallback?=?(consumerTag,?delivery)?->?{String?message?=?new?String(delivery.getBody(),?"UTF-8");System.out.println("?[x]?Received?'"?+delivery.getEnvelope().getRoutingKey()?+?"':'"?+?message?+?"'");};channel.basicConsume(QUEUE_NAME,?true,?deliverCallback,?consumerTag?->?{});} } import?com.rabbitmq.client.Channel; import?com.rabbitmq.client.Connection; import?com.rabbitmq.client.ConnectionFactory;import?java.io.IOException; import?java.util.concurrent.TimeoutException;public?class?Sender?{private?final?static?String?EXCHANGE_NAME?=?"exchange_topic";private?final?static?String?EXCHANGE_TYPE?=?"topic";public?static?void?main(String[]?args)?throws?IOException,?TimeoutException?{ConnectionFactory?factory?=?new?ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);Connection?connection?=?factory.newConnection();Channel?channel?=?connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME,?EXCHANGE_TYPE);String?message?=?"topics?model?message?with?key.1";channel.basicPublish(EXCHANGE_NAME,?"key.1",?null,?message.getBytes());System.out.println("[x]?Sent?'"?+?message?+?"'");String?message2?=?"topics?model?message?with?key.1.2";channel.basicPublish(EXCHANGE_NAME,?"key.1.2",?null,?message2.getBytes());System.out.println("[x]?Sent?'"?+?message2?+?"'");channel.close();connection.close();} }

四種交換機介紹

  • 直連交換機(Direct exchange): 具有路由功能的交換機,綁定到此交換機的時候需要指定一個routing_key,交換機發送消息的時候需要routing_key,會將消息發送道對應的隊列

  • 扇形交換機(Fanout exchange): 廣播消息到所有隊列,沒有任何處理,速度最快

  • 主題交換機(Topic exchange): 在直連交換機基礎上增加模式匹配,也就是對routing_key進行模式匹配,*代表一個單詞,#代表多個單詞

  • 首部交換機(Headers exchange): 忽略routing_key,使用Headers信息(一個Hash的數據結構)進行匹配,優勢在于可以有更多更靈活的匹配規則

總結

這么多種隊列模式中都有其應用場景,大家可以根據應用場景示例中進行選擇

參考

  • RabbitMQ官方教程

  • 官方教程源碼

總結

以上是生活随笔為你收集整理的RabbitMQ中7种消息队列和保姆级代码演示!的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 亚洲精品国产成人av在线 | 欧美在线不卡视频 | 天天操中文字幕 | 色播99| 91视频综合 | 亚洲精品网站在线播放gif | 高跟鞋调教—视频|vk | 18久久久 | 手机在线看永久av片免费 | 人妻洗澡被强公日日澡 | 台湾av在线播放 | 人妻少妇精品中文字幕av蜜桃 | 天天射综合网站 | 精品一区二区免费视频 | 91精品又粗又猛又爽 | 综合激情五月婷婷 | 亚洲一区二区三区免费观看 | 久草新| 欧美成人女星 | 狠狠的色| 日本婷婷| 特级淫片aaaaaaa级附近的 | 亚洲女优视频 | 久久免费的精品国产v∧ | 欧美激情精品久久久久久变态 | 亚洲一区二区日韩 | 极品少妇xxxx精品少妇偷拍 | 亚洲一二三级 | 夜夜操影视 | 亚洲性网站 | 黑人极品ⅴideos精品欧美棵 | 人妻无码久久精品人妻 | 免费人成在线 | 亚洲黄色网址 | 日本免费一级片 | 成人免费观看av | 亚洲精品理论片 | 色欲久久久天天天综合网 | 国产一区二区精品在线 | 爱上av | 亚洲专区免费 | av自拍一区 | 亚洲国产精品成人va在线观看 | 亚洲一区二区三区在线观看视频 | 免费不卡的av | 国产区一区二区三区 | 国产精品久久久久久免费免熟 | 国产男人天堂 | 国产喷水福利在线视频 | 成a人片亚洲日本久久 | 日日干夜夜干 | 国产成人一级片 | 91麻豆精品91久久久久同性 | 8mav在线| 成人久久一区二区 | 亚洲综合干 | 91插插插插插插插插 | 亚洲AV成人无码久久精品巨臀 | 日本高清xxxx | 伊人在线视频 | 国产精品美女一区二区 | 天堂av在线电影 | 少妇乱淫36部 | 色噜噜亚洲 | 国产一级大片在线观看 | 色老大影院 | 日本成人在线免费观看 | 亚洲人成色777777精品音频 | 欧美精品一区二区在线播放 | 国产精品久久久久野外 | 国产精品伦一区二区三区免费看 | 午夜资源| 伊人称影院 | 日韩中文电影 | 美女被娇喘流出白 | 久久亚洲AV无码专区成人国产 | 日韩国产欧美在线视频 | 日韩欧美高清一区 | 男男gay羞辱feet贱奴vk | 色婷婷国产精品视频 | 久久久精品人妻av一区二区三区 | 99精品视频免费 | 国产大片一区二区三区 | 日本国产一区 | 猎艳山村丰满少妇 | 精品一区二区视频 | 色噜噜一区二区三区 | 精品熟女一区 | 欧美精品色婷婷五月综合 | 日韩精品网站 | 日韩欧美一级大片 | 成人av影院在线观看 | 欧美精品在线播放 | 国产黄色三级网站 | 99视频国产精品 | 黄色激情小说视频 | 国产精品国产精品 | 91欧美国产 | 91色片|