ActiveMQ學習筆記03 - 消息持久化

ActiveMQ的消息持久化機制有JDBC,AMQ,KahaDB和LevelDB,還有一種內存存儲的方式,因爲內存不屬於持久化範疇,並且若是使用內存隊列,能夠考慮使用更合適的產品,如ZeroMQ。因此內存存儲不在討論範圍內。html

不管使用哪一種持久化方式,消息的存儲邏輯都是一致的。java

消息分爲Queue和Topic兩種,Queue是點對點消費,發送者發送一條消息,只有一個且惟一的一個消費者能對其進行消費。mysql

Topic是訂閱式消費,一個消息能夠被不少的訂閱者消費,其中定閱者又分爲持久化訂閱和非持久化訂閱。持久化訂閱是指即便訂閱者當前不在線,其訂閱以後,發送方發到Broker的消息,也會在持久化訂閱者再次上線的時候完成消費,不會丟失消息。而非持久化訂閱者,只有訂閱者在線時纔會消費,不在線時,即便Broker收到新的消息,當其再次上線時,也不會收到錯過的消息。sql

ActiveMQ的持久化機制,對於Queue類型的消息,將存儲在Broker,可是一旦其中一個消費者完成消費,則當即刪除這條消息。對於Topic類型的消息,即便全部的訂閱者都完成了消費,Broker也不必定會立刻刪除無用消息,而是保留推送歷史,以後會異步清除無用消息。而每一個訂閱者消費到了哪條消息的offset會記錄在Broker,以避免下次重複消費。由於消息是順序消費,先進先出,因此只須要記錄上次消息消費到哪裏就能夠了。數據庫

配置持久化的方式,都是修改%ACTIVEMQ_HOME%conf/acticvemq.xml文件。apache

下面分別介紹幾種持久化方式的特色:緩存

JDBC:不少企業級應用比較喜歡這種存儲方式。優勢是大多數企業都有專門的DBA,以數據庫做爲存儲介質,會讓有這方面人才的公司比較放心。另外,數據庫的存儲方式,能夠看到消息是如何存儲的,能夠經過SQL查詢消息消費狀態,能夠查看消息內容,這是其餘持久化方式所不具有的。還有一個優勢就是數據庫能夠支持強一致性事務,支持兩階段提交的分佈式事務。缺點是性能問題,數據庫持久化是性能最低的一種方式。服務器

之因此最早介紹數據庫的持久化方式,是由於咱們能夠經過表結構很好的理解ActiveMQ是怎麼存儲和消費消息的。數據結構

數據庫會建立3個表:activemq_msgs,activemq_acks和activemq_lock。異步

activemq_msgs用於存儲消息,Queue和Topic都存儲在這個表中。

下面介紹一下主要的數據庫字段:

ID:自增的數據庫主鍵

CONTAINER:消息的Destination

MSGID_PROD:消息發送者客戶端的主鍵

MSG_SEQ:是發送消息的順序,MSGID_PROD+MSG_SEQ能夠組成JMS的MessageID

EXPIRATION:消息的過時時間,存儲的是從1970-01-01到如今的毫秒數

MSG:消息本體的Java序列化對象的二進制數據

PRIORITY:優先級,從0-9,數值越大優先級越高

activemq_acks用於存儲訂閱關係。若是是持久化Topic,訂閱者和服務器的訂閱關係在這個表保存。

主要的數據庫字段以下:

CONTAINER:消息的Destination

SUB_DEST:若是是使用Static集羣,這個字段會有集羣其餘系統的信息

CLIENT_ID:每一個訂閱者都必須有一個惟一的客戶端ID用以區分

SUB_NAME:訂閱者名稱

SELECTOR:選擇器,能夠選擇只消費知足條件的消息。條件能夠用自定義屬性實現,可支持多屬性AND和OR操做

LAST_ACKED_ID:記錄消費過的消息的ID。

表activemq_lock在集羣環境中才有用,只有一個Broker能夠得到消息,稱爲Master Broker,其餘的只能做爲備份等待Master Broker不可用,纔可能成爲下一個Master Broker。這個表用於記錄哪一個Broker是當前的Master Broker。

配置以下:

<beans>
    <broker brokerName="test-broker" persistent="true" xmlns="http://activemq.apache.org/schema/core">
        <persistenceAdapter>
            <jdbcPersistenceAdapter dataSource="#mysql-ds"/> 
        </persistenceAdapter>
    </broker>
    
    <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
        <property name="driverClassName" value="com.mysql.jdbc.Driver"/> 
        <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/> 
        <property name="username" value="activemq"/> 
        <property name="password" value="activemq"/> 
        <property name="maxActive" value="200"/> 
        <property name="poolPreparedStatements" value="true"/> 
    </bean>
</beans>

首先定義一個mysql-ds的MySQL數據源,而後在persistenceAdapter節點中配置jdbcPersistenceAdapter而且引用剛纔定義的數據源。

AMQ:性能高於JDBC,寫入消息時,會將消息寫入日誌文件,因爲是順序追加寫,性能很高。爲了提高性能,建立消息主鍵索引,而且提供緩存機制,進一步提高性能。每一個日誌文件的大小都是有限制的(默認32m,可自行配置)。當超過這個大小,系統會從新創建一個文件。當全部的消息都消費完成,系統會刪除這個文件或者歸檔(取決於配置)。主要的缺點是AMQ Message會爲每個Destination建立一個索引,若是使用了大量的Queue,索引文件的大小會佔用不少磁盤空間。並且因爲索引巨大,一旦Broker崩潰,重建索引的速度會很是慢。

配置片斷以下:

<persistenceAdapter>
     <amqPersistenceAdapter directory="${activemq.data}/activemq-data" maxFileLength="32mb"/>
