消息隊列-推/拉模式學習 & ActiveMQ及JMS學習

消息中間件的主要功能是消息的路由(Routing)緩存(Buffering)。在AMQP中提供相似功能的兩種域模型:Exchange 和 Message queue。html

AMQP的更多內容能夠看這裏: http://www.cnblogs.com/charlesblc/p/6058799.htmljava

 

一種分類是推和拉 。apache

還有一種分類是 Queue 和 Pub/Sub 。api

 

先看的這一篇:http://blog.csdn.net/heyutao007/article/details/50131089緩存

先講了JMS和遵照JMS的ActiveMQ。Java Message Service,JMS,指的是面向消息中間件(MOM),用於在兩個應用程序之間,或分佈式系統中發送消息,進行異步通訊。安全

AMQP的原始用途只是爲金融界提供一個能夠彼此協做的消息協議,而如今的目標則是爲通用消息隊列架構提供通用構建工具。所以,面向消息的中間件(MOM)系統,例如發佈/訂閱隊列,沒有做爲基本元素實現。反而經過發送簡化的AMQ實體,用戶被賦予了構建例如這些實體的能力。這些實體也是規範的一部分,造成了在線路層協議頂端的一個層級:AMQP模型。這個模型統一了消息模式,諸如以前提到的發佈/訂閱,隊列,事務以及流數據,而且添加了額外的特性,例如更易於擴展,基於內容的路由。

 

 

JMS中定義了兩種消息模型:點對點(point to point, queue)和發佈/訂閱(publish/subscribe,topic)。主要區別就是是否能重複消費。服務器

 

點對點:Queue,不可重複消費

消息生產者生產消息發送到queue中,而後消息消費者從queue中取出而且消費消息。
消息被消費之後,queue中再也不有存儲,因此消息消費者不可能消費到已經被消費的消息。
Queue支持存在多個消費者,可是對一個消息而言,只會有一個消費者能夠消費。
注:Kafka不遵照JMS協議,因此Kafka實際應用中,極可能會須要ack,而後多個消費者可以會同時消費。。須要具體看。

發佈/訂閱:Topic,能夠重複消費

消息生產者(發佈)將消息發佈到topic中,同時有多個消息消費者(訂閱)消費該消息。
和點對點方式不一樣,發佈到topic的消息會被全部訂閱者消費。

支持訂閱組的發佈訂閱模式:session

發佈訂閱模式下,當發佈者消息量很大時,顯然單個訂閱者的處理能力是不足的。
實際上現實場景中是多個訂閱者節點組成一個訂閱組負載均衡消費topic消息即分組訂閱,這樣訂閱者很容易實現消費能力線性擴展。

 

 

注:queue和topic在ActiveMQ裏面的實現和對比,能夠參考:《ActiveMQ的queue以及topic兩種消息處理機制分析架構

有完整queue和topic對比的代碼能夠看這裏:http://blog.csdn.net/zmx729618/article/details/51082844負載均衡

能夠看出區別 topic 是 session.createTopic("FirstTopic"); 而queue是 createQueue.

 

流行模型比較

 傳統企業型消息隊列ActiveMQ遵循了JMS規範,實現了點對點和發佈訂閱模型,但其餘流行的消息隊列RabbitMQ、Kafka並無遵循JMS規範。

3.一、RabbitMQ

RabbitMQ實現了AMQP協議,AMQP協議定義了消息路由規則和方式

