Flink Program Guide (4) -- 時間戳和Watermark生成(DataStream API編程指導 -- For Java)

時間戳和Watermark生成html

本文翻譯自Generating Timestamp / Watermarksapache

------------------------------------------------------------------api

本文是Flink在使用事件時間(Event Time)時相關內容,有關事件時間、處理時間和提取時間的介紹,請見event time introductionapp

 

流程序須要設置時間特徵Event time,才能在程序中使用事件時間。ide

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.
setStreamTimeCharacteristic(TimeCharacteristic.EventTime);性能

 

1、時間戳賦值

爲了使事件時間能夠正常使用,Flink須要知道時間的時間戳,即流中的每一個element都須要被賦予它本身的時間戳。Flink一般會從element的一些域訪問/提取時間戳(That happens usually by accessing/extracting the timestamp from some field in the element.)。spa

 

時間戳的賦值一般與Watermark的生成緊密相關,其中Watermark生成負責通知系統事件時間的增加狀況。翻譯

 

時間戳賦值和Watermark生成的方式有兩種:orm

1.    直接在數據流源處進行htm

2.    經過一個Timestamp assigner / watermark generator:在Flink中,Timestamp assigner一樣會定義watermark的發送行爲

 

注意:時間戳和Watermark都是使用從Java曆元(epoch 「1970-01-01 T00.00.00Z」開始的毫秒數定義的

 

1.1 帶有時間戳和WatermarkSource方法

流的源能夠在它們生產的element中直接賦值時間戳以及發送Watermark。在此狀況下,咱們不須要Timestamp Assigner

要在Source方法中向element直接賦值時間戳,Source方法必須在SourceContext上調用方法collectWithTimestamp(…)。要在Source中生成WatermarkSource必須調用emitWatermark(Watermark)方法。

 

在下例的(非檢查點的)Source方法中,方法直接向element賦值時間戳,而且根據特殊事件生成Watermark

@Override
public
void run(SourceContext<MyType> ctx) throws Exception {
  while (
/* condition */) {
    MyType next = getNext();
     ctx.
collectWithTimestamp(next, next.getEventTimestamp());

    if (next.hasWatermarkTime()) {
      ctx.
emitWatermark(new Watermark(next.getWatermarkTime()));
    }
  }
}

 

注意:若是流程序在已經擁有時間戳的流上繼續使用TimestampAssigner,流中element的原有時間戳將被TimestampAssigner重寫。相似地,Watermark也會一樣被重寫。

 

1.2 Timestamp Assigner / Watermark Generators

Timestamp Assigner接收一個流而且產生一個帶有時間戳賦值elementWatermark的新的流。若是原有的流已經擁有了時間戳或Watermark,則Timestamp Assigner將會重寫它們。

 

一般在緊接着數據源以後會定義Timestamp Assigner,但這並非嚴格要求的。例如在通用的模式中,會在Timestamp Assigner以前進行parse(MapFunction)filter(FilterFunction)操做。不論在什麼狀況下,Timestamp Assigner都須要在第一個使用事件時間的Operation(如第一個窗口Operation)以前定義。而在流Job中使用Kafka做爲數據源是一個特殊狀況,Flink容許在數據源(或數據消費者consumer)內部定義Timestamp AssignerWatermark emitter,更多相關信息請見Kafka Connector documentation

 

注意:本節餘下內容呈現了一個開發者建立本身的Timestamp Assigner watermark emitter所須要實現的主要接口。有關Flink自帶的預先實現的extractor,請見Pre-defined Timestamp Extractors / Watermark Emitters

 

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.
setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream<MyEvent> stream = env.readFile(
  myFormat, myFilePath, FileProcessingMode.
PROCESS_CONTINUOUSLY, 100,
  FilePathFilter.
createDefaultFilter(), typeInfo);

DataStream<MyEvent> withTimestampsAndWatermarks = stream
  .
filter( event -> event.severity() == WARNING )
  .
assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks());

withTimestampsAndWatermarks
  .
keyBy( (event) -> event.getGroup() )
  .
timeWindow(Time.seconds(10))
  .
reduce( (a, b) -> a.add(b) )
  .
addSink(...);

 

週期性Watermark

AssignerWithPeriodicWatermark賦值時間戳並週期性生成(生成方式有多是依靠流的element,或者純粹基於處理時間)。

 

生成Watermark的時間週期區間(每n毫秒)的大小能夠經過ExecutionConfig.setAutoWatermarkInterval(…)設置。每一次生成時,都將會調用AssignergetCurrentWatermark()方法,若是返回的Watermark是非null且大於前一個Watermark,則會發送一個新的Watermark

 

下面是兩個生成周期性WatermarkTimestamp Assigner的例子

/**
* This generator generates watermarks assuming that elements come out of order to a certain degree only.
* The latest elements for a certain timestamp t will arrive at most n milliseconds after the earliest
* elements for timestamp t.
*/
public class
BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks<MyEvent> {

private final long maxOutOfOrderness = 3500; // 3.5 seconds

private long currentMaxTimestamp;

@Override
public
long extractTimestamp(MyEvent element, long previousElementTimestamp) {
  
long timestamp = element.getCreationTime();
  currentMaxTimestamp = Math.
max(timestamp, currentMaxTimestamp);
  return timestamp;
}

@Override
public Watermark
getCurrentWatermark() {
  
// return the watermark as current highest timestamp minus the out-of-orderness bound
  return new
Watermark(currentMaxTimestamp - maxOutOfOrderness);
}

}

/**
* This generator generates watermarks that are lagging behind processing time by a certain amount.
* It assumes that elements arrive in Flink after at most a certain time.
*/
public class
TimeLagWatermarkGenerator extends AssignerWithPeriodicWatermarks<MyEvent> {

private final long maxTimeLag = 5000; // 5 seconds

@Override
public
long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        return element.
getCreationTime();
}

@Override
public Watermark
getCurrentWatermark() {
        
// return the watermark as current time minus the maximum time lag
        return new
Watermark(System.currentTimeMillis() - maxTimeLag);
}

}

 

帶標點(punctuatedWatermark

爲了在某事件下就產生Watermark,咱們須要使用AssignerWithPunctuatedWatermarks。在該類中,Flink會先調用extractTimestamp(…)方法來給element賦值一個時間戳,而後針對該element即刻調用checkAndGetNextWatermark(…)方法來返回一個非nullWatermark

 

checkAndGetNextWatermark(…)方法將得到在extractTimestamp(…)方法中得到的時間戳,並決定是否生成Watermark。一旦checkAndGetNextWatermark(…)方法返回一個非nullWatermark,而且該Watermark大於最近的上一個Watermark,則發送該新的Watermark

 

public class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks<MyEvent> {

@Override
public
long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        return element.
getCreationTime();
}

@Override
public Watermark
checkAndGetNextWatermark(MyEvent lastElement, long extractedTimestamp) {
        return element.
hasWatermarkMarker() ? new Watermark(extractedTimestamp) : null;
}

}

 

注意:在每一個事件上都生成一個Watermark是可能存在的,可是因爲每一個Watermark都會致使下游的計算開銷,過多的Watermark會下降程序的性能

相關文章
相關標籤/搜索