日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

程序发出的广播其他程序收不到_RabbitMQ 如何实现对同一个应用的多个节点进行广播...

發(fā)布時(shí)間:2024/7/23 编程问答 52 豆豆
生活随笔 收集整理的這篇文章主要介紹了 程序发出的广播其他程序收不到_RabbitMQ 如何实现对同一个应用的多个节点进行广播... 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

1.背景

了解過RabbitMQ的Fanout模式,應(yīng)該知道它原本的Fanout模式就是用來做廣播的。但是它的廣播有一點(diǎn)區(qū)別,來回顧下它的含義:Fanout類型沒有路由鍵的概念,只要隊(duì)列綁定到了改exchange上面,就會(huì)接收到所有的消息。

使用過程一般就是先new 出一個(gè)Fanout類型的交換機(jī),然后往這個(gè)交換機(jī)上綁定多個(gè)隊(duì)列queue,不同的消費(fèi)者各自監(jiān)聽不同的隊(duì)列,這就實(shí)現(xiàn)了廣播效果,因?yàn)橥粋€(gè)消息,會(huì)分發(fā)到所有隊(duì)列中。

舉個(gè)例子:

應(yīng)用A監(jiān)聽了隊(duì)列A,應(yīng)用B監(jiān)聽了隊(duì)列B,Fanout類型交換機(jī)同時(shí)綁定了隊(duì)列A和B.假設(shè)生產(chǎn)者端發(fā)送了一條消息到Fanout類型交換機(jī),交換機(jī)就會(huì)把消息分發(fā)到所有隊(duì)列,這時(shí)應(yīng)用A和應(yīng)用B會(huì)收到同一條消息,這就是廣播。

說了上面一大堆,只是為了強(qiáng)調(diào),對(duì)于RabbitMQ的原本Fanout模式,它的設(shè)計(jì)就是多個(gè)消費(fèi)者必須監(jiān)聽不同的隊(duì)列,多個(gè)消費(fèi)者之間才會(huì)形成廣播關(guān)系。

那么問題來了,假如在Fanout工作模式下,多個(gè)消費(fèi)者同時(shí)監(jiān)聽的是同一個(gè)隊(duì)列,會(huì)怎樣?實(shí)踐過的同學(xué)應(yīng)該都知道,這種情況下,這些消費(fèi)者會(huì)形成競(jìng)爭(zhēng)關(guān)系,現(xiàn)象是同一個(gè)消息只會(huì)被其中一個(gè)消費(fèi)者接收,達(dá)不到廣播的效果。。

2.需求

假如現(xiàn)在有一個(gè)需求,要做到對(duì)同一個(gè)應(yīng)用的多個(gè)節(jié)點(diǎn)進(jìn)行廣播,怎么實(shí)現(xiàn)?

注意,這里所說的同一個(gè)應(yīng)用多個(gè)節(jié)點(diǎn),通俗點(diǎn)理解就是一個(gè)war包,布在多個(gè)服務(wù)器節(jié)點(diǎn)上。

在實(shí)際部署集群時(shí),為了高可用,同一個(gè)應(yīng)用可能會(huì)部署多個(gè)節(jié)點(diǎn),那假如工程里已經(jīng)通過配置定義某個(gè)隊(duì)列,那多個(gè)節(jié)點(diǎn)它們定義的隊(duì)列就會(huì)是相同的,那按照上面的背景,那這些節(jié)點(diǎn)間肯定就會(huì)存在競(jìng)爭(zhēng)關(guān)系,即便是Fanout模式的交換機(jī),一條消息也只能被其中一個(gè)節(jié)點(diǎn)接收,其他節(jié)點(diǎn)收不到,達(dá)不到廣播的效果。那該如何做?

相信看到這里,有人會(huì)問,為何會(huì)有 對(duì)同一個(gè)應(yīng)用的多個(gè)節(jié)點(diǎn)進(jìn)行廣播的需求場(chǎng)景?為什么要有這個(gè)需求。生產(chǎn)中的業(yè)務(wù)系統(tǒng)很多,自然而然場(chǎng)景就很多。

舉兩個(gè)經(jīng)典的例子:

1.想要同時(shí)刷新所有節(jié)點(diǎn)的緩存

