日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

ActiveMQ集群

發布時間:2025/3/15 编程问答 49 豆豆
生活随笔 收集整理的這篇文章主要介紹了 ActiveMQ集群 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

1.ActiveMQ集群介紹

1.為什么要集群?   

  實現高可用,以排除單點故障引起的服務中斷

  實現負載均衡,以提升效率為更多客戶提供服務

2.集群方式

  客戶端集群:讓多個消費者消費同一個隊列

  Broker Cluster:多個Broker之間同步消息(做不了高可用,可以實現負載均衡)

  Master-Slave:高可用(做不了負載均衡)

3.ActiveMq失效轉移

  允許當其中一臺消息服務器宕機時,客戶端在傳輸層上重新連接其他消息服務器。

  語法:failover:(uri1,...uriN)?transportOptions

    transportOptions參數說明:randomize:默認為true,表示在URI列表中選擇URI連接時是否采用隨機策略

                initialReconnectDelay默認為10,默認10毫秒,表示第一次嘗試重連之間等待的時間。

                maxReconnectDelay:默認30000,單位毫秒,最長重連時間的間隔

?4.Master/Slave集群配置

  Share nothing storage master/slave(已經過時,5.8之后移除)

  shared storage master/slave 共享存儲(實際上是共享同一文件夾,只不過采用排他鎖,所以只有一個master節點可以訪問,當此服務宕機,另一臺slave可以快速強占排他鎖,所以不會造成數據丟失,使用同一文件夾下的東西。如果多態服務器的話需要搭建文件共享服務器)

  Replicated LevelDB Store 基于復制的LevelDB Store

1.共享存儲原理:(獲取排他鎖才可以提供消息服務)--簡單方式

    

?2.Replicated LevelDB Store 基于復制的LevelDB Store原理(基于zookeper)

3.兩種方式對比:

?高可用負載均衡
Master/Slave
Broker Cluster

?

4.三臺機器的完美集群方案:(實現高可用和負載均衡)

?

2.ActiveMQ集群配置

1.方案介紹

?方案如下: B與C采用master-slave共享文件夾存儲(任一時刻只有一個可以占有排他鎖,也就是只有一個可以提供服務),當A宕機之后B會獲取資源鎖提供服務---實現高可用

    ? ?A與B、A與C都采用Broker 集群,可以同時提供服務,不管B與C誰獲取鎖,A都可以提供服務(A只能提供服務消費消息,不生成消息)---實現負載均衡

  

?

配置如下:(同一個電腦不同端口模擬集群)

    

?

?2.配置文件?(修改的配置文件都是在apache-activemq-5.15.6\conf文件夾下)

?  

?

(1)activemq-a下面的配置:---A服務器

activemq.xml:(注釋掉其他協議,服務端口使用61616端口;增加靜態網絡連接器,同時連接B與C)

?jetty.xml:? 采用默認的8161端口

?

(2)activemq-b下面的配置:---B服務器

?activemq.xml:(增加與A的靜態連接器,修改服務端口采用61617,修改共享文件夾的地址)

?

?

?

jitty.xml:修改端口采用8162

?

?

(3)activemq-c下面的配置:---C服務器

?activemq.xml:(增加與A的靜態連接器,修改服務端口采用61618,修改共享文件夾的地址)

?

?

jitty.xml:修改端口采用8163

?

3.依次啟動ActiveMq進行測試?

  啟動順序是:A->B->C,下面通過端口信息驗證集群:

liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 8161TCP 0.0.0.0:8161 0.0.0.0:0 LISTENING 2423496TCP [::]:8161 [::]:0 LISTENING 2423496liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 8162TCP 0.0.0.0:8162 0.0.0.0:0 LISTENING 2423756TCP [::]:8162 [::]:0 LISTENING 2423756 liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 8163liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 61616TCP 0.0.0.0:61616 0.0.0.0:0 LISTENING 2423496TCP 127.0.0.1:55338 127.0.0.1:61616 ESTABLISHED 2423756TCP 127.0.0.1:61616 127.0.0.1:55338 ESTABLISHED 2423496TCP [::]:61616 [::]:0 LISTENING 2423496liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 61617TCP 0.0.0.0:61617 0.0.0.0:0 LISTENING 2423756TCP 127.0.0.1:55345 127.0.0.1:61617 ESTABLISHED 2423496TCP 127.0.0.1:61617 127.0.0.1:55345 ESTABLISHED 2423756TCP [::]:61617 [::]:0 LISTENING 2423756 liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 61618

  由于先啟動的B,B占有排斥鎖,所以B(8162-61617)處于監聽狀態,而C與B采用共享文件排他鎖集群,所以C處于阻塞狀態,也就是沒有監聽端口,被阻塞。

