RabbitMQ - 消息確認這篇文章中,提到了消息預取,避免了rabbitmq一直往消費端發送數據,致使消費端出現無限制的緩衝區問題。消息預取定義了信道上或者消費者容許的最大未確認的消息數量。一旦未確認數達到了設置的值,RabbitMQ將中止傳遞更多消息,除非至少有一條未完成的消息獲得確認。
使用消息預取的時候,會調用chanel的basicQos方法,prefetchCount是未確認的消息數,global默認值爲false,是限制消費者未確認的消息數,設置爲true的時候,是限制信道上的未確認消息數。web
void basicQos(int prefetchCount, boolean global) throws IOException;
global設置爲false,當每一個消費者有2個未確認的消息時,不能再發消息給消費者。segmentfault
public static void main(String[] args) throws IOException, TimeoutException { // 聲明一個鏈接工廠 ConnectionFactory factory = new ConnectionFactory(); // 建立一個與rabbitmq服務器的鏈接 Connection connection = factory.newConnection(); // 建立一個Channel Channel channel = connection.createChannel(); // 經過Channel定義隊列 channel.queueDeclare("qos", false, false, false, null); // 異步回調處理 DeliverCallback deliverCallback1 = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("deliverCallback1 Received '" + message + "'" + delivery.getEnvelope().getDeliveryTag()); }; /* DeliverCallback deliverCallback2 = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("deliverCallback2 Received '" + message + "'" + delivery.getEnvelope().getDeliveryTag()); };*/ channel.basicQos(2, false); // 接收消息 channel.basicConsume("qos", false, deliverCallback1, consumerTag -> { }); /*channel.basicConsume("qos", false, deliverCallback2, consumerTag -> { });*/ }
運行後,往隊列發送了4條消息,能夠看到,未發送(ready)有2個,未確認2個。
控制檯確實只收到了兩個消息。
若是把註釋放開,同時有兩個消費者,能夠看到,未發送(ready)有0個,未確認4個。
控制檯結果以下,兩個都消費了兩個。
服務器
把上面兩個消費者的global改成true,改成信道限制的方式。異步
public static void main(String[] args) throws IOException, TimeoutException { // 聲明一個鏈接工廠 ConnectionFactory factory = new ConnectionFactory(); // 建立一個與rabbitmq服務器的鏈接 Connection connection = factory.newConnection(); // 建立一個Channel Channel channel = connection.createChannel(); // 經過Channel定義隊列 channel.queueDeclare("qos", false, false, false, null); // 異步回調處理 DeliverCallback deliverCallback1 = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("deliverCallback1 Received '" + message + "'" + delivery.getEnvelope().getDeliveryTag()); }; DeliverCallback deliverCallback2 = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("deliverCallback2 Received '" + message + "'" + delivery.getEnvelope().getDeliveryTag()); }; channel.basicQos(2, true); // 接收消息 channel.basicConsume("qos", false, deliverCallback1, consumerTag -> { }); channel.basicConsume("qos", false, deliverCallback2, consumerTag -> { }); }
能夠看到,未發送(ready)有2個,未確認2個。
每一個消費者都只消費了一個。由於此時,信道上未確認的消息數是2。
fetch
即設置了信道限制又設置了消費者限制,那結果是怎麼樣的呢?
先設置消費端只能有2個未確認的消息,通道只能有3個未確認的消息。spa
public static void main(String[] args) throws IOException, TimeoutException { // 聲明一個鏈接工廠 ConnectionFactory factory = new ConnectionFactory(); // 建立一個與rabbitmq服務器的鏈接 Connection connection = factory.newConnection(); // 建立一個Channel Channel channel = connection.createChannel(); // 經過Channel定義隊列 channel.queueDeclare("qos", false, false, false, null); // 異步回調處理 DeliverCallback deliverCallback1 = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("deliverCallback1 Received '" + message + "'" + delivery.getEnvelope().getDeliveryTag()); }; DeliverCallback deliverCallback2 = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("deliverCallback2 Received '" + message + "'" + delivery.getEnvelope().getDeliveryTag()); }; channel.basicQos(2, false); channel.basicQos(3, true); // 接收消息 channel.basicConsume("qos", false, deliverCallback1, consumerTag -> { }); channel.basicConsume("qos", false, deliverCallback2, consumerTag -> { }); }
運行後控制檯以下,打印了三個消息,說明整個信道就只能有三個未確認的消息,第一個消費者有兩個未確認的消息後再也不接收,由第二個消費者接收。
web控制檯信息,確實只有3個未確認的消息,還有1個待發送。
注意,若是換了順序呢?
把下面的代碼code
channel.basicQos(2, false); channel.basicQos(3, true);
換成,先控制信道的未確認的消息是3個,再控制消費者未確認的消息是2個rabbitmq
channel.basicQos(3, true); channel.basicQos(2, false);
運行後,控制檯以下,每一個消費者都2個未確認的消息。此時信道的限制不生效了。
隊列