Apache Flink 零基礎入門(七):Table API 編程

做者:程鶴羣(軍長)java

文章概述:本文主要包含三部分:第一部分,主要介紹什麼是 Table API,從概念角度進行分析,讓你們有一個感性的認識;第二部分,從代碼的層面介紹怎麼使用 Table API;第三部分,介紹 Table API 近期的動態。文章結構以下:python

  • 什麼是 Table APIgit

    • Flink API 總覽
    • Table API 的特性
  • Table API 編程github

    • WordCount 示例
    • Table API 操做sql

      • 如何獲取一個 Table
      • 若是輸出一個 Table
      • 若是查詢一個 Table
  • Table API 動態

1、什麼是 Table API

爲了更好地瞭解 Table API,咱們先看下 Flink 都提供了哪些 API 供用戶使用。apache

1.Flink API 總覽

01 flink_api.png

如圖,Flink 根據使用的便捷性和表達能力的強弱提供了 3 層 API,由上到下,表達能力逐漸加強,好比 processFunction,是最底層的 API,表達能力最強,咱們能夠用他來操做 state 和 timer 等複雜功能。Datastream API 相對於 processFunction 來講,又進行了進一步封裝,提供了不少標準的語義算子給你們使用,好比咱們經常使用的 window 算子(包括 Tumble, slide,session 等)。那麼最上面的 SQL 和 Table API 使用最爲便捷,具備自身的不少特色,重點概括以下:編程

02 table_api_and_sql.png

第一,Table API & SQL 是一種聲明式的 API。用戶只需關心作什麼,不用關心怎麼作,好比圖中的 WordCount 例子,只須要關心按什麼維度聚合,作哪一種類型的聚合,不須要關心底層的實現。api

第二,高性能。Table API & SQL 底層會有優化器對 query 進行優化。舉個例子,假如 WordCount 的例子裏寫了兩個 count 操做,優化器會識別並避免重複的計算,計算的時候只保留一個 count 操做,輸出的時候再把相同的值輸出兩遍便可,以達到更好的性能。緩存

第三,流批統一。上圖例子能夠發現,API 並無區分流和批,同一套 query 能夠流批覆用,對業務開發來講,避免開發兩套代碼。性能優化

第四,標準穩定。Table API & SQL 遵循 SQL 標準,不易變更。API 比較穩定的好處是不用考慮 API 兼容性問題。

第五,易理解。語義明確,所見即所得。

2.Table API 特性

上一小節介紹了 Table API 和 SQL 一些共有的特性,這個小節重點介紹下 Table API 自身的特性。主要能夠概括爲如下兩點:

03 tableapi_properties.png

第一,Table API 使得多聲明的數據處理寫起來比較容易。

怎麼理解?好比咱們有一個 Table(tab),而且須要執行一些過濾操做而後輸出到結果表,對應的實現是:tab.where(「a < 10」).inertInto(「resultTable1」);此外,咱們還須要作另一些篩選,而後也對結果輸出,即 tab.where(「a > 100」).insertInto(「resultTable2」)。你會發現,用 Table API 寫起來會很是簡潔方便,兩行代碼就把功能實現了。

第二,Table API 是 Flink 自身的一套 API,這使得咱們更容易地去擴展標準的 SQL。固然,在擴展 SQL 的時候並非隨意的去擴展,須要考慮 API 的語義、原子性和正交性,而且當且僅當須要的時候纔去添加。

對比 SQL,咱們能夠認爲 Table API 是 SQL 的超集。SQL 有的操做,Table API 能夠有,然而咱們又能夠從易用性和功能性地角度對 SQL 進行擴展和提高。

2、Table API編程

第一章介紹了 Table API 相關的概念。這一章咱們來看下如何用 Table API 來編程。本章會先從一個 WordCount 的例子出發,讓你們對 Table API 編程先有一個大概的認識,而後再具體介紹一下 Table API 的操做,好比,如何獲取一個 Table,如何輸出一個 Table,以及如何對 Table 執行查詢操做。

1.WordCount舉例

這是一個完整的,用 java 編寫的 batch 版本的 WordCount 例子,此外,還有 scala 和 streaming 版本的 WordCount,都統一上傳到了 github 上(https://github.com/hequn8128/TableApiDemo),你們能夠下載下來嘗試運行或者修改。

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.OldCsv;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;

public class JavaBatchWordCount {   // line:10

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);

        String path = JavaBatchWordCount.class.getClassLoader().getResource("words.txt").getPath();
        tEnv.connect(new FileSystem().path(path))
            .withFormat(new OldCsv().field("word", Types.STRING).lineDelimiter("\n"))
            .withSchema(new Schema().field("word", Types.STRING))
            .registerTableSource("fileSource");  // line:20

        Table result = tEnv.scan("fileSource")
            .groupBy("word")
            .select("word, count(1) as count");

        tEnv.toDataSet(result, Row.class).print();
    }
}

