Flink SQL 的 9 個示例

TableEnvironment

FLIP-32 中提出,將 Blink 徹底開源,合併到 Flink 主分支中。合併後在 Flink 1.9 中會存在兩個 Planner:Flink Planner 和 Blink Planner。java

在以前的版本中,Flink Table 在整個 Flink 中是一個二等公民。而 Flink SQL 具有的易用性、使用門檻低等特色深受用戶好評,愈來愈被重視,Flink Table 模塊也所以被提高爲一等公民。而 Blink 在設計之初就考慮到流和批的統一,批只是流的一種特殊形式,因此能夠用同一個TableEnvironment來表述流和批。算法

TableEnvironment 總體設計

640.png

圖1 新 Table Environment 總體設計sql

從圖 1 中,能夠看出,TableEnvironment 組成部分以下:編程

  • flink-table-common:這個包中主要是包含 Flink Planner 和 Blink Planner 一些共用的代碼。
  • flink-table-api-java:這部分是用戶編程使用的 API,包含了大部分的 API。
  • flink-table-api-scala:這裏只是很是薄的一層,僅和 Table API 的 Expression 和 DSL 相關。
  • 兩個 Planner:flink-table-planner 和 flink-table-planner-blink。
  • 兩個 Bridge:flink-table-api-scala-bridge 和 flink-table-api-java-bridge,從圖中能夠看出,Flink Planner 和 Blink Planner 都會依賴於具體的 JAVA API,也會依賴於具體的 Bridge,經過 Bridge 能夠將 API 操做相應的轉化爲 Scala 的 DataStream、DataSet,或者轉化爲 JAVA 的 DataStream 或者 Data Set。

新舊 TableEnvironment 對比

在 Flink 1.9 以前,原來的 Flink Table 模塊,有 7 個 Environment,使用和維護上相對困難。7 個 Environment 包括:StreamTableEnvironment、BatchTableEnvironment 兩類,JAVA 和 Scala 分別 2 個,一共 4 個,加上 3 個父類,一共就是 7 個。api

在新的框架之下,社區但願流和批統一,所以對原來的設計進行精簡。首先,提供統一的 TableEnvironment,放在 flink-table-api-java 這個包中。而後,在 Bridge 中,提供了兩個用於銜接 Scala DataStream 和 Java DataStream 的 StreamTableEnvironment。最後,由於 Flink Planner 中還殘存在着 toDataSet() 相似的操做,因此,暫時保留 BatchTableEnvironment。這樣,目前一共是 5 個 TableEnvironment。緩存

由於將來 Flink Planner 將會被移除,BatchTableEnvironment 就會被廢棄,整個 TableEnvironment 的設計也會更加簡潔明瞭。性能優化

新 TableEnvironment 的應用

本節中,將介紹新的應用場景以及相關限制。下圖詳細列出了新 TableEnvironment 的適用場景:網絡

640 - 2.png

圖2 新 Table Environment 適應場景數據結構

第一行,簡單起見,在後續將新的 TableEnvironment 稱爲 UnifyTableEnvironment。在 Blink 中,Batch 被認爲是 Stream 的一個特例,所以 Blink 的 Batch 可使用 UnifyTableEnvironment。app

UnifyTableEnvironment 在 1.9 中有一些限制,好比它不可以註冊 UDAF 和 UDTF,當前新的 Type System 的類型推導功能尚未完成(Java、Scala 的類型推導還沒統一),因此這部分的功能暫時不支持。此外,UnifyTableEnvironment 沒法和 DataStream 和 DataSet 互轉。

第二行,Stream TableEnvironment 支持轉化成 DataStream,也能夠註冊 UDAF 和 UDTF。若是是 JAVA 寫的,就註冊到 JAVA 的 StreamTableEnvironment,若是是用 Scala 寫的,就註冊到 Scala 的 StreamTableEnvironment。

注意,Blink Batch 做業不支持 Stream TableEnvironment ,由於目前 Batch 無法和 DataStream 互轉,因此 toDataStream() 這樣的語義暫時不支持。從圖中也能夠看出,目前Blink Batch只能使用 TableEnvironment。

最後一行,BatchTableEvironment 可以使用 toDataSet() 轉化爲 DataSet。

從上面的圖 2 中,能夠很清晰的看出各個 TableEnvironment 可以作什麼事情,以及他們有哪些限制。

接下來,將使用示例對各類狀況進行說明。

