Flink SQL 系列 | 5 個 TableEnvironment 我該用哪一個?

本文爲 Flink SQL 系列文章的第二篇,前面對 Flink 1.9 Table 新架構及 Planner 的使用進行了詳細說明,本文詳細講解 5 個 TableEnvironment 及其適用場景,並介紹 Flink 社區對 TableEnvironment 的將來規劃。主要內容以下:
  1. TableEnvironment 簡介
  2. 5 個 TableEnvironment 梳理
  3. 如何使用 TableEnvironment
  4. 社區將來規劃

1. TableEnvironment 簡介

TableEnvironment 是用來建立 Table & SQL 程序的上下文執行環境 ,也是 Table & SQL 程序的入口,Table & SQL 程序的全部功能都是圍繞 TableEnvironment 這個核心類展開的。TableEnvironment 的主要職能包括:對接外部系統,表及元數據的註冊和檢索,執行SQL語句,提供更詳細的配置選項。
在 Flink 1.8 中,一共有 7 個 TableEnvironment ,在最新的 Flink 1.9 中,社區進行了重構和優化,只保留了 5 個TableEnvironment 。本文詳細講解 5 個 TableEnvironment 及其適用場景,並介紹 Flink 社區對 TableEnvironment 的將來規劃。

2. 5 個 TableEnvironment 梳理

Flink 1.9 中保留了 5 個 TableEnvironment,在實現上是 5 個面向用戶的接口,在接口底層進行了不一樣的實現。5 個接口包括一個 TableEnvironment 接口,兩個 BatchTableEnvironment 接口,兩個 StreamTableEnvironment 接口,5 個接口文件完整路徑以下:
  • org/apache/flink/table/api/TableEnvironment.java
  • org/apache/flink/table/api/java/BatchTableEnvironment.java
  • org/apache/flink/table/api/scala/BatchTableEnvironment.scala
  • org/apache/flink/table/api/java/StreamTableEnvironment.java
  • org/apache/flink/table/api/scala/StreamTableEnvironment.scala
結合文件的路徑,梳理這 5 個接口,咱們會發現 TableEnvironment 是頂級接口,是全部 TableEnvironment 的基類 ,BatchTableEnvironment 和 StreamTableEnvironment 都提供了 Java 實現和 Scala 實現 ,分別有兩個接口。




5 個 TableEnvironment
其中,TableEnvironment 做爲統一的接口,其統一性體如今兩個方面,一是對於全部基於JVM的語言(即 Scala API 和 Java API 之間沒有區別)是統一的;二是對於 unbounded data (無界數據,即流數據) 和 bounded data (有界數據,即批數據)的處理是統一的。TableEnvironment 提供的是一個純 Table 生態的上下文環境,適用於整個做業都使用 Table API & SQL 編寫程序的場景。TableEnvironment 目前還不支持註冊 UDTF 和 UDAF,用戶有註冊 UDTF 和 UDAF 的需求時,能夠選擇使用其餘 TableEnvironment。
兩個 StreamTableEnvironment 分別用於 Java 的流計算和 Scala 的流計算場景,流計算的對象分別是 Java 的 DataStream 和 Scala 的 DataStream。相比 TableEnvironment,StreamTableEnvironment 提供了 DataStream 和 Table 之間相互轉換的接口,若是用戶的程序除了使用 Table API & SQL 編寫外,還須要使用到 DataStream API,則須要使用 StreamTableEnvironment。
兩個 BatchTableEnvironment 分別用於 Java 的批處理場景和 Scala 的批處理場景,批處理的對象分別是 Java 的 DataSet 和 Scala 的 DataSet。相比 TableEnvironment,BatchTableEnvironment 提供了 DataSet 和 Table 之間相互轉換的接口,若是用戶的程序除了使用 Table API & SQL 編寫外,還須要使用到 DataSet API,則須要使用 BatchTableEnvironment。
從這五個 TableEnvironment 支持的做業類型 ( Stream 做業和 Batch 做業),支持的 API 類型(DataStream API 和 DataSet API),以及對 UDTF/UDAF 的支持這 5 個方面進行對比,各個TableEnvironment 支持的功能能夠概括以下:




TableEnvironment 支持功能對比
可能你們會疑惑爲何在 API 須要區分 Java 和 Scala 的兩個 StreamTableEnvironment(或BatchTableEnvironment ),使用的 DataStream也分爲 Java DataStream 和 Scala DataStream。
緣由主要是 TableEnvironment 的 registerTableFunction方法(用於註冊UDTF) 和 registerAggregateFunction 方法(用戶註冊UDAF) 須要抽取泛型,而現有的 Java 泛型抽取和 Scala 的泛型抽取機制是不同的,Java 的抽取是經過反射機制 實現,而 Scala 是經過 Scala macro 實現。此外,因爲抽取泛型機制的不一致,做爲統一入口的 TableEnvironment 現階段也不支持註冊 UDTF 和 UDAF。針對這個問題,社區已經在計劃引入一套新的類型抽取機制來統一 Java 和 Scala 的類型抽取,實現 Java API 和 Scala API 的統一。




