Flink Table API&SQL的概念和通用API

Table API和SQL經過join API集成在一塊兒,這個join API的核心概念是Table,Table能夠做爲查詢的輸入和輸出。這篇文檔展現了使用Table API和SQL查詢的程序的通用結構,如何註冊一個Table,如何查詢一個Table以及如何將數據發給Table。html

Table API和SQL查詢程序的結構

全部批處理和流處理的Table API、SQL程序都有以下相同的模式,下面例子的代碼展現了Table API和SQL程序的通用結構:react

// 對於批處理程序來講使用 ExecutionEnvironment 來替換 StreamExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 建立一個TableEnvironment
// 對於批處理程序來講使用 BatchTableEnvironment 替換 StreamTableEnvironment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// 註冊一個 Table
tableEnv.registerTable("table1", ...)            // 或者
tableEnv.registerTableSource("table2", ...);     // 或者
tableEnv.registerExternalCatalog("extCat", ...);

// 從Table API的查詢中建立一個Table
Table tapiResult = tableEnv.scan("table1").select(...);
// 從SQL查詢中建立一個Table
Table sqlResult  = tableEnv.sql("SELECT ... FROM table2 ... ");

// 將Table API 種的結果 Table 發射到TableSink中 , SQL查詢也是同樣的
tapiResult.writeToSink(...);

// 執行
env.execute();

注意:Table API 和 SQL查詢能夠輕易地進行集成並嵌入到DataStream或者DataSet程序中,請參考Integration With DataStream and DataSet API部分來了解DataStream和DataSet如何轉換成Table及Table如何轉換成DataStream和DataSet。sql

建立一個TableEnvironment

TableEnvironment是Table API和SQL集成的核心概念,它主要負責:
  一、在內部目錄中註冊一個Table
  二、註冊一個外部目錄
  三、執行SQL查詢
  四、註冊一個用戶自定義函數(標量、表及聚合)
  五、將DataStream或者DataSet轉換成Table
  六、持有ExecutionEnvironment或者StreamExecutionEnvironment的引用
一個Table老是會綁定到一個指定的TableEnvironment中,相同的查詢不一樣的TableEnvironment是沒法經過join、union合併在一塊兒。數據庫

TableEnvironment能夠經過調用帶有參數StreamExecutionEnvironment或者ExecutionEnvironment和一個可選參數TableConfig的靜態方法TableEnvironment.getTableEnvironment()來建立。TableConf能夠用來配置TableEnvironment或者自定義查詢優化器和翻譯過程(參考查詢優化器)apache

// ***************
// STREAMING QUERY
// ***************
StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
// 爲streaming查詢建立一個 TableEnvironment
StreamTableEnvironment sTableEnv = TableEnvironment.getTableEnvironment(sEnv);

// ***********
// BATCH QUERY
// ***********
ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
// 爲批查詢建立一個 TableEnvironment
BatchTableEnvironment bTableEnv = TableEnvironment.getTableEnvironment(bEnv);
// ***************
// STREAMING QUERY
// ***************
val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
// 爲流查詢建立一個 TableEnvironment
val sTableEnv = TableEnvironment.getTableEnvironment(sEnv)

// ***********
// BATCH QUERY
// ***********
val bEnv = ExecutionEnvironment.getExecutionEnvironment
// 爲批查詢建立一個 TableEnvironment
val bTableEnv = TableEnvironment.getTableEnvironment(bEnv)

在Catalog(目錄)中註冊一個Table

TableEnvironment有一個在內部經過表名組織起來的表目錄,Table API或者SQL查詢能夠訪問註冊在目錄中的表,並經過名稱來引用它們。
TableEnvironment容許經過各類源來註冊一個表:
  一、一個已存在的Table對象,一般是Table API或者SQL查詢的結果
  二、TableSource,能夠訪問外部數據如文件、數據庫或者消息系統
  三、DataStream或者DataSet程序中的DataStream或者DataSetapi

