Flink Window基本概念與實現原理

Window意爲窗口。在流處理系統中數據源源不斷流入到系統,咱們能夠逐條處理流入的數據,也能夠按必定規則一次處理流中的多條數據。當處理數據時程序須要知道何時開始處理、處理哪些數據。窗口提供了這樣一種依據,決定了數據什麼時候開始處理。windows

Flink內置Windowbash

Flink有3個內置Windowsession

  • 以事件數量驅動的Count Windowide

  • 以會話間隔驅動的Session Windowui

  • 以時間驅動的Time Windowthis

本文圍繞這3個內置窗口展開討論,咱們首先了解這3個窗口在運行時產生的現象,最後再討論它們的實現原理。spa

Count Window3d

計數窗口,採用事件數量做爲窗口處理依據。計數窗口分爲滾動和滑動兩類,使用keyedStream.countWindow實現計數窗口定義。代理

  • Tumbling Count Window 滾動計數窗口
    例子:以用戶分組,當每位用戶有3次付款事件時計算一次該用戶付款總金額。下圖中「消息A、B、C、D」表明4位不一樣用戶,咱們以A、B、C、D分組並計算金額。code

/** 每3個事件,計算窗口內數據 */
keyedStream.countWindow(3);
複製代碼


  • Sliding Count Window 滑動計數窗口
    例子:一位用戶每3次付款事件計算最近4次付款事件總金額。

/** 每3個事件,計算最近4個事件消息 */
keyedStream.countWindow(4,3);
複製代碼



Session Window

會話窗口,採用會話持續時長做爲窗口處理依據。設置指定的會話持續時長時間,在這段時間中再也不出現會話則認爲超出會話時長。


例子:每隻股票超過2秒沒有交易事件時計算窗口內交易總金額。下圖中「消息A、消息B」表明兩隻不一樣的股票。

/** 會話持續2秒。當超過2秒再也不出現會話認爲會話結束 */
keyedStream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(2)))複製代碼


Time Window


時間窗口,採用時間做爲窗口處理依據。時間窗分爲滾動和滑動兩類,使用keyedStream.timeWindow實現時間窗定義。

  • Tumbling Time Window 滾動時間窗口:

/** 每1分鐘,計算窗口數據 */
keyedStream.timeWindow(Time.minutes(1));複製代碼
  • Sliding Time Window 滑動時間窗口:

/** 每半分鐘,計算最近1分鐘窗口數據 */
keyedStream.timeWindow(Time.minutes(1), Time.seconds(30));
複製代碼


Flink Window組件

Flink Window使用3個組件協同實現了內置的3個窗口。經過對這3個組件不一樣的組合,能夠知足許多場景的窗口定義。


WindowAssigner組件爲數據分配窗口、Trigger組件決定如何處理窗口中的數據、藉助Evictor組件實現靈活清理窗口中數據時機。

WindowAssigner

當有數據流入到Window Operator時須要按照必定規則將數據分配給窗口,WindowAssigner爲數據分配窗口。下面代碼片斷是WindowAssigner部分定義,assignWindows方法定義返回的結果是一個集合,也就是說數據容許被分配到多個窗口中。

/*** WindowAssigner關鍵接口定義 ***/
public abstract class WindowAssigner<T, W extends Window> implements Serializable {

    /** 分配數據到窗口集合並返回 */
    public abstract Collection<W> assignWindows(T element, long timestamp, WindowAssignerContext context);
}複製代碼
Flink內置WindowAssigner

Flink針對不一樣窗口類型實現了相應的WindowAssigner。Flink 1.7.0繼承關係以下圖

Trigger

Trigger觸發器,它定義了3個觸發動做,而且定義了觸發動做處理完畢後的返回結果。返回結果交給Window Operator後由Window Operator決定後續操做。也就是說,Trigger經過具體的動做處理結果決定窗口是否應該被處理、被清除、被處理+清除、仍是什麼都不作。
/** Trigger關鍵接口定義 */
public abstract class Trigger<T, W extends Window> implements Serializable {

    /*** 新的數據進入窗口時觸發 ***/
    public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception;

    /*** 處理時間計數器觸發 ***/
    public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception;

    /*** 事件時間計數器觸發 ***/
    public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception;
}
複製代碼


