日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

activemq mysql集群配置_ActiveMQ专题--集群,高可用方案

發布時間:2025/3/12 数据库 30 豆豆
生活随笔 收集整理的這篇文章主要介紹了 activemq mysql集群配置_ActiveMQ专题--集群,高可用方案 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

ActiveMQ集群部署方式

Master-Slave部署方式

Broker-Cluster部署方式

Master-Slave與Broker-Cluster相結合的部署方式

Shared database Master-Slave方式

image.png

實戰

環境

centos 7

在服務器器部署兩個節點,一個是A節點只需要添加配置信息,B節點需要修改相應的端口。

使用數據庫進行消息持久化

引入數據庫驅動包和數據庫連接池mysql驅動包

把數據庫驅動放到activemq目錄下 lib/extra

如:mysql-connector-java-5.1.41.jar

image.png

修改activemq.xml文件

開啟持久化

A節點配置

image.png

persistent="true"

activemq默認是用的kahadb的持久化方式,下面修改為mysql的持久化方案

image.png

配置數據源bean

image.png

B節點也進行相應的修改

image.png

image.png

啟動服務

通過jcmd查看java進程,發現并沒有啟動成功

查看日志錯誤信息如下:

image.png

重新修改配置文件

使用java代碼進行測試

consumer

package com.study.mq.b2_clustering;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

// http://activemq.apache.org/failover-transport-reference.html

public class ConsumerFailover {

public static void main(String[] args) throws InterruptedException {

// 非failover的公共參數配置通過nested.*,例如 failover:(...)?nested.wireFormat.maxInactivityDuration=1000

// ?randomize=false 隨機選擇,默認是順序

// 指定優先切換 failover:(tcp://host1:61616,tcp://host2:61616,tcp://host3:61616)?priorityBackup=true&priorityURIs=tcp://local1:61616,tcp://local2:61616

// maxReconnectDelay重連的最大間隔時間(毫秒)

String brokerUrl = "failover:(tcp://activemq.tkxb.com:61616,tcp://activemq.tkxb.com:61617)?initialReconnectDelay=100";

ConsumerThread queue1 = new ConsumerThread(brokerUrl, "queue-cluster02");

queue1.start();

queue1.join();

}

}

class ConsumerThread extends Thread {

String brokerUrl;

String destinationUrl;

public ConsumerThread(String brokerUrl, String destinationUrl) {

this.brokerUrl = brokerUrl;

this.destinationUrl = destinationUrl;

}

@Override

public void run() {

ActiveMQConnectionFactory connectionFactory;

Connection conn;

Session session;

MessageConsumer consumer;

try {

// brokerURL http://activemq.apache.org/connection-configuration-uri.html

// 1、創建連接工廠

connectionFactory = new ActiveMQConnectionFactory(this.brokerUrl);

// 2、創建連接對象

conn = connectionFactory.createConnection();

conn.start(); // 一定要啟動

// 3、創建會話(可以創建一個或者多個session)

session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

// 4、創建點對點接收的目標,queue - 點對點

Destination destination = session.createQueue(destinationUrl);

// 5、創建消費者消息 http://activemq.apache.org/destination-options.html

consumer = session.createConsumer(destination);

// 6、接收消息

consumer.setMessageListener(message -> {

try {

if (message instanceof TextMessage) {

System.out.println("收到文本消息:" + ((TextMessage) message).getText());

} else {

System.out.println(message);

}

} catch (JMSException e) {

e.printStackTrace();

}

});

} catch (JMSException e) {

e.printStackTrace();

}

}

}

provider

package com.study.mq.b2_clustering;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**

* 簡單生產者

*/

public class Producer {

public static void main(String[] args) {

String brokerUrl = "failover:(tcp://activemq.tkxb.com:61616,tcp://activemq.tkxb.com:61617)?initialReconnectDelay=100";

new ProducerThread(brokerUrl, "queue-cluster02").start();

}

static class ProducerThread extends Thread {

String brokerUrl;

String destinationUrl;

public ProducerThread(String brokerUrl, String destinationUrl) {

this.brokerUrl = brokerUrl;

this.destinationUrl = destinationUrl;

}

@Override

public void run() {

ActiveMQConnectionFactory connectionFactory;

Connection conn;

Session session;

try {

// 1、創建連接工廠

connectionFactory = new ActiveMQConnectionFactory(brokerUrl);

// 2、創建連接對象md

conn = connectionFactory.createConnection();

conn.start();

// 3、創建會話

session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

// 4、創建點對點發送的目標

Destination destination = session.createQueue(destinationUrl);

// 5、創建生產者消息

MessageProducer producer = session.createProducer(destination);

// 設置生產者的模式,有兩種可選 持久化 / 不持久化

producer.setDeliveryMode(DeliveryMode.PERSISTENT);

// 6、創建一條文本消息

String text = "Hello world!";

TextMessage message = session.createTextMessage(text);

for (int i = 0; i < 1; i++) {

// 7、發送消息

producer.send(message);

}

// 8、 關閉連接

session.close();

conn.close();

} catch (JMSException e) {

e.printStackTrace();

}

}

}

}

