ActiveMQ入門之四--ActiveMQ持久化方式

  消息持久性對於可靠消息傳遞來講應該是一種比較好的方法,有了消息持久化,即便發送者和接受者不是同時在線或者消息中心在發送者發送消息後宕機了,在消息中心從新啓動後仍然能夠將消息發送出去,若是把這種持久化和ReliableMessaging結合起來應該是很好的保證了消息的可靠傳送。html

  消息持久性的原理很簡單,就是在發送者將消息發送出去後,消息中心首先將消息存儲到本地數據文件、內存數據庫或者遠程數據庫等,而後試圖將消息發送給接收者,發送成功則將消息從存儲中刪除,失敗則繼續嘗試。消息中心啓動之後首先要檢查制定的存儲位置,若是有未發送成功的消息,則須要把消息發送出去。java

ActiveMQ持久化方式:AMQ、KahaDB、JDBC、LevelDB。mysql

一、AMQsql

AMQ是一種文件存儲形式,它具備寫入速度快和容易恢復的特色。消息存儲在一個個文件中,文件的默認大小爲32M,若是一條消息的大小超過了 32M,那麼這個值必須設置大一點。當一個存儲文件中的消息已經所有被消費,那麼這個文件將被標識爲可刪除,在下一個清除階段,這個文件被刪除。AMQ適用於ActiveMQ5.3以前的版本。默認配置以下:數據庫

<persistenceAdapter>
   <amqPersistenceAdapter directory="activemq-data" maxFileLength="32mb"/>
</persistenceAdapter>

屬性以下:apache

屬性名稱緩存

默認值服務器

描述session

directory併發

activemq-data

消息文件和日誌的存儲目錄

useNIO

true

使用NIO協議存儲消息

syncOnWrite

false

同步寫到磁盤,這個選項對性能影響很是大

maxFileLength

32Mb

一個消息文件的大小

persistentIndex

true

消息索引的持久化,若是爲false,那麼索引保存在內存中

maxCheckpointMessageAddSize

4kb

一個事務容許的最大消息量

cleanupInterval

30000

清除操做週期,單位ms

indexBinSize

1024

索引文件緩存頁面數,缺省爲1024,當amq擴充或者縮減存儲時,會鎖定整個broker,致使必定時間的阻塞,因此這個值應該調整到比較大,可是代碼中實現會動態伸縮,調整效果並不理想。

indexKeySize

96

索引key的大小,key是消息ID

indexPageSize

16kb

索引的頁大小

directoryArchive

archive

存儲被歸檔的消息文件目錄

archiveDataLogs

false

當爲true時,歸檔的消息文件被移到directoryArchive,而不是直接刪除                    

二、KahaDB

KahaDB是基於文件的本地數據庫儲存形式,雖然沒有AMQ的速度快,可是它具備強擴展性,恢復的時間比AMQ短,從5.4版本以後KahaDB作爲默認的持久化方式。默認配置以下:

KahaDB的屬性以下:

屬性名稱

默認值

描述

directory

activemq-data

消息文件和日誌的存儲目錄

indexWriteBatchSize

1000

一批索引的大小,當要更新的索引量到達這個值時,更新到消息文件中

indexCacheSize

10000

內存中,索引的頁大小

enableIndexWriteAsync

false

索引是否異步寫到消息文件中

journalMaxFileLength

32mb

一個消息文件的大小

enableJournalDiskSyncs

true

是否講非事務的消息同步寫入到磁盤

cleanupInterval

30000

清除操做週期,單位ms

checkpointInterval

5000

索引寫入到消息文件的週期,單位ms

ignoreMissingJournalfiles

false

忽略丟失的消息文件,false,當丟失了消息文件,啓動異常

checkForCorruptJournalFiles

false

檢查消息文件是否損壞,true,檢查發現損壞會嘗試修復

checksumJournalFiles

false

產生一個checksum,以便可以檢測journal文件是否損壞。

5.4版本以後有效的屬性:

   

archiveDataLogs

false

當爲true時,歸檔的消息文件被移到directoryArchive,而不是直接刪除

