ActiveMQ(六)——ActiveMQ的消息存儲

1、隊列和topicjava

  • 概述
    ActiveMQ不只支持persistent和non-persistent兩種方式,還支持消息的恢復(recovery)方式
  • PTP
    Queue的存儲方式很簡單,就是一個FIFO(先進先出)的Queue
    ActiveMQ(六)——ActiveMQ的消息存儲
  • PUB/SUB
    對於持久化訂閱主題,每個消費者將得到一個消息的複製
    ActiveMQ(六)——ActiveMQ的消息存儲
  • 有效的消息存儲
      ActiveMQ提供了一個插件式的消息存儲,相似於消息的多點傳播,主要實現了以下幾種:
    1:AMQ消息存儲一基於文件的存儲方式,是之前的默認消息存儲
    2:KahaDB消息存儲一提供了容量的提高和恢復能力,是如今的默認存儲方式
    3:JDBC消息存儲一消息基於JDBC存儲的
    4:Memory消息存儲一基於內存的消息存儲

2、KahaDBmysql

  • KahaDB Message Store概述
        KahaDB是目前默認的存儲方式,可用於任何場景,提升了性能和恢復能力。消息存儲使用一個事務日誌和僅僅用一個索引文件來存儲它全部的地址。
        KahaDB是一個專門針對消息持久化的解決方案,它對典型的消息使用模式進行了優化。在Kaha中,數據被追加到datalogs中。當再也不須要log文件中的數的時候,log文件會被丟棄。
  • KahaDB基本配置例子sql

    <persistenceAdapter>
    <kahaDB directory="${actlvemq.data}/kahadb"/>
    </persistenceAdapter>

    可用的屬性有:
    1:director:KahaDB存放的路徑,默認值activemq-data
    2:indexWriteBatchSize:批量寫入磁盤的索引page數量,默認值1000
    3:indexCacheSize:內存中緩存索引page的數量,默認值10000
    4:enableIndexWriteAsync:是否異步寫出索引,默認false
    5:journalMaxFi1eLength:設置每一個消息data log的大小,默認是32M
    6:enab1eJournalDiskSyncs:設置是否保證每一個沒有事務的內容,被同步寫入磁盤,JMS持久化的時候須要,默認爲true
    7:cleanupInterval:在檢查到再也不使用的消後,在具體刪除消息前的時間,默認30000
    8:checkpointInterval:checkpoint的間隔時間,默認5000
    9:ignoreMissingJournalfiles:是否忽略丟失的消息日誌文件,默認false
    10:checkForCourruptJournalFiles:在啓動的時候,將會驗證消息文件是否損壞,默認爲false
    11:checksumJournalFiles:是否爲每一個消息日誌文件提供checksum,默認false
    12:archiveDataLogs:是否移動文件到特定的路徑,而不是刪除它們,默認false
    13:directoryArchive:定義消息已經被消費後,移動data log到的路徑,默認爲null
    14:databaseLockedWaitDelay:得到數據庫鎖的等待時間,默認10000
    15:maxAsyncJobs:設置最大的能夠存儲的異步消息隊列,默認10000,能夠和concurrent MessageProducers設置成同樣的值
    16:concurrentStoreAndDispatchTransactions:是否分發消息到客戶端,同時事務存儲消息,默認true
    17:concurrentStoreAndDispatchTopics:是否分發Topic消息到客戶端,同時進行存儲,默認true
    18:concurrentStoreAndDispatchQueues:是否分發queue消息到客戶端,同時進行存儲,默認true數據庫

  • 在Java中內嵌使用Broker,使用KahaDB的例子apache

    public class EmbeddedBrokerUsingKahaDBStoreExample {
    public BrokerService createEmbeddedBroker()throws Exception{
        BrokerService broker = new BrokerService();
        File dataFileDir = new File("target/amq-in-action/kahadb");
        KahaDBStore kaha = new KahaDBStore();
        kaha.setDirectory(dataFileDir);
        kaha.setJournalMaxFileLength(1024*1000);
        kaha.setIndexWriteBatchSize(100);
        kaha.setEnableIndexWriteAsync(true);
    
        broker.setPersistenceAdapter(kaha);
        broker.addConnector("tcp://localhost:61616");
        broker.start();
        return broker;
    }
    }