測試

可以啟動consumer,然后收到關閉消費者連接的activemq ,這里我需要關閉A節點。

image.png

Replicaterd LevelDB Store方式(棄用)

image.png

Broker-Cluster 部署方式

image.png

搭建環境

準備在一臺服務器上搭建兩個節點A,B

A:不修改其他端口,只添加配置信息。

B:需要修改所有的端口信息。

修改配置文件

conf/activemq.xml文件下的標簽中添加以下代碼

image.png

節點B的其他端口也需要修改

image.png

jetty.xml 的web頁面訪問端口也進行了修改

image.png

image.png

節點B也需要添加代碼到

image.png

啟動A,B兩節點

tail -f /var/activemq/data/activemq.log 查看日志信息

查看A節點的啟動日志

image.png

可以發現B節點并沒有啟動成功

B節點的日志信息:

image.png

重新修改B節點的端口:

image.png

使用jcmd命令

image.png

看到進程以及啟動成功了

查看A節點日志:

image.png

通過日志可以看出啟動成功了,并成功連接B節點了。

總結

修改配置前一定需要停止activemq服務,安裝過程遇到問題,可以查看日志。

Activemq集群是通過一種特殊的隊列進行集群操作的。

image.png

使用JAVA代碼測試Broker-Cluster集群模式

provider

使用A節點生產數據

package com.study.mq.b2_clustering.network_connector;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**

* 簡單生產者

*/

public class Producer {

public static void main(String[] args) {

// 生產者用A節點 activemq.tkxb.com:61616

String brokerUrl = "failover:(tcp://activemq.tkxb.com:61616)?initialReconnectDelay=100";

new ProducerThread(brokerUrl, "queue-cluster01").start();

}

static class ProducerThread extends Thread {

String brokerUrl;

String destinationUrl;

public ProducerThread(String brokerUrl, String destinationUrl) {

this.brokerUrl = brokerUrl;

this.destinationUrl = destinationUrl;

}

@Override

public void run() {

ActiveMQConnectionFactory connectionFactory;

Connection conn;

Session session;

try {

// 1、創建連接工廠

connectionFactory = new ActiveMQConnectionFactory(brokerUrl);

// 2、創建連接對象md

conn = connectionFactory.createConnection();

conn.start();

// 3、創建會話

session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

// 4、創建點對點發送的目標

Destination destination = session.createQueue(destinationUrl);

// 5、創建生產者消息

MessageProducer producer = session.createProducer(destination);

// 設置生產者的模式,有兩種可選 持久化 / 不持久化

producer.setDeliveryMode(DeliveryMode.PERSISTENT);

// 6、創建一條文本消息

String text = "Hello world!";

TextMessage message = session.createTextMessage(text);

for (int i = 0; i < 1; i++) {

// 7、發送消息

producer.send(message);

}

// 8、 關閉連接

session.close();

conn.close();

} catch (JMSException e) {

e.printStackTrace();

}

}

}

}

consumer

使用B節點消費數據

package com.study.mq.b2_clustering.network_connector;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

// http://activemq.apache.org/networks-of-brokers.html

public class ConsumerNetowork {

public static void main(String[] args) throws InterruptedException {

// 消費者用A節點 activemq.tkxb.com:61616

String brokerUrl = "failover:(tcp://activemq.tkxb.com:61617)?initialReconnectDelay=100";

ConsumerThread queue1 = new ConsumerThread(brokerUrl, "queue-cluster01");

queue1.start();

queue1.join();

}

}