(更多AMQP內容,看這裏:http://www.cnblogs.com/charlesblc/p/6058799.html

生產端經過路由規則發送消息到不一樣queue,消費端根據queue名稱消費消息。

RabbitMQ既支持內存隊列也支持持久化隊列,消費端爲推模型消費狀態和訂閱關係由服務端負責維護消息消費完後當即刪除,不保留歷史消息。

(1)點對點

生產端發送一條消息經過路由投遞到Queue,只有一個消費者能消費到。 

(2)多訂閱

當RabbitMQ須要支持多訂閱時,發佈者發送的消息經過路由同時寫到多個Queue,不一樣訂閱組消費不一樣的Queue。
因此支持多訂閱時,消息會多個拷貝。

3.二、Kafka

Kafka只支持消息持久化,消費端爲拉模型消費狀態和訂閱關係由客戶端端負責維護,消息消費完後不會當即刪除,會保留歷史消息

所以支持多訂閱時,消息只會存儲一份就能夠了。可是可能產生重複消費的狀況

(1)點對點&多訂閱(由於不刪消息,因此這兩種就不區分了) 

發佈者生產一條消息到topic中,不一樣訂閱組消費此消息。

 

上面是三種最流行MQ的比較(ActiveMQ, RabbitMQ, Kafka,沒有涉及C++的zeorq)。

下面這篇文章針對ActiveMQ的推拉模型進行介紹。

http://www.cnblogs.com/hapjin/p/5683648.html

 

對於消費者而言有兩種方式從消息中間件獲取消息:

①Push方式:由消息中間件主動地將消息推送給消費者;
②Pull方式:由消費者主動向消息中間件拉取消息。

看一段官網對Push方式的解釋:

To be able to achieve high performance it is important to stream messages to consumers as fast as possible 
so that the consumer always has a buffer of messages, in RAM, ready to process 
- rather than have them explicitly pull messages from the server which adds significant latency per message.

比較:

採用Push方式,能夠儘量快地將消息發送給消費者(stream messages to consumers as fast as possible)

而採用Pull方式,會增長消息的延遲,即消息到達消費者的時間有點長(adds significant latency per message)。

可是,Push方式會有一個壞處

若是消費者的處理消息的能力很弱(一條消息須要很長的時間處理),而消息中間件不斷地向消費者Push消息,消費者的緩衝區可能會溢出。

ActiveMQ是怎麼解決這個問題的呢?那就是  prefetch limit

prefetch limit 規定了一次能夠向消費者Push(推送)多少條消息。

Once the prefetch limit is reached, no more messages are dispatched to the consumer 
until the consumer starts sending back acknowledgements of messages (to indicate that the message has been processed)
當推送消息的數量到達了perfetch limit規定的數值時,消費者尚未向消息中間件返回ACK,消息中間件將再也不繼續向消費者推送消息。

prefetch limit設置的大小根據場景而定:

那prefetch limit的值設置爲多少合適?視具體的應用場景而定。

If you have very few messages and each message takes a very long time to process 
you might want to set the prefetch value to 1 so that a consumer is given one message at a time. 
若是消息的數量不多(生產者生產消息的速率不快),可是每條消息 消費者須要很長的時間處理,那麼prefetch limit設置爲1比較合適。
這樣,消費者每次只會收到一條消息,當它處理完這條消息以後,向消息中間件發送ACK,此時消息中間件再向消費者推送下一條消息。

prefetch limit 設置成0意味着什麼?意味着變成 拉pull模式。

Specifying a prefetch limit of zero means the consumer will poll for more messages, one at a time, 
instead of the message being pushed to the consumer.
意味着此時,消費者去輪詢消息中間件獲取消息。再也不是Push方式了,而是Pull方式了。即消費者主動去消息中間件拉取消息。

 

prefetch Limit>0即爲prefetch,=0爲Pull,看起來沒有不prefetch的push,push都要設置prefetch。

 

另外,對於prefetch模式(,那麼消費須要進行響應ACK。由於服務器須要知道consumer消費的狀況。

perfetch limit是「消息預取」的值,這是針對消息中間件如何向消費者發消息 而設置的。
與之相關的還有針對 消費者以何種方式向消息中間件返回確認ACK(響應):
好比消費者是每次消費一條消息以後就向消息中間件確認呢?仍是採用「延遲確認」---即採用批量確認的方式(消費了若干條消息以後,統一再發ACK)。

這就是 Optimized Acknowledge

引用 一段話

若是prefetchACK爲true,那麼prefetch必須大於0;當prefetchACK爲false時,你能夠指定prefetch爲0以及任意大小的正數。
不過,當prefetch=0是,表示consumer將使用PULL(拉取)的方式從broker端獲取消息,
broker端將不會主動push消息給client端,直到client端發送PullCommand時; 當prefetch>0時,就開啓了broker push模式,此後只要當client端消費且ACK了必定的消息以後,會當即push給client端多條消息。

 

在程序中如何採用Push方式或者Pull方式呢?

從是否阻塞來看,消費者有兩種方式獲取消息。同步方式和異步方式。

同步方式使用的是ActiveMQMessageConsumer的receive()方法。而異步方式則是採用消費者實現MessageListener接口,監聽消息。

同步方式:

使用同步方式receive()方法獲取消息時,prefetch limit便可以設置爲0,也能夠設置爲大於0

prefetch limit爲零 意味着:
「receive()方法將會首先發送一個PULL指令並阻塞,直到broker端返回消息爲止,這也意味着消息只能逐個獲取(相似於Request
<->Response)」 prefetch limit 大於零 意味着:
「broker端將會批量push給client必定數量的消息(
<= prefetch),client端會把這些消息(unconsumed Message)放入到本地的隊列中,
只要此隊列有消息,那麼receive方法將會當即返回(並消費),

當必定量的消息ACK以後,broker端會繼續批量push消息給client端。」

異步方式:

當使用MessageListener異步獲取消息時,prefetch limit必須大於零了。
由於,prefetch limit 等於零 意味着消息中間件不會主動給消費者Push消息,而此時消費者又用MessageListener被動獲取消息(不會主動去輪詢消息)。
這兩者是矛盾的。

此外,還有一個要注意的地方,即消費者採用同步獲取消息(receive方法) 與 異步獲取消息的方法(MessageListener) ,對消息的確認時機是不一樣的。

這裏提到了這篇文章:http://shift-alt-ctrl.iteye.com/blog/2020182 文章名《ActiveMQ消息傳送機制以及ACK機制詳解

 

ActiveMQ消息傳送機制

Producer客戶端使用來發送消息的, Consumer客戶端用來消費消息;
它們的協同中心就是ActiveMQ broker,broker也是讓producer和consumer調用過程解耦的工具,最終實現了異步RPC/數據交換的功能。

隨着ActiveMQ的不斷髮展,支持了愈來愈多的特性,也解決開發者在各類場景下使用ActiveMQ的需求。
好比producer支持異步調用;
使用flow control機制讓broker協同consumer的消費速率;
consumer端可使用prefetchACK來最大化消息消費的速率;
提供"重發策略"等來提升消息的安全性等。

一條消息的生命週期以下:

 

一條消息從producer端發出以後,一旦被broker正確保存,那麼它將會被consumer消費,而後ACK,broker端纔會刪除
不過當消息過時或者存儲設備溢出時,也會終結它。

 

上面的圖裏面寫的很清晰。

上半部分是producer的流程,下半部分consumer的流程分爲兩塊,同步的consumer.receive和異步的MessageListener。從圖中能夠看出異步的MessageLister也是一條一條處理的,由delivered隊列控制的

這張圖片中簡單的描述了:1)producer端如何發送消息 2) consumer端如何消費消息 3) broker端如何調度。
若是用文字來描述圖示中的概念,恐怕一言難盡。
圖示中,說起到prefetchAck,以及消息同步、異步發送的基本邏輯;這對你瞭解下文中的ACK機制將有很大的幫助。

 

