This article takes a look at Distinct Aggregation in the Flink Table API.
```java
// Distinct can be applied to GroupBy Aggregation, GroupBy Window Aggregation and Over Window Aggregation.
Table orders = tableEnv.scan("Orders");

// Distinct aggregation on group by
Table groupByDistinctResult = orders
    .groupBy("a")
    .select("a, b.sum.distinct as d");

// Distinct aggregation on time window group by
Table groupByWindowDistinctResult = orders
    .window(Tumble.over("5.minutes").on("rowtime").as("w"))
    .groupBy("a, w")
    .select("a, b.sum.distinct as d");

// Distinct aggregation on over window
Table result = orders
    .window(Over
        .partitionBy("a")
        .orderBy("rowtime")
        .preceding("UNBOUNDED_RANGE")
        .as("w"))
    .select("a, b.avg.distinct over w, b.max over w, b.min over w");

// User-defined aggregation functions can also be used with the DISTINCT modifier
tableEnv.registerFunction("myUdagg", new MyUdagg());
Table udaggDistinctResult = orders
    .groupBy("users")
    .select("users, myUdagg.distinct(points) as myDistinctResult");
```
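To make the semantics of `b.sum.distinct` concrete, the plain-Java sketch below computes both `b.sum` and `b.sum.distinct` per key `a` over a hypothetical in-memory list of orders. It has no Flink dependency and only illustrates what the group-by query returns, not how Flink evaluates it; the `DistinctSumDemo` and `Order` names are made up for this example.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Illustrates groupBy("a").select("a, b.sum") versus
// groupBy("a").select("a, b.sum.distinct") on a hypothetical in-memory table.
class DistinctSumDemo {

    // Hypothetical row of the "Orders" table, reduced to the columns a and b.
    static final class Order {
        final String a;
        final long b;

        Order(String a, long b) {
            this.a = a;
            this.b = b;
        }
    }

    // b.sum: every row contributes its b value to its group.
    static Map<String, Long> sumByA(List<Order> orders) {
        Map<String, Long> result = new LinkedHashMap<>();
        for (Order o : orders) {
            result.merge(o.a, o.b, Long::sum);
        }
        return result;
    }

    // b.sum.distinct: each distinct b value is added at most once per group.
    static Map<String, Long> distinctSumByA(List<Order> orders) {
        Map<String, Set<Long>> seen = new LinkedHashMap<>();
        Map<String, Long> result = new LinkedHashMap<>();
        for (Order o : orders) {
            Set<Long> values = seen.computeIfAbsent(o.a, k -> new HashSet<>());
            if (values.add(o.b)) { // true only the first time this b shows up in group a
                result.merge(o.a, o.b, Long::sum);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Order> orders = Arrays.asList(
            new Order("x", 3), new Order("x", 3), new Order("x", 5),
            new Order("y", 2), new Order("y", 2));
        System.out.println(sumByA(orders));         // {x=11, y=4}
        System.out.println(distinctSumByA(orders)); // {x=8, y=2}
    }
}
```

The duplicate rows `(x, 3)` and `(y, 2)` count twice in the plain sum but only once in the distinct sum.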
flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/functions/AggregateFunction.scala
```scala
/**
  * Base class for User-Defined Aggregates.
  *
  * The behavior of an [[AggregateFunction]] can be defined by implementing a series of custom
  * methods. An [[AggregateFunction]] needs at least three methods:
  *  - createAccumulator,
  *  - accumulate, and
  *  - getValue.
  *
  * There are a few other methods that can be optional to have:
  *  - retract,
  *  - merge, and
  *  - resetAccumulator
  *
  * All these methods must be declared publicly, not static and named exactly as the names
  * mentioned above. The methods createAccumulator and getValue are defined in the
  * [[AggregateFunction]] functions, while other methods are explained below.
  *
  *
  * {{{
  * Processes the input values and update the provided accumulator instance. The method
  * accumulate can be overloaded with different custom types and arguments. An AggregateFunction
  * requires at least one accumulate() method.
  *
  * @param accumulator           the accumulator which contains the current aggregated results
  * @param [user defined inputs] the input value (usually obtained from a new arrived data).
  *
  * def accumulate(accumulator: ACC, [user defined inputs]): Unit
  * }}}
  *
  *
  * {{{
  * Retracts the input values from the accumulator instance. The current design assumes the
  * inputs are the values that have been previously accumulated. The method retract can be
  * overloaded with different custom types and arguments. This function must be implemented for
  * datastream bounded over aggregate.
  *
  * @param accumulator           the accumulator which contains the current aggregated results
  * @param [user defined inputs] the input value (usually obtained from a new arrived data).
  *
  * def retract(accumulator: ACC, [user defined inputs]): Unit
  * }}}
  *
  *
  * {{{
  * Merges a group of accumulator instances into one accumulator instance. This function must be
  * implemented for datastream session window grouping aggregate and dataset grouping aggregate.
  *
  * @param accumulator  the accumulator which will keep the merged aggregate results. It should
  *                     be noted that the accumulator may contain the previous aggregated
  *                     results. Therefore user should not replace or clean this instance in the
  *                     custom merge method.
  * @param its          an [[java.lang.Iterable]] pointed to a group of accumulators that will be
  *                     merged.
  *
  * def merge(accumulator: ACC, its: java.lang.Iterable[ACC]): Unit
  * }}}
  *
  *
  * {{{
  * Resets the accumulator for this [[AggregateFunction]]. This function must be implemented for
  * dataset grouping aggregate.
  *
  * @param accumulator  the accumulator which needs to be reset
  *
  * def resetAccumulator(accumulator: ACC): Unit
  * }}}
  *
  *
  * @tparam T   the type of the aggregation result
  * @tparam ACC the type of the aggregation accumulator. The accumulator is used to keep the
  *             aggregated values which are needed to compute an aggregation result.
  *             AggregateFunction represents its state using accumulator, thereby the state of the
  *             AggregateFunction must be put into the accumulator.
  */
abstract class AggregateFunction[T, ACC] extends UserDefinedFunction {
  /**
    * Creates and init the Accumulator for this [[AggregateFunction]].
    *
    * @return the accumulator with the initial value
    */
  def createAccumulator(): ACC

  /**
    * Called every time when an aggregation result should be materialized.
    * The returned value could be either an early and incomplete result
    * (periodically emitted as data arrive) or the final result of the
    * aggregation.
    *
    * @param accumulator the accumulator which contains the current
    *                    aggregated results
    * @return the aggregation result
    */
  def getValue(accumulator: ACC): T

  /**
    * Returns true if this AggregateFunction can only be applied in an OVER window.
    *
    * @return true if the AggregateFunction requires an OVER window, false otherwise.
    */
  def requiresOver: Boolean = false

  /**
    * Returns the TypeInformation of the AggregateFunction's result.
    *
    * @return The TypeInformation of the AggregateFunction's result or null if the result type
    *         should be automatically inferred.
    */
  def getResultType: TypeInformation[T] = null

  /**
    * Returns the TypeInformation of the AggregateFunction's accumulator.
    *
    * @return The TypeInformation of the AggregateFunction's accumulator or null if the
    *         accumulator type should be automatically inferred.
    */
  def getAccumulatorType: TypeInformation[ACC] = null
}
```
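A minimal sketch of the three required methods follows, assuming a hypothetical `SumPoints` aggregate (roughly what a `MyUdagg` summing `points` might look like). To stay compilable without Flink on the classpath it does not extend the base class; in a real job it would extend `org.apache.flink.table.functions.AggregateFunction<Long, SumPoints.Acc>`, and the `aggregate` helper, which only imitates how the runtime drives the methods for one group, would be dropped.

```java
// Standalone sketch of the AggregateFunction contract (no Flink dependency).
// SumPoints, Acc and aggregate() are hypothetical names used for illustration.
class SumPoints {

    // Mutable accumulator: all aggregation state lives here, not in the function object.
    static final class Acc {
        long sum = 0L;
    }

    // Required (abstract in the base class): create an empty accumulator.
    public Acc createAccumulator() {
        return new Acc();
    }

    // Required but not declared on the base class: public, non-static, named exactly "accumulate".
    public void accumulate(Acc acc, long points) {
        acc.sum += points;
    }

    // Required (abstract in the base class): materialize the current result.
    public Long getValue(Acc acc) {
        return acc.sum;
    }

    // Imitates the runtime for a single group: create, accumulate each row, read the result.
    static long aggregate(long... points) {
        SumPoints f = new SumPoints();
        Acc acc = f.createAccumulator();
        for (long p : points) {
            f.accumulate(acc, p);
        }
        return f.getValue(acc);
    }

    public static void main(String[] args) {
        System.out.println(aggregate(4, 6, 10)); // 20
    }
}
```

Keeping the state in `Acc` rather than in fields of `SumPoints` is what the Scaladoc's `@tparam ACC` note asks for: one function instance can then serve many groups, each with its own accumulator.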
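- Of the AggregateFunction methods, subclasses must implement createAccumulator and getValue, which are declared abstract on the base class. At least one accumulate method is required as well; it is not declared on the base class, so it only has to be public, non-static and named exactly accumulate.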
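- For DataStream bounded OVER aggregates, retract must be implemented. It takes the accumulator (ACC) followed by the same user-defined inputs as accumulate, and returns Unit (void in Java).
- For DataStream session-window grouping aggregates and DataSet grouping aggregates, merge must be implemented. It takes the target accumulator (ACC) and a java.lang.Iterable<ACC> of the accumulators to merge into it, and returns Unit.
- For DataSet grouping aggregates, resetAccumulator must be implemented. It takes ACC and returns Unit.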
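The optional methods, and the way DISTINCT interacts with retraction, can be illustrated with a reference-counting accumulator. This is a standalone sketch under stated assumptions: `DistinctSum` and its `Acc` are hypothetical names, the class does not extend Flink's `AggregateFunction`, and it is not Flink's internal implementation of DISTINCT. It only shows why a distinct aggregate has to count occurrences: `retract` changes the result only when the last copy of a value disappears, and `merge` adds a value only if the target accumulator has not already seen it.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical DISTINCT-aware SUM with the optional retract / merge / resetAccumulator methods.
class DistinctSum {

    static final class Acc {
        // value -> number of times it is currently present in the group
        final Map<Long, Long> counts = new HashMap<>();
        long sum = 0L;
    }

    public Acc createAccumulator() {
        return new Acc();
    }

    // Only the first occurrence of a value changes the distinct sum.
    public void accumulate(Acc acc, long value) {
        long c = acc.counts.merge(value, 1L, Long::sum);
        if (c == 1L) {
            acc.sum += value;
        }
    }

    // Bounded OVER windows: remove a previously accumulated value.
    public void retract(Acc acc, long value) {
        Long c = acc.counts.get(value);
        if (c == null) {
            return; // never accumulated; ignored in this sketch
        }
        if (c == 1L) {
            acc.counts.remove(value); // last copy gone: value leaves the distinct set
            acc.sum -= value;
        } else {
            acc.counts.put(value, c - 1);
        }
    }

    // Session windows / DataSet grouping: fold other accumulators into acc,
    // keeping whatever acc already holds (as the Scaladoc requires).
    public void merge(Acc acc, Iterable<Acc> others) {
        for (Acc other : others) {
            for (Map.Entry<Long, Long> e : other.counts.entrySet()) {
                long before = acc.counts.getOrDefault(e.getKey(), 0L);
                acc.counts.put(e.getKey(), before + e.getValue());
                if (before == 0L) {
                    acc.sum += e.getKey();
                }
            }
        }
    }

    // DataSet grouping: clear the accumulator so it can be reused.
    public void resetAccumulator(Acc acc) {
        acc.counts.clear();
        acc.sum = 0L;
    }

    public Long getValue(Acc acc) {
        return acc.sum;
    }

    public static void main(String[] args) {
        DistinctSum f = new DistinctSum();
        Acc acc = f.createAccumulator();
        f.accumulate(acc, 3);
        f.accumulate(acc, 3);
        f.accumulate(acc, 5);
        System.out.println(f.getValue(acc)); // 8
        f.retract(acc, 3);
        System.out.println(f.getValue(acc)); // 8, one 3 is still present
        f.retract(acc, 3);
        System.out.println(f.getValue(acc)); // 5
        Acc other = f.createAccumulator();
        f.accumulate(other, 5);
        f.accumulate(other, 7);
        f.merge(acc, Arrays.asList(other));
        System.out.println(f.getValue(acc)); // 12
    }
}
```

Counting occurrences instead of keeping a plain set is what makes `retract` possible: with a set, removing one of two identical 3s could not be told apart from removing the only one.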