RabbitMQ生產者發送消息確認

在使用RabbitMQ的時候,能夠經過消息的持久化操做來解決由於服務器的異常崩潰而致使的消息丟失,除此以外,咱們還會遇到一個問題,當消息的生產者將消息發送出去之後,消息到底有沒有到達服務器呢?若是不進行特殊的配置,默認狀況下發送消息的操做是不會返回任何消息給生產者的,也就是默認狀況下是不知道消息有沒有正確地到達服務器。若是在消息到達服務器以前已經丟失,持久化操做也解決不了這個問題,由於消息根本沒有到達服務器,何談持久化?java

RabbitMQ針對這個問題,提供了兩種解決方法:編程

  ❤ 事務機制緩存

  ❤ 發送方確認機制服務器

事務機制

RabbitMQ客戶端中與事務機制相關的方法有三個:channel.txSelect、channel.txCommit、channel.txRollback。channel.txSelect用於將當前的信道設置成事務模式,channel.txCommit用於提交事務,channel.txRollback用於事務回滾。異步

在經過channel.txSelect方法開啓事務以後,咱們即可以發佈消息給RabbitMQ了,若是事務提交成功,則消息必定到達了RabbitMQ中,若是在事務提交以前因爲RabbitMQ異常崩潰或者其餘的緣由拋出異常,這個時候咱們能夠將其捕獲,進而經過執行channel.txRollback方法來實現事務回滾。ide

部分示例代碼以下:性能

     channel.txSelect();
        channel.basicPublish("exchange","routingkey",MessageProperties.PERSISTENT_TEXT_PLAIN,"hello".getBytes());
        channel.txCommit();

上述代碼是正常的狀況下的事務機制的運轉過程,而事務回滾是什麼樣子的呢?下面的代碼示例:spa

  try {
            channel.txSelect();
            channel.basicPublish("exchange","routingkey",MessageProperties.PERSISTENT_TEXT_PLAIN,"hello".getBytes());
            int result = 1 / 0;
            channel.txCommit();
        }catch (Exception e){
            e.printStackTrace();
            channel.txRollback();
        }

 上述代碼中明顯的有一個java.lang.ArithmeticException,在事務提交以前捕獲異常,以後顯示的回滾事務。code

若是要發送多條消息,則將channel.basicPublish和channel.txCommit等方法包裹進循環內便可,能夠參考如下代碼:blog

  channel.txSelect();
        for (int i = 0;i < 1000;i++){
            try {

                channel.basicPublish("exchange","routingkey",MessageProperties.PERSISTENT_TEXT_PLAIN,"hello".getBytes());
                int result = 1 / 0;
                channel.txCommit();
            }catch (Exception e){
                e.printStackTrace();
                channel.txRollback();
            }
        }

事務確實可以解決消息發送方和RabbitMQ之間消息確認的問題,只有消息成功被RabbitMQ接收,事務才能提交成功,不然即可在捕獲異常以後進行事務回滾,於此同時能夠進行消息重發。可是使用事務機制會「吸乾」RabbitMQ的性能,那麼有沒有更好的方法既能保證確認消息已經正確送達,又能基本上不帶來性能上的損失呢?從AMQP協議層面上看來沒有更好的方法,可是RabbitMQ提供了一個改進方案,即發送方確認機制。

發送方確認機制

生產者將信道設置爲confirm(確認)模式,一旦信道進入confirm模式,全部在該信道上面發佈的消息都會被指派一個惟一的ID(從1開始),一旦消息被投遞到全部匹配的隊列以後,RabbitMQ就會發送一個確認(Basic.Ack)給生產者(包含消息的惟一ID),這就使得生產者知道消息已經到達正確的目的地了。若是消息和隊列是持久化的,那麼確認消息會在消息寫入磁盤以後發出。RabbitMQ回傳給生產者的確認消息中的deliveryTag包含了確認消息的序號,此外RabbitMQ也能夠設置channel.basicAck方法中的multiple參數,表示到這個序號以前的全部消息都已經獲得了處理,以下圖所示:

事務機制在發送一條消息以後就會使得發送端阻塞,以等待RabbitMQ的迴應,以後才能繼續發送下一條消息。相比之下,發送方確認機制最大的好處就是在於它是異步的,一旦發佈一條消息,生產者應用程序就能夠在等待信道返回確認的同時繼續發送下一條消息,當消息最終獲得確認以後,生產者應用程序即可以經過回調方法來處理確認消息,若是RabbitMQ由於自身內部錯誤致使消息丟失,就會發送一條nack(Basic.Nack)命令,生產者應用程序一樣能夠在回調方法中處理該nack命令。

生產者經過調用channel.confirmSelect方法(即Confirm.Select命令)將信道設置爲confirm模式,以後RabbitMQ會返回Confirm.Select-Ok命令表示贊成生產者將當前信道設置爲confirm模式,全部被髮送的後續消息都被ack或者nack一次,不會出現一條消息既被ack又被nack的狀況,而且RabbitMQ也沒有對消息的被confirm的快慢作出任何保證。

經過下面的例子來看一下publisher confirm機制怎麼運做,代碼以下:

  try {
            channel.confirmSelect();
            channel.basicPublish("exchange","routingkey",MessageProperties.PERSISTENT_TEXT_PLAIN,"hello".getBytes());
            if (!channel.waitForConfirms()){
                System.out.println("message failed!");
                // do something
            }
        }catch (Exception e){
            e.printStackTrace();
        }

 若是發送多條消息,只須要將channel.basicPublish和channel.waitForConfirms方法包裹在循環裏面便可,能夠參考事務機制,不過不須要把channel.confirmSelect方法包裹在循環內部。

