A Look at Flink Table Group Windows

This article takes a look at Group Windows in the Flink Table API.

Examples

Table table = input
  .window([Window w].as("w"))  // define window with alias w
  .groupBy("w")  // group the table by window w
  .select("b.sum");  // aggregate

Table table = input
  .window([Window w].as("w"))  // define window with alias w
  .groupBy("w, a")  // group the table by attribute a and window w 
  .select("a, b.sum");  // aggregate

Table table = input
  .window([Window w].as("w"))  // define window with alias w
  .groupBy("w, a")  // group the table by attribute a and window w 
  .select("a, w.start, w.end, w.rowtime, b.count"); // aggregate and add window start, end, and rowtime timestamps
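  • The window operation assigns an alias to the Window, which can then be referenced in groupBy and select. A window exposes start, end, and rowtime properties: start is the inclusive lower bound, rowtime is the inclusive upper bound, and end is the exclusive upper bound.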
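To make the inclusive and exclusive bounds concrete, here is a minimal sketch in plain Java. WindowBounds is a hypothetical helper, not a Flink class, and it assumes millisecond timestamp precision, so rowtime is taken to be end minus one millisecond.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper (not part of Flink): derives the three window
// properties from a window's start instant and its size.
final class WindowBounds {
    final Instant start;    // inclusive lower bound
    final Instant end;      // exclusive upper bound
    final Instant rowtime;  // inclusive upper bound, assumed to be end - 1 ms

    WindowBounds(Instant start, Duration size) {
        this.start = start;
        this.end = start.plus(size);
        this.rowtime = this.end.minusMillis(1);
    }

    // A timestamp belongs to the window if start <= ts < end.
    boolean contains(Instant ts) {
        return !ts.isBefore(start) && ts.isBefore(end);
    }

    public static void main(String[] args) {
        WindowBounds w = new WindowBounds(
            Instant.parse("2018-12-01T14:00:00Z"), Duration.ofMinutes(30));
        System.out.println(w.start);    // 2018-12-01T14:00:00Z
        System.out.println(w.rowtime);  // 2018-12-01T14:29:59.999Z
        System.out.println(w.end);      // 2018-12-01T14:30:00Z
    }
}
```

A row stamped exactly at w.end is not contained in this window; it falls into the next one.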

Tumbling Windows Examples

// Tumbling Event-time Window
.window(Tumble.over("10.minutes").on("rowtime").as("w"));

// Tumbling Processing-time Window (assuming a processing-time attribute "proctime")
.window(Tumble.over("10.minutes").on("proctime").as("w"));

// Tumbling Row-count Window (assuming a processing-time attribute "proctime")
.window(Tumble.over("10.rows").on("proctime").as("w"));
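  • Tumbling windows advance by a fixed window size, so consecutive windows do not overlap. The over method specifies the window size, which can be defined on event time, processing time, or row count.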
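The non-overlapping assignment can be sketched in plain Java without any Flink dependency. TumblingSketch and its methods are hypothetical names; the sketch assumes non-negative millisecond timestamps and windows aligned to timestamp 0.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of time-based tumbling windows (not Flink code).
final class TumblingSketch {
    // Start of the single tumbling window that contains ts.
    // Assumes ts >= 0 and windows aligned to timestamp 0.
    static long windowStart(long ts, long size) {
        return ts - (ts % size);
    }

    // Sums values per window; each event is a {timestamp, value} pair.
    static Map<Long, Long> sumPerWindow(long[][] events, long size) {
        Map<Long, Long> sums = new TreeMap<>();
        for (long[] e : events) {
            sums.merge(windowStart(e[0], size), e[1], Long::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        long tenMinutes = 600_000L;
        long[][] events = { {0, 1}, {599_999, 2}, {600_000, 3} };
        // The first two events share window 0; the third opens the next window.
        System.out.println(sumPerWindow(events, tenMinutes)); // {0=3, 600000=3}
    }
}
```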

Sliding Windows Examples

// Sliding Event-time Window
.window(Slide.over("10.minutes").every("5.minutes").on("rowtime").as("w"));

// Sliding Processing-time window (assuming a processing-time attribute "proctime")
.window(Slide.over("10.minutes").every("5.minutes").on("proctime").as("w"));

// Sliding Row-count window (assuming a processing-time attribute "proctime")
.window(Slide.over("10.rows").every("5.rows").on("proctime").as("w"));
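  • Sliding windows overlap when the slide interval is smaller than the window size, so a row may belong to more than one window. The over method specifies the window size, which can be defined on event time, processing time, or row count; the every method specifies the slide interval.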
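To see why one row can land in several windows, the following plain-Java sketch lists the start of every window that contains a given timestamp. SlidingSketch is a hypothetical name, and the sketch assumes non-negative millisecond timestamps with windows aligned to timestamp 0.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of time-based sliding windows (not Flink code).
final class SlidingSketch {
    // Start timestamps of every window [start, start + size) containing ts.
    // Assumes ts >= 0 and windows aligned to timestamp 0.
    static List<Long> windowStarts(long ts, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = ts - (ts % slide);  // latest window start <= ts
        // Walk backwards one slide at a time while the window still covers ts.
        for (long s = lastStart; s > ts - size; s -= slide) {
            starts.add(s);
        }
        return starts;
    }

    public static void main(String[] args) {
        // Size 10 minutes, slide 5 minutes, row at minute 7:
        // it belongs to the windows starting at minute 5 and minute 0.
        System.out.println(windowStarts(420_000L, 600_000L, 300_000L)); // [300000, 0]
    }
}
```

When slide equals size, the loop yields exactly one window, which is the tumbling case.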

Session Windows Examples

// Session Event-time Window
.window(Session.withGap("10.minutes").on("rowtime").as("w"));

// Session Processing-time Window (assuming a processing-time attribute "proctime")
.window(Session.withGap("10.minutes").on("proctime").as("w"));
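  • Session windows have no fixed size; a window closes after a period of inactivity. The withGap method specifies, as a time interval, the inactivity gap that separates two windows. Session windows can only be defined on event time or processing time.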
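The gap-based grouping can be sketched in plain Java as follows. SessionSketch is a hypothetical name. Treating a row that arrives exactly one gap after the previous row as part of the same session is a choice made for this sketch, not a statement about Flink's boundary handling.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of session windows (not Flink code).
final class SessionSketch {
    // Groups timestamps into sessions and returns {start, end} pairs,
    // where end = last timestamp in the session + gap.
    static List<long[]> sessions(long[] timestamps, long gap) {
        long[] sorted = timestamps.clone();
        Arrays.sort(sorted);
        List<long[]> result = new ArrayList<>();
        for (long ts : sorted) {
            long[] last = result.isEmpty() ? null : result.get(result.size() - 1);
            if (last != null && ts <= last[1]) {
                last[1] = ts + gap;                      // still active: extend the session
            } else {
                result.add(new long[] {ts, ts + gap});   // inactivity exceeded the gap: new session
            }
        }
        return result;
    }
}
```

For example, with a gap of 10, the timestamps 0, 5, 12 form one session and 30 starts a second one, because nothing arrived between 22 and 30.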

Table.window

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/table.scala

class Table(
    private[flink] val tableEnv: TableEnvironment,
    private[flink] val logicalPlan: LogicalNode) {

  //......
  
  def window(window: Window): WindowedTable = {
    new WindowedTable(this, window)
  }
  
  //......
}
  • Table provides the window operation, which takes a Window argument and creates a WindowedTable.

WindowedTable

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/table.scala

class WindowedTable(
    private[flink] val table: Table,
    private[flink] val window: Window) {

  def groupBy(fields: Expression*): WindowGroupedTable = {
    val fieldsWithoutWindow = fields.filterNot(window.alias.equals(_))
    if (fields.size != fieldsWithoutWindow.size + 1) {
      throw new ValidationException("GroupBy must contain exactly one window alias.")
    }

    new WindowGroupedTable(table, fieldsWithoutWindow, window)
  }

  def groupBy(fields: String): WindowGroupedTable = {
    val fieldsExpr = ExpressionParser.parseExpressionList(fields)
    groupBy(fieldsExpr: _*)
  }

}
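  • WindowedTable provides only the groupBy operation, which accepts either a String or Expression arguments. A String argument is parsed into Expressions and the call is delegated to the Expression overload, which throws a ValidationException unless the fields contain exactly one window alias. In streaming queries, if groupBy references only the window and no other attribute, the aggregation runs with a parallelism of 1, on a single task. groupBy creates a WindowGroupedTable.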
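The "exactly one window alias" check can be mirrored in plain Java. This is a simplified sketch: GroupByCheck is a hypothetical name, fields are plain comma-separated strings rather than parsed Expressions, and IllegalArgumentException stands in for Flink's ValidationException.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical re-statement of the groupBy validation (not Flink code).
final class GroupByCheck {
    // Returns the grouping keys with the window alias removed.
    // Throws unless the alias appears exactly once among the fields.
    static List<String> keysWithoutWindow(String windowAlias, String fields) {
        List<String> all = new ArrayList<>();
        for (String f : fields.split(",")) {
            all.add(f.trim());
        }
        List<String> keys = new ArrayList<>(all);
        keys.removeIf(windowAlias::equals);
        // Same size comparison as the Scala source: exactly one field was removed.
        if (all.size() != keys.size() + 1) {
            throw new IllegalArgumentException(
                "GroupBy must contain exactly one window alias.");
        }
        return keys;
    }
}
```

Calling keysWithoutWindow("w", "w, a") returns the single key a, while "a" alone or "w, w" is rejected. An empty result, as for "w" alone, corresponds to the case that runs on a single task in streaming queries.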

WindowGroupedTable

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/table.scala

class WindowGroupedTable(
    private[flink] val table: Table,
    private[flink] val groupKeys: Seq[Expression],
    private[flink] val window: Window) {

  def select(fields: Expression*): Table = {
    val expandedFields = expandProjectList(fields, table.logicalPlan, table.tableEnv)
    val (aggNames, propNames) = extractAggregationsAndProperties(expandedFields, table.tableEnv)

    val projectsOnAgg = replaceAggregationsAndProperties(
      expandedFields, table.tableEnv, aggNames, propNames)

    val projectFields = extractFieldReferences(expandedFields ++ groupKeys :+ window.timeField)

    new Table(table.tableEnv,
      Project(
        projectsOnAgg,
        WindowAggregate(
          groupKeys,
          window.toLogicalWindow,
          propNames.map(a => Alias(a._1, a._2)).toSeq,
          aggNames.map(a => Alias(a._1, a._2)).toSeq,
          Project(projectFields, table.logicalPlan).validate(table.tableEnv)
        ).validate(table.tableEnv),
        // required for proper resolution of the time attribute in multi-windows
        explicitAlias = true
      ).validate(table.tableEnv))
  }

  def select(fields: String): Table = {
    val fieldExprs = ExpressionParser.parseExpressionList(fields)
    //get the correct expression for AggFunctionCall
    val withResolvedAggFunctionCall = fieldExprs.map(replaceAggFunctionCall(_, table.tableEnv))
    select(withResolvedAggFunctionCall: _*)
  }
}
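  • WindowGroupedTable provides only the select operation, which accepts either a String or Expression arguments. A String argument is parsed into Expressions and the call is delegated to the Expression overload. select creates a new Table whose Project node has a WindowAggregate as its child.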
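The way each stage exposes only the next operation can be sketched as a staged fluent API in plain Java. All class names here (StagedApiSketch, Source, Windowed, Grouped) are hypothetical, and the returned string is only a stand-in for the logical plan, not Flink's actual plan representation.

```java
// Hypothetical staged API mirroring Table -> WindowedTable -> WindowGroupedTable.
final class StagedApiSketch {
    // Stage 1: stands in for Table; window(...) leads to the windowed stage.
    static final class Source {
        Windowed window(String alias) {
            return new Windowed(alias);
        }
    }

    // Stage 2: stands in for WindowedTable; only groupBy is available.
    static final class Windowed {
        private final String alias;
        Windowed(String alias) { this.alias = alias; }

        Grouped groupBy(String keys) {
            return new Grouped(alias, keys);
        }
    }

    // Stage 3: stands in for WindowGroupedTable; only select is available.
    static final class Grouped {
        private final String alias;
        private final String keys;
        Grouped(String alias, String keys) { this.alias = alias; this.keys = keys; }

        // Describes the plan shape: a Project whose child is a WindowAggregate.
        String select(String fields) {
            return "Project(" + fields + ", WindowAggregate(keys=" + keys
                + ", window=" + alias + "))";
        }
    }
}
```

Because Windowed has no select method and Grouped has no groupBy method, skipping or repeating a step fails at compile time rather than at run time.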

Summary

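  • The window operation assigns an alias to the Window, which can then be referenced in groupBy and select. A window exposes start, end, and rowtime properties: start is the inclusive lower bound, rowtime is the inclusive upper bound, and end is the exclusive upper bound.
  • Tumbling windows advance by a fixed window size, so they do not overlap; the over method specifies the window size, which can be defined on event time, processing time, or row count. Sliding windows overlap when the slide interval is smaller than the window size, so a row may belong to more than one window; over specifies the window size in the same three ways, and every specifies the slide interval. Session windows have no fixed size and close after a period of inactivity; withGap specifies the inactivity gap between two windows as a time interval, and session windows can only be defined on event time or processing time.
  • Table provides the window operation, which takes a Window argument and creates a WindowedTable. WindowedTable provides only groupBy, and WindowGroupedTable provides only select; both accept either a String or Expression arguments, and a String argument is parsed into Expressions before the call is delegated to the Expression overload. In streaming queries, if groupBy references only the window and no other attribute, the aggregation runs with a parallelism of 1, on a single task. groupBy creates a WindowGroupedTable, and select creates a new Table whose Project node has a WindowAggregate as its child.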

