A Look at Flink's Tumbling Window

This article takes a look at Flink's Tumbling Window.

WindowAssigner

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java

@PublicEvolving
public abstract class WindowAssigner<T, W extends Window> implements Serializable {
	private static final long serialVersionUID = 1L;

	/**
	 * Returns a {@code Collection} of windows that should be assigned to the element.
	 *
	 * @param element The element to which windows should be assigned.
	 * @param timestamp The timestamp of the element.
	 * @param context The {@link WindowAssignerContext} in which the assigner operates.
	 */
	public abstract Collection<W> assignWindows(T element, long timestamp, WindowAssignerContext context);

	/**
	 * Returns the default trigger associated with this {@code WindowAssigner}.
	 */
	public abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment env);

	/**
	 * Returns a {@link TypeSerializer} for serializing windows that are assigned by
	 * this {@code WindowAssigner}.
	 */
	public abstract TypeSerializer<W> getWindowSerializer(ExecutionConfig executionConfig);

	/**
	 * Returns {@code true} if elements are assigned to windows based on event time,
	 * {@code false} otherwise.
	 */
	public abstract boolean isEventTime();

	/**
	 * A context provided to the {@link WindowAssigner} that allows it to query the
	 * current processing time.
	 *
	 * <p>This is provided to the assigner by its containing
	 * {@link org.apache.flink.streaming.runtime.operators.windowing.WindowOperator},
	 * which, in turn, gets it from the containing
	 * {@link org.apache.flink.streaming.runtime.tasks.StreamTask}.
	 */
	public abstract static class WindowAssignerContext {

		/**
		 * Returns the current processing time.
		 */
		public abstract long getCurrentProcessingTime();

	}
}
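  • WindowAssigner defines the abstract methods assignWindows, getDefaultTrigger, getWindowSerializer and isEventTime, and also defines the abstract static class WindowAssignerContext. It has two type parameters: T is the element type and W is the window type.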
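To see how the two type parameters and the Collection return type fit together, here is a minimal JDK-only sketch. MiniWindowAssigner, FixedBucketAssigner, AssignerDemo and bucketize are hypothetical names, not Flink API; Trigger and TypeSerializer are left out because they need Flink classes, and the window type W is simply a Long holding the window start. assignWindows returns a Collection because in general an element may fall into several windows (as with sliding windows); a tumbling assigner always returns exactly one.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, stripped-down mirror of the WindowAssigner contract (not Flink API).
// T = element type, W = window type; Trigger and TypeSerializer are omitted.
abstract class MiniWindowAssigner<T, W> {

    // Stand-in for WindowAssigner.WindowAssignerContext.
    interface Context {
        long getCurrentProcessingTime();
    }

    // Returns every window the element belongs to (possibly more than one).
    abstract Collection<W> assignWindows(T element, long timestamp, Context context);

    abstract boolean isEventTime();
}

// Example assigner: each window is identified by its start, stored in a Long.
// Assumes non-negative timestamps so that % behaves like a floor.
class FixedBucketAssigner extends MiniWindowAssigner<String, Long> {
    private final long size;

    FixedBucketAssigner(long size) {
        this.size = size;
    }

    @Override
    Collection<Long> assignWindows(String element, long timestamp, Context context) {
        return Collections.singletonList(timestamp - timestamp % size);
    }

    @Override
    boolean isEventTime() {
        return true;
    }
}

class AssignerDemo {
    // Groups elements by the windows their assigner returns.
    static <T, W> Map<W, List<T>> bucketize(MiniWindowAssigner<T, W> assigner, List<T> elements,
                                            List<Long> timestamps, MiniWindowAssigner.Context ctx) {
        Map<W, List<T>> buckets = new LinkedHashMap<>();
        for (int i = 0; i < elements.size(); i++) {
            T element = elements.get(i);
            for (W window : assigner.assignWindows(element, timestamps.get(i), ctx)) {
                buckets.computeIfAbsent(window, k -> new ArrayList<>()).add(element);
            }
        }
        return buckets;
    }

    public static void main(String[] args) {
        Map<Long, List<String>> buckets = bucketize(new FixedBucketAssigner(1000),
                Arrays.asList("a", "b", "c"), Arrays.asList(100L, 900L, 1500L),
                System::currentTimeMillis);
        System.out.println(buckets); // {0=[a, b], 1000=[c]}
    }
}
```

bucketize only roughly approximates the per-window grouping a window operator performs; it ignores keys, triggers and state entirely.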

Window

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/windows/Window.java

@PublicEvolving
public abstract class Window {

	/**
	 * Gets the largest timestamp that still belongs to this window.
	 *
	 * @return The largest timestamp that still belongs to this window.
	 */
	public abstract long maxTimestamp();
}
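  • A Window groups the elements of an unbounded stream into finite buckets. Its maxTimestamp method returns the largest timestamp that still belongs to the window. Window has two subclasses: GlobalWindow and TimeWindow.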
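The meaning of maxTimestamp is easiest to see next to the check EventTimeTrigger performs: an event-time window is treated as complete once the watermark has reached its maxTimestamp(). The sketch below is a hypothetical JDK-only mirror of the Window hierarchy (the Mini* and WindowFiring names are not Flink classes); like Flink's GlobalWindow, the global variant returns Long.MAX_VALUE, and the time-window variant returns end - 1.

```java
// Hypothetical mirror of the Window hierarchy (not Flink API).
abstract class MiniWindow {
    // Largest timestamp that still belongs to this window.
    abstract long maxTimestamp();
}

// Mirrors GlobalWindow: one window spanning all time.
final class MiniGlobalWindow extends MiniWindow {
    @Override
    long maxTimestamp() {
        return Long.MAX_VALUE;
    }
}

// Mirrors TimeWindow: [start, end), so the last included timestamp is end - 1.
final class MiniTimeWindow extends MiniWindow {
    final long start;
    final long end;

    MiniTimeWindow(long start, long end) {
        this.start = start;
        this.end = end;
    }

    @Override
    long maxTimestamp() {
        return end - 1;
    }
}

final class WindowFiring {
    // Modelled on EventTimeTrigger: complete once the watermark reaches maxTimestamp().
    static boolean isComplete(MiniWindow window, long watermark) {
        return window.maxTimestamp() <= watermark;
    }

    public static void main(String[] args) {
        MiniWindow tw = new MiniTimeWindow(0, 5000);
        System.out.println(isComplete(tw, 4998));                     // false
        System.out.println(isComplete(tw, 4999));                     // true
        System.out.println(isComplete(new MiniGlobalWindow(), 4999)); // false
    }
}
```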

TimeWindow

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java

@PublicEvolving
public class TimeWindow extends Window {

	private final long start;
	private final long end;

	public TimeWindow(long start, long end) {
		this.start = start;
		this.end = end;
	}

	/**
	 * Gets the starting timestamp of the window. This is the first timestamp that belongs
	 * to this window.
	 *
	 * @return The starting timestamp of this window.
	 */
	public long getStart() {
		return start;
	}

	/**
	 * Gets the end timestamp of this window. The end timestamp is exclusive, meaning it
	 * is the first timestamp that does not belong to this window any more.
	 *
	 * @return The exclusive end timestamp of this window.
	 */
	public long getEnd() {
		return end;
	}

	/**
	 * Gets the largest timestamp that still belongs to this window.
	 *
	 * <p>This timestamp is identical to {@code getEnd() - 1}.
	 *
	 * @return The largest timestamp that still belongs to this window.
	 *
	 * @see #getEnd()
	 */
	@Override
	public long maxTimestamp() {
		return end - 1;
	}

	@Override
	public boolean equals(Object o) {
		if (this == o) {
			return true;
		}
		if (o == null || getClass() != o.getClass()) {
			return false;
		}

		TimeWindow window = (TimeWindow) o;

		return end == window.end && start == window.start;
	}

	@Override
	public int hashCode() {
		return MathUtils.longToIntWithBitMixing(start + end);
	}

	@Override
	public String toString() {
		return "TimeWindow{" +
				"start=" + start +
				", end=" + end +
				'}';
	}

	/**
	 * Returns {@code true} if this window intersects the given window.
	 */
	public boolean intersects(TimeWindow other) {
		return this.start <= other.end && this.end >= other.start;
	}

	/**
	 * Returns the minimal window covers both this window and the given window.
	 */
	public TimeWindow cover(TimeWindow other) {
		return new TimeWindow(Math.min(start, other.start), Math.max(end, other.end));
	}

	// ------------------------------------------------------------------------
	// Serializer
	// ------------------------------------------------------------------------

	//......

	// ------------------------------------------------------------------------
	//  Utilities
	// ------------------------------------------------------------------------

	/**
	 * Merge overlapping {@link TimeWindow}s. For use by merging
	 * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner WindowAssigners}.
	 */
	public static void mergeWindows(Collection<TimeWindow> windows, MergingWindowAssigner.MergeCallback<TimeWindow> c) {

		// sort the windows by the start time and then merge overlapping windows

		List<TimeWindow> sortedWindows = new ArrayList<>(windows);

		Collections.sort(sortedWindows, new Comparator<TimeWindow>() {
			@Override
			public int compare(TimeWindow o1, TimeWindow o2) {
				return Long.compare(o1.getStart(), o2.getStart());
			}
		});

		List<Tuple2<TimeWindow, Set<TimeWindow>>> merged = new ArrayList<>();
		Tuple2<TimeWindow, Set<TimeWindow>> currentMerge = null;

		for (TimeWindow candidate: sortedWindows) {
			if (currentMerge == null) {
				currentMerge = new Tuple2<>();
				currentMerge.f0 = candidate;
				currentMerge.f1 = new HashSet<>();
				currentMerge.f1.add(candidate);
			} else if (currentMerge.f0.intersects(candidate)) {
				currentMerge.f0 = currentMerge.f0.cover(candidate);
				currentMerge.f1.add(candidate);
			} else {
				merged.add(currentMerge);
				currentMerge = new Tuple2<>();
				currentMerge.f0 = candidate;
				currentMerge.f1 = new HashSet<>();
				currentMerge.f1.add(candidate);
			}
		}

		if (currentMerge != null) {
			merged.add(currentMerge);
		}

		for (Tuple2<TimeWindow, Set<TimeWindow>> m: merged) {
			if (m.f1.size() > 1) {
				c.merge(m.f1, m.f0);
			}
		}
	}

	/**
	 * Method to get the window start for a timestamp.
	 *
	 * @param timestamp epoch millisecond to get the window start.
	 * @param offset The offset which window start would be shifted by.
	 * @param windowSize The size of the generated windows.
	 * @return window start
	 */
	public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
		return timestamp - (timestamp - offset + windowSize) % windowSize;
	}
}
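  • TimeWindow has start and end fields: start is inclusive and end is exclusive, which is why maxTimestamp returns end - 1. It also overrides equals and hashCode.
  • TimeWindow provides intersects, which tells whether this window intersects the given window (because both comparisons are inclusive, windows that merely touch, such as [0, 5) and [5, 10), also count as intersecting), and cover, which returns the smallest window that covers both this window and the given window.
  • TimeWindow also provides the static methods mergeWindows and getWindowStartWithOffset: the former merges overlapping time windows, and the latter computes the window start for a given timestamp, offset and windowSize.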
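The intersects/cover/mergeWindows logic can be exercised without Flink. The following is a simplified, hypothetical re-implementation (SimpleTimeWindow is not a Flink class): instead of invoking a MergingWindowAssigner.MergeCallback, mergeWindows returns a map from each merged window to the windows it absorbed and, like the loop above, only reports groups with more than one member. In Flink this merging serves merging assigners such as the session-window assigners; tumbling windows never overlap, so they are never merged.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

// Hypothetical JDK-only re-implementation of the TimeWindow helpers (not the Flink class).
final class SimpleTimeWindow {
    final long start; // inclusive
    final long end;   // exclusive

    SimpleTimeWindow(long start, long end) {
        this.start = start;
        this.end = end;
    }

    long maxTimestamp() {
        return end - 1;
    }

    // Same comparison as TimeWindow.intersects; touching windows count as intersecting.
    boolean intersects(SimpleTimeWindow other) {
        return this.start <= other.end && this.end >= other.start;
    }

    // Smallest window spanning both windows (not their overlap).
    SimpleTimeWindow cover(SimpleTimeWindow other) {
        return new SimpleTimeWindow(Math.min(start, other.start), Math.max(end, other.end));
    }

    // Sort by start, grow the current group while windows intersect, and keep only
    // groups with more than one member: merged window -> original windows.
    static Map<SimpleTimeWindow, Set<SimpleTimeWindow>> mergeWindows(Collection<SimpleTimeWindow> windows) {
        List<SimpleTimeWindow> sorted = new ArrayList<>(windows);
        sorted.sort(Comparator.comparingLong(w -> w.start));

        Map<SimpleTimeWindow, Set<SimpleTimeWindow>> merged = new LinkedHashMap<>();
        SimpleTimeWindow current = null;
        Set<SimpleTimeWindow> members = new HashSet<>();
        for (SimpleTimeWindow candidate : sorted) {
            if (current != null && current.intersects(candidate)) {
                current = current.cover(candidate);
                members.add(candidate);
            } else {
                if (members.size() > 1) {
                    merged.put(current, members);
                }
                current = candidate;
                members = new HashSet<>();
                members.add(candidate);
            }
        }
        if (members.size() > 1) {
            merged.put(current, members);
        }
        return merged;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof SimpleTimeWindow)) {
            return false;
        }
        SimpleTimeWindow other = (SimpleTimeWindow) o;
        return start == other.start && end == other.end;
    }

    @Override
    public int hashCode() {
        return Objects.hash(start, end);
    }

    @Override
    public String toString() {
        return "[" + start + ", " + end + ")";
    }

    public static void main(String[] args) {
        SimpleTimeWindow a = new SimpleTimeWindow(0, 5);
        SimpleTimeWindow b = new SimpleTimeWindow(5, 10);
        System.out.println(a.intersects(b)); // true
        System.out.println(a.cover(b));      // [0, 10)
        List<SimpleTimeWindow> input = Arrays.asList(a, new SimpleTimeWindow(3, 8), new SimpleTimeWindow(20, 25));
        System.out.println(mergeWindows(input).keySet()); // [[0, 8)]
    }
}
```

Returning a map instead of using a callback is only a convenience for the sketch; the sort-then-sweep structure is the same as in the quoted mergeWindows.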

TumblingEventTimeWindows

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java

@PublicEvolving
public class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
	private static final long serialVersionUID = 1L;

	private final long size;

	private final long offset;

	protected TumblingEventTimeWindows(long size, long offset) {
		if (offset < 0 || offset >= size) {
			throw new IllegalArgumentException("TumblingEventTimeWindows parameters must satisfy 0 <= offset < size");
		}

		this.size = size;
		this.offset = offset;
	}

	@Override
	public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
		if (timestamp > Long.MIN_VALUE) {
			// Long.MIN_VALUE is currently assigned when no timestamp is present
			long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size);
			return Collections.singletonList(new TimeWindow(start, start + size));
		} else {
			throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
					"Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
					"'DataStream.assignTimestampsAndWatermarks(...)'?");
		}
	}

	@Override
	public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
		return EventTimeTrigger.create();
	}

	@Override
	public String toString() {
		return "TumblingEventTimeWindows(" + size + ")";
	}

	public static TumblingEventTimeWindows of(Time size) {
		return new TumblingEventTimeWindows(size.toMilliseconds(), 0);
	}

	public static TumblingEventTimeWindows of(Time size, Time offset) {
		return new TumblingEventTimeWindows(size.toMilliseconds(), offset.toMilliseconds());
	}

	@Override
	public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
		return new TimeWindow.Serializer();
	}

	@Override
	public boolean isEventTime() {
		return true;
	}
}
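  • TumblingEventTimeWindows extends WindowAssigner, with element type Object and window type TimeWindow. It takes two parameters, size and offset; offset must be greater than or equal to 0 and smaller than size.
  • assignWindows returns the single window [start, start + size), where start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size); a record whose timestamp is Long.MIN_VALUE (the "no timestamp" marker) causes a RuntimeException. getDefaultTrigger returns EventTimeTrigger, getWindowSerializer returns TimeWindow.Serializer, and isEventTime returns true.
  • TumblingEventTimeWindows provides static factory methods named of, which take either size alone or size and offset.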
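A worked example of getWindowStartWithOffset: with size = 5000 ms and offset = 0, timestamp 7000 gives 7000 - (7000 - 0 + 5000) % 5000 = 7000 - 2000 = 5000, so the element lands in [5000, 10000); with offset = 1000 the grid shifts and the same timestamp lands in [6000, 11000). The sketch below reproduces the formula and the assignWindows logic using only the JDK; EventTimeTumbling and its long[] window representation are hypothetical stand-ins, and it throws IllegalStateException where Flink throws a plain RuntimeException. In a real job the assigner is passed to window(), e.g. stream.keyBy(...).window(TumblingEventTimeWindows.of(Time.seconds(5))).

```java
import java.util.Collections;
import java.util.List;

// Hypothetical JDK-only sketch of TumblingEventTimeWindows' window choice (not Flink API).
// A window is represented as long[]{start, end} with end exclusive.
final class EventTimeTumbling {
    private final long size;
    private final long offset;

    EventTimeTumbling(long size, long offset) {
        // Same check as the TumblingEventTimeWindows constructor.
        if (offset < 0 || offset >= size) {
            throw new IllegalArgumentException("parameters must satisfy 0 <= offset < size");
        }
        this.size = size;
        this.offset = offset;
    }

    // Same formula as TimeWindow.getWindowStartWithOffset.
    static long windowStart(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    // Mirrors assignWindows: Long.MIN_VALUE means "no timestamp" and is rejected,
    // otherwise the element gets exactly one window [start, start + size).
    List<long[]> assign(long timestamp) {
        if (timestamp == Long.MIN_VALUE) {
            throw new IllegalStateException("record has no timestamp");
        }
        long start = windowStart(timestamp, offset, size);
        return Collections.singletonList(new long[] {start, start + size});
    }

    public static void main(String[] args) {
        System.out.println(windowStart(7000, 0, 5000));    // 5000
        System.out.println(windowStart(7000, 1000, 5000)); // 6000
        System.out.println(windowStart(4999, 0, 5000));    // 0
        long[] w = new EventTimeTumbling(5000, 1000).assign(7000).get(0);
        System.out.println(w[0] + " " + w[1]);             // 6000 11000
    }
}
```

Note that a timestamp exactly on a boundary, such as 5000 with size 5000, starts a new window, consistent with start being inclusive and end exclusive.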

TumblingProcessingTimeWindows

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java

public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {
	private static final long serialVersionUID = 1L;

	private final long size;

	private final long offset;

	private TumblingProcessingTimeWindows(long size, long offset) {
		if (offset < 0 || offset >= size) {
			throw new IllegalArgumentException("TumblingProcessingTimeWindows parameters must satisfy  0 <= offset < size");
		}

		this.size = size;
		this.offset = offset;
	}

	@Override
	public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
		final long now = context.getCurrentProcessingTime();
		long start = TimeWindow.getWindowStartWithOffset(now, offset, size);
		return Collections.singletonList(new TimeWindow(start, start + size));
	}

	public long getSize() {
		return size;
	}

	@Override
	public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
		return ProcessingTimeTrigger.create();
	}

	@Override
	public String toString() {
		return "TumblingProcessingTimeWindows(" + size + ")";
	}

	public static TumblingProcessingTimeWindows of(Time size) {
		return new TumblingProcessingTimeWindows(size.toMilliseconds(), 0);
	}

	public static TumblingProcessingTimeWindows of(Time size, Time offset) {
		return new TumblingProcessingTimeWindows(size.toMilliseconds(), offset.toMilliseconds());
	}

	@Override
	public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
		return new TimeWindow.Serializer();
	}

	@Override
	public boolean isEventTime() {
		return false;
	}
}
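  • TumblingProcessingTimeWindows extends WindowAssigner, with element type Object and window type TimeWindow. It takes two parameters, size and offset; offset must be greater than or equal to 0 and smaller than size.
  • assignWindows also returns the window [start, start + size), but here start = TimeWindow.getWindowStartWithOffset(now, offset, size) with now = context.getCurrentProcessingTime(). This is where it differs from TumblingEventTimeWindows: TumblingProcessingTimeWindows ignores the timestamp parameter and uses the current processing time instead. getDefaultTrigger returns ProcessingTimeTrigger, and isEventTime returns false.
  • TumblingProcessingTimeWindows likewise provides static factory methods named of, which take either size alone or size and offset.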
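The difference between the two assigners comes down to which long is fed into getWindowStartWithOffset. In this hypothetical sketch (TumblingStarts is not a Flink class), a LongSupplier stands in for WindowAssignerContext.getCurrentProcessingTime(), and a fixed fake clock keeps the output deterministic: a record with timestamp 7000 falls into the window starting at 5000 under event time, but into the window starting at 10000 if it is processed at 12300 ms. In a real job the processing-time variant would be used as .window(TumblingProcessingTimeWindows.of(Time.seconds(5))).

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Hypothetical JDK-only sketch contrasting the two assigners; windows are represented by their start.
final class TumblingStarts {
    // Same formula as TimeWindow.getWindowStartWithOffset.
    static long windowStart(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    // Like TumblingEventTimeWindows: uses the record's own timestamp.
    static long eventTimeStart(long recordTimestamp, long size, long offset) {
        return windowStart(recordTimestamp, offset, size);
    }

    // Like TumblingProcessingTimeWindows: the record timestamp is ignored and the clock is asked instead.
    // The LongSupplier stands in for WindowAssignerContext.getCurrentProcessingTime().
    static long processingTimeStart(long recordTimestamp, long size, long offset, LongSupplier clock) {
        return windowStart(clock.getAsLong(), offset, size);
    }

    public static void main(String[] args) {
        AtomicLong fakeClock = new AtomicLong(12_300); // hypothetical "now" in ms
        long ts = 7_000;                               // the record's event timestamp
        System.out.println(eventTimeStart(ts, 5000, 0));                      // 5000
        System.out.println(processingTimeStart(ts, 5000, 0, fakeClock::get)); // 10000
    }
}
```

Because the processing-time variant depends on when a record is processed, re-running the same input can place records in different windows, whereas the event-time variant derives the window from the record's timestamp alone.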

Summary

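  • Flink's Tumbling Window comes in two variants, TumblingEventTimeWindows and TumblingProcessingTimeWindows. Both extend WindowAssigner with element type Object and window type TimeWindow, and both take two parameters, size and offset, where offset must be greater than or equal to 0 and smaller than size.
  • WindowAssigner defines the abstract methods assignWindows, getDefaultTrigger, getWindowSerializer and isEventTime, plus the abstract static class WindowAssignerContext; its type parameter T is the element type and W is the window type. Both tumbling assigners use TimeWindow, whose start is inclusive and end exclusive, so maxTimestamp returns end - 1. TimeWindow also provides the static methods mergeWindows and getWindowStartWithOffset: the former merges overlapping time windows, and the latter computes the window start for a given timestamp, offset and windowSize.
  • TumblingEventTimeWindows and TumblingProcessingTimeWindows differ in assignWindows, getDefaultTrigger and isEventTime: the former's assignWindows uses the timestamp argument while the latter's uses the current processing time; the former's getDefaultTrigger returns EventTimeTrigger while the latter's returns ProcessingTimeTrigger; the former's isEventTime returns true while the latter's returns false.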
