Introduction
有不少人問過我這麼一類問題:RabbitMQ如何確保消息可靠?不少時候,筆者的回答都是:說來話長的事情何來長話短說。的確,要確保消息可靠不僅是單單幾句就可以敘述明白的,包括Kafka也是如此。可靠並非一個絕對的概念,曾經有人也留言說過相似所有磁盤損毀也會致使消息丟失,筆者戲答:還有機房被炸了也會致使消息丟失。可靠性是一個相對的概念,在條件合理的範圍內系統所能確保的多少個9的可靠性。一切儘量的趨於完美而沒法企及於完美。
咱們能夠儘量的確保RabbitMQ的消息可靠。在詳細論述RabbitMQ的消息可靠性以前,咱們先來回顧下消息在RabbitMQ中的經由之路。 數據庫
如圖所示,從AMQP協議層面上來講:
1. 消息先從生產者Producer出發到達交換器Exchange;
2. 交換器Exchange根據路由規則將消息轉發對應的隊列Queue之上;
3. 消息在隊列Queue上進行存儲;
4. 消費者Consumer訂閱隊列Queue並進行消費。
咱們對於消息可靠性的分析也從這四個階段來一一探討。編程
Phase 1
消息從生產者發出到達交換器Exchange,在這個過程當中能夠發生各類狀況,生產者客戶端發送出去以後能夠發生網絡丟包、網絡故障等形成消息丟失。通常狀況下若是不採起措施,生產者沒法感知消息是否已經正確無誤的發送到交換器中。若是消息在傳輸到Exchange的過程當中發生失敗而可讓生產者感知的話,生產者能夠進行進一步的處理動做,好比從新投遞相關消息以確保消息的可靠性。緩存
爲此AMQP協議在創建之初就考慮到這種狀況而提供了事務機制。RabbitMQ客戶端中與事務機制相關的方法有三個:channel.txSelect、channel.txCommit以及channel.txRollback。channel.txSelect用於將當前的信道設置成事務模式,channel.txCommit用於提交事務,而channel.txRollback用於事務回滾。在經過channel.txSelect方法開啓事務以後,咱們即可以發佈消息給RabbitMQ了,若是事務提交成功,則消息必定到達了RabbitMQ中,若是在事務提交執行以前因爲RabbitMQ異常崩潰或者其餘緣由拋出異常,這個時候咱們即可以將其捕獲,進而經過執行channel.txRollback方法來實現事務回滾。注意這裏的RabbitMQ中的事務機制與大多數數據庫中的事務概念並不相同,須要注意區分。網絡
事務確實可以解決消息發送方和RabbitMQ之間消息確認的問題,只有消息成功被RabbitMQ接收,事務才能提交成功,不然咱們即可在捕獲異常以後進行事務回滾,與此同時能夠進行消息重發。可是使用事務機制的話會「吸乾」RabbitMQ的性能,那麼有沒有更好的方法既能保證消息發送方確認消息已經正確送達,又能基本上不帶來性能上的損失呢?從AMQP協議層面來看並無更好的辦法,可是RabbitMQ提供了一個改進方案,即發送方確認機制(publisher confirm)。異步
生產者將信道設置成confirm(確認)模式,一旦信道進入confirm模式,全部在該信道上面發佈的消息都會被指派一個惟一的ID(從1開始),一旦消息被投遞到全部匹配的隊列以後,RabbitMQ就會發送一個確認(Basic.Ack)給生產者(包含消息的惟一ID),這就使得生產者知曉消息已經正確到達了目的地了。RabbitMQ回傳給生產者的確認消息中的deliveryTag包含了確認消息的序號,此外RabbitMQ也能夠設置channel.basicAck方法中的multiple參數,表示到這個序號以前的全部消息都已經獲得了處理。
事務機制在一條消息發送以後會使發送端阻塞,以等待RabbitMQ的迴應,以後才能繼續發送下一條消息。相比之下,發送方確認機制最大的好處在於它是異步的,一旦發佈一條消息,生產者應用程序就能夠在等信道返回確認的同時繼續發送下一條消息,當消息最終獲得確認以後,生產者應用即可以經過回調方法來處理該確認消息,若是RabbitMQ由於自身內部錯誤致使消息丟失,就會發送一條nack(Basic.Nack)命令,生產者應用程序一樣能夠在回調方法中處理該nack命令。分佈式
生產者經過調用channel.confirmSelect方法(即Confirm.Select命令)將信道設置爲confirm模式,以後RabbitMQ會返回 Confirm.Select-Ok命令表示贊成生產者將當前信道設置爲confirm模式。全部被髮送的後續消息都被ack或者nack一次,不會出現一條消息即被ack又被nack的狀況。而且RabbitMQ也並無對消息被confirm的快慢作任何保證。性能
事務機制和publisher confirm機制二者是互斥的,不能共存。若是企圖將已開啓事務模式的信道再設置爲publisher confirm模式,RabbitMQ會報錯:{amqp_error, precondition_failed, 「cannot switch from tx to confirm mode」, ‘confirm.select’},或者若是企圖將已開啓publisher confirm模式的信道在設置爲事務模式的話,RabbitMQ也會報錯:{amqp_error, precondition_failed, 「cannot switch from confirm to tx mode」, ‘tx.select’ }。操作系統
事務機制和publisher confirm機制確保的是消息可以正確的發送至RabbitMQ,這裏的「發送至RabbitMQ」的含義是指消息被正確的發往至RabbitMQ的交換器,若是此交換器沒有匹配的隊列的話,那麼消息也將會丟失。因此在使用這兩種機制的時候要確保所涉及的交換器可以有匹配的隊列。更進一步的講,發送方要配合mandatory參數或者備份交換器一塊兒使用來提升消息傳輸的可靠性。設計
Phase 2
mandatory和immediate是channel.basicPublish方法中的兩個參數,它們都有當消息傳遞過程當中不可達目的地時將消息返回給生產者的功能。而RabbitMQ提供的備份交換器(Alternate Exchange)能夠將未能被交換器路由的消息(沒有綁定隊列或者沒有匹配的綁定)存儲起來,而不用返回給客戶端。
RabbitMQ 3.0版本開始去掉了對於immediate參數的支持,對此RabbitMQ官方解釋是:immediate參數會影響鏡像隊列的性能,增長代碼複雜性,建議採用TTL和DLX的方法替代。因此本文只簡單介紹mandatory和備份交換器。
當mandatory參數設爲true時,交換器沒法根據自身的類型和路由鍵找到一個符合條件的隊列的話,那麼RabbitMQ會調用Basic.Return命令將消息返回給生產者。當mandatory參數設置爲false時,出現上述情形的話,消息直接被丟棄。 那麼生產者如何獲取到沒有被正確路由到合適隊列的消息呢?這時候能夠經過調用channel.addReturnListener來添加ReturnListener監聽器實現。使用mandatory參數的關鍵代碼以下所示:orm
channel.basicPublish(EXCHANGE_NAME, "", true, MessageProperties.PERSISTENT_TEXT_PLAIN, "mandatory test".getBytes());
channel.addReturnListener(new ReturnListener() {
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP
.BasicProperties basicProperties, byte[] body) throws IOException {
String message = new String(body);
System.out.println("Basic.Return返回的結果是:" + message);
}
});
上面代碼中生產者沒有成功的將消息路由到隊列,此時RabbitMQ會經過Basic.Return返回「mandatory test」這條消息,以後生產者客戶端經過ReturnListener監聽到了這個事件,上面代碼的最後輸出應該是「Basic.Return返回的結果是:mandatory test」。
生產者能夠經過ReturnListener中返回的消息來從新投遞或者其它方案來提升消息的可靠性。
備份交換器,英文名稱Alternate Exchange,簡稱AE,或者更直白的能夠稱之爲「備胎交換器」。生產者在發送消息的時候若是不設置mandatory參數,那麼消息在未被路由的狀況下將會丟失,若是設置了mandatory參數,那麼須要添加ReturnListener的編程邏輯,生產者的代碼將變得複雜化。若是你不想複雜化生產者的編程邏輯,又不想消息丟失,那麼可使用備份交換器,這樣能夠將未被路由的消息存儲在RabbitMQ中,再在須要的時候去處理這些消息。 能夠經過在聲明交換器(調用channel.exchangeDeclare方法)的時候添加alternate-exchange參數來實現,也能夠經過策略的方式實現。若是二者同時使用的話,前者的優先級更高,會覆蓋掉Policy的設置。
參考下圖,若是此時咱們發送一條消息到normalExchange上,當路由鍵等於「normalKey」的時候,消息能正確路由到normalQueue這個隊列中。若是路由鍵設爲其餘值,好比「errorKey」,即消息不能被正確的路由到與normalExchange綁定的任何隊列上,此時就會發送給myAe,進而發送到unroutedQueue這個隊列。
備份交換器其實和普通的交換器沒有太大的區別,爲了方便使用,建議設置爲fanout類型,如若讀者想設置爲direct或者topic的類型也沒有什麼不妥。須要注意的是消息被從新發送到備份交換器時的路由鍵和從生產者發出的路由鍵是同樣的。備份交換器的實質就是原有交換器的一個「備胎」,全部沒法正確路由的消息都發往這個備份交換器中,能夠爲全部的交換器設置同一個AE,不過這裏須要提早確保的是AE已經正確的綁定了隊列,最好類型也是fanout的。若是備份交換器和mandatory參數一塊兒使用,那麼mandatory參數無效。
Phase 3
mandatory或者AE可讓消息在路由到隊列以前獲得極大的可靠性保障,可是消息存入隊列以後的可靠性又如何保證?
首先是持久化。持久化能夠提升隊列的可靠性,以防在異常狀況(重啓、關閉、宕機等)下的數據丟失。隊列的持久化是經過在聲明隊列時將durable參數置爲true實現的,若是隊列不設置持久化,那麼在RabbitMQ服務重啓以後,相關隊列的元數據將會丟失,此時數據也會丟失。正所謂「皮之不存,毛將焉附」,隊列都沒有了,消息又能存在哪裏呢?隊列的持久化能保證其自己的元數據不會因異常狀況而丟失,可是並不能保證內部所存儲的消息不會丟失。要確保消息不會丟失,須要將其設置爲持久化。經過將消息的投遞模式(BasicProperties中的deliveryMode屬性)設置爲2便可實現消息的持久化。
設置了隊列和消息的持久化,當RabbitMQ服務重啓以後,消息依舊存在。單單隻設置隊列持久化,重啓以後消息會丟失;單單隻設置消息的持久化,重啓以後隊列消失,既而消息也丟失。單單設置消息持久化而不設置隊列的持久化顯得毫無心義。
在持久化的消息正確存入RabbitMQ以後,還須要有一段時間(雖然很短,可是不可忽視)才能存入磁盤之中。RabbitMQ並不會爲每條消息都作同步存盤(調用內核的fsync6方法)的處理,可能僅僅保存到操做系統緩存之中而不是物理磁盤之中。若是在這段時間內RabbitMQ服務節點發生了宕機、重啓等異常狀況,消息保存還沒來得及落盤,那麼這些消息將會丟失。
若是在Phase1中採用了事務機制或者publisher confirm機制的話,服務端的返回是在消息落盤以後執行的,這樣能夠進一步的提升了消息的可靠性。可是即使如此也沒法避免單機故障且沒法修復(好比磁盤損毀)而引發的消息丟失,這裏就須要引入鏡像隊列。鏡像隊列至關於配置了副本,絕大多數分佈式的東西都有多副本的概念來確保HA。在鏡像隊列中,若是主節點(master)在此特殊時間內掛掉,能夠自動切換到從節點(slave),這樣有效的保證了高可用性,除非整個集羣都掛掉。雖然這樣也不能徹底的保證RabbitMQ消息不丟失(好比機房被炸。。。),可是配置了鏡像隊列要比沒有配置鏡像隊列的可靠性要高不少,在實際生產環境中的關鍵業務隊列通常都會設置鏡像隊列。
Phase 4
進一步的從消費者的角度來講,若是在消費者接收到相關消息以後,還沒來得及處理就宕機了,這樣也算數據丟失。
爲了保證消息從隊列可靠地達到消費者,RabbitMQ提供了消息確認機制(message acknowledgement)。消費者在訂閱隊列時,能夠指定autoAck參數,當autoAck等於false時,RabbitMQ會等待消費者顯式地回覆確認信號後才從內存(或者磁盤)中移去消息(實質上是先打上刪除標記,以後再刪除)。當autoAck等於true時,RabbitMQ會自動把發送出去的消息置爲確認,而後從內存(或者磁盤)中刪除,而無論消費者是否真正的消費到了這些消息。
採用消息確認機制後,只要設置autoAck參數爲false,消費者就有足夠的時間處理消息(任務),不用擔憂處理消息過程當中消費者進程掛掉後消息丟失的問題,由於RabbitMQ會一直等待持有消息直到消費者顯式調用Basic.Ack命令爲止。
當autoAck參數置爲false,對於RabbitMQ服務端而言,隊列中的消息分紅了兩個部分:一部分是等待投遞給消費者的消息;一部分是已經投遞給消費者,可是尚未收到消費者確認信號的消息。若是RabbitMQ一直沒有收到消費者的確認信號,而且消費此消息的消費者已經斷開鏈接,則RabbitMQ會安排該消息從新進入隊列,等待投遞給下一個消費者,固然也有可能仍是原來的那個消費者。
RabbitMQ不會爲未確認的消息設置過時時間,它判斷此消息是否須要從新投遞給消費者的惟一依據是消費該消息的消費者鏈接是否已經斷開,這麼設計的緣由是RabbitMQ容許消費者消費一條消息的時間能夠好久好久。
若是消息消費失敗,也能夠調用Basic.Reject或者Basic.Nack來拒絕當前消息而不是確認,若是隻是簡單的拒絕那麼消息會丟失,須要將相應的requeue參數設置爲true,那麼RabbitMQ會從新將這條消息存入隊列,以即可以發送給下一個訂閱的消費者。若是requeue參數設置爲false的話,RabbitMQ當即會把消息從隊列中移除,而不會把它發送給新的消費者。
還有一種狀況須要考慮:requeue的消息是存入隊列頭部的,便可以快速的又被髮送給消費,若是此時消費者又不能正確的消費而又requeue的話就會進入一個無盡的循環之中。對於這種狀況,筆者的建議是在出現沒法正確消費的消息時不要採用requeue的方式來確保消息可靠性,而是從新投遞到新的隊列中,好比設定的死信隊列中,以此能夠避免前面所說的死循環而又能夠確保相應的消息不丟失。對於死信隊列中的消息能夠用另外的方式來消費分析,以便找出問題的根本