JMS 之 Active MQ 消息存儲

1、消息的存儲方式

ActiveMQ支持JMS規範中的持久化消息與非持久化消息mysql

  • 持久化消息一般用於不論是否消費者在線,它們都會保證消息會被消費者消費。當消息被確認消費後,會從存儲中刪除
  • 非持久化消息一般用於發送通知以及實時數據,一般要求性能優先,消息可靠性並非必須的狀況
  • MQ支持可插拔式的消息存儲,如:內存、文件和關係數據庫等方式

Queue消息模型在ActiveMQ的存儲sql

  採用存儲採用先進先出(FIFO),一個消息只能被一個消費者消費,當消息被確認消費以後纔會被刪除。數據庫

Topic消息模型(針對持久訂閱)
  每一個訂閱者獲取的消息實際是消息的一個副本,只有一個消息副本會被存儲,MQ提供了一個指針來指向消息存儲而且分發消息副本到訂閱者,消息直到全部的持久化訂閱者都被接收才能被刪除。apache

持久化存儲方式:緩存

  1. KahaDB消息存儲
  2. AMQ消息存儲
  3. JDBC消息存儲
  4. 內存消息存儲

2、KahaDB存儲方式

  KahaDB是從ActiveMQ 5.4開始默認的持久化插件。KahaDb恢復時間遠遠小於其前身AMQ而且使用更少的數據文件,因此能夠徹底代替AMQ,kahaDB的持久化機制一樣是基於日誌文件,索引和緩存。服務器

(一)、KahaDB主要特性:性能

  1. 日誌形式存儲消息;
  2. 消息索引以B-Tree結構存儲,能夠快速更新;
  3. 徹底支持JMS事務;
  4. 支持多種恢復機制;

(二)、適用場景:大數據

  1. 高吞吐量的應用程序
  2. 存儲大數據量的消息

(三)、配置方式 conf/activemq.xml:url

       <persistenceAdapter>
            <kahaDB directory="${activemq.data}/kahadb"/>
        </persistenceAdapter>

(四)、KahaDB存儲原理:spa

    當有活動消費者時,用於臨時存儲,消息會被髮送給消費着,同時被安排將被存儲,若是消息及時被確認,就不須要寫入到磁盤。寫入到磁盤中的數據消息,在後續的消息活動中,若是消息發送成功,變標記爲可刪除的。系統會週期性的清除或者歸檔日誌文件。

  一、KahaDB內部結構

Data logs:消息日誌包含了消息日誌和一些命令
Cache:當有活動消費者時,用於臨時存儲,消息會被髮送給消費着,同時被安排將被存儲,若是消息及時被確認,這不須要寫入到磁盤
Btree indexes(消息索引):用於引用消息日誌(message id),它存儲在內存中,這樣能快速定位到。MQ會按期將內存中的消息索引保存到metadata store中,避免大量消息未發送時,消息索引佔用過多內存空間。
Redo log用於在非正常關機狀況下維護索引完整性。

二、目錄結構:

 

Db log files:用於存儲消息(默認大小32M),當log日誌滿了,會建立一個新的,當log日誌中的消息都被刪除,該日誌文件會被刪除或者歸檔。
Archive directory:當datalog不在被kahadb須要會被歸檔(經過archiveDataLogs屬性控制)。
Db.data:存放Btree indexs。
Db.redo:存放redo file,用於恢復Btree indexs。

3、AMQ消息存儲

  寫入消息時,會將消息寫入日誌文件,因爲是順序追加寫,性能很高。爲了提高性能,建立消息主鍵索引,而且提供緩存機制,進一步提高性能。每一個日誌文件的大小都是有限制的(默認32m,可自行配置)。當超過這個大小,系統會從新創建一個文件。當全部的消息都消費完成,系統會刪除這個文件或者歸檔(取決於配置)。主要的缺點是AMQ Message會爲每個Destination建立一個索引,若是使用了大量的Queue,索引文件的大小會佔用不少磁盤空間。並且因爲索引巨大,一旦Broker崩潰,重建索引的速度會很是慢。