3、AMQ緩存

  • AMQ Message Store概述
        AMQ Message Store 是ActiveMQ5.0缺省的持久化存儲,它是一個基於文件、事務存儲設計爲快速消息存儲的一個結構,該結構是以流的形式來進行消息交互的。
        這種方式中,Messages被保存到data logs中,同時被reference store進行索引以提升存取速度。Data logs由一些單獨的data log文件組成,缺省的文件大小是32M,若是某個消息的大小超過了data log文件的大小,那麼能夠修改配置以增長data log文件的大小。若是某個data log文件中全部的消息都被成功消費了,那麼這個data log文件將會被標記,以便在下一輪的清理中被刪除或者歸檔。
  • AMQ Message Store配置示例
    <broker brokerName="broker" persistent="true" useShutdownHook="false">
    <persistenceAdapter>
        <amqPersistenceAdapter driectory="${activemq.base}/data"    maxFileLength="32mb"/>
    </persistenceAdapter>
    </broker>

    4、 JDBC異步

  • 使用JDBC來持久化消息(此步驟不須要手工跑腳本)
    ActiveMQ支持使用JDBC來持久化消息,預約義的表以下:
    1:消息表,缺省代表爲ACTIVEMQ_MSGS,quue和topic都存在裏面,結構以下:
    ActiveMQ(六)——ActiveMQ的消息存儲
    2:ACTIVEMQ_ACKS表存儲持久訂閱的信息和最後一個持久訂閱接收的消息ID,結構以下:
    ActiveMQ(六)——ActiveMQ的消息存儲
    3:鎖定表,缺省代表爲ACTIVEMQ_LOCK,用來確保在某一時刻,只能有一個Act broker實例來訪問數據庫,結構以下:
    ActiveMQ(六)——ActiveMQ的消息存儲
  • 使用JDBC來持久化消息的配置示例
    ActiveMQ(六)——ActiveMQ的消息存儲

注意:
(1)數據庫須要字符集設置爲latin1。
(2)須要把mysql-connector-java.jar包放入lib中。
(3)啓動成功以後會出現三張表。tcp

  • 示例
    1、(queue模式):
    ActiveMQ(六)——ActiveMQ的消息存儲
    ActiveMQ(六)——ActiveMQ的消息存儲
    運行發送者(數據庫中有三條未接收的消息):
    ActiveMQ(六)——ActiveMQ的消息存儲
    運行接收者(消息成功接收的同時,數據庫中的消息也會被刪除)

2、persistence模式
消息接收者接收完成以後,數據庫中的消息不會被刪除。
如圖所示:
ActiveMQ(六)——ActiveMQ的消息存儲ide

  • JDBC Message Store with ActiveMQ Journal(日誌)
    這種方式克服了JDBC Store的不足,使用快速的緩存寫入技術,大大提升了性能。配置示例以下:性能

    <beans>
    <broker brokerName="test-broker" xmlns="http://activemq.apache.org/schema/core">
        <persistenceFactory>
            <journalPersistenceAdapterFactory 
                journalLogFiles="4"
                journalLogFileSize="32768"
                useJournal="true"
                useQuickJournal="true"
                dataSource="#derby-ds"
                dataDirectory="activemq-data"/>         
        </persistenceFactory>
    </broker>
    </beans>
  • JDBC Store和JDBC Message Store with ActiveMQ Journal的區別
    1:jdbc with journal的性能優於jdbc
    2:jdbc用於master/slave模式的數據庫分享
    3:jdbc with journal不能用於master/slave模式
    4:通常狀況下(非集羣狀態下),推薦使用jdbc with journal

5、 MMS

  • Memory Message Store
        內存消息存儲主要是存儲全部的持久化的消息在內存中。這裏沒有動態的緩存存在,因此必需要注意設置broker所在的JVM和內存限制。
  • Memory Message Store 配置示例
    <beans>
    <broker brokerName="test-broker" persistent="false" xmlns="http://activemq.apache.org/schema/core">
        <transportConnectors uri="tcp://localhost:61616"/>
        </transportConnectors>
    </broker>
    </beans>
  • 在Java中內嵌使用Broker,使用Memory的例子
    public void createEmbeddedBroker()throws Exception{
    BrokerService broker = new BrokerService();
    broker.setPersistent(false);
    broker.addConnector("tcp://localhost:61616");
    broker.start();
    }
相關文章
相關標籤/搜索