確認的機制是:消費者從目的地接收Message以後,發送接收確認給JMS Provider,JMS Provider收到確認後,將從目的地中刪除該消息,發送處理確認給消費者。
- AUTO_ACKNOWLEDGE
自動確認:會話對收到的每條Message,自動發送接收確認;而後會話線程阻塞,直到收到了JMS Provider發來的處理確認。
接收確認的發送時機:同步接收中,調用receive()方法成功返回;異步接收中,MessageListener的onMessage(Message)方法被調用,併成功返回。
- CLIENT_ACKNOWLEDGE
客戶端確認:客戶端調用Message#acknowledge()發送接收確認[1];而後會話線程阻塞,直到收到了JMS Provider發來的處理確認。
接收確認的發送時機:顯示調用Message#acknowledge()方法。
注:
[1] 每次確認不是隻對當前的Message進行確認,而是對自上次確認以來的全部Message進行確認.
- DUPS_OK_ACKNOWLEDGE
消息可重複確認:這個機制行爲上表現的和AUTO_ACKNOWLEDGE同樣,具備延遲確認的特色。這個機制可能會致使收到重複的消息。
接收確認的發送時機:測試中,PTP Mode每一條消息都會即時確認;Pub/Sub Mode在接收的消息數,每超過prefetch size閥值[1]一半的時候確認一次。
注:
[1] 在brokerURL中能夠指定參數jms.prefetchPolicy.topicPrefetch做爲prefetch size閥值。你可能會猜想:應該也有參數jms.prefetchPolicy.queuePrefetch,可讓DUPS_OK_ACKNOWLEDGE在PTP Mode中也批量確認。可是實驗的結果是每一條消息都會即時確認,就好像DUPS_OK_ACKNOWLEDGE只是針對Pub/Sub設定的。
1.3.確認機制的設置
javax.jms.Session針對三種確認機制,分別定義了整型常量:
- AUTO_ACKNOWLEDGE - 自動確認
- CLIENT_ACKNOWLEDGE - 客戶端確認
- DUPS_OK_ACKNOWLEDGE - 可重複確認
javax.jms.Connection提供了設置的方法:
- createSession(boolean transacted, int acknowledgeMode):Session
transacted - 設置事務,後面纔會談到事務,這裏設置false就行了
acknowledgeMode - 設置確認機制
確認機制的設置是對於有接收須要的Session來說的,若是客戶端是純粹的Producer,確認機制的設置應該是被忽略的。鑑於JMS API只提供了上面一個方法來獲取Session實例,建議在Producer-Client端獲取Session時,使用Connection#createSession(false, Session.AUTO_ACKNOWLEDGE)。對於純粹的消費者,或者同時做爲生產者和消費者,要針對消費的須要來考慮選擇哪個確認機制。
1.4.確認機制的獲取
javax.jms.Session提供獲取的方法:
- getAcknowledgeMode():int
返回結果有4種,除了上述的3種確認機制以外,還有一個Session.SESSION_TRANSACTED表示Session啓用了事務。
1.5.測試
2.Transaction - 事務
事務用於將多個操做組成一個原子操做,要麼所有成功,要麼所有失敗。與事務相關的兩個指令:提交、回滾。當確認全部的操做都是正確的時候,執行提交,這些操做就會生效;當其中有一個操做失敗的時候,執行回滾,全部已執行的操做就會撤銷。
事務的原理:類比於數據庫中的事務,原理相同。事務都是用於對於多個修改性質的操做進行綁定,以這些個修改操做要麼所有成功,要麼所有失敗的保障,來確保數據的一致性。數據庫中的事務是對修改數據的行爲進行綁定,JMS中的事務是對修改目的地中消息的行爲(發送消息、接收消息)進行綁定。
若是你對JDBC中的事務有所瞭解,對比其和JMS中的事務,興許能幫助你記憶JMS中的事務:
對比內容\事務 |
JDBC事務 |
JMS事務 |
範圍 |
java.sql.Connection實例的生命週期以內 |
javax.jms.Session實例的生命週期以內 |
設置和獲取 |
java.sql.Connection:
- setAutoCommit(false)
設置事務
- getAutoCommit():boolean
獲取事務
|
javax.jms.Connection:
- createSession(true, *)
設置事務。 第二個參數能夠設置確認模式,能夠是任意值:啓用了事務就會忽略確認模式。 不過建議使用Session.SESSION_TRANSACTED
javax.jms.Session:
- getTransacted():boolean
獲取事務
|
操做 |
java.sql.Connection:
- commit():void
提交
- rollback():void
回滾
|
javax.jms.Session:
- commit():void
提交
- rollback():void
回滾
|
事務的設置和確認機制的設置使用同一個方法,那麼有必要把確認機制和事務進行對比:
對比內容\機制 |
確認機制 |
事務機制 |
本質 |
確保消費者可以獲取到消息,在收到消費者的確認以後,纔會將消息從目的地移除。 |
將多個操做綁在一塊兒,成爲原子操做,從而有相同的操做結果(成功或失敗)。 |
客戶端編程 |
在Producer端是透明的,只能在Consumer端進行控制。 |
能夠在Producer和Consumer端實施控制。 |
功能性 |
確認消息已收到。 |
事務的commit,提供相似於批量確認的功能。 不要單純爲了批量確認而使用事務,要從業務的須要上考慮使用事務。 |
由於事務機制提供了確認的功能,因此確認機制和事務機制是互斥的。Connection#createSession(boolean transacted, int acknowledgeMode)方法用於設置事務或確認:
- 若是第一個參數爲true,表示啓用事務,第二個參數就會被忽略(建議傳入Session.SESSION_TRANSACTED)
- 若是第一個參數爲false,表示不啓用事務,第二個參數用於設置確認模式,就有效了。
2.1.使用模型
1 try {
2 // ...
3 javax.jms.Connection connection = ...;
4 javax.jms.Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
5
6 // send or receive operates
7
8 session.commit(); // commit at lase
9 } catch(JMSException e) {
10 session.rollback(); // rollback when exception happens
11 } finally {
12 // close resources
13 }
2.2.測試
3.持久化模式
確認模式和事務模式足以
[1]處理髮送階段、接收階段的安全隱患,持久化的機制則用來處理消息在目的地階段的安全隱患。
若是Producer選擇不使用持久化,則消息緩存在內存中,雖然能夠得到高吞吐率,可是一旦JMS Provider宕掉,就會致使消息的丟失。非持久化具備高吞吐量和低可靠性的特色。
若是Producer選擇持久化,則JMS Provider會將消息存到物理媒介上(文件、數據庫),Consumer獲取消息,也是從物理媒介讀取,吞吐量受到影響,可是即便JMS Provider宕掉,消息也不會丟失。持久化具備低吞吐量和高可靠性的特色。
針對持久化,JMS Provider能夠提供多種持久化方案,好比持久化到本地文件、不一樣的數據庫等,不過這些不是Producer須要關心的,Producer只須要告訴JMS Provider要不要持久化消息就行了。
持久化方案的選擇,在%ActiveMQ_HOME%\conf\activemq.xml文件,<persistenceAdapter />元素下配置。能夠參考
ActiveMQ持久化,這裏不介紹這塊內容。
補充:
[1] 實際上,在發送階段須要咱們本身提供容錯方案 - 好比對發送失敗的消息,緩存到本地文件等。
3.1.持久化設置
javax.jms.MessageProducer:
- setDeliveryMode(int):void
- 設置此producer實例的默認遞送模式
- send(Message message):void
- 以默認的deliveryMode發送消息
- send(Message message, int deliveryMode, int priority, long timeToLive):void
- 以指定的deliveryMode發送消息
javax.jms.DeliveryMode
- static int NON_PERSISTENT
非持久化模式
- static int PERSISTENT
持久化模式
設置遞送模式的代碼:
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); // 非持久化
producer.setDelicertMode(DeliveryMode.PERSISTENT); // 持久化
3.2.測試
參考