做者:徐懷宇git
在 SQL 中,聚合操做對一組值執行計算,並返回單個值。TiDB 實現了 2 種聚合算法:Hash Aggregation 和 Stream Aggregation。github
咱們首先以 AVG
函數爲例(案例參考 Stack Overflow),簡述這兩種算法的執行原理。算法
假設表 t
以下:sql
列 a | 列 b |
---|---|
1 | 9 |
1 | -8 |
2 | -7 |
2 | 6 |
1 | 5 |
2 | 4 |
SQL: select avg(b) from t group by a
, 要求將表 t
的數據按照 a
的值分組,對每一組的 b
值計算平均值。無論 Hash 仍是 Stream 聚合,在 AVG
函數的計算過程當中,咱們都須要維護 2 箇中間結果變量 sum
和 count
。Hash 和 Stream 聚合算法的執行原理以下。express
在 Hash Aggregate 的計算過程當中,咱們須要維護一個 Hash 表,Hash 表的鍵爲聚合計算的 Group-By
列,值爲聚合函數的中間結果 sum
和 count
。在本例中,鍵爲 列 a
的值,值爲 sum(b)
和 count(b)
。數組
計算過程當中,只須要根據每行輸入數據計算出鍵,在 Hash 表中找到對應值進行更新便可。對本例的執行過程模擬以下。併發
輸入數據 a b |
Hash 表 [key] (sum, count) |
---|---|
1 9 | [1] (9, 1) |
1 -8 | [1] (1, 2) |
2 -7 | [1] (1, 2) [2] (-7, 1) |
2 6 | [1] (1, 2) [2] (-1, 2) |
1 5 | [1] (6, 3) [2] (-1, 2) |
2 4 | [1] (6, 3) [2] (3, 3) |
輸入數據輸入完後,掃描 Hash 表並計算,即可以獲得最終結果:分佈式
Hash 表 | avg(b) |
---|---|
[1] (6, 3) |
2 |
[2] (3, 3) |
1 |
Stream Aggregate 的計算須要保證輸入數據按照 Group-By
列有序。在計算過程當中,每當讀到一個新的 Group 的值或全部數據輸入完成時,便對前一個 Group 的聚合最終結果進行計算。函數
對於本例,咱們首先對輸入數據按照 a
列進行排序。排序後,本例執行過程模擬以下。性能
輸入數據 | 是否爲新 Group 或全部數據輸入完成 | (sum, count) |
avg(b) |
---|---|---|---|
1 9 | 是 | (1, 9) | 前一個 Group 爲空,不進行計算 |
1 -8 | 否 | (2, 1) | |
1 5 | 否 | (3, 6) | |
2 -7 | 是 | (1, -7) | 2 |
2 6 | 否 | (2, -1) | |
2 4 | 否 | (3, 3) | |
是 | 1 |
由於 Stream Aggregate 的輸入數據須要保證同一個 Group 的數據連續輸入,因此 Stream Aggregate 處理完一個 Group 的數據後能夠馬上向上返回結果,不用像 Hash Aggregate 同樣須要處理完全部數據後才能正確的對外返回結果。當上層算子只須要計算部分結果時,好比 Limit,當獲取到須要的行數後,能夠提早中斷 Stream Aggregate 後續的無用計算。
當 Group-By
列上存在索引時,由索引讀入數據能夠保證輸入數據按照 Group-By
列有序,此時同一個 Group 的數據連續輸入 Stream Aggregate 算子,能夠避免額外的排序操做。
因爲分佈式計算的須要,TiDB 對於聚合函數的計算階段進行劃分,相應定義了 5 種計算模式:CompleteMode,FinalMode,Partial1Mode,Partial2Mode,DedupMode。不一樣的計算模式下,所處理的輸入值和輸出值會有所差別,以下表所示:
AggFunctionMode | 輸入值 | 輸出值 |
---|---|---|
CompleteMode | 原始數據 | 最終結果 |
FinalMode | 中間結果 | 最終結果 |
Partial1Mode | 原始數據 | 中間結果 |
Partial2Mode | 中間結果 | 進一步聚合的中間結果 |
DedupMode | 原始數據 | 去重後的原始數據 |
以上文提到的 select avg(b) from t group by a
爲例,經過對計算階段進行劃分,能夠有多種不一樣的計算模式的組合,如:
CompleteMode
此時 AVG
函數的整個計算過程只有一個階段,如圖所示:
Partial1Mode --> FinalMode
此時咱們將 AVG
函數的計算過程拆成兩個階段進行,如圖所示:
除了上面的兩個例子外,還可能有以下的幾種計算方式:
聚合被下推到 TiKV 上進行計算(Partial1Mode),並返回通過預聚合的中間結果。爲了充分利用 TiDB server 所在機器的 CPU 和內存資源,加快 TiDB 層的聚合計算,TiDB 層的聚合函數計算能夠這樣進行:Partial2Mode --> FinalMode。
當聚合函數須要對參數進行去重,也就是包含 DISTINCT
屬性,且聚合算子由於一些緣由不能下推到 TiKV 時,TiDB 層的聚合函數計算能夠這樣進行:DedupMode --> Partial1Mode --> FinalMode。
聚合函數分爲幾個階段執行, 每一個階段對應的模式是什麼,是否要下推到 TiKV,使用 Hash 仍是 Stream 聚合算子等都由優化器根據數據分佈、估算的計算代價等來決定。
構建邏輯執行計劃 時,會調用 NewAggFuncDesc 將聚合函數的元信息封裝爲一個 AggFuncDesc。 其中 AggFuncDesc.RetTp
由 AggFuncDesc.typeInfer 根據聚合函數類型及參數類型推導而來;AggFuncDesc.Mode
統一初始化爲 CompleteMode。
構建物理執行計劃時,PhysicalHashAgg
和 PhysicalStreamAgg
的 attach2Task
方法會根據當前 task
的類型嘗試進行下推聚合計算,若是 task
類型知足下推的基本要求,好比 copTask
,接着會調用 newPartialAggregate 嘗試將聚合算子拆成 TiKV 上執行的 Partial 算子和 TiDB 上執行的 Final
算子,其中 AggFuncToPBExpr 函數用來判斷某個聚合函數是否能夠下推。若聚合函數能夠下推,則會在 TiKV 中進行預聚合並返回中間結果,所以須要將 TiDB 層執行的 Final
聚合算子的 AggFuncDesc.Mode
修改成 FinalMode,並將其 AggFuncDesc.Args
修改成 TiKV 預聚合後返回的中間結果,TiKV 層的 Partial 聚合算子的 AggFuncDesc
也須要做出對應的修改,這裏再也不詳述。若聚合函數不能夠下推,則 AggFuncDesc.Mode
保持不變。
構建 HashAgg 執行器時,首先檢查當前 HashAgg
算子是否能夠並行執行。目前當且僅當兩種狀況下 HashAgg
不能夠並行執行:
DISTINCT
的狀況目前僅能單線程執行。tidb_hashagg_partial_concurrency
和 tidb_hashagg_final_concurrency
被同時設置爲 1 時。這兩個系統變量分別用來控制 Hash Aggregation 並行計算時候,TiDB 層聚合計算 partial 和 final 階段 worker 的併發數。當它們都被設置爲 1 時,選擇單線程執行。若 HashAgg
算子能夠並行執行,使用 AggFuncDesc.Split 根據 AggFuncDesc.Mode
將 TiDB 層的聚合算子的計算拆分爲 partial 和 final 兩個階段,並分別生成對應的 AggFuncDesc
,設爲 partialAggDesc
和 finalAggDesc
。若 AggFuncDesc.Mode == CompleteMode
,則將 TiDB 層的計算階段拆分爲 Partial1Mode --> FinalMode
;若 AggFuncDesc.Mode == FinalMode
,則將 TiDB 層的計算階段拆分爲 Partial2Mode --> FinalMode
。進一步的,咱們能夠根據 partialAggDesc
和 finalAggDesc
分別 構造出對應的執行函數。
TiDB 的並行 Hash Aggregation 算子執行過程當中的主要線程有:Main Thead,Data Fetcher,Partial Worker,和 Final Worker:
Hash Aggregation 的執行階段可分爲以下圖所示的 5 步:
啓動 Data Fetcher,Partial Workers 及 Final Workers。
這部分工做由 prepare4Parallel 函數完成。該函數會啓動一個 Data Fetcher,多個 Partial Worker 以及 多個 Final Worker。Partial Worker 和 Final Worker 的數量能夠分別經過 tidb_hashgg_partial_concurrency
和 tidb_hashagg_final_concurrency
系統變量進行控制,這兩個系統變量的默認值都爲 4。
DataFetcher 讀取子節點的數據並分發給 Partial Workers。
這部分工做由 fetchChildData 函數完成。
Partial Workers 預聚合計算,及根據 Group Key shuffle 給對應的 Final Workers。
這部分工做由 HashAggPartialWorker.run 函數完成。該函數調用 updatePartialResult 函數對 DataFetcher 發來數據執行 預聚合計算,並將預聚合結果存儲到 partialResultMap 中。其中 partialResultMap
的 key 爲根據 Group-By
的值 encode 的結果,value 爲 PartialResult 類型的數組,數組中的每一個元素表示該下標處的聚合函數在對應 Group 中的預聚合結果。shuffleIntermData 函數完成根據 Group 值 shuffle 給對應的 Final Worker。
Final Worker 計算最終結果,發送給 Main Thread。
這部分工做由 HashAggFinalWorker.run 函數完成。該函數調用 consumeIntermData 函數 接收 PartialWorkers 發送來的預聚合結果,進而 合併 獲得最終結果。getFinalResult 函數完成發送最終結果給 Main Thread。
Main Thread 接收最終結果並返回。
此處以 TPC-H query-17 爲例,測試並行 Hash Aggregation 相較於單線程計算時的性能提高。引入並行 Hash Aggregation 前,它的計算瓶頸在 HashAgg_35
。
該查詢執行計劃以下:
在 TiDB 中,使用 EXPLAIN ANALYZE 能夠獲取 SQL 的執行統計信息。因篇幅緣由此處僅貼出 TPC-H query-17 部分算子的 EXPLAIN ANALYZE 結果。
HashAgg
單線程計算時:
查詢總執行時間 23 分 24 秒,其中 HashAgg
執行時間約 17 分 9 秒。
HashAgg
並行計算時(此時 TiDB 層 Partial 和 Final 階段的 worker 數量都設置爲 16):
總查詢時間 8 分 37 秒,其中 HashAgg
執行時間約 1 分 4 秒。
並行計算時,Hash Aggregation 的計算速度提高約 16 倍。