Flink從入門到真香(2一、Table轉換DataStream及窗口)

Flink提供了Table形式和DataStream兩種形式,能夠根據實際狀況本身選擇用哪些方式來實現,但實際開發過程當中可能會有需求兩種形式互相轉換,這裏介紹下操做方法java

表能夠轉換爲DataStream或DataSet,這樣自定義流處理或批處理程序就能夠繼續在Table API或SQL查詢的結果上運行了
將錶轉換爲DataStream或DataSet時,須要指定生成的數據類型,即要將表的每一行轉換成的數據類型
表做爲流式查詢的結果,是動態更新的
轉換有兩種轉換模式: 追加(Appende)模式和撤回(Retract)模式sql

查看執行計劃數據庫

Table API提供了一種機制來解釋計算表的邏輯和優化查詢計劃apache

查看執行計劃,能夠經過TableEnvironment.explain(table)方法或TableEnvironment.explain()方法完成,返回一個字符串,描述三個計劃windows

優化的邏輯查詢計劃
優化後的邏輯查詢計劃
實際執行計劃api

val explaination: String = tableEnv.explain(resultTable)
println(explaination)併發

流處理和關係代數的區別

Flink從入門到真香(2一、Table轉換DataStream及窗口)

動態表(Dynamic Tables)
動態表是Flink對流數據的Table API和SQL支持的核心概念
與批處理數據的靜態表不一樣,動態表是隨時間變化的maven

持續查詢(Continuous Query)
動態表能夠像靜態的批處理表 同樣進行查詢,查詢一個動態表會產生持續查詢(Continuous Query)
連續查詢永遠不會終止,並會生成另外一個動態表
查詢會不斷更新其動態結果表,以反映其動態輸入表上的改動ide

動態表和持續查詢的轉換過程
Flink從入門到真香(2一、Table轉換DataStream及窗口)函數

1) 首先輸入的流會被轉換爲動態表,這個動態表只會一直追加
2)對動態表計算連續查詢,生成新的動態表
針對以前查詢的結果加一個狀態,這樣子就不用每次從頭開始查詢,提高效率

3)生成的新的動態表被轉換成流而後輸出

將流轉換成動態表

爲了處理帶有關係查詢的流,必須先將其轉換爲表
從概念上講,流的每一個數據記錄,都被解釋爲對結果表的插入修改操做

第一個場步,讀取訪問日誌,每來一條數據就插入一次

Flink從入門到真香(2一、Table轉換DataStream及窗口)

持續查詢栗子,統計每個用戶點擊了多少次

持續查詢會在動態表上作計算處理,並做爲結果生成新的動態表

Flink從入門到真香(2一、Table轉換DataStream及窗口)

最後一步,將動態錶轉換成DataStream

與常規的數據庫表同樣,動態表能夠經過插入(insert)、更新(update)和刪除(delete)更改,進行持續的修改
將動態錶轉換爲流或將其寫入外部系統時,須要對這些更改進行編碼

1,僅追加流(Append-only)

  • 僅經過插入(insert)更改來修改的動態表,能夠直接轉換爲僅追加流
    2,撤回流(Retract)
  • 撤回流是包含兩類消息的流: 添加(add)消息和撤回(Retract)消息
    3,更新插入流(Upsert)
  • Upsert流也包含兩種類型的消息: Upsert消息和刪除(Delete)消息。

將動態錶轉換成DataStream

Retract操做,每新增一個是insert+操做,撤回一個是delete - 操做,
這裏當Mary第一次來的時候,會有一個insert,當Mary第二次來的時候會觸發兩個操做,Insert和delete,增長一個mary2, 刪掉mary1

Flink從入門到真香(2一、Table轉換DataStream及窗口)
Flink從入門到真香(2一、Table轉換DataStream及窗口)

時間特性(Time Attributes)
基於時間的操做(好比Table API和SQL中的窗口操做),須要定義相關的時間語義和事件數據來源的信息
Table能夠提供一個邏輯上的時間字段,用於在表處理程序中,指示時間和訪問相應的時間戳
時間屬性,能夠是每一個schema的一部分。一旦定義了時間屬性,它就能夠做爲一個字段引用,而且能夠在基於時間的操做中使用
時間屬性的行爲相似於常規時間戳,能夠訪問,而且進行計算

定義處理時間 (Processing Time)

處理時間語義下,容許表處理程序根據機器的本地時間生成結果。他是時間最簡單的概念,既不須要提取時間戳,也不須要生成watermark

