本文主要研究一下flink的Sliding Windowhtml
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.javajava
@PublicEvolving public class SlidingEventTimeWindows extends WindowAssigner<Object, TimeWindow> { private static final long serialVersionUID = 1L; private final long size; private final long slide; private final long offset; protected SlidingEventTimeWindows(long size, long slide, long offset) { if (offset < 0 || offset >= slide || size <= 0) { throw new IllegalArgumentException("SlidingEventTimeWindows parameters must satisfy 0 <= offset < slide and size > 0"); } this.size = size; this.slide = slide; this.offset = offset; } @Override public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) { if (timestamp > Long.MIN_VALUE) { List<TimeWindow> windows = new ArrayList<>((int) (size / slide)); long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide); for (long start = lastStart; start > timestamp - size; start -= slide) { windows.add(new TimeWindow(start, start + size)); } return windows; } else { throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " + "Is the time characteristic set to 'ProcessingTime', or did you forget to call " + "'DataStream.assignTimestampsAndWatermarks(...)'?"); } } public long getSize() { return size; } public long getSlide() { return slide; } @Override public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) { return EventTimeTrigger.create(); } @Override public String toString() { return "SlidingEventTimeWindows(" + size + ", " + slide + ")"; } public static SlidingEventTimeWindows of(Time size, Time slide) { return new SlidingEventTimeWindows(size.toMilliseconds(), slide.toMilliseconds(), 0); } public static SlidingEventTimeWindows of(Time size, Time slide, Time offset) { return new SlidingEventTimeWindows(size.toMilliseconds(), slide.toMilliseconds(), offset.toMilliseconds() % slide.toMilliseconds()); } @Override public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) { return new TimeWindow.Serializer(); } @Override public boolean isEventTime() { return true; } }
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.javaapache
public class SlidingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> { private static final long serialVersionUID = 1L; private final long size; private final long offset; private final long slide; private SlidingProcessingTimeWindows(long size, long slide, long offset) { if (offset < 0 || offset >= slide || size <= 0) { throw new IllegalArgumentException("SlidingProcessingTimeWindows parameters must satisfy 0 <= offset < slide and size > 0"); } this.size = size; this.slide = slide; this.offset = offset; } @Override public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) { timestamp = context.getCurrentProcessingTime(); List<TimeWindow> windows = new ArrayList<>((int) (size / slide)); long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide); for (long start = lastStart; start > timestamp - size; start -= slide) { windows.add(new TimeWindow(start, start + size)); } return windows; } public long getSize() { return size; } public long getSlide() { return slide; } @Override public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) { return ProcessingTimeTrigger.create(); } @Override public String toString() { return "SlidingProcessingTimeWindows(" + size + ", " + slide + ")"; } public static SlidingProcessingTimeWindows of(Time size, Time slide) { return new SlidingProcessingTimeWindows(size.toMilliseconds(), slide.toMilliseconds(), 0); } public static SlidingProcessingTimeWindows of(Time size, Time slide, Time offset) { return new SlidingProcessingTimeWindows(size.toMilliseconds(), slide.toMilliseconds(), offset.toMilliseconds() % slide.toMilliseconds()); } @Override public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) { return new TimeWindow.Serializer(); } @Override public boolean isEventTime() { return false; } }
與SlidingEventTimeWindows不一樣的是SlidingProcessingTimeWindows的這個方法裏頭使用context.getCurrentProcessingTime()值重置了timestamp
),而後覺得start + size > timestamp爲循環條件,每次對start減去slide,挨個計算TimeWindow(start, start + size);getDefaultTrigger方法返回的是ProcessingTimeTrigger;getWindowSerializer方法返回的是TimeWindow.Serializer();isEventTime返回的爲false