flink 自定義觸發器 定時或達到數量觸發

觸發器肯定窗口(由窗口分配程序造成)什麼時候準備由窗口函數處理。每一個WindowAssigner都帶有一個默認觸發器。
若是默認觸發器不適合需求,咱們就須要自定義觸發器。java

主要方法

觸發器接口有五種方法,容許觸發器對不一樣的事件做出反應apache

  1. onElement()添加到每一個窗口的元素都會調用此方法。
  2. onEventTime()當註冊的事件時間計時器觸發時,將調用此方法。
  3. onProcessingTime()當註冊的處理時間計時器觸發時,將調用此方法。
  4. onMerge()與有狀態觸發器相關,並在兩個觸發器對應的窗口合併時合併它們的狀態,例如在使用會話窗口時。(目前沒使用過,瞭解很少)
  5. clear()執行刪除相應窗口時所需的任何操做。(通常是刪除定義的狀態、定時器等)

TriggerResult

onElement(),onEventTime(),onProcessingTime()都要求返回一個TriggerResultwindows

TriggerResult包含如下內容api

  1. CONTINUE:表示啥都不作。
  2. FIRE:表示觸發計算,同時保留窗口中的數據
  3. PURGE:簡單地刪除窗口的內容,並保留關於窗口和任何觸發器狀態的任何潛在元信息。
  4. FIRE_AND_PURGE:觸發計算,而後清除窗口中的元素。(默認狀況下,預先實現的觸發器只觸發而不清除窗口狀態。)

案例

  • 需求
  1. 當窗口中的數據量達到必定數量的時候觸發計算
  2. 根據執行時間每隔必定時間且窗口中有數據觸發計算,若是沒有數據不觸發計算
  3. 窗口關閉的時候清除數據

實現過程

案例邏輯圖.png

  • 依賴
<properties>
        <hadoop.version>3.1.1.3.1.0.0-78</hadoop.version>
        <flink.version>1.9.1</flink.version>
        <scala.binary.version>2.11</scala.binary.version>
        <scala.version>2.11.7</scala.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink.version}</version>
        </dependency>

    </dependencies>
  • 實現代碼
//調用
dStream
      .keyBy(_.event_id)
      .window(TumblingEventTimeWindows.of(Time.hours(1)))
      .trigger(new CustomTrigger(10, 1 * 60 * 1000L))

//-------------------------------------------------------------------------
package com.meda.demo

import java.text.SimpleDateFormat

import com.meda.utils.DatePattern
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.common.state.ReducingStateDescriptor
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

class CustomTrigger extends Trigger[eventInputDT, TimeWindow] {
  //觸發計算的最大數量
  private var maxCount: Long = _
  //定時觸發間隔時長 (ms)
  private var interval: Long = 60 * 1000
  //記錄當前數量的狀態
  private lazy val countStateDescriptor: ReducingStateDescriptor[Long] = new ReducingStateDescriptor[Long]("counter", new Sum, classOf[Long])
  //記錄執行時間定時觸發時間的狀態
  private lazy val processTimerStateDescriptor: ReducingStateDescriptor[Long] = new ReducingStateDescriptor[Long]("processTimer", new Update, classOf[Long])
  //記錄時間時間定時器的狀態
  private lazy val eventTimerStateDescriptor: ReducingStateDescriptor[Long] = new ReducingStateDescriptor[Long]("eventTimer", new Update, classOf[Long])

  def this(maxCount: Int) {
    this()
    this.maxCount = maxCount
  }

  def this(maxCount: Int, interval: Long) {
    this(maxCount)
    this.interval = interval
  }

