Apache Calcite 簡介

1. 什麼是Apache Calcite ?

Apache Calcite 是一款開源SQL解析工具, 能夠將各類SQL語句解析成抽象語法術AST(Abstract Syntax Tree), 以後經過操做AST就能夠把SQL中所要表達的算法與關係體如今具體代碼之中。前端

Calcite的生前爲Optiq(也爲Farrago), 爲Java語言編寫, 經過十多年的發展, 在2013年成爲Apache旗下頂級項目,並還在持續發展中, 該項目的創始人爲Julian Hyde, 其擁有多年的SQL引擎開發經驗, 目前在Hortonworks工做, 主要負責Calcite項目的開發與維護。java

目前, 使用Calcite做爲SQL解析與處理引擎有Hive
、Drill、Flink、Phoenix、Phoenix和Storm,能夠確定的是還會有愈來愈多的數據處理引擎採用Caclite做爲SQL解析工具。node

2. Calcite 主要功能

總結來講Calcite有如下主要功能:git

  • SQL 解析
  • SQL 校驗
  • 查詢優化
  • SQL 生成器
  • 數據鏈接

3. Calcite 解析SQl的步驟

11825694-db566ba259960fc1.png
Calcite 解析步驟

如上圖中所述,通常來講Calcite解析SQL有如下幾步:github

  • Parser. 此步中Calcite經過Java CC將SQL解析成未經校驗的AST
  • Validate. 該步驟主要做用是校證Parser步驟中的AST是否合法,如驗證SQL scheme、字段、函數等是否存在; SQL語句是否合法等. 此步完成以後就生成了RelNode樹(關於RelNode樹, 請參考下文)
  • Optimize. 該步驟主要的做用優化RelNode樹, 並將其轉化成物理執行計劃。主要涉及SQL規則優化如:基於規則優化(RBO)及基於代價(CBO)優化; Optimze 這一步原則上來講是可選的, 經過Validate後的RelNode樹已經能夠直接轉化物理執行計劃,但現代的SQL解析器基本上都包括有這一步,目的是優化SQL執行計劃。此步獲得的結果爲物理執行計劃。
  • Execute,即執行階段。此階段主要作的是:將物理執行計劃轉化成可在特定的平臺執行的程序。如Hive與Flink都在在此階段將物理執行計劃CodeGen生成相應的可執行代碼。

4. Calcite相關組件

Calcite主要有如下概念:算法

  • Catalog: 主要定義SQL語義相關的元數據與命名空間。
  • SQL parser: 主要是把SQL轉化成AST.
  • SQL validator: 經過Catalog來校證AST.
  • Query optimizer: 將AST轉化成物理執行計劃、優化物理執行計劃.
  • SQL generator: 反向將物理執行計劃轉化成SQL語句.

4.1 catagory

Catalog:主要定義被SQL訪問的命名空間,主要包括如下幾點:sql

  1. schema: 主要定義schema與表的集合,schame 並非強制必定須要的,好比說有兩張同名的表T1, T2,就須要schema要區分這兩張表,如A.T1, B.T1
  2. 表:對應關係數據庫的表,表明一類數據,在calcite中由RelDataType定義
  3. RelDataType 表明表的數據定義,如表的數據列名稱、類型等。

Schema:數據庫

public interface Schema {
  
  Table getTable(String name);

  Set<String> getTableNames();

  Set<String> getFunctionNames();

  Schema getSubSchema(String name);

  Set<String> getSubSchemaNames();
  
  Expression getExpression(SchemaPlus parentSchema, String name);
  
  boolean isMutable();

Table:apache

public interface Table {
  
  RelDataType getRowType(RelDataTypeFactory typeFactory);

  Statistic getStatistic();
  
  Schema.TableType getJdbcTableType();
}

其中RelDataType表明Row的數據類型, Statistic 用於統計表的相關數據、特別是在CBO用於計表計算表的代價。編程

一句Sql

selcct id, name, cast(age as bigint) from A.INFO
  • id, name則爲data type field
  • bigint爲 data type
  • A 爲schema
  • INFO 爲表

4.2 SQL Parser

由Java CC編寫,將SQL轉化成AST.

