ActiveMQ Messages Storejava
ActiveMQ消息的持久化,在不少須要同步的消息的下很關鍵,當一個消費者掛了後,確保之前消息可以接受,這個時候就必須用到消息的持久化存儲。ActiveMQ主要提供瞭如下幾種策略進行配置:mysql
1.The KahaDB message storesql
先看一下java 代碼的編寫數據庫
BrokerService broker = new BrokerService(); File dataFileDir = new File("../amq-in-action/kahadb1"); KahaDBStore kaha = new KahaDBStore(); kaha.setDirectory(dataFileDir); kaha.setJournalMaxFileLength(1024*1204*10); kaha.setConcurrentStoreAndDispatchTopics(true); kaha.setIndexWriteBatchSize(100); kaha.setEnableIndexWriteAsync(false); broker.setPersistenceAdapter(kaha);//使用KahaDB,作持久化存儲 broker.addConnector("tcp://localhost:61618");
很明顯咱們能夠看到,KahaDB 是一個基於文件的持久化策略。看一些內部實現的原理:緩存
從上圖咱們能夠看到,咱們指定的目錄下,生成了幾個文件:db.data db.redo db-1.log。
異步
在db-xx.log文件中記錄的是:一個消息和命令滾動日誌(如事務邊界和信息刪除等),當超過文件存儲最大長度就會再建一個心得db.log文件。tcp
db.data文件是:消息轉變成持久性B樹索引的數據。spa
db.redo 文件是:用於從新修改db.data中B樹索引結構的,用於回收db-xx.log的這些文件。
日誌
值得一體的是,cache是用來緩存數據的,沒有第一時間寫入磁盤,有多個生成者的時候,在對共同消息修改時,會寫到磁盤。在消費者馬上響應收到的狀況下,不寫入磁盤當即丟棄。
code
一些比較關鍵的參數配置:
Property name | Default value | Description |
directory | activemq-data | 設置的目錄位置及名字 |
indexWriteBatchSize | 1000 | 批處理磁盤上寫的索引頁的數量 |
indexCacheSize | 10000 | 在內存中緩存頁面的數量 |
enableIndexWriteAsync | false | 可否異步寫入索引 |
journalMaxFileLength | 32mb | 每個db-xx.log日誌文件的大小 |
enableJournalDiskSyncs | true | 磁盤同步寫數據 |
cleanupInterval | 30000 | 多少時間去清理一些存儲但再也不使用的文件數據 |
checkpointInterval | 5000 | 檢測內部關鍵點(不知道用處) |
ignoreMissingJournalfiles | false | 若是啓用,將忽略丟失的日誌文件 |
2.The AMQ message store
暫時略過,和第一個很想,有時間再補上。
3.The JDBC message store
//配置數據源 BasicDataSource datasource=new BasicDataSource(); datasource.setDriverClassName("com.mysql.jdbc.Driver"); datasource.setUrl("jdbc:mysql://localhost/activemq?relaxAutoCommit=true"); datasource.setUsername("root"); datasource.setPassword("xxxx"); datasource.setMaxActive(20); datasource.setPoolPreparedStatements(true); JDBCPersistenceAdapter jpa=new JDBCPersistenceAdapter(); jpa.setDataSource(datasource); /* DefaultJDBCAdapter dja=new DefaultJDBCAdapter();*/ /*JDBCTopicMessageStore jts=new JDBCTopicMessageStore(jpa,dja, null, null, null);*/ BrokerService broker = new BrokerService(); broker.setPersistenceAdapter(jpa); broker.addConnector("tcp://localhost:61618"); broker.start();
4.The memory message store