directoryArchive

null

存儲被歸檔的消息文件目錄

databaseLockedWaitDelay

10000

在使用負載時,等待得到文件鎖的延遲時間,單位ms

maxAsyncJobs

10000

同個生產者產生等待寫入的異步消息最大量

concurrentStoreAndDispatchTopics

false

當寫入消息的時候,是否轉發主題消息

concurrentStoreAndDispatchQueues

true

當寫入消息的時候,是否轉發隊列消息

5.6版本以後有效的屬性:

   

archiveCorruptedIndex

false

是否歸檔錯誤的索引

每一個KahaDB的實例均可以配置單獨的適配器,若是沒有目標隊列提交給filteredKahaDB,那麼意味着對全部的隊列有效。若是一個隊列沒有對應的適配器,那麼將會拋出一個異常。配置以下:

<persistenceAdapter>
  <mKahaDBdirectory="${activemq.base}/data/kahadb">
    <filteredPersistenceAdapters>
      <!-- match all queues -->
      <filteredKahaDBqueue=">">
        <persistenceAdapter>
          <kahaDBjournalMaxFileLength="32mb"/>
        </persistenceAdapter>
      </filteredKahaDB>
       
      <!-- match all destinations -->
      <filteredKahaDB>
        <persistenceAdapter>
          <kahaDBenableJournalDiskSyncs="false"/>
        </persistenceAdapter>
      </filteredKahaDB>
    </filteredPersistenceAdapters>
  </mKahaDB>
</persistenceAdapter>

若是filteredKahaDB的perDestination屬性設置爲true,那麼匹配的目標隊列將會獲得本身對應的KahaDB實例。配置以下:

<persistenceAdapter>
  <mKahaDBdirectory="${activemq.base}/data/kahadb">
    <filteredPersistenceAdapters>
      <!-- kahaDB per destinations -->
      <filteredKahaDB perDestination="true">
        <persistenceAdapter>
          <kahaDBjournalMaxFileLength="32mb" />
        </persistenceAdapter>
      </filteredKahaDB>
    </filteredPersistenceAdapters>
  </mKahaDB>
</persistenceAdapter>

三、JDBC

能夠將消息存儲到數據庫中,例如:Mysql、SQL Server、Oracle、DB2。

配置JDBC適配器:

<persistenceAdapter>
    <jdbcPersistenceAdapterdataSource="#mysql-ds" createTablesOnStartup="false" />
</persistenceAdapter>

dataSource指定持久化數據庫的bean,createTablesOnStartup是否在啓動的時候建立數據表,默認值是true,這樣每次啓動都會去建立數據表了,通常是第一次啓動的時候設置爲true,以後改爲false。

Mysql持久化bean:
<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
    <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
    <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
    <property name="username" value="activemq"/>
    <property name="password" value="activemq"/>
    <property name="poolPreparedStatements" value="true"/>
</bean>
SQL Server持久化bean:
<bean id="mssql-ds" class="net.sourceforge.jtds.jdbcx.JtdsDataSource" destroy-method="close">
   <property name="serverName" value="SERVERNAME"/>
   <property name="portNumber" value="PORTNUMBER"/>
   <property name="databaseName" value="DATABASENAME"/>
   <property name="user" value="USER"/>
   <property name="password" value="PASSWORD"/>
</bean>
Oracle持久化bean:
<bean id="oracle-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
    <property name="driverClassName" value="oracle.jdbc.driver.OracleDriver"/>
    <property name="url" value="jdbc:oracle:thin:@10.53.132.47:1521:activemq"/>
    <property name="username" value="activemq"/>
    <property name="password" value="activemq"/>
    <property name="maxActive" value="200"/>
    <property name="poolPreparedStatements" value="true"/>
</bean>
DB2持久化bean:
<bean id="db2-ds" class="org.apache.commons.dbcp.BasicDataSource"  destroy-method="close">
      <property name="driverClassName" value="com.ibm.db2.jcc.DB2Driver"/>
      <property name="url" value="jdbc:db2://hndb02.bf.ctc.com:50002/activemq"/>
      <property name="username" value="activemq"/>
      <property name="password" value="activemq"/>
      <property name="maxActive" value="200"/>
      <property name="poolPreparedStatements" value="true"/>
  </bean>