對於channel.waitForConfirms而言,在RabbitMQ客戶端有它的4個同類的方法:

  (1)boolean waitForConfirms() throws InterruptedException;

  (2)boolean waitForConfirms(long timeout) throws InterruptedException;

  (3)void waitForConfirmsOrDie() throws IOException,InterruptedException;

  (4)void waitForConfirmsOrDie(long timeout) throws IOException,InterruptedException, TimeoutException;

若是信道沒有開啓publisher confirm模式,那麼調用任何的waitForConfirms方法都會報錯java.lang.IllegalStateException。對於沒有參數的waitForConfirms方法來講,其返回的條件是客戶端收到了相應的Basic.Ack/.Nack或者被中斷。參數timeout表示超時時間,一旦等待RabbitMQ迴應超時就會拋出java.util.concurrent.TimeoutException異常。兩個waitForConfirmsOrDie方法在接收到RabbitMQ返回的Basic.Nack以後拋出java.io.IOException。業務代碼能夠根據自身的特性靈活的運用這四種方法來保障消息的可靠發送。

注意:

  ❤ 事務機制和publisher confirm機制是二者互斥的,不能共存。若是企圖將已開啓事務模式的信道再設置爲publisher confirm模式,RabbitMQ會報錯。或者企圖將已開啓publisher confirm模式的信道再設置爲事務模式,RabbitMQ也會報錯;

  ❤ 事務機制和publisher confirm機制確保的是消息可以正確的發送至RabbitMQ,這裏的「發送至RabbitMQ」的含義是指消息被正確的發送至RabbitMQ的交換器,若是此交換器沒有匹配的隊列,那麼消息也會丟失。因此在使用這兩種機制的時候要確保所涉及的交換器可以有匹配的隊列。更進一步的講,發送方要配合mandatory參數或者備份交換器一塊兒使用來提升消息傳輸的可靠性。

publisher confirm的優點在於並不必定須要同步確認。能夠改進一下使用方式:

  (1)批量confirm方法:每發送一批消息後,調用chann.waitForConfirms方法,等待服務器的確認返回

  (2)異步confirm方法:提供一個回調方法,服務端確認了一條或者多條消息後客戶端會回調這個方法進行處理

批量confirm方法

在批量confirm方法中,客戶端程序須要按期或者定量(達到多少條),亦或者二者結合起來調用chann.waitForConfirms來等待RabbitMQ的確認返回。可是存在一個問題就是在返回Basic.Nack或者超時的狀況下,客戶端須要將這一批的消息所有重發,這會帶來明顯的重複消息數量,而且當消息常常丟失時,批量confirm的性能是不升反降的。

批量confirm代碼以下:

  try {
            channel.confirmSelect();
            int nsgCount = 0;
            while (true){
                channel.basicPublish("exchange","routingkey",MessageProperties.PERSISTENT_TEXT_PLAIN,"hello".getBytes());
                //將發送出去的消息存入緩存中,緩存能夠是一個ArrayList或者BlockQueue之類的
                if (++nsgCount >= 1000){
                    nsgCount = 0;
                    try {
                        if (channel.waitForConfirms()){
                            //將緩存中消息清空
                            continue;
                        }
                        //將緩存中消息重發
                    }catch (InterruptedException e){
                        e.printStackTrace();
                        //將緩存中的消息重發
                    }
                }
            }
        }catch (Exception e){
            e.printStackTrace();
        }

異步confirm方法

異步confirm方法的編程實現是最複雜的,在客戶端的channel接口中提供的addConfirmListener方法能夠添加ConfirmListener這個回調接口,這個ConfirmListener接口包含兩個方法:handleAck和handleNack,分別用來處理RabbitMQ回傳的Basic.Ack和Basic.Nack。在這兩個方法中都包含有一個參數deliveryTag(在publisher confirm模式下用來標記消息的惟一有序序號)。咱們須要爲每個信道維護一個「unconfirm」的消息序號集合,每發送一條消息,集合中的元素就加1.每當調用ConfirmListener中的handleAck方法時,「unconfirm」集合中就刪除相應的一條(multiple設置爲false)或者多條(multiple設置爲true)記錄。從程序的運行效率來看,這個「unconfirm」集合最好採用有序集合SortedSet的存儲結構。事實上,Java端SDK中的waitForConfirms方法也是經過SortedSet維護消息序號的。

下面的代碼示例:

     SortedSet confirmSet = new TreeSet();
        channel.confirmSelect();
        channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("Nack,SeqNo: " + deliveryTag + ", multiple: " + multiple);
                if (multiple){
                    confirmSet.headSet(deliveryTag + 1);
                }else {
                    confirmSet.remove(deliveryTag);
                }
            }

            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                if (multiple){
                    confirmSet.headSet(deliveryTag + 1).clear();
                }else {
                    confirmSet.remove(deliveryTag);
                }
                //這裏添加處理消息重發的場景
            }
        });

        //這裏模仿一直髮送消息的場景
        while (true){
            long nextSeqNo = channel.getNextPublishSeqNo();
            channel.basicPublish("exchange","routingkey",MessageProperties.PERSISTENT_TEXT_PLAIN,"hello".getBytes());
            confirmSet.add(nextSeqNo);
        }

 將事務、普通confirm、批量confirm、和異步confirm一塊兒來比較它們的QPS,以下圖所示:

能夠看出批量和異步這兩種方式所呈現的性能要比其他兩種好的多。不過異步和批量的編程比較複雜,普通和事務編程較簡單。

不過仍是推薦使用批量和異步來實現。

參考:《RabbitMQ實戰指南》 朱忠華 編著; 

相關文章
相關標籤/搜索