當有數據流入Window Operator時會觸發onElement方法、當處理時間和事件時間生效時會觸發onProcessingTime和onEventTime方法。每一個觸發動做的返回結果用TriggerResult定義。

TriggerResult返回類型及說明

Trigger觸發運算後返回處理結果,處理結果使用TriggerResult枚舉表示。

public enum TriggerResult {
    CONTINUE,FIRE,PURGE,FIRE_AND_PURGE;
}複製代碼
Flink內置Trigger

Flink的內置窗口(Counter、Session、Time)有本身的觸發器實現。下表爲不一樣窗口使用的觸發器。


Evictor


Evictor驅逐者,若是定義了Evictor當執行窗口處理前會刪除窗口內指定數據再交給窗口處理,或等窗口執行處理後再刪除窗口中指定數據。

public interface Evictor<T, W extends Window> extends Serializable {
    /** 在窗口處理前刪除數據 */
    void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
    /** 在窗口處理後刪除數據 */
    void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
}複製代碼
Flink內置Evictor

實現原理

經過KeyedStream能夠直接建立Count Window和Time Window。他們最終都是基於window(WindowAssigner)方法建立,在window方法中建立WindowedStream實例,參數使用當前的KeyedStream對象和指定的WindowAssigner。

/** 依據WindowAssigner實例化WindowedStream */
public <W extends Window> WindowedStream<T, KEY, W> window(WindowAssigner<? super T, W> assigner) {
    return new WindowedStream<>(this, assigner);
}複製代碼
/** WindowedStream構造器 */
public WindowedStream(KeyedStream<T, K> input, WindowAssigner<? super T, W> windowAssigner) {
    this.input = input;
    this.windowAssigner = windowAssigner;
    this.trigger = windowAssigner.getDefaultTrigger(input.getExecutionEnvironment());
}
複製代碼

構造器執行完畢後,WindowedStream建立完成。構造器中初始化了3個屬性。默認狀況下trigger屬性使用WindowAssigner提供的DefaultTrigger做爲初始值。

同時,WindowedStream提供了trigger方法用來覆蓋默認的trigger。Flink內置的計數窗口就使用windowedStream.trigger方法覆蓋了默認的trigger。

public WindowedStream<T, K, W> trigger(Trigger<? super T, ? super W> trigger) {
    if (windowAssigner instanceof MergingWindowAssigner && !trigger.canMerge()) {
        throw new UnsupportedOperationException();
    }
    if (windowAssigner instanceof BaseAlignedWindowAssigner) {
        throw new UnsupportedOperationException();
    }
    this.trigger = trigger;
    return this;
}複製代碼

在WindowedStream中還有一個比較重要的屬性evictor,能夠經過evictor方法設置。

public WindowedStream<T, K, W> evictor(Evictor<? super T, ? super W> evictor) {
    if (windowAssigner instanceof BaseAlignedWindowAssigner) {
        throw new UnsupportedOperationException();
    }
    this.evictor = evictor;
    return this;
}複製代碼

WindowedStream實現中根據evictor屬性是否空(null == evictor)決定是建立WindowOperator仍是EvictingWindowOperator。EvictingWindowOperator繼承自WindowOperator,它主要擴展了evictor屬性以及相關的邏輯處理。

public class EvictingWindowOperator extends WindowOperator {
    private final Evictor evictor;
}複製代碼

Evictor定義了清理數據的時機。在EvictingWindowOperator的emitWindowContents方法中,實現了清理數據邏輯調用。這也是EvictingWindowOperator與WindowOperator的主要區別。「在WindowOperator中壓根就沒有evictor的概念」

private void emitWindowContents(W window, Iterable<StreamRecord<IN>> contents, ListState<StreamRecord<IN>> windowState) throws Exception {
    /** Window處理前數據清理 */
    evictorContext.evictBefore(recordsWithTimestamp, Iterables.size(recordsWithTimestamp));
    /** Window處理 */
    userFunction.process(triggerContext.key, triggerContext.window, processContext, projectedContents, timestampedCollector);
    /** Window處理後數據清理 */
    evictorContext.evictAfter(recordsWithTimestamp, Iterables.size(recordsWithTimestamp));
}
複製代碼

Count Window API

下面代碼片斷是KeyedStream提供建立Count Window的API。

