Flink(五)Flink的窗口和水印機制

1、概念

一、什麼是window

在流式數據中,數據是連續的,一般是無限的,對流中的全部元素進行計數是不可能的,因此在流上的聚合須要由window來劃定範圍,例如過去五分鐘內用戶瀏覽量的計算或者最後100個元素的和。window就是一種能夠把無限數據切割爲有限數據塊的手段。
窗口能夠由時間或者數量來作區分
1.根據時間進行截取,好比每10分鐘統計一次,即時間驅動的[Time Window]
2.根據消息數量進行統計,好比每100個數據統計一次,即數據驅動[Count Window]
Flink(五)Flink的窗口和水印機制windows

二、時間窗口的類型

時間窗口又分爲滾動窗口,滑動窗口,和會話窗口
(1)滾動窗口-tumbling windows
時間對齊,窗口長度固定,沒有重疊
Flink(五)Flink的窗口和水印機制
如圖:以固定的長度進行分割,好比一分鐘的內的計數網絡

開窗方法:session

//滾動窗口 stream.keyBy(0) 
.window(TumblingEventTimeWindows.of(Time.seconds(2))) 
.sum(1) 
.print();

(2)滑動窗口-sliding windows
時間對齊,窗口長度固定,有重疊,展示的是數據的變化趨勢
Flink(五)Flink的窗口和水印機制
如圖:窗口大小爲4,步長爲2,每隔兩秒統計僅4s的數據ide

開窗方法:oop

//滑動窗口 stream.keyBy(0) .window(SlidingProcessingTimeWindows.of(Time.seconds(6),Time.seconds(4)))
.sum(1) 
.print();

(3)會話窗口-session windows
Flink(五)Flink的窗口和水印機制
當流中達到多長時間沒有新的數據到來,上一個會話窗口就是截至到新數據到來前接收到的最後一條數據,當新數據到來後,上一個窗口將會關閉,開啓一個新的窗口。線程

開窗方法:翻譯

stream
    .keyBy(0) 
        .window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
        .sum(1) 
        .print();

(4)沒有窗口(全局窗口)-global windows
global window + trigger 一塊兒配合才能使用日誌

// 單詞每出現三次統計一次
stream
        .keyBy(0)
        .window(GlobalWindows.create()) //若是不加這個程序是啓動不起來的            
        .trigger(CountTrigger.of(3)) .sum(1) .print();

執行結果:
hello,3 
hello,6 
hello,9

總結:效果跟CountWindow(3)很像,但又有點不像,由於若是是CountWindow(3),單詞每次出現的都是3次,不會包含以前的次數,而咱們剛剛的這個每次都包含了以前的次數。code

2、 Flink流處理時間方式

針對stream數據中的時間,能夠分爲如下三種:
Event Time:事件產生的時間,它一般由事件中的時間戳描述。
Ingestion time:事件進入Flink的時間
Processing Time:事件被處理時當前系統的時間
Flink(五)Flink的窗口和水印機制blog

案例演示:
原始日誌以下

2019-11-11 10:00:01,134 INFO executor.Executor: Finished task in state 0.0

這條數據進入Flink的時間是2019-11-11 20:00:00,102
到達window處理的時間爲2019-11-11 20:00:01,100
2019-11-11 10:00:01,134 是Event time
2019-11-11 20:00:00,102 是Ingestion time
2019-11-11 20:00:01,100 是Processing time

思考:

若是咱們想要統計每分鐘內接口調用失敗的錯誤日誌個數,使用哪一個時間纔有意義?

3、事件的無序處理

一、Process Time Window(有序)

需求:每隔5秒計算最近10秒的單詞出現的次數
自定義source,模擬:第 13 秒的時候連續發送 2 個事件,第 16 秒的時候再發送 1 個事件

Flink(五)Flink的窗口和水印機制

輸出結果:

開始發送事件的時間:16:16:40 
(hadoop,2) 
(hadoop,3) 
(hadoop,1)

窗口驗證過程:
Flink(五)Flink的窗口和水印機制

二、Process Time Window(無序)

需求:每隔5秒計算最近10秒的單詞出現的次數
自定義source,模擬:第 13 秒的時候連續發送 2 個事件,第 16 秒的時候再發送 1 個事件
可是這裏在13秒的時候正常發送了一個事件,有一個事件因爲網絡等其餘緣由,沒有成功發送,在19秒的時候才發送出去。

輸出結果:

開始發送事件的時間:16:18:50
(hadoop,1)
(1573287543001,1)
(1573287543001,1) 
(hadoop,3) 
(1573287546016,1) 
(1573287543016,1) 
(1573287546016,1) 
(hadoop,2) 
(1573287543016,1)

Flink(五)Flink的窗口和水印機制

三、 使用Event Time處理無序

