日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

activemq mysql集群配置_ActiveMQ专题--集群,高可用方案

發布時間:2025/3/12 数据库 30 豆豆
生活随笔 收集整理的這篇文章主要介紹了 activemq mysql集群配置_ActiveMQ专题--集群,高可用方案 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

ActiveMQ集群部署方式

Master-Slave部署方式

Broker-Cluster部署方式

Master-Slave與Broker-Cluster相結合的部署方式

Shared database Master-Slave方式

image.png

實戰

環境

centos 7

在服務器器部署兩個節點,一個是A節點只需要添加配置信息,B節點需要修改相應的端口。

使用數據庫進行消息持久化

引入數據庫驅動包和數據庫連接池mysql驅動包

把數據庫驅動放到activemq目錄下 lib/extra

如:mysql-connector-java-5.1.41.jar

image.png

修改activemq.xml文件

開啟持久化

A節點配置

image.png

persistent="true"

activemq默認是用的kahadb的持久化方式,下面修改為mysql的持久化方案

image.png

配置數據源bean

image.png

B節點也進行相應的修改

image.png

image.png

啟動服務

通過jcmd查看java進程,發現并沒有啟動成功

查看日志錯誤信息如下:

image.png

重新修改配置文件

使用java代碼進行測試

consumer

package com.study.mq.b2_clustering;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

// http://activemq.apache.org/failover-transport-reference.html

public class ConsumerFailover {

public static void main(String[] args) throws InterruptedException {

// 非failover的公共參數配置通過nested.*,例如 failover:(...)?nested.wireFormat.maxInactivityDuration=1000

// ?randomize=false 隨機選擇,默認是順序

// 指定優先切換 failover:(tcp://host1:61616,tcp://host2:61616,tcp://host3:61616)?priorityBackup=true&priorityURIs=tcp://local1:61616,tcp://local2:61616

// maxReconnectDelay重連的最大間隔時間(毫秒)

String brokerUrl = "failover:(tcp://activemq.tkxb.com:61616,tcp://activemq.tkxb.com:61617)?initialReconnectDelay=100";

ConsumerThread queue1 = new ConsumerThread(brokerUrl, "queue-cluster02");

queue1.start();

queue1.join();

}

}

class ConsumerThread extends Thread {

String brokerUrl;

String destinationUrl;

public ConsumerThread(String brokerUrl, String destinationUrl) {

this.brokerUrl = brokerUrl;

this.destinationUrl = destinationUrl;

}

@Override

public void run() {

ActiveMQConnectionFactory connectionFactory;

Connection conn;

Session session;

MessageConsumer consumer;

try {

// brokerURL http://activemq.apache.org/connection-configuration-uri.html

// 1、創建連接工廠

connectionFactory = new ActiveMQConnectionFactory(this.brokerUrl);

// 2、創建連接對象

conn = connectionFactory.createConnection();

conn.start(); // 一定要啟動

// 3、創建會話(可以創建一個或者多個session)

session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

// 4、創建點對點接收的目標,queue - 點對點

Destination destination = session.createQueue(destinationUrl);

// 5、創建消費者消息 http://activemq.apache.org/destination-options.html

consumer = session.createConsumer(destination);

// 6、接收消息

consumer.setMessageListener(message -> {

try {

if (message instanceof TextMessage) {

System.out.println("收到文本消息:" + ((TextMessage) message).getText());

} else {

System.out.println(message);

}

} catch (JMSException e) {

e.printStackTrace();

}

});

} catch (JMSException e) {

e.printStackTrace();

}

}

}

provider

package com.study.mq.b2_clustering;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**

* 簡單生產者

*/

public class Producer {

public static void main(String[] args) {

String brokerUrl = "failover:(tcp://activemq.tkxb.com:61616,tcp://activemq.tkxb.com:61617)?initialReconnectDelay=100";

new ProducerThread(brokerUrl, "queue-cluster02").start();

}

static class ProducerThread extends Thread {

String brokerUrl;

String destinationUrl;

public ProducerThread(String brokerUrl, String destinationUrl) {

this.brokerUrl = brokerUrl;

this.destinationUrl = destinationUrl;

}

@Override

public void run() {

ActiveMQConnectionFactory connectionFactory;

Connection conn;

Session session;

try {

// 1、創建連接工廠

connectionFactory = new ActiveMQConnectionFactory(brokerUrl);

// 2、創建連接對象md

conn = connectionFactory.createConnection();

conn.start();

// 3、創建會話

session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

// 4、創建點對點發送的目標

Destination destination = session.createQueue(destinationUrl);

// 5、創建生產者消息

MessageProducer producer = session.createProducer(destination);

// 設置生產者的模式,有兩種可選 持久化 / 不持久化

producer.setDeliveryMode(DeliveryMode.PERSISTENT);

// 6、創建一條文本消息

String text = "Hello world!";

TextMessage message = session.createTextMessage(text);

for (int i = 0; i < 1; i++) {

// 7、發送消息

producer.send(message);

}

// 8、 關閉連接

session.close();

conn.close();

} catch (JMSException e) {

e.printStackTrace();

}

}

}

}

