ActiveMQ持久化方式(转)
?
消息持久性對于可靠消息傳遞來說應(yīng)該是一種比較好的方法,有了消息持久化,即使發(fā)送者和接受者不是同時在線或者消息中心在發(fā)送者發(fā)送消息后宕機了,在消息 中心重新啟動后仍然可以將消息發(fā)送出去,如果把這種持久化和ReliableMessaging結(jié)合起來應(yīng)該是很好的保證了消息的可靠傳送。
消息持久性的原理很簡單,就是在發(fā)送者將消息發(fā)送出去后,消息中心首先將消息存儲到本地數(shù)據(jù)文件、內(nèi)存數(shù)據(jù)庫或者遠(yuǎn)程數(shù)據(jù)庫等,然后試圖將消息發(fā)送 給接收者,發(fā)送成功則將消息從存儲中刪除,失敗則繼續(xù)嘗試。消息中心啟動以后首先要檢查制定的存儲位置,如果有未發(fā)送成功的消息,則需要把消息發(fā)送出去。
ActiveMQ持久化方式:AMQ、KahaDB、JDBC、LevelDB。
1、AMQ
AMQ是一種文件存儲形式,它具有寫入速度快和容易恢復(fù)的特點。消息存儲在一個個文件中,文件的默認(rèn)大小為32M,如果一條消息的大小超過了 32M,那么這個值必須設(shè)置大一點。當(dāng)一個存儲文件中的消息已經(jīng)全部被消費,那么這個文件將被標(biāo)識為可刪除,在下一個清除階段,這個文件被刪除。AMQ適用于ActiveMQ5.3之前的版本。默認(rèn)配置如下:
| 1 2 3 | <persistenceAdapter> ???<amqPersistenceAdapter?directory="activemq-data"maxFileLength="32mb"/> </persistenceAdapter> |
屬性如下:
| 屬性名稱 | 默認(rèn)值 | 描述 |
| directory | activemq-data | 消息文件和日志的存儲目錄 |
| useNIO | true | 使用NIO協(xié)議存儲消息 |
| syncOnWrite | false | 同步寫到磁盤,這個選項對性能影響非常大 |
| maxFileLength | 32Mb | 一個消息文件的大小 |
| persistentIndex | true | 消息索引的持久化,如果為false,那么索引保存在內(nèi)存中 |
| maxCheckpointMessageAddSize | 4kb | 一個事務(wù)允許的最大消息量 |
| cleanupInterval | 30000 | 清除操作周期,單位ms |
| indexBinSize | 1024 | 索引文件緩存頁面數(shù),缺省為1024,當(dāng)amq擴充或者縮減存儲時,會鎖定整個broker,導(dǎo)致一定時間的阻塞,所以這個值應(yīng)該調(diào)整到比較大,但是代碼中實現(xiàn)會動態(tài)伸縮,調(diào)整效果并不理想。 |
| indexKeySize | 96 | 索引key的大小,key是消息ID |
| indexPageSize | 16kb | 索引的頁大小 |
| directoryArchive | archive | 存儲被歸檔的消息文件目錄 |
| archiveDataLogs | false | 當(dāng)為true時,歸檔的消息文件被移到directoryArchive,而不是直接刪除 |
2、KahaDB
KahaDB是基于文件的本地數(shù)據(jù)庫儲存形式,雖然沒有AMQ的速度快,但是它具有強擴展性,恢復(fù)的時間比AMQ短,從5.4版本之后KahaDB做為默認(rèn)的持久化方式。默認(rèn)配置如下:
KahaDB的屬性如下:
| 屬性名稱 | 默認(rèn)值 | 描述 |
| directory | activemq-data | 消息文件和日志的存儲目錄 |
| indexWriteBatchSize | 1000 | 一批索引的大小,當(dāng)要更新的索引量到達(dá)這個值時,更新到消息文件中 |
| indexCacheSize | 10000 | 內(nèi)存中,索引的頁大小 |
| enableIndexWriteAsync | false | 索引是否異步寫到消息文件中 |
| journalMaxFileLength | 32mb | 一個消息文件的大小 |
| enableJournalDiskSyncs | true | 是否講非事務(wù)的消息同步寫入到磁盤 |
| cleanupInterval | 30000 | 清除操作周期,單位ms |
| checkpointInterval | 5000 | 索引寫入到消息文件的周期,單位ms |
| ignoreMissingJournalfiles | false | 忽略丟失的消息文件,false,當(dāng)丟失了消息文件,啟動異常 |
| checkForCorruptJournalFiles | false | 檢查消息文件是否損壞,true,檢查發(fā)現(xiàn)損壞會嘗試修復(fù) |
| checksumJournalFiles | false | 產(chǎn)生一個checksum,以便能夠檢測journal文件是否損壞。 |
| 5.4版本之后有效的屬性: | ? | ? |
| archiveDataLogs | false | 當(dāng)為true時,歸檔的消息文件被移到directoryArchive,而不是直接刪除 |
| directoryArchive | null | 存儲被歸檔的消息文件目錄 |
| databaseLockedWaitDelay | 10000 | 在使用負(fù)載時,等待獲得文件鎖的延遲時間,單位ms |
| maxAsyncJobs | 10000 | 同個生產(chǎn)者產(chǎn)生等待寫入的異步消息最大量 |
| concurrentStoreAndDispatchTopics | false | 當(dāng)寫入消息的時候,是否轉(zhuǎn)發(fā)主題消息 |
| concurrentStoreAndDispatchQueues | true | 當(dāng)寫入消息的時候,是否轉(zhuǎn)發(fā)隊列消息 |
| 5.6版本之后有效的屬性: | ? | ? |
| archiveCorruptedIndex | false | 是否歸檔錯誤的索引 |
每個KahaDB的實例都可以配置單獨的適配器,如果沒有目標(biāo)隊列提交給filteredKahaDB,那么意味著對所有的隊列有效。如果一個隊列沒有對應(yīng)的適配器,那么將會拋出一個異常。配置如下:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | <persistenceAdapter> ??<mKahaDBdirectory="${activemq.base}/data/kahadb"> ????<filteredPersistenceAdapters> ??????<!--?match?all?queues?--> ??????<filteredKahaDBqueue=">"> ????????<persistenceAdapter> ??????????<kahaDBjournalMaxFileLength="32mb"/> ????????</persistenceAdapter> ??????</filteredKahaDB> ??????? ??????<!--?match?all?destinations?--> ??????<filteredKahaDB> ????????<persistenceAdapter> ??????????<kahaDBenableJournalDiskSyncs="false"/> ????????</persistenceAdapter> ??????</filteredKahaDB> ????</filteredPersistenceAdapters> ??</mKahaDB> </persistenceAdapter> |
如果filteredKahaDB的perDestination屬性設(shè)置為true,那么匹配的目標(biāo)隊列將會得到自己對應(yīng)的KahaDB實例。配置如下:
| 1 2 3 4 5 6 7 8 9 10 11 12 | <persistenceAdapter> ??<mKahaDBdirectory="${activemq.base}/data/kahadb"> ????<filteredPersistenceAdapters> ??????<!--?kahaDB?per?destinations?--> ??????<filteredKahaDB?perDestination="true"> ????????<persistenceAdapter> ??????????<kahaDBjournalMaxFileLength="32mb"?/> ????????</persistenceAdapter> ??????</filteredKahaDB> ????</filteredPersistenceAdapters> ??</mKahaDB> </persistenceAdapter> |
3、JDBC
可以將消息存儲到數(shù)據(jù)庫中,例如:Mysql、SQL Server、Oracle、DB2。
配置JDBC適配器:
| 1 2 3 | <persistenceAdapter> ????<jdbcPersistenceAdapterdataSource="#mysql-ds"?createTablesOnStartup="false"?/> </persistenceAdapter> |
dataSource指定持久化數(shù)據(jù)庫的bean,createTablesOnStartup是否在啟動的時候創(chuàng)建數(shù)據(jù)表,默認(rèn)值是true,這樣每次啟動都會去創(chuàng)建數(shù)據(jù)表了,一般是第一次啟動的時候設(shè)置為true,之后改成false。
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 | Mysql持久化bean: <bean?id="mysql-ds"?class="org.apache.commons.dbcp.BasicDataSource"?destroy-method="close"> ????<property?name="driverClassName"?value="com.mysql.jdbc.Driver"/> ????<property?name="url"?value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/> ????<property?name="username"?value="activemq"/> ????<property?name="password"?value="activemq"/> ????<property?name="poolPreparedStatements"?value="true"/> </bean> SQL?Server持久化bean: <bean?id="mssql-ds"?class="net.sourceforge.jtds.jdbcx.JtdsDataSource"?destroy-method="close"> ???<property?name="serverName"?value="SERVERNAME"/> ???<property?name="portNumber"?value="PORTNUMBER"/> ???<property?name="databaseName"?value="DATABASENAME"/> ???<property?name="user"?value="USER"/> ???<property?name="password"?value="PASSWORD"/> </bean> Oracle持久化bean: <bean?id="oracle-ds"?class="org.apache.commons.dbcp.BasicDataSource"?destroy-method="close"> ????<property?name="driverClassName"?value="oracle.jdbc.driver.OracleDriver"/> ????<property?name="url"?value="jdbc:oracle:thin:@10.53.132.47:1521:activemq"/> ????<property?name="username"?value="activemq"/> ????<property?name="password"?value="activemq"/> ????<property?name="maxActive"?value="200"/> ????<property?name="poolPreparedStatements"?value="true"/> </bean> DB2持久化bean: <bean?id="db2-ds"?class="org.apache.commons.dbcp.BasicDataSource"??destroy-method="close"> ??????<property?name="driverClassName"?value="com.ibm.db2.jcc.DB2Driver"/> ??????<property?name="url"?value="jdbc:db2://hndb02.bf.ctc.com:50002/activemq"/> ??????<property?name="username"?value="activemq"/> ??????<property?name="password"?value="activemq"/> ??????<property?name="maxActive"?value="200"/> ??????<property?name="poolPreparedStatements"?value="true"/> ??</bean> |
4、LevelDB
這種文件系統(tǒng)是從ActiveMQ5.8之后引進的,它和KahaDB非常相似,也是基于文件的本地數(shù)據(jù)庫儲存形式,但是它提供比KahaDB更快的持久性。與KahaDB不同的是,它不是使用傳統(tǒng)的B-樹來實現(xiàn)對日志數(shù)據(jù)的提前寫,而是使用基于索引的LevelDB。
默認(rèn)配置如下:
?
| 1 2 3 | <?persistenceAdapter?> ???????<?levelDBdirectory?=?"activemq-data"?/> </?persistenceAdapter?> |
屬性如下:
| 屬性名稱 | 默認(rèn)值 | 描述 |
| directory | "LevelDB" | 數(shù)據(jù)文件的存儲目錄 |
| readThreads | 10 | 系統(tǒng)允許的并發(fā)讀線程數(shù)量 |
| sync | true | 同步寫到磁盤 |
| logSize | 104857600 (100 MB) | 日志文件大小的最大值 |
| logWriteBufferSize | 4194304 (4 MB) | 日志數(shù)據(jù)寫入文件系統(tǒng)的最大緩存值 |
| verifyChecksums | false | 是否對從文件系統(tǒng)中讀取的數(shù)據(jù)進行校驗 |
| paranoidChecks | false | 盡快對系統(tǒng)內(nèi)部發(fā)生的存儲錯誤進行標(biāo)記 |
| indexFactory | org.fusesource.leveldbjni.JniDBFactory, org.iq80.leveldb.impl.Iq80DBFactory | 在創(chuàng)建LevelDB索引時使用 |
| indexMaxOpenFiles | 1000 | 可供索引使用的打開文件的數(shù)量 |
| indexBlockRestartInterval | 16 | Number keys between restart points for delta encoding of keys. |
| indexWriteBufferSize | 6291456 (6 MB) | 內(nèi)存中索引數(shù)據(jù)的最大值 |
| indexBlockSize | 4096 (4 K) | 每個數(shù)據(jù)塊的索引數(shù)據(jù)大小 |
| indexCacheSize | 268435456 (256 MB) | 使用緩存索引塊允許的最大內(nèi)存 |
| indexCompression | snappy | 適用于索引塊的壓縮類型 |
| logCompression | none | 適用于日志記錄的壓縮類型 |
5、? 下面詳細(xì)介紹一下如何將消息持久化到Mysql數(shù)據(jù)庫中
?????????需要將mysql的驅(qū)動包放置到ActiveMQ的lib目錄下
?????????修改activeMQ的配置文件:
?
| 1 2 3 | <?persistenceAdapter?> <?jdbcPersistenceAdapter?dataDirectory?=?"${activemq.base}/data"?dataSource?=?"#mysql-ds"?createTablesOnStartup?=?"false"?/> </?persistenceAdapter?> |
在配置文件中的broker節(jié)點外增加:
?
| 1 2 3 4 5 6 7 8 | <?beanid?=?"mysql-ds"?class?=?"org.apache.commons.dbcp.BasicDataSource"?destroy-method?=?"close"?> ???????<?propertyname?=?"driverClassName"?value?=?"com.mysql.jdbc.Driver"?/> ???????<?property?name?=?"url"?value?=?"jdbc:mysql://localhost:3306/activemq?relaxAutoCommit=true"?/> ???????<?property?name?=?"username"?value?=?"root"?/> ???????<?property?name?=?"password"?value?=?"root"?/> ???????<?property?name?=?"maxActive"?value?=?"200"?/> ???????<?propertyname?=?"poolPreparedStatements"?value?=?"true"?/> </?bean?> |
從配置中可以看出數(shù)據(jù)庫的名稱是activemq,需要手動在MySql中建立這個數(shù)據(jù)庫。
然后重新啟動activeMQ,會發(fā)現(xiàn)activemq多了三張表:
1:activemq_acks
2:activemq_lock
3:activemq_msgs
?????????點到點類型
Sender類:
?
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 | import?javax.jms.Connection; import?javax.jms.ConnectionFactory; import?javax.jms.DeliveryMode; import?javax.jms.Destination; import?javax.jms.JMSException; import?javax.jms.MessageProducer; import?javax.jms.Session; import?javax.jms.TextMessage; import?org.apache.activemq.ActiveMQConnection; import?org.apache.activemq.ActiveMQConnectionFactory; public?class?Sender { private?static?final?int?SEND_NUMBER =?2000?; ?????public?static?void?main(String[] args) { ????????// ConnectionFactory :連接工廠,JMS用它創(chuàng)建連接 ????????ConnectionFactory connectionFactory; ????????// Connection :JMS客戶端到JMS Provider的連接 ????????Connection connection =?null?; ?????????// Session:一個發(fā)送或接收消息的線程 ????????Session session; ????????// Destination :消息的目的地;消息發(fā)送給誰. ????????Destination destination; ????????// MessageProducer:消息發(fā)送者 ????????MessageProducer producer; ?????????// TextMessage message; ?????????// 構(gòu)造ConnectionFactory實例對象,此處采用ActiveMq的實現(xiàn) ????????connectionFactory =?new?ActiveMQConnectionFactory( ???????????????ActiveMQConnection.DEFAULT_USER, ???????????????ActiveMQConnection.DEFAULT_PASSWORD, ???????????????"tcp://localhost:61616"?); ????????try?{ ????????????// 構(gòu)造從工廠得到連接對象 ????????????connection = connectionFactory.createConnection(); ????????????//啟動 ????????????connection.start(); ????????????//獲取操作連接 ????????????session = connection.createSession(?false?, Session.AUTO_ACKNOWLEDGE); ????????????//獲取session,FirstQueue是一個服務(wù)器的queue??????????????? destination = session.createQueue("FirstQueue"); ????????????// 得到消息生成者【發(fā)送者】 ????????????producer = session.createProducer(destination); ????????????//設(shè)置不持久化 ????????????producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); ????????????//構(gòu)造消息 ????????????sendMessage(session, producer); ????????????//session.commit(); ????????????connection.close(); ????????} ????????catch?(Exception e){ ????????????e.printStackTrace(); ????????}?finally?{ ????????????if?(?null?!= connection){ ???????????????try?{ ???????????????????connection.close(); ???????????????}?catch?(JMSException e) { ???????????????????// TODO Auto-generatedcatch block ???????????????????e.printStackTrace(); ???????????????} ????????????}??? ????????} ?????} ?????public?static?void?sendMessage(Session session, MessageProducer producer)?throws?Exception{ ????????for?(?int?i=?1?; i<=SEND_NUMBER; i++){ ????????????TextMessage message = session.createTextMessage(?"ActiveMQ發(fā)送消息"?+i); ????????????System.out.println(?"發(fā)送消息:ActiveMQ發(fā)送的消息"?+i); ????????????producer.send(message); ????????} ?????} } |
Receiver類:
?
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 | import?javax.jms.Connection; import?javax.jms.ConnectionFactory; import?javax.jms.Destination; import?javax.jms.MessageConsumer; import?javax.jms.Session; import?javax.jms.TextMessage; import?org.apache.activemq.ActiveMQConnection; import?org.apache.activemq.ActiveMQConnectionFactory; public?class?Receiver { ?????public?static?void?main(String[] args) { ????????// ConnectionFactory :連接工廠,JMS用它創(chuàng)建連接 ?????????ConnectionFactory connectionFactory; ?????????// Connection :JMS客戶端到JMS Provider的連接 ?????????Connection connection =?null?; ?????????// Session:一個發(fā)送或接收消息的線程 ?????????Session session; ?????????// Destination :消息的目的地;消息發(fā)送給誰. ?????????Destination destination; ?????????// 消費者,消息接收者 ?????????MessageConsumer consumer; ?????????connectionFactory = newActiveMQConnectionFactory( ?????????????????ActiveMQConnection.DEFAULT_USER, ?????????????????ActiveMQConnection.DEFAULT_PASSWORD, ?????????????????"tcp://localhost:61616"?); ?????????try?{ ?????????????//得到連接對象 ?????????????connection =connectionFactory.createConnection(); ?????????????// 啟動 ?????????????connection.start(); ?????????????// 獲取操作連接 ?????????????session = connection.createSession(?false?, ?????????????????????Session.AUTO_ACKNOWLEDGE); ?????????????// 創(chuàng)建Queue ????????????destination = session.createQueue(?"FirstQueue"?); ?????????????consumer =session.createConsumer(destination);??????? ?????????????while?(?true?){ ???????????????//設(shè)置接收者接收消息的時間,為了便于測試,這里定為100s ???????????????TextMessagemessage = (TextMessage)consumer.receive(?100000?); ???????????????if?(?null?!= message){ ??????????????????System.out.println(?"收到消息"?+message.getText()); ???????????????}?else?break?; ?????????????} ?????????}?catch?(Exception e){ ?????????e.printStackTrace(); ?????????}?finally?{ ?????????????try?{ ?????????????????if?(?null?!= connection) ?????????????????????connection.close(); ?????????????}?catch?(Throwable ignore) { ?????????????} ?????????} ?????} } |
測試:
測試一:
A、 先運行Sender類,待運行完畢后,運行Receiver類
B、 在此過程中activemq數(shù)據(jù)庫的activemq_msgs表中沒有數(shù)據(jù)
C、 再次運行Receiver,消費不到任何信息
測試二:
A、? 先運行Sender類
B、 重啟電腦
C、 運行Receiver類,無任何信息被消費
測試三:
A、?? 把Sender類中的producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);改為producer.setDeliveryMode(DeliveryMode.PERSISTENT);
B、?? 先運行Sender類,待運行完畢后,運行Receiver類
C、?? 在此過程中activemq數(shù)據(jù)庫的activemq_msgs表中有數(shù)據(jù)生成,運行完Receiver類后,數(shù)據(jù)清除
測試四:
A、??? 把Sender類中的producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);改為producer.setDeliveryMode(DeliveryMode.PERSISTENT);
B、??? 運行Sender類
C、??? 重啟電腦
D、??? 運行Receiver類,有消息被消費
結(jié)論:???
通過以上測試,可以發(fā)現(xiàn),在P2P類型中當(dāng)DeliveryMode設(shè)置為NON_PERSISTENCE時,消息被保存在內(nèi)存中,而當(dāng) DeliveryMode設(shè)置為PERSISTENCE時,消息保存在broker的相應(yīng)的文件或者數(shù)據(jù)庫中。而且P2P中消息一旦被Consumer消 費就從broker中刪除。
?????????發(fā)布/訂閱類型
Sender類:
?
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 | import?javax.jms.Connection; import?javax.jms.ConnectionFactory; import?javax.jms.DeliveryMode; import?javax.jms.Destination; import?javax.jms.JMSException; import?javax.jms.MessageProducer; import?javax.jms.Session; import?javax.jms.TextMessage; import?javax.jms.Topic; import?org.apache.activemq.ActiveMQConnection; import?org.apache.activemq.ActiveMQConnectionFactory; public?class?Sender { ?????private?static?final?int?SEND_NUMBER =?100?; ?????public?static?void?main(String[] args) { ????????// ConnectionFactory :連接工廠,JMS用它創(chuàng)建連接 ????????ConnectionFactory connectionFactory; ????????// Connection :JMS客戶端到JMS Provider的連接 ????????Connection connection =?null?; ?????????// Session:一個發(fā)送或接收消息的線程 ????????Session session; ????????// MessageProducer:消息發(fā)送者 ????????MessageProducer producer; ?????????// TextMessage message; ?????????// 構(gòu)造ConnectionFactory實例對象,此處采用ActiveMq的實現(xiàn) ????????connectionFactory =?new?ActiveMQConnectionFactory( ???????????????ActiveMQConnection.DEFAULT_USER, ????????????????ActiveMQConnection.DEFAULT_PASSWORD, ???????????????"tcp://localhost:61616"?); ????????try?{ ????????????//得到連接對象 ????????????connection = connectionFactory.createConnection(); ????????????//啟動 ????????????connection.start(); ????????????//獲取操作連接 ????????????session = connection.createSession(?false?, Session.AUTO_ACKNOWLEDGE);??????? ????????????Topic topic = session.createTopic(?"MQ_test"?);?????? ????????????// 得到消息生成者【發(fā)送者】 ????????????producer = session.createProducer(topic); ????????????//設(shè)置持久化 ????????????producer.setDeliveryMode(DeliveryMode.PERSISTENT); ????????????//構(gòu)造消息 ????????????sendMessage(session, producer); ????????????//session.commit(); ????????????connection.close(); ????????} ????????catch?(Exception e){ ????????????e.printStackTrace(); ????????}?finally?{ ????????????if?(?null?!= connection){ ???????????????try?{ ???????????????????connection.close(); ???????????????}?catch?(JMSException e) { ???????????????????// TODO Auto-generatedcatch block ???????????????????e.printStackTrace(); ???????????????} ????????????}??? ????????} ?????} ?????public?static?void?sendMessage(Session session, MessageProducer producer)?throws?Exception{ ????????for?(?int?i=?1?; i<=SEND_NUMBER; i++){ ????????????TextMessage message = session.createTextMessage(?"ActiveMQ發(fā)送消息"?+i); ????????????System.out.println(?"發(fā)送消息:ActiveMQ發(fā)送的消息"?+i); ????????????producer.send(message); ????????} ?????} } |
Receiver類:
?
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 | import?javax.jms.Connection; import?javax.jms.ConnectionFactory; import?javax.jms.Destination; import?javax.jms.MessageConsumer; import?javax.jms.Session; import?javax.jms.TextMessage; import?javax.jms.Topic; ?? import?org.apache.activemq.ActiveMQConnection; import?org.apache.activemq.ActiveMQConnectionFactory; public?class?Receiver { ?????public?static?void?main(String[] args) { ????????// ConnectionFactory :連接工廠,JMS用它創(chuàng)建連接 ?????????ConnectionFactory connectionFactory; ?????????// Connection :JMS客戶端到JMS Provider的連接 ?????????Connection connection =?null?; ?????????// Session:一個發(fā)送或接收消息的線程 ?????????Session session; ?????????// 消費者,消息接收者 ?????????MessageConsumer consumer; ?????????connectionFactory = newActiveMQConnectionFactory( ????????????????ActiveMQConnection.DEFAULT_USER, ?????????????????ActiveMQConnection.DEFAULT_PASSWORD, ?????????????????"tcp://localhost:61616"?); ?????????try?{ ?????????????// 構(gòu)造從工廠得到連接對象 ?????????????connection =connectionFactory.createConnection(); ????????????? ?????????????connection.setClientID(?"clientID001"?); ?????????????// 啟動 ?????????????connection.start(); ?????????????// 獲取操作連接 ?????????????session = connection.createSession(?false?, ?????????????????????Session.AUTO_ACKNOWLEDGE); ?????????????// 獲取session ????????????Topic topic = session.createTopic(?"MQ_test"?);?????? ????????????// 得到消息生成者【發(fā)送者】 ????????????consumer = session.createDurableSubscriber(topic,?"MQ_sub"?); ???????????? ?????????????while?(?true?){ ???????????????//設(shè)置接收者接收消息的時間,為了便于測試,這里誰定為100s ???????????????TextMessagemessage = (TextMessage)consumer.receive(?100000?); ???????????????if?(?null?!= message){ ??????????????????System.out.println(?"收到消息"?+message.getText()); ???????????????}?else?break?; ?????????????} ?????????}?catch?(Exception e){ ?????????e.printStackTrace(); ?????????}?finally?{ ?????????????try?{ ?????????????????if?(?null?!= connection) ?????????????????????connection.close(); ?????????????}?catch?(Throwable ignore) { ?????????????} ?????????} ?????} ?? } |
測試:
測試一:
A、先啟動Sender類
B、再啟動Receiver類
C、結(jié)果無任何記錄被訂閱
測試二:
A、先啟動Receiver類,讓Receiver在相關(guān)主題上進行訂閱
B、停止Receiver類,再啟動Sender類
C、待Sender類運行完成后,再啟動Receiver類
D、結(jié)果發(fā)現(xiàn)相應(yīng)主題的信息被訂閱
http://www.cnblogs.com/adolfmc/p/4462580.html
總結(jié)
以上是生活随笔為你收集整理的ActiveMQ持久化方式(转)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Acess添加记录
- 下一篇: UIView CALayer