RabbitMQ吐血總結(2)---高級特性總結

經歷上一篇的基礎API總結,其實RabbitMQ的基礎使用,就不成問題了。可是要想稍微拔高,仍是要經歷這一篇的洗禮。一直以來,我面試別人的時候,大多數面試者的簡歷中,都會寫上熟練使用RabbitMQ,然而,我問出一個只要是消息中間件就老生常談的話題的時候,幾乎清一色的,都沒有很好的說出來。這個問題就是:請介紹下,RabbitMQ如何保證消息的可靠性的。經過這一篇文章的總結,我想讓本身達到對這個問題的細節覆蓋全面的程度,至少一個架構師過來問我,我能有條理有邏輯的說明白,不會東一句西一句。做爲RabbitMQ,這種咱們平常生產中使用頻率至關高的消息中間件,我認爲,對他的掌控,要更好,才能說明咱們對技術的追求,而不僅是CURD。java

1、mandatory與immediate參數

這兩個參數,是保證消息可靠性的第一扇門,咱們先來看看上篇文章中,消息發送的api源碼:面試

/**
 * 發佈一個消息到服務端。
 *
 * @param mandatory 後面文章介紹
 * @param immediate 後面文章介紹
 */
void basicPublish(String exchange,
            String routingKey,
            boolean mandatory,
            boolean immediate,
            BasicProperties props,
            byte[] body) throws IOException;

這兩個參數,上一篇裏面註釋的,這一章咱們來解開編程

一、mandatory

  • true:交換機沒法根據自身的烈性和路由鍵找到一個符合條件的隊列,那麼RabbitMQ就會調用Basic.Return命令將消息返回給生產者
  • false:如何出現true的狀況,消息直接被丟棄

咱們看看如何獲取到madatory爲true的時候,消息沒有被正確路由,返回給生產者的消息:api

channel.basicPublish("exchangeName", "routingKey",
        true, MessageProperties.PERSISTENT_TEXT_PLAIN, "test".getBytes());
channel.addReturnListener(new ReturnListener() {
    @Override
    public void handleReturn(int replyCode, String replyText,
                             String exchange, String routingKey,
                             AMQP.BasicProperties properties, byte[] body) throws IOException {
        String msg = new String(body);
        System.out.println("返回的結果是:" + msg);
    }
});

二、immediate

  • true:若是交換機路由到隊列上並不存在任何消費者,那麼消息將不會存入隊列中,該消息經過Basic.Return返回給生產者
  • false:出現上述狀況,消息同樣會發送給消息隊列

mandatory主要保護的是:交換機是否能正確匹配到消息隊列,immediate主要保護的是消息隊列是否有消費者。經過這兩個參數,能夠保證消息在整個從發送到接收過程當中,全稱掌控。緩存

RabbitMQ3.0以後去掉了immediate參數的支持,官方說法是會影響性能,增長代碼複雜性,建議使用TTL(消息最大生存時間)和DLX(死信隊列)來代替服務器

2、備份交換機

這東西主要應對mandatory參數不想去設置,而且,這個參數設置了,會增大代碼的侵入性,那咱們又如何保障消息沒有匹配的隊列這種狀況不丟失呢,就使用這個。下面是一段使用備份交換機的代碼:架構

Map<String, Object> arguments = new HashMap<>();
// alternate-exchange這個參數就是設置具體的備份交換機是誰
arguments.put("alternate-exchange", "myAe");
channel.exchangeDeclare("nomalExchange", "direct", true, false, arguments);
channel.exchangeDeclare("myAe","fanout",true,false,null);
channel.queueDeclare("nomalQueue",true,false,false,null);
channel.queueBind("nomalQueue","nomalExchange","normalKey");
channel.queueDeclare("unroutedQueue",true,false,false,null);
channel.queueBind("unroutedQueue","myAe","");

這段代碼的主要示意圖以下:運維

有如下幾點:異步

  • 發送到備份交換機上面的路由鍵和原始的路由鍵一致
  • 若是設置的備份交換機不存在,客戶端和RabbitMQ服務端都不會有異常,此時消息丟失
  • 若是備份交換機沒有綁定隊列,客戶端和RabbitMQ服務端都不會有異常,此時消息丟失
  • 若是備份交換機沒有任何匹配的隊列,客戶端和RabbitMQ服務端都不會有任何異常,此時消息丟失
  • mandatory與備份交換機一塊兒使用,那麼mandatory無效

3、過時時間(TTL)

過時時間分爲消息的過時時間和隊列的過時時間ide

一、消息過時時間

消息過時時間設置,有兩種方式:

  • 第一種是經過設置隊列屬性的方式,隊列中的全部消息都有相同的過時時間
  • 第二種是經過設置消息自己的屬性,沒調消息的過時時間不一樣