/** 滾動計數窗口 */
public WindowedStream<T, KEY, GlobalWindow> countWindow(long size) {
    return window(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));
}

/** 滑動計數窗口 */
public WindowedStream<T, KEY, GlobalWindow> countWindow(long size, long slide) {
    return window(GlobalWindows.create())
        .evictor(CountEvictor.of(size))
        .trigger(CountTrigger.of(slide));
}
複製代碼

滾動計數窗口與滑動計數窗口有幾個差別

  • 入參不一樣

  • 滑動窗口使用了evictor組件

  • 二者使用的trigger組件不一樣

下面咱們對這幾點差別作深刻分析,看一看他們是如何影響滾動計數窗口和滑動計數窗口的。

Count Window Assigner
經過方法window(GlobalWindows.create())建立WindowedStream實例,滾動計數窗口處理和滑動計數窗口處理都是基於GlobalWindows做爲WindowAssigner來建立窗口處理器。GlobalWindows將全部數據都分配到同一個GlobalWindow中。「這裏須要注意GlobalWindows是一個WindowAssigner,而GlobalWindow是一個Window」
/** GlobalWindows是一個WindowAssigner實現,這裏只展現實現assignWindows的代碼片斷 */
public class GlobalWindows extends WindowAssigner<Object, GlobalWindow> {
    /** 返回一個GlobalWindow */
    public Collection<GlobalWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
        return Collections.singletonList(GlobalWindow.get());
    }
}
複製代碼

GlobalWindow繼承了Window,表示爲一個窗口。對外提供get()方法返回GlobalWindow實例,而且是個全局單例。因此當使用GlobalWindows做爲WindowAssigner時,全部數據將被分配到一個窗口中。

/** GlobalWindow是一個Window */
public class GlobalWindow extends Window {
    private static final GlobalWindow INSTANCE = new GlobalWindow();
    /** 永遠返回GlobalWindow單例 */
    public static GlobalWindow get() {
        return INSTANCE;
    }
}
複製代碼
Count Window Trigger
滾動計數窗口建立時使用PurgingTrigger.of(CountTrigger.of(size))覆蓋了GlobalWindows默認的Trigger,而滑動計數窗口建立時使用CountTrigger.of(size)覆蓋了GlobalWindows默認的Trigger。

PurgingTrigger是一個代理模式的Trigger實現,在計數窗口中PurgingTrigger代理了CountTrigger。

/** PurgingTrigger代理的Trigger */
private Trigger<T, W> nestedTrigger;
/** PurgingTrigger私有構造器 */
private PurgingTrigger(Trigger<T, W> nestedTrigger) {
    this.nestedTrigger = nestedTrigger;
}

/** 爲代理的Trigger構造一個PurgingTrigger實例 */
public static <T, W extends Window> PurgingTrigger<T, W> of(Trigger<T, W> nestedTrigger) {
    return new PurgingTrigger<>(nestedTrigger);
}
複製代碼

在這裏比較一下PurgingTrigger.onElement和CountTrigger.onElement方法實現,幫助理解PurgingTrigger的做用。

/** CountTrigger實現 */
public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
    ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
    count.add(1L);
    if (count.get() >= maxCount) {
        count.clear();
        return TriggerResult.FIRE;
    }
    return TriggerResult.CONTINUE;
}
/** PurgingTrigger實現 */
public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
    TriggerResult triggerResult = nestedTrigger.onElement(element, timestamp, window, ctx);
    return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult;
}
複製代碼

在CountTrigger實現中,當事件流入窗口後計數+1,以後比較窗口中事件數是否大於設定的最大數量,一旦大於最大數量返回FIRE。也就是說只處理窗口數據,不作清理。

在PurgingTrigger實現中,依賴CountTrigger的處理邏輯,但區別在於當CounterTrigger返回FIRE時PurgingTrigger返回FIRE_AND_PURGE。也就是說不只處理窗口數據,還作數據清理。經過這種方式實現了滾動計數窗口數據不重疊。

Count Window Evictor
滾動計數窗口和滑動計數窗口另外一個區別在於滑動計數窗口經過windowedStream.evictor(CountEvictor.of(size))方法設置了Evictor,而滾動窗口並無設置Evictor。

