本文翻譯自flink官網:https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/event_timestamps_watermarks.htmlhtml
Assigning Timestamps程序員
Source Functions with Timestamps and Watermarksapache
Timestamp Assigners / Watermark Generatorside
Timestamps per Kafka Partition性能
本章對應執行在事件時間的程序。事件事件、處理時間、攝取時間(收到數據的時間)的介紹能夠跳轉到 introduction to event time 。spa
要處理事件時間,流程序須要相應地設置時間特性。翻譯
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
爲了處理事件時間,Flink須要知道事件的時間戳,這意味着流中的每一個元素都須要分配其事件時間戳。這一般經過從元素中的某個字段訪問/提取時間戳來完成。code
爲了處理事件時間,Flink須要知道事件的時間戳,這意味着流中的每一個元素都須要指定其事件時間戳(用事件的時間戳來指定)。這一般經過從元素中的某個字段訪問/提取時間戳來完成。orm
時間戳分配與生成水印密切相關,水印告訴系統事件時間的進展。htm
有兩種方法能夠分配時間戳並生成水印:
直接在數據流源中
經過時間戳分配器/水印生成器:在Flink中,時間戳分配器還定義要發出的水印
注意:兩週時間戳和水印都指定爲毫秒,而且自Java紀元 1970-01-01 00:00:00 開始
stream source 能夠直接爲它們生成的元素分配時間戳,它們也能夠發出水印。完成此操做後,不須要時間戳分配器。請注意,若是使用時間戳分配器,則將覆蓋source提供的任什麼時候間戳和水印。
要直接爲source中的元素分配時間戳,source必須在SourceContext上使用collectWithTimestamp(...)方法。要生成水印,source必須調用emitWatermark(水印)功能。
下面是一個簡單的(沒有checkpoint)源代碼示例,用於分配時間戳並生成水印:
override def run(ctx: SourceContext[MyType]): Unit = { while (/* condition */) { val next: MyType = getNext() ctx.collectWithTimestamp(next, next.eventTimestamp) if (next.hasWatermarkTime) { ctx.emitWatermark(new Watermark(next.getWatermarkTime)) } } }
時間戳分配器獲取流並生成帶有帶時間戳元素和水印的新流。若是原始流已經有時間戳和/或水印,則時間戳分配器會覆蓋它們。
時間戳分配器一般在數據源以後當即指定,但並不是嚴格要求這樣作。例如,常見的模式是在時間戳分配器以前解析(MapFunction)和過濾(FilterFunction)。在任何狀況下,須要在事件時間的第一個操做以前指定時間戳分配器(例如第一個窗口操做)。做爲一種特殊狀況,當使用Kafka做爲流式傳輸做業的source時,Flink容許在source(或消費者)內指定時間戳分配器/水印發射器。有關如何執行此操做的更多信息,請參閱Kafka Connector文檔。
注意:本節的其他部分介紹了程序員必須實現的主要接口,以便建立本身的時間戳提取器/水印發射器。要查看Flink附帶的預先實現的提取器,請參閱預約義的時間戳提取器/水印發射器頁面。
val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val stream: DataStream[MyEvent] = env.readFile( myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100, FilePathFilter.createDefaultFilter()) val withTimestampsAndWatermarks: DataStream[MyEvent] = stream .filter( _.severity == WARNING ) .assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks()) withTimestampsAndWatermarks .keyBy( _.getGroup ) .timeWindow(Time.seconds(10)) .reduce( (a, b) => a.add(b) ) .addSink(...)
AssignerWithPeriodicWatermarks按期分配時間戳並生成水印(可能取決於流元素,或純粹基於處理時間)。
生成水印的間隔(每n毫秒)由ExecutionConfig.setAutoWatermarkInterval(...)定義。每次調用分配器的getCurrentWatermark()方法,若是返回的水印非空且大於前一個水印,則會發出新的水印。
這裏咱們展現了兩個使用週期性水印生成的時間戳分配器的簡單示例。請注意,Flink附帶了一個BoundedOutOfOrdernessTimestampExtractor,相似於下面顯示的BoundedOutOfOrdernessGenerator,您能夠在這裏閱讀。
/** * This generator generates watermarks assuming that elements arrive out of order, * but only to a certain degree. The latest elements for a certain timestamp t will arrive * at most n milliseconds after the earliest elements for timestamp t. */ class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks[MyEvent] { val maxOutOfOrderness = 3500L // 3.5 seconds var currentMaxTimestamp: Long = _ override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = { val timestamp = element.getCreationTime() currentMaxTimestamp = max(timestamp, currentMaxTimestamp) timestamp } override def getCurrentWatermark(): Watermark = { // return the watermark as current highest timestamp minus the out-of-orderness bound new Watermark(currentMaxTimestamp - maxOutOfOrderness) } } /** * This generator generates watermarks that are lagging behind processing time by a fixed amount. * It assumes that elements arrive in Flink after a bounded delay. */ class TimeLagWatermarkGenerator extends AssignerWithPeriodicWatermarks[MyEvent] { val maxTimeLag = 5000L // 5 seconds override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = { element.getCreationTime } override def getCurrentWatermark(): Watermark = { // return the watermark as current time minus the maximum time lag new Watermark(System.currentTimeMillis() - maxTimeLag) } }
要每一個時間均可能生成新水印時生成水印(To generate watermarks whenever a certain event indicates that a new watermark might be generated,注:只會生成大於上一次水印的新水印),請使用AssignerWithPunctuatedWatermarks。對於此類,Flink將首先調用extractTimestamp(...)方法爲元素分配時間戳,而後當即調用該元素上的checkAndGetNextWatermark(...)方法。
checkAndGetNextWatermark(...)方法傳遞在extractTimestamp(...)方法中分配的時間戳,並能夠決定是否要生成水印。每當checkAndGetNextWatermark(...)方法返回非空水印,而且該水印大於最新的先前水印時,將發出該新水印。
class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[MyEvent] { override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = { element.getCreationTime } override def checkAndGetNextWatermark(lastElement: MyEvent, extractedTimestamp: Long): Watermark = { if (lastElement.hasWatermarkMarker()) new Watermark(extractedTimestamp) else null } }
注意:能夠在每一個事件上生成水印。然而,由於每一個水印在下游引發一些計算,因此過多的水印會下降性能。
Timestamps per Kafka Partition
當使用Apache Kafka做爲數據源時,每一個Kafka分區可能具備簡單的事件時間模式(升序時間戳或有界無序)。可是,當從Kafka消費流時,多個分區一般並行消耗,交錯來自分區的事件並破壞每一個分區模式(這是Kafka的消費者客戶端工做的固有方式)。
在這種狀況下,您可使用Flink的Kafka分區感知水印生成。使用該功能,根據Kafka分區在Kafka消費者內部生成水印,而且每一個分區水印的合併方式與在流shuffle上合併水印的方式相同。
例如,若是事件時間戳嚴格按每一個Kafka分區升序,則使用升序時間戳水印生成器生成每分區水印將產生完美的總體水印。
下圖顯示瞭如何使用per-Kafka分區水印生成,以及在這種狀況下水印如何經過流數據流傳播。
val kafkaSource = new FlinkKafkaConsumer09[MyType]("myTopic", schema, props) kafkaSource.assignTimestampsAndWatermarks(new AscendingTimestampExtractor[MyType] { def extractAscendingTimestamp(element: MyType): Long = element.eventTimestamp }) val stream: DataStream[MyType] = env.addSource(kafkaSource)
歡迎關注Flink菜鳥公衆號,會不按期更新Flink(開發技術)相關的推文