若是兩個一塊兒使用的話,會取較小的那個值。而且若是消息到了過時時間以後尚未消費者進行消費的話,就會變成死信。下面咱們首先來看看如何經過隊列屬性的方式設置過時時間:

Map<String, Object> arguments = new HashMap<>();
// x-message-ttl經過這個參數進行設置
arguments.put("x-message-ttl",6000);
channel.queueDeclare("queueName",true,false,false,arguments);

畫外音:固然還能夠經過Policy與HTTPAPI的方式進行設置,可是我感受這兩種偏運維,這裏主要想寫寫開發視角,我就很少寫這兩種設置方式了

不設置這個參數,表示隊列裏面的消息不會過時,設置成0,除非消息立刻被消費者消費,不然將會被丟棄,這個設置0的特性能夠部分代替immediate這個參數。下面咱們來看看直接設置消息的TTL:

AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.deliveryMode(2);// 持久化消息
builder.expiration("60000");//設置TTL=60000ms
AMQP.BasicProperties properties = builder.build();
channel.basicPublish("exchangeName", "routingKey", properties, "123".getBytes());

兩種過時效果,對消息刪除的契機不太同樣:

  • 第一種:一旦過時就會從隊列中刪除消息
  • 第二種:在投遞到消費者以前進行斷定,而後刪除

二、隊列的TTL

隊列過時表示,這個隊列上面沒有任何的消費者,且隊列沒有被從新聲明過,而且在過時時間段內未調用過Basic.Get命令。RabbitMQ會確保再過時時間到達後將隊列刪除,但不能保證動做有多麼的及時,再RabbitMQ重啓以後,過時時間將會被從新計算,下面是設置隊列的過時時間:

Map<String, Object> arguments = new HashMap<>();
// x-expires經過這個參數進行設置
arguments.put("x-expires",6000);
channel.queueDeclare("queueName",true,false,false,arguments);

4、死信隊列與延遲隊列

一個消息,變成死信的時候,就會被髮送到一個交換機裏面,這個交換機就是DLX(死信交換機),綁定到DLX的隊列就是死信隊列。消息變成死信有以下幾個狀況:

  1. 消息被拒絕(Basic.Reject/Basic.Nack),並設置了requeue參數爲false
  2. 消息過時
  3. 隊列長度達到了最大

其實DLX和通常交換機沒區別,就是將一個普通的隊列設置一下DLX的屬性,而後這個隊列裏面編程死信的消息就會被髮送到這個交換機上面。這個特性,咱們能夠爲DLX綁定一個隊列,而後配合TTL等於0,來彌補3.0中去除掉的immediate參數的功能。下面是一段簡單設置DLX的代碼:

channel.exchangeDeclare("exchange.dlx", "direct",
        true, false, false, null);
channel.exchangeDeclare("exchange.normal", "fanout",
        true, false, false, null);
Map<String, Object> argument = new HashMap<>();
// 設置DLX
argument.put("x-dead-letter-exchange", "exchange.dlx");
// 設置DLK,就是消息變成死信以後的路由鍵
argument.put("x-dead-letter-routing-key", "routingkey");
// 設置隊列的過時時間
argument.put("x-message-ttl", 10000);


channel.queueDeclare("queue.normal", false, false, false, argument);
channel.queueBind("queue.normal", "exchange.normal", "");
channel.queueDeclare("queue.dlx", true, false, false, null);
channel.queueBind("queue.dlx", "exchange.dlx", "routingkey");
channel.basicPublish("exchange.normal", "rk",
        MessageProperties.PERSISTENT_TEXT_PLAIN, "dlx".getBytes());

核心的屬性:

  • x-dead-letter-exchange:爲一個隊列配置DLX
  • x-dead-letter-routing-key:爲DLX指定一個路由鍵,沒有指定的話將使用遠消息的路由鍵

下面是這個死信隊列的一個簡單的圖例:

接下來就能夠引出延遲隊列這個概念了,經過上面的TTL與DLX的詳細解說,其實咱們徹底能夠用這兩個來實現延遲隊列的功能。無非就是將消費者直接去消費死信隊列裏面的消息,而不是直接消費普通隊列的消息。這樣普通隊列,咱們能夠設置消息的TTL,而後,到了指定的過時時間,就會直接發送到DLX綁定的隊列裏面,這樣,咱們消費者就能消費到了。這樣就丁算是過了TTL毫秒,延遲收到消息。咱們徹底能夠經過bindingKey來動態的指定不一樣的隊列,每一個隊列設置不一樣的TTL,每一個隊列設置不一樣的DLX,而後每一個DLX又是不一樣的死信隊列,這樣,延遲消息就能夠運行了。這裏代碼不寫了,都是重複性的代碼。給出延遲隊列的圖例:

