Flink 爲實時計算提供了三種時間,即事件時間(event time)、攝入時間(ingestion time)和處理時間(processing time)。在進行 window 計算時,使用攝入時間或處理時間的消息都是以系統的牆上時間(wall clocks)爲標準,所以事件都是按序到達的。然而若是使用更爲有意義的事件時間則會須要面對亂序事件問題(out-of-order events)和遲到事件問題(late events)。針對這兩個問題,Flink 主要採用了以水位線(watermark)爲核心的機制來應對。html
當基於事件時間的數據流進行窗口計算時,最爲困難的一點是如何肯定對應當前窗口的事件已盡所有到達。好比須要統計最近5分鐘打開音樂播放器的用戶數,服務端怎麼確保聚合計算時已經收集好全部用戶最近5分鐘的打開播放器日誌?事實上不存在能百分百準確判斷的方法,所以業界經常使用的方法是基於已經收集的消息來估算是否還有消息未到達,這就是水位線的思想。算法
水位線其實是一個時間戳,意義是早於該時間的消息已經徹底抵達計算引擎,即假設不會再有時間小於水位線的事件到達。這個假設是觸發窗口計算的基礎,只有水位線越過窗口對應的結束時間,窗口才會關閉和進行計算。apache
理想狀況下水位線應該與處理時間一致,而且處理時間與事件時間只相差常數時間甚至爲零,這意味着消息產生後立刻被處理。然而水位線的計算老是存在必定的延遲(見圖1),具體的延遲根據水位線實現的不一樣而也有所差異。Flink 提供了常規的按期水位線以及定製化的標點水位線兩種生成水位線的方式供用戶選擇。ide
按期水位線(Periodic Watermark)按照固定時間間隔生成新的水位線,不論是否有新的消息抵達。水位線提高的時間間隔是由用戶設置的,在兩次水位線提高時隔內會有一部分消息流入,用戶能夠根據這部分數據來計算出新的水位線。舉個例子,最簡單的水位線算法就是取目前爲止最大的事件時間,然而這種方式比較暴力,對亂序事件的容忍程度比較低,容易出現大量遲到事件。日誌
應用按期水位線須要實現AssignerWithPeriodicWatermarks
API,如下是 Flink 官網提供的按期水位線的實現例子。code
1htm 2blog 3事件 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks[MyEvent] { val maxOutOfOrderness = 3500L; // 3.5 seconds var currentMaxTimestamp: Long; override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = { val timestamp = element.getCreationTime() currentMaxTimestamp = max(timestamp, currentMaxTimestamp) timestamp; } override def getCurrentWatermark(): Watermark = { // return the watermark as current highest timestamp minus the out-of-orderness bound new Watermark(currentMaxTimestamp - maxOutOfOrderness); } } |
其中extractTimestamp
用於從消息中提取事件時間,而getCurrentWatermark
用於生成新的水位線,新的水位線只有大於當前水位線纔是有效的。每一個窗口都會有該類的一個實例,所以能夠利用實例的成員變量保存狀態,好比上例中的當前最大時間戳。
標點水位線(Punctuated Watermark)經過數據流中某些特殊標記事件來觸發新水位線的生成。這種方式下窗口的觸發與時間無關,而是決定於什麼時候收到標記事件。
應用標點水位線須要實現AssignerWithPunctuatedWatermarks
API,如下是 Flink 官網提供的標點水位線的實現例子。
1 2 3 4 5 6 7 8 9 10 |
class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[MyEvent] { override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = { element.getCreationTime } override def checkAndGetNextWatermark(lastElement: MyEvent, extractedTimestamp: Long): Watermark = { if (element.hasWatermarkMarker()) new Watermark(extractedTimestamp) else null } } |
其中extractTimestamp
用於從消息中提取事件時間,checkAndGetNextWatermark
用於檢查事件是否標點事件,如果則生成新的水位線。不一樣於按期水位線定時調用getCurrentWatermark
,標點水位線是每接受一個事件就須要調用checkAndGetNextWatermark
,若返回值非 null 且新水位線大於當前水位線,則觸發窗口計算。
雖然說水位線代表着早於它的事件不該該再出現,可是上如上文所講,接收到水位線之前的的消息是不可避免的,這就是所謂的遲到事件。實際上遲到事件是亂序事件的特例,和通常亂序事件不一樣的是它們的亂序程度超出了水位線的預計,致使窗口在它們到達以前已經關閉。
遲到事件出現時窗口已經關閉併產出了計算結果,所以處理的方法有3種:
Flink 默認的處理方式是第3種直接丟棄,其餘兩種方式分別使用Side Output
和Allowed Lateness
。
Side Output
機制能夠將遲到事件單獨放入一個數據流分支,這會做爲 window 計算結果的副產品,以便用戶獲取並對其進行特殊處理。
Allowed Lateness
機制容許用戶設置一個容許的最大遲到時長。Flink 會再窗口關閉後一直保存窗口的狀態直至超過容許遲到時長,這期間的遲到事件不會被丟棄,而是默認會觸發窗口從新計算。由於保存窗口狀態須要額外內存,而且若是窗口計算使用了 ProcessWindowFunction
API 還可能使得每一個遲到事件觸發一次窗口的全量計算,代價比較大,因此容許遲到時長不宜設得太長,遲到事件也不宜過多,不然應該考慮下降水位線提升的速度或者調整算法。