本文主要研究一下flink TableEnvironment的scan操做html
//Scanning a directly registered table val tab: Table = tableEnv.scan("tableName") //Scanning a table from a registered catalog val tab: Table = tableEnv.scan("catalogName", "dbName", "tableName")
flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/TableEnvironment.scalaapache
abstract class TableEnvironment(val config: TableConfig) { private val internalSchema: CalciteSchema = CalciteSchema.createRootSchema(false, false) private val rootSchema: SchemaPlus = internalSchema.plus() //...... @throws[TableException] @varargs def scan(tablePath: String*): Table = { scanInternal(tablePath.toArray) match { case Some(table) => table case None => throw new TableException(s"Table '${tablePath.mkString(".")}' was not found.") } } private[flink] def scanInternal(tablePath: Array[String]): Option[Table] = { require(tablePath != null && !tablePath.isEmpty, "tablePath must not be null or empty.") val schemaPaths = tablePath.slice(0, tablePath.length - 1) val schema = getSchema(schemaPaths) if (schema != null) { val tableName = tablePath(tablePath.length - 1) val table = schema.getTable(tableName) if (table != null) { return Some(new Table(this, CatalogNode(tablePath, table.getRowType(typeFactory)))) } } None } private def getSchema(schemaPath: Array[String]): SchemaPlus = { var schema = rootSchema for (schemaName <- schemaPath) { schema = schema.getSubSchema(schemaName) if (schema == null) { return schema } } schema } //...... }
數組最後一個元素
),調用SchemaPlus的getTable方法查找Table數組最後一個元素
),調用SchemaPlus的getTable方法查找Table