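This class holds the components that SQL parsing, validation, optimization and execution all rely on: the metadata manager CatalogManager, the module manager moduleManager (a module supplies a set of functions, types and rules), the user-defined function manager FunctionCatalog, the Executor, and the Planner, which parses the SQL.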
StreamTableEnvironmentImpl.create(executionEnvironment, settings, new TableConfig)

def create(
    executionEnvironment: StreamExecutionEnvironment,
    settings: EnvironmentSettings,
    tableConfig: TableConfig)
  : StreamTableEnvironmentImpl = {
  val catalogManager = new CatalogManager(
    settings.getBuiltInCatalogName,
    new GenericInMemoryCatalog(settings.getBuiltInCatalogName, settings.getBuiltInDatabaseName))
  val moduleManager = new ModuleManager
  val functionCatalog = new FunctionCatalog(catalogManager, moduleManager)
  val executorProperties = settings.toExecutorProperties
  val executor = lookupExecutor(executorProperties, executionEnvironment)
  val plannerProperties = settings.toPlannerProperties
  val planner = ComponentFactoryService.find(classOf[PlannerFactory], plannerProperties)
    .create(
      plannerProperties,
      executor,
      tableConfig,
      functionCatalog,
      catalogManager)
  new StreamTableEnvironmentImpl(
    catalogManager,
    moduleManager,
    functionCatalog,
    tableConfig,
    executionEnvironment,
    planner,
    executor,
    settings.isStreamingMode
  )
}
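The CatalogManager above is seeded with a built-in catalog and database. In Flink their default names are `default_catalog` and `default_database`. Identifiers that omit these parts are resolved against the current catalog and database, which start out as the built-in ones. This is the job that `qualifyIdentifier` does later on.

The following is a minimal sketch under that assumption. `MiniCatalogManager` and its methods are hypothetical and are not Flink's API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for CatalogManager (NOT Flink's API).
class MiniCatalogManager {
    private final String currentCatalog;
    private final String currentDatabase;
    // Temporary tables keyed by fully qualified "catalog.database.object" names.
    private final Map<String, Object> temporaryTables = new HashMap<>();

    MiniCatalogManager(String builtInCatalog, String builtInDatabase) {
        // The current catalog/database start out as the built-in ones.
        this.currentCatalog = builtInCatalog;
        this.currentDatabase = builtInDatabase;
    }

    // Fills in the missing catalog and/or database part of an identifier.
    String qualify(String identifier) {
        String[] parts = identifier.split("\\.");
        switch (parts.length) {
            case 1:
                return currentCatalog + "." + currentDatabase + "." + parts[0];
            case 2:
                return currentCatalog + "." + parts[0] + "." + parts[1];
            case 3:
                return identifier;
            default:
                throw new IllegalArgumentException("Invalid identifier: " + identifier);
        }
    }

    // Registers a temporary table under its qualified name; fails on duplicates.
    void createTemporaryTable(Object table, String identifier) {
        String key = qualify(identifier);
        if (temporaryTables.putIfAbsent(key, table) != null) {
            throw new IllegalStateException("Temporary table already exists: " + key);
        }
    }

    Object getTemporaryTable(String identifier) {
        return temporaryTables.get(qualify(identifier));
    }
}
```

Registering `T1` and then looking up `default_catalog.default_database.T1` finds the same entry, because both names qualify to the same key.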
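Defines a logical type and gives hints about the physical type that represents it underneath.
Logical types are roughly analogous to the data types of standard SQL; each subclass adds its own concrete constraints.
The table schema definition, holding the name and type of each field.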
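The sketch below makes two ideas concrete: types that carry their own constraints, and a schema that pairs names with types. `MiniType`, `MiniIntType`, `MiniVarCharType` and `SimpleSchema` are simplified stand-ins, not Flink's classes. The `NOT NULL` rendering only imitates SQL-style type strings.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simplified logical types (NOT Flink's LogicalType hierarchy).
abstract class MiniType {
    final boolean nullable;
    MiniType(boolean nullable) { this.nullable = nullable; }
    abstract String typeName();
    // Renders the type; non-nullable types get a NOT NULL suffix.
    String summary() { return nullable ? typeName() : typeName() + " NOT NULL"; }
}

class MiniIntType extends MiniType {
    MiniIntType(boolean nullable) { super(nullable); }
    String typeName() { return "INT"; }
}

// A subclass adds its own constraint: the length must be at least 1.
class MiniVarCharType extends MiniType {
    final int length;
    MiniVarCharType(boolean nullable, int length) {
        super(nullable);
        if (length < 1) throw new IllegalArgumentException("VARCHAR length must be >= 1");
        this.length = length;
    }
    String typeName() { return "VARCHAR(" + length + ")"; }
}

// A schema: ordered field names, each paired with a type.
class SimpleSchema {
    private final List<String> names = new ArrayList<>();
    private final List<MiniType> types = new ArrayList<>();

    SimpleSchema field(String name, MiniType type) {
        if (names.contains(name)) throw new IllegalArgumentException("Duplicate field: " + name);
        names.add(name);
        types.add(type);
        return this;
    }

    MiniType typeOf(String name) {
        int i = names.indexOf(name);
        if (i < 0) throw new IllegalArgumentException("Unknown field: " + name);
        return types.get(i);
    }

    @Override public String toString() {
        StringBuilder sb = new StringBuilder("(");
        for (int i = 0; i < names.size(); i++) {
            if (i > 0) sb.append(", ");
            sb.append(names.get(i)).append(' ').append(types.get(i).summary());
        }
        return sb.append(')').toString();
    }
}
```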
override def fromDataStream[T](dataStream: DataStream[T], fields: Expression*): Table = {
  val queryOperation = asQueryOperation(dataStream, Some(fields.toList.asJava))
  createTable(queryOperation)
}

The query operation built by asQueryOperation keeps these fields:

private final DataStream<E> dataStream;
private final int[] fieldIndices;
private final TableSchema tableSchema;
The Table class is the core component of the SQL API and defines data transformation methods such as filter, groupBy and join. A TableEnvironment can convert a Table into a DataStream or a DataSet.
private TableImpl(
        TableEnvironment tableEnvironment,
        QueryOperation operationTree,
        OperationTreeBuilder operationTreeBuilder,
        LookupCallResolver lookupResolver) {
    this.tableEnvironment = tableEnvironment;
    this.operationTree = operationTree;
    this.operationTreeBuilder = operationTreeBuilder;
    this.lookupResolver = lookupResolver;
}
private void createTemporaryView(UnresolvedIdentifier identifier, Table view) {
    if (((TableImpl) view).getTableEnvironment() != this) {
        throw new TableException(
            "Only table API objects that belong to this TableEnvironment can be registered.");
    }

    CatalogBaseTable tableTable = new QueryOperationCatalogView(view.getQueryOperation());
    ObjectIdentifier tableIdentifier = catalogManager.qualifyIdentifier(identifier);
    catalogManager.createTemporaryTable(tableTable, tableIdentifier, false);
}
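The following is a hypothetical model of the two ideas above:

- A Table method such as filter does not touch data. It builds a new node on an immutable operation tree.
- A view can only be registered by the environment that created the Table.

`MiniTable`, `MiniTableEnv` and the `Op` classes are illustrative only. Flink throws `TableException` where this sketch throws `IllegalArgumentException`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of Table / TableEnvironment (NOT Flink's API).
// Nodes of an immutable operation tree.
abstract class Op {
    abstract String explain();
}

class ScanOp extends Op {
    final String source;
    ScanOp(String source) { this.source = source; }
    String explain() { return "Scan(" + source + ")"; }
}

class FilterOp extends Op {
    final Op child;
    final String predicate;
    FilterOp(Op child, String predicate) { this.child = child; this.predicate = predicate; }
    String explain() { return "Filter(" + predicate + ", " + child.explain() + ")"; }
}

class MiniTable {
    final MiniTableEnv env;      // the environment that created this table
    final Op operationTree;      // what the table computes, not the data itself
    MiniTable(MiniTableEnv env, Op operationTree) { this.env = env; this.operationTree = operationTree; }

    // Transformations only extend the tree and return a new table.
    MiniTable filter(String predicate) {
        return new MiniTable(env, new FilterOp(operationTree, predicate));
    }
}

class MiniTableEnv {
    private final Map<String, Op> views = new HashMap<>();

    MiniTable scan(String source) { return new MiniTable(this, new ScanOp(source)); }

    // Mirrors the ownership check in createTemporaryView.
    void createTemporaryView(String name, MiniTable view) {
        if (view.env != this) {
            throw new IllegalArgumentException(
                "Only table API objects that belong to this TableEnvironment can be registered.");
        }
        views.put(name, view.operationTree); // a view stores the tree, not materialized rows
    }

    String explainView(String name) { return views.get(name).explain(); }
}
```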
Expression represents a literal, a function call, or a field reference.
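Literals, function calls and field references fit the visitor pattern naturally. Each node type dispatches to its own visit method, so a new conversion is added by writing a new visitor rather than changing the nodes.

Here is a minimal sketch with hypothetical classes (`Expr`, `Literal`, `Call`, `FieldRef`, `Printer`), not Flink's `Expression` hierarchy:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical mini expression tree (NOT Flink's Expression classes).
interface Expr {
    <R> R accept(ExprVisitor<R> visitor);
}

interface ExprVisitor<R> {
    R visitLiteral(Literal literal);
    R visitCall(Call call);
    R visitFieldRef(FieldRef ref);
}

// A constant value.
class Literal implements Expr {
    final Object value;
    Literal(Object value) { this.value = value; }
    public <R> R accept(ExprVisitor<R> visitor) { return visitor.visitLiteral(this); }
}

// A function applied to argument expressions.
class Call implements Expr {
    final String function;
    final List<Expr> args;
    Call(String function, Expr... args) { this.function = function; this.args = Arrays.asList(args); }
    public <R> R accept(ExprVisitor<R> visitor) { return visitor.visitCall(this); }
}

// A reference to an input field by name.
class FieldRef implements Expr {
    final String name;
    FieldRef(String name) { this.name = name; }
    public <R> R accept(ExprVisitor<R> visitor) { return visitor.visitFieldRef(this); }
}

// One conversion: render an expression as text. Another visitor could
// produce field descriptors instead, without touching the node classes.
class Printer implements ExprVisitor<String> {
    public String visitLiteral(Literal literal) { return String.valueOf(literal.value); }
    public String visitCall(Call call) {
        return call.function + "("
            + call.args.stream().map(arg -> arg.accept(this)).collect(Collectors.joining(", "))
            + ")";
    }
    public String visitFieldRef(FieldRef ref) { return "$" + ref.name; }
}
```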
Visitors that convert an Expression: subclasses of ExpressionVisitor resolve an Expression into a FieldInfo.
@Override
public FieldInfo visit(UnresolvedReferenceExpression unresolvedReference) {
    String fieldName = unresolvedReference.getName();
    return new FieldInfo(fieldName, index, fromLegacyInfoToDataType(getTypeAt(unresolvedReference)));
}
Usage example, converting Expressions into FieldInfos:

private static List<FieldInfo> extractFieldInfosFromTupleType(TupleTypeInfoBase<?> inputType, Expression[] exprs) {
    boolean isRefByPos = isReferenceByPosition(inputType, exprs);
    if (isRefByPos) {
        return IntStream.range(0, exprs.length)
            .mapToObj(idx -> exprs[idx].accept(new IndexedExprToFieldInfo(inputType, idx)))
            .collect(Collectors.toList());
    } else {
        return extractFieldInfosByNameReference(inputType, exprs);
    }
}

FieldInfo:

private final String fieldName;
private final int index;
private final DataType type;
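The position-versus-name decision in extractFieldInfosFromTupleType can be approximated like this:

- If none of the given names exists in the input type, the expressions simply rename the input fields in order.
- Otherwise, each name selects the input field with that name.

This is a simplified, hypothetical sketch (`SimpleFieldInfo`, `FieldInfoExtractor`). It works on plain strings instead of Flink's `Expression` and `TypeInformation`, and it ignores aliases and time attributes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical simplified FieldInfo: output name, position in the input, and type.
class SimpleFieldInfo {
    final String fieldName;
    final int index;
    final String type;
    SimpleFieldInfo(String fieldName, int index, String type) {
        this.fieldName = fieldName; this.index = index; this.type = type;
    }
    @Override public String toString() { return fieldName + "@" + index + ":" + type; }
}

class FieldInfoExtractor {
    // Approximation: reference by position only if no given name exists in the input.
    static boolean isReferenceByPosition(List<String> inputNames, List<String> exprNames) {
        return exprNames.stream().noneMatch(inputNames::contains);
    }

    static List<SimpleFieldInfo> extract(List<String> inputNames, List<String> inputTypes, List<String> exprNames) {
        boolean byPosition = isReferenceByPosition(inputNames, exprNames);
        List<SimpleFieldInfo> result = new ArrayList<>();
        for (int i = 0; i < exprNames.size(); i++) {
            String name = exprNames.get(i);
            // By position: the i-th name renames the i-th input field.
            // By name: the name selects the input field it refers to.
            int index = byPosition ? i : inputNames.indexOf(name);
            if (index < 0 || index >= inputNames.size()) {
                throw new IllegalArgumentException("Cannot resolve field '" + name + "'");
            }
            result.add(new SimpleFieldInfo(name, index, inputTypes.get(index)));
        }
        return result;
    }
}
```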
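Row represents a single row of data. A row can hold any number of columns, and the columns may have different data types. Row is not strongly typed, so it is used together with RowTypeInfo to obtain the concrete type of each column.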
Row:
/** The array to store actual values. */
private final Object[] fields;
RowTypeInfo:
protected final String[] fieldNames;
protected final TypeInformation<?>[] types;
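Because Row stores only an Object[], the column types have to come from somewhere else. The hypothetical sketch below (`SimpleRow` and `SimpleRowTypeInfo` are not Flink's classes) reads a column through the type information and checks that a row conforms to it.

```java
import java.util.Arrays;

// Hypothetical simplified Row: an untyped array of values (NOT Flink's Row).
class SimpleRow {
    private final Object[] fields;

    SimpleRow(Object... values) { this.fields = values.clone(); }

    int getArity() { return fields.length; }
    Object getField(int pos) { return fields[pos]; }

    @Override public String toString() { return Arrays.toString(fields); }
}

// Hypothetical simplified RowTypeInfo: column names plus their Java classes.
class SimpleRowTypeInfo {
    final String[] fieldNames;
    final Class<?>[] types;

    SimpleRowTypeInfo(String[] fieldNames, Class<?>[] types) {
        if (fieldNames.length != types.length) {
            throw new IllegalArgumentException("Number of names and types differ");
        }
        this.fieldNames = fieldNames;
        this.types = types;
    }

    // The row alone cannot say what a column holds; the type info supplies it.
    <T> T get(SimpleRow row, String name, Class<T> expected) {
        int pos = Arrays.asList(fieldNames).indexOf(name);
        if (pos < 0) throw new IllegalArgumentException("Unknown field: " + name);
        if (!expected.isAssignableFrom(types[pos])) {
            throw new ClassCastException(name + " is declared as " + types[pos].getName());
        }
        return expected.cast(row.getField(pos));
    }

    // True if the arity matches and every non-null value has the declared type.
    boolean conforms(SimpleRow row) {
        if (row.getArity() != types.length) return false;
        for (int i = 0; i < types.length; i++) {
            Object value = row.getField(i);
            if (value != null && !types[i].isInstance(value)) return false;
        }
        return true;
    }
}
```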
override def toAppendStream[T: TypeInformation](table: Table): DataStream[T] = {
  val returnType = createTypeInformation[T]
  val modifyOperation = new OutputConversionModifyOperation(
    table.getQueryOperation,
    TypeConversions.fromLegacyInfoToDataType(returnType),
    OutputConversionModifyOperation.UpdateMode.APPEND)
  toDataStream[T](table, modifyOperation)
}
An Operation is what Parser.parse(sql) returns (the method returns a List<Operation>).
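This toy sketch shows how a parse result and an output conversion relate. Parsing yields query operations. Converting a table to a stream wraps the query in a modify operation that records the update mode, as toAppendStream does with APPEND.

All names here (`MiniOperation`, `QueryOp`, `OutputConversionOp`, `ToyParser`) are hypothetical, and the parser only recognizes SELECT statements.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical operation types, loosely named after the ones in this article (NOT Flink's API).
interface MiniOperation {
    String asSummaryString();
}

// Result of parsing a query.
class QueryOp implements MiniOperation {
    final String sql;
    QueryOp(String sql) { this.sql = sql; }
    public String asSummaryString() { return "Query: " + sql; }
}

// Wraps the query whose result is emitted, together with the update mode.
class OutputConversionOp implements MiniOperation {
    enum UpdateMode { APPEND, RETRACT, UPSERT }

    final QueryOp child;
    final UpdateMode mode;
    OutputConversionOp(QueryOp child, UpdateMode mode) { this.child = child; this.mode = mode; }
    public String asSummaryString() { return "Output(" + mode + ") <- " + child.asSummaryString(); }
}

class ToyParser {
    // One statement in, a list of operations out; only SELECT is recognized.
    static List<MiniOperation> parse(String statement) {
        String sql = statement.trim();
        if (!sql.toUpperCase().startsWith("SELECT")) {
            throw new UnsupportedOperationException("Toy parser only handles SELECT: " + sql);
        }
        return Collections.singletonList(new QueryOp(sql));
    }
}
```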
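Defines the rule sets used to optimize the parsed SQL. They include:
- rules that convert Calcite nodes into Flink nodes, for example producing FlinkLogicalTableSourceScan;
- rules that convert Flink logical nodes into physical execution nodes, for example StreamExecTableSourceScanRule;
- filter push-down rules such as PushFilterIntoTableSourceScanRule.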
/** Converts a relational expression to the target trait(s) of this rule.
 *
 * <p>Returns null if conversion is not possible. */
public abstract RelNode convert(RelNode rel);

public void onMatch(RelOptRuleCall call) {
  RelNode rel = call.rel(0);
  if (rel.getTraitSet().contains(inTrait)) {
    final RelNode converted = convert(rel);
    if (converted != null) {
      call.transformTo(converted);
    }
  }
}

class FlinkLogicalTableSourceScanConverter
  extends ConverterRule(
    classOf[LogicalTableScan],
    Convention.NONE,
    FlinkConventions.LOGICAL,
    "FlinkLogicalTableSourceScanConverter") {

  override def matches(call: RelOptRuleCall): Boolean = {
    val scan: TableScan = call.rel(0)
    isTableSourceScan(scan)
  }

  def convert(rel: RelNode): RelNode = {
    val scan = rel.asInstanceOf[TableScan]
    val table = scan.getTable.asInstanceOf[FlinkRelOptTable]
    FlinkLogicalTableSourceScan.create(rel.getCluster, table)
  }
}
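The Flink RelNode base class captures not only the relational logic that a plain RelNode can express, but also the extra Flink-specific information each relation depends on. For example, FlinkLogicalTableSourceScan carries the TableSource.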
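The converter rules above follow one pattern: match a node of a given kind in an input convention, then emit an equivalent node in another convention, carrying along Flink-specific information such as the table source.

The following is a hypothetical mini model (`Node`, `MiniConverterRule`, `MiniPlanner`). It is much simpler than Calcite's planner, which registers alternatives and chooses by cost. Here the node is simply replaced, and rules are applied in a loop until none fires. It assumes every rule moves the node to a different kind or convention, so the loop terminates.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical mini model of converter rules (NOT Calcite's or Flink's API).
// A node has a kind, a convention, and Flink-specific extra info (here: a table source name).
class Node {
    final String kind;
    final String convention;
    final String tableSource;
    Node(String kind, String convention, String tableSource) {
        this.kind = kind; this.convention = convention; this.tableSource = tableSource;
    }
    @Override public String toString() { return kind + "." + convention; }
}

class MiniConverterRule {
    final String kind;
    final String inConvention;
    final Function<Node, Node> converter;
    MiniConverterRule(String kind, String inConvention, Function<Node, Node> converter) {
        this.kind = kind; this.inConvention = inConvention; this.converter = converter;
    }

    // Like onMatch: fire only for the right node kind in the input convention;
    // a null result means "cannot convert".
    Node onMatch(Node node) {
        if (!node.kind.equals(kind) || !node.convention.equals(inConvention)) return null;
        return converter.apply(node);
    }
}

class MiniPlanner {
    // Applies the rules until none fires. Assumes each rule moves a node to a
    // different kind or convention, so the loop terminates.
    static Node optimize(Node node, List<MiniConverterRule> rules) {
        boolean changed = true;
        while (changed) {
            changed = false;
            for (MiniConverterRule rule : rules) {
                Node converted = rule.onMatch(node);
                if (converted != null) {
                    node = converted;
                    changed = true;
                }
            }
        }
        return node;
    }
}
```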
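The base class of physical relational nodes. Its subclasses also implement the ExecNode interface, which translates a physical node into a Transformation.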
/**
  * Internal method, translates this node into a Flink operator.
  *
  * @param planner The [[Planner]] of the translated Table.
  */
protected def translateToPlanInternal(planner: E): Transformation[T]

def translateToPlan(planner: E): Transformation[T] = {
  if (transformation == null) {
    transformation = translateToPlanInternal(planner)
  }
  transformation
}
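translateToPlan caches its result. Presumably, then, a node shared by several parents in the plan DAG yields a single Transformation rather than one copy per parent.

Here is a hypothetical sketch of that memoization. `Transform`, `MiniExecNode`, `SourceNode` and `UnionNode` are illustrative, and the static counter exists only for the demonstration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for Transformation: a name plus its input transformations.
class Transform {
    final String name;
    final List<Transform> inputs;
    Transform(String name, List<Transform> inputs) { this.name = name; this.inputs = inputs; }
}

// Hypothetical ExecNode-like base class with the same caching idea as translateToPlan.
abstract class MiniExecNode {
    static int translations = 0;   // counts real (uncached) translations, for demonstration only
    private Transform transformation; // cached result

    protected abstract Transform translateToPlanInternal();

    final Transform translateToPlan() {
        if (transformation == null) {
            translations++;
            transformation = translateToPlanInternal();
        }
        return transformation;
    }
}

class SourceNode extends MiniExecNode {
    protected Transform translateToPlanInternal() { return new Transform("source", new ArrayList<>()); }
}

class UnionNode extends MiniExecNode {
    final MiniExecNode left;
    final MiniExecNode right;
    UnionNode(MiniExecNode left, MiniExecNode right) { this.left = left; this.right = right; }
    protected Transform translateToPlanInternal() {
        // Both inputs go through translateToPlan, so a shared input is translated once.
        return new Transform("union", Arrays.asList(left.translateToPlan(), right.translateToPlan()));
    }
}
```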
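While an ExecNode is translated into a Transformation, part of the logic is implemented with dynamically generated code. The generated code is held in instances of GeneratedClass subclasses and shipped to every TaskManager (TM). There it is compiled with the Janino library before it runs. For example, an aggregate query generates a subclass of the aggregate handler function NamespaceTableAggsHandleFunction.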
public T newInstance(ClassLoader classLoader, Object... args) {
    try {
        return (T) compile(classLoader).getConstructors()[0].newInstance(args);
    } catch (Exception e) {
        throw new RuntimeException(
                "Could not instantiate generated class '" + className + "'", e);
    }
}

/**
 * Compiles the generated code, the compiled class will be cached in the {@link GeneratedClass}.
 */
public Class<T> compile(ClassLoader classLoader) {
    if (compiledClass == null) {
        // cache the compiled class
        compiledClass = CompileUtils.compile(classLoader, className, code);
    }
    return compiledClass;
}
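GeneratedClass compiles lazily and caches the compiled class, so repeated newInstance calls pay the compilation cost only once. The sketch below keeps that structure. To stay JDK-only, it assumes an injected `Function<String, Class<?>>` in place of the Janino call behind `CompileUtils.compile`. `MiniGeneratedClass` and `GeneratedGreeter` are hypothetical.

```java
import java.util.function.Function;

// Hypothetical stand-in for GeneratedClass. Assumption: instead of Janino (via
// CompileUtils), an injected function maps source code to a compiled class.
class MiniGeneratedClass<T> {
    final String className;
    final String code;
    private final Function<String, Class<?>> compiler;
    private Class<T> compiledClass; // cached after the first compile

    MiniGeneratedClass(String className, String code, Function<String, Class<?>> compiler) {
        this.className = className;
        this.code = code;
        this.compiler = compiler;
    }

    @SuppressWarnings("unchecked")
    Class<T> compile() {
        if (compiledClass == null) {
            compiledClass = (Class<T>) compiler.apply(code);
        }
        return compiledClass;
    }

    // Same shape as newInstance above: take the first public constructor and call it reflectively.
    @SuppressWarnings("unchecked")
    T newInstance(Object... args) {
        try {
            return (T) compile().getConstructors()[0].newInstance(args);
        } catch (Exception e) {
            throw new RuntimeException("Could not instantiate generated class '" + className + "'", e);
        }
    }
}

// Stands in for a class that code generation would have produced.
class GeneratedGreeter {
    private final String name;
    public GeneratedGreeter(String name) { this.name = name; }
    String greet() { return "hello " + name; }
}
```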
val sql =
  """
    |SELECT
    |  `string`,
    |  HOP_START(rowtime, INTERVAL '0.004' SECOND, INTERVAL '0.005' SECOND),
    |  HOP_ROWTIME(rowtime, INTERVAL '0.004' SECOND, INTERVAL '0.005' SECOND),
    |  COUNT(1),
    |  SUM(1),
    |  COUNT(`int`),
    |  COUNT(DISTINCT `float`),
    |  concat_distinct_agg(name)
    |FROM T1
    |GROUP BY `string`, HOP(rowtime, INTERVAL '0.004' SECOND, INTERVAL '0.005' SECOND)
  """.stripMargin
Logical plan:

LogicalProject#3
  LogicalAggregate#2
    LogicalProject#1
      LogicalTableScan#0

Physical plan:

rel#271:StreamExecSink.STREAM_PHYSICAL.any.None: 0.false.Acc(input=StreamExecCalc#269,name=DataStreamTableSink,fields=string, EXPR$1, EXPR$2, EXPR$3, EXPR$4, EXPR$5, EXPR$6, EXPR$7)
rel#269:StreamExecCalc.STREAM_PHYSICAL.any.None: 0.false.Acc(input=StreamExecGroupWindowAggregate#267,select=string, w$start AS EXPR$1, w$rowtime AS EXPR$2, EXPR$3, EXPR$4, EXPR$5, EXPR$6, EXPR$7)
rel#267:StreamExecGroupWindowAggregate.STREAM_PHYSICAL.any.None: 0.false.Acc(input=StreamExecExchange#265,groupBy=string,window=SlidingGroupWindow('w$, rowtime, 5, 4),properties=w$start, w$end, w$rowtime, w$proctime,select=string, COUNT(*) AS EXPR$3, $SUM0($f2) AS EXPR$4, COUNT(int) AS EXPR$5, COUNT(DISTINCT float) AS EXPR$6, concat_distinct_agg(name) AS EXPR$7, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime)
rel#265:StreamExecExchange.STREAM_PHYSICAL.hash[0]true.None: -1.true.Acc(input=StreamExecCalc#263,distribution=hash[string])
rel#263:StreamExecCalc.STREAM_PHYSICAL.any.None: -1.true.Acc(input=StreamExecDataStreamScan#257,select=string, rowtime, 1 AS $f2, int, float, name)
rel#257:StreamExecDataStreamScan.STREAM_PHYSICAL.any.None: -1.true.Acc(table=[Unregistered_DataStream_2],fields=rowtime, int, double, float, bigdec, string, name)
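In the plan, the HOP clause becomes `SlidingGroupWindow('w$, rowtime, 5, 4)`. HOP takes the slide first (0.004 s = 4 ms) and the window size second (0.005 s = 5 ms). Since the size exceeds the slide, consecutive windows overlap by 1 ms and some rows belong to two windows.

The sketch below computes the windows for a timestamp using the usual sliding-window assignment with zero offset. `HopWindows.assign` is a hypothetical helper, not the operator's actual code.

```java
import java.util.ArrayList;
import java.util.List;

// Sliding (HOP) window assignment for HOP(rowtime, slide, size) with zero offset:
// a timestamp belongs to every window [start, start + size) whose start is a
// multiple of slide and satisfies start <= timestamp < start + size.
class HopWindows {
    static List<long[]> assign(long timestamp, long slide, long size) {
        List<long[]> windows = new ArrayList<>();
        // Latest window start that is <= timestamp.
        long lastStart = timestamp - Math.floorMod(timestamp, slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }
}
```

For example, with slide 4 and size 5, a row at t = 4 ms falls into [4, 9) and [0, 5), while a row at t = 9 ms falls only into [8, 13).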
Code generation call chain:

StreamExecGroupWindowAggregateBase -> translateToPlanInternal
StreamExecGroupWindowAggregateBase -> createAggsHandler
AggsHandlerCodeGenerator -> generateNamespaceAggsHandler
new OneInputTransformation

During job submission, the OneInputTransformation is mapped to a OneInputStreamTask:

Task -> run
Task -> doRun
StreamTask -> invoke
StreamTask -> openAllOperators
AggregateWindowOperator -> open
WindowOperator -> compileGeneratedCode
GeneratedNamespaceAggsHandleFunction -> newInstance
SimpleCompiler -> cook