ActiveMQ的設置消息時長,事務,確認機制 ,持久化

1.消息事務java

    消息事務是在生產者producer到broker或broker到consumer過程當中同一個session中發生的,保證幾條消息在發送過程當中的原子性。(Broker:消息隊列核心,至關於一個控制中心,負責路由消息、保存訂閱和鏈接、消息確認和控制事務)
    在支持事務的session中,producer發送message時在message中帶有transactionID。broker收到message後判斷是否有transactionID,若是有就把message保存在transaction store中,等待commit或者rollback消息。mysql


消息生產者-異步發送
消息生產者使用持久(persistent)傳遞模式發送消息的時候,Producer.send() 方法會被阻塞,直到 broker 發送一個確認消息給生產者(ProducerAck),這個確認消息暗示broker已經成功接收到消息並把消息保存到二級存儲中。這個過程一般稱爲同步發送。
若是應用程序可以容忍一些消息的丟失,那麼可使用異步發送。異步發送不會在受到 broker 的確認以前一直阻塞 Producer.send 方法。但有一個例外,當發送方法在一個事務上下文中時,被阻塞的是 commit 方法而不是 send 方法。commit 方法成功返回意味着全部的持久消息都以被寫到二級存儲中。
想要使用異步,在brokerURL中增長 jms.alwaysSyncSend=false&jms.useAsyncSend=true
若是設置了alwaysSyncSend=true系統將會忽略useAsyncSend設置的值都採用同步
     1) 當alwaysSyncSend=false時,「NON_PERSISTENT」(非持久化)、事務中的消息將使用「異步發送」
     2) 當alwaysSyncSend=false時,若是指定了useAsyncSend=true,「PERSISTENT」類型的消息使用異步發送。若是useAsyncSend=false,「PERSISTENT」類型的消息使用同步發送。
總結:默認狀況(alwaysSyncSend=false,useAsyncSend=false),非持久化消息、事務內的消息均採用異步發送;對於持久化消息採用同步發送。
 jms.sendTimeout:發送超時時間,默認等於0,若是jms.sendTimeout>0將會忽略(alwaysSyncSend、useAsyncSend、消息是否持久化)全部的消息都是用同步發送!
 即便使用異步發送,也能夠經過producerWindowSize來控制發送端無節制的向broker發送消息
producerWindowSize:窗口尺寸,用來約束在異步發送時producer端容許積壓的(還沒有ACK)的消息的尺寸,且只對異步發送有意義。每次發送消息以後,都將會致使memoryUsage尺寸增長(+message.size),當broker返回producerAck時,若是達到了producerWindowSize上限,即便是異步調用也會被阻塞,防止不停向broker發送消息。
經過jms.producerWindowSize=。。。來設置

2.消息時長,確認機制 
消息消費者-消息確認
一、確認機制(ack_mod)
      AUTO_ACKNOWLEDGE = 1    自動確認
      CLIENT_ACKNOWLEDGE = 2    客戶端手動確認   
      DUPS_OK_ACKNOWLEDGE = 3    自動批量確認
      SESSION_TRANSACTED = 0    事務提交併確認
 ACK_MODE描述了Consumer與broker確認消息的方式(時機),好比當消息被Consumer接收以後,Consumer將在什麼時候確認消息。因此ack_mode描述的不是producer於broker之間的關係,而是customer於broker之間的關係。
對於broker而言,只有接收到ACK指令,纔會認爲消息被正確的接收或者處理成功了,經過ACK,能夠在consumer與Broker之間創建一種簡單的「擔保」機制.
      session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
第一個參數:是否支持事務,若是爲true,則會忽略第二個參數,自動被jms服務器設置爲SESSION_TRANSACTED。sql

發佈主題數據庫

public class TopicPub {
        public static void main(String[] args) throws JMSException {
            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
            Connection connection = factory.createConnection();
            connection.start();
 
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            /**
             Session javax.jms.Connection.createSession(boolean transacted, int acknowledgeMode) throws JMSException
             1.transacted事務,事務成功commit,纔會將消息發送到mom中
             2.acknowledgeMode消息確認機制
             1)、帶事務的session
             若是session帶有事務,而且事務成功提交,則消息被自動簽收。若是事務回滾,則消息會被再次傳送。
             消息事務是在生產者producer到broker或broker到consumer過程當中同一個session中發生的,
             保證幾條消息在發送過程當中的原子性。
             在支持事務的session中,producer發送message時在message中帶有transactionID。
             broker收到message後判斷是否有transactionID,若是有就把message保存在transaction store中,
             等待commit或者rollback消息。
             2)、不帶事務的session
             不帶事務的session的簽收方式,取決於session的配置。
             Activemq支持一下三種模式:
             Session.AUTO_ACKNOWLEDGE  消息自動簽收
             Session.CLIENT_ACKNOWLEDGE  客戶端調用acknowledge方法手動簽收
             Session.DUPS_OK_ACKNOWLEDGE 不是必須簽收,消息可能會重複發送。在第二次從新傳送消息的時候,消息
             頭的JmsDelivered會被置爲true標示當前消息已經傳送過一次,客戶端須要進行消息的重複處理控制。
             代碼示例以下:
             session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
             textMsg.acknowledge();
             */
            Topic topic = session.createTopic("wm5920.topic");
 
            MessageProducer producer = session.createProducer(topic);
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);//設置非持久化
            //producer.setTimeToLive(5000);//5秒後過時,這個對點對點模式有效
            TextMessage message = session.createTextMessage();
            message.setText("message_" + System.currentTimeMillis());
            producer.send(message);
            System.out.println("Sent message: " + message.getText());
            //帶有事務得commit
            //session.commit();
            session.close();
            connection.stop();
            connection.close();
        }
 
    }

