【翻譯】Flink window

本文翻譯自flink官網:https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/windows.htmlhtml

Windows是處理無限流的核心。Windows將流分紅有限大小的「存儲桶」,咱們能夠在其上應用計算。本文檔重點介紹如何在Flink中執行窗口,以及程序員如何從其提供的功能中得到最大收益。java

窗口式Flink程序的通常結構以下所示。第一個段指的是鍵控流,第二個段指的是非鍵控流正如咱們所看到的,惟一的區別是keyBy(...) 的鍵控流調用 window(...),而非鍵控流調用windowAll(...)這還將用做本頁面其他部分的路線圖。git

Keyed Windows

stream
       .keyBy(...)               <-  keyed versus non-keyed windows
       .window(...)              <-  required: "assigner"
      [.trigger(...)]            <-  optional: "trigger" (else default trigger)
      [.evictor(...)]            <-  optional: "evictor" (else no evictor)
      [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
      [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
       .reduce/aggregate/fold/apply()      <-  required: "function"
      [.getSideOutput(...)]      <-  optional: "output tag"

 Non-Keyed Windows

 stream程序員

.windowAll(...)           <-  required: "assigner"
      [.trigger(...)]            <-  optional: "trigger" (else default trigger)
      [.evictor(...)]            <-  optional: "evictor" (else no evictor)
      [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
      [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
       .reduce/aggregate/fold/apply()      <-  required: "function"
      [.getSideOutput(...)]      <-  optional: "output tag"

在上面,方括號([…])中的命令是可選的。這代表Flink容許您以多種不一樣方式自定義窗口邏輯,從而使其最適合您的需求。github

窗口生命週期

簡單來講,一旦屬於該窗口的第一個元素到達,就會建立一個窗口,而且當時間(事件或處理時間)超過其結束時間戳加上用戶指定的時間(請參閱「 容許延遲」)後,該窗口將被徹底刪除 )。Flink保證只刪除基於時間的窗口,而不保證其餘類型,例如全局窗口(請參閱窗口分配器)。例如,採用基於事件時間的窗口化策略,該策略每5分鐘建立一次不重疊(或翻滾)的窗口,並容許延遲1分鐘,所以Flink將在 12:00 到 12:05 之間的第一個元素落入此間隔時,建立一個新窗口,當 watermark 達到 時間戳 12:06 時,將刪除這個windowapache

此外,每一個窗口將具備 Trigger(參見觸發器)和一個函數(ProcessWindowFunctionReduceFunction, AggregateFunctionFoldFunction)(見窗口功能)鏈接到它。該函數將包含應用於窗口內容的計算,而 Trigger 指定了在什麼條件下將 應用窗口函數(觸發計算)。觸發策略可能相似於「當窗口中的元素數大於4時」或「當 watermark 達到窗口末尾時」。觸發器還能夠決定在建立和刪除窗口之間的任什麼時候間清除窗口的內容。在這種狀況下,清除僅是指窗口中的元素,而不是窗口元數據。這意味着仍能夠將新數據添加到該窗口。windows

除上述內容外,您還能夠指定一個Evictor(請參閱Evictors),它將在觸發器後以及應用此功能以前(或以後)從窗口中刪除元素。api

在下文中,咱們將對上述每一個組件進行更詳細的介紹。咱們先從上面的代碼片斷中的必需部分開始(請參見Keyed vs Non- Keyed WindowsWindow Assigner和 Window Function),而後再轉到可選部分。markdown

鍵控與非鍵控Windows

要指定的第一件事是您的流是否應該設置key 。這必須在定義窗口以前完成。使用keyBy(...)會將您的無限流分割成邏輯鍵流。若是keyBy(...)未調用,則不會爲您的流設置key。session

在使用鍵控流的狀況下,傳入事件的任何屬性均可以用做鍵(此處有更多詳細信息)。擁有鍵控流將使您的窗口化計算能夠由多個任務並行執行,由於每一個邏輯鍵控流均可以獨立於其他邏輯流進行處理。引用同一鍵的全部元素將被髮送到同一並行任務。

對於非鍵控流,您的原始流將不會拆分爲多個邏輯流,而且全部窗口邏輯將由單個任務執行,並行度爲1。

窗口分配器(Window Assigners)

指定流是否爲鍵後,下一步是定義一個窗口分配器窗口分配器定義瞭如何將元素分配給窗口。這是經過 WindowAssigner 在 window(...)(針對鍵控流)或windowAll()(針對非鍵控流)調用中指定您選擇的選項來完成的

WindowAssigner 負責將每一個傳入元素分配給一個或多個窗口。Flink 帶有針對最多見用例的預約義窗口分配器,即滾動窗口, 滑動窗口,會話窗口和全局窗口您還能夠經過擴展WindowAssigner來實現自定義窗口分配器全部內置窗口分配器(全局窗口除外)均基於時間將元素分配給窗口,時間能夠是處理時間,也能夠是事件時間。請查看事件時間部分,以瞭解處理時間和事件時間之間的差別以及時間戳和水印的生成方式。

基於時間的窗口具備開始時間戳(包括端點)和結束時間戳(包括端點),它們共同描述了窗口的大小。在代碼中,Flink 在使用 TimeWindow 基於時間的窗口時使用,該方法具備查詢開始和結束時間戳記的方法 maxTimestamp()還具備返回給定窗口容許的最大時間戳的附加方法

在下面,咱們展現Flink的預約義窗口分配器如何工做以及如何在 DataStream 程序中使用它們。下圖顯示了每一個分配器的工做狀況。紫色圓圈表示流的元素,這些元素由某個鍵(在這種狀況下爲用戶1,用戶2和用戶3劃分x軸顯示時間進度。

翻滾窗戶

翻滾窗口分配器的每一個元素分配給指定的窗口的窗口大小。翻滾窗口具備固定的大小,而且不重疊。例如,若是您指定大小爲5分鐘的翻滾窗口,當前窗口將被evaluated (不知道怎麼翻譯,大概是被建立,而且分配元素),而且每五分鐘將啓動一個新窗口,以下圖所示。

 

 如下代碼段顯示瞭如何使用滾動窗口。

val input: DataStream[T] = ...

// tumbling event-time windows
input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>)

// tumbling processing-time windows
input
    .keyBy(<key selector>)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>)

// daily tumbling event-time windows offset by -8 hours.
input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
    .<windowed transformation>(<window function>)

時間間隔能夠經過使用一個指定Time.milliseconds(x)Time.seconds(x), Time.minutes(x),等等。

如最後一個示例所示,滾動窗口分配器還採用一個可選offset 參數,參數可用於更改窗口的對齊方式。例如,若是沒有偏移,則每小時滾動窗口與整點對齊,即您將得到諸如 1:00:00.000 - 1:59:59.9992:00:00.000 - 2:59:59.999依此類推的窗口 。若是要更改,能夠提供一個偏移量。如 15分鐘的偏移量,你會,例如,拿 1:15:00.000 - 2:14:59.9992:15:00.000 - 3:14:59.999等,一個重要的用例的偏移是窗口調整到 UTC-0 時區,例如,在中國,您必須指定的偏移量Time.hours(-8)。(這裏Flink 在某些版本有bug,JIRA : FLINK-11326

 滑動窗

滑動窗口分配器分配以固定長度的窗口。相似於滾動窗口分配器,窗口的大小窗口大小參數配置附加的窗口滑動參數控制滑動窗口啓動的頻率。所以,若是滑動參數小於窗口大小,則滑動窗口可能會重疊。在這種狀況下,元素被分配給多個窗口。

例如,您能夠將大小爲10分鐘的窗口滑動5分鐘。這樣,您每隔5分鐘就會獲得一個窗口,其中包含最近10分鐘內到達的事件,以下圖所示。

 

 如下代碼段顯示瞭如何使用滑動窗口。

val input: DataStream[T] = ...

// sliding event-time windows
input
    .keyBy(<key selector>)
    .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<windowed transformation>(<window function>)

// sliding processing-time windows
input
    .keyBy(<key selector>)
    .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<windowed transformation>(<window function>)

// sliding processing-time windows offset by -8 hours
input
    .keyBy(<key selector>)
    .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
    .<windowed transformation>(<window function>)

時間間隔能夠經過使用一個指定Time.milliseconds(x)Time.seconds(x), Time.minutes(x),等等。

如最後一個示例所示,滑動窗口分配器還採用一個可選offset參數,參數可用於更改窗口的對齊方式。例如,在沒有偏移的狀況下,每小時滑動30分鐘的窗口將與整點對齊,即您將得到諸如 1:00:00.000 - 1:59:59.9991:30:00.000 - 2:29:59.999依此類推的窗口。若是更改,能夠提供一個偏移量。如15分鐘的偏移量,你會拿到 1:15:00.000 - 2:14:59.9991:45:00.000 - 2:44:59.999等的窗口,一個重要的用例的偏移是窗口調整到 UTC-0 時區,例如,在中國,您必須指定的偏移量Time.hours(-8)

會話窗口

在會話窗口中按活動會話分配器中的元素。與滾動窗口和滑動窗口相比,會話窗口不重疊且沒有固定的開始和結束時間。相反,當會話窗口在必定時間段內未收到元素時(即發生不活動間隙時),它將關閉。會話窗口分配器可與靜態配置會話間隙或與 會話間隙提取功能,其限定不活動週期有多長。當此時間段到期時,當前會話將關閉,隨後的元素將分配給新的會話窗口。

 

如下代碼段顯示瞭如何使用會話窗口。

val input: DataStream[T] = ...

// event-time session windows with static gap
input
    .keyBy(<key selector>)
    .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
    .<windowed transformation>(<window function>)

// event-time session windows with dynamic gap
input
    .keyBy(<key selector>)
    .window(EventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[String] {
      override def extract(element: String): Long = {
        // determine and return session gap
      }
    }))
    .<windowed transformation>(<window function>)

// processing-time session windows with static gap
input
    .keyBy(<key selector>)
    .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
    .<windowed transformation>(<window function>)


// processing-time session windows with dynamic gap
input
    .keyBy(<key selector>)
    .window(DynamicProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[String] {
      override def extract(element: String): Long = {
        // determine and return session gap
      }
    }))
    .<windowed transformation>(<window function>)

靜態間隙能夠經過使用 Time.milliseconds(x)Time.seconds(x), Time.minutes(x) 等中的一個來指定。

動態間隙是經過實現SessionWindowTimeGapExtractor接口指定的

注意:因爲會話窗口沒有固定的開始和結束,所以對它們的evaluated (決定窗口元素的歸屬)不一樣於滾動窗口和滑動窗口。在內部,會話窗口運算符會爲每一個到達的記錄建立一個新窗口,若是窗口彼此之間的距離比已定義的間隔小,則將它們合併在一塊兒。爲了可合併的,會話窗口操做者須要一個合併觸發器和一個合併 的窗函數,如ReduceFunctionAggregateFunction,或ProcessWindowFunction (FoldFunction不能合併。)

全局視窗

一個局性的窗口分配器分配使用相同的密鑰的全部元素到相同的單個全局窗口。僅當您還指定自定義觸發器時,此窗口方案纔有用。不然,將不會執行任何計算,由於全局窗口沒有能夠處理聚合元素的天然結束。

 

 

如下代碼段顯示瞭如何使用全局窗口。 

val input: DataStream[T] = ...

input
    .keyBy(<key selector>)
    .window(GlobalWindows.create())
    .<windowed transformation>(<window function>)

窗口函數

定義窗口分配器後,咱們須要指定要在每一個窗口上執行的計算。這是窗口函數的職責,一旦系統肯定某個窗口已準備好進行處理,就可使用該窗口的函數來處理每一個(多是鍵控)窗口的元素(請參閱觸發器,瞭解Flink如何肯定何時窗口準備好)。

窗口函數能夠是 ReduceFunctionAggregateFunctionFoldFunction 或 ProcessWindowFunction前兩個能夠更有效地執行(請參見「 狀態大小」部分),由於 Flink 能夠在每一個窗口元素到達時逐步地聚合它們 ProcessWindowFunction 獲取窗口中包含的全部元素的 Iterable ,以及有關元素所屬窗口的其餘元信息。

帶 ProcessWindowFunction 的窗口轉換不能像其餘狀況同樣有效地執行,由於Flink必須在調用函數以前在內部緩衝窗口的全部元素。這能夠經過組合 ProcessWindowFunction 與 ReduceFunctionAggregateFunction FoldFunction 來減輕(窗口狀態大小),(ReduceFunctionAggregateFunction FoldFunction得到窗口元素的增量聚合,附加ProcessWindowFunction 接收 咱們將看看這些變體的每一個示例。

Reduce功能

ReduceFunction 指定如何將輸入中的兩個元素組合在一塊兒以產生相同類型的輸出元素。Flink 使用 ReduceFunction 來逐步聚合窗口的元素。

ReduceFunction像這樣使用:

val input: DataStream[(String, Long)] = ...

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .reduce { (v1, v2) => (v1._1, v1._2 + v2._2) }

上面的示例彙總了窗口中全部元素的元組的第二個字段。

聚合函數

 AggregateFunction 是一個通常化版本 ReduceFunction,其具備三種類型:輸入類型(IN),累加器(ACC),和一個輸出類型(OUT)。輸入類型是輸入流中元素的類型,而且 AggregateFunction 具備將一個輸入元素添加到累加器的方法。該接口還具備建立初始累加器,將兩個累加器合併爲一個累加器以及 OUT(方法)從累加器提取輸出(類型)的方法。在下面的示例中,咱們將瞭解其工做原理。

與同樣ReduceFunction,Flink將在窗口輸入元素到達時逐步聚合它們。

一個AggregateFunction能夠被定義並這樣使用:

/**
 * The accumulator is used to keep a running sum and a count. The [getResult] method
 * computes the average.
 */
class AverageAggregate extends AggregateFunction[(String, Long), (Long, Long), Double] {
  override def createAccumulator() = (0L, 0L)

  override def add(value: (String, Long), accumulator: (Long, Long)) =
    (accumulator._1 + value._2, accumulator._2 + 1L)

  override def getResult(accumulator: (Long, Long)) = accumulator._1 / accumulator._2

  override def merge(a: (Long, Long), b: (Long, Long)) =
    (a._1 + b._1, a._2 + b._2)
}

val input: DataStream[(String, Long)] = ...

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .aggregate(new AverageAggregate)

上面的示例計算窗口中元素的第二個字段的平均值。

 Fold功能

FoldFunction指定如何將窗口的輸入元素與輸出類型的元素組合。 對於添加到窗口的每一個元素和當前輸出值,將遞增調用FoldFunction。 第一個元素將與預約義的輸出類型的初始值組合。

FoldFunction能夠定義像這樣使用:

val input: DataStream[(String, Long)] = ...

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .fold("") { (acc, v) => acc + v._2 }

上面的示例將全部輸入Long附加到最初爲空字符串中

 注意: fold()不能與會話窗口或其餘可合併窗口一塊兒使用。

 ProcessWindowFunction

 ProcessWindowFunction 獲取一個 Iterable,該Iterable包含窗口的全部元素,以及一個 Context 對象,該對象能夠訪問時間和狀態信息,從而使其比其餘窗口函數更具靈活性。這是以性能和資源消耗爲代價的,由於沒法增量聚合元素,而是須要在內部對其進行緩衝,直到將窗口視爲已準備好進行處理爲止。

ProcessWindowFunctionlook 的定義以下:

abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] extends Function {

  /**
    * Evaluates the window and outputs none or several elements.
    *
    * @param key      The key for which this window is evaluated.
    * @param context  The context in which the window is being evaluated.
    * @param elements The elements in the window being evaluated.
    * @param out      A collector for emitting elements.
    * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
    */
  def process(
      key: KEY,
      context: Context,
      elements: Iterable[IN],
      out: Collector[OUT])

  /**
    * The context holding window metadata
    */
  abstract class Context {
    /**
      * Returns the window that is being evaluated.
      */
    def window: W

    /**
      * Returns the current processing time.
      */
    def currentProcessingTime: Long

    /**
      * Returns the current event-time watermark.
      */
    def currentWatermark: Long

    /**
      * State accessor for per-key and per-window state.
      */
    def windowState: KeyedStateStore

    /**
      * State accessor for per-key global state.
      */
    def globalState: KeyedStateStore
  }

}

注意:key參數是經過提取被指定的keyBy()調用KeySelector 指定的若是是元組索引鍵或字符串字段引用,則始終使用此鍵類型,Tuple而且必須手動將其強制轉換爲正確大小的元組以提取鍵字段。

ProcessWindowFunction能夠這樣使用:

val input: DataStream[(String, Long)] = ...

input
  .keyBy(_._1)
  .timeWindow(Time.minutes(5))
  .process(new MyProcessWindowFunction())

/* ... */

class MyProcessWindowFunction extends ProcessWindowFunction[(String, Long), String, String, TimeWindow] {

  def process(key: String, context: Context, input: Iterable[(String, Long)], out: Collector[String]): () = {
    var count = 0L
    for (in <- input) {
      count = count + 1
    }
    out.collect(s"Window ${context.window} count: $count")
  }
}

該示例顯示了一個 ProcessWindowFunction 計算窗口中元素的數量。另外,窗口函數將有關窗口的信息添加到輸出中。

注意:請注意 ProcessWindowFunction 用於簡單的聚合(如count)效率很低。下一部分說明如何將 ReduceFunction 或 AggregateFunction 與 ProcessWindowFunction 結合使用,以同時得到增量聚合的 ProcessWindowFunction

具備增量聚合的ProcessWindowFunction

ProcessWindowFunction 可與 ReduceFunction、 AggregateFunction 或 FoldFunction組合爲它們在窗口到達元素作增量聚會,當窗口關閉時ProcessWindowFunction 將提供彙總結果。這使得它能夠遞增地計算窗口,同時能夠訪問ProcessWindowFunction的其餘窗口元信息。

注意:您也可使用舊版 WindowFunction 而不是 ProcessWindowFunction 用於增量窗口聚合。

具備ReduceFunction的增量窗口聚合

如下示例顯示瞭如何將增量 ReduceFunction ProcessWindowFunction組合,以返回窗口中的最小事件以及該窗口的開始時間。

val input: DataStream[SensorReading] = ...

input
  .keyBy(<key selector>)
  .timeWindow(<duration>)
  .reduce(
    (r1: SensorReading, r2: SensorReading) => { if (r1.value > r2.value) r2 else r1 },
    ( key: String,
      context: ProcessWindowFunction[_, _, _, TimeWindow]#Context,
      minReadings: Iterable[SensorReading],
      out: Collector[(Long, SensorReading)] ) =>
      {
        val min = minReadings.iterator.next()
        out.collect((context.window.getStart, min))
      }
  )

具備AggregateFunction的增量窗口聚合

如下示例顯示瞭如何將增量 AggregateFunction 與 ProcessWindowFunction 組合以計算平均值,並與平均值一塊兒輸出 key 和窗口。

val input: DataStream[(String, Long)] = ...

input
  .keyBy(<key selector>)
  .timeWindow(<duration>)
  .aggregate(new AverageAggregate(), new MyProcessWindowFunction())

// Function definitions

/**
 * The accumulator is used to keep a running sum and a count. The [getResult] method
 * computes the average.
 */
class AverageAggregate extends AggregateFunction[(String, Long), (Long, Long), Double] {
  override def createAccumulator() = (0L, 0L)

  override def add(value: (String, Long), accumulator: (Long, Long)) =
    (accumulator._1 + value._2, accumulator._2 + 1L)

  override def getResult(accumulator: (Long, Long)) = accumulator._1 / accumulator._2

  override def merge(a: (Long, Long), b: (Long, Long)) =
    (a._1 + b._1, a._2 + b._2)
}

class MyProcessWindowFunction extends ProcessWindowFunction[Double, (String, Double), String, TimeWindow] {

  def process(key: String, context: Context, averages: Iterable[Double], out: Collector[(String, Double)]): () = {
    val average = averages.iterator.next()
    out.collect((key, average))
  }
}

具備FoldFunction的增量窗口聚合

如下示例顯示瞭如何將增量 FoldFunction 與 ProcessWindowFunction 組合以提取窗口中的事件數,並還返回窗口的 key 和結束時間。

val input: DataStream[SensorReading] = ...

input
 .keyBy(<key selector>)
 .timeWindow(<duration>)
 .fold (
    ("", 0L, 0),
    (acc: (String, Long, Int), r: SensorReading) => { ("", 0L, acc._3 + 1) },
    ( key: String,
      window: TimeWindow,
      counts: Iterable[(String, Long, Int)],
      out: Collector[(String, Long, Int)] ) =>
      {
        val count = counts.iterator.next()
        out.collect((key, window.getEnd, count._3))
      }
  )

在 ProcessWindowFunction 中使用窗口狀態 

除了訪問鍵控狀態(如任何rich function 所容許的那樣),ProcessWindowFunction 還可使用鍵控狀態,該鍵控狀態的範圍僅限於該函數當前正在處理的窗口。在這種狀況下,重要的是要了解每一個窗口狀態所指的窗口是什麼。涉及不一樣的「窗口」:

  • 指定窗口操做時定義的窗口:這多是1小時的翻滾窗口或2小時的滑動窗口滑動1小時
  • 給定鍵的已定義窗口的實際實例:對於用戶ID xyz,這多是從12:00到13:00的時間窗口這是基於窗口定義的,而且根據做業當前正在處理的鍵的數量以及事件屬於哪一個時隙,會有不少窗口。

每一個窗口的狀態與這兩個中的後者相關。這意味着,若是咱們處理1000個不一樣鍵的事件,而且當前全部事件的事件都屬於[12:00,13:00)時間窗口,那麼將有1000個窗口實例,每一個實例具備各自的每一個窗口狀態。

調用Context對象上有兩種方法process()能夠訪問兩種狀態:

  • globalState(),它容許訪問不在窗口範圍內的鍵狀態
  • windowState(),它容許訪問也做用於窗口的鍵控狀態

若是您預期同一窗口會屢次觸發,則此功能頗有用,例如,對於遲到的數據有較早的觸發,或者您有進行推測性較早觸發的自定義觸發器時,可能會發生這種狀況。在這種狀況下,您將存儲有關先前觸發或每一個窗口狀態中觸發次數的信息。

使用窗口狀態時,清除窗口時也要清除該狀態,這一點很重要。這應該在clear()方法中發生

WindowFunction(舊版)

在某些 ProcessWindowFunction 可使用的地方,您也可使用WindowFunction這是舊版本 ProcessWindowFunction,提供較少的上下文信息,而且沒有某些高級功能,例如每一個窗口的鍵控狀態。該接口將在某個時候被棄用。

WindowFunction 的簽名以下:

trait WindowFunction[IN, OUT, KEY, W <: Window] extends Function with Serializable {

  /**
    * Evaluates the window and outputs none or several elements.
    *
    * @param key    The key for which this window is evaluated.
    * @param window The window that is being evaluated.
    * @param input  The elements in the window being evaluated.
    * @param out    A collector for emitting elements.
    * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
    */
  def apply(key: KEY, window: W, input: Iterable[IN], out: Collector[OUT])
}

能夠這樣使用:

val input: DataStream[(String, Long)] = ...

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .apply(new MyWindowFunction())

觸發器

Trigger 肯定窗口(由窗口分配器造成什麼時候調用窗口函數處理數據每一個 WindowAssigner 都有一個默認值 Trigger若是默認觸發器不符合您的需求,則可使用指定自定義觸發器 trigger(...)

觸發器接口具備五種方法,它們容許 Trigger 對不一樣事件作出反應:

  • onElement()對於添加到窗口中的每一個元素,都會調用方法。
  • onEventTime()當註冊的事件時間計時器觸發時,將調用方法。
  • onProcessingTime()當註冊的處理時間計時器觸發時,將調用方法。
  • 該 onMerge()方法與有狀態觸發器相關,而且在兩個觸發器的相應窗口合併時(例如在使用會話窗口時)合併兩個觸發器的狀態
  • 最後,在clear()方法在刪除相應的窗口後執行所需的任何操做。

關於上述方法,須要注意兩點:

1)前三個經過返回來決定如何對調用事件採起行動 TriggerResult該動做能夠是如下之一:

  • CONTINUE: 沒作什麼
  • FIRE:觸發​​計算
  • PURGE:清除窗口中的元素
  • FIRE_AND_PURGE:觸發​​計算並隨後清除窗口中的元素

2)這些方法中的任何一種均可以用於註冊處理或事件時間計時器以用於未來的操做。

