Flink table&Sql中使用Calcite

 

Apache Calcite是什麼東東

Apache Calcite面向Hadoop新的sql引擎,它提供了標準的SQL語言、多種查詢優化和鏈接各類數據源的能力。除此以外,Calcite還提供了OLAP和流處理的查詢引擎。它2013年成爲了Apache孵化項目以來,在Hadoop中愈來愈引人注目,並被衆多項目集成。好比Flink/Storm/Drill/Phoenix都依賴它作sql解析和優化。html

Flink 結合 Calcite

Flink Table API&SQL 爲流式數據和靜態數據的關係查詢保留統一的接口,並且利用了Calcite的查詢優化框架和SQL parser。該設計是基於Flink已構建好的API構建的,DataStream API 提供低延時高吞吐的流處理能力並且就有exactly-once語義並且能夠基於event-time進行處理。並且DataSet擁有穩定高效的內存算子和流水線式的數據交換。Flink的core API和引擎的全部改進都會自動應用到Table API和SQL上。
一條stream sql從提交到calcite解析、優化最後到flink引擎執行,通常分爲如下幾個階段:java

1 1. Sql Parser: 將sql語句經過java cc解析成AST(語法樹),在calcite中用SqlNode表示AST;
2 2. Sql Validator: 結合數字字典(catalog)去驗證sql語法;
3 3. 生成Logical Plan: 將sqlNode表示的AST轉換成LogicalPlan, 用relNode表示;
4 4. 生成 optimized LogicalPlan: 先基於calcite rules 去優化logical Plan,
5 再基於flink定製的一些優化rules去優化logical Plan;
6 5. 生成Flink PhysicalPlan: 這裏也是基於flink裏頭的rules將,將optimized LogicalPlan轉成成Flink的物理執行計劃;
7 6. 將物理執行計劃轉成Flink ExecutionPlan: 就是調用相應的tanslateToPlan方法轉換和利用CodeGen元編程成Flink的各類算子。

而若是是經過table api來提交任務的話,也會通過calcite優化等階段,基本流程和直接運行sql相似:sql

1. table api parser: flink會把table api表達的計算邏輯也表示成一顆樹,用treeNode去表式;
在這棵樹上的每一個節點的計算邏輯用Expression來表示。
2. Validate: 會結合數字字典(catalog)將樹的每一個節點的Unresolved Expression進行綁定,生成Resolved Expression;
3. 生成Logical Plan: 依次遍歷數的每一個節點,調用construct方法將原先用treeNode表達的節點轉成成用calcite 內部的數據結構relNode 來表達。即生成了LogicalPlan, 用relNode表示;
4. 生成 optimized LogicalPlan: 先基於calcite rules 去優化logical Plan,
再基於flink定製的一些優化rules去優化logical Plan;
5. 生成Flink PhysicalPlan: 這裏也是基於flink裏頭的rules將,將optimized LogicalPlan轉成成Flink的物理執行計劃;
6. 將物理執行計劃轉成Flink ExecutionPlan: 就是調用相應的tanslateToPlan方法轉換和利用CodeGen元編程成Flink的各類算子。

因此在flink提供兩種API進行關係型查詢,Table API 和 SQL。這兩種API的查詢都會用包含註冊過的Table的catalog進行驗證,除了在開始階段從計算邏輯轉成logical plan有點差異之外,以後都差很少。同時在stream和batch的查詢看起來也是徹底同樣。只不過flink會根據數據源的性質(流式和靜態)使用不一樣的規則進行優化, 最終優化後的plan轉傳成常規的Flink DataSet 或 DataStream 程序。因此咱們下面統一用table api來舉例講解flink是如何用calcite作解析優化,再轉換成回DataStream。編程

Table api任務的解析執行過程