滑動計數窗口依賴Evictor組件在窗口處理前清除了指定數量之外的數據,再交給窗口處理。經過這種方式實現了窗口計算最近指定次數的事件數量。

總結

Time Window API

下面代碼片斷是KeyedStream中提供建立Time Window的API。

/** 建立滾動時間窗口 */
public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size) {
    if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
        return window(TumblingProcessingTimeWindows.of(size));
    } else {
        return window(TumblingEventTimeWindows.of(size));
    }
}
/** 建立滑動時間窗口 */
public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size, Time slide) {
    if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
        return window(SlidingProcessingTimeWindows.of(size, slide));
    } else {
        return window(SlidingEventTimeWindows.of(size, slide));
    }
}
複製代碼


建立TimeWindow時會根據Flink應用當前時間類型environment.getStreamTimeCharacteristic()來決定使用哪一個WindowAssigner建立窗口。

Flink對時間分紅了3類。處理時間、攝入時間、事件時間。使用TimeCharacteristic枚舉定義。

public enum TimeCharacteristic {
    /** 處理時間 */
    ProcessingTime,
    /** 攝入時間 */
    IngestionTime,
    /** 事件時間 */
    EventTime
}
複製代碼


對於Flink的3個時間概念,咱們目前只須要了解

  • 處理時間(TimeCharacteristic.ProcessingTime)就是運行Flink環境的系統時鐘產生的時間

  • 事件時間(TimeCharacteristic.EventTime)是業務上產生的時間,由數據自身攜帶

  • 攝入時間(TimeCharacteristic.IngestionTime)是數據進入到Flink的時間,它在底層實現上與事件時間相同。

Time Window Assigner

下面的表格中展現了窗口類型和時間類型對應的WindowAssigner的實現類

咱們以一個TumblingProcessingTimeWindows和一個SlidingEventTimeWindows爲例,討論它的實現原理。

TumblingProcessingTimeWindows
TumblingProcessingTimeWindows基於處理時間的滾動時間窗口分配器,它是一個WindowAssigner。Flink提供兩個接口初始化TumblingProcessingTimeWindows
public static TumblingProcessingTimeWindows of(Time size) {
    return new TumblingProcessingTimeWindows(size.toMilliseconds(), 0);
}
public static TumblingProcessingTimeWindows of(Time size, Time offset) {
    return new TumblingProcessingTimeWindows(size.toMilliseconds(), offset.toMilliseconds());
}
複製代碼


無論使用哪一種方式初始化TumblingProcessingTimeWindows,最終都會調用同一個構造方法初始化,構造方法初始化size和offset兩個屬性。

/** TumblingProcessingTimeWindows構造器 */
private TumblingProcessingTimeWindows(long size, long offset) {
    if (offset < 0 || offset >= size) {
        throw new IllegalArgumentException();
    }
    this.size = size;
    this.offset = offset;
}
複製代碼

TumblingProcessingTimeWindows是一個WindowAssigner,因此它實現了assignWindows方法來爲流入的數據分配窗口。

public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
    final long now = context.getCurrentProcessingTime();
    long start = TimeWindow.getWindowStartWithOffset(now, offset, size);
    return Collections.singletonList(new TimeWindow(start, start + size));
}
複製代碼

第一步assignWindows首先得到系統當前時間戳,context.getCurrentProcessingTime();最終實現實際是調用System.currentTimeMillis()。

第二步執行TimeWindow.getWindowStartWithOffset(now, offset, size);這個方法根據當前時間、偏移量、設置的間隔時間最終計算窗口起始時間。

第三步根據起始時間和結束時間建立一個新的窗口new TimeWindow(start, start + size)並返回。

好比,但願每10秒處理一次窗口數據keyedStream.timeWindow(Time.seconds(10))。當數據源源不斷的流入Window Operator時,它會按10秒切割一個時間窗。

咱們假設數據在2019年1月1日 12:00:07到達,那麼窗口如下面方式切割(請注意,窗口是左閉右開)。

Window[2019年1月1日 12:00:00, 2019年1月1日 12:00:10)複製代碼

若是在2019年1月1日 12:10:09又一條數據到達,窗口是這樣的

Window[2019年1月1日 12:10:00, 2019年1月1日 12:10:10)複製代碼

