RabbitMQ 消費端的限流策略

假設一個場景,因爲咱們的消費端忽然所有不可用了,致使 rabbitMQ 服務器上有上萬條未處理的消息,這時候若是沒作任何如今,隨便開啓一個消費端客戶端,就會致使巨量的消息瞬間所有推送過來,可是咱們單個客戶端沒法同時處理這麼多的數據,就會致使消費端變得巨卡,有可能直接崩潰不可用了。因此在實際生產中,限流保護是很重要的。git

rabbitMQ 提供了一種 qos (服務質量保證)功能,即在非自動確認消息的前提下,若是必定數目的消息(經過基於 consume 或者 channel 設置 QOS 的值)未被確認前,不進行消費新的消息。關鍵代碼就是在聲明消費者代碼裏面的github

void basicQos(unit prefetchSize , ushort prefetchCount, bool global )
複製代碼
  1. prefetchSize:0api

  2. prefetchCount:會告訴 RabbitMQ 不要同時給一個消費者推送多於 N 個消息,即一旦有 N 個消息尚未 ack,則該 consumer 將 block 掉,直到有消息 ackbash

  3. global:true、false 是否將上面設置應用於 channel,簡單點說,就是上面限制是 channel 級別的仍是 consumer 級別服務器

備註:prefetchSize 和 global 這兩項,rabbitmq 沒有實現,暫且不研究。特別注意一點,prefetchCount 在 no_ask=false 的狀況下才生效,即在自動應答的狀況下這兩個值是不生效的。ide

代碼演示:fetch

代碼地址:    https://github.com/hmilyos/rabbitmqdemo.git  rabbitmq-api 項目下
複製代碼

生產端代碼基本沒變化,改了 exchange 和 routingKey 而已ui

public class Procuder {

	private static final Logger log = LoggerFactory.getLogger(Procuder.class);

	public static void main(String[] args) throws IOException, TimeoutException {
		ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setHost(RabbitMQCommon.RABBITMQ_HOST);
		connectionFactory.setPort(RabbitMQCommon.RABBITMQ_PORT);
		connectionFactory.setVirtualHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);

		Connection connection = connectionFactory.newConnection();
		Channel channel = connection.createChannel();

		String msg = "Hello RabbitMQ limit Message";
        for(int i = 0; i < 5; i ++){
            log.info("生產端發送:{}", msg + i);
            channel.basicPublish(Consumer.EXCHANGE_NAME, Consumer.ROUTING_KEY, true, null, (msg + i).getBytes());
        }
	}
}
複製代碼

消費端代碼須要修改一下 autoAck 設置爲 false **
增長 ** channel.basicQos(0, 1, false);
this

完整的消費端代碼以下spa

/**
 * 使用自定義消費者
 */
public class Consumer {

	private static final Logger log = LoggerFactory.getLogger(Consumer.class);
	
	public static final String EXCHANGE_NAME = "test_qos_exchange";
	public static final String EXCHANGE_TYPE = "topic";
	public static final String ROUTING_KEY_TYPE = "qos.#";
	public static final String ROUTING_KEY = "qos.save";
	public static final String QUEUE_NAME = "test_qos_queue";
	
	public static void main(String[] args) throws IOException, TimeoutException {
		//1 建立ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(RabbitMQCommon.RABBITMQ_HOST);
        connectionFactory.setPort(RabbitMQCommon.RABBITMQ_PORT);
        connectionFactory.setVirtualHost(RabbitMQCommon.RABBITMQ_DEFAULT_VIRTUAL_HOST);
        //2 獲取C	onnection
        Connection connection = connectionFactory.newConnection();
        //3 經過Connection建立一個新的Channel
        Channel channel = connection.createChannel();
        
        channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE, true, false, null);
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY_TYPE);
        
        /**
         * prefetchSize:0
         prefetchCount:會告訴RabbitMQ不要同時給一個消費者推送多於N個消息,限速N個
            即一旦有 N 個消息尚未 ack,則該 consumer 將 block 掉,直到有消息 ack 回來,你再發送 N 個過來
         global:true\false 是否將上面設置應用於channel級別,false是consumer級別
         prefetchSize 和global這兩項,rabbitmq沒有實現,暫且不研究
         */
        channel.basicQos(0, 1, false);

        //使用自定義消費者
        //1 限流方式  第一件事就是 autoAck設置爲 false
      //使用自定義消費者
        channel.basicConsume(QUEUE_NAME, false, new MyConsumer(channel));
        log.info("消費端啓動成功");
	}
}

複製代碼

自定義消費者

public class MyConsumer extends DefaultConsumer {

	private static final Logger log = LoggerFactory.getLogger(MyConsumer.class);
	
	 private Channel channel;
	 
	public MyConsumer(Channel channel) {
		super(channel);
		this.channel = channel;
	}

	@Override
    public void handleDelivery(String consumerTag,  //消費者標籤
                               Envelope envelope,
                               AMQP.BasicProperties properties,
                               byte[] body) throws IOException {
        
        log.info("------limit-----consume message----------");
        log.info("consumerTag: " + consumerTag);
        log.info("envelope: " + envelope);
        log.info("properties: " + properties);
        log.info("body: " + new String(body));
        //必定要手動ACK回去
       //channel.basicAck(envelope.getDeliveryTag(), false);
    }
}

複製代碼

而後啓動消費端,上管控臺查看 test_qos_exchange 和 test_qos_queue 是否生成了

確認 test_qos_exchange 上綁定了 test_qos_queue

啓動生產端發送 5 條消息

發現消費端只打印了一條消息

從管控臺上也看到總共 5 條消息,有 4 條等待着,一條消費了可是沒有 ack 回去
修改自定義消費者裏面的代碼,以下所示

public class MyConsumer extends DefaultConsumer {

	private static final Logger log = LoggerFactory.getLogger(MyConsumer.class);
	
	 private Channel channel;
	 
	public MyConsumer(Channel channel) {
		super(channel);
		this.channel = channel;
	}

	@Override
    public void handleDelivery(String consumerTag,  //消費者標籤
                               Envelope envelope,
                               AMQP.BasicProperties properties,
                               byte[] body) throws IOException {
        
        log.info("------limit-----consume message----------");
        log.info("consumerTag: " + consumerTag);
        log.info("envelope: " + envelope);
        log.info("properties: " + properties);
        log.info("body: " + new String(body));
        //必定要手動ACK回去
        channel.basicAck(envelope.getDeliveryTag(), false);
    }
}
複製代碼

重啓消費端,看到消費端就按照一條一條消費,而且 ACK 回去了

如上所示就是簡單的RabbitMQ消費端的限流策略

相關文章
相關標籤/搜索