flink 多種類型窗口聚合理解

Flink 認爲 Batch 是 Streaming 的一個特例,因此 Flink 底層引擎是一個流式引擎,在上面實現了流處理和批處理。而窗口(window)就是從 Streaming 到 Batch 的一個橋樑。Flink 提供了很是完善的窗口機制,這是我認爲的 Flink 最大的亮點之一(其餘的亮點包括消息亂序處理,和 checkpoint 機制)。本文咱們將介紹流式處理中的窗口概念,介紹 Flink 內建的一些窗口和 Window API,最後討論下窗口在底層是如何實現的。windows

什麼是 Window
在流處理應用中,數據是接二連三的,所以咱們不可能等到全部數據都到了纔開始處理。固然咱們能夠每來一個消息就處理一次,可是有時咱們須要作一些聚合類的處理,例如:在過去的1分鐘內有多少用戶點擊了咱們的網頁。在這種狀況下,咱們必須定義一個窗口,用來收集最近一分鐘內的數據,並對這個窗口內的數據進行計算。session

窗口能夠是時間驅動的(Time Window,例如:每30秒鐘),也能夠是數據驅動的(Count Window,例如:每一百個元素)。一種經典的窗口分類能夠分紅:翻滾窗口(Tumbling Window,無重疊),滾動窗口(Sliding Window,有重疊),和會話窗口(Session Window,活動間隙)。併發


咱們舉個具體的場景來形象地理解不一樣窗口的概念。假設,淘寶網會記錄每一個用戶每次購買的商品個數,咱們要作的是統計不一樣窗口中用戶購買商品的總數。下圖給出了幾種經典的窗口切分概述圖:ide

上圖中,raw data stream 表明用戶的購買行爲流,圈中的數字表明該用戶本次購買的商品個數,事件是按時間分佈的,因此能夠看出事件之間是有time gap的。Flink 提供了上圖中全部的窗口類型,下面咱們會逐一進行介紹。函數

Time Window
就如名字所說的,Time Window 是根據時間對數據流進行分組的。這裏咱們涉及到了流處理中的時間問題,時間問題和消息亂序問題是緊密關聯的,這是流處理中現存的難題之一,咱們將在後續的 EventTime 和消息亂序處理中對這部分問題進行深刻探討。這裏咱們只須要知道 Flink 提出了三種時間的概念,分別是event time(事件時間:事件發生時的時間),ingestion time(攝取時間:事件進入流處理系統的時間),processing time(處理時間:消息被計算處理的時間)。Flink 中窗口機制和時間類型是徹底解耦的,也就是說當須要改變時間類型時不須要更改窗口邏輯相關的代碼。源碼分析

Tumbling Time Window
如上圖,咱們須要統計每一分鐘中用戶購買的商品的總數,須要將用戶的行爲事件按每一分鐘進行切分,這種切分被成爲翻滾時間窗口(Tumbling Time Window)。翻滾窗口能將數據流切分紅不重疊的窗口,每個事件只能屬於一個窗口。經過使用 DataStream API,咱們能夠這樣實現:性能

// Stream of (userId, buyCnt)
val buyCnts: DataStream[(Int, Int)] = ...優化

val tumblingCnts: DataStream[(Int, Int)] = buyCnts
  // key stream by userId
  .keyBy(0) 
  // tumbling time window of 1 minute length
  .timeWindow(Time.minutes(1))
  // compute sum over buyCnt
  .sum(1)
Sliding Time Window
可是對於某些應用,它們須要的窗口是不間斷的,須要平滑地進行窗口聚合。好比,咱們能夠每30秒計算一次最近一分鐘用戶購買的商品總數。這種窗口咱們稱爲滑動時間窗口(Sliding Time Window)。在滑窗中,一個元素能夠對應多個窗口。經過使用 DataStream API,咱們能夠這樣實現:this

val slidingCnts: DataStream[(Int, Int)] = buyCnts
  .keyBy(0) 
  // sliding time window of 1 minute length and 30 secs trigger interval
  .timeWindow(Time.minutes(1), Time.seconds(30))
  .sum(1)