?

 停掉B服務器,模擬B服務器宕機的情況,再次查看端口:

liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 8161TCP 0.0.0.0:8161 0.0.0.0:0 LISTENING 2423496TCP [::]:8161 [::]:0 LISTENING 2423496 liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 8162liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 8163TCP 0.0.0.0:8163 0.0.0.0:0 LISTENING 2423848TCP [::]:8163 [::]:0 LISTENING 2423848 liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 61616TCP 0.0.0.0:61616 0.0.0.0:0 LISTENING 2423496TCP 127.0.0.1:56704 127.0.0.1:61616 ESTABLISHED 2423848TCP 127.0.0.1:61616 127.0.0.1:56704 ESTABLISHED 2423496TCP [::]:61616 [::]:0 LISTENING 2423496 liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 61617liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 61618TCP 0.0.0.0:61618 0.0.0.0:0 LISTENING 2423848TCP 127.0.0.1:56791 127.0.0.1:61618 ESTABLISHED 2423496TCP 127.0.0.1:61618 127.0.0.1:56791 ESTABLISHED 2423848TCP [::]:61618 [::]:0 LISTENING 2423848 liqiang@root MINGW64 ~/Desktop

  由于B宕機,所以C會強占排他鎖,也就是會監聽端口,所以可以看到8163與61618端口的監聽狀態。

?

  現在模擬C也宕機,將C服務器也停掉。再次查看端口信息:

liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 8161TCP 0.0.0.0:8161 0.0.0.0:0 LISTENING 2423496TCP [::]:8161 [::]:0 LISTENING 2423496 liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 8162liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 8163liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 61616TCP 0.0.0.0:61616 0.0.0.0:0 LISTENING 2423496TCP [::]:61616 [::]:0 LISTENING 2423496 liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 61617TCP 127.0.0.1:57242 127.0.0.1:61617 SYN_SENT 2423496liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 61618liqiang@root MINGW64 ~/Desktop

?

  此時A服務器仍然在監聽端口。

  上面證明集群搭建成功,下面重新開啟A->B->C服務器開始程序驗證。

4.程序驗證集群

服務器開啟順序性:A->B->C(此時B占有排他鎖,B監聽端口,A一直監聽)

liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 61617liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 61616TCP 0.0.0.0:61616 0.0.0.0:0 LISTENING 2428232TCP 127.0.0.1:60520 127.0.0.1:61616 ESTABLISHED 2427192TCP 127.0.0.1:61616 127.0.0.1:60520 ESTABLISHED 2428232TCP [::]:61616 [::]:0 LISTENING 2428232 liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 61617TCP 0.0.0.0:61617 0.0.0.0:0 LISTENING 2427192TCP 127.0.0.1:60513 127.0.0.1:61617 ESTABLISHED 2428232TCP 127.0.0.1:61617 127.0.0.1:60513 ESTABLISHED 2427192TCP [::]:61617 [::]:0 LISTENING 2427192 liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 61618liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 8161TCP 0.0.0.0:8161 0.0.0.0:0 LISTENING 2428232TCP [::]:8161 [::]:0 LISTENING 2428232 liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 8162TCP 0.0.0.0:8162 0.0.0.0:0 LISTENING 2427192TCP [::]:8162 [::]:0 LISTENING 2427192 liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 8163

?

代碼測試:

生產者:? url加了失效策略,而且采用隨機選取,生產消息的地址只有B與C服務器地址

