RabbitMQ 從入門到精通(二)

1. 消息如何保障百分之百的投遞成功?

什麼是生產端的可靠性投遞?redis

  • 保障消息的成功發出
  • 保障MQ節點的成功接收
  • 發送端收到MQ節點(Broker)確認應答
  • 完善的進行消息補償機制

若是想保障消息百分百投遞成功,只作到前三步不必定可以保障。有些時候或者說有些極端狀況,好比生產端在投遞消息時可能就失敗了,或者說生產端投遞了消息,MQ也收到了,MQ在返回確認應答時,因爲網絡閃斷致使生產端沒有收到應答,此時這條消息就不知道投遞成功了仍是失敗了,因此針對這些狀況咱們須要作一些補償機制。算法

 

1.1 方案一:消息落庫,對消息狀態進行打標

 

  1. 進行數據的入庫,好比咱們要發送一條訂單消息,首先得把業務數據也就是訂單信息存庫,而後生成一條消息,把消息也進行入庫,這條消息應該包含消息狀態屬性 Create_Date(建立時間),並設置初始標誌 好比0,表示消息建立成功,正在發送中
  2. 首先要保證第一步消息都存儲成功了,沒有出現任何異常狀況,而後生產端再進行消息發送。若是失敗了就進行快速失敗機制
  3. MQ把消息收到的結果應答(confirm)給生產端
  4. 生產端有一個Confirm Listener,去異步的監聽Broker回送的響應,從而判斷消息是否投遞成功,若是成功,去數據庫查詢該消息,並將消息狀態更新爲1,表示消息投遞成功
     
    假設第二步OK了,在第三步回送響應時,網絡忽然出現了閃斷,致使生產端的Listener就永遠收不到這條消息的confirm應答了,也就是說這條消息的狀態就一直爲0了數據庫

  5. 此時咱們須要設置一個規則,好比說消息在入庫時候設置一個臨界值timeout,5分鐘以後若是仍是0的狀態那就須要把消息抽取出來。這裏咱們使用的是分佈式定時任務,去定時抓取DB中距離消息建立時間超過5分鐘的且狀態爲0的消息。
  6. 把抓取出來的消息進行從新投遞(Retry Send),也就是從第二步開始繼續往下走
  7. 固然有些消息可能就是因爲一些實際的問題沒法路由到Broker,好比routingKey設置不對,對應的隊列被誤刪除了,那麼這種消息即便重試屢次也仍然沒法投遞成功,因此須要對重試次數作限制,好比限制3次,若是投遞次數大於三次,那麼就將消息狀態更新爲2,表示這個消息最終投遞失敗。網絡

 

針對這種狀況如何去作補償呢,能夠有一個補償系統去查詢這些最終失敗的消息,而後給出失敗的緣由,固然這些可能都須要人工去操做。併發

第一種可靠性投遞,在高併發的場景下是否適合?異步

對於第一種方案,咱們須要作兩次數據庫的持久化操做,在高併發場景下顯然數據庫存在着性能瓶頸。其實在咱們的核心鏈路中只須要對業務進行入庫就能夠了,消息就不必先入庫了,咱們能夠作消息的延遲投遞,作二次確認,回調檢查。分佈式

固然這種方案不必定能保障百分百投遞成功,可是基本上能夠保障大概99.9%的消息是OK的,有些特別極端的狀況只能是人工去作補償了,或者使用定時任務去作均可以。ide

 

1.2 方案二:消息的延遲投遞,作二次確認,回調檢查

 

Upstream Service上游服務也就是生產端,Downstream service下游服務也就是消費端,Callback service就是回調服務。高併發

 

  1. 先將業務消息進行入庫,而後生產端將消息發送出去
  2. 在發送消息以後,緊接着生產端再次發送一條消息(Second Send Delay Check),即延遲消息投遞檢查,這裏須要設置一個延遲時間,好比5分鐘以後進行投遞。
  3. 消費端去監聽指定隊列,將收到的消息進行處理。
  4. 處理完成以後,發送一個confirm消息,也就是回送響應,可是這裏響應不是正常的ACK,而是從新生成一條消息,投遞到MQ中。
  5. 上面的Callback service是一個單獨的服務,其實它扮演了第一種方案的存儲消息的DB角色,它經過MQ去監聽下游服務發送的confirm消息,若是Callback service收到confirm消息,那麼就對消息作持久化存儲,即將消息持久化到DB中。
  6. 5分鐘以後延遲消息發送到MQ了,而後Callback service仍是去監聽延遲消息所對應的隊列,收到Check消息後去檢查DB中是否存在消息,若是存在,則不須要作任何處理,若是不存在或者消費失敗了,那麼Callback service就須要主動發起RPC通訊給上游服務,告訴它延遲檢查的這條消息我沒有找到,你須要從新發送,生產端收到信息後就會從新查詢業務消息而後將消息發送出去。

這麼作的目的是少作了一次DB的存儲,在高併發場景下,最關心的不是消息100%投遞成功,而是必定要保證性能,保證能抗得住這麼大的併發量。因此能節省數據庫的操做就儘可能節省,能夠異步的進行補償。

 

