日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

ActiveMq消费端实现集群部署

發布時間:2024/9/30 编程问答 22 豆豆
生活随笔 收集整理的這篇文章主要介紹了 ActiveMq消费端实现集群部署 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

1.問題背景

一個事件中心接收網關通過ActiveMq上報的告警事件,處理后持久化到數據庫,消息模型為發布訂閱模式。為了實現高可用,決定將事件中心進行集群部署,運行兩個實例。
但是由于消息模型為發布/訂閱(publish/subscribe,topic),每個eps實例都會收到告警消息。如不加以控制,勢必會造成告警消息重復消費的問題。

即我們需要不同的應用系統關心相同的消息,同時單個應用系統內部又可以部署多個實例達到負載均衡和故障轉移,提高系統的健壯性。

2.解決思路

我們首先想到可以通過消費端增加檢測重復消息的邏輯,來解決重復消費的問題,但是這種方式增加額外判斷邏輯,且浪費消費端性能,并不可取。
實際上我們可以通過ActiveMq的一些高級特性來很好的解決此問題。ActiveMq提供了虛擬主題(Virtual Topics)的功能,如下圖:

即通過一些配置后,單個應用系統內多個實例監聽同一個queue,實例之間即可對消息進行平均消費,共同承擔消費的功能。如果其中一個eps實例宕機,其他實例仍可以正常消費消息,消息不會丟失遺漏。
Virtual Topic這個功能特性可以關閉,即useVirtualTopics屬性,默認為true,設置為false即可關閉此功能。當此功能開啟,并且使用了持久化的存儲時,ActiveMq的broker啟動的時候會從持久化存儲里拿到所有的Topic的名稱,如果名稱模式與VirtualTopics匹配,則把它們添加到系統的Virtual Topics列表中去。當有consumer訪問此VirtualTopics時,系統會自動創建持久化的queue,并在每次Topic收到消息時,分發到具體的queue。

3.使用方法

虛擬主題有兩種使用方式,下面分別進行介紹。

3.1 topic-queue名稱匹配方式

此種方式需要發布者發布的Topic以“VirtualTopic.”這樣的前綴來命名(大小寫敏感)。比如我們定義一個VirtualTopic.event,然后發布者將消息發布到VirtualTopic.event。訂閱者需要訂閱名稱為”Consumer.*. VirtualTopic.event”這樣的隊列。
下面使用springboot搭建工程進行演示,由于配置簡單只涉及到topic和queue的名稱,下面只顯示關鍵步驟。
首先發布者創建名稱VirtualTopic.event的topic,然后每隔3秒向此topic發布一條消息,