示例1:Blink Batch

EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment tEnv = TableEnvironment.create(settings);
tEnv…
tEnv.execute(「job name」);

從圖 2 中能夠看出,Blink Batch 只能使用 TableEnvironment(即UnifyTableEnvironment),代碼中,首先須要建立一個 EnvironmentSetting,同時指定使用 Blink Planner,而且指定用 Batch 模式。之因此須要指定 Blink Planner,是由於目前 Flink 1.9 中,將 Flink Planner 和 Blink Planner 的 jar 同時放在了 Flink 的 lib 目錄下。若是不指定使用的 Planner,整個框架並不知道須要使用哪一個 Planner,因此必須顯示的指定。固然,若是 lib 下面只有一個 Planner 的 jar,這時不須要顯示指定使用哪一個 Planner。

另外,還須要注意的是在 UnifyEnvironment 中,用戶是沒法獲取到 ExecutionEnvironment 的,即用戶沒法在寫完做業流程後,使用 executionEnvironment.execute() 方法啓動任務。須要顯式的使用 tableEnvironment.execute() 方法啓動任務,這和以前的做業啓動很不相同。

示例 2:Blink Stream

EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamExecutionEnvironment execEnv = …
StreamTableEnvironment tEnv =  StreamTableEnvironment.create(execEnv, settings);
tEnv…

Blink Stream 既可使用 UnifyTableEnvironment,也可使用 StreamTableEnvironment,與 Batch 模式基本相似,只是須要將 inBatchMode 換成 inStreamingMode。

示例 3:Flink Batch

ExecutionEnvironment execEnv = ...
BatchTableEnvironment tEnv = BatchTableEnvironment.create(execEnv);
tEnv...

與以前沒有變化,不作過多介紹。

示例 4:Flink Stream

EnvironmentSettings settings = EnvironmentSettings.newInstance().useOldPlanner().inStreamMode().build();
TableEnvironment tEnv = TableEnvironment.create(settings);
tEnv…
tEnv.execute(「job name」);

Flink Stream 也是同時支持 UnifyEnvironment 和 StreamTableEnvironment,只是在指定 Planner 時,須要指定爲 useOldPlanner,也即 Flink Planner。由於將來 Flink Planner 會被移除,所以,特地起了一個 OlderPlanner 的名字,並且只可以使用 inStreamingMode,沒法使用 inBatchMode。

Catalog 和 DDL

構建一個新的 Catalog API 主要是 FLIP-30 提出的,以前的 ExternalCatalog 將被廢棄,Blink Planner 中已經不支持 ExternalCatalog 了,Flink Planner 還支持 ExternalCatalog。

新 Catalog 設計

下圖是新 Catalog 的總體設計:

640 3.png
圖3 新 Catalog 設計

能夠看到,新的 Catalog 有三層結構(..),最頂層是 Catalog 的名字,中間一層是 Database,最底層是各類 MetaObject,如 Table,Partition,Function 等。當前,內置了兩個 Catalog 實現:MemoryCatalog 和 HiveCatalog。固然,用戶也能夠實現本身的 Catalog。

Catalog 可以作什麼事情呢?首先,它能夠支持 Create,Drop,List,Alter,Exists 等語句,另外它也支持對 Database,Table,Partition,Function,Statistics 等的操做。基本上,經常使用的 SQL 語法都已經支持。

CatalogManager 正如它名字同樣,主要是用來管理 Catalog,且能夠同時管理多個 Catalog。也就是說,能夠經過在一個相同 SQL 中,跨 Catalog 作查詢或者關聯操做。例如,支持對 A Hive Catalog 和 B Hive Catalog 作相互關聯,這給 Flink 的查詢帶來了很大的靈活性。

CatalogManager 支持的操做包括:

  • 註冊 Catalog(registerCatalog)
  • 獲取全部的 Catalog(getCatalogs)
  • 獲取特定的 Catalog(getCatalog)
  • 獲取當前的 Catalog(getCurrentCatalog)
  • 設置當前的 Catalog(setCurrentCatalog)
  • 獲取當前的 Database(getCurrentDatabase)
  • 設置當前的 Database(setCurrentDatabase)

Catalog 雖然設計了三層結構,但在使用的時候,並不須要徹底指定三層結構的值,能夠只寫Table Name,這時候,系統會使用 getCurrentCatalog,getCurrentDatabase 獲取到默認值,自動補齊三層結構,這種設計簡化了對 Catalog 的使用。若是須要切換默認的 Catalog,只須要調用 setCurrentCatalog 就能夠了。

