A look at Flink's Sliding Window

This article takes a look at Flink's Sliding Window.

SlidingEventTimeWindows

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java

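package org.apache.flink.streaming.api.windowing.assigners;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

@PublicEvolving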
public class SlidingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    private final long size;

    private final long slide;

    private final long offset;

    protected SlidingEventTimeWindows(long size, long slide, long offset) {
        if (offset < 0 || offset >= slide || size <= 0) {
            throw new IllegalArgumentException("SlidingEventTimeWindows parameters must satisfy 0 <= offset < slide and size > 0");
        }

        this.size = size;
        this.slide = slide;
        this.offset = offset;
    }

    @Override
    public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
        if (timestamp > Long.MIN_VALUE) {
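            List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
            // latest window start at or before timestamp, aligned to the slide grid
            long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);
            // walk back one slide at a time while [start, start + size) still contains timestamp
            for (long start = lastStart;
                start > timestamp - size;
                start -= slide) {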
                windows.add(new TimeWindow(start, start + size));
            }
            return windows;
        } else {
            throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
                    "Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
                    "'DataStream.assignTimestampsAndWatermarks(...)'?");
        }
    }

    public long getSize() {
        return size;
    }

    public long getSlide() {
        return slide;
    }

    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return EventTimeTrigger.create();
    }

    @Override
    public String toString() {
        return "SlidingEventTimeWindows(" + size + ", " + slide + ")";
    }

    public static SlidingEventTimeWindows of(Time size, Time slide) {
        return new SlidingEventTimeWindows(size.toMilliseconds(), slide.toMilliseconds(), 0);
    }

    public static SlidingEventTimeWindows of(Time size, Time slide, Time offset) {
        return new SlidingEventTimeWindows(size.toMilliseconds(), slide.toMilliseconds(),
            offset.toMilliseconds() % slide.toMilliseconds());
    }

    @Override
    public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
        return new TimeWindow.Serializer();
    }

    @Override
    public boolean isEventTime() {
        return true;
    }
}
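  • SlidingEventTimeWindows extends WindowAssigner, with element type Object and window type TimeWindow. It takes three parameters, size, slide and offset, which must satisfy offset >= 0, offset < slide and size > 0
  • assignWindows uses slide as the window size when calling TimeWindow.getWindowStartWithOffset(timestamp, offset, slide) to compute lastStart. It then loops while start > timestamp - size (equivalently, start + size > timestamp), subtracting slide from start on each iteration and creating a TimeWindow(start, start + size) each time. If the timestamp is Long.MIN_VALUE (no timestamp assigned), it throws a RuntimeException instead. getDefaultTrigger returns EventTimeTrigger, getWindowSerializer returns a new TimeWindow.Serializer(), and isEventTime returns true
  • SlidingEventTimeWindows provides static factory methods named of, which take size and slide plus an optional offset (0 when omitted). The offset is converted to milliseconds and taken modulo slide.toMilliseconds() to give the final offset value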
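To make the assignWindows loop concrete, here is a minimal standalone Java sketch that reproduces its arithmetic without any Flink dependency. The windowStartWithOffset helper is assumed to follow the same formula as TimeWindow.getWindowStartWithOffset, namely timestamp - (timestamp - offset + windowSize) % windowSize; the class SlidingAssignmentSketch and its methods are hypothetical names for illustration, not Flink API, and windows are modelled as long[] {start, end}.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical standalone sketch of the arithmetic in SlidingEventTimeWindows.assignWindows.
// Not Flink API; windowStartWithOffset is assumed to match TimeWindow.getWindowStartWithOffset.
final class SlidingAssignmentSketch {

    // Start of the window of length windowSize (shifted by offset) that contains timestamp.
    static long windowStartWithOffset(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    // Same precondition as the assigner constructors: 0 <= offset < slide and size > 0.
    static void validate(long size, long slide, long offset) {
        if (offset < 0 || offset >= slide || size <= 0) {
            throw new IllegalArgumentException("must satisfy 0 <= offset < slide and size > 0");
        }
    }

    // Returns every [start, end) window that contains timestamp, latest start first.
    static List<long[]> assign(long timestamp, long size, long slide, long offset) {
        validate(size, slide, offset);
        List<long[]> windows = new ArrayList<>((int) (size / slide));
        // The slide, not the size, is the grid step used to find the latest start.
        long lastStart = windowStartWithOffset(timestamp, offset, slide);
        // Step back one slide at a time while start + size > timestamp.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        // size = 10 s, slide = 5 s, offset = 0, element at t = 12 s
        for (long[] w : assign(12_000L, 10_000L, 5_000L, 0L)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");
        }
        // The of(size, slide, offset) factories fold the offset with offset % slide.
        System.out.println(7_000L % 5_000L);
    }
}
```

With size = 10 s and slide = 5 s, an element at t = 12000 lands in [10000, 20000) and [5000, 15000), that is size / slide = 2 windows, which is why the real code presizes its ArrayList with (int) (size / slide). The last line prints 2000: an offset of 7 s is folded to 2 s by the modulo in of(...).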

SlidingProcessingTimeWindows

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java

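package org.apache.flink.streaming.api.windowing.assigners;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

public class SlidingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {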
    private static final long serialVersionUID = 1L;

    private final long size;

    private final long offset;

    private final long slide;

    private SlidingProcessingTimeWindows(long size, long slide, long offset) {
        if (offset < 0 || offset >= slide || size <= 0) {
            throw new IllegalArgumentException("SlidingProcessingTimeWindows parameters must satisfy 0 <= offset < slide and size > 0");
        }

        this.size = size;
        this.slide = slide;
        this.offset = offset;
    }

    @Override
    public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
        // processing time: the incoming timestamp is ignored and replaced by the current processing time
        timestamp = context.getCurrentProcessingTime();
        List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
        long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);
        for (long start = lastStart;
            start > timestamp - size;
            start -= slide) {
            windows.add(new TimeWindow(start, start + size));
        }
        return windows;
    }

    public long getSize() {
        return size;
    }

    public long getSlide() {
        return slide;
    }

    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return ProcessingTimeTrigger.create();
    }

    @Override
    public String toString() {
        return "SlidingProcessingTimeWindows(" + size + ", " + slide + ")";
    }

    public static SlidingProcessingTimeWindows of(Time size, Time slide) {
        return new SlidingProcessingTimeWindows(size.toMilliseconds(), slide.toMilliseconds(), 0);
    }

    public static SlidingProcessingTimeWindows of(Time size, Time slide, Time offset) {
        return new SlidingProcessingTimeWindows(size.toMilliseconds(), slide.toMilliseconds(),
            offset.toMilliseconds() % slide.toMilliseconds());
    }

    @Override
    public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
        return new TimeWindow.Serializer();
    }

    @Override
    public boolean isEventTime() {
        return false;
    }
}
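  • SlidingProcessingTimeWindows extends WindowAssigner, with element type Object and window type TimeWindow. It takes three parameters, size, slide and offset, which must satisfy offset >= 0, offset < slide and size > 0
  • assignWindows first overwrites timestamp with context.getCurrentProcessingTime() (this is where it differs from SlidingEventTimeWindows), then uses slide as the window size when calling TimeWindow.getWindowStartWithOffset(timestamp, offset, slide) to compute lastStart. It loops while start > timestamp - size (equivalently, start + size > timestamp), subtracting slide from start on each iteration and creating a TimeWindow(start, start + size) each time. getDefaultTrigger returns ProcessingTimeTrigger, getWindowSerializer returns a new TimeWindow.Serializer(), and isEventTime returns false
  • SlidingProcessingTimeWindows provides static factory methods named of, which take size and slide plus an optional offset (0 when omitted). The offset is converted to milliseconds and taken modulo slide.toMilliseconds() to give the final offset value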
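The only behavioral change in the processing-time assigner is that the element's timestamp is thrown away. The hypothetical Java sketch below isolates that: a java.util.function.LongSupplier stands in for WindowAssignerContext.getCurrentProcessingTime() (an assumption made so the example runs without Flink), and the class and method names are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

// Hypothetical sketch: processing-time sliding assignment ignores the element's timestamp.
// The LongSupplier plays the role of WindowAssignerContext.getCurrentProcessingTime().
final class ProcessingTimeAssignmentSketch {

    static List<long[]> assign(long elementTimestamp, LongSupplier processingClock,
                               long size, long slide, long offset) {
        // elementTimestamp is deliberately unused, mirroring
        // `timestamp = context.getCurrentProcessingTime();` in the real assigner.
        long timestamp = processingClock.getAsLong();
        List<long[]> windows = new ArrayList<>((int) (size / slide));
        // Same start formula and loop as the event-time variant.
        long lastStart = timestamp - (timestamp - offset + slide) % slide;
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        LongSupplier clock = () -> 12_000L; // fixed "wall clock" for the demo
        // Two elements with very different event timestamps...
        List<long[]> a = assign(1_000L, clock, 10_000L, 5_000L, 0L);
        List<long[]> b = assign(99_000L, clock, 10_000L, 5_000L, 0L);
        // ...get identical windows, because only the clock matters.
        System.out.println(a.size() + " " + b.size() + " " + (a.get(0)[0] == b.get(0)[0]));
    }
}
```

Running it prints "2 2 true": both elements go to [10000, 20000) and [5000, 15000) regardless of the timestamps they carry, which is also why no Long.MIN_VALUE check is needed in this assigner.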

Summary

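  • Flink's Sliding Window comes in two variants, SlidingEventTimeWindows and SlidingProcessingTimeWindows. Both extend WindowAssigner with element type Object and window type TimeWindow, and both take three parameters, size, slide and offset, which must satisfy offset >= 0, offset < slide and size > 0
  • WindowAssigner declares the abstract methods assignWindows, getDefaultTrigger, getWindowSerializer and isEventTime, and also defines the abstract static nested class WindowAssignerContext. It has two type parameters: T is the element type and W is the window type. The window type of both SlidingEventTimeWindows and SlidingProcessingTimeWindows is TimeWindow, which has start and end fields; start is inclusive and end is exclusive, so maxTimestamp returns end - 1. TimeWindow also provides the static methods mergeWindows and getWindowStartWithOffset: the former merges overlapping time windows, and the latter computes the window start for a given timestamp, offset and windowSize
  • SlidingEventTimeWindows and SlidingProcessingTimeWindows differ in assignWindows, getDefaultTrigger and isEventTime. The former's assignWindows uses the timestamp argument, while the latter's uses context.getCurrentProcessingTime(); the former's getDefaultTrigger returns EventTimeTrigger, while the latter's returns ProcessingTimeTrigger; the former's isEventTime returns true, while the latter's returns false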
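The TimeWindow semantics listed above (inclusive start, exclusive end, maxTimestamp = end - 1, merging overlapping windows) can be illustrated with a small model class. TimeWindowSketch below is a hypothetical stand-in, not Flink's TimeWindow; in particular, its merge only fuses windows that strictly overlap, which is a simplifying assumption and may not match how Flink's mergeWindows treats windows that merely touch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical model of TimeWindow semantics for illustration; not Flink's class.
// start is inclusive, end is exclusive, and maxTimestamp() is end - 1.
final class TimeWindowSketch {
    final long start;
    final long end;

    TimeWindowSketch(long start, long end) {
        this.start = start;
        this.end = end;
    }

    // Largest timestamp that still belongs to this window.
    long maxTimestamp() {
        return end - 1;
    }

    boolean contains(long ts) {
        return ts >= start && ts < end;
    }

    // Simplified merge: sort by start, then fuse each window into the previous one
    // when it strictly overlaps it (touching windows are kept separate here).
    static List<TimeWindowSketch> mergeOverlapping(List<TimeWindowSketch> input) {
        List<TimeWindowSketch> sorted = new ArrayList<>(input);
        sorted.sort(Comparator.comparingLong(x -> x.start));
        List<TimeWindowSketch> merged = new ArrayList<>();
        for (TimeWindowSketch w : sorted) {
            if (!merged.isEmpty() && w.start < merged.get(merged.size() - 1).end) {
                TimeWindowSketch last = merged.remove(merged.size() - 1);
                merged.add(new TimeWindowSketch(last.start, Math.max(last.end, w.end)));
            } else {
                merged.add(w);
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        TimeWindowSketch w = new TimeWindowSketch(5_000L, 15_000L);
        System.out.println(w.contains(5_000L) + " " + w.contains(15_000L) + " " + w.maxTimestamp());
        List<TimeWindowSketch> m = mergeOverlapping(Arrays.asList(
                new TimeWindowSketch(10_000L, 20_000L),
                new TimeWindowSketch(5_000L, 15_000L),
                new TimeWindowSketch(30_000L, 40_000L)));
        System.out.println(m.size());
    }
}
```

This prints "true false 14999" and then 2: [5000, 15000) and [10000, 20000) collapse into [5000, 20000), while [30000, 40000) stays on its own. Sliding assigners never merge their windows (they are not merging assigners); merging matters for session-style windows.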