特色:相似KahaDB,也包含了事務日誌,每一個destination都包含一個index文件,AMQ適用於高吞吐量的應用場景,可是不適合多個隊列的場景。

 配置方式conf/activemq.xml:

       <!--AMQ    directory:數據存儲路徑 syncOnWrite:是否同步寫入  maxFileLength:日誌文件大小 -->
        <persistenceAdapter>
            <amqPersistenceAdapter
                directory="${activemq.data}/AMQdb"
                syncOnWrite="true"
                maxFileLength="10mb" />
        </persistenceAdapter>

一、AMQ內部結構:

 

Data logs:消息日誌包含了消息日誌
Cache:用於消息的快速檢索
Reference store indexes:用於引用datalogs中的消息,經過message ID 關聯

二、目錄結構:

Lock:保證同一時間只有一個borker訪問文件目錄
temp-storag:用於存儲非持久化消息(當不在被存儲在內存中),如等待慢消費者處理消息
Kr-store:用於存儲引用消息日誌數據
journal directory:包含了消息文件、消息日誌和消息控制信息
Archive:歸檔的數據日誌

4、JDBC存儲

支持經過JDBC將消息存儲到關係數據庫,性能上不如文件存儲,能經過關係型數據庫查詢到消息的信息。

MQ支持的數據庫:Apache Derby、MYsql、PostgreSQL、Oracle、SQLServer、Sybase、Informix、MaxDB。

存儲表結構:

A、ACTIVEMQ_MSGS:用於存儲消息,Queue和Topic都存儲在這個表中:

  • ID:自增的數據庫主鍵
  • CONTAINER:消息的Destination
  • MSGID_PROD:消息發送者客戶端的主鍵
  • MSG_SEQ:是發送消息的順序,MSGID_PROD+MSG_SEQ能夠組成JMS的MessageID
  • EXPIRATION:消息的過時時間,存儲的是從1970-01-01到如今的毫秒數
  • MSG:消息本體的Java序列化對象的二進制數據
  • PRIORITY:優先級,從0-9,數值越大優先級越高

B、ACTIVEMQ_ACKS:用於存儲訂閱關係。若是是持久化Topic,訂閱者和服務器的訂閱關係在這個表保存:

  • 主要的數據庫字段以下:
  • CONTAINER:消息的Destination
  • SUB_DEST:若是是使用Static集羣,這個字段會有集羣其餘系統的信息
  • CLIENT_ID:每一個訂閱者都必須有一個惟一的客戶端ID用以區分
  • SUB_NAME:訂閱者名稱
  • SELECTOR:選擇器,能夠選擇只消費知足條件的消息。條件能夠用自定義屬性實現,可支持多屬性AND和OR操做
  • LAST_ACKED_ID:記錄消費過的消息的ID。

C、ACTIVEMQ_LOCK(消息鎖,保證同一時間只能有一個broker訪問這些表結構):
        表activemq_lock在集羣環境中才有用,只有一個Broker能夠得到消息,稱爲Master Broker,其餘的只能做爲備份等待Master Broker不可用,纔可能成爲下一個Master Broker。這個表用於記錄哪一個Broker是當前的Master Broker。

 

配置方式:

一、配置數據源 conf/acticvemq.xml文件:

 <!-- 配置數據源-->
      <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
        <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
        <property name="url" value="jdbc:mysql://localhost:3306/activemq?relaxAutoCommit=true"/>
        <property name="username" value="root"/>
        <property name="password" value="111111"/>
        <property name="maxActive" value="200"/>
        <property name="poolPreparedStatements" value="true"/>
      </bean>

二、配置broke中的persistenceAdapter :

dataSource指定持久化數據庫的bean,createTablesOnStartup是否在啓動的時候建立數據表,默認值是true,這樣每次啓動都會去建立數據表了,通常是第一次啓動的時候設置爲true,以後改爲false。

 <!-- JDBC配置 -->
        <persistenceAdapter>
            <jdbcPersistenceAdapter dataSource="#mysql-ds"  createTablesOnStartup="false"/>
        </persistenceAdapter> 

ps:數據庫activemq 須要手動建立。

5、內存消息存儲

內存消息存儲,會將全部的持久化消息存儲在內存中,必須注意JVM使用狀況以及內存限制,適用於一些能快速消費的數據量不大的小消息,當MQ關閉或者宕機,未被消費的內存消息會被清空。

配置方式 設置 broker屬性值  persistent="false":

    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" persistent="false">
相關文章
相關標籤/搜索