在 TableEnvironment 層,提供了操做 Catalog 的方法,例如:

  • 註冊 Catalog(registerCatalog)
  • 列出全部的 Catalog(listCatalogs)
  • 獲取指定 Catalog(getCatalog)
  • 使用某個 Catalog(useCatalog)

在 SQL Client 層,也作了必定的支持,可是功能有必定的限制。用戶不可以使用 Create 語句直接建立 Catalog,只能經過在 yarn 文件中,經過定義 Description 的方式去描述 Catalog,而後在啓動 SQL Client 的時候,經過傳入 -e +file_path 的方式,定義 Catalog。目前 SQL Client 支持列出已定義的 Catalog,使用一個已經存在的 Catalog 等操做。

DDL 設計與使用

有了 Catalog,就可使用 DDL 來操做 Catalog 的內容,可使用 TableEnvironment 的 sqlUpdate() 方法執行 DDL 語句,也能夠在 SQL Client 執行 DDL 語句。

sqlUpdate() 方法中,支持 Create Table、Create View、Drop Table、Drop View 四個命令。固然,inset into 這樣的語句也是支持的。

下面分別對 4 個命令進行說明:

  • Create Table:能夠顯示的指定 Catalog Name 或者 DB Name,若是缺省,那就按照用戶設定的 Current Catalog 去補齊,而後能夠指定字段名稱,字段的說明,也能夠支持 Partition By 語法。最後是一個 With 參數,用戶能夠在此處指定使用的 Connector,例如,Kafka,CSV,HBase 等。With 參數須要配置一堆的屬性值,能夠從各個 Connector 的 Factory 定義中找到。Factory 中會指出有哪些必選屬性,哪些可選屬性值。

須要注意的是,目前 DDL 中,還不支持計算列和 Watermark 的定義,後續的版本中將會繼續完善這部分。

Create Table [[catalog_name.]db_name.]table_name(
  a int comment 'column comment',
  b bigint,
  c varchar
)comment 'table comment'
[partitioned by(b)]
With(
    update-mode='append',
    connector.type='kafka',
    ...
)
  • Create View:須要指定 View 的名字,而後緊跟着的是 SQL。View 將會存儲在 Catalog 中。
CREATE VIEW view_name AS SELECT xxx
  • Drop Table&Drop View:和標準 SQL 語法差很少,支持使用 IF EXISTS 語法,若是未加 IF EXISTS ,Drop 一個不存在的表,會拋出異常。
DROP TABLE [IF EXISTS] [[catalog_name.]db_name.]table_name
  • SQL Client 中執行 DDL:大部分都只支持查看操做,僅可使用 Create View 和 Drop View。Catalog,Database,Table ,Function 這些只能作查看。用戶能夠在 SQL Client 中 Use 一個已經存在的 Catalog,修改一些屬性,或者作 Description,Explain 這樣的一些操做。
CREATE VIEW
DROP VIEW
SHOW CATALOGS/DATABASES/TABLES/FUNCTIONS l USE CATALOG xxx
SET xxx=yyy
DESCRIBE table_name
EXPLAIN SELECT xxx

DDL 部分,在 Flink 1.9 中其實基本已經成型,只是還有一些特性,在將來須要逐漸的完善。

Blink Planner

本節將主要從 SQL/Table API 如何轉化爲真正的 Job Graph 的流程開始,讓你們對 Blink Planner 有一個比較清晰的認識,但願對你們閱讀 Blink 代碼,或者使用 Blink 方面有所幫助。而後介紹 Blink Planner 的改進及優化。

640 4.png

圖4 主要流程

從上圖能夠很清楚的看到,解析的過程涉及到了三層:Table API/SQL,Blink Planner,Runtime,下面將對主要的步驟進行講解。

  • Table API&SQL 解析驗證:在 Flink 1.9 中,Table API 進行了大量的重構,引入了一套新的 Operation,這套 Operation 主要是用來描述任務的 Logic Tree。

當 SQL 傳輸進來後,首先會去作 SQL 解析,SQL 解析完成以後,會獲得 SqlNode Tree(抽象語法樹),而後會緊接着去作 Validate(驗證),驗證時會去訪問 FunctionManger 和 CatalogManger,FunctionManger 主要是查詢用戶定義的 UDF,以及檢查 UDF 是否合法,CatalogManger 主要是檢查這個 Table 或者 Database 是否存在,若是驗證都經過,就會生成一個 Operation DAG(有向無環圖)。

