A Look at Flink's Allowed Lateness

This article takes a look at Flink's Allowed Lateness.

WindowedStream

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/WindowedStream.java

@Public
public class WindowedStream<T, K, W extends Window> {

	/** The keyed data stream that is windowed by this stream. */
	private final KeyedStream<T, K> input;

	/** The window assigner. */
	private final WindowAssigner<? super T, W> windowAssigner;

	/** The trigger that is used for window evaluation/emission. */
	private Trigger<? super T, ? super W> trigger;

	/** The evictor that is used for evicting elements before window evaluation. */
	private Evictor<? super T, ? super W> evictor;

	/** The user-specified allowed lateness. */
	private long allowedLateness = 0L;

	/**
	 * Side output {@code OutputTag} for late data. If no tag is set late data will simply be
	 * dropped.
 	 */
	private OutputTag<T> lateDataOutputTag;

	@PublicEvolving
	public WindowedStream<T, K, W> allowedLateness(Time lateness) {
		final long millis = lateness.toMilliseconds();
		checkArgument(millis >= 0, "The allowed lateness cannot be negative.");

		this.allowedLateness = millis;
		return this;
	}

	@PublicEvolving
	public WindowedStream<T, K, W> sideOutputLateData(OutputTag<T> outputTag) {
		Preconditions.checkNotNull(outputTag, "Side output tag must not be null.");
		this.lateDataOutputTag = input.getExecutionEnvironment().clean(outputTag);
		return this;
	}

	//......

	public <R> SingleOutputStreamOperator<R> reduce(
			ReduceFunction<T> reduceFunction,
			WindowFunction<T, R, K, W> function,
			TypeInformation<R> resultType) {

		if (reduceFunction instanceof RichFunction) {
			throw new UnsupportedOperationException("ReduceFunction of reduce can not be a RichFunction.");
		}

		//clean the closures
		function = input.getExecutionEnvironment().clean(function);
		reduceFunction = input.getExecutionEnvironment().clean(reduceFunction);

		final String opName = generateOperatorName(windowAssigner, trigger, evictor, reduceFunction, function);
		KeySelector<T, K> keySel = input.getKeySelector();

		OneInputStreamOperator<T, R> operator;

		if (evictor != null) {
			@SuppressWarnings({"unchecked", "rawtypes"})
			TypeSerializer<StreamRecord<T>> streamRecordSerializer =
				(TypeSerializer<StreamRecord<T>>) new StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig()));

			ListStateDescriptor<StreamRecord<T>> stateDesc =
				new ListStateDescriptor<>("window-contents", streamRecordSerializer);

			operator =
				new EvictingWindowOperator<>(windowAssigner,
					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
					keySel,
					input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
					stateDesc,
					new InternalIterableWindowFunction<>(new ReduceApplyWindowFunction<>(reduceFunction, function)),
					trigger,
					evictor,
					allowedLateness,
					lateDataOutputTag);

		} else {
			ReducingStateDescriptor<T> stateDesc = new ReducingStateDescriptor<>("window-contents",
				reduceFunction,
				input.getType().createSerializer(getExecutionEnvironment().getConfig()));

			operator =
				new WindowOperator<>(windowAssigner,
					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
					keySel,
					input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
					stateDesc,
					new InternalSingleValueWindowFunction<>(function),
					trigger,
					allowedLateness,
					lateDataOutputTag);
		}

		return input.transform(opName, resultType, operator);
	}

	//......
}
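  • WindowedStream has two parameters related to Allowed Lateness: allowedLateness, which specifies how late an element is allowed to be, and lateDataOutputTag, which configures where late elements are emitted.
  • The reduce, aggregate, fold, process and similar operations of WindowedStream create a different window operator depending on whether evictor is null: an EvictingWindowOperator when an evictor is set, and a plain WindowOperator when it is null.
  • EvictingWindowOperator extends WindowOperator. Its constructor takes an extra Evictor parameter, but both constructors require the Trigger, allowedLateness and lateDataOutputTag arguments.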
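To make these configuration rules concrete, here is a minimal plain-Java sketch that runs without Flink. `WindowSpec`, `evictor(boolean)` and `operatorKind()` are hypothetical names, not Flink API; a `String` id stands in for `OutputTag` and `java.time.Duration` stands in for Flink's `Time`. The class only mirrors what the excerpt shows: lateness defaults to 0, negative values are rejected, the late-data tag must be non-null, and the presence of an evictor decides which operator class `reduce` builds.

```java
import java.time.Duration;
import java.util.Objects;

// Hypothetical, simplified model of the lateness settings held by WindowedStream.
// NOT Flink's API: it only reproduces the checks visible in the excerpt above.
class WindowSpec {
    private long allowedLatenessMillis = 0L; // same default as WindowedStream
    private String lateDataOutputTag;        // null means late data is dropped
    private boolean hasEvictor;

    // Mirrors allowedLateness(Time): negative lateness is rejected.
    WindowSpec allowedLateness(Duration lateness) {
        long millis = lateness.toMillis();
        if (millis < 0) {
            throw new IllegalArgumentException("The allowed lateness cannot be negative.");
        }
        this.allowedLatenessMillis = millis;
        return this;
    }

    // Mirrors sideOutputLateData(OutputTag): the tag must not be null.
    WindowSpec sideOutputLateData(String tagId) {
        this.lateDataOutputTag = Objects.requireNonNull(tagId, "Side output tag must not be null.");
        return this;
    }

    // Hypothetical switch standing in for calling evictor(...) on the stream.
    WindowSpec evictor(boolean present) {
        this.hasEvictor = present;
        return this;
    }

    long allowedLatenessMillis() { return allowedLatenessMillis; }

    String lateDataOutputTag() { return lateDataOutputTag; }

    // Mirrors the branch in reduce(): an evictor selects EvictingWindowOperator.
    // Both operator kinds receive the same allowedLateness and lateDataOutputTag.
    String operatorKind() {
        return hasEvictor ? "EvictingWindowOperator" : "WindowOperator";
    }

    public static void main(String[] args) {
        WindowSpec spec = new WindowSpec()
            .allowedLateness(Duration.ofSeconds(5))
            .sideOutputLateData("late-data");
        // prints "WindowOperator, lateness=5000ms"
        System.out.println(spec.operatorKind() + ", lateness=" + spec.allowedLatenessMillis() + "ms");
    }
}
```

In a real job the same settings are applied through the fluent calls on the WindowedStream, for example `.allowedLateness(Time.seconds(5)).sideOutputLateData(tag)`, before calling `reduce`, `aggregate` or `process`.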

OutputTag

flink-core-1.7.0-sources.jar!/org/apache/flink/util/OutputTag.java

@PublicEvolving
public class OutputTag<T> implements Serializable {

	private static final long serialVersionUID = 2L;

	private final String id;

	private final TypeInformation<T> typeInfo;

	public OutputTag(String id) {
		Preconditions.checkNotNull(id, "OutputTag id cannot be null.");
		Preconditions.checkArgument(!id.isEmpty(), "OutputTag id must not be empty.");
		this.id = id;

		try {
			this.typeInfo = TypeExtractor.createTypeInfo(this, OutputTag.class, getClass(), 0);
		}
		catch (InvalidTypesException e) {
			throw new InvalidTypesException("Could not determine TypeInformation for the OutputTag type. " +
					"The most common reason is forgetting to make the OutputTag an anonymous inner class. " +
					"It is also not possible to use generic type variables with OutputTags, such as 'Tuple2<A, B>'.", e);
		}
	}

	public OutputTag(String id, TypeInformation<T> typeInfo) {
		Preconditions.checkNotNull(id, "OutputTag id cannot be null.");
		Preconditions.checkArgument(!id.isEmpty(), "OutputTag id must not be empty.");
		this.id = id;
		this.typeInfo = Preconditions.checkNotNull(typeInfo, "TypeInformation cannot be null.");
	}

	// ------------------------------------------------------------------------

	public String getId() {
		return id;
	}

	public TypeInformation<T> getTypeInfo() {
		return typeInfo;
	}

	// ------------------------------------------------------------------------

	@Override
	public boolean equals(Object obj) {
		return obj instanceof OutputTag
			&& ((OutputTag) obj).id.equals(this.id);
	}

	@Override
	public int hashCode() {
		return id.hashCode();
	}

	@Override
	public String toString() {
		return "OutputTag(" + getTypeInfo() + ", " + id + ")";
	}
}
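  • OutputTag is a side-output identifier that carries a name (id) and type information. Flink lets functions such as ProcessFunction, CoProcessFunction, ProcessWindowFunction and ProcessAllWindowFunction emit side outputs; the Context of each of these functions has an output(OutputTag<X> outputTag, X value) method for sending an element to a side output. Note that equals and hashCode look only at the id, so two tags with the same id are treated as the same side output.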
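The side-output mechanism can be illustrated without a Flink runtime. In the sketch below, `Tag`, `RoutingContext` and `SideOutputDemo` are hypothetical stand-ins: `Tag` copies OutputTag's rule that equality depends only on the id (a `Class` replaces `TypeInformation`), and `RoutingContext.output` plays the role of `Context.output(OutputTag<X>, X)`. With the real class, the single-argument constructor extracts the type from a generic subclass, which is why the error message above recommends creating the tag as an anonymous inner class, e.g. `new OutputTag<String>("late-data"){}`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for OutputTag: equals/hashCode use only the id.
final class Tag<T> {
    final String id;
    final Class<T> type; // stands in for TypeInformation<T>

    Tag(String id, Class<T> type) {
        this.id = Objects.requireNonNull(id, "OutputTag id cannot be null.");
        if (id.isEmpty()) {
            throw new IllegalArgumentException("OutputTag id must not be empty.");
        }
        this.type = Objects.requireNonNull(type, "TypeInformation cannot be null.");
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof Tag && ((Tag<?>) o).id.equals(id);
    }

    @Override
    public int hashCode() {
        return id.hashCode();
    }
}

// Hypothetical stand-in for a process function's Context plus its Collector.
class RoutingContext<OUT> {
    final List<OUT> mainOutput = new ArrayList<>();
    final Map<Tag<?>, List<Object>> sideOutputs = new HashMap<>();

    void collect(OUT value) {
        mainOutput.add(value);
    }

    // Analogue of Context.output(OutputTag<X> outputTag, X value).
    <X> void output(Tag<X> tag, X value) {
        sideOutputs.computeIfAbsent(tag, t -> new ArrayList<>()).add(value);
    }
}

class SideOutputDemo {
    static final Tag<String> REJECTED = new Tag<>("rejected", String.class);

    // Hypothetical processElement: non-negative values go to the main output,
    // negative ones are reported as strings on the side output.
    static void process(int value, RoutingContext<Integer> ctx) {
        if (value >= 0) {
            ctx.collect(value);
        } else {
            ctx.output(REJECTED, "negative: " + value);
        }
    }

    public static void main(String[] args) {
        RoutingContext<Integer> ctx = new RoutingContext<>();
        for (int v : new int[] {3, -1, 7, -4}) {
            process(v, ctx);
        }
        System.out.println(ctx.mainOutput); // [3, 7]
        // A fresh tag with the same id finds the same side output:
        // prints [negative: -1, negative: -4]
        System.out.println(ctx.sideOutputs.get(new Tag<>("rejected", String.class)));
    }
}
```

Because lookup is by id, the tag used to read a side output does not have to be the same object that was used to emit into it, which is the property getSideOutput relies on.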

SingleOutputStreamOperator

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java

@Public
public class SingleOutputStreamOperator<T> extends DataStream<T> {

	protected boolean nonParallel = false;

	private Map<OutputTag<?>, TypeInformation> requestedSideOutputs = new HashMap<>();

	private boolean wasSplitApplied = false;

	//......

	public <X> DataStream<X> getSideOutput(OutputTag<X> sideOutputTag) {
		if (wasSplitApplied) {
			throw new UnsupportedOperationException("getSideOutput() and split() may not be called on the same DataStream. " +
				"As a work-around, please add a no-op map function before the split() call.");
		}

		sideOutputTag = clean(requireNonNull(sideOutputTag));

		// make a defensive copy
		sideOutputTag = new OutputTag<X>(sideOutputTag.getId(), sideOutputTag.getTypeInfo());

		TypeInformation<?> type = requestedSideOutputs.get(sideOutputTag);
		if (type != null && !type.equals(sideOutputTag.getTypeInfo())) {
			throw new UnsupportedOperationException("A side output with a matching id was " +
					"already requested with a different type. This is not allowed, side output " +
					"ids need to be unique.");
		}

		requestedSideOutputs.put(sideOutputTag, sideOutputTag.getTypeInfo());

		SideOutputTransformation<X> sideOutputTransformation = new SideOutputTransformation<>(this.getTransformation(), sideOutputTag);
		return new DataStream<>(this.getExecutionEnvironment(), sideOutputTransformation);
	}
}
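  • SingleOutputStreamOperator provides a getSideOutput method that, given an OutputTag, returns the side output previously emitted from within a function. It throws an UnsupportedOperationException if split() has already been applied to the same stream, or if the same tag id was already requested with a different type.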
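The two checks in getSideOutput can be modeled with a map. `SideOutputRegistry` below is hypothetical, not Flink code: it keys requests by tag id, which matches the real `requestedSideOutputs` map because OutputTag's equals and hashCode look only at the id, and `markSplitApplied()` is an illustrative stand-in for the stream having had split() applied. The defensive copy and the SideOutputTransformation are left out.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the bookkeeping in SingleOutputStreamOperator.getSideOutput.
class SideOutputRegistry {
    // Keyed by id, mirroring OutputTag's id-only equals/hashCode.
    private final Map<String, Class<?>> requestedSideOutputs = new HashMap<>();
    private boolean wasSplitApplied = false;

    // Illustrative stand-in for split() having been called on this stream.
    void markSplitApplied() {
        wasSplitApplied = true;
    }

    // Returns normally when the request is allowed; throws like the real method otherwise.
    void getSideOutput(String id, Class<?> type) {
        if (wasSplitApplied) {
            throw new UnsupportedOperationException(
                "getSideOutput() and split() may not be called on the same DataStream.");
        }
        Class<?> previous = requestedSideOutputs.get(id);
        if (previous != null && !previous.equals(type)) {
            throw new UnsupportedOperationException(
                "A side output with a matching id was already requested with a different type.");
        }
        requestedSideOutputs.put(id, type);
    }

    public static void main(String[] args) {
        SideOutputRegistry stream = new SideOutputRegistry();
        stream.getSideOutput("late-data", String.class);
        stream.getSideOutput("late-data", String.class); // same id and type: allowed
        try {
            stream.getSideOutput("late-data", Integer.class); // same id, new type
        } catch (UnsupportedOperationException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Requesting the same side output twice with a consistent type is harmless; only a type clash on one id is an error, which is why side-output ids should be unique per stream.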

WindowOperator

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java

@Internal
public class WindowOperator<K, IN, ACC, OUT, W extends Window>
	extends AbstractUdfStreamOperator<OUT, InternalWindowFunction<ACC, OUT, K, W>>
	implements OneInputStreamOperator<IN, OUT>, Triggerable<K, W> {

	//......

	public void processElement(StreamRecord<IN> element) throws Exception {
		final Collection<W> elementWindows = windowAssigner.assignWindows(
			element.getValue(), element.getTimestamp(), windowAssignerContext);

		//if element is handled by none of assigned elementWindows
		boolean isSkippedElement = true;

		final K key = this.<K>getKeyedStateBackend().getCurrentKey();

		if (windowAssigner instanceof MergingWindowAssigner) {
			MergingWindowSet<W> mergingWindows = getMergingWindowSet();

			for (W window: elementWindows) {

				// adding the new window might result in a merge, in that case the actualWindow
				// is the merged window and we work with that. If we don't merge then
				// actualWindow == window
				W actualWindow = mergingWindows.addWindow(window, new MergingWindowSet.MergeFunction<W>() {
					@Override
					public void merge(W mergeResult,
							Collection<W> mergedWindows, W stateWindowResult,
							Collection<W> mergedStateWindows) throws Exception {

						if ((windowAssigner.isEventTime() && mergeResult.maxTimestamp() + allowedLateness <= internalTimerService.currentWatermark())) {
							throw new UnsupportedOperationException("The end timestamp of an " +
									"event-time window cannot become earlier than the current watermark " +
									"by merging. Current watermark: " + internalTimerService.currentWatermark() +
									" window: " + mergeResult);
						} else if (!windowAssigner.isEventTime() && mergeResult.maxTimestamp() <= internalTimerService.currentProcessingTime()) {
							throw new UnsupportedOperationException("The end timestamp of a " +
									"processing-time window cannot become earlier than the current processing time " +
									"by merging. Current processing time: " + internalTimerService.currentProcessingTime() +
									" window: " + mergeResult);
						}

						triggerContext.key = key;
						triggerContext.window = mergeResult;

						triggerContext.onMerge(mergedWindows);

						for (W m: mergedWindows) {
							triggerContext.window = m;
							triggerContext.clear();
							deleteCleanupTimer(m);
						}

						// merge the merged state windows into the newly resulting state window
						windowMergingState.mergeNamespaces(stateWindowResult, mergedStateWindows);
					}
				});

				// drop if the window is already late
				if (isWindowLate(actualWindow)) {
					mergingWindows.retireWindow(actualWindow);
					continue;
				}
				isSkippedElement = false;

				W stateWindow = mergingWindows.getStateWindow(actualWindow);
				if (stateWindow == null) {
					throw new IllegalStateException("Window " + window + " is not in in-flight window set.");
				}

				windowState.setCurrentNamespace(stateWindow);
				windowState.add(element.getValue());

				triggerContext.key = key;
				triggerContext.window = actualWindow;

				TriggerResult triggerResult = triggerContext.onElement(element);

				if (triggerResult.isFire()) {
					ACC contents = windowState.get();
					if (contents == null) {
						continue;
					}
					emitWindowContents(actualWindow, contents);
				}

				if (triggerResult.isPurge()) {
					windowState.clear();
				}
				registerCleanupTimer(actualWindow);
			}

			// need to make sure to update the merging state in state
			mergingWindows.persist();
		} else {
			for (W window: elementWindows) {

				// drop if the window is already late
				if (isWindowLate(window)) {
					continue;
				}
				isSkippedElement = false;

				windowState.setCurrentNamespace(window);
				windowState.add(element.getValue());

				triggerContext.key = key;
				triggerContext.window = window;

				TriggerResult triggerResult = triggerContext.onElement(element);

				if (triggerResult.isFire()) {
					ACC contents = windowState.get();
					if (contents == null) {
						continue;
					}
					emitWindowContents(window, contents);
				}

				if (triggerResult.isPurge()) {
					windowState.clear();
				}
				registerCleanupTimer(window);
			}
		}

		// side output input event if
		// element not handled by any window
		// late arriving tag has been set
		// windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
		if (isSkippedElement && isElementLate(element)) {
			if (lateDataOutputTag != null){
				sideOutput(element);
			} else {
				this.numLateRecordsDropped.inc();
			}
		}
	}

	protected boolean isElementLate(StreamRecord<IN> element){
		return (windowAssigner.isEventTime()) &&
			(element.getTimestamp() + allowedLateness <= internalTimerService.currentWatermark());
	}

	private long cleanupTime(W window) {
		if (windowAssigner.isEventTime()) {
			long cleanupTime = window.maxTimestamp() + allowedLateness;
			return cleanupTime >= window.maxTimestamp() ? cleanupTime : Long.MAX_VALUE;
		} else {
			return window.maxTimestamp();
		}
	}

    //......
}
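  • WindowOperator has an isElementLate method that uses allowedLateness to decide whether an element is late: it returns true only for event-time assigners, when the element's timestamp plus allowedLateness is at or below the current watermark. isSkippedElement stays true only if every window assigned to the element was already late (isWindowLate), so the element was added to none of them. At the end of processElement, if both isSkippedElement and isElementLate are true, the element is sent to the side output when lateDataOutputTag is non-null; when lateDataOutputTag is null, numLateRecordsDropped.inc() is called instead to increment the numLateRecordsDropped counter.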
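The decision flow of processElement can be traced with a small simulation. `LateElementRouting` is a hypothetical, simplified model, not Flink code: it handles only non-merging event-time tumbling windows, stores timestamps instead of window state, ignores triggers, and sets the watermark by hand. isWindowLate is elided from the excerpt above; the sketch assumes, as in the 1.7 source, that it returns true when cleanupTime(window) is at or below the current watermark.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified model of WindowOperator.processElement for
// non-merging event-time tumbling windows. Not Flink code.
class LateElementRouting {
    final long windowSize;
    final long allowedLateness;
    final boolean hasLateDataTag;           // stands in for lateDataOutputTag != null
    long currentWatermark = Long.MIN_VALUE; // advanced manually in this sketch

    final Map<Long, List<Long>> windowContents = new TreeMap<>(); // window start -> timestamps
    final List<Long> lateSideOutput = new ArrayList<>();
    long numLateRecordsDropped = 0;

    LateElementRouting(long windowSize, long allowedLateness, boolean hasLateDataTag) {
        this.windowSize = windowSize;
        this.allowedLateness = allowedLateness;
        this.hasLateDataTag = hasLateDataTag;
    }

    // Start of the tumbling window [start, start + windowSize) holding the timestamp.
    long windowStart(long timestamp) {
        return timestamp - Math.floorMod(timestamp, windowSize);
    }

    // Last timestamp that still belongs to the window.
    long maxTimestamp(long start) {
        return start + windowSize - 1;
    }

    // Same formula and overflow guard as the event-time branch of cleanupTime.
    long cleanupTime(long start) {
        long cleanup = maxTimestamp(start) + allowedLateness;
        return cleanup >= maxTimestamp(start) ? cleanup : Long.MAX_VALUE;
    }

    // Assumed to match the 1.7 isWindowLate for event-time assigners.
    boolean isWindowLate(long start) {
        return cleanupTime(start) <= currentWatermark;
    }

    // Same comparison as WindowOperator.isElementLate.
    boolean isElementLate(long timestamp) {
        return timestamp + allowedLateness <= currentWatermark;
    }

    void processElement(long timestamp) {
        boolean isSkippedElement = true;
        long start = windowStart(timestamp); // tumbling: exactly one window
        if (!isWindowLate(start)) {
            isSkippedElement = false;
            windowContents.computeIfAbsent(start, s -> new ArrayList<>()).add(timestamp);
        }
        if (isSkippedElement && isElementLate(timestamp)) {
            if (hasLateDataTag) {
                lateSideOutput.add(timestamp); // sideOutput(element)
            } else {
                numLateRecordsDropped++;       // numLateRecordsDropped.inc()
            }
        }
    }

    public static void main(String[] args) {
        LateElementRouting op = new LateElementRouting(10, 5, true);
        op.currentWatermark = 12;
        op.processElement(3);  // window [0, 10) cleans up at 14 > 12: accepted
        op.currentWatermark = 14;
        op.processElement(4);  // window [0, 10) is now late: side output
        op.processElement(12); // window [10, 20) cleans up at 24: accepted
        System.out.println(op.windowContents); // {0=[3], 10=[12]}
        System.out.println(op.lateSideOutput); // [4]
    }
}
```

With a window size of 10 and an allowed lateness of 5, window [0, 10) has maxTimestamp 9 and cleanupTime 14, so it keeps accepting elements until the watermark reaches 14; an element with timestamp 12 is behind that watermark but still lands in window [10, 20). The overflow guard matters for windows whose maxTimestamp is close to Long.MAX_VALUE: adding the lateness would wrap to a negative value, so cleanupTime returns Long.MAX_VALUE instead.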

Summary

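  • When using event-time windows, Flink provides the allowedLateness method to configure how late an element may be. Once the watermark reaches the window's end timestamp plus the allowed lateness, further elements for that window are dropped unless a late-data side output is configured; elements that arrive before that point are still added to the window. The default allowed lateness is 0. For window assigners such as GlobalWindows, whose end timestamp is Long.MAX_VALUE, elements are never late.
  • OutputTag is a side-output identifier that carries a name (id) and type information. Flink lets functions such as ProcessFunction, CoProcessFunction, ProcessWindowFunction and ProcessAllWindowFunction emit side outputs; the Context of each of these functions has an output(OutputTag<X> outputTag, X value) method for sending an element to a side output.
  • SingleOutputStreamOperator provides a getSideOutput method that, given an OutputTag, returns the side output previously emitted from within a function. At the end of WindowOperator.processElement, if both isSkippedElement and isElementLate are true, the late element is sent to the side output when lateDataOutputTag is non-null, and is otherwise counted in numLateRecordsDropped.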
