ActiveMQ的消息存储(八)
1.隊列存儲
Queues采取先進先出模式,同一時間,消息只會發送給某一個消費者,只有當該消息被消費并告知已收到時,它才能在代理的存儲中被刪除。 而對于持久性Topic來說,每一個消費者都會獲取消息的拷貝。為了節約空間,代理的存儲介質中只存儲了一份消息,存儲介質的持久訂閱對象為其以后的被存儲的消息維護了一個指針,消費者消費時,從存儲介質中復制一個消息,指針指向這個消息。消息被所有訂閱者獲取后才能刪除2.文件存儲(kaha存儲)
從ActiveMQ5.3后,推薦使用KahaDB存儲普通用途的消息。KahaDB是一種基于文件的消息存儲機制,為了提高消息存儲的可靠性和可恢復性,它整合了一個事務日志。KahaDB擁有高性能和可擴展性等特點。由于KahaDB使用的是基于文件的存儲,所以不需要使用第三方數據庫。
KahaDB消息存儲機制為所有目的地使用一個索引,其索引使用一個事務日志。
KahaDB被使用在10000個活動連接的產品環境,每個連接都擁有一個分割的隊列。
KahaDB可以通過配置或硬代碼實現,配置文件在/config/activemq.xml中,以下是配置方式:
這個directory文件在apache-activitymq-xxx/data/kahadb中
這個文件的目錄結構如下圖所示:
archive directory 這個文件只有當archiveDataLogs屬性為true時才會存在,用來存儲KahaDb不再需要但是需要進行恢復時才會用到這個archived中的數據。
db.data 里面記錄著B樹的索引,指向log文件的位置。
db.redo 當硬件發生意外時,用來恢復B樹的索引。
可選擇的配置
3.文件存儲(AMQ存儲)
AMQ消息存儲與KahaDB消息存儲類似,由一個提供可靠持續性的事務日志以及高效索引組成,當一個應用中,消息吞吐量是主要需求時,AMQ是最好的選擇。但由于它為每個索引使用了兩個分隔文件,而每個目的地都有一個索引,所以它不能被使用于每個代理擁有成百上千個隊列的情況。ActiveMQ代理沒有被完全關閉時,索引的覆蓋也會很慢。這是因為所有的索引都需要被重建。
AMQ的使用方式
<?xml version="1.0" encoding="UTF-8"?> <beans><broker xmlns="http://activemq.apache.org/schema/core"><persistenceAdapter> <amqPersistenceAdapter directory="target/Broker2-data/activemq-data" syncOnWrite="true" indexPageSize="16kb" indexMaxBinSize="100" maxFileLength="10mb" /></persistenceAdapter> </broker> </beans>AMQ存儲的配置選項:
AMQ的文件存儲結構
lock file保證只有一個broker可以訪問
tmp-storage存儲臨時文件,一般是非持久化消息
The data directory負責儲存消息在log中的引用的索引
The state directory負責記錄Topic消費者的狀態
The archive directory和kaha的目錄一樣
The journal directory儲存數據,data-control里面有元數據信息,還會記錄每個數據的引用次數,消息傳遞完成后,會被刪除
4.JDBC存儲
jdbc存儲數據需要用到三個表
ACTIVITY-MSGS負責儲存Topic和Queues的數據
ACTIVITY-ACKS負責管理Message的狀態
ACTIVEMQ_LOCK負責控制只能有一個broker訪問數據庫的三個表
<?xml version="1.0" encoding="UTF-8"?> <beans><broker brokerName="test-broker" persistent="true" xmlns="http://activemq.apache.org/schema/core"><persistenceAdapter><jdbcPersistenceAdapter dataSource="#mysql-ds"/></persistenceAdapter> </broker><bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"> <property name="driverClassName" value="com.mysql.jdbc.Driver"/> <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/> <property name="username" value="activemq"/> <property name="password" value="activemq"/> <property name="maxActive" value="200"/> <property name="poolPreparedStatements" value="true"/> </bean> </beans>5.內存存儲方式
方式一:
<?xml version="1.0" encoding="UTF-8"?> <beans><broker brokerName="test-broker"persistent="false" xmlns="http://activemq.apache.org/schema/core"> <transportConnectors> <transportConnector uri="tcp://localhost:61635"/> </transportConnectors></broker> </beans>方式二:
import org.apache.activemq.broker.BrokerService; public void createEmbeddedBroker() throws Exception { BrokerService broker = new BrokerService(); //configure the broker to use the Memory Store broker.setPersistent(false);//Add a transport connectorbroker.addConnector("tcp://localhost:61616");//now start the brokerbroker.start(); }- -
總結
以上是生活随笔為你收集整理的ActiveMQ的消息存储(八)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: ActiveMQ的network con
- 下一篇: ActiveMQ的安全配置(九)