在消息隊列中,當消費者去消費消息的時候,不管是經過 pull 的方式仍是 push 的方式,均可能會出現大批量的消息突刺。若是此時要處理全部消息,極可能會致使系統負載太高,影響穩定性。但其實可能後面幾秒以內都沒有消息投遞,若直接把多餘的消息丟掉則沒有充分利用系統處理消息的能力。咱們但願能夠把消息突刺均攤到一段時間內,讓系統負載保持在消息處理水位之下的同時儘量地處理更多消息,從而起到「削峯填谷」的效果:html
上圖中紅色的部分表明超出消息處理能力的部分。bash
咱們能夠看到消息突刺每每都是瞬時的、不規律的,其後一段時間系統每每都會有空閒資源。咱們但願把紅色的那部分消息平攤到後面空閒時去處理,這樣既能夠保證系統負載處在一個穩定的水位,又能夠儘量地處理更多消息,這時候咱們就須要一個可以控制消費端消息勻速處理的利器 — AHAS 流控降級,來爲消息隊列削峯填谷,保駕護航。架構
AHAS 的流控降級是面向分佈式服務架構的專業流量控制組件,主要以流量爲切入點,從流量控制、熔斷降級、系統保護等多個維度來幫助您保障服務的穩定性,同時提供強大的聚合監控和歷史監控查詢功能。異步
AHAS 專門爲這種場景提供了勻速排隊的控制特性,能夠把忽然到來的大量請求以勻速的形式均攤,以固定的間隔時間讓請求經過,以穩定的速度逐步處理這些請求,起到「削峯填谷」的效果,從而避免流量突刺形成系統負載太高。同時堆積的請求將會排隊,逐步進行處理;當請求排隊預計超過最大超時時長的時候則直接拒絕,而不是拒絕所有請求。分佈式
好比在 RocketMQ 的場景下配置了勻速模式下請求 QPS 爲 5,則會每 200 ms 處理一條消息,多餘的處理任務將排隊;同時設置了超時時間,預計排隊時長超過超時時間的處理任務將會直接被拒絕。示意圖以下圖所示:ui
本部分將引導您快速在 RocketMQ 消費端接入 AHAS 流控降級 Sentinel。阿里雲
首先您須要到AHAS 控制檯開通 AHAS 功能(免費)。能夠根據 開通 AHAS 文檔 裏面的指引進行開通。url
在結合阿里雲 RocketMQ Client 使用 Sentinel 時,用戶須要引入 AHAS Sentinel 的依賴 ahas-sentinel-client
(以 Maven 爲例):spa
<dependency> <groupId>com.alibaba.csp</groupId> <artifactId>ahas-sentinel-client</artifactId> <version>1.1.0</version></dependency>複製代碼
因爲 RocketMQ Client 未提供相應攔截機制,並且每次收到均可能是批量的消息,所以用戶在處理消息時須要手動進行資源定義(埋點)。咱們能夠在處理消息的邏輯處手動進行埋點,資源名能夠根據須要來肯定(如 groupId + topic 的組合):線程
private static Action handleMessage(Message message, String groupId, String topic) { Entry entry = null; try { // 資源名稱爲 groupId 和 topic 的組合,便於標識,同時能夠針對不一樣的 groupId 和 topic 配置不一樣的規則 entry = SphU.entry("handleMqMessage:" + groupId + ":" + topic); // 在此處編寫真實的處理邏輯 System.out.println(System.currentTimeMillis() + " | handling message: " + message); return Action.CommitMessage; } catch (BlockException ex) { // 在編寫處理被流控的邏輯 // 示例:能夠在此處記錄錯誤或進行重試 System.err.println("Blocked, will retry later: " + message); return Action.ReconsumeLater; // 會觸發消息從新投遞 } finally { if (entry != null) { entry.exit(); } } }複製代碼
消費者訂閱消息的邏輯示例:
Consumer consumer = ONSFactory.createConsumer(properties);consumer.subscribe(topic, "*", (message, context) -> { return handleMessage(message);});consumer.start();複製代碼
更多關於 RocketMQ SDK 的信息能夠參考 消息隊列 RocketMQ 入門文檔。
注意:若在本地運行接入 AHAS Sentinel 控制檯須要在頁面左上角選擇 公網 環境,若在阿里雲 ECS 環境則在頁面左上角選擇對應的 Region 環境。
咱們能夠進入 AHAS 控制檯,點擊左側側邊欄的 流控降級,進入 AHAS 流控降級控制檯應用總覽頁面。在頁面右上角,單擊添加應用,選擇 SDK 接入頁籤,到 配置啓動參數 頁籤拿到須要的啓動參數(詳情請參考 SDK 接入文檔),相似於:
-Dproject.name=AppName -Dahas.license=<License>複製代碼
其中 project.name
配置項表明應用名(會顯示在控制檯,好比 MqConsumerDemo
),ahas.license
配置項表明本身的受權 license(ECS 環境不須要此項)。
接下來咱們添加獲取到的啓動參數,啓動修改好的 Consumer 應用。因爲 AHAS 流控降級須要進行資源調用才能觸發初始化,所以首先須要向對應 group/topic 發送一條消息觸發初始化。消費端接收到消息後,咱們就能夠在 AHAS Sentinel 控制檯上看到咱們的應用了。點擊應用卡片,進入詳情頁面後點擊左側側邊欄的「機器列表」。咱們能夠在機器列表頁面看到剛剛接入的機器,表明接入成功:
點擊「請求鏈路」頁面,咱們能夠看到以前定義的資源。點擊右邊的「流控」按鈕添加新的流控規則:
咱們在「流控方式」中選擇「排隊等待」,設置 QPS 爲 10,表明每 100ms 勻速經過一個請求;而且設置最大超時時長爲 2000ms,超出此超時時間的請求將不會排隊,當即拒絕。配置完成後點擊新建按鈕。
下面咱們能夠在 Producer 端批量發送消息,而後在 Consumer 端的控制檯輸出處觀察效果。能夠看到消息消費的速率是勻速的,大約每 100 ms 消費一條消息:
同時不斷有排隊的處理任務完成,超出等待時長的處理請求直接被拒絕。注意在處理請求被拒絕的時候,須要根據需求決定是否須要從新消費消息。
咱們也能夠點擊左側側邊欄的「監控詳情」進入監控詳情頁面,查看處理消息的監控曲線:
對比普通限流模式的監控曲線(最右面的部分):
若是不開啓勻速模式,只是普通的限流模式,則只會同時處理 10 條消息,其他的所有被拒絕,即便後面的時間系統資源充足多餘的請求也沒法被處理,於是浪費了許多空閒資源。兩種模式對比說明勻速模式下消息處理能力獲得了更好的利用。
Kafka 消費端接入 AHAS 流控降級的思路與上面的 RocketMQ 相似,這裏給出一個簡單的代碼示例:
private static void handleMessage(ConsumerRecord<String, String> record, String groupId, String topic) { pool.submit(() -> { Entry entry = null; try { // 資源名稱爲 groupId 和 topic 的組合,便於標識,同時能夠針對不一樣的 groupId 和 topic 配置不一樣的規則 entry = SphU.entry("handleKafkaMessage:" + groupId + ":" + topic); // 在此處理消息. System.out.printf("[%d] Receive new messages: %s%n", System.currentTimeMillis(), record.toString()); } catch (BlockException ex) { // Blocked. // NOTE: 在處理請求被拒絕的時候,須要根據需求決定是否須要從新消費消息 System.err.println("Blocked: " + record.toString()); } finally { if (entry != null) { entry.exit(); } } });}複製代碼
消費消息的邏輯:
while (true) { try { ConsumerRecords<String, String> records = consumer.poll(1000); // 必須在下次 poll 以前消費完這些數據, 且總耗時不得超過 SESSION_TIMEOUT_MS_CONFIG // 建議開一個單獨的線程池來消費消息,而後異步返回結果 for (ConsumerRecord<String, String> record : records) { handleMessage(record, groupId, topic); } } catch (Exception e) { try { Thread.sleep(1000); } catch (Throwable ignore) { } e.printStackTrace(); }}複製代碼
以上介紹的只是 AHAS 流控降級的其中一個場景 —— 請求勻速,它還能夠處理更復雜的各類狀況,好比:
您能夠參考 AHAS 流控降級文檔 來挖掘更多的場景。