Flink使用Calcite構造SQL引擎,那麼他們 是怎麼合做的? drill, hive,storm 和其餘的一干apache 大數據引擎也用calcite , 那麼對於同一個sql 語句(statement) , 不管複雜簡單與否,他們和Flink產生的執行計劃是否是同樣的? 若是不同, 區別是怎麼產生的? 應該在哪裏實施優化和發力?優化的手段和原則有那些,等等? 本文不會對calcite 面面作具到的介紹,重點是SQL執行計劃的優化框架,流程和策略, 對執行計劃進行優化是calcite 的主要業務。爲了有助於理解優化框架,對於必要的概念會有介紹, 好比關係,關係代數,關係演算,等價原理,謂詞邏輯等。java
Calcite 只支持於關係型數據模型(不支持層次,網狀,對象數據庫的模型), 那麼什麼是關係型數據庫呢 ? 建議讀一下引用5中的那本書 ,雖然我也沒讀完 。下面解釋一下一些比較容易混淆的概念 。程序員
關係:關係一詞來自離散數學裏的集合論,根據維基百科的的定義,給定任意集合A和B,若(笛卡爾乘積),則稱R爲從A到B的二元關係,特別在A=B時,稱R爲A上的二元關係。若是一個有N列的二維表, 每一列的取值範圍爲Ai 則該表是定義在A1x..Ai..xAn上的N元關係。可見,關係(Relation)是N元有序序列的集合。關係在數據庫的概念裏稱做表(Table),關係的每個有序序列叫作元組或行(Row), 元組的每個量叫份量或列(Column)。算法
關係模式(Schema):是對關係或表的描述。包括關係名稱(Table Name),列名以及列的定義域(Domain)。sql
關係模型(Model):指的是一系列關係模式的集合, 概念上對應數據庫。數據庫
維度(Dimension):一般是指列離散定義域的列。定義域上的每個值稱爲基(cardinality), 一個關係已經使用的全部的基的個數成爲基數 (cardinal number) , 也就是 distinct count , 也成 NDV (Number of Distinct Value) 。express
關係代數:是由 Edgar F. Codd提出一種利用具備良好語義的代數結構用於對數據建模和定義查詢的理論。代數結構是在一種或多種運算下封閉的一個或多個集合,那麼關係代數在閉合關係上的良好語義的運算的集合。通俗的說,關係代數是一種經過由代數運算和輸入關係組成的表達式來表達輸出關係的理論。 好比 圖-3中,SQL 產生的關係能夠多個樹形的表達式表示,樹的形狀和節點的排列順序表明運算的過程(從下到上) 。關係代數是面向過程的。
概念 | 解釋 | 例子 |
RelNode | 關係代數中的關係和運算的基類。RelNode 有不少繼承者, 見舉例。 | TableScan對應一個數據源的一個關係,Filter對應Filter運算 。Filter有一些系列的繼承者, 每個繼承者對應一個CallingConvention, 也對應一個優化階段的使用的數據結構 。 好比Join<-LogicalJoin<-FlinkLocalJoin<-FlinkBatchExecJoin, FlinkExecStreamJoin 。 |
RelDataType | 表明域,是關係中列的定義域 | 整數,日期,浮點數,定點數,字符串 |
RexNode | 表明Project裏Filter中表達式 | 好比下面關係裏一個份量(InputRef),一個常量值(Literal),一個或多個份量函數(RexCall, 加、減、乘、除、CAST 等) |
RelTrait | 表明關係運算的物理特性,這個應該是calcite 裏最讓人迷惑的概念了。但Trait, set, subset 是 volcanoPlanner 最依賴的概念。 關係的物理特性是跟關係代數沒有關係的一些特性,因此只有跟物理執行系統比較臨近的關係運算纔會有物理特性,好比FlinkBatchExecFilter 有物理特性, 它是被翻譯成FlinkJobGraph的輸入計劃的節點類型。而LogicalFilter觀徹底邏輯上的關係觀念,所以不會有任何物理特性。 Calcite 有三種類型的特性, 類型叫作TraitDef 。
|
最初的關係表達式中的join用calcite LocalJoin表示,在LOGICAL convention 中用FlinkLocalJoin, BATCH_PHYSICAL convention 中用 FlinkBatchExecHashJoin。一個在Flink優化的join節點會有隨着convention 改變可能有以下的變形。 LogicalJoin --> FlinkLogicalJoin--> FlinkBatchExecHashJoin。 |
AbstractConverter |
當關系表達式的 Convention 發生變化的時候, VolcanoPlanner 會在Relset 裏建立Relsubset 表明這種traits, 隨後建立AbstractConverter用於轉化成真正的表達式。 ExpendConventionRule 配備AbstractConverter, 將它轉化成合適的節點, 好比 BroadcastExchange, Sort 等 。 | 參考FlinkExpandConversionRule, calcite: AbstractConverter, Relset.addAbstractConverter。 |
RelOptRule | 全部的優化規則的基類, 它構造函數第一個參數就是關係運算的類型(好比, Join, Filter 等),(還有一些別的, 不展開了)。當Planner 遍歷表達式圖的每個節點時,他會調用匹配這個節點類型的規則 。除了匹配類型,VolcanoPlanner 還會給規則設置優先級,級別高的會別先調用。每個規則有兩個函數: matches() 繼續深度判斷改規則是否真的應該調用。 onMatch 執行規則實際的動做:根據測量增、減、改變、升級表達式的節點。 優化規則有的會經過元數據提供者查詢元數據信息,從而作相應的措施。有的不會。從規則的角度來看,planner 都是無區別的。 因此除了少許的例外(好比前面提到的ExpendConvensionRule),大部分均可以被HepPlanner 和 VocanoPlanner 調用的。 |
例子有不少, 好比有名的謂詞下推,子查詢替換,join-recorder, 常量替換等。google裏搜索一下, 會不少介紹 。 想看全面的, 請參考 org.apache.calcite.rel.rules裏面的規則, 或 FlinkBatchRuleSets.scala裏面的使用的規則。 |
RelMetadataProvider RelOptCost |
前文屢次提到的統計數據提供者,他是一個能handle不一樣統計類型數據的handler的集合。好比 RelMdRowCount是提供關係運算產生的rowCount估計, RelMdDistinctRowCount 提供某個列的cardinal number 估計 。RelMdSelectivity 提供關係運算後的rowcount 原來的比例 估計。 RelOptCost是代價模型, calcite 的代價模型是對關係的行數,一般是考慮IO(disk IO + network IO) 和CPU使用率, memory, 和 關係規模(rowcount)的一個綜合衡量。 |
Calcite 裏有DefaultRelMetadataProvider 提供了各類Handle 缺省計算方法。ReflectiveRelMetadataProvider 因爲接受DPS端實現的Provider , 好比 Flink裏實現的FlinkDefaultRelMetadataProvider . 還有一個JaninoRelMetadataProvider, 看起來是經過動態編譯的生成Provider ? Handler的元數據的估計算法請閱引用5的第13章。 |
Schema Table Lattice ,Tile |
Schema和table的概念全面說過。 Lattice(格) 是除了關係、關係代數以外,另外一個來自於數學領域的名詞。看起來calcite 是真的很想提高廣大數據程序員的數學格調。 格同關係代數同樣是一種代數結構(集合+一種二元關係),集合的成員的二元關係是反自反, 和傳遞的, 並且這個關係有明確的上下界, 則稱這種代數結構爲格。 很抽象, 能夠看引用8中的哈斯圖理解 。{ x, y, z }的冪集按包含偏序排序就是一個格。 這個和多維cube聚合計算的物化視圖的結構很像。 物化視圖裏的每個頂點都是一個tile , 最上面的tile 包含了全部的維度, 最下面的維度爲空, 維度集合以及包含關係組成了一個格。 格從上到下是是降維的過程,則低維聚合計算可由高維聚合導出 。因此lattice , Tile 是爲 物化視圖引入的, 只不過換了一個文藝的名字而已 。 物化視圖是一個預計算的結果,物化在硬盤或內存裏,若是把查詢計劃裏可以利用物化視圖,執行的很定會飛快 。 |
來自引用8apache |
HepVertex HepProgram HepPlanner |
HepVertex是HepProgram裏用於組成計劃圖的頂點。HepProgram 是一些優化規則的集合,HepPlanner 利用HepVertex創建將計劃樹轉化成計劃圖, 而後利用HepProgram按照必定順序遍歷其中的優化規則。 HepPlanner 調用流程如右側代碼所示。
|
//build program val builder = new HepProgramBuilder() builder .addMatchLimit(10) .addRuleInstance(SubQueryRemoveRule.FILTER) .addRuleInstance(SubQueryRemoveRule.JOIN) .addMatchOrder(HepMatchOrder.BOTTOM_UP) val hepProgram = builder.build() //create planner val planner = new HepPlanner(hepProgram, ...) //build new operator expression graph planner.setRoot(root) planner.changeTraits(desiredTraits) //apply rules in programs to optimize the operator expression planner.findBestExp
|
Relset RelSubset VolcanoProgram ValcanoPlanner |
RelSubset 表明一個目標物理特性, 好比BATCH_PHYSICAL.Broadcast.ANY, 表明一個BATCH_PHYSICAL convention 等價 子計劃,行發佈方式是廣播, 列排序方式爲任意。BATCH_PHYSICAL.Hash.ANY,是另爲一個subet 。 Relset 是全部具備不一樣物理特性的等價的RelSubset的集合這些, 好比BATCH_PHYSICAL.Broadcast.[]和 BATCH_PHYSICAL.Hash.[]是等價的。 Relset 仍是全部等價關係的集合,好比HashExchange+SortMergeJoin,HashExchange+HashJoin, BroadcastExchange+Hash, ASC sort + SingleExchange+Hash 都是等價關係表達式。 RelSubset 會在從等價關係集合裏選擇符合自身trait的最便宜的做爲他的best 。從上到下的best組成最終的計劃圖 。 VocanoPlanner 運行流程和HepPlanner相似。
|
HepPlanner 的尋優流程很簡單,setRoot重建planGraph, findBestExp 就是按照指定的順序將HelpProgram裏的規則觸發一遍。 若是擔憂有問題貪心算法的問題,能夠將這兩步多作幾回。網絡
VocanoPlanner 的尋優流程如前所述。 TraitSet 一般包含了 Convension, distribution, collation 三個維度 , 這三個維度不一樣的基的組成的組合(subset)都是等價的可是cost不相等,但並非代價把最低的subset 輸入給上游總代價就會最低的。最低的代價須要綜合考把慮上游和下游的狀況,尋找最搭配的搭檔。因此這個尋優過程須要一個動態規劃的方式來求解。在使用動態規劃(也就是遞歸的方法, volcanoPlanner 的命名就來自這裏吧 )求解以前, 咱們須要把各類可能的計劃的每一層的知足須要trait的subset, 以及對應關係求出來。 而知足要求的subset, 並且在考慮到輸入的組合,狀態轉移公式大概以下。數據結構
BestExp(subset)) = argmin( cost(rel1 ), cost(rel2), ... ) cost (rel ) = algoCost(rel.self) + cost(BestExp(rel.input1)) + cost(BestExp(rel.input2))
第一行中,rel1, rel2 是知足traits 的等價表達式,表示當前subet的最佳表達式是他們之中裏cost最低的表達式。架構
第二行中, 表示表達式的cost 等於最上層節點自身算法代價估計 和下層輸入subset的最佳表達式向上輸入的累計的綜合代價 。TableScan 的下層輸入就是磁盤IO的代價,底層關係的RowCount相關 。這裏的加號表明代價計算要綜合考慮的因素並非簡單的算數相加 。好比hashJion和sortMergeJoin自身算法的代價不同(CPU, 內存代價), join的兩路輸入方式不一樣代價不同(IO代價)。
SELECT * from store_sales join date_dim on store_sales.ss_sold_date_sk = date_dim.d_date_sk and date_dim.d_year=2002
1 |
BestExp(join.BATCH_PHYSICAL.ANY.[]) = argmin( cost(shuffledHashjoin), cost(broadcastHashJoin), cost(shuffledSortMergeJoin),... ) cost(broadcastHashJoin)= cost(BestExp(tablenscan_store_sales.BATCH_PHYSICAL.forward.[])) + cost(BestExp(tablenscan_date_dim.BATCH_PHYSICAL.broadcast.[])) + algoCost(HashJoin)
畫個圖表示一下 relSet, relSubset, 和besExp 還有trait之間的關係吧, 還有這個相似火山噴發的形狀。
序號 | 描述 | 連接 |
1 | Calcite 論文 | dl.acm.org/doi/10.1145/3183713.3190662 |
2 | Calcite 官方文檔 | calcite.apache.org/docs/ |
3 | 關係代數 | en.wikipedia.org/wiki/Relational_algebra |
4 | 關係演算 | https://en.wikipedia.org/wiki/Relational_calculus |
5 | 數據庫系統概念第6版 Abraham Silberschatz 等著 | |
6 | 官渡之戰 | https://zh.wikipedia.org/wiki/%E5%AE%98%E6%B8%A1%E4%B9%8B%E6%88%98 |
7 | SQL on everything, in memory by Julian Hyde | www.slideshare.net/julianhyde/calcite-stratany2014 |
8 | 哈斯圖 | zh.wikipedia.org/wiki/%E5%93%88%E6%96%AF%E5%9C%96 |