其實在主流程裏面是沒有這個Callback service的,它屬於一個補償的服務,整個核心鏈路就是生產端入庫業務消息,發送消息到MQ,消費端監聽隊列,消費消息。其餘的步驟都是一個補償機制。

第二種方案也是互聯網大廠更爲經典和主流的解決方案。可是若對性能要求不是那麼高,第一種方案要更簡單

 

2. 冪等性

2.1 冪等性是什麼?

簡單來講就是用戶對於同一操做發起的一次請求或者屢次請求的結果是一致的。

咱們能夠借鑑數據庫的樂觀鎖機制來舉個例子:

  • 首先爲表添加一個版本字段version

  • 在執行更新操做前呢,會先去數據庫查詢這個version

  • 而後執行更新語句,以version做爲條件,例如:

    UPDATE T_REPS SET COUNT = COUNT -1,VERSION = VERSION + 1 WHERE VERSION = 1

  • 若是執行更新時有其餘人先更新了這張表的數據,那麼這個條件就不生效了,也就不會執行操做了,經過這種樂觀鎖的機制來保障冪等性。

 

2.2 消息端冪等性保障

重複消費問題:

當消費者消費完消息時,在給生產端返回ack時因爲網絡中斷,致使生產端未收到確認信息,該條消息會從新發送並被消費者消費,但實際上該消費者已成功消費了該條消息,這就是重複消費問題。

 

2.2.1 惟一ID+指紋碼機制

惟一ID:業務表惟一的主鍵,如商品ID

指紋碼:爲了區別每次正常操做的碼,每次操做時生成指紋碼;能夠用時間戳+業務編號或者標誌位(具體視業務場景而定)

 

  • 惟一ID+指紋碼機制,利用數據庫主鍵去重
  • SELECT COUNT(1) FROM T_ORDER WHERE ID = 惟一ID and IS_CONSUM= 指紋碼
  • 好處:實現簡單
  • 壞處:高併發下有數據庫寫入的性能瓶頸
  • 解決方案:根據ID進行分庫分表算法路由

整個思路就是首先咱們須要根據消息生成一個全局惟一的ID,而後還須要加上一個指紋碼。這個指紋碼它並不必定是系統去生成的,而是一些外部的規則或者內部的業務規則去拼接,它的目的就是爲了保障此次操做是絕對惟一的。

將ID + 指紋碼拼接好的值做爲數據庫主鍵,就能夠進行去重了。即在消費消息前呢,先去數據庫查詢這條消息的指紋碼標識是否存在,沒有就執行insert操做,若是有就表明已經被消費了,就不須要管了。

 

2.2.2 利用Redis的原子性去實現

這裏只提用Redis的原子性去解決MQ冪等性重複消費的問題

注意:MQ的冪等性問題 根本在於的是生產端未正常接收ACK,多是網絡抖動、網絡中斷致使

 

個人方案:

MQ消費端在消費開始時 將 ID放入到Redis的BitMap中,MQ生產端每次生產數據時,從Redis的BitMap對應位置若不能取出ID,則生產消息發送,不然不進行消息發送。

可是有人可能會說,萬一消費端,生產端Redis命令執行失敗了怎麼辦,雖然又出現重複消費又出現Redis非正常執行命令的可能性極低,可是萬一呢?

OK,咱們能夠在Redis命令執行失敗時,將消息落庫,每日用定時器,對這種極特殊的消息進行處理。

 

3. Confirm機制

3.1 如何理解?

  • 消息的確認,是指生產者投遞消息後,若是Broker收到消息,則會給咱們生產者一個應答
  • 生產者進行接收應答,用來肯定這條消息是否正常的發送到Broker,這種方式也是消息的可靠性投遞

的核心保障

 

確認機制流程圖

生產端發送消息到Broker,而後Broker接收到了消息後,進行回送響應,生產端有一個Confirm Listener,去監聽應答,固然這個操做是異步進行的,生產端將消息發送出去就能夠不用管了,讓內部監聽器去監聽Broker給咱們的響應。

 

3.2 怎麼實現?

  • 第一步,在channel上開啓確認模式:channel.confirmSelect()
  • 第二步,在channel上添加監聽:addConfirmListener,監聽成功和失敗的返回結果,根據具體的結果對消息進行從新發送、或記錄日誌等後續處理!
public class Producer {
    public static void main(String[] args) throws Exception {
        
        //建立ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.244.11");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setHandshakeTimeout(20000);

        
        //獲取Connection
        Connection connection = connectionFactory.newConnection();
        
        //經過connection建立一個新的Channel
        Channel channel = connection.createChannel();
        
        //指定咱們的消息投遞模式
        channel.confirmSelect();
        
        String exchangeName = "test_confirm_exchange";
        String routingkey = "confirm.save";
        
        //發送一條信息
        String msg = "Hello RabbitMQ Send confirm message!";
        channel.basicPublish(exchangeName, routingkey, null, msg.getBytes());
        
        //添加一個確認監聽
        channel.addConfirmListener(new ConfirmListener() {
            
            @Override
            public void handleNack(long deliveryTag, boolean multiple)
                    throws IOException {
                System.out.println("-------no ack!---------");
            }
            
            @Override
            public void handleAck(long deliveryTag, boolean multiple)
                    throws IOException {
                System.out.println("--------ack!----------");
            }
        });
    }
}

 