Table Example

 1 // set up execution environment 
 2 val env = StreamExecutionEnvironment.getExecutionEnvironment
 3 val tEnv = TableEnvironment.getTableEnvironment(env) 
 4 //定義數據源 
 5 val dataStream = env.fromCollection(Seq( Order(1L, "beer", 3), Order(1L, "diaper", 4), Order(3L, "rubber", 2))) 
 6 //將DataStream 轉換成 table,就是將數據源在TableEnvironment中註冊成表 
 7 val orderA = dataStream.toTable(tEnv) 
 8 //用table api執行業務邏輯, 生成tab裏頭包含了flink 本身的logicalPlan,用LogicalNode表示 
 9 val tab = orderA.groupBy('user).select('user, 'amount.sum) 
10       .filter('user < 2L) 
11 //將table轉成成DataStream, 這裏頭就是涉及到咱們calcite邏輯計劃生成 
12 // 優化、轉成可可執行的flink 算子等過程 
13 val result = tab.toDataStream[Order]

將數據源註冊成表

將DataStream 轉換成table的過程,其實就是將DataStream在TableEnvironment中註冊成表的過程當中,主要是經過調用tableEnv.fromDataStream方法完成。api

1 // 生成一個惟一性表名 val name = createUniqueTableName() 
2 //生成表的 scheme val (fieldNames, fieldIndexes) = getFieldInfo[T](dataStream.getType) 
3 //傳入dataStream, 建立calcite能夠識別的表 
4 val dataStreamTable = new DataStreamTable[T]( 
5       dataStream, 
6       fieldIndexes, 
7       fieldNames, None, None ) 
8 //在數字字典裏頭註冊該表 registerTableInternal(name, dataStreamTable)

上面函數實現的最後會調用scan,這裏頭會建立一個CatalogNode對象,裏頭攜帶了能夠查找到數據源的表路徑。其實它是Flink 邏輯樹上的一個葉節點。數據結構

生成Flink 自身的邏輯計劃

1 val tab = orderA.groupBy('user).select('user, 'amount.sum)
2       .filter('user < 2L) 

上面每次調用table api,就會生成Flink 邏輯計劃的節點。好比grouBy和select的調用會生成節點Project、Aggregate、Project,而filter的調用會生成節點Filter。這些節點的邏輯關係,就會組成下圖的一個Flink 自身數據結構表達的一顆邏輯樹:app

由於這個例子很簡單,節點都沒有兩個子節點。這裏的實現可能有的人會奇怪,filter函數的形參類型是Expression,而咱們傳進去的是"'user<2L",是否是不對呀? 其實這是scala比較牛逼的特性:隱式轉換,這些傳遞的表達式會先自動轉換成Expression。這些隱式轉換的定義基本都在接口類ImplicitExpressionOperations裏頭。其中user前面定義的'符號,則scala會將user字符串轉化成Symbol類型。經過隱式轉換"'user<2L"表示式會生成一個LessThan對象,它會有兩個孩子Expression,分別是UnresolvedFieldReference("user")和Liter("2")。這個LessThan對象會做爲Filter對象的condition。框架

Flink 自身的邏輯計劃 轉換成calcite可識別的邏輯計劃

根據上面分析咱們只是生成了Flink的 logical Plan,咱們必須將它轉換成calcite的logical Plan,這樣咱們才能用到calcite強大的優化規則。在Flink裏頭會由上往下一次調用各個節點的construct方法,將Flink節點轉換成calcite的RelNode節點。ide

 1 //-----Filter的construct建立Calcite 的 LogicalFilter節點---- 
 2     //先遍歷子節點 
 3     child.construct(relBuilder) 
 4     //建立LogicalFilter 
 5     relBuilder.filter(condition.toRexNode(relBuilder)) 
 6     
 7 //-----Project的construct建立Calcite的LogicalProject節點---- 
 8    //先遍歷子節點 
 9     child.construct(relBuilder) 
10    //建立LogicalProject 
11     relBuilder.project( 
12       projectList.map(_.toRexNode(relBuilder)).asJava, 
13       projectList.map(_.name).asJava, 
14       true) 
15       
16 //-----Aggregate的construct建立Calcite的LogicalAggregate節點---- 
17     child.construct(relBuilder) 
18     relBuilder.aggregate( 
19   relBuilder.groupKey(groupingExpressions.map(_.toRexNode(relBuilder)).asJava), 
20       aggregateExpressions.map { 
21         case Alias(agg: Aggregation, name, _) => agg.toAggCall(name)(relBuilder) 
22         case _ => throw new RuntimeException("This should never happen.") 
23       }.asJava) 
24   
25 //-----CatalogNode的construct建立Calcite的LogicalTableScan節點---- 
26     relBuilder.scan(tablePath.asJava)

經過以上轉換後,就生成了Calcite邏輯計劃:函數

優化邏輯計劃並轉換成Flink的物理計劃

這部分實現Flink統一封裝在optimize方法裏頭,這個方法具體的實現以下:

 1 // 去除關聯子查詢 
 2     val decorPlan = RelDecorrelator.decorrelateQuery(relNode) 
 3     // 轉換time的標識符,好比存在rowtime標識的話,咱們將會引入TimeMaterializationSqlFunction operator, 
 4     //這個operator咱們會在codeGen中會用到 
 5     val convPlan = RelTimeIndicatorConverter.convert(decorPlan, getRelBuilder.getRexBuilder) 
 6     // 規範化logica計劃,好比一個Filter它的過濾條件都是true的話,那麼咱們能夠直接將這個filter去掉 
 7     val normRuleSet = getNormRuleSet
 8     val normalizedPlan = if (normRuleSet.iterator().hasNext) { 
 9       runHepPlanner(HepMatchOrder.BOTTOM_UP, normRuleSet, convPlan, convPlan.getTraitSet) 
10     } else { 
11       convPlan
12     } 
13     // 優化邏輯計劃,調整節點間的上下游到達優化計算邏輯的效果,同時將 
14     //節點轉換成派生於FlinkLogicalRel的節點 
15     val logicalOptRuleSet = getLogicalOptRuleSet
16     //用FlinkConventions.LOGICAL替換traitSet,表示轉換後的樹節點要求派生與接口 
17     // FlinkLogicalRel 
18     val logicalOutputProps = relNode.getTraitSet.replace(FlinkConventions.LOGICAL).simplify() 
19     val logicalPlan = if (logicalOptRuleSet.iterator().hasNext) { 
20       runVolcanoPlanner(logicalOptRuleSet, normalizedPlan, logicalOutputProps) 
21     } else { 
22       normalizedPlan
23     } 
24     // 將優化後的邏輯計劃轉換成Flink的物理計劃,同時將 
25     //節點轉換成派生於DataStreamRel的節點 
26     val physicalOptRuleSet = getPhysicalOptRuleSet
27     val physicalOutputProps = relNode.getTraitSet.replace(FlinkConventions.DATASTREAM).simplify() 
28     val physicalPlan = if (physicalOptRuleSet.iterator().hasNext) { 
29       runVolcanoPlanner(physicalOptRuleSet, logicalPlan, physicalOutputProps) 
30     } else { 
31       logicalPlan
32     }

這段涉及到多個階段,每一個階段無非都是用Rule對邏輯計劃進行優化和改進。每一個Rule的邏輯你們本身去看,若是我想本身自定義一個Rule該如何作呢?首先聲明定義於派生RelOptRule的一個類,而後再構造函數中要求傳入RelOptRuleOperand對象,該對象須要傳入你這個Rule將要匹配的節點類型。若是你的自定義的Rule只用於LogicalTableScan節點,那麼你這個operand對象應該是operand(LogicalTableScan.class, any())。就像這樣同樣

 1 public class TableScanRule extends RelOptRule { 
 2   //~ Static fields/initializers --------------------------------------------- 
 3   public static final TableScanRule INSTANCE = new TableScanRule(); 
 4   //~ Constructors ----------------------------------------------------------- 
 5   private TableScanRule() { 
 6     super(operand(LogicalTableScan.class, any())); 
 7   } 
 8   //默認返回True, 能夠繼承matches,裏面實現邏輯是判斷是否進行轉換調用onMatch 
 9   @Override 
10   public boolean matches(RelOptRuleCall call) { 
11     return super.matches(call); 
12   } 
13   //~ Methods ---------------------------------------------------------------- 
14   //對當前節點進行轉換 
15   public void onMatch(RelOptRuleCall call) { 
16     final LogicalTableScan oldRel = call.rel(0); 
17     RelNode newRel = 
18         oldRel.getTable().toRel( 
19             RelOptUtil.getContext(oldRel.getCluster())); 
20     call.transformTo(newRel); 
21   } 
22 }
經過以上代碼對邏輯計劃進行了優化和轉換,最後會將邏輯計劃的每一個節點轉換成Flink Node,既可物理計劃。整個轉換過程最後的結果以下:
1 == Optimized pyhical Plan == DataStreamGroupAggregate(groupBy=[user], select=[user, SUM(amount) AS TMP_0])  
2         
3 DataStreamCalc(select=[user, amount], where=[<(user, 2)]) 
4         
5 DataStreamScan(table=[[_DataStreamTable_0]]) 

咱們發現Filter節點在樹結構中下移了,這樣對數據進行操做時如今過濾再作聚合,能夠減小計算量。

生成Flink 能夠執行的計劃

這一塊只要是遞歸調用各個節點DataStreamRel的translateToPlan方法,這個方法轉換和利用CodeGen元編程成Flink的各類算子。如今就至關於咱們直接利用Flink的DataSet或DataStream API開發的程序。整個流程的轉換大致就像這樣:

1 == Physical Execution Plan == 
2 Stage 1 : Data Source
3     content : collect elements with CollectionInputFormat
4 Stage 2 : Operator content : from: (user, product, amount) 
5         ship_strategy : REBALANCE 
6 Stage 3 : Operator content : where: (<(user, 2)), select: (user, amount) 
7             ship_strategy : FORWARD 
8 Stage 4 : Operator content : groupBy: (user), select: (user, SUM(amount) AS TMP_0) 
9                 ship_strategy : HASH

總結

不過這個樣例中忽略了流處理中最有趣的部分:window aggregate 和 join。這些操做如何用SQL表達呢?Apache Calcite社區提出了一個proposal來討論SQL on streams的語法和語義。社區將Calcite的stream SQL描述爲標準SQL的擴展而不是另外的 SQL-like語言。這有不少好處,首先,熟悉SQL標準的人可以在不學習新語法的狀況下分析流數據。靜態表和流表的查詢幾乎相同,能夠輕鬆地移植。此外,能夠同時在靜態表和流表上進行查詢,這和flink的願景是同樣的,將批處理看作特殊的流處理(批看做是有限的流)。最後,使用標準SQL進行流處理意味着有不少成熟的工具支持

 

此文轉載自http://blog.chinaunix.net/uid-29038263-id-5765791.html,感謝。

相關文章
相關標籤/搜索