Flink中watermark主要解決保序問題. 而保序問題的根本緣由是多個任務同時從流中並行處理數據,順序沒法保證. html
上游: 生成watermark
通常在WINDOW 操做以前生成WATERMARK, WATERMARK 有兩種:
AssignWithPeriodicWatermarks:
每隔N秒自動向流裏注入一個WATERMARK 時間間隔由ExecutionConfig.setAutoWatermarkInterval 決定. 每次調用getCurrentWatermark 方法, 若是獲得的WATERMARK 不爲空而且比以前的大就注入流中 (emitWatermark)
參考 TimestampsAndPeriodicWatermarksOperator.processElement 前端
AssignWithPunctuatedWatermarks:
基於事件向流裏注入一個WATERMARK,每個元素都有機會判斷是否生成一個WATERMARK. 若是獲得的WATERMARK 不爲空而且比以前的大就注入流中 (emitWatermark)
參考 TimestampsAndPunctuatedWatermarksOperator.processElement算法
每次生成WATERMARK將覆蓋流中已有的WATERMARK apache
下游: 處理watermark
StatusWatermarkValve 負責將不一樣Channel 的Watermark 對齊,再傳給pipeline 下游,對齊的概念是當前Channel的Watermark時間大於全部Channel最小的Watermark時間併發
WindowOperator 的處理:
WindowOperator.processElementide
實際觀察結果:測試
Window 觸發的條件
在 WindowOperator 中有兩個點會檢查窗口是否觸發,二者的檢查條件有所不一樣優化
processElement 這是在新的流數據進入時觸發
檢查條件: watermark時間 >= 窗口最大時間 參見 EventTimeTrigger.onElement
若是窗口不能被觸發則調用InteralTimeService.registerEventTimeTimer 註冊一個定時器,以KEY+窗口最大時間爲條件觸發, 到必定時間後定時器會被自動銷燬. 時間爲窗口最大時間+WindowOperator.allowedLateness WindowOperator.allowedLateness 能夠經過 Stream.window(...).allowedLateness(...) 設置. 通常應該略大於WatermarkGenerator 的 maxOutOfOrderness this
WATERMARK和普通數據分開處理
若是一個元素來的過晚 element.getTimestamp + allowedLateness < currentWatermark
會有一個特殊的OutputTag 和正常的流數據區分開
參考 https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/side_output.htmlspa
若是窗口來的過晚, window.maxTimestamp + allowedLateness < currentWatermark, 則窗口會被直接丟棄
Watermark 的問題:
默認的Watermark機制是數據驅動的,新的數據進入纔會觸發水位上升, 而因爲maxOutOfOrderness 的存在, watermark < 最大流數據時間 < 當前窗口結束時間
根據以前的分析,最新的時間窗口老是不會被觸發,除非更新的數據進入再次提升水位到當前窗口結束時間之後, 若是數據進入的頻率低或者沒有新的數據進入流,那最新的時間窗口被處理的延時會很是高甚至永遠不會被觸發,這在實時性要求高的流式系統是很致命的. 好比一個銀行系統,要作客戶帳號層面的保序,每一個帳號的交易可能一天只有幾筆甚至一筆,若是咱們在Window 處理的時候KEY BY 帳號就會引發上述問題. 咱們能夠考慮KEY BY的條件改成 HASH(帳號) 再取模,而後在窗口處理中再次根據帳號分組,這樣雖然處理複雜一些,可是保證了窗口中數據的頻次
另一種方案是優化WATERMARK生成的機制,若是一段時間後WATERMARK仍然沒有變化,那就將WATERMARK自動上漲一次到當前窗口的結束時間,這樣保證窗口處理的延時有個上限
public abstract class AbstractWatermarkGenerator<T> implements AssignerWithPeriodicWatermarks<T> { private static final long serialVersionUID = -2006930231735705083L; private static final Logger logger = LoggerFactory.getLogger(AbstractWatermarkGenerator.class); private final long maxOutOfOrderness; // 10 seconds private long windowSize; private long currentMaxTimestamp; private long lastTimestamp = 0; private long lastWatermarkChangeTime = 0; private long windowPurgeTime = 0; public AbstractWatermarkGenerator(long maxOutOfOrderness, long windowSize) { this.maxOutOfOrderness = maxOutOfOrderness; this.windowSize = windowSize; } public AbstractWatermarkGenerator() { this(10000, 10000); } protected abstract long extractCurTimestamp(T element) throws Exception; public long extractTimestamp(T element, long previousElementTimestamp) { try { long curTimestamp = extractCurTimestamp(element); lastWatermarkChangeTime = new Date().getTime(); currentMaxTimestamp = Math.max(curTimestamp, currentMaxTimestamp); windowPurgeTime = Math.max(windowPurgeTime, getWindowExpireTime(currentMaxTimestamp)); if (logger.isDebugEnabled()) { logger.debug("Extracting timestamp: {}", currentMaxTimestamp); } return curTimestamp; } catch (Exception e) { logger.error("Error extracting timestamp", e); } return 0; } protected long getWindowExpireTime(long currentMaxTimestamp) { long windowStart = TimeWindow.getWindowStartWithOffset(currentMaxTimestamp, 0, windowSize); long windowEnd = windowStart + windowSize; return windowEnd + maxOutOfOrderness; } public Watermark getCurrentWatermark() { long curTime = new Date().getTime(); if (currentMaxTimestamp > lastTimestamp) { if (logger.isDebugEnabled()) { logger.debug("Current max timestamp has been increased since last"); } lastTimestamp = currentMaxTimestamp; lastWatermarkChangeTime = curTime; } else { long diff = windowPurgeTime - currentMaxTimestamp; if (diff > 0 && curTime - lastWatermarkChangeTime > diff) { if (logger.isDebugEnabled()) { logger.debug("Increase current MaxTimestamp once"); } currentMaxTimestamp = windowPurgeTime; lastTimestamp = currentMaxTimestamp; lastWatermarkChangeTime = curTime; } } return new Watermark(currentMaxTimestamp - maxOutOfOrderness); } }
實際測試中發現 WATERMARK是否觸發和算子的併發度和WATERMARK生成的位置有關
測試結果以下:
因此注意WINDOW算子以前最好避免讓下游算子的併發度超過上游算子,不然就把WATERMARK的生成儘可能放到DAG的前端,這樣WATERMARK能夠被傳遞到下游算子