假設一個場景,因爲咱們的消費端忽然所有不可用了,致使 rabbitMQ 服務器上有上萬條未處理的消息,這時候若是沒作任何如今,隨便開啓一個消費端客戶端,就會致使巨量的消息瞬間所有推送過來,可是咱們單個客戶端沒法同時處理這麼多的數據,就會致使消費端變得巨卡,有可能直接崩潰不可用了。因此在實際生產中,限流保護是很重要的。git
rabbitMQ 提供了一種 qos (服務質量保證)功能,即在非自動確認消息的前提下,若是必定數目的消息(經過基於 consume 或者 channel 設置 QOS 的值)未被確認前,不進行消費新的消息。關鍵代碼就是在聲明消費者代碼裏面的github
void basicQos(unit prefetchSize , ushort prefetchCount, bool global )
複製代碼
prefetchSize:0api
prefetchCount:會告訴 RabbitMQ 不要同時給一個消費者推送多於 N 個消息,即一旦有 N 個消息尚未 ack,則該 consumer 將 block 掉,直到有消息 ackbash
global:true、false 是否將上面設置應用於 channel,簡單點說,就是上面限制是 channel 級別的仍是 consumer 級別服務器
備註:prefetchSize 和 global 這兩項,rabbitmq 沒有實現,暫且不研究。特別注意一點,prefetchCount 在 no_ask=false 的狀況下才生效,即在自動應答的狀況下這兩個值是不生效的。ide
代碼演示:fetch
代碼地址: https://github.com/hmilyos/rabbitmqdemo.git rabbitmq-api 項目下
複製代碼
生產端代碼基本沒變化,改了 exchange 和 routingKey 而已ui
public class Procuder {
private static final Logger log = LoggerFactory.getLogger(Procuder.class);
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(RabbitMQCommon.RABBITMQ_HOST);
connectionFactory.setPort(RabbitMQCommon.RABBITMQ_PORT);
connectionFactory.setVirtualHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String msg = "Hello RabbitMQ limit Message";
for(int i = 0; i < 5; i ++){
log.info("生產端發送:{}", msg + i);
channel.basicPublish(Consumer.EXCHANGE_NAME, Consumer.ROUTING_KEY, true, null, (msg + i).getBytes());
}
}
}
複製代碼
消費端代碼須要修改一下 autoAck 設置爲 false **
增長 ** channel.basicQos(0, 1, false);this
完整的消費端代碼以下spa
/**
* 使用自定義消費者
*/
public class Consumer {
private static final Logger log = LoggerFactory.getLogger(Consumer.class);
public static final String EXCHANGE_NAME = "test_qos_exchange";
public static final String EXCHANGE_TYPE = "topic";
public static final String ROUTING_KEY_TYPE = "qos.#";
public static final String ROUTING_KEY = "qos.save";
public static final String QUEUE_NAME = "test_qos_queue";
public static void main(String[] args) throws IOException, TimeoutException {
//1 建立ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(RabbitMQCommon.RABBITMQ_HOST);
connectionFactory.setPort(RabbitMQCommon.RABBITMQ_PORT);
connectionFactory.setVirtualHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);
//2 獲取C onnection
Connection connection = connectionFactory.newConnection();
//3 經過Connection建立一個新的Channel
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE, true, false, null);
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY_TYPE);
/**
* prefetchSize:0
prefetchCount:會告訴RabbitMQ不要同時給一個消費者推送多於N個消息,限速N個
即一旦有 N 個消息尚未 ack,則該 consumer 將 block 掉,直到有消息 ack 回來,你再發送 N 個過來
global:true\false 是否將上面設置應用於channel級別,false是consumer級別
prefetchSize 和global這兩項,rabbitmq沒有實現,暫且不研究
*/
channel.basicQos(0, 1, false);
//使用自定義消費者
//1 限流方式 第一件事就是 autoAck設置爲 false
//使用自定義消費者
channel.basicConsume(QUEUE_NAME, false, new MyConsumer(channel));
log.info("消費端啓動成功");
}
}
複製代碼
自定義消費者
public class MyConsumer extends DefaultConsumer {
private static final Logger log = LoggerFactory.getLogger(MyConsumer.class);
private Channel channel;
public MyConsumer(Channel channel) {
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String consumerTag, //消費者標籤
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
log.info("------limit-----consume message----------");
log.info("consumerTag: " + consumerTag);
log.info("envelope: " + envelope);
log.info("properties: " + properties);
log.info("body: " + new String(body));
//必定要手動ACK回去
//channel.basicAck(envelope.getDeliveryTag(), false);
}
}
複製代碼
而後啓動消費端,上管控臺查看 test_qos_exchange 和 test_qos_queue 是否生成了
確認 test_qos_exchange 上綁定了 test_qos_queue 啓動生產端發送 5 條消息 發現消費端只打印了一條消息 從管控臺上也看到總共 5 條消息,有 4 條等待着,一條消費了可是沒有 ack 回去 修改自定義消費者裏面的代碼,以下所示public class MyConsumer extends DefaultConsumer {
private static final Logger log = LoggerFactory.getLogger(MyConsumer.class);
private Channel channel;
public MyConsumer(Channel channel) {
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String consumerTag, //消費者標籤
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
log.info("------limit-----consume message----------");
log.info("consumerTag: " + consumerTag);
log.info("envelope: " + envelope);
log.info("properties: " + properties);
log.info("body: " + new String(body));
//必定要手動ACK回去
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
複製代碼
重啓消費端,看到消費端就按照一條一條消費,而且 ACK 回去了
如上所示就是簡單的RabbitMQ消費端的限流策略