從這一步能夠看出,Table API 和 SQL 在 Flink 中最終都會轉化爲統一的結構,即 Operation DAG。

  • 生成RelNode:Operation DAG 會被轉化爲 RelNode(關係表達式) DAG。

優化:優化器會對 RelNode 作各類優化,優化器的輸入是各類優化的規則,以及各類統計信息。當前,在 Blink Planner 裏面,絕大部分的優化規則,Stream 和 Batch 是共享的。差別在於,對 Batch 而言,它沒有 state 的概念,而對於 Stream 而言,它是不支持 sort 的,因此目前 Blink Planner 中,仍是運行了兩套獨立的規則集(Rule Set),而後定義了兩套獨立的 Physical Rel:BatchPhysical Rel 和 StreamPhysical Rel。優化器優化的結果,就是具體的 Physical Rel DAG。

  • 轉化:獲得 Physical Rel Dag 後,繼續會轉化爲 ExecNode,經過名字能夠看出,ExecNode 已經屬於執行層的概念了,可是這個執行層是 Blink 的執行層,在 ExecNode 中,會進行大量的 CodeGen 的操做,還有非 Code 的 Operator 操做,最後,將 ExecNode 轉化爲 Transformation DAG。
  • 生成可執行 Job Graph:獲得 Transformation DAG 後,最終會被轉化成 Job Graph,完成 SQL 或者 Table API 的解析。

Blink Planner 改進及優化

Blink Planner 功能方面改進主要包含以下幾個方面:

  • 更完整的 SQL 語法支持:例如,IN,EXISTS,NOT EXISTS,子查詢,完整的 Over 語句,Group Sets 等。並且已經跑通了全部的 TPCH,TPCDS 這兩個測試集,性能還很是不錯。
  • 提供了更豐富,高效的算子。
  • 提供了很是完善的 cost 模型,同時可以對接 Catalog 中的統計信息,使 cost 根據統計信息獲得更優的執行計劃。
  • 支持 join reorder。
  • shuffle service:對 Batch 而言,Blink Planner 還支持 shuffle service,這對 Batch 做業的穩定性有很是大的幫助,若是遇到 Batch 做業失敗,經過 shuffle service 可以很快的進行恢復。

性能方面,主要包括如下部分:

  • 分段優化。
  • Sub-Plan Reuse。
  • 更豐富的優化 Rule:共一百多個 Rule ,而且絕大多數 Rule 是 Stream 和 Batch 共享的。
  • 更高效的數據結構 BinaryRow:可以節省序列化和反序列化的操做。
  • mini-batch 支持(僅 Stream):節省 state 的訪問的操做。
  • 節省多餘的 Shuffle 和 Sort(Batch 模式):兩個算子之間,若是已經按 A 作 Shuffle,緊接着他下的下游也是須要按 A Shuffle 的數據,那中間的這一層 Shuffle,就能夠省略,這樣就能夠省不少網絡的開銷,Sort 的狀況也是相似。Sort 和 Shuffle 若是在整個計算裏面是佔大頭,對整個性能是有很大的提高的。

深刻性能優化及實踐

本節中,將使用具體的示例進行講解,讓你深刻理解 Blink Planner 性能優化的設計。

■ 分段優化

示例 5

create view MyView as select word, count(1) as freq from SourceTable group by word; insert into SinkTable1 select * from MyView where freq >10;
insert into SinkTable2 select count(word) as freq2, freq from MyView group by freq;

上面的這幾個 SQL,轉化爲 RelNode DAG,大體圖形以下:

640 5-1.png

圖5 示例5 RelNode DAG

若是是使用 Flink Planner,通過優化層後,會生成以下執行層的 DAG:

640 6.png

圖6 示例 5 old planner DAG

能夠看到,old planner 只是簡單的從 Sink 出發,反向的遍歷到 Source,從而造成兩個獨立的執行鏈路,從上圖也能夠清楚的看到,Scan 和第一層 Aggregate 是有重複計算的。

在 Blink Planner 中,通過優化層以後,會生成以下執行層的 DAG:

640 7.png

圖7 示例 5 Blink Planner DAG