  • Java CC 指的是Java Compiler Compiler, 能夠將一種特定域相關的語言轉化成Java語言
  • 在Calcite中將標記(Token)表示爲 SqlNode, 而且Sqlnode能夠經過unparse方法反向轉化成SQL
cast(id as float)

Java CC 可表示爲

<CAST>
<LPAREN>
e = Expression(ExprContext.ACCEPT_SUBQUERY)
<AS>
dt = DataType() {agrs.add(dt);}
<RPAREN>
....

4.3 Query Optimizer

首先看一下

INSERT INTO tmp_node
SELECT s1.id1, s1.id2, s2.val1
FROM source1 as s1 INNER JOIN source2 AS s2
ON s1.id1 = s2.id1 and s1.id2 = s2.id2 where s1.val1 > 5 and s2.val2 = 3;

經過Calcite轉化爲:

LogicalTableModify(table=[[TMP_NODE]], operation=[INSERT], flattened=[false])
  LogicalProject(ID1=[$0], ID2=[$1], VAL1=[$7])
    LogicalFilter(condition=[AND(>($2, 5), =($8, 3))])
      LogicalJoin(condition=[AND(=($0, $5), =($1, $6))], joinType=[INNER])
        LogicalTableScan(table=[[SOURCE1]])
        LogicalTableScan(table=[[SOURCE2]])

是未經優化的RelNode樹,能夠發現最底層是TableScan,也是讀取表的原始數據,緊接着是LogicalJoin,Joiner的類型爲INNER JOIN, LogicalJoin以後接下作LogicalFilter 操做,對應SQL中的WHERE條件,最後作Project也就是投影操做。

可是咱們能夠觀察到對於INNER JOIN而言, WHERE 條件是能夠下推,如

LogicalTableModify(table=[[TMP_NODE]], operation=[INSERT], flattened=[false])
  LogicalProject(ID1=[$0], ID2=[$1], VAL1=[$7])
      LogicalJoin(condition=[AND(=($0, $5), =($1, $6))], joinType=[inner])
        LogicalFilter(condition=[=($4, 3)])  
          LogicalProject(ID1=[$0], ID2=[$1],      ID3=[$2], VAL1=[$3], VAL2=[$4],VAL3=[$5])
            LogicalTableScan(table=[[SOURCE1]])
        LogicalFilter(condition=[>($3,5)])    
          LogicalProject(ID1=[$0], ID2=[$1], ID3=[$2], VAL1=[$3], VAL2=[$4],VAL3=[$5])
            LogicalTableScan(table=[[SOURCE2]])

這樣能夠減小JOIN的數據量,提升SQL效率

實際過程當中能夠將JOIN 的中條件下推以較少Join的數據量

INSERT INTO tmp_node
SELECT s1.id1, s1.id2, s2.val1
FROM source1 as s1 LEFT JOIN source2 AS s2
ON s1.id1 = s2.id1 and s1.id2 = s2.id2 and s1.id3 = 5

s1.id3 = 5 這個條件能夠先下推過濾s1中的數據, 但在特定場景下,有些不能下推,以下sql:

INSERT INTO tmp_node
SELECT s1.id1, s1.id2, s2.val1
FROM source1 as s1 LEFT JOIN source2 AS s2
ON s1.id1 = s2.id1 and s1.id2 = s2.id2 and s2.id3 = 5

若是s1,s2是流式表(動態表,請參考Flink流式概念)的話,就不能下推,由於s1下推的話,因爲過濾後沒有數據驅動join操做,於是得不到想要的結果(詳見Flink/Sparking-Streaming)

那接下來咱們可能有一個疑問,在什麼狀況下能夠作相似下推、上推操做,又是根據什麼原則進行的呢?以下圖所示

11825694-685fac45369e303c.png
不一樣的JOIN順序
T1 JOIN T2 JOIN T3

相似於此種狀況JOIN的順序是上圖的前者仍是後者?這就涉及到Optimizer所使用的方法,Optimizer主要目的就是減少SQL所處理的數據量、減小所消耗的資源並最大程度提升SQL執行效率如:剪掉無用的列、合併投影、子查詢轉化成JOIN、JOIN重排序、下推投影、下推過濾等。目前主要有兩類優化方法:基於語法(RBO)與基於代價(CBO)的優化

  1. RBO(Rule Based Optimization)

通俗一點的話就是事先定義一系列的規則,而後根據這些規則來優化執行計劃。

  • ProjectFilterRule

