ActiveMQ 消息存儲

深刻淺出 JMS(四) - ActiveMQ 消息存儲

 1、消息的存儲方式

ActiveMQ 支持 JMS 規範中的持久化消息與非持久化消息mysql

  • 持久化消息一般用於不論是否消費者在線,它們都會保證消息會被消費者消費。當消息被確認消費後,會從存儲中刪除
  • 非持久化消息一般用於發送通知以及實時數據,一般要求性能優先,消息可靠性並非必須的狀況
  • MQ 支持可插拔式的消息存儲,如:內存、文件和關係數據庫等方式

Queue 消息模型在 ActiveMQ 的存儲:sql

  採用存儲採用先進先出(FIFO),一個消息只能被一個消費者消費,當消息被確認消費以後纔會被刪除。數據庫

Topic消息模型(針對持久訂閱):apache

  每一個訂閱者獲取的消息實際是消息的一個副本,只有一個消息副本會被存儲,MQ 提供了一個指針來指向消息存儲而且分發消息副本到訂閱者,消息直到全部的持久化訂閱者都被接收才能被刪除。緩存

持久化存儲方式:服務器

  • KahaDB 消息存儲
  • AMQ 消息存儲
  • JDBC 消息存儲
  • 內存消息存儲

2、KahaDB 存儲方式

  KahaDB 是從 ActiveMQ 5.4 開始默認的持久化插件。KahaDb 恢復時間遠遠小於其前身 AMQ 而且使用更少的數據文件,因此能夠徹底代替 AMQ,kahaDB 的持久化機制一樣是基於日誌文件,索引和緩存。性能

(一)KahaDB 主要特性:

  • 日誌形式存儲消息;
  • 消息索引以 B-Tree 結構存儲,能夠快速更新;
  • 徹底支持 JMS 事務;
  • 支持多種恢復機制;

(二)適用場景:

高吞吐量的應用程序
存儲大數據量的消息大數據

(三)配置方式 conf/activemq.xml:

<persistenceAdapter> <kahaDB directory="${activemq.data}/kahadb"/> </persistenceAdapter>

圖4.1 KahaDB

(四)KahaDB 存儲原理

當有活動消費者時,用於臨時存儲,消息會被髮送給消費着,同時被安排將被存儲,若是消息及時被確認,就不須要寫入到磁盤。寫入到磁盤中的數據消息,在後續的消息活動中,若是消息發送成功,變標記爲可刪除的。系統會週期性的清除或者歸檔日誌文件。url

(1) KahaDB 內部結構

圖4.1 KahaDB內部結構

  • Data logs:消息日誌包含了消息日誌和一些命令
  • Cache:當有活動消費者時,用於臨時存儲,消息會被髮送給消費着,同時被安排將被存儲,若是消息及時被確認,這不須要寫入到磁盤
  • Btree indexes(消息索引):用於引用消息日誌(message id),它存儲在內存中,這樣能快速定位到。MQ會按期將內存中的消息索引保存到 metadata store 中,避免大量消息未發送時,消息索引佔用過多內存空間。
  • Redo log:用於在非正常關機狀況下維護索引完整性。

(2) 目錄結構

圖4.1 KahaDB目錄結構

  • Db log files:用於存儲消息(默認大小32M),當 log 日誌滿了,會建立一個新的,當 log 日誌中的消息都被刪除,該日誌文件會被刪除或者歸檔。
  • Archive directory:當 datalog 不在被 kahadb 須要會被歸檔(經過 archiveDataLogs 屬性控制)。
  • Db.data:存放 Btree indexs。
  • Db.redo:存放 redo file,用於恢復 Btree indexs。

3、AMQ 消息存儲

寫入消息時,會將消息寫入日誌文件,因爲是順序追加寫,性能很高。爲了提高性能,建立消息主鍵索引,而且提供緩存機制,進一步提高性能。每一個日誌文件的大小都是有限制的(默認32m,可自行配置)。當超過這個大小,系統會從新創建一個文件。當全部的消息都消費完成,系統會刪除這個文件或者歸檔(取決於配置)。主要的缺點是 AMQ Message 會爲每個 Destination 建立一個索引,若是使用了大量的 Queue,索引文件的大小會佔用不少磁盤空間。並且因爲索引巨大,一旦 Broker 崩潰,重建索引的速度會很是慢。spa

特色:相似 KahaDB,也包含了事務日誌,每一個 destination 都包含一個 index 文件,AMQ 適用於高吞吐量的應用場景,可是不適合多個隊列的場景。

