Flink從入門到真香(1六、Flink內置的一些Process Function)

以前的轉換算子是沒法訪問事件的時間戳信息和水位線信息的,而這在一些應用場景下,極爲重要。例如MapFunction這樣子的map轉換算子就沒法訪問時間戳或者當前事件的事件事件。java

基於此,DataStream API提供了一系列LOW-LEVEL的轉換算子調用。能夠訪問時間戳,watermark以及註冊定時事件,還能夠輸出特定的一些事件,例如超時時間等。
process function用來構建事件驅動的應用以及實現自定義的業務邏輯(使用以前的window函數和轉換算子沒法實現)。例如Flink SQL就是使用Process Function實現的。apache

Flink提供了8個 Process Function後端

ProcessFunction
KeyedProcessFunction
CoProcessFunction
ProcessJoinFunction
BroadcastProcessFunction
KeyedBroadcastProcessFunction
ProcessWindowFunction

下面幾個栗子來一一說明:api

栗子1- 實現一個連續15秒若是溫度持續上升就報警

package com.mafei.apitest

import com.mafei.sinktest.SensorReadingTest5
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object ProcessFunctionTest {

  def main(args: Array[String]): Unit = {
    //建立執行環境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.getConfig.setAutoWatermarkInterval(200) //直接全局設置watermark的時間爲200毫秒
//    val inputStream = env.readTextFile("/opt/java2020_study/maven/flink1/src/main/resources/sensor.txt")

    //接收一個socket文本流
    val inputStream = env.socketTextStream("127.0.0.1",6666)

    env.setParallelism(1)

    //先轉換成樣例類類型
    val dataStream = inputStream
      .map(data => {
        val arr = data.split(",") //按照,分割數據,獲取結果
        SensorReadingTest5(arr(0), arr(1).toLong, arr(2).toDouble) //生成一個傳感器類的數據,參數中傳toLong和toDouble是由於默認分割後是字符串類別
      })
      .keyBy(_.id)
//      .process(new TestKeydProcessFunction)  //demo
      .process(new TempIncreWarning(15000L))

    dataStream.print()
    env.execute("定時器KeydProcessFunction")

  }

}

/**
 * 定義3個參數: Key  由於上面是按照id作groupby的,因此是string
 * 輸入數據: SensorReadingTest5
 * 輸出數據: 這個直接定,能夠根據實際狀況來改
 */

class TempIncreWarning(alertInterval: Long) extends KeyedProcessFunction[String, SensorReadingTest5,String]{
  //定義狀態: 保存上一個溫度進行比較,保存註冊定時器的時間用於刪除
  lazy val lastTempValue: ValueState[Double] = getRuntimeContext.getState(new ValueStateDescriptor[Double]("lastTempValue", classOf[Double]))

  //定時器時間戳
  lazy val timerTimestampState: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("timerTimestampState", classOf[Long]))

  //每條數據都會通過這個方法
  override def processElement(value: SensorReadingTest5, ctx: KeyedProcessFunction[String, SensorReadingTest5, String]#Context, out: Collector[String]): Unit = {
    //先把上一次的值,和定時器的時間給拿出來
    var lastTemp = lastTempValue.value()
    var timerTimestamp = timerTimestampState.value()

    //把上一次的值,設置成這一次的,用在下次調用
    lastTempValue.update(value.temperature)
    //用此次的溫度和上一次的溫度值作比較,若是比上次大,那說明在升溫
    if (value.temperature > lastTemp){
      //說明是第一次,沒有定時器被設定(定義的沒有默認值,長整型因此是0
      if (timerTimestamp == 0){
        val ts = ctx.timerService().currentProcessingTime() + alertInterval
        ctx.timerService().registerProcessingTimeTimer(ts)
        timerTimestampState.update(ts)
      }
    }else if( value.temperature <= lastTemp){ //若是溫度值沒有在上升,那就須要把這個定時器給銷燬掉,由於不知足15秒持續上升條件了
      ctx.timerService().deleteProcessingTimeTimer(timerTimestamp)
//      timerTimestampState.update(0L)// 能夠直接設置成0
      timerTimestampState.clear() //調用這個清空方法也是同樣的效果
    }

  }

  //定義觸發的時候實際要作的操做
  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, SensorReadingTest5, String]#OnTimerContext, out: Collector[String]): Unit = {
    // ctx.getCurrentKey就是當前的sensor ID ,是上面.keyBy(_.id)這一步定義的
    out.collect("傳感器 "+ctx.getCurrentKey+"溫度持續了"+alertInterval +"秒在持續上升!!!!")
    timerTimestampState.clear() //已經觸發了,那須要把定時器的時間給清空掉
  }
}

/**
 * 在KeyedProcessFunction中,點進去就能夠看到要傳的3個參數
 * * @param <K> Type of the key.
 * * @param <I> Type of the input elements.
 * * @param <O> Type of the output elements.
 */
class TestKeydProcessFunction extends KeyedProcessFunction[String, SensorReadingTest5, String]{
//  var stateTest1: valueState[Int] = _

