自定義Trigger
class Trigger2 extends Trigger[SensorReading, TimeWindow] {
// 每來一條數據,就會執行onElement方法
override def onElement(element: SensorReading, timestamp: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {
// 經過Trigger.TriggerContext對象建立一個狀態變量,有則獲取,無則建立
val sumValue: ValueState[Int] = ctx.getPartitionedState(new ValueStateDescriptor[Int]("sumValue", classOf[Int]))
// 使用從當前數據提取的時間戳,來建立一個計時器
ctx.registerEventTimeTimer(timestamp + 1000L) // 建立定時器
// TriggerResult.CONTINUE表示這條數據達到以後,對應的窗口什麼都不作
TriggerResult.CONTINUE
}
override def onProcessingTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = TriggerResult.CONTINUE
// 定義watermark到達計時器時間時要執行的操做
override def onEventTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {
// 表示窗口觸發操做,可是不會關閉窗口
TriggerResult.FIRE
}
// 清除狀態
override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = {
val sumValue: ValueState[Int] = ctx.getPartitionedState(new ValueStateDescriptor[Int]("sumValue", classOf[Int]))
sumValue.clear()
}
}
- Trigger中能夠定義狀態,狀態的做用域是當前的窗口,也就是說狀態只屬於一個單獨窗口,processsWindowFunction函數中的狀態定義也是如此
- 觸發器中定義的狀態變量必定要手動清理,不然在窗口關閉以後,Flink應用不會去清理觸發器中以前全部關閉窗口對應的狀態