若是咱們但願從第15秒開始,每過1分鐘計算一次窗口數據,這種場景須要用到offset。基於處理時間的滾動窗口能夠這樣寫

keyedStream.window(TumblingProcessingTimeWindows.of(Time.minutes(1), Time.seconds(15)))複製代碼

咱們假設數據從2019年1月1日 12:00:14到達,那麼窗口如下面方式切割

Window[2019年1月1日 11:59:15, 2019年1月1日 12:00:15)複製代碼

若是在2019年1月1日 12:00:16又一數據到達,那麼窗口如下面方式切割

Window[2019年1月1日 12:00:15, 2019年1月1日 12:01:15)複製代碼

TumblingProcessingTimeWindows.assignWindows方法每次都會返回一個新的窗口,也就是說窗口是不重疊的。但由於TimeWindow實現了equals方法,因此經過計算後start, start + size相同的數據,在邏輯上是同一個窗口。

public boolean equals(Object o) {
    if (this == o) {
        return true;
    }
    if (o == null || getClass() != o.getClass()) {
        return false;
    }
    TimeWindow window = (TimeWindow) o;
    return end == window.end && start == window.start;
}
複製代碼
SlidingEventTimeWindows
SlidingEventTimeWindows基於事件時間的滑動時間窗口分配器,它是一個WindowAssigner。Flink提供兩個接口初始化SlidingEventTimeWindows
public static SlidingEventTimeWindows of(Time size, Time slide) {
    return new SlidingEventTimeWindows(size.toMilliseconds(), slide.toMilliseconds(), 0);
}
public static SlidingEventTimeWindows of(Time size, Time slide, Time offset) {
    return new SlidingEventTimeWindows(size.toMilliseconds(), slide.toMilliseconds(),offset.toMilliseconds() % slide.toMilliseconds());
}複製代碼

一樣,無論使用哪一種方式初始化SlidingEventTimeWindows,最終都會調用同一個構造方法初始化,構造方法初始化三個屬性size、slide和offset。

protected SlidingEventTimeWindows(long size, long slide, long offset) {
    if (offset < 0 || offset >= slide || size <= 0) {
        throw new IllegalArgumentException();
    }
    this.size = size;
    this.slide = slide;
    this.offset = offset;
}複製代碼

SlidingEventTimeWindows是一個WindowAssigner,因此它實現了assignWindows方法來爲流入的數據分配窗口。

public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
    if (timestamp > Long.MIN_VALUE) {
        List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
        long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);
        for (long start = lastStart; start > timestamp - size;start -= slide) {
            windows.add(new TimeWindow(start, start + size));
        }
        return windows;
    } else {
        throw new RuntimeException();
    }
}複製代碼

與基於處理時間的WindowAssigner不一樣,基於事件時間的WindowAssigner不依賴於系統時間,而是依賴於數據自己的事件時間。在assignWindows方法中第二個參數timestamp就是數據的事件時間。

第一步assignWindows方法會先初始化一個List<TimeWindow>,大小是size / slide。這個集合用來存放時間窗對象並做爲返回結果。

第二步執行TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);計算窗口起始時間。

第三步根據事件時間、滑動大小和窗口大小計算並生成數據能落入的窗口new TimeWindow(start, start + size),最後加入到List集合並返回。「由於是滑動窗口一個數據可能落在多個窗口」

好比,但願每5秒滑動一次處理最近10秒窗口數據keyedStream.timeWindow(Time.seconds(10), Time.seconds(5))。當數據源源不斷流入Window Operator時,會按10秒切割一個時間窗,5秒滾動一次。

咱們假設一條付費事件數據付費時間是2019年1月1日 17:11:24,那麼這個付費數據將落到下面兩個窗口中(請注意,窗口是左閉右開)。

Window[2019年1月1日 17:11:20, 2019年1月1日 17:11:30)
Window[2019年1月1日 17:11:15, 2019年1月1日 17:11:25)複製代碼
Time Windo w Trigger
Flink API在建立Time Window時沒有使用windowStream.trigger方法覆蓋默認Trigger。

TumblingProcessingTimeWindows使用ProcessingTimeTrigger做爲默認Trigger。ProcessingTimeTrigger在onElement的策略是永遠返回CONTINUE,也就是說它不會由於數據的流入觸發窗口計算和清理。在返回CONTINUE前調用registerProcessingTimeTimer(window.maxTimestamp());註冊一個定時器,而且邏輯相同學口只註冊一次,事件所在窗口的結束時間與系統當前時間差決定了定時器多久後觸發。

