[Source Code Analysis] Walking You Through the Internal Execution Flow of Flink SQL / Table API

0x00 Abstract

This article outlines the internal implementation of Flink SQL / Table API and strings together the whole flow "from SQL statement to actual execution". It also provides as many call stacks as possible, so that when you hit a problem you know where to set breakpoints, and so that your understanding of the overall architecture becomes deeper.

Several of the important nodes involved in the SQL flow are illustrated below:

// NOTE: execution order is top to bottom; " -----> " indicates the type of instance that is produced
* 
*        +-----> "left outer JOIN" (SQL statement)
*        |   
*        |     
*     SqlParser.parseQuery // SQL parsing stage: generates the AST (abstract syntax tree), i.e. SQL –> SqlNode      
*        |   
*        |      
*        +-----> SqlJoin (SqlNode)
*        |   
*        |     
*     SqlToRelConverter.convertQuery // semantic analysis: generates the logical plan, i.e. SqlNode –> RelNode
*        |    
*        |     
*        +-----> LogicalProject (RelNode) // abstract syntax tree as a not-yet-optimized RelNode   
*        |      
*        |     
*    FlinkLogicalJoinConverter (RelOptRule) // Flink-specific optimization rules      
*    VolcanoRuleCall.onMatch // optimizes the logical plan using Flink-specific optimization rules 
*        | 
*        |   
*        +-----> FlinkLogicalJoin (RelNode)  // optimized logical plan (logical execution plan)
*        |  
*        |    
*    StreamExecJoinRule (RelOptRule) // Rule that converts FlinkLogicalJoin without window bounds in join condition to StreamExecJoin     
*    VolcanoRuleCall.onMatch // converts the optimized logical plan into Flink's physical execution plan based on Flink rules
*        |       
*        |   
*        +-----> StreamExecJoin (FlinkRelNode) // stream physical RelNode, the physical execution plan
*        |      
*        |     
*    StreamExecJoin.translateToPlanInternal  // generates the StreamOperator, i.e. the Flink operator  
*        |     
*        |     
*        +-----> StreamingJoinOperator (StreamOperator) // Streaming unbounded Join operator in StreamTask   
*        |     
*        |       
*    StreamTwoInputProcessor.processRecord1 // the StreamingJoinOperator is invoked inside TwoInputStreamTask: the actual execution 
*        |
*        |

The rest of this article follows this diagram as its backbone.

0x01 Apache Calcite

Flink Table API & SQL keeps a unified interface for relational queries over both streaming and static data, and it relies on Apache Calcite's query optimization framework and SQL parser.

Why does Flink offer a relational (Table) API at all? In short, the benefits of a relational API are:

  • A relational API is declarative
  • Queries can be optimized effectively
  • Queries can be executed efficiently
  • "Everybody" knows SQL

Calcite is the core member here. Apache Calcite is a SQL engine aimed at the Hadoop ecosystem; it provides standard SQL, a variety of query optimizations, and the ability to connect to all kinds of data sources.

1. Calcite Concepts

Below is an overview of the key Calcite concepts (a small RelBuilder sketch follows the list):

  • Relational algebra: i.e. relational expressions. They are usually named after verbs, e.g. Sort, Join, Project, Filter, Scan, Sample.
  • Traits: expressions have various traits. Use a Trait's satisfies() method to test whether an expression satisfies a given Trait or Convention.
  • Rules: used to transform one expression into another. A rule carries a list of RelOptRuleOperands that decides whether it can be applied to a given part of the tree.
  • Planner: the query optimizer. Based on a set of rules and a cost model (e.g. the cost-based VolcanoPlanner or the heuristic HepPlanner), it rewrites an expression into a semantically equivalent but more efficient one.
  • RelNode: represents one processing operation on the data; common operations include Sort, Join, Project, Filter, Scan, and so on. It describes an operation on a whole relation rather than row-level processing logic. A RelNode records its input RelNodes, so the RelNodes form a tree.
  • RexNode: a row expression (scalar expression) that describes the processing logic applied to a single row. Every row expression has a data type, because during validation the compiler derives the result type of each expression. Common row expressions include literals (RexLiteral), variables (RexVariable) and function/operator calls (RexCall). RexNodes are built through a RexBuilder.
  • RelTrait: defines a physical property of a logical table. The three main trait types are Convention, RelCollation and RelDistribution.
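
To make RelNode and RexNode concrete, here is a minimal, hedged RelBuilder sketch. It is not taken from the Flink code base, and it assumes a Calcite schema that already exposes an ORDERS table with user, product and amount columns.

import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.Frameworks;
import org.apache.calcite.tools.RelBuilder;

public class RelBuilderSketch {
	public static void main(String[] args) {
		// assumption: the root schema already contains a table named "ORDERS"
		SchemaPlus rootSchema = Frameworks.createRootSchema(true);
		FrameworkConfig config = Frameworks.newConfigBuilder()
				.defaultSchema(rootSchema)
				.build();

		RelBuilder builder = RelBuilder.create(config);
		// Scan -> Filter -> Project: each step yields a RelNode,
		// while the filter condition and the projected fields are RexNodes.
		RelNode relNode = builder
				.scan("ORDERS")
				.filter(builder.call(
						SqlStdOperatorTable.GREATER_THAN,
						builder.field("amount"),
						builder.literal(2)))
				.project(builder.field("user"), builder.field("product"))
				.build();
		System.out.println(RelOptUtil.toString(relNode));
	}
}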

2. Calcite Processing Flow

SQL execution can generally be divided into four stages; Calcite is similar, but splits the work into five stages (a minimal Calcite sketch of stages 1–3 follows the list):

  1. SQL parsing: generate the AST (abstract syntax tree) (SQL –> SqlNode)
  2. SqlNode validation (SqlNode –> SqlNode)
  3. Semantic analysis: generate the logical plan (SqlNode –> RelNode/RexNode)
  4. Optimization: optimize according to the applicable rules (RelNode –> RelNode)
  5. Generate the ExecutionPlan, i.e. the physical execution plan (DataStream plan)
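
As a hedged illustration of stages 1–3 outside of Flink, using Calcite's own Frameworks/Planner utilities (the schema contents and the SQL text are assumptions; the tables referenced by the query must already exist in the schema for validation to succeed):

import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.Frameworks;
import org.apache.calcite.tools.Planner;

public class CalcitePipelineSketch {
	public static void main(String[] args) throws Exception {
		// assumption: rootSchema already exposes the tables referenced by the query
		SchemaPlus rootSchema = Frameworks.createRootSchema(true);
		FrameworkConfig config = Frameworks.newConfigBuilder()
				.defaultSchema(rootSchema)
				.build();
		Planner planner = Frameworks.getPlanner(config);

		String sql = "SELECT o.orderId FROM Orders AS o LEFT JOIN Payment AS p ON o.orderId = p.orderId";
		SqlNode parsed = planner.parse(sql);          // stage 1: SQL -> SqlNode
		SqlNode validated = planner.validate(parsed); // stage 2: SqlNode -> SqlNode
		RelRoot root = planner.rel(validated);        // stage 3: SqlNode -> RelNode/RexNode
		System.out.println(RelOptUtil.toString(root.rel));
	}
}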

1. How Flink's Relational APIs Execute

Flink carries two ways of expressing queries: the Table API and the SQL API. It uses Apache Calcite as the SQL parser for semantic analysis and uniformly generates a Calcite logical plan (a SqlNode tree); the plan is then validated; next, Calcite's optimizer applies its conversion rules to the logical plan, choosing different rule sets depending on the nature of the source (stream vs. batch), and produces an optimized RelNode tree; finally the optimized plan is translated into a regular Flink DataSet or DataStream program. Any performance improvement to the DataStream API or the DataSet API automatically improves the efficiency of Table API and SQL queries.

From submission, through Calcite parsing and optimization, to execution by the Flink engine, a streaming SQL statement generally goes through the following stages:

  1. Sql Parser: parse the SQL statement into an AST (syntax tree) via JavaCC; in Calcite the AST is represented by SqlNode;
  2. Sql Validator: validate the SQL against the catalog (the "data dictionary");
  3. Generate the Logical Plan: convert the AST represented by SqlNode into a logical plan, represented by RelNode;
  4. Generate the optimized LogicalPlan: first optimize the logical plan with Calcite rules, then with Flink-specific optimization rules;
  5. Generate the Flink PhysicalPlan: again based on Flink rules, convert the optimized logical plan into Flink's physical execution plan;
  6. Convert the physical plan into a Flink ExecutionPlan: call the corresponding translateToPlan methods and use code generation (CodeGen) to produce the various Flink operators.

If the job is submitted through the Table API instead, it also goes through the Calcite optimization stages; the basic flow is similar to running SQL directly:

  1. Table API parser: Flink also represents the computation expressed through the Table API as a tree, using TreeNodes; the computation logic of each node in this tree is represented by an Expression.
  2. Validate: bind each node's unresolved Expressions against the catalog (the "data dictionary"), producing resolved Expressions;
  3. Generate the Logical Plan: traverse every node of the tree and call its construct method, converting the TreeNode representation into Calcite's internal RelNode representation, i.e. the logical plan;
  4. Generate the optimized LogicalPlan: first optimize the logical plan with Calcite rules, then with Flink-specific optimization rules;
  5. Generate the Flink PhysicalPlan: based on Flink rules, convert the optimized logical plan into Flink's physical execution plan;
  6. Convert the physical plan into a Flink ExecutionPlan: call the corresponding translateToPlan methods and use code generation (CodeGen) to produce the various Flink operators.

As you can see, Table API and SQL follow the same flow once a RelNode has been obtained; they only differ in how the RelNode is obtained (a minimal usage sketch of the two entry points follows the list):

  • Table API: uses a RelBuilder to obtain the RelNode (LogicalNode and Expression are converted into RelNode and RexNode respectively);
  • SQL: uses the Planner. First the parse method turns the user's SQL text into a parse tree represented by SqlNode. Then the validate method uses metadata to resolve fields, determine types, check validity, and so on. Finally the rel method converts the SqlNode into a RelNode;
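
A hedged, minimal sketch of the two entry points (Flink 1.10-era API; the OrderA/OrderB table names are assumptions mirroring the StreamSQLExample referenced in the call stacks later in this article):

import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class EntryPointSketch {
	// assumption: OrderA and OrderB are already registered in tableEnv
	static void bothEntryPoints(StreamTableEnvironment tableEnv) {
		// SQL path: goes through the Planner (parse -> validate -> rel)
		Table sqlResult = tableEnv.sqlQuery(
			"SELECT * FROM OrderA WHERE amount > 2 "
				+ "UNION ALL "
				+ "SELECT * FROM OrderB WHERE amount < 2");

		// Table API path: builds an operation tree; a RelBuilder later produces the RelNode
		Table apiResult = tableEnv.scan("OrderA").filter("amount > 2")
			.unionAll(tableEnv.scan("OrderB").filter("amount < 2"));
	}
}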

1. The TableEnvironment Object

The TableEnvironment object is the core of Table API and SQL integration; it supports the following scenarios:

  • Registering a Table.
  • Registering a TableSource with the TableEnvironment; a TableSource exposes a data storage system as a Table, e.g. MySQL, HBase, CSV, Kafka, RabbitMQ, and so on.
  • Registering an external catalog, which gives access to data or files in external systems.
  • Executing SQL queries.
  • Registering a user-defined function.
  • Converting a DataStream or DataSet into a Table.

A query can only be bound to one specific TableEnvironment. The TableEnvironment is configured through a TableConfig, which lets you customize query optimization and the translation process.
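
A hedged end-to-end sketch of the scenarios above (Flink 1.10-era API; the element type, field names and job name are assumptions):

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class TableEnvironmentSketch {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		EnvironmentSettings settings = EnvironmentSettings.newInstance()
				.useBlinkPlanner().inStreamingMode().build();
		StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

		// convert a DataStream into a Table and register it
		DataStream<Tuple3<Long, String, Integer>> orderStream =
				env.fromElements(Tuple3.of(1L, "beer", 3), Tuple3.of(2L, "diaper", 1));
		Table orders = tableEnv.fromDataStream(orderStream, "user, product, amount");
		tableEnv.createTemporaryView("Orders", orders);

		// execute a SQL query against the registered table
		Table result = tableEnv.sqlQuery("SELECT * FROM Orders WHERE amount > 2");

		// convert the result Table back into a DataStream and run the job
		DataStream<Row> resultStream = tableEnv.toAppendStream(result, Row.class);
		resultStream.print();
		env.execute("table-environment-sketch");
	}
}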

The TableEnvironment execution flow is as follows:

  • TableEnvironment.sqlQuery() is the entry point;
  • Flink's FlinkPlannerImpl performs the parse(sql), validate(sqlNode) and rel(sqlNode) steps;
  • a Table is produced;

A summary of the relevant code follows.

package org.apache.flink.table.api.internal;

@Internal
public class TableEnvironmentImpl implements TableEnvironment {
	private final CatalogManager catalogManager;
	private final ModuleManager moduleManager;
	private final OperationTreeBuilder operationTreeBuilder;
	private final List<ModifyOperation> bufferedModifyOperations = new ArrayList<>();

	protected final TableConfig tableConfig;
	protected final Executor execEnv;
	protected final FunctionCatalog functionCatalog;
	protected final Planner planner;
	protected final Parser parser;  
}  

// printing the class contents at runtime gives the following

this = {StreamTableEnvironmentImpl@4701} 
 functionCatalog = {FunctionCatalog@4702} 
 scalaExecutionEnvironment = {StreamExecutionEnvironment@4703} 
 planner = {StreamPlanner@4704} 
  config = {TableConfig@4708} 
  executor = {StreamExecutor@4709} 
  PlannerBase.config = {TableConfig@4708} 
  functionCatalog = {FunctionCatalog@4702} 
  catalogManager = {CatalogManager@1250} 
  isStreamingMode = true
  plannerContext = {PlannerContext@4711} 
  parser = {ParserImpl@4696} 
 catalogManager = {CatalogManager@1250} 
 moduleManager = {ModuleManager@4705} 
 operationTreeBuilder = {OperationTreeBuilder@4706} 
 bufferedModifyOperations = {ArrayList@4707}  size = 0
 tableConfig = {TableConfig@4708} 
 execEnv = {StreamExecutor@4709} 
 TableEnvironmentImpl.functionCatalog = {FunctionCatalog@4702} 
 TableEnvironmentImpl.planner = {StreamPlanner@4704} 
 parser = {ParserImpl@4696} 
 registration = {TableEnvironmentImpl$1@4710}

2. Catalog

Catalog – defines metadata and namespaces; it contains Schema (database), Table (table) and RelDataType (type information).

All metadata about databases and tables lives in Flink's internal Catalog directory structure, which holds all Table-related metadata inside Flink, including table schemas, data source information, and so on.
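
A hedged sketch of registering and switching to a catalog explicitly (the catalog and database names are arbitrary; tableEnv is a TableEnvironment as above):

import org.apache.flink.table.catalog.GenericInMemoryCatalog;

// register an additional in-memory catalog and make it the current one
GenericInMemoryCatalog myCatalog = new GenericInMemoryCatalog("my_catalog", "my_database");
tableEnv.registerCatalog("my_catalog", myCatalog);
tableEnv.useCatalog("my_catalog");
tableEnv.useDatabase("my_database");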

// TableEnvironment contains a CatalogManager
public final class CatalogManager {
	// A map between names and catalogs.
	private Map<String, Catalog> catalogs;  
} 

// the Catalog interface
public interface Catalog {
  ......
  	default Optional<TableFactory> getTableFactory() {
		return Optional.empty();
	}
  ......
}   

// when the data source is defined inside the program itself, the corresponding catalog is GenericInMemoryCatalog
public class GenericInMemoryCatalog extends AbstractCatalog {
	public static final String DEFAULT_DB = "default";
	private final Map<String, CatalogDatabase> databases;
	private final Map<ObjectPath, CatalogBaseTable> tables;
	private final Map<ObjectPath, CatalogFunction> functions;
	private final Map<ObjectPath, Map<CatalogPartitionSpec, CatalogPartition>> partitions;

	private final Map<ObjectPath, CatalogTableStatistics> tableStats;
	private final Map<ObjectPath, CatalogColumnStatistics> tableColumnStats;
	private final Map<ObjectPath, Map<CatalogPartitionSpec, CatalogTableStatistics>> partitionStats;
	private final Map<ObjectPath, Map<CatalogPartitionSpec, CatalogColumnStatistics>> partitionColumnStats;
}  

// contents observed while debugging the program  

catalogManager = {CatalogManager@4646} 
 catalogs = {LinkedHashMap@4652}  size = 1
  "default_catalog" -> {GenericInMemoryCatalog@4659} 
   key = "default_catalog"
    value = {char[15]@4668} 
    hash = 552406043
   value = {GenericInMemoryCatalog@4659} 
    databases = {LinkedHashMap@4660}  size = 1
    tables = {LinkedHashMap@4661}  size = 0
    functions = {LinkedHashMap@4662}  size = 0
    partitions = {LinkedHashMap@4663}  size = 0
    tableStats = {LinkedHashMap@4664}  size = 0
    tableColumnStats = {LinkedHashMap@4665}  size = 0
    partitionStats = {LinkedHashMap@4666}  size = 0
    partitionColumnStats = {LinkedHashMap@4667}  size = 0
    catalogName = "default_catalog"
    defaultDatabase = "default_database"
 temporaryTables = {HashMap@4653}  size = 2
 currentCatalogName = "default_catalog"
 currentDatabaseName = "default_database"
 builtInCatalogName = "default_catalog"

3. StreamPlanner

StreamPlanner is one of the planners of the new Blink Planner.

The new Flink Table architecture makes the query processor pluggable: the community keeps the original Flink planner (the Old Planner) fully intact while introducing the new Blink Planner, and users can choose between the Old Planner and the Blink Planner.

In terms of model, the Old Planner never unified stream and batch jobs: its implementations for streaming and batch differ and are translated to the DataStream API and the DataSet API respectively. The Blink Planner treats a batch data set as a bounded DataStream, so both stream and batch jobs are eventually translated onto the Transformation API. Architecturally, the Blink Planner provides a BatchPlanner and a StreamPlanner for batch and streaming respectively; the two share most of their code and much of the optimization logic. The Old Planner keeps two completely independent code bases for batch and streaming, with essentially no shared implementation or optimization logic.
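
A hedged sketch of how either planner is selected when creating a TableEnvironment (Flink 1.9/1.10-era EnvironmentSettings API); the resulting settings object is then passed to StreamTableEnvironment.create(env, settings):

import org.apache.flink.table.api.EnvironmentSettings;

// Blink Planner, streaming mode (the path analyzed in this article: StreamPlanner)
EnvironmentSettings blinkStreamSettings = EnvironmentSettings.newInstance()
		.useBlinkPlanner()
		.inStreamingMode()
		.build();

// Old (legacy) Flink Planner, streaming mode
EnvironmentSettings oldStreamSettings = EnvironmentSettings.newInstance()
		.useOldPlanner()
		.inStreamingMode()
		.build();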

Besides the advantages in model and architecture, the Blink Planner has also accumulated many practical features, concentrated in three areas:

  • The Blink Planner improves the code-generation mechanism and optimizes some operators, and provides rich, practical new features such as dimension-table joins, Top N, MiniBatch, streaming deduplication and data-skew mitigation for aggregations.
  • The Blink Planner's optimization strategy is based on common-subgraph optimization and includes both cost-based optimization (CBO) and rule-based optimization (RBO), so the optimization is more comprehensive. It also supports fetching data source statistics from the catalog, which matters a great deal for CBO.
  • The Blink Planner offers more built-in functions and more standard SQL support; Flink 1.9 already fully supports TPC-H, and support for the more advanced TPC-DS is planned for the next release.

In code terms, the StreamPlanner shows up in translateToPlan, which dispatches to the different StreamOperator-generating implementations.

class StreamPlanner(
    executor: Executor,
    config: TableConfig,
    functionCatalog: FunctionCatalog,
    catalogManager: CatalogManager)
  extends PlannerBase(executor, config, functionCatalog, catalogManager, isStreamingMode = true) {

  override protected def translateToPlan(
      execNodes: util.List[ExecNode[_, _]]): util.List[Transformation[_]] = {
    execNodes.map {
      case node: StreamExecNode[_] => node.translateToPlan(this)
      case _ =>
        throw new TableException("Cannot generate DataStream due to an invalid logical plan. " +
          "This is a bug and should not happen. Please file an issue.")
    }
  }
}

@Internal
public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl implements StreamTableEnvironment {

	private <T> DataStream<T> toDataStream(Table table, OutputConversionModifyOperation modifyOperation) {
    
    // when converting back to a DataStream, the planner is invoked to generate the plan.
    
		List<Transformation<?>> transformations = planner.translate(Collections.singletonList(modifyOperation));

		Transformation<T> transformation = getTransformation(table, transformations);

		executionEnvironment.addOperator(transformation);
		return new DataStream<>(executionEnvironment, transformation);
	}
}

// call stack printed while debugging 

translateToPlanInternal:85, StreamExecUnion (org.apache.Flink.table.planner.plan.nodes.physical.stream)
translateToPlanInternal:39, StreamExecUnion (org.apache.Flink.table.planner.plan.nodes.physical.stream)
translateToPlan:58, ExecNode$class (org.apache.Flink.table.planner.plan.nodes.exec)
translateToPlan:39, StreamExecUnion (org.apache.Flink.table.planner.plan.nodes.physical.stream)
translateToTransformation:184, StreamExecSink (org.apache.Flink.table.planner.plan.nodes.physical.stream)
translateToPlanInternal:153, StreamExecSink (org.apache.Flink.table.planner.plan.nodes.physical.stream)
translateToPlanInternal:48, StreamExecSink (org.apache.Flink.table.planner.plan.nodes.physical.stream)
translateToPlan:58, ExecNode$class (org.apache.Flink.table.planner.plan.nodes.exec)
translateToPlan:48, StreamExecSink (org.apache.Flink.table.planner.plan.nodes.physical.stream)
apply:60, StreamPlanner$$anonfun$translateToPlan$1 (org.apache.Flink.table.planner.delegation)
apply:59, StreamPlanner$$anonfun$translateToPlan$1 (org.apache.Flink.table.planner.delegation)
apply:234, TraversableLike$$anonfun$map$1 (scala.collection)
apply:234, TraversableLike$$anonfun$map$1 (scala.collection)
foreach:891, Iterator$class (scala.collection)
foreach:1334, AbstractIterator (scala.collection)
foreach:72, IterableLike$class (scala.collection)
foreach:54, AbstractIterable (scala.collection)
map:234, TraversableLike$class (scala.collection)
map:104, AbstractTraversable (scala.collection)
translateToPlan:59, StreamPlanner (org.apache.Flink.table.planner.delegation)
translate:153, PlannerBase (org.apache.Flink.table.planner.delegation)
toDataStream:210, StreamTableEnvironmentImpl (org.apache.Flink.table.api.scala.internal)
toAppendStream:107, StreamTableEnvironmentImpl (org.apache.Flink.table.api.scala.internal)
toAppendStream:101, TableConversions (org.apache.Flink.table.api.scala)
main:89, StreamSQLExample$ (spendreport)
main:-1, StreamSQLExample (spendreport)

4. FlinkPlannerImpl

Flink implements FlinkPlannerImpl as the bridge to Calcite; it performs the parse(sql), validate(sqlNode) and rel(sqlNode) operations.

class FlinkPlannerImpl(
    config: FrameworkConfig,
    catalogReaderSupplier: JFunction[JBoolean, CalciteCatalogReader],
    typeFactory: FlinkTypeFactory,
    cluster: RelOptCluster) {

  val operatorTable: SqlOperatorTable = config.getOperatorTable
  val parser: CalciteParser = new CalciteParser(config.getParserConfig)
  val convertletTable: SqlRexConvertletTable = config.getConvertletTable
  val sqlToRelConverterConfig: SqlToRelConverter.Config = config.getSqlToRelConverterConfig
}

// FlinkPlannerImpl is used here
public class ParserImpl implements Parser {
	private final CatalogManager catalogManager;
	private final Supplier<FlinkPlannerImpl> validatorSupplier;
	private final Supplier<CalciteParser> calciteParserSupplier;
  
	@Override
	public List<Operation> parse(String statement) {
		CalciteParser parser = calciteParserSupplier.get();
    // FlinkPlannerImpl is obtained here
		FlinkPlannerImpl planner = validatorSupplier.get();
		// parse the sql query
		SqlNode parsed = parser.parse(statement);
		Operation operation = SqlToOperationConverter.convert(planner, catalogManager, parsed)
			.orElseThrow(() -> new TableException("Unsupported query: " + statement));
		return Collections.singletonList(operation);
	}  
}

// contents observed while debugging  

planner = {FlinkPlannerImpl@4659} 
 config = {Frameworks$StdFrameworkConfig@4685} 
 catalogReaderSupplier = {PlannerContext$lambda@4686} 
 typeFactory = {FlinkTypeFactory@4687} 
 cluster = {FlinkRelOptCluster@4688} 
 operatorTable = {ChainedSqlOperatorTable@4689} 
 parser = {CalciteParser@4690} 
 convertletTable = {StandardConvertletTable@4691} 
 sqlToRelConverterConfig = {SqlToRelConverter$ConfigImpl@4692} 
 validator = null
   
// call stack example one   
   
validate:104, FlinkPlannerImpl (org.apache.Flink.table.planner.calcite)
convert:127, SqlToOperationConverter (org.apache.Flink.table.planner.operations)
parse:66, ParserImpl (org.apache.Flink.table.planner.delegation)
sqlQuery:464, TableEnvironmentImpl (org.apache.Flink.table.api.internal)
main:82, StreamSQLExample$ (spendreport)
main:-1, StreamSQLExample (spendreport)
  
// call stack example two       
  
rel:135, FlinkPlannerImpl (org.apache.Flink.table.planner.calcite)
toQueryOperation:522, SqlToOperationConverter (org.apache.Flink.table.planner.operations)
convertSqlQuery:436, SqlToOperationConverter (org.apache.Flink.table.planner.operations)
convert:154, SqlToOperationConverter (org.apache.Flink.table.planner.operations)
parse:66, ParserImpl (org.apache.Flink.table.planner.delegation)
sqlQuery:464, TableEnvironmentImpl (org.apache.Flink.table.api.internal)
main:82, StreamSQLExample$ (spendreport)
main:-1, StreamSQLExample (spendreport)

5. Table and TableImpl

As the code shows, this class merely wraps up the various related operations and pieces of information; it does not contain much actual logic.

@Internal
public class TableImpl implements Table {

	private static final AtomicInteger uniqueId = new AtomicInteger(0);

	private final TableEnvironment tableEnvironment;
	private final QueryOperation operationTree;
	private final OperationTreeBuilder operationTreeBuilder;
	private final LookupCallResolver lookupResolver;
  
	private TableImpl joinInternal(
			Table right,
			Optional<Expression> joinPredicate,
			JoinType joinType) {
		verifyTableCompatible(right);

		return createTable(operationTreeBuilder.join(
			this.operationTree,
			right.getQueryOperation(),
			joinType,
			joinPredicate,
			false));
	}
}
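
For reference, a hedged usage sketch that would route through joinInternal above (Flink 1.10-era string-expression Table API; the table and field names are assumptions mirroring the SimpleOuterJoin example in the call stacks):

Table orders = tableEnv.scan("Orders");     // fields: orderId, productName
Table payments = tableEnv.scan("Payment");  // fields: payOrderId, payType

// leftOuterJoin ends up in TableImpl.joinInternal with a left-outer JoinType
Table joined = orders
	.leftOuterJoin(payments, "orderId = payOrderId")
	.select("orderId, productName, payType");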

// contents observed while debugging 

view = {TableImpl@4583} "UnnamedTable$0"
 tableEnvironment = {StreamTableEnvironmentImpl@4580} 
  functionCatalog = {FunctionCatalog@4646} 
  scalaExecutionEnvironment = {StreamExecutionEnvironment@4579} 
  planner = {StreamPlanner@4647} 
  catalogManager = {CatalogManager@4644} 
  moduleManager = {ModuleManager@4648} 
  operationTreeBuilder = {OperationTreeBuilder@4649} 
  bufferedModifyOperations = {ArrayList@4650}  size = 0
  tableConfig = {TableConfig@4651} 
  execEnv = {StreamExecutor@4652} 
  TableEnvironmentImpl.functionCatalog = {FunctionCatalog@4646} 
  TableEnvironmentImpl.planner = {StreamPlanner@4647} 
  parser = {ParserImpl@4653} 
  registration = {TableEnvironmentImpl$1@4654} 
 operationTree = {ScalaDataStreamQueryOperation@4665} 
  identifier = null
  dataStream = {DataStreamSource@4676} 
  fieldIndices = {int[2]@4677} 
  tableSchema = {TableSchema@4678} "root\n |-- orderId: STRING\n |-- productName: STRING\n"
 operationTreeBuilder = {OperationTreeBuilder@4649} 
  config = {TableConfig@4651} 
  functionCatalog = {FunctionCatalog@4646} 
  tableReferenceLookup = {TableEnvironmentImpl$lambda@4668} 
  lookupResolver = {LookupCallResolver@4669} 
  projectionOperationFactory = {ProjectionOperationFactory@4670} 
  sortOperationFactory = {SortOperationFactory@4671} 
  calculatedTableFactory = {CalculatedTableFactory@4672} 
  setOperationFactory = {SetOperationFactory@4673} 
  aggregateOperationFactory = {AggregateOperationFactory@4674} 
  joinOperationFactory = {JoinOperationFactory@4675} 
 lookupResolver = {LookupCallResolver@4666} 
  functionLookup = {FunctionCatalog@4646} 
 tableName = "UnnamedTable$0"
  value = {char[14]@4667} 
  hash = 1355882650

1. SQL Parsing Stage (SQL –> SqlNode)

This corresponds to the overview diagram above; the outcome of this stage is a SqlNode such as SqlJoin.

// NOTE: execution order is top to bottom; " -----> " indicates the type of instance that is produced
* 
*        +-----> "left outer JOIN" (SQL statement)
*        |   
*        |     
*     SqlParser.parseQuery // SQL parsing stage: generates the AST (abstract syntax tree), i.e. SQL –> SqlNode      
*        |   
*        |  
*        +-----> SqlJoin (SqlNode)
*        |   
*        |

Calcite uses JavaCC for SQL parsing. From the Parser.jj file defined in Calcite, JavaCC generates a set of Java classes, and the generated Java code turns the SQL into the AST data structure (here of type SqlNode).

In short: the SQL is converted into an AST (abstract syntax tree), represented in Calcite by SqlNode.
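
A hedged standalone sketch of this stage using Calcite's own SqlParser directly (plain Calcite parser config, not Flink's extended FlinkSqlParserImpl; the SQL text is an assumption):

import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.parser.SqlParser;

public class ParseStageSketch {
	public static void main(String[] args) throws Exception {
		String sql = "SELECT o.orderId, p.payType "
				+ "FROM Orders AS o LEFT JOIN Payment AS p ON o.orderId = p.orderId";
		SqlParser parser = SqlParser.create(sql, SqlParser.configBuilder().build());
		SqlNode sqlNode = parser.parseQuery();   // SQL -> SqlNode (the AST)
		System.out.println(sqlNode.getKind());   // SELECT
		System.out.println(sqlNode.toString());  // SQL text reconstructed from the AST
	}
}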

package org.apache.flink.table.planner.delegation;

public class ParserImpl implements Parser {
	@Override
	public List<Operation> parse(String statement) {
		CalciteParser parser = calciteParserSupplier.get();
		FlinkPlannerImpl planner = validatorSupplier.get();
    
		// parse the sql query
		SqlNode parsed = parser.parse(statement);

		Operation operation = SqlToOperationConverter.convert(planner, catalogManager, parsed)
			.orElseThrow(() -> new TableException("Unsupported query: " + statement));
		return Collections.singletonList(operation);
	}  
}  

// printing the parsed result shows the basic shape of a SqlNode.
  
parsed = {SqlBasicCall@4690} "SELECT *\nFROM `UnnamedTable$0`\nWHERE `amount` > 2\nUNION ALL\nSELECT *\nFROM `OrderB`\nWHERE `amount` < 2"
 operator = {SqlSetOperator@4716} "UNION ALL"
  all = true
  name = "UNION ALL"
  kind = {SqlKind@4742} "UNION"
  leftPrec = 14
  rightPrec = 15
  returnTypeInference = {ReturnTypes$lambda@4743} 
  operandTypeInference = null
  operandTypeChecker = {SetopOperandTypeChecker@4744} 
 operands = {SqlNode[2]@4717} 
  0 = {SqlSelect@4746} "SELECT *\nFROM `UnnamedTable$0`\nWHERE `amount` > 2"
  1 = {SqlSelect@4747} "SELECT *\nFROM `OrderB`\nWHERE `amount` < 2"
 functionQuantifier = null
 expanded = false
 pos = {SqlParserPos@4719} "line 2, column 1"  
    
// the debugging call stack below may help you dig deeper
    
SqlStmt:3208, FlinkSqlParserImpl (org.apache.Flink.sql.parser.impl)
SqlStmtEof:3732, FlinkSqlParserImpl (org.apache.Flink.sql.parser.impl)
parseSqlStmtEof:234, FlinkSqlParserImpl (org.apache.Flink.sql.parser.impl)
parseQuery:160, SqlParser (org.apache.calcite.sql.parser)
parseStmt:187, SqlParser (org.apache.calcite.sql.parser)
parse:48, CalciteParser (org.apache.Flink.table.planner.calcite)
parse:64, ParserImpl (org.apache.Flink.table.planner.delegation)
sqlQuery:464, TableEnvironmentImpl (org.apache.Flink.table.api.internal)
main:82, StreamSQLExample$ (spendreport)
main:-1, StreamSQLExample (spendreport)
  
// another reference, in FlinkSqlParserImpl.FromClause     
    
e = {SqlJoin@4709} "`Orders` AS `o`\nLEFT JOIN `Payment` AS `p` ON `o`.`orderId` = `p`.`orderId`"
 left = {SqlBasicCall@4676} "`Orders` AS `o`"
  operator = {SqlAsOperator@4752} "AS"
  operands = {SqlNode[2]@4753} 
  functionQuantifier = null
  expanded = false
  pos = {SqlParserPos@4755} "line 7, column 3"
 natural = {SqlLiteral@4677} "FALSE"
  typeName = {SqlTypeName@4775} "BOOLEAN"
  value = {Boolean@4776} false
  pos = {SqlParserPos@4777} "line 7, column 13"
 joinType = {SqlLiteral@4678} "LEFT"
  typeName = {SqlTypeName@4758} "SYMBOL"
  value = {JoinType@4759} "LEFT"
  pos = {SqlParserPos@4724} "line 7, column 26"
 right = {SqlBasicCall@4679} "`Payment` AS `p`"
  operator = {SqlAsOperator@4752} "AS"
  operands = {SqlNode[2]@4763} 
  functionQuantifier = null
  expanded = false
  pos = {SqlParserPos@4764} "line 7, column 31"
 conditionType = {SqlLiteral@4680} "ON"
  typeName = {SqlTypeName@4758} "SYMBOL"
  value = {JoinConditionType@4771} "ON"
  pos = {SqlParserPos@4772} "line 7, column 44"
 condition = {SqlBasicCall@4681} "`o`.`orderId` = `p`.`orderId`"
  operator = {SqlBinaryOperator@4766} "="
  operands = {SqlNode[2]@4767} 
  functionQuantifier = null
  expanded = false
  pos = {SqlParserPos@4768} "line 7, column 47"
 pos = {SqlParserPos@4724} "line 7, column 26" 
        
// the debugging call stack below may help you dig deeper
    
FromClause:10192, FlinkSqlParserImpl (org.apache.flink.sql.parser.impl)
SqlSelect:5918, FlinkSqlParserImpl (org.apache.flink.sql.parser.impl)
LeafQuery:630, FlinkSqlParserImpl (org.apache.flink.sql.parser.impl)
LeafQueryOrExpr:15651, FlinkSqlParserImpl (org.apache.flink.sql.parser.impl)
QueryOrExpr:15118, FlinkSqlParserImpl (org.apache.flink.sql.parser.impl)
OrderedQueryOrExpr:504, FlinkSqlParserImpl (org.apache.flink.sql.parser.impl)
SqlStmt:3693, FlinkSqlParserImpl (org.apache.flink.sql.parser.impl)
SqlStmtEof:3732, FlinkSqlParserImpl (org.apache.flink.sql.parser.impl)
parseSqlStmtEof:234, FlinkSqlParserImpl (org.apache.flink.sql.parser.impl)
parseQuery:160, SqlParser (org.apache.calcite.sql.parser)
parseStmt:187, SqlParser (org.apache.calcite.sql.parser)
parse:48, CalciteParser (org.apache.flink.table.planner.calcite)
parse:64, ParserImpl (org.apache.flink.table.planner.delegation)
sqlQuery:464, TableEnvironmentImpl (org.apache.flink.table.api.internal)
main:73, SimpleOuterJoin$ (spendreport)
main:-1, SimpleOuterJoin (spendreport)

2. SqlNode Validation (SqlNode –> SqlNode)

The first step above produces a SqlNode object, which is an unvalidated abstract syntax tree. Next comes the validation stage. Validation requires metadata information and covers table names, field names, function names and data types.

In short: syntax/semantic checking against the metadata; after validation the AST is still represented by SqlNode.

package org.apache.flink.table.planner.operations;

public class SqlToOperationConverter {
	public static Optional<Operation> convert(
			FlinkPlannerImpl flinkPlanner,
			CatalogManager catalogManager,
			SqlNode sqlNode) {
		// the validate call happens here
		final SqlNode validated = flinkPlanner.validate(sqlNode);
		SqlToOperationConverter converter = new SqlToOperationConverter(flinkPlanner, catalogManager);
		// ...
	}
}
    
// printing the validated result.    
    
validated = {SqlBasicCall@4675} "SELECT `UnnamedTable$0`.`user`, `UnnamedTable$0`.`product`, `UnnamedTable$0`.`amount`\nFROM `default_catalog`.`default_database`.`UnnamedTable$0` AS `UnnamedTable$0`\nWHERE `UnnamedTable$0`.`amount` > 2\nUNION ALL\nSELECT `OrderB`.`user`, `OrderB`.`product`, `OrderB`.`amount`\nFROM `default_catalog`.`default_database`.`OrderB` AS `OrderB`\nWHERE `OrderB`.`amount` < 2"
 operator = {SqlSetOperator@5000} "UNION ALL"
  all = true
  name = "UNION ALL"
  kind = {SqlKind@5029} "UNION"
  leftPrec = 14
  rightPrec = 15
  returnTypeInference = {ReturnTypes$lambda@5030} 
  operandTypeInference = null
  operandTypeChecker = {SetopOperandTypeChecker@5031} 
 operands = {SqlNode[2]@5001} 
  0 = {SqlSelect@4840} "SELECT `UnnamedTable$0`.`user`, `UnnamedTable$0`.`product`, `UnnamedTable$0`.`amount`\nFROM `default_catalog`.`default_database`.`UnnamedTable$0` AS `UnnamedTable$0`\nWHERE `UnnamedTable$0`.`amount` > 2"
  1 = {SqlSelect@5026} "SELECT `OrderB`.`user`, `OrderB`.`product`, `OrderB`.`amount`\nFROM `default_catalog`.`default_database`.`OrderB` AS `OrderB`\nWHERE `OrderB`.`amount` < 2"
 functionQuantifier = null
 expanded = false
 pos = {SqlParserPos@5003} "line 2, column 1"    
    
// the debugging call stack below may help you dig deeper    
    
validate:81, AbstractNamespace (org.apache.calcite.sql.validate)
validateNamespace:1008, SqlValidatorImpl (org.apache.calcite.sql.validate)
validateQuery:968, SqlValidatorImpl (org.apache.calcite.sql.validate)
validateCall:90, SqlSetOperator (org.apache.calcite.sql)
validateCall:5304, SqlValidatorImpl (org.apache.calcite.sql.validate)
validate:116, SqlCall (org.apache.calcite.sql)
validateScopedExpression:943, SqlValidatorImpl (org.apache.calcite.sql.validate)
validate:650, SqlValidatorImpl (org.apache.calcite.sql.validate)
org$apache$Flink$table$planner$calcite$FlinkPlannerImpl$$validate:126, FlinkPlannerImpl (org.apache.Flink.table.planner.calcite)
validate:105, FlinkPlannerImpl (org.apache.Flink.table.planner.calcite)
convert:127, SqlToOperationConverter (org.apache.Flink.table.planner.operations)
parse:66, ParserImpl (org.apache.Flink.table.planner.delegation)
sqlQuery:464, TableEnvironmentImpl (org.apache.Flink.table.api.internal)
main:82, StreamSQLExample$ (spendreport)
main:-1, StreamSQLExample (spendreport)

3. Semantic Analysis (SqlNode –> RelNode/RexNode)

In the overview diagram we have now reached this point:

// NOTE: execution order is top to bottom; " -----> " indicates the type of instance that is produced
* 
*        +-----> "left outer JOIN" (SQL statement)
*        |   
*        |     
*     SqlParser.parseQuery // SQL parsing stage: generates the AST (abstract syntax tree), i.e. SQL –> SqlNode      
*        |   
*        |      
*        +-----> SqlJoin (SqlNode)
*        |   
*        |     
*     SqlToRelConverter.convertQuery // semantic analysis: generates the logical plan, i.e. SqlNode –> RelNode
*        |    
*        |     
*        +-----> LogicalProject (RelNode) // abstract syntax tree as a not-yet-optimized RelNode   
*        |      
*        |

After step two, the SqlNode here is a validated SqlNode tree. The next step converts the SqlNode into RelNode/RexNode, i.e. it generates the corresponding logical plan (Logical Plan).

In short: semantic analysis builds the RelNode tree from the SqlNode and the metadata, i.e. the very first version of the logical plan.

The generated Flink logical plan is then converted into a Calcite logical plan, so that Calcite's powerful optimization rules can be used.

Flink calls the construct method of each node from top to bottom, converting Flink nodes into Calcite RelNode nodes. For SQL, the actual conversion is completed inside the convertQueryRecursive() method.

For example, the call chain that produces a LogicalProject looks roughly like this:

createJoin:378, RelFactories$JoinFactoryImpl (org.apache.calcite.rel.core)
createJoin:2520, SqlToRelConverter (org.apache.calcite.sql2rel)
convertFrom:2111, SqlToRelConverter (org.apache.calcite.sql2rel)
convertSelectImpl:646, SqlToRelConverter (org.apache.calcite.sql2rel)
convertSelect:627, SqlToRelConverter (org.apache.calcite.sql2rel)
convertQueryRecursive:3181, SqlToRelConverter (org.apache.calcite.sql2rel)
convertQuery:563, SqlToRelConverter (org.apache.calcite.sql2rel)
org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel:148, FlinkPlannerImpl (org.apache.flink.table.planner.calcite)
rel:135, FlinkPlannerImpl (org.apache.flink.table.planner.calcite)
toQueryOperation:522, SqlToOperationConverter (org.apache.flink.table.planner.operations)
convertSqlQuery:436, SqlToOperationConverter (org.apache.flink.table.planner.operations)
convert:154, SqlToOperationConverter (org.apache.flink.table.planner.operations)
parse:66, ParserImpl (org.apache.flink.table.planner.delegation)
sqlQuery:464, TableEnvironmentImpl (org.apache.flink.table.api.internal)
main:73, SimpleOuterJoin$ (spendreport)
main:-1, SimpleOuterJoin (spendreport)

The detailed source code is shown below.

convertQuery() in SqlToRelConverter converts a SqlNode into a RelRoot:
  
public class SqlToRelConverter {  
    public RelRoot convertQuery(SqlNode query, boolean needsValidation, boolean top) {
        if (needsValidation) {
            query = this.validator.validate(query);
        }
        RelMetadataQuery.THREAD_PROVIDERS.set(JaninoRelMetadataProvider.of(this.cluster.getMetadataProvider()));
        RelNode result = this.convertQueryRecursive(query, top, (RelDataType)null).rel;
        if (top && isStream(query)) {
            result = new LogicalDelta(this.cluster, ((RelNode)result).getTraitSet(), (RelNode)result);
        }

        RelCollation collation = RelCollations.EMPTY;
        if (!query.isA(SqlKind.DML) && isOrdered(query)) {
            collation = this.requiredCollation((RelNode)result);
        }

        this.checkConvertedType(query, (RelNode)result);

        RelDataType validatedRowType = this.validator.getValidatedNodeType(query);
      
        // the RelRoot is assembled here
        return RelRoot.of((RelNode)result, validatedRowType, query.getKind()).withCollation(collation);
    }
}

// printed at this point
toQueryOperation:523, SqlToOperationConverter (org.apache.Flink.table.planner.operations)

// the output below shows the actual structure of a RelRoot
  
relational = {RelRoot@5248} "Root {kind: UNION, rel: LogicalUnion#6, rowType: RecordType(BIGINT user, VARCHAR(2147483647) product, INTEGER amount), fields: [<0, user>, <1, product>, <2, amount>], collation: []}"
 rel = {LogicalUnion@5227} "LogicalUnion#6"
  inputs = {RegularImmutableList@5272}  size = 2
  kind = {SqlKind@5029} "UNION"
  all = true
  desc = "LogicalUnion#6"
  rowType = {RelRecordType@5238} "RecordType(BIGINT user, VARCHAR(2147483647) product, INTEGER amount)"
  digest = "LogicalUnion#6"
  cluster = {FlinkRelOptCluster@4800} 
  id = 6
  traitSet = {RelTraitSet@5273}  size = 5
 validatedRowType = {RelRecordType@5238} "RecordType(BIGINT user, VARCHAR(2147483647) product, INTEGER amount)"
  kind = {StructKind@5268} "FULLY_QUALIFIED"
  nullable = false
  fieldList = {RegularImmutableList@5269}  size = 3
  digest = "RecordType(BIGINT user, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" product, INTEGER amount) NOT NULL"
 kind = {SqlKind@5029} "UNION"
  lowerName = "union"
  sql = "UNION"
  name = "UNION"
  ordinal = 18
 fields = {RegularImmutableList@5254}  size = 3
  {Integer@5261} 0 -> "user"
  {Integer@5263} 1 -> "product"
  {Integer@5265} 2 -> "amount"
 collation = {RelCollationImpl@5237} "[]"
  fieldCollations = {RegularImmutableList@5256}  size = 0

// call stack
    
convertQuery:561, SqlToRelConverter (org.apache.calcite.sql2rel)
org$apache$Flink$table$planner$calcite$FlinkPlannerImpl$$rel:148, FlinkPlannerImpl (org.apache.Flink.table.planner.calcite)
rel:135, FlinkPlannerImpl (org.apache.Flink.table.planner.calcite)
toQueryOperation:522, SqlToOperationConverter (org.apache.Flink.table.planner.operations)
convertSqlQuery:436, SqlToOperationConverter (org.apache.Flink.table.planner.operations)
convert:154, SqlToOperationConverter (org.apache.Flink.table.planner.operations)
parse:66, ParserImpl (org.apache.Flink.table.planner.delegation)
sqlQuery:464, TableEnvironmentImpl (org.apache.Flink.table.api.internal)
main:82, StreamSQLExample$ (spendreport)
main:-1, StreamSQLExample (spendreport)
  
// another example: a LogicalProject has been generated 
  
bb = {SqlToRelConverter$Blackboard@4978} 
 scope = {SelectScope@4977} 
 nameToNodeMap = null
 root = {LogicalProject@5100} "LogicalProject#4"
  exps = {RegularImmutableList@5105}  size = 3
  input = {LogicalJoin@5106} "LogicalJoin#3"
  desc = "LogicalProject#4"
  rowType = {RelRecordType@5107} "RecordType(VARCHAR(2147483647) orderId, VARCHAR(2147483647) productName, VARCHAR(2147483647) payType)"
  digest = "LogicalProject#4"
  cluster = {FlinkRelOptCluster@4949} 
  id = 4
  traitSet = {RelTraitSet@5108}  size = 5
 inputs = {Collections$SingletonList@5111}  size = 1
 mapCorrelateToRex = {HashMap@5112}  size = 0
 isPatternVarRef = false
 cursors = {ArrayList@5113}  size = 0
 subQueryList = {LinkedHashSet@5114}  size = 0
 agg = null
 window = null
 mapRootRelToFieldProjection = {HashMap@5115}  size = 0
 columnMonotonicities = {ArrayList@5116}  size = 3
 systemFieldList = {ArrayList@5117}  size = 0
 top = true
 initializerExpressionFactory = {NullInitializerExpressionFactory@5118} 
 this$0 = {SqlToRelConverter@4926} 

// for example, this is where the LogicalProject is generated.

   protected void convertFrom(SqlToRelConverter.Blackboard bb, SqlNode from) {
        switch (from.getKind()) {
            // ... other cases elided ...
            case JOIN:
                RelNode joinRel = this.createJoin(fromBlackboard, leftRel, rightRel, conditionExp, convertedJoinType);
                bb.setRoot(joinRel, false);
                // ...
        }
   }
  
// related call stack

createJoin:378, RelFactories$JoinFactoryImpl (org.apache.calcite.rel.core)
createJoin:2520, SqlToRelConverter (org.apache.calcite.sql2rel)
convertFrom:2111, SqlToRelConverter (org.apache.calcite.sql2rel)
convertSelectImpl:646, SqlToRelConverter (org.apache.calcite.sql2rel)
convertSelect:627, SqlToRelConverter (org.apache.calcite.sql2rel)
convertQueryRecursive:3181, SqlToRelConverter (org.apache.calcite.sql2rel)
convertQuery:563, SqlToRelConverter (org.apache.calcite.sql2rel)
org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel:148, FlinkPlannerImpl (org.apache.flink.table.planner.calcite)
rel:135, FlinkPlannerImpl (org.apache.flink.table.planner.calcite)
toQueryOperation:522, SqlToOperationConverter (org.apache.flink.table.planner.operations)
convertSqlQuery:436, SqlToOperationConverter (org.apache.flink.table.planner.operations)
convert:154, SqlToOperationConverter (org.apache.flink.table.planner.operations)
parse:66, ParserImpl (org.apache.flink.table.planner.delegation)
sqlQuery:464, TableEnvironmentImpl (org.apache.flink.table.api.internal)
main:73, SimpleOuterJoin$ (spendreport)
main:-1, SimpleOuterJoin (spendreport)

4. Optimization Stage (RelNode –> RelNode)

At this point the overview diagram has reached:

// NOTE: execution order is top to bottom; " -----> " indicates the type of instance that is produced
* 
*        +-----> "left outer JOIN" (SQL statement)
*        |   
*        |     
*     SqlParser.parseQuery // SQL parsing stage: generates the AST (abstract syntax tree), i.e. SQL –> SqlNode      
*        |   
*        |      
*        +-----> SqlJoin (SqlNode)
*        |   
*        |     
*     SqlToRelConverter.convertQuery // semantic analysis: generates the logical plan, i.e. SqlNode –> RelNode
*        |    
*        |     
*        +-----> LogicalProject (RelNode) // abstract syntax tree as a not-yet-optimized RelNode   
*        |      
*        |     
*    FlinkLogicalJoinConverter (RelOptRule) // Flink-specific optimization rules      
*    VolcanoRuleCall.onMatch // optimizes the logical plan using Flink-specific optimization rules 
*        | 
*        |   
*        +-----> FlinkLogicalJoin (RelNode)  // optimized logical plan (logical execution plan)
*        |  
*        |    
*    StreamExecJoinRule (RelOptRule) // Rule that converts FlinkLogicalJoin without window bounds in join condition to StreamExecJoin     
*    VolcanoRuleCall.onMatch // converts the optimized logical plan into Flink's physical execution plan based on Flink rules
*        |       
*        |   
*        +-----> StreamExecJoin (FlinkRelNode) // stream physical RelNode, the physical execution plan
*        |      
*        |

The fourth stage is where the core of Calcite lies.

In short: logical plan optimization, the heart of the optimizer, which optimizes the previously generated logical plan according to the applicable rules.

In Flink this part is uniformly encapsulated in the optimize method. It involves multiple phases, and each phase uses rules to optimize and improve the logical plan.

The role of the optimizer

The most central part of the Calcite architecture is the Optimizer. An optimization engine consists of three components:

  • rules: the matching rules. Calcite ships over a hundred built-in rules to optimize relational expressions, and custom rules are supported as well;
  • metadata providers: they supply information to the optimizer that helps guide it toward its goal (reducing overall cost), such as row counts or which column of a table is unique, as well as the functions used to compute the cost of subexpressions in a RelNode tree;
  • planner engines: their job is to fire rules until the specified goal is reached; for a cost-based optimizer (CBO), for example, the goal is to reduce cost (cost includes the number of rows processed, CPU cost, IO cost, and so on).

The optimizer converts the relational algebra expression produced by the parser into an execution plan for the execution engine, applying rule-based optimizations along the way to produce a more efficient plan. A typical example is filter push-down: the filter is applied before the join, so the join no longer has to be computed over the full input and the amount of data participating in the join shrinks.

In Calcite, RelOptPlanner is the base class for optimizers. Calcite provides two implementations (a HepPlanner usage sketch follows the list):

  • HepPlanner: the rule-based optimization (RBO) implementation. It is a heuristic optimizer that keeps matching rules until the match limit is reached, or until a full pass no longer produces any rule matches;
  • VolcanoPlanner: the cost-based optimization (CBO) implementation. It keeps iterating over the rules until it finds the plan with the lowest cost.
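
A hedged sketch of the HepPlanner (RBO) usage pattern, analogous to the VolcanoPlanner example given later in this section (the input relNode is assumed to already exist):

import org.apache.calcite.plan.hep.HepPlanner;
import org.apache.calcite.plan.hep.HepProgram;
import org.apache.calcite.plan.hep.HepProgramBuilder;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.rules.FilterJoinRule;

// build a HepProgram that lists the rules to apply, in order
HepProgram program = new HepProgramBuilder()
		.addRuleInstance(FilterJoinRule.FILTER_ON_JOIN)  // e.g. push a filter into a join
		.build();

HepPlanner hepPlanner = new HepPlanner(program);
hepPlanner.setRoot(relNode);                  // relNode: the logical plan to optimize (assumed)
RelNode optimized = hepPlanner.findBestExp(); // apply rules until no more matches (or limits are hit)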

Cost-based optimization (CBO)

A cost-based optimizer (CBO) transforms relational expressions according to optimization rules. Each transformation turns one relational expression into another while the original expression is kept as well, so after a series of transformations there are multiple candidate execution plans. CBO then computes the cost of each plan from statistics and a cost model, and picks the plan with the lowest cost.

CBO therefore has two dependencies: statistics and the cost model. Whether the statistics are accurate and the cost model is reasonable both affect whether CBO picks the optimal plan. From the description above, CBO is superior to RBO, because RBO is a rigid optimizer that only follows rules and is blind to the data, while in practice the data keeps changing, so a plan produced by RBO may well not be optimal. In fact most databases and big-data engines lean toward CBO today, but for streaming engines CBO is still quite hard to apply, because information such as data volume cannot be known in advance, which greatly limits the optimization effect; CBO is therefore mainly used in offline scenarios.

VolcanoPlanner concepts

VolcanoPlanner is the CBO implementation; it keeps iterating over the rules until it finds the plan with the lowest cost. Some related concepts:

  • A RelSet describes a group of equivalent relational expressions; all of its RelNodes are recorded in rels.
  • A RelSubset describes a group of equivalent relational expressions that share the same physical properties. Each RelSubset records the RelSet it belongs to. RelSubset extends AbstractRelNode, so it is itself a kind of RelNode, with its physical properties stored in its traitSet member. Each RelSubset also records its best plan (best) and the cost of that best plan (bestCost).
  • A RuleMatch abstracts the relationship between a Rule and a RelSubset; it records information about both.
  • importance determines the order in which rules are applied during optimization. It is a relative value; VolcanoPlanner maintains two kinds of importance, one for RelSubsets and one for RuleMatches.

VolcanoPlanner execution steps

Applying the VolcanoPlanner involves four overall steps:

  1. Initialize the VolcanoPlanner and add the relevant rule matches (including ConverterRules) to the rule-match queue;
  2. Transform RelNodes into equivalents: apply rule matches to optimize the plan graph (a rule specifies an operator sub-graph to match and the logic to generate an equivalent, better sub-graph); at this point only the physical properties (Convention) are changed;
  3. Register the RelNode via the VolcanoPlanner's setRoot() method and perform the corresponding initialization;
  4. Iterate with a dynamic-programming style algorithm until the cost no longer changes or all rule matches in the queue have been applied, thereby finding the plan with the lowest cost; the importance of a rule match depends on the RelNode's cost and depth.

The example below walks through the internal logic of the VolcanoPlanner in detail.

//1. initialize the VolcanoPlanner and add the relevant rules
VolcanoPlanner planner = new VolcanoPlanner();
planner.addRelTraitDef(ConventionTraitDef.INSTANCE);
planner.addRelTraitDef(RelDistributionTraitDef.INSTANCE);
// add the relevant rules
planner.addRule(FilterJoinRule.FilterIntoJoinRule.FILTER_ON_JOIN);
planner.addRule(ReduceExpressionsRule.PROJECT_INSTANCE);
planner.addRule(PruneEmptyRules.PROJECT_INSTANCE);
// add the relevant ConverterRules
planner.addRule(EnumerableRules.ENUMERABLE_MERGE_JOIN_RULE);
planner.addRule(EnumerableRules.ENUMERABLE_SORT_RULE);
planner.addRule(EnumerableRules.ENUMERABLE_VALUES_RULE);
planner.addRule(EnumerableRules.ENUMERABLE_PROJECT_RULE);
planner.addRule(EnumerableRules.ENUMERABLE_FILTER_RULE);
//2. Changes a relational expression to an equivalent one with a different set of traits.
RelTraitSet desiredTraits =
    relNode.getCluster().traitSet().replace(EnumerableConvention.INSTANCE);
relNode = planner.changeTraits(relNode, desiredTraits);
//3. register the RelNode via the VolcanoPlanner's setRoot method and perform the corresponding initialization
planner.setRoot(relNode);
//4. find the plan with the lowest cost via the dynamic-programming style algorithm
relNode = planner.findBestExp();

The related code in Flink is as follows:

public PlannerContext(
			TableConfig tableConfig,
			FunctionCatalog functionCatalog,
			CatalogManager catalogManager,
			CalciteSchema rootSchema,
			List<RelTraitDef> traitDefs) {
		this.tableConfig = tableConfig;

		this.context = new FlinkContextImpl(
				tableConfig,
				functionCatalog,
				catalogManager,
				this::createSqlExprToRexConverter);

		this.rootSchema = rootSchema;
		this.traitDefs = traitDefs;
		// Make a framework config to initialize the RelOptCluster instance,
		// caution that we can only use the attributes that can not be overwrite/configured
		// by user.
		this.frameworkConfig = createFrameworkConfig();

    // the VolcanoPlanner is used here
		RelOptPlanner planner = new VolcanoPlanner(frameworkConfig.getCostFactory(), frameworkConfig.getContext());
		planner.setExecutor(frameworkConfig.getExecutor());
		for (RelTraitDef traitDef : frameworkConfig.getTraitDefs()) {
			planner.addRelTraitDef(traitDef);
		}
		this.cluster = FlinkRelOptClusterFactory.create(planner, new RexBuilder(typeFactory));
	}


// initialization call stack
<init>:119, PlannerContext (org.apache.Flink.table.planner.delegation)
<init>:86, PlannerBase (org.apache.Flink.table.planner.delegation)
<init>:44, StreamPlanner (org.apache.Flink.table.planner.delegation)
create:50, BlinkPlannerFactory (org.apache.Flink.table.planner.delegation)
create:325, StreamTableEnvironmentImpl$ (org.apache.Flink.table.api.scala.internal)
create:425, StreamTableEnvironment$ (org.apache.Flink.table.api.scala)
main:56, StreamSQLExample$ (spendreport)
main:-1, StreamSQLExample (spendreport)


class FlinkVolcanoProgram[OC <: FlinkOptimizeContext] extends FlinkRuleSetProgram[OC] {

 override def optimize(root: RelNode, context: OC): RelNode = {
    val targetTraits = root.getTraitSet.plusAll(requiredOutputTraits.get).simplify()
    // VolcanoPlanner limits that the planer a RelNode tree belongs to and
    // the VolcanoPlanner used to optimize the RelNode tree should be same instance.
    // see: VolcanoPlanner#registerImpl
    // here, use the planner in cluster directly
      
    // the VolcanoPlanner is used here as well 
    val planner = root.getCluster.getPlanner.asInstanceOf[VolcanoPlanner]
    val optProgram = Programs.ofRules(rules)
  }
}      
  
// its call stack

optimize:60, FlinkVolcanoProgram (org.apache.Flink.table.planner.plan.optimize.program)
apply:62, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.Flink.table.planner.plan.optimize.program)
apply:58, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.Flink.table.planner.plan.optimize.program)
apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection)
apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection)
foreach:891, Iterator$class (scala.collection)
foreach:1334, AbstractIterator (scala.collection)
foreach:72, IterableLike$class (scala.collection)
foreach:54, AbstractIterable (scala.collection)
foldLeft:157, TraversableOnce$class (scala.collection)
foldLeft:104, AbstractTraversable (scala.collection)
optimize:57, FlinkChainedProgram (org.apache.Flink.table.planner.plan.optimize.program)
optimizeTree:170, StreamCommonSubGraphBasedOptimizer (org.apache.Flink.table.planner.plan.optimize)
doOptimize:90, StreamCommonSubGraphBasedOptimizer (org.apache.Flink.table.planner.plan.optimize)
optimize:77, CommonSubGraphBasedOptimizer (org.apache.Flink.table.planner.plan.optimize)
optimize:248, PlannerBase (org.apache.Flink.table.planner.delegation)
translate:151, PlannerBase (org.apache.Flink.table.planner.delegation)
toDataStream:210, StreamTableEnvironmentImpl (org.apache.Flink.table.api.scala.internal)
toAppendStream:107, StreamTableEnvironmentImpl (org.apache.Flink.table.api.scala.internal)
toAppendStream:101, TableConversions (org.apache.Flink.table.api.scala)
main:89, StreamSQLExample$ (spendreport)
main:-1, StreamSQLExample (spendreport)

// everything below is VolcanoPlanner-related code and call stacks 
  
// VolcanoPlanner adds rules. The selected optimization rules are wrapped as VolcanoRuleMatch and put into the RuleQueue, and this RuleQueue is exactly the core class used by the dynamic-programming style algorithm that runs next.   
public class VolcanoPlanner extends AbstractRelOptPlanner {
      public boolean addRule(RelOptRule rule) {
        ......
      }
}    
        
addRule:438, VolcanoPlanner (org.apache.calcite.plan.volcano)
run:315, Programs$RuleSetProgram (org.apache.calcite.tools)
optimize:64, FlinkVolcanoProgram (org.apache.Flink.table.planner.plan.optimize.program)
apply:62, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.Flink.table.planner.plan.optimize.program)
apply:58, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.Flink.table.planner.plan.optimize.program)
apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection)
apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection)
foreach:891, Iterator$class (scala.collection)
foreach:1334, AbstractIterator (scala.collection)
foreach:72, IterableLike$class (scala.collection)
foreach:54, AbstractIterable (scala.collection)
foldLeft:157, TraversableOnce$class (scala.collection)
foldLeft:104, AbstractTraversable (scala.collection)
optimize:57, FlinkChainedProgram (org.apache.Flink.table.planner.plan.optimize.program)
optimizeTree:170, StreamCommonSubGraphBasedOptimizer (org.apache.Flink.table.planner.plan.optimize)
doOptimize:90, StreamCommonSubGraphBasedOptimizer (org.apache.Flink.table.planner.plan.optimize)
optimize:77, CommonSubGraphBasedOptimizer (org.apache.Flink.table.planner.plan.optimize)
optimize:248, PlannerBase (org.apache.Flink.table.planner.delegation)
translate:151, PlannerBase (org.apache.Flink.table.planner.delegation)
toDataStream:210, StreamTableEnvironmentImpl (org.apache.Flink.table.api.scala.internal)
toAppendStream:107, StreamTableEnvironmentImpl (org.apache.Flink.table.api.scala.internal)
toAppendStream:101, TableConversions (org.apache.Flink.table.api.scala)
main:89, StreamSQLExample$ (spendreport)
main:-1, StreamSQLExample (spendreport)        
 
// VolcanoPlanner changes the traits   
public class VolcanoPlanner extends AbstractRelOptPlanner {
    public RelNode changeTraits(RelNode rel, RelTraitSet toTraits) {
        assert !rel.getTraitSet().equals(toTraits);

        assert toTraits.allSimple();

        RelSubset rel2 = this.ensureRegistered(rel, (RelNode)null);
        return rel2.getTraitSet().equals(toTraits) ? rel2 : rel2.set.getOrCreateSubset(rel.getCluster(), toTraits.simplify());
    }
}

changeTraits:529, VolcanoPlanner (org.apache.calcite.plan.volcano)
run:324, Programs$RuleSetProgram (org.apache.calcite.tools)
optimize:64, FlinkVolcanoProgram (org.apache.Flink.table.planner.plan.optimize.program)
apply:62, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.Flink.table.planner.plan.optimize.program)
apply:58, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.Flink.table.planner.plan.optimize.program)
apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection)
apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection)
foreach:891, Iterator$class (scala.collection)
foreach:1334, AbstractIterator (scala.collection)
foreach:72, IterableLike$class (scala.collection)
foreach:54, AbstractIterable (scala.collection)
foldLeft:157, TraversableOnce$class (scala.collection)
foldLeft:104, AbstractTraversable (scala.collection)
optimize:57, FlinkChainedProgram (org.apache.Flink.table.planner.plan.optimize.program)
optimizeTree:170, StreamCommonSubGraphBasedOptimizer (org.apache.Flink.table.planner.plan.optimize)
doOptimize:90, StreamCommonSubGraphBasedOptimizer (org.apache.Flink.table.planner.plan.optimize)
optimize:77, CommonSubGraphBasedOptimizer (org.apache.Flink.table.planner.plan.optimize)
optimize:248, PlannerBase (org.apache.Flink.table.planner.delegation)
translate:151, PlannerBase (org.apache.Flink.table.planner.delegation)
toDataStream:210, StreamTableEnvironmentImpl (org.apache.Flink.table.api.scala.internal)
toAppendStream:107, StreamTableEnvironmentImpl (org.apache.Flink.table.api.scala.internal)
toAppendStream:101, TableConversions (org.apache.Flink.table.api.scala)
main:89, StreamSQLExample$ (spendreport)
main:-1, StreamSQLExample (spendreport)
  
// VolcanoPlanner sets the root 
public class VolcanoPlanner extends AbstractRelOptPlanner {  
    public void setRoot(RelNode rel) {
        this.registerMetadataRels();
        this.root = this.registerImpl(rel, (RelSet)null);
        if (this.originalRoot == null) {
            this.originalRoot = rel;
        }

        this.ruleQueue.recompute(this.root);
        this.ensureRootConverters();
    }
}

setRoot:294, VolcanoPlanner (org.apache.calcite.plan.volcano)
run:326, Programs$RuleSetProgram (org.apache.calcite.tools)
optimize:64, FlinkVolcanoProgram (org.apache.Flink.table.planner.plan.optimize.program)
apply:62, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.Flink.table.planner.plan.optimize.program)
apply:58, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.Flink.table.planner.plan.optimize.program)
apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection)
apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection)
foreach:891, Iterator$class (scala.collection)
foreach:1334, AbstractIterator (scala.collection)
foreach:72, IterableLike$class (scala.collection)
foreach:54, AbstractIterable (scala.collection)
foldLeft:157, TraversableOnce$class (scala.collection)
foldLeft:104, AbstractTraversable (scala.collection)
optimize:57, FlinkChainedProgram (org.apache.Flink.table.planner.plan.optimize.program)
optimizeTree:170, StreamCommonSubGraphBasedOptimizer (org.apache.Flink.table.planner.plan.optimize)
doOptimize:90, StreamCommonSubGraphBasedOptimizer (org.apache.Flink.table.planner.plan.optimize)
optimize:77, CommonSubGraphBasedOptimizer (org.apache.Flink.table.planner.plan.optimize)
optimize:248, PlannerBase (org.apache.Flink.table.planner.delegation)
translate:151, PlannerBase (org.apache.Flink.table.planner.delegation)
toDataStream:210, StreamTableEnvironmentImpl (org.apache.Flink.table.api.scala.internal)
toAppendStream:107, StreamTableEnvironmentImpl (org.apache.Flink.table.api.scala.internal)
toAppendStream:101, TableConversions (org.apache.Flink.table.api.scala)
main:89, StreamSQLExample$ (spendreport)
main:-1, StreamSQLExample (spendreport)
  
// VolcanoPlanner finds the minimum cost; essentially this is an implementation of a dynamic-programming style algorithm.
  
public class VolcanoPlanner extends AbstractRelOptPlanner {
    public RelNode findBestExp() {
        this.ensureRootConverters();
        this.registerMaterializations();
        int cumulativeTicks = 0;
        VolcanoPlannerPhase[] var2 = VolcanoPlannerPhase.values();
        int var3 = var2.length;

        for(int var4 = 0; var4 < var3; ++var4) {
            VolcanoPlannerPhase phase = var2[var4];
            this.setInitialImportance();
            RelOptCost targetCost = this.costFactory.makeHugeCost();
            int tick = 0;
            int firstFiniteTick = -1;
            int splitCount = 0;
            int giveUpTick = 2147483647;

            while(true) {
                ++tick;
                ++cumulativeTicks;
                if (this.root.bestCost.isLe(targetCost)) {
                    if (firstFiniteTick < 0) {
                        firstFiniteTick = cumulativeTicks;
                        this.clearImportanceBoost();
                    }

                    if (!this.ambitious) {
                        break;
                    }

                    targetCost = this.root.bestCost.multiplyBy(0.9D);
                    ++splitCount;
                    if (this.impatient) {
                        if (firstFiniteTick < 10) {
                            giveUpTick = cumulativeTicks + 25;
                        } else {
                            giveUpTick = cumulativeTicks + Math.max(firstFiniteTick / 10, 25);
                        }
                    }
                } else {
                    if (cumulativeTicks > giveUpTick) {
                        break;
                    }

                    if (this.root.bestCost.isInfinite() && tick % 10 == 0) {
                        this.injectImportanceBoost();
                    }
                }

                VolcanoRuleMatch match = this.ruleQueue.popMatch(phase);
                if (match == null) {
                    break;
                }

                assert match.getRule().matches(match);

                match.onMatch();
                this.root = this.canonize(this.root);
            }

            this.ruleQueue.phaseCompleted(phase);
        }

        RelNode cheapest = this.root.buildCheapestPlan(this);

        return cheapest;
    }
}

// cheapest, the Flink logical node obtained by the VolcanoPlanner, is the node that is finally chosen

cheapest = {FlinkLogicalUnion@6487} "FlinkLogicalUnion#443"
 cluster = {FlinkRelOptCluster@6224} 
 inputs = {RegularImmutableList@6493}  size = 2
  0 = {FlinkLogicalCalc@6498} "FlinkLogicalCalc#441"
   cluster = {FlinkRelOptCluster@6224} 
   calcProgram = {RexProgram@6509} "(expr#0..2=[{inputs}], expr#3=[2], expr#4=[>($t2, $t3)], proj#0..2=[{exprs}], $condition=[$t4])"
   program = {RexProgram@6509} "(expr#0..2=[{inputs}], expr#3=[2], expr#4=[>($t2, $t3)], proj#0..2=[{exprs}], $condition=[$t4])"
   input = {FlinkLogicalDataStreamTableScan@6510} "rel#437:FlinkLogicalDataStreamTableScan.LOGICAL.any.None: 0.false.UNKNOWN(table=[default_catalog, default_database, UnnamedTable$0])"
   desc = "FlinkLogicalCalc#441"
   rowType = {RelRecordType@6504} "RecordType(BIGINT user, VARCHAR(2147483647) product, INTEGER amount)"
   digest = "FlinkLogicalCalc#441"
   AbstractRelNode.cluster = {FlinkRelOptCluster@6224} 
   id = 441
   traitSet = {RelTraitSet@5942}  size = 5
  1 = {FlinkLogicalCalc@6499} "FlinkLogicalCalc#442"
   cluster = {FlinkRelOptCluster@6224} 
   calcProgram = {RexProgram@6502} "(expr#0..2=[{inputs}], expr#3=[2], expr#4=[<($t2, $t3)], proj#0..2=[{exprs}], $condition=[$t4])"
   program = {RexProgram@6502} "(expr#0..2=[{inputs}], expr#3=[2], expr#4=[<($t2, $t3)], proj#0..2=[{exprs}], $condition=[$t4])"
   input = {FlinkLogicalDataStreamTableScan@6503} "rel#435:FlinkLogicalDataStreamTableScan.LOGICAL.any.None: 0.false.UNKNOWN(table=[default_catalog, default_database, OrderB])"
   desc = "FlinkLogicalCalc#442"
   rowType = {RelRecordType@6504} "RecordType(BIGINT user, VARCHAR(2147483647) product, INTEGER amount)"
   digest = "FlinkLogicalCalc#442"
   AbstractRelNode.cluster = {FlinkRelOptCluster@6224} 
   id = 442
   traitSet = {RelTraitSet@5942}  size = 5
 kind = {SqlKind@6494} "UNION"
  lowerName = "union"
  sql = "UNION"
  name = "UNION"
  ordinal = 18
 all = true
 desc = "FlinkLogicalUnion#443"
 rowType = null
 digest = "FlinkLogicalUnion#443"
 AbstractRelNode.cluster = {FlinkRelOptCluster@6224} 
 id = 443
 traitSet = {RelTraitSet@5942}  size = 5


findBestExp:572, VolcanoPlanner (org.apache.calcite.plan.volcano)
run:327, Programs$RuleSetProgram (org.apache.calcite.tools)
optimize:64, FlinkVolcanoProgram (org.apache.Flink.table.planner.plan.optimize.program)
apply:62, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.Flink.table.planner.plan.optimize.program)
apply:58, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.Flink.table.planner.plan.optimize.program)
apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection)
apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection)
foreach:891, Iterator$class (scala.collection)
foreach:1334, AbstractIterator (scala.collection)
foreach:72, IterableLike$class (scala.collection)
foreach:54, AbstractIterable (scala.collection)
foldLeft:157, TraversableOnce$class (scala.collection)
foldLeft:104, AbstractTraversable (scala.collection)
optimize:57, FlinkChainedProgram (org.apache.Flink.table.planner.plan.optimize.program)
optimizeTree:170, StreamCommonSubGraphBasedOptimizer (org.apache.Flink.table.planner.plan.optimize)
doOptimize:90, StreamCommonSubGraphBasedOptimizer (org.apache.Flink.table.planner.plan.optimize)
optimize:77, CommonSubGraphBasedOptimizer (org.apache.Flink.table.planner.plan.optimize)
optimize:248, PlannerBase (org.apache.Flink.table.planner.delegation)
translate:151, PlannerBase (org.apache.Flink.table.planner.delegation)
toDataStream:210, StreamTableEnvironmentImpl (org.apache.Flink.table.api.scala.internal)
toAppendStream:107, StreamTableEnvironmentImpl (org.apache.Flink.table.api.scala.internal)
toAppendStream:101, TableConversions (org.apache.Flink.table.api.scala)
main:89, StreamSQLExample$ (spendreport)
main:-1, StreamSQLExample (spendreport)

Below is the optimization of the join:

class FlinkLogicalJoin(
    cluster: RelOptCluster,
    traitSet: RelTraitSet,
    left: RelNode,
    right: RelNode,
    condition: RexNode,
    joinType: JoinRelType)
  extends FlinkLogicalJoinBase(

  override def convert(rel: RelNode): RelNode = {
    val join = rel.asInstanceOf[LogicalJoin]
    val traitSet = rel.getTraitSet.replace(FlinkConventions.LOGICAL)
    val newLeft = RelOptRule.convert(join.getLeft, FlinkConventions.LOGICAL)
    val newRight = RelOptRule.convert(join.getRight, FlinkConventions.LOGICAL)

    new FlinkLogicalJoin(
      rel.getCluster,
      traitSet,
      newLeft,
      newRight,
      join.getCondition,
      join.getJoinType)
  }
}  

call = {VolcanoRuleMatch@6191} "rule [FlinkLogicalJoinConverter(in:NONE,out:LOGICAL)] rels [rel#100:LogicalJoin.NONE.any.None: 0.false.UNKNOWN(left=RelSubset#98,right=RelSubset#99,condition==($0, $2),joinType=left)]"
 targetSet = {RelSet@6193} 
 targetSubset = null
 digest = "rule [FlinkLogicalJoinConverter(in:NONE,out:LOGICAL)] rels [rel#100:LogicalJoin.NONE.any.None: 0.false.UNKNOWN(left=RelSubset#98,right=RelSubset#99,condition==($0, $2),joinType=left)]"
 cachedImportance = 0.8019000000000001
 volcanoPlanner = {VolcanoPlanner@6194} 
 generatedRelList = null
 id = 71
 operand0 = {RelOptRule$ConverterRelOptRuleOperand@6186} 
  parent = null
  rule = {FlinkLogicalJoinConverter@6179} "FlinkLogicalJoinConverter(in:NONE,out:LOGICAL)"
  predicate = {ConverterRule$lambda@6246} 
  solveOrder = {int[1]@6247} 
  ordinalInParent = 0
  ordinalInRule = 0
  trait = {Convention$Impl@6184} "NONE"
  clazz = {Class@5010} "class org.apache.calcite.rel.logical.LogicalJoin"
  children = {RegularImmutableList@6230}  size = 0
  childPolicy = {RelOptRuleOperandChildPolicy@6248} "ANY"
 nodeInputs = {RegularImmutableBiMap@6195}  size = 0
 rule = {FlinkLogicalJoinConverter@6179} "FlinkLogicalJoinConverter(in:NONE,out:LOGICAL)"
 rels = {RelNode[1]@6196} 
  0 = {LogicalJoin@6181} "rel#100:LogicalJoin.NONE.any.None: 0.false.UNKNOWN(left=RelSubset#98,right=RelSubset#99,condition==($0, $2),joinType=left)"
   semiJoinDone = false
   systemFieldList = {RegularImmutableList@6230}  size = 0
   condition = {RexCall@6231} "=($0, $2)"
   variablesSet = {RegularImmutableSet@6232}  size = 0
   joinType = {JoinRelType@6233} "LEFT"
   joinInfo = {JoinInfo@6234} 
   left = {RelSubset@6235} "rel#98:Subset#0.NONE.any.None: 0.false.UNKNOWN"
   right = {RelSubset@6236} "rel#99:Subset#1.NONE.any.None: 0.false.UNKNOWN"
   desc = "rel#100:LogicalJoin.NONE.any.None: 0.false.UNKNOWN(left=RelSubset#98,right=RelSubset#99,condition==($0, $2),joinType=left)"
   rowType = {RelRecordType@6237} "RecordType(VARCHAR(2147483647) orderId, VARCHAR(2147483647) productName, VARCHAR(2147483647) orderId0, VARCHAR(2147483647) payType)"
   digest = "LogicalJoin.NONE.any.None: 0.false.UNKNOWN(left=RelSubset#98,right=RelSubset#99,condition==($0, $2),joinType=left)"
   cluster = {FlinkRelOptCluster@6239} 
   id = 100
   traitSet = {RelTraitSet@6240}  size = 5
 planner = {VolcanoPlanner@6194} 
 parents = null  
  
// Call stack when the FlinkLogicalJoin is created
   
create:106, FlinkLogicalJoin$ (org.apache.flink.table.planner.plan.nodes.logical)
convert:92, FlinkLogicalJoinConverter (org.apache.flink.table.planner.plan.nodes.logical)
onMatch:144, ConverterRule (org.apache.calcite.rel.convert)
onMatch:208, VolcanoRuleCall (org.apache.calcite.plan.volcano)
findBestExp:631, VolcanoPlanner (org.apache.calcite.plan.volcano)
run:327, Programs$RuleSetProgram (org.apache.calcite.tools)
optimize:64, FlinkVolcanoProgram (org.apache.flink.table.planner.plan.optimize.program)
apply:62, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.flink.table.planner.plan.optimize.program)
apply:58, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.flink.table.planner.plan.optimize.program)
apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection)
apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection)
foreach:891, Iterator$class (scala.collection)
foreach:1334, AbstractIterator (scala.collection)
foreach:72, IterableLike$class (scala.collection)
foreach:54, AbstractIterable (scala.collection)
foldLeft:157, TraversableOnce$class (scala.collection)
foldLeft:104, AbstractTraversable (scala.collection)
optimize:57, FlinkChainedProgram (org.apache.flink.table.planner.plan.optimize.program)
optimizeTree:170, StreamCommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize)
doOptimize:90, StreamCommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize)
optimize:77, CommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize)
optimize:248, PlannerBase (org.apache.flink.table.planner.delegation)
translate:151, PlannerBase (org.apache.flink.table.planner.delegation)
toDataStream:210, StreamTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
toRetractStream:127, StreamTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
toRetractStream:146, TableConversions (org.apache.flink.table.api.scala)
main:75, SimpleOuterJoin$ (spendreport)
main:-1, SimpleOuterJoin (spendreport)
  
abstract class FlinkLogicalJoinBase(  
    cluster: RelOptCluster,
    traitSet: RelTraitSet,
    left: RelNode,
    right: RelNode,
    condition: RexNode,
    joinType: JoinRelType)
  extends Join(
    cluster,
    traitSet,
    left,
    right,
    condition,
    Set.empty[CorrelationId].asJava,
    joinType)
  with FlinkLogicalRel {
    
  // The cost of this node is also computed here, for the Volcano planner's cost comparison
  override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
    val leftRowCnt = mq.getRowCount(getLeft)
    val leftRowSize = mq.getAverageRowSize(getLeft)
    val rightRowCnt = mq.getRowCount(getRight)

    joinType match {
      case JoinRelType.SEMI | JoinRelType.ANTI =>
        val rightRowSize = mq.getAverageRowSize(getRight)
        val ioCost = (leftRowCnt * leftRowSize) + (rightRowCnt * rightRowSize)
        val cpuCost = leftRowCnt + rightRowCnt
        val rowCnt = leftRowCnt + rightRowCnt
        planner.getCostFactory.makeCost(rowCnt, cpuCost, ioCost)
      case _ =>
        val cpuCost = leftRowCnt + rightRowCnt
        val ioCost = (leftRowCnt * leftRowSize) + rightRowCnt
        planner.getCostFactory.makeCost(leftRowCnt, cpuCost, ioCost)
    }
  }  
}  
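
To make the cost formula above concrete, here is a small back-of-the-envelope calculation; the row counts and sizes are purely made-up statistics, not values taken from the examples in this article.

object JoinCostSketch extends App {
  // Hypothetical statistics, only to illustrate the non-SEMI/ANTI branch of computeSelfCost above.
  val leftRowCnt  = 1000.0 // estimated rows on the left input
  val leftRowSize = 16.0   // estimated average bytes per left row
  val rightRowCnt = 500.0  // estimated rows on the right input

  val rowCnt  = leftRowCnt                              // 1000.0
  val cpuCost = leftRowCnt + rightRowCnt                // 1500.0
  val ioCost  = leftRowCnt * leftRowSize + rightRowCnt  // 16500.0

  // planner.getCostFactory.makeCost(rowCnt, cpuCost, ioCost) would wrap these three numbers
  // in a FlinkCost, which the Volcano planner compares when choosing the cheapest plan.
  println(s"rowCnt=$rowCnt cpuCost=$cpuCost ioCost=$ioCost")
}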
  
// Call stack of the cost computation
  
computeSelfCost:63, FlinkLogicalJoin (org.apache.flink.table.planner.plan.nodes.logical)
getNonCumulativeCost:41, FlinkRelMdNonCumulativeCost (org.apache.flink.table.planner.plan.metadata)
getNonCumulativeCost_$:-1, GeneratedMetadataHandler_NonCumulativeCost
getNonCumulativeCost:-1, GeneratedMetadataHandler_NonCumulativeCost
getNonCumulativeCost:301, RelMetadataQuery (org.apache.calcite.rel.metadata)
getCost:936, VolcanoPlanner (org.apache.calcite.plan.volcano)
propagateCostImprovements0:347, RelSubset (org.apache.calcite.plan.volcano)
propagateCostImprovements:330, RelSubset (org.apache.calcite.plan.volcano)
addRelToSet:1828, VolcanoPlanner (org.apache.calcite.plan.volcano)
registerImpl:1764, VolcanoPlanner (org.apache.calcite.plan.volcano)
register:846, VolcanoPlanner (org.apache.calcite.plan.volcano)
ensureRegistered:868, VolcanoPlanner (org.apache.calcite.plan.volcano)
ensureRegistered:1939, VolcanoPlanner (org.apache.calcite.plan.volcano)
transformTo:129, VolcanoRuleCall (org.apache.calcite.plan.volcano)
transformTo:236, RelOptRuleCall (org.apache.calcite.plan)
onMatch:146, ConverterRule (org.apache.calcite.rel.convert)
onMatch:208, VolcanoRuleCall (org.apache.calcite.plan.volcano)
findBestExp:631, VolcanoPlanner (org.apache.calcite.plan.volcano)
run:327, Programs$RuleSetProgram (org.apache.calcite.tools)
optimize:64, FlinkVolcanoProgram (org.apache.flink.table.planner.plan.optimize.program)
apply:62, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.flink.table.planner.plan.optimize.program)
apply:58, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.flink.table.planner.plan.optimize.program)
apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection)
apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection)
foreach:891, Iterator$class (scala.collection)
foreach:1334, AbstractIterator (scala.collection)
foreach:72, IterableLike$class (scala.collection)
foreach:54, AbstractIterable (scala.collection)
foldLeft:157, TraversableOnce$class (scala.collection)
foldLeft:104, AbstractTraversable (scala.collection)
optimize:57, FlinkChainedProgram (org.apache.flink.table.planner.plan.optimize.program)
optimizeTree:170, StreamCommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize)
doOptimize:90, StreamCommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize)
optimize:77, CommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize)
optimize:248, PlannerBase (org.apache.flink.table.planner.delegation)
translate:151, PlannerBase (org.apache.flink.table.planner.delegation)
toDataStream:210, StreamTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
toRetractStream:127, StreamTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
toRetractStream:146, TableConversions (org.apache.flink.table.api.scala)
main:75, SimpleOuterJoin$ (spendreport)
main:-1, SimpleOuterJoin (spendreport)

Optimization rules

Calcite optimizes these logical plans based on optimization rules, and different rule sets are applied depending on the runtime environment (Flink provides one set of rules for batch and one for streaming).

The rules fall into two categories: the built-in optimization rules that Calcite ships (e.g. predicate push-down, pruning) and the conversion rules that turn logical nodes into Flink physical nodes.
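
To make the mechanics concrete, below is a minimal, hedged sketch (not Flink's actual code) of how a Calcite RuleSet is handed to the planner via the Programs API; this is essentially what FlinkVolcanoProgram.optimize does with the rule sets listed further down. The method and variable names here are illustrative.

import java.util.Collections

import org.apache.calcite.plan.RelTraitSet
import org.apache.calcite.rel.RelNode
import org.apache.calcite.tools.{Programs, RuleSet}

object RuleSetProgramSketch {

  // Wrap a RuleSet in a Calcite Program and run it over a RelNode tree, asking the planner
  // attached to the node's cluster for an equivalent tree that satisfies the required traits.
  def runRuleSet(ruleSet: RuleSet, input: RelNode, requiredTraits: RelTraitSet): RelNode = {
    Programs.of(ruleSet).run(
      input.getCluster.getPlanner,   // the VolcanoPlanner (or HepPlanner) owning this plan
      input,                         // the plan to optimize
      requiredTraits,                // e.g. the input traits replaced with FlinkConventions.LOGICAL
      Collections.emptyList(),       // materializations: not used here
      Collections.emptyList())       // lattices: not used here
  }
}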

Both of these steps belong to Calcite's optimization phase. The resulting DataStream plan encapsulates how each node is translated into the corresponding DataStream / DataSet program: the individual DataStream/DataSet nodes are translated, via code generation (CodeGen), into the final executable DataStream/DataSet program.

The rule sets are listed below. Each conversion rule produces one physical node; inside that node, the execution steps derived from the Calcite plan are code-generated into the DataSet (or DataStream) execution functions.

package org.apache.flink.table.plan.rules
  
  /**
    * RuleSet to optimize plans for batch / DataSet execution
    */
  val DATASET_OPT_RULES: RuleSet = RuleSets.ofList(
    // translate to Flink DataSet nodes
    DataSetWindowAggregateRule.INSTANCE,
    DataSetAggregateRule.INSTANCE,
    DataSetDistinctRule.INSTANCE,
    DataSetCalcRule.INSTANCE,
    DataSetPythonCalcRule.INSTANCE,
    DataSetJoinRule.INSTANCE,
    DataSetSingleRowJoinRule.INSTANCE,
    DataSetScanRule.INSTANCE,
    DataSetUnionRule.INSTANCE,
    DataSetIntersectRule.INSTANCE,
    DataSetMinusRule.INSTANCE,
    DataSetSortRule.INSTANCE,
    DataSetValuesRule.INSTANCE,
    DataSetCorrelateRule.INSTANCE,
    BatchTableSourceScanRule.INSTANCE
  )
  
   /**
    * RuleSet to optimize plans for stream / DataStream execution
    */
  val DATASTREAM_OPT_RULES: RuleSet = RuleSets.ofList(
    // translate to DataStream nodes
    DataStreamSortRule.INSTANCE,
    DataStreamGroupAggregateRule.INSTANCE,
    DataStreamOverAggregateRule.INSTANCE,
    DataStreamGroupWindowAggregateRule.INSTANCE,
    DataStreamCalcRule.INSTANCE,
    DataStreamScanRule.INSTANCE,
    DataStreamUnionRule.INSTANCE,
    DataStreamValuesRule.INSTANCE,
    DataStreamCorrelateRule.INSTANCE,
    DataStreamWindowJoinRule.INSTANCE,
    DataStreamJoinRule.INSTANCE,
    DataStreamTemporalTableJoinRule.INSTANCE,
    StreamTableSourceScanRule.INSTANCE,
    DataStreamMatchRule.INSTANCE,
    DataStreamTableAggregateRule.INSTANCE,
    DataStreamGroupWindowTableAggregateRule.INSTANCE,
    DataStreamPythonCalcRule.INSTANCE
  )
    
package org.apache.flink.table.planner.plan.rules

  /**
    * RuleSet to do physical optimize for stream
    */
  val PHYSICAL_OPT_RULES: RuleSet = RuleSets.ofList(
    FlinkExpandConversionRule.STREAM_INSTANCE,
    // source
    StreamExecDataStreamScanRule.INSTANCE,
    StreamExecTableSourceScanRule.INSTANCE,
    StreamExecIntermediateTableScanRule.INSTANCE,
    StreamExecWatermarkAssignerRule.INSTANCE,
    StreamExecValuesRule.INSTANCE,
    // calc
    StreamExecCalcRule.INSTANCE,
    StreamExecPythonCalcRule.INSTANCE,
    // union
    StreamExecUnionRule.INSTANCE,
    // sort
    StreamExecSortRule.INSTANCE,
    StreamExecLimitRule.INSTANCE,
    StreamExecSortLimitRule.INSTANCE,
    StreamExecTemporalSortRule.INSTANCE,
    // rank
    StreamExecRankRule.INSTANCE,
    StreamExecDeduplicateRule.RANK_INSTANCE,
    // expand
    StreamExecExpandRule.INSTANCE,
    // group agg
    StreamExecGroupAggregateRule.INSTANCE,
    StreamExecGroupTableAggregateRule.INSTANCE,
    // over agg
    StreamExecOverAggregateRule.INSTANCE,
    // window agg
    StreamExecGroupWindowAggregateRule.INSTANCE,
    StreamExecGroupWindowTableAggregateRule.INSTANCE,
    // join
    StreamExecJoinRule.INSTANCE,
    StreamExecWindowJoinRule.INSTANCE,
    StreamExecTemporalJoinRule.INSTANCE,
    StreamExecLookupJoinRule.SNAPSHOT_ON_TABLESCAN,
    StreamExecLookupJoinRule.SNAPSHOT_ON_CALC_TABLESCAN,
    // CEP
    StreamExecMatchRule.INSTANCE,
    // correlate
    StreamExecConstantTableFunctionScanRule.INSTANCE,
    StreamExecCorrelateRule.INSTANCE,
    // sink
    StreamExecSinkRule.INSTANCE
  )
StreamExecUnionRule

A concrete rule example; here is the rule for Union:

package org.apache.flink.table.planner.plan.rules.physical.stream

class StreamExecUnionRule
  extends ConverterRule(
    classOf[FlinkLogicalUnion],
    FlinkConventions.LOGICAL,
    FlinkConventions.STREAM_PHYSICAL,
    "StreamExecUnionRule") {

  def convert(rel: RelNode): RelNode = {
    val union: FlinkLogicalUnion = rel.asInstanceOf[FlinkLogicalUnion]
    val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)
    val newInputs = union.getInputs.map(RelOptRule.convert(_, FlinkConventions.STREAM_PHYSICAL))

    // This rule produces one physical node; later, the execution steps derived from the
    // Calcite plan are code-generated into the streaming execution functions inside that node.
    new StreamExecUnion(
      rel.getCluster,
      traitSet,
      newInputs,
      union.all,
      rel.getRowType)
  }
}

public class VolcanoPlanner extends AbstractRelOptPlanner {
    public RelNode findBestExp() {
        // abridged: inside the search loop, every matched rule is fired here
        match.onMatch();
        return cheapest;
    }
}

match = {VolcanoRuleMatch@6252} "rule [StreamExecUnionRule(in:LOGICAL,out:STREAM_PHYSICAL)] rels [rel#215:FlinkLogicalUnion.LOGICAL.any.None: 0.false.UNKNOWN(input#0=RelSubset#211,input#1=RelSubset#214,all=true)]"
 targetSet = {RelSet@6298} 
 targetSubset = null
 digest = "rule [StreamExecUnionRule(in:LOGICAL,out:STREAM_PHYSICAL)] rels [rel#215:FlinkLogicalUnion.LOGICAL.any.None: 0.false.UNKNOWN(input#0=RelSubset#211,input#1=RelSubset#214,all=true)]"
 cachedImportance = 0.81
 volcanoPlanner = {VolcanoPlanner@6259} 
 generatedRelList = null
 id = 521
 operand0 = {RelOptRule$ConverterRelOptRuleOperand@6247} 
 nodeInputs = {RegularImmutableBiMap@6299}  size = 0
 rule = {StreamExecUnionRule@6241} "StreamExecUnionRule(in:LOGICAL,out:STREAM_PHYSICAL)"
 rels = {RelNode[1]@6300} 
 planner = {VolcanoPlanner@6259} 
 parents = null

// Call stack of the StreamExecUnionRule conversion
  
convert:46, StreamExecUnionRule (org.apache.Flink.table.planner.plan.rules.physical.stream)
onMatch:144, ConverterRule (org.apache.calcite.rel.convert)
onMatch:208, VolcanoRuleCall (org.apache.calcite.plan.volcano)
findBestExp:631, VolcanoPlanner (org.apache.calcite.plan.volcano)
run:327, Programs$RuleSetProgram (org.apache.calcite.tools)
optimize:64, FlinkVolcanoProgram (org.apache.Flink.table.planner.plan.optimize.program)
apply:62, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.Flink.table.planner.plan.optimize.program)
apply:58, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.Flink.table.planner.plan.optimize.program)
apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection)
apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection)
foreach:891, Iterator$class (scala.collection)
foreach:1334, AbstractIterator (scala.collection)
foreach:72, IterableLike$class (scala.collection)
foreach:54, AbstractIterable (scala.collection)
foldLeft:157, TraversableOnce$class (scala.collection)
foldLeft:104, AbstractTraversable (scala.collection)
optimize:57, FlinkChainedProgram (org.apache.Flink.table.planner.plan.optimize.program)
optimizeTree:170, StreamCommonSubGraphBasedOptimizer (org.apache.Flink.table.planner.plan.optimize)
doOptimize:90, StreamCommonSubGraphBasedOptimizer (org.apache.Flink.table.planner.plan.optimize)
optimize:77, CommonSubGraphBasedOptimizer (org.apache.Flink.table.planner.plan.optimize)
optimize:248, PlannerBase (org.apache.Flink.table.planner.delegation)
translate:151, PlannerBase (org.apache.Flink.table.planner.delegation)
toDataStream:210, StreamTableEnvironmentImpl (org.apache.Flink.table.api.scala.internal)
toAppendStream:107, StreamTableEnvironmentImpl (org.apache.Flink.table.api.scala.internal)
toAppendStream:101, TableConversions (org.apache.Flink.table.api.scala)
main:89, StreamSQLExample$ (spendreport)
main:-1, StreamSQLExample (spendreport)
StreamExecJoinRule

Another concrete rule example: the optimization of the Join, i.e. how StreamExecJoin is generated.

class StreamExecJoinRule {
  // abridged: only the part of onMatch that creates the physical node is shown;
  // join, providedTraitSet, newLeft and newRight are computed earlier in the method
  override def onMatch(call: RelOptRuleCall): Unit = {
    val newJoin = new StreamExecJoin(
      join.getCluster,
      providedTraitSet,
      newLeft,
      newRight,
      join.getCondition,
      join.getJoinType)
    call.transformTo(newJoin)
  }
}

newJoin = {StreamExecJoin@6326} "StreamExecJoin#152"
 cluster = {FlinkRelOptCluster@5072} 
 joinType = {JoinRelType@5038} "LEFT"
 LOG = null
 transformation = null
 bitmap$trans$0 = false
 CommonPhysicalJoin.joinType = {JoinRelType@5038} "LEFT"
 filterNulls = null
 keyPairs = null
 flinkJoinType = null
 inputRowType = null
 bitmap$0 = 0
 condition = {RexCall@5041} "=($0, $2)"
 variablesSet = {RegularImmutableSet@6342}  size = 0
 Join.joinType = {JoinRelType@5038} "LEFT"
 joinInfo = {JoinInfo@6343} 
 left = {RelSubset@6328} "rel#150:Subset#5.STREAM_PHYSICAL.hash[0]true.None: 0.false.UNKNOWN"
  bestCost = {FlinkCost$$anon$1@6344} "{inf}"
  set = {RelSet@6348} 
  best = null
  timestamp = 0
  boosted = false
  desc = "rel#150:Subset#5.STREAM_PHYSICAL.hash[0]true.None: 0.false.UNKNOWN"
  rowType = {RelRecordType@6349} "RecordType(VARCHAR(2147483647) orderId, VARCHAR(2147483647) productName)"
  digest = "Subset#5.STREAM_PHYSICAL.hash[0]true.None: 0.false.UNKNOWN"
  cluster = {FlinkRelOptCluster@5072} 
  id = 150
  traitSet = {RelTraitSet@6336}  size = 5
 right = {RelSubset@6329} "rel#151:Subset#6.STREAM_PHYSICAL.hash[0]true.None: 0.false.UNKNOWN"
  bestCost = {FlinkCost$$anon$1@6344} "{inf}"
  set = {RelSet@6345} 
  best = null
  timestamp = 0
  boosted = false
  desc = "rel#151:Subset#6.STREAM_PHYSICAL.hash[0]true.None: 0.false.UNKNOWN"
  rowType = null
  digest = "Subset#6.STREAM_PHYSICAL.hash[0]true.None: 0.false.UNKNOWN"
  cluster = {FlinkRelOptCluster@5072} 
  id = 151
  traitSet = {RelTraitSet@6336}  size = 5
 desc = "StreamExecJoin#152"
 rowType = null
 digest = "StreamExecJoin#152"
 AbstractRelNode.cluster = {FlinkRelOptCluster@5072} 
 id = 152
 traitSet = {RelTraitSet@6327}  size = 5

// Call stack when the StreamExecJoin is created
   
<init>:58, StreamExecJoin (org.apache.flink.table.planner.plan.nodes.physical.stream)
onMatch:128, StreamExecJoinRule (org.apache.flink.table.planner.plan.rules.physical.stream)
onMatch:208, VolcanoRuleCall (org.apache.calcite.plan.volcano)
findBestExp:631, VolcanoPlanner (org.apache.calcite.plan.volcano)
run:327, Programs$RuleSetProgram (org.apache.calcite.tools)
optimize:64, FlinkVolcanoProgram (org.apache.flink.table.planner.plan.optimize.program)
apply:62, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.flink.table.planner.plan.optimize.program)
apply:58, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.flink.table.planner.plan.optimize.program)
apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection)
apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection)
foreach:891, Iterator$class (scala.collection)
foreach:1334, AbstractIterator (scala.collection)
foreach:72, IterableLike$class (scala.collection)
foreach:54, AbstractIterable (scala.collection)
foldLeft:157, TraversableOnce$class (scala.collection)
foldLeft:104, AbstractTraversable (scala.collection)
optimize:57, FlinkChainedProgram (org.apache.flink.table.planner.plan.optimize.program)
optimizeTree:170, StreamCommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize)
doOptimize:90, StreamCommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize)
optimize:77, CommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize)
optimize:248, PlannerBase (org.apache.flink.table.planner.delegation)
translate:151, PlannerBase (org.apache.flink.table.planner.delegation)
toDataStream:210, StreamTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
toRetractStream:127, StreamTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
toRetractStream:146, TableConversions (org.apache.flink.table.api.scala)
main:75, SimpleOuterJoin$ (spendreport)
main:-1, SimpleOuterJoin (spendreport)

5. Generating the ExecutionPlan

At this point the pipeline diagram looks as follows:

// NOTE : 執行順序是從上至下," -----> " 表示生成的實例類型
* 
*        +-----> "left outer JOIN" (SQL statement)
*        |   
*        |     
*     SqlParser.parseQuery // SQL 解析階段,生成AST(抽象語法樹),做用是SQL–>SqlNode      
*        |   
*        |      
*        +-----> SqlJoin (SqlNode)
*        |   
*        |     
*     SqlToRelConverter.convertQuery // 語義分析,生成邏輯計劃,做用是SqlNode–>RelNode
*        |    
*        |     
*        +-----> LogicalProject (RelNode) // Abstract Syntax Tree,未優化的RelNode   
*        |      
*        |     
*    FlinkLogicalJoinConverter (RelOptRule) // Flink定製的優化rules      
*    VolcanoRuleCall.onMatch // 基於Flink定製的一些優化rules去優化 Logical Plan 
*        | 
*        |   
*        +-----> FlinkLogicalJoin (RelNode)  // Optimized Logical Plan,邏輯執行計劃
*        |  
*        |    
*    StreamExecJoinRule (RelOptRule) // Rule that converts FlinkLogicalJoin without window bounds in join condition to StreamExecJoin     
*    VolcanoRuleCall.onMatch // 基於Flink rules將optimized LogicalPlan轉成Flink物理執行計劃
*        |       
*        |   
*        +-----> StreamExecJoin (FlinkRelNode) // Stream physical RelNode,物理執行計劃
*        |      
*        |   
*    StreamExecJoin.translateToPlanInternal  // 做用是生成 StreamOperator, 即Flink算子  
*        |     
*        |     
*        +-----> StreamingJoinOperator (StreamOperator) // Streaming unbounded Join operator in StreamTask 
*        |     
*        |

For each big-data system it integrates with, Calcite maps the optimized plan onto the target engine; here the plan is turned into a Flink graph.

This step mainly consists of recursively calling translateToPlan on every node (DataStreamRel in the legacy planner, ExecNode in the blink planner); that method uses CodeGen metaprogramming to emit the corresponding Flink operators. The result is equivalent to a program written directly against Flink's DataSet or DataStream API. A sketch of translateToPlan's caching behaviour follows the StreamPlanner code below.

class StreamPlanner(
    executor: Executor,
    config: TableConfig,
    functionCatalog: FunctionCatalog,
    catalogManager: CatalogManager)
  extends PlannerBase(executor, config, functionCatalog, catalogManager, isStreamingMode = true) {
  override protected def translateToPlan(
      execNodes: util.List[ExecNode[_, _]]): util.List[Transformation[_]] = {
    execNodes.map {
      case node: StreamExecNode[_] => node.translateToPlan(this)
      case _ =>
        throw new TableException("Cannot generate DataStream due to an invalid logical plan. " +
          "This is a bug and should not happen. Please file an issue.")
    }
  }
}
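
The recursion above terminates cleanly because translateToPlan memoizes its result. The following is a minimal sketch of that contract, not the real ExecNode trait; the planner type P is left abstract on purpose.

// Sketch only: each node translates itself at most once,
// even when several downstream nodes ask for its result.
trait ExecNodeSketch[P, T] {

  private var cachedTransformation: Option[T] = None

  // subclasses generate the actual Flink Transformation here, possibly via CodeGen
  protected def translateToPlanInternal(planner: P): T

  def translateToPlan(planner: P): T = {
    if (cachedTransformation.isEmpty) {
      cachedTransformation = Some(translateToPlanInternal(planner))
    }
    cachedTransformation.get
  }
}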

package org.apache.flink.table.planner.plan.nodes.physical.stream

class StreamExecUnion(
    cluster: RelOptCluster,
    traitSet: RelTraitSet,
    inputRels: util.List[RelNode],
    all: Boolean,
    outputRowType: RelDataType)
  extends Union(cluster, traitSet, inputRels, all)
  with StreamPhysicalRel
  with StreamExecNode[BaseRow] {

  // This is where the Flink operator (a UnionTransformation) is generated
  override protected def translateToPlanInternal(
      planner: StreamPlanner): Transformation[BaseRow] = {
    val transformations = getInputNodes.map {
      input => input.translateToPlan(planner).asInstanceOf[Transformation[BaseRow]]
    }
    new UnionTransformation(transformations)
  }
}
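
For intuition, the UnionTransformation produced above is the same node type the DataStream API builds when union() is called directly. A small, self-contained sketch (class and job names are made up):

import org.apache.flink.streaming.api.scala._

object UnionTransformationSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val a = env.fromElements(1, 2, 3)
    val b = env.fromElements(4, 5, 6)

    // union() wires both inputs into one UnionTransformation, which is what
    // StreamExecUnion.translateToPlanInternal creates for the SQL UNION ALL
    a.union(b).print()

    env.execute("union-sketch")
  }
}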

 // Call stack of the translateToPlan phase

translateToPlanInternal:85, StreamExecUnion (org.apache.Flink.table.planner.plan.nodes.physical.stream)
translateToPlanInternal:39, StreamExecUnion (org.apache.Flink.table.planner.plan.nodes.physical.stream)
translateToPlan:58, ExecNode$class (org.apache.Flink.table.planner.plan.nodes.exec)
translateToPlan:39, StreamExecUnion (org.apache.Flink.table.planner.plan.nodes.physical.stream)
translateToTransformation:184, StreamExecSink (org.apache.Flink.table.planner.plan.nodes.physical.stream)
translateToPlanInternal:153, StreamExecSink (org.apache.Flink.table.planner.plan.nodes.physical.stream)
translateToPlanInternal:48, StreamExecSink (org.apache.Flink.table.planner.plan.nodes.physical.stream)
translateToPlan:58, ExecNode$class (org.apache.Flink.table.planner.plan.nodes.exec)
translateToPlan:48, StreamExecSink (org.apache.Flink.table.planner.plan.nodes.physical.stream)
apply:60, StreamPlanner$$anonfun$translateToPlan$1 (org.apache.Flink.table.planner.delegation)
apply:59, StreamPlanner$$anonfun$translateToPlan$1 (org.apache.Flink.table.planner.delegation)
apply:234, TraversableLike$$anonfun$map$1 (scala.collection)
apply:234, TraversableLike$$anonfun$map$1 (scala.collection)
foreach:891, Iterator$class (scala.collection)
foreach:1334, AbstractIterator (scala.collection)
foreach:72, IterableLike$class (scala.collection)
foreach:54, AbstractIterable (scala.collection)
map:234, TraversableLike$class (scala.collection)
map:104, AbstractTraversable (scala.collection)
translateToPlan:59, StreamPlanner (org.apache.Flink.table.planner.delegation)
translate:153, PlannerBase (org.apache.Flink.table.planner.delegation)
toDataStream:210, StreamTableEnvironmentImpl (org.apache.Flink.table.api.scala.internal)
toAppendStream:107, StreamTableEnvironmentImpl (org.apache.Flink.table.api.scala.internal)
toAppendStream:101, TableConversions (org.apache.Flink.table.api.scala)
main:89, StreamSQLExample$ (spendreport)
main:-1, StreamSQLExample (spendreport)

6. Runtime

Now the pipeline diagram is complete.

// NOTE : 執行順序是從上至下," -----> " 表示生成的實例類型
* 
*        +-----> "left outer JOIN" (SQL statement)
*        |   
*        |     
*     SqlParser.parseQuery // SQL 解析階段,生成AST(抽象語法樹),做用是SQL–>SqlNode      
*        |   
*        |      
*        +-----> SqlJoin (SqlNode)
*        |   
*        |     
*     SqlToRelConverter.convertQuery // 語義分析,生成邏輯計劃,做用是SqlNode–>RelNode
*        |    
*        |     
*        +-----> LogicalProject (RelNode) // Abstract Syntax Tree,未優化的RelNode   
*        |      
*        |     
*    FlinkLogicalJoinConverter (RelOptRule) // Flink定製的優化rules      
*    VolcanoRuleCall.onMatch // 基於Flink定製的一些優化rules去優化 Logical Plan 
*        | 
*        |   
*        +-----> FlinkLogicalJoin (RelNode)  // Optimized Logical Plan,邏輯執行計劃
*        |  
*        |    
*    StreamExecJoinRule (RelOptRule) // Rule that converts FlinkLogicalJoin without window bounds in join condition to StreamExecJoin     
*    VolcanoRuleCall.onMatch // 基於Flink rules將optimized LogicalPlan轉成Flink物理執行計劃
*        |       
*        |   
*        +-----> StreamExecJoin (FlinkRelNode) // Stream physical RelNode,物理執行計劃
*        |      
*        |     
*    StreamExecJoin.translateToPlanInternal  // 做用是生成 StreamOperator, 即Flink算子  
*        |     
*        |     
*        +-----> StreamingJoinOperator (StreamOperator) // Streaming unbounded Join operator in StreamTask 
*        |     
*        |       
*    StreamTwoInputProcessor.processRecord1// 在TwoInputStreamTask調用StreamingJoinOperator,真實的執行  
*        |
*        |

At runtime, the actual processing happens inside a StreamTask, which brings us back to the familiar DataStream runtime. A sketch of the two-input operator contract involved, followed by an example call stack:
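
The sketch below shows only the shape of the callbacks StreamTwoInputProcessor invokes; it is not the real StreamingJoinOperator, and it assumes the Flink 1.10-era operator API used throughout this article. The class name and String payloads are made up.

import org.apache.flink.streaming.api.operators.{AbstractStreamOperator, TwoInputStreamOperator}
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord

// Records of the first input are routed to processElement1, records of the second to
// processElement2; StreamingJoinOperator implements these callbacks with its join/state logic.
class TwoInputOperatorSketch
  extends AbstractStreamOperator[String]
  with TwoInputStreamOperator[String, String, String] {

  override def processElement1(element: StreamRecord[String]): Unit =
    output.collect(new StreamRecord[String]("left: " + element.getValue))

  override def processElement2(element: StreamRecord[String]): Unit =
    output.collect(new StreamRecord[String]("right: " + element.getValue))
}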

processElement:150, StreamTaskNetworkInput (org.apache.Flink.streaming.runtime.io)
emitNext:128, StreamTaskNetworkInput (org.apache.Flink.streaming.runtime.io)
processInput:69, StreamOneInputProcessor (org.apache.Flink.streaming.runtime.io)
processInput:311, StreamTask (org.apache.Flink.streaming.runtime.tasks)
runDefaultAction:-1, 354713989 (org.apache.Flink.streaming.runtime.tasks.StreamTask$$Lambda$710)
runMailboxLoop:187, MailboxProcessor (org.apache.Flink.streaming.runtime.tasks.mailbox)
runMailboxLoop:487, StreamTask (org.apache.Flink.streaming.runtime.tasks)
invoke:470, StreamTask (org.apache.Flink.streaming.runtime.tasks)
doRun:707, Task (org.apache.Flink.runtime.taskmanager)
run:532, Task (org.apache.Flink.runtime.taskmanager)
run:748, Thread (java.lang)

0x05 Code example: UNION

The following code shows how the various execution plans are generated in practice.

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala._

object StreamSQLExample {

  // *************************************************************************
  //     PROGRAM
  // *************************************************************************
  def main(args: Array[String]): Unit = {

    val params = ParameterTool.fromArgs(args)
    val planner = if (params.has("planner")) params.get("planner") else "flink"

    // set up execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = if (planner == "blink") {  // use blink planner in streaming mode
      val settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build()
      StreamTableEnvironment.create(env, settings)
    } else if (planner == "flink") {  // use flink planner in streaming mode
      StreamTableEnvironment.create(env)
    } else {
      System.err.println("The planner is incorrect. Please run 'StreamSQLExample --planner <planner>', " +
        "where planner (it is either flink or blink, and the default is flink) indicates whether the " +
        "example uses flink planner or blink planner.")
      return
    }

    val orderA: DataStream[Order] = env.fromCollection(Seq(
      Order(1L, "beer", 3),
      Order(1L, "diaper", 4),
      Order(3L, "rubber", 2)))

    val orderB: DataStream[Order] = env.fromCollection(Seq(
      Order(2L, "pen", 3),
      Order(2L, "rubber", 3),
      Order(4L, "beer", 1)))

    // convert DataStream to Table
    val tableA = tEnv.fromDataStream(orderA, 'user, 'product, 'amount)
    // register DataStream as Table
    tEnv.registerDataStream("OrderB", orderB, 'user, 'product, 'amount)

    // union the two tables
    val result = tEnv.sqlQuery(
      s"""
         |SELECT * FROM $tableA WHERE amount > 2
         |UNION ALL
         |SELECT * FROM OrderB WHERE amount < 2
        """.stripMargin)

    result.toAppendStream[Order].print()
    print(tEnv.explain(result))
    env.execute()
  }

  // *************************************************************************
  //     USER DATA TYPES
  // *************************************************************************
  case class Order(user: Long, product: String, amount: Int)
}

The overall transformation of the pipeline looks roughly like this:

== Abstract Syntax Tree ==
LogicalUnion(all=[true])
:- LogicalProject(user=[$0], product=[$1], amount=[$2])
:  +- LogicalFilter(condition=[>($2, 2)])
:     +- LogicalTableScan(table=[[default_catalog, default_database, UnnamedTable$0]])
+- LogicalProject(user=[$0], product=[$1], amount=[$2])
   +- LogicalFilter(condition=[<($2, 2)])
      +- LogicalTableScan(table=[[default_catalog, default_database, OrderB]])

== Optimized Logical Plan ==
Union(all=[true], union=[user, product, amount])
:- Calc(select=[user, product, amount], where=[>(amount, 2)])
:  +- DataStreamScan(table=[[default_catalog, default_database, UnnamedTable$0]], fields=[user, product, amount])
+- Calc(select=[user, product, amount], where=[<(amount, 2)])
   +- DataStreamScan(table=[[default_catalog, default_database, OrderB]], fields=[user, product, amount])

== Physical Execution Plan ==
Stage 1 : Data Source
	content : Source: Collection Source

Stage 2 : Data Source
	content : Source: Collection Source

	Stage 10 : Operator
		content : SourceConversion(table=[default_catalog.default_database.UnnamedTable$0], fields=[user, product, amount])
		ship_strategy : FORWARD

		Stage 11 : Operator
			content : Calc(select=[user, product, amount], where=[(amount > 2)])
			ship_strategy : FORWARD

			Stage 12 : Operator
				content : SourceConversion(table=[default_catalog.default_database.OrderB], fields=[user, product, amount])
				ship_strategy : FORWARD

				Stage 13 : Operator
					content : Calc(select=[user, product, amount], where=[(amount < 2)])
					ship_strategy : FORWARD

0x06 Code example: OUTER JOIN

import java.sql.Timestamp
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

import scala.collection.mutable

object SimpleOuterJoin {
  def main(args: Array[String]): Unit = {

    val params = ParameterTool.fromArgs(args)
    val planner = if (params.has("planner")) params.get("planner") else "flink"

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val tEnv = if (planner == "blink") {  // use blink planner in streaming mode
      val settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build()
      StreamTableEnvironment.create(env, settings)
    } else if (planner == "flink") {  // use flink planner in streaming mode
      StreamTableEnvironment.create(env)
    } else {
      System.err.println("The planner is incorrect. Please run 'StreamSQLExample --planner <planner>', " +
        "where planner (it is either flink or blink, and the default is flink) indicates whether the " +
        "example uses flink planner or blink planner.")
      return
    }

    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // build the Orders data
    val ordersData = new mutable.MutableList[(String, String)]
    ordersData.+=(("001", "iphone"))
    ordersData.+=(("002", "mac"))
    ordersData.+=(("003", "book"))
    ordersData.+=(("004", "cup"))

    // build the Payment data
    val paymentData = new mutable.MutableList[(String, String)]
    paymentData.+=(("001", "alipay"))
    paymentData.+=(("002", "card"))
    paymentData.+=(("003", "card"))
    paymentData.+=(("004", "alipay"))
    val orders = env
      .fromCollection(ordersData)
       .toTable(tEnv, 'orderId, 'productName)
    val ratesHistory = env
      .fromCollection(paymentData)
      .toTable(tEnv, 'orderId, 'payType)

    tEnv.registerTable("Orders", orders)
    tEnv.registerTable("Payment", ratesHistory)

    var sqlQuery =
      """
        |SELECT
        |  o.orderId,
        |  o.productName,
        |  p.payType
        |FROM
        |  Orders AS o left outer JOIN Payment AS p ON o.orderId = p.orderId
        |""".stripMargin
    tEnv.registerTable("TemporalJoinResult", tEnv.sqlQuery(sqlQuery))

    val result = tEnv.scan("TemporalJoinResult").toRetractStream[Row]
    result.print()
    print(tEnv.explain(tEnv.sqlQuery(sqlQuery)))
    env.execute()
  }
}

The overall transformation is as follows:

== Abstract Syntax Tree ==
LogicalProject(orderId=[$0], productName=[$1], payType=[$3])
+- LogicalJoin(condition=[=($0, $2)], joinType=[left])
   :- LogicalTableScan(table=[[default_catalog, default_database, Orders]])
   +- LogicalTableScan(table=[[default_catalog, default_database, Payment]])

== Optimized Logical Plan ==
Calc(select=[orderId, productName, payType])
+- Join(joinType=[LeftOuterJoin], where=[=(orderId, orderId0)], select=[orderId, productName, orderId0, payType], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
   :- Exchange(distribution=[hash[orderId]])
   :  +- DataStreamScan(table=[[default_catalog, default_database, Orders]], fields=[orderId, productName])
   +- Exchange(distribution=[hash[orderId]])
      +- DataStreamScan(table=[[default_catalog, default_database, Payment]], fields=[orderId, payType])

== Physical Execution Plan ==
Stage 1 : Data Source
	content : Source: Collection Source

Stage 2 : Data Source
	content : Source: Collection Source

	Stage 11 : Operator
		content : SourceConversion(table=[default_catalog.default_database.Orders], fields=[orderId, productName])
		ship_strategy : FORWARD

		Stage 13 : Operator
			content : SourceConversion(table=[default_catalog.default_database.Payment], fields=[orderId, payType])
			ship_strategy : FORWARD

			Stage 15 : Operator
				content : Join(joinType=[LeftOuterJoin], where=[(orderId = orderId0)], select=[orderId, productName, orderId0, payType], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
				ship_strategy : HASH

				Stage 16 : Operator
					content : Calc(select=[orderId, productName, payType])
					ship_strategy : FORWARD

The output of the retract stream is shown below. Each record carries a Boolean flag: true marks an insert message and false a retraction; because this is a left outer join, a null-padded row is emitted first and then retracted once the matching payment arrives:
(true,001,iphone,null)
(false,001,iphone,null)
(true,001,iphone,alipay)
(true,002,mac,null)
(false,002,mac,null)
(true,002,mac,card)
(true,003,book,null)
(false,003,book,null)
(true,003,book,card)
(true,004,cup,null)
(false,004,cup,null)
(true,004,cup,alipay)
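
As a small, hedged follow-up to the SimpleOuterJoin example above: the retract stream is a DataStream[(Boolean, Row)], so the flag can be used downstream, for instance to separate insert messages from retraction messages (this snippet continues the example and reuses its result value).

    // split the retract stream by its flag: true = insert message, false = retraction message
    val inserts     = result.filter(_._1).map(_._2)
    val retractions = result.filter(!_._1).map(_._2)
    inserts.print()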

Below are the call stacks captured while debugging; they may serve as a reference.

// The rule is called during optimization

matches:49, StreamExecJoinRule (org.apache.Flink.table.planner.plan.rules.physical.stream)
matchRecurse:263, VolcanoRuleCall (org.apache.calcite.plan.volcano)
matchRecurse:370, VolcanoRuleCall (org.apache.calcite.plan.volcano)
matchRecurse:370, VolcanoRuleCall (org.apache.calcite.plan.volcano)
match:247, VolcanoRuleCall (org.apache.calcite.plan.volcano)
fireRules:1534, VolcanoPlanner (org.apache.calcite.plan.volcano)
registerImpl:1807, VolcanoPlanner (org.apache.calcite.plan.volcano)
register:846, VolcanoPlanner (org.apache.calcite.plan.volcano)
ensureRegistered:868, VolcanoPlanner (org.apache.calcite.plan.volcano)
ensureRegistered:90, VolcanoPlanner (org.apache.calcite.plan.volcano)
onRegister:329, AbstractRelNode (org.apache.calcite.rel)
registerImpl:1668, VolcanoPlanner (org.apache.calcite.plan.volcano)
register:846, VolcanoPlanner (org.apache.calcite.plan.volcano)
ensureRegistered:868, VolcanoPlanner (org.apache.calcite.plan.volcano)
ensureRegistered:90, VolcanoPlanner (org.apache.calcite.plan.volcano)
onRegister:329, AbstractRelNode (org.apache.calcite.rel)
registerImpl:1668, VolcanoPlanner (org.apache.calcite.plan.volcano)
register:846, VolcanoPlanner (org.apache.calcite.plan.volcano)
ensureRegistered:868, VolcanoPlanner (org.apache.calcite.plan.volcano)
changeTraits:529, VolcanoPlanner (org.apache.calcite.plan.volcano)
run:324, Programs$RuleSetProgram (org.apache.calcite.tools)
optimize:64, FlinkVolcanoProgram (org.apache.Flink.table.planner.plan.optimize.program)
apply:62, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.Flink.table.planner.plan.optimize.program)
apply:58, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.Flink.table.planner.plan.optimize.program)
apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection)
apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection)
foreach:891, Iterator$class (scala.collection)
foreach:1334, AbstractIterator (scala.collection)
foreach:72, IterableLike$class (scala.collection)
foreach:54, AbstractIterable (scala.collection)
foldLeft:157, TraversableOnce$class (scala.collection)
foldLeft:104, AbstractTraversable (scala.collection)
optimize:57, FlinkChainedProgram (org.apache.Flink.table.planner.plan.optimize.program)
optimizeTree:170, StreamCommonSubGraphBasedOptimizer (org.apache.Flink.table.planner.plan.optimize)
doOptimize:90, StreamCommonSubGraphBasedOptimizer (org.apache.Flink.table.planner.plan.optimize)
optimize:77, CommonSubGraphBasedOptimizer (org.apache.Flink.table.planner.plan.optimize)
optimize:248, PlannerBase (org.apache.Flink.table.planner.delegation)
translate:151, PlannerBase (org.apache.Flink.table.planner.delegation)
toDataStream:210, StreamTableEnvironmentImpl (org.apache.Flink.table.api.scala.internal)
toRetractStream:127, StreamTableEnvironmentImpl (org.apache.Flink.table.api.scala.internal)
toRetractStream:146, TableConversions (org.apache.Flink.table.api.scala)
main:75, SimpleOuterJoin$ (spendreport)
main:-1, SimpleOuterJoin (spendreport)
  
  
// The physical nodes are translated into Flink Transformations (operators)
  
translateToPlanInternal:140, StreamExecJoin (org.apache.flink.table.planner.plan.nodes.physical.stream)
translateToPlanInternal:51, StreamExecJoin (org.apache.flink.table.planner.plan.nodes.physical.stream)
translateToPlan:58, ExecNode$class (org.apache.flink.table.planner.plan.nodes.exec)
translateToPlan:51, StreamExecJoin (org.apache.flink.table.planner.plan.nodes.physical.stream)
translateToPlanInternal:54, StreamExecCalc (org.apache.flink.table.planner.plan.nodes.physical.stream)
translateToPlanInternal:39, StreamExecCalc (org.apache.flink.table.planner.plan.nodes.physical.stream)
translateToPlan:58, ExecNode$class (org.apache.flink.table.planner.plan.nodes.exec)
translateToPlan:38, StreamExecCalcBase (org.apache.flink.table.planner.plan.nodes.physical.stream)
translateToTransformation:184, StreamExecSink (org.apache.flink.table.planner.plan.nodes.physical.stream)
translateToPlanInternal:153, StreamExecSink (org.apache.flink.table.planner.plan.nodes.physical.stream)
translateToPlanInternal:48, StreamExecSink (org.apache.flink.table.planner.plan.nodes.physical.stream)
translateToPlan:58, ExecNode$class (org.apache.flink.table.planner.plan.nodes.exec)
translateToPlan:48, StreamExecSink (org.apache.flink.table.planner.plan.nodes.physical.stream)
apply:60, StreamPlanner$$anonfun$translateToPlan$1 (org.apache.flink.table.planner.delegation)
apply:59, StreamPlanner$$anonfun$translateToPlan$1 (org.apache.flink.table.planner.delegation)
apply:234, TraversableLike$$anonfun$map$1 (scala.collection)
apply:234, TraversableLike$$anonfun$map$1 (scala.collection)
foreach:891, Iterator$class (scala.collection)
foreach:1334, AbstractIterator (scala.collection)
foreach:72, IterableLike$class (scala.collection)
foreach:54, AbstractIterable (scala.collection)
map:234, TraversableLike$class (scala.collection)
map:104, AbstractTraversable (scala.collection)
translateToPlan:59, StreamPlanner (org.apache.flink.table.planner.delegation)
translate:153, PlannerBase (org.apache.flink.table.planner.delegation)
toDataStream:210, StreamTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
toRetractStream:127, StreamTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
toRetractStream:146, TableConversions (org.apache.flink.table.api.scala)
main:75, SimpleOuterJoin$ (spendreport)
main:-1, SimpleOuterJoin (spendreport)  
 
// At runtime
  
@Internal
public final class StreamTwoInputProcessor<IN1, IN2> implements StreamInputProcessor {
	private void processRecord2(
			StreamRecord<IN2> record,
			TwoInputStreamOperator<IN1, IN2, ?> streamOperator,
			Counter numRecordsIn) throws Exception {

		streamOperator.setKeyContextElement2(record);
		streamOperator.processElement2(record);
		postProcessRecord(numRecordsIn);
	}  
}    

// As the debugger shows, streamOperator here is the StreamingJoinOperator

streamOperator = {StreamingJoinOperator@10943} 
 leftIsOuter = true
 rightIsOuter = false
 outRow = {JoinedRow@10948} "JoinedRow{row1=org.apache.flink.table.dataformat.BinaryRow@dc6a1b67, row2=(+|null,null)}"
 leftNullRow = {GenericRow@10949} "(+|null,null)"
 rightNullRow = {GenericRow@10950} "(+|null,null)"
 leftRecordStateView = {OuterJoinRecordStateViews$InputSideHasNoUniqueKey@10945} 
 rightRecordStateView = {JoinRecordStateViews$InputSideHasNoUniqueKey@10946} 
 generatedJoinCondition = {GeneratedJoinCondition@10951} 
 leftType = {BaseRowTypeInfo@10952} "BaseRow(orderId: STRING, productName: STRING)"
 rightType = {BaseRowTypeInfo@10953} "BaseRow(orderId: STRING, payType: STRING)"
 leftInputSideSpec = {JoinInputSideSpec@10954} "NoUniqueKey"
 rightInputSideSpec = {JoinInputSideSpec@10955} "NoUniqueKey"
 nullFilterKeys = {int[1]@10956} 
 nullSafe = false
 filterAllNulls = true
 minRetentionTime = 0
 stateCleaningEnabled = false
 joinCondition = {AbstractStreamingJoinOperator$JoinConditionWithNullFilters@10947} 
 collector = {TimestampedCollector@10957} 
 chainingStrategy = {ChainingStrategy@10958} "HEAD"
 container = {TwoInputStreamTask@10959} "Join(joinType=[LeftOuterJoin], where=[(orderId = orderId0)], select=[orderId, productName, orderId0, payType], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) -> Calc(select=[orderId, productName, payType]) -> SinkConversionToTuple2 -> Sink: Print to Std. Out (1/1)"
 config = {StreamConfig@10960} "\n=======================Stream Config=======================\nNumber of non-chained inputs: 2\nNumber of non-chained outputs: 0\nOutput names: []\nPartitioning:\nChained subtasks: [(Join(joinType=[LeftOuterJoin], where=[(orderId = orderId0)], select=[orderId, productName, orderId0, payType], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])-7 -> Calc(select=[orderId, productName, payType])-8, typeNumber=0, selectedNames=[], outputPartitioner=FORWARD, outputTag=null)]\nOperator: SimpleOperatorFactory\nBuffer timeout: 100\nState Monitoring: false\n\n\n---------------------\nChained task configs\n---------------------\n{8=\n=======================Stream Config=======================\nNumber of non-chained inputs: 0\nNumber of non-chained outputs: 0\nOutput names: []\nPartitioning:\nChained subtasks: [(Calc(select=[orderId, productName, payType])-8 -> SinkConversionToTuple2-9, typeNumber=0, selectedNames=[], outputPartitioner=FORWARD, outputTag=null)]\nOperator: CodeGenOperatorFactory\nBuffer timeout: "
 output = {AbstractStreamOperator$CountingOutput@10961} 
 runtimeContext = {StreamingRuntimeContext@10962} 
 stateKeySelector1 = {BinaryRowKeySelector@10963} 
 stateKeySelector2 = {BinaryRowKeySelector@10964} 
 keyedStateBackend = {HeapKeyedStateBackend@10965} "HeapKeyedStateBackend"
 keyedStateStore = {DefaultKeyedStateStore@10966} 
 operatorStateBackend = {DefaultOperatorStateBackend@10967} 
 metrics = {OperatorMetricGroup@10968} 
 latencyStats = {LatencyStats@10969} 
 processingTimeService = {ProcessingTimeServiceImpl@10970} 
 timeServiceManager = {InternalTimeServiceManager@10971} 
 combinedWatermark = -9223372036854775808
 input1Watermark = -9223372036854775808
 input2Watermark = -9223372036854775808
  
// processing input 1 (the Orders table)

processElement1:118, StreamingJoinOperator (org.apache.Flink.table.runtime.operators.join.stream)
processRecord1:135, StreamTwoInputProcessor (org.apache.Flink.streaming.runtime.io)
lambda$new$0:100, StreamTwoInputProcessor (org.apache.Flink.streaming.runtime.io)
accept:-1, 169462196 (org.apache.Flink.streaming.runtime.io.StreamTwoInputProcessor$$Lambda$733)
emitRecord:362, StreamTwoInputProcessor$StreamTaskNetworkOutput (org.apache.Flink.streaming.runtime.io)
processElement:151, StreamTaskNetworkInput (org.apache.Flink.streaming.runtime.io)
emitNext:128, StreamTaskNetworkInput (org.apache.Flink.streaming.runtime.io)
processInput:182, StreamTwoInputProcessor (org.apache.Flink.streaming.runtime.io)
processInput:311, StreamTask (org.apache.Flink.streaming.runtime.tasks)
runDefaultAction:-1, 1284793893 (org.apache.Flink.streaming.runtime.tasks.StreamTask$$Lambda$713)
runMailboxLoop:187, MailboxProcessor (org.apache.Flink.streaming.runtime.tasks.mailbox)
runMailboxLoop:487, StreamTask (org.apache.Flink.streaming.runtime.tasks)
invoke:470, StreamTask (org.apache.Flink.streaming.runtime.tasks)
doRun:707, Task (org.apache.Flink.runtime.taskmanager)
run:532, Task (org.apache.Flink.runtime.taskmanager)
run:748, Thread (java.lang)
  
// processing input 2 (the Payment table)

processElement2:123, StreamingJoinOperator (org.apache.Flink.table.runtime.operators.join.stream)
processRecord2:145, StreamTwoInputProcessor (org.apache.Flink.streaming.runtime.io)
lambda$new$1:107, StreamTwoInputProcessor (org.apache.Flink.streaming.runtime.io)
accept:-1, 76811487 (org.apache.Flink.streaming.runtime.io.StreamTwoInputProcessor$$Lambda$734)
emitRecord:362, StreamTwoInputProcessor$StreamTaskNetworkOutput (org.apache.Flink.streaming.runtime.io)
processElement:151, StreamTaskNetworkInput (org.apache.Flink.streaming.runtime.io)
emitNext:128, StreamTaskNetworkInput (org.apache.Flink.streaming.runtime.io)
processInput:185, StreamTwoInputProcessor (org.apache.Flink.streaming.runtime.io)
processInput:311, StreamTask (org.apache.Flink.streaming.runtime.tasks)
runDefaultAction:-1, 1284793893 (org.apache.Flink.streaming.runtime.tasks.StreamTask$$Lambda$713)
runMailboxLoop:187, MailboxProcessor (org.apache.Flink.streaming.runtime.tasks.mailbox)
runMailboxLoop:487, StreamTask (org.apache.Flink.streaming.runtime.tasks)
invoke:470, StreamTask (org.apache.Flink.streaming.runtime.tasks)
doRun:707, Task (org.apache.Flink.runtime.taskmanager)
run:532, Task (org.apache.Flink.runtime.taskmanager)
run:748, Thread (java.lang)
  
// processing input 1 (the Orders table)
  
processRecord1:134, StreamTwoInputProcessor (org.apache.flink.streaming.runtime.io)
lambda$new$0:100, StreamTwoInputProcessor (org.apache.flink.streaming.runtime.io)
accept:-1, 230607815 (org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$$Lambda$735)
emitRecord:362, StreamTwoInputProcessor$StreamTaskNetworkOutput (org.apache.flink.streaming.runtime.io)
processElement:151, StreamTaskNetworkInput (org.apache.flink.streaming.runtime.io)
emitNext:128, StreamTaskNetworkInput (org.apache.flink.streaming.runtime.io)
processInput:182, StreamTwoInputProcessor (org.apache.flink.streaming.runtime.io)
processInput:311, StreamTask (org.apache.flink.streaming.runtime.tasks)
runDefaultAction:-1, 33038573 (org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$718)
runMailboxLoop:187, MailboxProcessor (org.apache.flink.streaming.runtime.tasks.mailbox)
runMailboxLoop:487, StreamTask (org.apache.flink.streaming.runtime.tasks)
invoke:470, StreamTask (org.apache.flink.streaming.runtime.tasks)
doRun:707, Task (org.apache.flink.runtime.taskmanager)
run:532, Task (org.apache.flink.runtime.taskmanager)
run:748, Thread (java.lang)	
  
// processing input 2 (the Payment table)
  
processRecord2:144, StreamTwoInputProcessor (org.apache.flink.streaming.runtime.io)
lambda$new$1:107, StreamTwoInputProcessor (org.apache.flink.streaming.runtime.io)
accept:-1, 212261435 (org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$$Lambda$736)
emitRecord:362, StreamTwoInputProcessor$StreamTaskNetworkOutput (org.apache.flink.streaming.runtime.io)
processElement:151, StreamTaskNetworkInput (org.apache.flink.streaming.runtime.io)
emitNext:128, StreamTaskNetworkInput (org.apache.flink.streaming.runtime.io)
processInput:185, StreamTwoInputProcessor (org.apache.flink.streaming.runtime.io)
processInput:311, StreamTask (org.apache.flink.streaming.runtime.tasks)
runDefaultAction:-1, 33038573 (org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$718)
runMailboxLoop:187, MailboxProcessor (org.apache.flink.streaming.runtime.tasks.mailbox)
runMailboxLoop:487, StreamTask (org.apache.flink.streaming.runtime.tasks)
invoke:470, StreamTask (org.apache.flink.streaming.runtime.tasks)
doRun:707, Task (org.apache.flink.runtime.taskmanager)
run:532, Task (org.apache.flink.runtime.taskmanager)
run:748, Thread (java.lang)

0x07 Code example: WINDOW JOIN

import java.sql.Timestamp

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

import scala.collection.mutable

object SimpleTimeIntervalJoinA {
  def main(args: Array[String]): Unit = {
    val params = ParameterTool.fromArgs(args)
    val planner = if (params.has("planner")) params.get("planner") else "flink"

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val tEnv = if (planner == "blink") {  // use blink planner in streaming mode
      val settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build()
      StreamTableEnvironment.create(env, settings)
    } else if (planner == "flink") {  // use flink planner in streaming mode
      StreamTableEnvironment.create(env)
    } else {
      System.err.println("The planner is incorrect. Please run 'StreamSQLExample --planner <planner>', " +
        "where planner (it is either flink or blink, and the default is flink) indicates whether the " +
        "example uses flink planner or blink planner.")
      return
    }
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // build the Orders data
    val ordersData = new mutable.MutableList[(String, String, Timestamp)]
    ordersData.+=(("001", "iphone", new Timestamp(1545800002000L)))
    ordersData.+=(("002", "mac", new Timestamp(1545800003000L)))
    ordersData.+=(("003", "book", new Timestamp(1545800004000L)))
    ordersData.+=(("004", "cup", new Timestamp(1545800018000L)))

    // build the Payment data
    val paymentData = new mutable.MutableList[(String, String, Timestamp)]
    paymentData.+=(("001", "alipay", new Timestamp(1545803501000L)))
    paymentData.+=(("002", "card", new Timestamp(1545803602000L)))
    paymentData.+=(("003", "card", new Timestamp(1545803610000L)))
    paymentData.+=(("004", "alipay", new Timestamp(1545803611000L)))
    val orders = env
      .fromCollection(ordersData)
      .assignTimestampsAndWatermarks(new TimestampExtractor[String, String]())
      .toTable(tEnv, 'orderId, 'productName, 'orderTime.rowtime)
    val ratesHistory = env
      .fromCollection(paymentData)
      .assignTimestampsAndWatermarks(new TimestampExtractor[String, String]())
      .toTable(tEnv, 'orderId, 'payType, 'payTime.rowtime)

    tEnv.registerTable("Orders", orders)
    tEnv.registerTable("Payment", ratesHistory)

    var sqlQuery =
      """
        |SELECT
        |  o.orderId,
        |  o.productName,
        |  p.payType,
        |  o.orderTime,
        |  cast(payTime as timestamp) as payTime
        |FROM
        |  Orders AS o left outer JOIN Payment AS p ON o.orderId = p.orderId AND
        | p.payTime BETWEEN orderTime AND orderTime + INTERVAL '1' HOUR
        |""".stripMargin
    tEnv.registerTable("TemporalJoinResult", tEnv.sqlQuery(sqlQuery))

    val result = tEnv.scan("TemporalJoinResult").toAppendStream[Row]
    result.print()
    print(tEnv.explain(tEnv.sqlQuery(sqlQuery)))
    env.execute()
  }
}

class TimestampExtractor[T1, T2]
  extends BoundedOutOfOrdernessTimestampExtractor[(T1, T2, Timestamp)](Time.seconds(10)) {
  override def extractTimestamp(element: (T1, T2, Timestamp)): Long = {
    element._3.getTime
  }
}
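
A hedged aside on the 10-second bound above: BoundedOutOfOrdernessTimestampExtractor advances its watermark to the maximum timestamp seen so far minus the configured bound, which is what eventually lets the rowtime interval join emit its results. A tiny illustration reusing the TimestampExtractor defined above (the object name is made up):

object WatermarkSketch extends App {
  val extractor = new TimestampExtractor[String, String]()
  val ts = extractor.extractTimestamp(("004", "cup", new java.sql.Timestamp(1545800018000L)))
  // ts == 1545800018000L; with the 10 s bound, the extractor's watermark trails behind at
  // 1545800018000L - 10000L = 1545800008000L
  println(ts)
}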

The explain output and the printed results are as follows:

== Abstract Syntax Tree ==
LogicalProject(orderId=[$0], productName=[$1], payType=[$4], orderTime=[$2], payTime=[CAST($5):TIMESTAMP(6)])
+- LogicalJoin(condition=[AND(=($0, $3), >=($5, $2), <=($5, +($2, 3600000:INTERVAL HOUR)))], joinType=[left])
   :- LogicalTableScan(table=[[default_catalog, default_database, Orders]])
   +- LogicalTableScan(table=[[default_catalog, default_database, Payment]])

== Optimized Logical Plan ==
Calc(select=[orderId, productName, payType, orderTime, CAST(CAST(payTime)) AS payTime])
+- WindowJoin(joinType=[LeftOuterJoin], windowBounds=[isRowTime=true, leftLowerBound=-3600000, leftUpperBound=0, leftTimeIndex=2, rightTimeIndex=2], where=[AND(=(orderId, orderId0), >=(payTime, orderTime), <=(payTime, +(orderTime, 3600000:INTERVAL HOUR)))], select=[orderId, productName, orderTime, orderId0, payType, payTime])
   :- Exchange(distribution=[hash[orderId]])
   :  +- DataStreamScan(table=[[default_catalog, default_database, Orders]], fields=[orderId, productName, orderTime])
   +- Exchange(distribution=[hash[orderId]])
      +- DataStreamScan(table=[[default_catalog, default_database, Payment]], fields=[orderId, payType, payTime])

== Physical Execution Plan ==
Stage 1 : Data Source
	content : Source: Collection Source

	Stage 2 : Operator
		content : Timestamps/Watermarks
		ship_strategy : FORWARD

Stage 3 : Data Source
	content : Source: Collection Source

	Stage 4 : Operator
		content : Timestamps/Watermarks
		ship_strategy : FORWARD

		Stage 13 : Operator
			content : SourceConversion(table=[default_catalog.default_database.Orders], fields=[orderId, productName, orderTime])
			ship_strategy : FORWARD

			Stage 15 : Operator
				content : SourceConversion(table=[default_catalog.default_database.Payment], fields=[orderId, payType, payTime])
				ship_strategy : FORWARD

				Stage 17 : Operator
					content : WindowJoin(joinType=[LeftOuterJoin], windowBounds=[isRowTime=true, leftLowerBound=-3600000, leftUpperBound=0, leftTimeIndex=2, rightTimeIndex=2], where=[((orderId = orderId0) AND (payTime >= orderTime) AND (payTime <= (orderTime + 3600000:INTERVAL HOUR)))], select=[orderId, productName, orderTime, orderId0, payType, payTime])
					ship_strategy : HASH

					Stage 18 : Operator
						content : Calc(select=[orderId, productName, payType, orderTime, CAST(CAST(payTime)) AS payTime])
						ship_strategy : FORWARD

001,iphone,alipay,2018-12-26T04:53:22,2018-12-26T05:51:41
002,mac,card,2018-12-26T04:53:23,2018-12-26T05:53:22
004,cup,alipay,2018-12-26T04:53:38,2018-12-26T05:53:31
003,book,null,2018-12-26T04:53:24,null
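As a sanity check on the windowBounds shown in the optimized plan: with Orders as the left input and Payment as the right input, the condition payTime BETWEEN orderTime AND orderTime + INTERVAL '1' HOUR is equivalent to -3600000 <= orderTime - payTime <= 0. This matches leftLowerBound=-3600000, leftUpperBound=0 if the bounds are read as constraints on leftTime - rightTime; that reading is an assumption consistent with this plan, not a quote from the Flink source. A minimal sketch under that assumption:

// Sanity check on the windowBounds (assumption: they constrain leftTime - rightTime,
// i.e. orderTime - payTime must lie within [leftLowerBound, leftUpperBound]).
object WindowBoundsSanityCheck {
  def main(args: Array[String]): Unit = {
    val leftLowerBound = -3600000L   // value taken from the optimized plan above
    val leftUpperBound = 0L
    def withinWindow(orderTime: Long, payTime: Long): Boolean =
      orderTime - payTime >= leftLowerBound && orderTime - payTime <= leftUpperBound

    // order 001 and its payment fall inside the window ...
    println(withinWindow(1545800002000L, 1545803501000L))  // true
    // ... order 003 and its payment fall 6 seconds outside it, hence the null row above
    println(withinWindow(1545800004000L, 1545803610000L))  // false
  }
}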

Related classes and call stacks follow. The first stack shows the optimization phase: while the VolcanoPlanner converts the logical plan to the STREAM_PHYSICAL convention, it fires StreamExecWindowJoinRule.matches, which accepts the FlinkLogicalJoin because its join condition carries window bounds. The second stack shows the translation phase: toAppendStream triggers PlannerBase.translate, which walks StreamExecSink -> StreamExecCalc -> StreamExecWindowJoin via translateToPlan, and StreamExecWindowJoin.translateToPlanInternal finally generates the corresponding join operator.

// Stream physical RelNode for a time-windowed (interval) stream-stream join.
class StreamExecWindowJoin {
}

// ConverterRule that converts a FlinkLogicalJoin whose join condition contains
// window bounds from the LOGICAL convention into a STREAM_PHYSICAL StreamExecWindowJoin.
class StreamExecWindowJoinRule
  extends ConverterRule(
    classOf[FlinkLogicalJoin],
    FlinkConventions.LOGICAL,
    FlinkConventions.STREAM_PHYSICAL,
    "StreamExecWindowJoinRule") {
}


matches:54, StreamExecWindowJoinRule (org.apache.flink.table.planner.plan.rules.physical.stream)
matchRecurse:263, VolcanoRuleCall (org.apache.calcite.plan.volcano)
match:247, VolcanoRuleCall (org.apache.calcite.plan.volcano)
fireRules:1534, VolcanoPlanner (org.apache.calcite.plan.volcano)
registerImpl:1807, VolcanoPlanner (org.apache.calcite.plan.volcano)
register:846, VolcanoPlanner (org.apache.calcite.plan.volcano)
ensureRegistered:868, VolcanoPlanner (org.apache.calcite.plan.volcano)
ensureRegistered:90, VolcanoPlanner (org.apache.calcite.plan.volcano)
onRegister:329, AbstractRelNode (org.apache.calcite.rel)
registerImpl:1668, VolcanoPlanner (org.apache.calcite.plan.volcano)
register:846, VolcanoPlanner (org.apache.calcite.plan.volcano)
ensureRegistered:868, VolcanoPlanner (org.apache.calcite.plan.volcano)
ensureRegistered:90, VolcanoPlanner (org.apache.calcite.plan.volcano)
onRegister:329, AbstractRelNode (org.apache.calcite.rel)
registerImpl:1668, VolcanoPlanner (org.apache.calcite.plan.volcano)
register:846, VolcanoPlanner (org.apache.calcite.plan.volcano)
ensureRegistered:868, VolcanoPlanner (org.apache.calcite.plan.volcano)
changeTraits:529, VolcanoPlanner (org.apache.calcite.plan.volcano)
run:324, Programs$RuleSetProgram (org.apache.calcite.tools)
optimize:64, FlinkVolcanoProgram (org.apache.flink.table.planner.plan.optimize.program)
apply:62, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.flink.table.planner.plan.optimize.program)
apply:58, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.flink.table.planner.plan.optimize.program)
apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection)
apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection)
foreach:891, Iterator$class (scala.collection)
foreach:1334, AbstractIterator (scala.collection)
foreach:72, IterableLike$class (scala.collection)
foreach:54, AbstractIterable (scala.collection)
foldLeft:157, TraversableOnce$class (scala.collection)
foldLeft:104, AbstractTraversable (scala.collection)
optimize:57, FlinkChainedProgram (org.apache.flink.table.planner.plan.optimize.program)
optimizeTree:170, StreamCommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize)
doOptimize:90, StreamCommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize)
optimize:77, CommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize)
optimize:248, PlannerBase (org.apache.flink.table.planner.delegation)
translate:151, PlannerBase (org.apache.flink.table.planner.delegation)
toDataStream:210, StreamTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
toAppendStream:107, StreamTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
toAppendStream:101, TableConversions (org.apache.flink.table.api.scala)
main:93, SimpleTimeIntervalJoinA$ (spendreport)
main:-1, SimpleTimeIntervalJoinA (spendreport)
  
  
translateToPlanInternal:136, StreamExecWindowJoin (org.apache.flink.table.planner.plan.nodes.physical.stream)
translateToPlanInternal:53, StreamExecWindowJoin (org.apache.flink.table.planner.plan.nodes.physical.stream)
translateToPlan:58, ExecNode$class (org.apache.flink.table.planner.plan.nodes.exec)
translateToPlan:53, StreamExecWindowJoin (org.apache.flink.table.planner.plan.nodes.physical.stream)
translateToPlanInternal:54, StreamExecCalc (org.apache.flink.table.planner.plan.nodes.physical.stream)
translateToPlanInternal:39, StreamExecCalc (org.apache.flink.table.planner.plan.nodes.physical.stream)
translateToPlan:58, ExecNode$class (org.apache.flink.table.planner.plan.nodes.exec)
translateToPlan:38, StreamExecCalcBase (org.apache.flink.table.planner.plan.nodes.physical.stream)
translateToTransformation:184, StreamExecSink (org.apache.flink.table.planner.plan.nodes.physical.stream)
translateToPlanInternal:153, StreamExecSink (org.apache.flink.table.planner.plan.nodes.physical.stream)
translateToPlanInternal:48, StreamExecSink (org.apache.flink.table.planner.plan.nodes.physical.stream)
translateToPlan:58, ExecNode$class (org.apache.flink.table.planner.plan.nodes.exec)
translateToPlan:48, StreamExecSink (org.apache.flink.table.planner.plan.nodes.physical.stream)
apply:60, StreamPlanner$$anonfun$translateToPlan$1 (org.apache.flink.table.planner.delegation)
apply:59, StreamPlanner$$anonfun$translateToPlan$1 (org.apache.flink.table.planner.delegation)
apply:234, TraversableLike$$anonfun$map$1 (scala.collection)
apply:234, TraversableLike$$anonfun$map$1 (scala.collection)
foreach:891, Iterator$class (scala.collection)
foreach:1334, AbstractIterator (scala.collection)
foreach:72, IterableLike$class (scala.collection)
foreach:54, AbstractIterable (scala.collection)
map:234, TraversableLike$class (scala.collection)
map:104, AbstractTraversable (scala.collection)
translateToPlan:59, StreamPlanner (org.apache.flink.table.planner.delegation)
translate:153, PlannerBase (org.apache.flink.table.planner.delegation)
toDataStream:210, StreamTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
toAppendStream:107, StreamTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
toAppendStream:101, TableConversions (org.apache.flink.table.api.scala)
main:93, SimpleTimeIntervalJoinA$ (spendreport)
main:-1, SimpleTimeIntervalJoinA (spendreport)

0x08 References

Flink table&Sql中使用Calcite

Flink sql的實現

Calcite 功能簡析及在 Flink 的應用

基於Flink1.8 深刻理解Flink Sql執行流程 + Flink Sql語法擴展

使用Flink Table &Sql api來構建批量和流式應用(3)Flink Sql 使用

Flink關係型API: Table API 與SQL

Flink如何實現動態表與靜態表的Join操做

一文解析Flink SQL工做流程

Flink1.9-table/SQLAPI

【Flink SQL引擎】:Calcite 功能簡析及在 Flink 的應用

Apache Calcite 處理流程詳解(一)

Apache Calcite 優化器詳解(二)

揭祕 Flink 1.9 新架構,Blink Planner 你會用了嗎?

Flink 原理與實現:Table & SQL API | Jark's Blog
