ActiveMq筆記2-消息持久化

爲了不意外宕機之後丟失信息,須要作到重啓後能夠恢復消息隊列,消息系統通常都會採用持久化機制。 ActiveMQ的消息持久化機制有JDBC,AMQ,KahaDB和LevelDB,mysql

不管使用哪一種持久化方式,消息的存儲邏輯都是一致的。也就是說發送者將消息發送出去後,消息中心首先將消息存儲到本地數據文件、內存數據庫或者遠程數據庫等,而後試圖將消息發送給接收者,發送成功則將消息從存儲中刪除,失敗則繼續嘗試。消息中心啓動之後首先要檢查指定的存儲位置,若是有未發送成功的消息,則須要把消息發送出去。sql

 

1.KahaDB:數據庫

    1.KahaDB是從ActiveMQ 5.4開始默認的持久化插件,也是咱們項目如今使用的持久化方式。apache

    2.KahaDb恢復時間遠遠小於其前身AMQ而且使用更少的數據文件,因此能夠徹底代替AMQ。
    3.kahaDB的持久化機制一樣是基於日誌文件,索引和緩存。緩存

 

    (1)KahaDB主要特性:服務器

        一、日誌形式存儲消息;
        二、消息索引以B-Tree數據結構存儲,能夠快速更新;
        三、徹底支持JMS事務;
        四、支持多種恢復機制數據結構

 

    配置KahaDB 能夠在ActiveMq.xml中配置,配置方式以下圖:性能

    

<persistenceAdapter>
    <kahaDB directory="${activemq.data}/activemq-data" journalMaxFileLength="16mb"/>
</persistenceAdapter>
directory : 指定持久化消息的存儲目錄
journalMaxFileLength : 指定保存消息的日誌文件大小,具體根據你的實際應用配置

    KahaDB的結構以下:url

        消息存儲在基於文件的數據日誌中。若是消息發送成功,變標記爲可刪除的。系統會週期性的清除或者歸檔日誌文件。
        消息文件的位置索引存儲在內存中,這樣能快速定位到。按期將內存中的消息索引保存到metadata store中,避免大量消息未發送時,消息索引佔用過多內存空間。spa

 

    

    

    Data logs:
      Data logs用於存儲消息日誌,消息的所有內容都在Data logs中。
      同AMQ同樣,一個Data logs文件大小超過規定的最大值,會新建一個文件。一樣是文件尾部追加,寫入性能很快。
      每一個消息在Data logs中有計數引用,因此當一個文件裏全部的消息都不須要了,系統會自動刪除文件或放入歸檔文件夾。

    Metadata cache :
      緩存用於存放在線消費者的消息。若是消費者已經快速的消費完成,那麼這些消息就不須要再寫入磁盤了,不放在磁盤,這樣也就不用刷磁盤,效率大大提升。
      Btree索引會根據MessageID建立索引,用於快速的查找消息。這個索引一樣維護持久化訂閱者與Destination的關係,以及每一個消費者消費消息的指針。

    Metadata store 
      在db.data文件中保存消息日誌中消息的元數據,也是以B-Tree結構存儲的,定時從Metadata cache更新數據。Metadata store中也會備份一些在消息日誌中存在的信息,

      這樣可讓Broker實例快速啓動。
      即使metadata store文件被破壞或者誤刪除了。broker能夠讀取Data logs恢復過來,只是速度會相對較慢些。

   注意一點:ActiveMq默認是打開持久化開關的,發送消息到達Broker後,Broker不會着急把消息發出去,而是先把這個消息持久化,即把消息存儲在一個可靠的介質上面後,纔會發送消息出去,而且返回ACK,若是消息沒持久化成功,就算消息到達Broker後,也不會發送ACK確認給生產者。

  若是咱們不想持久化,咱們能夠進行一些配置以下圖:

    

 

      或者在代碼中設置,以下圖:

    

     如今若是咱們將Queue的消費者幹掉,看一下AcitveMq的持久化,以下圖:

      

 

    再重啓項目,從新發送消息:以下圖:

  

 

 

    

    能夠看到五條消息入隊,沒有消息出隊。

    舉個例子,好比有商家搞活動,好比秒殺,瞬間讓消息達到峯值,忽然CPU撐爆了,消息隊列宕機了,當時因爲咱們設置的消息是持久化的,若是咱們重啓機器後,這些被持久化

    下來的消息還在,而後從介質中把消息拿出來從新發出去。

    如今咱們從新把消費者配置回來,是否還有消息進隊,以下圖:

    

  再看一下工程運行狀況,以下圖:

  

    能夠看到消息發出去了,而且被消費者消費了。

    

    Topic持久化也相似,

        1.Topic持久化訂閱者,就是一直在線,能夠隨時接受消息。就比如QQ一直在線,別人發消息我隨時能接收,而且看到。

          Topic持久化訂閱以下:

            

 

        2.Topic非持久化訂閱者。生產者發消息,會把消息存在Broker裏面,等你上線了再把消息發過去。比如QQ處於離線狀態,等我上線了,就立馬接收消息。

          Topic非持久化訂閱以下:

          

 

 

