RabbitMQ高級特性消費端限流策略實現

應用範圍爲服務訪問量忽然劇增,緣由可能有多種外部的調用或內部的一些問題致使消息積壓,對服務的訪問超過服務所能處理的最大峯值,致使系統超時負載從而崩潰。node

業務場景

舉一些咱們日常生活中的消費場景,例如:火車票、機票、門票等,一般來講這些服務在下單以後,後續的出票結果都是異步通知的,若是服務自己只支持每秒1000訪問量,因爲外部服務的緣由忽然訪問量增長到每秒2000併發,這個時候服務接收者由於流量的劇增,超過了本身系統自己所能處理的最大峯值,若是沒有對消息作限流措施,系統在這段時間內就會形成不可用,在生產環境這是一個很嚴重的問題,實際應用場景不止於這些,本文經過RabbitMQ來說解若是對消費端作限流措施。git

消費端限流機制

RabbitMQ提供了服務質量保證 (QOS) 功能,對channel(通道)預先設置必定的消息數目,每次發送的消息條數都是基於預先設置的數目,若是消費端一旦有未確認的消息,這時服務端將不會再發送新的消費消息,直到消費端將消息進行徹底確認,注意:此時消費端不能設置自動簽收,不然會無效。github

RabbitMQ v3.3.0 以後,放寬了限制,除了對channel設置以外,還能夠對每一個消費者進行設置。併發

如下爲 Node.js 開發語言 amqplib 庫對於限流實現提供的接口方法 prefetch異步

export interface Channel extends events.EventEmitter {
    prefetch(count: number, global?: boolean): Promise<Replies.Empty>;
    ...
}
複製代碼

prefetch 參數說明async

  • number:每次推送給消費端 N 條消息數目,若是這 N 條消息沒有被ack,生產端將不會再次推送直到這 N 條消息被消費。
  • global:在哪一個級別上作限制,ture 爲 channel 上作限制,false 爲消費端上作限制,默認爲 false。

創建生產端

生產端沒什麼變化,和正常聲明同樣,關於源碼參見rabbitmq-prefetch(Node.js客戶端版Demo)測試

const amqp = require('amqplib');

async function producer() {
    // 1. 建立連接對象
    const connection = await amqp.connect('amqp://localhost:5672');

    // 2. 獲取通道
    const channel = await connection.createChannel();

    // 3. 聲明參數
    const exchangeName = 'qosEx';
    const routingKey = 'qos.test001';
    const msg = 'Producer:';

    // 4. 聲明交換機
    await channel.assertExchange(exchangeName, 'topic', { durable: true });
    
    for (let i=0; i<5; i++) {
        // 5. 發送消息
        await channel.publish(exchangeName, routingKey, Buffer.from(`${msg}${i}條消息`));
    }

    await channel.close();
}

producer();
複製代碼

創建消費端

const amqp = require('amqplib');

async function consumer() {
    // 1. 建立連接對象
    const connection = await amqp.connect('amqp://localhost:5672');

    // 2. 獲取通道
    const channel = await connection.createChannel();

    // 3. 聲明參數
    const exchangeName = 'qosEx';
    const queueName = 'qosQueue';
    const routingKey = 'qos.#';

    // 4. 聲明交換機、對列進行綁定
    await channel.assertExchange(exchangeName, 'topic', { durable: true });
    await channel.assertQueue(queueName);
    await channel.bindQueue(queueName, exchangeName, routingKey);
    
    // 5. 限流參數設置
    await channel.prefetch(1, false);

    // 6. 限流,noAck參數必須設置爲false
    await channel.consume(queueName, msg => {
        console.log('Consumer:', msg.content.toString());

        // channel.ack(msg);
    }, { noAck: false });
}

consumer();
複製代碼
  • 未確認消息狀況測試

在 consumer 中咱們暫且將 channel.ack(msg) 註釋掉,分別啓動生產者和消費者,看看是什麼狀況?fetch

圖片描述

如上圖所示,總共5條消息按照預先設置的發送了一條消息,由於我將 channel.ack(msg) 註釋掉了,服務端在未獲得 ack 確認,將不會在發送剩下已 Ready 消息。ui

  • 確認消息測試

修改 consumer 代碼,打開確認消息註釋,從新啓動消費端進行測試spa

await channel.consume(queueName, msg => {
    console.log('Consumer:', msg.content.toString());

    channel.ack(msg); // 打開註釋
}, { noAck: false });
複製代碼

圖片描述

如上圖所示,Unacked 爲0,消息已所有消費成功。

RabbitMQ限流使用總結

限流在咱們的實際工做中仍是頗有意義的,在使用上生產端沒有變化,重點在消費端,着重看如下兩點:

  • 限流狀況 ack 不能設置自動簽收,修改 { noAck: false }
  • 增長限流參數設置 channel.prefetch(1, false)

資料

做者:五月君
連接:www.imooc.com/article/287… 來源:慕課網

相關文章
相關標籤/搜索