public class Consumer {
    public static void main(String[] args) throws Exception{
        //建立ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.244.11");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setHandshakeTimeout(20000);

        
        //獲取Connection
        Connection connection = connectionFactory.newConnection();
        
        //經過connection建立一個新的Channel
        Channel channel = connection.createChannel();
        
        String exchangeName = "test_confirm_exchange";
        String routingkey = "confirm.#";
        String queueName = "test_confirm_queue"; 
        
        //聲明交換機和隊列 而後進行綁定和 設置 最後制定路由key
        channel.exchangeDeclare(exchangeName, "topic",true);
        channel.queueDeclare(queueName, true, false, false, null);
        
        channel.queueBind(queueName, exchangeName, routingkey);
        
        //建立消費者
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true,queueingConsumer); 
        
        while(true){
            Delivery delivery = queueingConsumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println("消費端:" + msg);
        }
    }
}

 

運行說明

先啓動消費端,訪問管控臺:http://ip:15672,檢查Exchange和Queue是否設置OK,而後啓動生產端,消息被消費端消費,生產端也成功監聽到了ACK響應。

 

4. Return機制

4.1 如何理解?

  • Return Listener 用於處理一些不可路由的消息!
  • 咱們的消息生產者,經過指定一個Exchange 和Routingkey,把消息送達到某一個隊列中去, 而後咱們的消費者監聽隊列,進行消費處理操做!
  • 可是在某些狀況下,若是咱們在發送消息的時候,當前的exchange不存在或者指定的路由key路由不到,這個時候若是咱們須要監聽這種不可達的消息,就要使用Return Listener!

 

4.2 如何實現?

  1. 添加return監聽:addReturnListener,生產端去監聽這些不可達的消息,作一些後續處理,好比說,記錄下消息日誌,或者及時去跟蹤記錄,有可能從新設置一下就行了
  2. 發送消息時,設置Mandatory:若是爲true,則監聽器會接收到路由不可達的消息,而後進行後續處理,若是爲false,那麼broker端自動刪除該消息!

 

public class ReturnProducer {
     public static void main(String[] args) throws Exception {
            //1 建立ConnectionFactory
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("192.168.244.11");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            connectionFactory.setHandshakeTimeout(20000);
            //2 獲取Connection
            Connection connection = connectionFactory.newConnection();
            //3 經過Connection建立一個新的Channel
            Channel channel = connection.createChannel();
            
            String exchange = "test_return_exchange";
            //String routingKey = "return.save";
            String routingKeyError = "abc.save";
            
            String msg = "Hello RabbitMQ Return Message";
            //添加return監聽
            channel.addReturnListener(new ReturnListener() {
                @Override
                public void handleReturn(int replyCode, String replyText, String exchange,
                        String routingKey, BasicProperties properties, byte[] body) throws IOException {
                    //replyCode:響應碼    replyText:響應信息
                    System.err.println("---------handle  return----------");
                    System.err.println("replyCode: " + replyCode);
                    System.err.println("replyText: " + replyText);
                    System.err.println("exchange: " + exchange);
                    System.err.println("routingKey: " + routingKey);
                    //System.err.println("properties: " + properties);
                    System.err.println("body: " + new String(body));
                }

                
            });
            //5 發送一條消息,第三個參數mandatory:必須設置爲true
            channel.basicPublish(exchange, routingKeyError, true, null, msg.getBytes());
        }
}

 

public class ReturnConsumer {
    
    public static void main(String[] args) throws Exception {
        //1 建立ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.244.11");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setHandshakeTimeout(20000);
        //2 獲取Connection
        Connection connection = connectionFactory.newConnection();
        //3 經過Connection建立一個新的Channel
        Channel channel = connection.createChannel();
        
        String exchangeName = "test_return_exchange";
        String routingKey = "return.#";
        String queueName = "test_return_queue";
        //4 聲明交換機和隊列,而後進行綁定設置路由Key
        channel.exchangeDeclare(exchangeName, "topic", true, false, null);
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);
        
        //5 建立消費者 
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, queueingConsumer);
        
        while(true){
            Delivery delivery = queueingConsumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.err.println("消費者: " + msg);
        }
    }
}

 

運行說明

先啓動消費端,訪問管控臺:http://ip:15672,檢查Exchange和Queue是否設置OK,而後啓動生產端。
因爲生產端設置的是一個錯誤的路由key,因此消費端沒有任何打印,而生產端打印了以下內容

若是咱們將 Mandatory 屬性設置爲false,對於不可達的消息會被Broker直接刪除,那麼生產端就不會進行任何打印了。若是咱們的路由key設置爲正確的,那麼消費端可以正確消費,生產端也不會進行任何打印。

相關文章
相關標籤/搜索