原文:Flink 流式聚合性能調優指南android
SQL 是數據分析中使用最普遍的語言。Flink Table API 和 SQL 使用戶可以以更少的時間和精力定義高效的流分析應用程序。此外,Flink Table API 和 SQL 是高效優化過的,它集成了許多查詢優化和算子優化。但並非全部的優化都是默認開啓的,所以對於某些工做負載,能夠經過打開某些選項來提升性能。web
這裏將介紹一些實用的優化選項以及流式聚合的內部原理,它們在某些狀況下能帶來很大的提高。緩存
注意:(1)目前,這裏提到的優化選項僅支持 Blink planner。(2)目前,流聚合優化僅支持無界聚合,窗口聚合優化將在將來支持。網絡
默認狀況下,無界聚合算子是逐條處理輸入的記錄,即:(1)從狀態中讀取累加器,(2)累加/撤回記錄至累加器,(3)將累加器寫回狀態,(4)下一條記錄將再次從(1)開始處理。這種處理模式可能會增長 StateBackend 開銷(尤爲是對於 RocksDB StateBackend )。此外,生產中很是常見的數據傾斜會使這個問題惡化,而且容易致使 job 發生反壓。app
MiniBatch 聚合
MiniBatch 聚合的核心思想是將一組輸入的數據緩存在聚合算子內部的緩衝區中。當輸入的數據被觸發處理時,每一個 key 只需一個操做便可訪問狀態。這樣能夠大大減小狀態開銷並得到更好的吞吐量。可是,這可能會增長一些延遲,由於它會緩衝一些記錄而不是當即處理它們。這是吞吐量和延遲之間的權衡。iphone
下圖說明了 mini-batch 聚合如何減小狀態操做。函數
Flink 流式聚合性能調優指南
默認狀況下 mini-batch 優化是被禁用的。開啓這項優化,須要設置選項
table.exec.mini-batch.enabled、
table.exec.mini-batch.allow-latency 和
table.exec.mini-batch.size。性能
下面的例子顯示如何啓用這些選項。優化
// instantiate table environment
TableEnvironment tEnv = ...ip
// access flink configuration
Configuration configuration = tEnv.getConfig().getConfiguration();
// set low-level key-value options
configuration.setString("table.exec.mini-batch.enabled", "true"); // enable mini-batch optimization
configuration.setString("table.exec.mini-batch.allow-latency", "5 s"); // use 5 seconds to buffer input records
configuration.setString("table.exec.mini-batch.size", "5000"); // the maximum number of records can be buffered by each aggregate operator task
Local-Global 聚合
Local-Global 聚合是爲解決數據傾斜問題提出的,經過將一組聚合分爲兩個階段,首先在上游進行本地聚合,而後在下游進行全局聚合,相似於 MapReduce 中的 Combine + Reduce 模式。例如,就如下 SQL 而言:
SELECT color, sum(id)
FROM T
GROUP BY color
數據流中的記錄可能會傾斜,所以某些聚合算子的實例必須比其餘實例處理更多的記錄,這會產生熱點問題。本地聚合能夠將必定數量具備相同 key 的輸入數據累加到單個累加器中。全局聚合將僅接收 reduce 後的累加器,而不是大量的原始輸入數據。這能夠大大減小網絡 shuffle 和狀態訪問的成本。每次本地聚合累積的輸入數據量基於 mini-batch 間隔。這意味着 local-global 聚合依賴於啓用了 mini-batch 優化。
下圖顯示了 local-global 聚合如何提升性能。
Flink 流式聚合性能調優指南
下面的例子顯示如何啓用 local-global 聚合。
// instantiate table environment
TableEnvironment tEnv = ...
// access flink configuration
Configuration configuration = tEnv.getConfig().getConfiguration();
// set low-level key-value options
configuration.setString("table.exec.mini-batch.enabled", "true"); // local-global aggregation depends on mini-batch is enabled
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
configuration.setString("table.exec.mini-batch.size", "5000");
configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE"); // enable two-phase, i.e. local-global aggregation
拆分 distinct 聚合
Local-Global 優化可有效消除常規聚合的數據傾斜,例如 SUM、COUNT、MAX、MIN、AVG。可是在處理 distinct 聚合時,其性能並不使人滿意。
例如,若是咱們要分析今天有多少惟一用戶登陸。咱們可能有如下查詢:
SELECT day, COUNT(DISTINCT user_id)
FROM T
GROUP BY day
若是 distinct key (即 user_id)的值分佈稀疏,則 COUNT DISTINCT 不適合減小數據。即便啓用了 local-global 優化也沒有太大幫助。由於累加器仍然包含幾乎全部原始記錄,而且全局聚合將成爲瓶頸(大多數繁重的累加器由一個任務處理,即同一天)。
這個優化的想法是將不一樣的聚合(例如 COUNT(DISTINCT col))分爲兩個級別。第一次聚合由 group key 和額外的 bucket key 進行 shuffle。bucket key 是使用 HASH_CODE(distinct_key) % BUCKET_NUM 計算的。BUCKET_NUM 默認爲1024,能夠經過
table.optimizer.distinct-agg.split.bucket-num 選項進行配置。第二次聚合是由原始 group key 進行 shuffle,並使用 SUM 聚合來自不一樣 buckets 的 COUNT DISTINCT 值。因爲相同的 distinct key 將僅在同一 bucket 中計算,所以轉換是等效的。bucket key 充當附加 group key 的角色,以分擔 group key 中熱點的負擔。bucket key 使 job 具備可伸縮性來解決不一樣聚合中的數據傾斜/熱點。
拆分 distinct 聚合後,以上查詢將被自動改寫爲如下查詢:
SELECT day, SUM(cnt)
FROM (
SELECT day, COUNT(DISTINCT user_id) as cnt
FROM T
GROUP BY day, MOD(HASH_CODE(user_id), 1024)
)
GROUP BY day
下圖顯示了拆分 distinct 聚合如何提升性能(假設顏色表示 days,字母表示 user_id)。
Flink 流式聚合性能調優指南
注意:上面是能夠從這個優化中受益的最簡單的示例。除此以外,Flink 還支持拆分更復雜的聚合查詢,例如,多個具備不一樣 distinct key (例如 COUNT(DISTINCT a), SUM(DISTINCT b) )的 distinct 聚合,能夠與其餘非 distinct 聚合(例如 SUM、MAX、MIN、COUNT )一塊兒使用。
注意 當前,拆分優化不支持包含用戶定義的 AggregateFunction 聚合。
下面的例子顯示瞭如何啓用拆分 distinct 聚合優化。
// instantiate table environment
TableEnvironment tEnv = ...
tEnv.getConfig() // access high-level configuration
.getConfiguration() // set low-level key-value options
.setString("table.optimizer.distinct-agg.split.enabled", "true"); // enable distinct agg split
在 distinct 聚合上使用 FILTER 修飾符
在某些狀況下,用戶可能須要從不一樣維度計算 UV(獨立訪客)的數量,例如來自 Android 的 UV、iPhone 的 UV、Web 的 UV 和總 UV。不少人會選擇 CASE WHEN,例如:
SELECT
day,
COUNT(DISTINCT user_id) AS total_uv,
COUNT(DISTINCT CASE WHEN flag IN ('android', 'iphone') THEN user_id ELSE NULL END) AS app_uv,
COUNT(DISTINCT CASE WHEN flag IN ('wap', 'other') THEN user_id ELSE NULL END) AS web_uv
FROM T
GROUP BY day
可是,在這種狀況下,建議使用 FILTER 語法而不是 CASE WHEN。由於 FILTER 更符合 SQL 標準,而且能得到更多的性能提高。FILTER 是用於聚合函數的修飾符,用於限制聚合中使用的值。將上面的示例替換爲 FILTER 修飾符,以下所示:
SELECT day, COUNT(DISTINCT user_id) AS total_uv, COUNT(DISTINCT user_id) FILTER (WHERE flag IN ('android', 'iphone')) AS app_uv, COUNT(DISTINCT user_id) FILTER (WHERE flag IN ('wap', 'other')) AS web_uv FROM T GROUP BY day Flink SQL 優化器能夠識別相同的 distinct key 上的不一樣過濾器參數。例如,在上面的示例中,三個 COUNT DISTINCT 都在 user_id 一列上。Flink 能夠只使用一個共享狀態實例,而不是三個狀態實例,以減小狀態訪問和狀態大小。在某些工做負載下,能夠得到顯著的性能提高。