MQ發佈確認的三種策略| RabbitMQ系列(六)

這是我參與8月更文挑戰的第11天,活動詳情查看:8月更文挑戰java


相關文章

RabbitMQ系列彙總:RabbitMQ系列編程


前言

  • 生產者將信道設置成 confirm 模式,一旦信道進入 confirm 模式,全部在該信道上面發佈的消息都將會被指派一個惟一的 ID(從 1 開始),一旦消息被投遞到全部匹配的隊列以後。安全

  • broker 就會發送一個確認給生產者(包含消息的惟一 ID),這就使得生產者知道消息已經正確到達目的隊列了,若是消息和隊列是可持久化的,那麼確認消息會在將消息寫入磁盤以後發出。markdown

  • broker 回傳 給生產者的確認消息中 delivery-tag 域包含了確認消息的序列號,此外 broker 也能夠設置 basic.ack 的 multiple 域,表示到這個序列號以前的全部消息都已經獲得了處理。併發

  • confirm 模式最大的好處在於他是異步的,一旦發佈一條消息,生產者應用程序就能夠在等信 道返回確認的同時繼續發送下一條消息,當消息最終獲得確認以後。dom

  • 生產者應用即可以經過回調方法來處理該確認消息,若是 RabbitMQ 由於自身內部錯誤致使消息丟失,就會發送一條 nack 消息,生產者應用程序一樣能夠在回調方法中處理該 nack 消息。異步

  • 開啓發布確認方式函數

    • 發佈確認默認是沒有開啓的,若是要開啓須要調用方法 confirmSelect,每當你要想使用發佈 確認,都須要在 channel 上調用該方法高併發

    • //開啓發布確認
      channel.confirmSelect();
      複製代碼

1、單個確認發佈

  • 這是一種簡單的確認方式,它是一種同步確認發佈的方式,也就是發佈一個消息以後只有它 被確認發佈,後續的消息才能繼續發佈,waitForConfirmsOrDie(long)這個方法只有在消息被確認 的時候才返回,若是在指定時間範圍內這個消息沒有被確認那麼它將拋出異常。post

  • 生產者

    • /** * 這是一個測試的生產者 *@author DingYongJun *@date 2021/8/1 */
      public class DyProducerTest_dingyuefabu {
      
          //設置執行次數
          public static final int MESSAGE_COUNT = 888;
          /** * 這裏爲了方便,咱們使用main函數來測試 * 純屬看你我的選擇 * @param args */
          public static void main(String[] args) throws Exception {
              //單個發佈確認執行
              publishMessageIndividually();
          }
      
          /** * 單個發佈確認 */
          public static void publishMessageIndividually() throws Exception {
              Channel channel = RabbitMqUtils.getChannel();
              String queueName = UUID.randomUUID().toString();
              channel.queueDeclare(queueName, false, false, false, null);
              //開啓發布確認
              channel.confirmSelect();
              long begin = System.currentTimeMillis();
              for (int i = 0; i < MESSAGE_COUNT; i++) {
                  String message = i + "";
                  channel.basicPublish("", queueName, null, message.getBytes());
                  //服務端返回 false 或超時時間內未返回,生產者能夠消息重發
                  boolean flag = channel.waitForConfirms();
                  if(flag){
                      System.out.println("消息發送成功");
                  }
              }
              long end = System.currentTimeMillis();
              System.out.println("發佈" + MESSAGE_COUNT + "個單獨確認消息,耗時" + (end - begin) +
                      "ms");
          }
      }
      複製代碼
  • 執行結果

    • image-20210803162231439.png
    • image-20210803162251045.png
  • 這種確認方式有一個最大的缺點就是:發佈速度特別的慢,由於若是沒有確認發佈的消息就會 阻塞全部後續消息的發佈,這種方式最多提供每秒不超過數百條發佈消息的吞吐量。固然對於某 些應用程序來講這可能已經足夠了。

  • 固然,如今跟你說慢,你莫得感知,下面幾種綜合起來對比你就會發現他的效率有多低了!