Prefetch和optimizeACK 

咱們須要在brokerUrl指定optimizeACK選項,在destinationUri中指定prefetchSize(預獲取)選項。

其中brokerUrl參數選項是全局的,即當前factory下全部的connection/session/consumer都會默認使用這些值;
而destinationUri中的選項,只會在使用此destination的consumer實例中有效;
若是同時指定,brokerUrl中的參數選項值將會被覆蓋。
optimizeAck表示是否開啓「優化ACK」,只有在爲true的狀況下,
prefetchSize(下文中將會簡寫成prefetch)以及optimizeAcknowledgeTimeout參數纔會有意義。(prefetch依賴於
optimizeAck?看起來是筆誤)
此處須要注意"optimizeAcknowledgeTimeout"選項只能在brokerUrl中配置。

prefetch值建議在destinationUri中指定,由於在brokerUrl中指定比較繁瑣;
在brokerUrl中,queuePrefetchSize和topicPrefetchSize都須要單獨設定:
"
&jms.prefetchPolicy.queuePrefetch=12&jms.prefetchPolicy.topicPrefetch=12"等來逐個指定。 1) 在brokerUrl中增長以下查詢字符串: String brokerUrl = "tcp://localhost:61616?" + "jms.optimizeAcknowledge=true" + "&jms.optimizeAcknowledgeTimeOut=30000" + "&jms.redeliveryPolicy.maximumRedeliveries=6"; ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl); 2) 在destinationUri中,增長以下查詢字符串: String queueName = "test-queue?customer.prefetchSize=100"; Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination queue = session.createQueue(queueName);

