The Order in Which a Flink Sink Receives Data (the Order in Which a Window Emits Data)

Overview

  • InternalTimerServiceImpl.processingTimeTimersQueue holds a timer for every key in the same window. The first key is taken from the queue, processed by WindowOperator.onProcessingTime, and the result is sent to the sink.

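  • The keys in InternalTimerServiceImpl.processingTimeTimersQueue are processed in this order: the first element is processed first; after that, the last element is repeatedly moved to the head and processed next.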

  • For example, keys stored in the order 1 2 3 5 4 are processed in the order

    1
    4
    5
    3
    2
  • Source code: https://github.com/opensourceteams/fink-maven-scala-2

Input data

1 2 1 3 2 5 4

Source code analysis

RecordWriter.emit

  • After the WordCount data has been processed by the operators (Source, FlatMap, Map), it is emitted by RecordWriter.emit().

  • At this point the data is sent in the following format:

    WordWithCount(1,1)
    WordWithCount(2,1)
    WordWithCount(1,1)
    WordWithCount(3,1)
    WordWithCount(2,1)
    WordWithCount(5,1)
    WordWithCount(4,1)
  • WindowOperator.processElement receives and processes these records.
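The record sequence above can be reproduced with a small plain-Java sketch. It assumes the FlatMap step splits each line on whitespace and the Map step turns every word into WordWithCount(word, 1). WordWithCount here is a hypothetical stand-in for the type used in the WordCount example, and no Flink classes are involved.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the WordCount example's WordWithCount type.
final class WordWithCount {
    final String word;
    final long count;

    WordWithCount(String word, long count) {
        this.word = word;
        this.count = count;
    }

    @Override
    public String toString() {
        return "WordWithCount(" + word + "," + count + ")";
    }
}

final class WordCountRecords {
    // Models FlatMap (split the line into words) followed by Map (word -> (word, 1)).
    static List<WordWithCount> toRecords(String line) {
        List<WordWithCount> records = new ArrayList<>();
        for (String word : line.trim().split("\\s+")) {
            records.add(new WordWithCount(word, 1));
        }
        return records;
    }

    public static void main(String[] args) {
        // Prints the records in the order they reach RecordWriter.emit().
        for (WordWithCount record : toRecords("1 2 1 3 2 5 4")) {
            System.out.println(record);
        }
    }
}
```

The records keep the input order. Reordering happens only later, when the window's timers fire.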

private void emit(T record, int[] targetChannels) throws IOException, InterruptedException {
		serializer.serializeRecord(record);

		boolean pruneAfterCopying = false;
		for (int channel : targetChannels) {
			if (copyFromSerializerToTargetChannel(channel)) {
				pruneAfterCopying = true;
			}
		}

		// Make sure we don't hold onto the large intermediate serialization buffer for too long
		if (pruneAfterCopying) {
			serializer.prune();
		}
	}

WindowOperator.processElement(StreamRecord<IN> element)

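  • WindowOperator.processElement assigns windows to each element such as WordWithCount(1,1). In other words, it determines which window each element belongs to, because elements with the same key in the same window must be aggregated together.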

    final Collection<W> elementWindows = windowAssigner.assignWindows(
                element.getValue(), element.getTimestamp(), windowAssignerContext);
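  • The current element is added to the window state. The add function aggregates (reduces) elements with the same key, so summing the counts of identical keys within one window happens in this method.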

    windowState.add(element.getValue());
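  • triggerContext.onElement(element) sets the trigger for the current element, i.e. the point in time at which the element's window fires (the window's end). In doing so it registers a timer for the current key and window in InternalTimerServiceImpl.processingTimeTimersQueue. This happens once for every record, but duplicates are dropped, so for identical keys the queue behaves like a set. The data later sent to the sink is produced by iterating over this processingTimeTimersQueue: each time the first element is sent, and afterwards the last element is moved to the head.

    TriggerResult triggerResult = triggerContext.onElement(element);
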
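Window assignment for a tumbling window is plain arithmetic on the element's timestamp. The sketch below uses the same formula as Flink's TimeWindow.getWindowStartWithOffset. The 60-second window size and zero offset are assumptions inferred from the timer dump further down (start=1551505380000, end=1551505440000, timer timestamp=1551505439999). The element timestamp is hypothetical.

```java
// Sketch of tumbling-window assignment; windowSize = 60 s and offset = 0 are assumptions.
final class TumblingWindowSketch {
    // Same arithmetic as Flink's TimeWindow.getWindowStartWithOffset.
    static long windowStart(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    // A TimeWindow covers [start, end); its maxTimestamp() is end - 1,
    // which is the time the processing-time trigger registers its timer for.
    static long maxTimestamp(long start, long windowSize) {
        return start + windowSize - 1;
    }

    public static void main(String[] args) {
        long size = 60_000L;
        long ts = 1551505400000L; // hypothetical processing time of an element
        long start = windowStart(ts, 0L, size);
        System.out.println("start=" + start + ", end=" + (start + size)
                + ", maxTimestamp=" + maxTimestamp(start, size));
    }
}
```

Every element whose timestamp falls in the same minute gets the same window, and therefore the same timer timestamp. That shared timestamp is what makes the firing order below depend on the heap layout.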
public void processElement(StreamRecord<IN> element) throws Exception {
		final Collection<W> elementWindows = windowAssigner.assignWindows(
			element.getValue(), element.getTimestamp(), windowAssignerContext);

		//if element is handled by none of assigned elementWindows
		boolean isSkippedElement = true;

		final K key = this.<K>getKeyedStateBackend().getCurrentKey();

		if (windowAssigner instanceof MergingWindowAssigner) {
			MergingWindowSet<W> mergingWindows = getMergingWindowSet();

			for (W window: elementWindows) {

				// adding the new window might result in a merge, in that case the actualWindow
				// is the merged window and we work with that. If we don't merge then
				// actualWindow == window
				W actualWindow = mergingWindows.addWindow(window, new MergingWindowSet.MergeFunction<W>() {
					@Override
					public void merge(W mergeResult,
							Collection<W> mergedWindows, W stateWindowResult,
							Collection<W> mergedStateWindows) throws Exception {

						if ((windowAssigner.isEventTime() && mergeResult.maxTimestamp() + allowedLateness <= internalTimerService.currentWatermark())) {
							throw new UnsupportedOperationException("The end timestamp of an " +
									"event-time window cannot become earlier than the current watermark " +
									"by merging. Current watermark: " + internalTimerService.currentWatermark() +
									" window: " + mergeResult);
						} else if (!windowAssigner.isEventTime() && mergeResult.maxTimestamp() <= internalTimerService.currentProcessingTime()) {
							throw new UnsupportedOperationException("The end timestamp of a " +
									"processing-time window cannot become earlier than the current processing time " +
									"by merging. Current processing time: " + internalTimerService.currentProcessingTime() +
									" window: " + mergeResult);
						}

						triggerContext.key = key;
						triggerContext.window = mergeResult;

						triggerContext.onMerge(mergedWindows);

						for (W m: mergedWindows) {
							triggerContext.window = m;
							triggerContext.clear();
							deleteCleanupTimer(m);
						}

						// merge the merged state windows into the newly resulting state window
						windowMergingState.mergeNamespaces(stateWindowResult, mergedStateWindows);
					}
				});

				// drop if the window is already late
				if (isWindowLate(actualWindow)) {
					mergingWindows.retireWindow(actualWindow);
					continue;
				}
				isSkippedElement = false;

				W stateWindow = mergingWindows.getStateWindow(actualWindow);
				if (stateWindow == null) {
					throw new IllegalStateException("Window " + window + " is not in in-flight window set.");
				}

				windowState.setCurrentNamespace(stateWindow);
				windowState.add(element.getValue());

				triggerContext.key = key;
				triggerContext.window = actualWindow;

				TriggerResult triggerResult = triggerContext.onElement(element);

				if (triggerResult.isFire()) {
					ACC contents = windowState.get();
					if (contents == null) {
						continue;
					}
					emitWindowContents(actualWindow, contents);
				}

				if (triggerResult.isPurge()) {
					windowState.clear();
				}
				registerCleanupTimer(actualWindow);
			}

			// need to make sure to update the merging state in state
			mergingWindows.persist();
		} else {
			for (W window: elementWindows) {

				// drop if the window is already late
				if (isWindowLate(window)) {
					continue;
				}
				isSkippedElement = false;

				windowState.setCurrentNamespace(window);
				windowState.add(element.getValue());

				triggerContext.key = key;
				triggerContext.window = window;

				TriggerResult triggerResult = triggerContext.onElement(element);

				if (triggerResult.isFire()) {
					ACC contents = windowState.get();
					if (contents == null) {
						continue;
					}
					emitWindowContents(window, contents);
				}

				if (triggerResult.isPurge()) {
					windowState.clear();
				}
				registerCleanupTimer(window);
			}
		}

		// side output input event if
		// element not handled by any window
		// late arriving tag has been set
		// windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
		if (isSkippedElement && isElementLate(element)) {
			if (lateDataOutputTag != null){
				sideOutput(element);
			} else {
				this.numLateRecordsDropped.inc();
			}
		}
	}
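Two side effects of processElement matter for the output order: per-key aggregation in windowState, and timer registration with deduplication. Both can be modelled without Flink. This simplified sketch assumes all seven input records fall into a single window and the window function sums counts. The LinkedHashMap and LinkedHashSet are hypothetical stand-ins for windowState and processingTimeTimersQueue. Flink deduplicates on the whole timer (key, window and timestamp), not on the key alone.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Simplified model of WindowOperator.processElement for one non-merging window,
// so state and timers are keyed by word only.
final class ProcessElementSketch {
    // Stand-in for windowState: key -> reduced count within the window.
    final Map<String, Long> windowState = new LinkedHashMap<>();
    // Stand-in for the timer queue: adding an existing key again is a no-op.
    final Set<String> timerKeys = new LinkedHashSet<>();

    void processElement(String key) {
        // windowState.add(...) with a sum reducer: aggregate counts of the same key.
        windowState.merge(key, 1L, Long::sum);
        // triggerContext.onElement(...) registers a timer; duplicates are dropped.
        timerKeys.add(key);
    }

    public static void main(String[] args) {
        ProcessElementSketch op = new ProcessElementSketch();
        for (String word : "1 2 1 3 2 5 4".split(" ")) {
            op.processElement(word);
        }
        System.out.println(op.windowState); // {1=2, 2=2, 3=1, 5=1, 4=1}
        System.out.println(op.timerKeys);   // [1, 2, 3, 5, 4]
    }
}
```

The set keeps the first-registration order 1 2 3 5 4, which matches the stored order described in this article. In Flink, that order comes from the heap leaving timers with equal timestamps where they were inserted, not from a linked set.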

InternalTimerServiceImpl.onProcessingTime

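  • processingTimeTimersQueue (a HeapPriorityQueueSet) stores all the keys. They are deduplicated and kept in a heap ordered by timer timestamp, which determines the processing order.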

  • processingTimeTimersQueue.peek() returns the first element (without removing it) so it can be processed.

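  • processingTimeTimersQueue.poll() removes the first element, moves the last element into the first position, and sets the last position to null. As a result, the first element is processed first, then the element that was moved from the end to the head, and so on in a loop until all data is processed. In effect, after the first element, processing runs from the last element backwards. This happens because the queue is a binary heap and all timers of one window share the same timestamp, so the sift-down after the move never displaces the new head. For example:

    Input: 1 2 1 3 2 5 4
    Stored as: 1 2 3 5 4
    Processing order:
    1
    4
    5
    3
    2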
  • keyContext.setCurrentKey(timer.getKey()); // sets the current key, i.e. the key to be processed now

  • triggerTarget.onProcessingTime(timer); // calls WindowOperator.onProcessingTime(timer) to process it

queue = {HeapPriorityQueueElement[129]@8184} 
 1 = {TimerHeapInternalTimer@12441} "Timer{timestamp=1551505439999, key=(1), namespace=TimeWindow{start=1551505380000, end=1551505440000}}"
 2 = {TimerHeapInternalTimer@12442} "Timer{timestamp=1551505439999, key=(2), namespace=TimeWindow{start=1551505380000, end=1551505440000}}"
 3 = {TimerHeapInternalTimer@12443} "Timer{timestamp=1551505439999, key=(3), namespace=TimeWindow{start=1551505380000, end=1551505440000}}"
 5 = {TimerHeapInternalTimer@12443} "Timer{timestamp=1551505439999, key=(5), namespace=TimeWindow{start=1551505380000, end=1551505440000}}"
 4 = {TimerHeapInternalTimer@12443} "Timer{timestamp=1551505439999, key=(4), namespace=TimeWindow{start=1551505380000, end=1551505440000}}"
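The poll order 1 4 5 3 2 can be reproduced with a small binary min-heap. This is a hypothetical sketch that models only the behaviour described above: the head sits at index 1 as in the dump, poll() moves the last element to the head, and comparisons are strict "less than", so timers with equal timestamps are never swapped. It is not Flink's HeapPriorityQueue, and the window namespace is left out of the timer for brevity.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative min-heap of timers; not Flink's HeapPriorityQueue.
final class TimerHeapSketch {
    // A timer as (timestamp, key); the window namespace is omitted.
    static final class Timer {
        final long timestamp;
        final String key;

        Timer(long timestamp, String key) {
            this.timestamp = timestamp;
            this.key = key;
        }
    }

    // Index 0 is unused so that the head sits at index 1.
    private final List<Timer> heap = new ArrayList<>();

    TimerHeapSketch() {
        heap.add(null);
    }

    int size() {
        return heap.size() - 1;
    }

    void add(Timer t) {
        heap.add(t);
        int idx = size();
        // Sift up only while strictly earlier than the parent; equal timestamps stay put.
        while (idx > 1 && heap.get(idx).timestamp < heap.get(idx / 2).timestamp) {
            swap(idx, idx / 2);
            idx /= 2;
        }
    }

    Timer poll() {
        Timer head = heap.get(1);
        // Remove the last slot and move its element into the head position.
        Timer last = heap.remove(size());
        if (size() > 0) {
            heap.set(1, last);
            siftDown(1);
        }
        return head;
    }

    private void siftDown(int idx) {
        int n = size();
        while (2 * idx <= n) {
            int child = 2 * idx;
            // Pick the earlier child; on a tie keep the left one.
            if (child + 1 <= n && heap.get(child + 1).timestamp < heap.get(child).timestamp) {
                child++;
            }
            // Stop unless the child is strictly earlier: equal timestamps do not move.
            if (heap.get(child).timestamp >= heap.get(idx).timestamp) {
                break;
            }
            swap(idx, child);
            idx = child;
        }
    }

    private void swap(int a, int b) {
        Timer tmp = heap.get(a);
        heap.set(a, heap.get(b));
        heap.set(b, tmp);
    }

    // Keys in the order poll() hands them out, i.e. the order onProcessingTime processes them.
    static List<String> pollOrder(long timestamp, String... keys) {
        TimerHeapSketch q = new TimerHeapSketch();
        for (String k : keys) {
            q.add(new Timer(timestamp, k));
        }
        List<String> order = new ArrayList<>();
        while (q.size() > 0) {
            order.add(q.poll().key);
        }
        return order;
    }

    public static void main(String[] args) {
        // Deduplicated keys in registration order, all with the same window timer timestamp.
        System.out.println(pollOrder(1551505439999L, "1", "2", "3", "5", "4")); // [1, 4, 5, 3, 2]
    }
}
```

Timers with different timestamps still come out earliest-first. The reversal appears only among timers that share one timestamp, which is exactly the situation for all keys of one window.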
  • WindowOperator.onProcessingTime(timer) is called to process the current key.

public void onProcessingTime(long time) throws Exception {
		// null out the timer in case the Triggerable calls registerProcessingTimeTimer()
		// inside the callback.
		nextTimer = null;

		InternalTimer<K, N> timer;

		while ((timer = processingTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
			processingTimeTimersQueue.poll();
			keyContext.setCurrentKey(timer.getKey());
			triggerTarget.onProcessingTime(timer);
		}

		if (timer != null && nextTimer == null) {
			nextTimer = processingTimeService.registerTimer(timer.getTimestamp(), this);
		}
	}
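The while loop above fires every timer that is due and leaves later ones for the next wake-up. The sketch below models that loop with java.util.PriorityQueue as a stand-in for processingTimeTimersQueue. java.util.PriorityQueue makes no ordering promise among equal timestamps, so this sketch shows which timers fire, not Flink's exact firing order. The timer for the next window is hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Sketch of the loop in InternalTimerServiceImpl.onProcessingTime.
final class ProcessingTimeLoopSketch {
    static final class Timer {
        final long timestamp;
        final String key;

        Timer(long timestamp, String key) {
            this.timestamp = timestamp;
            this.key = key;
        }
    }

    final PriorityQueue<Timer> queue =
            new PriorityQueue<>(Comparator.comparingLong((Timer t) -> t.timestamp));

    // Fires every timer due at or before `time` and returns the fired keys.
    List<String> onProcessingTime(long time) {
        List<String> fired = new ArrayList<>();
        Timer timer;
        while ((timer = queue.peek()) != null && timer.timestamp <= time) {
            queue.poll();
            // In Flink: keyContext.setCurrentKey(...) then triggerTarget.onProcessingTime(timer).
            fired.add(timer.key);
        }
        return fired;
    }

    // Timestamp the next wake-up would be registered for, or -1 if no timers remain.
    long nextTimerTimestamp() {
        Timer next = queue.peek();
        return next == null ? -1L : next.timestamp;
    }

    public static void main(String[] args) {
        ProcessingTimeLoopSketch s = new ProcessingTimeLoopSketch();
        // Hypothetical timers: five keys in the current window, one in the next window.
        for (String key : new String[] {"1", "2", "3", "5", "4"}) {
            s.queue.add(new Timer(1551505439999L, key));
        }
        s.queue.add(new Timer(1551505499999L, "1"));
        System.out.println("fired " + s.onProcessingTime(1551505439999L).size() + " timers");
        System.out.println("next wake-up at " + s.nextTimerTimestamp());
    }
}
```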

WindowOperator.onProcessingTime

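  • triggerResult.isFire() // the window of the current element can now be emitted, i.e. its end time has passed
  • windowState.get() // retrieves the value for the current key, already aggregated over all elements with that key
  • emitWindowContents(triggerContext.window, contents); // sends the result to the sink for processing
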
public void onProcessingTime(InternalTimer<K, W> timer) throws Exception {
		triggerContext.key = timer.getKey();
		triggerContext.window = timer.getNamespace();

		MergingWindowSet<W> mergingWindows;

		if (windowAssigner instanceof MergingWindowAssigner) {
			mergingWindows = getMergingWindowSet();
			W stateWindow = mergingWindows.getStateWindow(triggerContext.window);
			if (stateWindow == null) {
				// Timer firing for non-existent window, this can only happen if a
				// trigger did not clean up timers. We have already cleared the merging
				// window and therefore the Trigger state, however, so nothing to do.
				return;
			} else {
				windowState.setCurrentNamespace(stateWindow);
			}
		} else {
			windowState.setCurrentNamespace(triggerContext.window);
			mergingWindows = null;
		}

		TriggerResult triggerResult = triggerContext.onProcessingTime(timer.getTimestamp());

		if (triggerResult.isFire()) {
			ACC contents = windowState.get();
			if (contents != null) {
				emitWindowContents(triggerContext.window, contents);
			}
		}

		if (triggerResult.isPurge()) {
			windowState.clear();
		}

		if (!windowAssigner.isEventTime() && isCleanupTime(triggerContext.window, timer.getTimestamp())) {
			clearAllState(triggerContext.window, windowState, mergingWindows);
		}

		if (mergingWindows != null) {
			// need to make sure to update the merging state in state
			mergingWindows.persist();
		}
	}
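A minimal sketch of what WindowOperator.onProcessingTime does for each fired timer in this job: read the aggregated value for the timer's key, emit it, and clear the state. The state is cleared because, for processing-time windows, the timer fires at the window's cleanup time. The sketch assumes a trigger that returns FIRE, as the default processing-time trigger does. It feeds in the counts for 1 2 1 3 2 5 4 and the firing order 1 4 5 3 2 described in this article. The strings appended to sink are hypothetical placeholders for what the sink receives.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch of WindowOperator.onProcessingTime for a single window whose trigger returns FIRE.
final class WindowFireSketch {
    // Aggregated windowState per key (already reduced when the elements arrived).
    final Map<String, Long> windowState = new LinkedHashMap<>();
    // What the sink receives, in arrival order.
    final List<String> sink = new ArrayList<>();

    void onProcessingTime(String timerKey) {
        Long contents = windowState.get(timerKey); // windowState.get()
        if (contents != null) {
            sink.add("WordWithCount(" + timerKey + "," + contents + ")"); // emitWindowContents(...)
        }
        // The timer fires at the window's cleanup time, so the key's state is cleared.
        windowState.remove(timerKey);
    }

    public static void main(String[] args) {
        WindowFireSketch op = new WindowFireSketch();
        op.windowState.put("1", 2L);
        op.windowState.put("2", 2L);
        op.windowState.put("3", 1L);
        op.windowState.put("5", 1L);
        op.windowState.put("4", 1L);
        // Timer firing order produced by the heap behaviour described above.
        for (String key : new String[] {"1", "4", "5", "3", "2"}) {
            op.onProcessingTime(key);
        }
        op.sink.forEach(System.out::println);
    }
}
```

The sink therefore sees the keys in the order 1, 4, 5, 3, 2. That is neither the input order nor sorted order.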