經歷上一篇的基礎API總結,其實RabbitMQ的基礎使用,就不成問題了。可是要想稍微拔高,仍是要經歷這一篇的洗禮。一直以來,我面試別人的時候,大多數面試者的簡歷中,都會寫上熟練使用RabbitMQ,然而,我問出一個只要是消息中間件就老生常談的話題的時候,幾乎清一色的,都沒有很好的說出來。這個問題就是:請介紹下,RabbitMQ如何保證消息的可靠性的。經過這一篇文章的總結,我想讓本身達到對這個問題的細節覆蓋全面的程度,至少一個架構師過來問我,我能有條理有邏輯的說明白,不會東一句西一句。做爲RabbitMQ,這種咱們平常生產中使用頻率至關高的消息中間件,我認爲,對他的掌控,要更好,才能說明咱們對技術的追求,而不僅是CURD。java
這兩個參數,是保證消息可靠性的第一扇門,咱們先來看看上篇文章中,消息發送的api源碼:面試
/** * 發佈一個消息到服務端。 * * @param mandatory 後面文章介紹 * @param immediate 後面文章介紹 */ void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException;
這兩個參數,上一篇裏面註釋的,這一章咱們來解開編程
咱們看看如何獲取到madatory爲true的時候,消息沒有被正確路由,返回給生產者的消息:api
channel.basicPublish("exchangeName", "routingKey", true, MessageProperties.PERSISTENT_TEXT_PLAIN, "test".getBytes()); channel.addReturnListener(new ReturnListener() { @Override public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body); System.out.println("返回的結果是:" + msg); } });
mandatory主要保護的是:交換機是否能正確匹配到消息隊列,immediate主要保護的是消息隊列是否有消費者。經過這兩個參數,能夠保證消息在整個從發送到接收過程當中,全稱掌控。緩存
RabbitMQ3.0以後去掉了immediate參數的支持,官方說法是會影響性能,增長代碼複雜性,建議使用TTL(消息最大生存時間)和DLX(死信隊列)來代替服務器
這東西主要應對mandatory參數不想去設置,而且,這個參數設置了,會增大代碼的侵入性,那咱們又如何保障消息沒有匹配的隊列這種狀況不丟失呢,就使用這個。下面是一段使用備份交換機的代碼:架構
Map<String, Object> arguments = new HashMap<>(); // alternate-exchange這個參數就是設置具體的備份交換機是誰 arguments.put("alternate-exchange", "myAe"); channel.exchangeDeclare("nomalExchange", "direct", true, false, arguments); channel.exchangeDeclare("myAe","fanout",true,false,null); channel.queueDeclare("nomalQueue",true,false,false,null); channel.queueBind("nomalQueue","nomalExchange","normalKey"); channel.queueDeclare("unroutedQueue",true,false,false,null); channel.queueBind("unroutedQueue","myAe","");
這段代碼的主要示意圖以下:運維
有如下幾點:異步
過時時間分爲消息的過時時間和隊列的過時時間ide
消息過時時間設置,有兩種方式:
若是兩個一塊兒使用的話,會取較小的那個值。而且若是消息到了過時時間以後尚未消費者進行消費的話,就會變成死信。下面咱們首先來看看如何經過隊列屬性的方式設置過時時間:
Map<String, Object> arguments = new HashMap<>(); // x-message-ttl經過這個參數進行設置 arguments.put("x-message-ttl",6000); channel.queueDeclare("queueName",true,false,false,arguments);
畫外音:固然還能夠經過Policy與HTTPAPI的方式進行設置,可是我感受這兩種偏運維,這裏主要想寫寫開發視角,我就很少寫這兩種設置方式了
不設置這個參數,表示隊列裏面的消息不會過時,設置成0,除非消息立刻被消費者消費,不然將會被丟棄,這個設置0的特性能夠部分代替immediate這個參數。下面咱們來看看直接設置消息的TTL:
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); builder.deliveryMode(2);// 持久化消息 builder.expiration("60000");//設置TTL=60000ms AMQP.BasicProperties properties = builder.build(); channel.basicPublish("exchangeName", "routingKey", properties, "123".getBytes());
兩種過時效果,對消息刪除的契機不太同樣:
隊列過時表示,這個隊列上面沒有任何的消費者,且隊列沒有被從新聲明過,而且在過時時間段內未調用過Basic.Get命令。RabbitMQ會確保再過時時間到達後將隊列刪除,但不能保證動做有多麼的及時,再RabbitMQ重啓以後,過時時間將會被從新計算,下面是設置隊列的過時時間:
Map<String, Object> arguments = new HashMap<>(); // x-expires經過這個參數進行設置 arguments.put("x-expires",6000); channel.queueDeclare("queueName",true,false,false,arguments);
一個消息,變成死信的時候,就會被髮送到一個交換機裏面,這個交換機就是DLX(死信交換機),綁定到DLX的隊列就是死信隊列。消息變成死信有以下幾個狀況:
其實DLX和通常交換機沒區別,就是將一個普通的隊列設置一下DLX的屬性,而後這個隊列裏面編程死信的消息就會被髮送到這個交換機上面。這個特性,咱們能夠爲DLX綁定一個隊列,而後配合TTL等於0,來彌補3.0中去除掉的immediate參數的功能。下面是一段簡單設置DLX的代碼:
channel.exchangeDeclare("exchange.dlx", "direct", true, false, false, null); channel.exchangeDeclare("exchange.normal", "fanout", true, false, false, null); Map<String, Object> argument = new HashMap<>(); // 設置DLX argument.put("x-dead-letter-exchange", "exchange.dlx"); // 設置DLK,就是消息變成死信以後的路由鍵 argument.put("x-dead-letter-routing-key", "routingkey"); // 設置隊列的過時時間 argument.put("x-message-ttl", 10000); channel.queueDeclare("queue.normal", false, false, false, argument); channel.queueBind("queue.normal", "exchange.normal", ""); channel.queueDeclare("queue.dlx", true, false, false, null); channel.queueBind("queue.dlx", "exchange.dlx", "routingkey"); channel.basicPublish("exchange.normal", "rk", MessageProperties.PERSISTENT_TEXT_PLAIN, "dlx".getBytes());
核心的屬性:
下面是這個死信隊列的一個簡單的圖例:
接下來就能夠引出延遲隊列這個概念了,經過上面的TTL與DLX的詳細解說,其實咱們徹底能夠用這兩個來實現延遲隊列的功能。無非就是將消費者直接去消費死信隊列裏面的消息,而不是直接消費普通隊列的消息。這樣普通隊列,咱們能夠設置消息的TTL,而後,到了指定的過時時間,就會直接發送到DLX綁定的隊列裏面,這樣,咱們消費者就能消費到了。這樣就丁算是過了TTL毫秒,延遲收到消息。咱們徹底能夠經過bindingKey來動態的指定不一樣的隊列,每一個隊列設置不一樣的TTL,每一個隊列設置不一樣的DLX,而後每一個DLX又是不一樣的死信隊列,這樣,延遲消息就能夠運行了。這裏代碼不寫了,都是重複性的代碼。給出延遲隊列的圖例:
這一部分對於學習整個RabbitMQ的高可用、消息可靠性具備相當重要的做用。在介紹生產者確認以前,咱們來看看,至今爲止,咱們接觸到的相關RabbitMQ實體,有哪幾種持久化,與這幾種持久化對應的效果:
其實咱們使用默認的屬性封裝的常量,已經封裝了消息,咱們來看看源碼:
public class MessageProperties { ...... /** Content-type "text/plain", deliveryMode 2 (persistent), priority zero */ public static final BasicProperties PERSISTENT_TEXT_PLAIN = new BasicProperties("text/plain", null, null, 2,// deliveryMode 0, null, null, null, null, null, null, null, null, null); }
可是,即便是上面提到的實體,咱們都進行了持久化,咱們仍是會有沒法保證消息不會丟失的場景,下面說兩個:
爲了解決一些異常宕機或者其餘狀況致使的消息不可靠的場景,可使用如下兩種技術來解決:
生產者確認又能夠細分紅兩種:
下面咱們一個個來講
首先說說,這種事務機制,其實會榨乾RabbitMQ的全乎性能,徹底不推薦使用,不過做爲一種機制,仍是要細說。與具體的事務操做相似,整個發送的事務,也是三步走:
下面就是正常事務發送消息的時序圖:
下面是回滾的事務時序圖:
下面是極簡的一段代碼:
try { channel.txSelect(); channel.basicPublish("exchange.normal", "rk", MessageProperties.PERSISTENT_TEXT_PLAIN, "dlx".getBytes()); channel.txCommit(); } catch (IOException e) { e.printStackTrace(); channel.txRollback(); }
首先咱們來看第一種確認機制的代碼:
channel.confirmSelect(); channel.basicPublish("exchange.normal", "rk", MessageProperties.PERSISTENT_TEXT_PLAIN, "dlx".getBytes()); try { if(!channel.waitForConfirms()){ System.out.println("failed"); } } catch (InterruptedException e) { e.printStackTrace(); }
這種方式其實並不能增長吞吐量,由於是同一個線程進行同步確認的固然,咱們可使用一個容器,而且批量進行確認,增長吞吐量。下面是模板:
channel.confirmSelect(); int msgcount = 0; while (true) { channel.basicPublish("exchange.normal", "rk", MessageProperties.PERSISTENT_TEXT_PLAIN, "dlx".getBytes()); // 將發出去的消息存儲在一個容器裏面 if(++msgcount>34) { msgcount = 0; try { if (channel.waitForConfirms()) { // 將緩存清空 continue; } // 將緩存中的消息重發 } catch (InterruptedException e) { e.printStackTrace(); // 將緩存中的消息重發 } } }
固然,最佳的方式,是經過異步的方式,註冊監聽器,來處理這種生產者確認的方式。咱們來看看具體的代碼模板
channel.confirmSelect(); TreeSet<Long> confirmSet = new TreeSet<>(); channel.addConfirmListener(new ConfirmListener() { @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { System.out.println("nack,seqNo"+deliveryTag+", nultiple:"+multiple); if(multiple){ confirmSet.headSet(deliveryTag-1).clear(); }else{ confirmSet.remove(deliveryTag); } } @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { if(multiple){ confirmSet.headSet(deliveryTag-1).clear(); }else{ confirmSet.remove(deliveryTag); } // 這裏要從新發送消息 } }); while(true){ long nextSeq = channel.getNextPublishSeqNo(); channel.basicPublish("exchange.normal", "rk", MessageProperties.PERSISTENT_TEXT_PLAIN, "dlx".getBytes()); confirmSet.add(nextSeq); }
這一部分,主要說幾個概念,也是對消息的消費頗有幫助的點
在消費者這一邊能夠經過一個方法,來設置Qos:
/** * 設置所謂的「服務質量」 * * 這個設置主要可以限制在服務端發給消費者消息的時候,最大能保持多少未確認的消息, * 在一個信道上面。所以,Qos就提供了一種基於消費者數據流控制的手段。 * @param prefetchSize 服務端發送給消費者最大消息大小 (使用八進制表示),0表示不控制 * @param prefetchCount 最大服務端發送給消費者的未確認消息數,0表示不控制 * @param global true表示這個設置要應用到此Connection上的各個消費者上面 */ void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException; void basicQos(int prefetchCount, boolean global) throws IOException; void basicQos(int prefetchCount) throws IOException;
針對性的,咱們來講說global這個參數:
針對global爲true的時候要協調多個消費者,這種狀況下很是消耗性能,RabbitMQ針對性的修改了global的定義:
可見,主要是把限制範圍縮小了,從Connection級別到channel級別。
咱們先來看一段QueueingConsumer代碼:
QueueingConsumer consumer = new QueueingConsumer(channel); // channel.basicQos(4); channel.basicConsumer("QueueName",false,"consumer_zzh",consumer); while(true){ QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); // 對消息作業務邏輯處理 channel.basicAck(dlivery.getEnvelope().getDeliveryTag(),false); }
若是環境不是特別的「傲嬌」,其實上面代碼也沒問題,可是要是一會兒來了很是大量的消息要消費,這個QueueingConsumer就是形成內存溢出狀況,由於他內部使用了一個LinkedBlockingQueue,每次都是循環逐條的進行處理,這樣,消息確定會堆積,內存佔用一會兒就上去了。固然咱們可使用Qos來控制這一點。可是,這東西還會存在下面的缺陷:
因此爲了不這麼多問題,儘可能都要使用DefaultConsumer的方式進行消費
最後這部分,咱們收攏一下這一章中的一些點。首先咱們來看看消息中間件中消息可靠性的三個級別:
RabbitMQ支持其中的最多一次和最少一次。咱們來看看最少一次投遞的時候,要考慮消息可靠性,要考慮如下幾個方面:
最多一次,咱們只要生產者隨意發送,消費者隨意消費,不過這樣很難確保消息的可靠性,不會丟失。另外在咱們的業務代碼中,要確保消費者的冪等性,以防止消息的重複發送。
至此RabbitMQ的基礎與高級的使用方式,已經講解完了,下面一章節,咱們進入RabbitMQ原理級別的總結。因爲是erlang寫的,我本人也看不懂erlang,主要就是對核心的幾個原理進行記錄一下罷了,根本沒有源碼講解,因此也請放鬆,不難,就看你努力不努力了。