日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

activemq java 异步_异步消息处理机制之activeMQ应用实例

發布時間:2023/12/15 编程问答 19 豆豆
生活随笔 收集整理的這篇文章主要介紹了 activemq java 异步_异步消息处理机制之activeMQ应用实例 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

上篇說了KAFKA應用實例,本篇承接上篇,著重描述activeMQ消息機制的應用。

KAFKA和MQ同為數據異步處理中間件,本質都是對消息的異步處理,異步通信、削谷填峰,高并發情況下的數據處理機制。他們的不同之處在于處理數據量的大小。

MQ和KAFKA相比較,KAFKA處理的數據量更大.

下圖為activeMQ應用目錄:

ActiveMQ客戶端,對連接和會話的管理/**

*

*/

package com.ustcinfo.kanms.alarmcollector.activemq;

import javax.jms.Connection;

import javax.jms.ExceptionListener;

import javax.jms.JMSException;

import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

import org.apache.log4j.Logger;

import com.ustcinfo.kanms.alarmcollector.main.GlobleConfiguration;

/**

* =================================================

* 工程:GessAlarmCollector

* 類名:ActiveMQClient

* 作者:dlzhang

* 時間:2014-8-28下午05:37:21

* 版本:Version 1.0

* 描述:ActiveMQ客戶端,對連接和會話的管理,保證全局只有一個會話被創建,減少服務端壓力、節省資源

* =================================================

*/

public class ActiveMQClient{

private static final Logger logger = Logger.getLogger(ActiveMQClient.class);

private String url;

private String user;

private String passwd;

private ActiveMQConnectionFactory connFactory;

private Connection conn;

private Session session;

private boolean isConn;

public ActiveMQClient(){

// 初始化參數

this.url = GlobleConfiguration.getInstance().getActiveMqUrl();

this.user = GlobleConfiguration.getInstance().getActiveMqUser();

this.passwd = GlobleConfiguration.getInstance().getActiveMqPasswd();

}

/**

* 建立連接

*/

protected synchronized void buildConnect() {

if(isConn)

return;

try {

logger.debug("建立連接,user=" + user + ", passwd=" + passwd + ", url=" + url);

connFactory = new ActiveMQConnectionFactory(user, passwd, url);

conn = connFactory.createConnection();

conn.start();

session = conn.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);

isConn = true;

logger.info("建立連接成功");

} catch (JMSException e) {

logger.error("建立連接失敗:" + e.getMessage(), e);

isConn = false;

}

}

/**

* 關閉連接

*/

public synchronized void close() {

try {

if(null != session)

session.close();

if(null != conn)

conn.close();

} catch (JMSException e) {

logger.error("關閉連接失敗:" + e.getMessage(), e);

} finally {

session = null;

conn = null;

connFactory = null;

isConn = false;

}

}

/**

* @return the url

*/

public String getUrl() {

return url;

}

/**

* @param url the url to set

*/

public void setUrl(String url) {

this.url = url;

}

/**

* @return the user

*/

public String getUser() {

return user;

}

/**

* @param user the user to set

*/

public void setUser(String user) {

this.user = user;

}

/**

* @return the passwd

*/

public String getPasswd() {

return passwd;

}

/**

* @param passwd the passwd to set

*/

public void setPasswd(String passwd) {

this.passwd = passwd;

}

/**

* @return the connFactory

*/

public ActiveMQConnectionFactory getConnFactory() {

if(!isConn)

buildConnect();

if(null == connFactory && isConn) {

this.close();

this.buildConnect();

}

return connFactory;

}

/**

* @param connFactory the connFactory to set

*/

public void setConnFactory(ActiveMQConnectionFactory connFactory) {

this.connFactory = connFactory;

}

/**

* @return the conn

*/

public Connection getConn() {

if(!isConn)

buildConnect();

if(null == conn && isConn) {

this.close();

this.buildConnect();

}

return conn;

}

/**

* @param conn the conn to set

*/

public void setConn(Connection conn) {

this.conn = conn;

}

/**

* @return the session

*/

public Session getSession() {

if(!isConn)

buildConnect();

if(null == session && isConn) {

this.close();

this.buildConnect();

}

return session;

}

/**

* @param session the session to set

*/

