A Look at Flink's TimeCharacteristic

This article takes a look at Flink's TimeCharacteristic.

TimeCharacteristic

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/TimeCharacteristic.java

/**
 * The time characteristic defines how the system determines time for time-dependent
 * order and operations that depend on time (such as time windows).
 */
@PublicEvolving
public enum TimeCharacteristic {

	/**
	 * Processing time for operators means that the operator uses the system clock of the machine
	 * to determine the current time of the data stream. Processing-time windows trigger based
	 * on wall-clock time and include whatever elements happen to have arrived at the operator at
	 * that point in time.
	 *
	 * <p>Using processing time for window operations results in general in quite non-deterministic
	 * results, because the contents of the windows depends on the speed in which elements arrive.
	 * It is, however, the cheapest method of forming windows and the method that introduces the
	 * least latency.
	 */
	ProcessingTime,

	/**
	 * Ingestion time means that the time of each individual element in the stream is determined
	 * when the element enters the Flink streaming data flow. Operations like windows group the
	 * elements based on that time, meaning that processing speed within the streaming dataflow
	 * does not affect windowing, but only the speed at which sources receive elements.
	 *
	 * <p>Ingestion time is often a good compromise between processing time and event time.
	 * It does not need and special manual form of watermark generation, and events are typically
	 * not too much out-or-order when they arrive at operators; in fact, out-of-orderness can
	 * only be introduced by streaming shuffles or split/join/union operations. The fact that
	 * elements are not very much out-of-order means that the latency increase is moderate,
	 * compared to event
	 * time.
	 */
	IngestionTime,

	/**
	 * Event time means that the time of each individual element in the stream (also called event)
	 * is determined by the event's individual custom timestamp. These timestamps either exist in
	 * the elements from before they entered the Flink streaming dataflow, or are user-assigned at
	 * the sources. The big implication of this is that it allows for elements to arrive in the
	 * sources and in all operators out of order, meaning that elements with earlier timestamps may
	 * arrive after elements with later timestamps.
	 *
	 * <p>Operators that window or order data with respect to event time must buffer data until they
	 * can be sure that all timestamps for a certain time interval have been received. This is
	 * handled by the so called "time watermarks".
	 *
	 * <p>Operations based on event time are very predictable - the result of windowing operations
	 * is typically identical no matter when the window is executed and how fast the streams
	 * operate. At the same time, the buffering and tracking of event time is also costlier than
	 * operating with processing time, and typically also introduces more latency. The amount of
	 * extra cost depends mostly on how much out of order the elements arrive, i.e., how long the
	 * time span between the arrival of early and late elements is. With respect to the
	 * "time watermarks", this means that the cost typically depends on how early or late the
	 * watermarks can be generated for their timestamp.
	 *
	 * <p>In relation to {@link #IngestionTime}, the event time is similar, but refers the the
	 * event's original time, rather than the time assigned at the data source. Practically, that
	 * means that event time has generally more meaning, but also that it takes longer to determine
	 * that all elements for a certain time have arrived.
	 */
	EventTime
}
  • ProcessingTime is based on the time at which the operator processes an element; it uses the machine's system clock as the time of the data stream
  • IngestionTime is based on the time at which an element enters the Flink streaming data flow
  • EventTime is based on the timestamp carried by the data itself; the application needs to specify how to extract the timestamp field from each record (a minimal configuration sketch for the three modes follows this list)
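As a quick illustration of how a job selects one of the three modes, here is a minimal sketch; the class name TimeCharacteristicDemo and the trivial fromElements pipeline are made up for illustration, only the Flink API calls are real:

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TimeCharacteristicDemo {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// pick exactly one characteristic per job (ProcessingTime is the default in Flink 1.7):
		// env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
		// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

		// with EventTime, periodic watermarks are emitted at this interval (milliseconds)
		// by whatever timestamp/watermark assigner the application registers
		env.getConfig().setAutoWatermarkInterval(1000);

		// a trivial pipeline so the sketch is executable end to end
		env.fromElements("a", "b", "c").print();

		env.execute("time characteristic demo");
	}
}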

Differences

The differences between the three time characteristics are illustrated in the figure above.

Example

public static void main(String[] args) throws Exception {

		final int popThreshold = 20; // threshold for popular places

		// set up streaming execution environment
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
		env.getConfig().setAutoWatermarkInterval(1000);

		// configure the Kafka consumer
		Properties kafkaProps = new Properties();
		kafkaProps.setProperty("zookeeper.connect", LOCAL_ZOOKEEPER_HOST);
		kafkaProps.setProperty("bootstrap.servers", LOCAL_KAFKA_BROKER);
		kafkaProps.setProperty("group.id", RIDE_SPEED_GROUP);
		// always read the Kafka topic from the start
		kafkaProps.setProperty("auto.offset.reset", "earliest");

		// create a Kafka consumer
		FlinkKafkaConsumer011<TaxiRide> consumer = new FlinkKafkaConsumer011<>(
				"cleansedRides",
				new TaxiRideSchema(),
				kafkaProps);
		// assign a timestamp extractor to the consumer
		consumer.assignTimestampsAndWatermarks(new TaxiRideTSExtractor());

		// create a TaxiRide data stream
		DataStream<TaxiRide> rides = env.addSource(consumer);

		// find popular places
		DataStream<Tuple5<Float, Float, Long, Boolean, Integer>> popularPlaces = rides
				// match ride to grid cell and event type (start or end)
				.map(new GridCellMatcher())
				// partition by cell id and event type
				.keyBy(0, 1)
				// build sliding window
				.timeWindow(Time.minutes(15), Time.minutes(5))
				// count ride events in window
				.apply(new RideCounter())
				// filter by popularity threshold
				.filter((Tuple4<Integer, Long, Boolean, Integer> count) -> (count.f3 >= popThreshold))
				// map grid cell to coordinates
				.map(new GridToCoordinates());

		popularPlaces.print();

		// execute the transformation pipeline
		env.execute("Popular Places from Kafka");
	}

	/**
	 * Assigns timestamps to TaxiRide records.
	 * Watermarks are a fixed time interval behind the max timestamp and are periodically emitted.
	 */
	public static class TaxiRideTSExtractor extends BoundedOutOfOrdernessTimestampExtractor<TaxiRide> {

		public TaxiRideTSExtractor() {
			super(Time.seconds(MAX_EVENT_DELAY));
		}

		@Override
		public long extractTimestamp(TaxiRide ride) {
			if (ride.isStart) {
				return ride.startTime.getMillis();
			}
			else {
				return ride.endTime.getMillis();
			}
		}
	}
  • When consuming from Kafka here, setStreamTimeCharacteristic is set to TimeCharacteristic.EventTime, and assignTimestampsAndWatermarks is called with a TaxiRideTSExtractor, which extends BoundedOutOfOrdernessTimestampExtractor; its extractTimestamp returns ride.startTime.getMillis() or ride.endTime.getMillis() depending on whether the ride is a start event, thereby defining the event time (a simplified sketch of the watermark logic behind this extractor follows)
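To make the watermark behavior concrete, here is a simplified sketch of the idea behind BoundedOutOfOrdernessTimestampExtractor: remember the largest timestamp seen so far and emit periodic watermarks that lag it by a fixed bound. This is not the actual Flink implementation, just an equivalent AssignerWithPeriodicWatermarks written against the TaxiRide type from the flink-training project used above:

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class BoundedOutOfOrdernessSketch implements AssignerWithPeriodicWatermarks<TaxiRide> {

	private final long maxOutOfOrdernessMillis;
	private long currentMaxTimestamp;

	public BoundedOutOfOrdernessSketch(long maxOutOfOrdernessMillis) {
		this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
		// start low enough that subtracting the bound cannot underflow
		this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMillis;
	}

	@Override
	public long extractTimestamp(TaxiRide ride, long previousElementTimestamp) {
		// same timestamp choice as TaxiRideTSExtractor above
		long timestamp = ride.isStart ? ride.startTime.getMillis() : ride.endTime.getMillis();
		currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp);
		return timestamp;
	}

	@Override
	public Watermark getCurrentWatermark() {
		// called every autoWatermarkInterval milliseconds (1000 ms in the example);
		// the watermark trails the max timestamp by the allowed out-of-orderness
		return new Watermark(currentMaxTimestamp - maxOutOfOrdernessMillis);
	}
}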

Summary

  • Flink's TimeCharacteristic enum defines three values: ProcessingTime, IngestionTime, and EventTime
  • ProcessingTime is based on the time at which the operator processes an element, using the machine's system clock as the time of the data stream; IngestionTime is based on the time at which an element enters the Flink streaming data flow; EventTime is based on the timestamp carried by the data itself, and the application needs to specify how to extract the timestamp field from each record
  • A source configured for EventTime needs to define the event time and emit watermarks itself, or timestamps and watermarks can be assigned manually outside the source via assignTimestampsAndWatermarks (a minimal sketch of a source that does this itself follows this list)
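For the first option in the last bullet, here is a minimal sketch of a source that assigns event timestamps and emits watermarks itself, so no downstream assignTimestampsAndWatermarks call is needed; the class name and the synthetic string events are made up for illustration, only SourceContext.collectWithTimestamp and emitWatermark are the real Flink API:

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;

public class SelfTimestampingSource implements SourceFunction<String> {

	private volatile boolean running = true;

	@Override
	public void run(SourceContext<String> ctx) throws Exception {
		long eventTime = System.currentTimeMillis();
		while (running) {
			// attach the event-time timestamp directly to the record
			ctx.collectWithTimestamp("event@" + eventTime, eventTime);
			// signal downstream operators that no event earlier than this will follow
			ctx.emitWatermark(new Watermark(eventTime));
			eventTime += 1000;
			Thread.sleep(100);
		}
	}

	@Override
	public void cancel() {
		running = false;
	}
}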
