AMQ學習筆記 - 06. 可靠消息傳送

概述


本文介紹JMS中可能發生消息故障的3個隱患階段,以及確保消息安全的3種保障機制。

故障分析


在介紹可靠傳送的確保機制以前,先分析消息在傳送的過程當中可能在哪一個階段出現問題。

1.兩個躍點

躍點的含義在於消息的持有者發生變化,如發送使消息由Producer持有變成JMS Provider持有。在消息傳送的過程當中,共有2個躍點:
  1. 發送躍點
    Producer將消息發送到JMS Provider的目的地
  2. 接收躍點
    Consumer從JMS Provider的目的地獲取消息

2.三個隱患階段

在消息傳送過程當中,可能在三個不一樣的階段出現問題:
  1. 發送階段
    Producer發送消息到目的地,可能會失敗。
  2. 消息緩存階段
    JMS Provider異常致使目的地中緩存的數據丟失。
  3. 接收階段
    Consumer從目的地接收消息,可能會失敗。

可靠傳送保障機制


爲了解決在隱患階段可能出現的問題,JMS體系定義了幾個保障機制:確認、事務、持久化。

1.Acknowledge - 確認

確認是客戶端(Producer或Consumer)和JMS Provider之間,爲了確保Message的可靠傳送而發送消息進行確認的機制。
這個機制能夠用於解決發送階段、接收階段的安全隱患。

1.1.對於Producer

確認的機制是:生成方調用的send()方法會被阻塞,直到JMS Provider收到了Message、存儲到目的地,併發送確認給生成方,阻塞才被解除。
因此,確認機制對於生成方來講是透明的。

1.2.對於Consumer

確認的機制是:消費者從目的地接收Message以後,發送接收確認給JMS Provider,JMS Provider收到確認後,將從目的地中刪除該消息,發送處理確認給消費者。
對於消費者而言,有幾種確認機制:
  • AUTO_ACKNOWLEDGE
    自動確認:會話對收到的每條Message,自動發送接收確認;而後會話線程阻塞,直到收到了JMS Provider發來的處理確認。
    接收確認的發送時機:同步接收中,調用receive()方法成功返回;異步接收中,MessageListener的onMessage(Message)方法被調用,併成功返回。
  • CLIENT_ACKNOWLEDGE
    客戶端確認:客戶端調用Message#acknowledge()發送接收確認[1];而後會話線程阻塞,直到收到了JMS Provider發來的處理確認。
    接收確認的發送時機:顯示調用Message#acknowledge()方法。
    注:
    [1] 每次確認不是隻對當前的Message進行確認,而是對自上次確認以來的全部Message進行確認.
  • DUPS_OK_ACKNOWLEDGE
    消息可重複確認:這個機制行爲上表現的和AUTO_ACKNOWLEDGE同樣,具備延遲確認的特色。這個機制可能會致使收到重複的消息。
    接收確認的發送時機:測試中,PTP Mode每一條消息都會即時確認;Pub/Sub Mode在接收的消息數,每超過prefetch size閥值[1]一半的時候確認一次。
    注:
    [1] 在brokerURL中能夠指定參數jms.prefetchPolicy.topicPrefetch做爲prefetch size閥值。你可能會猜想:應該也有參數jms.prefetchPolicy.queuePrefetch,可讓DUPS_OK_ACKNOWLEDGE在PTP Mode中也批量確認。可是實驗的結果是每一條消息都會即時確認,就好像DUPS_OK_ACKNOWLEDGE只是針對Pub/Sub設定的。

1.3.確認機制的設置

javax.jms.Session針對三種確認機制,分別定義了整型常量:
  • AUTO_ACKNOWLEDGE - 自動確認
  • CLIENT_ACKNOWLEDGE - 客戶端確認
  • DUPS_OK_ACKNOWLEDGE - 可重複確認
 
javax.jms.Connection提供了設置的方法:
  • createSession(boolean transacted, int acknowledgeMode):Session
    transacted - 設置事務,後面纔會談到事務,這裏設置false就行了
    acknowledgeMode - 設置確認機制
 
確認機制的設置是對於有接收須要的Session來說的,若是客戶端是純粹的Producer,確認機制的設置應該是被忽略的。鑑於JMS API只提供了上面一個方法來獲取Session實例,建議在Producer-Client端獲取Session時,使用Connection#createSession(false, Session.AUTO_ACKNOWLEDGE)。對於純粹的消費者,或者同時做爲生產者和消費者,要針對消費的須要來考慮選擇哪個確認機制。

1.4.確認機制的獲取

javax.jms.Session提供獲取的方法:
  • getAcknowledgeMode():int
    返回結果有4種,除了上述的3種確認機制以外,還有一個Session.SESSION_TRANSACTED表示Session啓用了事務。

1.5.測試

關於確認機制的測試,參考 確認機制的測試

2.Transaction - 事務

事務用於將多個操做組成一個原子操做,要麼所有成功,要麼所有失敗。與事務相關的兩個指令:提交、回滾。當確認全部的操做都是正確的時候,執行提交,這些操做就會生效;當其中有一個操做失敗的時候,執行回滾,全部已執行的操做就會撤銷。
事務的原理:類比於數據庫中的事務,原理相同。事務都是用於對於多個修改性質的操做進行綁定,以這些個修改操做要麼所有成功,要麼所有失敗的保障,來確保數據的一致性。數據庫中的事務是對修改數據的行爲進行綁定,JMS中的事務是對修改目的地中消息的行爲(發送消息、接收消息)進行綁定。
 
