程序发出的广播其他程序收不到_RabbitMQ 如何实现对同一个应用的多个节点进行广播...
1.背景
了解過RabbitMQ的Fanout模式,應該知道它原本的Fanout模式就是用來做廣播的。但是它的廣播有一點區別,來回顧下它的含義:Fanout類型沒有路由鍵的概念,只要隊列綁定到了改exchange上面,就會接收到所有的消息。
使用過程一般就是先new 出一個Fanout類型的交換機,然后往這個交換機上綁定多個隊列queue,不同的消費者各自監聽不同的隊列,這就實現了廣播效果,因為同一個消息,會分發到所有隊列中。
舉個例子:
應用A監聽了隊列A,應用B監聽了隊列B,Fanout類型交換機同時綁定了隊列A和B.假設生產者端發送了一條消息到Fanout類型交換機,交換機就會把消息分發到所有隊列,這時應用A和應用B會收到同一條消息,這就是廣播。
說了上面一大堆,只是為了強調,對于RabbitMQ的原本Fanout模式,它的設計就是多個消費者必須監聽不同的隊列,多個消費者之間才會形成廣播關系。
那么問題來了,假如在Fanout工作模式下,多個消費者同時監聽的是同一個隊列,會怎樣?實踐過的同學應該都知道,這種情況下,這些消費者會形成競爭關系,現象是同一個消息只會被其中一個消費者接收,達不到廣播的效果。。
2.需求
假如現在有一個需求,要做到對同一個應用的多個節點進行廣播,怎么實現?
注意,這里所說的同一個應用多個節點,通俗點理解就是一個war包,布在多個服務器節點上。
在實際部署集群時,為了高可用,同一個應用可能會部署多個節點,那假如工程里已經通過配置定義某個隊列,那多個節點它們定義的隊列就會是相同的,那按照上面的背景,那這些節點間肯定就會存在競爭關系,即便是Fanout模式的交換機,一條消息也只能被其中一個節點接收,其他節點收不到,達不到廣播的效果。那該如何做?
相信看到這里,有人會問,為何會有 對同一個應用的多個節點進行廣播的需求場景?為什么要有這個需求。生產中的業務系統很多,自然而然場景就很多。
舉兩個經典的例子:
1.想要同時刷新所有節點的緩存
業務系統離不開緩存,有時會用內存緩存,假如我要刷新所有節點的內存緩存,多個節點前可能有負載均衡例如nginx之類的,我只需要訪問其中一個節點,然后讓這個節點做廣播通知所有其他節點刷緩存。(廣播刷緩存)
2.websocket會話尋找
websocket是比較受歡迎的實時消息推送方案。用過websocket應該知道,websocket只能與多個節點中的其中一個節點做長連接會話保持,也就是說用戶的會話只會存在于一個節點上,假設服務端要主動向用戶推一條消息,必須要知道用戶的會話在哪個節點上,怎么得知?可以通過廣播,通過消息廣播,把消息發到多個節點上,然后節點收到消息只需要判斷用戶會話是否就在本節點上,假如在則主動推消息,不在,則丟棄這條消息。
類似上面這兩種需求,就需要用到廣播,并且是對同一個應用的多個節點進行廣播。當然不用廣播肯定也有其他通知方案,本文我們只討論用MQ怎么做到。
3.思路
假如繼續用RabbitMQ的Fanout模式,怎么做到對同一個應用的多個節點進行廣播?
要起到廣播效果,關鍵就是讓多個應用節點間不要存在競爭關系或者存在競爭關系時它們的消息怎么共享?可以從這兩個方向解決這個問題。
方法可能很多種,在這里,我只描述兩種比較容易實現的方案。
方案1
過程大致如下
這種方案是最容易想到的,思路就是依賴其他組件來做消息共享,例如redis這種可以替換成其他方案,只要能做到消息共享就行,那么最終的效果就肯定是廣播效果了。
方案2
過程大致如下
這種方案,也比較容易。這樣做,就是為了讓多個節點間是廣播關系。總的來說不麻煩,其中第五步手動操作其實有點挫,這種手動操作步驟其實是應該轉成自動化,讓應用程序來完成,方便以后自動化建設。
這種方案的spring配置也比較簡單,參考Fanout模式的配置即可。本文重點在這個思路的實現過程。
只列舉部分代碼如下:
消息生產者
消息消費者
另外,RabbitMQ的客戶端API支持讓我們 將隊列綁定到指定的交換機上。具體可參考我的工具類代碼。
代碼如下:
package com.lunch.foo.rabbitmq; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /*** Created by xuyaokun On 2019/3/10 2:26* @desc:*/ public class RabbitMQUtil { private static final String HOST = "192.168.3.128"; private static final int PORT = AMQP.PROTOCOL.PORT; private static final String USERNAME = "kunghsu"; private static final String PASSWORD = "123456"; private static final String VIRTUALHOST = "/"; public static void main(String[] args) { String QUEUE_NAME = "queueOneX"; String EXCHANGE_NAME = "exchangeFour"; try { queueBind(EXCHANGE_NAME, QUEUE_NAME); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } /*** 獲取會話鏈接** @return* @throws IOException* @throws TimeoutException*/ private static Connection getConnection() throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(HOST); factory.setPort(PORT); factory.setUsername(USERNAME); factory.setPassword(PASSWORD); factory.setVirtualHost(VIRTUALHOST); return factory.newConnection(); } /*** 綁定隊列到指定交換機** @param exchangeName* @param queueName* @throws IOException* @throws TimeoutException*/ public static void queueBind(String exchangeName, String queueName) throws IOException, TimeoutException { Channel channel = null; try{ channel = getConnection().createChannel(); } catch(Exception e){ System.out.println("獲取RabbitMQ會話連接失敗!取消做隊列綁定。"); return ; } //默認持久化 channel.queueDeclare(queueName, true, false, false, null); // 聲明交換機:指定交換機的名稱和類型(廣播:fanout) channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT, true); // 在消費者端隊列綁定 channel.queueBind(queueName, exchangeName, ""); channel.close(); } }總結
RabbitMQ的Fanout模式相關的文章,網上一抓一大把,但是幾乎沒有人講到 如何實現 對同一個應用的多個節點進行廣播。。希望通過這篇文章,能幫助到有需要的同學。另外,假如大家有更好的方案,歡迎交流。感謝閱讀!
歡迎工作一到五年的Java工程師朋友們加入Java程序員開發: 721575865
群內提供免費的Java架構學習資料(里面有高可用、高并發、高性能及分布式、Jvm性能調優、Spring源碼,MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多個知識點的架構資料)合理利用自己每一分每一秒的時間來學習提升自己,不要再用"沒有時間“來掩飾自己思想上的懶惰!趁年輕,使勁拼,給未來的自己一個交代!
總結
以上是生活随笔為你收集整理的程序发出的广播其他程序收不到_RabbitMQ 如何实现对同一个应用的多个节点进行广播...的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 台式机dp接口_精品导购:你想要的商务台
- 下一篇: 计算机主机核心通常包括,计算机一级考试题