本文主要研究一下 Flink 的 TableFunction。
// The generic type "Tuple2<String, Integer>" determines the schema of the returned table as (String, Integer). public class Split extends TableFunction<Tuple2<String, Integer>> { private String separator = " "; public Split(String separator) { this.separator = separator; } public void eval(String str) { for (String s : str.split(separator)) { // use collect(...) to emit a row collect(new Tuple2<String, Integer>(s, s.length())); } } } BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); Table myTable = ... // table schema: [a: String] // Register the function. tableEnv.registerFunction("split", new Split("#")); // Use the table function in the Java Table API. "as" specifies the field names of the table. myTable.join(new Table(tableEnv, "split(a) as (word, length)")) .select("a, word, length"); myTable.leftOuterJoin(new Table(tableEnv, "split(a) as (word, length)")) .select("a, word, length"); // Use the table function in SQL with LATERAL and TABLE keywords. // CROSS JOIN a table function (equivalent to "join" in Table API). tableEnv.sqlQuery("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)"); // LEFT JOIN a table function (equivalent to "leftOuterJoin" in Table API). tableEnv.sqlQuery("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL TABLE(split(a)) as T(word, length) ON TRUE");
flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/functions/UserDefinedFunction.scala
/**
 * Base class for user-defined functions (for example, [[TableFunction]] extends it).
 *
 * Instances are `Serializable`.
 */
abstract class UserDefinedFunction extends Serializable {

  /**
   * Setup method for user-defined function. It can be used for initialization work.
   *
   * By default, this method does nothing.
   *
   * @param context the [[FunctionContext]] supplied by the caller
   */
  @throws(classOf[Exception])
  def open(context: FunctionContext): Unit = {}

  /**
   * Tear-down method for user-defined function. It can be used for clean up work.
   *
   * By default, this method does nothing.
   */
  @throws(classOf[Exception])
  def close(): Unit = {}

  /**
   * Whether this function is deterministic.
   *
   * @return true if and only if a call to this function is guaranteed to always return the
   *         same result given the same parameters. Defaults to true. A function that is not
   *         purely functional (for example random(), date() or now()) must override this and
   *         return false.
   */
  def isDeterministic: Boolean = true

  /**
   * Identifier for this function instance.
   *
   * It is built from the class name (with '.' replaced by '$'), a '$', and the hex-encoded
   * MD5 of the serialized function object.
   *
   * The canonical class name is used when it exists. Anonymous and local classes have no
   * canonical name (`getCanonicalName` returns null), which previously made this method throw
   * a NullPointerException. For those classes the binary name from `getName` is used instead.
   */
  final def functionIdentifier: String = {
    val md5 = EncodingUtils.hex(EncodingUtils.md5(EncodingUtils.encodeObjectToString(this)))
    val className = Option(getClass.getCanonicalName).getOrElse(getClass.getName)
    className.replace('.', '$').concat("$").concat(md5)
  }

  /**
   * Returns the name of the UDF that is used for plan explain and logging.
   */
  override def toString: String = getClass.getSimpleName
}
flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/functions/TableFunction.scala
/**
 * Base class for user-defined table functions.
 *
 * Subclasses emit output rows by calling [[collect]] from their evaluation method(s). The
 * runtime installs the target collector via `setCollector` before rows are emitted.
 *
 * @tparam T type of the emitted rows
 */
abstract class TableFunction[T] extends UserDefinedFunction {

  // ----------------------------------------------------------------------------------------------

  /**
   * Emit an output row.
   *
   * @param row the output row
   */
  protected def collect(row: T): Unit = {
    collector.collect(row)
  }

  // ----------------------------------------------------------------------------------------------

  /**
   * The code generated collector used to emit row.
   */
  private var collector: Collector[T] = _

  /**
   * Internal use. Sets the current collector.
   */
  private[flink] final def setCollector(collector: Collector[T]): Unit = {
    this.collector = collector
  }

  // ----------------------------------------------------------------------------------------------

  /**
   * Returns the result type of the evaluation method with a given signature.
   *
   * This method needs to be overridden in case Flink's type extraction facilities are not
   * sufficient to extract the [[TypeInformation]] based on the return type of the evaluation
   * method. Flink's type extraction facilities can handle basic types or
   * simple POJOs but might be wrong for more complex, custom, or composite types.
   *
   * @return [[TypeInformation]] of result type or null if Flink should determine the type
   */
  def getResultType: TypeInformation[T] = null

  /**
   * Returns [[TypeInformation]] about the operands of the evaluation method with a given
   * signature.
   *
   * In order to perform operand type inference in SQL (especially when NULL is used) it might be
   * necessary to determine the parameter [[TypeInformation]] of an evaluation method.
   * By default Flink's type extraction facilities are used for this but might be wrong for
   * more complex, custom, or composite types.
   *
   * @param signature signature of the method the operand types need to be determined
   * @return [[TypeInformation]] of operand types
   * @throws ValidationException if the type of any parameter class cannot be extracted; the
   *                             underlying [[InvalidTypesException]] is attached as the cause
   */
  def getParameterTypes(signature: Array[Class[_]]): Array[TypeInformation[_]] = {
    signature.map { c =>
      try {
        TypeExtractor.getForClass(c)
      } catch {
        case ite: InvalidTypesException =>
          // Chain the original exception so the actual extraction failure is not lost.
          throw new ValidationException(
            s"Parameter types of table function '${this.getClass.getCanonicalName}' cannot be " +
            s"automatically determined. Please provide type information manually.",
            ite)
      }
    }
  }
}
flink-streaming-java_2.11-1.7.1-sources.jar!/org/apache/flink/streaming/api/operators/ProcessOperator.java
/**
 * One-input stream operator that invokes a user {@link ProcessFunction} for every incoming
 * {@link StreamRecord}, handing it a timestamped collector and a context.
 */
