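A trigger determines when a window (formed by the window assigner) is ready to be processed by the window function. Every WindowAssigner comes with a default trigger.
If the default trigger does not fit your requirements, you need to write a custom trigger.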
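Here is why the default is often not enough. For event-time assigners such as TumblingEventTimeWindows, the default trigger is EventTimeTrigger, which fires only once the watermark passes the end of the window. With a one-hour window, nothing is emitted until the hour is over.

Below is a minimal, framework-free Scala model of that decision logic. Decision, SimpleWindow and DefaultEventTimeModel are hypothetical stand-ins, not Flink classes. The real trigger also registers an event-time timer at window.maxTimestamp() inside onElement.

```scala
// Hypothetical, framework-free model of EventTimeTrigger's decisions (not Flink classes)
sealed trait Decision
case object Continue extends Decision
case object Fire extends Decision

// A window covering [start, end); like TimeWindow, its last timestamp is end - 1
final case class SimpleWindow(start: Long, end: Long) {
  def maxTimestamp: Long = end - 1
}

object DefaultEventTimeModel {
  // Element arrives: fire right away only if the watermark is already past the window end;
  // otherwise the real trigger registers an event-time timer at maxTimestamp and continues
  def onElement(window: SimpleWindow, currentWatermark: Long): Decision =
    if (window.maxTimestamp <= currentWatermark) Fire else Continue

  // Event-time timer fires: only the timer at the window end produces a result
  def onEventTime(time: Long, window: SimpleWindow): Decision =
    if (time == window.maxTimestamp) Fire else Continue
}

val hour = SimpleWindow(0L, 3600000L)
println(DefaultEventTimeModel.onElement(hour, currentWatermark = 1000L)) // Continue
println(DefaultEventTimeModel.onEventTime(hour.maxTimestamp, hour))      // Fire
```

So a one-hour window produces a single result at its end. Earlier output, by element count or by elapsed time, requires a trigger of your own.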
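The Trigger interface has five methods that let a trigger react to different events:
onElement(): called for every element that is added to a window.
onEventTime(): called when a registered event-time timer fires.
onProcessingTime(): called when a registered processing-time timer fires.
onMerge(): relevant for stateful triggers; merges the states of two triggers when their corresponding windows merge, e.g. when using session windows. (I haven't used this one yet and don't know much about it.)
clear(): performs any action needed when the corresponding window is removed. (Typically this means deleting the state and timers the trigger defined.)
onElement(), onEventTime() and onProcessingTime() must each return a TriggerResult.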
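TriggerResult can take one of the following values:
CONTINUE: do nothing.
FIRE: trigger the computation.
PURGE: clear the elements in the window.
FIRE_AND_PURGE: trigger the computation, then clear the elements in the window.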
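Flink's TriggerResult has four values: CONTINUE, FIRE, PURGE and FIRE_AND_PURGE. Each is described by two flags, exposed as isFire() and isPurge().

Below is a small pure-Scala model of what each value does to the elements buffered in a window. It is a hypothetical sketch with no Flink dependency; Result and ResultModel are illustrative names.

```scala
// Hypothetical pure-Scala model of the four TriggerResult values (no Flink dependency).
// Like Flink's enum, each value is described by two flags: isFire and isPurge.
sealed abstract class Result(val isFire: Boolean, val isPurge: Boolean)
case object CONTINUE extends Result(isFire = false, isPurge = false)
case object FIRE extends Result(isFire = true, isPurge = false)
case object PURGE extends Result(isFire = false, isPurge = true)
case object FIRE_AND_PURGE extends Result(isFire = true, isPurge = true)

object ResultModel {
  // Returns (contents handed to the window function, if any; contents kept in the window)
  def applyResult[T](result: Result, buffer: List[T]): (Option[List[T]], List[T]) = {
    val emitted = if (result.isFire) Some(buffer) else None
    val remaining = if (result.isPurge) Nil else buffer
    (emitted, remaining)
  }
}

val buffer = List("a", "b", "c")
println(ResultModel.applyResult(FIRE, buffer))           // (Some(List(a, b, c)),List(a, b, c))
println(ResultModel.applyResult(FIRE_AND_PURGE, buffer)) // (Some(List(a, b, c)),List())
```

Because FIRE keeps the contents, a window that fires several times hands the accumulated elements to the window function each time. Per the Flink documentation, purging removes only the window's contents and leaves trigger state and window metadata intact. A trigger therefore still has to clean up its own state and timers in clear().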
Maven properties and dependencies (pom.xml):
<properties>
    <hadoop.version>3.1.1.3.1.0.0-78</hadoop.version>
    <flink.version>1.9.1</flink.version>
    <scala.binary.version>2.11</scala.binary.version>
    <scala.version>2.11.7</scala.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
// Usage
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

// dStream: DataStream[eventInputDT]
dStream
  .keyBy(_.event_id)
  .window(TumblingEventTimeWindows.of(Time.hours(1)))
  .trigger(new CustomTrigger(10, 1 * 60 * 1000L)) // fire at 10 elements or after 1 minute

//-------------------------------------------------------------------------

package com.meda.demo

import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.common.state.ReducingStateDescriptor
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

// eventInputDT is the author's own event type (defined elsewhere, not shown here)
class CustomTrigger extends Trigger[eventInputDT, TimeWindow] {
  // Element count that triggers a computation
  private var maxCount: Long = _
  // Interval of the timed (processing-time) firing, in ms
  private var interval: Long = 60 * 1000
  // State: current element count
  private lazy val countStateDescriptor: ReducingStateDescriptor[Long] =
    new ReducingStateDescriptor[Long]("counter", new Sum, classOf[Long])
  // State: timestamp of the registered processing-time timer
  private lazy val processTimerStateDescriptor: ReducingStateDescriptor[Long] =
    new ReducingStateDescriptor[Long]("processTimer", new Update, classOf[Long])
  // State: timestamp of the registered event-time timer
  private lazy val eventTimerStateDescriptor: ReducingStateDescriptor[Long] =
    new ReducingStateDescriptor[Long]("eventTimer", new Update, classOf[Long])

  def this(maxCount: Int) {
    this()
    this.maxCount = maxCount
  }

  def this(maxCount: Int, interval: Long) {
    this(maxCount)
    this.interval = interval
  }

  override def onElement(element: eventInputDT, timestamp: Long, window: TimeWindow,
                         ctx: Trigger.TriggerContext): TriggerResult = {
    val countState = ctx.getPartitionedState(countStateDescriptor)
    // Increment the count
    countState.add(1L)
    // If no event-time timer is set yet, register one at the window's max timestamp, so the
    // remaining data is computed via event time when the window is cleared; otherwise some
    // data may be missed. (An empty ReducingState returns null, which Scala unboxes to 0L.)
    if (ctx.getPartitionedState(eventTimerStateDescriptor).get() == 0L) {
      ctx.getPartitionedState(eventTimerStateDescriptor).add(window.maxTimestamp())
      ctx.registerEventTimeTimer(window.maxTimestamp())
    }
    if (countState.get() >= this.maxCount) {
      // Count threshold reached:
      // delete the pending processing-time timer and clear its state,
      // otherwise no new timer is ever registered for later elements
      ctx.deleteProcessingTimeTimer(ctx.getPartitionedState(processTimerStateDescriptor).get())
      ctx.getPartitionedState(processTimerStateDescriptor).clear()
      // Reset the count
      countState.clear()
      // Fire the computation
      TriggerResult.FIRE
    } else if (ctx.getPartitionedState(processTimerStateDescriptor).get() == 0L) {
      // Threshold not reached and no timer registered yet:
      // store current processing time + interval and register a processing-time timer for it
      ctx.getPartitionedState(processTimerStateDescriptor).add(ctx.getCurrentProcessingTime + interval)
      ctx.registerProcessingTimeTimer(ctx.getPartitionedState(processTimerStateDescriptor).get())
      TriggerResult.CONTINUE
    } else {
      TriggerResult.CONTINUE
    }
  }

  // Processing-time timer fired
  override def onProcessingTime(time: Long, window: TimeWindow,
                                ctx: Trigger.TriggerContext): TriggerResult = {
    if (ctx.getPartitionedState(countStateDescriptor).get() > 0 &&
      (ctx.getPartitionedState(processTimerStateDescriptor).get() == time)) {
      println(s"Count below $maxCount, firing via processing-time timer ${ctx.getPartitionedState(processTimerStateDescriptor).get()}")
      ctx.getPartitionedState(processTimerStateDescriptor).clear()
      ctx.getPartitionedState(countStateDescriptor).clear()
      TriggerResult.FIRE
    } else {
      TriggerResult.CONTINUE
    }
  }

  // Event-time timer fired
  override def onEventTime(time: Long, window: TimeWindow,
                           ctx: Trigger.TriggerContext): TriggerResult = {
    if ((time >= window.maxTimestamp()) && (ctx.getPartitionedState(countStateDescriptor).get() > 0L)) {
      // There is still data that has not been computed
      println(s"Event time reached the window's max timestamp with ${ctx.getPartitionedState(countStateDescriptor).get()} uncomputed elements; firing and purging the window")
      ctx.getPartitionedState(eventTimerStateDescriptor).clear()
      TriggerResult.FIRE_AND_PURGE
    } else if ((time >= window.maxTimestamp()) && (ctx.getPartitionedState(countStateDescriptor).get() == 0L)) {
      // No uncomputed data left
      println("Event time reached the window's max timestamp with no uncomputed elements; purging the window without firing")
      TriggerResult.PURGE
    } else {
      TriggerResult.CONTINUE
    }
  }

  // Clear all state and timers when the window is removed
  override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = {
    // println("Clearing window state and timers")
    ctx.deleteEventTimeTimer(ctx.getPartitionedState(eventTimerStateDescriptor).get())
    ctx.deleteProcessingTimeTimer(ctx.getPartitionedState(processTimerStateDescriptor).get())
    ctx.getPartitionedState(processTimerStateDescriptor).clear()
    ctx.getPartitionedState(eventTimerStateDescriptor).clear()
    ctx.getPartitionedState(countStateDescriptor).clear()
  }

  // Reduce function: the state becomes the running sum
  class Sum extends ReduceFunction[Long] {
    override def reduce(value1: Long, value2: Long): Long = value1 + value2
  }

  // Reduce function: the state takes the newest value
  class Update extends ReduceFunction[Long] {
    override def reduce(value1: Long, value2: Long): Long = value2
  }
}
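The following framework-free simulation makes CustomTrigger's firing behavior easier to follow. It models the trigger's decisions for a single window, under three assumptions:

- Flink's state and timers can be replaced by plain fields.
- The window-end event-time timer is always registered.
- The caller delivers element arrivals and timer callbacks in order.

CountOrTimeoutModel and its methods are hypothetical names. The model also shows why the processing-time timer state must be cleared when the count threshold fires. Otherwise the stale timestamp stays in the state, no new timeout timer is ever registered, and timed firing stops after the first count-based firing.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, framework-free model of CustomTrigger for ONE window.
// Flink state becomes plain vars; deleting a timer becomes resetting processTimer to 0.
final class CountOrTimeoutModel(maxCount: Long, interval: Long, windowMaxTs: Long) {
  private var count = 0L
  private var processTimer = 0L // 0L = no processing-time timer registered
  val firings = ArrayBuffer.empty[String]

  // onElement: count the element, fire on threshold, otherwise arm a timeout once
  def onElement(now: Long): Unit = {
    count += 1
    if (count >= maxCount) {
      processTimer = 0L // delete the pending timeout AND forget its timestamp
      count = 0L
      firings += s"FIRE(count) at $now"
    } else if (processTimer == 0L) {
      processTimer = now + interval
    }
  }

  // onProcessingTime: only the currently armed timer fires, and only if data is pending
  def onProcessingTime(time: Long): Unit =
    if (count > 0 && processTimer == time) {
      processTimer = 0L
      count = 0L
      firings += s"FIRE(timeout) at $time"
    }

  // onEventTime at the window end, followed by clear() resetting all state
  def onEventTime(time: Long): Unit =
    if (time >= windowMaxTs) {
      firings += (if (count > 0) s"FIRE_AND_PURGE at $time" else s"PURGE at $time")
      count = 0L
      processTimer = 0L
    }
}

// maxCount = 3, 1-minute timeout, one-hour window starting at 0
val m = new CountOrTimeoutModel(3L, 60000L, 3599999L)
m.onElement(1000L); m.onElement(2000L); m.onElement(3000L) // third element hits the threshold
m.onProcessingTime(61000L)  // stale timeout: ignored
m.onElement(70000L)         // arms a new timeout at 130000
m.onProcessingTime(130000L) // timeout fires with 1 pending element
m.onElement(200000L)
m.onEventTime(3599999L)     // window end with 1 pending element
println(m.firings.mkString("\n"))
```

Suppose the line `processTimer = 0L` in the count branch is removed, mirroring the state not being cleared. Then the element at 70000 no longer arms a new timeout, and the "FIRE(timeout) at 130000" entry disappears.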
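Open questions:
When I was reading up on this earlier, some sources seemed to say that only one timer can be set, and that if you set several, only one of them will run.
But here I set both an event-time timer and a processing-time timer, and both seem to take effect. I don't understand this part yet.
I'll look into what's going on later.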
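Here is a pointer on the open question above, not a definitive answer. Flink's ProcessFunction documentation ("Timer Coalescing") says Flink keeps only one timer per key and timestamp. Registering the same timestamp twice therefore produces a single callback. This does not limit you to one timer overall, and event-time and processing-time timers are kept in separate queues. That would be consistent with both of CustomTrigger's timers firing.

For event-time windows, the WindowOperator also registers its own cleanup timer at window.maxTimestamp() + allowed lateness. With zero allowed lateness, that timer coincides with CustomTrigger's window-end timer.

The toy model below mimics this bookkeeping, assuming a timer's identity is (key, window, timestamp). TimerId and ToyTimerService are hypothetical names, not Flink classes.

```scala
import scala.collection.mutable

// Toy model of timer bookkeeping (hypothetical, not Flink's implementation):
// a timer is identified by (key, window, timestamp), and each time domain has its own set,
// so registering an identical timer twice is a no-op.
final case class TimerId(key: String, window: Long, timestamp: Long)

final class ToyTimerService {
  private val eventTimers = mutable.LinkedHashSet.empty[TimerId]
  private val processingTimers = mutable.LinkedHashSet.empty[TimerId]

  def registerEventTimeTimer(t: TimerId): Unit = eventTimers += t
  def registerProcessingTimeTimer(t: TimerId): Unit = processingTimers += t

  // Watermark advances: each due event-time timer fires exactly once
  def advanceWatermark(watermark: Long): List[TimerId] = fireUpTo(eventTimers, watermark)

  // Wall clock advances: each due processing-time timer fires exactly once
  def advanceProcessingTime(now: Long): List[TimerId] = fireUpTo(processingTimers, now)

  private def fireUpTo(timers: mutable.LinkedHashSet[TimerId], upTo: Long): List[TimerId] = {
    val due = timers.filter(_.timestamp <= upTo).toList.sortBy(_.timestamp)
    timers --= due
    due
  }
}

val svc = new ToyTimerService
val windowEnd = 3599999L
svc.registerEventTimeTimer(TimerId("a", 0L, windowEnd))   // trigger's window-end timer
svc.registerEventTimeTimer(TimerId("a", 0L, windowEnd))   // same timestamp again, e.g. a cleanup timer
svc.registerProcessingTimeTimer(TimerId("a", 0L, 61000L)) // trigger's timeout timer
val firedByClock = svc.advanceProcessingTime(61000L)      // the processing-time timer
val firedByWatermark = svc.advanceWatermark(windowEnd)    // one callback, not two
```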
This is an original article; please credit the source when reposting!!!!