日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

ActiveMQ持久化方式(转)

發(fā)布時間:2025/1/21 编程问答 35 豆豆
生活随笔 收集整理的這篇文章主要介紹了 ActiveMQ持久化方式(转) 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

?

消息持久性對于可靠消息傳遞來說應(yīng)該是一種比較好的方法,有了消息持久化,即使發(fā)送者和接受者不是同時在線或者消息中心在發(fā)送者發(fā)送消息后宕機了,在消息 中心重新啟動后仍然可以將消息發(fā)送出去,如果把這種持久化和ReliableMessaging結(jié)合起來應(yīng)該是很好的保證了消息的可靠傳送。

消息持久性的原理很簡單,就是在發(fā)送者將消息發(fā)送出去后,消息中心首先將消息存儲到本地數(shù)據(jù)文件、內(nèi)存數(shù)據(jù)庫或者遠(yuǎn)程數(shù)據(jù)庫等,然后試圖將消息發(fā)送 給接收者,發(fā)送成功則將消息從存儲中刪除,失敗則繼續(xù)嘗試。消息中心啟動以后首先要檢查制定的存儲位置,如果有未發(fā)送成功的消息,則需要把消息發(fā)送出去。

ActiveMQ持久化方式:AMQ、KahaDB、JDBC、LevelDB。

1、AMQ

AMQ是一種文件存儲形式,它具有寫入速度快和容易恢復(fù)的特點。消息存儲在一個個文件中,文件的默認(rèn)大小為32M,如果一條消息的大小超過了 32M,那么這個值必須設(shè)置大一點。當(dāng)一個存儲文件中的消息已經(jīng)全部被消費,那么這個文件將被標(biāo)識為可刪除,在下一個清除階段,這個文件被刪除。AMQ適用于ActiveMQ5.3之前的版本。默認(rèn)配置如下:

1 2 3 <persistenceAdapter> ???<amqPersistenceAdapter?directory="activemq-data"maxFileLength="32mb"/> </persistenceAdapter>

屬性如下:

屬性名稱

默認(rèn)值

描述

directory

activemq-data

消息文件和日志的存儲目錄

useNIO

true

使用NIO協(xié)議存儲消息

syncOnWrite

false

同步寫到磁盤,這個選項對性能影響非常大

maxFileLength

32Mb

一個消息文件的大小

persistentIndex

true

消息索引的持久化,如果為false,那么索引保存在內(nèi)存中

maxCheckpointMessageAddSize

4kb

一個事務(wù)允許的最大消息量

cleanupInterval

30000

清除操作周期,單位ms

indexBinSize

1024

索引文件緩存頁面數(shù),缺省為1024,當(dāng)amq擴充或者縮減存儲時,會鎖定整個broker,導(dǎo)致一定時間的阻塞,所以這個值應(yīng)該調(diào)整到比較大,但是代碼中實現(xiàn)會動態(tài)伸縮,調(diào)整效果并不理想。

indexKeySize

96

索引key的大小,key是消息ID

indexPageSize

16kb

索引的頁大小

directoryArchive

archive

存儲被歸檔的消息文件目錄

archiveDataLogs

false

當(dāng)為true時,歸檔的消息文件被移到directoryArchive,而不是直接刪除                    

2、KahaDB

KahaDB是基于文件的本地數(shù)據(jù)庫儲存形式,雖然沒有AMQ的速度快,但是它具有強擴展性,恢復(fù)的時間比AMQ短,從5.4版本之后KahaDB做為默認(rèn)的持久化方式。默認(rèn)配置如下:

KahaDB的屬性如下:

屬性名稱

默認(rèn)值

描述

directory

activemq-data

消息文件和日志的存儲目錄

indexWriteBatchSize

1000

一批索引的大小,當(dāng)要更新的索引量到達(dá)這個值時,更新到消息文件中

indexCacheSize

10000

內(nèi)存中,索引的頁大小

enableIndexWriteAsync

false

索引是否異步寫到消息文件中

journalMaxFileLength

32mb

一個消息文件的大小

enableJournalDiskSyncs

true

是否講非事務(wù)的消息同步寫入到磁盤

cleanupInterval

30000

清除操作周期,單位ms

checkpointInterval

5000