Count Window
Count Window 是根據元素個數對數據流進行分組的。.net

Tumbling Count Window
當咱們想要每100個用戶購買行爲事件統計購買總數,那麼每當窗口中填滿100個元素了,就會對窗口進行計算,這種窗口咱們稱之爲翻滾計數窗口(Tumbling Count Window),上圖所示窗口大小爲3個。經過使用 DataStream API,咱們能夠這樣實現:

// Stream of (userId, buyCnts)
val buyCnts: DataStream[(Int, Int)] = ...

val tumblingCnts: DataStream[(Int, Int)] = buyCnts
  // key stream by sensorId
  .keyBy(0)
  // tumbling count window of 100 elements size
  .countWindow(100)
  // compute the buyCnt sum 
  .sum(1)
Sliding Count Window
固然Count Window 也支持 Sliding Window,雖在上圖中未描述出來,但和Sliding Time Window含義是相似的,例如計算每10個元素計算一次最近100個元素的總和,代碼示例以下。

val slidingCnts: DataStream[(Int, Int)] = vehicleCnts
  .keyBy(0)
  // sliding count window of 100 elements size and 10 elements trigger interval
  .countWindow(100, 10)
  .sum(1)
Session Window
在這種用戶交互事件流中,咱們首先想到的是將事件聚合到會話窗口中(一段用戶持續活躍的週期),由非活躍的間隙分隔開。如上圖所示,就是須要計算每一個用戶在活躍期間總共購買的商品數量,若是用戶30秒沒有活動則視爲會話斷開(假設raw data stream是單個用戶的購買行爲流)。Session Window 的示例代碼以下:

// Stream of (userId, buyCnts)
val buyCnts: DataStream[(Int, Int)] = ...
  
val sessionCnts: DataStream[(Int, Int)] = vehicleCnts
  .keyBy(0)
  // session window based on a 30 seconds session gap interval 
  .window(ProcessingTimeSessionWindows.withGap(Time.seconds(30)))
  .sum(1)
通常而言,window 是在無限的流上定義了一個有限的元素集合。這個集合能夠是基於時間的,元素個數的,時間和個數結合的,會話間隙的,或者是自定義的。Flink 的 DataStream API 提供了簡潔的算子來知足經常使用的窗口操做,同時提供了通用的窗口機制來容許用戶本身定義窗口分配邏輯。下面咱們會對 Flink 窗口相關的 API 進行剖析。

剖析 Window API
得益於 Flink Window API 鬆耦合設計,咱們能夠很是靈活地定義符合特定業務的窗口。Flink 中定義一個窗口主要須要如下三個組件。

Window Assigner:用來決定某個元素被分配到哪一個/哪些窗口中去。

以下類圖展現了目前內置實現的 Window Assigners:


Trigger:觸發器。決定了一個窗口什麼時候可以被計算或清除,每一個窗口都會擁有一個本身的Trigger。

以下類圖展現了目前內置實現的 Triggers:


Evictor:能夠譯爲「驅逐者」。在Trigger觸發以後,在窗口被處理以前,Evictor(若是有Evictor的話)會用來剔除窗口中不須要的元素,至關於一個filter。

以下類圖展現了目前內置實現的 Evictors:


上述三個組件的不一樣實現的不一樣組合,能夠定義出很是複雜的窗口。Flink 中內置的窗口也都是基於這三個組件構成的,固然內置窗口有時候沒法解決用戶特殊的需求,因此 Flink 也暴露了這些窗口機制的內部接口供用戶實現自定義的窗口。下面咱們將基於這三者探討窗口的實現機制。

Window 的實現
下圖描述了 Flink 的窗口機制以及各組件之間是如何相互工做的。

首先上圖中的組件都位於一個算子(window operator)中,數據流源源不斷地進入算子,每個到達的元素都會被交給 WindowAssigner。WindowAssigner 會決定元素被放到哪一個或哪些窗口(window),可能會建立新窗口。由於一個元素能夠被放入多個窗口中,因此同時存在多個窗口是可能的。注意,Window自己只是一個ID標識符,其內部可能存儲了一些元數據,如TimeWindow中有開始和結束時間,可是並不會存儲窗口中的元素。窗口中的元素實際存儲在 Key/Value State 中,key爲Window,value爲元素集合(或聚合值)。爲了保證窗口的容錯性,該實現依賴了 Flink 的 State 機制(參見 state 文檔)。

