A Look at Flink's EventTime

This article takes a look at Flink's EventTime.

SourceFunction

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/source/SourceFunction.java

/**
	 * Interface that source functions use to emit elements, and possibly watermarks.
	 *
	 * @param <T> The type of the elements produced by the source.
	 */
	@Public // Interface might be extended in the future with additional methods.
	interface SourceContext<T> {

		/**
		 * Emits one element from the source, without attaching a timestamp. In most cases,
		 * this is the default way of emitting elements.
		 *
		 * <p>The timestamp that the element will get assigned depends on the time characteristic of
		 * the streaming program:
		 * <ul>
		 *     <li>On {@link TimeCharacteristic#ProcessingTime}, the element has no timestamp.</li>
		 *     <li>On {@link TimeCharacteristic#IngestionTime}, the element gets the system's
		 *         current time as the timestamp.</li>
		 *     <li>On {@link TimeCharacteristic#EventTime}, the element will have no timestamp initially.
		 *         It needs to get a timestamp (via a {@link TimestampAssigner}) before any time-dependent
		 *         operation (like time windows).</li>
		 * </ul>
		 *
		 * @param element The element to emit
		 */
		void collect(T element);

		/**
		 * Emits one element from the source, and attaches the given timestamp. This method
		 * is relevant for programs using {@link TimeCharacteristic#EventTime}, where the
		 * sources assign timestamps themselves, rather than relying on a {@link TimestampAssigner}
		 * on the stream.
		 *
		 * <p>On certain time characteristics, this timestamp may be ignored or overwritten.
		 * This allows programs to switch between the different time characteristics and behaviors
		 * without changing the code of the source functions.
		 * <ul>
		 *     <li>On {@link TimeCharacteristic#ProcessingTime}, the timestamp will be ignored,
		 *         because processing time never works with element timestamps.</li>
		 *     <li>On {@link TimeCharacteristic#IngestionTime}, the timestamp is overwritten with the
		 *         system's current time, to realize proper ingestion time semantics.</li>
		 *     <li>On {@link TimeCharacteristic#EventTime}, the timestamp will be used.</li>
		 * </ul>
		 *
		 * @param element The element to emit
		 * @param timestamp The timestamp in milliseconds since the Epoch
		 */
		@PublicEvolving
		void collectWithTimestamp(T element, long timestamp);

		/**
		 * Emits the given {@link Watermark}. A Watermark of value {@code t} declares that no
		 * elements with a timestamp {@code t' <= t} will occur any more. If further such
		 * elements will be emitted, those elements are considered <i>late</i>.
		 *
		 * <p>This method is only relevant when running on {@link TimeCharacteristic#EventTime}.
		 * On {@link TimeCharacteristic#ProcessingTime},Watermarks will be ignored. On
		 * {@link TimeCharacteristic#IngestionTime}, the Watermarks will be replaced by the
		 * automatic ingestion time watermarks.
		 *
		 * @param mark The Watermark to emit
		 */
		@PublicEvolving
		void emitWatermark(Watermark mark);

		/**
		 * Marks the source to be temporarily idle. This tells the system that this source will
		 * temporarily stop emitting records and watermarks for an indefinite amount of time. This
		 * is only relevant when running on {@link TimeCharacteristic#IngestionTime} and
		 * {@link TimeCharacteristic#EventTime}, allowing downstream tasks to advance their
		 * watermarks without the need to wait for watermarks from this source while it is idle.
		 *
		 * <p>Source functions should make a best effort to call this method as soon as they
		 * acknowledge themselves to be idle. The system will consider the source to resume activity
		 * again once {@link SourceContext#collect(T)}, {@link SourceContext#collectWithTimestamp(T, long)},
		 * or {@link SourceContext#emitWatermark(Watermark)} is called to emit elements or watermarks from the source.
		 */
		@PublicEvolving
		void markAsTemporarilyIdle();

		/**
		 * Returns the checkpoint lock. Please refer to the class-level comment in
		 * {@link SourceFunction} for details about how to write a consistent checkpointed
		 * source.
		 *
		 * @return The object to use as the lock
		 */
		Object getCheckpointLock();

		/**
		 * This method is called by the system to shut down the context.
		 */
		void close();
	}
  • SourceFunction defines the SourceContext interface, which declares the collectWithTimestamp and emitWatermark methods: the former assigns an event timestamp to the element being emitted, the latter emits a watermark
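The Javadoc of collectWithTimestamp spells out what happens to the attached timestamp under each time characteristic. The plain-Java sketch below restates those three rules as a single function so they can be checked at a glance; the enum `Characteristic` and the method `effectiveTimestamp` are hypothetical names for illustration, not Flink API, and `OptionalLong.empty()` stands in for "the element has no timestamp".

```java
import java.util.OptionalLong;

// Hypothetical mirror of Flink's TimeCharacteristic, for illustration only.
enum Characteristic { PROCESSING_TIME, INGESTION_TIME, EVENT_TIME }

final class TimestampRules {

    // Restates the Javadoc of SourceContext#collectWithTimestamp:
    // which timestamp an element ends up with, given the one the source attached.
    static OptionalLong effectiveTimestamp(Characteristic c, long attached, long systemNowMillis) {
        switch (c) {
            case PROCESSING_TIME:
                // processing time never works with element timestamps: it is ignored
                return OptionalLong.empty();
            case INGESTION_TIME:
                // overwritten with the system's current time
                return OptionalLong.of(systemNowMillis);
            case EVENT_TIME:
                // the timestamp attached by the source is used as-is
                return OptionalLong.of(attached);
            default:
                throw new IllegalArgumentException(String.valueOf(c));
        }
    }
}
```

This is why the same source code can run under any characteristic: the source always attaches its timestamp, and the runtime decides whether to honour it.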

Example

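import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
// TaxiRide and TaxiFare come from the Flink training exercises project (imports omitted)

public abstract class TestSource implements SourceFunction<Object> {
	private volatile boolean running = true;
	// elements to replay, filled in by subclasses; a Long entry is emitted as a watermark
	protected Object[] testStream;

	@Override
	public void run(SourceContext<Object> ctx) throws Exception {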
		for (int i = 0; (i < testStream.length) && running; i++) {
			if (testStream[i] instanceof TaxiRide) {
				TaxiRide ride = (TaxiRide) testStream[i];
				ctx.collectWithTimestamp(ride, ride.getEventTime());
			} else if (testStream[i] instanceof TaxiFare) {
				TaxiFare fare = (TaxiFare) testStream[i];
				ctx.collectWithTimestamp(fare, fare.getEventTime());
			} else if (testStream[i] instanceof String) {
				String s = (String) testStream[i];
				ctx.collectWithTimestamp(s, 0);
			} else if (testStream[i] instanceof Long) {
				Long ts = (Long) testStream[i];
				ctx.emitWatermark(new Watermark(ts));
			} else {
				throw new RuntimeException(testStream[i].toString());
			}
		}
		// test sources are finite, so they have a Long.MAX_VALUE watermark when they finish
	}

	@Override
	public void cancel() {
		running = false;
	}
}
  • This shows how a SourceFunction can assign timestamps itself (collectWithTimestamp) and emit watermarks itself (emitWatermark)
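To see what the watermarks emitted by a source like TestSource mean downstream, here is a plain-Java sketch that replays a mixed array the same way (a `Long` entry becomes a watermark, anything else is an element with a timestamp) and applies the contract from the emitWatermark Javadoc: once a watermark `t` has been emitted, an element with timestamp `t' <= t` is late. The `Event` class and the `replay` method are hypothetical; in Flink it is downstream event-time operators such as windows that act on late elements, while this sketch only labels them.

```java
import java.util.ArrayList;
import java.util.List;

final class LatenessReplay {

    // Hypothetical element type: a payload plus its event timestamp (ms since the Epoch).
    static final class Event {
        final String name;
        final long timestamp;

        Event(String name, long timestamp) {
            this.name = name;
            this.timestamp = timestamp;
        }
    }

    // Replays a TestSource-style array and returns the names of late elements.
    // A Long entry is treated like ctx.emitWatermark(new Watermark(ts)).
    static List<String> replay(Object[] stream) {
        long currentWatermark = Long.MIN_VALUE; // no watermark seen yet
        List<String> late = new ArrayList<>();
        for (Object item : stream) {
            if (item instanceof Long) {
                // keep the largest watermark seen so far
                currentWatermark = Math.max(currentWatermark, (Long) item);
            } else if (item instanceof Event) {
                Event e = (Event) item;
                // contract: after watermark t, no element with timestamp t' <= t should arrive
                if (e.timestamp <= currentWatermark) {
                    late.add(e.name);
                }
            } else {
                throw new IllegalArgumentException(String.valueOf(item));
            }
        }
        return late;
    }
}
```

For example, replaying `a@1000, b@3000, watermark 2000, c@1500, d@2000, e@2500` flags `c` and `d`: both are at or behind the watermark, while `e` is not.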

DataStream.assignTimestampsAndWatermarks

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java

/**
	 * Assigns timestamps to the elements in the data stream and periodically creates
	 * watermarks to signal event time progress.
	 *
	 * <p>This method creates watermarks periodically (for example every second), based
	 * on the watermarks indicated by the given watermark generator. Even when no new elements
	 * in the stream arrive, the given watermark generator will be periodically checked for
	 * new watermarks. The interval in which watermarks are generated is defined in
	 * {@link ExecutionConfig#setAutoWatermarkInterval(long)}.
	 *
	 * <p>Use this method for the common cases, where some characteristic over all elements
	 * should generate the watermarks, or where watermarks are simply trailing behind the
	 * wall clock time by a certain amount.
	 *
	 * <p>For the second case and when the watermarks are required to lag behind the maximum
	 * timestamp seen so far in the elements of the stream by a fixed amount of time, and this
	 * amount is known in advance, use the
	 * {@link BoundedOutOfOrdernessTimestampExtractor}.
	 *
	 * <p>For cases where watermarks should be created in an irregular fashion, for example
	 * based on certain markers that some element carry, use the
	 * {@link AssignerWithPunctuatedWatermarks}.
	 *
	 * @param timestampAndWatermarkAssigner The implementation of the timestamp assigner and
	 *                                      watermark generator.
	 * @return The stream after the transformation, with assigned timestamps and watermarks.
	 *
	 * @see AssignerWithPeriodicWatermarks
	 * @see AssignerWithPunctuatedWatermarks
	 * @see #assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks)
	 */
	public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(
			AssignerWithPeriodicWatermarks<T> timestampAndWatermarkAssigner) {

		// match parallelism to input, otherwise dop=1 sources could lead to some strange
		// behaviour: the watermark will creep along very slowly because the elements
		// from the source go to each extraction operator round robin.
		final int inputParallelism = getTransformation().getParallelism();
		final AssignerWithPeriodicWatermarks<T> cleanedAssigner = clean(timestampAndWatermarkAssigner);

		TimestampsAndPeriodicWatermarksOperator<T> operator =
				new TimestampsAndPeriodicWatermarksOperator<>(cleanedAssigner);

		return transform("Timestamps/Watermarks", getTransformation().getOutputType(), operator)
				.setParallelism(inputParallelism);
	}

	/**
	 * Assigns timestamps to the elements in the data stream and creates watermarks to
	 * signal event time progress based on the elements themselves.
	 *
	 * <p>This method creates watermarks based purely on stream elements. For each element
	 * that is handled via {@link AssignerWithPunctuatedWatermarks#extractTimestamp(Object, long)},
	 * the {@link AssignerWithPunctuatedWatermarks#checkAndGetNextWatermark(Object, long)}
	 * method is called, and a new watermark is emitted, if the returned watermark value is
	 * non-negative and greater than the previous watermark.
	 *
	 * <p>This method is useful when the data stream embeds watermark elements, or certain elements
	 * carry a marker that can be used to determine the current event time watermark.
	 * This operation gives the programmer full control over the watermark generation. Users
	 * should be aware that too aggressive watermark generation (i.e., generating hundreds of
	 * watermarks every second) can cost some performance.
	 *
	 * <p>For cases where watermarks should be created in a regular fashion, for example
	 * every x milliseconds, use the {@link AssignerWithPeriodicWatermarks}.
	 *
	 * @param timestampAndWatermarkAssigner The implementation of the timestamp assigner and
	 *                                      watermark generator.
	 * @return The stream after the transformation, with assigned timestamps and watermarks.
	 *
	 * @see AssignerWithPunctuatedWatermarks
	 * @see AssignerWithPeriodicWatermarks
	 * @see #assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks)
	 */
	public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(
			AssignerWithPunctuatedWatermarks<T> timestampAndWatermarkAssigner) {

		// match parallelism to input, otherwise dop=1 sources could lead to some strange
		// behaviour: the watermark will creep along very slowly because the elements
		// from the source go to each extraction operator round robin.
		final int inputParallelism = getTransformation().getParallelism();
		final AssignerWithPunctuatedWatermarks<T> cleanedAssigner = clean(timestampAndWatermarkAssigner);

		TimestampsAndPunctuatedWatermarksOperator<T> operator =
				new TimestampsAndPunctuatedWatermarksOperator<>(cleanedAssigner);

		return transform("Timestamps/Watermarks", getTransformation().getOutputType(), operator)
				.setParallelism(inputParallelism);
	}
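  • DataStream defines the assignTimestampsAndWatermarks method, which sets a timestampAndWatermarkAssigner (of type AssignerWithPeriodicWatermarks or AssignerWithPunctuatedWatermarks) outside the source to tell Flink how to extract the eventTime and generate watermarks; both overloads run the assigner with the same parallelism as the input stream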

AssignerWithPeriodicWatermarks

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/AssignerWithPeriodicWatermarks.java

public interface AssignerWithPeriodicWatermarks<T> extends TimestampAssigner<T> {

	/**
	 * Returns the current watermark. This method is periodically called by the
	 * system to retrieve the current watermark. The method may return {@code null} to
	 * indicate that no new Watermark is available.
	 *
	 * <p>The returned watermark will be emitted only if it is non-null and its timestamp
	 * is larger than that of the previously emitted watermark (to preserve the contract of
	 * ascending watermarks). If the current watermark is still
	 * identical to the previous one, no progress in event time has happened since
	 * the previous call to this method. If a null value is returned, or the timestamp
	 * of the returned watermark is smaller than that of the last emitted one, then no
	 * new watermark will be generated.
	 *
	 * <p>The interval in which this method is called and Watermarks are generated
	 * depends on {@link ExecutionConfig#getAutoWatermarkInterval()}.
	 *
	 * @see org.apache.flink.streaming.api.watermark.Watermark
	 * @see ExecutionConfig#getAutoWatermarkInterval()
	 *
	 * @return {@code Null}, if no watermark should be emitted, or the next watermark to emit.
	 */
	@Nullable
	Watermark getCurrentWatermark();
}
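  • AssignerWithPeriodicWatermarks extends the TimestampAssigner interface (which defines the extractTimestamp method) and adds getCurrentWatermark, which the system calls periodically to obtain the current watermark; it returns null when there is no new watermark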
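The emission rule in this Javadoc (forward the watermark only if it is non-null and strictly larger than the last one emitted) can be sketched in plain Java as below. `PeriodicDriver` is a hypothetical stand-in for the operator that polls the assigner once per auto-watermark interval: each `onTimer` call represents one tick, and a nullable `Long` stands in for Flink's `Watermark`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical model of the operator side of AssignerWithPeriodicWatermarks:
// on every tick it asks for the current watermark and forwards only real progress.
final class PeriodicDriver {
    private final Supplier<Long> currentWatermark; // plays the role of getCurrentWatermark(); may return null
    private long lastEmitted = Long.MIN_VALUE;
    final List<Long> emitted = new ArrayList<>();

    PeriodicDriver(Supplier<Long> currentWatermark) {
        this.currentWatermark = currentWatermark;
    }

    // Called once per interval (ExecutionConfig#getAutoWatermarkInterval in Flink).
    void onTimer() {
        Long wm = currentWatermark.get();
        // null means "no new watermark"; an equal or smaller value means no event-time progress
        if (wm != null && wm > lastEmitted) {
            lastEmitted = wm;
            emitted.add(wm);
        }
    }
}
```

If the assigner answers `null, 100, 100, 50, 200` on five consecutive ticks, only `100` and `200` are forwarded, so downstream operators always see ascending watermarks.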

AssignerWithPeriodicWatermarks example

public static void main(String[] args) throws Exception {

		final int popThreshold = 20; // threshold for popular places

		// set up streaming execution environment
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
		env.getConfig().setAutoWatermarkInterval(1000);

		// configure the Kafka consumer
		Properties kafkaProps = new Properties();
		kafkaProps.setProperty("zookeeper.connect", LOCAL_ZOOKEEPER_HOST);
		kafkaProps.setProperty("bootstrap.servers", LOCAL_KAFKA_BROKER);
		kafkaProps.setProperty("group.id", RIDE_SPEED_GROUP);
		// always read the Kafka topic from the start
		kafkaProps.setProperty("auto.offset.reset", "earliest");

		// create a Kafka consumer
		FlinkKafkaConsumer011<TaxiRide> consumer = new FlinkKafkaConsumer011<>(
				"cleansedRides",
				new TaxiRideSchema(),
				kafkaProps);
		// assign a timestamp extractor to the consumer
		consumer.assignTimestampsAndWatermarks(new TaxiRideTSExtractor());

		// create a TaxiRide data stream
		DataStream<TaxiRide> rides = env.addSource(consumer);

		// find popular places
		DataStream<Tuple5<Float, Float, Long, Boolean, Integer>> popularPlaces = rides
				// match ride to grid cell and event type (start or end)
				.map(new GridCellMatcher())
				// partition by cell id and event type
				.keyBy(0, 1)
				// build sliding window
				.timeWindow(Time.minutes(15), Time.minutes(5))
				// count ride events in window
				.apply(new RideCounter())
				// filter by popularity threshold
				.filter((Tuple4<Integer, Long, Boolean, Integer> count) -> (count.f3 >= popThreshold))
				// map grid cell to coordinates
				.map(new GridToCoordinates());

		popularPlaces.print();

		// execute the transformation pipeline
		env.execute("Popular Places from Kafka");
	}

	/**
	 * Assigns timestamps to TaxiRide records.
	 * Watermarks are a fixed time interval behind the max timestamp and are periodically emitted.
	 */
	public static class TaxiRideTSExtractor extends BoundedOutOfOrdernessTimestampExtractor<TaxiRide> {

		public TaxiRideTSExtractor() {
			super(Time.seconds(MAX_EVENT_DELAY));
		}

		@Override
		public long extractTimestamp(TaxiRide ride) {
			if (ride.isStart) {
				return ride.startTime.getMillis();
			}
			else {
				return ride.endTime.getMillis();
			}
		}
	}
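  • Here the assigner is not set on a DataStream but directly on the FlinkKafkaConsumer011, through the consumer's own assignTimestampsAndWatermarks method, so timestamps and watermarks are generated inside the Kafka source (which lets Flink track watermarks per Kafka partition). TaxiRideTSExtractor extends BoundedOutOfOrdernessTimestampExtractor, which implements AssignerWithPeriodicWatermarks. env.getConfig().setAutoWatermarkInterval(1000) sets the interval at which getCurrentWatermark is called to 1000 ms; it comes after env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime), which itself resets the interval to a default of 200 ms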
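BoundedOutOfOrdernessTimestampExtractor keeps the largest timestamp seen so far and reports a watermark that trails it by the configured bound. The plain-Java sketch below mirrors that idea without Flink on the classpath; `BoundedLagAssigner` and its methods are hypothetical names modeled on the extractor's behaviour, not its actual source, and it assumes both the timestamps and the bound are in milliseconds.

```java
// Hypothetical plain-Java model of a bounded-out-of-orderness assigner.
final class BoundedLagAssigner {
    private final long maxOutOfOrdernessMillis;
    private long currentMaxTimestamp;
    private long lastEmittedWatermark = Long.MIN_VALUE;

    BoundedLagAssigner(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
        // start so that (max - bound) is Long.MIN_VALUE, avoiding overflow before any element arrives
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMillis;
    }

    // Called per element: remember the largest timestamp, return the element's own timestamp.
    long extractTimestamp(long elementTimestamp) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, elementTimestamp);
        return elementTimestamp;
    }

    // Called periodically: trail the max timestamp by the bound, never going backwards.
    long getCurrentWatermark() {
        long potential = currentMaxTimestamp - maxOutOfOrdernessMillis;
        lastEmittedWatermark = Math.max(lastEmittedWatermark, potential);
        return lastEmittedWatermark;
    }
}
```

With a 60-second bound, after rides at 100 000 ms and 90 000 ms the watermark is 40 000 ms; an out-of-order ride is therefore only late once it falls a full bound behind the newest ride seen.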

AssignerWithPunctuatedWatermarks

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/AssignerWithPunctuatedWatermarks.java

public interface AssignerWithPunctuatedWatermarks<T> extends TimestampAssigner<T> {

	/**
	 * Asks this implementation if it wants to emit a watermark. This method is called right after
	 * the {@link #extractTimestamp(Object, long)} method.
	 *
	 * <p>The returned watermark will be emitted only if it is non-null and its timestamp
	 * is larger than that of the previously emitted watermark (to preserve the contract of
	 * ascending watermarks). If a null value is returned, or the timestamp of the returned
	 * watermark is smaller than that of the last emitted one, then no new watermark will
	 * be generated.
	 *
	 * <p>For an example how to use this method, see the documentation of
	 * {@link AssignerWithPunctuatedWatermarks this class}.
	 *
	 * @return {@code Null}, if no watermark should be emitted, or the next watermark to emit.
	 */
	@Nullable
	Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp);
}
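  • The AssignerWithPunctuatedWatermarks interface extends TimestampAssigner (which defines extractTimestamp) and adds checkAndGetNextWatermark, which is called right after extractTimestamp for each element; the freshly extracted timestamp is passed in as the extractedTimestamp parameter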
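The call order described above (extractTimestamp first, then checkAndGetNextWatermark with the freshly extracted timestamp, and emission only on strict progress) can be sketched in plain Java as follows. The `Reading` type, its `isMarker` flag and the `PunctuatedDriver` class are hypothetical; the assigner here emits a watermark only for elements that carry a marker, the irregular pattern for which the DataStream Javadoc recommends punctuated assigners.

```java
import java.util.ArrayList;
import java.util.List;

final class PunctuatedDriver {

    // Hypothetical element: an event timestamp plus a flag marking watermark-bearing records.
    static final class Reading {
        final long timestamp;
        final boolean isMarker;

        Reading(long timestamp, boolean isMarker) {
            this.timestamp = timestamp;
            this.isMarker = isMarker;
        }
    }

    private long lastEmitted = Long.MIN_VALUE;
    final List<Long> emitted = new ArrayList<>();

    // Step 1 of the assigner contract: pull the event time out of the element.
    long extractTimestamp(Reading r) {
        return r.timestamp;
    }

    // Step 2: right after extraction, decide whether this element implies a watermark (null = no).
    Long checkAndGetNextWatermark(Reading r, long extractedTimestamp) {
        return r.isMarker ? extractedTimestamp : null;
    }

    // Operator side: run both steps per element, forward only strictly increasing watermarks.
    void process(Reading r) {
        long ts = extractTimestamp(r);
        Long wm = checkAndGetNextWatermark(r, ts);
        if (wm != null && wm > lastEmitted) {
            lastEmitted = wm;
            emitted.add(wm);
        }
    }
}
```

Feeding `10 (no marker), 20 (marker), 15 (marker), 30 (marker)` yields watermarks `20` and `30`; the marker at `15` is suppressed because it would move event time backwards.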

AssignerWithPunctuatedWatermarks example

public static void main(String[] args) throws Exception {

		// read parameters
		ParameterTool params = ParameterTool.fromArgs(args);
		String input = params.getRequired("input");

		// set up streaming execution environment
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
		env.setParallelism(1);

		// connect to the data file
		DataStream<String> carData = env.readTextFile(input);

		// map to events
		DataStream<ConnectedCarEvent> events = carData
				.map((String line) -> ConnectedCarEvent.fromString(line))
				.assignTimestampsAndWatermarks(new ConnectedCarAssigner());

		// sort events
		events.keyBy((ConnectedCarEvent event) -> event.carId)
				.process(new SortFunction())
				.print();

		env.execute("Sort Connected Car Events");
	}

public class ConnectedCarAssigner implements AssignerWithPunctuatedWatermarks<ConnectedCarEvent> {
	@Override
	public long extractTimestamp(ConnectedCarEvent event, long previousElementTimestamp) {
		return event.timestamp;
	}

	@Override
	public Watermark checkAndGetNextWatermark(ConnectedCarEvent event, long extractedTimestamp) {
		// simply emit a watermark with every event
		return new Watermark(extractedTimestamp - 30000);
	}
}
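  • Here DataStream's assignTimestampsAndWatermarks method is given a timestampAndWatermarkAssigner that implements AssignerWithPunctuatedWatermarks; ConnectedCarAssigner emits a watermark after every event, 30 seconds (30000 ms) behind that event's timestamp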
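Because ConnectedCarAssigner returns `extractedTimestamp - 30000` for every event, the watermark trails the largest timestamp seen so far by 30 seconds, and an event is late only if it is at least 30 seconds older than the newest event before it. The plain-Java sketch below replays that rule over timestamps in arrival order (milliseconds) and reports the late ones; `CarWatermarkReplay` and `lateTimestamps` are hypothetical names, and the sketch models a single parallel instance, matching `env.setParallelism(1)` in the example.

```java
import java.util.ArrayList;
import java.util.List;

final class CarWatermarkReplay {
    static final long MAX_DELAY_MILLIS = 30_000L; // the 30000 ms lag used by ConnectedCarAssigner

    // Returns the timestamps that arrive at or behind the watermark in effect when they arrive.
    static List<Long> lateTimestamps(long[] arrivalOrder) {
        long watermark = Long.MIN_VALUE;
        List<Long> late = new ArrayList<>();
        for (long ts : arrivalOrder) {
            // the element is checked against the watermark emitted before it ...
            if (ts <= watermark) {
                late.add(ts);
            }
            // ... and only then may it advance the watermark, which is emitted after the element
            long candidate = ts - MAX_DELAY_MILLIS;
            if (candidate > watermark) {
                watermark = candidate;
            }
        }
        return late;
    }
}
```

For arrivals `100000, 80000, 140000, 115000, 105000`, only `105000` is late: after `140000` the watermark is `110000`, which `115000` still clears but `105000` does not.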

Summary

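  • To use EventTime, Flink has to be told where to take each record's eventTime from, and this is usually done together with generating watermarks. There are two ways: handle it inside the data stream source, or use a timestamp assigner/watermark generator (in Flink, timestamp assigners also define how watermarks are emitted; timestamps are expressed in milliseconds since 1970-01-01T00:00:00Z)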
  • Inside the source: use the collectWithTimestamp and emitWatermark methods of the SourceContext interface defined in SourceFunction; the former assigns the event timestamp, the latter emits a watermark
  • Outside the source: call DataStream's assignTimestampsAndWatermarks method with a timestampAndWatermarkAssigner, which comes in two types: AssignerWithPeriodicWatermarks (defines getCurrentWatermark, which returns the current watermark; the period is set via env.getConfig().setAutoWatermarkInterval(1000)) and AssignerWithPunctuatedWatermarks (defines checkAndGetNextWatermark, which is called right after extractTimestamp and receives the freshly extracted timestamp as extractedTimestamp)