  override def onElement(element: eventInputDT, timestamp: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {
    val countState = ctx.getPartitionedState(countStateDescriptor)
    //計數狀態加1
    countState.add(1L)

    //若是沒有設置事件時間定時器,須要設置一個窗口最大時間觸發器,這個目的是爲了在窗口清除的時候 利用時間時間觸發計算,不然可能會缺乏部分數據
    if (ctx.getPartitionedState(eventTimerStateDescriptor).get() == 0L) {
      ctx.getPartitionedState(eventTimerStateDescriptor).add(window.maxTimestamp())
      ctx.registerEventTimeTimer(window.maxTimestamp())
    }

    if (countState.get() >= this.maxCount) {
      //達到指定指定數量
      //刪除事件時間定時觸發的狀態
      ctx.deleteProcessingTimeTimer(ctx.getPartitionedState(processTimerStateDescriptor).get())
      //清空計數狀態
      countState.clear()
      //觸發計算
      TriggerResult.FIRE
    } else if (ctx.getPartitionedState(processTimerStateDescriptor).get() == 0L) {
      //未達到指定數量,且沒有指定定時器,須要指定定時器
      //當前定時器狀態值加上間隔值
      ctx.getPartitionedState(processTimerStateDescriptor).add(ctx.getCurrentProcessingTime + interval)
      //註冊定執行時間定時器
      ctx.registerProcessingTimeTimer(ctx.getPartitionedState(processTimerStateDescriptor).get())
      TriggerResult.CONTINUE
    } else {
      TriggerResult.CONTINUE
    }
  }

  // 執行時間定時器觸發
  override def onProcessingTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {
    if (ctx.getPartitionedState(countStateDescriptor).get() > 0 && (ctx.getPartitionedState(processTimerStateDescriptor).get() == time)) {
      println(s"數據量未達到 $maxCount ,由執行時間觸發器 ctx.getPartitionedState(processTimerStateDescriptor).get()) 觸發計算")
      ctx.getPartitionedState(processTimerStateDescriptor).clear()
      ctx.getPartitionedState(countStateDescriptor).clear()
      TriggerResult.FIRE
    } else {
      TriggerResult.CONTINUE
    }
  }

  //事件時間定時器觸發
  override def onEventTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {
    if ((time >= window.maxTimestamp()) && (ctx.getPartitionedState(countStateDescriptor).get() > 0L)) { //還有未觸發計算的數據
      println(s"事件時間到達最大的窗口時間,而且窗口中還有未計算的數據:${ctx.getPartitionedState(countStateDescriptor).get()},觸發計算並清除窗口")
      ctx.getPartitionedState(eventTimerStateDescriptor).clear()
      TriggerResult.FIRE_AND_PURGE
    } else if ((time >= window.maxTimestamp()) && (ctx.getPartitionedState(countStateDescriptor).get() == 0L)) { //沒有未觸發計算的數據
      println("事件時間到達最大的窗口時間,可是窗口中沒有有未計算的數據,清除窗口 可是不觸發計算")
      TriggerResult.PURGE
    } else {
      TriggerResult.CONTINUE

    }
  }

  //窗口結束時清空狀態
  override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = {
    // println(s"清除窗口狀態,定時器")
    ctx.deleteEventTimeTimer(ctx.getPartitionedState(eventTimerStateDescriptor).get())
    ctx.deleteProcessingTimeTimer(ctx.getPartitionedState(processTimerStateDescriptor).get())
    ctx.getPartitionedState(processTimerStateDescriptor).clear()
    ctx.getPartitionedState(eventTimerStateDescriptor).clear()
    ctx.getPartitionedState(countStateDescriptor).clear()
  }

  //更新狀態爲累加值
  class Sum extends ReduceFunction[Long] {
    override def reduce(value1: Long, value2: Long): Long = value1 + value2
  }

  //更新狀態爲取新的值
  class Update extends ReduceFunction[Long] {
    override def reduce(value1: Long, value2: Long): Long = value2
  }

}

留下的疑問:
以前看資料的時候好像說定時器只能設置一個,你設置多個它也只會選擇一個執行。
可是我這裏事件、執行時間定時器都設置,好像都生效了。這點還沒看懂。
後續研究下啥狀況。ide

本文爲我的原創文章,轉載請註明出處。!!!!函數

相關文章
相關標籤/搜索