Flink Table API和SQL提供了一批用於數據轉換的內置函數,也是在平常開發過程當中最經常使用、最重要的一個點html
SQL中支持不少的函數,Table API和SQL都已經作了實現,基本經常使用的都已經全覆蓋,通常能夠不用本身寫方法java
像sql裏面比較用的: =, <>, >, >=, <=,is,is not,BETWEEN,EXISTS,IN等等這種操做符基本都覆蓋 邏輯類的: or,and,is FALSE 計算類的: +,-,*,/,POWER,ABS, 字符類的: || ,upper,lower,LTRIM 聚合類的: count(*),count(1),avg,sum,max,min,rank
最全的官網已經所有列出來了,能夠直接用: https://ci.apache.org/projects/flink/flink-docs-stable/zh/dev/table/functions/systemFunctions.htmlsql
但一些特殊場景可能內置的這些函數不能知足需求,這時候咱們可能須要本身去寫,這時候Flink提供了自定義的函數(UDF)apache
用戶自定義函數(User-defined functions, udf)是一個重要的特性,它們顯著擴展了查詢的表達能力
在大多數狀況下,用戶定義的函數必須先註冊,而後才能在查詢中使用
用戶經過調用registerFunction()方法在TableEnvironment中註冊.當用戶定義的函數被註冊時,它被插入到TableEnvironment的函數目錄中,這樣Table API或SQL解析器就能夠識別並正確的解釋它
Flink提供了3大類內置函數api
傳入1個或多個字段,返回一個值,相似map操做
用戶定義的標量函數,能夠將0、1或多個標量值,映射到新的標量值
爲了定義標量函數,必須在org.apache.flink.table.functions中擴展基類Scalar Function,並實現(一個或多個)求值(eval)方法數據結構
package com.mafei.udftest import com.mafei.sinktest.SensorReadingTest5 import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.scala._ import org.apache.flink.table.api.EnvironmentSettings import org.apache.flink.table.api.scala._ import org.apache.flink.table.functions.ScalarFunction import org.apache.flink.types.Row object ScalarFunctionTest { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) //設置1個併發 //設置處理時間爲流處理的時間 env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) // val inputStream = env.readTextFile("/opt/java2020_study/maven/flink1/src/main/resources/sensor.txt") val inputStream = env.readTextFile("D:\\java2020_study\\maven\\flink1\\src\\main\\resources\\sensor.txt") //先轉換成樣例類類型 val dataStream = inputStream .map(data => { val arr = data.split(",") //按照,分割數據,獲取結果 SensorReadingTest5(arr(0), arr(1).toLong, arr(2).toDouble) //生成一個傳感器類的數據,參數中傳toLong和toDouble是由於默認分割後是字符串類別 }) //設置環境信息(能夠不用) val settings = EnvironmentSettings.newInstance() .useBlinkPlanner() // Flink 10的時候默認是用的useOldPlanner 11就改成了BlinkPlanner .inStreamingMode() .build() // 設置flink table運行環境 val tableEnv = StreamTableEnvironment.create(env, settings) //流轉換成表 val sensorTables = tableEnv.fromDataStream(dataStream, 'id,'timestamp, 'temperature, 'tp.proctime as 'ts) //若是要看效果,能夠直接打印出來 // sensorTables.toAppendStream[Row].print("sensorTables: ") //調用自定義UDF函數,對id進行hash運算 //1. table api實現 // 首先須要new一個實例 val hashCode = new HashCode(1) val resultTable = sensorTables .select('id,'ts,hashCode('id)) // resultTable.toAppendStream[Row].print("resultTable: ") /**輸出效果: * resultTable: > sensor1,2020-12-13T13:53:57.630,1980364880 resultTable: > sensor2,2020-12-13T13:53:57.632,1980364881 resultTable: > sensor3,2020-12-13T13:53:57.632,1980364882 resultTable: > sensor4,2020-12-13T13:53:57.632,1980364883 resultTable: > sensor4,2020-12-13T13:53:57.632,1980364883 resultTable: > sensor4,2020-12-13T13:53:57.633,1980364883 */ //2. 用sql來實現,須要先在環境中註冊好udf函數 tableEnv.createTemporaryView("sensor",sensorTables) tableEnv.registerFunction("hashCode", hashCode) val sqlResultTable = tableEnv.sqlQuery("select id, ts, hashCode(id) from sensor") sqlResultTable.toRetractStream[Row].print("sqlResultTable") env.execute() } } //自定義一個標量函數 class HashCode(factor: Int) extends ScalarFunction{ def eval(s :String): Int={ s.hashCode * factor - 11111 } }
代碼結構及運行效果併發
若是 標量函數是輸入一行輸出一個值得話,那表函數就是輸入一行,輸出獲得了一張表,一對多,相似側寫函數maven
package com.mafei.udftest import com.mafei.sinktest.SensorReadingTest5 import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.scala._ import org.apache.flink.table.api.EnvironmentSettings import org.apache.flink.table.api.scala._ import org.apache.flink.table.functions.{ScalarFunction, TableFunction} import org.apache.flink.types.Row object TableFunctionTest { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) //設置1個併發 //設置處理時間爲流處理的時間 env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) // val inputStream = env.readTextFile("/opt/java2020_study/maven/flink1/src/main/resources/sensor.txt") val inputStream = env.readTextFile("D:\\java2020_study\\maven\\flink1\\src\\main\\resources\\sensor.txt") //先轉換成樣例類類型 val dataStream = inputStream .map(data => { val arr = data.split(",") //按照,分割數據,獲取結果 SensorReadingTest5(arr(0), arr(1).toLong, arr(2).toDouble) //生成一個傳感器類的數據,參數中傳toLong和toDouble是由於默認分割後是字符串類別 }) //設置環境信息(能夠不用) val settings = EnvironmentSettings.newInstance() .useBlinkPlanner() // Flink 10的時候默認是用的useOldPlanner 11就改成了BlinkPlanner .inStreamingMode() .build() // 設置flink table運行環境 val tableEnv = StreamTableEnvironment.create(env, settings) //流轉換成表 val sensorTables = tableEnv.fromDataStream(dataStream, 'id,'timestamp, 'temperature, 'tp.proctime as 'ts) //若是要看效果,能夠直接打印出來 // sensorTables.toAppendStream[Row].print("sensorTables: ") //調用自定義UDF函數,先實例化,定義以_爲分隔符 val split = new Split("_") val resultTable = sensorTables .joinLateral(split('id) as ('word, 'length)) //作個關聯,以id做爲key,拿到1個元組,定義爲world和length名字 .select('id,'ts,'word,'length) // resultTable.toRetractStream[Row].print("resultTable") /** 輸出效果: * resultTable> (true,sensor1,2020-12-13T14:43:01.121,sensor1,7) resultTable> (true,sensor2,2020-12-13T14:43:01.124,sensor2,7) resultTable> (true,sensor3,2020-12-13T14:43:01.125,sensor3,7) resultTable> (true,sensor4,2020-12-13T14:43:01.125,sensor4,7) resultTable> (true,sensor4,2020-12-13T14:43:01.125,sensor4,7) resultTable> (true,sensor4,2020-12-13T14:43:01.126,sensor4,7) */ //2. 用sql實現 tableEnv.createTemporaryView("sensor", sensorTables) tableEnv.registerFunction("split", split) val sqlResultTables = tableEnv.sqlQuery( """ |select |id,ts,word,length |from sensor,lateral table( split(id)) as splitid(word,length) |""".stripMargin) sqlResultTables.toRetractStream[Row].print("sqlResultTables") env.execute() } } //自定義一個UDF函數 //定義以傳入的字符串做爲分隔符,定義輸出一個元祖,String和Int class Split(separator: String) extends TableFunction[(String,Int)]{ def eval(str:String):Unit={ str.split(separator).foreach( wold => collect((wold, wold.length)) ) } }
代碼結構及運行效果:ide
用戶自定義聚合函數(User-Defined Aggregate Functions,UDAGGs)能夠把一個表中的數據,聚合成一個標量值函數
舉個栗子,要算全部傳感器,每一個傳感器的平均值,分別用tableapi和sql來實現,新建一個AggregateFunctionTest.scala
package com.mafei.udftest import com.mafei.sinktest.SensorReadingTest5 import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.scala._ import org.apache.flink.table.api.EnvironmentSettings import org.apache.flink.table.api.scala._ import org.apache.flink.table.functions.AggregateFunction import org.apache.flink.types.Row object AggregateFunctionTest { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) //設置1個併發 //設置處理時間爲流處理的時間 env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) // val inputStream = env.readTextFile("/opt/java2020_study/maven/flink1/src/main/resources/sensor.txt") val inputStream = env.readTextFile("D:\\java2020_study\\maven\\flink1\\src\\main\\resources\\sensor.txt") //先轉換成樣例類類型 val dataStream = inputStream .map(data => { val arr = data.split(",") //按照,分割數據,獲取結果 SensorReadingTest5(arr(0), arr(1).toLong, arr(2).toDouble) //生成一個傳感器類的數據,參數中傳toLong和toDouble是由於默認分割後是字符串類別 }) //設置環境信息(能夠不用) val settings = EnvironmentSettings.newInstance() .useBlinkPlanner() // Flink 10的時候默認是用的useOldPlanner 11就改成了BlinkPlanner .inStreamingMode() .build() // 設置flink table運行環境 val tableEnv = StreamTableEnvironment.create(env, settings) //流轉換成表 val sensorTables = tableEnv.fromDataStream(dataStream, 'id,'timestamp, 'temperature, 'tp.proctime as 'ts) //table api實現: val avgTemp = new AggTemp() val resultTable = sensorTables .groupBy('id) .aggregate(avgTemp('temperature) as 'tempAvg) .select('id,'tempAvg) resultTable.toRetractStream[Row].print("resultTable") //sql實現 //註冊表 tableEnv.createTemporaryView("sensor", sensorTables) //註冊函數 tableEnv.registerFunction("avgTemp", avgTemp) val sqlResult = tableEnv.sqlQuery( """ |select id,avgTemp(temperature) as tempAvg |from sensor |group by id |""".stripMargin ) sqlResult.toRetractStream[Row].print("sqlResult") env.execute() } } //定義一個類,存儲聚合狀態,若是不設置,在AggregateFunction 傳入的第二個值就是(Double, Int) 溫度的總數和溫度的數量 class AggTempAcc{ var sum: Double = 0.0 var count: Int = 0 } //自定義一個聚合函數,求每一個傳感器的平均溫度值,保存狀態(tempSum,tempCount) //傳入的第一個Double是最終的返回值,這裏求的是平均值,因此是Double //第二個傳入的是中間狀態存儲的值,須要求平均值,那就須要保存全部溫度加起來的總溫度和溫度的數量(多少個),那就是(Double,Int) // 若是不傳AggTempAcc ,那就傳入(Double,Int)同樣的效果 class AggTemp extends AggregateFunction[Double,AggTempAcc]{ override def getValue(acc: AggTempAcc): Double = acc.sum / acc.count // override def createAccumulator(): (Double, Int) = (0.0,0) override def createAccumulator(): AggTempAcc = new AggTempAcc //還要實現一個具體的處理計算函數, accumulate(父方法),具體計算的邏輯, def accumulate(acc:AggTempAcc, temp:Double): Unit={ acc.sum += temp acc.count += 1 } }
#表聚合函數(Table Aggregate Functions)
用戶定義的表聚合函數(User-Defined Table Aggregate Functions UDTAGGs),能夠把一個表中數據,聚合爲具備多行和多列的結果表
用戶定義表聚合函數,是經過繼承TablAggregateFunction 抽象類來實現的
輸入和輸出都是一張表,應用場景能夠用在相似top10等這種場景,要輸出多行值的狀況
AggregationFunction必需要實現的方法:
---- createAccumulator()
---- accumlate()
---- emitValue()
TableAggregateFunction的工做原理:
舉個栗子, 使用表聚合函數實現一個對全部傳感器top n的場景
package com.mafei.udftest import com.mafei.sinktest.SensorReadingTest5 import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.scala._ import org.apache.flink.table.api.EnvironmentSettings import org.apache.flink.table.api.scala._ import org.apache.flink.table.functions.TableAggregateFunction import org.apache.flink.types.Row import org.apache.flink.util.Collector object TableAggregateFunctionTest { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) //設置1個併發 //設置處理時間爲流處理的時間 env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) val inputStream = env.readTextFile("/opt/java2020_study/maven/flink1/src/main/resources/sensor.txt") // val inputStream = env.readTextFile("D:\\java2020_study\\maven\\flink1\\src\\main\\resources\\sensor.txt") //先轉換成樣例類類型 val dataStream = inputStream .map(data => { val arr = data.split(",") //按照,分割數據,獲取結果 SensorReadingTest5(arr(0), arr(1).toLong, arr(2).toDouble) //生成一個傳感器類的數據,參數中傳toLong和toDouble是由於默認分割後是字符串類別 }) //設置環境信息(能夠不用) val settings = EnvironmentSettings.newInstance() .useBlinkPlanner() // Flink 10的時候默認是用的useOldPlanner 11就改成了BlinkPlanner .inStreamingMode() .build() // 設置flink table運行環境 val tableEnv = StreamTableEnvironment.create(env, settings) //流轉換成表 val sensorTables = tableEnv.fromDataStream(dataStream, 'id,'timestamp, 'temperature, 'tp.proctime as 'ts) //一、使用table api方式實現 val top2Temp = new Top2Temp() val resultTable = sensorTables .groupBy('id) .flatAggregate(top2Temp('temperature) as ('temp, 'rank)) .select('id,'temp,'rank) // resultTable.toAppendStream[Row].print() //表聚合中間有更改,因此不能直接用toAppendStream resultTable.toRetractStream[Row].print("table aggregate") /** * 輸出效果: * (true,sensor1,1.0,1) (true,sensor1,-1.7976931348623157E308,2) (true,sensor2,42.0,1) (true,sensor2,-1.7976931348623157E308,2) (true,sensor3,43.0,1) (true,sensor3,-1.7976931348623157E308,2) (true,sensor4,40.1,1) (true,sensor4,-1.7976931348623157E308,2) (false,sensor4,40.1,1) (false,sensor4,-1.7976931348623157E308,2) (true,sensor4,40.1,1) (true,sensor4,20.0,2) (false,sensor4,40.1,1) (false,sensor4,20.0,2) (true,sensor4,40.2,1) (true,sensor4,40.1,2) */ env.execute("表聚合函數-取每一個傳感器top2") } } //定義要輸出的結構 class Top2TempAcc{ var highestTemp: Double = Double.MinValue var secondHighestTemp: Double = Double.MinValue } // 自定義表聚合函數,提取全部溫度值中最高的兩個溫度,輸出(temp,rank) class Top2Temp extends TableAggregateFunction[(Double,Int),Top2TempAcc]{ override def createAccumulator(): Top2TempAcc = new Top2TempAcc() //實現計算聚合結果的函數 accumulate // 第一個參數是 accumulate,第二個是當前作聚合傳入的參數是什麼,這裏只須要把溫度傳入就能夠(Double) def accumulate(acc: Top2TempAcc, temp : Double): Unit={ // 要判斷當前溫度值,是否比狀態中保存的溫度值大 //第一步先判斷溫度是否是比最大的都大 if(temp > acc.highestTemp){ //若是比最高溫度還高,那排在第一,原來的第一高移動到第二高 acc.secondHighestTemp = acc.highestTemp acc.highestTemp = temp } else if(temp > acc.secondHighestTemp){ //這種是比最高的小,比第二高的大,那就直接把第二高換成當前溫度值 acc.secondHighestTemp = temp } } //再實現一個輸出結果的方法,最終處理完表中全部數據時調用 def emitValue(acc: Top2TempAcc,out: Collector[(Double, Int)]): Unit ={ out.collect((acc.highestTemp,1)) out.collect((acc.secondHighestTemp,2)) } } sensor.txt內容: sensor1,1603766281,1 sensor2,1603766282,42 sensor3,1603766283,43 sensor4,1603766240,40.1 sensor4,1603766284,20 sensor4,1603766249,40.2
代碼結構及運行效果圖: