生成Timestamp和Watermark 的三個重載方法介紹可參見上一篇博客: Flink assignAscendingTimestamps 生成水印的三個重載方法html
以前想研究下Flink是怎麼處理亂序的數據,看了相關的源碼,加上測試,發現獲得了與預期徹底不相同的結果。java
預期是:亂序到達的數據,flink能夠基於數據的事件時間,自動整理數據,依次計算輸出ide
結果是:在assignTimestampsAndWatermarks(assigner: AssignerWithPeriodicWatermarks[T]): DataStream[T]指派timestamp和watermark的狀況下,亂序到達的數據:遲到的數據直接從側邊輸出了,超前的數據直接結束當前的窗口,開啓超前數據對應的窗口,後面到達的正常數據,直接做爲遲到數據處理了函數
在獲得上面的結果的過程當中,仔細的研究了一下生產Timestamp和Watermark相關的源碼。post
Flink DataStream API 目前只能經過 assignTimestampsAndWatermarks方法建立時間戳和水印有兩種生成模式:
性能
一、基於事件時間建立每一個事件的Timestamp 和 基於事件時間週期性的建立Watermark(默認週期爲200ms)測試
二、基於事件時間建立每一個事件的Timestamp 和 基於事件時間每一個事件都建立一個Watermark(若是新的Watermark大於當前的Watermark,纔會發出)this
事件時間下,事件的Timestamp的建立都是直接依賴於事件攜帶的事件時間,而Watermark則是基於事件時間生成Watermark,因此有周期性建立Watermark和標記的Watermark(With Punctuated Watermarks)的區分(官網中基於Kafka 的分區時間做爲Watermark 也是週期性的生成Watermark,只不過傳入的事件時間改成事件在kafka中的timestamp了) url
一、週期性的建立Watermarkspa
週期性的建立Watermark的有兩種方法(kafka的分區時間的忽略):
assignTimestamps(extractor: TimestampExtractor[T]): DataStream[T]
assignTimestampsAndWatermarks(assigner: AssignerWithPeriodicWatermarks[T]): DataStream[T]
1.1 assignTimestamps(extractor: TimestampExtractor[T]): DataStream[T] 對應源碼
調用方法以下:
.assignAscendingTimestamps(element => { // 方便打斷點debug println("xxxxxx : " + element.createTime) sdf.parse(element.createTime).getTime })
週期性的建立Watermark 是在 TimestampsAndPeriodicWatermarksOperator 中生成、發出,對應的時間來源是調用不一樣的生成timestamp 和 Watermark 的實現類
TimestampsAndPeriodicWatermarksOperator 相應代碼以下:
/* 處理事件元素: 獲取對應的事件時間的時間戳,替換事件默認的時間戳(若是數據源是kafka,時間戳就是數據在kafka中的時間戳) */ @Override public void processElement(StreamRecord<T> element) throws Exception { final long newTimestamp = userFunction.extractTimestamp(element.getValue(), element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE); output.collect(element.replace(element.getValue(), newTimestamp)); } /* 處理時間(Watermark) : 獲取當前時間對應的上一次的事件時間,生成新的watermark,新的watermark的時間戳大於當前的watermark,就發出新的watermark */ @Override public void onProcessingTime(long timestamp) throws Exception { // 從這裏能夠看到,每200ms 打印一次 System.out.println("timestamp : " + timestamp + ", system.current : " + System.currentTimeMillis()); // register next timer Watermark newWatermark = userFunction.getCurrentWatermark(); if (newWatermark != null && newWatermark.getTimestamp() > currentWatermark) { currentWatermark = newWatermark.getTimestamp(); // emit watermark output.emitWatermark(newWatermark); } long now = getProcessingTimeService().getCurrentProcessingTime();
// 註冊timer ,週期性的調用,下面會展開 getProcessingTimeService().registerTimer(now + watermarkInterval, this); }
在這種生成timestamp 和 Watermark 的狀況下,userFunction 對應的類是:AscendingTimestampExtractor
對應源碼以下:
@Override public final long extractTimestamp(T element, long elementPrevTimestamp) { // 調用 assignAscendingTimestamps 的參數函數 final long newTimestamp = extractAscendingTimestamp(element); if (newTimestamp >= this.currentTimestamp) {
// 這是爲了下面生成Watermark的方法,總能獲得 大於等於 當前Watermark的 時間戳 this.currentTimestamp = newTimestamp; return newTimestamp; } else { violationHandler.handleViolation(newTimestamp, this.currentTimestamp); return newTimestamp; } } @Override public final Watermark getCurrentWatermark() { return new Watermark(currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1); }
timestamp的生成: TimestampsAndPeriodicWatermarksOperator#processElement 方法,調用AscendingTimestampExtractor#extractTimestamp 再調用 用戶代碼中具體生成timestamp 的方法,最終生成事件對應的timestamp,替換原有的timestamp
Watermark的生成:TimestampsAndPeriodicWatermarksOperator#onProcessingTime 方法,調用 AscendingTimestampExtractor#getCurrentWatermark, 返回生成timestamp 時的 currentTimestamp -1 ,生成 Watermark,若是生成的Watermark的timestamp 大於當前的 Watermark的timestamp 就發出新的Watermark
1.2 assignTimestampsAndWatermarks(assigner: AssignerWithPeriodicWatermarks[T]): DataStream[T]
調用方法以下:
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[LateDataEvent](Time.milliseconds(50)) { override def extractTimestamp(element: LateDataEvent): Long = { println("current timestamp : " + sdf.parse(element.createTime).getTime) sdf.parse(element.createTime).getTime } })
在這種生成timestamp 和 Watermark 的狀況下,userFunction 對應的類是:BoundedOutOfOrdernessTimestampExtractor
對應源碼
@Override public final Watermark getCurrentWatermark() { // this guarantees that the watermark never goes backwards. long potentialWM = currentMaxTimestamp - maxOutOfOrderness; if (potentialWM >= lastEmittedWatermark) { lastEmittedWatermark = potentialWM; } return new Watermark(lastEmittedWatermark); } @Override public final long extractTimestamp(T element, long previousElementTimestamp) { long timestamp = extractTimestamp(element); if (timestamp > currentMaxTimestamp) {
// 這是爲了上面上次Watermark的方法總能獲取到 大於等於 當前Watermark的時間戳 currentMaxTimestamp = timestamp; } return timestamp; }
基本上與上面相同,只是這種狀況下,生成Watermark會 減去相應的 maxOutOfOrderness (容許延遲時間,就是代碼中BoundedOutOfOrdernessTimestampExtractor對應的參數)
之因此說是週期性的,是由於生成Watermark的方法是週期性調用的:
// 註冊timer 按期執行
getProcessingTimeService().registerTimer(now + watermarkInterval, this); // 對應 watermarkInterval 來自與系統配置 watermarkInterval = getExecutionConfig().getAutoWatermarkInterval(); // 對應配置, 默認 200ms env.getConfig.setAutoWatermarkInterval(400)
看代碼可知,生成timestamp和Watermark是兩條線,timestamp 是每一個事件消息都會生成,而Watermark 是週期的
二、標記的Watermark(With Punctuated Watermarks)
這種Watermark的生成只有一種,對應代碼以下:
.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks[LateDataEvent]() { // check extractTimestamp emitted watermark is non-null and large than previously 生成當前事件的Watermark override def checkAndGetNextWatermark(lastElement: LateDataEvent, extractedTimestamp: Long): Watermark = { new Watermark(extractedTimestamp) } // generate next watermark 生成當前事件的timestamp override def extractTimestamp(element: LateDataEvent, previousElementTimestamp: Long): Long = { val eventTime = sdf.parse(element.createTime).getTime eventTime } })
對應上面生成添加時間戳到事件中和發出Watermark 在 TimestampsAndPunctuatedWatermarksOperator中具體以下:
@Override public void processElement(StreamRecord<T> element) throws Exception { final T value = element.getValue(); // extractTimestamp 方法就是assignTimestampsAndWatermarks 中的 extractTimestamp 生成事件的時間戳 final long newTimestamp = userFunction.extractTimestamp(value, element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE); output.collect(element.replace(element.getValue(), newTimestamp)); // checkAndGetNextWatermark 方法就是assignTimestampsAndWatermarks 中的 checkAndGetNextWatermark,檢查Watermark final Watermark nextWatermark = userFunction.checkAndGetNextWatermark(value, newTimestamp);
// 新的Watermark大於當前的Watermark纔會發出 if (nextWatermark != null && nextWatermark.getTimestamp() > currentWatermark) { currentWatermark = nextWatermark.getTimestamp(); output.emitWatermark(nextWatermark); } }
這裏能夠看出,每條數據都會生成 tiemstamp 和 Watermark(不必定會發出,若是數據都是正常的,Watermark的消息會和事件的消息同樣多,因此會影響性能)
搞定。
歡迎關注Flink菜鳥公衆號,會不按期更新Flink(開發技術)相關的推文