Fire與Purge

一旦觸發器肯定窗口已準備好進行處理,它將觸發,返回 FIRE 或 FIRE_AND_PURGE這是窗口 operator 發出當前窗口結果的信號。給定一個包含 ProcessWindowFunction 全部元素的窗口(可能在將它們傳遞到驅逐器以後)傳遞給 ProcessWindowFunction帶有 ReduceFunctionAggregateFunction 或 Windows FoldFunction 僅發出其預計算的彙總結果。

觸發器觸發時,能夠是 FIRE 或 FIRE_AND_PURGEFIRE 的同時保留窗口內容,FIRE_AND_PURGE 刪除其內容。默認狀況下,預先實現的觸發器僅在 FIRE, 不清除窗口狀態的狀況下觸發計算

注意 PURGE 將僅刪除窗口的內容,並將保留有關該窗口的任何潛在元信息和任何觸發狀態。

WindowAssigners的默認觸發器

默認 Trigger 的 WindowAssigner 是適用於許多使用狀況。例如,全部事件時間窗口分配器都有 EventTimeTrigger 默認觸發器。一旦水印經過窗口的末端,該觸發器便會觸發。

注意:默認觸發器 GlobalWindow 是 NeverTrigger 永不觸發的。所以,使用 GlobalWindow 時,您始終必須定義一個自定義觸發器 