索引寫入到消息文件的周期,單位ms

ignoreMissingJournalfiles

false

忽略丟失的消息文件,false,當(dāng)丟失了消息文件,啟動異常

checkForCorruptJournalFiles

false

檢查消息文件是否損壞,true,檢查發(fā)現(xiàn)損壞會嘗試修復(fù)

checksumJournalFiles

false

產(chǎn)生一個checksum,以便能夠檢測journal文件是否損壞。

5.4版本之后有效的屬性:

??

archiveDataLogs

false

當(dāng)為true時,歸檔的消息文件被移到directoryArchive,而不是直接刪除

directoryArchive

null

存儲被歸檔的消息文件目錄

databaseLockedWaitDelay

10000

在使用負(fù)載時,等待獲得文件鎖的延遲時間,單位ms

maxAsyncJobs

10000

同個生產(chǎn)者產(chǎn)生等待寫入的異步消息最大量

concurrentStoreAndDispatchTopics

false

當(dāng)寫入消息的時候,是否轉(zhuǎn)發(fā)主題消息

concurrentStoreAndDispatchQueues

true

當(dāng)寫入消息的時候,是否轉(zhuǎn)發(fā)隊列消息

5.6版本之后有效的屬性:

??

archiveCorruptedIndex

false

是否歸檔錯誤的索引

每個KahaDB的實例都可以配置單獨的適配器,如果沒有目標(biāo)隊列提交給filteredKahaDB,那么意味著對所有的隊列有效。如果一個隊列沒有對應(yīng)的適配器,那么將會拋出一個異常。配置如下:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 <persistenceAdapter> ??<mKahaDBdirectory="${activemq.base}/data/kahadb"> ????<filteredPersistenceAdapters> ??????<!--?match?all?queues?--> ??????<filteredKahaDBqueue=">"> ????????<persistenceAdapter> ??????????<kahaDBjournalMaxFileLength="32mb"/> ????????</persistenceAdapter> ??????</filteredKahaDB> ??????? ??????<!--?match?all?destinations?--> ??????<filteredKahaDB> ????????<persistenceAdapter> ??????????<kahaDBenableJournalDiskSyncs="false"/> ????????</persistenceAdapter> ??????</filteredKahaDB> ????</filteredPersistenceAdapters> ??</mKahaDB> </persistenceAdapter>

如果filteredKahaDB的perDestination屬性設(shè)置為true,那么匹配的目標(biāo)隊列將會得到自己對應(yīng)的KahaDB實例。配置如下:

1 2 3 4 5 6 7 8 9 10 11 12 <persistenceAdapter> ??<mKahaDBdirectory="${activemq.base}/data/kahadb"> ????<filteredPersistenceAdapters> ??????<!--?kahaDB?per?destinations?--> ??????<filteredKahaDB?perDestination="true"> ????????<persistenceAdapter> ??????????<kahaDBjournalMaxFileLength="32mb"?/> ????????</persistenceAdapter> ??????</filteredKahaDB> ????</filteredPersistenceAdapters> ??</mKahaDB> </persistenceAdapter>

3、JDBC

可以將消息存儲到數(shù)據(jù)庫中,例如:Mysql、SQL Server、Oracle、DB2。

配置JDBC適配器:

1 2 3 <persistenceAdapter> ????<jdbcPersistenceAdapterdataSource="#mysql-ds"?createTablesOnStartup="false"?/> </persistenceAdapter>

