ActiveMQ不只支持persistent和non-persistent兩種方式,還支持消息的recovery(恢復)方式。html
Queue的存儲是很簡單的,就是一個FIFO的Queuejava
對於持久化訂閱主題,每個消費者將得到一個消息的複製。mysql
ActiveMQ提供了一個插件式的消息存儲,相似於消息的多點傳播,主要實現了以下幾種:spring
1.AMQ消息存儲-基於文件的存儲,之前默認的存儲方式sql
2.KahaDB消息存儲-提供了容量的提高和恢復能力,如今的默認方式數據庫
3.JDBC消息存儲-消息基於JDBC存儲apache
4.Memory消息存儲-基於內存的消息存儲緩存
(1)kahaDB Message Store概述session
KahaDB是目前默認的存儲方式,可用於任何場景,提升了性能和恢復能力。消息存儲使用一個事務日誌和僅僅用一個索引文件來存儲它全部的地址。
KahaDB是一個專門針對消息持久化的解決方案,它對典型的消息使用模式進行了優化。在kaha中,數據被追加到 data logs中。 當再也不須要log文件中的數據的時候,log文件會被丟棄。異步
(2)基本配置方式: 在activemq的安裝目錄下的:conf/activemq.xml中有以下配置:(默認配置)
<!-- Configure message persistence for the broker. The default persistence mechanism is the KahaDB store (identified by the kahaDB tag). For more information, see: http://activemq.apache.org/persistence.html --> <persistenceAdapter> <kahaDB directory="${activemq.data}/kahadb"/> </persistenceAdapter>
如上面配置了文件的位置,如今咱們查看kahadb的文件位置:
(3)可用的屬性有:
1. director: KahaDB存放的路徑,默認值activemq-data
2. indexWriteBatchSize: 批量寫入磁盤的索引page數量,默認值爲1000
3. indexCacheSize: 內存中緩存索引page的數量,默認值10000
4. enableIndexWriteAsync: 是否異步寫出索引,默認false
5. journalMaxFileLength: 設置每一個消息data log的大小,默認是32MB
6. enableJournalDiskSyncs: 設置是否保證每一個沒有事務的內容,被同步寫入磁盤,JMS持久化的時候須要,默認爲true
7. cleanupInterval: 在檢查到再也不使用的消息後,在具體刪除消息前的時間,默認30000
8. checkpointInterval: checkpoint的間隔時間,默認是5000
9. ignoreMissingJournalfiles: 是否忽略丟失的消息日誌文件,默認false
10. checkForCorruptJournalFiles: 在啓動的時候,將會驗證消息文件是否損壞,默認false
11. checksumJournalFiles: 是否爲每一個消息日誌文件提供checksum,默認false
12. archiveDataLogs: 是否移動文件到特定的路徑,而不是刪除它們,默認false
13. directoryArchive: 定義消息已經被消費事後,移動data log到的路徑,默認null
14. databaseLockedWaitDelay: 得到數據庫鎖的等待時間(used by shared master/slave),默認10000。用於以後主從複製的時候配置
15. maxAsyncJobs: 設置最大的能夠存儲的異步消息隊列,默認值10000,能夠和concurrent MessageProducers設置成同樣的值。
16. concurrentStoreAndDispatchTransactions:是否分發消息到客戶端,同時事務存儲消息,默認true
17. concurrentStoreAndDispatchTopics: 是否分發Topic消息到客戶端,同時進行存儲,默認true
18. concurrentStoreAndDispatchQueues: 是否分發queue消息到客戶端,同時進行存儲,默認true
(4)Java內嵌Broker使用kahadb的例子:
package cn.qlq.activemq.broker; import java.io.File; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.store.kahadb.KahaDBStore; public class BrokerUeingKahadb { public static void main(String[] args) throws Exception { BrokerService brokerService = new BrokerService(); File dataKahadbFile = new File("data/kahadb"); KahaDBStore kahaDBStore = new KahaDBStore(); kahaDBStore.setDirectory(dataKahadbFile); kahaDBStore.setJournalDiskSyncInterval(1024 * 1000); kahaDBStore.setIndexWriteBatchSize(100); kahaDBStore.setEnableIndexWriteAsync(true); brokerService.setPersistenceAdapter(kahaDBStore); brokerService.addConnector("tcp://localhost:61616"); brokerService.start(); } }
(1)概述:
AMQ Message Store是ActiveMQ5.0缺省的持久化存儲,它是一個基於文件、事務存儲設計爲快速消息存儲的一個結構,該結構是以流的形式來進行消息交互的。
這種方式中,Messages被保存到data logs中,同時被reference store進行索引以提升存取速度。Data logs由一些簡單的data log文件組成,缺省的文件大小是32M,若是某個消息的大小超過了data log文件的大小,那麼能夠修改配置以增長data log文件的大小。若是某個data log文件中全部的消息都被成功消費了,那麼這個data log文件將會被標記,以便在下一輪的清理中被刪除或者歸檔。
(2)配置示例以下:(用下面的配置方式替換掉activemq.xml的 persistenceAdapter 便可,也就是該配置須要做爲broker的屬性)
<persistenceAdapter> <amqPersistenceAdapter directory="${activemq.data}/data" maxFileLength="32mb"/> </persistenceAdapter>
ActiveMQ支持使用JDBC來持久化消息,咱們只須要配置JDBC驅動便可,至於表結構activemq會自動幫咱們建好表結構。
我使用的activemq的版本是:5.15.8,個人配置方式以下:
(1)拷貝mysql的驅動包 mysql-connector-java-5.1.37-bin.jar 到 apache-activemq-5.15.8\lib\optional\ 目錄下
(2)修改apache-activemq-5.15.8\conf\activemq.xml
首先定義dataSource(該dataSource位於apache-activemq-5.15.8\lib\optional\ commons-dbcp2-2.1.1.jar包內,固然能夠換成咱們本身的c3p0,durid等鏈接池)
<bean id="mysql_ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close"> <property name="driverClassName"> <value>com.mysql.jdbc.Driver</value> </property> <property name="url"> <value>jdbc:mysql://localhost:3306/activemq</value> </property> <property name="username"> <value>root</value> </property> <property name="password"> <value>123456</value> </property> </bean>
修改broker的persistenceAdapter持久化方式:
<persistenceAdapter> <jdbcPersistenceAdapter dataDirectory="${activemq.base}/data" dataSource="#mysql_ds"/> </persistenceAdapter>
(3)啓動項目發現activemq會自動幫咱們建立好表結構,以下三張表:
mysql> show tables; +--------------------+ | Tables_in_activemq | +--------------------+ | activemq_acks | | activemq_lock | | activemq_msgs | +--------------------+ 3 rows in set (0.00 sec) mysql> desc activemq_msgs; +------------+--------------+------+-----+---------+-------+ | Field | Type | Null | Key | Default | Extra | +------------+--------------+------+-----+---------+-------+ | ID | bigint(20) | NO | PRI | NULL | | | CONTAINER | varchar(250) | NO | MUL | NULL | | | MSGID_PROD | varchar(250) | YES | MUL | NULL | | | MSGID_SEQ | bigint(20) | YES | | NULL | | | EXPIRATION | bigint(20) | YES | MUL | NULL | | | MSG | longblob | YES | | NULL | | | PRIORITY | bigint(20) | YES | MUL | NULL | | | XID | varchar(250) | YES | MUL | NULL | | +------------+--------------+------+-----+---------+-------+ 8 rows in set (0.00 sec) mysql> desc activemq_acks; +---------------+--------------+------+-----+---------+-------+ | Field | Type | Null | Key | Default | Extra | +---------------+--------------+------+-----+---------+-------+ | CONTAINER | varchar(250) | NO | PRI | NULL | | | SUB_DEST | varchar(250) | YES | | NULL | | | CLIENT_ID | varchar(250) | NO | PRI | NULL | | | SUB_NAME | varchar(250) | NO | PRI | NULL | | | SELECTOR | varchar(250) | YES | | NULL | | | LAST_ACKED_ID | bigint(20) | YES | | NULL | | | PRIORITY | bigint(20) | NO | PRI | 5 | | | XID | varchar(250) | YES | MUL | NULL | | +---------------+--------------+------+-----+---------+-------+ 8 rows in set (0.00 sec) mysql> desc activemq_lock; +-------------+--------------+------+-----+---------+-------+ | Field | Type | Null | Key | Default | Extra | +-------------+--------------+------+-----+---------+-------+ | ID | bigint(20) | NO | PRI | NULL | | | TIME | bigint(20) | YES | | NULL | | | BROKER_NAME | varchar(250) | YES | | NULL | | +-------------+--------------+------+-----+---------+-------+ 3 rows in set (0.00 sec)
其建表語句以下:(能夠看出acks表是多列作聯合主鍵)
mysql> show create table activemq_acks\G *************************** 1. row *************************** Table: activemq_acks Create Table: CREATE TABLE `activemq_acks` ( `CONTAINER` varchar(250) NOT NULL, `SUB_DEST` varchar(250) DEFAULT NULL, `CLIENT_ID` varchar(250) NOT NULL, `SUB_NAME` varchar(250) NOT NULL, `SELECTOR` varchar(250) DEFAULT NULL, `LAST_ACKED_ID` bigint(20) DEFAULT NULL, `PRIORITY` bigint(20) NOT NULL DEFAULT '5', `XID` varchar(250) DEFAULT NULL, PRIMARY KEY (`CONTAINER`,`CLIENT_ID`,`SUB_NAME`,`PRIORITY`), KEY `ACTIVEMQ_ACKS_XIDX` (`XID`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 1 row in set (0.00 sec) mysql> show create table activemq_msgs\G *************************** 1. row *************************** Table: activemq_msgs Create Table: CREATE TABLE `activemq_msgs` ( `ID` bigint(20) NOT NULL, `CONTAINER` varchar(250) NOT NULL, `MSGID_PROD` varchar(250) DEFAULT NULL, `MSGID_SEQ` bigint(20) DEFAULT NULL, `EXPIRATION` bigint(20) DEFAULT NULL, `MSG` longblob, `PRIORITY` bigint(20) DEFAULT NULL, `XID` varchar(250) DEFAULT NULL, PRIMARY KEY (`ID`), KEY `ACTIVEMQ_MSGS_MIDX` (`MSGID_PROD`,`MSGID_SEQ`), KEY `ACTIVEMQ_MSGS_CIDX` (`CONTAINER`), KEY `ACTIVEMQ_MSGS_EIDX` (`EXPIRATION`), KEY `ACTIVEMQ_MSGS_PIDX` (`PRIORITY`), KEY `ACTIVEMQ_MSGS_XIDX` (`XID`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 1 row in set (0.00 sec) mysql> show create table activemq_lock\G *************************** 1. row *************************** Table: activemq_lock Create Table: CREATE TABLE `activemq_lock` ( `ID` bigint(20) NOT NULL, `TIME` bigint(20) DEFAULT NULL, `BROKER_NAME` varchar(250) DEFAULT NULL, PRIMARY KEY (`ID`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 1 row in set (0.00 sec)
解釋上面三張表:
1.消息表,缺省代表爲ACTIVEMQ_MSGS, queue和topic都存儲在裏面
ID:自增主鍵; CONTAINER:所屬類型以及所在隊列或者主題; MSGID_PROD:生產者ID;MSGID_SEQ:在同一批次消息中的序號; EXPIRATION:過時時間(表示永不過時); MSG:序列化以後的消息; PRIORITY優先級; XID:暫時不知道這個有什麼用。
2.ACTIVEMQ_ACKS表存儲持久訂閱的信息和最後一個持久訂閱接收的消息ID
CONTAINER:與上面消息表的CONTAINER同樣; SUB_DEST:子目的地,與CONTAINER同樣;
CLIENT_ID: 連接的客戶端ID,也就是咱們程序:connection.setClientID("cc1"); 產生的ID
SUB_NAME:持久訂閱者的名稱.也就是咱們程序: session.createDurableSubscriber(destination, "C11"); 產生的名稱
SELECTOR:消息選擇器,consumer能夠選擇本身想要的
LAST_ACKED_ID:最後一次確認ID,這個字段存的該該訂閱者最後一次收到的消息的ID
XID:暫時不知道這個有什麼用。
3.鎖定表,缺省表名爲ACTIVEMQ_LOCK,用來確保在某一時刻,只能有一個ActiveMQ broker實例來訪問數據庫
XID:自增的主鍵
TIME:日期
BROKER_NAME:佔用數據庫的brokerName
(4)測試持久化存儲和訂閱
lock表一直是隻有一條數據:
mysql> select * from activemq_lock; +----+------+-------------+ | ID | TIME | BROKER_NAME | +----+------+-------------+ | 1 | NULL | NULL | +----+------+-------------+ 1 row in set (0.00 sec)
咱們按照以下順序進行操做:
消息表以下:
acks表以下:
補充:Java內嵌Broker使用JDBC持久化存儲
(1)依賴的jar包
(2)broker啓動的代碼
package cn.qlq.activemq; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; import org.apache.commons.dbcp2.BasicDataSource; public class BrokerUsingJDBC { public static void main(String[] args) throws Exception { BrokerService brokerService = new BrokerService(); // 1.建立數據源 BasicDataSource dataSource = new BasicDataSource(); dataSource.setUrl("jdbc:mysql://localhost:3306/activemq"); dataSource.setUsername("root"); dataSource.setPassword("123456"); dataSource.setDriverClassName("com.mysql.jdbc.Driver"); // 2.建立JDBCPersistenceAdapter JDBCPersistenceAdapter Adapter = new JDBCPersistenceAdapter(); Adapter.setDataSource(dataSource); brokerService.setBrokerName("brokerName"); brokerService.setUseJmx(true); brokerService.setPersistenceAdapter(Adapter); brokerService.addConnector("tcp://localhost:61616"); brokerService.start(); } }
測試結果以及代碼同上面。關於整合spring的broker啓動方式也是相似注入正確的bean便可。
這種方式克服了JDBC Store的不足,使用快速的緩存寫入技術,大大提升了性能。
JDBC 配合其自帶的 high performance journal;根據官方說法,它內置的高性能journal的工做相似於在緩存層工做,消息會優先寫入到journal,後臺的定時任務會每隔一段時間間隔去。
JDBC Store和JDBC Message Store with ActiveMQ Journal的區別:
1. JDBC with journal的性能優於jdbc
2. JDBC用於master/slave模式的數據庫分享
3. JDBC with journal不能用於master/slave模式
4. 通常狀況下,推薦使用jdbc with journal
其配置方式以下:註釋掉原來的持久化適配器,並注入持久化工廠
<!-- <persistenceAdapter> <jdbcPersistenceAdapter dataDirectory="${activemq.base}/data" dataSource="#mysql_ds"/> </persistenceAdapter> --> <persistenceFactory> <journalPersistenceAdapterFactory journalLogFiles="4" journalLogFileSize="32768" useJournal="true" useQuickJournal="true" dataSource="#mysql_ds" dataDirectory="activemq-data"/> </persistenceFactory>
(1)須要的jar包:
(2)Broker啓動類:
package cn.qlq.activemq; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.store.journal.JournalPersistenceAdapterFactory; import org.apache.commons.dbcp2.BasicDataSource; public class BrokerUsingJDBC { public static void main(String[] args) throws Exception { BrokerService brokerService = new BrokerService(); // 1.建立數據源 BasicDataSource dataSource = new BasicDataSource(); dataSource.setUrl("jdbc:mysql://localhost:3306/activemq"); dataSource.setUsername("root"); dataSource.setPassword("123456"); dataSource.setDriverClassName("com.mysql.jdbc.Driver"); // 2.建立PersistenceAdapterFactory JournalPersistenceAdapterFactory persistenceFactory = new JournalPersistenceAdapterFactory(); persistenceFactory.setDataSource(dataSource); persistenceFactory.setJournalLogFiles(4); persistenceFactory.setJournalLogFileSize(32768); persistenceFactory.setUseJournal(true); persistenceFactory.setUseQuickJournal(true); // 3.設置持久化工廠並啓動broker brokerService.setBrokerName("brokerName"); brokerService.setUseJmx(true); brokerService.setPersistenceFactory(persistenceFactory); brokerService.addConnector("tcp://localhost:61616"); brokerService.start(); } }
內存消息存儲主要是存儲全部的持久化的消息在內存中。這裏沒有動態的緩存存在,因此你必須注意設置你的broker所在的JVM和內存限制。
這種方式的持久化消息只在當前JVM內有效,當重啓JVM以後會丟失持久化的消息。
配置方式以下:只須要將 persistent 屬性設爲false便可。
<broker xmlns="http://activemq.apache.org/schema/core" persistent="false" brokerName="brokerName" dataDirectory="${activemq.data}">
其內嵌Java的broker方式以下:
BrokerService brokerService = new BrokerService(); brokerService.setPersistent(false); brokerService.setBrokerName("brokerName"); brokerService.setUseJmx(true); brokerService.addConnector("tcp://localhost:61616"); brokerService.start();