使用Event Time處理

在源數據流中map獲取時間時間
.assignTimestampsAndWatermarks(new EventTimeExtractor() )
EventTimeExtractor()實現AssignerWithPeriodicWatermarks接口,獲取事件時間
        //拿到第一個事件的 Event Time
        @Override
        public long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {
            return element.f1;
        }

執行結果:
start send event time:16:40:50
(hadoop,1)
(hadoop,3)
(hadoop,1)

如今咱們第三個window的結果已經計算準確了,可是咱們仍是沒有完全的解決問題(黃色事件應該在第一個窗口中計數,可是沒有)。接下來就須要咱們使用WaterMark機制來解決了。

Flink(五)Flink的窗口和水印機制

四、使用WaterMark機制解決無序

Flink(五)Flink的窗口和水印機制

核心實現是在窗口觸發的時候延時一段時間
        //window的觸發時間
        @Nullable
        @Override
        public Watermark getCurrentWatermark() {
            //window延遲5秒觸發
            return new Watermark(System.currentTimeMillis() - 5000);
        }

執行結果:
start send event time:16:54:30
(hadoop,2)
(hadoop,3)
(hadoop,1)

4、WaterMark機制

一、WaterMark的定義

使用eventTime的時候如何處理亂序數據?
在使用eventTime做爲處理時間的時候須要考慮亂序時間。
咱們知道,流處理從事件產生,到流經source,再到operator,中間是有一個過程和時間的。雖然大部分狀況下,流到operator的數據都是按照事件產生的時間順序來的,可是也不排除因爲網絡延遲等緣由,致使亂序的產生,特別是使用kafka的話,多個分區的數據沒法保證有序(單個分區是保證有序的)。因此在進行window計算的時候,咱們又不能無限期的等下去,必需要有個機制來保證一個特定的時間後,必須觸發window去進行計算了。
這個特別的機制,就是watermark,watermark是用於處理亂序事件的。watermark能夠
翻譯爲水位線

(1)有序的流的watermarks

Flink(五)Flink的窗口和水印機制

(2)無序的流的watermarks

Flink(五)Flink的窗口和水印機制

(3)多並行度流的watermarks

Flink(五)Flink的窗口和水印機制

二、WaterMark+Window處理亂序事件

需求:獲得並打印每隔 3 秒鐘統計前 3 秒內的相同的 key 的全部的事件
要求:按事件時間開窗並處理亂序問題
思路:
設置flink的時間處理機制爲enventTime,
事件流的事件進入後更新最新的事件時間,
最新的事件時間減掉容許最大的亂序時間爲水印時間,
當水印時間大於等於窗口時間時,計算當前窗口數據
核心思想:事件流時間推進窗口的移動和計算

Flink(五)Flink的窗口和水印機制

演示數據:

-- window 計算觸發的條件 
一條一條輸入:
000001,1461756862000 
000001,1461756866000 
000001,1461756872000 
000001,1461756873000 
000001,1461756874000 
000001,1461756876000 
000001,1461756877000

輸出結果:
event = (000001,1461756862000)|Event Time:2016-04-27 19:34:22|Max Event Time:2016-04-27 19:34:22|Current Watermark:2016-04-27 19:34:12
water mark...
event = (000001,1461756866000)|Event Time:2016-04-27 19:34:26|Max Event Time:2016-04-27 19:34:26|Current Watermark:2016-04-27 19:34:16
water mark...
event = (000001,1461756872000)|Event Time:2016-04-27 19:34:32|Max Event Time:2016-04-27 19:34:32|Current Watermark:2016-04-27 19:34:22
water mark...
event = (000001,1461756873000)|Event Time:2016-04-27 19:34:33|Max Event Time:2016-04-27 19:34:33|Current Watermark:2016-04-27 19:34:23
water mark...
event = (000001,1461756874000)|Event Time:2016-04-27 19:34:34|Max Event Time:2016-04-27 19:34:34|Current Watermark:2016-04-27 19:34:24
water mark...
event = (000001,1461756876000)|Event Time:2016-04-27 19:34:36|Max Event Time:2016-04-27 19:34:36|Current Watermark:2016-04-27 19:34:26
water mark...

process start time:2021-01-04 18:26:51
window  start time:2016-04-27 19:34:21
[(000001,1461756862000)|2016-04-27 19:34:22]
window  end  time:2016-04-27 19:34:24
water mark...
event = (000001,1461756877000)|Event Time:2016-04-27 19:34:37|Max Event Time:2016-04-27 19:34:37|Current Watermark:2016-04-27 19:34:27
water mark...
process start time:2021-01-04 18:26:52
window  start time:2016-04-27 19:34:24
[(000001,1461756866000)|2016-04-27 19:34:26]
window  end  time:2016-04-27 19:34:27

