Flink如何巧用WaterMark機制解決亂序問題

問:數據工程師最指望數據怎麼來?ide

答:按順序來。設計

 

MapReduce當初能用起來,就是由於Map階段對全部數據都進行排序了,後面的Reduce階段就能夠直接用排序好的數據了。blog

批處理的時候由於數據已經落地了,咱能夠慢慢排序。可是流式數據都是一條一條過來的,這個時候數據到達的時間和出發時的順序不一致會致使很是多的問題,這該咋整呢?排序

Sparkstreaming對亂序支持不好,由於它實際上是「微批」,不是真正的流。加州伯克利大學AMP實驗室設計Spark的時候,想的就是弄一個更快的計算引擎,壓根就沒打算作成來一條處理一條的流式數據處理。因此對於一些亂序數據根本就不太關心,因此致使Sparkstreaming不能或者不太能支持亂序數據的處理。事件

 

可是Flink不行啊,數據一條一條的過來,而後進行窗口處理,亂序會致使各類統計問題,這就得必須解決了。get


 

什麼是亂序it

一條數據在Flink裏,有三個時間:io

  • Event Time:事件產生的時間;class

  • Ingestion Time:事件進入Flink的時間;stream

  • Window Processing Time:事件被處理的時間。

 

 當數據一條一條規規矩矩的按流程發送,MQ傳輸,Flink接受而後處理,這個時候,就是有序的數據。

 

 當出現各類異常,有些數據延遲了,排在後面的數據跑前面去了,這就出現了亂序。

 

 請思考一下,咱們應該以哪一個時間戳斷定亂序呢?


 

Flink的WaterMark機制

亂序會致使各類統計上的問題。好比一個Time Window本應該計算一、二、3,結果3遲到了,那這個窗口統計就丟數據了。這可太坑了。

 

 爲了解決這個問題,Flink設置了一個三個機制來解決這個問題:

  • WaterMark--水位線,;

  • allowLateNess--數據遲到時間;

  • sideOutPut--超長遲到數據收集;

 

水位線的設置很簡單(系統時間爲準):

override def getCurrentWatermark(): Watermark = {       new Watermark(System.currentTimeMillis - 5000)

設置Watermark爲-5秒。可是怎麼理解這個-5秒的水位線呢?

 

常常戶外徒步的同窗應該知道一個徒步小隊一般會有一正兩副領隊,隊首隊尾各一個副隊,正隊長在隊伍中穿插協調。

隊尾的領隊叫後隊領隊,後隊領隊要保證全部隊員都在前面,也就是說後隊領隊是整個隊伍的隊尾,當收隊的時候,看見後隊領隊,那就說明整個隊伍都已經徹底到達了。

 

這個Watermark就至關於給整個數據流設置一個後隊領隊。可是窗口是不知道具體要來幾個數的,因此只能設置一個時間上的限制,以此來推測當前窗口最後一條數據是否已經到達。假設窗口大小爲10秒,Watermark爲-5秒,那麼他會作如下事情:

  • 每來一條數據,取當前窗口內全部數據的最大時間戳;

  • 用最大時間戳扣減Watermark後看看是否是符合窗口關閉條件;

  • 若是不符合,則繼續進數據;

  • 若是符合,則關閉窗口開始計算。

 

你看,多像戶外徒步?

  • 每來一我的,就問問出發時是幾號,而後確認全部已到隊員最大的號碼;

  • 用最大的號碼對比一下後隊領隊的號碼;

  • 若是比後隊領隊的號碼小,就不收隊;

  • 若是號碼大於等於後隊領隊號碼,就收隊。


 

遲到的數據

固然啊,即使是用了Watermark機制,依然還會存在遲到的數據。就像戶外徒步同樣,有人走錯路而後又遇上來。後隊領隊分明沒超過任何一個隊員,可是仍是有隊員落在後面了。

因此Flink還增設了三種應對方式:

  • allowLateNess--對於遲到一小會的數據,設置一個容許遲到時間;

  • sideOutPut--對於超過容許遲到時間的數據,所有收集起來,後續再處理;

  • 若是都不處理,Flink就默認自動丟棄。

 

也就是說,在watermark機制下,窗口雖然到了關閉時間,可是若是你設置了allowLateNess=10秒,那這個窗口還會再等10秒,看看是否還有他那個小隊的數據,10秒後窗口關閉,開始計算。

若是等了10秒還沒等到,11秒的時候,本來屬於該窗口的數據才姍姍來遲,那麼sideOutPut會把數據收集起來,放到側輸出流,等待後續處理。這個數據確定就不會在當前窗口計算進去了。

相關文章
相關標籤/搜索