掌握Flink中三種經常使用的Time處理方式,掌握Flink中滾動窗口以及滑動窗口的使用,瞭解Flink中的watermark。html
Flink 在流處理工程中支持不一樣的時間概念。java
執行相應算子操做的機器的系統時間.git
當流程序在處理時間運行時,全部基於時間的 算子操做(如時間窗口)將使用運行相應算子的機器的系統時鐘。每小時處理時間窗口將包括在系統時鐘指示整個小時之間到達特定算子的全部記錄。程序員
例如,若是應用程序在上午9:15開始運行,則第一個每小時處理時間窗口將包括在上午9:15到上午10:00之間處理的事件,下一個窗口將包括在上午10:00到11:00之間處理的事件github
處理時間是最簡單的時間概念,不須要流和機器之間的協調apache
它提供最佳性能和最低延遲。可是,在分佈式和異步環境中,處理時間不提供肯定性,由於它容易受到記錄到達系統的速度(例如從消息隊列)到記錄在系統內的算子之間流動的速度的影響。和停電(調度或其餘)。編程
每一個單獨的事件在其生產設備上發生的時間.windows
此時間一般在進入Flink以前內置在記錄中,而且能夠從每一個記錄中提取該事件時間戳。異步
在事件時間,時間的進展取決於數據,而不是任何掛鐘。分佈式
事件時間程序必須指定如何生成事件時間水印,這是表示事件時間進度的機制.
在一個完美的世界中,事件時間處理將產生徹底一致和肯定的結果,不管事件什麼時候到達,或者順序.
可是,除非事件已知按順序到達(按時間戳),不然事件時間處理會在等待無序事件時產生一些延遲。因爲只能等待一段有限的時間,所以限制了肯定性事件時間應用程序的可能性。
假設全部數據都已到達,算子操做將按預期運行,即便在處理無序或延遲事件或從新處理歷史數據時也會產生正確且一致的結果。
例如,每小時事件時間窗口將包含帶有落入該小時的事件時間戳的全部記錄,不管它們到達的順序如何,或者什麼時候處理它們。(有關更多信息,請參閱有關遲發事件的部分。)
請注意,有時當事件時間程序實時處理實時數據時,它們將使用一些處理時間 算子操做,以確保它們及時進行。
事件進入Flink的時間.
在源算子處,每一個記錄將源的當前時間做爲時間戳,而且基於時間的算子操做(如時間窗口)引用該時間戳。
在概念上位於事件時間和處理時間之間。
在內部,攝取時間與事件時間很是類似,但具備自動時間戳分配和自動水印生成函數
Flink DataStream程序的第一部分一般設置基本時間特性
該設置定義了數據流源的行爲方式(例如,它們是否將分配時間戳),以及窗口 算子操做應該使用的時間概念,好比
KeyedStream.timeWindow(Time.seconds(30))。
如下示例顯示了一個Flink程序,該程序在每小時時間窗口中聚合事件。窗口的行爲適應時間特徵。
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // 可選的: // env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); // env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); DataStream<MyEvent> stream = env.addSource(new FlinkKafkaConsumer09<MyEvent>(topic, schema, props)); stream .keyBy( (event) -> event.getUser() ) .timeWindow(Time.hours(1)) .reduce( (a, b) -> a.add(b) ) .addSink(...);
val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) // alternatively: // env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) // env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val stream: DataStream[MyEvent] = env.addSource(new FlinkKafkaConsumer09[MyEvent](topic, schema, props)) stream .keyBy( _.getUser ) .timeWindow(Time.hours(1)) .reduce( (a, b) => a.add(b) ) .addSink(...)
請注意,爲了在事件時間運行此示例,程序須要使用直接爲數據定義事件時間的源並自行發出水印,或者程序必須在源以後注入時間戳分配器和水印生成器。這些函數描述瞭如何訪問事件時間戳,以及事件流表現出的無序程度。
Windows是處理無限流的核心。Windows將流拆分爲有限大小的「桶」,咱們能夠在其上應用計算。咱們重點介紹如何在Flink中執行窗口,以及程序員如何從其提供的函數中獲益最大化。
窗口Flink程序的通常結構以下所示
正如所看到的,惟一的區別是keyBy(...)呼籲Keys流和window(...)成爲windowAll(...)非被Key化的數據流。這也將做爲頁面其他部分的路線圖。
在上面,方括號(...)中的命令是可選的。這代表Flink容許您以多種不一樣方式自定義窗口邏輯,以便最適合您的需求。
簡而言之,只要應該屬於此窗口的第一個數據元到達,就會建立一個窗口,當時間(事件或處理時間)超過其結束時間戳加上用戶指定 時,窗口將被徹底刪除allowed lateness(請參閱容許的延遲))。Flink保證僅刪除基於時間的窗口而不是其餘類型,例如全局窗口(請參閱窗口分配器)。例如,使用基於事件時間的窗口策略,每5分鐘建立一個非重疊(或翻滾)的窗口,並容許延遲1分鐘,Flink將建立一個新窗口,用於間隔12:00和12:05當具備落入此間隔的時間戳的第一個數據元到達時,當水印經過12:06 時間戳時它將刪除它。
此外,每一個窗口將具備Trigger和一個函數(ProcessWindowFunction,ReduceFunction, AggregateFunction或FoldFunction)鏈接到它。該函數將包含要應用於窗口內容的計算,而Trigger指定窗口被認爲準備好應用該函數的條件。
觸發策略可能相似於「當窗口中的數據元數量大於4」時,或「當水印經過窗口結束時」。
觸發器還能夠決定在建立和刪除之間的任什麼時候間清除窗口的內容。在這種狀況下,清除僅指窗口中的數據元,而不是窗口元數據。這意味着仍然能夠將新數據添加到該窗口。
除了上述內容以外,您還能夠指定一個Evictor,它能夠在觸發器觸發後以及應用函數以前和/或以後從窗口中刪除數據元。
要指定的第一件事是您的流是否應該鍵入。必須在定義窗口以前完成此 算子操做。使用the keyBy(...)將您的無限流分紅邏輯被Key化的數據流。若是keyBy(...)未調用,則表示您的流不是被Keys化的。
對於被Key化的數據流,能夠將傳入事件的任何屬性用做鍵(此處有更多詳細信息)。擁有被Key化的數據流將容許您的窗口計算由多個任務並行執行,由於每一個邏輯被Key化的數據流能夠獨立於其他任務進行處理。引用相同Keys的全部數據元將被髮送到同一個並行任務。
在非被Key化的數據流的狀況下,您的原始流將不會被拆分爲多個邏輯流,而且全部窗口邏輯將由單個任務執行,即並行度爲1。
指定流是否已鍵入後,下一步是定義一個窗口分配器.
窗口分配器定義如何將數據元分配給窗口,這是經過WindowAssigner 在window(...)(對於被Keys化流)或windowAll()(對於非被Keys化流)調用中指定您的選擇來完成的
WindowAssigner
負責將每一個傳入數據元分配給一個或多個窗口
Flink帶有預約義的窗口分配器,用於最多見的用例,即
還能夠經過擴展WindowAssigner類來實現自定義窗口分配器。全部內置窗口分配器(全局窗口除外)都根據時間爲窗口分配數據元,這能夠是處理時間或事件時間。請查看咱們關於活動時間的部分,瞭解處理時間和事件時間之間的差別以及時間戳和水印的生成方式。
基於時間的窗口具備開始時間戳(包括)和結束時間戳(不包括),它們一塊兒描述窗口的大小。
在代碼中,Flink在使用TimeWindow基於時間的窗口時使用,該窗口具備查詢開始和結束時間戳的方法maxTimestamp()返回給定窗口的最大容許時間戳
下圖顯示了每一個分配者的工做狀況。紫色圓圈表示流的數據元,這些數據元由某個鍵(在這種狀況下是用戶1,用戶2和用戶3)劃分。x軸顯示時間的進度。
一個滾動窗口分配器的每一個數據元分配給指定的窗口的窗口大小。滾動窗口具備固定的尺寸,不重疊.
例如,若是指定大小爲5分鐘的翻滾窗口,則將評估當前窗口,而且每五分鐘將啓動一個新窗口,以下圖所示
如下代碼段顯示瞭如何使用滾動窗口。
DataStream<T> input = ...; // tumbling event-time windows input .keyBy(<key selector>) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .<windowed transformation>(<window function>); // tumbling processing-time windows input .keyBy(<key selector>) .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) .<windowed transformation>(<window function>); // daily tumbling event-time windows offset by -8 hours. input .keyBy(<key selector>) .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))) .<windowed transformation>(<window function>);
val input: DataStream[T] = ... // tumbling event-time windows input .keyBy(<key selector>) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .<windowed transformation>(<window function>) // tumbling processing-time windows input .keyBy(<key selector>) .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) .<windowed transformation>(<window function>) // daily tumbling event-time windows offset by -8 hours. input .keyBy(<key selector>) .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))) .<windowed transformation>(<window function>)
該滑動窗口分配器分配元件以固定長度的窗口。與滾動窗口分配器相似,窗口大小由窗口大小參數配置
附加的窗口滑動參數控制滑動窗口的啓動頻率。所以,若是幻燈片小於窗口大小,則滑動窗口能夠重疊。在這種狀況下,數據元被分配給多個窗口。
例如,您能夠將大小爲10分鐘的窗口滑動5分鐘。有了這個,你每隔5分鐘就會獲得一個窗口,其中包含過去10分鐘內到達的事件,以下圖所示。
如下代碼段顯示瞭如何使用滑動窗口
DataStream<T> input = ...; // 滑動 事件時間 窗口 input .keyBy(<key selector>) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .<windowed transformation>(<window function>); // 滑動 處理時間 窗口 input .keyBy(<key selector>) .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) .<windowed transformation>(<window function>); // daily tumbling event-time windows offset by -8 hours. input .keyBy(<key selector>) .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))) .<windowed transformation>(<window function>);
val input: DataStream[T] = ... // tumbling event-time windows input .keyBy(<key selector>) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .<windowed transformation>(<window function>) // tumbling processing-time windows input .keyBy(<key selector>) .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) .<windowed transformation>(<window function>) // daily tumbling event-time windows offset by -8 hours. input .keyBy(<key selector>) .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))) .<windowed transformation>(<window function>)
定義窗口分配器後,咱們須要指定要在每一個窗口上執行的計算。這是窗口函數的職責,窗口函數用於在系統肯定窗口準備好進行處理後處理每一個(多是被Keys化的)窗口的數據元
的窗函數能夠是一個ReduceFunction,AggregateFunction,FoldFunction或ProcessWindowFunction。前兩個能夠更有效地執行,由於Flink能夠在每一個窗口到達時遞增地聚合它們的數據元.
ProcessWindowFunction獲取Iterable窗口中包含的全部數據元以及有關數據元所屬窗口的其餘元信息。
具備ProcessWindowFunction的窗口轉換不能像其餘狀況同樣有效地執行,由於Flink必須在調用函數以前在內部緩衝窗口的全部數據元。這能夠經過組合來減輕ProcessWindowFunction與ReduceFunction,AggregateFunction或FoldFunction以得到兩個窗口元件的增量聚合而且該附加元數據窗口 ProcessWindowFunction接收。咱們將查看每一個變體的示例。
指定如何組合輸入中的兩個數據元以生成相同類型的輸出數據元.
Flink使用ReduceFunction來遞增地聚合窗口的數據元.
DataStream<Tuple2<String, Long>> input = ...; input .keyBy(<key selector>) .window(<window assigner>) .reduce(new ReduceFunction<Tuple2<String, Long>> { public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) { return new Tuple2<>(v1.f0, v1.f1 + v2.f1); } });
val input: DataStream[(String, Long)] = ... input .keyBy(<key selector>) .window(<window assigner>) .reduce { (v1, v2) => (v1._1, v1._2 + v2._2) }
原來傳遞進來的數據是字符串,此處咱們就使用數值類型,經過數值類型來演示增量的效果
這裏不是等待窗口全部的數據進行一次性處理,而是數據兩兩處理
7.2 聚合函數An AggregateFunction是一個通用版本,ReduceFunction它有三種類型:輸入類型(IN),累加器類型(ACC)和輸出類型(OUT)。輸入類型是輸入流中數據元的類型,而且AggregateFunction具備將一個輸入數據元添加到累加器的方法。該接口還具備用於建立初始累加器的方法,用於將兩個累加器合併到一個累加器中以及用於OUT從累加器提取輸出(類型)。咱們將在下面的示例中看到它的工做原理。
與之相同ReduceFunction,Flink將在窗口到達時遞增地聚合窗口的輸入數據元。
一個AggregateFunction能夠被定義並這樣使用:
/** * The accumulator is used to keep a running sum and a count. The {@code getResult} method * computes the average. */ private static class AverageAggregate implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> { @Override public Tuple2<Long, Long> createAccumulator() { return new Tuple2<>(0L, 0L); } @Override public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) { return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L); } @Override public Double getResult(Tuple2<Long, Long> accumulator) { return ((double) accumulator.f0) / accumulator.f1; } @Override public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) { return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1); } } DataStream<Tuple2<String, Long>> input = ...; input .keyBy(<key selector>) .window(<window assigner>) .aggregate(new AverageAggregate());
The accumulator is used to keep a running sum and a count. The [getResult] method \* computes the average. \*/ class AverageAggregate extends AggregateFunction[(String, Long), (Long, Long), Double] { override def createAccumulator() = (0L, 0L) override def add(value: (String, Long), accumulator: (Long, Long)) = (accumulator.\_1 + value.\_2, accumulator.\_2 + 1L) override def getResult(accumulator: (Long, Long)) = accumulator.\_1 / accumulator.\_2 override def merge(a: (Long, Long), b: (Long, Long)) = (a.\_1 + b.\_1, a.\_2 + b.\_2) } val input: DataStream[(String, Long)] = ... input .keyBy(<key selector>) .window(<window assigner>) .aggregate(new AverageAggregate)
ProcessWindowFunction獲取包含窗口的全部數據元的Iterable,以及可訪問時間和狀態信息的Context對象,這使其可以提供比其餘窗口函數更多的靈活性。這是以性能和資源消耗爲代價的,由於數據元不能以遞增方式聚合,而是須要在內部進行緩衝,直到窗口被認爲已準備好進行處理。
ProcessWindowFunction外觀簽名以下:
public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> implements Function { /** * Evaluates the window and outputs none or several elements. * * @param key The key for which this window is evaluated. * @param context The context in which the window is being evaluated. * @param elements The elements in the window being evaluated. * @param out A collector for emitting elements. * * @throws Exception The function may throw exceptions to fail the program and trigger recovery. */ public abstract void process( KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception; /** * The context holding window metadata. */ public abstract class Context implements java.io.Serializable { /** * Returns the window that is being evaluated. */ public abstract W window(); /** Returns the current processing time. */ public abstract long currentProcessingTime(); /** Returns the current event-time watermark. */ public abstract long currentWatermark(); /** * State accessor for per-key and per-window state. * * <p><b>NOTE:</b>If you use per-window state you have to ensure that you clean it up * by implementing {@link ProcessWindowFunction#clear(Context)}. */ public abstract KeyedStateStore windowState(); /** * State accessor for per-key global state. */ public abstract KeyedStateStore globalState(); } }
abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] extends Function { /** * Evaluates the window and outputs none or several elements. * * @param key The key for which this window is evaluated. * @param context The context in which the window is being evaluated. * @param elements The elements in the window being evaluated. * @param out A collector for emitting elements. * @throws Exception The function may throw exceptions to fail the program and trigger recovery. */ def process( key: KEY, context: Context, elements: Iterable[IN], out: Collector[OUT]) /** * The context holding window metadata */ abstract class Context { /** * Returns the window that is being evaluated. */ def window: W /** * Returns the current processing time. */ def currentProcessingTime: Long /** * Returns the current event-time watermark. */ def currentWatermark: Long /** * State accessor for per-key and per-window state. */ def windowState: KeyedStateStore /** * State accessor for per-key global state. */ def globalState: KeyedStateStore } }
該key參數是經過KeySelector爲keyBy()調用指定的Keys提取的Keys。在元組索引鍵或字符串字段引用的狀況下,此鍵類型始終是Tuple,您必須手動將其轉換爲正確大小的元組以提取鍵字段。
A ProcessWindowFunction能夠像這樣定義和使用:
DataStream<Tuple2<String, Long>> input = ...; input .keyBy(t -> t.f0) .timeWindow(Time.minutes(5)) .process(new MyProcessWindowFunction()); /* ... */ public class MyProcessWindowFunction extends ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow> { @Override public void process(String key, Context context, Iterable<Tuple2<String, Long>> input, Collector<String> out) { long count = 0; for (Tuple2<String, Long> in: input) { count++; } out.collect("Window: " + context.window() + "count: " + count); } }
val input: DataStream[(String, Long)] = ... input .keyBy(_._1) .timeWindow(Time.minutes(5)) .process(new MyProcessWindowFunction()) /* ... */ class MyProcessWindowFunction extends ProcessWindowFunction[(String, Long), String, String, TimeWindow] { def process(key: String, context: Context, input: Iterable[(String, Long)], out: Collector[String]): () = { var count = 0L for (in <- input) { count = count + 1 } out.collect(s"Window ${context.window} count: $count") } }
該示例顯示了ProcessWindowFunction對窗口中的數據元進行計數的狀況。此外,窗口函數將有關窗口的信息添加到輸出。
注意注意,使用ProcessWindowFunction簡單的聚合(例如count)是很是低效的