前面一篇AMQ專題中,咱們發現對於Topic這種類型的消息,即便將deliveryMode設置爲持久化,只要生產者在消費者以前啓動。消息生產者發佈的消息仍是會丟失。這是符合JMS規範的。
固然,做爲一個如此活躍的開源消息中間件,在實現JMS基本規範以後,必然會經過擴展的方式來實現Topic的持久化訂閱。
而所謂的deliveryMode持久化和訂閱持久化仍是兩個不一樣的概念。本篇博客咱們就經過實例來一探究竟。java
在前面一篇中,咱們經過producer.setDeliveryMode(DeliveryMode.PERSISTENT);
將消息傳遞特性置爲持久化,可是當消息類型是topic
的時候,無論該值設置爲啥,只要先啓動Producer
,那麼對於後啓動的Consumer
都沒法獲取原來發布的主題。服務器
那麼這個DeliveryMode
到底是用來幹啥的呢?session
- DeliveryMode中的是否持久化,指的是當重啓activeMQ以後,原來隊列或者主題中未被消費的消息是否仍然保留
我這裏本身經過代碼進行了以下測試,測試步驟和結果以下:ide
- 建立producer,並將producer的deliveryMode設置成持久化,運行producer
- 在消息被consumer消費以前,重啓activeMQ
- 運行consumer,發現接收到了activeMQ重啓以前Producer發送的消息
- 修改producer,將producer的deliveryMode設置成非持久化,運行producer
- 在消息被consumer消費以前,重啓activeMQ
- 運行consumer,沒有接收到任何消息,原producer產生的消息丟失
持久化和非持久化最終隊列控制檯分別以下:測試
至此,不難發現,deliveryMode的是否持久化是針對activeMQ服務器是否重啓而言的。對於不支持持久化的設置,當mq重啓以後,沒有被消費的消息就會丟失。而支持持久化的設置,只要消息沒有被消費,重啓mq,仍然能被新加入的consumer消費。
JMS的規範是沒有要求實現訂閱持久化的。所幸的是activeMQ實現了這個特性。我的認爲所謂的訂閱持久化相對於消息的持久化,不過是一種僞持久化。先不作太多說明,咱們直接看一個示例代碼:設計
public class SimpleProducer { public static void main(String[] args) { // STEP1: 獲得鏈接工廠 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, ActiveMQConnection.DEFAULT_BROKER_URL); Connection connection = null; Session session = null; MessageProducer topicProducer = null; Destination topicDestination = null; try { // STEP2: 從鏈接工廠獲得鏈接而且啓動鏈接 connection = connectionFactory.createConnection(); connection.start(); // STEP3: 獲取會話 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // STEP4: 建立主題 topicDestination = session.createTopic("KiDe-topic-Demo"); // STEP5: 建立消息生產者 topicProducer = session.createProducer(topicDestination); topicProducer.setDeliveryMode(DeliveryMode.PERSISTENT); // 設置爲持久化 // STEP6: 發送消息 for (int i=0; i<20; i++) { TextMessage message = session.createTextMessage("Producer message:" + i); topicProducer.send(message); } // STEP7: 若是開啓了事務 ,此時須要調用session提交操做 // session.commit(); } catch (Exception e) { e.printStackTrace(); } finally { if (connection != null) { try { connection.close(); } catch (JMSException e) { } } } } }
public class SimpleConsumer { public static void main(String[] args) { // STEP1: 建立鏈接工廠 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, ActiveMQConnection.DEFAULT_BROKER_URL); Connection connection = null; Session session = null; MessageConsumer topicConsumer = null; try { // STEP2: 從鏈接工廠獲得鏈接而且啓動鏈接 connection = connectionFactory.createConnection(); connection.setClientID("1"); // 若是要進行持久化訂閱,必須對鏈接設置clientID connection.start(); // STEP3: 獲取會話 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // STEP4: 建立持久化訂閱者 TopicSubscriber topicSubscriber = session.createDurableSubscriber(session.createTopic("KiDe-topic-Demo"), "1"); // STEP5: 設置消息接收監聽 topicSubscriber.setMessageListener(new MessageListener() { @Override public void onMessage(Message paramMessage) { TextMessage message = (TextMessage) paramMessage; try { System.out.println("消費者接收到主題消息:" + message.getText()); } catch (JMSException e) { e.printStackTrace(); } } }); TimeUnit.SECONDS.sleep(200); // 睡眠200秒,使得客戶端能夠接收到對應消息 } catch (Exception e) { e.printStackTrace(); } finally { if (connection != null) { try { connection.close(); } catch (JMSException e) { } } } } }
最終個人驗證步驟和結果以下:code
- 運行producer,向activeMQ發送主題消息
- 運行consumer,發現未收到任何消息
- 運行producer,此時運行中的consumer接收到了topic消息
- 中止運行consumer,從新運行producer
- 從新運行consumer,此時consumer接收到了剛剛producer產生的消息
- 建立consumer的session的時候,同時建立兩個同clientId的session時會報同一通道已被佔用的錯誤
分析以上步驟,我最終對這種僞持久化訂閱的總結以下:中間件
要實現僞持久化訂閱,必須先向activeMQ發佈持久化訂閱消息,經過clientId來標識不一樣的訂閱渠道。blog
若是在發佈持久化訂閱消息以前producer就向mq發送了topic消息,那麼consumer仍是無法接收隊列
activeMQ肯定是不是同一持久化訂閱者的依據條件有兩個:
connection.setClientID("3")
中的clientId以及
session.createDurableSubscriber(session.createTopic("KiDe-topic-Demo"), "12")
中的name
- deliveryMode的持久化和訂閱持久化是兩個不一樣的概念,兩者互不干擾,組合實現業務需求
- 須要弄清參數的實際意義第一步本身動手寫實例,看運行結果是否與本身預期一致。第二步則是狀況容許的時候,多看源碼,掌握好的代碼和設計