5 個 TableEnvironment 具體實現
結合 Flink planner 和 Blink planner, 進一步梳理 TableEnvironment 的組織關係,咱們能夠注意到一些有趣的細節:
  • 實現流批統一的 Blink planner 中因爲沒有了 DataSet 的概念,已經再也不使用 BatchTableEnvironment,只會使用 TableEnvironment 和 StreamTableEnvironment,而 Flink planner(即 Old planner) 則支持 5 個 TableEnvironment。
  • BatchTableEnvironment 的實現都放到了 Old planner (flink-table-palnner模塊) 中,這個模塊在社區的將來規劃中是會被逐步刪除的。

3. 如何使用 TableEnvironment

根據用戶使用的 planner 和做業的類型,能夠把各個 TableEnvironment 的應用場景分爲 4 類,下面結合代碼來講明在不一樣的場景下如何使用 TableEnvironment 。

場景一:

用戶使用 Old planner,進行流計算的 Table 程序(使用 Table API 或 SQL 進行開發的程序 )的開發。這種場景下,用戶可使用 StreamTableEnvironment 或 TableEnvironment ,二者的區別是 StreamTableEnvironment 額外提供了與 DataStream API 交互的接口。示例代碼以下:
// **********************
// FLINK STREAMING QUERY USING JAVA
// **********************
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;
EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);
// or TableEnvironment fsTableEnv = TableEnvironment.create(fsSettings);
// **********************
// FLINK STREAMING QUERY USING SCALA
// **********************
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala.StreamTableEnvironment
val fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build()
val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
val fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings)
// or val fsTableEnv = TableEnvironment.create(fsSettings)複製代碼

場景二:

用戶使用 Old planner,進行批處理的 Table 程序的開發。這種場景下,用戶只能使用 BatchTableEnvironment ,由於在使用 Old planner 時,批處理程序操做的數據是 DataSet,只有 BatchTableEnvironment 提供了面向DataSet 的接口實現。示例代碼以下:
// ******************
// FLINK BATCH QUERY USING JAVA
// ******************
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);
// ******************
// FLINK BATCH QUERY USING SCALA
// ******************
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.table.api.scala.BatchTableEnvironment
val fbEnv = ExecutionEnvironment.getExecutionEnvironment
val fbTableEnv = BatchTableEnvironment.create(fbEnv)複製代碼

場景三:

用戶使用 Blink planner,進行流計算的 Table 程序的開發。這種場景下,用戶可使用 StreamTableEnvironment 或 TableEnvironment ,二者的區別是 StreamTableEnvironment 額外提供與 DataStream API 交互的接口。用戶在 EnvironmentSettings 中聲明使用 Blink planner ,將執行模式設置爲 StreamingMode 便可。示例代碼以下:
// **********************
// BLINK STREAMING QUERY USING JAVA
// **********************
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;
StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
// or TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings);
// **********************
// BLINK STREAMING QUERY USING SCALA
// **********************
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala.StreamTableEnvironment
val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings)
// or val bsTableEnv = TableEnvironment.create(bsSettings)複製代碼

場景四:

用戶使用 Blink planner,進行批處理的 Table 程序的開發。這種場景下,用戶只能使用 TableEnvironment ,由於在使用 Blink planner 時,批處理程序操做的數據已是 bounded DataStream,因此不能使用 BatchTableEnvironment 。用戶在 EnvironmentSettings 中聲明使用 Blink planner ,將執行模式設置爲 BatchMode 便可。值得注意的是,TableEnvironment 接口的具體實現中已經支持了 StreamingMode 和 BatchMode 兩種模式,而 StreamTableEnvironment 接口的具體實現中目前暫不支持 BatchMode 的配置,因此這種場景不能使用 StreamTableEnvironment。示例代碼以下:
// ******************
// BLINK BATCH QUERY USING JAVA
// ******************
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);
// ******************
// BLINK BATCH QUERY USING SCALA
// ******************
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
val bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
val bbTableEnv = TableEnvironment.create(bbSettings)複製代碼

4. 社區將來規劃

目前,社區正在推動 DataStream 的批處理能力,以實現流批技術棧的統一,屆時 DataSet API 會退出歷史的舞臺,兩個 BatchTableEnvironment 也將退出歷史的舞臺。同時社區也在努力推進 Java 和 Scala TableEnvironment 的統一。能夠預見的是,Flink TableEnvironment 的將來架構會更加簡潔。TableEnvironment 會是 Flink 推薦使用的入口類,同時能支持 Java API 和 Scala API,還能同時支持流計算做業和批處理做業。只有當須要與 DataStream 作轉換時,才須要用到 StreamTableEnvironment。
本文做者:徐榜江(雪盡)
本文爲雲棲社區原創內容,未經容許不得轉載。
相關文章
相關標籤/搜索