業(yè)務(wù)系統(tǒng)離不開緩存,有時(shí)會(huì)用內(nèi)存緩存,假如我要刷新所有節(jié)點(diǎn)的內(nèi)存緩存,多個(gè)節(jié)點(diǎn)前可能有負(fù)載均衡例如nginx之類的,我只需要訪問其中一個(gè)節(jié)點(diǎn),然后讓這個(gè)節(jié)點(diǎn)做廣播通知所有其他節(jié)點(diǎn)刷緩存。(廣播刷緩存)

2.websocket會(huì)話尋找

websocket是比較受歡迎的實(shí)時(shí)消息推送方案。用過websocket應(yīng)該知道,websocket只能與多個(gè)節(jié)點(diǎn)中的其中一個(gè)節(jié)點(diǎn)做長連接會(huì)話保持,也就是說用戶的會(huì)話只會(huì)存在于一個(gè)節(jié)點(diǎn)上,假設(shè)服務(wù)端要主動(dòng)向用戶推一條消息,必須要知道用戶的會(huì)話在哪個(gè)節(jié)點(diǎn)上,怎么得知?可以通過廣播,通過消息廣播,把消息發(fā)到多個(gè)節(jié)點(diǎn)上,然后節(jié)點(diǎn)收到消息只需要判斷用戶會(huì)話是否就在本節(jié)點(diǎn)上,假如在則主動(dòng)推消息,不在,則丟棄這條消息。

類似上面這兩種需求,就需要用到廣播,并且是對(duì)同一個(gè)應(yīng)用的多個(gè)節(jié)點(diǎn)進(jìn)行廣播。當(dāng)然不用廣播肯定也有其他通知方案,本文我們只討論用MQ怎么做到。

3.思路

假如繼續(xù)用RabbitMQ的Fanout模式,怎么做到對(duì)同一個(gè)應(yīng)用的多個(gè)節(jié)點(diǎn)進(jìn)行廣播?

要起到廣播效果,關(guān)鍵就是讓多個(gè)應(yīng)用節(jié)點(diǎn)間不要存在競(jìng)爭(zhēng)關(guān)系或者存在競(jìng)爭(zhēng)關(guān)系時(shí)它們的消息怎么共享?可以從這兩個(gè)方向解決這個(gè)問題。

方法可能很多種,在這里,我只描述兩種比較容易實(shí)現(xiàn)的方案。

方案1