注意:經過使用指定觸發器 trigger()您將覆蓋的 WindowAssigner 默認觸發器 例如:若是您指定爲 CountTrigger,則 TumblingEventTimeWindows 您將再也不基於時間進度(意味着窗口結束的時候是不會觸發的,須要本身在觸發器裏面觸發),而是僅經過計數得到窗口觸發。如今,若是要基於時間和計數作出反應,則必須編寫本身的自定義觸發器。

內置和自定義觸發器

Flink帶有一些內置觸發器。

  • (已經提到)EventTimeTrigger 根據事件時間(由水印測量的進度觸發。
  • 在 ProcessingTimeTrigger fire 基於處理時間。
  • CountTrigger 一旦窗口中的元素數量超過給定的限制,就會觸發。
  • PurgingTrigger 將另外一個觸發器做爲參數,並將其轉換爲一個清除觸發器。

若是須要實現自定義觸發器,則應實現抽象類的 Trigger請注意,API仍在不斷髮展,並可能在Flink的將來版本中更改。

驅逐器

Flink 的窗口模型容許 除了 WindowAssigner 和  Trigger 以外還指定一個可選的 Evictor 可使用 evictor(...)方法完成(如本文檔開頭所示)驅逐器能夠從窗口中刪除元素,在觸發器觸發和 被施加的窗口函數執行以前(或以後)。爲此,該Evictor接口有兩種方法:

/**
 * Optionally evicts elements. Called before windowing function.
 *
 * @param elements The elements currently in the pane.
 * @param size The current number of elements in the pane.
 * @param window The {@link Window}
 * @param evictorContext The context for the Evictor
 */
void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);

