This article takes a look at Flink's BoundedOutOfOrdernessTimestampExtractor.
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/timestamps/BoundedOutOfOrdernessTimestampExtractor.java
    /**
     * This is a {@link AssignerWithPeriodicWatermarks} used to emit Watermarks that lag behind the element with
     * the maximum timestamp (in event time) seen so far by a fixed amount of time, <code>t_late</code>. This can
     * help reduce the number of elements that are ignored due to lateness when computing the final result for a
     * given window, in the case where we know that elements arrive no later than <code>t_late</code> units of time
     * after the watermark that signals that the system event-time has advanced past their (event-time) timestamp.
     *
     */
    public abstract class BoundedOutOfOrdernessTimestampExtractor<T> implements AssignerWithPeriodicWatermarks<T> {

        private static final long serialVersionUID = 1L;

        /** The current maximum timestamp seen so far. */
        private long currentMaxTimestamp;

        /** The timestamp of the last emitted watermark. */
        private long lastEmittedWatermark = Long.MIN_VALUE;

        /**
         * The (fixed) interval between the maximum seen timestamp seen in the records
         * and that of the watermark to be emitted.
         */
        private final long maxOutOfOrderness;

        public BoundedOutOfOrdernessTimestampExtractor(Time maxOutOfOrderness) {
            if (maxOutOfOrderness.toMilliseconds() < 0) {
                throw new RuntimeException("Tried to set the maximum allowed " +
                    "lateness to " + maxOutOfOrderness + ". This parameter cannot be negative.");
            }
            this.maxOutOfOrderness = maxOutOfOrderness.toMilliseconds();
            this.currentMaxTimestamp = Long.MIN_VALUE + this.maxOutOfOrderness;
        }

        public long getMaxOutOfOrdernessInMillis() {
            return maxOutOfOrderness;
        }

        /**
         * Extracts the timestamp from the given element.
         *
         * @param element The element that the timestamp is extracted from.
         * @return The new timestamp.
         */
        public abstract long extractTimestamp(T element);

        @Override
        public final Watermark getCurrentWatermark() {
            // this guarantees that the watermark never goes backwards.
            long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
            if (potentialWM >= lastEmittedWatermark) {
                lastEmittedWatermark = potentialWM;
            }
            return new Watermark(lastEmittedWatermark);
        }

        @Override
        public final long extractTimestamp(T element, long previousElementTimestamp) {
            long timestamp = extractTimestamp(element);
            if (timestamp > currentMaxTimestamp) {
                currentMaxTimestamp = timestamp;
            }
            return timestamp;
        }
    }
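To trace how extractTimestamp and getCurrentWatermark in the source above interact, here is a minimal sketch that copies the same bookkeeping into a plain Java class with no Flink dependency. The class name BoundedLagWatermark and the method names observe and currentWatermark are hypothetical, chosen for illustration; only the arithmetic follows the Flink source.

```java
/** Plain-Java stand-in for the watermark bookkeeping shown above; not a Flink class. */
public class BoundedLagWatermark {

    private final long maxOutOfOrderness;
    private long currentMaxTimestamp;
    private long lastEmittedWatermark = Long.MIN_VALUE;

    public BoundedLagWatermark(long maxOutOfOrdernessMillis) {
        if (maxOutOfOrdernessMillis < 0) {
            throw new IllegalArgumentException("maxOutOfOrderness cannot be negative");
        }
        this.maxOutOfOrderness = maxOutOfOrdernessMillis;
        // Starting at MIN_VALUE + lag keeps the subtraction in currentWatermark()
        // from underflowing before any element has been seen.
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMillis;
    }

    /** Mirrors extractTimestamp(element, previousElementTimestamp): remember the maximum. */
    public long observe(long timestamp) {
        if (timestamp > currentMaxTimestamp) {
            currentMaxTimestamp = timestamp;
        }
        return timestamp;
    }

    /** Mirrors getCurrentWatermark(): lag behind the maximum and never move backwards. */
    public long currentWatermark() {
        long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
        if (potentialWM >= lastEmittedWatermark) {
            lastEmittedWatermark = potentialWM;
        }
        return lastEmittedWatermark;
    }

    public static void main(String[] args) {
        BoundedLagWatermark wm = new BoundedLagWatermark(3_000L);
        long[] eventTimes = {10_000L, 12_000L, 11_000L, 15_000L};
        for (long t : eventTimes) {
            wm.observe(t);
            System.out.println("event=" + t + " watermark=" + wm.currentWatermark());
        }
    }
}
```

With a 3000 ms bound, the run prints watermarks 7000, 9000, 9000 and 12000: the out-of-order element at 11000 does not lower the maximum, so the watermark stays where it was. In Flink itself getCurrentWatermark is called periodically (per the auto watermark interval) rather than after every element, so the real watermark can trail these values.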
The constructor of BoundedOutOfOrdernessTimestampExtractor takes a maxOutOfOrderness parameter: the maximum time by which an element may lag (t - t_w, where t is the element's event time and t_w is the time of the previous watermark). When window results are computed, elements that lag by more than this value are ignored.

getCurrentWatermark first computes potentialWM = currentMaxTimestamp - maxOutOfOrderness. If potentialWM >= lastEmittedWatermark (that is, currentMaxTimestamp - lastEmittedWatermark >= maxOutOfOrderness, which means lastEmittedWatermark has fallen too far behind and the gap has reached maxOutOfOrderness), it raises lastEmittedWatermark to potentialWM. It then returns Watermark(lastEmittedWatermark).

A usage example that reads taxi rides from Kafka:

    public static void main(String[] args) throws Exception {

        final int popThreshold = 20; // threshold for popular places

        // set up streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.getConfig().setAutoWatermarkInterval(1000);

        // configure the Kafka consumer
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("zookeeper.connect", LOCAL_ZOOKEEPER_HOST);
        kafkaProps.setProperty("bootstrap.servers", LOCAL_KAFKA_BROKER);
        kafkaProps.setProperty("group.id", RIDE_SPEED_GROUP);
        // always read the Kafka topic from the start
        kafkaProps.setProperty("auto.offset.reset", "earliest");

        // create a Kafka consumer
        FlinkKafkaConsumer011<TaxiRide> consumer = new FlinkKafkaConsumer011<>(
                "cleansedRides",
                new TaxiRideSchema(),
                kafkaProps);
        // assign a timestamp extractor to the consumer
        consumer.assignTimestampsAndWatermarks(new TaxiRideTSExtractor());

        // create a TaxiRide data stream
        DataStream<TaxiRide> rides = env.addSource(consumer);

        // find popular places
        DataStream<Tuple5<Float, Float, Long, Boolean, Integer>> popularPlaces = rides
                // match ride to grid cell and event type (start or end)
                .map(new GridCellMatcher())
                // partition by cell id and event type
                .keyBy(0, 1)
                // build sliding window
                .timeWindow(Time.minutes(15), Time.minutes(5))
                // count ride events in window
                .apply(new RideCounter())
                // filter by popularity threshold
                .filter((Tuple4<Integer, Long, Boolean, Integer> count) -> (count.f3 >= popThreshold))
                // map grid cell to coordinates
                .map(new GridToCoordinates());

        popularPlaces.print();

        // execute the transformation pipeline
        env.execute("Popular Places from Kafka");
    }

    /**
     * Assigns timestamps to TaxiRide records.
     * Watermarks are a fixed time interval behind the max timestamp and are periodically emitted.
     */
    public static class TaxiRideTSExtractor extends BoundedOutOfOrdernessTimestampExtractor<TaxiRide> {

        public TaxiRideTSExtractor() {
            super(Time.seconds(MAX_EVENT_DELAY));
        }

        @Override
        public long extractTimestamp(TaxiRide ride) {
            if (ride.isStart) {
                return ride.startTime.getMillis();
            } else {
                return ride.endTime.getMillis();
            }
        }
    }
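To make "ignored when window results are computed" concrete for the example's timeWindow(Time.minutes(15), Time.minutes(5)), the following is a simplified model of which sliding windows a ride timestamp falls into and which of them are already behind a given watermark. It is a sketch under stated assumptions, not Flink's operator code: window offset 0, allowed lateness 0, non-negative timestamps, and hypothetical names (SlidingWindowLateness, windowStarts, isWindowLate, openWindows).

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of sliding event-time windows and lateness; not a Flink class. */
public class SlidingWindowLateness {

    /** Start times of all windows [start, start + size) that contain the timestamp. */
    public static List<Long> windowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        // Latest window start at or before the timestamp (assumes offset 0, timestamp >= 0).
        long lastStart = timestamp - (timestamp % slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    /** A window counts as late once the watermark has reached its last millisecond. */
    public static boolean isWindowLate(long start, long size, long watermark) {
        return start + size - 1 <= watermark;
    }

    /** Number of windows that would still accept an element with this timestamp. */
    public static int openWindows(long timestamp, long size, long slide, long watermark) {
        int open = 0;
        for (long start : windowStarts(timestamp, size, slide)) {
            if (!isWindowLate(start, size, watermark)) {
                open++;
            }
        }
        return open;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long size = 15 * minute;
        long slide = 5 * minute;
        long rideTime = 17 * minute;

        // A ride at minute 17 belongs to the windows starting at minutes 15, 10 and 5.
        System.out.println(windowStarts(rideTime, size, slide));
        // Watermark at minute 16: all three windows are still open.
        System.out.println(openWindows(rideTime, size, slide, 16 * minute));
        // Watermark at minute 20: the window [5, 20) has closed, two remain.
        System.out.println(openWindows(rideTime, size, slide, 20 * minute));
        // Watermark at minute 30: every window has closed, the ride would be dropped.
        System.out.println(openWindows(rideTime, size, slide, 30 * minute));
    }
}
```

Under this model, a larger maxOutOfOrderness holds the watermark back for longer, so windows stay open longer and fewer out-of-order rides are dropped, at the cost of later results.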