</persistenceAdapter>

雖然AMQ性能略高於Kaha DB,可是因爲其重建索引時間過長,並且索引文件佔用磁盤空間過大,因此已經不推薦使用。這裏就不在詳細介紹AMQ持久化的數據結構。在新版本的ActiveMQ中,AMQ已經被刪除。

KahaDB:從ActiveMQ 5.4開始默認的持久化插件,KahaDb恢復時間遠遠小於其前身AMQ而且使用更少的數據文件,因此能夠徹底代替AMQ。kahaDB的持久化機制和AMQ很是像。一樣是基於日誌文件,索引和緩存。和AMQ不一樣,KahaDB全部的Destination都使用一個索引文件。《ActiveMQ In Action》表示其能夠支持10000個鏈接,每一個鏈接都是一個獨立的Queue,足以知足大部分應用場景。

Data logs用於存儲消息日誌,消息的所有內容都在Data logs中。同AMQ同樣,一個Data logs文件大小超過規定的最大值,會新建一個文件。一樣是文件尾部追加,寫入性能很快。每一個消息在Data logs中有計數引用,因此當一個文件裏全部的消息都不須要了,系統會自動刪除文件或放入歸檔文件夾。

緩存用於存放在線消費者的消息。若是消費者已經快速的消費完成,那麼這些消息就不須要再寫入磁盤了。

Btree索引會根據MessageID建立索引,用於快速的查找消息。這個索引一樣維護持久化訂閱者與Destination的關係,以及每一個消費者消費消息的指針。

Redo log用於系統崩潰後,重建Btree索引。由於Redo log的存在,使得重建索引時不須要讀取Data logs的全量數據,大大提高性能。

KahaDB的目錄結構:

db log文件,以db-<Number>.log命名。archive目錄用於存檔歸檔的數據。db.data和db.redo分別是Btree索引和redo log。

因爲是ActiveMQ的默認持久化機制,因此不須要修改配置文件就可使用KahaDB,可是仍是貼出配置片斷:

<persistenceAdapter> 
    <kahaDB directory="${activemq.data}/activemq-data" journalMaxFileLength="16mb"/> 
</persistenceAdapter>

LevelDB:從ActiveMQ 5.6版本以後,又推出了LevelDB的持久化引擎。LevelDB持久化性能高於KahaDB,雖然目前默認的持久化方式仍然是KahaDB,可是LevelDB是未來的趨勢。而且,在ActiveMQ 5.9版本提供了基於LevelDB和Zookeeper的數據複製方式,用於Master-slave方式的首選數據複製方案。LevelDB使用自定義的索引代替經常使用的BTree索引。

經過上圖能夠看出LevelDB主要由6部分組成:內存中的MemTable和ImmutableMemTable,還有硬盤上的log文件,manifest文件,current文件和SSTable文件。還有一些其餘的輔助文件,暫時不作說明。

每寫入一次數據,須要寫入log文件,和MemTable,也就是說,只須要一次硬盤的順序寫入,和一個內存寫入,若是系統崩潰,能夠經過log文件恢復數據。每次寫入會先寫log文件,後寫MemTable來保證不丟失數據。

當MemTable到達內存閥值,LevelDB會建立一個新的MemTable和log文件,而舊的MemTable會變成ImmutableMemTable,ImmutableMemTable的內容是隻讀的。而後系統會定時的異步的把ImmutableMemTable的數據寫入新的SSTable文件。

SSTable文件和MemTable,ImmutableMemTable的數據結構相同,都是key,value的數據,按照key排序。

manifest文件用於記錄每一個SSTable的key的起始值和結束值,有點相似於B-tree索引。而manifest一樣會生成新文件,舊的文件再也不使用。current文件就是指定哪一個manifest文件是如今正在使用的。

更具體實現原理可參見:http://www.cnblogs.com/haippy/archive/2011/12/04/2276064.html

配置片斷以下:

<persistenceAdapter>
    <levelDB directory="${activemq.data}/activemq-data"/>
</persistenceAdapter>

在目前的ActiveMQ 5.10版本中,直接使用LevelDB會致使服務不能啓動,拋出java.io.IOException: com.google.common.base.Objects.firstNonNull(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;

緣由是有兩個Guava cache致使版本衝突,解決的辦法是:

  1. 刪除%ACTIVEMQ_HOME%lib下面的pax-url-aether-1.5.2.jar

  2. 註釋掉%ACTIVEMQ_HOME%conf/activemq.xml的下面幾行:

<bean id="logQuery" class="org.fusesource.insight.log.log4j.Log4jLogQuery"
lazy-init="false" scope="singleton"
init-method="start" destroy-method="stop">
</bean>

這個BUG地址是https://issues.apache.org/jira/browse/AMQ-5225,但願能夠在下個版本順利解決。


下面是跑在我機器上的性能測試,實際數據意義不大,由於每一個環境的配置都不一樣,可是能夠經過對比看出幾種持久化方式的性能對比。


發送1000條消息(毫秒) 發送10000條消息(毫秒) 消費1000條消息的時間(毫秒) 消費10000條消息的時間(毫秒)
JDBC-Mysql 43009 369802 610 509338
KahaDB 34227 360493 208 2224
LevelDB 34032 347712 220 2877

經過這個表格能夠看出來,發送消息LevelDB最快,KahaDB稍微慢點,JDBC最慢,可是也不會慢太多,是一個數量級。消費消息,KahaDB最快,LevelDB稍微慢點,JDBC慢的讓人不能忍受,差好幾個數量級。LevelDB並無顯現出比KahaDB更多速度上的優點。可是因爲LevelDB支持高可用的複製數據,因此首選確定仍是LevelDB。

相關文章
相關標籤/搜索