RabbitMQ - 消息確認

RabbitMQ - 隊列中提到,接收消息的時候,有兩個方式,一個是consume,一個是get,這兩個方法都有一個autoAck的參數。當咱們設置爲true的時候,說明消費者會經過AMQP顯示的向rabbitmq發送一個確認,rabbitmq自動視其確認了消息,而後把消息從隊列中刪除。下面用consume的方式作些例子來理解autoAck的參數設置。web

String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException;

GetResponse basicGet(String queue, boolean autoAck) throws IOException;

不確認

先往ack隊列發送5條數據,能夠看到ready是5,total是5。
image.png
運行如下代碼,autoAck設置爲false,且不對消息確認。segmentfault

public static void main(String[] args) throws IOException, TimeoutException {
    // 聲明一個鏈接工廠
    ConnectionFactory factory = new ConnectionFactory();
    // 建立一個與rabbitmq服務器的鏈接
    Connection connection = factory.newConnection();
    // 建立一個Channel
    Channel channel = connection.createChannel();
    // 經過Channel定義隊列
    channel.queueDeclare("ack", false, false, false, null);
    // 異步回調處理
    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), "UTF-8");
        System.out.println("ack Received '" + message + "'" + delivery.getEnvelope().getDeliveryTag());
        //channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    };
    // 接收消息
    channel.basicConsume("ack", false, deliverCallback, consumerTag -> {
    });
}

運行結果以下,打印了5條消息:
image.png
在web控制檯能夠看出,ready是0,unacked是5,即未確認的消息數是5條。
image.png
把應用中止掉,即關閉消費者和rabbitmq的鏈接,web控制檯以下,unacked的5條數據回到了ready。
image.png
綜上,當autoAck爲false時,消息分爲兩個部分,一個是未投放放消費者的(ready),一個是投放給消費者但未確認的。若是未確認信息的消費者斷開了鏈接,這部分消息會回到ready從新投遞給消費者,確保了消息的可靠性。須要注意的是,若是消費者一直沒有斷開鏈接,也沒有進行確認,那這個消息會一直等待確認中。安全

確認

確認有兩種,一個是自動確認,一個是手動確認。自動確認的話,就是把autoAck設置爲true。
當rabbitmq向消費者傳遞消息的時候,會帶有一個deliveryTag的傳遞標識,標記唯一地標識信道上的傳遞,交付標記的做用域是每一個信道,因此必須在接收消息的信道上進行確認。交付標記是遞增的正整數,因此咱們看到是1,2,3這樣的遞增數字。
上面例子中,有註釋了一行代碼,就是用來手動確認的,第一個參數就是傳遞標記,第二個參數是是否批量確認。
上面的打印結果輸出了deliveryTag的值,從1到5。把註釋刪了再運行後,能夠看到rabbitmq把確認後的消息刪除。服務器

void basicAck(long deliveryTag, boolean multiple) throws IOException;

批量確認

basicAck方法中,有個參數是multiple,是用來批量確認的。當設置爲true的時候,RabbitMQ將確認全部未完成的傳遞標記,包括確認中指定的標記。與其餘與確認相關的內容同樣,這是按每一個信道肯定範圍的。好比收到標記爲一、2的沒確認,標記爲3的確認,那前面兩個也會一塊兒確認。若是multiple設置爲false,則僅確認當前的消息。
咱們看看下面的例子,每處理三個消息確認一次。異步

public static void main(String[] args) throws IOException, TimeoutException {
    // 聲明一個鏈接工廠
    ConnectionFactory factory = new ConnectionFactory();
    // 建立一個與rabbitmq服務器的鏈接
    Connection connection = factory.newConnection();
    // 建立一個Channel
    Channel channel = connection.createChannel();
    // 經過Channel定義隊列
    channel.queueDeclare("multiple", false, false, false, null);
    // 異步回調處理
    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), "UTF-8");
        System.out.println("multiple Received '" + message + "'" + delivery.getEnvelope().getDeliveryTag());
        if (delivery.getEnvelope().getDeliveryTag() % 3 == 0) {
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), true);
        }
    };
    // 接收消息
    channel.basicConsume("multiple", false, deliverCallback, consumerTag -> {
    });
}

啓動後,發送兩個消息,能夠看到打印了兩次
image.png
控制檯顯示未確認2個
image.png
再發送一條數據,打印了第三個
image.png
web控制檯看出,已經確認並刪除了隊列的消息
image.pngspa

使用哪一種確認方式

自動確認,這種模式一般被稱爲「發了就忘」,當消費端處理異常時,則服務器發送的消息將得不到正確的處理。所以,自動消息確認應該被認爲是不安全的。
手動確認模式中,一般使用消息預取,它限制了通道上未完成(「進行中」)交付的數量。可是,對於自動確認,沒有這樣的限制,因此消費者有可能因爲處理消息太慢,致使內存積壓、堆耗盡,致使程序沒法運行。所以,自動確認模式僅適用於可以高效、穩定地處理配送的消費者。code

相關文章
相關標籤/搜索