四、LevelDB

這種文件系統是從ActiveMQ5.8以後引進的,它和KahaDB很是類似,也是基於文件的本地數據庫儲存形式,可是它提供比KahaDB更快的持久性。與KahaDB不一樣的是,它不是使用傳統的B-樹來實現對日誌數據的提早寫,而是使用基於索引的LevelDB。

默認配置以下:

< persistenceAdapter >
       < levelDBdirectory = "activemq-data" />
</ persistenceAdapter >

 

屬性名稱

默認值

描述

directory

"LevelDB"

數據文件的存儲目錄

readThreads

10

系統容許的併發讀線程數量

sync

true

同步寫到磁盤

logSize

104857600 (100 MB)

日誌文件大小的最大值

logWriteBufferSize

4194304 (4 MB)

日誌數據寫入文件系統的最大緩存值

verifyChecksums

false

是否對從文件系統中讀取的數據進行校驗

paranoidChecks

false

儘快對系統內部發生的存儲錯誤進行標記

indexFactory

org.fusesource.leveldbjni.JniDBFactory, org.iq80.leveldb.impl.Iq80DBFactory

在建立LevelDB索引時使用

indexMaxOpenFiles

1000

可供索引使用的打開文件的數量

indexBlockRestartInterval

16

Number keys between restart points for delta encoding of keys.

indexWriteBufferSize

6291456 (6 MB)

內存中索引數據的最大值

indexBlockSize

4096 (4 K)

每一個數據塊的索引數據大小

indexCacheSize

268435456 (256 MB)

使用緩存索引塊容許的最大內存

indexCompression

snappy

適用於索引塊的壓縮類型

logCompression

none

適用於日誌記錄的壓縮類型

屬性以下:

五、  下面詳細介紹一下如何將消息持久化到Mysql數據庫中

Ø        須要將mysql的驅動包放置到ActiveMQ的lib目錄下

Ø        修改activeMQ的配置文件:

<persistenceAdapter>
            <!--<kahaDB directory="${activemq.base}/data/kahadb"/>-->
            <jdbcPersistenceAdapter dataDirectory="${activemq.base}/data" dataSource="#mysql-ds" useDatabaseLock="false" createTablesOnStartup="false"/>
        </persistenceAdapter>

在配置文件中的broker節點外增長:

<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method ="close">
       <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
       <property name="url" value="jdbc:mysql://192.168.9.65:3306/activemq?relaxAutoCommit=true"/>
       <property name="username" value="root"/>
       <property name="password" value="12345678"/>
       <property name="maxActive" value="20"/>
       <property name="poolPreparedStatements" value="true"/>
    </bean>

從配置中能夠看出數據庫的名稱是activemq,你須要手動在MySql中增長這個庫。而後從新啓動activeMQ,會發現activemq多了三張表:從配置中能夠看出數據庫的名稱是activemq,須要手動在MySql中創建這個數據庫。

1:activemq_acks

2:activemq_lock

3:activemq_msgs

Ø        點到點類型

Sender類:

package com.dxz.activemq.ex2;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Sender {
    private static final int SEND_NUMBER = 2000;

    public static void main(String[] args) {
        // ConnectionFactory :鏈接工廠,JMS用它建立鏈接
        ConnectionFactory connectionFactory;
        // Connection :JMS客戶端到JMS Provider的鏈接
        Connection connection = null;
        // Session:一個發送或接收消息的線程
        Session session;
        // Destination :消息的目的地;消息發送給誰.
        Destination destination;
        // MessageProducer:消息發送者
        MessageProducer producer;
        // TextMessage message;
        // 構造ConnectionFactory實例對象,此處採用ActiveMq的實現
        connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");
        try {
            // 構造從工廠獲得鏈接對象
            connection = connectionFactory.createConnection();
            // 啓動
            connection.start();
            // 獲取操做鏈接
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 獲取session,FirstQueue是一個服務器的queue
            destination = session.createQueue("FirstQueue");
            // 獲得消息生成者【發送者】
            producer = session.createProducer(destination);
            // 設置不持久化
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            // 構造消息
            sendMessage(session, producer);
            // session.commit();
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (null != connection) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    // TODO Auto-generatedcatch block
                    e.printStackTrace();
                }
            }
        }
    }

    public static void sendMessage(Session session, MessageProducer producer) throws Exception {
        for (int i = 1; i <= SEND_NUMBER; i++) {
            TextMessage message = session.createTextMessage("ActiveMQ發送消息" + i);
            System.out.println("發送消息:ActiveMQ發送的消息" + i);
            producer.send(message);
        }
    }
}
View Code

Receiver類:

package com.dxz.activemq.ex2;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Receiver {
    public static void main(String[] args) {
        // ConnectionFactory :鏈接工廠,JMS用它建立鏈接
        ConnectionFactory connectionFactory;
        // Connection :JMS客戶端到JMS Provider的鏈接
        Connection connection = null;
        // Session:一個發送或接收消息的線程
        Session session;
        // Destination :消息的目的地;消息發送給誰.
        Destination destination;
        // 消費者,消息接收者
        MessageConsumer consumer;
        connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");
        try {
            // 獲得鏈接對象
            connection = connectionFactory.createConnection();
            // 啓動
            connection.start();
            // 獲取操做鏈接
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 建立Queue
            destination = session.createQueue("FirstQueue");
            consumer = session.createConsumer(destination);
            while (true) {
                // 設置接收者接收消息的時間,爲了便於測試,這裏定爲100s
                TextMessage message = (TextMessage) consumer.receive(100000);
                if (null != message) {
                    System.out.println("收到消息" + message.getText());
                } else
                    break;
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (null != connection)
                    connection.close();
            } catch (Throwable ignore) {
            }
        }
    }
}
View Code

測試一:測試:

A、 先運行Sender類,待運行完畢後,運行Receiver類

B、 在此過程當中activemq數據庫的activemq_msgs表中沒有數據

C、 再次運行Receiver,消費不到任何信息

測試二:

A、  先運行Sender類

B、 重啓電腦

C、 運行Receiver類,無任何信息被消費

測試三:

A、   把Sender類中的producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);改成producer.setDeliveryMode(DeliveryMode.PERSISTENT);

B、   先運行Sender類,待運行完畢後,運行Receiver類

C、   在此過程當中activemq數據庫的activemq_msgs表中有數據生成,運行完Receiver類後,數據清除

測試四:

A、    把Sender類中的producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);改成producer.setDeliveryMode(DeliveryMode.PERSISTENT);

B、    運行Sender類

C、    重啓電腦

D、    運行Receiver類,有消息被消費

結論:   

經過以上測試,能夠發現,在P2P類型中當DeliveryMode設置爲NON_PERSISTENCE時,消息被保存在內存中,而當 DeliveryMode設置爲PERSISTENCE時,消息保存在broker的相應的文件或者數據庫中。並且P2P中消息一旦被Consumer消 費就從broker中刪除。

Ø        發佈/訂閱類型

Sender類:

?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import  javax.jms.Connection;
import  javax.jms.ConnectionFactory;
import  javax.jms.DeliveryMode;
import  javax.jms.Destination;
import  javax.jms.JMSException;
import  javax.jms.MessageProducer;
import  javax.jms.Session;
import  javax.jms.TextMessage;
import  javax.jms.Topic;
import  org.apache.activemq.ActiveMQConnection;
import  org.apache.activemq.ActiveMQConnectionFactory;
public  class  Sender {
      private  static  final  int  SEND_NUMBER =  100  ;
      public  static  void  main(String[] args) {
         // ConnectionFactory :鏈接工廠,JMS用它建立鏈接
         ConnectionFactory connectionFactory;
         // Connection :JMS客戶端到JMS Provider的鏈接
         Connection connection =  null  ;
          // Session:一個發送或接收消息的線程
         Session session;
         // MessageProducer:消息發送者
         MessageProducer producer;
          // TextMessage message;
          // 構造ConnectionFactory實例對象,此處採用ActiveMq的實現
         connectionFactory =  new  ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                 ActiveMQConnection.DEFAULT_PASSWORD,
                "tcp://localhost:61616"  );
         try  {
             //獲得鏈接對象
             connection = connectionFactory.createConnection();
             //啓動
             connection.start();
             //獲取操做鏈接
             session = connection.createSession(  false  , Session.AUTO_ACKNOWLEDGE);       
             Topic topic = session.createTopic(  "MQ_test"  );      
             // 獲得消息生成者【發送者】
             producer = session.createProducer(topic);
             //設置持久化
             producer.setDeliveryMode(DeliveryMode.PERSISTENT);
             //構造消息
             sendMessage(session, producer);
             //session.commit();
             connection.close();
         }
         catch  (Exception e){
             e.printStackTrace();
         }  finally  {
             if  (  null  != connection){
                try  {
                    connection.close();
                catch  (JMSException e) {
                    // TODO Auto-generatedcatch block
                    e.printStackTrace();
                }
             }   
         }
      }
      public  static  void  sendMessage(Session session, MessageProducer producer)  throws  Exception{
         for  (  int  i=  1  ; i<=SEND_NUMBER; i++){
             TextMessage message = session.createTextMessage(  "ActiveMQ發送消息"  +i);
             System.out.println(  "發送消息:ActiveMQ發送的消息"  +i);
             producer.send(message);
         }
      }
}

Receiver類:

?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import  javax.jms.Connection;
import  javax.jms.ConnectionFactory;
import  javax.jms.Destination;
import  javax.jms.MessageConsumer;
import  javax.jms.Session;
import  javax.jms.TextMessage;
import  javax.jms.Topic;
  
import  org.apache.activemq.ActiveMQConnection;
import  org.apache.activemq.ActiveMQConnectionFactory;
public  class  Receiver {
      public  static  void  main(String[] args) {
         // ConnectionFactory :鏈接工廠,JMS用它建立鏈接
          ConnectionFactory connectionFactory;
          // Connection :JMS客戶端到JMS Provider的鏈接
          Connection connection =  null  ;
          // Session:一個發送或接收消息的線程
          Session session;
          // 消費者,消息接收者
          MessageConsumer consumer;
          connectionFactory = newActiveMQConnectionFactory(
                 ActiveMQConnection.DEFAULT_USER,
                  ActiveMQConnection.DEFAULT_PASSWORD,
                  "tcp://localhost:61616"  );
          try  {
              // 構造從工廠獲得鏈接對象
              connection =connectionFactory.createConnection();
             
              connection.setClientID(  "clientID001"  );
              // 啓動
              connection.start();
              // 獲取操做鏈接
              session = connection.createSession(  false  ,
                      Session.AUTO_ACKNOWLEDGE);
              // 獲取session
             Topic topic = session.createTopic(  "MQ_test"  );      
             // 獲得消息生成者【發送者】
             consumer = session.createDurableSubscriber(topic,  "MQ_sub"  );
            
              while  (  true  ){
                //設置接收者接收消息的時間,爲了便於測試,這裏誰定爲100s
                TextMessagemessage = (TextMessage)consumer.receive(  100000  );
                if  (  null  != message){
                   System.out.println(  "收到消息"  +message.getText());
                }  else  break  ;
              }
          }  catch  (Exception e){
          e.printStackTrace();
          }  finally  {
              try  {
                  if  (  null  != connection)
                      connection.close();
              catch  (Throwable ignore) {
              }
          }
      }
  
}

測試:

測試一:

A、先啓動Sender類

B、再啓動Receiver類

C、結果無任何記錄被訂閱

測試二:

A、先啓動Receiver類,讓Receiver在相關主題上進行訂閱

B、中止Receiver類,再啓動Sender類

C、待Sender類運行完成後,再啓動Receiver類

D、結果發現相應主題的信息被訂閱

相關文章
相關標籤/搜索