幾種定義的方法:
一、由DataStream轉換成表時指定(最簡單的一種)

在定義schema期間,能夠使用.proctime,指定字段名定義處理時間字段
這個proctime屬性只能經過附加邏輯字段,來拓展物理schema,所以只能在schema定義的末尾定義它

val sensorTables = tableEnv.fromDataStream(dataStream, 'id,'temperature,'timestamp, 'pt.proctime)

增長pom.xml

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.12</artifactId>
    <version>1.10.1</version>
</dependency>

栗子:

package com.mafei.apitest.tabletest

import com.mafei.sinktest.SensorReadingTest5
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

object TimeAndWindowTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1) //設置1個併發

    //設置處理時間爲流處理的時間
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

    val inputStream = env.readTextFile("/opt/java2020_study/maven/flink1/src/main/resources/sensor.txt")
    //先轉換成樣例類類型
    val dataStream = inputStream
      .map(data => {
        val arr = data.split(",") //按照,分割數據,獲取結果
        SensorReadingTest5(arr(0), arr(1).toLong, arr(2).toDouble) //生成一個傳感器類的數據,參數中傳toLong和toDouble是由於默認分割後是字符串類別
      })

    //設置環境信息(能夠不用)
    val settings = EnvironmentSettings.newInstance()
      .useBlinkPlanner() // Flink 10的時候默認是用的useOldPlanner 11就改成了BlinkPlanner
      .inStreamingMode()
      .build()

    // 設置flink table運行環境
    val tableEnv = StreamTableEnvironment.create(env, settings)

    //流轉換成表
    val sensorTables = tableEnv.fromDataStream(dataStream, 'id, 'timestamp,'temperature,'pt.proctime)

    sensorTables.printSchema()

    sensorTables.toAppendStream[Row].print()

    env.execute()

  }
}

代碼結構及運行效果:
Flink從入門到真香(2一、Table轉換DataStream及窗口)

第二種,定義處理時間(Processing Time)

val filePath = "/opt/java2020_study/maven/flink1/src/main/resources/sensor.txt"
tableEnv.connect(new FileSystem().path(filePath))
  .withFormat(new Csv()) //由於txt裏頭是以,分割的跟csv同樣,因此能夠用oldCsv
  .withSchema(new Schema() //這個表結構要跟你txt中的內容對的上
    .field("id", DataTypes.STRING())
    .field("timestamp", DataTypes.BIGINT())
    .field("tem", DataTypes.DOUBLE())
    .field("pt", DataTypes.TIMESTAMP(3))
    .proctime()   //須要注意,只有輸出的sink目標裏面實現了DefineRowTimeAttributes才能用,不然會報錯,文件中不能,但kafka中是能夠用的
  ).createTemporaryTable("inputTable")

第三種,定義處理時間(Processing Time)另外一種實現,必須使用blink引擎

val sinkDDlL: String =
  """
    |create table dataTable(
    | id varchar(20) not null
    | ts bigint,
    | temperature double,
    | pt AS PROCTIME()
    |) with (
    | 'connector.type' = 'filesystem',
    | 'connector.path' = '/sensor.txt',
    | 'format.type' = 'csv'
    |)
    |""".stripMargin
tableEnv.sqlUpdate(sinkDDlL)

定義事件時間(Event Time)

這種就不是flink從本地取處理時間了,而是取事件中的時間來處理
事件時間語義,容許表處理程序根據每一個記錄中包含的時間生成結果。這樣子即便在亂序事件或者延遲事件時,也能夠得到正確的結果。
爲了處理無序事件,並區分流中的準時和遲到事件;Flink須要從事件數據中,提取時間戳,並用來推動事件時間的進展
定義事件時間有3種方式
第一種,由DataStream轉換成表時指定
在DataStream轉換成Table,使用rowtime能夠定義事件時間屬性

//先轉換成樣例類類型
val dataStream = inputStream
  .map(data => {
    val arr = data.split(",") //按照,分割數據,獲取結果
    SensorReadingTest5(arr(0), arr(1).toLong, arr(2).toDouble) //生成一個傳感器類的數據,參數中傳toLong和toDouble是由於默認分割後是字符串類別
  }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReadingTest5](Time.seconds(1000L)) {
  override def extractTimestamp(t: SensorReadingTest5): Long =t.timestamp * 1000L
}) //指定watermark

    //流轉換成表,指定處理時間-上面的實現方式