package cn.qlq.activemq;import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory;/*** 生產消息* * @author QiaoLiQiang* @time 2018年9月18日下午11:04:41*/ public class MsgProducer {private static final String url = "failover:(tcp://localhost:61617,tcp://localhost:61618)?randomize=true";private static final String queueName = "myQueue";public static void main(String[] args) throws JMSException {// 1創建ConnectionFactoryConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);// 2.由connectionFactory創建connectionConnection connection = connectionFactory.createConnection();// 3.啟動connection connection.start();// 4.創建Session===第一個參數是是否事務管理,第二個參數是應答模式Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);// 5.創建Destination(Queue繼承Queue)Queue destination = session.createQueue(queueName);// 6.創建生產者producerMessageProducer producer = session.createProducer(destination);for (int i = 0; i < 100; i++) {// 7.創建Message,有好多類型,這里用最簡單的TextMessageTextMessage tms = session.createTextMessage("textMessage:" + i);// 8.生產者發送消息 producer.send(tms);System.out.println("send:" + tms.getText());}// 9.關閉connection connection.close();}}

?

消費者:url加了失效策略,而且采用隨機選取,生產消息的地址有ABC服務器

package cn.qlq.activemq;import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory;/*** 消費消息* * @author QiaoLiQiang* @time 2018年9月18日下午11:26:41*/ public class MsgConsumer {private static final String url = "failover:(tcp://localhost:61616,tcp://localhost:61617,tcp://localhost:61618)?randomize=true";private static final String queueName = "myQueue";public static void main(String[] args) throws JMSException {// 1創建ConnectionFactoryConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);// 2.由connectionFactory創建connectionConnection connection = connectionFactory.createConnection();// 3.啟動connection connection.start();// 4.創建Session===第一個參數是是否事務管理,第二個參數是應答模式Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);// 5.創建Destination(Queue繼承Queue)Queue destination = session.createQueue(queueName);// 6.創建消費者consumerMessageConsumer consumer = session.createConsumer(destination);// 7.給消費者綁定監聽器(消息的監聽是一個異步的過程,不可以關閉連接,綁定監聽器線程是一直開啟的,處于阻塞狀態,所以可以在程序退出關閉)consumer.setMessageListener(new MessageListener() {@Overridepublic void onMessage(Message message) {// 7.1由于消費者接受的是TextMessage,所以強轉一下TextMessage tms = (TextMessage) message;try {System.out.println("接收消息:" + tms.getText());} catch (JMSException e) {e.printStackTrace();}}});}}

?

啟動生產者發布100條消息:

管理界面查看:(消息被發布在B服務器,B占有排他鎖,C處于阻塞)

?

?A服務不生產消息所有A不會消費消息:(通過A查看網絡連接器)

?

關掉B服務器之后查看C服務器:(驗證B與C共享同一文件夾,且強占排他鎖)

?

查看共享文件夾:

?

?此時A與C處于監聽狀態,啟動消費者:

liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 8161TCP 0.0.0.0:8161 0.0.0.0:0 LISTENING 2431676TCP [::]:8161 [::]:0 LISTENING 2431676 liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 8162liqiang@root MINGW64 ~/Desktop $ netstat -ano |findstr 8163TCP 0.0.0.0:8163 0.0.0.0:0 LISTENING 2432188TCP [::]:8163 [::]:0 LISTENING 2432188TCP [::1]:8163 [::1]:63600 TIME_WAIT 0TCP [::1]:8163 [::1]:63648 TIME_WAIT 0TCP [::1]:8163 [::1]:63649 TIME_WAIT 0TCP [::1]:8163 [::1]:63650 TIME_WAIT 0TCP [::1]:8163 [::1]:63651 TIME_WAIT 0TCP [::1]:8163 [::1]:63652 TIME_WAIT 0

?

查看控制臺如下:連接到A服務器消費信息

?

?總結:上面的配置方案B與C是為了實現高可用,也就是一臺宕機之后另一臺馬上強占排他鎖提供服務(需要共享文件夾實現共用同一文件夾下的資源與鎖),B與C可以生產消息,也可以提供消費消息,但是同一時刻只有一個提供服務。

  ? ?提供A是為了與B、C實現負載均衡,A不生產消息,但是可以消費消息,替B或者C分擔壓力。

?

轉載于:https://www.cnblogs.com/qlqwjy/p/9728425.html

總結

以上是生活随笔為你收集整理的ActiveMQ集群的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。