2、批量確認發佈

  • 與單個等待確認消息相比,先發布一批消息而後一塊兒確承認以極大地 提升吞吐量。

  • 生產者

    • /** * 批量發佈確認 */
      public static void publishMessageBatch() throws Exception {
              Channel channel = RabbitMqUtils.getChannel();
              //隊列名使用uuid來獲取不重複的值,不須要本身再進行命名了。
              String queueName = UUID.randomUUID().toString();
              channel.queueDeclare(queueName, false, false, false, null);
              //開啓發布確認
              channel.confirmSelect();
              //批量確認消息大小
              int batchSize = 88;
              //未確認消息個數
              int outstandingMessageCount = 0;
              long begin = System.currentTimeMillis();
              for (int i = 0; i < MESSAGE_COUNT; i++) {
                  String message = i + "";
                  channel.basicPublish("", queueName, null, message.getBytes());
                  outstandingMessageCount++;
                  if (outstandingMessageCount == batchSize) {
                      channel.waitForConfirms();//確認代碼
                      outstandingMessageCount = 0;
                  }
      
              }
              //爲了確保還有剩餘沒有確認消息 再次確認
              if (outstandingMessageCount > 0) {
                  channel.waitForConfirms();
              }
              long end = System.currentTimeMillis();
              System.out.println("發佈" + MESSAGE_COUNT + "個批量確認消息,耗時" + (end - begin) +
                      "ms");
          }
      複製代碼
  • 執行結果

    • image-20210803163106931.png
    • image-20210803163121282.png
  • 缺點:當發生故障致使發佈出現問題時,不知道是哪一個消息出現問題了,咱們必須將整個批處理保存在內存中,以記錄重要的信息然後從新發布消息。

  • 固然這種方案仍然是同步的,也同樣阻塞消息的發佈。

3、異步確認發佈

  • 異步確認雖然編程邏輯比上兩個要複雜,可是性價比最高,不管是可靠性仍是效率都沒得說, 他是利用回調函數來達到消息可靠性傳遞的,這個中間件也是經過函數回調來保證是否投遞成功, 下面就讓咱們來詳細講解異步確認是怎麼實現的。

  • 生產者

    • /** * 異步發佈確認 */
          public static void publishMessageAsync() throws Exception {
              try (Channel channel = RabbitMqUtils.getChannel()) {
                  String queueName = UUID.randomUUID().toString();
                  channel.queueDeclare(queueName, false, false, false, null);
                  //開啓發布確認
                  channel.confirmSelect();
                  /** * 線程安全有序的一個哈希表,適用於高併發的狀況 * 1.輕鬆的將序號與消息進行關聯 * 2.輕鬆批量刪除條目 只要給到序列號 * 3.支持併發訪問 */
                  ConcurrentSkipListMap<Long, String> outstandingConfirms = new
                          ConcurrentSkipListMap<>();
                  /** * 確認收到消息的一個回調 * 1.消息序列號 * 2.true 能夠確認小於等於當前序列號的消息 * false 確認當前序列號消息 */
                  ConfirmCallback ackCallback = (sequenceNumber, multiple) -> {
                      if (multiple) {
                          //返回的是小於等於當前序列號的未確認消息 是一個 map
                          ConcurrentNavigableMap<Long, String> confirmed =
                                  outstandingConfirms.headMap(sequenceNumber, true);
                          //清除該部分未確認消息
                          confirmed.clear();
                      }else{
                          //只清除當前序列號的消息
                          outstandingConfirms.remove(sequenceNumber);
                      }
                  };
                  ConfirmCallback nackCallback = (sequenceNumber, multiple) -> {
                      String message = outstandingConfirms.get(sequenceNumber);
                      System.out.println("發佈的消息"+message+"未被確認,序列號"+sequenceNumber);
                  };
                  /** * 添加一個異步確認的監聽器 * 1.確認收到消息的回調 * 2.未收到消息的回調 */
                  channel.addConfirmListener(ackCallback, null);
                  long begin = System.currentTimeMillis();
                  for (int i = 0; i < MESSAGE_COUNT; i++) {
                      String message = "消息" + i;
                      /** * channel.getNextPublishSeqNo()獲取下一個消息的序列號 * 經過序列號與消息體進行一個關聯 * 所有都是未確認的消息體 */
                      outstandingConfirms.put(channel.getNextPublishSeqNo(), message);
                      channel.basicPublish("", queueName, null, message.getBytes());
                  }
                  long end = System.currentTimeMillis();
                  System.out.println("發佈" + MESSAGE_COUNT + "個異步確認消息,耗時" + (end - begin) +
                          "ms");
              }
          }
      複製代碼
  • 執行結果

    • image-20210803163759179.png
    • image-20210803163826281.png
  • 很容易看出,這種方式速度快的飛起呀!

  • 如何處理未確認的消息?

    • 最好的解決的解決方案就是把未確認的消息放到一個基於內存的能被髮佈線程訪問的隊列, 好比說用 ConcurrentLinkedQueue 這個隊列在 confirm callbacks 與發佈線程之間進行消息的傳遞。

4、總結

  • 單獨發佈消息

    • 耗時:21210ms
    • 同步等待確認,簡單,但吞吐量很是有限。
  • 批量發佈消息

    • 耗時:525ms
    • 批量同步等待確認,簡單,合理的吞吐量,一旦出現問題但很難推斷出是那條 消息出現了問題。
  • 異步處理

    • 耗時:45ms
    • 最佳性能和資源使用,在出現錯誤的狀況下能夠很好地控制,可是實現起來稍微難些

路漫漫其修遠兮,吾必將上下求索~

若是你認爲i博主寫的不錯!寫做不易,請點贊、關注、評論給博主一個鼓勵吧~hahah

相關文章
相關標籤/搜索