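Anyone reading this article probably already knows something about Flink's notions of time (event time, processing time, ingestion time) and about watermarks. If not, start with the introduction in the official documentation: https://ci.apache.org/projects/flink/flink-docs-master/dev/event_time.html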
This raises a question: how does Flink handle late data based on event time and watermarks?
Before answering it, I suggest reading the following three Google pieces on the stream processing model:
https://www.vldb.org/pvldb/vol8/p1792-Akidau.pdf "The Dataflow Model: A Practical Approach to Balancing Correctness, Latency, and Cost in Massive-Scale, Unbounded, Out-of-Order Data Processing"
A high-level guide to modern data processing concepts:
https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-101
https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-102
--------------------------- Getting to the point --------------------------------
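Now to the main topic: how does Flink handle late data based on event time and watermarks?
The question can be split into two parts:
1. Creating timestamps and watermarks from event time (covered in detail below)
2. Handling late data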
1. Creating timestamps and watermarks from event time
To make the behaviour easy to observe, this example uses the overload assignTimestampsAndWatermarks(assigner: AssignerWithPunctuatedWatermarks[T]): DataStream[T], which generates a watermark for every event. The code is:
```scala
// env, source and sdf are defined elsewhere in the original program
val input = env.addSource(source)
  .map(json => {
    // json : {"id" : 0, "createTime" : "2019-08-24 11:13:14.942", "amt" : "9.8"}
    val id = json.get("id").asText()
    val createTime = json.get("createTime").asText()
    val amt = json.get("amt").asText()
    LateDataEvent("key", id, createTime, amt)
  })
  // assign a watermark for every event
  .assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks[LateDataEvent]() {
    // generate the next watermark; Flink only emits it if it is non-null
    // and larger than the previously emitted watermark
    override def checkAndGetNextWatermark(lastElement: LateDataEvent, extractedTimestamp: Long): Watermark = {
      new Watermark(extractedTimestamp)
    }
    // extract the event time from createTime
    override def extractTimestamp(element: LateDataEvent, previousElementTimestamp: Long): Long = {
      val eventTime = sdf.parse(element.createTime).getTime
      eventTime
    }
  })
```
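The `sdf` used in `extractTimestamp` is not shown in the article. The sketch below illustrates the same conversion in plain Java with `java.time`, assuming `createTime` follows the pattern `yyyy-MM-dd HH:mm:ss.SSS` and is interpreted in UTC+8 (the zone that matches the epoch values quoted later in this article). The class name `CreateTimeParser` is hypothetical.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class CreateTimeParser {
    // Assumed format of createTime (the original sdf is not shown).
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
    // Assumed time zone: UTC+8, consistent with the window timestamps quoted later.
    private static final ZoneId ZONE = ZoneId.of("Asia/Shanghai");

    /** Converts a createTime string to epoch milliseconds, as extractTimestamp does. */
    public static long toEpochMillis(String createTime) {
        return LocalDateTime.parse(createTime, FORMAT)
                .atZone(ZONE)
                .toInstant()
                .toEpochMilli();
    }

    public static void main(String[] args) {
        System.out.println(toEpochMillis("2019-08-24 17:51:44.152")); // 1566640304152
    }
}
```

For `2019-08-24 17:51:44.152` this yields `1566640304152`, which falls inside the window `[1566640260000, 1566640320000)` discussed later.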
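Aside: inside operators, data flows as StreamRecord objects, whose structure is:

```java
public final class StreamRecord<T> extends StreamElement {
    /** The actual value held by this record. */
    private T value;

    /** The timestamp of the record. */
    private long timestamp;

    // (other fields such as hasTimestamp, and all methods, omitted)
}
```

StreamElement is also the parent class of Watermark and StreamStatus; simply put, it is the base class of everything Flink sends through a stream. From this you can see that a Watermark is an abstraction at the same level as a data event, while a timestamp is a member field of both watermarks and data records, representing their time.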
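assignTimestampsAndWatermarks uses the event's own data (the timestamp returned by extractTimestamp) to replace the timestamp in the StreamRecord, and emits a new watermark if the watermark generated from the current event's timestamp is greater than the previous one.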
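Let's debug this part of the source code.
First, set a breakpoint in extractTimestamp to observe how timestamps and watermarks are generated.
TimestampsAndPunctuatedWatermarksOperator.processElement handles each event's timestamp and the corresponding watermark. (Which operator class is used depends on the argument passed to assignTimestampsAndWatermarks; a periodic assigner uses TimestampsAndPeriodicWatermarksOperator instead.)
The StreamRecord object is created in StreamSourceContexts.processAndCollectWithTimestamp. The timestamp used there is the record's Kafka timestamp, which KafkaFetcher.emitRecord reads from the ConsumerRecord.
KafkaFetcher.emitRecord emits the records consumed from Kafka: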
```java
protected void emitRecord(
        T record,
        KafkaTopicPartitionState<TopicPartition> partition,
        long offset,
        ConsumerRecord<?, ?> consumerRecord) throws Exception {

    emitRecordWithTimestamp(record, partition, offset, consumerRecord.timestamp());
}
```
StreamSourceContexts.processAndCollectWithTimestamp creates the StreamRecord object:
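```java
protected void processAndCollectWithTimestamp(T element, long timestamp) {
    // wrap the element in the reused StreamRecord with the given timestamp
    // (for the Kafka source, this is the Kafka record's timestamp)
    output.collect(reuse.replace(element, timestamp));
}
```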
Now let's look at the source of TimestampsAndPunctuatedWatermarksOperator.processElement:
```java
@Override
public void processElement(StreamRecord<T> element) throws Exception {
    // get the record's value
    final T value = element.getValue();
    // userFunction is the anonymous AssignerWithPunctuatedWatermarks from the user code.
    // Call extractTimestamp to get the new timestamp; the previous timestamp passed in is
    // the element's timestamp if it has one, otherwise Long.MIN_VALUE
    final long newTimestamp = userFunction.extractTimestamp(value,
            element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE);

    // replace the StreamRecord's old timestamp with the new one and forward the record
    output.collect(element.replace(element.getValue(), newTimestamp));

    // ask the user's checkAndGetNextWatermark for the next watermark
    final Watermark nextWatermark = userFunction.checkAndGetNextWatermark(value, newTimestamp);
    // emit it only if it is greater than the previous watermark
    if (nextWatermark != null && nextWatermark.getTimestamp() > currentWatermark) {
        currentWatermark = nextWatermark.getTimestamp();
        output.emitWatermark(nextWatermark);
    }
}
```
That completes the creation (or generation) of timestamps and watermarks.
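To summarize the behaviour just traced, here is a hypothetical, Flink-free Java model of what `TimestampsAndPunctuatedWatermarksOperator.processElement` does with this article's assigner: every record is re-stamped with its extracted timestamp, and a watermark is forwarded only when it is strictly greater than the last one. The class and field names are illustrative, not Flink APIs.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model of the punctuated-watermark operator (not Flink code). */
public class PunctuatedWatermarkModel {
    /** Stand-in for StreamRecord: a value plus a timestamp. */
    public static final class Record {
        public final String value;
        public final long timestamp;

        public Record(String value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private long currentWatermark = Long.MIN_VALUE;
    public final List<Record> emittedRecords = new ArrayList<>();
    public final List<Long> emittedWatermarks = new ArrayList<>();

    /** Replaces the record's timestamp, then emits a watermark only if it advances. */
    public void processElement(Record element, long extractedTimestamp) {
        // the incoming (e.g. Kafka) timestamp is replaced by the extracted event time
        emittedRecords.add(new Record(element.value, extractedTimestamp));
        // the article's checkAndGetNextWatermark returns new Watermark(extractedTimestamp)
        long nextWatermark = extractedTimestamp;
        if (nextWatermark > currentWatermark) {
            currentWatermark = nextWatermark;
            emittedWatermarks.add(nextWatermark);
        }
    }

    public static void main(String[] args) {
        PunctuatedWatermarkModel op = new PunctuatedWatermarkModel();
        // hypothetical input: (value, incoming timestamp) with out-of-order event times
        op.processElement(new Record("a", 9_000L), 1_000L);
        op.processElement(new Record("b", 9_001L), 3_000L);
        op.processElement(new Record("c", 9_002L), 2_000L); // out of order: no new watermark
        op.processElement(new Record("d", 9_003L), 5_000L);
        System.out.println(op.emittedWatermarks); // [1000, 3000, 5000]
    }
}
```

Note that the out-of-order record `c` is still forwarded with its own timestamp; only its watermark is suppressed. Whether it is late is decided later, by the window operator.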
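2. How Flink handles late data
To demonstrate this, a window operator and the late-data side output method sideOutputLateData are added to the program above. For convenience, here is the complete code again: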
```scala
// env, sdf, Common.getProp and LateDataEvent are defined elsewhere in the original program
val source = new FlinkKafkaConsumer[ObjectNode]("late_data", new JsonNodeDeserializationSchema(), Common.getProp)
// tag for the late-data side output
val late = new OutputTag[LateDataEvent]("late")

val input = env.addSource(source)
  .map(json => {
    // json : {"id" : 0, "createTime" : "2019-08-24 11:13:14.942", "amt" : "9.8"}
    val id = json.get("id").asText()
    val createTime = json.get("createTime").asText()
    val amt = json.get("amt").asText()
    LateDataEvent("key", id, createTime, amt)
  })
  // assign a watermark for every event
  .assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks[LateDataEvent]() {
    // generate the next watermark (emitted only if non-null and larger than the previous one)
    override def checkAndGetNextWatermark(lastElement: LateDataEvent, extractedTimestamp: Long): Watermark = {
      new Watermark(extractedTimestamp)
    }
    // extract the event time from createTime
    override def extractTimestamp(element: LateDataEvent, previousElementTimestamp: Long): Long = {
      val eventTime = sdf.parse(element.createTime).getTime
      eventTime
    }
  })
  // after keyBy, each key gets its own windows
  .keyBy("key")
  .window(TumblingEventTimeWindows.of(Time.minutes(1)))
  // send late data to the side output
  .sideOutputLateData(late)
  .process(new ProcessWindowFunction[LateDataEvent, LateDataEvent, Tuple, TimeWindow] {
    // only here so the debugger can step into the window operator
    override def process(key: Tuple, context: Context, elements: Iterable[LateDataEvent], out: Collector[LateDataEvent]): Unit = {
      // print the window's start and end timestamps and the current watermark
      println("window:" + context.window.getStart + "-" + context.window.getEnd + ", currentWatermark : " + context.currentWatermark)
      val it = elements.toIterator
      while (it.hasNext) {
        val current = it.next()
        out.collect(current)
      }
    }
  })

// print late data
input.getSideOutput(late).print("late:")
input.print("apply:")
env.execute("LateDataProcess")
```
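The logic is simple: the point is to add a window operator, and the process operator exists only to make it easy to step into the window operator in the debugger.
Now let's debug the source.
Set a breakpoint in the process method.
This time, start from the point where the window operator receives data sent from upstream.
StreamInputProcessor.processInput deserializes each incoming element (data record, Watermark, StreamStatus, LatencyMarker) into a StreamElement (the base class of stream elements mentioned above), determines which kind it is, and handles each kind accordingly: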
```java
public boolean processInput() throws Exception {
    while (true) {
        if (currentRecordDeserializer != null) {
            DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate);

            if (result.isBufferConsumed()) {
                currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
                currentRecordDeserializer = null;
            }

            if (result.isFullRecord()) {
                StreamElement recordOrMark = deserializationDelegate.getInstance();

                if (recordOrMark.isWatermark()) {
                    // handle watermark
                    statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), currentChannel);
                    continue;
                } else if (recordOrMark.isStreamStatus()) {
                    // handle stream status
                    statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), currentChannel);
                    continue;
                } else if (recordOrMark.isLatencyMarker()) {
                    // handle latency marker
                    synchronized (lock) {
                        streamOperator.processLatencyMarker(recordOrMark.asLatencyMarker());
                    }
                    continue;
                } else {
                    // now we can do the actual processing
                    StreamRecord<IN> record = recordOrMark.asRecord();
                    synchronized (lock) {
                        numRecordsIn.inc();
                        streamOperator.setKeyContextElement1(record);
                        streamOperator.processElement(record);
                    }
                    return true;
                }
            }
        }
        // ... (remainder of the loop omitted)
    }
}
```
Note: the method is long, so only the parts relevant to this topic are shown.
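The dispatch in `processInput` can be modelled in a few lines. The sketch below is a simplified, hypothetical stand-in that uses no Flink classes: records go to the operator, while watermarks pass through a valve that, like Flink's `StatusWatermarkValve`, keeps the latest watermark per input channel and forwards the minimum across channels once that minimum advances. Stream status (idle channels) and latency markers are left out for brevity.

```java
import java.util.Arrays;

/** Hypothetical, simplified model of StreamInputProcessor's dispatch (not Flink code). */
public class InputDispatchModel {
    /** Stand-in for StreamElement, the common base class. */
    public abstract static class Element {}

    public static final class Record extends Element {
        public final long timestamp;
        public Record(long timestamp) { this.timestamp = timestamp; }
    }

    public static final class WatermarkElement extends Element {
        public final long timestamp;
        public WatermarkElement(long timestamp) { this.timestamp = timestamp; }
    }

    private final long[] channelWatermarks;
    public long forwardedWatermark = Long.MIN_VALUE;
    public int recordsProcessed = 0;

    public InputDispatchModel(int numChannels) {
        channelWatermarks = new long[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /** Dispatches on the element type, like the if/else chain in processInput. */
    public void processInput(Element element, int channel) {
        if (element instanceof WatermarkElement) {
            WatermarkElement wm = (WatermarkElement) element;
            // a channel's watermark never moves backwards
            channelWatermarks[channel] = Math.max(channelWatermarks[channel], wm.timestamp);
            long min = Arrays.stream(channelWatermarks).min().getAsLong();
            if (min > forwardedWatermark) {
                forwardedWatermark = min; // the real valve hands this to the operator
            }
        } else if (element instanceof Record) {
            recordsProcessed++; // the real code calls streamOperator.processElement(record)
        }
    }

    public static void main(String[] args) {
        InputDispatchModel in = new InputDispatchModel(2);
        in.processInput(new WatermarkElement(100L), 0);
        System.out.println(in.forwardedWatermark); // -9223372036854775808: channel 1 has no watermark yet
        in.processInput(new WatermarkElement(80L), 1);
        System.out.println(in.forwardedWatermark); // 80
        in.processInput(new Record(90L), 0);
        System.out.println(in.recordsProcessed); // 1
    }
}
```

This is why the current watermark seen by the window operator can lag behind the watermark of any single upstream channel.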
Here we focus on how data records are processed:
```java
// now we can do the actual processing
StreamRecord<IN> record = recordOrMark.asRecord();
synchronized (lock) {
    // metric Counter: counts incoming records
    numRecordsIn.inc();
    // set the current key (similar to a data partition: one per key, each holding its own state)
    streamOperator.setKeyContextElement1(record);
    // this is where the record actually enters WindowOperator
    streamOperator.processElement(record);
}
```
This brings us to WindowOperator.processElement, where most of the decision logic lives:
```java
// check whether the windowAssigner is a MergingWindowAssigner
if (windowAssigner instanceof MergingWindowAssigner) {
    // ... merging-window (session window) logic omitted
```
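This separates the handling of session windows (merging windows) from that of sliding and tumbling windows, because session windows are not aligned across keys.
Go straight to the else branch: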
```java
} else {
    for (W window : elementWindows) {
        // drop if the window is already late:
        // skip this window (the element is not added to it) and move on
        if (isWindowLate(window)) {
            continue;
        }
        // ... (rest of the loop omitted)
```
PS: After all this, we finally reach the place where late data is handled -_-
Now let's look at the logic of isWindowLate:
```java
/**
 * Returns {@code true} if the watermark is after the end timestamp plus the allowed lateness
 * of the given window.
 */
protected boolean isWindowLate(W window) {
    // true only for event time, and only when the window's max timestamp + allowed lateness
    // <= the current watermark (i.e. the window is already late)
    return (windowAssigner.isEventTime() && (cleanupTime(window) <= internalTimerService.currentWatermark()));
}

/**
 * Returns the cleanup time for a window, which is
 * {@code window.maxTimestamp + allowedLateness}. In
 * case this leads to a value greater than {@link Long#MAX_VALUE}
 * then a cleanup time of {@link Long#MAX_VALUE} is
 * returned.
 *
 * @param window the window whose cleanup time we are computing.
 */
private long cleanupTime(W window) {
    if (windowAssigner.isEventTime()) {
        // the window's max timestamp + allowed lateness
        long cleanupTime = window.maxTimestamp() + allowedLateness;
        // if the addition overflowed, fall back to Long.MAX_VALUE
        return cleanupTime >= window.maxTimestamp() ? cleanupTime : Long.MAX_VALUE;
    } else {
        return window.maxTimestamp();
    }
}
```
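The two methods reduce to simple arithmetic. Below is a hypothetical plain-Java restatement for an event-time assigner, checked against the numbers used in the next two examples. It assumes `TimeWindow.maxTimestamp()` is `end - 1`, as in Flink's `TimeWindow`, and that `allowedLateness` is non-negative.

```java
/** Hypothetical restatement of WindowOperator.cleanupTime / isWindowLate for event time. */
public class WindowLateness {
    /** maxTimestamp + allowedLateness, clamped to Long.MAX_VALUE if the sum overflows. */
    public static long cleanupTime(long windowMaxTimestamp, long allowedLateness) {
        long cleanupTime = windowMaxTimestamp + allowedLateness;
        // on overflow the sum wraps to a negative value, which is smaller than maxTimestamp
        return cleanupTime >= windowMaxTimestamp ? cleanupTime : Long.MAX_VALUE;
    }

    /** The window is late once its cleanup time is at or before the current watermark. */
    public static boolean isWindowLate(long windowMaxTimestamp, long allowedLateness, long currentWatermark) {
        return cleanupTime(windowMaxTimestamp, allowedLateness) <= currentWatermark;
    }

    public static void main(String[] args) {
        long maxTs = 1_566_640_320_000L - 1; // window end - 1
        System.out.println(isWindowLate(maxTs, 0L, 1_566_640_294_102L)); // false
        System.out.println(isWindowLate(maxTs, 0L, 1_566_652_224_102L)); // true
        System.out.println(cleanupTime(Long.MAX_VALUE - 5, 10L) == Long.MAX_VALUE); // true
    }
}
```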
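First, a record that arrives on time:
{"id" : 891, "createTime" : "2019-08-24 17:51:44.152", "amt" : "5.6"}
Record 891 has event time 2019-08-24 17:51:44.152. With 1-minute windows aligned to whole minutes, its window is [2019-08-24 17:51:00.000, 2019-08-24 17:52:00.000), i.e. the timestamps [1566640260000, 1566640320000). The current watermark is 1566640294102. The window's maximum timestamp is greater than the current watermark, so the record is not late and is not skipped.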
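Now a late record:
{"id" : 892, "createTime" : "2019-08-24 17:51:54.152", "amt" : "3.6"}
Record 892 has event time 2019-08-24 17:51:54.152. With 1-minute windows aligned to whole minutes, its window is again [2019-08-24 17:51:00.000, 2019-08-24 17:52:00.000), i.e. the timestamps [1566640260000, 1566640320000). The current watermark is 1566652224102. The window's maximum timestamp (plus allowedLateness, which is 0 here because it is not set) is less than the current watermark, so the record is late and is skipped.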
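Both cases can be reproduced with a short sketch. It assumes 1-minute tumbling windows with offset 0 and `allowedLateness = 0`; for non-negative timestamps, the window-start formula gives the same result as Flink's `TimeWindow.getWindowStartWithOffset`. The class `LateRecordExample` is hypothetical and uses no Flink classes.

```java
/** Hypothetical worked example: assign a 1-minute tumbling window and check lateness. */
public class LateRecordExample {
    static final long SIZE = 60_000L; // Time.minutes(1) in milliseconds

    /** Start of the tumbling window containing the timestamp (offset 0). */
    public static long windowStart(long timestamp) {
        return timestamp - (timestamp + SIZE) % SIZE;
    }

    /** Late when the window's max timestamp (end - 1) + allowed lateness <= watermark. */
    public static boolean isLate(long timestamp, long allowedLateness, long watermark) {
        long maxTimestamp = windowStart(timestamp) + SIZE - 1;
        return maxTimestamp + allowedLateness <= watermark;
    }

    public static void main(String[] args) {
        long t891 = 1_566_640_304_152L; // 2019-08-24 17:51:44.152 (UTC+8)
        long t892 = 1_566_640_314_152L; // 2019-08-24 17:51:54.152 (UTC+8)
        System.out.println(windowStart(t891)); // 1566640260000
        System.out.println(isLate(t891, 0L, 1_566_640_294_102L)); // false
        System.out.println(isLate(t892, 0L, 1_566_652_224_102L)); // true
    }
}
```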
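That is the source-level debugging of how windows handle late data. We have now covered both parts of how Flink handles late data:
1. Creating timestamps and watermarks from event time
2. The window handling late data
Note: the word "window" is added here to make clear that it is the window operator that does this work.
Next, let's look at the window's side output for late data. The source is at the end of WindowOperator.processElement: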
```java
// side output input event if
// element not handled by any window
// late arriving tag has been set
// windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
//
// isSkippedElement starts as true and is set to false once the element is added to a window
// that is not late; if every window of the element was late (see isWindowLate), it stays true
if (isSkippedElement && isElementLate(element)) {
    // lateDataOutputTag is set by .sideOutputLateData(late) after the window operator
    if (lateDataOutputTag != null) {
        sideOutput(element);
    } else {
        // no side output configured: the late record is dropped and counted
        this.numLateRecordsDropped.inc();
    }
}

/**
 * Decide if a record is currently late, based on current watermark and allowed lateness.
 * (true for event time when element timestamp + allowed lateness <= current watermark)
 *
 * @param element The element to check
 * @return The element for which should be considered when sideoutputs
 */
protected boolean isElementLate(StreamRecord<IN> element) {
    return (windowAssigner.isEventTime()) &&
        (element.getTimestamp() + allowedLateness <= internalTimerService.currentWatermark());
}

/**
 * Write skipped late arriving element to SideOutput.
 *
 * @param element skipped late arriving element to side output
 */
protected void sideOutput(StreamRecord<IN> element) {
    output.collect(lateDataOutputTag, element);
}
```
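Putting the two conditions together, the following hypothetical sketch shows the decision taken at the end of `processElement` for an event-time window. `decide` and `Outcome` are illustrative names, and `isSkippedElement` is passed in rather than computed from the window loop.

```java
/** Hypothetical restatement of the late-data branch at the end of WindowOperator.processElement. */
public class SideOutputDecision {
    public enum Outcome { NO_LATE_HANDLING, SIDE_OUTPUT, DROPPED }

    /** Element timestamp + allowed lateness <= current watermark (event time only). */
    public static boolean isElementLate(long elementTimestamp, long allowedLateness, long currentWatermark) {
        return elementTimestamp + allowedLateness <= currentWatermark;
    }

    /**
     * isSkippedElement: true when the element was added to no window (all its windows were late).
     * hasLateDataTag: whether sideOutputLateData(...) was configured.
     */
    public static Outcome decide(boolean isSkippedElement, long elementTimestamp, long allowedLateness,
                                 long currentWatermark, boolean hasLateDataTag) {
        if (isSkippedElement && isElementLate(elementTimestamp, allowedLateness, currentWatermark)) {
            // with a tag the element goes to the side output; otherwise it is counted as dropped
            return hasLateDataTag ? Outcome.SIDE_OUTPUT : Outcome.DROPPED;
        }
        return Outcome.NO_LATE_HANDLING;
    }

    public static void main(String[] args) {
        long t892 = 1_566_640_314_152L;
        long watermark = 1_566_652_224_102L;
        System.out.println(decide(true, t892, 0L, watermark, true));  // SIDE_OUTPUT
        System.out.println(decide(true, t892, 0L, watermark, false)); // DROPPED
    }
}
```

With tumbling windows and the same allowedLateness, a late window always implies a late element, because an element's timestamp is at most its window's maxTimestamp; so record 892 ends up in the `late:` side output of the demo program.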
Done.
Feel free to follow the Flink菜鳥 (Flink Newbie) WeChat official account, which posts articles on Flink development from time to time.