ActiveMQ - 消息持久化

消息存儲

ActiveMQ有點對點和發佈訂閱兩種方式,這兩種的消息存儲仍是有稍微一點區別。html

點對點

隊列的存儲比較簡單,就是先進先出(FIFO),只有當該消息已被消費和確承認以刪除消息存儲。若是沒有被確認,其餘消費者是不能獲取消息的。
image.png
看看下面的例子:
生產者發送了10條消息:java

public static void main(String[] args) {
    ConnectionFactory connectionFactory;
    Connection connection = null;
    Session session = null;
    Destination destination;
    MessageProducer producer = null;
    boolean useTransaction = false;
    try {
        // 建立一個ConnectionFactory
        connectionFactory = new ActiveMQConnectionFactory();
        // 建立一個Connection
        connection = connectionFactory.createConnection();
        // 啓動消息傳遞的鏈接
        connection.start();
        // 建立一個session
        session = connection.createSession(useTransaction, Session.AUTO_ACKNOWLEDGE);
        // 建立一個destination,把消息發送到test.queue
        destination = session.createQueue("test.persistence");
        // 建立一個生產者
        producer = session.createProducer(destination);
        // 建立消息
        for (int i = 0; i < 10; i++) {
            Message message = session.createTextMessage("this is test.persistence" + i);
            // 發送消息
            producer.send(message);
        }

    } catch (JMSException e) {
        e.printStackTrace();
    } finally {
        try {
            if (producer != null) {
                producer.close();
            }
            if (session != null) {
                session.close();
            }
            if (connection != null) {
                connection.close();
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

消費者1消費5條,可是暫時沒確認mysql

public static void main(String[] args) {
    ConnectionFactory connectionFactory;
    Connection connection = null;
    Session session = null;
    Destination destination;
    MessageConsumer consumer = null;

    boolean useTransaction = false;
    try {
        // 建立一個ConnectionFactory
        connectionFactory = new ActiveMQConnectionFactory();
        // 建立一個Connection
        connection = connectionFactory.createConnection();
        // 啓動消息傳遞的鏈接
        connection.start();
        // 建立一個session
        session = connection.createSession(useTransaction, Session.CLIENT_ACKNOWLEDGE);
        // 建立一個destination,把消息發送到test.queue
        destination = session.createQueue("test.persistence");
        // 建立一個消費者
        consumer = session.createConsumer(destination);
        // 接收消息
        for (int i = 0; i < 5; i++) {
            Message message = consumer.receive();
            TimeUnit.SECONDS.sleep(1);
            System.out.println("consumer1 receive:" + ((TextMessage) message).getText());
            if (i == 4) {
                TimeUnit.SECONDS.sleep(10);
                message.acknowledge();
            }
        }
    } catch (JMSException e) {
        e.printStackTrace();
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        try {
            if (consumer != null) {
                consumer.close();
            }
            if (session != null) {
                session.close();
            }
            if (connection != null) {
                connection.close();
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

消費者2此時就阻塞在獲取消息上面,直到消費者1確認後才接收到消息sql

public static void main(String[] args) {
    ConnectionFactory connectionFactory;
    Connection connection = null;
    Session session = null;
    Destination destination;
    MessageConsumer consumer = null;

    boolean useTransaction = false;
    try {
        // 建立一個ConnectionFactory
        connectionFactory = new ActiveMQConnectionFactory();
        // 建立一個Connection
        connection = connectionFactory.createConnection();
        // 啓動消息傳遞的鏈接
        connection.start();
        // 建立一個session
        session = connection.createSession(useTransaction, Session.CLIENT_ACKNOWLEDGE);
        // 建立一個destination,把消息發送到test.queue
        destination = session.createQueue("test.persistence");
        // 建立一個消費者
        consumer = session.createConsumer(destination);
        // 接收消息
        for (int i = 0; i < 5; i++) {
            Message message = consumer.receive();
            System.out.println("consumer1 receive:" + ((TextMessage) message).getText());
            if (i == 4) {
                message.acknowledge();
            }
        }
    } catch (JMSException e) {
        e.printStackTrace();
    }  finally {
        try {
            if (consumer != null) {
                consumer.close();
            }
            if (session != null) {
                session.close();
            }
            if (connection != null) {
                connection.close();
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

發佈訂閱

在發佈訂閱中,每一個消費者都會獲取到消息的拷貝,爲了節約空間,broker只存儲了一份消息,並存儲了每一個消費者所消費的信息,這樣每一個消費者雖然有不一樣的消費進度,最終仍是能一次獲取到消息。若是消息被全部訂閱者消費完了,broker就能夠刪除這個消息。
image.png數據庫

存儲方式

ActiveMQ提供了多種存儲方式,好比AMQ、KahaDB、JDBC、內存。apache

AMQ

參考官網
AMQ是早期版本的默認持久化存儲方式,基於文件的事務存儲,對於消息的存儲進行了調優,速度仍是很是快的。默認大小32M。當消息被成功使用時,就會被標記爲清理或者存檔,這個操做將在下個清理時發送。基本配置以下(其餘參數詳見官網):segmentfault

<broker persistent="true" xmlns="http://activemq.apache.org/schema/core">
...
<persistenceAdapter>
<amqPersistenceAdapter/>
</persistenceAdapter>
...
</broker>

KahaDB

5.4之後默認的持久化存儲方式,也是基於文件的,與AMQ不一樣的是,KahaDB採用了B-Tree存儲的佈局。擁有高性能和可擴展性等特色。基本配置以下:緩存

<broker brokerName="broker" persistent="true" useShutdownHook="false">
...
<persistenceAdapter>
<kahaDB directory="activemq-data" journalMaxFileLength="32mb"/>
</persistenceAdapter>
...
</broker>

JDBC

JDBC有三個表,兩個表存儲消息,還有一個表當作鎖用,保證只能一個代理訪問broker。配置以下:
先把默認的kahaDB註釋掉,再用下面的替換,注意這個配置是在broker下面。session

<persistenceAdapter>
    <jdbcPersistenceAdapter  dataSource="#mysql-ds"/>
</persistenceAdapter>

而後再增長數據庫配置,注意,這個配置是broker外面,跟其餘bea同級。tcp

<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource"
    destroy-method="close">
    <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
    <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
    <property name="username" value="root"/>
    <property name="password" value="123456"/>
    <property name="maxActive" value="200"/>
    <property name="poolPreparedStatements" value="true"/>
</bean>

在啓動activemq.bat,以前,先建數據庫,而且編碼設置爲latin1,否則ACTIVEMQ_ACKS表會建立失敗致使啓動不了。而後根據報錯信息一次在lib下加入commons-dbcp-1.4.jar、commons-pool-1.5.4.jar、mysql-connector-java-5.1.36.jar包。以下所示,幫咱們建了三個表:
image.png
activemq_acks用於保存持久化訂閱信息。
image.png
activemq_lock,用於broker集羣時的Master選舉。
image.png
activemq_msgs,用於存儲消息信息。
image.png

點對點

代碼參考以前的點對點
演示步驟以下:
一、 啓動生產者,發送消息
二、 查看錶數據以下:
image.png
三、 啓動消費者,消息消費後表數據被刪除

發佈訂閱

代碼參考以前的發佈訂閱
演示步驟以下:
一、 啓動生產者,發送消息。
二、 查看錶數據,並無持久化,因此發佈訂閱默認不持久化的。
三、 啓動消費者,沒有消息被消費。
爲了讓消息能夠被消費者消費,咱們能夠這樣作:
消費者代碼以下

public static void main(String[] args) {
    ConnectionFactory connectionFactory;
    Connection connection = null;
    Session session = null;
    // 這邊的Topic
    Topic destination;
    TopicSubscriber consumer = null;
    Message message;
    boolean useTransaction = false;
    try {
        // 建立一個ConnectionFactory
        connectionFactory = new ActiveMQConnectionFactory();
        // 建立一個Connection
        connection = connectionFactory.createConnection();
        // 設置ClientId
        connection.setClientID("ZhangSan");
        // 啓動消息傳遞的鏈接
        connection.start();
        // 建立一個session
        session = connection.createSession(useTransaction, Session.AUTO_ACKNOWLEDGE);
        // 建立一個destination,把消息發送到test.queue
        destination = session.createTopic("test.topic.mysql");
        // 建立一個消費者,調用createDurableSubscriber方法
        consumer = session.createDurableSubscriber(destination,"my");
        // 接收一個消息
        while (null != (message = consumer.receive())) {
            System.out.println("consumer receive:" + ((TextMessage) message).getText());
        }
    } catch (JMSException e) {
        e.printStackTrace();
    } finally {

    }
}

啓動兩個消費者者,activemq_acks表數據以下,已經有了兩個數據
image.png
activemq的subscribers界面以下,也有兩個數據,若是咱們消費者下線了,就會到Offline Durable Topic Subscribers列表
image.png
修改消費者的代碼後,生產者發送消息的時候,若是消費者在線,就直接消費,若是不在線,上線後還能夠繼續消費,下圖是消費者消費了幾回後,LAST_ACKED_ID變成了4。
image.png
發佈訂閱默認不持久化,因此生產者代碼能夠這樣修改,加下面一句話

producer.setDeliveryMode(DeliveryMode.PERSISTENT);

生產者發送消息後,activemq_msgs表數據以下,與點對點不同的是,這個消息被消費後,不會被刪除。
image.png

日誌類型的JDBC

因爲JDBC的性能相對比較差,因此activemq還提供了日誌類型的jdbc,確保了JMS事務的一致性。由於它整合了很是快的信息寫入與緩存技術,它能夠顯着提升性能。配置以下:

<?xml version="1.0" encoding="UTF-8"?>
<beans>
    <broker brokerName="test-broker" xmlns="http://activemq.apache.org/schema/core">
        <persistenceAdapter>
        <journaledJDBC dataDirectory="${activemq.base}/data" dataSource="#derby-ds"/>
        </persistenceAdapter>
    </broker>
    <bean id="derby-ds" class="org.apache.derby.jdbc.EmbeddedDataSource">
        <property name="databaseName" value="derbydb"/>
        <property name="createDatabase" value="create"/>
    </bean>
</beans>

雖然性能比jdbc快,可是他不支持master/slave。

內存

把消息存儲在內存中,因此沒有持久化的功能,所以要保證內存足夠大,來緩存消息。

<?xml version="1.0" encoding="UTF-8"?>
<beans>
    <broker brokerName="test-broker"
    persistent="false"
    xmlns="http://activemq.apache.org/schema/core">
        <transportConnectors>
            <transportConnector uri="tcp://localhost:61635"/>
        </transportConnectors>
    </broker>
</beans>
相關文章
相關標籤/搜索