Flink自定義窗口Trigger

自定義Trigger

class Trigger2 extends Trigger[SensorReading, TimeWindow] {

    // 每來一條數據,就會執行onElement方法
    override def onElement(element: SensorReading, timestamp: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {
      // 經過Trigger.TriggerContext對象建立一個狀態變量,有則獲取,無則建立
      val sumValue: ValueState[Int] = ctx.getPartitionedState(new ValueStateDescriptor[Int]("sumValue", classOf[Int]))
      // 使用從當前數據提取的時間戳,來建立一個計時器
      ctx.registerEventTimeTimer(timestamp + 1000L) // 建立定時器
      // TriggerResult.CONTINUE表示這條數據達到以後,對應的窗口什麼都不作
      TriggerResult.CONTINUE
    }

    override def onProcessingTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = TriggerResult.CONTINUE
    // 定義watermark到達計時器時間時要執行的操做
    override def onEventTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {
      // 表示窗口觸發操做,可是不會關閉窗口
      TriggerResult.FIRE

    }
    // 清除狀態
    override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = {
      val sumValue: ValueState[Int] = ctx.getPartitionedState(new ValueStateDescriptor[Int]("sumValue", classOf[Int]))
      sumValue.clear()
    }
  }
  • Trigger中能夠定義狀態,狀態的做用域是當前的窗口,也就是說狀態只屬於一個單獨窗口,processsWindowFunction函數中的狀態定義也是如此
  • 觸發器中定義的狀態變量必定要手動清理,不然在窗口關閉以後,Flink應用不會去清理觸發器中以前全部關閉窗口對應的狀態
相關文章
相關標籤/搜索