高可用服務 AHAS 在消息隊列 MQ 削峯填穀場景下的應用

在消息隊列中,當消費者去消費消息的時候,不管是經過 pull 的方式仍是 push 的方式,均可能會出現大批量的消息突刺。若是此時要處理全部消息,極可能會致使系統負載太高,影響穩定性。但其實可能後面幾秒以內都沒有消息投遞,若直接把多餘的消息丟掉則沒有充分利用系統處理消息的能力。咱們但願能夠把消息突刺均攤到一段時間內,讓系統負載保持在消息處理水位之下的同時儘量地處理更多消息,從而起到「削峯填谷」的效果:html

上圖中紅色的部分表明超出消息處理能力的部分。微信

咱們能夠看到消息突刺每每都是瞬時的、不規律的,其後一段時間系統每每都會有空閒資源。咱們但願把紅色的那部分消息平攤到後面空閒時去處理,這樣既能夠保證系統負載處在一個穩定的水位,又能夠儘量地處理更多消息,這時候咱們就須要一個可以控制消費端消息勻速處理的利器 — AHAS 流控降級,來爲消息隊列削峯填谷,保駕護航。架構

AHAS 是如何削峯填谷的

AHAS 的流控降級是面向分佈式服務架構的專業流量控制組件,主要以流量爲切入點,從流量控制、熔斷降級、系統保護等多個維度來幫助您保障服務的穩定性,同時提供強大的聚合監控和歷史監控查詢功能。異步

AHAS 專門爲這種場景提供了勻速排隊的控制特性,能夠把忽然到來的大量請求以勻速的形式均攤,以固定的間隔時間讓請求經過,以穩定的速度逐步處理這些請求,起到「削峯填谷」的效果,從而避免流量突刺形成系統負載太高。同時堆積的請求將會排隊,逐步進行處理;當請求排隊預計超過最大超時時長的時候則直接拒絕,而不是拒絕所有請求。分佈式

好比在 RocketMQ 的場景下配置了勻速模式下請求 QPS 爲 5,則會每 200 ms 處理一條消息,多餘的處理任務將排隊;同時設置了超時時間,預計排隊時長超過超時時間的處理任務將會直接被拒絕。示意圖以下圖所示:阿里雲

RocketMQ Consumer 接入示例

本部分將引導您快速在 RocketMQ 消費端接入 AHAS 流控降級 Sentinel。url

1. 開通 AHASspa

首先您須要到AHAS 控制檯開通 AHAS 功能(免費)。能夠根據 開通 AHAS 文檔 裏面的指引進行開通。線程

2. 代碼改造3d

在結合阿里雲 RocketMQ Client 使用 Sentinel 時,用戶須要引入 AHAS Sentinel 的依賴 ahas-sentinel-client (以 Maven 爲例):

<dependency>
    <groupId>com.alibaba.csp</groupId>
    <artifactId>ahas-sentinel-client</artifactId>
    <version>1.1.0</version>
</dependency>

因爲 RocketMQ Client 未提供相應攔截機制,並且每次收到均可能是批量的消息,所以用戶在處理消息時須要手動進行資源定義(埋點)。咱們能夠在處理消息的邏輯處手動進行埋點,資源名能夠根據須要來肯定(如 groupId + topic 的組合):

private static Action handleMessage(Message message, String groupId, String topic) {
        Entry entry = null;
        try {
            // 資源名稱爲 groupId 和 topic 的組合,便於標識,同時能夠針對不一樣的 groupId 和 topic 配置不一樣的規則
            entry = SphU.entry("handleMqMessage:" + groupId + ":" + topic);

            // 在此處編寫真實的處理邏輯
            System.out.println(System.currentTimeMillis() + " | handling message: " + message);
            return Action.CommitMessage;
        } catch (BlockException ex) {
            // 在編寫處理被流控的邏輯
            // 示例:能夠在此處記錄錯誤或進行重試
            System.err.println("Blocked, will retry later: " + message);
            return Action.ReconsumeLater; // 會觸發消息從新投遞
        } finally {
            if (entry != null) {
                entry.exit();
            }
        }
    }

消費者訂閱消息的邏輯示例:

Consumer consumer = ONSFactory.createConsumer(properties);
consumer.subscribe(topic, "*", (message, context) -> {
    return handleMessage(message);
});
consumer.start();

更多關於 RocketMQ SDK 的信息能夠參考 消息隊列 RocketMQ 入門文檔

3. 獲取 AHAS 啓動參數

注意:若在本地運行接入 AHAS Sentinel 控制檯須要在頁面左上角選擇 公網環境,若在阿里雲 ECS 環境則在頁面左上角選擇對應的 Region 環境。

咱們能夠進入 AHAS 控制檯,點擊左側側邊欄的 流控降級,進入 AHAS 流控降級控制檯應用總覽頁面。在頁面右上角,單擊添加應用,選擇 SDK 接入頁籤,到 配置啓動參數 頁籤拿到須要的啓動參數(詳情請參考 SDK 接入文檔),相似於:

-Dproject.name=AppName -Dahas.license=<License>