    此Rule的使用場景爲Filter在Project之上,能夠將Filter下推。假如某一個RelNode樹

LogicalFilter
      LogicalProject
        LogicalTableScan

則可優化成

LogicalProject
      LogicalFilter
        LogicalTableScan
  • FilterJoinRule

    此Rule的使用場景爲Filter在Join之上,能夠先作Filter而後再作Join, 以減小Join的數量

等等,還有不少相似的規則。但RBO必定程度上是經驗試的優化方法,沒法有一個公式上的判斷哪一種優化更優。 在Calcite中實現方法爲 HepPlanner

  1. CBO(Cost Based Optimization)

通俗一點的說法是:經過某種算法計算SQL全部可能的執行計劃的「代價」,選擇某一個代價較低的執行計劃,如上文中三張表做JOIN, 通常來講RBO沒法判斷哪一種執行計劃優化更好,只有分別計算每一種JOIN方法的代價。

Calcite會將每一種操做(如LogicaJoin、LocialFilter、 LogicalProject、LogicalScan) 結合實際的Schema轉化成具體的代價數,比較不一樣的執行計劃所具備的代價,而後選擇相對小計劃做爲最終的結果,之因此說相對小,這是由於若是要徹底遍歷計算全部可能的代價可能得不償失,花費更多的人力與資源,所以只是說選擇相對最優的執行計劃。CBO目的是「避免使用最差的執行計劃,而不是找到最好的」

目前Calcite中就是採用CBO進行優化,實現方法爲VolcanoPlanner,有關此算法的具體內容能夠參考原碼

5. 如何使用Calcite

因爲Calcite是Java語言編寫,所以只須要在工程或項目中引入相應的Jar包便可,下面爲一個能夠運行的例子:

public class TestOne {
    public static class TestSchema {
        public final Triple[] rdf = {new Triple("s", "p", "o")};
    }

    public static void main(String[] args) {
        SchemaPlus schemaPlus = Frameworks.createRootSchema(true);
        
        //給schema T中添加表
        schemaPlus.add("T", new ReflectiveSchema(new TestSchema()));
        Frameworks.ConfigBuilder configBuilder = Frameworks.newConfigBuilder();
        //設置默認schema
        configBuilder.defaultSchema(schemaPlus);

        FrameworkConfig frameworkConfig = configBuilder.build();

        SqlParser.ConfigBuilder paresrConfig = SqlParser.configBuilder(frameworkConfig.getParserConfig());
        
        //SQL 大小寫不敏感
        paresrConfig.setCaseSensitive(false).setConfig(paresrConfig.build());

        Planner planner = Frameworks.getPlanner(frameworkConfig);

        SqlNode sqlNode;
        RelRoot relRoot = null;
        try {
            //parser階段
            sqlNode = planner.parse("select \"a\".\"s\", count(\"a\".\"s\") from \"T\".\"rdf\" \"a\" group by \"a\".\"s\"");
            //validate階段
            planner.validate(sqlNode);
            //獲取RelNode樹的根
            relRoot = planner.rel(sqlNode);
        } catch (Exception e) {
            e.printStackTrace();
        }

        RelNode relNode = relRoot.project();
        System.out.print(RelOptUtil.toString(relNode));
    }
}

類Triple 對應的表定義:

public class Triple {
    public String s;
    public String p;
    public String o;

    public Triple(String s, String p, String o) {
        super();
        this.s = s;
        this.p = p;
        this.o = o;
    }

}

詳細能夠代碼在這裏

6. Calcite 其它方面

Calcite的功能遠不止以上介紹,除了標準SQL的,還支持如下內容:

  • 對流相對概念支持,如在SQL層面支持Window概念,如Session Window, Hopping Window等。
  • 支持物化視圖等複雜概念。
  • 獨立於編程語言和數據源,能夠支持不一樣的前端和後端。

7. 總結

以上內容主要介紹上Calcite相關概念並經過相例子說明了Calcite使用方法, 但願經過上述內容,讀者能對Calcite有初步的瞭解。

因爲筆者使用和探索Calcite時間也不長,以上內容不免有錯誤與不許確之處,還望各位讀者不吝指正,相互學習。

參考文獻與網址:

  1. http://hbasefly.com/2017/05/04/bigdata%EF%BC%8Dcbo/
  2. http://www.infoq.com/cn/articles/new-big-data-hadoop-query-engine-apache-calcite