@Service public class JmsSendMessageDemo {@Autowiredprivate JmsTemplate jmsTemplate;private Integer i = 0;@Scheduled(fixedRate = 3000)public void sendMessage(){i++;Event event = new Event("觸碰", i, new Date());jmsTemplate.convertAndSend(new ActiveMQTopic("VirtualTopic.event"),JsonUtil.object2Json(event));//System.out.println("發送消息: "+event);}}

然后建立兩個應用作為消費者分別監聽名為“Consumer.A.VirtualTopic.event“的queue,
第一個消費者:

@Service public class JmsMessageListener {@JmsListener(destination = "Consumer.A.VirtualTopic.event",containerFactory = "QueueContainerFactory")public void receiveMessage(String message){System.out.println("1 接收到消息: "+JsonUtil.json2Object(message,Event.class));} }

第二個消費者:

@Service public class JmsMessageListener {@JmsListener(destination = "Consumer.A.VirtualTopic.event",containerFactory = "QueueContainerFactory")public void receiveMessage(String message){System.out.println(" 2 接收到消息: "+JsonUtil.json2Object(message,Event.class));} }

最后啟動兩個消費者和發布者,通過兩個消費者的輸出窗口可以看到他們共同承擔了消息的消費:

1 接收到消息: Event{name='觸碰', id=2, date=Tue Aug 28 21:18:50 CST 2018} 1 接收到消息: Event{name='觸碰', id=4, date=Tue Aug 28 21:18:56 CST 2018} 1 接收到消息: Event{name='觸碰', id=6, date=Tue Aug 28 21:19:02 CST 2018} 1 接收到消息: Event{name='觸碰', id=8, date=Tue Aug 28 21:19:08 CST 2018} 1 接收到消息: Event{name='觸碰', id=10, date=Tue Aug 28 21:19:14 CST 2018} 1 接收到消息: Event{name='觸碰', id=12, date=Tue Aug 28 21:19:20 CST 2018} 1 接收到消息: Event{name='觸碰', id=14, date=Tue Aug 28 21:19:26 CST 2018} 1 接收到消息: Event{name='觸碰', id=16, date=Tue Aug 28 21:19:32 CST 2018} 1 接收到消息: Event{name='觸碰', id=18, date=Tue Aug 28 21:19:38 CST 2018} 2 接收到消息: Event{name='觸碰', id=1, date=Tue Aug 28 21:18:47 CST 2018}2 接收到消息: Event{name='觸碰', id=3, date=Tue Aug 28 21:18:53 CST 2018}2 接收到消息: Event{name='觸碰', id=5, date=Tue Aug 28 21:18:59 CST 2018}2 接收到消息: Event{name='觸碰', id=7, date=Tue Aug 28 21:19:05 CST 2018}2 接收到消息: Event{name='觸碰', id=9, date=Tue Aug 28 21:19:11 CST 2018}2 接收到消息: Event{name='觸碰', id=11, date=Tue Aug 28 21:19:17 CST 2018}2 接收到消息: Event{name='觸碰', id=13, date=Tue Aug 28 21:19:23 CST 2018}2 接收到消息: Event{name='觸碰', id=15, date=Tue Aug 28 21:19:29 CST 2018}2 接收到消息: Event{name='觸碰', id=17, date=Tue Aug 28 21:19:35 CST 2018}2 接收到消息: Event{name='觸碰', id=19, date=Tue Aug 28 21:19:41 CST 2018}2 接收到消息: Event{name='觸碰', id=21, date=Tue Aug 28 21:19:47 CST 2018}2 接收到消息: Event{name='觸碰', id=23, date=Tue Aug 28 21:19:53 CST 2018}

然后我們將消費者2關閉(宕機),可以看到消費者1消費了所有的消息

1 接收到消息: Event{name='觸碰', id=134, date=Tue Aug 28 21:25:26 CST 2018} 1 接收到消息: Event{name='觸碰', id=135, date=Tue Aug 28 21:25:29 CST 2018} 1 接收到消息: Event{name='觸碰', id=136, date=Tue Aug 28 21:25:32 CST 2018} 1 接收到消息: Event{name='觸碰', id=137, date=Tue Aug 28 21:25:35 CST 2018} 1 接收到消息: Event{name='觸碰', id=138, date=Tue Aug 28 21:25:38 CST 2018}

通過以上試驗,我們看到通過虛擬主題的方式,兩個實例可以通過監聽同一個queue來平均消費消息,如果其中一個宕機,另一個會承擔起所有的消息消費。
上面的方式需要發布者和訂閱者統一對命名規范,如果發布者和訂閱者已經存在,就需要統一升級,比較麻煩。實際上我們還可以攔截器的方法。

3.2 攔截器方式

這種方式需要修改ActiveMq的配置文件/conf/activemq.xml,添加以下攔截配置:

<destinationInterceptors> <virtualDestinationInterceptor> <virtualDestinations> <virtualTopic name="event" prefix="demo.*." selectorAware="false"/> </virtualDestinations></virtualDestinationInterceptor> </destinationInterceptors>

然后發布者只需要往event主題上發布消息。訂閱者通過訂閱類似demo.A. event的隊列的方式來消費。其他沒有集群部署的應用仍可以訂閱event主題進行消費。
此種方式的優點在于我們不需要對發布者和不需要改造的訂閱者做任何變動,需要增加或者改造的訂閱者使用虛擬主題的方式進行訂閱即可達到負載均衡和故障轉移的目的,當無法約束發布者發布的topic規范時,這種方式很有用。當然缺點就是需要修改ActiveMQ的配置,也就是說需要重啟ActiveMQ,這對于已經上線的平臺來說可能造成消息丟失。

參考:虛擬主題開發

總結

以上是生活随笔為你收集整理的ActiveMq消费端实现集群部署的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。