class ConsumerThread extends Thread {

String brokerUrl;

String destinationUrl;

public ConsumerThread(String brokerUrl, String destinationUrl) {

this.brokerUrl = brokerUrl;

this.destinationUrl = destinationUrl;

}

@Override

public void run() {

ActiveMQConnectionFactory connectionFactory;

Connection conn;

Session session;

MessageConsumer consumer;

try {

// brokerURL http://activemq.apache.org/connection-configuration-uri.html

// 1、創建連接工廠

connectionFactory = new ActiveMQConnectionFactory(this.brokerUrl);

// 2、創建連接對象

conn = connectionFactory.createConnection();

conn.start(); // 一定要啟動

// 3、創建會話(可以創建一個或者多個session)

session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

// 4、創建點對點接收的目標,queue - 點對點

Destination destination = session.createQueue(destinationUrl);

// 5、創建消費者消息 http://activemq.apache.org/destination-options.html

consumer = session.createConsumer(destination);

// 6、接收消息

consumer.setMessageListener(message -> {

try {

if (message instanceof TextMessage) {

System.out.println("收到文本消息:" + ((TextMessage) message).getText());

} else {

System.out.println(message);

}

} catch (JMSException e) {

e.printStackTrace();

}

});

} catch (JMSException e) {

e.printStackTrace();

}

}

}

Master-Slave與Broker-Cluster結合

image.png

與50位技術專家面對面20年技術見證,附贈技術全景圖

總結

以上是生活随笔為你收集整理的activemq mysql集群配置_ActiveMQ专题--集群,高可用方案的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 91正在播放 | 正在播放老肥熟妇露脸 | 欧美熟女一区二区 | 91精品免费观看 | 欧美日本中文 | 性感美女高潮 | 国产精品一区二区三区四区 | 欧美福利视频一区二区 | 高清国产午夜精品久久久久久 | 精品黑人一区二区三区在线观看 | 国产女教师一区二区三区 | 中国色视频| 欧美精品一区二区三区四区 | 国产精选网站 | 国产精品综合网 | 国产人妻久久精品一区二区三区 | 2018国产大陆天天弄 | 亚洲一二三区av | 亚洲天堂av在线免费观看 | 一区二区在线观看视频 | 你懂的网站在线 | 久久成人小视频 | 超碰美女在线 | 丰满岳乱妇在线观看中字无码 | 日本不卡中文字幕 | 成人教育av| 黄色片中文字幕 | 国产片黄色 | 九色.com| 国产又大又黄又粗 | 亚洲欧美日本一区二区三区 | 亚洲人和日本人hd | 中文字幕一区二区三区手机版 | 韩国伦理中文字幕 | 国产精品无码久久久久久 | 欧美专区亚洲专区 | 国产美女精品久久久 | 免费av在线网址 | 偷看洗澡一二三区美女 | 久久久久久久久影院 | 九九色九九| 无码人中文字幕 | 79日本xxxxxxxxx18 婷婷亚洲一区 | 国产剧情在线一区 | av网站在线看 | 美妇av| 中文字幕一区二区三区四区五区 | av综合站| 特黄大片又粗又大又暴 | 久久免费在线观看视频 | 日韩欧美一区二区在线 | 少妇又色又紧又黄又刺激免费 | 性色网站 | 日本免费成人 | 国产免费又粗又猛又爽 | 一区二区三区伦理片 | 免费在线观看黄 | av手机免费观看 | 水蜜桃影库 | 人妻91麻豆一区二区三区 | 国产真人无码作爱视频免费 | 亚洲影音 | 亚洲天堂一区 | 美国性生活大片 | 中文字幕无人区二 | 黄瓜视频污在线观看 | 中文字幕欧美色图 | 日本韩国在线观看 | 一区二区三区免费毛片 | 精品久久久久久久久久久久久久久久久久 | 综合色站导航 | 少妇人妻邻居 | 日本中文字幕在线免费观看 | 精品视频久久 | 四虎新网址 | 天天插天天爽 | 九九福利视频 | 久久久久久久久久久国产精品 | 亚洲电影在线看 | 日韩美女毛片 | 99福利视频导航 | 黄色香港三级三级三级 | 色av免费| 日日操网| 在线看片日韩 | 91豆花视频| 国产女人和拘做受视频免费 | 黄色av网站免费观看 | 免费美女av | 国产成人无码一二三区视频 | 欧美视频一 | 婷婷国产成人精品视频 | 亚洲av色区一区二区三区 | 天天av天天 | 波多野久久 | 手机在线永久免费观看av片 | 91一级视频 | 古风h啪肉h文 | 欧美色亚洲 |