/**
 * Optionally evicts elements. Called after windowing function.
 *
 * @param elements The elements currently in the pane.
 * @param size The current number of elements in the pane.
 * @param window The {@link Window}
 * @param evictorContext The context for the Evictor
 */
void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);

evictBefore()包含要在窗口函數以前應用的逐出邏輯,而evictAfter()包含要在窗口函數以後應用的逐出邏輯。 應用窗口函數以前逐出的元素將不會被其處理。

Flink附帶了三個預先實施的驅逐程序。這些是:

  • CountEvictor:從窗口中保留用戶指定數量的元素,並從窗口緩衝區的開頭丟棄其他的元素。
  • DeltaEvictor:使用 DeltaFunction 和 threshold,計算窗口緩衝區中最後一個元素與其他每一個元素之間的差值,並刪除差值大於或等於閾值的元素。
  • TimeEvictor:以 interval 毫秒爲單位做爲參數,對於給定的窗口,它將 max_ts 在其元素中找到最大時間戳,並刪除全部時間戳小於的元素 max_ts - interval

默認:默認狀況下,全部預先實現的驅逐程序均在窗口函數以前應用其邏輯。

注意:指定 evictor  可防止任何預彙集,由於在應用計算以前必須將窗口的全部元素傳遞給 evictor 。