2.JDBC持久化:

    使用JDBC持久化方式,數據庫會建立3個表:activemq_msgs,activemq_acks和activemq_lock。activemq_msgs用於存儲消息,Queue和Topic都存儲在這個表中。

    

  (1)配置方式

  配置持久化的方式,都是修改安裝目錄下conf/acticvemq.xml文件,

  首先定義一個mysql-ds的MySQL數據源,而後在persistenceAdapter節點中配置jdbcPersistenceAdapter而且引用剛纔定義的數據源。

  以下代碼配置:

    

<persistenceAdapter> 
    <jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="false" /> 
</persistenceAdapter>

  dataSource指定持久化數據庫的bean,createTablesOnStartup是否在啓動的時候建立數據表,默認值是true,這樣每次啓動都會去建立數據表了,通常是第一次啓動的時候設置爲true,以後改爲false。
  使用MySQL配置JDBC持久化:

  

<beans>
    <broker brokerName="test-broker" persistent="true" xmlns="http://activemq.apache.org/schema/core">
        <persistenceAdapter>
            <jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="false"/>
        </persistenceAdapter>
    </broker>
    <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
        <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
        <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
        <property name="username" value="activemq"/>
        <property name="password" value="activemq"/>
        <property name="maxActive" value="200"/>
        <property name="poolPreparedStatements" value="true"/>
    </bean>
</beans>

  

  (2)數據庫表信息
    activemq_msgs用於存儲消息,Queue和Topic都存儲在這個表中:
    ID:自增的數據庫主鍵
    CONTAINER:消息的Destination
    MSGID_PROD:消息發送者客戶端的主鍵
    MSG_SEQ:是發送消息的順序,MSGID_PROD+MSG_SEQ能夠組成JMS的MessageID
    EXPIRATION:消息的過時時間,存儲的是從1970-01-01到如今的毫秒數
    MSG:消息本體的Java序列化對象的二進制數據
    PRIORITY:優先級,從0-9,數值越大優先級越高

    activemq_acks用於存儲訂閱關係。若是是持久化Topic,訂閱者和服務器的訂閱關係在這個表保存:
    主要的數據庫字段以下:
      CONTAINER:消息的Destination
      SUB_DEST:若是是使用Static集羣,這個字段會有集羣其餘系統的信息
      CLIENT_ID:每一個訂閱者都必須有一個惟一的客戶端ID用以區分
      SUB_NAME:訂閱者名稱
      SELECTOR:選擇器,能夠選擇只消費知足條件的消息。條件能夠用自定義屬性實現,可支持多屬性AND和OR操做
      LAST_ACKED_ID:記錄消費過的消息的ID。

    表activemq_lock在集羣環境中才有用,只有一個Broker能夠得到消息,稱爲Master Broker,
    其餘的只能做爲備份等待Master Broker不可用,纔可能成爲下一個Master Broker。
    這個表用於記錄哪一個Broker是當前的Master Broker。

 

3.Activemq方式:性能高於JDBC,寫入消息時,會將消息寫入日誌文件,因爲是順序追加寫,性能很高。爲了提高性能,建立消息主鍵索引,而且提供緩存機制,進一步提高性能。每一個日誌文件的大小都是有限制的(默認32m,可自行配置)。
當超過這個大小,系統會從新創建一個文件。當全部的消息都消費完成,系統會刪除這個文件或者歸檔(取決於配置)。
主要的缺點是AMQ Message會爲每個Destination建立一個索引,若是使用了大量的Queue,索引文件的大小會佔用不少磁盤空間。
並且因爲索引巨大,一旦Broker崩潰,重建索引的速度會很是慢。

  配置以下:

  

<persistenceAdapter>
     <amqPersistenceAdapter directory="${activemq.data}/activemq-data" maxFileLength="32mb"/>
</persistenceAdapter>

雖然AMQ性能略高於上面的Kaha DB方式,可是因爲其重建索引時間過長,並且索引文件佔用磁盤空間過大,因此已經不推薦使用。

 

4.LevelDB方式:從ActiveMQ 5.6版本以後,又推出了LevelDB的持久化引擎。目前默認的持久化方式仍然是KahaDB,不過LevelDB持久化性能高於KahaDB,多是之後的趨勢。在ActiveMQ 5.9版本提供了基於LevelDB和Zookeeper的數據複製方式,用於Master-slave方式的首選數據複製方案。

相關文章
相關標籤/搜索