將DataStream或者DataSet註冊爲一個表將在Integration With DataStream and DataSet API中討論。安全

註冊一個Table

一個Table能夠在TableEnvironment中按照下面程序註冊:app

// 獲取一個 StreamTableEnvironment,  BatchTableEnvironment也是一樣的方法
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// Table 是簡單的投影查詢的結果 
Table projTable = tableEnv.scan("X").project(...);

// 將 Table projTable 註冊爲表 "projectedX"
tableEnv.registerTable("projectedTable", projTable);
// 獲取一個TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// Table 是簡單的投影查詢的結果 
val projTable: Table = tableEnv.scan("X").project(...)

// 將 Table projTable 註冊爲表 "projectedX"
tableEnv.registerTable("projectedTable", projTable)

注意:一個註冊的Table被當作是與關係型數據庫中的視圖相似,即定義Table的查詢不會被優化,可是當其餘查詢引用到已註冊的Table時會被內聯。若是多個查詢引用同一個已註冊的Table,這個Table會跟每一個查詢內聯並進行屢次執行,即:已註冊的Table的結果不會共享。函數

註冊一個TableSource

TableSource能夠訪問保存在外部存儲系統如數據庫系統(MySQL、HBase...),指定編碼格式的文件(CSV, Apache [Parquet, Avro, ORC],...)或者消息系統(Apache Kafka,RabbitMQ,...)中的數據。測試

Flink的目標是爲通用的數據格式和存儲系統提供TableSource,請參考Table Sources和Sinks頁來了解Flink所支持的TableSource列表及如何自定義一個TableSource。

一個TableSource能夠在TableEnvironment中按以下方式來定義:

// 獲取一個StreamTableEnvironment, 一樣適用於BatchTableEnvironment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// 建立一個 TableSource
TableSource csvSource = new CsvTableSource("/path/to/file", ...);

// 將TableSource註冊爲表 "CsvTable"
tableEnv.registerTableSource("CsvTable", csvSource);
// 獲取一個 TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// 建立一個TableSource
val csvSource: TableSource = new CsvTableSource("/path/to/file", ...)

// 將 TableSource 註冊爲表 "CsvTable"
tableEnv.registerTableSource("CsvTable", csvSource)

註冊一個外部Catalog(目錄)

一個外部目錄提供了關於外部數據庫和表的信息如:它們的名稱、模式、統計及如何訪問保存在外部數據庫、表和文件中的數據。

一個外部目錄能夠經過實現ExternalCatalog接口來建立並在TableEnvironment中註冊,以下:

// 獲取一個 StreamTableEnvironment, 一樣適用於 BatchTableEnvironment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// 建立一個外部catalog
ExternalCatalog catalog = new InMemoryExternalCatalog();

// 註冊 ExternalCatalog
tableEnv.registerExternalCatalog("InMemCatalog", catalog);
// 獲取一個 TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// 建立一個 catalog
val catalog: ExternalCatalog = new InMemoryExternalCatalog

// 註冊 ExternalCatalog
tableEnv.registerExternalCatalog("InMemCatalog", catalog)

一旦在TableEnvironment中註冊以後,全部定義在ExternalCatalog中的表均可以經過指定全路徑如:catalog.database.table 在Table API或者SQL查詢來訪問。

目前,Flink提供InMemoryExternalCatalog來作demo或者測試。然而,ExternalCatalog接口還能夠被用來鏈接HCatalog或者Metastore到Table API。

查詢一個Table

Table API

Table API是一個Scala和Java的語言集成查詢API,與SQL相反,查詢並不指定爲字符串而是根據主機語言一步一步的構建。

Table API是基於Table類來的,Table類表明了一個流或者批表,而且提供方法來使用關係型操做。這些方法返回一個新的Table對象,這個Table對象表明着輸入的Table應用關係型操做後的結果。一些關係型操做是由多個方法調用組成的如:table.groupBy(...).select(), 其中groupBy(...)指定了table的分組,而select(...)則是table分組的映射。