注意Flink 不保證窗口內元素的順序這意味着,儘管 evictor  能夠從窗口的開頭刪除元素,但不必定是最早到達的元素。

容許延遲

在使用事件時間窗口時,可能會發生元素遲到的狀況,即  Flink 用於跟蹤事件時間進度的水印已經超過了元素所屬窗口的結束時間戳。請參閱 事件時間,尤爲是遲到元素,以更全面地討論 Flink 如何處理事件時間。

默認狀況下,當水印超過窗口末端時,將刪除晚期元素。可是,Flink容許爲窗口運算符指定最大容許延遲。容許延遲指定元素刪除以前能夠延遲的時間,其默認值爲0。在水印經過窗口末端以後但在經過窗口末端以前到達的元素加上容許延遲,仍添加到窗口中。根據使用的觸發器,延遲但未掉落的元素可能會致使窗口再次觸發。的狀況就是這樣EventTimeTrigger

爲了使此工做正常進行,Flink 保持窗口的狀態,直到容許的延遲過時爲止。一旦發生這種狀況,Flink 將刪除該窗口並刪除其狀態,如「 窗口生命週期」部分中所述。

默認:默認狀況下,容許的延遲設置爲 0。也就是說,到達水印後的元素將被丟棄。

您能夠這樣指定容許的延遲:

val input: DataStream[T] = ...

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .allowedLateness(<time>)
    .<windowed transformation>(<window function>)

注意:使用 GlobalWindows 窗口分配器時,永遠不會考慮任何數據遲到,由於全局窗口的結束時間戳爲Long.MAX_VALUE 

獲取遲到數據做爲側邊輸出

使用 Flink 的側邊輸出功能,您能夠獲取最近被丟棄的數據流。

首先,您須要指定要在窗口流上使用獲取遲到數據的 sideOutputLateData(OutputTag) 而後,您能夠根據窗口化操做的結果獲取側面輸出流:

val lateOutputTag = OutputTag[T]("late-data")

val input: DataStream[T] = ...

val result = input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .allowedLateness(<time>)
    .sideOutputLateData(lateOutputTag)
    .<windowed transformation>(<window function>)

val lateStream = result.getSideOutput(lateOutputTag)

遲到元素注意事項

當指定的容許延遲大於0時,在水印經過窗口末尾以後,將保留窗口及其內容。在這些狀況下,當延遲但未丟棄的元素到達時,可能會觸發該窗口的另外一次觸發。這些觸發稱爲late firings,由於它們是由遲到的事件觸發的,與之相反的main firing 是窗口的第一次觸發。在會話窗口的狀況下,後期觸發會進一步致使窗口合併,由於它們可能「彌合」兩個預先存在的未合併窗口之間的間隙。