配置方式 conf/activemq.xml:

<!--AMQ directory:數據存儲路徑 syncOnWrite:是否同步寫入 maxFileLength:日誌文件大小 --> <persistenceAdapter> <amqPersistenceAdapter directory="${activemq.data}/AMQdb" syncOnWrite="true" maxFileLength="10mb" /> </persistenceAdapter>

(1) AMQ內部結構

圖4.3 AMQ內部結構

  • Data logs:消息日誌包含了消息日誌
  • Cache:用於消息的快速檢索
  • Reference store indexes:用於引用 datalogs 中的消息,經過 message ID 關聯

(2) 目錄結構

圖4.4 AMQ目錄結構

  • Lock:保證同一時間只有一個 borker 訪問文件目錄
  • temp-storag:用於存儲非持久化消息(當不在被存儲在內存中),如等待慢消費者處理消息
  • Kr-store:用於存儲引用消息日誌數據
  • journal directory:包含了消息文件、消息日誌和消息控制信息
  • Archive:歸檔的數據日誌

4、JDBC存儲

支持經過 JDBC 將消息存儲到關係數據庫,性能上不如文件存儲,能經過關係型數據庫查詢到消息的信息。

MQ 支持的數據庫:Apache Derby、MySQL、PostgreSQL、Oracle、SQLServer、Sybase、Informix、MaxDB。

存儲表結構:

表 1:ACTIVEMQ_MSGS:用於存儲消息,Queue 和 Topic 都存儲在這個表中:

字段 說明
ID 自增的數據庫主鍵
CONTAINER 消息的Destination
MSGID_PROD 消息發送者客戶端的主鍵
MSG_SEQ 是發送消息的順序,MSGID_PROD+MSG_SEQ能夠組成JMS的MessageID
EXPIRATION 消息的過時時間,存儲的是從1970-01-01到如今的毫秒數
MSG 消息本體的Java序列化對象的二進制數據
PRIORITY 優先級,從0-9,數值越大優先級越高

表 2:ACTIVEMQ_ACKS:用於存儲訂閱關係。若是是持久化Topic,訂閱者和服務器的訂閱關係在這個表保存:

字段 說明
CONTAINER 消息的Destination
SUB_DEST 若是是使用Static集羣,這個字段會有集羣其餘系統的信息
CLIENT_ID 每一個訂閱者都必須有一個惟一的客戶端ID用以區分
SUB_NAME 訂閱者名稱
SELECTOR 選擇器,能夠選擇只消費知足條件的消息。條件能夠用自定義屬性實現,可支持多屬性AND和OR操做
LAST_ACKED_ID 記錄消費過的消息的ID

表 3:ACTIVEMQ_LOCK(消息鎖,保證同一時間只能有一個broker訪問這些表結構):

表 activemq_lock 在集羣環境中才有用,只有一個 Broker 能夠得到消息,稱爲 Master Broker,其餘的只能做爲備份等待 Master Broker 不可用,纔可能成爲下一個 Master Broker。這個表用於記錄哪一個 Broker 是當前的 Master Broker。

配置方式:

一、配置數據源 conf/acticvemq.xml 文件:

<!-- 配置數據源--> <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"> <property name="driverClassName" value="com.mysql.jdbc.Driver"/> <property name="url" value="jdbc:mysql://localhost:3306/activemq?relaxAutoCommit=true"/> <property name="username" value="root"/> <property name="password" value="111111"/> <property name="maxActive" value="200"/> <property name="poolPreparedStatements" value="true"/> </bean>

二、配置 broke 中的 persistenceAdapter

dataSource 指定持久化數據庫的 bean,createTablesOnStartup 是否在啓動的時候建立數據表,默認值是 true,這樣每次啓動都會去建立數據表了,通常是第一次啓動的時候設置爲 true,以後改爲 false。

<!-- JDBC配置 --> <persistenceAdapter> <jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="false"/> </persistenceAdapter> 

ps:數據庫 activemq 須要手動建立。

5、內存消息存儲

內存消息存儲,會將全部的持久化消息存儲在內存中,必須注意JVM使用狀況以及內存限制,適用於一些能快速消費的數據量不大的小消息,當MQ關閉或者宕機,未被消費的內存消息會被清空。

配置方式 設置 broker屬性值 persistent="false":

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" persistent="false"/>

天天用心記錄一點點。內容也許不重要,但習慣很重要!

相關文章
相關標籤/搜索