聊聊flink的BoundedOutOfOrdernessTimestampExtractor

本文主要研究一下flink的BoundedOutOfOrdernessTimestampExtractorhtml

BoundedOutOfOrdernessTimestampExtractor

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/timestamps/BoundedOutOfOrdernessTimestampExtractor.javajava

/**
 * This is a {@link AssignerWithPeriodicWatermarks} used to emit Watermarks that lag behind the element with
 * the maximum timestamp (in event time) seen so far by a fixed amount of time, <code>t_late</code>. This can
 * help reduce the number of elements that are ignored due to lateness when computing the final result for a
 * given window, in the case where we know that elements arrive no later than <code>t_late</code> units of time
 * after the watermark that signals that the system event-time has advanced past their (event-time) timestamp.
 * */
public abstract class BoundedOutOfOrdernessTimestampExtractor<T> implements AssignerWithPeriodicWatermarks<T> {

	private static final long serialVersionUID = 1L;

	/** The current maximum timestamp seen so far. */
	private long currentMaxTimestamp;

	/** The timestamp of the last emitted watermark. */
	private long lastEmittedWatermark = Long.MIN_VALUE;

	/**
	 * The (fixed) interval between the maximum seen timestamp seen in the records
	 * and that of the watermark to be emitted.
	 */
	private final long maxOutOfOrderness;

	public BoundedOutOfOrdernessTimestampExtractor(Time maxOutOfOrderness) {
		if (maxOutOfOrderness.toMilliseconds() < 0) {
			throw new RuntimeException("Tried to set the maximum allowed " +
				"lateness to " + maxOutOfOrderness + ". This parameter cannot be negative.");
		}
		this.maxOutOfOrderness = maxOutOfOrderness.toMilliseconds();
		this.currentMaxTimestamp = Long.MIN_VALUE + this.maxOutOfOrderness;
	}

	public long getMaxOutOfOrdernessInMillis() {
		return maxOutOfOrderness;
	}

	/**
	 * Extracts the timestamp from the given element.
	 *
	 * @param element The element that the timestamp is extracted from.
	 * @return The new timestamp.
	 */
	public abstract long extractTimestamp(T element);

	@Override
	public final Watermark getCurrentWatermark() {
		// this guarantees that the watermark never goes backwards.
		long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
		if (potentialWM >= lastEmittedWatermark) {
			lastEmittedWatermark = potentialWM;
		}
		return new Watermark(lastEmittedWatermark);
	}

	@Override
	public final long extractTimestamp(T element, long previousElementTimestamp) {
		long timestamp = extractTimestamp(element);
		if (timestamp > currentMaxTimestamp) {
			currentMaxTimestamp = timestamp;
		}
		return timestamp;
	}
}
  • BoundedOutOfOrdernessTimestampExtractor抽象類實現AssignerWithPeriodicWatermarks接口的extractTimestamp及getCurrentWatermark方法,同時聲明抽象方法extractAscendingTimestamp供子類實現
  • BoundedOutOfOrdernessTimestampExtractor的構造器接收maxOutOfOrderness參數用於指定element容許滯後(t-t_w,t爲element的eventTime,t_w爲前一次watermark的時間)的最大時間,在計算窗口數據時,若是超過該值則會被忽略
  • BoundedOutOfOrdernessTimestampExtractor的extractTimestamp方法會調用子類的extractTimestamp方法抽取時間,若是該時間大於currentMaxTimestamp,則更新currentMaxTimestamp;getCurrentWatermark先計算potentialWM,若是potentialWM大於等於lastEmittedWatermark則更新lastEmittedWatermark(currentMaxTimestamp - lastEmittedWatermark >= maxOutOfOrderness,這裏表示lastEmittedWatermark過小了因此差值超過了maxOutOfOrderness,於是調大lastEmittedWatermark),最後返回Watermark(lastEmittedWatermark)

實例

public static void main(String[] args) throws Exception {

		final int popThreshold = 20; // threshold for popular places

		// set up streaming execution environment
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
		env.getConfig().setAutoWatermarkInterval(1000);

		// configure the Kafka consumer
		Properties kafkaProps = new Properties();
		kafkaProps.setProperty("zookeeper.connect", LOCAL_ZOOKEEPER_HOST);
		kafkaProps.setProperty("bootstrap.servers", LOCAL_KAFKA_BROKER);
		kafkaProps.setProperty("group.id", RIDE_SPEED_GROUP);
		// always read the Kafka topic from the start
		kafkaProps.setProperty("auto.offset.reset", "earliest");

		// create a Kafka consumer
		FlinkKafkaConsumer011<TaxiRide> consumer = new FlinkKafkaConsumer011<>(
				"cleansedRides",
				new TaxiRideSchema(),
				kafkaProps);
		// assign a timestamp extractor to the consumer
		consumer.assignTimestampsAndWatermarks(new TaxiRideTSExtractor());

		// create a TaxiRide data stream
		DataStream<TaxiRide> rides = env.addSource(consumer);

		// find popular places
		DataStream<Tuple5<Float, Float, Long, Boolean, Integer>> popularPlaces = rides
				// match ride to grid cell and event type (start or end)
				.map(new GridCellMatcher())
				// partition by cell id and event type
				.keyBy(0, 1)
				// build sliding window
				.timeWindow(Time.minutes(15), Time.minutes(5))
				// count ride events in window
				.apply(new RideCounter())
				// filter by popularity threshold
				.filter((Tuple4<Integer, Long, Boolean, Integer> count) -> (count.f3 >= popThreshold))
				// map grid cell to coordinates
				.map(new GridToCoordinates());

		popularPlaces.print();

		// execute the transformation pipeline
		env.execute("Popular Places from Kafka");
	}

	/**
	 * Assigns timestamps to TaxiRide records.
	 * Watermarks are a fixed time interval behind the max timestamp and are periodically emitted.
	 */
	public static class TaxiRideTSExtractor extends BoundedOutOfOrdernessTimestampExtractor<TaxiRide> {

		public TaxiRideTSExtractor() {
			super(Time.seconds(MAX_EVENT_DELAY));
		}

		@Override
		public long extractTimestamp(TaxiRide ride) {
			if (ride.isStart) {
				return ride.startTime.getMillis();
			}
			else {
				return ride.endTime.getMillis();
			}
		}
	}
  • 該實例使用的是AssignerWithPeriodicWatermarks,經過env.getConfig().setAutoWatermarkInterval(1000)設置了watermark的時間間隔,經過assignTimestampsAndWatermarks指定了AssignerWithPeriodicWatermarks爲TaxiRideTSExtractor,它繼承了BoundedOutOfOrdernessTimestampExtractor抽象類

小結

  • flink爲了方便開發提供了幾個內置的Pre-defined Timestamp Extractors / Watermark Emitters,其中一個就是BoundedOutOfOrdernessTimestampExtractor
  • BoundedOutOfOrdernessTimestampExtractor抽象類實現AssignerWithPeriodicWatermarks接口的extractTimestamp及getCurrentWatermark方法,同時聲明抽象方法extractAscendingTimestamp供子類實現
  • BoundedOutOfOrdernessTimestampExtractor的構造器接收maxOutOfOrderness參數用於指定element容許滯後(t-t_w,t爲element的eventTime,t_w爲前一次watermark的時間)的最大時間,在計算窗口數據時,若是超過該值則會被忽略

doc

相關文章
相關標籤/搜索