注意:您應注意,遲到觸發發射的元素應被視爲先前計算(結果)的更新結果,即,您的數據流將包含同一計算的多個結果。根據您的應用程序,您須要考慮這些重複的結果或對它們進行刪除重複數據。

窗口結果上作處理

窗口化操做的結果仍是一個DataStream,結果操做元素中沒有保留任何有關窗口化操做的信息,所以,若是要保留有關窗口的元信息,則必須在ProcessWindowFunction的結果元素中手動編碼該信息。在結果元素上設置的惟一相關信息是元素時間戳。因爲窗口結束時間戳是互斥的,所以將其設置爲已處理窗口的最大容許時間戳,即結束時間戳-1。 請注意,對於事件時間窗口和處理時間窗口都是如此。 即在窗口操做元素以後始終具備時間戳,但這能夠是事件時間時間戳或處理時間時間戳。 對於處理時間窗口,這沒有特殊的含義,可是對於事件時間窗口,這連同水印與窗口的交互方式一塊兒,能夠以相同的窗口大小進行連續的窗口操做。 在查看水印如何與窗口交互以後咱們將進行介紹。

水印和窗戶的相互做用

在繼續本節以前,您可能須要看一下有關 事件時間和水印的部分

當水印到達窗口 operator 時,將觸發兩件事:

  • 水印會觸發全部最大時間戳(即end-timestamp-1)小於新水印的全部窗口的計算
  • 水印被(按原樣)轉發到下游操做

