Flink SQL

StreamTableEnvironment

This class bundles everything needed across SQL parsing, validation, optimization, and execution: the metadata manager CatalogManager, the module manager ModuleManager (a module contributes function sets, type sets, and rule sets), the user-defined-function manager FunctionCatalog, a thread pool, and the SQL planner/parser Planner.

StreamTableEnvironmentImpl.create(executionEnvironment, settings, new TableConfig)

  def create(
      executionEnvironment: StreamExecutionEnvironment,
      settings: EnvironmentSettings,
      tableConfig: TableConfig)
    : StreamTableEnvironmentImpl = {

    val catalogManager = new CatalogManager(
      settings.getBuiltInCatalogName,
      new GenericInMemoryCatalog(settings.getBuiltInCatalogName, settings.getBuiltInDatabaseName))

    val moduleManager = new ModuleManager
    val functionCatalog = new FunctionCatalog(catalogManager, moduleManager)

    val executorProperties = settings.toExecutorProperties
    val executor = lookupExecutor(executorProperties, executionEnvironment)

    val plannerProperties = settings.toPlannerProperties
    val planner = ComponentFactoryService.find(classOf[PlannerFactory], plannerProperties)
      .create(
        plannerProperties,
        executor,
        tableConfig,
        functionCatalog,
        catalogManager)

    new StreamTableEnvironmentImpl(
      catalogManager,
      moduleManager,
      functionCatalog,
      tableConfig,
      executionEnvironment,
      planner,
      executor,
      settings.isStreamingMode
    )
  }
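
In user code this factory is typically reached through StreamTableEnvironment.create; a minimal sketch (blink planner and streaming mode assumed):

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala.StreamTableEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
val settings = EnvironmentSettings.newInstance()
  .useBlinkPlanner()
  .inStreamingMode()
  .build()
// delegates to the create method shown above
val tableEnv = StreamTableEnvironment.create(env, settings)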

DataType

Defines a logical type and hints at the actual physical type used underneath.
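
A quick sketch with the DataTypes factory, where bridgedTo supplies the physical-class hint:

import org.apache.flink.table.api.DataTypes

// logical INT; the hint asks for the primitive int at runtime,
// which is only legal once the type is NOT NULL
val dt = DataTypes.INT().notNull().bridgedTo(classOf[Int])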

LogicalType

A logical type is similar to a data type in standard SQL; its subclasses impose the concrete constraints.

TableSchema

Defines a table's structure: the name and the type of every field.
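
A schema can be assembled with the builder, for example:

import org.apache.flink.table.api.{DataTypes, TableSchema}

val schema = TableSchema.builder()
  .field("word", DataTypes.STRING())
  .field("frequency", DataTypes.INT())
  .build()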

DataStream -> Table

override def fromDataStream[T](dataStream: DataStream[T], fields: Expression*): Table = {
    val queryOperation = asQueryOperation(dataStream, Some(fields.toList.asJava))
    createTable(queryOperation)
  }
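
Continuing the environment sketch above, a tuple stream's fields can be named with symbol expressions:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala._

// 'word and 'frequency are converted to field expressions implicitly
val stream = env.fromElements(("hello", 1), ("world", 2))
val table = tableEnv.fromDataStream(stream, 'word, 'frequency)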

ScalaDataStreamQueryOperation

private final DataStream<E> dataStream;
private final int[] fieldIndices;
private final TableSchema tableSchema;

Table

The Table class is the core component of the Table/SQL API. It defines the data transformation methods such as filter, groupBy, and join. With a TableEnvironment a Table can be converted into a DataStream or a DataSet.

private TableImpl(
		TableEnvironment tableEnvironment,
		QueryOperation operationTree,
		OperationTreeBuilder operationTreeBuilder,
		LookupCallResolver lookupResolver) {
	this.tableEnvironment = tableEnvironment;
	this.operationTree = operationTree;
	this.operationTreeBuilder = operationTreeBuilder;
	this.lookupResolver = lookupResolver;
}
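
Each fluent call wraps the previous operationTree in a new QueryOperation node; continuing the sketch:

val result = table
  .filter('frequency > 0)
  .groupBy('word)
  .select('word, 'frequency.sum as 'total)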

Registering a table

private void createTemporaryView(UnresolvedIdentifier identifier, Table view) {
	if (((TableImpl) view).getTableEnvironment() != this) {
		throw new TableException(
			"Only table API objects that belong to this TableEnvironment can be registered.");
	}

	CatalogBaseTable tableTable = new QueryOperationCatalogView(view.getQueryOperation());

	ObjectIdentifier tableIdentifier = catalogManager.qualifyIdentifier(identifier);
	catalogManager.createTemporaryTable(tableTable, tableIdentifier, false);
}
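
From user code this is reached through the public createTemporaryView overloads; continuing the sketch:

// qualifies "word_counts" against the current catalog/database and stores
// a QueryOperationCatalogView in the CatalogManager
tableEnv.createTemporaryView("word_counts", result)
val fromSql = tableEnv.sqlQuery("SELECT word, total FROM word_counts")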

Expression

An Expression represents a literal, a function call, or a field reference.
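
The Scala DSL builds each kind through implicit conversions; a small sketch (the comments are indicative, not exact class names):

import org.apache.flink.table.api.scala._
import org.apache.flink.table.expressions.Expression

val literal: Expression = 1.toExpr          // a literal value
val fieldRef: Expression = 'word            // an unresolved field reference
val fnCall: Expression = 'word.upperCase()  // a call to the built-in UPPER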

ExpressionVisitor

The visitor interface for transforming Expressions.

IndexedExprToFieldInfo

A subclass of ExpressionVisitor that resolves an Expression into a FieldInfo.

@Override
public FieldInfo visit(UnresolvedReferenceExpression unresolvedReference) {
	String fieldName = unresolvedReference.getName();
	return new FieldInfo(fieldName, index, fromLegacyInfoToDataType(getTypeAt(unresolvedReference)));
}

Example usage, converting Expressions into FieldInfos:

private static List<FieldInfo> extractFieldInfosFromTupleType(TupleTypeInfoBase<?> inputType, Expression[] exprs) {
	boolean isRefByPos = isReferenceByPosition(inputType, exprs);

	if (isRefByPos) {
		return IntStream.range(0, exprs.length)
			.mapToObj(idx -> exprs[idx].accept(new IndexedExprToFieldInfo(inputType, idx)))
			.collect(Collectors.toList());
	} else {
		return extractFieldInfosByNameReference(inputType, exprs);
	}
}

FieldInfo

private final String fieldName;
private final int index;
private final DataType type;

Row & RowTypeInfo

Represents one row of data. It can hold any number of columns, and each column may hold a different type. Row is not strongly typed, so it must be paired with RowTypeInfo to recover the concrete type of each column.

Row:

/** The array to store actual values. */
private final Object[] fields;

RowTypeInfo:

protected final String[] fieldNames;
protected final TypeInformation<?>[] types;
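
A sketch of how the two are paired:

import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.types.Row

// the Row stores plain Objects; names and types live in the RowTypeInfo
val row = Row.of("hello", Integer.valueOf(3))
val rowType = new RowTypeInfo(
  Array[TypeInformation[_]](Types.STRING, Types.INT),
  Array("word", "frequency"))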

Table -> DataStream

override def toAppendStream[T: TypeInformation](table: Table): DataStream[T] = {
    val returnType = createTypeInformation[T]

    val modifyOperation = new OutputConversionModifyOperation(
      table.getQueryOperation,
      TypeConversions.fromLegacyInfoToDataType(returnType),
      OutputConversionModifyOperation.UpdateMode.APPEND)
    toDataStream[T](table, modifyOperation)
  }
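
Usage, continuing the sketch (an updating query would need toRetractStream instead):

import org.apache.flink.types.Row

val appendStream = tableEnv.toAppendStream[Row](result)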

Operation

The result returned by Parser.parse(sql). Its main kinds are listed below; a dispatch sketch follows the list.

  • ModifyOperation (DML)
  • QueryOperation (DQL)
  • CreateOperation & DropOperation (DDL)
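
A hypothetical dispatch over these kinds (illustrative only; the real routing lives in TableEnvironmentImpl):

import org.apache.flink.table.operations.{ModifyOperation, Operation, QueryOperation}
import org.apache.flink.table.operations.ddl.{CreateOperation, DropOperation}

def describe(op: Operation): String = op match {
  case q: QueryOperation  => s"DQL: ${q.asSummaryString()}"  // SELECT ...
  case m: ModifyOperation => s"DML: ${m.asSummaryString()}"  // INSERT INTO ...
  case c: CreateOperation => s"DDL: ${c.asSummaryString()}"  // CREATE ...
  case d: DropOperation   => s"DDL: ${d.asSummaryString()}"  // DROP ...
  case other              => other.asSummaryString()
}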

FlinkStreamRuleSets

Defines the rule sets for SQL plan optimization: rules that convert Calcite nodes into Flink logical nodes (e.g. the converter producing FlinkLogicalTableSourceScan), rules that convert Flink logical nodes into physical execution nodes (e.g. StreamExecTableSourceScanRule), and rules such as PushFilterIntoTableSourceScanRule that push filter predicates down into the table source scan.

ConverterRule

/** Converts a relational expression to the target trait(s) of this rule.
 *
 * <p>Returns null if conversion is not possible. */
public abstract RelNode convert(RelNode rel);

public void onMatch(RelOptRuleCall call) {
	RelNode rel = call.rel(0);
	if (rel.getTraitSet().contains(inTrait)) {
		final RelNode converted = convert(rel);
		if (converted != null) {
			call.transformTo(converted);
		}
	}
}

A concrete converter from the Flink rule sets:

class FlinkLogicalTableSourceScanConverter
  extends ConverterRule(
    classOf[LogicalTableScan],
    Convention.NONE,
    FlinkConventions.LOGICAL,
    "FlinkLogicalTableSourceScanConverter") {

  override def matches(call: RelOptRuleCall): Boolean = {
    val scan: TableScan = call.rel(0)
    isTableSourceScan(scan)
  }

  def convert(rel: RelNode): RelNode = {
    val scan = rel.asInstanceOf[TableScan]
    val table = scan.getTable.asInstanceOf[FlinkRelOptTable]
    FlinkLogicalTableSourceScan.create(rel.getCluster, table)
  }
}

FlinkLogicalRel

The base class of Flink RelNodes. Beyond the relational semantics a RelNode expresses on its own, it carries the extra Flink-specific information each relation needs; FlinkLogicalTableSourceScan, for instance, carries the TableSource.

FlinkPhysicalRel

The base class of physical relational nodes. Its subclasses also implement the ExecNode interface, which translates a physical node into a Transformation.

ExecNode

/**
 * Internal method, translates this node into a Flink operator.
 *
 * @param planner The [[Planner]] of the translated Table.
 */
protected def translateToPlanInternal(planner: E): Transformation[T]
  
  
def translateToPlan(planner: E): Transformation[T] = {
  if (transformation == null) {
    // translate once and cache, so shared subtrees are not translated twice
    transformation = translateToPlanInternal(planner)
  }
  transformation
}

Call sequence diagram

Code generation

During the translation of an ExecNode into a Transformation, parts of the logic are implemented through dynamically generated code. The generated code is held in instances of GeneratedClass subclasses, shipped to the TaskManager nodes, and compiled there by the Janino library before execution. For example, an aggregate query generates a subclass of the aggregation handler NamespaceTableAggsHandleFunction.

GeneratedClass

public T newInstance(ClassLoader classLoader, Object... args) {
	try {
		return (T) compile(classLoader).getConstructors()[0].newInstance(args);
	} catch (Exception e) {
		throw new RuntimeException(
				"Could not instantiate generated class '" + className + "'", e);
	}
}

/**
 * Compiles the generated code, the compiled class will be cached in the {@link GeneratedClass}.
 */
public Class<T> compile(ClassLoader classLoader) {
	if (compiledClass == null) {
		// cache the compiled class
		compiledClass = CompileUtils.compile(classLoader, className, code);
	}
	return compiledClass;
}
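
A toy illustration of the same compile-then-instantiate path (HelloFunction is hypothetical; assumes CompileUtils from flink-table-runtime-blink on the classpath):

import org.apache.flink.table.runtime.generated.CompileUtils

val code =
  """
    |public class HelloFunction {
    |  public String apply(String name) { return "hello " + name; }
    |}
  """.stripMargin

// Janino compiles the source string at runtime, as compile() above does
val clazz = CompileUtils.compile(
  Thread.currentThread().getContextClassLoader, "HelloFunction", code)
val instance = clazz.getConstructors()(0).newInstance()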

Example

val sql =
      """
        |SELECT
        |  `string`,
        |  HOP_START(rowtime, INTERVAL '0.004' SECOND, INTERVAL '0.005' SECOND),
        |  HOP_ROWTIME(rowtime, INTERVAL '0.004' SECOND, INTERVAL '0.005' SECOND),
        |  COUNT(1),
        |  SUM(1),
        |  COUNT(`int`),
        |  COUNT(DISTINCT `float`),
        |  concat_distinct_agg(name)
        |FROM T1
        |GROUP BY `string`, HOP(rowtime, INTERVAL '0.004' SECOND, INTERVAL '0.005' SECOND)
      """.stripMargin

The logical plan produced by Calcite:

LogicalProject#3
	LogicalAggregate#2
		LogicalProject#1
			LogicalTableScan#0

After optimization, the physical plan:

rel#271:StreamExecSink.STREAM_PHYSICAL.any.None: 0.false.Acc(input=StreamExecCalc#269,name=DataStreamTableSink,fields=string, EXPR$1, EXPR$2, EXPR$3, EXPR$4, EXPR$5, EXPR$6, EXPR$7)
	rel#269:StreamExecCalc.STREAM_PHYSICAL.any.None: 0.false.Acc(input=StreamExecGroupWindowAggregate#267,select=string, w$start AS EXPR$1, w$rowtime AS EXPR$2, EXPR$3, EXPR$4, EXPR$5, EXPR$6, EXPR$7)
		rel#267:StreamExecGroupWindowAggregate.STREAM_PHYSICAL.any.None: 0.false.Acc(input=StreamExecExchange#265,groupBy=string,window=SlidingGroupWindow('w$, rowtime, 5, 4),properties=w$start, w$end, w$rowtime, w$proctime,select=string, COUNT(*) AS EXPR$3, $SUM0($f2) AS EXPR$4, COUNT(int) AS EXPR$5, COUNT(DISTINCT float) AS EXPR$6, concat_distinct_agg(name) AS EXPR$7, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime)
			rel#265:StreamExecExchange.STREAM_PHYSICAL.hash[0]true.None: -1.true.Acc(input=StreamExecCalc#263,distribution=hash[string])
				rel#263:StreamExecCalc.STREAM_PHYSICAL.any.None: -1.true.Acc(input=StreamExecDataStreamScan#257,select=string, rowtime, 1 AS $f2, int, float, name)
					rel#257:StreamExecDataStreamScan.STREAM_PHYSICAL.any.None: -1.true.Acc(table=[Unregistered_DataStream_2],fields=rowtime, int, double, float, bigdec, string, name)

Code generation for this query:

StreamExecGroupWindowAggregateBase -> translateToPlanInternal
	StreamExecGroupWindowAggregateBase -> createAggsHandler
		AggsHandlerCodeGenerator -> generateNamespaceAggsHandler
			new OneInputTransformation

During job submission the OneInputTransformation is turned into a OneInputStreamTask; when the task starts, the generated code is compiled and the handler instantiated:
				
Task -> run
	Task -> doRun
		StreamTask -> invoke
			StreamTask -> openAllOperators
				AggregateWindowOperator -> open
					WindowOperator -> compileGeneratedCode
						GeneratedNamespaceAggsHandleFunction -> newInstance
							SimpleCompiler -> cook