InternalTimerServiceImpl.processingTimeTimersQueue存儲着同一個Window中全部Key,取第一個key,調用WindowOperator.onProcessingTime進行處理,併發送給Sinkgit
InternalTimerServiceImpl.processingTimeTimersQueue key處理的順序是,先處理第一個,而後依次把最後一個元素放到第一個元素進行處理github
Key,處理的順序,如 1 2 3 5 4,就會變成windows
1 4 5 3 2
1 2 1 3 2 5 4
當WordCount中的數據通過Operator(Source,FlatMap,Map) 處理後,經過RecordWriter.emit()函數發射數據app
此時發這樣的數據格式發送less
WordWithCount(1,1) WordWithCount(2,1) WordWithCount(1,1) WordWithCount(3,1) WordWithCount(2,1) WordWithCount(5,1) WordWithCount(4,1)
WindowOperator.processElement會接收並處理maven
private void emit(T record, int[] targetChannels) throws IOException, InterruptedException { serializer.serializeRecord(record); boolean pruneAfterCopying = false; for (int channel : targetChannels) { if (copyFromSerializerToTargetChannel(channel)) { pruneAfterCopying = true; } } // Make sure we don't hold onto the large intermediate serialization buffer for too long if (pruneAfterCopying) { serializer.prune(); } }
WindowOperator.processElement,給每個WordWithCount(1,1) 這樣的元素分配window,也就是確認每個元素屬於哪個窗口,由於須要對同一個窗口的相同key進行聚合操做ide
final Collection<W> elementWindows = windowAssigner.assignWindows( element.getValue(), element.getTimestamp(), windowAssignerContext);
把當前元素增長到state中保存,add函數中會對相同key進行聚合操做(reduce),對同一個window中相同key進行求和就是在這個方法中進行的函數
windowState.add(element.getValue());
triggerContext.onElement(element),對當前元素設置trigger,也就是當前元素的window在哪一個時間點觸發(結束的時間點), 把當前元素的key,增長到InternalTimerServiceImpl.processingTimeTimersQueue中,每一條數據會加一次,加完後會去重,至關於Set,對相同Key的處理, 後面發送給Sink的數據,就是遍歷這個processingTimeTimersQueue中的數據,固然,每次發送第一個元素,發送後,會把最後一個元素放到第一個元素源碼分析
TriggerResult triggerResult = triggerContext.onElement(element);
public void processElement(StreamRecord<IN> element) throws Exception { final Collection<W> elementWindows = windowAssigner.assignWindows( element.getValue(), element.getTimestamp(), windowAssignerContext); //if element is handled by none of assigned elementWindows boolean isSkippedElement = true; final K key = this.<K>getKeyedStateBackend().getCurrentKey(); if (windowAssigner instanceof MergingWindowAssigner) { MergingWindowSet<W> mergingWindows = getMergingWindowSet(); for (W window: elementWindows) { // adding the new window might result in a merge, in that case the actualWindow // is the merged window and we work with that. If we don't merge then // actualWindow == window W actualWindow = mergingWindows.addWindow(window, new MergingWindowSet.MergeFunction<W>() { @Override public void merge(W mergeResult, Collection<W> mergedWindows, W stateWindowResult, Collection<W> mergedStateWindows) throws Exception { if ((windowAssigner.isEventTime() && mergeResult.maxTimestamp() + allowedLateness <= internalTimerService.currentWatermark())) { throw new UnsupportedOperationException("The end timestamp of an " + "event-time window cannot become earlier than the current watermark " + "by merging. Current watermark: " + internalTimerService.currentWatermark() + " window: " + mergeResult); } else if (!windowAssigner.isEventTime() && mergeResult.maxTimestamp() <= internalTimerService.currentProcessingTime()) { throw new UnsupportedOperationException("The end timestamp of a " + "processing-time window cannot become earlier than the current processing time " + "by merging. Current processing time: " + internalTimerService.currentProcessingTime() + " window: " + mergeResult); } triggerContext.key = key; triggerContext.window = mergeResult; triggerContext.onMerge(mergedWindows); for (W m: mergedWindows) { triggerContext.window = m; triggerContext.clear(); deleteCleanupTimer(m); } // merge the merged state windows into the newly resulting state window windowMergingState.mergeNamespaces(stateWindowResult, mergedStateWindows); } }); // drop if the window is already late if (isWindowLate(actualWindow)) { mergingWindows.retireWindow(actualWindow); continue; } isSkippedElement = false; W stateWindow = mergingWindows.getStateWindow(actualWindow); if (stateWindow == null) { throw new IllegalStateException("Window " + window + " is not in in-flight window set."); } windowState.setCurrentNamespace(stateWindow); windowState.add(element.getValue()); triggerContext.key = key; triggerContext.window = actualWindow; TriggerResult triggerResult = triggerContext.onElement(element); if (triggerResult.isFire()) { ACC contents = windowState.get(); if (contents == null) { continue; } emitWindowContents(actualWindow, contents); } if (triggerResult.isPurge()) { windowState.clear(); } registerCleanupTimer(actualWindow); } // need to make sure to update the merging state in state mergingWindows.persist(); } else { for (W window: elementWindows) { // drop if the window is already late if (isWindowLate(window)) { continue; } isSkippedElement = false; windowState.setCurrentNamespace(window); windowState.add(element.getValue()); triggerContext.key = key; triggerContext.window = window; TriggerResult triggerResult = triggerContext.onElement(element); if (triggerResult.isFire()) { ACC contents = windowState.get(); if (contents == null) { continue; } emitWindowContents(window, contents); } if (triggerResult.isPurge()) { windowState.clear(); } registerCleanupTimer(window); } } // side output input event if // element not handled by any window // late arriving tag has been set // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp if (isSkippedElement && isElementLate(element)) { if (lateDataOutputTag != null){ sideOutput(element); } else { this.numLateRecordsDropped.inc(); } } }
processingTimeTimersQueue(HeapPriorityQueueSet) 該對象中存儲了全部的key,這些key是去重後,按處理順序排序
processingTimeTimersQueue.peek() 取出第一條數據進行處理
processingTimeTimersQueue.poll();會移除第一條數據,而且,拿最後一條數據,放第1一個元素,致使,全部元素的處理順序是,先處理第一個元素,而後,把最後一個元素放第一個, 最後一個就置爲空,再循環處理全部數據,至關於處理完第一個元素,處後從最後一個元素開始處理,一直處理到完成,舉例
1 2 1 3 2 5 4 存爲 1 2 3 5 4 順序就變爲 1 4 5 3 2
keyContext.setCurrentKey(timer.getKey());//設置當前的key,當前須要處理的
triggerTarget.onProcessingTime(timer);// 調用 WindowOperator.onProcessingTime(timer)處理
queue = {HeapPriorityQueueElement[129]@8184} 1 = {TimerHeapInternalTimer@12441} "Timer{timestamp=1551505439999, key=(1), namespace=TimeWindow{start=1551505380000, end=1551505440000}}" 2 = {TimerHeapInternalTimer@12442} "Timer{timestamp=1551505439999, key=(2), namespace=TimeWindow{start=1551505380000, end=1551505440000}}" 3 = {TimerHeapInternalTimer@12443} "Timer{timestamp=1551505439999, key=(3), namespace=TimeWindow{start=1551505380000, end=1551505440000}}" 5 = {TimerHeapInternalTimer@12443} "Timer{timestamp=1551505439999, key=(3), namespace=TimeWindow{start=1551505380000, end=1551505440000}}" 4 = {TimerHeapInternalTimer@12443} "Timer{timestamp=1551505439999, key=(3), namespace=TimeWindow{start=1551505380000, end=1551505440000}}"
public void onProcessingTime(long time) throws Exception { // null out the timer in case the Triggerable calls registerProcessingTimeTimer() // inside the callback. nextTimer = null; InternalTimer<K, N> timer; while ((timer = processingTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) { processingTimeTimersQueue.poll(); keyContext.setCurrentKey(timer.getKey()); triggerTarget.onProcessingTime(timer); } if (timer != null && nextTimer == null) { nextTimer = processingTimeService.registerTimer(timer.getTimestamp(), this); } }
public void onProcessingTime(InternalTimer<K, W> timer) throws Exception { triggerContext.key = timer.getKey(); triggerContext.window = timer.getNamespace(); MergingWindowSet<W> mergingWindows; if (windowAssigner instanceof MergingWindowAssigner) { mergingWindows = getMergingWindowSet(); W stateWindow = mergingWindows.getStateWindow(triggerContext.window); if (stateWindow == null) { // Timer firing for non-existent window, this can only happen if a // trigger did not clean up timers. We have already cleared the merging // window and therefore the Trigger state, however, so nothing to do. return; } else { windowState.setCurrentNamespace(stateWindow); } } else { windowState.setCurrentNamespace(triggerContext.window); mergingWindows = null; } TriggerResult triggerResult = triggerContext.onProcessingTime(timer.getTimestamp()); if (triggerResult.isFire()) { ACC contents = windowState.get(); if (contents != null) { emitWindowContents(triggerContext.window, contents); } } if (triggerResult.isPurge()) { windowState.clear(); } if (!windowAssigner.isEventTime() && isCleanupTime(triggerContext.window, timer.getTimestamp())) { clearAllState(triggerContext.window, windowState, mergingWindows); } if (mergingWindows != null) { // need to make sure to update the merging state in state mergingWindows.persist(); } }