以前的轉換算子是沒法訪問事件的時間戳信息和水位線信息的,而這在一些應用場景下,極爲重要。例如MapFunction這樣子的map轉換算子就沒法訪問時間戳或者當前事件的事件事件。java
基於此,DataStream API提供了一系列LOW-LEVEL的轉換算子調用。能夠訪問時間戳,watermark以及註冊定時事件,還能夠輸出特定的一些事件,例如超時時間等。
process function用來構建事件驅動的應用以及實現自定義的業務邏輯(使用以前的window函數和轉換算子沒法實現)。例如Flink SQL就是使用Process Function實現的。apache
Flink提供了8個 Process Function後端
ProcessFunction KeyedProcessFunction CoProcessFunction ProcessJoinFunction BroadcastProcessFunction KeyedBroadcastProcessFunction ProcessWindowFunction
下面幾個栗子來一一說明:api
package com.mafei.apitest import com.mafei.sinktest.SensorReadingTest5 import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} import org.apache.flink.streaming.api.functions.KeyedProcessFunction import org.apache.flink.streaming.api.scala._ import org.apache.flink.util.Collector object ProcessFunctionTest { def main(args: Array[String]): Unit = { //建立執行環境 val env = StreamExecutionEnvironment.getExecutionEnvironment env.getConfig.setAutoWatermarkInterval(200) //直接全局設置watermark的時間爲200毫秒 // val inputStream = env.readTextFile("/opt/java2020_study/maven/flink1/src/main/resources/sensor.txt") //接收一個socket文本流 val inputStream = env.socketTextStream("127.0.0.1",6666) env.setParallelism(1) //先轉換成樣例類類型 val dataStream = inputStream .map(data => { val arr = data.split(",") //按照,分割數據,獲取結果 SensorReadingTest5(arr(0), arr(1).toLong, arr(2).toDouble) //生成一個傳感器類的數據,參數中傳toLong和toDouble是由於默認分割後是字符串類別 }) .keyBy(_.id) // .process(new TestKeydProcessFunction) //demo .process(new TempIncreWarning(15000L)) dataStream.print() env.execute("定時器KeydProcessFunction") } } /** * 定義3個參數: Key 由於上面是按照id作groupby的,因此是string * 輸入數據: SensorReadingTest5 * 輸出數據: 這個直接定,能夠根據實際狀況來改 */ class TempIncreWarning(alertInterval: Long) extends KeyedProcessFunction[String, SensorReadingTest5,String]{ //定義狀態: 保存上一個溫度進行比較,保存註冊定時器的時間用於刪除 lazy val lastTempValue: ValueState[Double] = getRuntimeContext.getState(new ValueStateDescriptor[Double]("lastTempValue", classOf[Double])) //定時器時間戳 lazy val timerTimestampState: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("timerTimestampState", classOf[Long])) //每條數據都會通過這個方法 override def processElement(value: SensorReadingTest5, ctx: KeyedProcessFunction[String, SensorReadingTest5, String]#Context, out: Collector[String]): Unit = { //先把上一次的值,和定時器的時間給拿出來 var lastTemp = lastTempValue.value() var timerTimestamp = timerTimestampState.value() //把上一次的值,設置成這一次的,用在下次調用 lastTempValue.update(value.temperature) //用此次的溫度和上一次的溫度值作比較,若是比上次大,那說明在升溫 if (value.temperature > lastTemp){ //說明是第一次,沒有定時器被設定(定義的沒有默認值,長整型因此是0 if (timerTimestamp == 0){ val ts = ctx.timerService().currentProcessingTime() + alertInterval ctx.timerService().registerProcessingTimeTimer(ts) timerTimestampState.update(ts) } }else if( value.temperature <= lastTemp){ //若是溫度值沒有在上升,那就須要把這個定時器給銷燬掉,由於不知足15秒持續上升條件了 ctx.timerService().deleteProcessingTimeTimer(timerTimestamp) // timerTimestampState.update(0L)// 能夠直接設置成0 timerTimestampState.clear() //調用這個清空方法也是同樣的效果 } } //定義觸發的時候實際要作的操做 override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, SensorReadingTest5, String]#OnTimerContext, out: Collector[String]): Unit = { // ctx.getCurrentKey就是當前的sensor ID ,是上面.keyBy(_.id)這一步定義的 out.collect("傳感器 "+ctx.getCurrentKey+"溫度持續了"+alertInterval +"秒在持續上升!!!!") timerTimestampState.clear() //已經觸發了,那須要把定時器的時間給清空掉 } } /** * 在KeyedProcessFunction中,點進去就能夠看到要傳的3個參數 * * @param <K> Type of the key. * * @param <I> Type of the input elements. * * @param <O> Type of the output elements. */ class TestKeydProcessFunction extends KeyedProcessFunction[String, SensorReadingTest5, String]{ // var stateTest1: valueState[Int] = _ override def processElement(value: SensorReadingTest5, ctx: KeyedProcessFunction[String, SensorReadingTest5, String]#Context, out: Collector[String]): Unit = { // ctx.output() //定義一個側輸出流 ctx.getCurrentKey // 獲取當前key, 跟從value中一個效果 ctx.timerService().currentWatermark() //獲取當前水印 ctx.timerService().currentProcessingTime() //當前處理時間 ctx.timerService().registerEventTimeTimer(ctx.timestamp()+ 30000L) //註冊一個定時器到當前時間30秒以後 ctx.timerService().registerProcessingTimeTimer(ctx.timestamp() * 30000L) //跟上面同樣,換成processTime ctx.timerService().deleteEventTimeTimer(ctx.timestamp()+ 30000L) //刪除一個定時器,這裏的時間跟定義的時間要對的上,由於能夠註冊多個 } override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, SensorReadingTest5, String]#OnTimerContext, out: Collector[String]): Unit = super.onTimer(timestamp, ctx, out) }
代碼結構和運行效果:socket
package com.mafei.apitest import com.mafei.sinktest.SensorReadingTest5 import jdk.jfr.Threshold import org.apache.flink.streaming.api.functions.ProcessFunction import org.apache.flink.streaming.api.scala._ import org.apache.flink.util.Collector object SideOutputTest { def main(args: Array[String]): Unit = { //使用ProcessFunction,利用側輸出流實現一個分流操做 //建立執行環境 val env = StreamExecutionEnvironment.getExecutionEnvironment env.getConfig.setAutoWatermarkInterval(200) //直接全局設置watermark的時間爲200毫秒 // val inputStream = env.readTextFile("/opt/java2020_study/maven/flink1/src/main/resources/sensor.txt") //接收一個socket文本流 val inputStream = env.socketTextStream("127.0.0.1",6666) env.setParallelism(1) //先轉換成樣例類類型 val dataStream = inputStream .map(data => { val arr = data.split(",") //按照,分割數據,獲取結果 SensorReadingTest5(arr(0), arr(1).toLong, arr(2).toDouble) //生成一個傳感器類的數據,參數中傳toLong和toDouble是由於默認分割後是字符串類別 }) // .process(new TestKeydProcessFunction) //demo val highTempStream = dataStream.process(new SplitTempProcess(10.1)) highTempStream.print("this is high stream: ") /** * new OutputTag[(String,Long, Double)]("low") * 這裏[]內的是定義輸出的格式,根據實際狀況來改,()裏面的low是標籤的名稱 */ val lowTempStream = highTempStream.getSideOutput(new OutputTag[(String,Long, Double)]("low")) lowTempStream.print("this is low stream: ") env.execute("side output test") } } //實現自定義的processFunction,利用側輸出流,進行分流操做 /** * * @param threshold * ProcessFunction傳2個參數,第一個是輸入的數據類型,第二個是輸出的數據類型,均可以自定義 * * */ class SplitTempProcess(threshold: Double) extends ProcessFunction[SensorReadingTest5, SensorReadingTest5]{ override def processElement(value: SensorReadingTest5, ctx: ProcessFunction[SensorReadingTest5, SensorReadingTest5]#Context, out: Collector[SensorReadingTest5]): Unit = { //若是溫度值大於設置的閾值,那直接輸出 if (value.temperature > threshold){ out.collect(value) }else{ //若是小於等於就輸出到側輸出流 /** * 這裏側輸出流的定義必須數據類型和id都要跟上面對的上,low後邊的參數表明具體要輸出的數據, */ ctx.output(new OutputTag[(String,Long, Double)]("low"),(value.id, value.timestamp, value.temperature)) } } }
代碼結構及運行效果:maven
Flink提供多種狀態後端的存儲形式
1)MemoryStateBackend
內存級的狀態後端,會將鍵控狀態做爲內存中對象進行管理,將他們存儲在TaskManager的JVM堆上,而將checkpoint存儲在JobManager的內存中
特色: 快速、低延遲、單不穩定(不落盤固然快,可是掉電或者重啓進程之類的就沒了,一般用在測試)
FsStateBackend
將checkpoint存儲到遠程的持久化文件系統(FileSystem)上,而對於本地狀態,跟MemoryStateBackend同樣,也會存在TaskManager的JVM堆上
同時擁有內存級的本地訪問速度,和更好的容錯保證
RocksDBStateBackend
將全部狀態序列化後,存入本地的RocksDB中存儲
RocksDB的支持並不直接包含在flink中,須要單獨引入依賴:ide
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-statebackend-rocksdb_2.12</artifactId> <version>1.10.1</version> </dependency>
在代碼中配置:函數
// env.setStateBackend(new MemoryStateBackend()) // env.setStateBackend(new FsStateBackend("")) // env.setStateBackend(new RocksDBStateBackend(""))