在流式數據中,數據是連續的,一般是無限的,對流中的全部元素進行計數是不可能的,因此在流上的聚合須要由window來劃定範圍,例如過去五分鐘內用戶瀏覽量的計算或者最後100個元素的和。window就是一種能夠把無限數據切割爲有限數據塊的手段。
窗口能夠由時間或者數量來作區分
1.根據時間進行截取,好比每10分鐘統計一次,即時間驅動的[Time Window]
2.根據消息數量進行統計,好比每100個數據統計一次,即數據驅動[Count Window]windows
時間窗口又分爲滾動窗口,滑動窗口,和會話窗口
(1)滾動窗口-tumbling windows
時間對齊,窗口長度固定,沒有重疊
如圖:以固定的長度進行分割,好比一分鐘的內的計數網絡
開窗方法:session
//滾動窗口 stream.keyBy(0) .window(TumblingEventTimeWindows.of(Time.seconds(2))) .sum(1) .print();
(2)滑動窗口-sliding windows
時間對齊,窗口長度固定,有重疊,展示的是數據的變化趨勢
如圖:窗口大小爲4,步長爲2,每隔兩秒統計僅4s的數據ide
開窗方法:oop
//滑動窗口 stream.keyBy(0) .window(SlidingProcessingTimeWindows.of(Time.seconds(6),Time.seconds(4))) .sum(1) .print();
(3)會話窗口-session windows
當流中達到多長時間沒有新的數據到來,上一個會話窗口就是截至到新數據到來前接收到的最後一條數據,當新數據到來後,上一個窗口將會關閉,開啓一個新的窗口。線程
開窗方法:翻譯
stream .keyBy(0) .window(ProcessingTimeSessionWindows.withGap(Time.seconds(5))) .sum(1) .print();
(4)沒有窗口(全局窗口)-global windows
global window + trigger 一塊兒配合才能使用日誌
// 單詞每出現三次統計一次 stream .keyBy(0) .window(GlobalWindows.create()) //若是不加這個程序是啓動不起來的 .trigger(CountTrigger.of(3)) .sum(1) .print(); 執行結果: hello,3 hello,6 hello,9
總結:效果跟CountWindow(3)很像,但又有點不像,由於若是是CountWindow(3),單詞每次出現的都是3次,不會包含以前的次數,而咱們剛剛的這個每次都包含了以前的次數。code
針對stream數據中的時間,能夠分爲如下三種:
Event Time:事件產生的時間,它一般由事件中的時間戳描述。
Ingestion time:事件進入Flink的時間
Processing Time:事件被處理時當前系統的時間blog
案例演示:
原始日誌以下
2019-11-11 10:00:01,134 INFO executor.Executor: Finished task in state 0.0
這條數據進入Flink的時間是2019-11-11 20:00:00,102
到達window處理的時間爲2019-11-11 20:00:01,100
2019-11-11 10:00:01,134 是Event time
2019-11-11 20:00:00,102 是Ingestion time
2019-11-11 20:00:01,100 是Processing time
思考:
若是咱們想要統計每分鐘內接口調用失敗的錯誤日誌個數,使用哪一個時間纔有意義?
需求:每隔5秒計算最近10秒的單詞出現的次數
自定義source,模擬:第 13 秒的時候連續發送 2 個事件,第 16 秒的時候再發送 1 個事件
輸出結果:
開始發送事件的時間:16:16:40 (hadoop,2) (hadoop,3) (hadoop,1)
窗口驗證過程:
需求:每隔5秒計算最近10秒的單詞出現的次數
自定義source,模擬:第 13 秒的時候連續發送 2 個事件,第 16 秒的時候再發送 1 個事件
可是這裏在13秒的時候正常發送了一個事件,有一個事件因爲網絡等其餘緣由,沒有成功發送,在19秒的時候才發送出去。
輸出結果:
開始發送事件的時間:16:18:50 (hadoop,1) (1573287543001,1) (1573287543001,1) (hadoop,3) (1573287546016,1) (1573287543016,1) (1573287546016,1) (hadoop,2) (1573287543016,1)
使用Event Time處理
在源數據流中map獲取時間時間 .assignTimestampsAndWatermarks(new EventTimeExtractor() ) EventTimeExtractor()實現AssignerWithPeriodicWatermarks接口,獲取事件時間 //拿到第一個事件的 Event Time @Override public long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) { return element.f1; }
執行結果:
start send event time:16:40:50
(hadoop,1)
(hadoop,3)
(hadoop,1)
如今咱們第三個window的結果已經計算準確了,可是咱們仍是沒有完全的解決問題(黃色事件應該在第一個窗口中計數,可是沒有)。接下來就須要咱們使用WaterMark機制來解決了。
核心實現是在窗口觸發的時候延時一段時間 //window的觸發時間 @Nullable @Override public Watermark getCurrentWatermark() { //window延遲5秒觸發 return new Watermark(System.currentTimeMillis() - 5000); }
執行結果:
start send event time:16:54:30
(hadoop,2)
(hadoop,3)
(hadoop,1)
使用eventTime的時候如何處理亂序數據?
在使用eventTime做爲處理時間的時候須要考慮亂序時間。
咱們知道,流處理從事件產生,到流經source,再到operator,中間是有一個過程和時間的。雖然大部分狀況下,流到operator的數據都是按照事件產生的時間順序來的,可是也不排除因爲網絡延遲等緣由,致使亂序的產生,特別是使用kafka的話,多個分區的數據沒法保證有序(單個分區是保證有序的)。因此在進行window計算的時候,咱們又不能無限期的等下去,必需要有個機制來保證一個特定的時間後,必須觸發window去進行計算了。
這個特別的機制,就是watermark,watermark是用於處理亂序事件的。watermark能夠
翻譯爲水位線
需求:獲得並打印每隔 3 秒鐘統計前 3 秒內的相同的 key 的全部的事件
要求:按事件時間開窗並處理亂序問題
思路:
設置flink的時間處理機制爲enventTime,
事件流的事件進入後更新最新的事件時間,
最新的事件時間減掉容許最大的亂序時間爲水印時間,
當水印時間大於等於窗口時間時,計算當前窗口數據
核心思想:事件流時間推進窗口的移動和計算
演示數據:
-- window 計算觸發的條件 一條一條輸入: 000001,1461756862000 000001,1461756866000 000001,1461756872000 000001,1461756873000 000001,1461756874000 000001,1461756876000 000001,1461756877000 輸出結果: event = (000001,1461756862000)|Event Time:2016-04-27 19:34:22|Max Event Time:2016-04-27 19:34:22|Current Watermark:2016-04-27 19:34:12 water mark... event = (000001,1461756866000)|Event Time:2016-04-27 19:34:26|Max Event Time:2016-04-27 19:34:26|Current Watermark:2016-04-27 19:34:16 water mark... event = (000001,1461756872000)|Event Time:2016-04-27 19:34:32|Max Event Time:2016-04-27 19:34:32|Current Watermark:2016-04-27 19:34:22 water mark... event = (000001,1461756873000)|Event Time:2016-04-27 19:34:33|Max Event Time:2016-04-27 19:34:33|Current Watermark:2016-04-27 19:34:23 water mark... event = (000001,1461756874000)|Event Time:2016-04-27 19:34:34|Max Event Time:2016-04-27 19:34:34|Current Watermark:2016-04-27 19:34:24 water mark... event = (000001,1461756876000)|Event Time:2016-04-27 19:34:36|Max Event Time:2016-04-27 19:34:36|Current Watermark:2016-04-27 19:34:26 water mark... process start time:2021-01-04 18:26:51 window start time:2016-04-27 19:34:21 [(000001,1461756862000)|2016-04-27 19:34:22] window end time:2016-04-27 19:34:24 water mark... event = (000001,1461756877000)|Event Time:2016-04-27 19:34:37|Max Event Time:2016-04-27 19:34:37|Current Watermark:2016-04-27 19:34:27 water mark... process start time:2021-01-04 18:26:52 window start time:2016-04-27 19:34:24 [(000001,1461756866000)|2016-04-27 19:34:26] window end time:2016-04-27 19:34:27 當2016-04-27 19:34:37的事件時間更新爲最大的currentMaxEventTime,此時得到的時間水印是2016-04-27 19:34:27,觸發窗口[2016-04-27 19:34:24—2016-04-27 19:34:27)的計算 再輸入: 000001,1461756885000 000001,1461756892000 輸出: event = (000001,1461756885000)|Event Time:2016-04-27 19:34:45|Max Event Time:2016-04-27 19:34:45|Current Watermark:2016-04-27 19:34:35 water mark... process start time:2021-01-04 18:27:05 window start time:2016-04-27 19:34:30 [(000001,1461756872000)|2016-04-27 19:34:32] window end time:2016-04-27 19:34:33 water mark... event = (000001,1461756892000)|Event Time:2016-04-27 19:34:52|Max Event Time:2016-04-27 19:34:52|Current Watermark:2016-04-27 19:34:42 water mark... process start time:2021-01-04 18:30:38 window start time:2016-04-27 19:34:33 [(000001,1461756873000)|2016-04-27 19:34:33, (000001,1461756874000)|2016-04-27 19:34:34] window end time:2016-04-27 19:34:36 process start time:2021-01-04 18:30:38 window start time:2016-04-27 19:34:36 [(000001,1461756876000)|2016-04-27 19:34:36, (000001,1461756877000)|2016-04-27 19:34:37] window end time:2016-04-27 19:34:39 新的事件進入後更新了最新的時間時間,觸發新的窗口計算 若是再輸入: 000001,1461756862000 輸出: event = (000001,1461756862000)|Event Time:2016-04-27 19:34:22|Max Event Time:2016-04-27 19:34:52|Current Watermark:2016-04-27 19:34:42 能夠看到此時窗口時間已超過事件時間,會丟棄這個事件,不作處理
總結:window觸發的時間
(1)丟棄
這個是默認的處理方式
(2)allowedLateness
指定容許數據延遲的時間
核心思想:在容許最大延遲的基礎上再加一個容忍時間。
).assignTimestampsAndWatermarks(new EventTimeExtractor() ) .keyBy(0) .timeWindow(Time.seconds(3)) .allowedLateness(Time.seconds(2)) // 容許事件遲到 2 秒 .process(new SumProcessWindowFunction()) .print().setParallelism(1); 輸入數據: 000001,1461756870000 000001,1461756883000 000001,1461756870000 000001,1461756871000 000001,1461756872000 000001,1461756884000 000001,1461756870000 000001,1461756871000 000001,1461756872000 000001,1461756885000 000001,1461756870000 000001,1461756871000 000001,1461756872000
總結:
當咱們設置容許遲到 2 秒的事件,第一次 window 觸發的條件是 watermark >=
window_end_time;
第二次(或者屢次)觸發的條件是 watermark < window_end_time + allowedLateness。
(3)sideOutputLateData
收集遲到的數據
輸入:
000001,1461756870000 000001,1461756883000 遲到的數據 000001,1461756870000 000001,1461756871000 000001,1461756872000
一個window可能會接受到多個waterMark,咱們以最小的爲準。
//把並行度設置爲2 env.setParallelism(2); 輸入數據: 000001,1461756870000 000001,1461756883000 000001,1461756888000 輸出結果: 當前線程ID:55event = (000001,1461756883000)|19:34:43|19:34:43|19:34:33 當前線程ID:56event = (000001,1461756870000)|19:34:30|19:34:30|19:34:20 當前線程ID:56event = (000001,1461756888000)|19:34:48|19:34:48|19:34:38 處理時間:19:31:25 window start time : 19:34:30 2> [(000001,1461756870000)|19:34:30] window end time : 19:34:33
ID爲56的線程有兩個WaterMark:20,38那麼38這個會替代20,因此ID爲56的線程的WaterMark是38而後ID爲55的線程的WaterMark是33,而ID爲56是WaterMark是38,會在裏面求一個小的值做爲waterMark,就是33,這個時候會觸發Window爲[30-33)的窗口,那這個窗口裏面就有(000001,1461756870000)這條數據。