Flink從入門到真香(1七、Flink 重磅功能 Table API(Flink SQL))

Flink對批處理和流處理,提供了統一的上層API
Table API是一套內嵌在java和scala語言中的查詢api,它容許以很是直觀的方式組合來自一些關係運算符的查詢
Flink的sql支持基於實現了sql標準的Apache calcitejava

Flink從入門到真香(1七、Flink 重磅功能 Table API(Flink SQL))

先來個栗子感覺下: sql

demo效果: 在數據源txt中讀取,輸出id和temperature 這2個字段,按照id作篩選,輸出,分別用table api和sql來實現

在pom.xml中加入依賴apache

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.12</artifactId>
    <version>1.10.1</version>
</dependency>

<!--        也能夠不用引入下面的包,由於上面已經包含了-->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_2.12</artifactId>
    <version>1.10.1</version>
</dependency>

在tabletest包下建一個Example object:api

package com.mafei.apitest.tabletest

import com.mafei.sinktest.SensorReadingTest5
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.scala._

object Example {
  def main(args: Array[String]): Unit = {
    //建立執行環境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.getConfig.setAutoWatermarkInterval(200) //直接全局設置watermark的時間爲200毫秒
    val inputStream = env.readTextFile("/opt/java2020_study/maven/flink1/src/main/resources/sensor.txt")

    env.setParallelism(1)

    //先轉換成樣例類類型
    val dataStream = inputStream
      .map(data => {
        val arr = data.split(",") //按照,分割數據,獲取結果
        SensorReadingTest5(arr(0), arr(1).toLong, arr(2).toDouble) //生成一個傳感器類的數據,參數中傳toLong和toDouble是由於默認分割後是字符串類別
      })

    //首先建立表執行環境
    val tableEnv = StreamTableEnvironment.create(env)

    //基於流建立一張表
    val dataTable: Table = tableEnv.fromDataStream(dataStream)

    //調用table api進行轉換
    val resultTable = dataTable
      .select("id, temperature")
      .filter("id == 'sensor3'")

    resultTable.toAppendStream[(String,Double)].print("result")

    //第二種,直接寫sql來實現
    tableEnv.createTemporaryView("table1", dataTable)
    val sql: String = "select id, temperature from table1 where id='sensor1'"
    val resultSqlTable = tableEnv.sqlQuery(sql)
    resultSqlTable.toAppendStream[(String, Double)].print("result sql")

    env.execute("table api example")
  }

}

代碼結構及運行效果:

Flink從入門到真香(1七、Flink 重磅功能 Table API(Flink SQL))

看到效果以後再來分析結構:
Table API和SQL的程序結構,與流式處理的程序結構十分相似maven

//建立表執行環境
val tableEnv = StreamTableEnvironment.create(StreamExecutionEnvironment.getExecutionEnvironment)

//建立一張表,用於讀取數據
tableEnv.connect(....).createTemporayTable("inputTable")

//註冊一張表,用於把計算結果輸出
tableEnv.connect(....).createTemporaryTable("outputTable")

//經過Table API查詢算子,獲得一張結果表
val result = tableEnv.from("inputTable").select()

//經過sql查詢語句,獲得一張表
val sqlResult = tableEnv.sqlQuery("select id, temperature from table1 where id='sensor1'")

//將結果表寫入到輸出表中
result.insertInto("outputTable")

幾種引擎實現方式

Flink SQL有好幾種實現方式,其中blink 是阿里內部使用後來開源合併到flink的引擎,來看看幾種使用方式ide

/**
 *
 * @author mafei
 * @date 2020/11/22
 */

package com.mafei.apitest.tabletest

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
import org.apache.flink.table.api.scala._

object TableApi1 {
  def main(args: Array[String]): Unit = {
    //1 、建立環境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val tableEnv = StreamTableEnvironment.create(env)

    //1,1 基於老版本的planner的流處理
    val settings = EnvironmentSettings.newInstance()
      .useOldPlanner()
      .inStreamingMode()
      .build()
    val oldStreamTableEnv = StreamTableEnvironment.create(env, settings)

    //1.2 基於老版本的批處理環境
    val batchEnv = ExecutionEnvironment.getExecutionEnvironment

    val oldBatchTableEnv = BatchTableEnvironment.create(batchEnv)

    //1.3基於blink planner的流處理
    val blinkStreamSettings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()
    val blinkStreamTableEnv = StreamTableEnvironment.create(env, blinkStreamSettings)

    //基於blink planner的批處理
    val blinkBatchSettings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inBatchMode()
      .build()
    val blinkBatchTableEnv = TableEnvironment.create(blinkBatchSettings)

  }
}
相關文章
相關標籤/搜索