其實,還有1種場景須要考慮:當消費者接收到消息後,還沒處理完業務邏輯,消費者掛掉了,那消息也算丟失了?,好比用戶下單,訂單中心發送了1個消息到RabbitMQ裏的隊列,積分中心收到這個消息,準備給這個下單的用戶增長20積分,但積分還沒增長成功呢,積分中心本身掛掉了,致使數據出現問題。java
那麼如何解決這種問題呢?程序員
爲了保證消息被消費者成功的消費,RabbitMQ提供了消息確認機制(message acknowledgement),本文主要講解RabbitMQ中,如何使用消息確認機制來保證消息被消費者成功的消費,避免由於消費者忽然宕機而引發的消息丟失。spring
咱們開啓一個消費者的代碼是這樣的:springboot
// 建立隊列消費者 com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("Received Message '" + message + "'"); } }; channel.basicConsume(QUEUE_NAME, true, consumer);
這裏的重點是channel.basicConsume(QUEUE_NAME, true, consumer);
方法的第2個參數,讓咱們先看下basicConsume()的源碼:ide
public String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException { return this.basicConsume(queue, autoAck, "", callback); }
這裏的autoAck參數指的是是否自動確認,若是設置爲ture,RabbitMQ會自動把發送出去的消息置爲確認,而後從內存(或者磁盤)中刪除,而無論消費者接收到消息是否處理成功;若是設置爲false,RabbitMQ會等待消費者顯式的回覆確認信號後纔會從內存(或者磁盤)中刪除。ui
建議將autoAck設置爲false,這樣消費者就有足夠的時間處理消息,不用擔憂處理消息過程當中消費者宕機形成消息丟失。this
此時,隊列裏的消息就分紅了2個部分:設計
若是RabbitMQ一直沒有收到消費者的確認信號,而且消費此消息的消費者已經斷開鏈接,則RabbitMQ會安排該消息從新進入隊列,等待投遞給下一個消費者,固然也有可能仍是原來的那個消費者。3d
RabbitMQ不會爲未確認的消息設置過時時間,它判斷此消息是否須要從新投遞給消費者的惟一依據是消費該消息的消費者鏈接是否已經斷開,這麼設計的緣由是RabbitMQ容許消費者消費一條消息的時間能夠好久好久。code
爲了便於理解,咱們舉個具體的例子,生產者的話的咱們延用上文中的DurableProducer:
package com.zwwhnly.springbootaction.rabbitmq.durable; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class DurableProducer { private final static String EXCHANGE_NAME = "durable-exchange"; private final static String QUEUE_NAME = "durable-queue"; public static void main(String[] args) throws IOException, TimeoutException { // 建立鏈接 ConnectionFactory factory = new ConnectionFactory(); // 設置 RabbitMQ 的主機名 factory.setHost("localhost"); // 建立一個鏈接 Connection connection = factory.newConnection(); // 建立一個通道 Channel channel = connection.createChannel(); // 建立一個Exchange channel.exchangeDeclare(EXCHANGE_NAME, "direct", true); channel.queueDeclare(QUEUE_NAME, true, false, false, null); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); // 發送消息 String message = "durable exchange test"; AMQP.BasicProperties props = new AMQP.BasicProperties().builder().deliveryMode(2).build(); channel.basicPublish(EXCHANGE_NAME, "", props, message.getBytes()); // 關閉頻道和鏈接 channel.close(); connection.close(); } }
而後新建一個消費者AckConsumer類:
package com.zwwhnly.springbootaction.rabbitmq.ack; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class AckConsumer { private final static String QUEUE_NAME = "durable-queue"; public static void main(String[] args) throws IOException, TimeoutException { // 建立鏈接 ConnectionFactory factory = new ConnectionFactory(); // 設置 RabbitMQ 的主機名 factory.setHost("localhost"); // 建立一個鏈接 Connection connection = factory.newConnection(); // 建立一個通道 Channel channel = connection.createChannel(); // 建立隊列消費者 com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); int result = 1 / 0; System.out.println("Received Message '" + message + "'"); } }; channel.basicConsume(QUEUE_NAME, true, consumer); } }
咱們先將autoAck參數設置爲ture,即自動確認,並在消費消息時故意寫個異常,而後先運行生產者客戶端將消息寫入隊列中,而後運行消費者客戶端,發現消息未消費成功可是卻消失了:
而後咱們將autoAck設置爲false:
channel.basicConsume(QUEUE_NAME, false, consumer);
再次運行生產者客戶端將消息寫入隊列中,而後運行消費者客戶端,此時雖然消費者客戶端仍然代碼異常,可是消息仍然在隊列中:
而後咱們刪除掉消費者客戶端中的異常代碼,從新啓動消費者客戶端,發現消息消費成功了,可是消息一直未Ack:
手動停掉消費者客戶端,發現消息又到了Ready狀態,準備從新投遞:
之因此消費掉消息,卻一直仍是Unacked狀態,是由於咱們沒在代碼中添加顯式的Ack代碼:
String message = new String(body, "UTF-8"); //int result = 1 / 0; System.out.println("Received Message '" + message + "'"); long deliveryTag = envelope.getDeliveryTag(); channel.basicAck(deliveryTag, false);
deliveryTag能夠看作消息的編號,它是一個64位的長整形值。
此時運行消費者客戶端,發現消息消費成功,而且在隊列中被移除: