TopN 是統計報表和大屏很是常見的功能,主要用來實時計算排行榜。流式的 TopN 不一樣於批處理的 TopN,它的特色是持續的在內存中按照某個統計指標(如出現次數)計算 TopN 排行榜,而後當排行榜發生變化時,發出更新後的排行榜。本文主要講解 Flink SQL 是如何從語法和實現上設計 TopN 的。算法
用戶最關心的是如何用 SQL 寫出 TopN 的查詢。你們最熟悉的 TopN 的寫法通常是這樣的:sql
SELECT column_name(s) FROM table_name WHERE condition ORDER BY order_field [DESC|ASC] LIMIT number
如上語法是 MySQL 的 TopN 語法,使用 ORDER BY
指定排序鍵和排序方向,使用 LIMIT
來指定選出前幾名。不一樣的數據庫的 TopN 語法不盡相同,好比 MS SQL Server 使用 TOP 的關鍵字,Oracle 使用 ROWNUM 的隱藏字段。不過幾家數據庫提供的 TopN 語法都是全局 TopN,也就是數據是全局進行排序的,查詢的結果只有一組排行榜。好比但願對全網商家按銷售額排序,計算出銷售額排名前十的商家。這就是全局 TopN,範例以下:數據庫
SELECT * FROM shop_sales ORDER BY sales DESC LIMIT 10
上文講述了全局 TopN 的語法,可是不少時候用戶但願根據不一樣的分組進行排序,計算出每一個分組的一個排行榜。例如對全網商家根據行業按銷售額排序,計算出每一個行業銷售額前十名的商家。這時候,傳統的 TopN 語法就沒法表達這種需求了。有些 Stream SQL 系統爲了解決這個問題,會 hack 一種新的 TopN 語法容許用戶指定分組字段。可是 Flink SQL 是基於 ANSI SQL 標準語法的,不能加入任何非標準的語法。因而咱們嘗試從批處理的角度去思考這個問題,在傳統批處理中經常使用 ROW_NUMBER 的開窗聚合函數來解決分組 TopN 的問題。語法以下所示:數據結構
SELECT * FROM ( SELECT *, ROW_NUMBER() OVER ([PARTITION BY col1[, col2..]] ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum FROM table_name) WHERE rownum <= N [AND conditions]
參數說明:併發
ROW_NUMBER()
: 是一個計算行號的OVER窗口函數,行號計算從1開始。PARTITION BY col1[, col2..]
: 指定分區的列,能夠不指定。ORDER BY col1 [asc|desc][, col2 [asc|desc]...]
: 指定排序的列,能夠多列不一樣排序方向。如上語法所示,TopN 須要兩層 query,子查詢中使用ROW_NUMBER()
開窗函數來爲每條數據標上排名,排名的計算根據PARTITION BY
和ORDER BY
來指定分區列和排序列,也就是說每一條數據會計算其在所屬分區中,根據排序列排序獲得的排名。在外層查詢中,對排名進行過濾,只取出排名小於 N 的,如 N=10,那麼就是取 Top 10 的數據。若是沒有指定PARTITION BY
那麼就是一個全局 TopN 的計算,因此 ROW_NUMBER 在使用上更爲靈活。函數
如《實時計算 Flink SQL 核心功能解密》中所述,Flink SQL 是一個流與批統一的 API,也就是說理論上一段 SQL 要既能跑在批處理模式下,也能跑在流處理模式下,且輸出的結果是一致的。那麼在流處理模式下理所固然地應該支持 ROW_NUMBER 形式的 TopN 語法。例如上文說的對全網商家根據行業按銷售額排序,計算出每一個行業銷售額前十名的商家,SQL 範例以下。性能
SELECT * FROM ( SELECT *, ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales DESC) AS rownum FROM shop_sales) WHERE rownum <= 10
ROW_NUMBER 方式的 TopN 語法很是靈活,能知足全局 TopN 和分組 TopN 的需求。可是在流計算上的物理執行是一個挑戰。如上文所述的每一個行業銷售額前十商家排行榜,通過 SQL 編譯後獲得的抽象語法樹(AST)以下所示。優化
LogicalWindow 會對全部數據進行排名,也就是說每當到達一個數據,就要對歷史數據進行重排序,並輸出歷史數據的新的排名,而後 LogicalCalc 節點會根據排名進行過濾。這在性能上是很是糟糕的,由於這無限放大了流量。而咱們知道,最優的流式 TopN 的計算只須要維護一個 N 元素大小的小根堆,每當有數據到達時,只須要與堆頂元素比較,若是比堆頂元素還小,則直接丟棄;若是比堆頂元素大,則更新小根堆,並輸出更新後的排行榜。也就是說咱們不須要分爲兩個節點進行計算,不須要將全部數據進行排序,只須要在一個節點中就能夠高效地完成計算。因此咱們在查詢優化器中加入了一條規則,在使用 TopN 語法時,將 LogicalWindow 和 LogicalCalc 合併成了 LogicalRank 節點。LogicalRank 在翻譯成物理執行計劃時,會使用一個通過特殊設計的 TopN 算子。阿里雲
TopN 算子的實現上主要有兩個數據結構,一個是 TreeMap,另外一個是 MapState。TreeMap 的做用相似於上文的小根堆,有序地存放了排名前 N 的元素。可是 TreeMap 是個內存數據結構,在 failover 後會丟失,沒法保證數據的一致性。所以咱們還有一個 MapState 結構,MapState 是 Flink 提供的狀態接口,用來存儲 TopN 的數據(保證數據不丟)。當有 failover 發生後,MapState 能保證狀態的恢復,而 TreeMap 會從 MapState 中從新構造出來。咱們並有沒有把順序也存到狀態中去,由於順序是能夠在恢復時重構的。由於每一次狀態的讀寫操做都會涉及到序列化/反序列化,每每是性能的瓶頸,因此 TreeMap 的主要做用是下降了對 MapState 狀態的讀寫操做。對大部分數據來講都是與 TreeMap 進行交互,不須要對 MapState 進行讀寫的,全是內存操做,因此 TopN 的性能是很是高的。spa
TopN 算子的主要處理流程是,每當有數據到達時,會與 TreeMap 的最小的元素比較,若是比它小,那麼該數據就不多是 TopN 的一員,直接丟棄便可。若是比它大,那麼就會先更新 TreeMap,同時更新 MapState 中的存的數據。最後輸出更新後的排行榜。爲了減小冗餘數據的輸出,咱們只會輸出排名發生變化的數據。例如原先的第7名上升到了第六名,那麼只須要輸出新的第六名和第七名便可。
TopN 的計算與 GroupBy 的計算相似,若是數據存在傾斜,則會有計算熱點的現象。好比全局 TopN,那麼全部的數據只能聚集到一個節點進行 TopN 的計算,那麼計算能力就會受限於單臺機器,沒法作到水平擴展。解決思路與 GroupBy 是相似的,就是使用嵌套 TopN,或者說兩層 TopN。在原先的 TopN 前面,再加一層 TopN,用於分散熱點。例如,計算全網排名前十的商鋪,會致使單點的數據熱點,那麼能夠先加一層分組 TopN,組的劃分規則是根據店鋪 ID 哈希取模後分紅128組(併發的倍數)。第二層 TopN 與原先的寫法同樣,沒有 PARTITION BY。第一層會計算出每一組的 TopN,然後在第二層中進行合併彙總,獲得最終的全網前十。第二層雖然還是單點,可是大量的計算量由第一層分擔了,而第一層是能夠水平擴展的。使用嵌套 TopN 的優化寫法以下所示:
CREATE VIEW tmp_topn AS SELECT * FROM ( SELECT *, ROW_NUMBER() OVER (PARTITION BY HASH_CODE(shop_id)%128 ORDER BY sales DESC) AS rownum FROM shop_sales) WHERE rownum <= 10 SELECT * FROM ( SELECT shop_id, shop_name, sales, ROW_NUMBER() OVER (ORDER BY sales DESC) AS rownum FROM tmp_topn) WHERE rownum <= 10
流式 TopN 不只在語法以及算法上會遇到不少挑戰,在不一樣場景下的優化方案也是個很是有意思的話題。目前 Flink SQL 的 TopN 功能已經大量應用於彩票業務、阿里雲的CDN項目、WAF項目等等。將來,咱們除了會針對更多的場景對 TopN 進行優化,還會提供除了 ROW_NUMBER 外的 RANK、RANK_DENSE 排名函數,使得 TopN 更加靈活