4、生產者確認

這一部分對於學習整個RabbitMQ的高可用、消息可靠性具備相當重要的做用。在介紹生產者確認以前,咱們來看看,至今爲止,咱們接觸到的相關RabbitMQ實體,有哪幾種持久化,與這幾種持久化對應的效果:

  • 交換機持久化:false狀況重啓,交換機的元數據丟失,消息不會丟失,只不過沒法往這個交換機發送消息了
  • 隊列持久化:false狀況,重啓以後隊列元數據與消息都會丟失,消息最終存儲是在隊列裏面的
  • 消息持久化:發送時候經過發送屬性聲明,不持久化消息有可能會丟失

其實咱們使用默認的屬性封裝的常量,已經封裝了消息,咱們來看看源碼:

public class MessageProperties {

    ......

    /** Content-type "text/plain", deliveryMode 2 (persistent), priority zero */
    public static final BasicProperties PERSISTENT_TEXT_PLAIN =
        new BasicProperties("text/plain",
                            null,
                            null,
                            2,// deliveryMode
                            0, null, null, null,
                            null, null, null, null,
                            null, null);
}

可是,即便是上面提到的實體,咱們都進行了持久化,咱們仍是會有沒法保證消息不會丟失的場景,下面說兩個:

  • 若是消息被消費的時候設置了autoAck爲true,以後消費者沒來得及處理就宕機了,消息在服務端也被刪除了。這種可使用autoAck爲false來解決
  • 若是消息設置成了持久化,可是消息剛剛發送到RabbitMQ服務端,到持久化這個過程仍是有一段時間間隔的,這段時間服務端宕機,那麼消息也會丟失

爲了解決一些異常宕機或者其餘狀況致使的消息不可靠的場景,可使用如下兩種技術來解決:

  • 鏡像(咱們在後面原理章節會介紹)
  • 生產者確認機制

生產者確認又能夠細分紅兩種:

  • 事務機制
  • 發送方確認機制

下面咱們一個個來講

一、事務機制

首先說說,這種事務機制,其實會榨乾RabbitMQ的全乎性能,徹底不推薦使用,不過做爲一種機制,仍是要細說。與具體的事務操做相似,整個發送的事務,也是三步走:

  1. channel.txSelect
  2. 發送消息
  3. channel.txCommit
  4. 回滾:channel.txRollback

下面就是正常事務發送消息的時序圖:

下面是回滾的事務時序圖:

下面是極簡的一段代碼:

try {
    channel.txSelect();
    channel.basicPublish("exchange.normal", "rk",
            MessageProperties.PERSISTENT_TEXT_PLAIN, "dlx".getBytes());
    channel.txCommit();
} catch (IOException e) {
    e.printStackTrace();
    channel.txRollback();
}

二、發送方確認機制

首先咱們來看第一種確認機制的代碼:

channel.confirmSelect();
channel.basicPublish("exchange.normal", "rk",
        MessageProperties.PERSISTENT_TEXT_PLAIN, "dlx".getBytes());
try {
    if(!channel.waitForConfirms()){
        System.out.println("failed");
    }
} catch (InterruptedException e) {
    e.printStackTrace();
}

這種方式其實並不能增長吞吐量,由於是同一個線程進行同步確認的固然,咱們可使用一個容器,而且批量進行確認,增長吞吐量。下面是模板:

channel.confirmSelect();
int msgcount = 0;
while (true) {
    channel.basicPublish("exchange.normal", "rk",
            MessageProperties.PERSISTENT_TEXT_PLAIN, "dlx".getBytes());
    // 將發出去的消息存儲在一個容器裏面
    if(++msgcount>34) {
        msgcount = 0;
        try {
            if (channel.waitForConfirms()) {
                // 將緩存清空
                continue;
            }
            // 將緩存中的消息重發
        } catch (InterruptedException e) {
            e.printStackTrace();
            // 將緩存中的消息重發
        }
    }
}

固然,最佳的方式,是經過異步的方式,註冊監聽器,來處理這種生產者確認的方式。咱們來看看具體的代碼模板

channel.confirmSelect();
TreeSet<Long> confirmSet = new TreeSet<>();
channel.addConfirmListener(new ConfirmListener() {
    @Override
    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("nack,seqNo"+deliveryTag+", nultiple:"+multiple);
        if(multiple){
            confirmSet.headSet(deliveryTag-1).clear();
        }else{
            confirmSet.remove(deliveryTag);
        }

    }

    @Override
    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
        if(multiple){
            confirmSet.headSet(deliveryTag-1).clear();
        }else{
            confirmSet.remove(deliveryTag);
        }
        // 這裏要從新發送消息
    }
});

