



/**
 * This is a {@link AssignerWithPeriodicWatermarks} used to emit Watermarks that lag behind the element with
 * the maximum timestamp (in event time) seen so far by a fixed amount of time, <code>t_late</code>. This can
 * help reduce the number of elements that are ignored due to lateness when computing the final result for a
 * given window, in the case where we know that elements arrive no later than <code>t_late</code> units of time
 * after the watermark that signals that the system event-time has advanced past their (event-time) timestamp.
 * */
public abstract class BoundedOutOfOrdernessTimestampExtractor<T> implements AssignerWithPeriodicWatermarks<T> {

    private static final long serialVersionUID = 1L;

    /** The current maximum timestamp seen so far. */
    private long currentMaxTimestamp;

    /** The timestamp of the last emitted watermark. */
    private long lastEmittedWatermark = Long.MIN_VALUE;

    /**
     * The (fixed) interval between the maximum timestamp seen in the records
     * and that of the watermark to be emitted.
     */
    private final long maxOutOfOrderness;

    public BoundedOutOfOrdernessTimestampExtractor(Time maxOutOfOrderness) {
        if (maxOutOfOrderness.toMilliseconds() < 0) {
            throw new RuntimeException("Tried to set the maximum allowed " +
                "lateness to " + maxOutOfOrderness + ". This parameter cannot be negative.");
        }
        this.maxOutOfOrderness = maxOutOfOrderness.toMilliseconds();
        // start far enough above Long.MIN_VALUE that the subtraction in getCurrentWatermark cannot underflow
        this.currentMaxTimestamp = Long.MIN_VALUE + this.maxOutOfOrderness;
    }

    public long getMaxOutOfOrdernessInMillis() {
        return maxOutOfOrderness;
    }

    /**
     * Extracts the timestamp from the given element.
     *
     * @param element The element that the timestamp is extracted from.
     * @return The new timestamp.
     */
    public abstract long extractTimestamp(T element);

    @Override
    public final Watermark getCurrentWatermark() {
        // this guarantees that the watermark never goes backwards.
        long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
        if (potentialWM >= lastEmittedWatermark) {
            lastEmittedWatermark = potentialWM;
        }
        return new Watermark(lastEmittedWatermark);
    }

    @Override
    public final long extractTimestamp(T element, long previousElementTimestamp) {
        long timestamp = extractTimestamp(element);
        if (timestamp > currentMaxTimestamp) {
            currentMaxTimestamp = timestamp;
        }
        return timestamp;
    }
}

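  • The abstract class BoundedOutOfOrdernessTimestampExtractor implements the extractTimestamp and getCurrentWatermark methods of the AssignerWithPeriodicWatermarks interface, and declares the abstract method extractTimestamp(T element) for subclasses to implement.
  • Its constructor takes a maxOutOfOrderness parameter: the maximum amount of time an element may lag before it is ignored when window results are computed. Lateness is measured as t - t_w, where t is the element's event time and t_w is the timestamp of the previous watermark.
  • extractTimestamp(T element, long previousElementTimestamp) calls the subclass's extractTimestamp(T element) to obtain the timestamp and, if it is greater than currentMaxTimestamp, updates currentMaxTimestamp. getCurrentWatermark first computes potentialWM = currentMaxTimestamp - maxOutOfOrderness. If potentialWM is greater than or equal to lastEmittedWatermark, it updates lastEmittedWatermark. That condition is equivalent to currentMaxTimestamp - lastEmittedWatermark >= maxOutOfOrderness: the last watermark trails the maximum timestamp by at least the permitted gap, so it is raised. The method then returns new Watermark(lastEmittedWatermark).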
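
The bookkeeping described in the last bullet can be traced without a Flink runtime. The following is a minimal plain-Java sketch, not Flink's class: BoundedWatermarkSketch and its method names observe and currentWatermark are hypothetical, and timestamps are bare long values in milliseconds. It mirrors the two fields and the two methods discussed above.

```java
/** Plain-Java stand-in for the extractor's bookkeeping (hypothetical class, not part of Flink). */
class BoundedWatermarkSketch {

    private final long maxOutOfOrderness;               // fixed lag in milliseconds
    private long currentMaxTimestamp;                   // largest event timestamp seen so far
    private long lastEmittedWatermark = Long.MIN_VALUE; // last value handed out as a watermark

    BoundedWatermarkSketch(long maxOutOfOrdernessMillis) {
        if (maxOutOfOrdernessMillis < 0) {
            throw new IllegalArgumentException("maxOutOfOrderness cannot be negative");
        }
        this.maxOutOfOrderness = maxOutOfOrdernessMillis;
        // Same initialisation as the listing above: keeps the subtraction below from underflowing.
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMillis;
    }

    /** Records an element's timestamp, as extractTimestamp(element, previous) does. */
    long observe(long timestamp) {
        if (timestamp > currentMaxTimestamp) {
            currentMaxTimestamp = timestamp;
        }
        return timestamp;
    }

    /** Returns the maximum timestamp minus the lag, never going backwards. */
    long currentWatermark() {
        long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
        if (potentialWM >= lastEmittedWatermark) {
            lastEmittedWatermark = potentialWM;
        }
        return lastEmittedWatermark;
    }

    public static void main(String[] args) {
        BoundedWatermarkSketch sketch = new BoundedWatermarkSketch(3_000L);
        long[] timestamps = {10_000L, 12_000L, 9_000L, 15_000L};
        for (long ts : timestamps) {
            sketch.observe(ts);
            System.out.println("ts=" + ts + " watermark=" + sketch.currentWatermark());
        }
    }
}
```

With a lag of 3000 ms the watermarks after the four elements are 7000, 9000, 9000 and 12000: the out-of-order element with timestamp 9000 does not move currentMaxTimestamp, so the watermark stays where it was.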


public static void main(String[] args) throws Exception {

        final int popThreshold = 20; // threshold for popular places

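        // set up streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // use event time so that the assigned timestamps and watermarks drive the windows
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // ask the periodic assigner for a new watermark every 1000 ms
        env.getConfig().setAutoWatermarkInterval(1000);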

        // configure the Kafka consumer
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("zookeeper.connect", LOCAL_ZOOKEEPER_HOST);
        kafkaProps.setProperty("bootstrap.servers", LOCAL_KAFKA_BROKER);
        kafkaProps.setProperty("group.id", RIDE_SPEED_GROUP);
        // always read the Kafka topic from the start
        kafkaProps.setProperty("auto.offset.reset", "earliest");

        // create a Kafka consumer
        FlinkKafkaConsumer011<TaxiRide> consumer = new FlinkKafkaConsumer011<>(
                RIDE_TOPIC, // placeholder constant: the topic argument is missing from this listing
                new TaxiRideSchema(),
                kafkaProps);

        // assign a timestamp extractor to the consumer
        consumer.assignTimestampsAndWatermarks(new TaxiRideTSExtractor());

        // create a TaxiRide data stream
        DataStream<TaxiRide> rides = env.addSource(consumer);

        // find popular places
        DataStream<Tuple5<Float, Float, Long, Boolean, Integer>> popularPlaces = rides
                // match ride to grid cell and event type (start or end)
                .map(new GridCellMatcher())
                // partition by cell id and event type
                .keyBy(0, 1)
                // build sliding window
                .timeWindow(Time.minutes(15), Time.minutes(5))
                // count ride events in window
                .apply(new RideCounter())
                // filter by popularity threshold
                .filter((Tuple4<Integer, Long, Boolean, Integer> count) -> (count.f3 >= popThreshold))
                // map grid cell to coordinates
                .map(new GridToCoordinates());


        // execute the transformation pipeline
        env.execute("Popular Places from Kafka");
}

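    /**
     * Assigns timestamps to TaxiRide records.
     * Watermarks are a fixed time interval behind the max timestamp and are periodically emitted.
     */
    public static class TaxiRideTSExtractor extends BoundedOutOfOrdernessTimestampExtractor<TaxiRide> {

        public TaxiRideTSExtractor() {
            // The parent class has no no-arg constructor, so a bound must be passed here.
            // MAX_EVENT_DELAY (seconds) is an assumed constant; its value is not shown in this listing.
            super(Time.seconds(MAX_EVENT_DELAY));
        }

        @Override
        public long extractTimestamp(TaxiRide ride) {
            if (ride.isStart) {
                return ride.startTime.getMillis();
            } else {
                return ride.endTime.getMillis();
            }
        }
    }
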
  • This example uses an AssignerWithPeriodicWatermarks: env.getConfig().setAutoWatermarkInterval(1000) sets the watermark interval to 1000 ms, and assignTimestampsAndWatermarks registers TaxiRideTSExtractor, which extends the abstract class BoundedOutOfOrdernessTimestampExtractor, as the assigner.
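
To illustrate what a periodic interval means for the watermarks a downstream operator sees, here is a rough plain-Java simulation. It is a sketch under assumptions, not Flink's runtime: the class PeriodicEmissionSketch, the method emittedWatermarks and the {arrivalTime, eventTimestamp} event layout are all hypothetical, and it assumes the runtime polls the assigner once per interval and forwards a watermark only when it is larger than the previous one.

```java
import java.util.ArrayList;
import java.util.List;

/** Simulates periodic watermark polling over a fixed interval (hypothetical helper, not Flink code). */
class PeriodicEmissionSketch {

    /**
     * @param events         rows of {arrivalTimeMillis, eventTimestampMillis}, sorted by arrival time
     * @param intervalMillis polling interval, the role played by setAutoWatermarkInterval
     * @param lagMillis      the maxOutOfOrderness bound
     * @param endTimeMillis  last tick to simulate
     * @return the watermarks forwarded downstream, in order
     */
    static List<Long> emittedWatermarks(long[][] events, long intervalMillis,
                                        long lagMillis, long endTimeMillis) {
        List<Long> emitted = new ArrayList<>();
        long currentMax = Long.MIN_VALUE + lagMillis;
        long lastEmitted = Long.MIN_VALUE;
        int next = 0;
        for (long tick = intervalMillis; tick <= endTimeMillis; tick += intervalMillis) {
            // Consume every element that arrived up to this tick.
            while (next < events.length && events[next][0] <= tick) {
                currentMax = Math.max(currentMax, events[next][1]);
                next++;
            }
            // Poll the assigner; forward the watermark only if it advanced.
            long candidate = currentMax - lagMillis;
            if (candidate > lastEmitted) {
                lastEmitted = candidate;
                emitted.add(candidate);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        long[][] events = {{100L, 10_000L}, {400L, 12_000L}, {1_200L, 9_000L}, {2_500L, 15_000L}};
        System.out.println(emittedWatermarks(events, 1_000L, 3_000L, 3_000L)); // prints [9000, 12000]
    }
}
```

The first two elements arrive within the same interval, so only one watermark (9000) is produced for them, and the tick at 2000 ms produces none because the out-of-order element did not advance the maximum timestamp.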


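  • To simplify development, Flink ships several pre-defined timestamp extractors / watermark emitters; BoundedOutOfOrdernessTimestampExtractor is one of them.
  • The abstract class BoundedOutOfOrdernessTimestampExtractor implements the extractTimestamp and getCurrentWatermark methods of the AssignerWithPeriodicWatermarks interface, and declares the abstract method extractTimestamp(T element) for subclasses to implement.
  • Its constructor takes a maxOutOfOrderness parameter: the maximum amount of time an element may lag before it is ignored when window results are computed. Lateness is measured as t - t_w, where t is the element's event time and t_w is the timestamp of the previous watermark.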
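
The effect of the maxOutOfOrderness bound can be sketched in plain Java. This is a simplified model rather than Flink's behaviour: LatenessSketch and behindWatermark are hypothetical names, the watermark is recomputed after every element instead of periodically, and an element is merely flagged when its timestamp is at or behind the watermark. Whether Flink actually drops such an element depends on the window it falls into and on any allowed lateness.

```java
import java.util.ArrayList;
import java.util.List;

/** Flags elements that arrive at or behind the watermark (hypothetical helper, not Flink code). */
class LatenessSketch {

    /** Returns the timestamps that are <= the watermark in effect when they arrive. */
    static List<Long> behindWatermark(long[] timestamps, long lagMillis) {
        List<Long> late = new ArrayList<>();
        long currentMax = Long.MIN_VALUE + lagMillis;
        for (long ts : timestamps) {
            long watermark = currentMax - lagMillis; // watermark before this element is seen
            if (ts <= watermark) {
                late.add(ts);
            }
            currentMax = Math.max(currentMax, ts);
        }
        return late;
    }

    public static void main(String[] args) {
        long[] timestamps = {10_000L, 15_000L, 13_000L, 11_000L, 16_000L};
        System.out.println(behindWatermark(timestamps, 3_000L)); // prints [11000]
        System.out.println(behindWatermark(timestamps, 5_000L)); // prints []
    }
}
```

With a 3000 ms bound, the element stamped 11000 arrives after the watermark has reached 12000 and is flagged; widening the bound to 5000 ms keeps the watermark at 10000 at that point, so nothing is flagged. A larger bound tolerates more disorder at the cost of later window results.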

