This article takes a look at Flink's Table Formats.
.withFormat(
  new Csv()
    .field("field1", Types.STRING)    // required: ordered format fields
    .field("field2", Types.SQL_TIMESTAMP)
    .fieldDelimiter(",")              // optional: string delimiter "," by default
    .lineDelimiter("\n")              // optional: string delimiter "\n" by default
    .quoteCharacter('"')              // optional: single character for string values, empty by default
    .commentPrefix("#")               // optional: string to indicate comments, empty by default
    .ignoreFirstLine()                // optional: ignore the first line, by default it is not skipped
    .ignoreParseErrors()              // optional: skip records with parse error instead of failing by default
)
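For context, the Csv descriptor above is only one link in the descriptor chain: connect() picks the connector, withFormat() the format, and withSchema() the table schema. The following is a minimal sketch of a complete chain against the Flink 1.7 API; the file path, field names, and table name are hypothetical.

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.Schema;

public class CsvConnectDemo {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

        tableEnv
            .connect(new FileSystem().path("/tmp/demo.csv"))      // hypothetical path
            .withFormat(
                new Csv()
                    .field("field1", Types.STRING)
                    .field("field2", Types.SQL_TIMESTAMP)
                    .ignoreFirstLine())
            .withSchema(
                new Schema()
                    .field("field1", Types.STRING)
                    .field("field2", Types.SQL_TIMESTAMP))
            .inAppendMode()
            .registerTableSource("demo_source");                  // hypothetical table name
    }
}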
.withFormat(
  new Json()
    .failOnMissingField(true)   // optional: flag whether to fail if a field is missing or not, false by default

    // required: define the schema either by using type information which parses numbers to corresponding types
    .schema(Types.ROW(...))

    // or by using a JSON schema which parses to DECIMAL and TIMESTAMP
    .jsonSchema(
      "{" +
      "  type: 'object'," +
      "  properties: {" +
      "    lon: {" +
      "      type: 'number'" +
      "    }," +
      "    rideTime: {" +
      "      type: 'string'," +
      "      format: 'date-time'" +
      "    }" +
      "  }" +
      "}"
    )

    // or use the table's schema
    .deriveSchema()
)
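The three schema options (schema, jsonSchema, deriveSchema) are mutually exclusive; as the Json source further down shows, setting one clears the other two. As a hedged illustration of the Types.ROW(...) placeholder above, the same lon/rideTime schema could be spelled out with type information from flink-core via Types.ROW_NAMED; the chosen field types are my reading of the JSON schema, not taken from the article.

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.types.Row;

public class JsonSchemaDemo {

    public static void main(String[] args) {
        // type-information variant of the lon/rideTime JSON schema above
        TypeInformation<Row> rowType = Types.ROW_NAMED(
            new String[]{"lon", "rideTime"},
            Types.DOUBLE,          // JSON 'number'
            Types.SQL_TIMESTAMP);  // JSON 'string' with format 'date-time'

        Json json = new Json()
            .failOnMissingField(false)
            .schema(rowType);      // replaces any previously set jsonSchema/deriveSchema

        System.out.println(json.toProperties());
    }
}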
.withFormat(
  new Avro()

    // required: define the schema either by using an Avro specific record class
    .recordClass(User.class)

    // or by using an Avro schema
    .avroSchema(
      "{" +
      "  \"type\": \"record\"," +
      "  \"name\": \"test\"," +
      "  \"fields\" : [" +
      "    {\"name\": \"a\", \"type\": \"long\"}," +
      "    {\"name\": \"b\", \"type\": \"string\"}" +
      "  ]" +
      "}"
    )
)
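Where the Csv sketch above reads from the filesystem, Avro is typically paired with Kafka. A hedged sketch, assuming the flink-avro and flink-connector-kafka-0.11 dependencies; the topic, servers, and table name are hypothetical, and the Avro schema is the test record from the snippet above.

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Avro;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;

public class AvroKafkaDemo {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

        tableEnv
            .connect(
                new Kafka()
                    .version("0.11")
                    .topic("test-topic")                              // hypothetical topic
                    .property("bootstrap.servers", "localhost:9092"))
            .withFormat(
                new Avro().avroSchema(
                    "{"
                    + " \"type\": \"record\","
                    + " \"name\": \"test\","
                    + " \"fields\": ["
                    + "   {\"name\": \"a\", \"type\": \"long\"},"
                    + "   {\"name\": \"b\", \"type\": \"string\"}"
                    + " ]"
                    + "}"))
            .withSchema(
                new Schema()
                    .field("a", Types.LONG)
                    .field("b", Types.STRING))
            .inAppendMode()
            .registerTableSource("avro_source");
    }
}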
flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/descriptors/ConnectTableDescriptor.scala
abstract class ConnectTableDescriptor[D <: ConnectTableDescriptor[D]](
    private val tableEnv: TableEnvironment,
    private val connectorDescriptor: ConnectorDescriptor)
  extends TableDescriptor
  with SchematicDescriptor[D]
  with RegistrableDescriptor { this: D =>

  private var formatDescriptor: Option[FormatDescriptor] = None
  private var schemaDescriptor: Option[Schema] = None

  //......

  override def withFormat(format: FormatDescriptor): D = {
    formatDescriptor = Some(format)
    this
  }

  //......
}
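The interesting part here is the F-bounded type parameter combined with the self-type annotation (this: D =>): withFormat can return this typed as the concrete subclass, so a fluent chain keeps subclass-only methods available after the call. A minimal Java sketch of the same pattern, with all names hypothetical; Java needs an explicit cast where the Scala self-type does not.

// hypothetical stand-ins, just to illustrate the pattern
class FormatStub {}

abstract class BaseDescriptor<D extends BaseDescriptor<D>> {

    protected FormatStub format;

    @SuppressWarnings("unchecked")
    public D withFormat(FormatStub format) {
        this.format = format;
        return (D) this;   // Scala's "this: D =>" self-type avoids this cast
    }
}

final class StreamDescriptor extends BaseDescriptor<StreamDescriptor> {
    public StreamDescriptor inAppendMode() { return this; }
}

public class SelfTypeDemo {
    public static void main(String[] args) {
        // withFormat returns StreamDescriptor, so the subclass-only
        // method inAppendMode() is still reachable after the call
        StreamDescriptor d = new StreamDescriptor()
            .withFormat(new FormatStub())
            .inAppendMode();
        System.out.println(d);
    }
}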
flink-table-common-1.7.1-sources.jar!/org/apache/flink/table/descriptors/FormatDescriptor.java
@PublicEvolving
public abstract class FormatDescriptor extends DescriptorBase implements Descriptor {

    private String type;
    private int version;

    /**
     * Constructs a {@link FormatDescriptor}.
     *
     * @param type string that identifies this format
     * @param version property version for backwards compatibility
     */
    public FormatDescriptor(String type, int version) {
        this.type = type;
        this.version = version;
    }

    @Override
    public final Map<String, String> toProperties() {
        final DescriptorProperties properties = new DescriptorProperties();
        properties.putString(FormatDescriptorValidator.FORMAT_TYPE, type);
        properties.putInt(FormatDescriptorValidator.FORMAT_PROPERTY_VERSION, version);
        properties.putProperties(toFormatProperties());
        return properties.asMap();
    }

    /**
     * Converts this descriptor into a set of format properties. Usually prefixed with
     * {@link FormatDescriptorValidator#FORMAT}.
     */
    protected abstract Map<String, String> toFormatProperties();
}
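toProperties() is final: it writes format.type and format.property-version itself and merges in whatever the subclass returns from toFormatProperties(). A minimal sketch of a custom subclass under that contract; the my-format type and the format.my-option key are hypothetical, not an existing Flink format.

import java.util.Map;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.descriptors.FormatDescriptor;

public class MyFormat extends FormatDescriptor {

    private String myOption;

    public MyFormat() {
        super("my-format", 1);   // becomes format.type / format.property-version
    }

    public MyFormat myOption(String value) {
        this.myOption = value;
        return this;
    }

    @Override
    protected Map<String, String> toFormatProperties() {
        // only the format-specific keys; the base class adds the common ones
        final DescriptorProperties properties = new DescriptorProperties();
        if (myOption != null) {
            properties.putString("format.my-option", myOption);
        }
        return properties.asMap();
    }
}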
flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/descriptors/Csv.scala
class Csv extends FormatDescriptor(FORMAT_TYPE_VALUE, 1) {

  private var fieldDelim: Option[String] = None
  private var lineDelim: Option[String] = None
  private val schema: mutable.LinkedHashMap[String, String] =
    mutable.LinkedHashMap[String, String]()
  private var quoteCharacter: Option[Character] = None
  private var commentPrefix: Option[String] = None
  private var isIgnoreFirstLine: Option[Boolean] = None
  private var lenient: Option[Boolean] = None

  def fieldDelimiter(delim: String): Csv = {
    this.fieldDelim = Some(delim)
    this
  }

  def lineDelimiter(delim: String): Csv = {
    this.lineDelim = Some(delim)
    this
  }

  def schema(schema: TableSchema): Csv = {
    this.schema.clear()
    schema.getFieldNames.zip(schema.getFieldTypes).foreach { case (n, t) =>
      field(n, t)
    }
    this
  }

  def field(fieldName: String, fieldType: TypeInformation[_]): Csv = {
    field(fieldName, TypeStringUtils.writeTypeInfo(fieldType))
    this
  }

  def field(fieldName: String, fieldType: String): Csv = {
    if (schema.contains(fieldName)) {
      throw new ValidationException(s"Duplicate field name $fieldName.")
    }
    schema += (fieldName -> fieldType)
    this
  }

  def quoteCharacter(quote: Character): Csv = {
    this.quoteCharacter = Option(quote)
    this
  }

  def commentPrefix(prefix: String): Csv = {
    this.commentPrefix = Option(prefix)
    this
  }

  def ignoreFirstLine(): Csv = {
    this.isIgnoreFirstLine = Some(true)
    this
  }

  def ignoreParseErrors(): Csv = {
    this.lenient = Some(true)
    this
  }

  override protected def toFormatProperties: util.Map[String, String] = {
    val properties = new DescriptorProperties()

    fieldDelim.foreach(properties.putString(FORMAT_FIELD_DELIMITER, _))
    lineDelim.foreach(properties.putString(FORMAT_LINE_DELIMITER, _))

    val subKeys = util.Arrays.asList(
      DescriptorProperties.TABLE_SCHEMA_NAME,
      DescriptorProperties.TABLE_SCHEMA_TYPE)

    val subValues = schema.map(e => util.Arrays.asList(e._1, e._2)).toList.asJava

    properties.putIndexedFixedProperties(
      FORMAT_FIELDS,
      subKeys,
      subValues)

    quoteCharacter.foreach(properties.putCharacter(FORMAT_QUOTE_CHARACTER, _))
    commentPrefix.foreach(properties.putString(FORMAT_COMMENT_PREFIX, _))
    isIgnoreFirstLine.foreach(properties.putBoolean(FORMAT_IGNORE_FIRST_LINE, _))
    lenient.foreach(properties.putBoolean(FORMAT_IGNORE_PARSE_ERRORS, _))

    properties.asMap()
  }
}
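Csv keeps the ordered fields in a LinkedHashMap (type information serialized to strings via TypeStringUtils) and flattens everything into string properties, writing the fields as indexed keys (format.fields.0.name, format.fields.0.type, and so on). A quick sketch to inspect the result; the expected keys follow CsvValidator, and the exact type strings are hedged guesses.

import java.util.Map;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.descriptors.Csv;

public class CsvPropertiesDemo {

    public static void main(String[] args) {
        Map<String, String> props = new Csv()
            .field("field1", Types.STRING)
            .field("field2", Types.SQL_TIMESTAMP)
            .fieldDelimiter("|")
            .ignoreFirstLine()
            .toProperties();

        // expected (hedged): format.type=csv, format.property-version=1,
        // format.fields.0.name=field1, format.fields.0.type=VARCHAR,
        // format.fields.1.name=field2, format.fields.1.type=TIMESTAMP,
        // format.field-delimiter=|, format.ignore-first-line=true
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}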
flink-json-1.7.1-sources.jar!/org/apache/flink/table/descriptors/Json.java
public class Json extends FormatDescriptor {

    private Boolean failOnMissingField;
    private Boolean deriveSchema;
    private String jsonSchema;
    private String schema;

    public Json() {
        super(FORMAT_TYPE_VALUE, 1);
    }

    public Json failOnMissingField(boolean failOnMissingField) {
        this.failOnMissingField = failOnMissingField;
        return this;
    }

    public Json jsonSchema(String jsonSchema) {
        Preconditions.checkNotNull(jsonSchema);
        this.jsonSchema = jsonSchema;
        this.schema = null;
        this.deriveSchema = null;
        return this;
    }

    public Json schema(TypeInformation<Row> schemaType) {
        Preconditions.checkNotNull(schemaType);
        this.schema = TypeStringUtils.writeTypeInfo(schemaType);
        this.jsonSchema = null;
        this.deriveSchema = null;
        return this;
    }

    public Json deriveSchema() {
        this.deriveSchema = true;
        this.schema = null;
        this.jsonSchema = null;
        return this;
    }

    @Override
    protected Map<String, String> toFormatProperties() {
        final DescriptorProperties properties = new DescriptorProperties();

        if (deriveSchema != null) {
            properties.putBoolean(FORMAT_DERIVE_SCHEMA, deriveSchema);
        }

        if (jsonSchema != null) {
            properties.putString(FORMAT_JSON_SCHEMA, jsonSchema);
        }

        if (schema != null) {
            properties.putString(FORMAT_SCHEMA, schema);
        }

        if (failOnMissingField != null) {
            properties.putBoolean(FORMAT_FAIL_ON_MISSING_FIELD, failOnMissingField);
        }

        return properties.asMap();
    }
}
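Note how jsonSchema, schema, and deriveSchema each null out the other two, so the last setter wins; this is what makes the three options in the usage snippet mutually exclusive. A quick sketch of the effect, with the property keys per JsonValidator stated as hedged expectations.

import java.util.Map;
import org.apache.flink.table.descriptors.Json;

public class JsonPropertiesDemo {

    public static void main(String[] args) {
        Map<String, String> props = new Json()
            .failOnMissingField(true)
            .jsonSchema("{ type: 'object' }")  // set first...
            .deriveSchema()                    // ...then overridden: jsonSchema is cleared
            .toProperties();

        // expected (hedged): format.type=json, format.property-version=1,
        // format.derive-schema=true, format.fail-on-missing-field=true
        // (no format.json-schema entry, since deriveSchema() reset it)
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}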
flink-avro-1.7.1-sources.jar!/org/apache/flink/table/descriptors/Avro.java
public class Avro extends FormatDescriptor {

    private Class<? extends SpecificRecord> recordClass;
    private String avroSchema;

    public Avro() {
        super(AvroValidator.FORMAT_TYPE_VALUE, 1);
    }

    public Avro recordClass(Class<? extends SpecificRecord> recordClass) {
        Preconditions.checkNotNull(recordClass);
        this.recordClass = recordClass;
        return this;
    }

    public Avro avroSchema(String avroSchema) {
        Preconditions.checkNotNull(avroSchema);
        this.avroSchema = avroSchema;
        return this;
    }

    @Override
    protected Map<String, String> toFormatProperties() {
        final DescriptorProperties properties = new DescriptorProperties();

        if (null != recordClass) {
            properties.putClass(AvroValidator.FORMAT_RECORD_CLASS, recordClass);
        }

        if (null != avroSchema) {
            properties.putString(AvroValidator.FORMAT_AVRO_SCHEMA, avroSchema);
        }

        return properties.asMap();
    }
}
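Unlike Json, Avro's two setters do not reset each other, so if both recordClass and avroSchema were set, both properties would be emitted and validation would have to sort that out downstream. A quick sketch of the single-schema case; the expected keys follow AvroValidator and are hedged.

import java.util.Map;
import org.apache.flink.table.descriptors.Avro;

public class AvroPropertiesDemo {

    public static void main(String[] args) {
        Map<String, String> props = new Avro()
            .avroSchema("{\"type\": \"record\", \"name\": \"test\", \"fields\": ["
                + "{\"name\": \"a\", \"type\": \"long\"}]}")
            .toProperties();

        // expected (hedged): format.type=avro, format.property-version=1,
        // format.avro-schema=<the schema string>
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}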