本文主要研究一下flink的Tumbling Window
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
/**
 * Strategy that decides which window(s) an element is placed into.
 *
 * @param <T> The type of elements this assigner can handle.
 * @param <W> The type of {@link Window} this assigner produces.
 */
@PublicEvolving
public abstract class WindowAssigner<T, W extends Window> implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
     * Computes the windows the given element should be assigned to.
     *
     * @param element   The element for which windows are requested.
     * @param timestamp The timestamp attached to the element.
     * @param context   The {@link WindowAssignerContext} this assigner is running in.
     * @return A {@code Collection} holding every window for the element.
     */
    public abstract Collection<W> assignWindows(
            T element,
            long timestamp,
            WindowAssignerContext context);

    /**
     * Supplies the trigger that is used by default together with this {@code WindowAssigner}.
     */
    public abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment env);

    /**
     * Supplies the {@link TypeSerializer} used to serialize the windows handed out by
     * this {@code WindowAssigner}.
     */
    public abstract TypeSerializer<W> getWindowSerializer(ExecutionConfig executionConfig);

    /**
     * Tells whether window assignment is driven by event time.
     *
     * @return {@code true} for event-time based assignment, {@code false} otherwise.
     */
    public abstract boolean isEventTime();

    /**
     * Context handed to a {@link WindowAssigner} so that it can look up the current
     * processing time.
     *
     * <p>The containing
     * {@link org.apache.flink.streaming.runtime.operators.windowing.WindowOperator}
     * passes it to the assigner, and obtains it in turn from the containing
     * {@link org.apache.flink.streaming.runtime.tasks.StreamTask}.
     */
    public abstract static class WindowAssignerContext {

        /**
         * Returns the current processing time.
         */
        public abstract long getCurrentProcessingTime();
    }
}
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/windows/Window.java
/**
 * Base type for windows; the only thing every window must expose is the
 * largest timestamp it still contains.
 */
@PublicEvolving
public abstract class Window {