關於prefetchAck、同步、異步api(上面講過了,溫習一下):

若是prefetchACK爲true,那麼prefetch必須大於0;
當prefetchACK爲false時,你能夠指定prefetch爲0以及任意大小的正數。
不過,當prefetch=0是,表示consumer將使用PULL(拉取)的方式從broker端獲取消息,
broker端將不會主動push消息給client端,直到client端發送PullCommand時;
當prefetch>0時,就開啓了broker push模式,此後只要當client端消費且ACK了必定的消息以後,會當即push給client端多條消息。 當consumer端使用receive()方法同步獲取消息時,prefetch能夠爲0和任意正值;
當prefetch=0時,那麼receive()方法將會首先發送一個PULL指令並阻塞,
直到broker端返回消息爲止,這也意味着消息只能逐個獲取(相似於Request<->Response),這也是Activemq中PULL消息模式;
當prefetch > 0時,broker端將會批量push給client 必定數量的消息(<= prefetch),client端會把這些消息(unconsumedMessage)放入到本地的隊列中,
只要此隊列有消息,那麼receive方法將會當即返回,當必定量的消息ACK以後,broker端會繼續批量push消息給client端。 當consumer端使用MessageListener異步獲取消息時,這就須要開發設定的prefetch值必須 >=1,即至少爲1;
在異步消費消息模式中,設定prefetch=0,是相悖的,也將得到一個Exception。

重發選項:

咱們還能夠brokerUrl中配置「redelivery」策略,好比當一條消息處理異常時,broker端能夠重發的最大次數;
和下文中提到REDELIVERED_ACK_TYPE互相協同。當消息須要broker端重發時,
consumer會首先在本地的「deliveredMessage隊列」(Consumer已經接收但還未確認的消息隊列)刪除它,
而後向broker發送「REDELIVERED_ACK_TYPE」類型的確認指令,
broker將會把指令中指定的消息從新添加到pendingQueue(亟待發送給consumer的消息隊列)中,直到合適的時機,再次push給client。

consumer消費快慢,決定了架構和設計如何處理:

按照良好的設計準則,
當consumer消費速度很慢時,咱們一般會部署多個consumer客戶端,並使用較小的prefetch,同時關閉optimizeACK,
可讓消息在多個consumer間「負載均衡」(即均勻的發送給每一個consumer);
若是較大的prefetchSize,將會致使broker一次性push給client大量的消息,可是這些消息須要好久才能ACK(消息積壓),
並且在client故障時,還會致使這些消息的重發。

其餘情景:

若是consumer端消費速度很快,可是producer端生成消息的速率較慢,並且咱們還部署了多個consumer,
這種場景下,建議開啓optimizeACK,可是須要設置的prefetchSize不能過大
這樣能夠保證每一個consumer都能有"活幹",不然將會出現一個consumer很是忙碌,可是其餘consumer幾乎收不到消息。 若是消息很重要,特別是不肯意接收到」redelivery「的消息,那麼咱們須要將optimizeACK=false,prefetchSize=1

錯誤處理與重發:

既然optimizeACK是」延遲「確認,那麼就引入一種潛在的風險:
在消息被消費以後尚未來得及確認時,client端發生故障,
那麼這些消息就有可能會被從新發送給其餘consumer,那麼這種風險就須要client端可以容忍「重複」消息。

