ActiveMQ專題2: 持久化

AMQ的持久化問題

前言

​ 前面一篇AMQ專題中,咱們發現對於Topic這種類型的消息,即便將deliveryMode設置爲持久化,只要生產者在消費者以前啓動。消息生產者發佈的消息仍是會丟失。這是符合JMS規範的。
固然,做爲一個如此活躍的開源消息中間件,在實現JMS基本規範以後,必然會經過擴展的方式來實現Topic的持久化訂閱。
而所謂的deliveryMode持久化和訂閱持久化仍是兩個不一樣的概念。本篇博客咱們就經過實例來一探究竟。java

DeliveryMode持久化

​ 在前面一篇中,咱們經過producer.setDeliveryMode(DeliveryMode.PERSISTENT);將消息傳遞特性置爲持久化,可是當消息類型是topic的時候,無論該值設置爲啥,只要先啓動Producer,那麼對於後啓動的Consumer都沒法獲取原來發布的主題。服務器

​ 那麼這個DeliveryMode到底是用來幹啥的呢?session

  • DeliveryMode中的是否持久化,指的是當重啓activeMQ以後,原來隊列或者主題中未被消費的消息是否仍然保留

​ 我這裏本身經過代碼進行了以下測試,測試步驟和結果以下:ide

  1. 建立producer,並將producer的deliveryMode設置成持久化,運行producer
  2. 在消息被consumer消費以前,重啓activeMQ
  3. 運行consumer,發現接收到了activeMQ重啓以前Producer發送的消息
  4. 修改producer,將producer的deliveryMode設置成非持久化,運行producer
  5. 在消息被consumer消費以前,重啓activeMQ
  6. 運行consumer,沒有接收到任何消息,原producer產生的消息丟失

​ 持久化和非持久化最終隊列控制檯分別以下:測試

至此,不難發現,deliveryMode的是否持久化是針對activeMQ服務器是否重啓而言的。對於不支持持久化的設置,當mq重啓以後,沒有被消費的消息就會丟失。而支持持久化的設置,只要消息沒有被消費,重啓mq,仍然能被新加入的consumer消費。

訂閱持久化

​ JMS的規範是沒有要求實現訂閱持久化的。所幸的是activeMQ實現了這個特性。我的認爲所謂的訂閱持久化相對於消息的持久化,不過是一種僞持久化。先不作太多說明,咱們直接看一個示例代碼:設計

生產者

public class SimpleProducer {
    public static void main(String[] args) {
        // STEP1: 獲得鏈接工廠
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD, ActiveMQConnection.DEFAULT_BROKER_URL);
        
        Connection connection = null;
        Session session = null;
        MessageProducer topicProducer = null;
        Destination topicDestination = null;
        try {
            // STEP2: 從鏈接工廠獲得鏈接而且啓動鏈接
            connection = connectionFactory.createConnection();
            connection.start();
            
            // STEP3: 獲取會話
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            
            // STEP4: 建立主題 
            topicDestination = session.createTopic("KiDe-topic-Demo");
            
            // STEP5: 建立消息生產者
            topicProducer = session.createProducer(topicDestination);
            topicProducer.setDeliveryMode(DeliveryMode.PERSISTENT);     // 設置爲持久化
            
            
            // STEP6: 發送消息
            for (int i=0; i<20; i++) {
                TextMessage message = session.createTextMessage("Producer message:" + i);
                topicProducer.send(message);
            }
            
            // STEP7: 若是開啓了事務 ,此時須要調用session提交操做
            // session.commit();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                }
            }
        }
    }
}

消費者

public class SimpleConsumer {
    public static void main(String[] args) {
        // STEP1: 建立鏈接工廠
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD, ActiveMQConnection.DEFAULT_BROKER_URL);

        Connection connection = null;
        Session session = null;
        MessageConsumer topicConsumer = null;
        try {
            // STEP2: 從鏈接工廠獲得鏈接而且啓動鏈接
            connection = connectionFactory.createConnection();
            connection.setClientID("1");        // 若是要進行持久化訂閱,必須對鏈接設置clientID
            connection.start();

            // STEP3: 獲取會話
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            
            // STEP4: 建立持久化訂閱者
            TopicSubscriber topicSubscriber = session.createDurableSubscriber(session.createTopic("KiDe-topic-Demo"), "1");
                        
            // STEP5: 設置消息接收監聽
            topicSubscriber.setMessageListener(new MessageListener() {
                
                @Override
                public void onMessage(Message paramMessage) {
                    TextMessage message = (TextMessage) paramMessage;
                    try {
                        System.out.println("消費者接收到主題消息:" + message.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
            TimeUnit.SECONDS.sleep(200);    // 睡眠200秒,使得客戶端能夠接收到對應消息
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                }
            }
        }
    }
}

​ 最終個人驗證步驟和結果以下:code

  1. 運行producer,向activeMQ發送主題消息
  2. 運行consumer,發現未收到任何消息
  3. 運行producer,此時運行中的consumer接收到了topic消息
  4. 中止運行consumer,從新運行producer
  5. 從新運行consumer,此時consumer接收到了剛剛producer產生的消息
  6. 建立consumer的session的時候,同時建立兩個同clientId的session時會報同一通道已被佔用的錯誤

​ 分析以上步驟,我最終對這種僞持久化訂閱的總結以下:中間件

  • 要實現僞持久化訂閱,必須先向activeMQ發佈持久化訂閱消息,經過clientId來標識不一樣的訂閱渠道。blog

    若是在發佈持久化訂閱消息以前producer就向mq發送了topic消息,那麼consumer仍是無法接收隊列

  • activeMQ肯定是不是同一持久化訂閱者的依據條件有兩個:connection.setClientID("3")中的clientId

    以及session.createDurableSubscriber(session.createTopic("KiDe-topic-Demo"), "12")中的name

總結

  • deliveryMode的持久化和訂閱持久化是兩個不一樣的概念,兩者互不干擾,組合實現業務需求
  • 須要弄清參數的實際意義第一步本身動手寫實例,看運行結果是否與本身預期一致。第二步則是狀況容許的時候,多看源碼,掌握好的代碼和設計
相關文章
相關標籤/搜索