while(true){
    long nextSeq = channel.getNextPublishSeqNo();
    channel.basicPublish("exchange.normal", "rk",
            MessageProperties.PERSISTENT_TEXT_PLAIN, "dlx".getBytes());
    confirmSet.add(nextSeq);
}

5、消息分發

這一部分,主要說幾個概念,也是對消息的消費頗有幫助的點

一、Qos是啥

在消費者這一邊能夠經過一個方法,來設置Qos:

/**
 * 設置所謂的「服務質量」
 *
 * 這個設置主要可以限制在服務端發給消費者消息的時候,最大能保持多少未確認的消息,
 * 在一個信道上面。所以,Qos就提供了一種基於消費者數據流控制的手段。
 * @param prefetchSize 服務端發送給消費者最大消息大小 (使用八進制表示),0表示不控制
 * @param prefetchCount 最大服務端發送給消費者的未確認消息數,0表示不控制
 * @param global true表示這個設置要應用到此Connection上的各個消費者上面
 */
void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;

void basicQos(int prefetchCount, boolean global) throws IOException;

void basicQos(int prefetchCount) throws IOException;

針對性的,咱們來講說global這個參數:

  • false:(默認值)一個信道上面的全部消費者,每一個最大保持的未確認消息數都是prefetchCount
  • true:當前通訊鏈路(Connection)上全部消費者,都須要聽從prefetchCount的限定值

針對global爲true的時候要協調多個消費者,這種狀況下很是消耗性能,RabbitMQ針對性的修改了global的定義:

  • false:信道上新的消費者須要聽從prefetchCount限定值
  • true:信道上的全部消費者都須要聽從prefetchCount限定值

可見,主要是把限制範圍縮小了,從Connection級別到channel級別。

二、棄用QueueingConsumer

咱們先來看一段QueueingConsumer代碼:

QueueingConsumer consumer = new QueueingConsumer(channel);
// channel.basicQos(4);
channel.basicConsumer("QueueName",false,"consumer_zzh",consumer);
while(true){
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    String message = new String(delivery.getBody());
    // 對消息作業務邏輯處理
    channel.basicAck(dlivery.getEnvelope().getDeliveryTag(),false);
}

若是環境不是特別的「傲嬌」,其實上面代碼也沒問題,可是要是一會兒來了很是大量的消息要消費,這個QueueingConsumer就是形成內存溢出狀況,由於他內部使用了一個LinkedBlockingQueue,每次都是循環逐條的進行處理,這樣,消息確定會堆積,內存佔用一會兒就上去了。固然咱們可使用Qos來控制這一點。可是,這東西還會存在下面的缺陷:

  • QueueingConsumer會拖累同一個Connection下的全部信道,使其性能下降
  • 同步遞歸調用QueueingConsumer會產生死鎖
  • RabbitMQ自動鏈接恢復機制,不知道QueueingConsumer
  • QueueingConsumer不是事件驅動

因此爲了不這麼多問題,儘可能都要使用DefaultConsumer的方式進行消費

6、總結

最後這部分,咱們收攏一下這一章中的一些點。首先咱們來看看消息中間件中消息可靠性的三個級別:

  • 最多一次:消息可能會丟失,但毫不重複
  • 最少一次:消息毫不會丟失,但可能會重複
  • 恰好一次:每條消息確定會被傳輸一次,且僅一次

RabbitMQ支持其中的最多一次和最少一次。咱們來看看最少一次投遞的時候,要考慮消息可靠性,要考慮如下幾個方面:

  • 消息生產者須要開啓事物機制或者是生產者確認機制,以確保消息能夠可靠性的傳輸到RabbitMQ中
  • 消息生產者須要配合使用mandatory參數或者備份交換機來確保消息可以從交換機中路由到隊列裏面去,進而保證消息不會被丟棄
  • 消息與隊列都須要進行持久化,以確保RabbitMQ服務器遇到異常狀況時不形成消息丟失
  • 消費者在消費消息的同時須要將autoAck設置爲false,而後經過手動確認的方式去確認已經正確消費的消息,以免消費者這邊形成消息丟失

最多一次,咱們只要生產者隨意發送,消費者隨意消費,不過這樣很難確保消息的可靠性,不會丟失。另外在咱們的業務代碼中,要確保消費者的冪等性,以防止消息的重複發送。

至此RabbitMQ的基礎與高級的使用方式,已經講解完了,下面一章節,咱們進入RabbitMQ原理級別的總結。因爲是erlang寫的,我本人也看不懂erlang,主要就是對核心的幾個原理進行記錄一下罷了,根本沒有源碼講解,因此也請放鬆,不難,就看你努力不努力了。

相關文章
相關標籤/搜索