本文主要研究一下Flink的Allowed Lateness。
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/WindowedStream.java
@Public public class WindowedStream<T, K, W extends Window> { /** The keyed data stream that is windowed by this stream. */ private final KeyedStream<T, K> input; /** The window assigner. */ private final WindowAssigner<? super T, W> windowAssigner; /** The trigger that is used for window evaluation/emission. */ private Trigger<? super T, ? super W> trigger; /** The evictor that is used for evicting elements before window evaluation. */ private Evictor<? super T, ? super W> evictor; /** The user-specified allowed lateness. */ private long allowedLateness = 0L; /** * Side output {@code OutputTag} for late data. If no tag is set late data will simply be * dropped. */ private OutputTag<T> lateDataOutputTag; @PublicEvolving public WindowedStream<T, K, W> allowedLateness(Time lateness) { final long millis = lateness.toMilliseconds(); checkArgument(millis >= 0, "The allowed lateness cannot be negative."); this.allowedLateness = millis; return this; } @PublicEvolving public WindowedStream<T, K, W> sideOutputLateData(OutputTag<T> outputTag) { Preconditions.checkNotNull(outputTag, "Side output tag must not be null."); this.lateDataOutputTag = input.getExecutionEnvironment().clean(outputTag); return this; } //...... 
public <R> SingleOutputStreamOperator<R> reduce( ReduceFunction<T> reduceFunction, WindowFunction<T, R, K, W> function, TypeInformation<R> resultType) { if (reduceFunction instanceof RichFunction) { throw new UnsupportedOperationException("ReduceFunction of reduce can not be a RichFunction."); } //clean the closures function = input.getExecutionEnvironment().clean(function); reduceFunction = input.getExecutionEnvironment().clean(reduceFunction); final String opName = generateOperatorName(windowAssigner, trigger, evictor, reduceFunction, function); KeySelector<T, K> keySel = input.getKeySelector(); OneInputStreamOperator<T, R> operator; if (evictor != null) { @SuppressWarnings({"unchecked", "rawtypes"}) TypeSerializer<StreamRecord<T>> streamRecordSerializer = (TypeSerializer<StreamRecord<T>>) new StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig())); ListStateDescriptor<StreamRecord<T>> stateDesc = new ListStateDescriptor<>("window-contents", streamRecordSerializer); operator = new EvictingWindowOperator<>(windowAssigner, windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()), keySel, input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()), stateDesc, new InternalIterableWindowFunction<>(new ReduceApplyWindowFunction<>(reduceFunction, function)), trigger, evictor, allowedLateness, lateDataOutputTag); } else { ReducingStateDescriptor<T> stateDesc = new ReducingStateDescriptor<>("window-contents", reduceFunction, input.getType().createSerializer(getExecutionEnvironment().getConfig())); operator = new WindowOperator<>(windowAssigner, windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()), keySel, input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()), stateDesc, new InternalSingleValueWindowFunction<>(function), trigger, allowedLateness, lateDataOutputTag); } return input.transform(opName, resultType, operator); } //...... }
evictor不爲null時建立的是EvictingWindowOperator，evictor爲null時建立的是WindowOperator；兩者的構造參數中都傳入了allowedLateness和lateDataOutputTag。
flink-core-1.7.0-sources.jar!/org/apache/flink/util/OutputTag.java
@PublicEvolving public class OutputTag<T> implements Serializable { private static final long serialVersionUID = 2L; private final String id; private final TypeInformation<T> typeInfo; public OutputTag(String id) { Preconditions.checkNotNull(id, "OutputTag id cannot be null."); Preconditions.checkArgument(!id.isEmpty(), "OutputTag id must not be empty."); this.id = id; try { this.typeInfo = TypeExtractor.createTypeInfo(this, OutputTag.class, getClass(), 0); } catch (InvalidTypesException e) { throw new InvalidTypesException("Could not determine TypeInformation for the OutputTag type. " + "The most common reason is forgetting to make the OutputTag an anonymous inner class. " + "It is also not possible to use generic type variables with OutputTags, such as 'Tuple2<A, B>'.", e); } } public OutputTag(String id, TypeInformation<T> typeInfo) { Preconditions.checkNotNull(id, "OutputTag id cannot be null."); Preconditions.checkArgument(!id.isEmpty(), "OutputTag id must not be empty."); this.id = id; this.typeInfo = Preconditions.checkNotNull(typeInfo, "TypeInformation cannot be null."); } // ------------------------------------------------------------------------ public String getId() { return id; } public TypeInformation<T> getTypeInfo() { return typeInfo; } // ------------------------------------------------------------------------ @Override public boolean equals(Object obj) { return obj instanceof OutputTag && ((OutputTag) obj).id.equals(this.id); } @Override public int hashCode() { return id.hashCode(); } @Override public String toString() { return "OutputTag(" + getTypeInfo() + ", " + id + ")"; } }
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@Public public class SingleOutputStreamOperator<T> extends DataStream<T> { protected boolean nonParallel = false; private Map<OutputTag<?>, TypeInformation> requestedSideOutputs = new HashMap<>(); private boolean wasSplitApplied = false; //...... public <X> DataStream<X> getSideOutput(OutputTag<X> sideOutputTag) { if (wasSplitApplied) { throw new UnsupportedOperationException("getSideOutput() and split() may not be called on the same DataStream. " + "As a work-around, please add a no-op map function before the split() call."); } sideOutputTag = clean(requireNonNull(sideOutputTag)); // make a defensive copy sideOutputTag = new OutputTag<X>(sideOutputTag.getId(), sideOutputTag.getTypeInfo()); TypeInformation<?> type = requestedSideOutputs.get(sideOutputTag); if (type != null && !type.equals(sideOutputTag.getTypeInfo())) { throw new UnsupportedOperationException("A side output with a matching id was " + "already requested with a different type. This is not allowed, side output " + "ids need to be unique."); } requestedSideOutputs.put(sideOutputTag, sideOutputTag.getTypeInfo()); SideOutputTransformation<X> sideOutputTransformation = new SideOutputTransformation<>(this.getTransformation(), sideOutputTag); return new DataStream<>(this.getExecutionEnvironment(), sideOutputTransformation); } }
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
/**
 * Window operator excerpt showing how elements are assigned to windows and how allowed
 * lateness decides whether an element or window is still accepted.
 *
 * <p>Members not relevant here are elided ({@code //......}).
 */
@Internal
public class WindowOperator<K, IN, ACC, OUT, W extends Window>
        extends AbstractUdfStreamOperator<OUT, InternalWindowFunction<ACC, OUT, K, W>>
        implements OneInputStreamOperator<IN, OUT>, Triggerable<K, W> {

    //......

    /**
     * Processes one element.
     *
     * <p>The element is added to every assigned window that is not already late. The trigger is
     * consulted for each such window, and window contents are emitted and/or purged as the trigger
     * result says; a cleanup timer is then registered for the window. For merging assigners,
     * newly added windows may merge with in-flight windows, and the merged window is used instead.
     *
     * <p>The element may end up in no window and also satisfy {@link #isElementLate}. In that case
     * it goes to the late-data side output if {@code lateDataOutputTag} is set. Otherwise it is
     * dropped and {@code numLateRecordsDropped} is incremented.
     *
     * @param element the incoming record together with its timestamp
     * @throws Exception as declared; propagated from the state, trigger and emission calls made here
     */
    public void processElement(StreamRecord<IN> element) throws Exception {
        final Collection<W> elementWindows = windowAssigner.assignWindows(
            element.getValue(), element.getTimestamp(), windowAssignerContext);

        // Stays true only if the element is added to none of its assigned windows
        // (every assigned window was already late, or no window was assigned).
        boolean isSkippedElement = true;

        final K key = this.<K>getKeyedStateBackend().getCurrentKey();

        if (windowAssigner instanceof MergingWindowAssigner) {
            MergingWindowSet<W> mergingWindows = getMergingWindowSet();

            for (W window: elementWindows) {

                // Adding the new window might result in a merge; in that case actualWindow
                // is the merged window and we work with that. If we don't merge, then
                // actualWindow == window.
                W actualWindow = mergingWindows.addWindow(window, new MergingWindowSet.MergeFunction<W>() {
                    @Override
                    public void merge(W mergeResult,
                            Collection<W> mergedWindows, W stateWindowResult,
                            Collection<W> mergedStateWindows) throws Exception {

                        // A merge must not produce a window that is already beyond its
                        // lateness bound (event time) or already ended (processing time).
                        if ((windowAssigner.isEventTime() && mergeResult.maxTimestamp() + allowedLateness <= internalTimerService.currentWatermark())) {
                            throw new UnsupportedOperationException("The end timestamp of an " +
                                "event-time window cannot become earlier than the current watermark " +
                                "by merging. Current watermark: " + internalTimerService.currentWatermark() +
                                " window: " + mergeResult);
                        } else if (!windowAssigner.isEventTime() && mergeResult.maxTimestamp() <= internalTimerService.currentProcessingTime()) {
                            throw new UnsupportedOperationException("The end timestamp of a " +
                                "processing-time window cannot become earlier than the current processing time " +
                                "by merging. Current processing time: " + internalTimerService.currentProcessingTime() +
                                " window: " + mergeResult);
                        }

                        // Notify the trigger of the merge on the resulting window ...
                        triggerContext.key = key;
                        triggerContext.window = mergeResult;
                        triggerContext.onMerge(mergedWindows);

                        // ... then clear the trigger context and delete the cleanup timer of each
                        // window that was merged away.
                        for (W m: mergedWindows) {
                            triggerContext.window = m;
                            triggerContext.clear();
                            deleteCleanupTimer(m);
                        }

                        // Merge the merged state windows into the newly resulting state window.
                        windowMergingState.mergeNamespaces(stateWindowResult, mergedStateWindows);
                    }
                });

                // Drop (and retire) the window if it is already late.
                if (isWindowLate(actualWindow)) {
                    mergingWindows.retireWindow(actualWindow);
                    continue;
                }
                isSkippedElement = false;

                W stateWindow = mergingWindows.getStateWindow(actualWindow);
                if (stateWindow == null) {
                    // NOTE(review): the message reports `window` (the assigned window), while the
                    // lookup used `actualWindow` (possibly the merged window); confirm intent.
                    throw new IllegalStateException("Window " + window + " is not in in-flight window set.");
                }

                // Window contents are stored under the state window's namespace.
                windowState.setCurrentNamespace(stateWindow);
                windowState.add(element.getValue());

                triggerContext.key = key;
                triggerContext.window = actualWindow;

                TriggerResult triggerResult = triggerContext.onElement(element);

                if (triggerResult.isFire()) {
                    ACC contents = windowState.get();
                    if (contents == null) {
                        // NOTE(review): this `continue` also skips the purge check and
                        // registerCleanupTimer(actualWindow) below; confirm that is intended.
                        continue;
                    }
                    emitWindowContents(actualWindow, contents);
                }

                if (triggerResult.isPurge()) {
                    windowState.clear();
                }
                registerCleanupTimer(actualWindow);
            }

            // Make sure the (possibly changed) merging window set is written back to state.
            mergingWindows.persist();
        } else {
            for (W window: elementWindows) {

                // Drop if the window is already late.
                if (isWindowLate(window)) {
                    continue;
                }
                isSkippedElement = false;

                windowState.setCurrentNamespace(window);
                windowState.add(element.getValue());

                triggerContext.key = key;
                triggerContext.window = window;

                TriggerResult triggerResult = triggerContext.onElement(element);

                if (triggerResult.isFire()) {
                    ACC contents = windowState.get();
                    if (contents == null) {
                        continue;
                    }
                    emitWindowContents(window, contents);
                }

                if (triggerResult.isPurge()) {
                    windowState.clear();
                }
                registerCleanupTimer(window);
            }
        }

        // Handle the element as late data only if:
        //  - it was not added to any assigned window (isSkippedElement), and
        //  - it is late: the assigner is event time and
        //    element timestamp + allowedLateness <= current watermark (see isElementLate).
        // It then goes to the side output when lateDataOutputTag is set; otherwise it is dropped
        // and counted in numLateRecordsDropped.
        if (isSkippedElement && isElementLate(element)) {
            if (lateDataOutputTag != null){
                sideOutput(element);
            } else {
                this.numLateRecordsDropped.inc();
            }
        }
    }

    /**
     * Returns whether {@code element} is late.
     *
     * <p>Only event-time assigners produce late elements. An element is late once its timestamp
     * plus {@code allowedLateness} is at or below the current watermark.
     *
     * @param element the record to check
     * @return true if the element is late
     */
    protected boolean isElementLate(StreamRecord<IN> element){
        return (windowAssigner.isEventTime()) &&
            (element.getTimestamp() + allowedLateness <= internalTimerService.currentWatermark());
    }

    /**
     * Computes the cleanup time for {@code window}.
     *
     * <p>For event time this is {@code window.maxTimestamp() + allowedLateness}, clamped to
     * {@code Long.MAX_VALUE} if the addition overflows. For processing time it is
     * {@code window.maxTimestamp()}; allowed lateness is not applied.
     *
     * @param window the window whose cleanup time is computed
     * @return the cleanup timestamp
     */
    private long cleanupTime(W window) {
        if (windowAssigner.isEventTime()) {
            long cleanupTime = window.maxTimestamp() + allowedLateness;
            // If the sum overflowed (e.g. maxTimestamp is Long.MAX_VALUE) it wraps below
            // maxTimestamp; clamp to Long.MAX_VALUE in that case. This assumes allowedLateness
            // is non-negative (WindowedStream#allowedLateness rejects negative values).
            return cleanupTime >= window.maxTimestamp() ? cleanupTime : Long.MAX_VALUE;
        } else {
            return window.maxTimestamp();
        }
    }

    //......
}
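在窗口結束時間+容許遲到時間（allowedLateness）內到達的元素仍然會被添加到窗口內。若元素沒有被添加到任何窗口，且其timestamp + allowedLateness <= 當前watermark（即isElementLate爲true），則設置了lateDataOutputTag時會被輸出到side output，否則被丟棄並計入numLateRecordsDropped。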
allowedLateness參數默認設置爲0。對於使用GlobalWindows這類window assigner的情況，因爲其窗口的end時間戳爲Long.MAX_VALUE，所以元素不存在late的問題。