其中 project.name 配置項表明應用名(會顯示在控制檯,好比 MqConsumerDemo),ahas.license 配置項表明本身的受權 license(ECS 環境不須要此項)。

4. 啓動 Consumer,配置規則

接下來咱們添加獲取到的啓動參數,啓動修改好的 Consumer 應用。因爲 AHAS 流控降級須要進行資源調用才能觸發初始化,所以首先須要向對應 group/topic 發送一條消息觸發初始化。消費端接收到消息後,咱們就能夠在 AHAS Sentinel 控制檯上看到咱們的應用了。點擊應用卡片,進入詳情頁面後點擊左側側邊欄的「機器列表」。咱們能夠在機器列表頁面看到剛剛接入的機器,表明接入成功:

點擊「請求鏈路」頁面,咱們能夠看到以前定義的資源。點擊右邊的「流控」按鈕添加新的流控規則:

咱們在「流控方式」中選擇「排隊等待」,設置 QPS 爲 10,表明每 100ms 勻速經過一個請求;而且設置最大超時時長爲 2000ms,超出此超時時間的請求將不會排隊,當即拒絕。配置完成後點擊新建按鈕。

5. 發送消息,查看效果

下面咱們能夠在 Producer 端批量發送消息,而後在 Consumer 端的控制檯輸出處觀察效果。能夠看到消息消費的速率是勻速的,大約每 100 ms 消費一條消息:

1550732955137 | handling message: Hello MQ 2453
1550732955236 | handling message: Hello MQ 9162
1550732955338 | handling message: Hello MQ 4944
1550732955438 | handling message: Hello MQ 5582
1550732955538 | handling message: Hello MQ 4493
1550732955637 | handling message: Hello MQ 3036
1550732955738 | handling message: Hello MQ 1381
1550732955834 | handling message: Hello MQ 1450
1550732955937 | handling message: Hello MQ 5871

同時不斷有排隊的處理任務完成,超出等待時長的處理請求直接被拒絕。注意在處理請求被拒絕的時候,須要根據需求決定是否須要從新消費消息。

咱們也能夠點擊左側側邊欄的「監控詳情」進入監控詳情頁面,查看處理消息的監控曲線:

對比普通限流模式的監控曲線(最右面的部分):

若是不開啓勻速模式,只是普通的限流模式,則只會同時處理 10 條消息,其他的所有被拒絕,即便後面的時間系統資源充足多餘的請求也沒法被處理,於是浪費了許多空閒資源。兩種模式對比說明勻速模式下消息處理能力獲得了更好的利用。

Kafka 接入代碼示例

Kafka 消費端接入 AHAS 流控降級的思路與上面的 RocketMQ 相似,這裏給出一個簡單的代碼示例:

private static void handleMessage(ConsumerRecord<String, String> record, String groupId, String topic) {
    pool.submit(() -> {
        Entry entry = null;
        try {
            // 資源名稱爲 groupId 和 topic 的組合,便於標識,同時能夠針對不一樣的 groupId 和 topic 配置不一樣的規則
            entry = SphU.entry("handleKafkaMessage:" + groupId + ":" + topic);

            // 在此處理消息.
            System.out.printf("[%d] Receive new messages: %s%n", System.currentTimeMillis(), record.toString());
        } catch (BlockException ex) {
            // Blocked.
            // NOTE: 在處理請求被拒絕的時候,須要根據需求決定是否須要從新消費消息
            System.err.println("Blocked: " + record.toString());
        } finally {
            if (entry != null) {
                entry.exit();
            }
        }
    });
}

消費消息的邏輯:

while (true) {
    try {
        ConsumerRecords<String, String> records = consumer.poll(1000);
        // 必須在下次 poll 以前消費完這些數據, 且總耗時不得超過 SESSION_TIMEOUT_MS_CONFIG
        // 建議開一個單獨的線程池來消費消息,而後異步返回結果
        for (ConsumerRecord<String, String> record : records) {
            handleMessage(record, groupId, topic);
        }
    } catch (Exception e) {
        try {
            Thread.sleep(1000);
        } catch (Throwable ignore) {
        }
        e.printStackTrace();
    }
}

其它

以上介紹的只是 AHAS 流控降級的其中一個場景 —— 請求勻速,它還能夠處理更復雜的各類狀況,好比:

  • 流量控制:能夠針對不一樣的調用關係,以不一樣的運行指標(如 QPS、線程數、系統負載等)爲基準,對資源調用進行流量控制,將隨機的請求調整成合適的形狀(請求勻速、Warm Up 等)。
  • 熔斷降級:當調用鏈路中某個資源出現不穩定的狀況,如平均 RT 增高、異常比例升高的時候,會使對此資源的調用請求快速失敗,避免影響其它的資源致使級聯失敗。
  • 系統負載保護:對系統的維度提供保護。當系統負載較高的時候,提供了對應的保護機制,讓系統的入口流量和系統的負載達到一個平衡,保證系統在能力範圍以內處理最多的請求。

您能夠參考 AHAS 流控降級文檔 來挖掘更多的場景。

 

原文連接 更多技術乾貨 請關注阿里云云棲社區微信號 :yunqiinsight

相關文章
相關標籤/搜索