測試

可以啟動consumer,然后收到關閉消費者連接的activemq ,這里我需要關閉A節點。

image.png

Replicaterd LevelDB Store方式(棄用)

image.png

Broker-Cluster 部署方式

image.png

搭建環境

準備在一臺服務器上搭建兩個節點A,B

A:不修改其他端口,只添加配置信息。

B:需要修改所有的端口信息。

修改配置文件

conf/activemq.xml文件下的標簽中添加以下代碼

image.png

節點B的其他端口也需要修改

image.png

jetty.xml 的web頁面訪問端口也進行了修改

image.png

image.png

節點B也需要添加代碼到

image.png

啟動A,B兩節點

tail -f /var/activemq/data/activemq.log 查看日志信息

查看A節點的啟動日志

image.png

可以發現B節點并沒有啟動成功

B節點的日志信息:

image.png

重新修改B節點的端口:

image.png

使用jcmd命令

image.png

看到進程以及啟動成功了

查看A節點日志:

image.png

通過日志可以看出啟動成功了,并成功連接B節點了。

總結

修改配置前一定需要停止activemq服務,安裝過程遇到問題,可以查看日志。

Activemq集群是通過一種特殊的隊列進行集群操作的。

image.png

使用JAVA代碼測試Broker-Cluster集群模式

provider

使用A節點生產數據

package com.study.mq.b2_clustering.network_connector;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**

* 簡單生產者

*/

public class Producer {

public static void main(String[] args) {

// 生產者用A節點 activemq.tkxb.com:61616

String brokerUrl = "failover:(tcp://activemq.tkxb.com:61616)?initialReconnectDelay=100";

new ProducerThread(brokerUrl, "queue-cluster01").start();

}

static class ProducerThread extends Thread {

String brokerUrl;

String destinationUrl;

public ProducerThread(String brokerUrl, String destinationUrl) {

this.brokerUrl = brokerUrl;

this.destinationUrl = destinationUrl;

}

@Override

public void run() {

ActiveMQConnectionFactory connectionFactory;

Connection conn;

Session session;

try {

// 1、創建連接工廠

connectionFactory = new ActiveMQConnectionFactory(brokerUrl);

// 2、創建連接對象md

conn = connectionFactory.createConnection();

conn.start();

// 3、創建會話

session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

// 4、創建點對點發送的目標

Destination destination = session.createQueue(destinationUrl);

// 5、創建生產者消息

MessageProducer producer = session.createProducer(destination);

// 設置生產者的模式,有兩種可選 持久化 / 不持久化

producer.setDeliveryMode(DeliveryMode.PERSISTENT);

// 6、創建一條文本消息

String text = "Hello world!";

TextMessage message = session.createTextMessage(text);

for (int i = 0; i < 1; i++) {

// 7、發送消息

producer.send(message);

}

// 8、 關閉連接

session.close();

conn.close();

} catch (JMSException e) {

e.printStackTrace();

}

}

}

}

consumer

使用B節點消費數據

package com.study.mq.b2_clustering.network_connector;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

// http://activemq.apache.org/networks-of-brokers.html

public class ConsumerNetowork {

public static void main(String[] args) throws InterruptedException {

// 消費者用A節點 activemq.tkxb.com:61616

String brokerUrl = "failover:(tcp://activemq.tkxb.com:61617)?initialReconnectDelay=100";

ConsumerThread queue1 = new ConsumerThread(brokerUrl, "queue-cluster01");

queue1.start();

queue1.join();

}

}

class ConsumerThread extends Thread {

String brokerUrl;

String destinationUrl;

public ConsumerThread(String brokerUrl, String destinationUrl) {

this.brokerUrl = brokerUrl;

this.destinationUrl = destinationUrl;

}

@Override

public void run() {

ActiveMQConnectionFactory connectionFactory;

Connection conn;

Session session;

MessageConsumer consumer;

try {

// brokerURL http://activemq.apache.org/connection-configuration-uri.html

// 1、創建連接工廠

connectionFactory = new ActiveMQConnectionFactory(this.brokerUrl);

// 2、創建連接對象

conn = connectionFactory.createConnection();

conn.start(); // 一定要啟動

// 3、創建會話(可以創建一個或者多個session)

session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

// 4、創建點對點接收的目標,queue - 點對點

Destination destination = session.createQueue(destinationUrl);

// 5、創建消費者消息 http://activemq.apache.org/destination-options.html

consumer = session.createConsumer(destination);

// 6、接收消息

consumer.setMessageListener(message -> {

try {

if (message instanceof TextMessage) {

System.out.println("收到文本消息:" + ((TextMessage) message).getText());

} else {

System.out.println(message);

}

} catch (JMSException e) {

e.printStackTrace();

}

});

} catch (JMSException e) {

e.printStackTrace();

}

}

}

Master-Slave與Broker-Cluster結合

image.png

與50位技術專家面對面20年技術見證,附贈技術全景圖

總結

以上是生活随笔為你收集整理的activemq mysql集群配置_ActiveMQ专题--集群,高可用方案的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。