Blink Planner 不是在每次調用 insert into 的時候就開始優化,而是先將全部的 insert into 操做緩存起來,等到執行前才進行優化,這樣就能夠看到完整的執行圖,能夠知道哪些部分是重複計算的。Blink Planner 經過尋找能夠優化的最大公共子圖,找到這些重複計算的部分。通過優化後,Blink Planner 會將最大公共子圖的部分當作一個臨時表,供其餘部分直接使用。

這樣,上面的圖能夠分爲三部分,最大公共子圖部分(臨時表),臨時表與 Filter 和 SinkTable1 優化,臨時表與第二個 Aggregate 和 SinkTable 2 優化。

Blink Planner 實際上是經過聲明的 View 找到最大公共子圖的,所以在開發過程當中,若是須要複用某段邏輯,就將其定義爲 View,這樣就能夠充分利用 Blink Planner 的分段優化功能,減小重複計算。

固然,當前的優化也不是最完美的,由於提早對圖進行了切割,可能會致使一些優化丟失,從此會持續地對這部分算法進行改進。

總結一下,Blink Planner 的分段優化,其實解的是多 Sink 優化問題(DAG 優化),單 Sink 不是分段優化關心的問題,單 Sink 能夠在全部節點上優化,不須要分段。

■ Sub-Plan Reuse

示例 6

insert into SinkTabl
select freq from (select word, count(1) as freq from SourceTable group by word) t where word like 'T%'
union all
select count(word) as freq2 from (select word, count(1) as freq from SourceTable group by word) t group by freq;

這個示例的 SQL 和分段優化的 SQL 實際上是相似的,不一樣的是,沒有將結果 Sink 到兩個 Table 裏面,而是將結果 Union 起來,Sink 到一個結果表裏面。

下面看一下轉化爲 RelNode 的 DAG 圖:

640 8.png

圖 8 示例 6 RelNode DAG

從上圖能夠看出,Scan 和第一層的 Aggregate 也是有重複計算的,Blink Planner 其實也會將其找出來,變成下面的圖:

640 9.png

圖9 示例 6 Blink Planner DAG

Sub-Plan 優化的啓用,有兩個相關的配置:

  • table.optimizer.reuse-sub-plan-enabled (默認開啓)
  • table.optimizer.reuse-source-enabled(默認開啓)

這兩個配置,默認都是開啓的,用戶能夠根據本身的需求進行關閉。這裏主要說明一下 table.optimizer.reuse-source-enabled 這個參數。在 Batch 模式下,join 操做可能會致使死鎖,具體場景是在執行 hash-join 或者 nested-loop-join 時必定是先讀 build 端,而後再讀 probe 端,若是啓用 reuse-source-enabled,當數據源是同一個 Source 的時候,Source 的數據會同時發送給 build 和 probe 端。這時候,build 端的數據將不會被消費,致使 join 操做沒法完成,整個 join 就被卡住了。

爲了解決死鎖問題,Blink Planner 會先將 probe 端的數據落盤,這樣 build 端讀數據的操做纔會正常,等 build 端的數據所有讀完以後,再從磁盤中拉取 probe 端的數據,從而解決死鎖問題。可是,落盤會有額外的開銷,會多一次寫的操做;有時候,讀兩次 Source 的開銷,可能比一次寫的操做更快,這時候,能夠關閉 reuse-source,性能會更好。

固然,若是讀兩次 Source 的開銷,遠大於一次落盤的開銷,能夠保持 reuse-source 開啓。須要說明的是,Stream 模式是不存在死鎖問題的,由於 Stream 模式 join 不會有選邊的問題。

總結而言,sub-plan reuse 解的問題是優化結果的子圖複用問題,它和分段優化相似,但他們是一個互補的過程。

注:Hash Join:對於兩張待 join 的表 t1, t2。選取其中的一張表按照 join 條件給的列創建hash 表。而後掃描另一張表,一行一行去建好的 hash 表判斷是否有對應相等的行來完成 join 操做,這個操做稱之爲 probe (探測)。前一張表叫作 build 表,後一張表的叫作 probe 表。

■ Agg 分類優化

Blink 中的 Aggregate 操做是很是豐富的:

  • group agg,例如:select count(a) from t group by b
  • over agg,例如:select count(a) over (partition by b order by c) from t
  • window agg,例如:select count(a) from t group by tumble(ts, interval '10' second), b
  • table agg ,例如:tEnv.scan('t').groupBy('a').flatAggregate(flatAggFunc('b' as ('c', 'd')))