dataSource指定持久化數(shù)據(jù)庫的bean,createTablesOnStartup是否在啟動的時候創(chuàng)建數(shù)據(jù)表,默認(rèn)值是true,這樣每次啟動都會去創(chuàng)建數(shù)據(jù)表了,一般是第一次啟動的時候設(shè)置為true,之后改成false。

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 Mysql持久化bean: <bean?id="mysql-ds"?class="org.apache.commons.dbcp.BasicDataSource"?destroy-method="close"> ????<property?name="driverClassName"?value="com.mysql.jdbc.Driver"/> ????<property?name="url"?value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/> ????<property?name="username"?value="activemq"/> ????<property?name="password"?value="activemq"/> ????<property?name="poolPreparedStatements"?value="true"/> </bean> SQL?Server持久化bean: <bean?id="mssql-ds"?class="net.sourceforge.jtds.jdbcx.JtdsDataSource"?destroy-method="close"> ???<property?name="serverName"?value="SERVERNAME"/> ???<property?name="portNumber"?value="PORTNUMBER"/> ???<property?name="databaseName"?value="DATABASENAME"/> ???<property?name="user"?value="USER"/> ???<property?name="password"?value="PASSWORD"/> </bean> Oracle持久化bean: <bean?id="oracle-ds"?class="org.apache.commons.dbcp.BasicDataSource"?destroy-method="close"> ????<property?name="driverClassName"?value="oracle.jdbc.driver.OracleDriver"/> ????<property?name="url"?value="jdbc:oracle:thin:@10.53.132.47:1521:activemq"/> ????<property?name="username"?value="activemq"/> ????<property?name="password"?value="activemq"/> ????<property?name="maxActive"?value="200"/> ????<property?name="poolPreparedStatements"?value="true"/> </bean> DB2持久化bean: <bean?id="db2-ds"?class="org.apache.commons.dbcp.BasicDataSource"??destroy-method="close"> ??????<property?name="driverClassName"?value="com.ibm.db2.jcc.DB2Driver"/> ??????<property?name="url"?value="jdbc:db2://hndb02.bf.ctc.com:50002/activemq"/> ??????<property?name="username"?value="activemq"/> ??????<property?name="password"?value="activemq"/> ??????<property?name="maxActive"?value="200"/> ??????<property?name="poolPreparedStatements"?value="true"/> ??</bean>

4、LevelDB

這種文件系統(tǒng)是從ActiveMQ5.8之后引進的,它和KahaDB非常相似,也是基于文件的本地數(shù)據(jù)庫儲存形式,但是它提供比KahaDB更快的持久性。與KahaDB不同的是,它不是使用傳統(tǒng)的B-樹來實現(xiàn)對日志數(shù)據(jù)的提前寫,而是使用基于索引的LevelDB。

默認(rèn)配置如下:

?

1 2 3 <?persistenceAdapter?> ???????<?levelDBdirectory?=?"activemq-data"?/> </?persistenceAdapter?>

屬性如下:

屬性名稱

默認(rèn)值

描述

directory

"LevelDB"

數(shù)據(jù)文件的存儲目錄

readThreads

10

系統(tǒng)允許的并發(fā)讀線程數(shù)量

sync

true

同步寫到磁盤

logSize

104857600 (100 MB)

日志文件大小的最大值

logWriteBufferSize

4194304 (4 MB)

日志數(shù)據(jù)寫入文件系統(tǒng)的最大緩存值

verifyChecksums

false

是否對從文件系統(tǒng)中讀取的數(shù)據(jù)進行校驗

paranoidChecks

false

盡快對系統(tǒng)內(nèi)部發(fā)生的存儲錯誤進行標(biāo)記

indexFactory

org.fusesource.leveldbjni.JniDBFactory, org.iq80.leveldb.impl.Iq80DBFactory

在創(chuàng)建LevelDB索引時使用

indexMaxOpenFiles

1000

可供索引使用的打開文件的數(shù)量

indexBlockRestartInterval

16

Number keys between restart points for delta encoding of keys.

indexWriteBufferSize

6291456 (6 MB)

內(nèi)存中索引數(shù)據(jù)的最大值

indexBlockSize

4096 (4 K)

每個數(shù)據(jù)塊的索引數(shù)據(jù)大小

indexCacheSize

268435456 (256 MB)

使用緩存索引塊允許的最大內(nèi)存

indexCompression

snappy

適用于索引塊的壓縮類型

logCompression

none

適用于日志記錄的壓縮類型

5、? 下面詳細(xì)介紹一下如何將消息持久化到Mysql數(shù)據(jù)庫中

?????????需要將mysql的驅(qū)動包放置到ActiveMQ的lib目錄下

?????????修改activeMQ的配置文件:

?

1 2 3 <?persistenceAdapter?> <?jdbcPersistenceAdapter?dataDirectory?=?"${activemq.base}/data"?dataSource?=?"#mysql-ds"?createTablesOnStartup?=?"false"?/> </?persistenceAdapter?>