@Internal
public class ProcessOperator<IN, OUT>
        extends AbstractUdfStreamOperator<OUT, ProcessFunction<IN, OUT>>
        implements OneInputStreamOperator<IN, OUT> {

    private static final long serialVersionUID = 1L;

    // Collector passed to the user function; created in open() around the operator's output.
    private transient TimestampedCollector<OUT> collector;

    // Context passed to the user function; created in open().
    private transient ContextImpl context;

    /** We listen to this ourselves because we don't have an {@link InternalTimerService}. */
    private long currentWatermark = Long.MIN_VALUE;

    /**
     * @param function the user function invoked for each element
     */
    public ProcessOperator(ProcessFunction<IN, OUT> function) {
        super(function);
        // NOTE(review): ALWAYS presumably allows chaining with neighbouring operators; confirm.
        chainingStrategy = ChainingStrategy.ALWAYS;
    }

    /** Runs the superclass setup, then builds the collector and the context. */
    @Override
    public void open() throws Exception {
        super.open();
        collector = new TimestampedCollector<>(output);
        context = new ContextImpl(userFunction, getProcessingTimeService());
    }

    /**
     * Forwards the record's timestamp to the collector, exposes the record through the context
     * while the user function runs, and clears the context afterwards.
     */
    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        collector.setTimestamp(element);
        context.element = element;
        userFunction.processElement(element.getValue(), context, collector);
        // Reset so the context does not hold on to the record after processing.
        context.element = null;
    }

    /** Delegates to the superclass, then records the watermark timestamp locally. */
    @Override
    public void processWatermark(Watermark mark) throws Exception {
        super.processWatermark(mark);
        this.currentWatermark = mark.getTimestamp();
    }

    //...... (rest of the class, including ContextImpl, is omitted from this excerpt)
}
flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/runtime/CRowCorrelateProcessRunner.scala
/**
 * [[ProcessFunction]] over [[CRow]]s that delegates to a code-generated inner
 * `ProcessFunction[Row, Row]` and a code-generated [[TableFunctionCollector]].
 *
 * Both classes are compiled from the given source strings in [[open]].
 *
 * @param processName   class name of the generated inner ProcessFunction
 * @param processCode   source code of the generated inner ProcessFunction
 * @param collectorName class name of the generated TableFunctionCollector
 * @param collectorCode source code of the generated TableFunctionCollector
 * @param returnType    type reported by [[getProducedType]]
 */
class CRowCorrelateProcessRunner(
    processName: String,
    processCode: String,
    collectorName: String,
    collectorCode: String,
    @transient var returnType: TypeInformation[CRow])
  extends ProcessFunction[CRow, CRow]
  with ResultTypeQueryable[CRow]
  with Compiler[Any]
  with Logging {

  private var function: ProcessFunction[Row, Row] = _
  private var collector: TableFunctionCollector[_] = _
  private var cRowWrapper: CRowWrappingCollector = _

  /**
   * Compiles and instantiates the generated collector and process function, then sets their
   * runtime context and opens them (collector first).
   */
  override def open(parameters: Configuration): Unit = {
    LOG.debug(s"Compiling TableFunctionCollector: $collectorName \n\n Code:\n$collectorCode")
    val clazz = compile(getRuntimeContext.getUserCodeClassLoader, collectorName, collectorCode)
    LOG.debug("Instantiating TableFunctionCollector.")
    collector = clazz.newInstance().asInstanceOf[TableFunctionCollector[_]]
    this.cRowWrapper = new CRowWrappingCollector()

    LOG.debug(s"Compiling ProcessFunction: $processName \n\n Code:\n$processCode")
    val processClazz = compile(getRuntimeContext.getUserCodeClassLoader, processName, processCode)
    // The generated process function receives the collector through its constructor.
    val constructor = processClazz.getConstructor(classOf[TableFunctionCollector[_]])
    LOG.debug("Instantiating ProcessFunction.")
    function = constructor.newInstance(collector).asInstanceOf[ProcessFunction[Row, Row]]

    FunctionUtils.setFunctionRuntimeContext(collector, getRuntimeContext)
    FunctionUtils.setFunctionRuntimeContext(function, getRuntimeContext)
    FunctionUtils.openFunction(collector, parameters)
    FunctionUtils.openFunction(function, parameters)
  }

  /**
   * Points the wrapper at `out` with the input's change flag, primes the collector with the
   * input row, and runs the generated function on the unwrapped row.
   */
  override def processElement(
      in: CRow,
      ctx: ProcessFunction[CRow, CRow]#Context,
      out: Collector[CRow])
    : Unit = {

    // NOTE(review): presumably the wrapper re-wraps emitted Rows as CRows with this change flag.
    cRowWrapper.out = out
    cRowWrapper.setChange(in.change)

    collector.setCollector(cRowWrapper)
    collector.setInput(in.row)
    collector.reset()

    function.processElement(
      in.row,
      ctx.asInstanceOf[ProcessFunction[Row, Row]#Context],
      cRowWrapper)
  }

  override def getProducedType: TypeInformation[CRow] = returnType

  /**
   * Closes the collector, then the generated function. The function is closed even if closing
   * the collector throws, so it is not leaked.
   */
  override def close(): Unit = {
    try {
      FunctionUtils.closeFunction(collector)
    } finally {
      FunctionUtils.closeFunction(function)
    }
  }
}