RabbitMQ使用教程(三)如何保證消息99.99%被髮送成功?

  在以上兩篇博客發佈後不久,有細心的網友就評論,建立的隊列和發送的消息,若是在沒有啓動消費者程序的時候,重啓了RabbitMQ服務,隊列和消息都丟失了。
  
  這就引出了一個很是重要的問題,也是面試中常常會問的:在使用RabbitMQ時,如何保證消息最大程度的不丟失而且被正確消費?
  
  2. 本篇概要
  
  RabbitMQ針對這個問題,提供瞭如下幾個機制來解決:
  
  生產者確認
  
  持久化
  
  手動Ack
  
  本篇博客咱們先講解下生產者確認機制,剩餘的機制後續單獨寫博客進行講解。
  
  3. 生產者確認
  
  要想保證消息不丟失,首先咱們得保證生產者能成功的將消息發送到RabbitMQ服務器。
  
  但在以前的示例中,當生產者將消息發送出去以後,消息到底有沒有正確地到達服務器呢?若是不進行特殊配置,默認狀況下發送消息的操做是不會返回任何消息給生產者的,也就是默認狀況下生產者是不知道消息有沒有正確的到達服務器。
  
  從basicPublish方法的返回類型咱們也能知曉:
  
  public void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException {
  
  this.basicPublish(exchange, routingKey, false, props, body);
  
  }
  
  爲了更好理解,咱們將以前的生產者Producer類中的channel.queueDeclare(QUEUE_NAME, false, false, false, null);註釋:
  
  package com.zwwhnly.springbootaction.rabbitmq.helloworld;
  
  import com.rabbitmq.client.Channel;
  
  import com.rabbitmq.client.Connection;
  
  import com.rabbitmq.client.ConnectionFactory;
  
  import java.io.IOException;
  
  import java.util.concurrent.TimeoutException;
  
  public class Producer {
  
  private final static String QUEUE_NAME = "hello";
  
  public static void main(String[] args) throws IOException, TimeoutException {
  
  // 建立鏈接
  
  ConnectionFactory factory = new ConnectionFactory(www.meiwanyule.cn );
  
  // 設置 RabbitMQ 的主機名
  
  factory.setHost("localhost");
  
  // 建立一個鏈接
  
  Connection connection = factory.newConnection();
  
  // 建立一個通道
  
  Channel channel = connection.createChannel();
  
  // 指定一個隊列,不存在的話自動建立
  
  //channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  
  // 發送消息
  
  String message = "Hello World!";
  
  channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
  
  System.out.println(" [x] Sent '" + message + "'");
  
  // 關閉頻道和鏈接
  
  channel.close();
  
  connection.close();
  
  }
  
  }
  
  此時運行代碼,由於隊列不存在,消息確定沒地方存儲,可是程序卻並未出錯,也就是消息丟失了可是咱們卻並不知曉。
  
  RabblitMQ針對這個問題,提供了兩種解決方案:
  
  經過事務機制實現
  
  經過發送方確認(publisher confirm)機制實現
  
  4. 事務機制
  
  RabblitMQ客戶端中與事務機制相關的方法有如下3個:
  
  channel.txSelect:用於將當前的信道設置成事務模式
  
  channel.txCommit:用於提交事務
  
  channel.txRollback:用於回滾事務
  
  新建事務生產者類TransactionProducer,代碼以下:
  
  package com.zwwhnly.springbootaction.rabbitmq.producerconfirm;
  
  import com.rabbitmq.client.Channel;
  
  import com.rabbitmq.client.Connection;
  
  import com.rabbitmq.client.ConnectionFactory;
  
  import java.io.IOException;
  
  import java.util.concurrent.TimeoutException;
  
  public class TransactionProducer www.shengrenpt.com{
  
  private final static String QUEUE_NAME = "hello";
  
  public static void main(String[] args) throws IOException, TimeoutException {
  
  // 建立鏈接
  
  ConnectionFactory factory = new ConnectionFactory();
  
  // 設置 RabbitMQ 的主機名
  
  factory.setHost("localhost");
  
  // 建立一個鏈接
  
  Connection connection = factory.newConnection(www.huishenggw.cn);
  
  // 建立一個通道
  
  Channel channel = connection.createChannel();
  
  // 指定一個隊列,不存在的話自動建立
  
  channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  
  channel.txSelect();
  
  // 發送消息
  
  String message = "Hello World!";
  
  channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
  
  channel.txCommit();
  
  System.out.println(" [x] Sent '" + message + "'");
  
  // 關閉頻道和鏈接
  
  channel.close();
  
  connection.close();
  
  }
  
  }
  
  運行代碼,發現隊列新增成功,消息發送成功:
  
  稍微修改下代碼,看下異常機制的事務回滾:
  
  try {
  
  channel.txSelect();
  
  // 發送消息
  
  String message = "Hello World!";
  
  channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
  
  int result = 1 / 0;
  
  channel.txCommit();
  
  System.out.println(" [x] Sent '" + message + "'");
  
  } catch (IOException e) {
  
  e.printStackTrace(www.shengdaylgw.cn);
  
  channel.txRollback();
  
  }
  
  由於int result = 1 / 0;確定會觸發java.lang.ArithmeticException異常,因此事務會回滾,消息發送失敗:
  
  若是要發送多條消息,能夠將channel.basicPublish,channel.txCommit等方法放在循環體內,以下所示:
  
  channel.txSelect();
  
  int loopTimes = 10;
  
  for (int i = 0; i < loopTimes; i++) {
  
  try {
  
  // 發送消息
  
  String message = "Hello World!" + i;
  
  channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
  
  channel.txCommit();
  
  System.out.println(" [www.yuntianyuL.cn] Sent '" + message + "'");
  
  } catch (IOException e) {
  
  e.printStackTrace();
  
  channel.txRollback();
  
  }
  
  }
  
  雖然事務可以解決消息發送方和RabbitMQ之間消息確認的問題,只有消息成功被RabbitMQ接收,事務才能提交成功,不然即可在捕獲異常以後進行事務回滾。可是使用事務機制會「吸乾」RabbitMQ的性能,所以建議使用下面講到的發送方確認機制。
  
  5. 發送方確認機制
  
  發送方確認機制是指生產者將信道設置成confirm(確認)模式,一旦信道進入confirm模式,全部在該信道上面發佈的消息都會被指派一個惟一的ID(從1開始),一旦消息被投遞到RabbitMQ服務器以後,RabbitMQ就會發送一個確認(Basic.Ack)給生產者(包含消息的惟一ID),這就使得生產者知曉消息已經正確到達了目的地了。
  
  若是RabbitMQ由於自身內部錯誤致使消息丟失,就會發送一條nack(Basic.Nack)命令,生產者應用程序一樣能夠在回調方法中處理該nack指令。
  
  若是消息和隊列是可持久化的,那麼確認消息會在消息寫入磁盤以後發出。
  
  事務機制在一條消息發送以後會使發送端阻塞,以等待RabbitMQ的迴應,以後才能繼續發送下一條消息。
  
  相比之下,發送方確認機制最大的好處在於它是異步的,一旦發佈一條消息。生產者應用程序就能夠在等信道返回確認的同時繼續發送下一條消息,當消息最終獲得確認後,生產者應用程序即可以經過回調方法來處理該確認消息。
  
  5.1 普通confirm
  
  新建確認生產類NormalConfirmProducer,代碼以下:
  
  package com.zwwhnly.springbootaction.rabbitmq.producerconfirm;
  
  import com.rabbitmq.client.Channel;
  
  import com.rabbitmq.client.Connection;
  
  import com.rabbitmq.client.ConnectionFactory;
  
  import java.io.IOException;
  
  import java.util.concurrent.TimeoutException;
  
  public class NormalConfirmProducer {
  
  private final static String EXCHANGE_NAME = "normal-confirm-exchange";
  
  public static void main(String[] args) throws IOException, TimeoutException {
  
  // 建立鏈接
  
  ConnectionFactory factory =www.yongshi123.cn new ConnectionFactory();
  
  // 設置 RabbitMQ 的主機名
  
  factory.setHost("localhost");
  
  // 建立一個鏈接
  
  Connection connection = factory.newConnection();
  
  // 建立一個通道
  
  Channel channel = connection.createChannel();
  
  // 建立一個Exchange
  
  channel.exchangeDeclare(EXCHANGE_NAME, "direct");
  
  try {
  
  channel.confirmSelect();
  
  // 發送消息
  
  String message = "normal confirm test";
  
  channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
  
  if (channel.waitForConfirms()) {
  
  System.out.println("send message success");
  
  } else {
  
  System.out.println("send message failed");
  
  // do something else...
  
  }
  
  } catch (InterruptedException e) {
  
  e.printStackTrace();
  
  }
  
  // 關閉頻道和鏈接
  
  channel.close();
  
  connection.close();
  
  }
  
  }
  
  channel.confirmSelect();將信道設置成confirm模式。
  
  channel.waitForConfirms();等待發送消息的確認消息,若是發送成功,則返回ture,若是發送失敗,則返回false。
  
  若是要發送多條消息,能夠將channel.basicPublish,channel.waitForConfirms等方法放在循環體內,以下所示:
  
  channel.confirmSelect();
  
  int loopTimes = 10;
  
  for (int i = 0; i < loopTimes; i++) {
  
  try {
  
  // 發送消息
  
  String message = "normal confirm test" + i;
  
  channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
  
  if (channel.waitForConfirms()) {
  
  System.out.println("send message success");
  
  } else {
  
  System.out.println("send message failed");
  
  // do something else...
  
  }
  
  } catch (InterruptedException e) {
  
  e.printStackTrace();
  
  }
  
  }
  
  運行結果:
  
  send message success
  
  send message success
  
  send message success
  
  send message success
  
  send message success
  
  send message success
  
  send message success
  
  send message success
  
  send message success
  
  send message success
  
  若是不開啓信道的confirm模式,調用channel.waitForConfirms()會報錯:
  
  注意事項:
  
  1)事務機制和publisher confirm機制是互斥的,不能共存。
  
  若是企圖將已開啓事務模式的信道再設置爲publisher confirm模式,RabbitMQ會報錯:
  
  channel.txSelect();
  
  channel.confirmSelect();
  
  若是企圖將已開啓publisher confirm模式的信道再設置爲事務模式,RabbitMQ也會報錯:
  
  channel.confirmSelect();
  
  channel.txSelect();
  
  2)事務機制和publisher confirm機制確保的是消息可以正確地發送至RabbitMQ,這裏的「發送至RabbitMQ」的含義是指消息被正確地發往至RabbitMQ的交換器,若是此交換器沒有匹配的隊列,那麼消息也會丟失。因此在使用這兩種機制的時候要確保所涉及的交換器可以有匹配的隊列。
  
  好比上面的NormalConfirmProducer類發送的消息,發送到了交換器normal-confirm-exchange,可是該交換器並無綁定任何隊列,從業務角度來說,消息仍然是丟失了。
  
  普通confirm模式是每發送一條消息後就調用channel.waitForConfirms()方法,以後等待服務端的確認,這其實是一種串行同步等待的方式。所以相比於事務機制,性能提高的並很少。
  
  5.2 批量confirm
  
  批量confirm模式是每發送一批消息後,調用channel.waitForConfirms()方法,等待服務器的確認返回,所以相比於5.1中的普通confirm模式,性能更好。
  
  可是很差的地方在於,若是出現返回Basic.Nack或者超時狀況,生產者客戶端須要將這一批次的消息所有重發,這樣會帶來明顯的重複消息數量,若是消息常常丟失,批量confirm模式的性能應該是不升反降的。
  
  代碼示例:
  
  package com.zwwhnly.springbootaction.rabbitmq.producerconfirm;
  
  import com.rabbitmq.client.Channel;
  
  import com.rabbitmq.client.Connection;
  
  import com.rabbitmq.client.ConnectionFactory;
  
  import java.io.IOException;
  
  import java.util.concurrent.ArrayBlockingQueue;
  
  import java.util.concurrent.BlockingQueue;
  
  import java.util.concurrent.TimeoutException;
  
  public class BatchConfirmProducer {
  
  private final static String EXCHANGE_NAME = "batch-confirm-exchange";
  
  public static void main(String[] args) throws IOException, TimeoutException {
  
  // 建立鏈接
  
  ConnectionFactory factory = new ConnectionFactory();
  
  // 設置 RabbitMQ 的主機名
  
  factory.setHost("localhost");
  
  // 建立一個鏈接
  
  Connection connection = factory.newConnection();
  
  // 建立一個通道
  
  Channel channel = connection.createChannel();
  
  // 建立一個Exchange
  
  channel.exchangeDeclare(EXCHANGE_NAME, "direct");
  
  int batchCount = 100;
  
  int msgCount = 0;
  
  BlockingQueue blockingQueue = new ArrayBlockingQueue(100);
  
  try {
  
  channel.confirmSelect();
  
  while (msgCount <= batchCount) {
  
  String message = "batch confirm test";
  
  channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
  
  // 將發送出去的消息存入緩存中,緩存能夠是一個ArrayList或者BlockingQueue之類的
  
  blockingQueue.add(message);
  
  if (++msgCount >= batchCount) {
  
  try {
  
  if (channel.waitForConfirms()) {
  
  // 將緩存中的消息清空
  
  blockingQueue.clear();
  
  } else {
  
  // 將緩存中的消息從新發送
  
  }
  
  } catch (InterruptedException e) {
  
  e.printStackTrace();
  
  // 將緩存中的消息從新發送
  
  }
  
  }
  
  }
  
  } catch (IOException e) {
  
  e.printStackTrace();
  
  }
  
  // 關閉頻道和鏈接
  
  channel.close();
  
  connection.close();
  
  }
  
  }
  
  5.3 異步confirm
  
  異步confirm模式是在生產者客戶端添加ConfirmListener回調接口,重寫接口的handAck()和handNack()方法,分別用來處理RabblitMQ回傳的Basic.Ack和Basic.Nack。
  
  這兩個方法都有兩個參數,第1個參數deliveryTag用來標記消息的惟一序列號,第2個參數multiple表示的是是否爲多條確認,值爲true表明是多個確認,值爲false表明是單個確認。
  
  示例代碼:
  
  package com.zwwhnly.springbootaction.rabbitmq.producerconfirm;
  
  import com.rabbitmq.client.*;
  
  import java.io.IOException;
  
  import java.util.SortedSet;
  
  import java.util.TreeSet;
  
  import java.util.concurrent.TimeoutException;
  
  public class AsyncConfirmProducer {
  
  private final static String EXCHANGE_NAME = "async-confirm-exchange";
  
  public static void main(String[] args) throws IOException, TimeoutException {
  
  // 建立鏈接
  
  ConnectionFactory factory = new ConnectionFactory();
  
  // 設置 RabbitMQ 的主機名
  
  factory.setHost("localhost");
  
  // 建立一個鏈接
  
  Connection connection = factory.newConnection();
  
  // 建立一個通道
  
  Channel channel = connection.createChannel();
  
  // 建立一個Exchange
  
  channel.exchangeDeclare(EXCHANGE_NAME, "direct");
  
  int batchCount = 100;
  
  long msgCount = 1;
  
  SortedSet<Long> confirmSet = new TreeSet<Long>();
  
  channel.confirmSelect();
  
  channel.addConfirmListener(new ConfirmListener() {
  
  @Override
  
  public void handleAck(long deliveryTag, boolean multiple) throws IOException {
  
  System.out.println("Ack,SeqNo:" + deliveryTag + ",multiple:" + multiple);
  
  if (multiple) {
  
  confirmSet.headSet(deliveryTag - 1).clear();
  
  } else {
  
  confirmSet.remove(deliveryTag);
  
  }
  
  }
  
  @Override
  
  public void handleNack(long deliveryTag, boolean multiple) throws IOException {
  
  System.out.println("Nack,SeqNo:" + deliveryTag + ",multiple:" + multiple);
  
  if (multiple) {
  
  confirmSet.headSet(deliveryTag - 1).clear();
  
  } else {
  
  confirmSet.remove(deliveryTag);
  
  }
  
  // 注意這裏須要添加處理消息重發的場景
  
  }
  
  });
  
  // 演示發送100個消息
  
  while (msgCount <= batchCount) {
  
  long nextSeqNo = channel.getNextPublishSeqNo();
  
  channel.basicPublish(EXCHANGE_NAME, "", null, "async confirm test".getBytes());
  
  confirmSet.add(nextSeqNo);
  
  msgCount = nextSeqNo;
  
  }
  
  // 關閉頻道和鏈接
  
  channel.close();
  
  connection.close();
  
  }
  
  }
  
  運行結果:
  
  Ack,SeqNo:1,multiple:false
  
  Ack,SeqNo:2,multiple:false
  
  Ack,SeqNo:3,multiple:false
  
  Ack,SeqNo:4,multiple:false
  
  Ack,SeqNo:5,multiple:false
  
  Ack,SeqNo:6,multiple:false
  
  Ack,SeqNo:7,multiple:false
  
  Ack,SeqNo:8,multiple:false
  
  Ack,SeqNo:9,multiple:false
  
  Ack,SeqNo:10,multiple:false
  
  注意:屢次運行,發現每次運行的輸出結果是不同的,說明RabbitMQ端回傳給生產者的ack消息並非以固定的批量大小回傳的。
  
  6. 性能比較
  
  到目前爲止,咱們瞭解到4種模式(事務機制,普通confirm,批量confirm,異步confirm)能夠實現生產者確認,讓咱們來對比下它們的性能,簡單修改下以上示例代碼中發送消息的數量,好比10000條,如下爲4種模式的耗時:
  
  發送10000條消息,事務機制耗時:2103
  
  發送10000條消息,普通confirm機制耗時:1483
  
  發送10000條消息,批量confirm機制耗時:281
  
  發送10000條消息,異步confirm機制耗時:214
  
  能夠看出,事務機制最慢,普通confirm機制雖有提高可是很少,批量confirm和異步confirm性能最好,你們能夠根據本身喜愛自行選擇使用哪一種機制,我的建議使用異步confirm機制。
  
  7. 源碼java

相關文章
相關標籤/搜索