Table API和SQL經過join API集成在一塊兒,這個join API的核心概念是Table,Table能夠做爲查詢的輸入和輸出。這篇文檔展現了使用Table API和SQL查詢的程序的通用結構,如何註冊一個Table,如何查詢一個Table以及如何將數據發給Table。html
全部批處理和流處理的Table API、SQL程序都有以下相同的模式,下面例子的代碼展現了Table API和SQL程序的通用結構:react
// 對於批處理程序來講使用 ExecutionEnvironment 來替換 StreamExecutionEnvironment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 建立一個TableEnvironment // 對於批處理程序來講使用 BatchTableEnvironment 替換 StreamTableEnvironment StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); // 註冊一個 Table tableEnv.registerTable("table1", ...) // 或者 tableEnv.registerTableSource("table2", ...); // 或者 tableEnv.registerExternalCatalog("extCat", ...); // 從Table API的查詢中建立一個Table Table tapiResult = tableEnv.scan("table1").select(...); // 從SQL查詢中建立一個Table Table sqlResult = tableEnv.sql("SELECT ... FROM table2 ... "); // 將Table API 種的結果 Table 發射到TableSink中 , SQL查詢也是同樣的 tapiResult.writeToSink(...); // 執行 env.execute();
注意:Table API 和 SQL查詢能夠輕易地進行集成並嵌入到DataStream或者DataSet程序中,請參考Integration With DataStream and DataSet API部分來了解DataStream和DataSet如何轉換成Table及Table如何轉換成DataStream和DataSet。sql
TableEnvironment是Table API和SQL集成的核心概念,它主要負責:
一、在內部目錄中註冊一個Table
二、註冊一個外部目錄
三、執行SQL查詢
四、註冊一個用戶自定義函數(標量、表及聚合)
五、將DataStream或者DataSet轉換成Table
六、持有ExecutionEnvironment或者StreamExecutionEnvironment的引用
一個Table老是會綁定到一個指定的TableEnvironment中,相同的查詢不一樣的TableEnvironment是沒法經過join、union合併在一塊兒。數據庫
TableEnvironment能夠經過調用帶有參數StreamExecutionEnvironment或者ExecutionEnvironment和一個可選參數TableConfig的靜態方法TableEnvironment.getTableEnvironment()
來建立。TableConf能夠用來配置TableEnvironment或者自定義查詢優化器和翻譯過程(參考查詢優化器)apache
// *************** // STREAMING QUERY // *************** StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment(); // 爲streaming查詢建立一個 TableEnvironment StreamTableEnvironment sTableEnv = TableEnvironment.getTableEnvironment(sEnv); // *********** // BATCH QUERY // *********** ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment(); // 爲批查詢建立一個 TableEnvironment BatchTableEnvironment bTableEnv = TableEnvironment.getTableEnvironment(bEnv); // *************** // STREAMING QUERY // *************** val sEnv = StreamExecutionEnvironment.getExecutionEnvironment // 爲流查詢建立一個 TableEnvironment val sTableEnv = TableEnvironment.getTableEnvironment(sEnv) // *********** // BATCH QUERY // *********** val bEnv = ExecutionEnvironment.getExecutionEnvironment // 爲批查詢建立一個 TableEnvironment val bTableEnv = TableEnvironment.getTableEnvironment(bEnv)
TableEnvironment有一個在內部經過表名組織起來的表目錄,Table API或者SQL查詢能夠訪問註冊在目錄中的表,並經過名稱來引用它們。
TableEnvironment容許經過各類源來註冊一個表:
一、一個已存在的Table對象,一般是Table API或者SQL查詢的結果
二、TableSource,能夠訪問外部數據如文件、數據庫或者消息系統
三、DataStream或者DataSet程序中的DataStream或者DataSetapi
將DataStream或者DataSet註冊爲一個表將在Integration With DataStream and DataSet API中討論。安全
一個Table能夠在TableEnvironment中按照下面程序註冊:app
// 獲取一個 StreamTableEnvironment, BatchTableEnvironment也是一樣的方法 StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); // Table 是簡單的投影查詢的結果 Table projTable = tableEnv.scan("X").project(...); // 將 Table projTable 註冊爲表 "projectedX" tableEnv.registerTable("projectedTable", projTable); // 獲取一個TableEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env) // Table 是簡單的投影查詢的結果 val projTable: Table = tableEnv.scan("X").project(...) // 將 Table projTable 註冊爲表 "projectedX" tableEnv.registerTable("projectedTable", projTable)
注意:一個註冊的Table被當作是與關係型數據庫中的視圖相似,即定義Table的查詢不會被優化,可是當其餘查詢引用到已註冊的Table時會被內聯。若是多個查詢引用同一個已註冊的Table,這個Table會跟每一個查詢內聯並進行屢次執行,即:已註冊的Table的結果不會共享。函數
TableSource能夠訪問保存在外部存儲系統如數據庫系統(MySQL、HBase...),指定編碼格式的文件(CSV, Apache [Parquet, Avro, ORC],...)或者消息系統(Apache Kafka,RabbitMQ,...)中的數據。測試
Flink的目標是爲通用的數據格式和存儲系統提供TableSource,請參考Table Sources和Sinks頁來了解Flink所支持的TableSource列表及如何自定義一個TableSource。
一個TableSource能夠在TableEnvironment中按以下方式來定義:
// 獲取一個StreamTableEnvironment, 一樣適用於BatchTableEnvironment StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); // 建立一個 TableSource TableSource csvSource = new CsvTableSource("/path/to/file", ...); // 將TableSource註冊爲表 "CsvTable" tableEnv.registerTableSource("CsvTable", csvSource); // 獲取一個 TableEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env) // 建立一個TableSource val csvSource: TableSource = new CsvTableSource("/path/to/file", ...) // 將 TableSource 註冊爲表 "CsvTable" tableEnv.registerTableSource("CsvTable", csvSource)
一個外部目錄提供了關於外部數據庫和表的信息如:它們的名稱、模式、統計及如何訪問保存在外部數據庫、表和文件中的數據。
一個外部目錄能夠經過實現ExternalCatalog接口來建立並在TableEnvironment中註冊,以下:
// 獲取一個 StreamTableEnvironment, 一樣適用於 BatchTableEnvironment StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); // 建立一個外部catalog ExternalCatalog catalog = new InMemoryExternalCatalog(); // 註冊 ExternalCatalog tableEnv.registerExternalCatalog("InMemCatalog", catalog); // 獲取一個 TableEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env) // 建立一個 catalog val catalog: ExternalCatalog = new InMemoryExternalCatalog // 註冊 ExternalCatalog tableEnv.registerExternalCatalog("InMemCatalog", catalog)
一旦在TableEnvironment中註冊以後,全部定義在ExternalCatalog中的表均可以經過指定全路徑如:catalog.database.table
在Table API或者SQL查詢來訪問。
目前,Flink提供InMemoryExternalCatalog來作demo或者測試。然而,ExternalCatalog接口還能夠被用來鏈接HCatalog或者Metastore到Table API。
Table API是一個Scala和Java的語言集成查詢API,與SQL相反,查詢並不指定爲字符串而是根據主機語言一步一步的構建。
Table API是基於Table類來的,Table類表明了一個流或者批表,而且提供方法來使用關係型操做。這些方法返回一個新的Table對象,這個Table對象表明着輸入的Table應用關係型操做後的結果。一些關係型操做是由多個方法調用組成的如:table.groupBy(...).select()
, 其中groupBy(...)
指定了table的分組,而select(...)
則是table分組的映射。
Table API文檔描述了streaming和batch表所支持的全部Table API操做。
下面的例子展現了一個簡單的Table API聚合查詢:
// 獲取一個 StreamTableEnvironment, 一樣適用於 BatchTableEnvironment StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); // 註冊一個名叫 Orders 的表 // 掃描註冊的 Orders 表 Table orders = tableEnv.scan("Orders"); // 計算全部來自法國的客戶的收入 Table revenue = orders .filter("cCountry === 'FRANCE'") .groupBy("cID, cName") .select("cID, cName, revenue.sum AS revSum"); // 發射或者轉換一個 Table // 執行查詢 // 獲取一個 TableEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env) // 註冊一個名叫 Orders 的表 // 掃描已註冊的 Orders 表 Table orders = tableEnv.scan("Orders") // 計算全部來自法國偶的客戶的收入 Table revenue = orders .filter('cCountry === "FRANCE") .groupBy('cID, 'cName) .select('cID, 'cName, 'revenue.sum AS 'revSum) // 發射或者轉換一個Table // 執行查詢
注意:Scala Table API使用Scala的符號在引用表屬性時,以'`'開始,Table API使用Scala的隱式轉換,爲了使用Scala的隱式轉換,請確保導入org.apache.flink.api.scala._
和org.apache.flink.table.api.scala._
。
Flink的SQL集成是基於Apache Calcite的,Apache Calcite實現了標準的SQL,SQL查詢被指定爲常規字符串。
SQL文檔描述了Flink對流和批表的SQL支持。
下面的例子展現瞭如何指定一個查詢並返回一個Table結果;
// 獲取一個 StreamTableEnvironment, 一樣適用於BatchTableEnvironment StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); // 註冊一個名叫Orders 的表 // 計算全部來自法國的客戶的收入 Table revenue = tableEnv.sql( "SELECT cID, cName, SUM(revenue) AS revSum " + "FROM Orders " + "WHERE cCountry = 'FRANCE' " + "GROUP BY cID, cName" ); // 發射或者轉換一個Table // 執行查詢 // 獲取一個 TableEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env) //註冊一個名叫 Orders的表 // 計算全部來自法國的客戶的收入 Table revenue = tableEnv.sql(""" |SELECT cID, cName, SUM(revenue) AS revSum |FROM Orders |WHERE cCountry = 'FRANCE' |GROUP BY cID, cName """.stripMargin) // 發射或者轉換 Table // 執行查詢
Table API和SQL查詢能夠很容易地合併由於它們都返回Table對象:
一、Table API查詢能夠基於SQL查詢結果的Table來進行
二、SQL查詢能夠基於Table API查詢的結果來定義
爲了發射一個Table,能夠將其寫入一個TableSink中,TableSink 是支持各類文件格式(如:CSV, Apache Parquet, Apache Avro)、存儲系統(如:JDBC, Apache HBase, Apache Cassandra, Elasticsearch)或者消息系統(如:Apache Kafka,RabbitMQ)的通用接口。
一個批Table只能寫入BatchTableSink中,而流Table須要一個AppendStreamTableSink
、RetractStreamTableSink
或者UpsertStreamTableSink
請參考Table Sources & Sinks文檔來了解更多可用sink的信息和如何實現一個自定義的TableSink。
// 獲取一個StreamTableEnvironment, 一樣適用於 BatchTableEnvironment StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); // 使用Table API和/或SQL查詢獲取一個 Table Table result = ... // 建立一個TableSink TableSink sink = new CsvTableSink("/path/to/file", fieldDelim = "|"); // 將結果Table寫入TableSink中 result.writeToSink(sink); // 執行程序 // 獲取一個TableEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env) // 使用Table API和/或SQL查詢獲取一個 Table val result: Table = ... //建立一個 TableSink val sink: TableSink = new CsvTableSink("/path/to/file", fieldDelim = "|") // 將結果 Table寫入TableSink中 result.writeToSink(sink) // 執行程序
Table API和SQL查詢根據輸入是流仍是批翻譯成DataStream或者DataSet,查詢內部表示爲一個邏輯查詢計劃,並分兩個階段進行翻譯:
一、優化邏輯計劃
二、翻譯成一個DataStream或者DataSet程序
Table API或者SQL查詢會在下面狀況下觸發:
當調用Table.writeToSink()
時,Table會發射到TableSink中
Table轉換DataStream或者DataSet時(參考與DataStream和DataSet API集成)
一旦翻譯,Table API或者SQL查詢就會像常規DataStream或DataSet處理同樣,而且當StreamExecutionEnvironment.execute()
或者ExecutionEnvironment.execute()
調用時執行。
Table API和SQL查詢能夠很容易地進行集成並嵌入到DataStream和DataSet程序中。例如:咱們能夠查詢一個外部表(如:來自關係型數據庫的表)、作一些預處理,如過濾、映射、聚合或者與元數據關聯,而後使用DataStream或者DataSet API(及其餘基於這些API的庫,如CEP或Gelly)進行進一步處理。一樣,Table API或者SQL查詢也能夠應用於DataStream或者DataSet程序的結果中。
這種交互能夠經過將DataStream或者DataSet轉換成一個Table及將Table轉換成DataStream或者DataSet來實現。在本節,咱們將描述這些轉換是如何完成的。
Scala 隱式轉換
Scala Table API爲DataSet、DataStream和Table類提供了隱式轉換功能。這些轉換能夠經過導入Scala DataStream API中的org.apache.flink.table.api.scala._
和org.apache.flink.api.scala._
包來啓用。
一個DataStream或者DataSet能夠在TableEnvironment中註冊爲Table,表的結果模式根據註冊的DataStream或者DataSet的數據類型來定。請參考數據類型映射到表模式來了解更詳細的信息。
// 獲取 StreamTableEnvironment // 註冊一個DataSet 到BatchTableEnvironment也是等效的 StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); DataStream<Tuple2<Long, String>> stream = ... // 註冊DataStream 爲表 "myTable" ,並有兩個字段 "f0", "f1" tableEnv.registerDataStream("myTable", stream); // 註冊 DataStream 爲表 "myTable2" 並有兩個字段 "myLong", "myString" tableEnv.registerDataStream("myTable2", stream, "myLong, myString"); // 獲取 TableEnvironment // 註冊一個 DataSet 是等價的 val tableEnv = TableEnvironment.getTableEnvironment(env) val stream: DataStream[(Long, String)] = ... // 註冊 DataStream 爲表 "myTable" 並有兩個字段 "f0", "f1" tableEnv.registerDataStream("myTable", stream) // 註冊 DataStream 爲 "myTable2" 並有兩個字段 "myLong", "myString" tableEnv.registerDataStream("myTable2", stream, 'myLong, 'myString)
Table能夠轉換爲DataStream或者DataSet,這樣的話,自定義的DataStream或者DataSet程序就能夠基於Table API或者SQL查詢的結果來執行了。
當將一個Table轉換爲DataStream或者DataSet時,你須要指定生成的DataStream或者DataSet的數據類型,即須要轉換表的行的數據類型,一般最方便的轉換類型是Row,下面列表概述了不一樣選項的功能:
一、Row:字段經過位置映射、能夠是任意數量字段,支持空值,非類型安全訪問
二、POJO:字段經過名稱(POJO字段做爲Table字段時,必須命名)映射,能夠是任意數量字段,支持空值,類型安全訪問
三、Case Class:字段經過位置映射,不支持空值,類型安全訪問
四、Tuple:字段經過位置映射,不得多於22(Scala)或者25(Java)個字段,不支持空值,類型安全訪問
五、Atomic Type:Table必須有一個字段,不支持空值,類型安全訪問。
將Table轉換爲DataStream
流式查詢的結果Table會被動態地更新,即每一個新的記錄到達輸入流時結果就會發生變化。所以,轉換此動態查詢的DataStream須要對錶的更新進行編碼。
有兩種模式來將Table轉換爲DataStream:
一、Append Mode:這種模式只適用於當動態表僅由INSERT更改修改時,即僅附加,以前發射的結果不會被更新。
二、Retract Mode:始終均可以使用此模式,它使用一個boolean標識來編碼INSERT和DELETE更改。
// 獲取一個 StreamTableEnvironment. StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); // 有兩個字段(String name, Integer age)的Table Table table = ... // 經過指定類將Table轉換爲Row的Append DataStream DataStream<Row> dsRow = tableEnv.toAppendStream(table, Row.class); // 經過一個TypeInformation將Table轉換爲Tuple2<String, Integer> 類型的Append DataStream TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>( Types.STRING(), Types.INT()); DataStream<Tuple2<String, Integer>> dsTuple = tableEnv.toAppendStream(table, tupleType); // 將Table轉換爲Row的react形式的DataStream // 一個reactDataStream的類型X爲 DataStream<Tuple2<Boolean, X>>. // boolean字段指定了更改的類型. // True 是 INSERT, false 是 DELETE. DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(table, Row.class); // get TableEnvironment. // registration of a DataSet is equivalent val tableEnv = TableEnvironment.getTableEnvironment(env) // Table with two fields (String name, Integer age) val table: Table = ... // convert the Table into an append DataStream of Row val dsRow: DataStream[Row] = tableEnv.toAppendStream[Row](table) // convert the Table into an append DataStream of Tuple2[String, Int] val dsTuple: DataStream[(String, Int)] dsTuple = tableEnv.toAppendStream[(String, Int)](table) // convert the Table into a retract DataStream of Row. // A retract stream of type X is a DataStream[(Boolean, X)]. // The boolean field indicates the type of the change. // True is INSERT, false is DELETE. val retractStream: DataStream[(Boolean, Row)] = tableEnv.toRetractStream[Row](table)
注意:有關動態表及其屬性的詳細討論在Streaming Queries文檔中給出。
將Table轉換爲DataSet
Table能夠按照以下方式轉換爲DataSet:
// 獲取 BatchTableEnvironment BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); // 有兩個字段(String name, Integer age)的Table Table table = ... // 經過指定類將Table轉換爲Row類型的DataSet DataSet<Row> dsRow = tableEnv.toDataSet(table, Row.class); // 經過TypeInformation 將Table轉換爲Tuple2<String, Integer>類型的DataSet TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>( Types.STRING(), Types.INT()); DataStream<Tuple2<String, Integer>> dsTuple = tableEnv.toAppendStream(table, tupleType); // 獲取 TableEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env) // 有兩個字段(String name, Integer age)的Table val table: Table = ... // 將Table轉換爲Row類型的DataSet val dsRow: DataSet[Row] = tableEnv.toDataSet[Row](table) // 將Table轉換爲Tuple2[String, Int]類型的DataSet val dsTuple: DataSet[(String, Int)] = tableEnv.toDataSet[(String, Int)](table)
Flink的DataStream和DataSet API支持多種數據類型,如Tuple,POJO, case class及原始數據類型。接下來咱們描述Table API如何將這些類型轉換爲內部行表示及展現將DataStream轉換爲Table的例子。
原子類型
Flink將原生類型(如:Integer, Double, String)或者通用類型(不能再被分析或者分解的類型)視爲原子類型,一個原子類型的DataStream或者DataSet能夠轉換爲只有一個屬性的Table,屬性的類型根據原子類型推算,而且必須得指定屬性的名稱。
// 獲取一個 StreamTableEnvironment, 一樣原理適用於 BatchTableEnvironment StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); DataStream<Long> stream = ... // 將 DataStream轉換爲具備屬性"myLong"的Table Table table = tableEnv.fromDataStream(stream, "myLong"); // 獲取一個 TableEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env) val stream: DataStream[Long] = ... // 將 DataStream 轉換爲具備屬性'myLong的Table val table: Table = tableEnv.fromDataStream(stream, 'myLong)
Tuple(Java和Scala都支持)和Case Class(僅Scala支持)
Flink支持Scala內置的Tuple和Flink爲Java提供的Tuple,DataStream和DataSet類型的Tuple均可以被轉換爲表。字段能夠經過爲全部字段(經過位置來映射)提供的名稱來重命名,若是沒有爲字段指定名稱的話,就會採用默認的字段名。
// 獲取一個 StreamTableEnvironment, 一樣適用於BatchTableEnvironment StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); DataStream<Tuple2<Long, String>> stream = ... // 將 DataStream爲具備字段名爲"myLong", "myString"的Table Table table1 = tableEnv.fromDataStream(stream, "myLong, myString"); // 將 DataStream 轉換爲具備默認字段名 "f0", "f1"的 Table Table table2 = tableEnv.fromDataStream(stream); //獲取一個 TableEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env) val stream: DataStream[(Long, String)] = ... // 將 DataStream 轉換爲具備字段名 'myLong, 'myString' 的Table val table1: Table = tableEnv.fromDataStream(stream, 'myLong, 'myString) // 將 DataStream 轉換爲具備默認字段名 '_1, '_2的Table val table2: Table = tableEnv.fromDataStream(stream) // 定義一個 case class case class Person(name: String, age: Int) val streamCC: DataStream[Person] = ... // 將 DataStream 轉換爲具備默認字段名 'name, 'age'的Table val tableCC1 = tableEnv.fromDataStream(streamCC) // 將 DataStream 轉換爲具備字段名 'myName, 'myAge'的Table val tableCC1 = tableEnv.fromDataStream(streamCC, 'myName, 'myAge)
POJO(Java 和 Scala)
Flink支持使用POJO做爲複合類型,決定POJO規則的文檔請參考這裏。
當將一個POJO類型的DataStream或者DataSet轉換爲Table而不指定字段名稱時,Table的字段名稱將採用JOPO原生的字段名稱做爲字段名稱。重命名原始的POJO字段須要關鍵字AS,由於POJO沒有固定的順序,名稱映射須要原始名稱而且不能經過位置來完成。
//獲取一個 StreamTableEnvironment, 一樣原理適用於 BatchTableEnvironment StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); // Person 是一個有兩個字段"name" and "age" 的POJO DataStream<Person> stream = ... // 將 DataStream 轉換爲有字段 "name", "age" 的Table Table table1 = tableEnv.fromDataStream(stream); // 將 DataStream 轉換爲有字段 "myName", "myAge" 的Table Table table2 = tableEnv.fromDataStream(stream, "name as myName, age as myAge"); // 獲取一個 TableEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env) // Person 是一個有字段 "name" and "age" 的POJO val stream: DataStream[Person] = ... // 將 DataStream 轉換爲具備字段 'name, 'age' 的Table val table1: Table = tableEnv.fromDataStream(stream) // 將 DataStream 轉換爲具備字段 'myName, 'myAge' 的Table val table2: Table = tableEnv.fromDataStream(stream, 'name as 'myName, 'age as 'myAge)
Row
Row數據類型支持任意數量的字段,而且字段能夠是null
值,字段名稱能夠經過RowTypeInformation來指定或者將一個Row DataStream或者DataSet轉換爲Table時(根據位置)指定。
// 獲取一個 StreamTableEnvironment, 一樣原理適用於 BatchTableEnvironment StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); // 在`RowTypeInfo`中指定字段"name" and "age"的Row類型DataStream DataStream<Row> stream = ... // 將 DataStream 轉換爲具備字段 "name", "age" 的Table Table table1 = tableEnv.fromDataStream(stream); // 將 DataStream 轉換爲具備字段 "myName", "myAge" 的Table Table table2 = tableEnv.fromDataStream(stream, "myName, myAge"); // 獲取一個 TableEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env) // 在`RowTypeInfo`中指定字段"name" and "age"的Row類型DataStream val stream: DataStream[Row] = ... // 將 DataStream 轉換爲具備字段 'name, 'age' 的Table val table1: Table = tableEnv.fromDataStream(stream) // 將 DataStream 轉換爲具備字段 'myName, 'myAge' 的Table val table2: Table = tableEnv.fromDataStream(stream, 'myName, 'myAge)
Apache Flink使用Apache Calcite來優化和翻譯查詢,當前的查詢優化包括投影、過濾下推、子查詢去相關及各類形式的查詢重寫。Flink不去優化join的順序,可是會根據它們的順序去執行(FROM子句中表的順序或者WHERE子句中鏈接謂詞的順序)。
能夠經過提供一個CalciteConfig對象來調整在不一樣階段應用的優化規則集,這個能夠經過調用CalciteConfig.createBuilder())
得到的builder來建立,而且能夠經過調用tableEnv.getConfig.setCalciteConfig(calciteConfig)
來提供給TableEnvironment。
解析一個Table
Table API爲計算一個Table提供了一個機制來解析邏輯和優化查詢計劃,這個能夠經過調用TableEnvironment.explain(table)
方法來完成,它會返回描述三個計劃的字符串:
一、關係查詢語法樹,即未優化的查詢計劃
二、優化後的邏輯查詢計劃
三、物理執行計劃
如下代碼顯示了一個示例和相應的輸出:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env); DataStream<Tuple2<Integer, String>> stream1 = env.fromElements(new Tuple2<>(1, "hello")); DataStream<Tuple2<Integer, String>> stream2 = env.fromElements(new Tuple2<>(1, "hello")); Table table1 = tEnv.fromDataStream(stream1, "count, word"); Table table2 = tEnv.fromDataStream(stream2, "count, word"); Table table = table1 .where("LIKE(word, 'F%')") .unionAll(table2); String explanation = tEnv.explain(table); System.out.println(explanation); val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env) val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word) val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word) val table = table1 .where('word.like("F%")) .unionAll(table2) val explanation: String = tEnv.explain(table) println(explanation)
輸出以下:
== Abstract Syntax Tree == LogicalUnion(all=[true]) LogicalFilter(condition=[LIKE($1, 'F%')]) LogicalTableScan(table=[[_DataStreamTable_0]]) LogicalTableScan(table=[[_DataStreamTable_1]]) == Optimized Logical Plan == DataStreamUnion(union=[count, word]) DataStreamCalc(select=[count, word], where=[LIKE(word, 'F%')]) DataStreamScan(table=[[_DataStreamTable_0]]) DataStreamScan(table=[[_DataStreamTable_1]]) == Physical Execution Plan == Stage 1 : Data Source content : collect elements with CollectionInputFormat Stage 2 : Data Source content : collect elements with CollectionInputFormat Stage 3 : Operator content : from: (count, word) ship_strategy : REBALANCE Stage 4 : Operator content : where: (LIKE(word, 'F%')), select: (count, word) ship_strategy : FORWARD Stage 5 : Operator content : from: (count, word) ship_strategy : REBALANCE