public void setSession(Session session) {

this.session = session;

}

/**

* @return the isOpen

*/

public boolean isConn() {

return isConn;

}

/**

* @param isOpen the isOpen to set

*/

public void setConn(boolean isConn) {

this.isConn = isConn;

}

}activemq消息主體的定義和聲明/**

*

*/

package com.ustcinfo.kanms.alarmcollector.activemq;

import java.io.Serializable;

import javax.jms.BytesMessage;

import javax.jms.Destination;

import javax.jms.JMSException;

import javax.jms.Message;

import javax.jms.MessageProducer;

import javax.jms.ObjectMessage;

import javax.jms.Session;

import javax.jms.TextMessage;

import org.apache.log4j.Logger;

import com.ustcinfo.kanms.alarmcollector.main.GlobleConfiguration;

/**

* =================================================

* 工程:GessAlarmCollector

* 類名:ActiveMQSender

* 作者:dlzhang

* 時間:2014-8-29下午01:02:40

* 版本:Version 1.0

* 描述:描述該文件的作用

* =================================================

*/

public class ActiveMQSender {

private static final Logger logger = Logger.getLogger(ActiveMQSender.class);

private ActiveMQClient mqClient;

private Session session;

private String sendQueueName;

private Destination dest;

private MessageProducer producer;

public ActiveMQSender() {

// 初始化參數

this.mqClient = new ActiveMQClient();

this.session = this.mqClient.getSession();

this.sendQueueName = GlobleConfiguration.getInstance().getActiveMqSendQueueName();

try {

dest = session.createQueue(sendQueueName);

producer = session.createProducer(dest);

} catch (JMSException e) {

logger.error(e.getMessage(), e);

}

}

// /**

// * 測試函數

// * @param args

// */

// public static void main(String[] args) {

// // 初始化系統配置文件

// GlobleConfiguration.getInstance().initSysConfig();

//

// ActiveMQSender sender = new ActiveMQSender();

for(int i=0; i<100; i++) {

sender.sendTextMessage("這是一個測試");

// sender.sendTextMessage("quit");

// logger.debug("第" + ++num + "條消息發送成功");

}

// }

// private static long num;

/**

* @param msg

*/

public void sendMessage(Message msg) {

try {

producer.send(msg);

} catch (JMSException e) {

logger.error(e.getMessage(), e);

}

}

/**

* @param text

*/

public void sendTextMessage(String text) {

TextMessage tMsg = null;

try {

tMsg = session.createTextMessage(text);

} catch (JMSException e) {

logger.error(e.getMessage(), e);

}

this.sendMessage(tMsg);

}

/**

* @param bytes

*/

public void sendbytesMessage(byte[] bytes) {

BytesMessage bMsg = null;

try {

bMsg = session.createBytesMessage();

bMsg.writeBytes(bytes);

} catch (JMSException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

this.sendMessage(bMsg);

}

/**

* @param s

*/

public void sendObjectMessage(Serializable s) {

ObjectMessage oMsg = null;

try {

oMsg = session.createObjectMessage(s);

} catch (JMSException e) {

logger.error(e.getMessage(), e);

}

this.sendMessage(oMsg);

}

/**

* @return the mqClient

*/

public ActiveMQClient getMqClient() {

return mqClient;

}

/**

* @param mqClient the mqClient to set

*/

public void setMqClient(ActiveMQClient mqClient) {

this.mqClient = mqClient;

}

/**

* @return the session

*/

public Session getSession() {

return session;

}

/**

* @param session the session to set

*/

public void setSession(Session session) {

this.session = session;

}

/**

* @return the sendQueueName

*/

public String getSendQueueName() {

return sendQueueName;

}

/**

* @param sendQueueName the sendQueueName to set

*/

public void setSendQueueName(String sendQueueName) {

this.sendQueueName = sendQueueName;

}

/**

* @return the dest

*/

public Destination getDest() {

return dest;

}

/**

* @param dest the dest to set

*/

public void setDest(Destination dest) {

this.dest = dest;

}

/**

* @return the producer

*/

public MessageProducer getProducer() {

return producer;

}

/**

* @param producer the producer to set

*/

public void setProducer(MessageProducer producer) {

this.producer = producer;

}

}消息接受器,使用監聽的方式接受消息/**

*

*/

package com.ustcinfo.kanms.alarmcollector.activemq;

import java.text.SimpleDateFormat;

import java.util.Date;

import java.util.List;

import javax.jms.Destination;

import javax.jms.ExceptionListener;

import javax.jms.JMSException;

import javax.jms.Message;

import javax.jms.MessageConsumer;

import javax.jms.MessageListener;

import javax.jms.Session;

import javax.jms.TextMessage;

import org.apache.log4j.Logger;

import com.ustcinfo.kanms.alarmcollector.framework.BaseThread;

import com.ustcinfo.kanms.alarmcollector.framework.GesAlarmContainer;

import com.ustcinfo.kanms.alarmcollector.framework.ReadXML;

import com.ustcinfo.kanms.alarmcollector.kafka.ProducerHandler;

import com.ustcinfo.kanms.alarmcollector.main.GlobleConfiguration;

import com.ustcinfo.kanms.alarmcollector.model.GesAlarm;

import com.ustcinfo.kanms.alarmcollector.util.FileWriteThread;

/**

* =================================================

* 工程:AlarmAutoReceiver

* 類名:ActiveMQReceiver

* 作者:lt

* 時間:2019-9-23下午08:49:38

* 版本:Version 1.0

* 描述:告警消息接受器,使用監聽的方式接受消息

* 實現ExceptionListener接口,對服務器進行監聽,實現自動重連功能

* =================================================

*/

public class AlarmAutoReceiverThread{

private static final Logger logger = Logger.getLogger(AlarmAutoReceiverThread.class);

private final FileWriteThread filewrite = new FileWriteThread();//建立一個新的線程寫文件

public AlarmAutoReceiverThread(int n) {

for(int i=0;i

new AlarmAutoReceiver().start();

logger.info(">>>>>成功啟動監聽線程:線程"+i);

}

}

class AlarmAutoReceiver extends BaseThread implements MessageListener, ExceptionListener {

private final Logger log = Logger.getLogger(AlarmAutoReceiver.class);

private ActiveMQClient mqClient;

private GesAlarmContainer gesAlarmContainer;

private Session session;

private String recvQueueName;

private Destination dest;

private MessageConsumer consumer;

public AlarmAutoReceiver() {

// 初始化參數

this.mqClient = new ActiveMQClient();

this.gesAlarmContainer = GesAlarmContainer.getInstance();

try {

mqClient.getConn().setExceptionListener(this); // 設置監聽

} catch (JMSException e) {

log.error(e.getMessage(), e);

}

this.session = this.mqClient.getSession();

this.recvQueueName = GlobleConfiguration.getInstance().getAlarmRecvQueueName();

}

public AlarmAutoReceiver(ActiveMQClient mqClient) {

// 初始化參數

this.mqClient = mqClient;

this.gesAlarmContainer = GesAlarmContainer.getInstance();

try {

mqClient.getConn().setExceptionListener(this); // 設置監聽

} catch (JMSException e) {

log.error(e.getMessage(), e);

}

this.session = this.mqClient.getSession();

this.recvQueueName = GlobleConfiguration.getInstance().getAlarmRecvQueueName();

}

/* (non-Javadoc)

* @see java.lang.Thread#run()

*/

@Override

public void run() {

log.debug("啟動線程, Thread.currentThread().getName()=" + Thread.currentThread().getName());

try {

dest = session.createQueue(recvQueueName);

consumer = session.createConsumer(dest);

consumer.setMessageListener(this);

// 阻塞線程,直到收到消息為“quit”時被喚醒

synchronized (this) {

wait();

}

log.debug("結束線程, Thread.currentThread().getName()=" + Thread.currentThread().getName());

mqClient.close();

} catch (JMSException e) {

log.error("創建消費者失敗:" + e.getMessage(), e);

} catch (InterruptedException e) {

log.error("線程被中斷", e);

}

}

/* (non-Javadoc)

* @see javax.jms.ExceptionListener#onException(javax.jms.JMSException)

*/

@Override

public void onException(JMSException e) {

// 捕獲異常,連接重連

log.error("服務端異常", e);

// 結束線程

if(this.isAlive()) {

synchronized (this) {

this.notifyAll();

}

}

// 每隔一分鐘循環獲取,直到獲取連接成功

long sleepTimeSec = 10;

long sleepTimeMillis = sleepTimeSec * 1000;

int reConnCnt = 0;

while (true) {

try {

reConnCnt++;

log.info("休眠" + sleepTimeSec + "秒 -_-~zZ");

Thread.sleep(sleepTimeMillis);

log.debug("開始重新獲取連接");

// 先關閉連接,再重新連接

mqClient.close();

mqClient.buildConnect();

if(mqClient.isConn()) {

log.info("重新獲取連接成功,耗時:[" + reConnCnt * sleepTimeSec + "]秒 ^_^");

// 重新創建監聽

new AlarmAutoReceiver(mqClient).start();

break;

}

log.error("第" + reConnCnt + "次重新獲取連接失敗 T_T");

} catch (InterruptedException e1) {

log.error(e1.getMessage(), e1);

}

}

}

// /**

// * 測試函數

// * @param args

// */

// public static void main(String[] args) {

// // 初始化系統配置文件

// GlobleConfiguration.getInstance().initSysConfig();

//

// ActiveMQReceiver recv = new ActiveMQReceiver();

// recv.start();

// logger.debug("消費者啟動成功");

// }

/* (non-Javadoc)

* @see javax.jms.MessageListener#onMessage(javax.jms.Message)

*/

@Override

public void onMessage(Message message) {

log.info("-------------- 收到一條告警信息 -----------");

if (message instanceof TextMessage){

TextMessage tm = (TextMessage)message;

try {

String xml = tm.getText();

if(xml!=null){

try{

filewrite.handle(xml.substring(xml.indexOf(""),xml.lastIndexOf("")+9)+"\n--"+dateFormat(new Date()));

}catch(Exception e){

log.error("告警日志寫入出錯!",e);

filewrite.handle(xml+"\n--"+dateFormat(new Date()));

}

}

// 當消息為“quit”時,喚醒線程

if(xml != null && xml.equalsIgnoreCase("quit")) {

logger.info(Thread.currentThread().getName() + "接收到的消息為:" + xml + ",開始退出線程");

synchronized (this) {

notifyAll();

}

return;

}

/*String gesAlarmStr = ReadXML.obtainBodyInfo(xml.trim());

List gesAlarmList = ReadXML.getAlarmValue(gesAlarmStr);

if(gesAlarmList!=null&&gesAlarmList.size()>0){

logger.debug("gesAlarmList.size()=" + gesAlarmList.size());

for(int i=0;i

GesAlarm gesAlarm = new GesAlarm();

gesAlarm = gesAlarmList.get(i);

gesAlarmContainer.putGesAlarm(gesAlarm);

}

}*/

//向kafka寫入數據

new ProducerHandler("znwgAlarm",xml);

} catch (JMSException e) {

} catch(Exception e){

logger.error("接受告警信息出錯!", e);

}finally {

}

}

}

private String dateFormat(Date date){

SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

return sdf.format(date);

}

// private long num;

/**

* @return the mqClient

*/

public ActiveMQClient getMqClient() {

return mqClient;

}

/**

* @param mqClient the mqClient to set

*/

public void setMqClient(ActiveMQClient mqClient) {

this.mqClient = mqClient;

}

/**

* @return the session

*/

public Session getSession() {

return session;

}

/**

* @param session the session to set

*/

public void setSession(Session session) {

this.session = session;

}

/**

* @return the recvQueueName

*/

public String getRecvQueueName() {

return recvQueueName;

}

/**

* @param recvQueueName the recvQueueName to set

*/

public void setRecvQueueName(String recvQueueName) {

this.recvQueueName = recvQueueName;

}

/**

* @return the dest

*/

public Destination getDest() {

return dest;

}

/**

* @param dest the dest to set

*/

public void setDest(Destination dest) {

this.dest = dest;

}

/**

* @return the consumer

*/

public MessageConsumer getConsumer() {

return consumer;

}

/**

* @param consumer the consumer to set

*/

public void setConsumer(MessageConsumer consumer) {

this.consumer = consumer;

}

}

}

總結

以上是生活随笔為你收集整理的activemq java 异步_异步消息处理机制之activeMQ应用实例的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。