    /**
     * Reports the upper, inclusive timestamp bound of this window.
     *
     * @return The largest timestamp that still belongs to this window.
     */
    public abstract long maxTimestamp();
}
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
@PublicEvolving public class TimeWindow extends Window { private final long start; private final long end; public TimeWindow(long start, long end) { this.start = start; this.end = end; } /** * Gets the starting timestamp of the window. This is the first timestamp that belongs * to this window. * * @return The starting timestamp of this window. */ public long getStart() { return start; } /** * Gets the end timestamp of this window. The end timestamp is exclusive, meaning it * is the first timestamp that does not belong to this window any more. * * @return The exclusive end timestamp of this window. */ public long getEnd() { return end; } /** * Gets the largest timestamp that still belongs to this window. * * <p>This timestamp is identical to {@code getEnd() - 1}. * * @return The largest timestamp that still belongs to this window. * * @see #getEnd() */ @Override public long maxTimestamp() { return end - 1; } @Override public boolean equals(Object o) { if (this == o) { return true; } if (o == null || getClass() != o.getClass()) { return false; } TimeWindow window = (TimeWindow) o; return end == window.end && start == window.start; } @Override public int hashCode() { return MathUtils.longToIntWithBitMixing(start + end); } @Override public String toString() { return "TimeWindow{" + "start=" + start + ", end=" + end + '}'; } /** * Returns {@code true} if this window intersects the given window. */ public boolean intersects(TimeWindow other) { return this.start <= other.end && this.end >= other.start; } /** * Returns the minimal window covers both this window and the given window. */ public TimeWindow cover(TimeWindow other) { return new TimeWindow(Math.min(start, other.start), Math.max(end, other.end)); } // ------------------------------------------------------------------------ // Serializer // ------------------------------------------------------------------------ //...... 
// ------------------------------------------------------------------------ // Utilities // ------------------------------------------------------------------------ /** * Merge overlapping {@link TimeWindow}s. For use by merging * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner WindowAssigners}. */ public static void mergeWindows(Collection<TimeWindow> windows, MergingWindowAssigner.MergeCallback<TimeWindow> c) { // sort the windows by the start time and then merge overlapping windows List<TimeWindow> sortedWindows = new ArrayList<>(windows); Collections.sort(sortedWindows, new Comparator<TimeWindow>() { @Override public int compare(TimeWindow o1, TimeWindow o2) { return Long.compare(o1.getStart(), o2.getStart()); } }); List<Tuple2<TimeWindow, Set<TimeWindow>>> merged = new ArrayList<>(); Tuple2<TimeWindow, Set<TimeWindow>> currentMerge = null; for (TimeWindow candidate: sortedWindows) { if (currentMerge == null) { currentMerge = new Tuple2<>(); currentMerge.f0 = candidate; currentMerge.f1 = new HashSet<>(); currentMerge.f1.add(candidate); } else if (currentMerge.f0.intersects(candidate)) { currentMerge.f0 = currentMerge.f0.cover(candidate); currentMerge.f1.add(candidate); } else { merged.add(currentMerge); currentMerge = new Tuple2<>(); currentMerge.f0 = candidate; currentMerge.f1 = new HashSet<>(); currentMerge.f1.add(candidate); } } if (currentMerge != null) { merged.add(currentMerge); } for (Tuple2<TimeWindow, Set<TimeWindow>> m: merged) { if (m.f1.size() > 1) { c.merge(m.f1, m.f0); } } } /** * Method to get the window start for a timestamp. * * @param timestamp epoch millisecond to get the window start. * @param offset The offset which window start would be shifted by. * @param windowSize The size of the generated windows. * @return window start */ public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) { return timestamp - (timestamp - offset + windowSize) % windowSize; } }
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java
@PublicEvolving public class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow> { private static final long serialVersionUID = 1L; private final long size; private final long offset; protected TumblingEventTimeWindows(long size, long offset) { if (offset < 0 || offset >= size) { throw new IllegalArgumentException("TumblingEventTimeWindows parameters must satisfy 0 <= offset < size"); } this.size = size; this.offset = offset; } @Override public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) { if (timestamp > Long.MIN_VALUE) { // Long.MIN_VALUE is currently assigned when no timestamp is present long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size); return Collections.singletonList(new TimeWindow(start, start + size)); } else { throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " + "Is the time characteristic set to 'ProcessingTime', or did you forget to call " + "'DataStream.assignTimestampsAndWatermarks(...)'?"); } } @Override public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) { return EventTimeTrigger.create(); } @Override public String toString() { return "TumblingEventTimeWindows(" + size + ")"; } public static TumblingEventTimeWindows of(Time size) { return new TumblingEventTimeWindows(size.toMilliseconds(), 0); } public static TumblingEventTimeWindows of(Time size, Time offset) { return new TumblingEventTimeWindows(size.toMilliseconds(), offset.toMilliseconds()); } @Override public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) { return new TimeWindow.Serializer(); } @Override public boolean isEventTime() { return true; } }
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> { private static final long serialVersionUID = 1L; private final long size; private final long offset; private TumblingProcessingTimeWindows(long size, long offset) { if (offset < 0 || offset >= size) { throw new IllegalArgumentException("TumblingProcessingTimeWindows parameters must satisfy 0 <= offset < size"); } this.size = size; this.offset = offset; } @Override public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) { final long now = context.getCurrentProcessingTime(); long start = TimeWindow.getWindowStartWithOffset(now, offset, size); return Collections.singletonList(new TimeWindow(start, start + size)); } public long getSize() { return size; } @Override public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) { return ProcessingTimeTrigger.create(); } @Override public String toString() { return "TumblingProcessingTimeWindows(" + size + ")"; } public static TumblingProcessingTimeWindows of(Time size) { return new TumblingProcessingTimeWindows(size.toMilliseconds(), 0); } public static TumblingProcessingTimeWindows of(Time size, Time offset) { return new TumblingProcessingTimeWindows(size.toMilliseconds(), offset.toMilliseconds()); } @Override public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) { return new TimeWindow.Serializer(); } @Override public boolean isEventTime() { return false; } }