window+watermark 來處理亂序數據
對於 TumblingEventTimeWindows
window 的元數據startTime,endTime
和程序啓動時間無關,當你指定出 window.size 時, window的startTime,endTime
就分配好了java
allowedLateness 來處理遲到的數據
至關於延遲了window 的生命週期, 【startTime,endTime) -> [startTime,endTime+ allowedLateness]apache
sideOutput 是最後的兜底策略, 當window 的生命週期結束後, 延遲的數據能夠經過側輸出收集起來,自定義後續的處理流程api
import java.util.Date import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks import org.apache.flink.streaming.api.scala.{OutputTag, StreamExecutionEnvironment} import org.apache.flink.streaming.api.watermark.Watermark import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger object LastElement { case class Goods(var id: Int = 0, var count: Int = 0, var time: Long = 0L) { override def toString: String = s"Goods(id=$id,count=$count,time=$time)" } def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setParallelism(1) // 建立延遲數據 OutputTag, 標記爲 late-data val lateOutputTag = OutputTag[Goods]("late-data") val stream = env .socketTextStream("localhost", 9999) .filter(_.nonEmpty) .map(x => { val arr = x.split(",") Goods(arr(0).toInt, arr(1).toInt, arr(2).toLong) // id,count,time }) .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[Goods] { val maxOutOfOrderness = 2L // 最大無序數據到達的時間,用來生成水印2ms var currentMaxTimestamp: Long = _ override def getCurrentWatermark: Watermark = { new Watermark(currentMaxTimestamp - maxOutOfOrderness) } override def extractTimestamp(element: Goods, previousElementTimestamp: Long): Long = { currentMaxTimestamp = Math.max(element.time, currentMaxTimestamp) element.time } }) val streamFunc = stream .keyBy(_.id) .timeWindow(Time.milliseconds(10)) .trigger(EventTimeTrigger.create()) .allowedLateness(Time.milliseconds(3)) // 容許延時的最大時間 .sideOutputLateData(lateOutputTag) // 對延時數據進行標記 .reduce { (v1, v2) => Goods(v1.id, v1.count + v2.count, v2.time) } // lateOutputTag 從窗口結果中獲取遲到數據局產生的統計結果 val lateStream = streamFunc.getSideOutput(lateOutputTag) stream .print() streamFunc .map(("_________sum: ", _)) .print() lateStream .map(("+++++++++++late: ", _)) .print() env.execute(this.getClass.getSimpleName) } }
input:socket
1,1,0 1,1,9 1,2,10 1,1,5 1,2,11 1,1,8 1,2,13 1,1,2 1,2,17 1,1,3 1,3,20 1,3,21
output:ide
Goods(id=1,count=1,time=0) Goods(id=1,count=1,time=9) Goods(id=1,count=2,time=10) Goods(id=1,count=1,time=5) Goods(id=1,count=2,time=11) (_________sum: ,Goods(id=1,count=3,time=5)) Goods(id=1,count=1,time=8) (_________sum: ,Goods(id=1,count=4,time=8)) Goods(id=1,count=2,time=13) Goods(id=1,count=1,time=2) (_________sum: ,Goods(id=1,count=5,time=2)) Goods(id=1,count=2,time=17) Goods(id=1,count=1,time=3) (+++++++++++late: ,Goods(id=1,count=1,time=3)) Goods(id=1,count=3,time=20) Goods(id=1,count=3,time=21) (_________sum: ,Goods(id=1,count=8,time=17))
分析:測試
1,1,0 // win1 start 1,1,9 // win1 end 注意此時win1 沒有關閉 1,2,10 // win2 start 1,1,5 // win1 這一條數據屬於win1無序的數據,此時 watermark=7 < win1.endTime=9. 1,2,11 // win2 && win1 觸發計算,緣由是 watermark=9 >= win1.endTime=9 && win1中有數據。若是沒有 allowedLateness(3ms)的話此時就已經關閉 win1 了,可是有延時3ms 因此尚未關閉 1,1,8 // win1 因爲有 allowedLateness(3ms),這一條數據屬於win1無序的數據,並觸發 update;而不是 win1的 sideOutput 數據 1,2,13 // win2 && win1 處於 close 邊緣,win1 真正的生命週期從 [0,9+2) -> [0,9+2+3] 1,1,2 // win1 allowedLateness(3ms) 致使 update 1,2,17 // win2 && win1 close 1,1,3 // win1 此時win1 已經close, 這條數據屬於win1 的 sideOutput 1,3,20 // win3 start 1,3,21 // win3 && win2 觸發計算 // 因此最後的結果: win1: 1,5,2 + sideOutPut: 1,1,3 win2: 1,8,17 win3: 1,6,21