Table API文檔描述了streaming和batch表所支持的全部Table API操做。

下面的例子展現了一個簡單的Table API聚合查詢:

// 獲取一個 StreamTableEnvironment, 一樣適用於 BatchTableEnvironment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// 註冊一個名叫 Orders 的表

// 掃描註冊的 Orders 表
Table orders = tableEnv.scan("Orders");
// 計算全部來自法國的客戶的收入
Table revenue = orders
  .filter("cCountry === 'FRANCE'")
  .groupBy("cID, cName")
  .select("cID, cName, revenue.sum AS revSum");

// 發射或者轉換一個 Table
// 執行查詢
// 獲取一個 TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// 註冊一個名叫 Orders 的表

// 掃描已註冊的 Orders 表
Table orders = tableEnv.scan("Orders")
// 計算全部來自法國偶的客戶的收入
Table revenue = orders
  .filter('cCountry === "FRANCE")
  .groupBy('cID, 'cName)
  .select('cID, 'cName, 'revenue.sum AS 'revSum)

// 發射或者轉換一個Table
// 執行查詢

注意:Scala Table API使用Scala的符號在引用表屬性時,以'`'開始,Table API使用Scala的隱式轉換,爲了使用Scala的隱式轉換,請確保導入org.apache.flink.api.scala._org.apache.flink.table.api.scala._

SQL

Flink的SQL集成是基於Apache Calcite的,Apache Calcite實現了標準的SQL,SQL查詢被指定爲常規字符串。

SQL文檔描述了Flink對流和批表的SQL支持。
下面的例子展現瞭如何指定一個查詢並返回一個Table結果;

// 獲取一個 StreamTableEnvironment, 一樣適用於BatchTableEnvironment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// 註冊一個名叫Orders 的表

// 計算全部來自法國的客戶的收入
Table revenue = tableEnv.sql(
    "SELECT cID, cName, SUM(revenue) AS revSum " +
    "FROM Orders " +
    "WHERE cCountry = 'FRANCE' " +
    "GROUP BY cID, cName"
  );

// 發射或者轉換一個Table
// 執行查詢
// 獲取一個 TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

//註冊一個名叫 Orders的表

// 計算全部來自法國的客戶的收入
Table revenue = tableEnv.sql(""" 
  |SELECT cID, cName, SUM(revenue) AS revSum
  |FROM Orders
  |WHERE cCountry = 'FRANCE'
  |GROUP BY cID, cName
  """.stripMargin)

// 發射或者轉換 Table
// 執行查詢

混合使用Table API和SQL

Table API和SQL查詢能夠很容易地合併由於它們都返回Table對象:
  一、Table API查詢能夠基於SQL查詢結果的Table來進行
  二、SQL查詢能夠基於Table API查詢的結果來定義

發射一個Table

爲了發射一個Table,能夠將其寫入一個TableSink中,TableSink 是支持各類文件格式(如:CSV, Apache Parquet, Apache Avro)、存儲系統(如:JDBC, Apache HBase, Apache Cassandra, Elasticsearch)或者消息系統(如:Apache Kafka,RabbitMQ)的通用接口。

一個批Table只能寫入BatchTableSink中,而流Table須要一個AppendStreamTableSinkRetractStreamTableSink或者UpsertStreamTableSink

請參考Table Sources & Sinks文檔來了解更多可用sink的信息和如何實現一個自定義的TableSink。

// 獲取一個StreamTableEnvironment, 一樣適用於 BatchTableEnvironment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// 使用Table API和/或SQL查詢獲取一個 Table
Table result = ...

// 建立一個TableSink
TableSink sink = new CsvTableSink("/path/to/file", fieldDelim = "|");

// 將結果Table寫入TableSink中
result.writeToSink(sink);

// 執行程序
// 獲取一個TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// 使用Table API和/或SQL查詢獲取一個 Table
val result: Table = ...

//建立一個 TableSink
val sink: TableSink = new CsvTableSink("/path/to/file", fieldDelim = "|")

// 將結果 Table寫入TableSink中
result.writeToSink(sink)

// 執行程序

翻譯和執行一個查詢

Table API和SQL查詢根據輸入是流仍是批翻譯成DataStream或者DataSet,查詢內部表示爲一個邏輯查詢計劃,並分兩個階段進行翻譯:
  一、優化邏輯計劃
  二、翻譯成一個DataStream或者DataSet程序

Table API或者SQL查詢會在下面狀況下觸發:
  當調用Table.writeToSink()時,Table會發射到TableSink中
  Table轉換DataStream或者DataSet時(參考與DataStream和DataSet API集成)
一旦翻譯,Table API或者SQL查詢就會像常規DataStream或DataSet處理同樣,而且當StreamExecutionEnvironment.execute()或者ExecutionEnvironment.execute()調用時執行。

與DataStream和DataSet API集成

Table API和SQL查詢能夠很容易地進行集成並嵌入到DataStreamDataSet程序中。例如:咱們能夠查詢一個外部表(如:來自關係型數據庫的表)、作一些預處理,如過濾、映射、聚合或者與元數據關聯,而後使用DataStream或者DataSet API(及其餘基於這些API的庫,如CEP或Gelly)進行進一步處理。一樣,Table API或者SQL查詢也能夠應用於DataStream或者DataSet程序的結果中。

這種交互能夠經過將DataStream或者DataSet轉換成一個Table及將Table轉換成DataStream或者DataSet來實現。在本節,咱們將描述這些轉換是如何完成的。

Scala 隱式轉換

Scala Table API爲DataSet、DataStream和Table類提供了隱式轉換功能。這些轉換能夠經過導入Scala DataStream API中的org.apache.flink.table.api.scala._org.apache.flink.api.scala._包來啓用。

註冊一個DataStream或者DataSet爲Table

一個DataStream或者DataSet能夠在TableEnvironment中註冊爲Table,表的結果模式根據註冊的DataStream或者DataSet的數據類型來定。請參考數據類型映射到表模式來了解更詳細的信息。

// 獲取 StreamTableEnvironment
// 註冊一個DataSet 到BatchTableEnvironment也是等效的
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

DataStream<Tuple2<Long, String>> stream = ...

// 註冊DataStream 爲表  "myTable" ,並有兩個字段 "f0", "f1"
tableEnv.registerDataStream("myTable", stream);

// 註冊 DataStream 爲表 "myTable2" 並有兩個字段 "myLong", "myString"
tableEnv.registerDataStream("myTable2", stream, "myLong, myString");
// 獲取 TableEnvironment 
// 註冊一個 DataSet 是等價的
val tableEnv = TableEnvironment.getTableEnvironment(env)

val stream: DataStream[(Long, String)] = ...

// 註冊 DataStream 爲表 "myTable" 並有兩個字段 "f0", "f1"
tableEnv.registerDataStream("myTable", stream)

// 註冊 DataStream 爲 "myTable2" 並有兩個字段 "myLong", "myString"
tableEnv.registerDataStream("myTable2", stream, 'myLong, 'myString)

將Table轉換爲DataStream或者DataSet

Table能夠轉換爲DataStream或者DataSet,這樣的話,自定義的DataStream或者DataSet程序就能夠基於Table API或者SQL查詢的結果來執行了。

當將一個Table轉換爲DataStream或者DataSet時,你須要指定生成的DataStream或者DataSet的數據類型,即須要轉換表的行的數據類型,一般最方便的轉換類型是Row,下面列表概述了不一樣選項的功能:
  一、Row:字段經過位置映射、能夠是任意數量字段,支持空值,非類型安全訪問
  二、POJO:字段經過名稱(POJO字段做爲Table字段時,必須命名)映射,能夠是任意數量字段,支持空值,類型安全訪問
  三、Case Class:字段經過位置映射,不支持空值,類型安全訪問
  四、Tuple:字段經過位置映射,不得多於22(Scala)或者25(Java)個字段,不支持空值,類型安全訪問
  五、Atomic Type:Table必須有一個字段,不支持空值,類型安全訪問。

將Table轉換爲DataStream

流式查詢的結果Table會被動態地更新,即每一個新的記錄到達輸入流時結果就會發生變化。所以,轉換此動態查詢的DataStream須要對錶的更新進行編碼。

有兩種模式來將Table轉換爲DataStream:
  一、Append Mode:這種模式只適用於當動態表僅由INSERT更改修改時,即僅附加,以前發射的結果不會被更新。
  二、Retract Mode:始終均可以使用此模式,它使用一個boolean標識來編碼INSERT和DELETE更改。

// 獲取一個 StreamTableEnvironment. 
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

//  有兩個字段(String name, Integer age)的Table
Table table = ...

// 經過指定類將Table轉換爲Row的Append DataStream
DataStream<Row> dsRow = tableEnv.toAppendStream(table, Row.class);

// 經過一個TypeInformation將Table轉換爲Tuple2<String, Integer> 類型的Append DataStream  
TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(
  Types.STRING(),
  Types.INT());
DataStream<Tuple2<String, Integer>> dsTuple = 
  tableEnv.toAppendStream(table, tupleType);

// 將Table轉換爲Row的react形式的DataStream
//   一個reactDataStream的類型X爲 DataStream<Tuple2<Boolean, X>>. 
//   boolean字段指定了更改的類型. 
//   True 是 INSERT, false 是 DELETE.
DataStream<Tuple2<Boolean, Row>> retractStream = 
  tableEnv.toRetractStream(table, Row.class);
// get TableEnvironment. 
// registration of a DataSet is equivalent
val tableEnv = TableEnvironment.getTableEnvironment(env)

// Table with two fields (String name, Integer age)
val table: Table = ...

// convert the Table into an append DataStream of Row
val dsRow: DataStream[Row] = tableEnv.toAppendStream[Row](table)

// convert the Table into an append DataStream of Tuple2[String, Int]
val dsTuple: DataStream[(String, Int)] dsTuple = 
  tableEnv.toAppendStream[(String, Int)](table)

// convert the Table into a retract DataStream of Row.
//   A retract stream of type X is a DataStream[(Boolean, X)]. 
//   The boolean field indicates the type of the change. 
//   True is INSERT, false is DELETE.
val retractStream: DataStream[(Boolean, Row)] = tableEnv.toRetractStream[Row](table)

注意:有關動態表及其屬性的詳細討論在Streaming Queries文檔中給出。

將Table轉換爲DataSet

Table能夠按照以下方式轉換爲DataSet:

// 獲取 BatchTableEnvironment
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// 有兩個字段(String name, Integer age)的Table
Table table = ...

// 經過指定類將Table轉換爲Row類型的DataSet
DataSet<Row> dsRow = tableEnv.toDataSet(table, Row.class);

// 經過TypeInformation 將Table轉換爲Tuple2<String, Integer>類型的DataSet
TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(
  Types.STRING(),
  Types.INT());
DataStream<Tuple2<String, Integer>> dsTuple = 
  tableEnv.toAppendStream(table, tupleType);
// 獲取 TableEnvironment 
val tableEnv = TableEnvironment.getTableEnvironment(env)

// 有兩個字段(String name, Integer age)的Table
val table: Table = ...

// 將Table轉換爲Row類型的DataSet
val dsRow: DataSet[Row] = tableEnv.toDataSet[Row](table)

// 將Table轉換爲Tuple2[String, Int]類型的DataSet
val dsTuple: DataSet[(String, Int)] = tableEnv.toDataSet[(String, Int)](table)

數據類型映射到表模式

Flink的DataStream和DataSet API支持多種數據類型,如Tuple,POJO, case class及原始數據類型。接下來咱們描述Table API如何將這些類型轉換爲內部行表示及展現將DataStream轉換爲Table的例子。

原子類型

Flink將原生類型(如:Integer, Double, String)或者通用類型(不能再被分析或者分解的類型)視爲原子類型,一個原子類型的DataStream或者DataSet能夠轉換爲只有一個屬性的Table,屬性的類型根據原子類型推算,而且必須得指定屬性的名稱。

// 獲取一個 StreamTableEnvironment, 一樣原理適用於 BatchTableEnvironment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

DataStream<Long> stream = ...
// 將 DataStream轉換爲具備屬性"myLong"的Table
Table table = tableEnv.fromDataStream(stream, "myLong");
// 獲取一個 TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

val stream: DataStream[Long] = ...
// 將 DataStream 轉換爲具備屬性'myLong的Table
val table: Table = tableEnv.fromDataStream(stream, 'myLong)

Tuple(Java和Scala都支持)和Case Class(僅Scala支持)

Flink支持Scala內置的Tuple和Flink爲Java提供的Tuple,DataStream和DataSet類型的Tuple均可以被轉換爲表。字段能夠經過爲全部字段(經過位置來映射)提供的名稱來重命名,若是沒有爲字段指定名稱的話,就會採用默認的字段名。

// 獲取一個 StreamTableEnvironment, 一樣適用於BatchTableEnvironment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

DataStream<Tuple2<Long, String>> stream = ...

// 將 DataStream爲具備字段名爲"myLong", "myString"的Table
Table table1 = tableEnv.fromDataStream(stream, "myLong, myString");

// 將 DataStream 轉換爲具備默認字段名 "f0", "f1"的 Table
Table table2 = tableEnv.fromDataStream(stream);
//獲取一個 TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

val stream: DataStream[(Long, String)] = ...

// 將 DataStream 轉換爲具備字段名 'myLong, 'myString' 的Table
val table1: Table = tableEnv.fromDataStream(stream, 'myLong, 'myString)

// 將 DataStream 轉換爲具備默認字段名 '_1, '_2的Table
val table2: Table = tableEnv.fromDataStream(stream)

// 定義一個 case class
case class Person(name: String, age: Int)
val streamCC: DataStream[Person] = ...

// 將 DataStream 轉換爲具備默認字段名 'name, 'age'的Table
val tableCC1 = tableEnv.fromDataStream(streamCC)

// 將 DataStream 轉換爲具備字段名 'myName, 'myAge'的Table
val tableCC1 = tableEnv.fromDataStream(streamCC, 'myName, 'myAge)

POJO(Java 和 Scala)

Flink支持使用POJO做爲複合類型,決定POJO規則的文檔請參考這裏

當將一個POJO類型的DataStream或者DataSet轉換爲Table而不指定字段名稱時,Table的字段名稱將採用JOPO原生的字段名稱做爲字段名稱。重命名原始的POJO字段須要關鍵字AS,由於POJO沒有固定的順序,名稱映射須要原始名稱而且不能經過位置來完成。

//獲取一個 StreamTableEnvironment, 一樣原理適用於 BatchTableEnvironment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// Person 是一個有兩個字段"name" and "age" 的POJO
DataStream<Person> stream = ...

// 將 DataStream 轉換爲有字段 "name", "age" 的Table
Table table1 = tableEnv.fromDataStream(stream);

// 將 DataStream 轉換爲有字段 "myName", "myAge" 的Table
Table table2 = tableEnv.fromDataStream(stream, "name as myName, age as myAge");
// 獲取一個 TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// Person 是一個有字段 "name" and "age" 的POJO
val stream: DataStream[Person] = ...

// 將 DataStream 轉換爲具備字段 'name, 'age' 的Table
val table1: Table = tableEnv.fromDataStream(stream)

// 將 DataStream 轉換爲具備字段 'myName, 'myAge' 的Table
val table2: Table = tableEnv.fromDataStream(stream, 'name as 'myName, 'age as 'myAge)

Row

Row數據類型支持任意數量的字段,而且字段能夠是null值,字段名稱能夠經過RowTypeInformation來指定或者將一個Row DataStream或者DataSet轉換爲Table時(根據位置)指定。

// 獲取一個 StreamTableEnvironment, 一樣原理適用於 BatchTableEnvironment
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// 在`RowTypeInfo`中指定字段"name" and "age"的Row類型DataStream
DataStream<Row> stream = ...

// 將 DataStream 轉換爲具備字段 "name", "age" 的Table
Table table1 = tableEnv.fromDataStream(stream);

// 將 DataStream 轉換爲具備字段 "myName", "myAge" 的Table
Table table2 = tableEnv.fromDataStream(stream, "myName, myAge");
// 獲取一個 TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// 在`RowTypeInfo`中指定字段"name" and "age"的Row類型DataStream
val stream: DataStream[Row] = ...

// 將 DataStream 轉換爲具備字段 'name, 'age' 的Table
val table1: Table = tableEnv.fromDataStream(stream)

// 將 DataStream 轉換爲具備字段 'myName, 'myAge' 的Table
val table2: Table = tableEnv.fromDataStream(stream, 'myName, 'myAge)

查詢優化

Apache Flink使用Apache Calcite來優化和翻譯查詢,當前的查詢優化包括投影、過濾下推、子查詢去相關及各類形式的查詢重寫。Flink不去優化join的順序,可是會根據它們的順序去執行(FROM子句中表的順序或者WHERE子句中鏈接謂詞的順序)。

能夠經過提供一個CalciteConfig對象來調整在不一樣階段應用的優化規則集,這個能夠經過調用CalciteConfig.createBuilder())得到的builder來建立,而且能夠經過調用tableEnv.getConfig.setCalciteConfig(calciteConfig)來提供給TableEnvironment。

解析一個Table

Table API爲計算一個Table提供了一個機制來解析邏輯和優化查詢計劃,這個能夠經過調用TableEnvironment.explain(table)方法來完成,它會返回描述三個計劃的字符串:
  一、關係查詢語法樹,即未優化的查詢計劃
  二、優化後的邏輯查詢計劃
  三、物理執行計劃

如下代碼顯示了一個示例和相應的輸出:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

DataStream<Tuple2<Integer, String>> stream1 = env.fromElements(new Tuple2<>(1, "hello"));
DataStream<Tuple2<Integer, String>> stream2 = env.fromElements(new Tuple2<>(1, "hello"));

Table table1 = tEnv.fromDataStream(stream1, "count, word");
Table table2 = tEnv.fromDataStream(stream2, "count, word");
Table table = table1
  .where("LIKE(word, 'F%')")
  .unionAll(table2);

String explanation = tEnv.explain(table);
System.out.println(explanation);
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
val table = table1
  .where('word.like("F%"))
  .unionAll(table2)

val explanation: String = tEnv.explain(table)
println(explanation)

輸出以下:

== Abstract Syntax Tree ==
LogicalUnion(all=[true])
  LogicalFilter(condition=[LIKE($1, 'F%')])
    LogicalTableScan(table=[[_DataStreamTable_0]])
  LogicalTableScan(table=[[_DataStreamTable_1]])

== Optimized Logical Plan ==
DataStreamUnion(union=[count, word])
  DataStreamCalc(select=[count, word], where=[LIKE(word, 'F%')])
    DataStreamScan(table=[[_DataStreamTable_0]])
  DataStreamScan(table=[[_DataStreamTable_1]])

== Physical Execution Plan ==
Stage 1 : Data Source
  content : collect elements with CollectionInputFormat

Stage 2 : Data Source
  content : collect elements with CollectionInputFormat

  Stage 3 : Operator
    content : from: (count, word)
    ship_strategy : REBALANCE

    Stage 4 : Operator
      content : where: (LIKE(word, 'F%')), select: (count, word)
      ship_strategy : FORWARD

      Stage 5 : Operator
        content : from: (count, word)
        ship_strategy : REBALANCE
相關文章
相關標籤/搜索