下面主要對 Group Agg 優化進行講解,主要是兩類優化。

■ Local/Global Agg 優化

Local/Global Agg 主要是爲了減小網絡 Shuffle。要運用 Local/Global 的優化,必要條件以下:

  • Aggregate 的全部 Agg Function 都是 mergeable 的,每一個 Aggregate 須要實現 merge 方法,例如 SUM,COUNT,AVG,這些都是能夠分多階段完成,最終將結果合併;可是求中位數,計算 95% 這種相似的問題,沒法拆分爲多階段,所以,沒法運用 Local/Global 的優化。
  • table.optimizer.agg-phase-strategy 設置爲 AUTO 或者 TWO_PHASE。
  • Stream 模式下,mini-batch 開啓 ;Batch 模式下 AUTO 會根據 cost 模型加上統計數據,選擇是否進行 Local/Global 優化。

示例 7

select count(*) from t group by color

沒有優化的狀況下,下面的這個 Aggregate 會產生 10 次的 Shuffle 操做。

640 10.png

圖 10 示例 7 未作優化的 Count 操做

使用 Local/Global 優化後,會轉化爲下面的操做,會在本地先進行聚合,而後再進行 Shuffle 操做,整個 Shuffle 的數據剩下 6 條。在 Stream 模式下,Blink 其實會以 mini-batch 的維度對結果進行預聚合,而後將結果發送給 Global Agg 進行彙總。

640 11.png

圖 11 示例 7 通過 Local/Global 優化的 Count 操做

■ Distinct Agg 優化

Distinct Agg 進行優化,主要是對 SQL 語句進行改寫,達到優化的目的。但 Batch 模式和 Stream 模式解決的問題是不一樣的:

  • Batch 模式下的 Distinct Agg,須要先作 Distinct,再作 Agg,邏輯上須要兩步才能實現,直接實現 Distinct Agg 開銷太大。
  • Stream 模式下,主要是解決熱點問題,由於 Stream 須要將全部的輸入數據放在 State 裏面,若是數據有熱點,State 操做會很頻繁,這將影響性能。

Batch 模式

第一層,求 distinct 的值和非 distinct agg function 的值,第二層求 distinct agg function 的值。

示例 8

select color, count(distinct id), count(*) from t group by color

手工改寫成:

select color, count(id), min(cnt) from (
   select color, id, count(*) filter (where $e=2) as cnt from (       
      select color, id, 1 as $e from t --for distinct id 
      union all
      select color, null as id, 2 as $e from t -- for count(*) 
  ) group by color, id, $e 
) group by color

轉化的邏輯過程,以下圖所示:

640 12.png

圖 12 示例 8 Batch 模式 Distinct 改寫邏輯

Stream 模式

Stream 模式的啓用有一些必要條件:

  • 必須是支持的 agg function:avg/count/min/max/sum/first_value/concat_agg/single_value;
  • table.optimizer.distinct-agg.split.enabled(默認關閉)

示例 9

select color, count(distinct id), count(*) from t group by color

手工改寫成:

select color, sum(dcnt), sum(cnt) from (
  select color, count(distinct id) as dcnt, count(*) as cnt from t 
  group by color, mod(hash_code(id), 1024)
) group by color

改寫前,邏輯圖大概以下:

640 13.png

圖 13 示例 9 Stream 模式未優化 Distinct

改寫後,邏輯圖就會變爲下面這樣,熱點數據被打散到多箇中間節點上。

640 14.jpg

圖14 示例 9 Stream 模式優化 Distinct

須要注意的是,示例 5 的 SQL 中 mod(hash_code(id),1024)中的這個 1024 爲打散的維度,這個值建議設置大一些,設置過小產生的效果可能很差。

總結

本文首先對新的 TableEnvironment 的總體設計進行了介紹,而且列舉了各類模式下TableEnvironment 的選擇,而後經過具體的示例,展現了各類模式下代碼的寫法,以及須要注意的事項。

在新的 Catalog 和 DDL 部分,對 Catalog 的總體設計、DDL 的使用部分也都以實例進行拆分講解。最後,對 Blink Planner 解析 SQL/Table API 的流程、Blink Planner 的改進以及優化的原理進行了講解,但願對你們探索和使用 Flink SQL 有所幫助。

相關文章
相關標籤/搜索