RabbitMQ - 隊列中提到,接收消息的時候,有兩個方式,一個是consume,一個是get,這兩個方法都有一個autoAck的參數。當咱們設置爲true的時候,說明消費者會經過AMQP顯示的向rabbitmq發送一個確認,rabbitmq自動視其確認了消息,而後把消息從隊列中刪除。下面用consume的方式作些例子來理解autoAck的參數設置。web
String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException; GetResponse basicGet(String queue, boolean autoAck) throws IOException;
先往ack隊列發送5條數據,能夠看到ready是5,total是5。
運行如下代碼,autoAck設置爲false,且不對消息確認。segmentfault
public static void main(String[] args) throws IOException, TimeoutException { // 聲明一個鏈接工廠 ConnectionFactory factory = new ConnectionFactory(); // 建立一個與rabbitmq服務器的鏈接 Connection connection = factory.newConnection(); // 建立一個Channel Channel channel = connection.createChannel(); // 經過Channel定義隊列 channel.queueDeclare("ack", false, false, false, null); // 異步回調處理 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("ack Received '" + message + "'" + delivery.getEnvelope().getDeliveryTag()); //channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); }; // 接收消息 channel.basicConsume("ack", false, deliverCallback, consumerTag -> { }); }
運行結果以下,打印了5條消息:
在web控制檯能夠看出,ready是0,unacked是5,即未確認的消息數是5條。
把應用中止掉,即關閉消費者和rabbitmq的鏈接,web控制檯以下,unacked的5條數據回到了ready。
綜上,當autoAck爲false時,消息分爲兩個部分,一個是未投放放消費者的(ready),一個是投放給消費者但未確認的。若是未確認信息的消費者斷開了鏈接,這部分消息會回到ready從新投遞給消費者,確保了消息的可靠性。須要注意的是,若是消費者一直沒有斷開鏈接,也沒有進行確認,那這個消息會一直等待確認中。安全
確認有兩種,一個是自動確認,一個是手動確認。自動確認的話,就是把autoAck設置爲true。
當rabbitmq向消費者傳遞消息的時候,會帶有一個deliveryTag的傳遞標識,標記唯一地標識信道上的傳遞,交付標記的做用域是每一個信道,因此必須在接收消息的信道上進行確認。交付標記是遞增的正整數,因此咱們看到是1,2,3這樣的遞增數字。
上面例子中,有註釋了一行代碼,就是用來手動確認的,第一個參數就是傳遞標記,第二個參數是是否批量確認。
上面的打印結果輸出了deliveryTag的值,從1到5。把註釋刪了再運行後,能夠看到rabbitmq把確認後的消息刪除。服務器
void basicAck(long deliveryTag, boolean multiple) throws IOException;
basicAck方法中,有個參數是multiple,是用來批量確認的。當設置爲true的時候,RabbitMQ將確認全部未完成的傳遞標記,包括確認中指定的標記。與其餘與確認相關的內容同樣,這是按每一個信道肯定範圍的。好比收到標記爲一、2的沒確認,標記爲3的確認,那前面兩個也會一塊兒確認。若是multiple設置爲false,則僅確認當前的消息。
咱們看看下面的例子,每處理三個消息確認一次。異步
public static void main(String[] args) throws IOException, TimeoutException { // 聲明一個鏈接工廠 ConnectionFactory factory = new ConnectionFactory(); // 建立一個與rabbitmq服務器的鏈接 Connection connection = factory.newConnection(); // 建立一個Channel Channel channel = connection.createChannel(); // 經過Channel定義隊列 channel.queueDeclare("multiple", false, false, false, null); // 異步回調處理 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("multiple Received '" + message + "'" + delivery.getEnvelope().getDeliveryTag()); if (delivery.getEnvelope().getDeliveryTag() % 3 == 0) { channel.basicAck(delivery.getEnvelope().getDeliveryTag(), true); } }; // 接收消息 channel.basicConsume("multiple", false, deliverCallback, consumerTag -> { }); }
啓動後,發送兩個消息,能夠看到打印了兩次
控制檯顯示未確認2個
再發送一條數據,打印了第三個
web控制檯看出,已經確認並刪除了隊列的消息
spa
自動確認,這種模式一般被稱爲「發了就忘」,當消費端處理異常時,則服務器發送的消息將得不到正確的處理。所以,自動消息確認應該被認爲是不安全的。
手動確認模式中,一般使用消息預取,它限制了通道上未完成(「進行中」)交付的數量。可是,對於自動確認,沒有這樣的限制,因此消費者有可能因爲處理消息太慢,致使內存積壓、堆耗盡,致使程序沒法運行。所以,自動確認模式僅適用於可以高效、穩定地處理配送的消費者。code