從上面的圖能夠看出,沒有ACK的狀況下,隊列是blocking的。

不管如何設定此值,client持有的消息條數最大爲:prefetch + 「DELIVERED_ACK_TYPE消息條數」(DELIVERED_ACK_TYPE參見下文)

optimizeACK其餘注意:

即便當optimizeACK爲true,也只會當session的ACK模式爲AUTO_ACKNOWLEDGE時纔會生效,即在其餘類型的ACK模式時consumer端仍然不會「延遲確認」,即:
consumer.optimizeAck = connection.optimizeACK && session.isAutoAcknowledge()  
 
當consumer.optimizeACK有效時,若是客戶端已經消費但還沒有確認的消息(deliveredMessage)達到prefetch * 0.65,consumer端將會自動進行ACK;
同時若是離上一次ACK的時間間隔,已經超過"optimizeAcknowledgeTimout"毫秒,也會致使自動進行ACK。 此外簡單的補充一下,批量確認消息時,只須要在ACK指令中指明「firstMessageId」和「lastMessageId」便可,即消息區間,
那麼broker端就知道此consumer(根據consumerId識別)須要確認哪些消息。

 

ACK模式與類型介紹

JMS API中約定了Client端可使用四種ACK模式

在javax.jms.Session接口中:
 
AUTO_ACKNOWLEDGE = 1 自動確認 CLIENT_ACKNOWLEDGE = 2 客戶端手動確認 DUPS_OK_ACKNOWLEDGE = 3 自動批量確認 SESSION_TRANSACTED = 0 事務提交併確認

此外AcitveMQ補充了一個自定義的ACK模式: INDIVIDUAL_ACKNOWLEDGE = 4 單條消息確認

對於broker而言,只有接收到ACK指令,纔會認爲消息被正確的接收或者處理成功了,經過ACK,能夠在consumer(/producer)與Broker之間創建一種簡單的「擔保」機制. 

Client端指定了ACK模式,可是在Client與broker在交換ACK指令的時候,還須要告知ACK_TYPE,ACK_TYPE表示此確認指令的類型,
不一樣的ACK_TYPE將傳遞着消息的狀態,broker能夠根據不一樣的ACK_TYPE對消息進行不一樣的操做。 好比Consumer消費消息時出現異常,就須要向broker發送ACK指令,ACK_TYPE爲"REDELIVERED_ACK_TYPE",那麼broker就會從新發送此消息。
在JMS API中並無定義ACT_TYPE,由於它一般是一種內部機制,並不會面向開發者。ActiveMQ中定義了以下幾種ACK_TYPE(參看MessageAck類): DELIVERED_ACK_TYPE = 0 消息"已接收",但還沒有處理結束 STANDARD_ACK_TYPE = 2 "標準"類型,一般表示爲消息"處理成功",broker端能夠刪除消息了 POSION_ACK_TYPE = 1 消息"錯誤",一般表示"拋棄"此消息,好比消息重發屢次後,都沒法正確處理時,消息將會被刪除或者DLQ(死信隊列) REDELIVERED_ACK_TYPE = 3 消息需"重發",好比consumer處理消息時拋出了異常,broker稍後會從新發送此消息 INDIVIDUAL_ACK_TYPE = 4 表示只確認"單條消息",不管在任何ACK_MODE下 UNMATCHED_ACK_TYPE = 5 在Topic中,若是一條消息在轉發給「訂閱者」時,發現此消息不符合Selector過濾條件,那麼此消息將 不會轉發給訂閱者,
消息將會被存儲引擎刪除(至關於在Broker上確認了消息)。

ACK的基本流程見下圖:

Consumer消費消息的風格有2種: 同步/異步. 使用consumer.receive()就是同步,使用messageListener就是異步。

在同一個consumer中,咱們不能同時使用這2種風格,好比在使用listener的狀況下,當調用receive()方法將會得到一個Exception。

兩種風格下,消息確認時機有所不一樣。

"同步"僞代碼:

//receive僞代碼---過程  
Message message = sessionMessageQueue.dequeue();  
if(message != null){  
    ack(message);  
}  
return message  
 
同步調用時,在消息從receive方法返回以前,就已經調用了ACK;所以若是Client端沒有處理成功,此消息將丟失(可能重發,與ACK模式有關)。
    

"異步"僞代碼: //基於listener Session session = connection.getSession(consumerId); sessionQueueBuffer.enqueue(message); Runnable runnable = new Ruannale(){ run(){ Consumer consumer = session.getConsumer(consumerId); Message md = sessionQueueBuffer.dequeue(); try{ consumer.messageListener.onMessage(md); ack(md);// }catch(Exception e){ redelivery();//sometime,not all the time; } } //session中將採起線程池的方式,分發異步消息 //所以同一個session中多個consumer能夠並行消費 threadPool.execute(runnable); 基於異步調用時,消息的確認是在onMessage方法返回以後,若是onMessage方法異常,會致使消息不能被ACK,會觸發重發

 

ACK模式詳解

 

AUTO_ACKNOWLEDGE : 

自動確認,這就意味着消息的確認時機將有consumer擇機確認.
"擇機確認"彷佛充滿了不肯定性,這也意味着,開發者必須明確知道"擇機確認"的具體時機,不然將有可能致使消息的丟失,或者消息的重複接收.
那麼在ActiveMQ中,AUTO_ACKNOWLEDGE是如何運做的呢?
1) 對於consumer而言,optimizeAcknowledge屬性只會在AUTO_ACK模式下有效。 2) 其中DUPS_ACKNOWLEGE也是一種潛在的AUTO_ACK,只是確認消息的條數和時間上有所不一樣。 3) 在「同步」(receive)方法返回message以前,會檢測optimizeACK選項是否開啓,若是沒有開啓,此單條消息將當即確認,
因此在這種狀況下,message返回以後,若是開發者在處理message過程當中出現異常,會致使此消息也不會redelivery,即"潛在的消息丟失";
若是開啓了optimizeACK,則會在unAck數量達到prefetch * 0.65時確認,固然咱們能夠指定prefetchSize = 1來實現逐條消息確認。 4) 在"異步"(messageListener)方式中,將會首先調用listener.onMessage(message),此後再ACK,
若是onMessage方法異常,將致使client端補充發送一個ACK_TYPE爲REDELIVERED_ACK_TYPE確認指令;
若是onMessage方法正常,消息將會正常確認(STANDARD_ACK_TYPE)。
此外須要注意,消息的重發次數是有限制的,每條消息中都會包含「redeliveryCounter」計數器,用來表示此消息已經被重發的次數,
若是重發次數達到閥值,將會致使發送一個ACK_TYPE爲POSION_ACK_TYPE確認指令,這就致使broker端認爲此消息沒法消費,
此消息將會被刪除或者遷移到"dead letter"通道中。 所以當咱們使用messageListener方式消費消息時,一般建議在onMessage方法中使用try-catch,這樣能夠在處理消息出錯時記錄一些信息,
而不是讓consumer不斷去重發消息
若是你沒有使用try-catch,就有可能會由於異常而致使消息重複接收的問題,須要注意你的onMessage方法中邏輯是否可以兼容對重複消息的判斷

 

CLIENT_ACKNOWLEDGE : 

客戶端手動確認,這就意味着AcitveMQ將不會「自做主張」的爲你ACK任何消息,開發者須要本身擇機確認。

不管是「同步」/「異步」,ActiveMQ都不會發送STANDARD_ACK_TYPE,直到message.acknowledge()調用。
若是在client端未確認的消息個數達到prefetchSize * 0.5時,會補充發送一個ACK_TYPE爲DELIVERED_ACK_TYPE的確認指令,
這會觸發broker端能夠繼續push消息到client端。

注意防止不ack而hang住:

若是client端由於某種緣由致使acknowledge方法未被執行,將致使大量消息不能被確認,
broker端將不會push消息,事實上client端將處於「假死」狀態,而沒法繼續消費消息。