在配置文件中的broker節(jié)點外增加:

?

1 2 3 4 5 6 7 8 <?beanid?=?"mysql-ds"?class?=?"org.apache.commons.dbcp.BasicDataSource"?destroy-method?=?"close"?> ???????<?propertyname?=?"driverClassName"?value?=?"com.mysql.jdbc.Driver"?/> ???????<?property?name?=?"url"?value?=?"jdbc:mysql://localhost:3306/activemq?relaxAutoCommit=true"?/> ???????<?property?name?=?"username"?value?=?"root"?/> ???????<?property?name?=?"password"?value?=?"root"?/> ???????<?property?name?=?"maxActive"?value?=?"200"?/> ???????<?propertyname?=?"poolPreparedStatements"?value?=?"true"?/> </?bean?>

從配置中可以看出數(shù)據(jù)庫的名稱是activemq,需要手動在MySql中建立這個數(shù)據(jù)庫。

然后重新啟動activeMQ,會發(fā)現(xiàn)activemq多了三張表:

1:activemq_acks

2:activemq_lock

3:activemq_msgs

?????????點到點類型

Sender類:

?

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 import?javax.jms.Connection; import?javax.jms.ConnectionFactory; import?javax.jms.DeliveryMode; import?javax.jms.Destination; import?javax.jms.JMSException; import?javax.jms.MessageProducer; import?javax.jms.Session; import?javax.jms.TextMessage; import?org.apache.activemq.ActiveMQConnection; import?org.apache.activemq.ActiveMQConnectionFactory; public?class?Sender { private?static?final?int?SEND_NUMBER =?2000?; ?????public?static?void?main(String[] args) { ????????// ConnectionFactory :連接工廠,JMS用它創(chuàng)建連接 ????????ConnectionFactory connectionFactory; ????????// Connection :JMS客戶端到JMS Provider的連接 ????????Connection connection =?null?; ?????????// Session:一個發(fā)送或接收消息的線程 ????????Session session; ????????// Destination :消息的目的地;消息發(fā)送給誰. ????????Destination destination; ????????// MessageProducer:消息發(fā)送者 ????????MessageProducer producer; ?????????// TextMessage message; ?????????// 構(gòu)造ConnectionFactory實例對象,此處采用ActiveMq的實現(xiàn) ????????connectionFactory =?new?ActiveMQConnectionFactory( ???????????????ActiveMQConnection.DEFAULT_USER, ???????????????ActiveMQConnection.DEFAULT_PASSWORD, ???????????????"tcp://localhost:61616"?); ????????try?{ ????????????// 構(gòu)造從工廠得到連接對象 ????????????connection = connectionFactory.createConnection(); ????????????//啟動 ????????????connection.start(); ????????????//獲取操作連接 ????????????session = connection.createSession(?false?, Session.AUTO_ACKNOWLEDGE); ????????????//獲取session,FirstQueue是一個服務(wù)器的queue??????????????? destination = session.createQueue("FirstQueue"); ????????????// 得到消息生成者【發(fā)送者】 ????????????producer = session.createProducer(destination); ????????????//設(shè)置不持久化 ????????????producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); ????????????//構(gòu)造消息 ????????????sendMessage(session, producer); ????????????//session.commit(); ????????????connection.close(); ????????} ????????catch?(Exception e){ ????????????e.printStackTrace(); ????????}?finally?{ ????????????if?(?null?!= connection){ ???????????????try?{ ???????????????????connection.close(); ???????????????}?catch?(JMSException e) { ???????????????????// TODO Auto-generatedcatch block ???????????????????e.printStackTrace(); ???????????????} ????????????}??? ????????} ?????} ?????public?static?void?sendMessage(Session session, MessageProducer producer)?throws?Exception{ ????????for?(?int?i=?1?; i<=SEND_NUMBER; i++){ ????????????TextMessage message = session.createTextMessage(?"ActiveMQ發(fā)送消息"?+i); ????????????System.out.println(?"發(fā)送消息:ActiveMQ發(fā)送的消息"?+i); ????????????producer.send(message); ????????} ?????} }

Receiver類:

?

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 import?javax.jms.Connection; import?javax.jms.ConnectionFactory; import?javax.jms.Destination; import?javax.jms.MessageConsumer; import?javax.jms.Session; import?javax.jms.TextMessage; import?org.apache.activemq.ActiveMQConnection; import?org.apache.activemq.ActiveMQConnectionFactory; public?class?Receiver { ?????public?static?void?main(String[] args) { ????????// ConnectionFactory :連接工廠,JMS用它創(chuàng)建連接 ?????????ConnectionFactory connectionFactory; ?????????// Connection :JMS客戶端到JMS Provider的連接 ?????????Connection connection =?null?; ?????????// Session:一個發(fā)送或接收消息的線程 ?????????Session session; ?????????// Destination :消息的目的地;消息發(fā)送給誰. ?????????Destination destination; ?????????// 消費者,消息接收者 ?????????MessageConsumer consumer; ?????????connectionFactory = newActiveMQConnectionFactory( ?????????????????ActiveMQConnection.DEFAULT_USER, ?????????????????ActiveMQConnection.DEFAULT_PASSWORD, ?????????????????"tcp://localhost:61616"?); ?????????try?{ ?????????????//得到連接對象 ?????????????connection =connectionFactory.createConnection(); ?????????????// 啟動 ?????????????connection.start(); ?????????????// 獲取操作連接 ?????????????session = connection.createSession(?false?, ?????????????????????Session.AUTO_ACKNOWLEDGE); ?????????????// 創(chuàng)建Queue ????????????destination = session.createQueue(?"FirstQueue"?); ?????????????consumer =session.createConsumer(destination);??????? ?????????????while?(?true?){ ???????????????//設(shè)置接收者接收消息的時間,為了便于測試,這里定為100s ???????????????TextMessagemessage = (TextMessage)consumer.receive(?100000?); ???????????????if?(?null?!= message){ ??????????????????System.out.println(?"收到消息"?+message.getText()); ???????????????}?else?break?; ?????????????} ?????????}?catch?(Exception e){ ?????????e.printStackTrace(); ?????????}?finally?{ ?????????????try?{ ?????????????????if?(?null?!= connection) ?????????????????????connection.close(); ?????????????}?catch?(Throwable ignore) { ?????????????} ?????????} ?????} }

測試:

測試一:

A、 先運行Sender類,待運行完畢后,運行Receiver類

B、 在此過程中activemq數(shù)據(jù)庫的activemq_msgs表中沒有數(shù)據(jù)

C、 再次運行Receiver,消費不到任何信息

測試二:

A、? 先運行Sender類

B、 重啟電腦

C、 運行Receiver類,無任何信息被消費

測試三:

A、?? 把Sender類中的producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);改為producer.setDeliveryMode(DeliveryMode.PERSISTENT);

B、?? 先運行Sender類,待運行完畢后,運行Receiver類

C、?? 在此過程中activemq數(shù)據(jù)庫的activemq_msgs表中有數(shù)據(jù)生成,運行完Receiver類后,數(shù)據(jù)清除

測試四:

A、??? 把Sender類中的producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);改為producer.setDeliveryMode(DeliveryMode.PERSISTENT);

B、??? 運行Sender類

C、??? 重啟電腦

D、??? 運行Receiver類,有消息被消費

結(jié)論:???

通過以上測試,可以發(fā)現(xiàn),在P2P類型中當(dāng)DeliveryMode設(shè)置為NON_PERSISTENCE時,消息被保存在內(nèi)存中,而當(dāng) DeliveryMode設(shè)置為PERSISTENCE時,消息保存在broker的相應(yīng)的文件或者數(shù)據(jù)庫中。而且P2P中消息一旦被Consumer消 費就從broker中刪除。

?????????發(fā)布/訂閱類型

Sender類:

?

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 import?javax.jms.Connection; import?javax.jms.ConnectionFactory; import?javax.jms.DeliveryMode; import?javax.jms.Destination; import?javax.jms.JMSException; import?javax.jms.MessageProducer; import?javax.jms.Session; import?javax.jms.TextMessage; import?javax.jms.Topic; import?org.apache.activemq.ActiveMQConnection; import?org.apache.activemq.ActiveMQConnectionFactory; public?class?Sender { ?????private?static?final?int?SEND_NUMBER =?100?; ?????public?static?void?main(String[] args) { ????????// ConnectionFactory :連接工廠,JMS用它創(chuàng)建連接 ????????ConnectionFactory connectionFactory; ????????// Connection :JMS客戶端到JMS Provider的連接 ????????Connection connection =?null?; ?????????// Session:一個發(fā)送或接收消息的線程 ????????Session session; ????????// MessageProducer:消息發(fā)送者 ????????MessageProducer producer; ?????????// TextMessage message; ?????????// 構(gòu)造ConnectionFactory實例對象,此處采用ActiveMq的實現(xiàn) ????????connectionFactory =?new?ActiveMQConnectionFactory( ???????????????ActiveMQConnection.DEFAULT_USER, ????????????????ActiveMQConnection.DEFAULT_PASSWORD, ???????????????"tcp://localhost:61616"?); ????????try?{ ????????????//得到連接對象 ????????????connection = connectionFactory.createConnection(); ????????????//啟動 ????????????connection.start(); ????????????//獲取操作連接 ????????????session = connection.createSession(?false?, Session.AUTO_ACKNOWLEDGE);??????? ????????????Topic topic = session.createTopic(?"MQ_test"?);?????? ????????????// 得到消息生成者【發(fā)送者】 ????????????producer = session.createProducer(topic); ????????????//設(shè)置持久化 ????????????producer.setDeliveryMode(DeliveryMode.PERSISTENT); ????????????//構(gòu)造消息 ????????????sendMessage(session, producer); ????????????//session.commit(); ????????????connection.close(); ????????} ????????catch?(Exception e){ ????????????e.printStackTrace(); ????????}?finally?{ ????????????if?(?null?!= connection){ ???????????????try?{ ???????????????????connection.close(); ???????????????}?catch?(JMSException e) { ???????????????????// TODO Auto-generatedcatch block ???????????????????e.printStackTrace(); ???????????????} ????????????}??? ????????} ?????} ?????public?static?void?sendMessage(Session session, MessageProducer producer)?throws?Exception{ ????????for?(?int?i=?1?; i<=SEND_NUMBER; i++){ ????????????TextMessage message = session.createTextMessage(?"ActiveMQ發(fā)送消息"?+i); ????????????System.out.println(?"發(fā)送消息:ActiveMQ發(fā)送的消息"?+i); ????????????producer.send(message); ????????} ?????} }

Receiver類:

?

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 import?javax.jms.Connection; import?javax.jms.ConnectionFactory; import?javax.jms.Destination; import?javax.jms.MessageConsumer; import?javax.jms.Session; import?javax.jms.TextMessage; import?javax.jms.Topic; ?? import?org.apache.activemq.ActiveMQConnection; import?org.apache.activemq.ActiveMQConnectionFactory; public?class?Receiver { ?????public?static?void?main(String[] args) { ????????// ConnectionFactory :連接工廠,JMS用它創(chuàng)建連接 ?????????ConnectionFactory connectionFactory; ?????????// Connection :JMS客戶端到JMS Provider的連接 ?????????Connection connection =?null?; ?????????// Session:一個發(fā)送或接收消息的線程 ?????????Session session; ?????????// 消費者,消息接收者 ?????????MessageConsumer consumer; ?????????connectionFactory = newActiveMQConnectionFactory( ????????????????ActiveMQConnection.DEFAULT_USER, ?????????????????ActiveMQConnection.DEFAULT_PASSWORD, ?????????????????"tcp://localhost:61616"?); ?????????try?{ ?????????????// 構(gòu)造從工廠得到連接對象 ?????????????connection =connectionFactory.createConnection(); ????????????? ?????????????connection.setClientID(?"clientID001"?); ?????????????// 啟動 ?????????????connection.start(); ?????????????// 獲取操作連接 ?????????????session = connection.createSession(?false?, ?????????????????????Session.AUTO_ACKNOWLEDGE); ?????????????// 獲取session ????????????Topic topic = session.createTopic(?"MQ_test"?);?????? ????????????// 得到消息生成者【發(fā)送者】 ????????????consumer = session.createDurableSubscriber(topic,?"MQ_sub"?); ???????????? ?????????????while?(?true?){ ???????????????//設(shè)置接收者接收消息的時間,為了便于測試,這里誰定為100s ???????????????TextMessagemessage = (TextMessage)consumer.receive(?100000?); ???????????????if?(?null?!= message){ ??????????????????System.out.println(?"收到消息"?+message.getText()); ???????????????}?else?break?; ?????????????} ?????????}?catch?(Exception e){ ?????????e.printStackTrace(); ?????????}?finally?{ ?????????????try?{ ?????????????????if?(?null?!= connection) ?????????????????????connection.close(); ?????????????}?catch?(Throwable ignore) { ?????????????} ?????????} ?????} ?? }

測試:

測試一:

A、先啟動Sender類

B、再啟動Receiver類

C、結(jié)果無任何記錄被訂閱

測試二:

A、先啟動Receiver類,讓Receiver在相關(guān)主題上進行訂閱

B、停止Receiver類,再啟動Sender類

C、待Sender類運行完成后,再啟動Receiver類

D、結(jié)果發(fā)現(xiàn)相應(yīng)主題的信息被訂閱

http://www.cnblogs.com/adolfmc/p/4462580.html

總結(jié)

以上是生活随笔為你收集整理的ActiveMQ持久化方式(转)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 国产又粗又猛又大爽 | 国产精品看片 | 亚洲第一a | 99精品久久精品一区二区 | www色亚洲| 欧美成人黄色小视频 | 亚洲成a人v欧美综合天堂麻豆 | 中文字幕在线导航 | 怡红院一区二区三区 | 女女互慰吃奶互揉调教捆绑 | 午夜亚洲AV永久无码精品蜜芽 | 国产精品ⅴa有声小说 | 天堂网av中文字幕 | www亚洲一区| 日本黄色特级片 | 一卡二卡三卡四卡五卡 | 日韩高清在线播放 | 欧美人与性动交ccoo | 日韩美女视频在线观看 | 欧美一级淫片免费视频魅影视频 | 黄色国产网站 | 久久色在线视频 | 国产av一区二区三区精品 | 欧美在线视频网 | 九九九九精品九九九九 | 亚洲精品区| 九色91popny蝌蚪新疆 | 69视频在线观看免费 | 自拍三级| 人妻 日韩精品 中文字幕 | 97成人在线| 丰满少妇一区二区 | 激情a| 狠狠躁日日躁夜夜躁2022麻豆 | 精品国产露脸精彩对白 | 成人性生交大片免费 | 亚欧精品在线观看 | 爱爱高潮视频 | 国产露脸无套对白在线播放 | 99久久久| 久久综合激的五月天 | 四虎黄网| 中文字幕一区二区三区电影 | 91色片| 免费观看成年人视频 | 亚洲一区在线观 | 欧美三日本三级少妇三级99观看视频 | 日本a在线天堂 | 波多野吉衣伦理片 | 开心色婷婷| 国产午夜激情 | 成人精品在线视频 | 蜜臀久久精品久久久久 | 欧美18免费视频 | 自拍第一页 | 中文字幕乱码在线观看 | 麻豆影视av | 国产精品色片 | 国产剧情演绎av | 天天摸天天做天天爽水多 | 久久黄色片视频 | 天堂精品一区二区三区 | 日韩亚洲欧美综合 | 欧美精品一级在线观看 | 香蕉国产在线视频 | 精品资源成人 | 成人av免费在线看 | 午夜精品一区二区三区三上悠亚 | 69国产| 天天综合网在线 | 亚洲精品国产美女 | 日韩 国产| 成人蜜桃av | 亚洲精视频 | 玖玖网 | 久久精品国产亚洲7777 | aa视频免费观看 | 久久黄色av | 人妻妺妺窝人体色www聚色窝 | 我们的2018中文免费看 | 美女诱惑一区 | 青青操网 | 伊人网国产 | 一本大道av | 四虎黄网| 色哟哟免费视频 | 特级西西444www高清大视频 | 婷婷亚洲视频 | 岛国一区二区 | 超碰免费成人 | 又黄又骚又爽 | 久久精品国产一区二区 | 毛片少妇| 国产无遮无挡120秒 欧美综合图片 | 成人久色 | 国产粉嫩呻吟一区二区三区 | 激情国产视频 | 最新黄色网址在线观看 | 特黄一级视频 |