直觀地,一旦下游操做收到水印後,水印就會「flushes」全部在下游操做中被認爲是後期的窗口。

連續窗口操做

如前所述,計算開窗結果的時間戳的方式以及水印與窗口的交互方式容許將連續的開窗操做組合在一塊兒。當您要執行兩個連續的窗口化操做時,若是您想使用不一樣的鍵(意思是有使用 keyby,不是說兩個窗口使用不一樣的 keyby),但仍但願來自同一上游窗口的元素最終位於同一下游窗口中,此功能將很是有用。考慮如下示例: 

val input: DataStream[Int] = ...

val resultsPerKey = input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .reduce(new Summer())

val globalResults = resultsPerKey
    .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
    .process(new TopKWindowFunction())

在此示例中,在此示例中,第一個操做的時間窗口[0,5)的結果也將在隨後的窗口操做中的時間窗口[0,5)中結束。這容許計算每一個鍵的總和,而後在第二個操做中計算同一窗口內的前k個元素。 

有效的 state 規模考慮 

Windows 能夠定義很長時間(例如幾天,幾周或幾個月),所以會積累很大的狀態。在估算窗口計算的存儲需求時,須要牢記一些規則:

  1. Flink 爲每一個元素所屬的窗口建立一個副本。鑑於此,滾動窗口將保留每一個元素的一個副本(一個元素剛好屬於一個窗口,除非它被延遲放置)。相反,滑動窗口會爲每一個元素建立多個,如「 窗口分配器」部分所述。所以,大小爲1天的滑動窗口和滑動1秒的滑動窗口可能不是一個好主意。

  2. ReduceFunctionAggregateFunctionFoldFunction能夠大大減小存儲需求,由於它們提早地聚合元素而且每一個窗口僅存儲一個值。相反,使用 ProcessWindowFunction 須要累積全部元素。

  3. 使用 Evictor能夠防止任何預聚合,由於在應用計算以前,必須將窗口的全部元素傳遞給 Evictor(請參閱Evictor)。

 歡迎關注Flink菜鳥公衆號,會不按期更新Flink(開發技術)相關的推文

 

相關文章
相關標籤/搜索