This article gives a brief walkthrough of the internals of Flink SQL / Table API, tying together the whole flow from "a SQL statement" to "actual execution". It also provides as many call stacks as possible, so that when you run into a problem you know where to set breakpoints, and so that the overall architecture becomes easier to grasp.
The important nodes involved in the SQL flow are illustrated below:
// NOTE: execution order is top to bottom; " -----> " marks the type of instance produced
*
*    +-----> "left outer JOIN" (SQL statement)
*    |
*    |
* SqlParser.parseQuery                     // SQL parsing: generate the AST (abstract syntax tree), SQL --> SqlNode
*    |
*    |
*    +-----> SqlJoin (SqlNode)
*    |
*    |
* SqlToRelConverter.convertQuery           // semantic analysis: generate the logical plan, SqlNode --> RelNode
*    |
*    |
*    +-----> LogicalProject (RelNode)      // Abstract Syntax Tree, unoptimized RelNode
*    |
*    |
* FlinkLogicalJoinConverter (RelOptRule)   // Flink-specific optimization rules
* VolcanoRuleCall.onMatch                  // optimize the Logical Plan with Flink's own optimization rules
*    |
*    |
*    +-----> FlinkLogicalJoin (RelNode)    // Optimized Logical Plan (logical execution plan)
*    |
*    |
* StreamExecJoinRule (RelOptRule)          // Rule that converts FlinkLogicalJoin without window bounds in join condition to StreamExecJoin
* VolcanoRuleCall.onMatch                  // convert the optimized LogicalPlan into Flink's physical execution plan
*    |
*    |
*    +-----> StreamExecJoin (FlinkRelNode) // Stream physical RelNode (physical execution plan)
*    |
*    |
* StreamExecJoin.translateToPlanInternal   // generate the StreamOperator, i.e. the Flink operator
*    |
*    |
*    +-----> StreamingJoinOperator (StreamOperator) // Streaming unbounded Join operator in StreamTask
*    |
*    |
* StreamTwoInputProcessor.processRecord1   // StreamingJoinOperator is invoked inside TwoInputStreamTask -- the actual execution
*    |
*    |
The rest of the article follows this diagram as its roadmap.
Flink Table API & SQL keeps a unified interface for relational queries over both streaming and static data, and it builds on Apache Calcite's query optimization framework and SQL parser.
Why does Flink offer a Table API at all? In short, a relational API brings the following benefits:
Calcite is the core component here. Apache Calcite is a SQL engine originally aimed at the Hadoop ecosystem; it provides standard SQL, a variety of query optimizations, and the ability to connect to all kinds of data sources.
Below is a quick tour of the main Calcite concepts:
SQL execution can generally be divided into four phases. Calcite is similar, except that it splits the work into five phases:
SQL parsing: generate the AST (abstract syntax tree) (SQL -> SqlNode)
SqlNode validation (SqlNode -> SqlNode)
Semantic analysis: generate the logical plan (Logical Plan) (SqlNode -> RelNode/RexNode)
Optimization: optimize according to the applicable rules (Rule) (RelNode -> RelNode)
Execution plan generation: produce the physical execution plan (DataStream Plan)
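As a quick, hedged illustration of stages 1 through 4, the sketch below drives Calcite's own Frameworks/Planner API directly on a trivial constant query (no tables or catalog needed). Flink does not use this standalone API; it wraps the same parse/validate/rel calls inside FlinkPlannerImpl, as shown later in this article.

import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.Frameworks;
import org.apache.calcite.tools.Planner;

public class CalciteStagesSketch {
    public static void main(String[] args) throws Exception {
        SchemaPlus rootSchema = Frameworks.createRootSchema(true);
        FrameworkConfig config = Frameworks.newConfigBuilder()
                .defaultSchema(rootSchema)
                .build();
        Planner planner = Frameworks.getPlanner(config);

        SqlNode parsed = planner.parse("SELECT 1 + 1 AS two");  // stage 1: SQL -> SqlNode
        SqlNode validated = planner.validate(parsed);           // stage 2: SqlNode -> SqlNode
        RelRoot root = planner.rel(validated);                  // stage 3: SqlNode -> RelNode

        // Stage 4 would hand root.rel to an optimizer (HepPlanner / VolcanoPlanner, see below).
        System.out.println(RelOptUtil.toString(root.rel));      // LogicalProject over LogicalValues
    }
}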
Flink exposes both the Table API and the SQL API. It uses Apache Calcite as the SQL parser for semantic parsing and uniformly produces a Calcite logical plan (a SqlNode tree); this tree is then validated; next, Calcite's optimizer applies conversion rules to the logical plan, using different rule sets depending on the nature of the source (stream or batch), and optimizes it into a RelNode logical-plan tree; finally the optimized plan is translated into a regular Flink DataSet or DataStream program. Any performance improvement made to the DataStream API or DataSet API therefore automatically benefits Table API and SQL queries as well.
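To make the rest of the walkthrough concrete, here is a hedged sketch (assuming the Flink 1.10.x Java API with the Blink planner) of the kind of job whose plans show up in the debug dumps below: two DataStreams registered as tables, a UNION ALL query, and a conversion back to a DataStream. The element values are illustrative only.

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class StreamSQLExampleSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        DataStream<Tuple3<Long, String, Integer>> orderA = env.fromElements(
                Tuple3.of(1L, "beer", 3), Tuple3.of(1L, "diaper", 4));
        DataStream<Tuple3<Long, String, Integer>> orderB = env.fromElements(
                Tuple3.of(2L, "pen", 3), Tuple3.of(2L, "rubber", 1));

        // Table API path: fromDataStream builds a Table backed by a QueryOperation tree.
        Table tableA = tEnv.fromDataStream(orderA, "user, product, amount");
        // SQL path: the registered name is what later appears in the parsed SqlNode.
        tEnv.createTemporaryView("OrderB", tEnv.fromDataStream(orderB, "user, product, amount"));

        // sqlQuery() is the entry point into parse -> validate -> rel -> optimize.
        Table result = tEnv.sqlQuery(
                "SELECT * FROM " + tableA + " WHERE amount > 2 "
                        + "UNION ALL SELECT * FROM OrderB WHERE amount < 2");

        // toAppendStream() triggers translate(): optimized plan -> Transformations -> DataStream.
        tEnv.toAppendStream(result, Row.class).print();
        env.execute("StreamSQLExample");
    }
}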
From submission, through Calcite parsing and optimization, to execution in the Flink engine, a streaming SQL statement generally goes through the following stages:
If the job is submitted through the Table API instead, it also goes through the Calcite optimization stages; the overall flow is basically the same as running SQL directly:
As you can see, the Table API and SQL share the same flow once a RelNode has been obtained; they only differ in how the RelNode is obtained:
The TableEnvironment object is at the core of the Table API and SQL integration and supports the following scenarios:
A query can only be bound to one specific TableEnvironment. A TableEnvironment is configured through TableConfig, which lets you customize the query optimization and translation process.
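A hedged sketch of what that customization looks like in user code; the option keys are Blink-planner settings, so check them against the Flink version you actually run.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableEnvironment;

public class TableConfigSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // TableConfig is the hook for customizing optimization / translation behaviour.
        TableConfig conf = tEnv.getConfig();
        // Example knobs (mini-batch aggregation); names assumed from the Blink planner options.
        conf.getConfiguration().setString("table.exec.mini-batch.enabled", "true");
        conf.getConfiguration().setString("table.exec.mini-batch.allow-latency", "1 s");
        conf.getConfiguration().setString("table.exec.mini-batch.size", "1000");
    }
}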
The TableEnvironment execution flow is as follows:
TableEnvironment.sqlQuery() is the call entry point;
Flink's FlinkPlannerImpl performs the parse(sql), validate(sqlNode) and rel(sqlNode) operations;
finally, a Table is generated.
The relevant code is summarized below:
package org.apache.Flink.table.api.internal; @Internal public class TableEnvironmentImpl implements TableEnvironment { private final CatalogManager catalogManager; private final ModuleManager moduleManager; private final OperationTreeBuilder operationTreeBuilder; private final List<ModifyOperation> bufferedModifyOperations = new ArrayList<>(); protected final TableConfig tableConfig; protected final Executor execEnv; protected final FunctionCatalog functionCatalog; protected final Planner planner; protected final Parser parser; } // 在程序中打印類內容以下 this = {StreamTableEnvironmentImpl@4701} functionCatalog = {FunctionCatalog@4702} scalaExecutionEnvironment = {StreamExecutionEnvironment@4703} planner = {StreamPlanner@4704} config = {TableConfig@4708} executor = {StreamExecutor@4709} PlannerBase.config = {TableConfig@4708} functionCatalog = {FunctionCatalog@4702} catalogManager = {CatalogManager@1250} isStreamingMode = true plannerContext = {PlannerContext@4711} parser = {ParserImpl@4696} catalogManager = {CatalogManager@1250} moduleManager = {ModuleManager@4705} operationTreeBuilder = {OperationTreeBuilder@4706} bufferedModifyOperations = {ArrayList@4707} size = 0 tableConfig = {TableConfig@4708} execEnv = {StreamExecutor@4709} TableEnvironmentImpl.functionCatalog = {FunctionCatalog@4702} TableEnvironmentImpl.planner = {StreamPlanner@4704} parser = {ParserImpl@4696} registration = {TableEnvironmentImpl$1@4710}
Catalog – defines metadata and namespaces, covering Schema (databases), Table (tables) and RelDataType (type information).
All metadata about databases and tables lives in Flink's internal Catalog structure, which holds every piece of Table-related metadata inside Flink, including table schemas, data source information and so on.
// TableEnvironment裏面包含一個CatalogManager public final class CatalogManager { // A map between names and catalogs. private Map<String, Catalog> catalogs; } // Catalog接口 public interface Catalog { ...... default Optional<TableFactory> getTableFactory() { return Optional.empty(); } ...... } // 當數據來源是在程序裏面自定義的時候,對應是GenericInMemoryCatalog public class GenericInMemoryCatalog extends AbstractCatalog { public static final String DEFAULT_DB = "default"; private final Map<String, CatalogDatabase> databases; private final Map<ObjectPath, CatalogBaseTable> tables; private final Map<ObjectPath, CatalogFunction> functions; private final Map<ObjectPath, Map<CatalogPartitionSpec, CatalogPartition>> partitions; private final Map<ObjectPath, CatalogTableStatistics> tableStats; private final Map<ObjectPath, CatalogColumnStatistics> tableColumnStats; private final Map<ObjectPath, Map<CatalogPartitionSpec, CatalogTableStatistics>> partitionStats; private final Map<ObjectPath, Map<CatalogPartitionSpec, CatalogColumnStatistics>> partitionColumnStats; } // 程序中調試的內容 catalogManager = {CatalogManager@4646} catalogs = {LinkedHashMap@4652} size = 1 "default_catalog" -> {GenericInMemoryCatalog@4659} key = "default_catalog" value = {char[15]@4668} hash = 552406043 value = {GenericInMemoryCatalog@4659} databases = {LinkedHashMap@4660} size = 1 tables = {LinkedHashMap@4661} size = 0 functions = {LinkedHashMap@4662} size = 0 partitions = {LinkedHashMap@4663} size = 0 tableStats = {LinkedHashMap@4664} size = 0 tableColumnStats = {LinkedHashMap@4665} size = 0 partitionStats = {LinkedHashMap@4666} size = 0 partitionColumnStats = {LinkedHashMap@4667} size = 0 catalogName = "default_catalog" defaultDatabase = "default_database" temporaryTables = {HashMap@4653} size = 2 currentCatalogName = "default_catalog" currentDatabaseName = "default_database" builtInCatalogName = "default_catalog"
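A hedged sketch of poking at this catalog structure from user code; the printed names match the default_catalog / default_database entries in the debug dump above.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;

public class CatalogSketch {
    public static void main(String[] args) throws Exception {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // The defaults created by the CatalogManager.
        System.out.println(tEnv.getCurrentCatalog());   // default_catalog
        System.out.println(tEnv.getCurrentDatabase());  // default_database

        // Registering an extra in-memory catalog; its databases/tables maps are the
        // fields shown in the GenericInMemoryCatalog debug dump above.
        Catalog myCatalog = new GenericInMemoryCatalog("my_catalog");
        tEnv.registerCatalog("my_catalog", myCatalog);
        System.out.println(myCatalog.listDatabases());  // [default]
    }
}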
StreamPlanner is one flavor of the new Blink Planner.
The new Flink Table architecture makes the query processor pluggable: the community keeps the original Flink planner (the Old Planner) intact while introducing the new Blink Planner, and users can choose for themselves whether to use the Old Planner or the Blink Planner.
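A short sketch of how that choice is made in user code, via EnvironmentSettings (Flink 1.9+ API):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class PlannerSelectionSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // New Blink planner (streaming): the TableEnvironmentImpl ends up holding a StreamPlanner.
        EnvironmentSettings blink =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment blinkTableEnv = StreamTableEnvironment.create(env, blink);

        // Legacy planner, kept for comparison / migration.
        EnvironmentSettings legacy =
                EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
        StreamTableEnvironment oldTableEnv = StreamTableEnvironment.create(env, legacy);
    }
}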
In terms of the model, the Old Planner never unified streaming and batch jobs: the two were implemented quite differently and were ultimately translated to the DataStream API and the DataSet API respectively. The Blink Planner instead treats a batch dataset as a bounded DataStream, so both streaming and batch jobs are ultimately translated onto the Transformation API. Architecturally, the Blink Planner provides a BatchPlanner and a StreamPlanner for batch and streaming respectively; the two share most of their code and much of the optimization logic. The Old Planner implemented batch and streaming as two completely independent code bases, with essentially no shared implementation or optimization logic.
Beyond the advantages in model and architecture, the Blink Planner has accumulated many practical features, concentrated in three areas:
Looking at the code, the StreamPlanner shows up in translateToPlan, which dispatches to the different StreamOperator generation paths.
class StreamPlanner( executor: Executor, config: TableConfig, functionCatalog: FunctionCatalog, catalogManager: CatalogManager) extends PlannerBase(executor, config, functionCatalog, catalogManager, isStreamingMode = true) { override protected def translateToPlan( execNodes: util.List[ExecNode[_, _]]): util.List[Transformation[_]] = { execNodes.map { case node: StreamExecNode[_] => node.translateToPlan(this) case _ => throw new TableException("Cannot generate DataStream due to an invalid logical plan. " + "This is a bug and should not happen. Please file an issue.") } } } @Internal public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl implements StreamTableEnvironment { private <T> DataStream<T> toDataStream(Table table, OutputConversionModifyOperation modifyOperation) { // 在轉換回DataStream時候進行調用 planner 生成plan的操做。 List<Transformation<?>> transformations = planner.translate(Collections.singletonList(modifyOperation)); Transformation<T> transformation = getTransformation(table, transformations); executionEnvironment.addOperator(transformation); return new DataStream<>(executionEnvironment, transformation); } } // 程序中調試打印的運行棧 translateToPlanInternal:85, StreamExecUnion (org.apache.Flink.table.planner.plan.nodes.physical.stream) translateToPlanInternal:39, StreamExecUnion (org.apache.Flink.table.planner.plan.nodes.physical.stream) translateToPlan:58, ExecNode$class (org.apache.Flink.table.planner.plan.nodes.exec) translateToPlan:39, StreamExecUnion (org.apache.Flink.table.planner.plan.nodes.physical.stream) translateToTransformation:184, StreamExecSink (org.apache.Flink.table.planner.plan.nodes.physical.stream) translateToPlanInternal:153, StreamExecSink (org.apache.Flink.table.planner.plan.nodes.physical.stream) translateToPlanInternal:48, StreamExecSink (org.apache.Flink.table.planner.plan.nodes.physical.stream) translateToPlan:58, ExecNode$class (org.apache.Flink.table.planner.plan.nodes.exec) translateToPlan:48, StreamExecSink (org.apache.Flink.table.planner.plan.nodes.physical.stream) apply:60, StreamPlanner$$anonfun$translateToPlan$1 (org.apache.Flink.table.planner.delegation) apply:59, StreamPlanner$$anonfun$translateToPlan$1 (org.apache.Flink.table.planner.delegation) apply:234, TraversableLike$$anonfun$map$1 (scala.collection) apply:234, TraversableLike$$anonfun$map$1 (scala.collection) foreach:891, Iterator$class (scala.collection) foreach:1334, AbstractIterator (scala.collection) foreach:72, IterableLike$class (scala.collection) foreach:54, AbstractIterable (scala.collection) map:234, TraversableLike$class (scala.collection) map:104, AbstractTraversable (scala.collection) translateToPlan:59, StreamPlanner (org.apache.Flink.table.planner.delegation) translate:153, PlannerBase (org.apache.Flink.table.planner.delegation) toDataStream:210, StreamTableEnvironmentImpl (org.apache.Flink.table.api.scala.internal) toAppendStream:107, StreamTableEnvironmentImpl (org.apache.Flink.table.api.scala.internal) toAppendStream:101, TableConversions (org.apache.Flink.table.api.scala) main:89, StreamSQLExample$ (spendreport) main:-1, StreamSQLExample (spendreport)
Flink implements FlinkPlannerImpl as the bridge to Calcite; it carries out the parse(sql), validate(sqlNode) and rel(sqlNode) operations.
class FlinkPlannerImpl( config: FrameworkConfig, catalogReaderSupplier: JFunction[JBoolean, CalciteCatalogReader], typeFactory: FlinkTypeFactory, cluster: RelOptCluster) { val operatorTable: SqlOperatorTable = config.getOperatorTable val parser: CalciteParser = new CalciteParser(config.getParserConfig) val convertletTable: SqlRexConvertletTable = config.getConvertletTable val sqlToRelConverterConfig: SqlToRelConverter.Config = config.getSqlToRelConverterConfig } // 這裏會有使用 FlinkPlannerImpl public class ParserImpl implements Parser { private final CatalogManager catalogManager; private final Supplier<FlinkPlannerImpl> validatorSupplier; private final Supplier<CalciteParser> calciteParserSupplier; @Override public List<Operation> parse(String statement) { CalciteParser parser = calciteParserSupplier.get(); // 這裏會有使用 FlinkPlannerImpl FlinkPlannerImpl planner = validatorSupplier.get(); // parse the sql query SqlNode parsed = parser.parse(statement); Operation operation = SqlToOperationConverter.convert(planner, catalogManager, parsed) .orElseThrow(() -> new TableException("Unsupported query: " + statement)); return Collections.singletonList(operation); } } // 程序中調試的內容 planner = {FlinkPlannerImpl@4659} config = {Frameworks$StdFrameworkConfig@4685} catalogReaderSupplier = {PlannerContext$lambda@4686} typeFactory = {FlinkTypeFactory@4687} cluster = {FlinkRelOptCluster@4688} operatorTable = {ChainedSqlOperatorTable@4689} parser = {CalciteParser@4690} convertletTable = {StandardConvertletTable@4691} sqlToRelConverterConfig = {SqlToRelConverter$ConfigImpl@4692} validator = null // 程序調用棧之一 validate:104, FlinkPlannerImpl (org.apache.Flink.table.planner.calcite) convert:127, SqlToOperationConverter (org.apache.Flink.table.planner.operations) parse:66, ParserImpl (org.apache.Flink.table.planner.delegation) sqlQuery:464, TableEnvironmentImpl (org.apache.Flink.table.api.internal) main:82, StreamSQLExample$ (spendreport) main:-1, StreamSQLExample (spendreport) // 程序調用棧之二 rel:135, FlinkPlannerImpl (org.apache.Flink.table.planner.calcite) toQueryOperation:522, SqlToOperationConverter (org.apache.Flink.table.planner.operations) convertSqlQuery:436, SqlToOperationConverter (org.apache.Flink.table.planner.operations) convert:154, SqlToOperationConverter (org.apache.Flink.table.planner.operations) parse:66, ParserImpl (org.apache.Flink.table.planner.delegation) sqlQuery:464, TableEnvironmentImpl (org.apache.Flink.table.api.internal) main:82, StreamSQLExample$ (spendreport) main:-1, StreamSQLExample (spendreport)
As the code below shows, TableImpl is really just a class that bundles together the related operations and information; there is not much actual logic in it.
@Internal public class TableImpl implements Table { private static final AtomicInteger uniqueId = new AtomicInteger(0); private final TableEnvironment tableEnvironment; private final QueryOperation operationTree; private final OperationTreeBuilder operationTreeBuilder; private final LookupCallResolver lookupResolver; private TableImpl joinInternal( Table right, Optional<Expression> joinPredicate, JoinType joinType) { verifyTableCompatible(right); return createTable(operationTreeBuilder.join( this.operationTree, right.getQueryOperation(), joinType, joinPredicate, false)); } } // 程序中調試的內容 view = {TableImpl@4583} "UnnamedTable$0" tableEnvironment = {StreamTableEnvironmentImpl@4580} functionCatalog = {FunctionCatalog@4646} scalaExecutionEnvironment = {StreamExecutionEnvironment@4579} planner = {StreamPlanner@4647} catalogManager = {CatalogManager@4644} moduleManager = {ModuleManager@4648} operationTreeBuilder = {OperationTreeBuilder@4649} bufferedModifyOperations = {ArrayList@4650} size = 0 tableConfig = {TableConfig@4651} execEnv = {StreamExecutor@4652} TableEnvironmentImpl.functionCatalog = {FunctionCatalog@4646} TableEnvironmentImpl.planner = {StreamPlanner@4647} parser = {ParserImpl@4653} registration = {TableEnvironmentImpl$1@4654} operationTree = {ScalaDataStreamQueryOperation@4665} identifier = null dataStream = {DataStreamSource@4676} fieldIndices = {int[2]@4677} tableSchema = {TableSchema@4678} "root\n |-- orderId: STRING\n |-- productName: STRING\n" operationTreeBuilder = {OperationTreeBuilder@4649} config = {TableConfig@4651} functionCatalog = {FunctionCatalog@4646} tableReferenceLookup = {TableEnvironmentImpl$lambda@4668} lookupResolver = {LookupCallResolver@4669} projectionOperationFactory = {ProjectionOperationFactory@4670} sortOperationFactory = {SortOperationFactory@4671} calculatedTableFactory = {CalculatedTableFactory@4672} setOperationFactory = {SetOperationFactory@4673} aggregateOperationFactory = {AggregateOperationFactory@4674} joinOperationFactory = {JoinOperationFactory@4675} lookupResolver = {LookupCallResolver@4666} functionLookup = {FunctionCatalog@4646} tableName = "UnnamedTable$0" value = {char[14]@4667} hash = 1355882650
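A hedged usage sketch of how that wrapper gets exercised: the Table API calls below end up in TableImpl.joinInternal(), building up the QueryOperation tree (operationTree) shown in the debug dump above. The stream and field names are illustrative only.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public final class JoinTreeSketch {
    // Produces a left outer join purely through the Table API; no SQL string is parsed,
    // but the resulting QueryOperation tree goes through the same RelNode conversion later.
    static Table leftOuterJoin(
            StreamTableEnvironment tEnv,
            DataStream<Tuple2<String, String>> orderStream,
            DataStream<Tuple2<String, String>> paymentStream) {
        Table orders = tEnv.fromDataStream(orderStream, "orderId, productName");
        Table payments = tEnv.fromDataStream(paymentStream, "payId, payType");
        return orders
                .leftOuterJoin(payments, "orderId = payId")   // -> TableImpl.joinInternal(...)
                .select("orderId, productName, payType");
    }
}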
This corresponds to the roadmap diagram above: this step produces a SqlNode such as SqlJoin.
// NOTE: execution order is top to bottom; " -----> " marks the type of instance produced
*
*    +-----> "left outer JOIN" (SQL statement)
*    |
*    |
* SqlParser.parseQuery                     // SQL parsing: generate the AST (abstract syntax tree), SQL --> SqlNode
*    |
*    |
*    +-----> SqlJoin (SqlNode)
*    |
*    |
Calcite uses JavaCC for SQL parsing. From the Parser.jj file defined in Calcite, JavaCC generates a set of Java classes, and that generated code turns the SQL text into an AST data structure (here, of type SqlNode).
In short: the SQL text is converted into an AST (abstract syntax tree), which Calcite represents with SqlNode;
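A hedged, standalone Calcite sketch of just this step (using Calcite's default parser rather than Flink's generated FlinkSqlParserImpl): SqlParser turns the SQL text into a SqlNode tree without touching any catalog or validator. Flink's own wrapper around this step, ParserImpl, is shown right after.

import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.parser.SqlParser;

public class ParseSketch {
    public static void main(String[] args) throws Exception {
        SqlParser parser = SqlParser.create(
                "SELECT o.orderId, p.payType "
                        + "FROM Orders AS o LEFT JOIN Payment AS p ON o.orderId = p.orderId",
                SqlParser.configBuilder().build());
        SqlNode ast = parser.parseStmt();    // unvalidated AST; the FROM clause becomes a SqlJoin
        System.out.println(ast.getKind());   // SELECT
        System.out.println(ast);             // pretty-printed SQL reconstructed from the AST
    }
}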
package org.apache.Flink.table.planner.delegation; public class ParserImpl implements Parser { @Override public List<Operation> parse(String statement) { CalciteParser parser = calciteParserSupplier.get(); FlinkPlannerImpl planner = validatorSupplier.get(); // parse the sql query SqlNode parsed = parser.parse(statement); Operation operation = SqlToOperationConverter.convert(planner, catalogManager, parsed) .orElseThrow(() -> new TableException("Unsupported query: " + statement)); return Collections.singletonList(operation); } } // 打印出來解析以後 parsed 的內容,咱們能看到 SqlNode 的基本格式。 parsed = {SqlBasicCall@4690} "SELECT *\nFROM `UnnamedTable$0`\nWHERE `amount` > 2\nUNION ALL\nSELECT *\nFROM `OrderB`\nWHERE `amount` < 2" operator = {SqlSetOperator@4716} "UNION ALL" all = true name = "UNION ALL" kind = {SqlKind@4742} "UNION" leftPrec = 14 rightPrec = 15 returnTypeInference = {ReturnTypes$lambda@4743} operandTypeInference = null operandTypeChecker = {SetopOperandTypeChecker@4744} operands = {SqlNode[2]@4717} 0 = {SqlSelect@4746} "SELECT *\nFROM `UnnamedTable$0`\nWHERE `amount` > 2" 1 = {SqlSelect@4747} "SELECT *\nFROM `OrderB`\nWHERE `amount` < 2" functionQuantifier = null expanded = false pos = {SqlParserPos@4719} "line 2, column 1" // 下面是調試相關Stack,能夠幫助你們深刻理解 SqlStmt:3208, FlinkSqlParserImpl (org.apache.Flink.sql.parser.impl) SqlStmtEof:3732, FlinkSqlParserImpl (org.apache.Flink.sql.parser.impl) parseSqlStmtEof:234, FlinkSqlParserImpl (org.apache.Flink.sql.parser.impl) parseQuery:160, SqlParser (org.apache.calcite.sql.parser) parseStmt:187, SqlParser (org.apache.calcite.sql.parser) parse:48, CalciteParser (org.apache.Flink.table.planner.calcite) parse:64, ParserImpl (org.apache.Flink.table.planner.delegation) sqlQuery:464, TableEnvironmentImpl (org.apache.Flink.table.api.internal) main:82, StreamSQLExample$ (spendreport) main:-1, StreamSQLExample (spendreport) // 另外一個參考 in FlinkSqlParserImpl.FromClause e = {SqlJoin@4709} "`Orders` AS `o`\nLEFT JOIN `Payment` AS `p` ON `o`.`orderId` = `p`.`orderId`" left = {SqlBasicCall@4676} "`Orders` AS `o`" operator = {SqlAsOperator@4752} "AS" operands = {SqlNode[2]@4753} functionQuantifier = null expanded = false pos = {SqlParserPos@4755} "line 7, column 3" natural = {SqlLiteral@4677} "FALSE" typeName = {SqlTypeName@4775} "BOOLEAN" value = {Boolean@4776} false pos = {SqlParserPos@4777} "line 7, column 13" joinType = {SqlLiteral@4678} "LEFT" typeName = {SqlTypeName@4758} "SYMBOL" value = {JoinType@4759} "LEFT" pos = {SqlParserPos@4724} "line 7, column 26" right = {SqlBasicCall@4679} "`Payment` AS `p`" operator = {SqlAsOperator@4752} "AS" operands = {SqlNode[2]@4763} functionQuantifier = null expanded = false pos = {SqlParserPos@4764} "line 7, column 31" conditionType = {SqlLiteral@4680} "ON" typeName = {SqlTypeName@4758} "SYMBOL" value = {JoinConditionType@4771} "ON" pos = {SqlParserPos@4772} "line 7, column 44" condition = {SqlBasicCall@4681} "`o`.`orderId` = `p`.`orderId`" operator = {SqlBinaryOperator@4766} "=" operands = {SqlNode[2]@4767} functionQuantifier = null expanded = false pos = {SqlParserPos@4768} "line 7, column 47" pos = {SqlParserPos@4724} "line 7, column 26" // 下面是調試相關Stack,能夠幫助你們深刻理解 FromClause:10192, FlinkSqlParserImpl (org.apache.flink.sql.parser.impl) SqlSelect:5918, FlinkSqlParserImpl (org.apache.flink.sql.parser.impl) LeafQuery:630, FlinkSqlParserImpl (org.apache.flink.sql.parser.impl) LeafQueryOrExpr:15651, FlinkSqlParserImpl (org.apache.flink.sql.parser.impl) QueryOrExpr:15118, FlinkSqlParserImpl (org.apache.flink.sql.parser.impl) 
OrderedQueryOrExpr:504, FlinkSqlParserImpl (org.apache.flink.sql.parser.impl) SqlStmt:3693, FlinkSqlParserImpl (org.apache.flink.sql.parser.impl) SqlStmtEof:3732, FlinkSqlParserImpl (org.apache.flink.sql.parser.impl) parseSqlStmtEof:234, FlinkSqlParserImpl (org.apache.flink.sql.parser.impl) parseQuery:160, SqlParser (org.apache.calcite.sql.parser) parseStmt:187, SqlParser (org.apache.calcite.sql.parser) parse:48, CalciteParser (org.apache.flink.table.planner.calcite) parse:64, ParserImpl (org.apache.flink.table.planner.delegation) sqlQuery:464, TableEnvironmentImpl (org.apache.flink.table.api.internal) main:73, SimpleOuterJoin$ (spendreport) main:-1, SimpleOuterJoin (spendreport)
The first step above produces a SqlNode object, an unvalidated abstract syntax tree. Next comes the validation phase; validation needs metadata, and the checks cover table names, column names, function names and data types.
In short: validation checks the query against the metadata, and the validated AST is still represented as a SqlNode tree;
package org.apache.Flink.table.planner.operations; public class SqlToOperationConverter { public static Optional<Operation> convert( // 這裏進行validate的調用 final SqlNode validated = FlinkPlanner.validate(sqlNode); SqlToOperationConverter converter = new SqlToOperationConverter(FlinkPlanner, catalogManager); } } // 打印出來解析以後 validated 的內容。 validated = {SqlBasicCall@4675} "SELECT `UnnamedTable$0`.`user`, `UnnamedTable$0`.`product`, `UnnamedTable$0`.`amount`\nFROM `default_catalog`.`default_database`.`UnnamedTable$0` AS `UnnamedTable$0`\nWHERE `UnnamedTable$0`.`amount` > 2\nUNION ALL\nSELECT `OrderB`.`user`, `OrderB`.`product`, `OrderB`.`amount`\nFROM `default_catalog`.`default_database`.`OrderB` AS `OrderB`\nWHERE `OrderB`.`amount` < 2" operator = {SqlSetOperator@5000} "UNION ALL" all = true name = "UNION ALL" kind = {SqlKind@5029} "UNION" leftPrec = 14 rightPrec = 15 returnTypeInference = {ReturnTypes$lambda@5030} operandTypeInference = null operandTypeChecker = {SetopOperandTypeChecker@5031} operands = {SqlNode[2]@5001} 0 = {SqlSelect@4840} "SELECT `UnnamedTable$0`.`user`, `UnnamedTable$0`.`product`, `UnnamedTable$0`.`amount`\nFROM `default_catalog`.`default_database`.`UnnamedTable$0` AS `UnnamedTable$0`\nWHERE `UnnamedTable$0`.`amount` > 2" 1 = {SqlSelect@5026} "SELECT `OrderB`.`user`, `OrderB`.`product`, `OrderB`.`amount`\nFROM `default_catalog`.`default_database`.`OrderB` AS `OrderB`\nWHERE `OrderB`.`amount` < 2" functionQuantifier = null expanded = false pos = {SqlParserPos@5003} "line 2, column 1" // 下面是調試相關Stack,能夠幫助你們深刻理解 validate:81, AbstractNamespace (org.apache.calcite.sql.validate) validateNamespace:1008, SqlValidatorImpl (org.apache.calcite.sql.validate) validateQuery:968, SqlValidatorImpl (org.apache.calcite.sql.validate) validateCall:90, SqlSetOperator (org.apache.calcite.sql) validateCall:5304, SqlValidatorImpl (org.apache.calcite.sql.validate) validate:116, SqlCall (org.apache.calcite.sql) validateScopedExpression:943, SqlValidatorImpl (org.apache.calcite.sql.validate) validate:650, SqlValidatorImpl (org.apache.calcite.sql.validate) org$apache$Flink$table$planner$calcite$FlinkPlannerImpl$$validate:126, FlinkPlannerImpl (org.apache.Flink.table.planner.calcite) validate:105, FlinkPlannerImpl (org.apache.Flink.table.planner.calcite) convert:127, SqlToOperationConverter (org.apache.Flink.table.planner.operations) parse:66, ParserImpl (org.apache.Flink.table.planner.delegation) sqlQuery:464, TableEnvironmentImpl (org.apache.Flink.table.api.internal) main:82, StreamSQLExample$ (spendreport) main:-1, StreamSQLExample (spendreport)
On the roadmap diagram, we have now reached:
// NOTE: execution order is top to bottom; " -----> " marks the type of instance produced
*
*    +-----> "left outer JOIN" (SQL statement)
*    |
*    |
* SqlParser.parseQuery                     // SQL parsing: generate the AST (abstract syntax tree), SQL --> SqlNode
*    |
*    |
*    +-----> SqlJoin (SqlNode)
*    |
*    |
* SqlToRelConverter.convertQuery           // semantic analysis: generate the logical plan, SqlNode --> RelNode
*    |
*    |
*    +-----> LogicalProject (RelNode)      // Abstract Syntax Tree, unoptimized RelNode
*    |
*    |
After the second step, the SqlNode tree has passed validation. The next step converts the SqlNode into RelNode/RexNode, i.e. generates the corresponding logical plan (Logical Plan).
In short: semantic analysis builds a RelNode tree from the SqlNode and the metadata; this is the very first version of the logical plan;
The Flink logical plan produced so far is converted into a Calcite logical plan, so that Calcite's powerful optimization rules can be applied to it.
Flink calls each node's construct method from top to bottom, turning Flink nodes into Calcite RelNode nodes. The real work is done in the convertQueryRecursive() method.
For example, the call chain that produces a LogicalProject looks roughly like this:
createJoin:378, RelFactories$JoinFactoryImpl (org.apache.calcite.rel.core) createJoin:2520, SqlToRelConverter (org.apache.calcite.sql2rel) convertFrom:2111, SqlToRelConverter (org.apache.calcite.sql2rel) convertSelectImpl:646, SqlToRelConverter (org.apache.calcite.sql2rel) convertSelect:627, SqlToRelConverter (org.apache.calcite.sql2rel) convertQueryRecursive:3181, SqlToRelConverter (org.apache.calcite.sql2rel) convertQuery:563, SqlToRelConverter (org.apache.calcite.sql2rel) org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel:148, FlinkPlannerImpl (org.apache.flink.table.planner.calcite) rel:135, FlinkPlannerImpl (org.apache.flink.table.planner.calcite) toQueryOperation:522, SqlToOperationConverter (org.apache.flink.table.planner.operations) convertSqlQuery:436, SqlToOperationConverter (org.apache.flink.table.planner.operations) convert:154, SqlToOperationConverter (org.apache.flink.table.planner.operations) parse:66, ParserImpl (org.apache.flink.table.planner.delegation) sqlQuery:464, TableEnvironmentImpl (org.apache.flink.table.api.internal) main:73, SimpleOuterJoin$ (spendreport) main:-1, SimpleOuterJoin (spendreport)
The detailed source code is as follows:
SqlToRelConverter 中的 convertQuery() 將 SqlNode 轉換爲 RelRoot public class SqlToRelConverter { public RelRoot convertQuery(SqlNode query, boolean needsValidation, boolean top) { if (needsValidation) { query = this.validator.validate(query); } RelMetadataQuery.THREAD_PROVIDERS.set(JaninoRelMetadataProvider.of(this.cluster.getMetadataProvider())); RelNode result = this.convertQueryRecursive(query, top, (RelDataType)null).rel; if (top && isStream(query)) { result = new LogicalDelta(this.cluster, ((RelNode)result).getTraitSet(), (RelNode)result); } RelCollation collation = RelCollations.EMPTY; if (!query.isA(SqlKind.DML) && isOrdered(query)) { collation = this.requiredCollation((RelNode)result); } this.checkConvertedType(query, (RelNode)result); RelDataType validatedRowType = this.validator.getValidatedNodeType(query); // 這裏設定了Root return RelRoot.of((RelNode)result, validatedRowType, query.getKind()).withCollation(collation); } } // 在這裏打印 toQueryOperation:523, SqlToOperationConverter (org.apache.Flink.table.planner.operations) // 獲得以下內容,能夠看到一個RelRoot的真實結構 relational = {RelRoot@5248} "Root {kind: UNION, rel: LogicalUnion#6, rowType: RecordType(BIGINT user, VARCHAR(2147483647) product, INTEGER amount), fields: [<0, user>, <1, product>, <2, amount>], collation: []}" rel = {LogicalUnion@5227} "LogicalUnion#6" inputs = {RegularImmutableList@5272} size = 2 kind = {SqlKind@5029} "UNION" all = true desc = "LogicalUnion#6" rowType = {RelRecordType@5238} "RecordType(BIGINT user, VARCHAR(2147483647) product, INTEGER amount)" digest = "LogicalUnion#6" cluster = {FlinkRelOptCluster@4800} id = 6 traitSet = {RelTraitSet@5273} size = 5 validatedRowType = {RelRecordType@5238} "RecordType(BIGINT user, VARCHAR(2147483647) product, INTEGER amount)" kind = {StructKind@5268} "FULLY_QUALIFIED" nullable = false fieldList = {RegularImmutableList@5269} size = 3 digest = "RecordType(BIGINT user, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" product, INTEGER amount) NOT NULL" kind = {SqlKind@5029} "UNION" lowerName = "union" sql = "UNION" name = "UNION" ordinal = 18 fields = {RegularImmutableList@5254} size = 3 {Integer@5261} 0 -> "user" {Integer@5263} 1 -> "product" {Integer@5265} 2 -> "amount" collation = {RelCollationImpl@5237} "[]" fieldCollations = {RegularImmutableList@5256} size = 0 // 調用棧內容 convertQuery:561, SqlToRelConverter (org.apache.calcite.sql2rel) org$apache$Flink$table$planner$calcite$FlinkPlannerImpl$$rel:148, FlinkPlannerImpl (org.apache.Flink.table.planner.calcite) rel:135, FlinkPlannerImpl (org.apache.Flink.table.planner.calcite) toQueryOperation:522, SqlToOperationConverter (org.apache.Flink.table.planner.operations) convertSqlQuery:436, SqlToOperationConverter (org.apache.Flink.table.planner.operations) convert:154, SqlToOperationConverter (org.apache.Flink.table.planner.operations) parse:66, ParserImpl (org.apache.Flink.table.planner.delegation) sqlQuery:464, TableEnvironmentImpl (org.apache.Flink.table.api.internal) main:82, StreamSQLExample$ (spendreport) main:-1, StreamSQLExample (spendreport) // 再次舉例,生成了LogicalProject bb = {SqlToRelConverter$Blackboard@4978} scope = {SelectScope@4977} nameToNodeMap = null root = {LogicalProject@5100} "LogicalProject#4" exps = {RegularImmutableList@5105} size = 3 input = {LogicalJoin@5106} "LogicalJoin#3" desc = "LogicalProject#4" rowType = {RelRecordType@5107} "RecordType(VARCHAR(2147483647) orderId, VARCHAR(2147483647) productName, VARCHAR(2147483647) payType)" digest = "LogicalProject#4" cluster = {FlinkRelOptCluster@4949} id = 4 traitSet = {RelTraitSet@5108} size 
= 5 inputs = {Collections$SingletonList@5111} size = 1 mapCorrelateToRex = {HashMap@5112} size = 0 isPatternVarRef = false cursors = {ArrayList@5113} size = 0 subQueryList = {LinkedHashSet@5114} size = 0 agg = null window = null mapRootRelToFieldProjection = {HashMap@5115} size = 0 columnMonotonicities = {ArrayList@5116} size = 3 systemFieldList = {ArrayList@5117} size = 0 top = true initializerExpressionFactory = {NullInitializerExpressionFactory@5118} this$0 = {SqlToRelConverter@4926} // 舉例,LogicalProject是在這裏生成的。 protected void convertFrom(SqlToRelConverter.Blackboard bb, SqlNode from) { case JOIN: RelNode joinRel = this.createJoin(fromBlackboard, leftRel, rightRel, conditionExp, convertedJoinType); bb.setRoot(joinRel, false); } // 相關調用棧 createJoin:378, RelFactories$JoinFactoryImpl (org.apache.calcite.rel.core) createJoin:2520, SqlToRelConverter (org.apache.calcite.sql2rel) convertFrom:2111, SqlToRelConverter (org.apache.calcite.sql2rel) convertSelectImpl:646, SqlToRelConverter (org.apache.calcite.sql2rel) convertSelect:627, SqlToRelConverter (org.apache.calcite.sql2rel) convertQueryRecursive:3181, SqlToRelConverter (org.apache.calcite.sql2rel) convertQuery:563, SqlToRelConverter (org.apache.calcite.sql2rel) org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel:148, FlinkPlannerImpl (org.apache.flink.table.planner.calcite) rel:135, FlinkPlannerImpl (org.apache.flink.table.planner.calcite) toQueryOperation:522, SqlToOperationConverter (org.apache.flink.table.planner.operations) convertSqlQuery:436, SqlToOperationConverter (org.apache.flink.table.planner.operations) convert:154, SqlToOperationConverter (org.apache.flink.table.planner.operations) parse:66, ParserImpl (org.apache.flink.table.planner.delegation) sqlQuery:464, TableEnvironmentImpl (org.apache.flink.table.api.internal) main:73, SimpleOuterJoin$ (spendreport) main:-1, SimpleOuterJoin (spendreport)
At this point, the roadmap diagram has advanced to:
// NOTE: execution order is top to bottom; " -----> " marks the type of instance produced
*
*    +-----> "left outer JOIN" (SQL statement)
*    |
*    |
* SqlParser.parseQuery                     // SQL parsing: generate the AST (abstract syntax tree), SQL --> SqlNode
*    |
*    |
*    +-----> SqlJoin (SqlNode)
*    |
*    |
* SqlToRelConverter.convertQuery           // semantic analysis: generate the logical plan, SqlNode --> RelNode
*    |
*    |
*    +-----> LogicalProject (RelNode)      // Abstract Syntax Tree, unoptimized RelNode
*    |
*    |
* FlinkLogicalJoinConverter (RelOptRule)   // Flink-specific optimization rules
* VolcanoRuleCall.onMatch                  // optimize the Logical Plan with Flink's own optimization rules
*    |
*    |
*    +-----> FlinkLogicalJoin (RelNode)    // Optimized Logical Plan (logical execution plan)
*    |
*    |
* StreamExecJoinRule (RelOptRule)          // Rule that converts FlinkLogicalJoin without window bounds in join condition to StreamExecJoin
* VolcanoRuleCall.onMatch                  // convert the optimized LogicalPlan into Flink's physical execution plan
*    |
*    |
*    +-----> StreamExecJoin (FlinkRelNode) // Stream physical RelNode (physical execution plan)
*    |
*    |
The fourth phase is where the heart of Calcite lies.
In short: logical plan optimization, the core of the optimizer, takes the logical plan produced earlier and optimizes it according to the applicable rules (Rule);
In Flink this part is uniformly wrapped in the optimize method. It involves several phases, and each phase uses rules to optimize and improve the logical plan.
In the Calcite architecture, the most central piece is the Optimizer. An optimization engine consists of three components:
The optimizer turns the relational algebra expression produced by the parser into an execution plan for the execution engine to run, applying optimization rules along the way so that a more efficient plan is produced. A typical example is filter push-down: performing the filter before the join, so the join no longer has to process the full input and the amount of data participating in the join is reduced.
In Calcite, RelOptPlanner is the base class of the optimizer. Calcite ships two optimizer implementations: HepPlanner (rule-based, RBO) and VolcanoPlanner (cost-based, CBO).
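As a hedged sketch of the rule-based flavor, here is how a single push-down rule could be applied with HepPlanner; relNode is assumed to be a Filter-over-Join tree obtained from the SqlToRelConverter step above, and the rule constant is the pre-CoreRules name used by the Calcite version this article traces.

import org.apache.calcite.plan.hep.HepPlanner;
import org.apache.calcite.plan.hep.HepProgramBuilder;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.rules.FilterJoinRule;

public final class FilterPushDownSketch {
    static RelNode pushFilterIntoJoin(RelNode relNode) {
        HepProgramBuilder builder = new HepProgramBuilder();
        builder.addRuleInstance(FilterJoinRule.FILTER_ON_JOIN);  // push the Filter into/below the Join
        HepPlanner hepPlanner = new HepPlanner(builder.build());
        hepPlanner.setRoot(relNode);
        return hepPlanner.findBestExp();   // the rewritten plan after all rule applications
    }
}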
A Cost-Based Optimizer (CBO) also transforms relational expressions according to optimization rules. Here, "transform" means that applying a rule to one relational expression produces another relational expression while the original is kept as well; after a series of transformations there are multiple candidate execution plans. The CBO then uses statistics and a cost model (Cost Model) to compute the cost of each plan and picks the one with the lowest cost.
As the above implies, a CBO has two dependencies: statistics and the cost model. Whether the statistics are accurate and the cost model is reasonable both affect whether the CBO picks the truly optimal plan. It also follows that CBO is superior to RBO: RBO only knows its rules and is blind to the data, while in practice the data keeps changing, so a plan produced by RBO may well not be optimal. Indeed, the major databases and big-data engines all lean towards CBO. For streaming engines, however, applying CBO is still difficult, because data volumes and similar information cannot be known in advance, which greatly limits the benefit of the optimization; CBO is therefore mainly used in offline (batch) scenarios.
VolcanoPlanner is the CBO implementation: it keeps iterating over the rules until it finds the plan with the lowest cost. Some of its related concepts:

RelSet – a set of equivalent relational expressions; the equivalent RelNodes are kept in rels;
RelSubset – a set of equivalent expressions with the same physical properties; it records the best plan (best) and the cost of that best plan (bestCost).

Applying the VolcanoPlanner breaks down into four steps:

initialize the VolcanoPlanner object and add the relevant rules;
change the relational expression to an equivalent one with the desired traits (Convention);
register the corresponding RelNode via the setRoot() method and perform the related initialization;
find the plan with the lowest cost (findBestExp).

The example below walks through the VolcanoPlanner's internal logic in detail.
// 1. Initialize the VolcanoPlanner instance and add the relevant rules
VolcanoPlanner planner = new VolcanoPlanner();
planner.addRelTraitDef(ConventionTraitDef.INSTANCE);
planner.addRelTraitDef(RelDistributionTraitDef.INSTANCE);
// add the optimization rules
planner.addRule(FilterJoinRule.FilterIntoJoinRule.FILTER_ON_JOIN);
planner.addRule(ReduceExpressionsRule.PROJECT_INSTANCE);
planner.addRule(PruneEmptyRules.PROJECT_INSTANCE);
// add the ConverterRules
planner.addRule(EnumerableRules.ENUMERABLE_MERGE_JOIN_RULE);
planner.addRule(EnumerableRules.ENUMERABLE_SORT_RULE);
planner.addRule(EnumerableRules.ENUMERABLE_VALUES_RULE);
planner.addRule(EnumerableRules.ENUMERABLE_PROJECT_RULE);
planner.addRule(EnumerableRules.ENUMERABLE_FILTER_RULE);

// 2. Changes a relational expression to an equivalent one with a different set of traits.
RelTraitSet desiredTraits = relNode.getCluster().traitSet().replace(EnumerableConvention.INSTANCE);
relNode = planner.changeTraits(relNode, desiredTraits);

// 3. Register the root RelNode via VolcanoPlanner's setRoot() method and run the related initialization
planner.setRoot(relNode);

// 4. Find the plan with the lowest cost via a dynamic-programming style search
relNode = planner.findBestExp();
The corresponding Flink code is as follows:
public PlannerContext( TableConfig tableConfig, FunctionCatalog functionCatalog, CatalogManager catalogManager, CalciteSchema rootSchema, List<RelTraitDef> traitDefs) { this.tableConfig = tableConfig; this.context = new FlinkContextImpl( tableConfig, functionCatalog, catalogManager, this::createSqlExprToRexConverter); this.rootSchema = rootSchema; this.traitDefs = traitDefs; // Make a framework config to initialize the RelOptCluster instance, // caution that we can only use the attributes that can not be overwrite/configured // by user. this.frameworkConfig = createFrameworkConfig(); // 這裏使用了VolcanoPlanner RelOptPlanner planner = new VolcanoPlanner(frameworkConfig.getCostFactory(), frameworkConfig.getContext()); planner.setExecutor(frameworkConfig.getExecutor()); for (RelTraitDef traitDef : frameworkConfig.getTraitDefs()) { planner.addRelTraitDef(traitDef); } this.cluster = FlinkRelOptClusterFactory.create(planner, new RexBuilder(typeFactory)); } //初始化 <init>:119, PlannerContext (org.apache.Flink.table.planner.delegation) <init>:86, PlannerBase (org.apache.Flink.table.planner.delegation) <init>:44, StreamPlanner (org.apache.Flink.table.planner.delegation) create:50, BlinkPlannerFactory (org.apache.Flink.table.planner.delegation) create:325, StreamTableEnvironmentImpl$ (org.apache.Flink.table.api.scala.internal) create:425, StreamTableEnvironment$ (org.apache.Flink.table.api.scala) main:56, StreamSQLExample$ (spendreport) main:-1, StreamSQLExample (spendreport) class FlinkVolcanoProgram[OC <: FlinkOptimizeContext] extends FlinkRuleSetProgram[OC] { override def optimize(root: RelNode, context: OC): RelNode = { val targetTraits = root.getTraitSet.plusAll(requiredOutputTraits.get).simplify() // VolcanoPlanner limits that the planer a RelNode tree belongs to and // the VolcanoPlanner used to optimize the RelNode tree should be same instance. 
// see: VolcanoPlanner#registerImpl // here, use the planner in cluster directly // 這裏也使用了VolcanoPlanner val planner = root.getCluster.getPlanner.asInstanceOf[VolcanoPlanner] val optProgram = Programs.ofRules(rules) } } // 其調用棧 optimize:60, FlinkVolcanoProgram (org.apache.Flink.table.planner.plan.optimize.program) apply:62, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.Flink.table.planner.plan.optimize.program) apply:58, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.Flink.table.planner.plan.optimize.program) apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection) apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection) foreach:891, Iterator$class (scala.collection) foreach:1334, AbstractIterator (scala.collection) foreach:72, IterableLike$class (scala.collection) foreach:54, AbstractIterable (scala.collection) foldLeft:157, TraversableOnce$class (scala.collection) foldLeft:104, AbstractTraversable (scala.collection) optimize:57, FlinkChainedProgram (org.apache.Flink.table.planner.plan.optimize.program) optimizeTree:170, StreamCommonSubGraphBasedOptimizer (org.apache.Flink.table.planner.plan.optimize) doOptimize:90, StreamCommonSubGraphBasedOptimizer (org.apache.Flink.table.planner.plan.optimize) optimize:77, CommonSubGraphBasedOptimizer (org.apache.Flink.table.planner.plan.optimize) optimize:248, PlannerBase (org.apache.Flink.table.planner.delegation) translate:151, PlannerBase (org.apache.Flink.table.planner.delegation) toDataStream:210, StreamTableEnvironmentImpl (org.apache.Flink.table.api.scala.internal) toAppendStream:107, StreamTableEnvironmentImpl (org.apache.Flink.table.api.scala.internal) toAppendStream:101, TableConversions (org.apache.Flink.table.api.scala) main:89, StreamSQLExample$ (spendreport) main:-1, StreamSQLExample (spendreport) // 下面所有是 VolcanoPlanner 相關代碼和調用棧 // VolcanoPlanner添加Rule,篩選出來的優化規則會封裝成VolcanoRuleMatch,而後扔到RuleQueue裏,而這個RuleQueue正是接下來執行動態規劃算法要用到的核心類。 public class VolcanoPlanner extends AbstractRelOptPlanner { public boolean addRule(RelOptRule rule) { ...... 
} } addRule:438, VolcanoPlanner (org.apache.calcite.plan.volcano) run:315, Programs$RuleSetProgram (org.apache.calcite.tools) optimize:64, FlinkVolcanoProgram (org.apache.Flink.table.planner.plan.optimize.program) apply:62, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.Flink.table.planner.plan.optimize.program) apply:58, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.Flink.table.planner.plan.optimize.program) apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection) apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection) foreach:891, Iterator$class (scala.collection) foreach:1334, AbstractIterator (scala.collection) foreach:72, IterableLike$class (scala.collection) foreach:54, AbstractIterable (scala.collection) foldLeft:157, TraversableOnce$class (scala.collection) foldLeft:104, AbstractTraversable (scala.collection) optimize:57, FlinkChainedProgram (org.apache.Flink.table.planner.plan.optimize.program) optimizeTree:170, StreamCommonSubGraphBasedOptimizer (org.apache.Flink.table.planner.plan.optimize) doOptimize:90, StreamCommonSubGraphBasedOptimizer (org.apache.Flink.table.planner.plan.optimize) optimize:77, CommonSubGraphBasedOptimizer (org.apache.Flink.table.planner.plan.optimize) optimize:248, PlannerBase (org.apache.Flink.table.planner.delegation) translate:151, PlannerBase (org.apache.Flink.table.planner.delegation) toDataStream:210, StreamTableEnvironmentImpl (org.apache.Flink.table.api.scala.internal) toAppendStream:107, StreamTableEnvironmentImpl (org.apache.Flink.table.api.scala.internal) toAppendStream:101, TableConversions (org.apache.Flink.table.api.scala) main:89, StreamSQLExample$ (spendreport) main:-1, StreamSQLExample (spendreport) // VolcanoPlanner修改Traits public class VolcanoPlanner extends AbstractRelOptPlanner { public RelNode changeTraits(RelNode rel, RelTraitSet toTraits) { assert !rel.getTraitSet().equals(toTraits); assert toTraits.allSimple(); RelSubset rel2 = this.ensureRegistered(rel, (RelNode)null); return rel2.getTraitSet().equals(toTraits) ? 
rel2 : rel2.set.getOrCreateSubset(rel.getCluster(), toTraits.simplify()); } } changeTraits:529, VolcanoPlanner (org.apache.calcite.plan.volcano) run:324, Programs$RuleSetProgram (org.apache.calcite.tools) optimize:64, FlinkVolcanoProgram (org.apache.Flink.table.planner.plan.optimize.program) apply:62, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.Flink.table.planner.plan.optimize.program) apply:58, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.Flink.table.planner.plan.optimize.program) apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection) apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection) foreach:891, Iterator$class (scala.collection) foreach:1334, AbstractIterator (scala.collection) foreach:72, IterableLike$class (scala.collection) foreach:54, AbstractIterable (scala.collection) foldLeft:157, TraversableOnce$class (scala.collection) foldLeft:104, AbstractTraversable (scala.collection) optimize:57, FlinkChainedProgram (org.apache.Flink.table.planner.plan.optimize.program) optimizeTree:170, StreamCommonSubGraphBasedOptimizer (org.apache.Flink.table.planner.plan.optimize) doOptimize:90, StreamCommonSubGraphBasedOptimizer (org.apache.Flink.table.planner.plan.optimize) optimize:77, CommonSubGraphBasedOptimizer (org.apache.Flink.table.planner.plan.optimize) optimize:248, PlannerBase (org.apache.Flink.table.planner.delegation) translate:151, PlannerBase (org.apache.Flink.table.planner.delegation) toDataStream:210, StreamTableEnvironmentImpl (org.apache.Flink.table.api.scala.internal) toAppendStream:107, StreamTableEnvironmentImpl (org.apache.Flink.table.api.scala.internal) toAppendStream:101, TableConversions (org.apache.Flink.table.api.scala) main:89, StreamSQLExample$ (spendreport) main:-1, StreamSQLExample (spendreport) // VolcanoPlanner設定Root public class VolcanoPlanner extends AbstractRelOptPlanner { public void setRoot(RelNode rel) { this.registerMetadataRels(); this.root = this.registerImpl(rel, (RelSet)null); if (this.originalRoot == null) { this.originalRoot = rel; } this.ruleQueue.recompute(this.root); this.ensureRootConverters(); } } setRoot:294, VolcanoPlanner (org.apache.calcite.plan.volcano) run:326, Programs$RuleSetProgram (org.apache.calcite.tools) optimize:64, FlinkVolcanoProgram (org.apache.Flink.table.planner.plan.optimize.program) apply:62, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.Flink.table.planner.plan.optimize.program) apply:58, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.Flink.table.planner.plan.optimize.program) apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection) apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection) foreach:891, Iterator$class (scala.collection) foreach:1334, AbstractIterator (scala.collection) foreach:72, IterableLike$class (scala.collection) foreach:54, AbstractIterable (scala.collection) foldLeft:157, TraversableOnce$class (scala.collection) foldLeft:104, AbstractTraversable (scala.collection) optimize:57, FlinkChainedProgram (org.apache.Flink.table.planner.plan.optimize.program) optimizeTree:170, StreamCommonSubGraphBasedOptimizer (org.apache.Flink.table.planner.plan.optimize) doOptimize:90, StreamCommonSubGraphBasedOptimizer (org.apache.Flink.table.planner.plan.optimize) optimize:77, CommonSubGraphBasedOptimizer (org.apache.Flink.table.planner.plan.optimize) optimize:248, PlannerBase (org.apache.Flink.table.planner.delegation) translate:151, PlannerBase (org.apache.Flink.table.planner.delegation) toDataStream:210, StreamTableEnvironmentImpl 
(org.apache.Flink.table.api.scala.internal) toAppendStream:107, StreamTableEnvironmentImpl (org.apache.Flink.table.api.scala.internal) toAppendStream:101, TableConversions (org.apache.Flink.table.api.scala) main:89, StreamSQLExample$ (spendreport) main:-1, StreamSQLExample (spendreport) // VolcanoPlanner找到最小cost,本質上就是一個動態規劃算法的實現。 public class VolcanoPlanner extends AbstractRelOptPlanner { public RelNode findBestExp() { this.ensureRootConverters(); this.registerMaterializations(); int cumulativeTicks = 0; VolcanoPlannerPhase[] var2 = VolcanoPlannerPhase.values(); int var3 = var2.length; for(int var4 = 0; var4 < var3; ++var4) { VolcanoPlannerPhase phase = var2[var4]; this.setInitialImportance(); RelOptCost targetCost = this.costFactory.makeHugeCost(); int tick = 0; int firstFiniteTick = -1; int splitCount = 0; int giveUpTick = 2147483647; while(true) { ++tick; ++cumulativeTicks; if (this.root.bestCost.isLe(targetCost)) { if (firstFiniteTick < 0) { firstFiniteTick = cumulativeTicks; this.clearImportanceBoost(); } if (!this.ambitious) { break; } targetCost = this.root.bestCost.multiplyBy(0.9D); ++splitCount; if (this.impatient) { if (firstFiniteTick < 10) { giveUpTick = cumulativeTicks + 25; } else { giveUpTick = cumulativeTicks + Math.max(firstFiniteTick / 10, 25); } } } else { if (cumulativeTicks > giveUpTick) { break; } if (this.root.bestCost.isInfinite() && tick % 10 == 0) { this.injectImportanceBoost(); } } VolcanoRuleMatch match = this.ruleQueue.popMatch(phase); if (match == null) { break; } assert match.getRule().matches(match); match.onMatch(); this.root = this.canonize(this.root); } this.ruleQueue.phaseCompleted(phase); } RelNode cheapest = this.root.buildCheapestPlan(this); return cheapest; } } // VolcanoPlanner獲得的Flink邏輯節點 cheapest,就是最終選擇的結點 cheapest = {FlinkLogicalUnion@6487} "FlinkLogicalUnion#443" cluster = {FlinkRelOptCluster@6224} inputs = {RegularImmutableList@6493} size = 2 0 = {FlinkLogicalCalc@6498} "FlinkLogicalCalc#441" cluster = {FlinkRelOptCluster@6224} calcProgram = {RexProgram@6509} "(expr#0..2=[{inputs}], expr#3=[2], expr#4=[>($t2, $t3)], proj#0..2=[{exprs}], $condition=[$t4])" program = {RexProgram@6509} "(expr#0..2=[{inputs}], expr#3=[2], expr#4=[>($t2, $t3)], proj#0..2=[{exprs}], $condition=[$t4])" input = {FlinkLogicalDataStreamTableScan@6510} "rel#437:FlinkLogicalDataStreamTableScan.LOGICAL.any.None: 0.false.UNKNOWN(table=[default_catalog, default_database, UnnamedTable$0])" desc = "FlinkLogicalCalc#441" rowType = {RelRecordType@6504} "RecordType(BIGINT user, VARCHAR(2147483647) product, INTEGER amount)" digest = "FlinkLogicalCalc#441" AbstractRelNode.cluster = {FlinkRelOptCluster@6224} id = 441 traitSet = {RelTraitSet@5942} size = 5 1 = {FlinkLogicalCalc@6499} "FlinkLogicalCalc#442" cluster = {FlinkRelOptCluster@6224} calcProgram = {RexProgram@6502} "(expr#0..2=[{inputs}], expr#3=[2], expr#4=[<($t2, $t3)], proj#0..2=[{exprs}], $condition=[$t4])" program = {RexProgram@6502} "(expr#0..2=[{inputs}], expr#3=[2], expr#4=[<($t2, $t3)], proj#0..2=[{exprs}], $condition=[$t4])" input = {FlinkLogicalDataStreamTableScan@6503} "rel#435:FlinkLogicalDataStreamTableScan.LOGICAL.any.None: 0.false.UNKNOWN(table=[default_catalog, default_database, OrderB])" desc = "FlinkLogicalCalc#442" rowType = {RelRecordType@6504} "RecordType(BIGINT user, VARCHAR(2147483647) product, INTEGER amount)" digest = "FlinkLogicalCalc#442" AbstractRelNode.cluster = {FlinkRelOptCluster@6224} id = 442 traitSet = {RelTraitSet@5942} size = 5 kind = {SqlKind@6494} "UNION" lowerName = "union" sql = "UNION" 
name = "UNION" ordinal = 18 all = true desc = "FlinkLogicalUnion#443" rowType = null digest = "FlinkLogicalUnion#443" AbstractRelNode.cluster = {FlinkRelOptCluster@6224} id = 443 traitSet = {RelTraitSet@5942} size = 5 findBestExp:572, VolcanoPlanner (org.apache.calcite.plan.volcano) run:327, Programs$RuleSetProgram (org.apache.calcite.tools) optimize:64, FlinkVolcanoProgram (org.apache.Flink.table.planner.plan.optimize.program) apply:62, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.Flink.table.planner.plan.optimize.program) apply:58, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.Flink.table.planner.plan.optimize.program) apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection) apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection) foreach:891, Iterator$class (scala.collection) foreach:1334, AbstractIterator (scala.collection) foreach:72, IterableLike$class (scala.collection) foreach:54, AbstractIterable (scala.collection) foldLeft:157, TraversableOnce$class (scala.collection) foldLeft:104, AbstractTraversable (scala.collection) optimize:57, FlinkChainedProgram (org.apache.Flink.table.planner.plan.optimize.program) optimizeTree:170, StreamCommonSubGraphBasedOptimizer (org.apache.Flink.table.planner.plan.optimize) doOptimize:90, StreamCommonSubGraphBasedOptimizer (org.apache.Flink.table.planner.plan.optimize) optimize:77, CommonSubGraphBasedOptimizer (org.apache.Flink.table.planner.plan.optimize) optimize:248, PlannerBase (org.apache.Flink.table.planner.delegation) translate:151, PlannerBase (org.apache.Flink.table.planner.delegation) toDataStream:210, StreamTableEnvironmentImpl (org.apache.Flink.table.api.scala.internal) toAppendStream:107, StreamTableEnvironmentImpl (org.apache.Flink.table.api.scala.internal) toAppendStream:101, TableConversions (org.apache.Flink.table.api.scala) main:89, StreamSQLExample$ (spendreport) main:-1, StreamSQLExample (spendreport)
Below is how the Join gets optimized.
class FlinkLogicalJoin( cluster: RelOptCluster, traitSet: RelTraitSet, left: RelNode, right: RelNode, condition: RexNode, joinType: JoinRelType) extends FlinkLogicalJoinBase( override def convert(rel: RelNode): RelNode = { val join = rel.asInstanceOf[LogicalJoin] val traitSet = rel.getTraitSet.replace(FlinkConventions.LOGICAL) val newLeft = RelOptRule.convert(join.getLeft, FlinkConventions.LOGICAL) val newRight = RelOptRule.convert(join.getRight, FlinkConventions.LOGICAL) new FlinkLogicalJoin( rel.getCluster, traitSet, newLeft, newRight, join.getCondition, join.getJoinType) } } call = {VolcanoRuleMatch@6191} "rule [FlinkLogicalJoinConverter(in:NONE,out:LOGICAL)] rels [rel#100:LogicalJoin.NONE.any.None: 0.false.UNKNOWN(left=RelSubset#98,right=RelSubset#99,condition==($0, $2),joinType=left)]" targetSet = {RelSet@6193} targetSubset = null digest = "rule [FlinkLogicalJoinConverter(in:NONE,out:LOGICAL)] rels [rel#100:LogicalJoin.NONE.any.None: 0.false.UNKNOWN(left=RelSubset#98,right=RelSubset#99,condition==($0, $2),joinType=left)]" cachedImportance = 0.8019000000000001 volcanoPlanner = {VolcanoPlanner@6194} generatedRelList = null id = 71 operand0 = {RelOptRule$ConverterRelOptRuleOperand@6186} parent = null rule = {FlinkLogicalJoinConverter@6179} "FlinkLogicalJoinConverter(in:NONE,out:LOGICAL)" predicate = {ConverterRule$lambda@6246} solveOrder = {int[1]@6247} ordinalInParent = 0 ordinalInRule = 0 trait = {Convention$Impl@6184} "NONE" clazz = {Class@5010} "class org.apache.calcite.rel.logical.LogicalJoin" children = {RegularImmutableList@6230} size = 0 childPolicy = {RelOptRuleOperandChildPolicy@6248} "ANY" nodeInputs = {RegularImmutableBiMap@6195} size = 0 rule = {FlinkLogicalJoinConverter@6179} "FlinkLogicalJoinConverter(in:NONE,out:LOGICAL)" rels = {RelNode[1]@6196} 0 = {LogicalJoin@6181} "rel#100:LogicalJoin.NONE.any.None: 0.false.UNKNOWN(left=RelSubset#98,right=RelSubset#99,condition==($0, $2),joinType=left)" semiJoinDone = false systemFieldList = {RegularImmutableList@6230} size = 0 condition = {RexCall@6231} "=($0, $2)" variablesSet = {RegularImmutableSet@6232} size = 0 joinType = {JoinRelType@6233} "LEFT" joinInfo = {JoinInfo@6234} left = {RelSubset@6235} "rel#98:Subset#0.NONE.any.None: 0.false.UNKNOWN" right = {RelSubset@6236} "rel#99:Subset#1.NONE.any.None: 0.false.UNKNOWN" desc = "rel#100:LogicalJoin.NONE.any.None: 0.false.UNKNOWN(left=RelSubset#98,right=RelSubset#99,condition==($0, $2),joinType=left)" rowType = {RelRecordType@6237} "RecordType(VARCHAR(2147483647) orderId, VARCHAR(2147483647) productName, VARCHAR(2147483647) orderId0, VARCHAR(2147483647) payType)" digest = "LogicalJoin.NONE.any.None: 0.false.UNKNOWN(left=RelSubset#98,right=RelSubset#99,condition==($0, $2),joinType=left)" cluster = {FlinkRelOptCluster@6239} id = 100 traitSet = {RelTraitSet@6240} size = 5 planner = {VolcanoPlanner@6194} parents = null // 生成時候的調用棧 create:106, FlinkLogicalJoin$ (org.apache.flink.table.planner.plan.nodes.logical) convert:92, FlinkLogicalJoinConverter (org.apache.flink.table.planner.plan.nodes.logical) onMatch:144, ConverterRule (org.apache.calcite.rel.convert) onMatch:208, VolcanoRuleCall (org.apache.calcite.plan.volcano) findBestExp:631, VolcanoPlanner (org.apache.calcite.plan.volcano) run:327, Programs$RuleSetProgram (org.apache.calcite.tools) optimize:64, FlinkVolcanoProgram (org.apache.flink.table.planner.plan.optimize.program) apply:62, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.flink.table.planner.plan.optimize.program) apply:58, FlinkChainedProgram$$anonfun$optimize$1 
(org.apache.flink.table.planner.plan.optimize.program) apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection) apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection) foreach:891, Iterator$class (scala.collection) foreach:1334, AbstractIterator (scala.collection) foreach:72, IterableLike$class (scala.collection) foreach:54, AbstractIterable (scala.collection) foldLeft:157, TraversableOnce$class (scala.collection) foldLeft:104, AbstractTraversable (scala.collection) optimize:57, FlinkChainedProgram (org.apache.flink.table.planner.plan.optimize.program) optimizeTree:170, StreamCommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize) doOptimize:90, StreamCommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize) optimize:77, CommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize) optimize:248, PlannerBase (org.apache.flink.table.planner.delegation) translate:151, PlannerBase (org.apache.flink.table.planner.delegation) toDataStream:210, StreamTableEnvironmentImpl (org.apache.flink.table.api.scala.internal) toRetractStream:127, StreamTableEnvironmentImpl (org.apache.flink.table.api.scala.internal) toRetractStream:146, TableConversions (org.apache.flink.table.api.scala) main:75, SimpleOuterJoin$ (spendreport) main:-1, SimpleOuterJoin (spendreport) abstract class FlinkLogicalJoinBase( cluster: RelOptCluster, traitSet: RelTraitSet, left: RelNode, right: RelNode, condition: RexNode, joinType: JoinRelType) extends Join( cluster, traitSet, left, right, condition, Set.empty[CorrelationId].asJava, joinType) with FlinkLogicalRel { // 這裏也會計算cost override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = { val leftRowCnt = mq.getRowCount(getLeft) val leftRowSize = mq.getAverageRowSize(getLeft) val rightRowCnt = mq.getRowCount(getRight) joinType match { case JoinRelType.SEMI | JoinRelType.ANTI => val rightRowSize = mq.getAverageRowSize(getRight) val ioCost = (leftRowCnt * leftRowSize) + (rightRowCnt * rightRowSize) val cpuCost = leftRowCnt + rightRowCnt val rowCnt = leftRowCnt + rightRowCnt planner.getCostFactory.makeCost(rowCnt, cpuCost, ioCost) case _ => val cpuCost = leftRowCnt + rightRowCnt val ioCost = (leftRowCnt * leftRowSize) + rightRowCnt planner.getCostFactory.makeCost(leftRowCnt, cpuCost, ioCost) } } } // 調用棧 computeSelfCost:63, FlinkLogicalJoin (org.apache.flink.table.planner.plan.nodes.logical) getNonCumulativeCost:41, FlinkRelMdNonCumulativeCost (org.apache.flink.table.planner.plan.metadata) getNonCumulativeCost_$:-1, GeneratedMetadataHandler_NonCumulativeCost getNonCumulativeCost:-1, GeneratedMetadataHandler_NonCumulativeCost getNonCumulativeCost:301, RelMetadataQuery (org.apache.calcite.rel.metadata) getCost:936, VolcanoPlanner (org.apache.calcite.plan.volcano) propagateCostImprovements0:347, RelSubset (org.apache.calcite.plan.volcano) propagateCostImprovements:330, RelSubset (org.apache.calcite.plan.volcano) addRelToSet:1828, VolcanoPlanner (org.apache.calcite.plan.volcano) registerImpl:1764, VolcanoPlanner (org.apache.calcite.plan.volcano) register:846, VolcanoPlanner (org.apache.calcite.plan.volcano) ensureRegistered:868, VolcanoPlanner (org.apache.calcite.plan.volcano) ensureRegistered:1939, VolcanoPlanner (org.apache.calcite.plan.volcano) transformTo:129, VolcanoRuleCall (org.apache.calcite.plan.volcano) transformTo:236, RelOptRuleCall (org.apache.calcite.plan) onMatch:146, ConverterRule (org.apache.calcite.rel.convert) onMatch:208, VolcanoRuleCall (org.apache.calcite.plan.volcano) 
findBestExp:631, VolcanoPlanner (org.apache.calcite.plan.volcano) run:327, Programs$RuleSetProgram (org.apache.calcite.tools) optimize:64, FlinkVolcanoProgram (org.apache.flink.table.planner.plan.optimize.program) apply:62, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.flink.table.planner.plan.optimize.program) apply:58, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.flink.table.planner.plan.optimize.program) apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection) apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection) foreach:891, Iterator$class (scala.collection) foreach:1334, AbstractIterator (scala.collection) foreach:72, IterableLike$class (scala.collection) foreach:54, AbstractIterable (scala.collection) foldLeft:157, TraversableOnce$class (scala.collection) foldLeft:104, AbstractTraversable (scala.collection) optimize:57, FlinkChainedProgram (org.apache.flink.table.planner.plan.optimize.program) optimizeTree:170, StreamCommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize) doOptimize:90, StreamCommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize) optimize:77, CommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize) optimize:248, PlannerBase (org.apache.flink.table.planner.delegation) translate:151, PlannerBase (org.apache.flink.table.planner.delegation) toDataStream:210, StreamTableEnvironmentImpl (org.apache.flink.table.api.scala.internal) toRetractStream:127, StreamTableEnvironmentImpl (org.apache.flink.table.api.scala.internal) toRetractStream:146, TableConversions (org.apache.flink.table.api.scala) main:75, SimpleOuterJoin$ (spendreport) main:-1, SimpleOuterJoin (spendreport)
Calcite optimizes these logical plans based on optimization rules, applying different rule sets depending on the runtime environment (Flink provides one set of rules for batch and another for streaming).
The optimization rules fall into two categories: the built-in rules provided by Calcite (such as predicate push-down and pruning), and the rules that convert logical nodes into Flink nodes.
Both of these steps belong to Calcite's optimization phase. The resulting DataStream plan encapsulates the logic for translating each node into the corresponding DataStream / DataSet program: the different DataStream/DataSet nodes are translated, via code generation (CodeGen), into the final executable DataStream/DataSet program.
The different rules are listed below; each rule produces one physical node. Inside such a node, the execution steps Calcite derived from the SQL are used to code-generate the execution Function code of the DataSet program.
package org.apache.Flink.table.plan.rules /** * RuleSet to optimize plans for batch / DataSet execution */ val DATASET_OPT_RULES: RuleSet = RuleSets.ofList( // translate to Flink DataSet nodes DataSetWindowAggregateRule.INSTANCE, DataSetAggregateRule.INSTANCE, DataSetDistinctRule.INSTANCE, DataSetCalcRule.INSTANCE, DataSetPythonCalcRule.INSTANCE, DataSetJoinRule.INSTANCE, DataSetSingleRowJoinRule.INSTANCE, DataSetScanRule.INSTANCE, DataSetUnionRule.INSTANCE, DataSetIntersectRule.INSTANCE, DataSetMinusRule.INSTANCE, DataSetSortRule.INSTANCE, DataSetValuesRule.INSTANCE, DataSetCorrelateRule.INSTANCE, BatchTableSourceScanRule.INSTANCE ) /** * RuleSet to optimize plans for stream / DataStream execution */ val DATASTREAM_OPT_RULES: RuleSet = RuleSets.ofList( // translate to DataStream nodes DataStreamSortRule.INSTANCE, DataStreamGroupAggregateRule.INSTANCE, DataStreamOverAggregateRule.INSTANCE, DataStreamGroupWindowAggregateRule.INSTANCE, DataStreamCalcRule.INSTANCE, DataStreamScanRule.INSTANCE, DataStreamUnionRule.INSTANCE, DataStreamValuesRule.INSTANCE, DataStreamCorrelateRule.INSTANCE, DataStreamWindowJoinRule.INSTANCE, DataStreamJoinRule.INSTANCE, DataStreamTemporalTableJoinRule.INSTANCE, StreamTableSourceScanRule.INSTANCE, DataStreamMatchRule.INSTANCE, DataStreamTableAggregateRule.INSTANCE, DataStreamGroupWindowTableAggregateRule.INSTANCE, DataStreamPythonCalcRule.INSTANCE ) package org.apache.Flink.table.planner.plan.rules /** * RuleSet to do physical optimize for stream */ val PHYSICAL_OPT_RULES: RuleSet = RuleSets.ofList( FlinkExpandConversionRule.STREAM_INSTANCE, // source StreamExecDataStreamScanRule.INSTANCE, StreamExecTableSourceScanRule.INSTANCE, StreamExecIntermediateTableScanRule.INSTANCE, StreamExecWatermarkAssignerRule.INSTANCE, StreamExecValuesRule.INSTANCE, // calc StreamExecCalcRule.INSTANCE, StreamExecPythonCalcRule.INSTANCE, // union StreamExecUnionRule.INSTANCE, // sort StreamExecSortRule.INSTANCE, StreamExecLimitRule.INSTANCE, StreamExecSortLimitRule.INSTANCE, StreamExecTemporalSortRule.INSTANCE, // rank StreamExecRankRule.INSTANCE, StreamExecDeduplicateRule.RANK_INSTANCE, // expand StreamExecExpandRule.INSTANCE, // group agg StreamExecGroupAggregateRule.INSTANCE, StreamExecGroupTableAggregateRule.INSTANCE, // over agg StreamExecOverAggregateRule.INSTANCE, // window agg StreamExecGroupWindowAggregateRule.INSTANCE, StreamExecGroupWindowTableAggregateRule.INSTANCE, // join StreamExecJoinRule.INSTANCE, StreamExecWindowJoinRule.INSTANCE, StreamExecTemporalJoinRule.INSTANCE, StreamExecLookupJoinRule.SNAPSHOT_ON_TABLESCAN, StreamExecLookupJoinRule.SNAPSHOT_ON_CALC_TABLESCAN, // CEP StreamExecMatchRule.INSTANCE, // correlate StreamExecConstantTableFunctionScanRule.INSTANCE, StreamExecCorrelateRule.INSTANCE, // sink StreamExecSinkRule.INSTANCE )
A concrete Rule example, here the Rule for Union:
package org.apache.Flink.table.planner.plan.rules.physical.stream class StreamExecUnionRule extends ConverterRule( classOf[FlinkLogicalUnion], FlinkConventions.LOGICAL, FlinkConventions.STREAM_PHYSICAL, "StreamExecUnionRule") { def convert(rel: RelNode): RelNode = { val union: FlinkLogicalUnion = rel.asInstanceOf[FlinkLogicalUnion] val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL) val newInputs = union.getInputs.map(RelOptRule.convert(_, FlinkConventions.STREAM_PHYSICAL)) // 這裏本條規則會對應生成一個物理節點。節點內根據Calcite生成的sql的執行步驟,會進行codegen出Stream的執行Function代碼, new StreamExecUnion( rel.getCluster, traitSet, newInputs, union.all, rel.getRowType) } } } public class VolcanoPlanner extends AbstractRelOptPlanner { public RelNode findBestExp() { // 在這裏會對Rule進行匹配調用 match.onMatch(); return cheapest; } } match = {VolcanoRuleMatch@6252} "rule [StreamExecUnionRule(in:LOGICAL,out:STREAM_PHYSICAL)] rels [rel#215:FlinkLogicalUnion.LOGICAL.any.None: 0.false.UNKNOWN(input#0=RelSubset#211,input#1=RelSubset#214,all=true)]" targetSet = {RelSet@6298} targetSubset = null digest = "rule [StreamExecUnionRule(in:LOGICAL,out:STREAM_PHYSICAL)] rels [rel#215:FlinkLogicalUnion.LOGICAL.any.None: 0.false.UNKNOWN(input#0=RelSubset#211,input#1=RelSubset#214,all=true)]" cachedImportance = 0.81 volcanoPlanner = {VolcanoPlanner@6259} generatedRelList = null id = 521 operand0 = {RelOptRule$ConverterRelOptRuleOperand@6247} nodeInputs = {RegularImmutableBiMap@6299} size = 0 rule = {StreamExecUnionRule@6241} "StreamExecUnionRule(in:LOGICAL,out:STREAM_PHYSICAL)" rels = {RelNode[1]@6300} planner = {VolcanoPlanner@6259} parents = null // 調用棧 create:106, FlinkLogicalJoin$ (org.apache.flink.table.planner.plan.nodes.logical) convert:92, FlinkLogicalJoinConverter (org.apache.flink.table.planner.plan.nodes.logical) onMatch:144, ConverterRule (org.apache.calcite.rel.convert) onMatch:208, VolcanoRuleCall (org.apache.calcite.plan.volcano) findBestExp:631, VolcanoPlanner (org.apache.calcite.plan.volcano) run:327, Programs$RuleSetProgram (org.apache.calcite.tools) optimize:64, FlinkVolcanoProgram (org.apache.flink.table.planner.plan.optimize.program) apply:62, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.flink.table.planner.plan.optimize.program) apply:58, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.flink.table.planner.plan.optimize.program) apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection) apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection) foreach:891, Iterator$class (scala.collection) foreach:1334, AbstractIterator (scala.collection) foreach:72, IterableLike$class (scala.collection) foreach:54, AbstractIterable (scala.collection) foldLeft:157, TraversableOnce$class (scala.collection) foldLeft:104, AbstractTraversable (scala.collection) optimize:57, FlinkChainedProgram (org.apache.flink.table.planner.plan.optimize.program) optimizeTree:170, StreamCommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize) doOptimize:90, StreamCommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize) optimize:77, CommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize) optimize:248, PlannerBase (org.apache.flink.table.planner.delegation) translate:151, PlannerBase (org.apache.flink.table.planner.delegation) toDataStream:210, StreamTableEnvironmentImpl (org.apache.flink.table.api.scala.internal) toRetractStream:127, StreamTableEnvironmentImpl (org.apache.flink.table.api.scala.internal) toRetractStream:146, TableConversions 
(org.apache.flink.table.api.scala) main:75, SimpleOuterJoin$ (spendreport) main:-1, SimpleOuterJoin (spendreport) // 調用棧 convert:46, StreamExecUnionRule (org.apache.Flink.table.planner.plan.rules.physical.stream) onMatch:144, ConverterRule (org.apache.calcite.rel.convert) onMatch:208, VolcanoRuleCall (org.apache.calcite.plan.volcano) findBestExp:631, VolcanoPlanner (org.apache.calcite.plan.volcano) run:327, Programs$RuleSetProgram (org.apache.calcite.tools) optimize:64, FlinkVolcanoProgram (org.apache.Flink.table.planner.plan.optimize.program) apply:62, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.Flink.table.planner.plan.optimize.program) apply:58, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.Flink.table.planner.plan.optimize.program) apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection) apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection) foreach:891, Iterator$class (scala.collection) foreach:1334, AbstractIterator (scala.collection) foreach:72, IterableLike$class (scala.collection) foreach:54, AbstractIterable (scala.collection) foldLeft:157, TraversableOnce$class (scala.collection) foldLeft:104, AbstractTraversable (scala.collection) optimize:57, FlinkChainedProgram (org.apache.Flink.table.planner.plan.optimize.program) optimizeTree:170, StreamCommonSubGraphBasedOptimizer (org.apache.Flink.table.planner.plan.optimize) doOptimize:90, StreamCommonSubGraphBasedOptimizer (org.apache.Flink.table.planner.plan.optimize) optimize:77, CommonSubGraphBasedOptimizer (org.apache.Flink.table.planner.plan.optimize) optimize:248, PlannerBase (org.apache.Flink.table.planner.delegation) translate:151, PlannerBase (org.apache.Flink.table.planner.delegation) toDataStream:210, StreamTableEnvironmentImpl (org.apache.Flink.table.api.scala.internal) toAppendStream:107, StreamTableEnvironmentImpl (org.apache.Flink.table.api.scala.internal) toAppendStream:101, TableConversions (org.apache.Flink.table.api.scala) main:89, StreamSQLExample$ (spendreport) main:-1, StreamSQLExample (spendreport)
Another concrete Rule example, here the Join optimization, i.e. how StreamExecJoin is created:
class StreamExecJoinRule { override def onMatch(call: RelOptRuleCall): Unit = { val newJoin = new StreamExecJoin( join.getCluster, providedTraitSet, newLeft, newRight, join.getCondition, join.getJoinType) call.transformTo(newJoin) } } newJoin = {StreamExecJoin@6326} "StreamExecJoin#152" cluster = {FlinkRelOptCluster@5072} joinType = {JoinRelType@5038} "LEFT" LOG = null transformation = null bitmap$trans$0 = false CommonPhysicalJoin.joinType = {JoinRelType@5038} "LEFT" filterNulls = null keyPairs = null flinkJoinType = null inputRowType = null bitmap$0 = 0 condition = {RexCall@5041} "=($0, $2)" variablesSet = {RegularImmutableSet@6342} size = 0 Join.joinType = {JoinRelType@5038} "LEFT" joinInfo = {JoinInfo@6343} left = {RelSubset@6328} "rel#150:Subset#5.STREAM_PHYSICAL.hash[0]true.None: 0.false.UNKNOWN" bestCost = {FlinkCost$$anon$1@6344} "{inf}" set = {RelSet@6348} best = null timestamp = 0 boosted = false desc = "rel#150:Subset#5.STREAM_PHYSICAL.hash[0]true.None: 0.false.UNKNOWN" rowType = {RelRecordType@6349} "RecordType(VARCHAR(2147483647) orderId, VARCHAR(2147483647) productName)" digest = "Subset#5.STREAM_PHYSICAL.hash[0]true.None: 0.false.UNKNOWN" cluster = {FlinkRelOptCluster@5072} id = 150 traitSet = {RelTraitSet@6336} size = 5 right = {RelSubset@6329} "rel#151:Subset#6.STREAM_PHYSICAL.hash[0]true.None: 0.false.UNKNOWN" bestCost = {FlinkCost$$anon$1@6344} "{inf}" set = {RelSet@6345} best = null timestamp = 0 boosted = false desc = "rel#151:Subset#6.STREAM_PHYSICAL.hash[0]true.None: 0.false.UNKNOWN" rowType = null digest = "Subset#6.STREAM_PHYSICAL.hash[0]true.None: 0.false.UNKNOWN" cluster = {FlinkRelOptCluster@5072} id = 151 traitSet = {RelTraitSet@6336} size = 5 desc = "StreamExecJoin#152" rowType = null digest = "StreamExecJoin#152" AbstractRelNode.cluster = {FlinkRelOptCluster@5072} id = 152 traitSet = {RelTraitSet@6327} size = 5 // 調用棧 <init>:58, StreamExecJoin (org.apache.flink.table.planner.plan.nodes.physical.stream) onMatch:128, StreamExecJoinRule (org.apache.flink.table.planner.plan.rules.physical.stream) onMatch:208, VolcanoRuleCall (org.apache.calcite.plan.volcano) findBestExp:631, VolcanoPlanner (org.apache.calcite.plan.volcano) run:327, Programs$RuleSetProgram (org.apache.calcite.tools) optimize:64, FlinkVolcanoProgram (org.apache.flink.table.planner.plan.optimize.program) apply:62, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.flink.table.planner.plan.optimize.program) apply:58, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.flink.table.planner.plan.optimize.program) apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection) apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection) foreach:891, Iterator$class (scala.collection) foreach:1334, AbstractIterator (scala.collection) foreach:72, IterableLike$class (scala.collection) foreach:54, AbstractIterable (scala.collection) foldLeft:157, TraversableOnce$class (scala.collection) foldLeft:104, AbstractTraversable (scala.collection) optimize:57, FlinkChainedProgram (org.apache.flink.table.planner.plan.optimize.program) optimizeTree:170, StreamCommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize) doOptimize:90, StreamCommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize) optimize:77, CommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize) optimize:248, PlannerBase (org.apache.flink.table.planner.delegation) translate:151, PlannerBase (org.apache.flink.table.planner.delegation) toDataStream:210, StreamTableEnvironmentImpl 
(org.apache.flink.table.api.scala.internal) toRetractStream:127, StreamTableEnvironmentImpl (org.apache.flink.table.api.scala.internal) toRetractStream:146, TableConversions (org.apache.flink.table.api.scala) main:75, SimpleOuterJoin$ (spendreport) main:-1, SimpleOuterJoin (spendreport)
At this point the overview diagram looks like this:
// NOTE : 執行順序是從上至下," -----> " 表示生成的實例類型 * * +-----> "left outer JOIN" (SQL statement) * | * | * SqlParser.parseQuery // SQL 解析階段,生成AST(抽象語法樹),做用是SQL–>SqlNode * | * | * +-----> SqlJoin (SqlNode) * | * | * SqlToRelConverter.convertQuery // 語義分析,生成邏輯計劃,做用是SqlNode–>RelNode * | * | * +-----> LogicalProject (RelNode) // Abstract Syntax Tree,未優化的RelNode * | * | * FlinkLogicalJoinConverter (RelOptRule) // Flink定製的優化rules * VolcanoRuleCall.onMatch // 基於Flink定製的一些優化rules去優化 Logical Plan * | * | * +-----> FlinkLogicalJoin (RelNode) // Optimized Logical Plan,邏輯執行計劃 * | * | * StreamExecJoinRule (RelOptRule) // Rule that converts FlinkLogicalJoin without window bounds in join condition to StreamExecJoin * VolcanoRuleCall.onMatch // 基於Flink rules將optimized LogicalPlan轉成Flink物理執行計劃 * | * | * +-----> StreamExecJoin (FlinkRelNode) // Stream physical RelNode,物理執行計劃 * | * | * StreamExecJoin.translateToPlanInternal // 做用是生成 StreamOperator, 即Flink算子 * | * | * +-----> StreamingJoinOperator (StreamOperator) // Streaming unbounded Join operator in StreamTask * | * |
Calcite then maps the optimized plan onto the concrete big-data engine, in our case rendering it as a Flink graph.
This part mainly consists of recursively calling the translateToPlan method of every DataStreamRel node; that method uses CodeGen metaprogramming to produce the various Flink operators. From this point on, the result is equivalent to a program written directly against Flink's DataSet or DataStream API (a tiny hand-written DataStream counterpart follows the code below).
class StreamPlanner(
    executor: Executor,
    config: TableConfig,
    functionCatalog: FunctionCatalog,
    catalogManager: CatalogManager)
  extends PlannerBase(executor, config, functionCatalog, catalogManager, isStreamingMode = true) {

  override protected def translateToPlan(
      execNodes: util.List[ExecNode[_, _]]): util.List[Transformation[_]] = {
    execNodes.map {
      case node: StreamExecNode[_] => node.translateToPlan(this)
      case _ =>
        throw new TableException("Cannot generate DataStream due to an invalid logical plan. " +
          "This is a bug and should not happen. Please file an issue.")
    }
  }
}

package org.apache.flink.table.planner.plan.nodes.physical.stream

class StreamExecUnion(
    cluster: RelOptCluster,
    traitSet: RelTraitSet,
    inputRels: util.List[RelNode],
    all: Boolean,
    outputRowType: RelDataType)
  extends Union(cluster, traitSet, inputRels, all)
  with StreamPhysicalRel
  with StreamExecNode[BaseRow] {

  // this is where the Flink operator is generated
  override protected def translateToPlanInternal(
      planner: StreamPlanner): Transformation[BaseRow] = {
    val transformations = getInputNodes.map { input =>
      input.translateToPlan(planner).asInstanceOf[Transformation[BaseRow]]
    }
    new UnionTransformation(transformations)
  }
}

// call stack
translateToPlanInternal:85, StreamExecUnion (org.apache.flink.table.planner.plan.nodes.physical.stream)
translateToPlanInternal:39, StreamExecUnion (org.apache.flink.table.planner.plan.nodes.physical.stream)
translateToPlan:58, ExecNode$class (org.apache.flink.table.planner.plan.nodes.exec)
translateToPlan:39, StreamExecUnion (org.apache.flink.table.planner.plan.nodes.physical.stream)
translateToTransformation:184, StreamExecSink (org.apache.flink.table.planner.plan.nodes.physical.stream)
translateToPlanInternal:153, StreamExecSink (org.apache.flink.table.planner.plan.nodes.physical.stream)
translateToPlanInternal:48, StreamExecSink (org.apache.flink.table.planner.plan.nodes.physical.stream)
translateToPlan:58, ExecNode$class (org.apache.flink.table.planner.plan.nodes.exec)
translateToPlan:48, StreamExecSink (org.apache.flink.table.planner.plan.nodes.physical.stream)
apply:60, StreamPlanner$$anonfun$translateToPlan$1 (org.apache.flink.table.planner.delegation)
apply:59, StreamPlanner$$anonfun$translateToPlan$1 (org.apache.flink.table.planner.delegation)
apply:234, TraversableLike$$anonfun$map$1 (scala.collection)
apply:234, TraversableLike$$anonfun$map$1 (scala.collection)
foreach:891, Iterator$class (scala.collection)
foreach:1334, AbstractIterator (scala.collection)
foreach:72, IterableLike$class (scala.collection)
foreach:54, AbstractIterable (scala.collection)
map:234, TraversableLike$class (scala.collection)
map:104, AbstractTraversable (scala.collection)
translateToPlan:59, StreamPlanner (org.apache.flink.table.planner.delegation)
translate:153, PlannerBase (org.apache.flink.table.planner.delegation)
toDataStream:210, StreamTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
toAppendStream:107, StreamTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
toAppendStream:101, TableConversions (org.apache.flink.table.api.scala)
main:89, StreamSQLExample$ (spendreport)
main:-1, StreamSQLExample (spendreport)
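To connect this back to the familiar API: the UnionTransformation created in translateToPlanInternal is essentially the same object the DataStream API builds when you call union yourself. A minimal sketch, unrelated to the planner code above:

import org.apache.flink.streaming.api.scala._

object UnionByHand {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val a: DataStream[Int] = env.fromElements(1, 2, 3)
    val b: DataStream[Int] = env.fromElements(4, 5, 6)

    // union() wraps its inputs in a union-style Transformation under the hood,
    // which is what StreamExecUnion.translateToPlanInternal assembles for us.
    val unioned: DataStream[Int] = a.union(b)
    unioned.print()

    env.execute("union-by-hand")
  }
}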
With this, the overview diagram is complete.
// NOTE : 執行順序是從上至下," -----> " 表示生成的實例類型 * * +-----> "left outer JOIN" (SQL statement) * | * | * SqlParser.parseQuery // SQL 解析階段,生成AST(抽象語法樹),做用是SQL–>SqlNode * | * | * +-----> SqlJoin (SqlNode) * | * | * SqlToRelConverter.convertQuery // 語義分析,生成邏輯計劃,做用是SqlNode–>RelNode * | * | * +-----> LogicalProject (RelNode) // Abstract Syntax Tree,未優化的RelNode * | * | * FlinkLogicalJoinConverter (RelOptRule) // Flink定製的優化rules * VolcanoRuleCall.onMatch // 基於Flink定製的一些優化rules去優化 Logical Plan * | * | * +-----> FlinkLogicalJoin (RelNode) // Optimized Logical Plan,邏輯執行計劃 * | * | * StreamExecJoinRule (RelOptRule) // Rule that converts FlinkLogicalJoin without window bounds in join condition to StreamExecJoin * VolcanoRuleCall.onMatch // 基於Flink rules將optimized LogicalPlan轉成Flink物理執行計劃 * | * | * +-----> StreamExecJoin (FlinkRelNode) // Stream physical RelNode,物理執行計劃 * | * | * StreamExecJoin.translateToPlanInternal // 做用是生成 StreamOperator, 即Flink算子 * | * | * +-----> StreamingJoinOperator (StreamOperator) // Streaming unbounded Join operator in StreamTask * | * | * StreamTwoInputProcessor.processRecord1// 在TwoInputStreamTask調用StreamingJoinOperator,真實的執行 * | * |
At runtime, the business logic is executed inside a StreamTask, which is the part we are already familiar with. An example call stack (a simplified model of the join operator follows it):
processElement:150, StreamTaskNetworkInput (org.apache.flink.streaming.runtime.io)
emitNext:128, StreamTaskNetworkInput (org.apache.flink.streaming.runtime.io)
processInput:69, StreamOneInputProcessor (org.apache.flink.streaming.runtime.io)
processInput:311, StreamTask (org.apache.flink.streaming.runtime.tasks)
runDefaultAction:-1, 354713989 (org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$710)
runMailboxLoop:187, MailboxProcessor (org.apache.flink.streaming.runtime.tasks.mailbox)
runMailboxLoop:487, StreamTask (org.apache.flink.streaming.runtime.tasks)
invoke:470, StreamTask (org.apache.flink.streaming.runtime.tasks)
doRun:707, Task (org.apache.flink.runtime.taskmanager)
run:532, Task (org.apache.flink.runtime.taskmanager)
run:748, Thread (java.lang)
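What does the operator actually do with each record? The sketch below is a drastically simplified, non-Flink model of the streaming LEFT OUTER JOIN logic, written only to explain the retract output shown further down. The real StreamingJoinOperator keeps the records of both inputs in keyed state views (leftRecordStateView / rightRecordStateView in the debugger dump later) and emits proper retraction messages; here ordinary maps and println calls stand in for state and output.

import scala.collection.mutable

// A toy model of a streaming LEFT OUTER JOIN, NOT the real StreamingJoinOperator.
object NaiveStreamingLeftJoin {

  // "state": all records seen so far on each side, grouped by the join key (orderId)
  private val leftState  = mutable.Map.empty[String, List[(String, String)]]  // orderId -> (orderId, productName)
  private val rightState = mutable.Map.empty[String, List[(String, String)]]  // orderId -> (orderId, payType)

  // a record arrives on the left (Orders) input
  def processElement1(order: (String, String)): Unit = {
    val key = order._1
    leftState(key) = order :: leftState.getOrElse(key, Nil)
    rightState.getOrElse(key, Nil) match {
      case Nil =>
        // no match yet: an outer join must still emit the row, padded with nulls
        println(s"+ (${order._1},${order._2},null)")
      case matches =>
        matches.foreach(p => println(s"+ (${order._1},${order._2},${p._2})"))
    }
  }

  // a record arrives on the right (Payment) input
  def processElement2(payment: (String, String)): Unit = {
    val key = payment._1
    val hadMatchBefore = rightState.getOrElse(key, Nil).nonEmpty
    rightState(key) = payment :: rightState.getOrElse(key, Nil)
    leftState.getOrElse(key, Nil).foreach { o =>
      if (!hadMatchBefore) {
        // the left row was emitted earlier padded with nulls; retract it first
        println(s"- (${o._1},${o._2},null)")
      }
      println(s"+ (${o._1},${o._2},${payment._2})")
    }
  }
}

Feeding processElement1(("001", "iphone")) and then processElement2(("001", "alipay")) into this model prints exactly the add / retract / add sequence that the retract stream of the left outer join example produces below.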
Below is an example program used to generate the various execution plans:
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala._

object StreamSQLExample {

  // *************************************************************************
  //     PROGRAM
  // *************************************************************************

  def main(args: Array[String]): Unit = {

    val params = ParameterTool.fromArgs(args)
    val planner = if (params.has("planner")) params.get("planner") else "flink"

    // set up execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = if (planner == "blink") {
      // use blink planner in streaming mode
      val settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build()
      StreamTableEnvironment.create(env, settings)
    } else if (planner == "flink") {
      // use flink planner in streaming mode
      StreamTableEnvironment.create(env)
    } else {
      System.err.println("The planner is incorrect. Please run 'StreamSQLExample --planner <planner>', " +
        "where planner (it is either flink or blink, and the default is flink) indicates whether the " +
        "example uses flink planner or blink planner.")
      return
    }

    val orderA: DataStream[Order] = env.fromCollection(Seq(
      Order(1L, "beer", 3),
      Order(1L, "diaper", 4),
      Order(3L, "rubber", 2)))

    val orderB: DataStream[Order] = env.fromCollection(Seq(
      Order(2L, "pen", 3),
      Order(2L, "rubber", 3),
      Order(4L, "beer", 1)))

    // convert DataStream to Table
    val tableA = tEnv.fromDataStream(orderA, 'user, 'product, 'amount)
    // register DataStream as Table
    tEnv.registerDataStream("OrderB", orderB, 'user, 'product, 'amount)

    // union the two tables
    val result = tEnv.sqlQuery(
      s"""
         |SELECT * FROM $tableA WHERE amount > 2
         |UNION ALL
         |SELECT * FROM OrderB WHERE amount < 2
         """.stripMargin)

    result.toAppendStream[Order].print()

    print(tEnv.explain(result))

    env.execute()
  }

  // *************************************************************************
  //     USER DATA TYPES
  // *************************************************************************

  case class Order(user: Long, product: String, amount: Int)
}
整個流程的轉換大致就像這樣:
== Abstract Syntax Tree == LogicalUnion(all=[true]) :- LogicalProject(user=[$0], product=[$1], amount=[$2]) : +- LogicalFilter(condition=[>($2, 2)]) : +- LogicalTableScan(table=[[default_catalog, default_database, UnnamedTable$0]]) +- LogicalProject(user=[$0], product=[$1], amount=[$2]) +- LogicalFilter(condition=[<($2, 2)]) +- LogicalTableScan(table=[[default_catalog, default_database, OrderB]]) == Optimized Logical Plan == Union(all=[true], union=[user, product, amount]) :- Calc(select=[user, product, amount], where=[>(amount, 2)]) : +- DataStreamScan(table=[[default_catalog, default_database, UnnamedTable$0]], fields=[user, product, amount]) +- Calc(select=[user, product, amount], where=[<(amount, 2)]) +- DataStreamScan(table=[[default_catalog, default_database, OrderB]], fields=[user, product, amount]) == Physical Execution Plan == Stage 1 : Data Source content : Source: Collection Source Stage 2 : Data Source content : Source: Collection Source Stage 10 : Operator content : SourceConversion(table=[default_catalog.default_database.UnnamedTable$0], fields=[user, product, amount]) ship_strategy : FORWARD Stage 11 : Operator content : Calc(select=[user, product, amount], where=[(amount > 2)]) ship_strategy : FORWARD Stage 12 : Operator content : SourceConversion(table=[default_catalog.default_database.OrderB], fields=[user, product, amount]) ship_strategy : FORWARD Stage 13 : Operator content : Calc(select=[user, product, amount], where=[(amount < 2)]) ship_strategy : FORWARD
import java.sql.Timestamp

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

import scala.collection.mutable

object SimpleOuterJoin {

  def main(args: Array[String]): Unit = {

    val params = ParameterTool.fromArgs(args)
    val planner = if (params.has("planner")) params.get("planner") else "flink"

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = if (planner == "blink") {
      // use blink planner in streaming mode
      val settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build()
      StreamTableEnvironment.create(env, settings)
    } else if (planner == "flink") {
      // use flink planner in streaming mode
      StreamTableEnvironment.create(env)
    } else {
      System.err.println("The planner is incorrect. Please run 'StreamSQLExample --planner <planner>', " +
        "where planner (it is either flink or blink, and the default is flink) indicates whether the " +
        "example uses flink planner or blink planner.")
      return
    }

    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // build the orders data
    val ordersData = new mutable.MutableList[(String, String)]
    ordersData.+=(("001", "iphone"))
    ordersData.+=(("002", "mac"))
    ordersData.+=(("003", "book"))
    ordersData.+=(("004", "cup"))

    // build the payments data
    val paymentData = new mutable.MutableList[(String, String)]
    paymentData.+=(("001", "alipay"))
    paymentData.+=(("002", "card"))
    paymentData.+=(("003", "card"))
    paymentData.+=(("004", "alipay"))

    val orders = env
      .fromCollection(ordersData)
      .toTable(tEnv, 'orderId, 'productName)
    val ratesHistory = env
      .fromCollection(paymentData)
      .toTable(tEnv, 'orderId, 'payType)

    tEnv.registerTable("Orders", orders)
    tEnv.registerTable("Payment", ratesHistory)

    var sqlQuery =
      """
        |SELECT
        |  o.orderId,
        |  o.productName,
        |  p.payType
        |FROM
        |  Orders AS o left outer JOIN Payment AS p ON o.orderId = p.orderId
        |""".stripMargin
    tEnv.registerTable("TemporalJoinResult", tEnv.sqlQuery(sqlQuery))

    val result = tEnv.scan("TemporalJoinResult").toRetractStream[Row]
    result.print()

    print(tEnv.explain(tEnv.sqlQuery(sqlQuery)))

    env.execute()
  }
}
The conversion of the whole pipeline is as follows:
== Abstract Syntax Tree == LogicalProject(orderId=[$0], productName=[$1], payType=[$3]) +- LogicalJoin(condition=[=($0, $2)], joinType=[left]) :- LogicalTableScan(table=[[default_catalog, default_database, Orders]]) +- LogicalTableScan(table=[[default_catalog, default_database, Payment]]) == Optimized Logical Plan == Calc(select=[orderId, productName, payType]) +- Join(joinType=[LeftOuterJoin], where=[=(orderId, orderId0)], select=[orderId, productName, orderId0, payType], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) :- Exchange(distribution=[hash[orderId]]) : +- DataStreamScan(table=[[default_catalog, default_database, Orders]], fields=[orderId, productName]) +- Exchange(distribution=[hash[orderId]]) +- DataStreamScan(table=[[default_catalog, default_database, Payment]], fields=[orderId, payType]) == Physical Execution Plan == Stage 1 : Data Source content : Source: Collection Source Stage 2 : Data Source content : Source: Collection Source Stage 11 : Operator content : SourceConversion(table=[default_catalog.default_database.Orders], fields=[orderId, productName]) ship_strategy : FORWARD Stage 13 : Operator content : SourceConversion(table=[default_catalog.default_database.Payment], fields=[orderId, payType]) ship_strategy : FORWARD Stage 15 : Operator content : Join(joinType=[LeftOuterJoin], where=[(orderId = orderId0)], select=[orderId, productName, orderId0, payType], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) ship_strategy : HASH Stage 16 : Operator content : Calc(select=[orderId, productName, payType]) ship_strategy : FORWARD 輸出結果是 (true,001,iphone,null) (false,001,iphone,null) (true,001,iphone,alipay) (true,002,mac,null) (false,002,mac,null) (true,002,mac,card) (true,003,book,null) (false,003,book,null) (true,003,book,card) (true,004,cup,null) (false,004,cup,null) (true,004,cup,alipay)
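A quick note on reading this output: toRetractStream[Row] returns a DataStream[(Boolean, Row)], where true marks a row being added and false marks the retraction of a row emitted earlier. That is why every order first appears padded with null (no payment seen yet), is then retracted, and finally re-appears joined with its payment. A minimal, illustrative way to split the two kinds of messages, assuming the result stream from the example above:

// `result` is the DataStream[(Boolean, Row)] returned by toRetractStream above
val inserts     = result.filter(t => t._1).map(t => "ADD     " + t._2)   // rows being added
val retractions = result.filter(t => !t._1).map(t => "RETRACT " + t._2)  // rows being withdrawn
inserts.print()
retractions.print()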
Below are call stacks captured while debugging, for your reference:
// 調用Rule進行優化 matches:49, StreamExecJoinRule (org.apache.Flink.table.planner.plan.rules.physical.stream) matchRecurse:263, VolcanoRuleCall (org.apache.calcite.plan.volcano) matchRecurse:370, VolcanoRuleCall (org.apache.calcite.plan.volcano) matchRecurse:370, VolcanoRuleCall (org.apache.calcite.plan.volcano) match:247, VolcanoRuleCall (org.apache.calcite.plan.volcano) fireRules:1534, VolcanoPlanner (org.apache.calcite.plan.volcano) registerImpl:1807, VolcanoPlanner (org.apache.calcite.plan.volcano) register:846, VolcanoPlanner (org.apache.calcite.plan.volcano) ensureRegistered:868, VolcanoPlanner (org.apache.calcite.plan.volcano) ensureRegistered:90, VolcanoPlanner (org.apache.calcite.plan.volcano) onRegister:329, AbstractRelNode (org.apache.calcite.rel) registerImpl:1668, VolcanoPlanner (org.apache.calcite.plan.volcano) register:846, VolcanoPlanner (org.apache.calcite.plan.volcano) ensureRegistered:868, VolcanoPlanner (org.apache.calcite.plan.volcano) ensureRegistered:90, VolcanoPlanner (org.apache.calcite.plan.volcano) onRegister:329, AbstractRelNode (org.apache.calcite.rel) registerImpl:1668, VolcanoPlanner (org.apache.calcite.plan.volcano) register:846, VolcanoPlanner (org.apache.calcite.plan.volcano) ensureRegistered:868, VolcanoPlanner (org.apache.calcite.plan.volcano) changeTraits:529, VolcanoPlanner (org.apache.calcite.plan.volcano) run:324, Programs$RuleSetProgram (org.apache.calcite.tools) optimize:64, FlinkVolcanoProgram (org.apache.Flink.table.planner.plan.optimize.program) apply:62, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.Flink.table.planner.plan.optimize.program) apply:58, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.Flink.table.planner.plan.optimize.program) apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection) apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection) foreach:891, Iterator$class (scala.collection) foreach:1334, AbstractIterator (scala.collection) foreach:72, IterableLike$class (scala.collection) foreach:54, AbstractIterable (scala.collection) foldLeft:157, TraversableOnce$class (scala.collection) foldLeft:104, AbstractTraversable (scala.collection) optimize:57, FlinkChainedProgram (org.apache.Flink.table.planner.plan.optimize.program) optimizeTree:170, StreamCommonSubGraphBasedOptimizer (org.apache.Flink.table.planner.plan.optimize) doOptimize:90, StreamCommonSubGraphBasedOptimizer (org.apache.Flink.table.planner.plan.optimize) optimize:77, CommonSubGraphBasedOptimizer (org.apache.Flink.table.planner.plan.optimize) optimize:248, PlannerBase (org.apache.Flink.table.planner.delegation) translate:151, PlannerBase (org.apache.Flink.table.planner.delegation) toDataStream:210, StreamTableEnvironmentImpl (org.apache.Flink.table.api.scala.internal) toRetractStream:127, StreamTableEnvironmentImpl (org.apache.Flink.table.api.scala.internal) toRetractStream:146, TableConversions (org.apache.Flink.table.api.scala) main:75, SimpleOuterJoin$ (spendreport) main:-1, SimpleOuterJoin (spendreport) // 調用Rule進行轉換到Flink邏輯算子 translateToPlanInternal:140, StreamExecJoin (org.apache.flink.table.planner.plan.nodes.physical.stream) translateToPlanInternal:51, StreamExecJoin (org.apache.flink.table.planner.plan.nodes.physical.stream) translateToPlan:58, ExecNode$class (org.apache.flink.table.planner.plan.nodes.exec) translateToPlan:51, StreamExecJoin (org.apache.flink.table.planner.plan.nodes.physical.stream) translateToPlanInternal:54, StreamExecCalc (org.apache.flink.table.planner.plan.nodes.physical.stream) translateToPlanInternal:39, 
StreamExecCalc (org.apache.flink.table.planner.plan.nodes.physical.stream) translateToPlan:58, ExecNode$class (org.apache.flink.table.planner.plan.nodes.exec) translateToPlan:38, StreamExecCalcBase (org.apache.flink.table.planner.plan.nodes.physical.stream) translateToTransformation:184, StreamExecSink (org.apache.flink.table.planner.plan.nodes.physical.stream) translateToPlanInternal:153, StreamExecSink (org.apache.flink.table.planner.plan.nodes.physical.stream) translateToPlanInternal:48, StreamExecSink (org.apache.flink.table.planner.plan.nodes.physical.stream) translateToPlan:58, ExecNode$class (org.apache.flink.table.planner.plan.nodes.exec) translateToPlan:48, StreamExecSink (org.apache.flink.table.planner.plan.nodes.physical.stream) apply:60, StreamPlanner$$anonfun$translateToPlan$1 (org.apache.flink.table.planner.delegation) apply:59, StreamPlanner$$anonfun$translateToPlan$1 (org.apache.flink.table.planner.delegation) apply:234, TraversableLike$$anonfun$map$1 (scala.collection) apply:234, TraversableLike$$anonfun$map$1 (scala.collection) foreach:891, Iterator$class (scala.collection) foreach:1334, AbstractIterator (scala.collection) foreach:72, IterableLike$class (scala.collection) foreach:54, AbstractIterable (scala.collection) map:234, TraversableLike$class (scala.collection) map:104, AbstractTraversable (scala.collection) translateToPlan:59, StreamPlanner (org.apache.flink.table.planner.delegation) translate:153, PlannerBase (org.apache.flink.table.planner.delegation) toDataStream:210, StreamTableEnvironmentImpl (org.apache.flink.table.api.scala.internal) toRetractStream:127, StreamTableEnvironmentImpl (org.apache.flink.table.api.scala.internal) toRetractStream:146, TableConversions (org.apache.flink.table.api.scala) main:75, SimpleOuterJoin$ (spendreport) main:-1, SimpleOuterJoin (spendreport) // 運行時候 @Internal public final class StreamTwoInputProcessor<IN1, IN2> implements StreamInputProcessor { private void processRecord2( StreamRecord<IN2> record, TwoInputStreamOperator<IN1, IN2, ?> streamOperator, Counter numRecordsIn) throws Exception { streamOperator.setKeyContextElement2(record); streamOperator.processElement2(record); postProcessRecord(numRecordsIn); } } // 能看出來,streamOperator就是StreamingJoinOperator streamOperator = {StreamingJoinOperator@10943} leftIsOuter = true rightIsOuter = false outRow = {JoinedRow@10948} "JoinedRow{row1=org.apache.flink.table.dataformat.BinaryRow@dc6a1b67, row2=(+|null,null)}" leftNullRow = {GenericRow@10949} "(+|null,null)" rightNullRow = {GenericRow@10950} "(+|null,null)" leftRecordStateView = {OuterJoinRecordStateViews$InputSideHasNoUniqueKey@10945} rightRecordStateView = {JoinRecordStateViews$InputSideHasNoUniqueKey@10946} generatedJoinCondition = {GeneratedJoinCondition@10951} leftType = {BaseRowTypeInfo@10952} "BaseRow(orderId: STRING, productName: STRING)" rightType = {BaseRowTypeInfo@10953} "BaseRow(orderId: STRING, payType: STRING)" leftInputSideSpec = {JoinInputSideSpec@10954} "NoUniqueKey" rightInputSideSpec = {JoinInputSideSpec@10955} "NoUniqueKey" nullFilterKeys = {int[1]@10956} nullSafe = false filterAllNulls = true minRetentionTime = 0 stateCleaningEnabled = false joinCondition = {AbstractStreamingJoinOperator$JoinConditionWithNullFilters@10947} collector = {TimestampedCollector@10957} chainingStrategy = {ChainingStrategy@10958} "HEAD" container = {TwoInputStreamTask@10959} "Join(joinType=[LeftOuterJoin], where=[(orderId = orderId0)], select=[orderId, productName, orderId0, payType], leftInputSpec=[NoUniqueKey], 
rightInputSpec=[NoUniqueKey]) -> Calc(select=[orderId, productName, payType]) -> SinkConversionToTuple2 -> Sink: Print to Std. Out (1/1)" config = {StreamConfig@10960} "\n=======================Stream Config=======================\nNumber of non-chained inputs: 2\nNumber of non-chained outputs: 0\nOutput names: []\nPartitioning:\nChained subtasks: [(Join(joinType=[LeftOuterJoin], where=[(orderId = orderId0)], select=[orderId, productName, orderId0, payType], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])-7 -> Calc(select=[orderId, productName, payType])-8, typeNumber=0, selectedNames=[], outputPartitioner=FORWARD, outputTag=null)]\nOperator: SimpleOperatorFactory\nBuffer timeout: 100\nState Monitoring: false\n\n\n---------------------\nChained task configs\n---------------------\n{8=\n=======================Stream Config=======================\nNumber of non-chained inputs: 0\nNumber of non-chained outputs: 0\nOutput names: []\nPartitioning:\nChained subtasks: [(Calc(select=[orderId, productName, payType])-8 -> SinkConversionToTuple2-9, typeNumber=0, selectedNames=[], outputPartitioner=FORWARD, outputTag=null)]\nOperator: CodeGenOperatorFactory\nBuffer timeout: " output = {AbstractStreamOperator$CountingOutput@10961} runtimeContext = {StreamingRuntimeContext@10962} stateKeySelector1 = {BinaryRowKeySelector@10963} stateKeySelector2 = {BinaryRowKeySelector@10964} keyedStateBackend = {HeapKeyedStateBackend@10965} "HeapKeyedStateBackend" keyedStateStore = {DefaultKeyedStateStore@10966} operatorStateBackend = {DefaultOperatorStateBackend@10967} metrics = {OperatorMetricGroup@10968} latencyStats = {LatencyStats@10969} processingTimeService = {ProcessingTimeServiceImpl@10970} timeServiceManager = {InternalTimeServiceManager@10971} combinedWatermark = -9223372036854775808 input1Watermark = -9223372036854775808 input2Watermark = -9223372036854775808 // 處理table 1 processElement1:118, StreamingJoinOperator (org.apache.Flink.table.runtime.operators.join.stream) processRecord1:135, StreamTwoInputProcessor (org.apache.Flink.streaming.runtime.io) lambda$new$0:100, StreamTwoInputProcessor (org.apache.Flink.streaming.runtime.io) accept:-1, 169462196 (org.apache.Flink.streaming.runtime.io.StreamTwoInputProcessor$$Lambda$733) emitRecord:362, StreamTwoInputProcessor$StreamTaskNetworkOutput (org.apache.Flink.streaming.runtime.io) processElement:151, StreamTaskNetworkInput (org.apache.Flink.streaming.runtime.io) emitNext:128, StreamTaskNetworkInput (org.apache.Flink.streaming.runtime.io) processInput:182, StreamTwoInputProcessor (org.apache.Flink.streaming.runtime.io) processInput:311, StreamTask (org.apache.Flink.streaming.runtime.tasks) runDefaultAction:-1, 1284793893 (org.apache.Flink.streaming.runtime.tasks.StreamTask$$Lambda$713) runMailboxLoop:187, MailboxProcessor (org.apache.Flink.streaming.runtime.tasks.mailbox) runMailboxLoop:487, StreamTask (org.apache.Flink.streaming.runtime.tasks) invoke:470, StreamTask (org.apache.Flink.streaming.runtime.tasks) doRun:707, Task (org.apache.Flink.runtime.taskmanager) run:532, Task (org.apache.Flink.runtime.taskmanager) run:748, Thread (java.lang) // 處理table 2 processElement2:123, StreamingJoinOperator (org.apache.Flink.table.runtime.operators.join.stream) processRecord2:145, StreamTwoInputProcessor (org.apache.Flink.streaming.runtime.io) lambda$new$1:107, StreamTwoInputProcessor (org.apache.Flink.streaming.runtime.io) accept:-1, 76811487 (org.apache.Flink.streaming.runtime.io.StreamTwoInputProcessor$$Lambda$734) emitRecord:362, 
StreamTwoInputProcessor$StreamTaskNetworkOutput (org.apache.Flink.streaming.runtime.io) processElement:151, StreamTaskNetworkInput (org.apache.Flink.streaming.runtime.io) emitNext:128, StreamTaskNetworkInput (org.apache.Flink.streaming.runtime.io) processInput:185, StreamTwoInputProcessor (org.apache.Flink.streaming.runtime.io) processInput:311, StreamTask (org.apache.Flink.streaming.runtime.tasks) runDefaultAction:-1, 1284793893 (org.apache.Flink.streaming.runtime.tasks.StreamTask$$Lambda$713) runMailboxLoop:187, MailboxProcessor (org.apache.Flink.streaming.runtime.tasks.mailbox) runMailboxLoop:487, StreamTask (org.apache.Flink.streaming.runtime.tasks) invoke:470, StreamTask (org.apache.Flink.streaming.runtime.tasks) doRun:707, Task (org.apache.Flink.runtime.taskmanager) run:532, Task (org.apache.Flink.runtime.taskmanager) run:748, Thread (java.lang) // 處理table 1 processRecord1:134, StreamTwoInputProcessor (org.apache.flink.streaming.runtime.io) lambda$new$0:100, StreamTwoInputProcessor (org.apache.flink.streaming.runtime.io) accept:-1, 230607815 (org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$$Lambda$735) emitRecord:362, StreamTwoInputProcessor$StreamTaskNetworkOutput (org.apache.flink.streaming.runtime.io) processElement:151, StreamTaskNetworkInput (org.apache.flink.streaming.runtime.io) emitNext:128, StreamTaskNetworkInput (org.apache.flink.streaming.runtime.io) processInput:182, StreamTwoInputProcessor (org.apache.flink.streaming.runtime.io) processInput:311, StreamTask (org.apache.flink.streaming.runtime.tasks) runDefaultAction:-1, 33038573 (org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$718) runMailboxLoop:187, MailboxProcessor (org.apache.flink.streaming.runtime.tasks.mailbox) runMailboxLoop:487, StreamTask (org.apache.flink.streaming.runtime.tasks) invoke:470, StreamTask (org.apache.flink.streaming.runtime.tasks) doRun:707, Task (org.apache.flink.runtime.taskmanager) run:532, Task (org.apache.flink.runtime.taskmanager) run:748, Thread (java.lang) // 處理table 2 processRecord2:144, StreamTwoInputProcessor (org.apache.flink.streaming.runtime.io) lambda$new$1:107, StreamTwoInputProcessor (org.apache.flink.streaming.runtime.io) accept:-1, 212261435 (org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$$Lambda$736) emitRecord:362, StreamTwoInputProcessor$StreamTaskNetworkOutput (org.apache.flink.streaming.runtime.io) processElement:151, StreamTaskNetworkInput (org.apache.flink.streaming.runtime.io) emitNext:128, StreamTaskNetworkInput (org.apache.flink.streaming.runtime.io) processInput:185, StreamTwoInputProcessor (org.apache.flink.streaming.runtime.io) processInput:311, StreamTask (org.apache.flink.streaming.runtime.tasks) runDefaultAction:-1, 33038573 (org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$718) runMailboxLoop:187, MailboxProcessor (org.apache.flink.streaming.runtime.tasks.mailbox) runMailboxLoop:487, StreamTask (org.apache.flink.streaming.runtime.tasks) invoke:470, StreamTask (org.apache.flink.streaming.runtime.tasks) doRun:707, Task (org.apache.flink.runtime.taskmanager) run:532, Task (org.apache.flink.runtime.taskmanager) run:748, Thread (java.lang)
import java.sql.Timestamp import org.apache.flink.api.java.utils.ParameterTool import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment} import org.apache.flink.table.api.scala._ import org.apache.flink.types.Row import scala.collection.mutable import java.sql.Timestamp import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.table.api.TableEnvironment import org.apache.flink.table.api.scala._ import org.apache.flink.types.Row import scala.collection.mutable object SimpleTimeIntervalJoinA { def main(args: Array[String]): Unit = { val params = ParameterTool.fromArgs(args) val planner = if (params.has("planner")) params.get("planner") else "flink" val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = if (planner == "blink") { // use blink planner in streaming mode val settings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inStreamingMode() .build() StreamTableEnvironment.create(env, settings) } else if (planner == "flink") { // use flink planner in streaming mode StreamTableEnvironment.create(env) } else { System.err.println("The planner is incorrect. Please run 'StreamSQLExample --planner <planner>', " + "where planner (it is either flink or blink, and the default is flink) indicates whether the " + "example uses flink planner or blink planner.") return } env.setParallelism(1) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // 構造訂單數據 val ordersData = new mutable.MutableList[(String, String, Timestamp)] ordersData.+=(("001", "iphone", new Timestamp(1545800002000L))) ordersData.+=(("002", "mac", new Timestamp(1545800003000L))) ordersData.+=(("003", "book", new Timestamp(1545800004000L))) ordersData.+=(("004", "cup", new Timestamp(1545800018000L))) // 構造付款表 val paymentData = new mutable.MutableList[(String, String, Timestamp)] paymentData.+=(("001", "alipay", new Timestamp(1545803501000L))) paymentData.+=(("002", "card", new Timestamp(1545803602000L))) paymentData.+=(("003", "card", new Timestamp(1545803610000L))) paymentData.+=(("004", "alipay", new Timestamp(1545803611000L))) val orders = env .fromCollection(ordersData) .assignTimestampsAndWatermarks(new TimestampExtractor[String, String]()) .toTable(tEnv, 'orderId, 'productName, 'orderTime.rowtime) val ratesHistory = env .fromCollection(paymentData) .assignTimestampsAndWatermarks(new TimestampExtractor[String, String]()) .toTable(tEnv, 'orderId, 'payType, 'payTime.rowtime) tEnv.registerTable("Orders", orders) tEnv.registerTable("Payment", ratesHistory) var sqlQuery = """ |SELECT | o.orderId, | o.productName, | p.payType, | o.orderTime, | cast(payTime as timestamp) as payTime |FROM | Orders AS o left outer JOIN Payment AS p ON o.orderId = p.orderId AND | p.payTime BETWEEN orderTime AND orderTime + INTERVAL '1' HOUR |""".stripMargin tEnv.registerTable("TemporalJoinResult", tEnv.sqlQuery(sqlQuery)) val result = tEnv.scan("TemporalJoinResult").toAppendStream[Row] result.print() 
print(tEnv.explain(tEnv.sqlQuery(sqlQuery))) env.execute() } } class TimestampExtractor[T1, T2] extends BoundedOutOfOrdernessTimestampExtractor[(T1, T2, Timestamp)](Time.seconds(10)) { override def extractTimestamp(element: (T1, T2, Timestamp)): Long = { element._3.getTime } }
The output is as follows:
== Abstract Syntax Tree == LogicalProject(orderId=[$0], productName=[$1], payType=[$4], orderTime=[$2], payTime=[CAST($5):TIMESTAMP(6)]) +- LogicalJoin(condition=[AND(=($0, $3), >=($5, $2), <=($5, +($2, 3600000:INTERVAL HOUR)))], joinType=[left]) :- LogicalTableScan(table=[[default_catalog, default_database, Orders]]) +- LogicalTableScan(table=[[default_catalog, default_database, Payment]]) == Optimized Logical Plan == Calc(select=[orderId, productName, payType, orderTime, CAST(CAST(payTime)) AS payTime]) +- WindowJoin(joinType=[LeftOuterJoin], windowBounds=[isRowTime=true, leftLowerBound=-3600000, leftUpperBound=0, leftTimeIndex=2, rightTimeIndex=2], where=[AND(=(orderId, orderId0), >=(payTime, orderTime), <=(payTime, +(orderTime, 3600000:INTERVAL HOUR)))], select=[orderId, productName, orderTime, orderId0, payType, payTime]) :- Exchange(distribution=[hash[orderId]]) : +- DataStreamScan(table=[[default_catalog, default_database, Orders]], fields=[orderId, productName, orderTime]) +- Exchange(distribution=[hash[orderId]]) +- DataStreamScan(table=[[default_catalog, default_database, Payment]], fields=[orderId, payType, payTime]) == Physical Execution Plan == Stage 1 : Data Source content : Source: Collection Source Stage 2 : Operator content : Timestamps/Watermarks ship_strategy : FORWARD Stage 3 : Data Source content : Source: Collection Source Stage 4 : Operator content : Timestamps/Watermarks ship_strategy : FORWARD Stage 13 : Operator content : SourceConversion(table=[default_catalog.default_database.Orders], fields=[orderId, productName, orderTime]) ship_strategy : FORWARD Stage 15 : Operator content : SourceConversion(table=[default_catalog.default_database.Payment], fields=[orderId, payType, payTime]) ship_strategy : FORWARD Stage 17 : Operator content : WindowJoin(joinType=[LeftOuterJoin], windowBounds=[isRowTime=true, leftLowerBound=-3600000, leftUpperBound=0, leftTimeIndex=2, rightTimeIndex=2], where=[((orderId = orderId0) AND (payTime >= orderTime) AND (payTime <= (orderTime + 3600000:INTERVAL HOUR)))], select=[orderId, productName, orderTime, orderId0, payType, payTime]) ship_strategy : HASH Stage 18 : Operator content : Calc(select=[orderId, productName, payType, orderTime, CAST(CAST(payTime)) AS payTime]) ship_strategy : FORWARD 001,iphone,alipay,2018-12-26T04:53:22,2018-12-26T05:51:41 002,mac,card,2018-12-26T04:53:23,2018-12-26T05:53:22 004,cup,alipay,2018-12-26T04:53:38,2018-12-26T05:53:31 003,book,null,2018-12-26T04:53:24,null
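The WindowJoin generated here is conceptually close to what the DataStream API exposes as an interval join. Purely as an illustrative sketch (not the planner's output): assuming orderStream and paymentStream are the two timestamp/watermark-assigned (orderId, name, eventTime) streams from the example above, the inner part of this join could be written by hand roughly as below. Note that the DataStream interval join only supports inner joins, so the LEFT OUTER semantics of the SQL (the null row for order 003) is something the generated operator handles on top of this.

import java.sql.Timestamp

import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

// orderStream / paymentStream: DataStream[(String, String, Timestamp)], assumed from the example above
val joined: DataStream[String] = orderStream
  .keyBy(_._1)                                  // key both sides by orderId
  .intervalJoin(paymentStream.keyBy(_._1))
  .between(Time.seconds(0), Time.hours(1))      // payTime BETWEEN orderTime AND orderTime + 1 HOUR
  .process(
    new ProcessJoinFunction[(String, String, Timestamp), (String, String, Timestamp), String] {
      override def processElement(
          order: (String, String, Timestamp),
          payment: (String, String, Timestamp),
          ctx: ProcessJoinFunction[(String, String, Timestamp), (String, String, Timestamp), String]#Context,
          out: Collector[String]): Unit = {
        out.collect(s"${order._1},${order._2},${payment._2}")
      }
    })

joined.print()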
Related classes and call stacks:
class StreamExecWindowJoin { } class StreamExecWindowJoinRule extends ConverterRule( classOf[FlinkLogicalJoin], FlinkConventions.LOGICAL, FlinkConventions.STREAM_PHYSICAL, "StreamExecWindowJoinRule") { } matches:54, StreamExecWindowJoinRule (org.apache.flink.table.planner.plan.rules.physical.stream) matchRecurse:263, VolcanoRuleCall (org.apache.calcite.plan.volcano) match:247, VolcanoRuleCall (org.apache.calcite.plan.volcano) fireRules:1534, VolcanoPlanner (org.apache.calcite.plan.volcano) registerImpl:1807, VolcanoPlanner (org.apache.calcite.plan.volcano) register:846, VolcanoPlanner (org.apache.calcite.plan.volcano) ensureRegistered:868, VolcanoPlanner (org.apache.calcite.plan.volcano) ensureRegistered:90, VolcanoPlanner (org.apache.calcite.plan.volcano) onRegister:329, AbstractRelNode (org.apache.calcite.rel) registerImpl:1668, VolcanoPlanner (org.apache.calcite.plan.volcano) register:846, VolcanoPlanner (org.apache.calcite.plan.volcano) ensureRegistered:868, VolcanoPlanner (org.apache.calcite.plan.volcano) ensureRegistered:90, VolcanoPlanner (org.apache.calcite.plan.volcano) onRegister:329, AbstractRelNode (org.apache.calcite.rel) registerImpl:1668, VolcanoPlanner (org.apache.calcite.plan.volcano) register:846, VolcanoPlanner (org.apache.calcite.plan.volcano) ensureRegistered:868, VolcanoPlanner (org.apache.calcite.plan.volcano) changeTraits:529, VolcanoPlanner (org.apache.calcite.plan.volcano) run:324, Programs$RuleSetProgram (org.apache.calcite.tools) optimize:64, FlinkVolcanoProgram (org.apache.flink.table.planner.plan.optimize.program) apply:62, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.flink.table.planner.plan.optimize.program) apply:58, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.flink.table.planner.plan.optimize.program) apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection) apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection) foreach:891, Iterator$class (scala.collection) foreach:1334, AbstractIterator (scala.collection) foreach:72, IterableLike$class (scala.collection) foreach:54, AbstractIterable (scala.collection) foldLeft:157, TraversableOnce$class (scala.collection) foldLeft:104, AbstractTraversable (scala.collection) optimize:57, FlinkChainedProgram (org.apache.flink.table.planner.plan.optimize.program) optimizeTree:170, StreamCommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize) doOptimize:90, StreamCommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize) optimize:77, CommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize) optimize:248, PlannerBase (org.apache.flink.table.planner.delegation) translate:151, PlannerBase (org.apache.flink.table.planner.delegation) toDataStream:210, StreamTableEnvironmentImpl (org.apache.flink.table.api.scala.internal) toAppendStream:107, StreamTableEnvironmentImpl (org.apache.flink.table.api.scala.internal) toAppendStream:101, TableConversions (org.apache.flink.table.api.scala) main:93, SimpleTimeIntervalJoinA$ (spendreport) main:-1, SimpleTimeIntervalJoinA (spendreport) translateToPlanInternal:136, StreamExecWindowJoin (org.apache.flink.table.planner.plan.nodes.physical.stream) translateToPlanInternal:53, StreamExecWindowJoin (org.apache.flink.table.planner.plan.nodes.physical.stream) translateToPlan:58, ExecNode$class (org.apache.flink.table.planner.plan.nodes.exec) translateToPlan:53, StreamExecWindowJoin (org.apache.flink.table.planner.plan.nodes.physical.stream) translateToPlanInternal:54, StreamExecCalc 
(org.apache.flink.table.planner.plan.nodes.physical.stream) translateToPlanInternal:39, StreamExecCalc (org.apache.flink.table.planner.plan.nodes.physical.stream) translateToPlan:58, ExecNode$class (org.apache.flink.table.planner.plan.nodes.exec) translateToPlan:38, StreamExecCalcBase (org.apache.flink.table.planner.plan.nodes.physical.stream) translateToTransformation:184, StreamExecSink (org.apache.flink.table.planner.plan.nodes.physical.stream) translateToPlanInternal:153, StreamExecSink (org.apache.flink.table.planner.plan.nodes.physical.stream) translateToPlanInternal:48, StreamExecSink (org.apache.flink.table.planner.plan.nodes.physical.stream) translateToPlan:58, ExecNode$class (org.apache.flink.table.planner.plan.nodes.exec) translateToPlan:48, StreamExecSink (org.apache.flink.table.planner.plan.nodes.physical.stream) apply:60, StreamPlanner$$anonfun$translateToPlan$1 (org.apache.flink.table.planner.delegation) apply:59, StreamPlanner$$anonfun$translateToPlan$1 (org.apache.flink.table.planner.delegation) apply:234, TraversableLike$$anonfun$map$1 (scala.collection) apply:234, TraversableLike$$anonfun$map$1 (scala.collection) foreach:891, Iterator$class (scala.collection) foreach:1334, AbstractIterator (scala.collection) foreach:72, IterableLike$class (scala.collection) foreach:54, AbstractIterable (scala.collection) map:234, TraversableLike$class (scala.collection) map:104, AbstractTraversable (scala.collection) translateToPlan:59, StreamPlanner (org.apache.flink.table.planner.delegation) translate:153, PlannerBase (org.apache.flink.table.planner.delegation) toDataStream:210, StreamTableEnvironmentImpl (org.apache.flink.table.api.scala.internal) toAppendStream:107, StreamTableEnvironmentImpl (org.apache.flink.table.api.scala.internal) toAppendStream:101, TableConversions (org.apache.flink.table.api.scala) main:93, SimpleTimeIntervalJoinA$ (spendreport) main:-1, SimpleTimeIntervalJoinA (spendreport)