This article takes a look at Flink's CsvTableSource.
flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/sources/TableSource.scala
trait TableSource[T] {

  /** Returns the [[TypeInformation]] for the return type of the [[TableSource]].
    * The fields of the return type are mapped to the table schema based on their name.
    *
    * @return The type of the returned [[DataSet]] or [[DataStream]].
    */
  def getReturnType: TypeInformation[T]

  /**
    * Returns the schema of the produced table.
    *
    * @return The [[TableSchema]] of the produced table.
    */
  def getTableSchema: TableSchema

  /**
    * Describes the table source.
    *
    * @return A String explaining the [[TableSource]].
    */
  def explainSource(): String =
    TableConnectorUtil.generateRuntimeName(getClass, getTableSchema.getFieldNames)
}
flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/sources/BatchTableSource.scala
trait BatchTableSource[T] extends TableSource[T] {

  /**
    * Returns the data of the table as a [[DataSet]].
    *
    * NOTE: This method is for internal use only for defining a [[TableSource]].
    *       Do not use it in Table API programs.
    */
  def getDataSet(execEnv: ExecutionEnvironment): DataSet[T]
}
flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/sources/StreamTableSource.scala
trait StreamTableSource[T] extends TableSource[T] {

  /**
    * Returns the data of the table as a [[DataStream]].
    *
    * NOTE: This method is for internal use only for defining a [[TableSource]].
    *       Do not use it in Table API programs.
    */
  def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[T]
}
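Before looking at the concrete CsvTableSource, here is a minimal, made-up table source that just serves a couple of in-memory rows, only to illustrate the contract of the traits above. The class name StaticRowTableSource, the name/age columns and the sample rows are invented for this sketch and are not part of Flink:

import java.util.{Arrays => JArrays}

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api.{TableSchema, Types}
import org.apache.flink.table.sources.StreamTableSource
import org.apache.flink.types.Row

// illustrative only: a tiny in-memory StreamTableSource, not part of Flink
class StaticRowTableSource extends StreamTableSource[Row] {

  private val fieldNames = Array("name", "age")
  private val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.INT)

  // how the runtime obtains the actual records when the table is scanned
  override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] =
    execEnv.fromCollection(
      JArrays.asList(Row.of("alice", Int.box(30)), Row.of("bob", Int.box(25))),
      getReturnType)

  // physical type of the produced records
  override def getReturnType: TypeInformation[Row] = new RowTypeInfo(fieldTypes, fieldNames)

  // logical schema of the produced table
  override def getTableSchema: TableSchema = new TableSchema(fieldNames, fieldTypes)
}

getReturnType describes the physical record type, getTableSchema the logical table schema, and getDataStream is what the planner invokes when the table is actually read; explainSource keeps its default implementation from TableSource.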
flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/sources/CsvTableSource.scala
class CsvTableSource private (
    private val path: String,
    private val fieldNames: Array[String],
    private val fieldTypes: Array[TypeInformation[_]],
    private val selectedFields: Array[Int],
    private val fieldDelim: String,
    private val rowDelim: String,
    private val quoteCharacter: Character,
    private val ignoreFirstLine: Boolean,
    private val ignoreComments: String,
    private val lenient: Boolean)
  extends BatchTableSource[Row]
  with StreamTableSource[Row]
  with ProjectableTableSource[Row] {

  def this(
    path: String,
    fieldNames: Array[String],
    fieldTypes: Array[TypeInformation[_]],
    fieldDelim: String = CsvInputFormat.DEFAULT_FIELD_DELIMITER,
    rowDelim: String = CsvInputFormat.DEFAULT_LINE_DELIMITER,
    quoteCharacter: Character = null,
    ignoreFirstLine: Boolean = false,
    ignoreComments: String = null,
    lenient: Boolean = false) = {

    this(
      path,
      fieldNames,
      fieldTypes,
      fieldTypes.indices.toArray, // initially, all fields are returned
      fieldDelim,
      rowDelim,
      quoteCharacter,
      ignoreFirstLine,
      ignoreComments,
      lenient)
  }

  def this(path: String, fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]) = {
    this(path, fieldNames, fieldTypes, CsvInputFormat.DEFAULT_FIELD_DELIMITER,
      CsvInputFormat.DEFAULT_LINE_DELIMITER, null, false, null, false)
  }

  if (fieldNames.length != fieldTypes.length) {
    throw new TableException("Number of field names and field types must be equal.")
  }

  private val selectedFieldTypes = selectedFields.map(fieldTypes(_))
  private val selectedFieldNames = selectedFields.map(fieldNames(_))

  private val returnType: RowTypeInfo = new RowTypeInfo(selectedFieldTypes, selectedFieldNames)

  override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] = {
    execEnv.createInput(createCsvInput(), returnType).name(explainSource())
  }

  /** Returns the [[RowTypeInfo]] for the return type of the [[CsvTableSource]]. */
  override def getReturnType: RowTypeInfo = returnType

  override def getDataStream(streamExecEnv: StreamExecutionEnvironment): DataStream[Row] = {
    streamExecEnv.createInput(createCsvInput(), returnType).name(explainSource())
  }

  /** Returns the schema of the produced table. */
  override def getTableSchema = new TableSchema(fieldNames, fieldTypes)

  /** Returns a copy of [[TableSource]] with ability to project fields */
  override def projectFields(fields: Array[Int]): CsvTableSource = {

    val selectedFields = if (fields.isEmpty) Array(0) else fields

    new CsvTableSource(
      path,
      fieldNames,
      fieldTypes,
      selectedFields,
      fieldDelim,
      rowDelim,
      quoteCharacter,
      ignoreFirstLine,
      ignoreComments,
      lenient)
  }

  private def createCsvInput(): RowCsvInputFormat = {
    val inputFormat = new RowCsvInputFormat(
      new Path(path),
      selectedFieldTypes,
      rowDelim,
      fieldDelim,
      selectedFields)

    inputFormat.setSkipFirstLineAsHeader(ignoreFirstLine)
    inputFormat.setLenient(lenient)
    if (quoteCharacter != null) {
      inputFormat.enableQuotedStringParsing(quoteCharacter)
    }
    if (ignoreComments != null) {
      inputFormat.setCommentPrefix(ignoreComments)
    }

    inputFormat
  }

  override def equals(other: Any): Boolean = other match {
    case that: CsvTableSource => returnType == that.returnType &&
      path == that.path &&
      fieldDelim == that.fieldDelim &&
      rowDelim == that.rowDelim &&
      quoteCharacter == that.quoteCharacter &&
      ignoreFirstLine == that.ignoreFirstLine &&
      ignoreComments == that.ignoreComments &&
      lenient == that.lenient
    case _ => false
  }

  override def hashCode(): Int = {
    returnType.hashCode()
  }

  override def explainSource(): String = {
    s"CsvTableSource(" +
      s"read fields: ${getReturnType.getFieldNames.mkString(", ")})"
  }
}
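To tie the above together, here is a minimal usage sketch. The object name CsvTableSourceDemo, the file /tmp/people.csv, the name/age columns and the SQL query are all made up for illustration; it constructs a CsvTableSource with the public three-argument constructor shown above, registers it in a batch TableEnvironment and queries it with SQL:

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._
import org.apache.flink.table.api.{TableEnvironment, Types}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.sources.CsvTableSource
import org.apache.flink.types.Row

object CsvTableSourceDemo {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // path and schema are invented for this sketch; the three-argument
    // constructor uses the default field delimiter "," and line delimiter "\n"
    val csvSource = new CsvTableSource(
      "/tmp/people.csv",
      Array("name", "age"),
      Array[TypeInformation[_]](Types.STRING, Types.INT))

    // register the source and query it with SQL
    tableEnv.registerTableSource("people", csvSource)
    val result = tableEnv.sqlQuery("SELECT name, age FROM people WHERE age > 18")

    // on a batch TableEnvironment the planner ends up calling getDataSet(...)
    tableEnv.toDataSet[Row](result).print()
  }
}

Because CsvTableSource also mixes in StreamTableSource, the same source can be registered on a StreamTableEnvironment, in which case getDataStream(...) is used instead of getDataSet(...); and since it implements ProjectableTableSource, the optimizer may call projectFields(...) to push the projection of the SELECT down into the RowCsvInputFormat.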