Flink提供了Table形式和DataStream兩種形式,能夠根據實際狀況本身選擇用哪些方式來實現,但實際開發過程當中可能會有需求兩種形式互相轉換,這裏介紹下操做方法java
表能夠轉換爲DataStream或DataSet,這樣自定義流處理或批處理程序就能夠繼續在Table API或SQL查詢的結果上運行了
將錶轉換爲DataStream或DataSet時,須要指定生成的數據類型,即要將表的每一行轉換成的數據類型
表做爲流式查詢的結果,是動態更新的
轉換有兩種轉換模式: 追加(Appende)模式和撤回(Retract)模式sql
查看執行計劃數據庫
Table API提供了一種機制來解釋計算表的邏輯和優化查詢計劃apache
查看執行計劃,能夠經過TableEnvironment.explain(table)方法或TableEnvironment.explain()方法完成,返回一個字符串,描述三個計劃windows
優化的邏輯查詢計劃
優化後的邏輯查詢計劃
實際執行計劃api
val explaination: String = tableEnv.explain(resultTable)
println(explaination)併發
動態表(Dynamic Tables)
動態表是Flink對流數據的Table API和SQL支持的核心概念
與批處理數據的靜態表不一樣,動態表是隨時間變化的maven
持續查詢(Continuous Query)
動態表能夠像靜態的批處理表 同樣進行查詢,查詢一個動態表會產生持續查詢(Continuous Query)
連續查詢永遠不會終止,並會生成另外一個動態表
查詢會不斷更新其動態結果表,以反映其動態輸入表上的改動ide
動態表和持續查詢的轉換過程
函數
1) 首先輸入的流會被轉換爲動態表,這個動態表只會一直追加
2)對動態表計算連續查詢,生成新的動態表
針對以前查詢的結果加一個狀態,這樣子就不用每次從頭開始查詢,提高效率
3)生成的新的動態表被轉換成流而後輸出
爲了處理帶有關係查詢的流,必須先將其轉換爲表
從概念上講,流的每一個數據記錄,都被解釋爲對結果表的插入修改操做
持續查詢會在動態表上作計算處理,並做爲結果生成新的動態表
與常規的數據庫表同樣,動態表能夠經過插入(insert)、更新(update)和刪除(delete)更改,進行持續的修改
將動態錶轉換爲流或將其寫入外部系統時,須要對這些更改進行編碼
1,僅追加流(Append-only)
Retract操做,每新增一個是insert+操做,撤回一個是delete - 操做,
這裏當Mary第一次來的時候,會有一個insert,當Mary第二次來的時候會觸發兩個操做,Insert和delete,增長一個mary2, 刪掉mary1
時間特性(Time Attributes)
基於時間的操做(好比Table API和SQL中的窗口操做),須要定義相關的時間語義和事件數據來源的信息
Table能夠提供一個邏輯上的時間字段,用於在表處理程序中,指示時間和訪問相應的時間戳
時間屬性,能夠是每一個schema的一部分。一旦定義了時間屬性,它就能夠做爲一個字段引用,而且能夠在基於時間的操做中使用
時間屬性的行爲相似於常規時間戳,能夠訪問,而且進行計算
處理時間語義下,容許表處理程序根據機器的本地時間生成結果。他是時間最簡單的概念,既不須要提取時間戳,也不須要生成watermark
幾種定義的方法:
一、由DataStream轉換成表時指定(最簡單的一種)
在定義schema期間,能夠使用.proctime,指定字段名定義處理時間字段
這個proctime屬性只能經過附加邏輯字段,來拓展物理schema,所以只能在schema定義的末尾定義它
val sensorTables = tableEnv.fromDataStream(dataStream, 'id,'temperature,'timestamp, 'pt.proctime)
增長pom.xml
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-blink_2.12</artifactId> <version>1.10.1</version> </dependency>
package com.mafei.apitest.tabletest import com.mafei.sinktest.SensorReadingTest5 import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.scala._ import org.apache.flink.table.api._ import org.apache.flink.table.api.scala._ import org.apache.flink.types.Row object TimeAndWindowTest { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) //設置1個併發 //設置處理時間爲流處理的時間 env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) val inputStream = env.readTextFile("/opt/java2020_study/maven/flink1/src/main/resources/sensor.txt") //先轉換成樣例類類型 val dataStream = inputStream .map(data => { val arr = data.split(",") //按照,分割數據,獲取結果 SensorReadingTest5(arr(0), arr(1).toLong, arr(2).toDouble) //生成一個傳感器類的數據,參數中傳toLong和toDouble是由於默認分割後是字符串類別 }) //設置環境信息(能夠不用) val settings = EnvironmentSettings.newInstance() .useBlinkPlanner() // Flink 10的時候默認是用的useOldPlanner 11就改成了BlinkPlanner .inStreamingMode() .build() // 設置flink table運行環境 val tableEnv = StreamTableEnvironment.create(env, settings) //流轉換成表 val sensorTables = tableEnv.fromDataStream(dataStream, 'id, 'timestamp,'temperature,'pt.proctime) sensorTables.printSchema() sensorTables.toAppendStream[Row].print() env.execute() } }
代碼結構及運行效果:
第二種,定義處理時間(Processing Time)
val filePath = "/opt/java2020_study/maven/flink1/src/main/resources/sensor.txt" tableEnv.connect(new FileSystem().path(filePath)) .withFormat(new Csv()) //由於txt裏頭是以,分割的跟csv同樣,因此能夠用oldCsv .withSchema(new Schema() //這個表結構要跟你txt中的內容對的上 .field("id", DataTypes.STRING()) .field("timestamp", DataTypes.BIGINT()) .field("tem", DataTypes.DOUBLE()) .field("pt", DataTypes.TIMESTAMP(3)) .proctime() //須要注意,只有輸出的sink目標裏面實現了DefineRowTimeAttributes才能用,不然會報錯,文件中不能,但kafka中是能夠用的 ).createTemporaryTable("inputTable")
第三種,定義處理時間(Processing Time)另外一種實現,必須使用blink引擎
val sinkDDlL: String = """ |create table dataTable( | id varchar(20) not null | ts bigint, | temperature double, | pt AS PROCTIME() |) with ( | 'connector.type' = 'filesystem', | 'connector.path' = '/sensor.txt', | 'format.type' = 'csv' |) |""".stripMargin tableEnv.sqlUpdate(sinkDDlL)
這種就不是flink從本地取處理時間了,而是取事件中的時間來處理
事件時間語義,容許表處理程序根據每一個記錄中包含的時間生成結果。這樣子即便在亂序事件或者延遲事件時,也能夠得到正確的結果。
爲了處理無序事件,並區分流中的準時和遲到事件;Flink須要從事件數據中,提取時間戳,並用來推動事件時間的進展
定義事件時間有3種方式
第一種,由DataStream轉換成表時指定
在DataStream轉換成Table,使用rowtime能夠定義事件時間屬性
//先轉換成樣例類類型 val dataStream = inputStream .map(data => { val arr = data.split(",") //按照,分割數據,獲取結果 SensorReadingTest5(arr(0), arr(1).toLong, arr(2).toDouble) //生成一個傳感器類的數據,參數中傳toLong和toDouble是由於默認分割後是字符串類別 }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReadingTest5](Time.seconds(1000L)) { override def extractTimestamp(t: SensorReadingTest5): Long =t.timestamp * 1000L }) //指定watermark //流轉換成表,指定處理時間-上面的實現方式 // val sensorTables = tableEnv.fromDataStream(dataStream, 'id, 'timestamp,'temperature,'pt.proctime) //將DataStream轉換爲Table,並指定事件時間字段 val sensorTables = tableEnv.fromDataStream(dataStream, 'id, 'timestamp.proctime,'temperature) //將DataStream轉換爲Table,並指定事件時間字段-直接追加字段 val sensorTables = tableEnv.fromDataStream(dataStream,"id","temperature","timestamp","rt".rowtime)
第二種,定義Table Schema時指定
val filePath = "/opt/java2020_study/maven/flink1/src/main/resources/sensor.txt" tableEnv.connect(new FileSystem().path(filePath)) .withFormat(new Csv()) //由於txt裏頭是以,分割的跟csv同樣,因此能夠用oldCsv .withSchema(new Schema() //這個表結構要跟你txt中的內容對的上 .field("id", DataTypes.STRING()) .field("tem", DataTypes.DOUBLE()) .rowtime( new Rowtime() .timestampsFromField("timestamp") //從數據字段中提取時間戳 .watermarksPeriodicBounded(2000) //watermark延遲2秒 ) ).createTemporaryTable("inputTable")
在建立表的DDL中定義
//在建立表的DDL中定義 val sinkDDlL: String = """ |create table dataTable( | id varchar(20) not null | ts bigint, | temperature double, | rt AS TO_TIMESTAMP( FROM_UNIXTIME(ts)), | watermark for rt as rt - interval '1' second //基於ts減去1秒生成watermark,也就是watermark的窗口時1秒 |) with ( | 'connector.type' = 'filesystem', | 'connector.path' = '/sensor.txt', | 'format.type' = 'csv' |) |""".stripMargin tableEnv.sqlUpdate(sinkDDlL)
時間語義須要配合窗口操做才能發揮真正的做用,
在Table ApI和SQL中,主要有兩種窗口
先定義組長什麼樣子,在根據key進行groupby,最後一步執行聚合函數
根據時間或行計數間隔,將行聚合到有限的組(Group)中,並對每一個組的數據執行一次聚合函數
Group Windows是使用window(w:GroupWindow)子句定義的,而且必須由as子句制定一個別名.
爲了按照窗口對錶進行分組,窗口的別名必須在group by子句中,像常規的分組字段同樣引用
val table = input
.window([w;: GroupWindow] as 'w) //定義窗口,別名爲w
.groupBy('w,'a) //按照字段a和窗口w分組
.select('a,'b.sum) //聚合操做
Table API提供了一組具備特定語義的預約義window類,這些類會被轉換爲底層DataStream或DataSet的窗口操做
滾動窗口要用Tumble類來定義
//定義事件時間的滾動窗口(Tumbling Event-time window
.window(Tumble over 10.minutes on 'rowtime as 'w)
//定義處理時間的滾動窗口(Tumbling Processing-time window)
.window(Tumble over 10.minutes on 'proctime as 'w)
//定義數據數量的滾動窗口(Tumbling Row-count window)
.window(Tumble over 10.rows on 'proctime as 'w)
10分鐘一個滑動窗口,每5分鐘滑動一次
//Sliding Event-time window
.window(Slide over 10.minutes every 5.minutes on 'rowtime as 'w)
//Sliding Processing-time window
.window(Slide over 10.minutes every 5.minutes on 'proctime as 'w)
//Sliding Row-count window
.window(Slide over 10.minutes every 5.rows on 'proctime as 'w)
會話窗口要用Session類來定義
//Sesion Evnet-time Window
.window(Session withGap 10.minutes on 'rowtime as 'w)
//Session Processing-time Window
.window(Session withGap 10.minutes on 'procetime as 'w)
Group Windows定義在SQL查詢的Group By子句中
TUMBLE(time_attr, interval)
定義一個滾動窗口,第一個參數是時間字段,第二個參數是窗口長度
HOP(time_attr,interval,interval)
定義的一個滑動窗口,第一個參數是時間字段,第二個參數是窗口滑動步長,第三個是窗口長度
SESSION(time_attr, interval)
定義一個會話窗口,第一個參數是時間字段,第二個參數是窗口間隔
Over Windows
針對每一個輸入行,計算相鄰行範圍內的聚合