A look at Flink's consecutive windowed operations

This article takes a look at consecutive windowed operations in Flink.

Example

DataStream<Integer> input = ...;

DataStream<Integer> resultsPerKey = input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .reduce(new Summer());

DataStream<Integer> globalResults = resultsPerKey
    .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
    .process(new TopKWindowFunction());
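  • The example first partitions the stream by key and aggregates each key's values within the given window (here with reduce(new Summer())), then applies windowAll to the resulting DataStream using the same time WindowAssigner. Within the same time window, results are therefore first aggregated per key and then aggregated globally, which helps with problems such as finding the top-k elements. This works because the first window operator emits each result with the timestamp window.maxTimestamp() (see emitWindowContents in WindowOperator below), so every per-key result falls into the same 5-second window in the windowAll stage.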
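To make the two-stage idea concrete, here is a minimal plain-Java model (not Flink API) of what the pipeline computes. It assumes 5-second tumbling windows with zero offset, non-negative timestamps, a sum per key (as the name Summer suggests) and a top-k selection in the second stage (as TopKWindowFunction suggests; the article does not show either implementation). The classes Event, KeyedResult and ConsecutiveWindowsModel are hypothetical. The detail it reproduces is that each per-key result is stamped with its window's maxTimestamp (end - 1), so regrouping with the same assigner puts it back into the same window.

```java
import java.util.*;
import java.util.stream.Collectors;

// Hypothetical input record: (key, value, event timestamp in ms).
final class Event {
    final String key;
    final int value;
    final long timestamp;

    Event(String key, int value, long timestamp) {
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
    }
}

// Hypothetical output of the keyed stage: one sum per key and window.
final class KeyedResult {
    final String key;
    final int sum;
    final long timestamp;

    KeyedResult(String key, int sum, long timestamp) {
        this.key = key;
        this.sum = sum;
        this.timestamp = timestamp;
    }
}

// Plain-Java model of keyBy().window().reduce() followed by windowAll() with the same assigner.
final class ConsecutiveWindowsModel {
    static final long SIZE = 5_000L; // 5-second tumbling windows, offset 0 (assumption)

    // Start of the tumbling window containing ts (non-negative timestamps assumed).
    static long windowStart(long ts) {
        return ts - (ts % SIZE);
    }

    // Stage 1: sum per (window, key); each result is stamped with the window's
    // maxTimestamp (start + SIZE - 1), as emitWindowContents does.
    static List<KeyedResult> perKeySums(List<Event> events) {
        Map<Long, Map<String, Integer>> sums = new TreeMap<>();
        for (Event e : events) {
            sums.computeIfAbsent(windowStart(e.timestamp), w -> new TreeMap<>())
                .merge(e.key, e.value, Integer::sum);
        }
        List<KeyedResult> out = new ArrayList<>();
        sums.forEach((start, byKey) -> byKey.forEach(
            (key, sum) -> out.add(new KeyedResult(key, sum, start + SIZE - 1))));
        return out;
    }

    // Stage 2: regroup all per-key results with the same assigner and keep the k largest sums.
    static Map<Long, List<String>> topKPerWindow(List<KeyedResult> results, int k) {
        Map<Long, List<KeyedResult>> byWindow = new TreeMap<>();
        for (KeyedResult r : results) {
            byWindow.computeIfAbsent(windowStart(r.timestamp), w -> new ArrayList<>()).add(r);
        }
        Map<Long, List<String>> top = new TreeMap<>();
        byWindow.forEach((start, rs) -> top.put(start, rs.stream()
            .sorted(Comparator.comparingInt((KeyedResult r) -> r.sum).reversed()
                .thenComparing(r -> r.key))
            .limit(k)
            .map(r -> r.key)
            .collect(Collectors.toList())));
        return top;
    }
}
```

With events ("a",3,1000), ("b",5,2000), ("c",1,3000), ("a",4,4999), ("b",2,6000), ("c",7,7000), the first stage yields sums stamped 4999 and 9999, and the top 2 keys are [a, b] for the window starting at 0 and [c, b] for the window starting at 5000.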

TimestampsAndPeriodicWatermarksOperator

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java

public class TimestampsAndPeriodicWatermarksOperator<T>
		extends AbstractUdfStreamOperator<T, AssignerWithPeriodicWatermarks<T>>
		implements OneInputStreamOperator<T, T>, ProcessingTimeCallback {

	private static final long serialVersionUID = 1L;

	private transient long watermarkInterval;

	private transient long currentWatermark;

	public TimestampsAndPeriodicWatermarksOperator(AssignerWithPeriodicWatermarks<T> assigner) {
		super(assigner);
		this.chainingStrategy = ChainingStrategy.ALWAYS;
	}

	@Override
	public void open() throws Exception {
		super.open();

		currentWatermark = Long.MIN_VALUE;
		watermarkInterval = getExecutionConfig().getAutoWatermarkInterval();

		if (watermarkInterval > 0) {
			long now = getProcessingTimeService().getCurrentProcessingTime();
			getProcessingTimeService().registerTimer(now + watermarkInterval, this);
		}
	}

	@Override
	public void processElement(StreamRecord<T> element) throws Exception {
		final long newTimestamp = userFunction.extractTimestamp(element.getValue(),
				element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE);

		output.collect(element.replace(element.getValue(), newTimestamp));
	}

	@Override
	public void onProcessingTime(long timestamp) throws Exception {
		// register next timer
		Watermark newWatermark = userFunction.getCurrentWatermark();
		if (newWatermark != null && newWatermark.getTimestamp() > currentWatermark) {
			currentWatermark = newWatermark.getTimestamp();
			// emit watermark
			output.emitWatermark(newWatermark);
		}

		long now = getProcessingTimeService().getCurrentProcessingTime();
		getProcessingTimeService().registerTimer(now + watermarkInterval, this);
	}

	/**
	 * Override the base implementation to completely ignore watermarks propagated from
	 * upstream (we rely only on the {@link AssignerWithPeriodicWatermarks} to emit
	 * watermarks from here).
	 */
	@Override
	public void processWatermark(Watermark mark) throws Exception {
		// if we receive a Long.MAX_VALUE watermark we forward it since it is used
		// to signal the end of input and to not block watermark progress downstream
		if (mark.getTimestamp() == Long.MAX_VALUE && currentWatermark != Long.MAX_VALUE) {
			currentWatermark = Long.MAX_VALUE;
			output.emitWatermark(mark);
		}
	}

	@Override
	public void close() throws Exception {
		super.close();

		// emit a final watermark
		Watermark newWatermark = userFunction.getCurrentWatermark();
		if (newWatermark != null && newWatermark.getTimestamp() > currentWatermark) {
			currentWatermark = newWatermark.getTimestamp();
			// emit watermark
			output.emitWatermark(newWatermark);
		}
	}
}
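  • If assignTimestampsAndWatermarks is given an AssignerWithPeriodicWatermarks, a TimestampsAndPeriodicWatermarksOperator is created. In open() it registers a delayed (timer) task according to the configured watermarkInterval, provided that the interval is greater than 0.
  • When the timer fires, it calls back onProcessingTime, which calls the assigner's getCurrentWatermark method to obtain the watermark and then registers a new timer at getProcessingTimeService().getCurrentProcessingTime() + watermarkInterval. This watermarkInterval is the value set with env.getConfig().setAutoWatermarkInterval.
  • Besides re-registering the timer so that it keeps running periodically, onProcessingTime emits the watermark only if the new value is greater than currentWatermark, so emitted watermarks never go backwards. processWatermark, by contrast, ignores watermarks coming from upstream and only forwards a Long.MAX_VALUE watermark, which signals the end of input.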
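The timer-driven emit logic above can be modelled without Flink. The sketch below assumes a hypothetical bounded-out-of-orderness assigner (watermark = highest timestamp seen minus a fixed bound, similar in spirit to Flink's BoundedOutOfOrdernessTimestampExtractor but not that class) and replaces the processing-time timer with explicit tick() calls. BoundedOutOfOrdernessAssigner and PeriodicWatermarkModel are illustrative names only.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical periodic assigner: watermark = highest timestamp seen minus a fixed bound.
final class BoundedOutOfOrdernessAssigner {
    private final long maxOutOfOrderness;
    private long maxSeenTimestamp;

    BoundedOutOfOrdernessAssigner(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
        // Start so that the initial watermark is Long.MIN_VALUE without underflowing.
        this.maxSeenTimestamp = Long.MIN_VALUE + maxOutOfOrderness;
    }

    long extractTimestamp(long elementTimestamp) {
        maxSeenTimestamp = Math.max(maxSeenTimestamp, elementTimestamp);
        return elementTimestamp;
    }

    long getCurrentWatermark() {
        return maxSeenTimestamp - maxOutOfOrderness;
    }
}

// Mirrors the emit logic of TimestampsAndPeriodicWatermarksOperator.onProcessingTime.
final class PeriodicWatermarkModel {
    private final BoundedOutOfOrdernessAssigner assigner;
    private long currentWatermark = Long.MIN_VALUE;
    final List<Long> emitted = new ArrayList<>();

    PeriodicWatermarkModel(BoundedOutOfOrdernessAssigner assigner) {
        this.assigner = assigner;
    }

    void processElement(long timestamp) {
        assigner.extractTimestamp(timestamp);
    }

    // Called once per simulated autoWatermarkInterval tick; the real operator
    // re-registers a processing-time timer at now + watermarkInterval afterwards.
    void tick() {
        long wm = assigner.getCurrentWatermark();
        if (wm > currentWatermark) {   // emit only if the watermark advanced
            currentWatermark = wm;
            emitted.add(wm);
        }
    }
}
```

With a bound of 1000 ms, ticking after elements at 5000, 3000 (out of order) and 8000 emits 4000 and then 7000; the tick after the late element 3000 emits nothing because the watermark did not advance.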

SystemProcessingTimeService

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java

public class SystemProcessingTimeService extends ProcessingTimeService {

	private static final Logger LOG = LoggerFactory.getLogger(SystemProcessingTimeService.class);

	private static final int STATUS_ALIVE = 0;
	private static final int STATUS_QUIESCED = 1;
	private static final int STATUS_SHUTDOWN = 2;

	// ------------------------------------------------------------------------

	/** The containing task that owns this time service provider. */
	private final AsyncExceptionHandler task;

	/** The lock that timers acquire upon triggering. */
	private final Object checkpointLock;

	/** The executor service that schedules and calls the triggers of this task. */
	private final ScheduledThreadPoolExecutor timerService;

	private final AtomicInteger status;

	public SystemProcessingTimeService(AsyncExceptionHandler failureHandler, Object checkpointLock) {
		this(failureHandler, checkpointLock, null);
	}

	public SystemProcessingTimeService(
			AsyncExceptionHandler task,
			Object checkpointLock,
			ThreadFactory threadFactory) {

		this.task = checkNotNull(task);
		this.checkpointLock = checkNotNull(checkpointLock);

		this.status = new AtomicInteger(STATUS_ALIVE);

		if (threadFactory == null) {
			this.timerService = new ScheduledThreadPoolExecutor(1);
		} else {
			this.timerService = new ScheduledThreadPoolExecutor(1, threadFactory);
		}

		// tasks should be removed if the future is canceled
		this.timerService.setRemoveOnCancelPolicy(true);

		// make sure shutdown removes all pending tasks
		this.timerService.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
		this.timerService.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
	}

	@Override
	public long getCurrentProcessingTime() {
		return System.currentTimeMillis();
	}

	@Override
	public ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback target) {

		// delay the firing of the timer by 1 ms to align the semantics with watermark. A watermark
		// T says we won't see elements in the future with a timestamp smaller or equal to T.
		// With processing time, we therefore need to delay firing the timer by one ms.
		long delay = Math.max(timestamp - getCurrentProcessingTime(), 0) + 1;

		// we directly try to register the timer and only react to the status on exception
		// that way we save unnecessary volatile accesses for each timer
		try {
			return timerService.schedule(
					new TriggerTask(status, task, checkpointLock, target, timestamp), delay, TimeUnit.MILLISECONDS);
		}
		catch (RejectedExecutionException e) {
			final int status = this.status.get();
			if (status == STATUS_QUIESCED) {
				return new NeverCompleteFuture(delay);
			}
			else if (status == STATUS_SHUTDOWN) {
				throw new IllegalStateException("Timer service is shut down");
			}
			else {
				// something else happened, so propagate the exception
				throw e;
			}
		}
	}

	//......
}
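  • SystemProcessingTimeService.registerTimer schedules a delayed TriggerTask for the given timestamp. The delay is max(timestamp - now, 0) + 1 ms; the extra millisecond aligns processing-time timers with watermark semantics. timerService is the JDK's ScheduledThreadPoolExecutor with a single thread. When the service status is STATUS_ALIVE, TriggerTask.run invokes onProcessingTime on the ProcessingTimeCallback (here TimestampsAndPeriodicWatermarksOperator) while holding the checkpoint lock.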
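Since this scheduling relies only on JDK classes, it can be reproduced directly. The sketch below is a hypothetical, stripped-down timer service (no checkpoint lock, no quiesce/shutdown status, no TriggerTask wrapper) that uses the same delay formula and a callback that re-registers itself the way onProcessingTime does. MiniTimerService and PeriodicTask are illustrative names, and stopping after a fixed number of ticks is an assumption added so the example terminates.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.LongConsumer;

// Hypothetical, stripped-down analogue of SystemProcessingTimeService:
// no checkpoint lock, no quiesce/shutdown status handling, no TriggerTask.
final class MiniTimerService {
    private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);

    MiniTimerService() {
        executor.setRemoveOnCancelPolicy(true);                      // drop cancelled timers
        executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
    }

    long now() {
        return System.currentTimeMillis();
    }

    // Same formula as registerTimer: fire 1 ms after the requested timestamp.
    static long delayFor(long timestamp, long now) {
        return Math.max(timestamp - now, 0) + 1;
    }

    ScheduledFuture<?> registerTimer(long timestamp, LongConsumer callback) {
        return executor.schedule(() -> callback.accept(timestamp),
                delayFor(timestamp, now()), TimeUnit.MILLISECONDS);
    }

    void shutdown() {
        executor.shutdownNow();
    }
}

// Callback in the style of onProcessingTime: do the periodic work, then re-register.
final class PeriodicTask {
    private final MiniTimerService timers;
    private final long interval;
    private final CountDownLatch remaining;

    PeriodicTask(MiniTimerService timers, long interval, int ticks) {
        this.timers = timers;
        this.interval = interval;
        this.remaining = new CountDownLatch(ticks);
    }

    void start() {
        timers.registerTimer(timers.now() + interval, this::onProcessingTime);
    }

    private void onProcessingTime(long timestamp) {
        remaining.countDown();                 // stands in for e.g. emitting a watermark
        if (remaining.getCount() > 0) {        // stop after the requested number of ticks
            timers.registerTimer(timers.now() + interval, this::onProcessingTime);
        }
    }

    boolean awaitTicks(long timeoutMs) {
        try {
            return remaining.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Note that a timestamp already in the past still gets a 1 ms delay rather than firing synchronously, which is why delayFor never returns 0.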

WindowOperator

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java

@Internal
public class WindowOperator<K, IN, ACC, OUT, W extends Window>
	extends AbstractUdfStreamOperator<OUT, InternalWindowFunction<ACC, OUT, K, W>>
	implements OneInputStreamOperator<IN, OUT>, Triggerable<K, W> {

	//......
	@Override
	public void processElement(StreamRecord<IN> element) throws Exception {
		final Collection<W> elementWindows = windowAssigner.assignWindows(
			element.getValue(), element.getTimestamp(), windowAssignerContext);

		//if element is handled by none of assigned elementWindows
		boolean isSkippedElement = true;

		final K key = this.<K>getKeyedStateBackend().getCurrentKey();

		if (windowAssigner instanceof MergingWindowAssigner) {

			//......

		} else {
			for (W window: elementWindows) {

				// drop if the window is already late
				if (isWindowLate(window)) {
					continue;
				}
				isSkippedElement = false;

				windowState.setCurrentNamespace(window);
				windowState.add(element.getValue());

				triggerContext.key = key;
				triggerContext.window = window;

				TriggerResult triggerResult = triggerContext.onElement(element);

				if (triggerResult.isFire()) {
					ACC contents = windowState.get();
					if (contents == null) {
						continue;
					}
					emitWindowContents(window, contents);
				}

				if (triggerResult.isPurge()) {
					windowState.clear();
				}
				registerCleanupTimer(window);
			}
		}

		// side output input event if
		// element not handled by any window
		// late arriving tag has been set
		// windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
		if (isSkippedElement && isElementLate(element)) {
			if (lateDataOutputTag != null){
				sideOutput(element);
			} else {
				this.numLateRecordsDropped.inc();
			}
		}
	}

	/**
	 * Emits the contents of the given window using the {@link InternalWindowFunction}.
	 */
	@SuppressWarnings("unchecked")
	private void emitWindowContents(W window, ACC contents) throws Exception {
		timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
		processContext.window = window;
		userFunction.process(triggerContext.key, window, processContext, contents, timestampedCollector);
	}

	//......
}
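  • WindowOperator.processElement assigns the element to its windows, skips windows that are already late, and adds the element to windowState, which with the heap state backend accumulates it in memory (for the reduce() stage in the example, a heap-based reducing state). It then calls triggerContext.onElement, which delegates to trigger.onElement (here the trigger is EventTimeTrigger), to obtain a TriggerResult. On FIRE it calls emitWindowContents; on PURGE it clears windowState. emitWindowContents sets the output timestamp to window.maxTimestamp() and calls userFunction.process to run the user-defined window function. An element that no window accepted and that is late goes to the late-data side output if lateDataOutputTag is set, and is otherwise counted in numLateRecordsDropped.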
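Stripped of state backends and timer services, the non-merging branch boils down to: assign a window, drop it if it is past its cleanup time, accumulate, ask the trigger, and emit with window.maxTimestamp() on FIRE. The plain-Java model below assumes a single key, tumbling event-time windows with zero offset, non-negative timestamps, a sum accumulator, an EventTimeTrigger-style decision and no late-data side output. TumblingWindowModel and its emitted list (standing in for the output collector) are hypothetical, and processWatermark folds the trigger timer and the cleanup timer into one simplified loop.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical single-key model of WindowOperator for tumbling event-time windows with a sum accumulator.
final class TumblingWindowModel {
    private final long size;
    private final long allowedLateness;
    private long currentWatermark = Long.MIN_VALUE;
    private final TreeMap<Long, Long> windowState = new TreeMap<>(); // window start -> running sum
    final List<long[]> emitted = new ArrayList<>();                  // {window maxTimestamp, sum}
    int lateDropped = 0;                                             // like numLateRecordsDropped

    TumblingWindowModel(long size, long allowedLateness) {
        this.size = size;
        this.allowedLateness = allowedLateness;
    }

    private long maxTimestamp(long start) {
        return start + size - 1;
    }

    void processElement(long value, long timestamp) {
        long start = timestamp - (timestamp % size);           // assignWindows: offset 0, timestamp >= 0
        long maxTs = maxTimestamp(start);
        if (maxTs + allowedLateness <= currentWatermark) {    // isWindowLate: past the cleanup time
            lateDropped++;                                     // no late-data side output configured
            return;
        }
        long sum = windowState.merge(start, value, Long::sum); // windowState.add(...)
        if (maxTs <= currentWatermark) {                       // EventTimeTrigger.onElement -> FIRE
            emitted.add(new long[] {maxTs, sum});              // emitWindowContents at maxTimestamp
        }                                                      // otherwise CONTINUE until the timer fires
    }

    void processWatermark(long watermark) {
        long previous = currentWatermark;
        currentWatermark = watermark;
        Iterator<Map.Entry<Long, Long>> it = windowState.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Long> entry = it.next();
            long maxTs = maxTimestamp(entry.getKey());
            if (maxTs > previous && maxTs <= watermark) {      // timer at maxTimestamp -> onEventTime FIRE
                emitted.add(new long[] {maxTs, entry.getValue()});
            }
            if (maxTs + allowedLateness <= watermark) {        // cleanup timer: drop the window state
                it.remove();
            }
        }
    }
}
```

With a window size of 5000 ms and an allowed lateness of 1000 ms, a late element at 4500 that arrives after watermark 5000 re-fires window [0, 5000) with the updated sum, whereas an element at 2000 arriving after watermark 6000 is dropped because that window's state has already been cleaned up.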

EventTimeTrigger

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java

@PublicEvolving
public class EventTimeTrigger extends Trigger<Object, TimeWindow> {
	private static final long serialVersionUID = 1L;

	private EventTimeTrigger() {}

	@Override
	public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
		if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
			// if the watermark is already past the window fire immediately
			return TriggerResult.FIRE;
		} else {
			ctx.registerEventTimeTimer(window.maxTimestamp());
			return TriggerResult.CONTINUE;
		}
	}

	@Override
	public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
		return time == window.maxTimestamp() ?
			TriggerResult.FIRE :
			TriggerResult.CONTINUE;
	}

	@Override
	public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
		return TriggerResult.CONTINUE;
	}

	@Override
	public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
		ctx.deleteEventTimeTimer(window.maxTimestamp());
	}

	@Override
	public boolean canMerge() {
		return true;
	}

	@Override
	public void onMerge(TimeWindow window,
			OnMergeContext ctx) {
		// only register a timer if the watermark is not yet past the end of the merged window
		// this is in line with the logic in onElement(). If the watermark is past the end of
		// the window onElement() will fire and setting a timer here would fire the window twice.
		long windowMaxTimestamp = window.maxTimestamp();
		if (windowMaxTimestamp > ctx.getCurrentWatermark()) {
			ctx.registerEventTimeTimer(windowMaxTimestamp);
		}
	}

	@Override
	public String toString() {
		return "EventTimeTrigger()";
	}

	public static EventTimeTrigger create() {
		return new EventTimeTrigger();
	}
}
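  • EventTimeTrigger.onElement returns TriggerResult.FIRE if window.maxTimestamp() <= ctx.getCurrentWatermark(), telling the WindowOperator that it can call emitWindowContents. Otherwise it registers an event-time timer at window.maxTimestamp() and returns CONTINUE; once the watermark reaches that timer, onEventTime returns FIRE because time == window.maxTimestamp().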
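The trigger's decisions can be isolated in a few lines. The sketch below is a plain-Java mirror of EventTimeTrigger, not the Flink class: TriggerDecision, FakeTriggerContext and EventTimeTriggerModel are hypothetical names, and a window is identified only by its maxTimestamp. It shows the two firing paths (element after the watermark, timer at maxTimestamp) and why onEventTime compares for equality.

```java
import java.util.TreeSet;

// Hypothetical counterpart of TriggerResult, reduced to the two values EventTimeTrigger returns.
enum TriggerDecision { CONTINUE, FIRE }

// Hypothetical stand-in for TriggerContext: current watermark plus registered event-time timers.
final class FakeTriggerContext {
    long currentWatermark = Long.MIN_VALUE;
    final TreeSet<Long> eventTimeTimers = new TreeSet<>();
}

// Plain-Java mirror of EventTimeTrigger's decisions for a window given by its maxTimestamp.
final class EventTimeTriggerModel {
    static TriggerDecision onElement(long windowMaxTimestamp, FakeTriggerContext ctx) {
        if (windowMaxTimestamp <= ctx.currentWatermark) {
            return TriggerDecision.FIRE;                 // watermark already past the window
        }
        ctx.eventTimeTimers.add(windowMaxTimestamp);     // registerEventTimeTimer
        return TriggerDecision.CONTINUE;
    }

    static TriggerDecision onEventTime(long time, long windowMaxTimestamp) {
        // Only the timer at maxTimestamp fires; any other timer of the window
        // (for example a cleanup timer at maxTimestamp + allowedLateness) does not.
        return time == windowMaxTimestamp ? TriggerDecision.FIRE : TriggerDecision.CONTINUE;
    }

    static void onMerge(long mergedMaxTimestamp, FakeTriggerContext ctx) {
        // Register only if the watermark has not passed the merged window yet,
        // otherwise onElement already fires and the window would fire twice.
        if (mergedMaxTimestamp > ctx.currentWatermark) {
            ctx.eventTimeTimers.add(mergedMaxTimestamp);
        }
    }
}
```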

Summary

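  • Flink supports consecutive windowed operations: for example, partition the stream by key, aggregate each key within a window, and then apply windowAll to the resulting DataStream with the same time WindowAssigner. Within the same time window this yields per-key aggregation followed by global aggregation, which helps with problems such as finding the top-k elements.
  • AssignerWithPeriodicWatermarks and AssignerWithPunctuatedWatermarks serve two purposes: extracting a timestamp from each element to use as its event time, and emitting watermarks. Because elements do not necessarily arrive in strict event-time order, the watermark bounds how long a window keeps waiting for late elements that may belong to it: it tells the window that all elements with an event time less than or equal to the watermark can be considered to have arrived. Based on its own time range, the window then relies on its trigger to decide whether its contents can be evaluated. In consecutive windowed operations, upstream watermarks are forwarded to the downstream operators.
  • A trigger tells the WindowOperator when the window's contents can be evaluated (when it returns TriggerResult.FIRE). For EventTimeTrigger, the decision in onElement depends on the watermark: if window.maxTimestamp() <= ctx.getCurrentWatermark(), it returns TriggerResult.FIRE.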