ScheduledThreadPoolExecutor.schedule(new TriggerTask(), timeEndTime - systemTime, TimeUnit.MILLISECONDS);
複製代碼

定時器一旦觸發會回調Trigger的onProcessingTime方法。ProcessingTimeTrigger中實現的onProcessingTime直接返回FIRE。也就是說系統時間大於等於窗口最大時間時,經過回調方式觸發窗口計算。但由於返回的是FIRE只是觸發了窗口計算,並無作清除。

SlidingEventTimeWindows使用EventTimeTrigger做爲默認Trigger。事件時間、攝入時間與處理時間在時間概念上有一點不一樣,處理時間處理依賴的是系統時鐘生成的時間,而事件時間和攝入時間依賴的是Watermark(水印)。咱們如今只須要知道水印是一個時間戳,能夠由Flink以固定的時間間隔發出,或由開發人員根據業務自定義。水印用來衡量處理程序的時間進展。

EventTimeTrigger的onElement方法中比較窗口的結束時間與當前水印時間,若是窗口結束時間已小於或等於當前水印時間當即返回FIRE。

「我的理解這是因爲時間差問題致使的窗口時間小於或等於當前水印時間,正常狀況下若是窗口結束時間已經小於水印時間則數據不會被處理,也不會調用onElement」

若是窗口結束時間大於當前水印時間,調用registerEventTimeTimer(window.maxTimestamp())註冊一個事件後直接返回CONTINUE。EventTime註冊事件沒有使用Scheduled,由於它依賴水印時間。因此在註冊時將邏輯相同的時間窗封裝爲一個特定對象添加到一個排重隊列,而且相同學口對象只添加一次。

上面提到水印是以固定時間間隔發出或由開發人員自定義的,Flink處理水印時從排重隊列頭獲取一個時間窗對象與水印時間戳比較,一旦窗口時間小於或等於水印時間回調trigger的onEventTime。

EventTimeTrigger中onEventTime並非直接返回FIRE,而是判斷窗口結束時間與獲取的時間窗對象時間作比較,僅當時間相同時才返回FIRE,其餘狀況返回CONTINUE。「我的理解這麼作是爲了知足滑動窗口的需求,由於滑動窗口在排重隊列中存在兩個不一樣的對象,而兩個窗口對象的時間可能同時知足回調條件」

Time Window Evictor

Flink內置Time Window實現沒有使用Evictor。

Session Window API

KeyedStream中沒有爲Session Window提供相似Count Windown和Time Window同樣能直接使用的API。咱們可使用window(WindowAssigner assigner)建立Session Window。

好比建立一個基於處理時間,時間間隔爲2秒的SessionWindow能夠這樣實現

keyedStream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(2)))複製代碼
Assigner
Flink內置的Session Window Assigner所有繼承MergingWindowAssigner。下圖展現了MergingWindowAssigner的上下結構關係。

MergingWindowAssigner繼承了WindowAssigner,因此它具有分配時間窗的能力。MergingWindowAssigner自身是一個能夠merge的Window,它的內部定義了一個mergeWindows抽象方法以及merge時的回調定義。

public abstract void mergeWindows(Collection<W> windows, MergeCallback<W> callback);

public interface MergeCallback<W> {
    void merge(Collection<W> toBeMerged, W mergeResult);
}複製代碼

咱們以ProcessingTimeSessionWindows爲例介紹Session Window。ProcessingTimeSessionWindows提供了一個靜態方法用來初始化ProcessingTimeSessionWindows

public static ProcessingTimeSessionWindows withGap(Time size) {
    return new ProcessingTimeSessionWindows(size.toMilliseconds());
}複製代碼

靜態方法withGap接收一個時間參數,用來描述時間間隔。並調用構造方法將時間間隔賦值給sessionTimeout屬性。

protected ProcessingTimeSessionWindows(long sessionTimeout) {
    if (sessionTimeout <= 0) {
        throw new IllegalArgumentException();
    }
    this.sessionTimeout = sessionTimeout;
}複製代碼