過程大致如下

  • 應(yīng)用啟動(dòng),多個(gè)節(jié)點(diǎn)監(jiān)聽同一個(gè)隊(duì)列(此時(shí)多個(gè)節(jié)點(diǎn)是競(jìng)爭(zhēng)關(guān)系,一條消息只會(huì)發(fā)到其中一個(gè)節(jié)點(diǎn)上)
  • 消息生產(chǎn)者發(fā)送消息,同一條消息只被其中一個(gè)節(jié)點(diǎn)收到
  • 收到消息的節(jié)點(diǎn)通過redis的發(fā)布訂閱模式來通知其他兄弟節(jié)點(diǎn)
  • 這種方案是最容易想到的,思路就是依賴其他組件來做消息共享,例如redis這種可以替換成其他方案,只要能做到消息共享就行,那么最終的效果就肯定是廣播效果了。

    方案2

    過程大致如下

  • 應(yīng)用啟動(dòng),利用監(jiān)聽器生成唯一ID
  • 生成的唯一ID,通過文件寫入的方式寫到配置文件中
  • spring啟動(dòng),把這個(gè)唯一ID加載為全局屬性(為何要用唯一ID,就是為了用這個(gè)ID作為該節(jié)點(diǎn)的監(jiān)聽隊(duì)列名,當(dāng)然前綴可以用相同的,后綴用唯一ID區(qū)分即可,舉個(gè)例子就是:節(jié)點(diǎn)1監(jiān)聽隊(duì)列 kunghsu-123 節(jié)點(diǎn)2監(jiān)聽隊(duì)列 kunghsu-456.必須保證它們的唯一ID是唯一的,不然還是會(huì)存在競(jìng)爭(zhēng)關(guān)系)
  • 多個(gè)節(jié)點(diǎn)監(jiān)聽了多個(gè)隊(duì)列(讓每個(gè)隊(duì)列名都不同,目的就是讓他們不存在競(jìng)爭(zhēng)關(guān)系,沒有競(jìng)爭(zhēng)關(guān)系就不用做消息共享,只管由MQ分發(fā)即可,這時(shí)同一條消息就會(huì)發(fā)到多個(gè)節(jié)點(diǎn)上)
  • 到MQ控制臺(tái),將所有節(jié)點(diǎn)生成的隊(duì)列手動(dòng)綁定到指定的Fanout交換機(jī)上(這一步是手動(dòng)的,當(dāng)然也可以通過API做到,下面會(huì)說到)
  • 生產(chǎn)者發(fā)送消息指定的Fanout交換機(jī),交換機(jī)將同一條消息被分發(fā)到多個(gè)節(jié)點(diǎn)上
  • 廣播效果達(dá)成!
  • 這種方案,也比較容易。這樣做,就是為了讓多個(gè)節(jié)點(diǎn)間是廣播關(guān)系。總的來說不麻煩,其中第五步手動(dòng)操作其實(shí)有點(diǎn)挫,這種手動(dòng)操作步驟其實(shí)是應(yīng)該轉(zhuǎn)成自動(dòng)化,讓應(yīng)用程序來完成,方便以后自動(dòng)化建設(shè)。

    這種方案的spring配置也比較簡(jiǎn)單,參考Fanout模式的配置即可。本文重點(diǎn)在這個(gè)思路的實(shí)現(xiàn)過程。

    只列舉部分代碼如下:

    消息生產(chǎn)者

    消息消費(fèi)者

    另外,RabbitMQ的客戶端API支持讓我們 將隊(duì)列綁定到指定的交換機(jī)上。具體可參考我的工具類代碼。

    代碼如下:

    package com.lunch.foo.rabbitmq; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /*** Created by xuyaokun On 2019/3/10 2:26* @desc:*/ public class RabbitMQUtil { private static final String HOST = "192.168.3.128"; private static final int PORT = AMQP.PROTOCOL.PORT; private static final String USERNAME = "kunghsu"; private static final String PASSWORD = "123456"; private static final String VIRTUALHOST = "/"; public static void main(String[] args) { String QUEUE_NAME = "queueOneX"; String EXCHANGE_NAME = "exchangeFour"; try { queueBind(EXCHANGE_NAME, QUEUE_NAME); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } /*** 獲取會(huì)話鏈接** @return* @throws IOException* @throws TimeoutException*/ private static Connection getConnection() throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(HOST); factory.setPort(PORT); factory.setUsername(USERNAME); factory.setPassword(PASSWORD); factory.setVirtualHost(VIRTUALHOST); return factory.newConnection(); } /*** 綁定隊(duì)列到指定交換機(jī)** @param exchangeName* @param queueName* @throws IOException* @throws TimeoutException*/ public static void queueBind(String exchangeName, String queueName) throws IOException, TimeoutException { Channel channel = null; try{ channel = getConnection().createChannel(); } catch(Exception e){ System.out.println("獲取RabbitMQ會(huì)話連接失敗!取消做隊(duì)列綁定。"); return ; } //默認(rèn)持久化 channel.queueDeclare(queueName, true, false, false, null); // 聲明交換機(jī):指定交換機(jī)的名稱和類型(廣播:fanout) channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT, true); // 在消費(fèi)者端隊(duì)列綁定 channel.queueBind(queueName, exchangeName, ""); channel.close(); } }

    總結(jié)

    RabbitMQ的Fanout模式相關(guān)的文章,網(wǎng)上一抓一大把,但是幾乎沒有人講到 如何實(shí)現(xiàn) 對(duì)同一個(gè)應(yīng)用的多個(gè)節(jié)點(diǎn)進(jìn)行廣播。。希望通過這篇文章,能幫助到有需要的同學(xué)。另外,假如大家有更好的方案,歡迎交流。感謝閱讀!

    歡迎工作一到五年的Java工程師朋友們加入Java程序員開發(fā): 721575865

    群內(nèi)提供免費(fèi)的Java架構(gòu)學(xué)習(xí)資料(里面有高可用、高并發(fā)、高性能及分布式、Jvm性能調(diào)優(yōu)、Spring源碼,MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多個(gè)知識(shí)點(diǎn)的架構(gòu)資料)合理利用自己每一分每一秒的時(shí)間來學(xué)習(xí)提升自己,不要再用"沒有時(shí)間“來掩飾自己思想上的懶惰!趁年輕,使勁拼,給未來的自己一個(gè)交代!

    總結(jié)

    以上是生活随笔為你收集整理的程序发出的广播其他程序收不到_RabbitMQ 如何实现对同一个应用的多个节点进行广播...的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。