咱們要求client端在消費1.5*prefetchSize個消息以前,必須acknowledge()一次;
一般咱們老是每消費一個消息調用一次,這是一種良好的設計。

broker依據ack速率進行負載平衡:

在CLIET_ACK模式下,消息在交付給listener以前,都會首先建立一個DELIVERED_ACK_TYPE的ACK指令,
直到client端未確認的消息達到"prefetchSize * 0.5"時纔會發送此ACK指令,如
果在此以前,開發者調用了acknowledge()方法,會致使消息直接被確認(STANDARD_ACK_TYPE)。

broker端一般會認爲「DELIVERED_ACK_TYPE」確認指令是一種「slow consumer」信號,
若是consumer不能及時的對消息進行acknowledge而致使broker端阻塞,那麼此consumer將會被標記爲「slow」,
此後queue中的消息將會轉發給其餘Consumer。

 

DUPS_OK_ACKNOWLEDGE :

"消息可重複"確認,意思是此模式下,可能會出現重複消息,並非一條消息須要發送屢次ACK才行。
它是一種潛在的"AUTO_ACK"確認機制,爲批量確認而生,並且具備「延遲」確認的特色。

對於開發者而言,這種模式下的代碼結構和AUTO_ACKNOWLEDGE同樣,不須要像CLIENT_ACKNOWLEDGE那樣調用acknowledge()方法來確認消息。

發生做用的時機:

1) 在ActiveMQ中,若是在Destination是Queue通道,咱們真的能夠認爲DUPS_OK_ACK就是「AUTO_ACK + optimizeACK + (prefetch > 0)」
這種狀況,在確認時機上幾乎徹底一致;
此外在此模式下,若是prefetchSize =1 或者沒有開啓optimizeACK,也會致使消息逐條確認,從而失去批量確認的特性。 2) 若是Destination爲Topic,DUPS_OK_ACKNOWLEDGE纔會產生JMS規範中詮釋的意義,
即不管optimizeACK是否開啓,都會在消費的消息個數>=prefetch * 0.5時,批量確認(STANDARD_ACK_TYPE),
在此過程當中,不會發送DELIVERED_ACK_TYPE的確認指令,這是DUPS和AUTO_ACK的最大的區別
這也意味着,當consumer故障重啓後,那些還沒有ACK的消息會從新發送過來。

 

SESSION_TRANSACTED :

當session使用事務時,就是使用此模式。

在事務開啓以後,和session.commit()以前,全部消費的消息,要麼所有正常確認,要麼所有redelivery。
這種嚴謹性,一般在基於GROUP(消息分組)或者其餘場景下特別適合。

在SESSION_TRANSACTED模式下,optimizeACK並不能發揮任何效果,由於在此模式下,optimizeACK會被強制設定爲false,
不過prefetch仍然能夠決定DELIVERED_ACK_TYPE的發送時機。 由於Session非線程安全,那麼當前session下全部的consumer都會共享同一個transactionContext;
同時建議,一個事務類型的Session中只有一個Consumer,以免rollback()或者commit()方法被多個consumer調用而形成的消息混亂。

確認過程,以及確認ACK的發送時機:

事務的確認過程當中,首先把本地的deliveredMessage隊列中還沒有確認的消息所有確認(STANDARD_ACK_TYPE)
此後向broker發送transaction提交指令並等待broker反饋,

若是broker端事務操做成功,那麼將會把本地deliveredMessage隊列清空,新的事務開始
若是broker端事務操做失敗(此時broker已經rollback),那麼對於session而言,將執行inner-rollback,
這個rollback所作的事情,就是將當前事務中的消息清空並要求broker重發(REDELIVERED_ACK_TYPE),同時commit方法將拋出異常。

 

INDIVIDUAL_ACKNOWLEDGE :

不多使用,它的確認時機和CLIENT_ACKNOWLEDGE幾乎同樣

當消息消費成功以後,須要調用message.acknowledege來確認此消息(單條)
而CLIENT_ACKNOWLEDGE模式先message.acknowledge()方法將致使整個session中全部消息被確認(批量確認)

 

(完)

相關文章
相關標籤/搜索