每個窗口都擁有一個屬於本身的 Trigger,Trigger上會有定時器,用來決定一個窗口什麼時候可以被計算或清除。每當有元素加入到該窗口,或者以前註冊的定時器超時了,那麼Trigger都會被調用。Trigger的返回結果能夠是 continue(不作任何操做),fire(處理窗口數據),purge(移除窗口和窗口中的數據),或者 fire + purge。一個Trigger的調用結果只是fire的話,那麼會計算窗口並保留窗口原樣,也就是說窗口中的數據仍然保留不變,等待下次Trigger fire的時候再次執行計算。一個窗口能夠被重複計算屢次知道它被 purge 了。在purge以前,窗口會一直佔用着內存。

當Trigger fire了,窗口中的元素集合就會交給Evictor(若是指定了的話)。Evictor 主要用來遍歷窗口中的元素列表,並決定最早進入窗口的多少個元素須要被移除。剩餘的元素會交給用戶指定的函數進行窗口的計算。若是沒有 Evictor 的話,窗口中的全部元素會一塊兒交給函數進行計算。

計算函數收到了窗口的元素(可能通過了 Evictor 的過濾),並計算出窗口的結果值,併發送給下游。窗口的結果值能夠是一個也能夠是多個。DataStream API 上能夠接收不一樣類型的計算函數,包括預約義的sum(),min(),max(),還有 ReduceFunction,FoldFunction,還有WindowFunction。WindowFunction 是最通用的計算函數,其餘的預約義的函數基本都是基於該函數實現的。

Flink 對於一些聚合類的窗口計算(如sum,min)作了優化,由於聚合類的計算不須要將窗口中的全部數據都保存下來,只須要保存一個result值就能夠了。每一個進入窗口的元素都會執行一次聚合函數並修改result值。這樣能夠大大下降內存的消耗並提高性能。可是若是用戶定義了 Evictor,則不會啓用對聚合窗口的優化,由於 Evictor 須要遍歷窗口中的全部元素,必需要將窗口中全部元素都存下來。

源碼分析
上述的三個組件構成了 Flink 的窗口機制。爲了更清楚地描述窗口機制,以及解開一些疑惑(好比 purge 和 Evictor 的區別和用途),咱們將一步步地解釋 Flink 內置的一些窗口(Time Window,Count Window,Session Window)是如何實現的。

Count Window 實現
Count Window 是使用三組件的典範,咱們能夠在 KeyedStream 上建立 Count Window,其源碼以下所示:

// tumbling count window
public WindowedStream<T, KEY, GlobalWindow> countWindow(long size) {
  return window(GlobalWindows.create())  // create window stream using GlobalWindows
      .trigger(PurgingTrigger.of(CountTrigger.of(size))); // trigger is window size
}
// sliding count window
public WindowedStream<T, KEY, GlobalWindow> countWindow(long size, long slide) {
  return window(GlobalWindows.create())
    .evictor(CountEvictor.of(size))  // evictor is window size
    .trigger(CountTrigger.of(slide)); // trigger is slide size
}
第一個函數是申請翻滾計數窗口,參數爲窗口大小。第二個函數是申請滑動計數窗口,參數分別爲窗口大小和滑動大小。它們都是基於 GlobalWindows 這個 WindowAssigner 來建立的窗口,該assigner會將全部元素都分配到同一個global window中,全部GlobalWindows的返回值一直是 GlobalWindow 單例。基本上自定義的窗口都會基於該assigner實現。

翻滾計數窗口並不帶evictor,只註冊了一個trigger。該trigger是帶purge功能的 CountTrigger。也就是說每當窗口中的元素數量達到了 window-size,trigger就會返回fire+purge,窗口就會執行計算並清空窗口中的全部元素,再接着儲備新的元素。從而實現了tumbling的窗口之間無重疊。