當2016-04-27 19:34:37的事件時間更新爲最大的currentMaxEventTime,此時得到的時間水印是2016-04-27 19:34:27,觸發窗口[2016-04-27 19:34:24—2016-04-27 19:34:27)的計算

再輸入:
000001,1461756885000
000001,1461756892000

輸出:
event = (000001,1461756885000)|Event Time:2016-04-27 19:34:45|Max Event Time:2016-04-27 19:34:45|Current Watermark:2016-04-27 19:34:35
water mark...
process start time:2021-01-04 18:27:05
window  start time:2016-04-27 19:34:30
[(000001,1461756872000)|2016-04-27 19:34:32]
window  end  time:2016-04-27 19:34:33
water mark...
event = (000001,1461756892000)|Event Time:2016-04-27 19:34:52|Max Event Time:2016-04-27 19:34:52|Current Watermark:2016-04-27 19:34:42
water mark...
process start time:2021-01-04 18:30:38
window  start time:2016-04-27 19:34:33
[(000001,1461756873000)|2016-04-27 19:34:33, (000001,1461756874000)|2016-04-27 19:34:34]
window  end  time:2016-04-27 19:34:36
process start time:2021-01-04 18:30:38
window  start time:2016-04-27 19:34:36
[(000001,1461756876000)|2016-04-27 19:34:36, (000001,1461756877000)|2016-04-27 19:34:37]
window  end  time:2016-04-27 19:34:39

新的事件進入後更新了最新的時間時間,觸發新的窗口計算

若是再輸入:
000001,1461756862000
輸出:
event = (000001,1461756862000)|Event Time:2016-04-27 19:34:22|Max Event Time:2016-04-27 19:34:52|Current Watermark:2016-04-27 19:34:42
能夠看到此時窗口時間已超過事件時間,會丟棄這個事件,不作處理

總結:window觸發的時間

  1. watermark時間 >= window_end_time
  2. 在 [window_start_time, window_end_time) 區間中有數據存在,注意是左閉右開的區間,並且是
    以 event time 來計算的

三、 遲到太多的事件處理

Flink(五)Flink的窗口和水印機制

(1)丟棄
這個是默認的處理方式
(2)allowedLateness
指定容許數據延遲的時間
核心思想:在容許最大延遲的基礎上再加一個容忍時間。

).assignTimestampsAndWatermarks(new EventTimeExtractor() ) 
.keyBy(0) 
.timeWindow(Time.seconds(3)) 
.allowedLateness(Time.seconds(2)) // 容許事件遲到 2 秒 
.process(new SumProcessWindowFunction()) 
.print().setParallelism(1);

輸入數據:
000001,1461756870000 
000001,1461756883000 

000001,1461756870000 
000001,1461756871000 
000001,1461756872000
000001,1461756884000 

000001,1461756870000 
000001,1461756871000 
000001,1461756872000 

000001,1461756885000 

000001,1461756870000 
000001,1461756871000 
000001,1461756872000

Flink(五)Flink的窗口和水印機制
Flink(五)Flink的窗口和水印機制
Flink(五)Flink的窗口和水印機制
Flink(五)Flink的窗口和水印機制
總結:
當咱們設置容許遲到 2 秒的事件,第一次 window 觸發的條件是 watermark >=
window_end_time;
第二次(或者屢次)觸發的條件是 watermark < window_end_time + allowedLateness。

(3)sideOutputLateData
收集遲到的數據

輸入:

000001,1461756870000 
000001,1461756883000 
遲到的數據 
000001,1461756870000 
000001,1461756871000 
000001,1461756872000

四、多並行度下的WaterMark

Flink(五)Flink的窗口和水印機制

一個window可能會接受到多個waterMark,咱們以最小的爲準。

//把並行度設置爲2 
env.setParallelism(2);

輸入數據:
000001,1461756870000 
000001,1461756883000 
000001,1461756888000

輸出結果:
當前線程ID:55event = (000001,1461756883000)|19:34:43|19:34:43|19:34:33 

當前線程ID:56event = (000001,1461756870000)|19:34:30|19:34:30|19:34:20 
當前線程ID:56event = (000001,1461756888000)|19:34:48|19:34:48|19:34:38 
處理時間:19:31:25 
window start time : 19:34:30 
2> [(000001,1461756870000)|19:34:30] 
window end time : 19:34:33

ID爲56的線程有兩個WaterMark:20,38那麼38這個會替代20,因此ID爲56的線程的WaterMark是38而後ID爲55的線程的WaterMark是33,而ID爲56是WaterMark是38,會在裏面求一個小的值做爲waterMark,就是33,這個時候會觸發Window爲[30-33)的窗口,那這個窗口裏面就有(000001,1461756870000)這條數據。

相關文章
相關標籤/搜索