日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

JMS学习九 ActiveMQ的消息持久化到Mysql数据库

發布時間:2025/3/15 数据库 18 豆豆
生活随笔 收集整理的這篇文章主要介紹了 JMS学习九 ActiveMQ的消息持久化到Mysql数据库 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

1、將連接Mysql數據庫驅動包,放到ActiveMQ的lib目錄下

2,修改ActiveMQ的conf目錄下的active.xml文件,修改數據持久化的方式

? ? ? ? ?2.1 ?修改原來的kshadb的持久化數據的方式

?

? ? ? ? ? 2.2 ?連接Mysql的配置

3、將數據持久化Mysql的運行截圖

? ? ? 3.1 ?重新啟動ActiveMQ,并運行程序,放入持久化數據,查看Mysql的active數據庫

?

4,數據持久化代碼

package test.mq.helloworld;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Sender {
?? ?
?? ? //默認連接用戶名
? ? private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
? ? //默認連接密碼
? ? private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
? ? //默認連接地址
? ? private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
?? ?
?? ?//發送的消息數量
? ? private static final int SENDNUM = 10;

?? ?public static void main(String[] args) throws Exception {
?? ??? ?/*ActiveMQConnectionFactory activeMQConnectionFactory =?
?? ??? ??? ??? ?new ActiveMQConnectionFactory(
?? ??? ??? ??? ?ActiveMQConnection.DEFAULT_USER,?
?? ??? ??? ??? ?ActiveMQConnection.DEFAULT_PASSWORD,?
?? ??? ??? ??? ?"tcp://localhost:61616");*/
?? ??? ?/**
?? ??? ? * activemq.xml ?配置密碼
?? ??? ? */
?? ??? ?ActiveMQConnectionFactory activeMQConnectionFactory =?
?? ??? ??? ??? ?new ActiveMQConnectionFactory(
?? ??? ??? ??? ?"bhz",?
?? ??? ??? ??? ?"bhz",?
?? ??? ??? ??? ?"tcp://localhost:61616");
?? ??? ?//連接
? ? ? ? Connection connection = null;
?? ??? ?
?? ??? ?try {
?? ??? ??? ? connection = activeMQConnectionFactory.createConnection();
?? ??? ??? ? connection.start();
?? ??? ??? ?
?? ??? ??? ?//創建session
?? ??? ?//?? ?Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
?? ??? ??? ? //開啟事物
?? ??? ?//?? ?Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
?? ??? ??? ? //開啟事物 并且使用Client的方式
?? ??? ??? ? /**
?? ??? ??? ? ?* ?3、通過Connection對象創建Session會話(上下文環境對象), ?
? ? ? ? ??? ??? ? ? ? 參數一,表示是否開啟事務 ?
? ? ? ? ??? ??? ? ? ? 參數二,表示的是簽收模式,一般使用的有自動簽收和客戶端自己確認簽收 ?
??
? ? ? ? ??? ??? ? ? ? 第一個參數設置為true,表示開啟事務 ?
? ? ? ? ??? ??? ? ? ? 開啟事務后,記得要手動提交事務?
?? ??? ??? ? ?*/
?? ??? ??? ?Session session = connection.createSession(Boolean.FALSE, Session.CLIENT_ACKNOWLEDGE);
?? ??? ??? ?
?? ??? ??? ?// 4、通過Session創建Destination對象,指的是一個客戶端用來指定生產消息目標和消費消息來源的對象。 ?
?? ? ? ? ? ?// 在PTP模式中,Destination指的是Queue ?
?? ? ? ? ? ?// 在發布訂閱模式中,Destination指的是Topic ?
?? ??? ??? ?//消息的目的地
?? ??? ??? ?Destination destination = session.createQueue("queue1");
?? ??? ??? ?//創建消息生產者
?? ??? ??? ?// 5、使用Session來創建消息對象的生產者或者消費者 ?
?? ??? ??? ?MessageProducer messageProducer = session.createProducer(destination);
?? ??? ??? ?//PERSISTENT 用來指定JMS Provider對消息進行持久化操作,以免Provider fail的時候,丟失Message
?? ??? ??? ?//NON_Persistent 方式下的JMS Provider不會對消進行持久化
?? ??? ??? ?messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
?? ??? ??? ?//發送消息
?? ??? ?//?? ?sendMessage(session, messageProducer);
?? ??? ??? ?for (int j = 0; j < 10; j++) {
?? ??? ??? ??? ?TextMessage textMessage = session.createTextMessage();
?? ? ? ? ? ??? ?textMessage.setText("我的消息內容,id為"+j);
?? ? ? ? ? ??? ?System.out.println("生產者: "+textMessage.getText());
?? ??? ??? ??? ?messageProducer.send(destination,textMessage,DeliveryMode.PERSISTENT,j,60*1000);?? ?
?? ??? ??? ?//?? ?System.out.println("生產者: "+textMessage.getText());
?? ??? ??? ?}
?? ??? ??? ?
?? ??? ??? ?//使用事物?? ?Boolean.TRUE?? ?
?? ??? ?//?? ?session.commit();
?? ??? ?} catch (Exception e) {
?? ??? ??? ?e.printStackTrace();
?? ??? ?}finally{
? ? ? ? ? ? if(connection != null){
? ? ? ? ? ? ? ? try {
? ? ? ? ? ? ? ? ? ? connection.close();
? ? ? ? ? ? ? ? } catch (JMSException e) {
? ? ? ? ? ? ? ? ? ? e.printStackTrace();
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? }

?? ?}
?? ?
?? ?
?? ? /**
? ? ?* 發送消息
? ? ?* @param session
? ? ?* @param messageProducer ?消息生產者
? ? ?* @throws Exception
? ? ?*/
? ? public static void sendMessage(Session session,MessageProducer messageProducer) throws Exception{
? ? ? ? for (int i = 0; i < Sender.SENDNUM; i++) {
? ? ? ? ? ? //創建一條文本消息?
? ? ? ? ?? ?TextMessage textMessage = session.createTextMessage();
? ? ? ? ?? ?textMessage.setText("我的消息內容,id為"+i);
? ? ? ? ?? ?messageProducer.send(textMessage);
? ? ? ? ?? ?System.out.println("生產者: "+textMessage.getText());
? ? ? ? ?// ? TextMessage message = session.createTextMessage("ActiveMQ 發送消息" +i);
? ? ? ? ?// ? System.out.println("生產者發送消息:Activemq 發送消息" + i);
? ? ? ? ? ? //通過消息生產者發出消息?
? ? ? ? ?// ? messageProducer.send(message);
? ? ? ? }

? ? }

}
?

總結

以上是生活随笔為你收集整理的JMS学习九 ActiveMQ的消息持久化到Mysql数据库的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。