以前一直用翻滾窗口,每條數據都只屬於一個窗口,全部不須要考慮數據須要在多個窗口存的事情。node
恰好有個需求,要用到滑動窗口,來翻翻 flink 在滑動窗口中,數據是怎麼分配到多個窗口的windows
一段簡單的測試代碼:ide
val input = env.addSource(kafkaSource) val stream = input .map(node => { Event(node.get("id").asText(), node.get("createTime").asText()) }) .windowAll(SlidingProcessingTimeWindows.of(Time.minutes(1), Time.seconds(10))) .process(new ProcessAllWindowFunction[Event, Event, TimeWindow] { override def process(context: Context, elements: Iterable[Event], out: Collector[Event]): Unit = { val it = elements.iterator var xx: Event = null while (it.hasNext) { xx = it.next() } out.collect(xx) } }) stream.print()
定義了一個長度爲1分鐘,滑動距離 10秒的窗口,因此正常每條數據應該對應 6 個窗口測試
在 process 中打個斷點就能夠追這段處理的源碼了this
數據的流向和 TumblingEventTimeWindows 是同樣的,因此直接跳到對應數據分配的地方spa
WindowOperator.processElement,代碼比較長,這裏就精簡一部分code
@Override public void processElement(StreamRecord<IN> element) throws Exception {
// 對應的須要分配的窗口 final Collection<W> elementWindows = windowAssigner.assignWindows( element.getValue(), element.getTimestamp(), windowAssignerContext); //if element is handled by none of assigned elementWindows boolean isSkippedElement = true; final K key = this.<K>getKeyedStateBackend().getCurrentKey(); if (windowAssigner instanceof MergingWindowAssigner) { } else {
// 循環遍歷,將數據放到對應的窗口狀態的 namesspace 中 for (W window: elementWindows) { // drop if the window is already late if (isWindowLate(window)) { continue; } isSkippedElement = false; // 將數據放到對應的窗口中 windowState.setCurrentNamespace(window); windowState.add(element.getValue()); registerCleanupTimer(window); } } }
for 循環就是將數據放到多個窗口的循環,看下 dubug 信息對象
看對應的6個窗口,從後往前的blog
窗口分配的代碼,就對應這個方法的第一句:ip
final Collection<W> elementWindows = windowAssigner.assignWindows( element.getValue(), element.getTimestamp(), windowAssignerContext);
assignWindows 的源碼是根據 windowAssigner 的不一樣而改變的,這裏是: SlidingProcessingTimeWindows,對應源碼:
@Override public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) { timestamp = context.getCurrentProcessingTime(); List<TimeWindow> windows = new ArrayList<>((int) (size / slide)); long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide); for (long start = lastStart; start > timestamp - size; start -= slide) { windows.add(new TimeWindow(start, start + size)); } return windows; }
有個list 存儲對應的窗口時間對象,list 的長度就是 窗口的長度 / 滑動的距離 (即一條數據會出如今幾個窗口中)
這裏用的是處理時間,全部Timestamp 直接從 處理時間中取,數據對應的 最後一個窗口的開始時間 lastStart 就用處理時間傳到TimeWindow.getWindowStartWindOffset 中作計算
算出最後一個窗口的開始時間後,減 滑動的距離,就是上一個窗口的開始時間,直到 窗口的開始時間超出窗口的範圍
對應的關鍵就是 lastStart 的計算,看源碼:
/** * Method to get the window start for a timestamp. * * @param timestamp epoch millisecond to get the window start. * @param offset The offset which window start would be shifted by. * @param windowSize The size of the generated windows. * @return window start */ public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) { return timestamp - (timestamp - offset + windowSize) % windowSize; }
沒指定 offset ,因此 offset 爲0, lastStart = timestamp - (timestamp - offset + windowSize) % windowSize
windowSize 是 滑動的距離,這裏畫了個圖來講明計算的公式:
算出最後一個窗口的時間後,下面的 for 循環計算出數據對應的全部窗口,並建立一個時間窗口(這個時間窗口,並非一個窗口,只是窗口的時間,表達一個窗口的開始時間和結束時間)
long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide); for (long start = lastStart; start > timestamp - size; start -= slide) { windows.add(new TimeWindow(start, start + size)); }
因此 17 對應的這條數據對應的窗口就有 (10-20), (15,25)
一條數據屬於多少個窗口分配好了之後,就是把數據放到對應的窗口中了,flink 的窗口對應 state 的 namespace , 因此放到多個窗口,就是放到多個 namespace 中,對應的代碼是:
windowState.setCurrentNamespace(window);
windowState.add(element.getValue());
選擇 namespace,把數據放到對應的 state 中,後面窗口 fire 的時候,會從對應的 namespace 中 get 數據
歡迎關注Flink菜鳥公衆號,會不按期更新Flink(開發技術)相關的推文