  override def processElement(value: SensorReadingTest5, ctx: KeyedProcessFunction[String, SensorReadingTest5, String]#Context, out: Collector[String]): Unit = {
//    ctx.output()    //定義一個側輸出流
    ctx.getCurrentKey // 獲取當前key, 跟從value中一個效果
    ctx.timerService().currentWatermark() //獲取當前水印
    ctx.timerService().currentProcessingTime() //當前處理時間
    ctx.timerService().registerEventTimeTimer(ctx.timestamp()+ 30000L) //註冊一個定時器到當前時間30秒以後
    ctx.timerService().registerProcessingTimeTimer(ctx.timestamp() * 30000L) //跟上面同樣,換成processTime
    ctx.timerService().deleteEventTimeTimer(ctx.timestamp()+ 30000L) //刪除一個定時器,這裏的時間跟定義的時間要對的上,由於能夠註冊多個

  }

  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, SensorReadingTest5, String]#OnTimerContext, out: Collector[String]): Unit = super.onTimer(timestamp, ctx, out)
}

代碼結構和運行效果:
Flink從入門到真香(1六、Flink內置的一些Process Function)socket

栗子2-實現一個若是溫度超過15,則輸出到主流上,不然輸出到側流上,實現一個分流操做

package com.mafei.apitest

import com.mafei.sinktest.SensorReadingTest5
import jdk.jfr.Threshold
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object SideOutputTest {
  def main(args: Array[String]): Unit = {
    //使用ProcessFunction,利用側輸出流實現一個分流操做
    //建立執行環境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.getConfig.setAutoWatermarkInterval(200) //直接全局設置watermark的時間爲200毫秒
    //    val inputStream = env.readTextFile("/opt/java2020_study/maven/flink1/src/main/resources/sensor.txt")

    //接收一個socket文本流
    val inputStream = env.socketTextStream("127.0.0.1",6666)

    env.setParallelism(1)

    //先轉換成樣例類類型
    val dataStream = inputStream
      .map(data => {
        val arr = data.split(",") //按照,分割數據,獲取結果
        SensorReadingTest5(arr(0), arr(1).toLong, arr(2).toDouble) //生成一個傳感器類的數據,參數中傳toLong和toDouble是由於默認分割後是字符串類別
      })
      //      .process(new TestKeydProcessFunction)  //demo
      val highTempStream = dataStream.process(new SplitTempProcess(10.1))
      highTempStream.print("this is high stream: ")

    /**
     * new OutputTag[(String,Long, Double)]("low")
     * 這裏[]內的是定義輸出的格式,根據實際狀況來改,()裏面的low是標籤的名稱
     */
    val lowTempStream = highTempStream.getSideOutput(new OutputTag[(String,Long, Double)]("low"))
    lowTempStream.print("this is low stream: ")

    env.execute("side output test")
  }
}

//實現自定義的processFunction,利用側輸出流,進行分流操做
/**
 *
 * @param threshold
 * ProcessFunction傳2個參數,第一個是輸入的數據類型,第二個是輸出的數據類型,均可以自定義
 *
 *
 */
class SplitTempProcess(threshold: Double) extends ProcessFunction[SensorReadingTest5, SensorReadingTest5]{
  override def processElement(value: SensorReadingTest5, ctx: ProcessFunction[SensorReadingTest5, SensorReadingTest5]#Context, out: Collector[SensorReadingTest5]): Unit = {
    //若是溫度值大於設置的閾值,那直接輸出
    if (value.temperature > threshold){
      out.collect(value)
    }else{   //若是小於等於就輸出到側輸出流
      /**
       * 這裏側輸出流的定義必須數據類型和id都要跟上面對的上,low後邊的參數表明具體要輸出的數據,
       */
      ctx.output(new OutputTag[(String,Long, Double)]("low"),(value.id, value.timestamp, value.temperature))
    }

  }
}

代碼結構及運行效果:
Flink從入門到真香(1六、Flink內置的一些Process Function)maven

狀態後端

Flink提供多種狀態後端的存儲形式
1)MemoryStateBackend
內存級的狀態後端,會將鍵控狀態做爲內存中對象進行管理,將他們存儲在TaskManager的JVM堆上,而將checkpoint存儲在JobManager的內存中
特色: 快速、低延遲、單不穩定(不落盤固然快,可是掉電或者重啓進程之類的就沒了,一般用在測試)
FsStateBackend
將checkpoint存儲到遠程的持久化文件系統(FileSystem)上,而對於本地狀態,跟MemoryStateBackend同樣,也會存在TaskManager的JVM堆上
同時擁有內存級的本地訪問速度,和更好的容錯保證
RocksDBStateBackend
將全部狀態序列化後,存入本地的RocksDB中存儲
RocksDB的支持並不直接包含在flink中,須要單獨引入依賴:ide

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb_2.12</artifactId>
    <version>1.10.1</version>
</dependency>

在代碼中配置:函數

//    env.setStateBackend(new MemoryStateBackend())
//    env.setStateBackend(new FsStateBackend(""))
//    env.setStateBackend(new RocksDBStateBackend(""))
相關文章
相關標籤/搜索