本文主要研究一下flink Table的Group Windowshtml
Table table = input .window([Window w].as("w")) // define window with alias w .groupBy("w") // group the table by window w .select("b.sum"); // aggregate Table table = input .window([Window w].as("w")) // define window with alias w .groupBy("w, a") // group the table by attribute a and window w .select("a, b.sum"); // aggregate Table table = input .window([Window w].as("w")) // define window with alias w .groupBy("w, a") // group the table by attribute a and window w .select("a, w.start, w.end, w.rowtime, b.count"); // aggregate and add window start, end, and rowtime timestamps
// Tumbling Event-time Window .window(Tumble.over("10.minutes").on("rowtime").as("w")); // Tumbling Processing-time Window (assuming a processing-time attribute "proctime") .window(Tumble.over("10.minutes").on("proctime").as("w")); // Tumbling Row-count Window (assuming a processing-time attribute "proctime") .window(Tumble.over("10.rows").on("proctime").as("w"));
// Sliding Event-time Window .window(Slide.over("10.minutes").every("5.minutes").on("rowtime").as("w")); // Sliding Processing-time window (assuming a processing-time attribute "proctime") .window(Slide.over("10.minutes").every("5.minutes").on("proctime").as("w")); // Sliding Row-count window (assuming a processing-time attribute "proctime") .window(Slide.over("10.rows").every("5.rows").on("proctime").as("w"));
// Session Event-time Window .window(Session.withGap("10.minutes").on("rowtime").as("w")); // Session Processing-time Window (assuming a processing-time attribute "proctime") .window(Session.withGap("10.minutes").on("proctime").as("w"));
flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/table.scalaexpress
class Table( private[flink] val tableEnv: TableEnvironment, private[flink] val logicalPlan: LogicalNode) { //...... def window(window: Window): WindowedTable = { new WindowedTable(this, window) } //...... }
flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/table.scalaapache
class WindowedTable( private[flink] val table: Table, private[flink] val window: Window) { def groupBy(fields: Expression*): WindowGroupedTable = { val fieldsWithoutWindow = fields.filterNot(window.alias.equals(_)) if (fields.size != fieldsWithoutWindow.size + 1) { throw new ValidationException("GroupBy must contain exactly one window alias.") } new WindowGroupedTable(table, fieldsWithoutWindow, window) } def groupBy(fields: String): WindowGroupedTable = { val fieldsExpr = ExpressionParser.parseExpressionList(fields) groupBy(fieldsExpr: _*) } }
flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/table.scalawindows
class WindowGroupedTable( private[flink] val table: Table, private[flink] val groupKeys: Seq[Expression], private[flink] val window: Window) { def select(fields: Expression*): Table = { val expandedFields = expandProjectList(fields, table.logicalPlan, table.tableEnv) val (aggNames, propNames) = extractAggregationsAndProperties(expandedFields, table.tableEnv) val projectsOnAgg = replaceAggregationsAndProperties( expandedFields, table.tableEnv, aggNames, propNames) val projectFields = extractFieldReferences(expandedFields ++ groupKeys :+ window.timeField) new Table(table.tableEnv, Project( projectsOnAgg, WindowAggregate( groupKeys, window.toLogicalWindow, propNames.map(a => Alias(a._1, a._2)).toSeq, aggNames.map(a => Alias(a._1, a._2)).toSeq, Project(projectFields, table.logicalPlan).validate(table.tableEnv) ).validate(table.tableEnv), // required for proper resolution of the time attribute in multi-windows explicitAlias = true ).validate(table.tableEnv)) } def select(fields: String): Table = { val fieldExprs = ExpressionParser.parseExpressionList(fields) //get the correct expression for AggFunctionCall val withResolvedAggFunctionCall = fieldExprs.map(replaceAggFunctionCall(_, table.tableEnv)) select(withResolvedAggFunctionCall: _*) } }