假設有個場景,RabbitMQ服務器上堆積上萬條未處理的消息,咱們隨便打開一個消費者客戶端會出現下面狀況:巨量的消息同時推送過來,可是咱們單個消費者客戶端沒法同時處理這麼多數據,服務器可能卡死java
RabbitMQ提供了一種qos(服務質量保證)功能,即在非自動確認消息的狀況下,若是必定數量的消息(經過基於consumer或者channel設置qos值)未被確認前,不消費新的消息服務器
在消費端:ide
// 單條消息的大小限制,通常設爲0或不設置,不限制大小 int prefecthSize = 0; // 告訴RabbitMQ不要同時給消費端推送n條消息,一旦有n個消息還沒ack,則該consumer將block掉,直到有ack;注意在自動應答下不生效 int prefecthCount = 1; // 表示是否應用於channel上,便是channel級別仍是consumer級別 boolean global = false; channel.basicQos(prefecthSize, prefecthCount, global);
producerspa
package com.wyg.rabbitmq.javaclient.consumer_limit; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * 消費端限流 * * @author wyg0405@gmail.com * @date 2019-11-22 13:25 * @since JDK1.8 * @version V1.0 */ public class Producer { private static final String HOST = "localhost"; private static final int PORT = 5672; private static final String USERNAME = "guset"; private static final String PASSWORD = "guset"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(HOST); factory.setVirtualHost("/"); factory.setPort(PORT); factory.setUsername(USERNAME); factory.setPassword(PASSWORD); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); String exchangeName = "test_qos_exchange"; String routingKey = "qos.abc"; // 申明exchange channel.exchangeDeclare(exchangeName, "topic"); // 發送十條消息 for (int i = 0; i < 10; i++) { String msg = "這是一條 消費端限流消息," + i; channel.basicPublish(exchangeName, routingKey, false, null, msg.getBytes("UTF-8")); } channel.close(); connection.close(); } }
consumercode
package com.wyg.rabbitmq.javaclient.consumer_limit; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.*; /** * 消費端限流 * * @author wyg0405@gmail.com * @date 2019-11-22 14:07 * @since JDK1.8 * @version V1.0 */ public class Consumer { private static final String HOST = "localhost"; private static final int PORT = 5672; private static final String USERNAME = "guset"; private static final String PASSWORD = "guset"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(HOST); factory.setVirtualHost("/"); factory.setPort(PORT); factory.setUsername(USERNAME); factory.setPassword(PASSWORD); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); String queueName = "test_qos_queue"; String exchangeName = "test_qos_exchange"; String routingKey = "qos.#"; // 申明exchange channel.exchangeDeclare(exchangeName, "topic"); // 申明隊列 channel.queueDeclare(queueName, true, false, false, null); // 隊列綁定到exchange channel.queueBind(queueName, exchangeName, routingKey, null); // 單條消息的大小限制,通常設爲0或不設置,不限制大小 int prefecthSize = 0; // 告訴RabbitMQ不要同時給消費端推送n條消息,一旦有n個消息還沒ack,則該consumer將block掉,直到有ack;注意在自動應答下不生效 int prefecthCount = 1; // 表示是否應用於channel上,便是channel級別仍是consumer級別 boolean global = false; channel.basicQos(prefecthSize, prefecthCount, global); DeliverCallback deliverCallback = new DeliverCallback() { @Override public void handle(String consumerTag, Delivery message) throws IOException { try { System.out.println("---消費者---"); System.out.println(new String(message.getBody(), "UTF-8")); } finally { // consumer手動 ack 給broker channel.basicAck(message.getEnvelope().getDeliveryTag(), false); } } }; CancelCallback cancelCallback = new CancelCallback() { @Override public void handle(String consumerTag) throws IOException { System.out.println("---消費者:cancelCallback"); } }; // 消費消息,autoAck必定要設置爲false channel.basicConsume(queueName, false, deliverCallback, cancelCallback); } }
注意
blog
第一次咱們註釋掉 手動 ack給RabbitMQ應答rabbitmq
運行結果:隊列
發現一直卡在第一條消息,由於未給RabbitMQ手動應答,因此RabbitMQ認爲消費端還未消費完,不推送新的消息
第二次開啓手動應答rem
運行結果:get
全部消息依次消費