RabbitMQ高級特性-消費端限流

爲何要進行消費者端限流

假設有個場景,RabbitMQ服務器上堆積上萬條未處理的消息,咱們隨便打開一個消費者客戶端會出現下面狀況:巨量的消息同時推送過來,可是咱們單個消費者客戶端沒法同時處理這麼多數據,服務器可能卡死java

什麼是消費端限流

RabbitMQ提供了一種qos(服務質量保證)功能,即在非自動確認消息的狀況下,若是必定數量的消息(經過基於consumer或者channel設置qos值)未被確認前,不消費新的消息服務器

消費端限流的實現

在消費端:ide

// 單條消息的大小限制,通常設爲0或不設置,不限制大小
int prefecthSize = 0;
// 告訴RabbitMQ不要同時給消費端推送n條消息,一旦有n個消息還沒ack,則該consumer將block掉,直到有ack;注意在自動應答下不生效
int prefecthCount = 1;
// 表示是否應用於channel上,便是channel級別仍是consumer級別
boolean global = false;
channel.basicQos(prefecthSize, prefecthCount, global);

producerspa

package com.wyg.rabbitmq.javaclient.consumer_limit;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * 消費端限流
 * 
 * @author wyg0405@gmail.com
 * @date 2019-11-22 13:25
 * @since JDK1.8
 * @version V1.0
 */

public class Producer {
    private static final String HOST = "localhost";
    private static final int PORT = 5672;
    private static final String USERNAME = "guset";
    private static final String PASSWORD = "guset";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST);
        factory.setVirtualHost("/");
        factory.setPort(PORT);
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);
        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();
        String exchangeName = "test_qos_exchange";
        String routingKey = "qos.abc";
        // 申明exchange
        channel.exchangeDeclare(exchangeName, "topic");

        // 發送十條消息
        for (int i = 0; i < 10; i++) {
            String msg = "這是一條 消費端限流消息," + i;
            channel.basicPublish(exchangeName, routingKey, false, null, msg.getBytes("UTF-8"));
        }
        channel.close();
        connection.close();
    }

}

consumercode

package com.wyg.rabbitmq.javaclient.consumer_limit;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.*;

/**
 * 消費端限流
 * 
 * @author wyg0405@gmail.com
 * @date 2019-11-22 14:07
 * @since JDK1.8
 * @version V1.0
 */

public class Consumer {

    private static final String HOST = "localhost";
    private static final int PORT = 5672;
    private static final String USERNAME = "guset";
    private static final String PASSWORD = "guset";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST);
        factory.setVirtualHost("/");
        factory.setPort(PORT);
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);
        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();

        String queueName = "test_qos_queue";
        String exchangeName = "test_qos_exchange";
        String routingKey = "qos.#";

        // 申明exchange
        channel.exchangeDeclare(exchangeName, "topic");
        // 申明隊列
        channel.queueDeclare(queueName, true, false, false, null);
        // 隊列綁定到exchange
        channel.queueBind(queueName, exchangeName, routingKey, null);

        // 單條消息的大小限制,通常設爲0或不設置,不限制大小
        int prefecthSize = 0;
        // 告訴RabbitMQ不要同時給消費端推送n條消息,一旦有n個消息還沒ack,則該consumer將block掉,直到有ack;注意在自動應答下不生效
        int prefecthCount = 1;
        // 表示是否應用於channel上,便是channel級別仍是consumer級別
        boolean global = false;

        channel.basicQos(prefecthSize, prefecthCount, global);
        DeliverCallback deliverCallback = new DeliverCallback() {
            @Override
            public void handle(String consumerTag, Delivery message) throws IOException {
                try {
                    System.out.println("---消費者---");
                    System.out.println(new String(message.getBody(), "UTF-8"));
                } finally {
                    // consumer手動 ack 給broker
                    channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
                }
            }
        };
        CancelCallback cancelCallback = new CancelCallback() {
            @Override
            public void handle(String consumerTag) throws IOException {
                System.out.println("---消費者:cancelCallback");
            }
        };

        // 消費消息,autoAck必定要設置爲false
        channel.basicConsume(queueName, false, deliverCallback, cancelCallback);
    }
}

注意blog

第一次咱們註釋掉 手動 ack給RabbitMQ應答rabbitmq

運行結果:隊列

發現一直卡在第一條消息,由於未給RabbitMQ手動應答,因此RabbitMQ認爲消費端還未消費完,不推送新的消息

第二次開啓手動應答rem

運行結果:get

全部消息依次消費

相關文章
相關標籤/搜索