本文主要研究一下flink Table的ScalarFunctionhtml
public class HashCode extends ScalarFunction { private int factor = 0; @Override public void open(FunctionContext context) throws Exception { // access "hashcode_factor" parameter // "12" would be the default value if parameter does not exist factor = Integer.valueOf(context.getJobParameter("hashcode_factor", "12")); } public int eval(String s) { return s.hashCode() * factor; } } ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); // set job parameter Configuration conf = new Configuration(); conf.setString("hashcode_factor", "31"); env.getConfig().setGlobalJobParameters(conf); // register the function tableEnv.registerFunction("hashCode", new HashCode()); // use the function in Java Table API myTable.select("string, string.hashCode(), hashCode(string)"); // use the function in SQL tableEnv.sqlQuery("SELECT string, HASHCODE(string) FROM MyTable");
abstract class ScalarFunction extends UserDefinedFunction { /** * Creates a call to a [[ScalarFunction]] in Scala Table API. * * @param params actual parameters of function * @return [[Expression]] in form of a [[ScalarFunctionCall]] */ final def apply(params: Expression*): Expression = { ScalarFunctionCall(this, params) } // ---------------------------------------------------------------------------------------------- /** * Returns the result type of the evaluation method with a given signature. * * This method needs to be overridden in case Flink's type extraction facilities are not * sufficient to extract the [[TypeInformation]] based on the return type of the evaluation * method. Flink's type extraction facilities can handle basic types or * simple POJOs but might be wrong for more complex, custom, or composite types. * * @param signature signature of the method the return type needs to be determined * @return [[TypeInformation]] of result type or null if Flink should determine the type */ def getResultType(signature: Array[Class[_]]): TypeInformation[_] = null /** * Returns [[TypeInformation]] about the operands of the evaluation method with a given * signature. * * In order to perform operand type inference in SQL (especially when NULL is used) it might be * necessary to determine the parameter [[TypeInformation]] of an evaluation method. * By default Flink's type extraction facilities are used for this but might be wrong for * more complex, custom, or composite types. * * @param signature signature of the method the operand types need to be determined * @return [[TypeInformation]] of operand types */ def getParameterTypes(signature: Array[Class[_]]): Array[TypeInformation[_]] = { signature.map { c => try { TypeExtractor.getForClass(c) } catch { case ite: InvalidTypesException => throw new ValidationException( s"Parameter types of scalar function '${this.getClass.getCanonicalName}' cannot be " + s"automatically determined. Please provide type information manually.") } } } }
class CRowProcessRunner( name: String, code: String, @transient var returnType: TypeInformation[CRow]) extends ProcessFunction[CRow, CRow] with ResultTypeQueryable[CRow] with Compiler[ProcessFunction[Row, Row]] with Logging { private var function: ProcessFunction[Row, Row] = _ private var cRowWrapper: CRowWrappingCollector = _ override def open(parameters: Configuration): Unit = { LOG.debug(s"Compiling ProcessFunction: $name \n\n Code:\n$code") val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code) LOG.debug("Instantiating ProcessFunction.") function = clazz.newInstance() FunctionUtils.setFunctionRuntimeContext(function, getRuntimeContext) FunctionUtils.openFunction(function, parameters) this.cRowWrapper = new CRowWrappingCollector() } override def processElement( in: CRow, ctx: ProcessFunction[CRow, CRow]#Context, out: Collector[CRow]) : Unit = { cRowWrapper.out = out cRowWrapper.setChange(in.change) function.processElement( in.row, ctx.asInstanceOf[ProcessFunction[Row, Row]#Context], cRowWrapper) } override def getProducedType: TypeInformation[CRow] = returnType override def close(): Unit = { FunctionUtils.closeFunction(function) } }
@PublicEvolving public abstract class ProcessFunction<I, O> extends AbstractRichFunction { private static final long serialVersionUID = 1L; /** * Process one element from the input stream. * * <p>This function can output zero or more elements using the {@link Collector} parameter * and also update internal state or set timers using the {@link Context} parameter. * * @param value The input value. * @param ctx A {@link Context} that allows querying the timestamp of the element and getting * a {@link TimerService} for registering timers and querying the time. The * context is only valid during the invocation of this method, do not store it. * @param out The collector for returning result values. * * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation * to fail and may trigger recovery. */ public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception; /** * Called when a timer set using {@link TimerService} fires. * * @param timestamp The timestamp of the firing timer. * @param ctx An {@link OnTimerContext} that allows querying the timestamp of the firing timer, * querying the {@link TimeDomain} of the firing timer and getting a * {@link TimerService} for registering timers and querying the time. * The context is only valid during the invocation of this method, do not store it. * @param out The collector for returning result values. * * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation * to fail and may trigger recovery. */ public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {} /** * Information available in an invocation of {@link #processElement(Object, Context, Collector)} * or {@link #onTimer(long, OnTimerContext, Collector)}. */ public abstract class Context { /** * Timestamp of the element currently being processed or timestamp of a firing timer. * * <p>This might be {@code null}, for example if the time characteristic of your program * is set to {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}. */ public abstract Long timestamp(); /** * A {@link TimerService} for querying time and registering timers. */ public abstract TimerService timerService(); /** * Emits a record to the side output identified by the {@link OutputTag}. * * @param outputTag the {@code OutputTag} that identifies the side output to emit to. * @param value The record to emit. */ public abstract <X> void output(OutputTag<X> outputTag, X value); } /** * Information available in an invocation of {@link #onTimer(long, OnTimerContext, Collector)}. */ public abstract class OnTimerContext extends Context { /** * The {@link TimeDomain} of the firing timer. */ public abstract TimeDomain timeDomain(); } }
class DataStreamCalc( cluster: RelOptCluster, traitSet: RelTraitSet, input: RelNode, inputSchema: RowSchema, schema: RowSchema, calcProgram: RexProgram, ruleDescription: String) extends Calc(cluster, traitSet, input, calcProgram) with CommonCalc with DataStreamRel { //...... override def translateToPlan( tableEnv: StreamTableEnvironment, queryConfig: StreamQueryConfig): DataStream[CRow] = { val config = tableEnv.getConfig val inputDataStream = getInput.asInstanceOf[DataStreamRel].translateToPlan(tableEnv, queryConfig) // materialize time attributes in condition val condition = if (calcProgram.getCondition != null) { val materializedCondition = RelTimeIndicatorConverter.convertExpression( calcProgram.expandLocalRef(calcProgram.getCondition), inputSchema.relDataType, cluster.getRexBuilder) Some(materializedCondition) } else { None } // filter out time attributes val projection = calcProgram.getProjectList.asScala .map(calcProgram.expandLocalRef) val generator = new FunctionCodeGenerator(config, false, inputSchema.typeInfo) val genFunction = generateFunction( generator, ruleDescription, inputSchema, schema, projection, condition, config, classOf[ProcessFunction[CRow, CRow]]) val inputParallelism = inputDataStream.getParallelism val processFunc = new CRowProcessRunner( genFunction.name, genFunction.code, CRowTypeInfo(schema.typeInfo)) inputDataStream .process(processFunc) .name(calcOpName(calcProgram, getExpressionString)) // keep parallelism to ensure order of accumulate and retract messages .setParallelism(inputParallelism) } }