咱們具體看下這個 WordCount 的例子。首先,第1三、14行,是對 environment 的一些初始化,先經過 ExecutionEnvironment 的 getExecutionEnvironment 方法拿到執行環境,而後再經過 BatchTableEnvironment 的 create 拿到對應的 Table 環境,拿到環境後,咱們能夠註冊 TableSource、TableSink 或執行一些其餘操做。

這裏須要注意的是,ExecutionEnvironment 跟 BatchTableEnvironment 都是對應 Java 的版本,對於 scala 程序,這裏須要是一個對應 scala 版本的 environment。這也是初學者一開始可能會遇到的問題,由於 environent 有不少且容易混淆。爲了讓你們更好區分這些 environment,下面對 environment 進行了一些概括。

004 table_environment.png

這裏從 batch/stream,還有 Java/scala,對 environment 進行了分類,對於這些 environment 使用時須要特別注意,不要 import 錯了。environment 的問題,社區已經進行了一些討論,如上圖下方的連接,這裏再也不具體展開。

咱們再回到剛剛的 WordCount 的例子,拿到 environment 後,須要作的第二件事情是註冊對應的TableSource。

tEnv.connect(new FileSystem().path(path))
    .withFormat(new OldCsv().field("word", Types.STRING).lineDelimiter("\n"))
    .withSchema(new Schema().field("word", Types.STRING))
    .registerTableSource("fileSource");

使用起來也很是方便,首先,由於咱們要讀一個文件,須要指定讀取文件的路徑,指定了以後,咱們須要再描述文件內容的格式,好比他是 csv 的文件而且行分割符是什麼。還有就是指定這個文件對應的 Schema 是什麼,好比只有一列單詞,而且類型是 String。最後,咱們須要把 TableSource 註冊到 environment 裏去。

Table result = tEnv.scan("fileSource")
    .groupBy("word")
    .select("word, count(1) as count");

tEnv.toDataSet(result, Row.class).print();

經過 scan 剛纔註冊好的 TableSource,咱們能夠拿到一個 Table 對象,並執行相應的一些操做,好比 GroupBy,count。最後,能夠把 Table 按 DataSet 的方式進行輸出。

以上即是一個 Table API 的 WordCount 完整例子。涉及 Table 的獲取,Table 的操做,以及 Table 的輸出。接下來會具體介紹如何獲取 Table、輸出 Table 和執行 Table 操做。

2.如何獲取一個Table

獲取 Table 大致能夠分爲兩步,第一步,註冊對應的 TableSource;第二步,調用 Table environement 的 scan 方法獲取 Table 對象。註冊 Table Source 又有3種方法:經過 Table descriptor 來註冊,經過自定義 source 來註冊,或者經過 DataStream 來註冊。具體的註冊方式以下圖所示:

04 get_table.png

3.如何輸出一個Table

對應輸出 Table,咱們也有相似的3種方法:Table descriptor, 自定義 Table sink 以及輸出成一個 DataStream。以下圖所示:

05 emit_table.png

4.如何操做一個Table

4.1 Table 操做總覽

第二、3節介紹瞭如何獲取和輸出一個 Table,本節主要介紹如何對 Table 進行操做。Table 上有不少操做,好比一些 projection 操做 select、filter、where;聚合操做,如 groupBy、flatAggrgate;還有join操做,等等。咱們以一個具體的例子來介紹下 Table 上各操做的轉換流程。

06 api_on_table.png

如上圖,當咱們拿到一個 Table 後,調用 groupBy 會返回一個 GroupedTable。GroupedTable 裏只有 select 方法,對 GroupedTable 調用 select 方法會返回一個 Table。拿到這個 Table 後,咱們能夠再調用 Table 上的方法。圖中其餘 Table,如 OverWindowedTable 也是相似的流程。值得注意的是,引入各個類型的 Table 是爲了保證 API 的合法性和便利性,好比 groupBy 以後只有 select 操做是有意義的,在編輯器上能夠直接點出來。

前面咱們提到,能夠將 Table API 當作是 SQL 的超集,所以咱們也能夠對 Table 裏的操做按此進行分類,大體分爲三類,以下圖所示:

07 table_method_classification.png

第一類,是跟 SQL 對齊的一些操做,好比 select, filter, join 等。第二類,是一些提高 Table API 易用性的操做。第三類,是加強 Table API 功能的一些操做。第一類操做因爲和 SQL 相似,比較容易理解,其次,也能夠查看官方的文檔,瞭解具體的方法,因此這裏再也不展開介紹。下面的章節會重點介紹後兩類操做,這些操做也是 Table API 獨有的。

4.2 提高易用性相關操做

介紹易用性以前,咱們先來看一個問題。假設咱們有一張很大的表,裏面有一百列,此時須要去掉一列,那麼SQL怎麼寫?咱們須要 select 剩下的 99 列!顯然這會給用戶帶來不小的代價。爲了解決這個問題,咱們在Table上引入了一個 dropColumns 方法。利用 dropColumns 方法,咱們即可以只寫去掉的列。與此對應,還引入了 addColumns, addOrReplaceColumns 和 renameColumns 方法,以下圖所示:

08 dropColumns.png

解決了剛纔的問題後,咱們再看下面另外一個問題:假設仍是一張100列的表,咱們須要選第20到第80列,那麼咱們如何操做呢?爲了解決這個問題,咱們又引入了 withColumns 和 withoutColumns 方法。對於剛纔的問題,咱們能夠簡單地寫成 table.select(「withColumns(20 to 80)」)。

09 withColumns.png

4.3 加強功能相關操做

該小節會介紹下 TableAggregateFunction 的功能和用法。在引入 TableAggregateFunction 以前,Flink 裏有三種自定義函數:ScalarFunction,TableFunction 和 AggregateFunction。咱們能夠從輸入和輸出的維度對這些自定義函數進行分類。以下圖所示,ScalarFunction 是輸入一行,輸出一行;TableFunction 是輸入一行,輸出多行;AggregateFunction 是輸入多行輸出一行。爲了讓語義更加完整,Table API 新加了 TableAggregateFunction,它能夠接收和輸出多行。TableAggregateFunction 添加後,Table API 的功能能夠獲得很大的擴展,某種程度上能夠用它來實現自定義 operator。好比,咱們能夠用 TableAggregateFunction 來實現 TopN。

10 table_aggregate_function.png

TableAggregateFunction 使用也很簡單,方法簽名和用法以下圖所示:

11 flatAggregate.png

用法上,咱們只須要調用 table.flatAggregate(),而後傳入一個 TableAggregateFunction 實例便可。用戶能夠繼承 TableAggregateFunction 來實現自定義的函數。繼承的時候,須要先定義一個 Accumulator,用來存取狀態,此外自定義的 TableAggregateFunction 須要實現 accumulate 和 emitValue 方法。accumulate 方法用來處理輸入的數據,而 emitValue 方法負責根據 accumulator 裏的狀態輸出結果。

3、Table API 動態

最後介紹下 Table API 近期的動態:

1.Flip-29

主要是 Table API 功能和易用性的加強。好比剛剛介紹的 columns 相關操做,還有 TableAggregateFunction。

社區對應的 jira 是:https://issues.apache.org/jira/browse/FLINK-10972

2.Python Table API

但願在 Table API 上增長 python 語言的支持。這個應該是 Python 用戶的福音。

社區對應的 jira 是:https://issues.apache.org/jira/browse/FLINK-12308

3.Interactive Programming(交互式編程)

即 Table 上會提供一個 cache 算子,執行 cache 操做能夠緩存 table 的結果,並在這個結果上作其餘操做。社區對應 jira 是:https://issues.apache.org/jira/browse/FLINK-11199

4.Iterative Processing(迭代計算)

Table 上會支持一個 iterator 的算子,該算子能夠用來執行迭代計算。好比迭代 100 次,或者指定一個收斂的條件,在機器學習領域使用比較普遍。社區對應 jira 是:https://issues.apache.org/jira/browse/FLINK-11199


▼ Apache Flink 社區推薦 ▼

Apache Flink 及大數據領域頂級盛會 Flink Forward Asia 2019 重磅開啓,目前正在徵集議題,限量早鳥票優惠ing。瞭解 Flink Forward Asia 2019 的更多信息,請查看:

https://developer.aliyun.com/...

首屆 Apache Flink 極客挑戰賽重磅開啓,聚焦機器學習與性能優化兩大熱門領域,40萬獎金等你拿,加入挑戰請點擊:

https://tianchi.aliyun.com/ma...

相關文章
相關標籤/搜索