滑動計數窗口的各窗口之間是有重疊的,但咱們用的 GlobalWindows assinger 從始至終只有一個窗口,不像 sliding time assigner 能夠同時存在多個窗口。因此trigger結果不能帶purge,也就是說計算完窗口後窗口中的數據要保留下來(供下個滑窗使用)。另外,trigger的間隔是slide-size,evictor的保留的元素個數是window-size。也就是說,每一個滑動間隔就觸發一次窗口計算,並保留下最新進入窗口的window-size個元素,剔除舊元素。

假設有一個滑動計數窗口,每2個元素計算一次最近4個元素的總和,那麼窗口工做示意圖以下所示:

圖中所示的各個窗口邏輯上是不一樣的窗口,但在物理上是同一個窗口。該滑動計數窗口,trigger的觸發條件是元素個數達到2個(每進入2個元素就會觸發一次),evictor保留的元素個數是4個,每次計算完窗口總和後會保留剩餘的元素。因此第一次觸發trigger是當元素5進入,第三次觸發trigger是當元素2進入,並驅逐5和2,計算剩餘的4個元素的總和(22)併發送出去,保留下2,4,9,7元素供下個邏輯窗口使用。

Time Window 實現
一樣的,咱們也能夠在 KeyedStream 上申請 Time Window,其源碼以下所示:

// tumbling time window
public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size) {
  if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
    return window(TumblingProcessingTimeWindows.of(size));
  } else {
    return window(TumblingEventTimeWindows.of(size));
  }
}
// sliding time window
public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size, Time slide) {
  if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
    return window(SlidingProcessingTimeWindows.of(size, slide));
  } else {
    return window(SlidingEventTimeWindows.of(size, slide));
  }
}
在方法體內部會根據當前環境註冊的時間類型,使用不一樣的WindowAssigner建立window。能夠看到,EventTime和IngestTime都使用了XXXEventTimeWindows這個assigner,由於EventTime和IngestTime在底層的實現上只是在Source處爲Record打時間戳的實現不一樣,在window operator中的處理邏輯是同樣的。

這裏咱們主要分析sliding process time window,以下是相關源碼:

public class SlidingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {
  private static final long serialVersionUID = 1L;

  private final long size;

  private final long slide;

  private SlidingProcessingTimeWindows(long size, long slide) {
    this.size = size;
    this.slide = slide;
  }

  @Override
  public Collection<TimeWindow> assignWindows(Object element, long timestamp) {
    timestamp = System.currentTimeMillis();
    List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
    // 對齊時間戳
    long lastStart = timestamp - timestamp % slide;
    for (long start = lastStart;
      start > timestamp - size;
      start -= slide) {
      // 當前時間戳對應了多個window
      windows.add(new TimeWindow(start, start + size));
    }
    return windows;
  }
  ...
}
public class ProcessingTimeTrigger extends Trigger<Object, TimeWindow> {
  @Override
  // 每一個元素進入窗口都會調用該方法
  public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
    // 註冊定時器,當系統時間到達window end timestamp時會回調該trigger的onProcessingTime方法
    ctx.registerProcessingTimeTimer(window.getEnd());
    return TriggerResult.CONTINUE;
  }

  @Override
  // 返回結果表示執行窗口計算並清空窗口
  public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
    return TriggerResult.FIRE_AND_PURGE;
  }
  ...
}
首先,SlidingProcessingTimeWindows會對每一個進入窗口的元素根據系統時間分配到(size / slide)個不一樣的窗口,並會在每一個窗口上根據窗口結束時間註冊一個定時器(相同學口只會註冊一份),當定時器超時時意味着該窗口完成了,這時會回調對應窗口的Trigger的onProcessingTime方法,返回FIRE_AND_PURGE,也就是會執行窗口計算並清空窗口。整個過程示意圖以下:

如上圖所示橫軸表明時間戳(爲簡化問題,時間戳從0開始),第一條record會被分配到[-5,5)和[0,10)兩個窗口中,當系統時間到5時,就會計算[-5,5)窗口中的數據,並將結果發送出去,最後清空窗口中的數據,釋放該窗口資源。

--------------------- 

相關文章
相關標籤/搜索