//    val sensorTables = tableEnv.fromDataStream(dataStream, 'id, 'timestamp,'temperature,'pt.proctime)

    //將DataStream轉換爲Table,並指定事件時間字段
    val sensorTables = tableEnv.fromDataStream(dataStream, 'id, 'timestamp.proctime,'temperature)
    //將DataStream轉換爲Table,並指定事件時間字段-直接追加字段
    val sensorTables = tableEnv.fromDataStream(dataStream,"id","temperature","timestamp","rt".rowtime)

第二種,定義Table Schema時指定

val filePath = "/opt/java2020_study/maven/flink1/src/main/resources/sensor.txt"
tableEnv.connect(new FileSystem().path(filePath))
  .withFormat(new Csv()) //由於txt裏頭是以,分割的跟csv同樣,因此能夠用oldCsv
  .withSchema(new Schema() //這個表結構要跟你txt中的內容對的上
    .field("id", DataTypes.STRING())
    .field("tem", DataTypes.DOUBLE())
    .rowtime(
      new Rowtime()
        .timestampsFromField("timestamp") //從數據字段中提取時間戳
        .watermarksPeriodicBounded(2000) //watermark延遲2秒

    )

  ).createTemporaryTable("inputTable")

在建立表的DDL中定義

//在建立表的DDL中定義
  val sinkDDlL: String =
    """
      |create table dataTable(
      | id varchar(20) not null
      | ts bigint,
      | temperature double,
      | rt AS TO_TIMESTAMP( FROM_UNIXTIME(ts)),
      | watermark for rt as rt - interval '1' second   //基於ts減去1秒生成watermark,也就是watermark的窗口時1秒
      |) with (
      | 'connector.type' = 'filesystem',
      | 'connector.path' = '/sensor.txt',
      | 'format.type' = 'csv'
      |)
      |""".stripMargin
  tableEnv.sqlUpdate(sinkDDlL)

Flink 窗口

時間語義須要配合窗口操做才能發揮真正的做用,
在Table ApI和SQL中,主要有兩種窗口

Group Windows (分組窗口)

先定義組長什麼樣子,在根據key進行groupby,最後一步執行聚合函數

根據時間或行計數間隔,將行聚合到有限的組(Group)中,並對每一個組的數據執行一次聚合函數

Group Windows是使用window(w:GroupWindow)子句定義的,而且必須由as子句制定一個別名.
爲了按照窗口對錶進行分組,窗口的別名必須在group by子句中,像常規的分組字段同樣引用
val table = input
.window([w;: GroupWindow] as 'w) //定義窗口,別名爲w
.groupBy('w,'a) //按照字段a和窗口w分組
.select('a,'b.sum) //聚合操做

Table API提供了一組具備特定語義的預約義window類,這些類會被轉換爲底層DataStream或DataSet的窗口操做

滾動窗口(Tumbling windows)

滾動窗口要用Tumble類來定義

//定義事件時間的滾動窗口(Tumbling Event-time window
.window(Tumble over 10.minutes on 'rowtime as 'w)

//定義處理時間的滾動窗口(Tumbling Processing-time window)
.window(Tumble over 10.minutes on 'proctime as 'w)

//定義數據數量的滾動窗口(Tumbling Row-count window)
.window(Tumble over 10.rows on 'proctime as 'w)

滑動窗口(Sliding windows)

10分鐘一個滑動窗口,每5分鐘滑動一次
//Sliding Event-time window
.window(Slide over 10.minutes every 5.minutes on 'rowtime as 'w)

//Sliding Processing-time window
.window(Slide over 10.minutes every 5.minutes on 'proctime as 'w)

//Sliding Row-count window
.window(Slide over 10.minutes every 5.rows on 'proctime as 'w)

會話窗口(Session windows)

會話窗口要用Session類來定義

//Sesion Evnet-time Window
.window(Session withGap 10.minutes on 'rowtime as 'w)

//Session Processing-time Window
.window(Session withGap 10.minutes on 'procetime as 'w)

SQL中的Group Windows

Group Windows定義在SQL查詢的Group By子句中
TUMBLE(time_attr, interval)
定義一個滾動窗口,第一個參數是時間字段,第二個參數是窗口長度

HOP(time_attr,interval,interval)
定義的一個滑動窗口,第一個參數是時間字段,第二個參數是窗口滑動步長,第三個是窗口長度
SESSION(time_attr, interval)
定義一個會話窗口,第一個參數是時間字段,第二個參數是窗口間隔

Over Windows

針對每一個輸入行,計算相鄰行範圍內的聚合

相關文章
相關標籤/搜索