若是你對JDBC中的事務有所瞭解,對比其和JMS中的事務,興許能幫助你記憶JMS中的事務:
對比內容\事務 JDBC事務 JMS事務
範圍 java.sql.Connection實例的生命週期以內 javax.jms.Session實例的生命週期以內
設置和獲取 java.sql.Connection:
  • setAutoCommit(false)
    設置事務
  • getAutoCommit():boolean
    獲取事務
javax.jms.Connection:
  • createSession(true, *)
    設置事務。
    第二個參數能夠設置確認模式,能夠是任意值:啓用了事務就會忽略確認模式。
    不過建議使用Session.SESSION_TRANSACTED
javax.jms.Session:
  • getTransacted():boolean
    獲取事務
操做 java.sql.Connection:
  • commit():void
    提交
  • rollback():void
    回滾
javax.jms.Session:
  • commit():void
    提交
  • rollback():void
    回滾
 
事務的設置和確認機制的設置使用同一個方法,那麼有必要把確認機制和事務進行對比:
對比內容\機制 確認機制 事務機制
本質 確保消費者可以獲取到消息,在收到消費者的確認以後,纔會將消息從目的地移除。 將多個操做綁在一塊兒,成爲原子操做,從而有相同的操做結果(成功或失敗)。
客戶端編程 在Producer端是透明的,只能在Consumer端進行控制。 能夠在Producer和Consumer端實施控制。
功能性 確認消息已收到。 事務的commit,提供相似於批量確認的功能。
不要單純爲了批量確認而使用事務,要從業務的須要上考慮使用事務。
由於事務機制提供了確認的功能,因此確認機制和事務機制是互斥的。Connection#createSession(boolean transacted, int acknowledgeMode)方法用於設置事務或確認:
  • 若是第一個參數爲true,表示啓用事務,第二個參數就會被忽略(建議傳入Session.SESSION_TRANSACTED)
  • 若是第一個參數爲false,表示不啓用事務,第二個參數用於設置確認模式,就有效了。

2.1.使用模型

複製代碼
 1 try {
 2     // ...
 3     javax.jms.Connection connection = ...;
 4     javax.jms.Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
 5     
 6     // send or receive operates
 7     
 8     session.commit(); // commit at lase
 9 } catch(JMSException e) {
10     session.rollback(); // rollback when exception happens
11 } finally {
12     // close resources
13 }
複製代碼

 

2.2.測試

關於事務的測試,參考 事務的測試

3.持久化模式

確認模式和事務模式足以 [1]處理髮送階段、接收階段的安全隱患,持久化的機制則用來處理消息在目的地階段的安全隱患。
若是Producer選擇不使用持久化,則消息緩存在內存中,雖然能夠得到高吞吐率,可是一旦JMS Provider宕掉,就會致使消息的丟失。非持久化具備高吞吐量和低可靠性的特色。
若是Producer選擇持久化,則JMS Provider會將消息存到物理媒介上(文件、數據庫),Consumer獲取消息,也是從物理媒介讀取,吞吐量受到影響,可是即便JMS Provider宕掉,消息也不會丟失。持久化具備低吞吐量和高可靠性的特色。
針對持久化,JMS Provider能夠提供多種持久化方案,好比持久化到本地文件、不一樣的數據庫等,不過這些不是Producer須要關心的,Producer只須要告訴JMS Provider要不要持久化消息就行了。
持久化方案的選擇,在%ActiveMQ_HOME%\conf\activemq.xml文件,<persistenceAdapter />元素下配置。能夠參考 ActiveMQ持久化,這裏不介紹這塊內容。
 
補充:
[1] 實際上,在發送階段須要咱們本身提供容錯方案 - 好比對發送失敗的消息,緩存到本地文件等。

3.1.持久化設置

javax.jms.MessageProducer:
  • setDeliveryMode(int):void
    - 設置此producer實例的默認遞送模式
  • send(Message message):void
    - 以默認的deliveryMode發送消息
  • send(Message message, int deliveryMode, int priority, long timeToLive):void
    - 以指定的deliveryMode發送消息
 
javax.jms.DeliveryMode
  • static int NON_PERSISTENT
    非持久化模式
  • static int PERSISTENT
    持久化模式
 
設置遞送模式的代碼:
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); // 非持久化
producer.setDelicertMode(DeliveryMode.PERSISTENT); // 持久化

3.2.測試

參考 持久化的測試

參考


    1. 第 2 章 客戶端編程模型
      本文的骨架是參考這篇文章整理的
    2. ActiveMQ訊息傳送機制以及ACK機制
      這篇文章對ACK的機制講的比較細,我嘗試了optimizeACK + prefetch + AUTO_ACKNOWLEDGE的預取機制,以及DUPS_OK_ACKNOWLEDGE在Pub/Sub模式下的延遲確認機制。
    3. ActiveMQ持久化介紹了幾種常見的持久化方案。
相關文章
相關標籤/搜索