ProcessingTimeSessionWindows是一個WindowAssigner,因此它實現了數據分配窗口的能力。

public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
    long currentProcessingTime = context.getCurrentProcessingTime();
    return Collections.singletonList(new TimeWindow(currentProcessingTime, currentProcessingTime + sessionTimeout));
}複製代碼

ProcessingTimeSessionWindows會爲每一個數據都分配一個新的時間窗口。因爲是基於處理時間,因此窗口的起始時間就是系統當前時間,而結束時間是系統當前時間+設置的時間間隔。經過起始時間和結束時間肯定了窗口的時間範圍。

Trigger
若是在代碼中咱們不手動覆蓋Trigger,那麼將使用ProcessingTimeSessionWindows默認的ProcessingTimeTrigger
public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
    return ProcessingTimeTrigger.create();
}複製代碼

ProcessingTimeTrigger在基於處理時間的Time Window介紹過,它經過註冊、onProcessorTime回調方式觸發窗口計算,這裏再也不討論。

Evictor
Session Window不禁Flink API控制生成,徹底取決於客戶端如何建立。在建立Window實例後能夠經過調用evictor方法並傳入Flink內置的Evictor或本身實現的Evictor。
Merging
Session Window繼承MergingWindowAssigner,MergingWindowAssigner繼承WindowAssigner。因此本質上Session Window仍是一個WindowAssigner,但因繼承了MergingWindowAssigner使得本身具備了一個「能夠合併時間窗口」的特性。
public void mergeWindows(Collection<TimeWindow> windows, MergeCallback<TimeWindow> c) {
    TimeWindow.mergeWindows(windows, c);
}複製代碼

Session Window處理流程大體是這樣

  1. 使用WindowAssigner爲流入的數據分配窗口

  2. Merge窗口,將存在交集的窗口合併,取最小時間和最大時間做爲窗口的起始和關閉。假設有兩條數據流入系統後,經過WindowAssigner分配的窗口分別是
    數據A:Window[2019年1月1日 10:00:00, 2019年1月1日 10:20:00)
    數據B:Window[2019年1月1日 10:05:00, 2019年1月1日 10:25:00)
    通過合併後,使用數據A的起始時間和數據B的結束時間做爲節點,窗口時間變爲了
    [2019年1月1日 10:00:00, 2019年1月1日 10:25:00)

  3. 執行Trigger.onMerge,爲合併後的窗口註冊回調事件

  4. 移除其餘註冊的回調事件

  5. Window State合併

  6. 開始處理數據,執行Trigger.onElement
    …後續與其餘Window處理同樣

能夠看到,Session Window與Time Window相似,經過註冊回調方式觸發數據處理。但不一樣的是Session Window經過不斷爲新流入的數據作Merge操做來改變回調時間點,以實現Session Window的特性。

總結

  • Window Operator建立
    Window處理流程由WindowOperator或EvictingWindowOperator控制,他們的關係及區別體如今如下幾點

  1. EvictingWindowOperator繼承自WindowOperator,因此EvictingWindowOperator是一個WindowOperator,具有WindowOperator的特性。

  2. 清理窗口數據的機制不一樣,EvictingWindowOperator內部依賴Evictor組件,而WindowOperator內部不使用Evictor。這也致使它們兩個Operator初始化時的差別

  • MergeWindow特殊處理
    能夠合併窗口的WindowAssigner會繼承MergingWindowAssigner。當數據流入Window Operator後,根據WindowAssigner是否爲一個MergingWindowAssigner決定了處理流程。

  • 窗口生命週期
    Flink內置的窗口生命週期是不一樣的,下表描述了他們直接的差別

  • 側路輸出
    當Flink應用採用EventTime做爲時間機制時,Window不會處理延遲到達的數據,也就是說不處理在水印時間戳以前的數據。Flink提供了一個SideOutput機制能夠處理這些延遲到達的數據。經過WindowedStream.sideOutputLateData方法實現側路輸出。

  • 自定義窗口
    Flink內置窗口利用WindowAssigner、Trigger、Evictor3個組件的相互組合實現了多種很是強大的功能,咱們也能夠嘗試經過組件實現一個自定義的Window。因爲篇幅緣由,自定義窗口下篇再細聊。

做者:TalkingData 史天舒

相關文章
相關標籤/搜索