訂閱主題,注:若是在發佈主題前,沒有訂閱,是收不到消息的,這跟點對點的隊列模式不一樣apache

package com.activemq;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
 
 
    public class TopicSubs{
        public static void main(String[] args) throws JMSException {
            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
            Connection connection = factory.createConnection();
            connection.setClientID("wm5920");
            connection.start();
 
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic topic = session.createTopic("wm5920.topic");
 
            //持久訂閱方式,不會漏掉信息
            TopicSubscriber subs=session.createDurableSubscriber(topic, "wm5920");
            subs.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    TextMessage tm = (TextMessage) message;
                    try {
                        System.out.println("Received message: " + tm.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
 
            //非持久訂閱方式
//        MessageConsumer consumer = session.createConsumer(topic);
//        consumer.setMessageListener(new MessageListener() {
//            public void onMessage(Message message) {
//                TextMessage tm = (TextMessage) message;
//                try {
//                    System.out.println("Received message: " + tm.getText());
//                } catch (JMSException e) {
//                    e.printStackTrace();
//                }
//            }
//        });
//        session.commit();
//      session.close();
//      connection.stop();
//      connection.close();
        }
    }


1.ActiveMQ的幾種消息持久化機制
爲了不意外宕機之後丟失信息,須要作到重啓後能夠恢復消息隊列,消息系統通常都會採用持久化機制。
ActiveMQ的消息持久化機制有JDBC,AMQ,KahaDB和LevelDB,不管使用哪一種持久化方式,消息的存儲邏輯都是一致的。
就是在發送者將消息發送出去後,消息中心首先將消息存儲到本地數據文件、內存數據庫或者遠程數據庫等,而後試圖將消息發送給接收者,發送成功則將消息從存儲中刪除,失敗則繼續嘗試。
消息中心啓動之後首先要檢查指定的存儲位置,若是有未發送成功的消息,則須要把消息發送出去。
>> JDBC持久化方式
使用JDBC持久化方式,數據庫會建立3個表:activemq_msgs,activemq_acks和activemq_lock。
activemq_msgs用於存儲消息,Queue和Topic都存儲在這個表中。
(1)配置方式
配置持久化的方式,都是修改安裝目錄下conf/acticvemq.xml文件,
首先定義一個mysql-ds的MySQL數據源,而後在persistenceAdapter節點中配置jdbcPersistenceAdapter而且引用剛纔定義的數據源。緩存

<persistenceAdapter>
        <jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="false" />
</persistenceAdapter>


dataSource指定持久化數據庫的bean,createTablesOnStartup是否在啓動的時候建立數據表,默認值是true,這樣每次啓動都會去建立數據表了,通常是第一次啓動的時候設置爲true,以後改爲false。
使用MySQL配置JDBC持久化:    
 服務器

<beans>
        <broker brokerName="test-broker" persistent="true" xmlns="http://activemq.apache.org/schema/core">
            <persistenceAdapter>
                <jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="false"/>
            </persistenceAdapter>
        </broker>
        <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
            <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
            <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
            <property name="username" value="activemq"/>
            <property name="password" value="activemq"/>
            <property name="maxActive" value="200"/>
            <property name="poolPreparedStatements" value="true"/>
        </bean>
    </beans>


(2)數據庫表信息
activemq_msgs用於存儲消息,Queue和Topic都存儲在這個表中:
ID:自增的數據庫主鍵
CONTAINER:消息的Destination
MSGID_PROD:消息發送者客戶端的主鍵
MSG_SEQ:是發送消息的順序,MSGID_PROD+MSG_SEQ能夠組成JMS的MessageID
EXPIRATION:消息的過時時間,存儲的是從1970-01-01到如今的毫秒數
MSG:消息本體的Java序列化對象的二進制數據
PRIORITY:優先級,從0-9,數值越大優先級越高session

activemq_acks用於存儲訂閱關係。若是是持久化Topic,訂閱者和服務器的訂閱關係在這個表保存:
主要的數據庫字段以下:
    CONTAINER:消息的Destination
    SUB_DEST:若是是使用Static集羣,這個字段會有集羣其餘系統的信息
    CLIENT_ID:每一個訂閱者都必須有一個惟一的客戶端ID用以區分
    SUB_NAME:訂閱者名稱
    SELECTOR:選擇器,能夠選擇只消費知足條件的消息。條件能夠用自定義屬性實現,可支持多屬性AND和OR操做
    LAST_ACKED_ID:記錄消費過的消息的ID。異步

    表activemq_lock在集羣環境中才有用,只有一個Broker能夠得到消息,稱爲Master Broker,
    其餘的只能做爲備份等待Master Broker不可用,纔可能成爲下一個Master Broker。
    這個表用於記錄哪一個Broker是當前的Master Broker。tcp

>> AMQ方式

性能高於JDBC,寫入消息時,會將消息寫入日誌文件,因爲是順序追加寫,性能很高。爲了提高性能,建立消息主鍵索引,而且提供緩存機制,進一步提高性能。每一個日誌文件的大小都是有限制的(默認32m,可自行配置)。
當超過這個大小,系統會從新創建一個文件。當全部的消息都消費完成,系統會刪除這個文件或者歸檔(取決於配置)。
主要的缺點是AMQ Message會爲每個Destination建立一個索引,若是使用了大量的Queue,索引文件的大小會佔用不少磁盤空間。
 並且因爲索引巨大,一旦Broker崩潰,重建索引的速度會很是慢。
 配置片斷以下:

<persistenceAdapter>
         <amqPersistenceAdapter directory="${activemq.data}/activemq-data" maxFileLength="32mb"/>
    </persistenceAdapter>


雖然AMQ性能略高於下面的Kaha DB方式,可是因爲其重建索引時間過長,並且索引文件佔用磁盤空間過大,因此已經不推薦使用。
>> KahaDB方式
    KahaDB是從ActiveMQ 5.4開始默認的持久化插件,也是咱們項目如今使用的持久化方式。
    KahaDb恢復時間遠遠小於其前身AMQ而且使用更少的數據文件,因此能夠徹底代替AMQ。
    kahaDB的持久化機制一樣是基於日誌文件,索引和緩存。
    配置方式:

<persistenceAdapter>
        <kahaDB directory="${activemq.data}/activemq-data" journalMaxFileLength="16mb"/>
    </persistenceAdapter>


    directory : 指定持久化消息的存儲目錄
    journalMaxFileLength : 指定保存消息的日誌文件大小,具體根據你的實際應用配置  

 (1)KahaDB主要特性
    一、日誌形式存儲消息;
    二、消息索引以B-Tree結構存儲,能夠快速更新;
    三、徹底支持JMS事務;
    四、支持多種恢復機制;

 (2)KahaDB的結構
    消息存儲在基於文件的數據日誌中。若是消息發送成功,變標記爲可刪除的。系統會週期性的清除或者歸檔日誌文件。
    消息文件的位置索引存儲在內存中,這樣能快速定位到。按期將內存中的消息索引保存到metadata store中,避免大量消息未發送時,消息索引佔用過多內存空間。
    Data logs:
    Data logs用於存儲消息日誌,消息的所有內容都在Data logs中。
    同AMQ同樣,一個Data logs文件大小超過規定的最大值,會新建一個文件。一樣是文件尾部追加,寫入性能很快。
    每一個消息在Data logs中有計數引用,因此當一個文件裏全部的消息都不須要了,系統會自動刪除文件或放入歸檔文件夾。

    Metadata cache :
    緩存用於存放在線消費者的消息。若是消費者已經快速的消費完成,那麼這些消息就不須要再寫入磁盤了。
    Btree索引會根據MessageID建立索引,用於快速的查找消息。這個索引一樣維護持久化訂閱者與Destination的關係,以及每一個消費者消費消息的指針。

    Metadata store 
    在db.data文件中保存消息日誌中消息的元數據,也是以B-Tree結構存儲的,定時從Metadata cache更新數據。Metadata store中也會備份一些在消息日誌中存在的信息,這樣可讓Broker實例快速啓動。
    即使metadata store文件被破壞或者誤刪除了。broker能夠讀取Data logs恢復過來,只是速度會相對較慢些。
    >>LevelDB方式

    從ActiveMQ 5.6版本以後,又推出了LevelDB的持久化引擎。     目前默認的持久化方式仍然是KahaDB,不過LevelDB持久化性能高於KahaDB,多是之後的趨勢。     在ActiveMQ 5.9版本提供了基於LevelDB和Zookeeper的數據複製方式,用於Master-slave方式的首選數據複製方案。  

相關文章
相關標籤/搜索