This article takes a look at Flink's TimeCharacteristic.
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/TimeCharacteristic.java
```java
package org.apache.flink.streaming.api;

import org.apache.flink.annotation.PublicEvolving;

/**
 * The time characteristic defines how the system determines time for time-dependent
 * order and operations that depend on time (such as time windows).
 */
@PublicEvolving
public enum TimeCharacteristic {

	/**
	 * Processing time for operators means that the operator uses the system clock of the machine
	 * to determine the current time of the data stream. Processing-time windows trigger based
	 * on wall-clock time and include whatever elements happen to have arrived at the operator at
	 * that point in time.
	 *
	 * <p>Using processing time for window operations results in general in quite non-deterministic
	 * results, because the contents of the windows depends on the speed in which elements arrive.
	 * It is, however, the cheapest method of forming windows and the method that introduces the
	 * least latency.
	 */
	ProcessingTime,

	/**
	 * Ingestion time means that the time of each individual element in the stream is determined
	 * when the element enters the Flink streaming data flow. Operations like windows group the
	 * elements based on that time, meaning that processing speed within the streaming dataflow
	 * does not affect windowing, but only the speed at which sources receive elements.
	 *
	 * <p>Ingestion time is often a good compromise between processing time and event time.
	 * It does not need and special manual form of watermark generation, and events are typically
	 * not too much out-or-order when they arrive at operators; in fact, out-of-orderness can
	 * only be introduced by streaming shuffles or split/join/union operations. The fact that
	 * elements are not very much out-of-order means that the latency increase is moderate,
	 * compared to event
	 * time.
	 */
	IngestionTime,

	/**
	 * Event time means that the time of each individual element in the stream (also called event)
	 * is determined by the event's individual custom timestamp. These timestamps either exist in
	 * the elements from before they entered the Flink streaming dataflow, or are user-assigned at
	 * the sources. The big implication of this is that it allows for elements to arrive in the
	 * sources and in all operators out of order, meaning that elements with earlier timestamps may
	 * arrive after elements with later timestamps.
	 *
	 * <p>Operators that window or order data with respect to event time must buffer data until they
	 * can be sure that all timestamps for a certain time interval have been received. This is
	 * handled by the so called "time watermarks".
	 *
	 * <p>Operations based on event time are very predictable - the result of windowing operations
	 * is typically identical no matter when the window is executed and how fast the streams
	 * operate. At the same time, the buffering and tracking of event time is also costlier than
	 * operating with processing time, and typically also introduces more latency. The amount of
	 * extra cost depends mostly on how much out of order the elements arrive, i.e., how long the
	 * time span between the arrival of early and late elements is. With respect to the
	 * "time watermarks", this means that the cost typically depends on how early or late the
	 * watermarks can be generated for their timestamp.
	 *
	 * <p>In relation to {@link #IngestionTime}, the event time is similar, but refers the the
	 * event's original time, rather than the time assigned at the data source. Practically, that
	 * means that event time has generally more meaning, but also that it takes longer to determine
	 * that all elements for a certain time have arrived.
	 */
	EventTime
}
```
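TimeCharacteristic offers three options, ProcessingTime, IngestionTime and EventTime; the differences between them are shown in the figure above.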
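To make the contrast described in the Javadoc concrete, here is a minimal plain-Java sketch (no Flink dependency) that counts elements in 10-second tumbling windows under each notion of time. The `Characteristic` enum, the `Element` class with its three timestamps, and all sample values are hypothetical illustrations, not Flink API; the sketch also ignores watermarks and late data.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TimeCharacteristicSketch {

    // Hypothetical stand-in for Flink's three TimeCharacteristic values.
    enum Characteristic { PROCESSING_TIME, INGESTION_TIME, EVENT_TIME }

    // Hypothetical element carrying the three timestamps that matter here (milliseconds).
    static final class Element {
        final long eventTime;    // timestamp carried by the event itself
        final long sourceTime;   // wall-clock time when it entered the source
        final long operatorTime; // wall-clock time when it reached the window operator

        Element(long eventTime, long sourceTime, long operatorTime) {
            this.eventTime = eventTime;
            this.sourceTime = sourceTime;
            this.operatorTime = operatorTime;
        }

        // Picks the timestamp a window operator would use under each characteristic.
        long timestampFor(Characteristic c) {
            switch (c) {
                case PROCESSING_TIME: return operatorTime;
                case INGESTION_TIME:  return sourceTime;
                default:              return eventTime;
            }
        }
    }

    // Counts elements per tumbling window [start, start + size), keyed by window start.
    static Map<Long, Integer> countPerWindow(List<Element> elements, Characteristic c, long size) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (Element e : elements) {
            long ts = e.timestampFor(c);
            long start = ts - (ts % size); // assumes non-negative timestamps
            counts.merge(start, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // The same three events; in the "slow" run the hop from source to operator takes longer.
        List<Element> fast = Arrays.asList(
                new Element(1_000, 2_000, 3_000),
                new Element(8_000, 9_000, 9_500),
                new Element(12_000, 13_000, 14_000));
        List<Element> slow = Arrays.asList(
                new Element(1_000, 2_000, 3_000),
                new Element(8_000, 9_000, 11_000),
                new Element(12_000, 13_000, 16_000));

        for (Characteristic c : Characteristic.values()) {
            System.out.println(c + " fast=" + countPerWindow(fast, c, 10_000)
                    + " slow=" + countPerWindow(slow, c, 10_000));
        }
    }
}
```

Running it prints `PROCESSING_TIME fast={0=2, 10000=1} slow={0=1, 10000=2}`, while the INGESTION_TIME and EVENT_TIME lines show `{0=2, 10000=1}` for both runs. Slowing down the pipeline between source and operator moves one element into a different processing-time window, which is the non-determinism the Javadoc attributes to ProcessingTime.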
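As an example, the following job reads TaxiRide records from Kafka, switches the environment to EventTime, and assigns timestamps and watermarks directly on the Kafka consumer:

```java
import java.util.Properties;

import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

// Excerpt: TaxiRide, TaxiRideSchema, GridCellMatcher, RideCounter, GridToCoordinates and the
// constants LOCAL_ZOOKEEPER_HOST, LOCAL_KAFKA_BROKER, RIDE_SPEED_GROUP and MAX_EVENT_DELAY
// belong to the surrounding example project and are not shown here.

public static void main(String[] args) throws Exception {

	final int popThreshold = 20; // threshold for popular places

	// set up streaming execution environment
	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
	// windows use timestamps taken from the records (see TaxiRideTSExtractor)
	env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
	// emit periodic watermarks every 1000 ms
	env.getConfig().setAutoWatermarkInterval(1000);

	// configure the Kafka consumer
	Properties kafkaProps = new Properties();
	kafkaProps.setProperty("zookeeper.connect", LOCAL_ZOOKEEPER_HOST);
	kafkaProps.setProperty("bootstrap.servers", LOCAL_KAFKA_BROKER);
	kafkaProps.setProperty("group.id", RIDE_SPEED_GROUP);
	// always read the Kafka topic from the start
	kafkaProps.setProperty("auto.offset.reset", "earliest");

	// create a Kafka consumer
	FlinkKafkaConsumer011<TaxiRide> consumer = new FlinkKafkaConsumer011<>(
			"cleansedRides",
			new TaxiRideSchema(),
			kafkaProps);
	// assign a timestamp extractor to the consumer
	consumer.assignTimestampsAndWatermarks(new TaxiRideTSExtractor());

	// create a TaxiRide data stream
	DataStream<TaxiRide> rides = env.addSource(consumer);

	// find popular places
	DataStream<Tuple5<Float, Float, Long, Boolean, Integer>> popularPlaces = rides
			// match ride to grid cell and event type (start or end)
			.map(new GridCellMatcher())
			// partition by cell id and event type
			.keyBy(0, 1)
			// build sliding window
			.timeWindow(Time.minutes(15), Time.minutes(5))
			// count ride events in window
			.apply(new RideCounter())
			// filter by popularity threshold
			.filter((Tuple4<Integer, Long, Boolean, Integer> count) -> (count.f3 >= popThreshold))
			// map grid cell to coordinates
			.map(new GridToCoordinates());

	popularPlaces.print();

	// execute the transformation pipeline
	env.execute("Popular Places from Kafka");
}

/**
 * Assigns timestamps to TaxiRide records.
 * Watermarks are a fixed time interval behind the max timestamp and are periodically emitted.
 */
public static class TaxiRideTSExtractor extends BoundedOutOfOrdernessTimestampExtractor<TaxiRide> {

	public TaxiRideTSExtractor() {
		super(Time.seconds(MAX_EVENT_DELAY));
	}

	@Override
	public long extractTimestamp(TaxiRide ride) {
		// start events are timestamped with the start time, end events with the end time
		if (ride.isStart) {
			return ride.startTime.getMillis();
		} else {
			return ride.endTime.getMillis();
		}
	}
}
```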
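The two event-time pieces of this job can be modeled without Flink. The sketch below is a plain-Java approximation, not the Flink classes themselves: `BoundedOutOfOrdernessWatermarks` mimics the idea behind `BoundedOutOfOrdernessTimestampExtractor` (watermark = highest timestamp seen minus the allowed out-of-orderness, never moving backwards), and `slidingWindowStarts` lists the windows an element falls into for `timeWindow(Time.minutes(15), Time.minutes(5))`, assuming non-negative timestamps and no window offset. The 60-second bound stands in for the unspecified `MAX_EVENT_DELAY` and is a hypothetical value.

```java
import java.util.ArrayList;
import java.util.List;

public class EventTimeSketch {

    // Plain-Java model of bounded-out-of-orderness watermarking (not the Flink class).
    static final class BoundedOutOfOrdernessWatermarks {
        private final long maxOutOfOrderness;
        private long currentMaxTimestamp;
        private long lastEmittedWatermark = Long.MIN_VALUE;

        BoundedOutOfOrdernessWatermarks(long maxOutOfOrderness) {
            this.maxOutOfOrderness = maxOutOfOrderness;
            // Offset the initial maximum so the first watermark computation does not underflow.
            this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrderness;
        }

        // Called for every element with the timestamp returned by extractTimestamp().
        void onElement(long timestamp) {
            currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp);
        }

        // Called periodically (cf. setAutoWatermarkInterval); the watermark never moves backwards.
        long currentWatermark() {
            long potential = currentMaxTimestamp - maxOutOfOrderness;
            if (potential >= lastEmittedWatermark) {
                lastEmittedWatermark = potential;
            }
            return lastEmittedWatermark;
        }
    }

    // Start times of the sliding windows [start, start + size) that contain the timestamp.
    static List<Long> slidingWindowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestamp - (timestamp % slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long minute = 60_000L;

        // 15-minute windows sliding every 5 minutes: each element belongs to 3 windows.
        System.out.println(slidingWindowStarts(7 * minute, 15 * minute, 5 * minute));

        // Hypothetical bound of 60 seconds; the 11-minute element arrives after the 12-minute one.
        BoundedOutOfOrdernessWatermarks wm = new BoundedOutOfOrdernessWatermarks(minute);
        long[] arrivals = {10 * minute, 12 * minute, 11 * minute};
        for (long ts : arrivals) {
            wm.onElement(ts);
            System.out.println("element " + ts + " -> watermark " + wm.currentWatermark());
        }
    }
}
```

This prints `[300000, 0, -300000]` followed by watermarks 540000, 660000 and 660000: the out-of-order element does not pull the watermark back. In Flink's event-time windows, a window [start, end) is evaluated once the watermark reaches end - 1, so a larger out-of-orderness bound delays results, which is the latency cost the EventTime Javadoc describes.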