時間戳和Watermark生成html
本文翻譯自Generating Timestamp / Watermarksapache
------------------------------------------------------------------api
本文是Flink在使用事件時間(Event Time)時相關內容,有關事件時間、處理時間和提取時間的介紹,請見event time introduction。app
流程序須要設置時間特徵爲Event time,才能在程序中使用事件時間。ide
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);性能
爲了使事件時間能夠正常使用,Flink須要知道時間的時間戳,即流中的每一個element都須要被賦予它本身的時間戳。Flink一般會從element的一些域訪問/提取時間戳(That happens usually by accessing/extracting the timestamp from some field in the element.)。spa
時間戳的賦值一般與Watermark的生成緊密相關,其中Watermark生成負責通知系統事件時間的增加狀況。翻譯
時間戳賦值和Watermark生成的方式有兩種:orm
1. 直接在數據流源處進行htm
2. 經過一個Timestamp assigner / watermark generator:在Flink中,Timestamp assigner一樣會定義watermark的發送行爲
注意:時間戳和Watermark都是使用從Java曆元(epoch) 「1970-01-01 T00.00.00Z」開始的毫秒數定義的
流的源能夠在它們生產的element中直接賦值時間戳以及發送Watermark。在此狀況下,咱們不須要Timestamp Assigner。
要在Source方法中向element直接賦值時間戳,Source方法必須在SourceContext上調用方法collectWithTimestamp(…)。要在Source中生成Watermark,Source必須調用emitWatermark(Watermark)方法。
在下例的(非檢查點的)Source方法中,方法直接向element賦值時間戳,而且根據特殊事件生成Watermark:
@Override
public void run(SourceContext<MyType> ctx) throws Exception {
while (/* condition */) {
MyType next = getNext();
ctx.collectWithTimestamp(next, next.getEventTimestamp());
if (next.hasWatermarkTime()) {
ctx.emitWatermark(new Watermark(next.getWatermarkTime()));
}
}
}
注意:若是流程序在已經擁有時間戳的流上繼續使用TimestampAssigner,流中element的原有時間戳將被TimestampAssigner重寫。相似地,Watermark也會一樣被重寫。
Timestamp Assigner接收一個流而且產生一個帶有時間戳賦值element和Watermark的新的流。若是原有的流已經擁有了時間戳或Watermark,則Timestamp Assigner將會重寫它們。
一般在緊接着數據源以後會定義Timestamp Assigner,但這並非嚴格要求的。例如在通用的模式中,會在Timestamp Assigner以前進行parse(MapFunction)和filter(FilterFunction)操做。不論在什麼狀況下,Timestamp Assigner都須要在第一個使用事件時間的Operation(如第一個窗口Operation)以前定義。而在流Job中使用Kafka做爲數據源是一個特殊狀況,Flink容許在數據源(或數據消費者consumer)內部定義Timestamp Assigner和Watermark emitter,更多相關信息請見Kafka Connector documentation。
注意:本節餘下內容呈現了一個開發者建立本身的Timestamp Assigner 和 watermark emitter所須要實現的主要接口。有關Flink自帶的預先實現的extractor,請見Pre-defined Timestamp Extractors / Watermark Emitters
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<MyEvent> stream = env.readFile(
myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
FilePathFilter.createDefaultFilter(), typeInfo);
DataStream<MyEvent> withTimestampsAndWatermarks = stream
.filter( event -> event.severity() == WARNING )
.assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks());
withTimestampsAndWatermarks
.keyBy( (event) -> event.getGroup() )
.timeWindow(Time.seconds(10))
.reduce( (a, b) -> a.add(b) )
.addSink(...);
週期性Watermark
AssignerWithPeriodicWatermark賦值時間戳並週期性生成(生成方式有多是依靠流的element,或者純粹基於處理時間)。
生成Watermark的時間週期區間(每n毫秒)的大小能夠經過ExecutionConfig.setAutoWatermarkInterval(…)設置。每一次生成時,都將會調用Assigner的getCurrentWatermark()方法,若是返回的Watermark是非null且大於前一個Watermark,則會發送一個新的Watermark。
下面是兩個生成周期性Watermark和Timestamp Assigner的例子
/**
* This generator generates watermarks assuming that elements come out of order to a certain degree only.
* The latest elements for a certain timestamp t will arrive at most n milliseconds after the earliest
* elements for timestamp t.
*/
public class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks<MyEvent> {
private final long maxOutOfOrderness = 3500; // 3.5 seconds
private long currentMaxTimestamp;
@Override
public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
long timestamp = element.getCreationTime();
currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
return timestamp;
}
@Override
public Watermark getCurrentWatermark() {
// return the watermark as current highest timestamp minus the out-of-orderness bound
return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
}
}
/**
* This generator generates watermarks that are lagging behind processing time by a certain amount.
* It assumes that elements arrive in Flink after at most a certain time.
*/
public class TimeLagWatermarkGenerator extends AssignerWithPeriodicWatermarks<MyEvent> {
private final long maxTimeLag = 5000; // 5 seconds
@Override
public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
return element.getCreationTime();
}
@Override
public Watermark getCurrentWatermark() {
// return the watermark as current time minus the maximum time lag
return new Watermark(System.currentTimeMillis() - maxTimeLag);
}
}
帶標點(punctuated)Watermark
爲了在某事件下就產生Watermark,咱們須要使用AssignerWithPunctuatedWatermarks。在該類中,Flink會先調用extractTimestamp(…)方法來給element賦值一個時間戳,而後針對該element即刻調用checkAndGetNextWatermark(…)方法來返回一個非null的Watermark。
checkAndGetNextWatermark(…)方法將得到在extractTimestamp(…)方法中得到的時間戳,並決定是否生成Watermark。一旦checkAndGetNextWatermark(…)方法返回一個非null的Watermark,而且該Watermark大於最近的上一個Watermark,則發送該新的Watermark。
public class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks<MyEvent> {
@Override
public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
return element.getCreationTime();
}
@Override
public Watermark checkAndGetNextWatermark(MyEvent lastElement, long extractedTimestamp) {
return element.hasWatermarkMarker() ? new Watermark(extractedTimestamp) : null;
}
}
注意:在每一個事件上都生成一個Watermark是可能存在的,可是因爲每一個Watermark都會致使下游的計算開銷,過多的Watermark會下降程序的性能