This article takes a look at EventTime in Flink.
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/source/SourceFunction.java
/**
 * Interface that source functions use to emit elements, and possibly watermarks.
 *
 * @param <T> The type of the elements produced by the source.
 */
@Public // Interface might be extended in the future with additional methods.
interface SourceContext<T> {

    /**
     * Emits one element from the source, without attaching a timestamp. In most cases,
     * this is the default way of emitting elements.
     *
     * <p>The timestamp that the element will get assigned depends on the time characteristic of
     * the streaming program:
     * <ul>
     *     <li>On {@link TimeCharacteristic#ProcessingTime}, the element has no timestamp.</li>
     *     <li>On {@link TimeCharacteristic#IngestionTime}, the element gets the system's
     *         current time as the timestamp.</li>
     *     <li>On {@link TimeCharacteristic#EventTime}, the element will have no timestamp initially.
     *         It needs to get a timestamp (via a {@link TimestampAssigner}) before any time-dependent
     *         operation (like time windows).</li>
     * </ul>
     *
     * @param element The element to emit
     */
    void collect(T element);

    /**
     * Emits one element from the source, and attaches the given timestamp. This method
     * is relevant for programs using {@link TimeCharacteristic#EventTime}, where the
     * sources assign timestamps themselves, rather than relying on a {@link TimestampAssigner}
     * on the stream.
     *
     * <p>On certain time characteristics, this timestamp may be ignored or overwritten.
     * This allows programs to switch between the different time characteristics and behaviors
     * without changing the code of the source functions.
     * <ul>
     *     <li>On {@link TimeCharacteristic#ProcessingTime}, the timestamp will be ignored,
     *         because processing time never works with element timestamps.</li>
     *     <li>On {@link TimeCharacteristic#IngestionTime}, the timestamp is overwritten with the
     *         system's current time, to realize proper ingestion time semantics.</li>
     *     <li>On {@link TimeCharacteristic#EventTime}, the timestamp will be used.</li>
     * </ul>
     *
     * @param element The element to emit
     * @param timestamp The timestamp in milliseconds since the Epoch
     */
    @PublicEvolving
    void collectWithTimestamp(T element, long timestamp);

    /**
     * Emits the given {@link Watermark}. A Watermark of value {@code t} declares that no
     * elements with a timestamp {@code t' <= t} will occur any more. If further such
     * elements will be emitted, those elements are considered <i>late</i>.
     *
     * <p>This method is only relevant when running on {@link TimeCharacteristic#EventTime}.
     * On {@link TimeCharacteristic#ProcessingTime}, Watermarks will be ignored. On
     * {@link TimeCharacteristic#IngestionTime}, the Watermarks will be replaced by the
     * automatic ingestion time watermarks.
     *
     * @param mark The Watermark to emit
     */
    @PublicEvolving
    void emitWatermark(Watermark mark);

    /**
     * Marks the source to be temporarily idle. This tells the system that this source will
     * temporarily stop emitting records and watermarks for an indefinite amount of time. This
     * is only relevant when running on {@link TimeCharacteristic#IngestionTime} and
     * {@link TimeCharacteristic#EventTime}, allowing downstream tasks to advance their
     * watermarks without the need to wait for watermarks from this source while it is idle.
     *
     * <p>Source functions should make a best effort to call this method as soon as they
     * acknowledge themselves to be idle. The system will consider the source to resume activity
     * again once {@link SourceContext#collect(T)}, {@link SourceContext#collectWithTimestamp(T, long)},
     * or {@link SourceContext#emitWatermark(Watermark)} is called to emit elements or watermarks from the source.
     */
    @PublicEvolving
    void markAsTemporarilyIdle();

    /**
     * Returns the checkpoint lock. Please refer to the class-level comment in
     * {@link SourceFunction} for details about how to write a consistent checkpointed
     * source.
     *
     * @return The object to use as the lock
     */
    Object getCheckpointLock();

    /**
     * This method is called by the system to shut down the context.
     */
    void close();
}
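The lateness rule stated in the emitWatermark Javadoc can be modelled in a few lines of plain Java, without any Flink dependency. This is only a sketch: the class WatermarkContractSketch and its methods are hypothetical names made up for illustration, and only the rule itself (after a watermark of value t, an element with timestamp t' <= t counts as late) is taken from the Javadoc above.

```java
/** Flink-free model of the watermark contract. All names here are hypothetical. */
final class WatermarkContractSketch {
    // Highest watermark seen so far; Long.MIN_VALUE stands in for "no watermark yet".
    private long currentWatermark = Long.MIN_VALUE;

    /** Records a watermark. The stored value never moves backwards. */
    void onWatermark(long watermarkTimestamp) {
        currentWatermark = Math.max(currentWatermark, watermarkTimestamp);
    }

    /** An element is late when its timestamp is less than or equal to the current watermark. */
    boolean isLate(long elementTimestamp) {
        return elementTimestamp <= currentWatermark;
    }

    public static void main(String[] args) {
        WatermarkContractSketch sketch = new WatermarkContractSketch();
        sketch.onWatermark(1_000L);
        System.out.println(sketch.isLate(1_000L)); // true: equal to the watermark
        System.out.println(sketch.isLate(1_001L)); // false: still ahead of the watermark
    }
}
```

Note that the comparison is inclusive: an element whose timestamp equals the watermark is already late.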
public abstract class TestSource implements SourceFunction {
    private volatile boolean running = true;
    protected Object[] testStream;

    @Override
    public void run(SourceContext ctx) throws Exception {
        for (int i = 0; (i < testStream.length) && running; i++) {
            if (testStream[i] instanceof TaxiRide) {
                // events carry their own event time
                TaxiRide ride = (TaxiRide) testStream[i];
                ctx.collectWithTimestamp(ride, ride.getEventTime());
            } else if (testStream[i] instanceof TaxiFare) {
                TaxiFare fare = (TaxiFare) testStream[i];
                ctx.collectWithTimestamp(fare, fare.getEventTime());
            } else if (testStream[i] instanceof String) {
                // plain strings are emitted with timestamp 0
                String s = (String) testStream[i];
                ctx.collectWithTimestamp(s, 0);
            } else if (testStream[i] instanceof Long) {
                // a Long in the test stream is emitted as a watermark
                Long ts = (Long) testStream[i];
                ctx.emitWatermark(new Watermark(ts));
            } else {
                throw new RuntimeException(testStream[i].toString());
            }
        }
        // test sources are finite, so they have a Long.MAX_VALUE watermark when they finish
    }

    @Override
    public void cancel() {
        running = false;
    }
}
The TestSource above assigns timestamps (collectWithTimestamp) and emits watermarks (emitWatermark) directly inside the source.

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java
/**
 * Assigns timestamps to the elements in the data stream and periodically creates
 * watermarks to signal event time progress.
 *
 * <p>This method creates watermarks periodically (for example every second), based
 * on the watermarks indicated by the given watermark generator. Even when no new elements
 * in the stream arrive, the given watermark generator will be periodically checked for
 * new watermarks. The interval in which watermarks are generated is defined in
 * {@link ExecutionConfig#setAutoWatermarkInterval(long)}.
 *
 * <p>Use this method for the common cases, where some characteristic over all elements
 * should generate the watermarks, or where watermarks are simply trailing behind the
 * wall clock time by a certain amount.
 *
 * <p>For the second case and when the watermarks are required to lag behind the maximum
 * timestamp seen so far in the elements of the stream by a fixed amount of time, and this
 * amount is known in advance, use the
 * {@link BoundedOutOfOrdernessTimestampExtractor}.
 *
 * <p>For cases where watermarks should be created in an irregular fashion, for example
 * based on certain markers that some element carry, use the
 * {@link AssignerWithPunctuatedWatermarks}.
 *
 * @param timestampAndWatermarkAssigner The implementation of the timestamp assigner and
 *                                      watermark generator.
 * @return The stream after the transformation, with assigned timestamps and watermarks.
 *
 * @see AssignerWithPeriodicWatermarks
 * @see AssignerWithPunctuatedWatermarks
 * @see #assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks)
 */
public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(
        AssignerWithPeriodicWatermarks<T> timestampAndWatermarkAssigner) {

    // match parallelism to input, otherwise dop=1 sources could lead to some strange
    // behaviour: the watermark will creep along very slowly because the elements
    // from the source go to each extraction operator round robin.
    final int inputParallelism = getTransformation().getParallelism();
    final AssignerWithPeriodicWatermarks<T> cleanedAssigner = clean(timestampAndWatermarkAssigner);

    TimestampsAndPeriodicWatermarksOperator<T> operator =
            new TimestampsAndPeriodicWatermarksOperator<>(cleanedAssigner);

    return transform("Timestamps/Watermarks", getTransformation().getOutputType(), operator)
            .setParallelism(inputParallelism);
}

/**
 * Assigns timestamps to the elements in the data stream and creates watermarks to
 * signal event time progress based on the elements themselves.
 *
 * <p>This method creates watermarks based purely on stream elements. For each element
 * that is handled via {@link AssignerWithPunctuatedWatermarks#extractTimestamp(Object, long)},
 * the {@link AssignerWithPunctuatedWatermarks#checkAndGetNextWatermark(Object, long)}
 * method is called, and a new watermark is emitted, if the returned watermark value is
 * non-negative and greater than the previous watermark.
 *
 * <p>This method is useful when the data stream embeds watermark elements, or certain elements
 * carry a marker that can be used to determine the current event time watermark.
 * This operation gives the programmer full control over the watermark generation. Users
 * should be aware that too aggressive watermark generation (i.e., generating hundreds of
 * watermarks every second) can cost some performance.
 *
 * <p>For cases where watermarks should be created in a regular fashion, for example
 * every x milliseconds, use the {@link AssignerWithPeriodicWatermarks}.
 *
 * @param timestampAndWatermarkAssigner The implementation of the timestamp assigner and
 *                                      watermark generator.
 * @return The stream after the transformation, with assigned timestamps and watermarks.
 *
 * @see AssignerWithPunctuatedWatermarks
 * @see AssignerWithPeriodicWatermarks
 * @see #assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks)
 */
public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(
        AssignerWithPunctuatedWatermarks<T> timestampAndWatermarkAssigner) {

    // match parallelism to input, otherwise dop=1 sources could lead to some strange
    // behaviour: the watermark will creep along very slowly because the elements
    // from the source go to each extraction operator round robin.
    final int inputParallelism = getTransformation().getParallelism();
    final AssignerWithPunctuatedWatermarks<T> cleanedAssigner = clean(timestampAndWatermarkAssigner);

    TimestampsAndPunctuatedWatermarksOperator<T> operator =
            new TimestampsAndPunctuatedWatermarksOperator<>(cleanedAssigner);

    return transform("Timestamps/Watermarks", getTransformation().getOutputType(), operator)
            .setParallelism(inputParallelism);
}
The timestampAndWatermarkAssigner passed to assignTimestampsAndWatermarks is of type AssignerWithPeriodicWatermarks or AssignerWithPunctuatedWatermarks; it tells Flink how to extract the event time.

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/AssignerWithPeriodicWatermarks.java
public interface AssignerWithPeriodicWatermarks<T> extends TimestampAssigner<T> {

    /**
     * Returns the current watermark. This method is periodically called by the
     * system to retrieve the current watermark. The method may return {@code null} to
     * indicate that no new Watermark is available.
     *
     * <p>The returned watermark will be emitted only if it is non-null and its timestamp
     * is larger than that of the previously emitted watermark (to preserve the contract of
     * ascending watermarks). If the current watermark is still
     * identical to the previous one, no progress in event time has happened since
     * the previous call to this method. If a null value is returned, or the timestamp
     * of the returned watermark is smaller than that of the last emitted one, then no
     * new watermark will be generated.
     *
     * <p>The interval in which this method is called and Watermarks are generated
     * depends on {@link ExecutionConfig#getAutoWatermarkInterval()}.
     *
     * @see org.apache.flink.streaming.api.watermark.Watermark
     * @see ExecutionConfig#getAutoWatermarkInterval()
     *
     * @return {@code Null}, if no watermark should be emitted, or the next watermark to emit.
     */
    @Nullable
    Watermark getCurrentWatermark();
}
AssignerWithPeriodicWatermarks extends TimestampAssigner (which defines the extractTimestamp method) and adds the getCurrentWatermark method. The system calls this method periodically to obtain the current watermark; it returns null when there is none.

public static void main(String[] args) throws Exception {

    final int popThreshold = 20; // threshold for popular places

    // set up streaming execution environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.getConfig().setAutoWatermarkInterval(1000);

    // configure the Kafka consumer
    Properties kafkaProps = new Properties();
    kafkaProps.setProperty("zookeeper.connect", LOCAL_ZOOKEEPER_HOST);
    kafkaProps.setProperty("bootstrap.servers", LOCAL_KAFKA_BROKER);
    kafkaProps.setProperty("group.id", RIDE_SPEED_GROUP);
    // always read the Kafka topic from the start
    kafkaProps.setProperty("auto.offset.reset", "earliest");

    // create a Kafka consumer
    FlinkKafkaConsumer011<TaxiRide> consumer = new FlinkKafkaConsumer011<>(
            "cleansedRides",
            new TaxiRideSchema(),
            kafkaProps);
    // assign a timestamp extractor to the consumer
    consumer.assignTimestampsAndWatermarks(new TaxiRideTSExtractor());

    // create a TaxiRide data stream
    DataStream<TaxiRide> rides = env.addSource(consumer);

    // find popular places
    DataStream<Tuple5<Float, Float, Long, Boolean, Integer>> popularPlaces = rides
            // match ride to grid cell and event type (start or end)
            .map(new GridCellMatcher())
            // partition by cell id and event type
            .keyBy(0, 1)
            // build sliding window
            .timeWindow(Time.minutes(15), Time.minutes(5))
            // count ride events in window
            .apply(new RideCounter())
            // filter by popularity threshold
            .filter((Tuple4<Integer, Long, Boolean, Integer> count) -> (count.f3 >= popThreshold))
            // map grid cell to coordinates
            .map(new GridToCoordinates());

    popularPlaces.print();

    // execute the transformation pipeline
    env.execute("Popular Places from Kafka");
}

/**
 * Assigns timestamps to TaxiRide records.
 * Watermarks are a fixed time interval behind the max timestamp and are periodically emitted.
 */
public static class TaxiRideTSExtractor extends BoundedOutOfOrdernessTimestampExtractor<TaxiRide> {

    public TaxiRideTSExtractor() {
        super(Time.seconds(MAX_EVENT_DELAY));
    }

    @Override
    public long extractTimestamp(TaxiRide ride) {
        if (ride.isStart) {
            return ride.startTime.getMillis();
        } else {
            return ride.endTime.getMillis();
        }
    }
}
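The idea behind "watermarks are a fixed time interval behind the max timestamp and are periodically emitted" can be illustrated without Flink. The sketch below is a simplified model under stated assumptions, not the actual code of BoundedOutOfOrdernessTimestampExtractor: the class BoundedLagWatermarkSketch and its methods onElement and onPeriodicTick are hypothetical names, timestamps are assumed to be epoch milliseconds, and a null return is used to model "no new watermark", following the getCurrentWatermark Javadoc.

```java
/** Flink-free model of a periodic, bounded-lag watermark generator. All names are hypothetical. */
final class BoundedLagWatermarkSketch {
    private final long maxOutOfOrdernessMs;      // how far the watermark trails the max timestamp
    private boolean seenAnyElement = false;
    private long maxTimestampMs;                 // largest event timestamp seen so far
    private long lastEmittedMs = Long.MIN_VALUE; // last watermark handed out

    BoundedLagWatermarkSketch(long maxOutOfOrdernessMs) {
        if (maxOutOfOrdernessMs < 0) {
            throw new IllegalArgumentException("maxOutOfOrdernessMs must not be negative");
        }
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    /** Called once per element: remember the largest event timestamp. */
    void onElement(long eventTimestampMs) {
        if (!seenAnyElement || eventTimestampMs > maxTimestampMs) {
            maxTimestampMs = eventTimestampMs;
            seenAnyElement = true;
        }
    }

    /** Called on a timer: returns a strictly larger watermark, or null if there is no progress. */
    Long onPeriodicTick() {
        if (!seenAnyElement) {
            return null;
        }
        long candidate = maxTimestampMs - maxOutOfOrdernessMs;
        if (candidate <= lastEmittedMs) {
            return null; // ascending-watermark contract: never repeat or go backwards
        }
        lastEmittedMs = candidate;
        return candidate;
    }

    public static void main(String[] args) {
        BoundedLagWatermarkSketch generator = new BoundedLagWatermarkSketch(60_000L);
        generator.onElement(100_000L);
        generator.onElement(90_000L);                   // out of order, does not lower the max
        System.out.println(generator.onPeriodicTick()); // 40000
        System.out.println(generator.onPeriodicTick()); // null, no progress since the last tick
    }
}
```

In this model, an out-of-order element that arrives within the lag is still ahead of the watermark, which is why a bounded delay tolerates a bounded amount of disorder.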
TaxiRideTSExtractor extends BoundedOutOfOrdernessTimestampExtractor, which implements the AssignerWithPeriodicWatermarks interface. The call env.getConfig().setAutoWatermarkInterval(1000) sets the interval, in milliseconds, at which the periodic assigner is asked for a watermark.

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/AssignerWithPunctuatedWatermarks.java
public interface AssignerWithPunctuatedWatermarks<T> extends TimestampAssigner<T> {

    /**
     * Asks this implementation if it wants to emit a watermark. This method is called right after
     * the {@link #extractTimestamp(Object, long)} method.
     *
     * <p>The returned watermark will be emitted only if it is non-null and its timestamp
     * is larger than that of the previously emitted watermark (to preserve the contract of
     * ascending watermarks). If a null value is returned, or the timestamp of the returned
     * watermark is smaller than that of the last emitted one, then no new watermark will
     * be generated.
     *
     * <p>For an example how to use this method, see the documentation of
     * {@link AssignerWithPunctuatedWatermarks this class}.
     *
     * @return {@code Null}, if no watermark should be emitted, or the next watermark to emit.
     */
    @Nullable
    Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp);
}
AssignerWithPunctuatedWatermarks also extends TimestampAssigner (which defines the extractTimestamp method) and adds the checkAndGetNextWatermark method. It is called right after extractTimestamp and receives the freshly extracted timestamp through its extractedTimestamp parameter.

public static void main(String[] args) throws Exception {

    // read parameters
    ParameterTool params = ParameterTool.fromArgs(args);
    String input = params.getRequired("input");

    // set up streaming execution environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.setParallelism(1);

    // connect to the data file
    DataStream<String> carData = env.readTextFile(input);

    // map to events
    DataStream<ConnectedCarEvent> events = carData
            .map((String line) -> ConnectedCarEvent.fromString(line))
            .assignTimestampsAndWatermarks(new ConnectedCarAssigner());

    // sort events
    events.keyBy((ConnectedCarEvent event) -> event.carId)
            .process(new SortFunction())
            .print();

    env.execute("Sort Connected Car Events");
}

public class ConnectedCarAssigner implements AssignerWithPunctuatedWatermarks<ConnectedCarEvent> {

    @Override
    public long extractTimestamp(ConnectedCarEvent event, long previousElementTimestamp) {
        return event.timestamp;
    }

    @Override
    public Watermark checkAndGetNextWatermark(ConnectedCarEvent event, long extractedTimestamp) {
        // simply emit a watermark with every event
        return new Watermark(extractedTimestamp - 30000);
    }
}
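ConnectedCarAssigner proposes a watermark 30 seconds behind every event, but according to the checkAndGetNextWatermark Javadoc a proposal is only emitted when it is larger than the previously emitted watermark. The plain-Java sketch below models that combination; it has no Flink dependency, and the class PunctuatedWatermarkSketch with its onElement method is a hypothetical name used purely for illustration.

```java
/** Flink-free model of per-element (punctuated) watermarks. All names are hypothetical. */
final class PunctuatedWatermarkSketch {
    private static final long LAG_MS = 30_000L;  // same 30 s lag as in ConnectedCarAssigner
    private long lastEmittedMs = Long.MIN_VALUE; // last watermark that was actually emitted

    /**
     * Proposes a watermark for one element and applies the "only if larger" rule.
     * Returns the emitted watermark, or null when the proposal is dropped.
     */
    Long onElement(long extractedTimestampMs) {
        long candidate = extractedTimestampMs - LAG_MS;
        if (candidate <= lastEmittedMs) {
            return null; // not ascending, so nothing is emitted
        }
        lastEmittedMs = candidate;
        return candidate;
    }

    public static void main(String[] args) {
        PunctuatedWatermarkSketch sketch = new PunctuatedWatermarkSketch();
        System.out.println(sketch.onElement(100_000L)); // 70000
        System.out.println(sketch.onElement(95_000L));  // null, 65000 would go backwards
        System.out.println(sketch.onElement(130_000L)); // 100000
    }
}
```

Because a proposal is made for every element, this style gives full control over watermark generation at the cost of more frequent checks, as the DataStream Javadoc warns.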
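In Flink, timestamp assigners also define how watermarks are emitted; they work with timestamps expressed as milliseconds since 1970-01-01T00:00:00Z.

AssignerWithPeriodicWatermarks defines the getCurrentWatermark method, which returns the current watermark; the periodic interval is set with env.getConfig().setAutoWatermarkInterval(1000).

AssignerWithPunctuatedWatermarks defines the checkAndGetNextWatermark method, which is called after extractTimestamp and receives the just-extracted extractedTimestamp as a method parameter.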