activemq mysql集群配置_ActiveMQ专题--集群,高可用方案
ActiveMQ集群部署方式
Master-Slave部署方式
Broker-Cluster部署方式
Master-Slave與Broker-Cluster相結(jié)合的部署方式
Shared database Master-Slave方式
image.png
實(shí)戰(zhàn)
環(huán)境
centos 7
在服務(wù)器器部署兩個(gè)節(jié)點(diǎn),一個(gè)是A節(jié)點(diǎn)只需要添加配置信息,B節(jié)點(diǎn)需要修改相應(yīng)的端口。
使用數(shù)據(jù)庫(kù)進(jìn)行消息持久化
引入數(shù)據(jù)庫(kù)驅(qū)動(dòng)包和數(shù)據(jù)庫(kù)連接池mysql驅(qū)動(dòng)包
把數(shù)據(jù)庫(kù)驅(qū)動(dòng)放到activemq目錄下 lib/extra
如:mysql-connector-java-5.1.41.jar
image.png
修改activemq.xml文件
開啟持久化
A節(jié)點(diǎn)配置
image.png
persistent="true"
activemq默認(rèn)是用的kahadb的持久化方式,下面修改為mysql的持久化方案
image.png
配置數(shù)據(jù)源bean
image.png
B節(jié)點(diǎn)也進(jìn)行相應(yīng)的修改
image.png
image.png
啟動(dòng)服務(wù)
通過(guò)jcmd查看java進(jìn)程,發(fā)現(xiàn)并沒(méi)有啟動(dòng)成功
查看日志錯(cuò)誤信息如下:
image.png
重新修改配置文件
使用java代碼進(jìn)行測(cè)試
consumer
package com.study.mq.b2_clustering;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
// http://activemq.apache.org/failover-transport-reference.html
public class ConsumerFailover {
public static void main(String[] args) throws InterruptedException {
// 非failover的公共參數(shù)配置通過(guò)nested.*,例如 failover:(...)?nested.wireFormat.maxInactivityDuration=1000
// ?randomize=false 隨機(jī)選擇,默認(rèn)是順序
// 指定優(yōu)先切換 failover:(tcp://host1:61616,tcp://host2:61616,tcp://host3:61616)?priorityBackup=true&priorityURIs=tcp://local1:61616,tcp://local2:61616
// maxReconnectDelay重連的最大間隔時(shí)間(毫秒)
String brokerUrl = "failover:(tcp://activemq.tkxb.com:61616,tcp://activemq.tkxb.com:61617)?initialReconnectDelay=100";
ConsumerThread queue1 = new ConsumerThread(brokerUrl, "queue-cluster02");
queue1.start();
queue1.join();
}
}
class ConsumerThread extends Thread {
String brokerUrl;
String destinationUrl;
public ConsumerThread(String brokerUrl, String destinationUrl) {
this.brokerUrl = brokerUrl;
this.destinationUrl = destinationUrl;
}
@Override
public void run() {
ActiveMQConnectionFactory connectionFactory;
Connection conn;
Session session;
MessageConsumer consumer;
try {
// brokerURL http://activemq.apache.org/connection-configuration-uri.html
// 1、創(chuàng)建連接工廠
connectionFactory = new ActiveMQConnectionFactory(this.brokerUrl);
// 2、創(chuàng)建連接對(duì)象
conn = connectionFactory.createConnection();
conn.start(); // 一定要啟動(dòng)
// 3、創(chuàng)建會(huì)話(可以創(chuàng)建一個(gè)或者多個(gè)session)
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4、創(chuàng)建點(diǎn)對(duì)點(diǎn)接收的目標(biāo),queue - 點(diǎn)對(duì)點(diǎn)
Destination destination = session.createQueue(destinationUrl);
// 5、創(chuàng)建消費(fèi)者消息 http://activemq.apache.org/destination-options.html
consumer = session.createConsumer(destination);
// 6、接收消息
consumer.setMessageListener(message -> {
try {
if (message instanceof TextMessage) {
System.out.println("收到文本消息:" + ((TextMessage) message).getText());
} else {
System.out.println(message);
}
} catch (JMSException e) {
e.printStackTrace();
}
});
} catch (JMSException e) {
e.printStackTrace();
}
}
}
provider
package com.study.mq.b2_clustering;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* 簡(jiǎn)單生產(chǎn)者
*/
public class Producer {
public static void main(String[] args) {
String brokerUrl = "failover:(tcp://activemq.tkxb.com:61616,tcp://activemq.tkxb.com:61617)?initialReconnectDelay=100";
new ProducerThread(brokerUrl, "queue-cluster02").start();
}
static class ProducerThread extends Thread {
String brokerUrl;
String destinationUrl;
public ProducerThread(String brokerUrl, String destinationUrl) {
this.brokerUrl = brokerUrl;
this.destinationUrl = destinationUrl;
}
@Override
public void run() {
ActiveMQConnectionFactory connectionFactory;
Connection conn;
Session session;
try {
// 1、創(chuàng)建連接工廠
connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
// 2、創(chuàng)建連接對(duì)象md
conn = connectionFactory.createConnection();
conn.start();
// 3、創(chuàng)建會(huì)話
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4、創(chuàng)建點(diǎn)對(duì)點(diǎn)發(fā)送的目標(biāo)
Destination destination = session.createQueue(destinationUrl);
// 5、創(chuàng)建生產(chǎn)者消息
MessageProducer producer = session.createProducer(destination);
// 設(shè)置生產(chǎn)者的模式,有兩種可選 持久化 / 不持久化
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
// 6、創(chuàng)建一條文本消息
String text = "Hello world!";
TextMessage message = session.createTextMessage(text);
for (int i = 0; i < 1; i++) {
// 7、發(fā)送消息
producer.send(message);
}
// 8、 關(guān)閉連接
session.close();
conn.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
測(cè)試
可以啟動(dòng)consumer,然后收到關(guān)閉消費(fèi)者連接的activemq ,這里我需要關(guān)閉A節(jié)點(diǎn)。
image.png
Replicaterd LevelDB Store方式(棄用)
image.png
Broker-Cluster 部署方式
image.png
搭建環(huán)境
準(zhǔn)備在一臺(tái)服務(wù)器上搭建兩個(gè)節(jié)點(diǎn)A,B
A:不修改其他端口,只添加配置信息。
B:需要修改所有的端口信息。
修改配置文件
conf/activemq.xml文件下的標(biāo)簽中添加以下代碼
image.png
節(jié)點(diǎn)B的其他端口也需要修改
image.png
jetty.xml 的web頁(yè)面訪問(wèn)端口也進(jìn)行了修改
image.png
image.png
節(jié)點(diǎn)B也需要添加代碼到
image.png
啟動(dòng)A,B兩節(jié)點(diǎn)
tail -f /var/activemq/data/activemq.log 查看日志信息
查看A節(jié)點(diǎn)的啟動(dòng)日志
image.png
可以發(fā)現(xiàn)B節(jié)點(diǎn)并沒(méi)有啟動(dòng)成功
B節(jié)點(diǎn)的日志信息:
image.png
重新修改B節(jié)點(diǎn)的端口:
image.png
使用jcmd命令
image.png
看到進(jìn)程以及啟動(dòng)成功了
查看A節(jié)點(diǎn)日志:
image.png
通過(guò)日志可以看出啟動(dòng)成功了,并成功連接B節(jié)點(diǎn)了。
總結(jié)
修改配置前一定需要停止activemq服務(wù),安裝過(guò)程遇到問(wèn)題,可以查看日志。
Activemq集群是通過(guò)一種特殊的隊(duì)列進(jìn)行集群操作的。
image.png
使用JAVA代碼測(cè)試Broker-Cluster集群模式
provider
使用A節(jié)點(diǎn)生產(chǎn)數(shù)據(jù)
package com.study.mq.b2_clustering.network_connector;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* 簡(jiǎn)單生產(chǎn)者
*/
public class Producer {
public static void main(String[] args) {
// 生產(chǎn)者用A節(jié)點(diǎn) activemq.tkxb.com:61616
String brokerUrl = "failover:(tcp://activemq.tkxb.com:61616)?initialReconnectDelay=100";
new ProducerThread(brokerUrl, "queue-cluster01").start();
}
static class ProducerThread extends Thread {
String brokerUrl;
String destinationUrl;
public ProducerThread(String brokerUrl, String destinationUrl) {
this.brokerUrl = brokerUrl;
this.destinationUrl = destinationUrl;
}
@Override
public void run() {
ActiveMQConnectionFactory connectionFactory;
Connection conn;
Session session;
try {
// 1、創(chuàng)建連接工廠
connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
// 2、創(chuàng)建連接對(duì)象md
conn = connectionFactory.createConnection();
conn.start();
// 3、創(chuàng)建會(huì)話
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4、創(chuàng)建點(diǎn)對(duì)點(diǎn)發(fā)送的目標(biāo)
Destination destination = session.createQueue(destinationUrl);
// 5、創(chuàng)建生產(chǎn)者消息
MessageProducer producer = session.createProducer(destination);
// 設(shè)置生產(chǎn)者的模式,有兩種可選 持久化 / 不持久化
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
// 6、創(chuàng)建一條文本消息
String text = "Hello world!";
TextMessage message = session.createTextMessage(text);
for (int i = 0; i < 1; i++) {
// 7、發(fā)送消息
producer.send(message);
}
// 8、 關(guān)閉連接
session.close();
conn.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
consumer
使用B節(jié)點(diǎn)消費(fèi)數(shù)據(jù)
package com.study.mq.b2_clustering.network_connector;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
// http://activemq.apache.org/networks-of-brokers.html
public class ConsumerNetowork {
public static void main(String[] args) throws InterruptedException {
// 消費(fèi)者用A節(jié)點(diǎn) activemq.tkxb.com:61616
String brokerUrl = "failover:(tcp://activemq.tkxb.com:61617)?initialReconnectDelay=100";
ConsumerThread queue1 = new ConsumerThread(brokerUrl, "queue-cluster01");
queue1.start();
queue1.join();
}
}
class ConsumerThread extends Thread {
String brokerUrl;
String destinationUrl;
public ConsumerThread(String brokerUrl, String destinationUrl) {
this.brokerUrl = brokerUrl;
this.destinationUrl = destinationUrl;
}
@Override
public void run() {
ActiveMQConnectionFactory connectionFactory;
Connection conn;
Session session;
MessageConsumer consumer;
try {
// brokerURL http://activemq.apache.org/connection-configuration-uri.html
// 1、創(chuàng)建連接工廠
connectionFactory = new ActiveMQConnectionFactory(this.brokerUrl);
// 2、創(chuàng)建連接對(duì)象
conn = connectionFactory.createConnection();
conn.start(); // 一定要啟動(dòng)
// 3、創(chuàng)建會(huì)話(可以創(chuàng)建一個(gè)或者多個(gè)session)
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4、創(chuàng)建點(diǎn)對(duì)點(diǎn)接收的目標(biāo),queue - 點(diǎn)對(duì)點(diǎn)
Destination destination = session.createQueue(destinationUrl);
// 5、創(chuàng)建消費(fèi)者消息 http://activemq.apache.org/destination-options.html
consumer = session.createConsumer(destination);
// 6、接收消息
consumer.setMessageListener(message -> {
try {
if (message instanceof TextMessage) {
System.out.println("收到文本消息:" + ((TextMessage) message).getText());
} else {
System.out.println(message);
}
} catch (JMSException e) {
e.printStackTrace();
}
});
} catch (JMSException e) {
e.printStackTrace();
}
}
}
Master-Slave與Broker-Cluster結(jié)合
image.png
與50位技術(shù)專家面對(duì)面20年技術(shù)見證,附贈(zèng)技術(shù)全景圖總結(jié)
以上是生活随笔為你收集整理的activemq mysql集群配置_ActiveMQ专题--集群,高可用方案的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: java 文件流 重写_java中关于文
- 下一篇: mysql 类型 自动转化_自动MySQ