FLIP-32 中提出,將 Blink 徹底開源,合併到 Flink 主分支中。合併後在 Flink 1.9 中會存在兩個 Planner:Flink Planner 和 Blink Planner。java
在以前的版本中,Flink Table 在整個 Flink 中是一個二等公民。而 Flink SQL 具有的易用性、使用門檻低等特色深受用戶好評,愈來愈被重視,Flink Table 模塊也所以被提高爲一等公民。而 Blink 在設計之初就考慮到流和批的統一,批只是流的一種特殊形式,因此能夠用同一個TableEnvironment來表述流和批。算法
圖1 新 Table Environment 總體設計sql
從圖 1 中,能夠看出,TableEnvironment 組成部分以下:編程
在 Flink 1.9 以前,原來的 Flink Table 模塊,有 7 個 Environment,使用和維護上相對困難。7 個 Environment 包括:StreamTableEnvironment、BatchTableEnvironment 兩類,JAVA 和 Scala 分別 2 個,一共 4 個,加上 3 個父類,一共就是 7 個。api
在新的框架之下,社區但願流和批統一,所以對原來的設計進行精簡。首先,提供統一的 TableEnvironment,放在 flink-table-api-java 這個包中。而後,在 Bridge 中,提供了兩個用於銜接 Scala DataStream 和 Java DataStream 的 StreamTableEnvironment。最後,由於 Flink Planner 中還殘存在着 toDataSet() 相似的操做,因此,暫時保留 BatchTableEnvironment。這樣,目前一共是 5 個 TableEnvironment。緩存
由於將來 Flink Planner 將會被移除,BatchTableEnvironment 就會被廢棄,整個 TableEnvironment 的設計也會更加簡潔明瞭。性能優化
本節中,將介紹新的應用場景以及相關限制。下圖詳細列出了新 TableEnvironment 的適用場景:網絡
圖2 新 Table Environment 適應場景數據結構
第一行,簡單起見,在後續將新的 TableEnvironment 稱爲 UnifyTableEnvironment。在 Blink 中,Batch 被認爲是 Stream 的一個特例,所以 Blink 的 Batch 可使用 UnifyTableEnvironment。app
UnifyTableEnvironment 在 1.9 中有一些限制,好比它不可以註冊 UDAF 和 UDTF,當前新的 Type System 的類型推導功能尚未完成(Java、Scala 的類型推導還沒統一),因此這部分的功能暫時不支持。此外,UnifyTableEnvironment 沒法和 DataStream 和 DataSet 互轉。
第二行,Stream TableEnvironment 支持轉化成 DataStream,也能夠註冊 UDAF 和 UDTF。若是是 JAVA 寫的,就註冊到 JAVA 的 StreamTableEnvironment,若是是用 Scala 寫的,就註冊到 Scala 的 StreamTableEnvironment。
注意,Blink Batch 做業不支持 Stream TableEnvironment ,由於目前 Batch 無法和 DataStream 互轉,因此 toDataStream() 這樣的語義暫時不支持。從圖中也能夠看出,目前Blink Batch只能使用 TableEnvironment。
最後一行,BatchTableEvironment 可以使用 toDataSet() 轉化爲 DataSet。
從上面的圖 2 中,能夠很清晰的看出各個 TableEnvironment 可以作什麼事情,以及他們有哪些限制。
接下來,將使用示例對各類狀況進行說明。
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); TableEnvironment tEnv = TableEnvironment.create(settings); tEnv… tEnv.execute(「job name」);
從圖 2 中能夠看出,Blink Batch 只能使用 TableEnvironment(即UnifyTableEnvironment),代碼中,首先須要建立一個 EnvironmentSetting,同時指定使用 Blink Planner,而且指定用 Batch 模式。之因此須要指定 Blink Planner,是由於目前 Flink 1.9 中,將 Flink Planner 和 Blink Planner 的 jar 同時放在了 Flink 的 lib 目錄下。若是不指定使用的 Planner,整個框架並不知道須要使用哪一個 Planner,因此必須顯示的指定。固然,若是 lib 下面只有一個 Planner 的 jar,這時不須要顯示指定使用哪一個 Planner。
另外,還須要注意的是在 UnifyEnvironment 中,用戶是沒法獲取到 ExecutionEnvironment 的,即用戶沒法在寫完做業流程後,使用 executionEnvironment.execute() 方法啓動任務。須要顯式的使用 tableEnvironment.execute() 方法啓動任務,這和以前的做業啓動很不相同。
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamExecutionEnvironment execEnv = … StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv, settings); tEnv…
Blink Stream 既可使用 UnifyTableEnvironment,也可使用 StreamTableEnvironment,與 Batch 模式基本相似,只是須要將 inBatchMode 換成 inStreamingMode。
ExecutionEnvironment execEnv = ... BatchTableEnvironment tEnv = BatchTableEnvironment.create(execEnv); tEnv...
與以前沒有變化,不作過多介紹。
EnvironmentSettings settings = EnvironmentSettings.newInstance().useOldPlanner().inStreamMode().build(); TableEnvironment tEnv = TableEnvironment.create(settings); tEnv… tEnv.execute(「job name」);
Flink Stream 也是同時支持 UnifyEnvironment 和 StreamTableEnvironment,只是在指定 Planner 時,須要指定爲 useOldPlanner,也即 Flink Planner。由於將來 Flink Planner 會被移除,所以,特地起了一個 OlderPlanner 的名字,並且只可以使用 inStreamingMode,沒法使用 inBatchMode。
構建一個新的 Catalog API 主要是 FLIP-30 提出的,以前的 ExternalCatalog 將被廢棄,Blink Planner 中已經不支持 ExternalCatalog 了,Flink Planner 還支持 ExternalCatalog。
下圖是新 Catalog 的總體設計:
圖3 新 Catalog 設計
能夠看到,新的 Catalog 有三層結構(..),最頂層是 Catalog 的名字,中間一層是 Database,最底層是各類 MetaObject,如 Table,Partition,Function 等。當前,內置了兩個 Catalog 實現:MemoryCatalog 和 HiveCatalog。固然,用戶也能夠實現本身的 Catalog。
Catalog 可以作什麼事情呢?首先,它能夠支持 Create,Drop,List,Alter,Exists 等語句,另外它也支持對 Database,Table,Partition,Function,Statistics 等的操做。基本上,經常使用的 SQL 語法都已經支持。
CatalogManager 正如它名字同樣,主要是用來管理 Catalog,且能夠同時管理多個 Catalog。也就是說,能夠經過在一個相同 SQL 中,跨 Catalog 作查詢或者關聯操做。例如,支持對 A Hive Catalog 和 B Hive Catalog 作相互關聯,這給 Flink 的查詢帶來了很大的靈活性。
CatalogManager 支持的操做包括:
Catalog 雖然設計了三層結構,但在使用的時候,並不須要徹底指定三層結構的值,能夠只寫Table Name,這時候,系統會使用 getCurrentCatalog,getCurrentDatabase 獲取到默認值,自動補齊三層結構,這種設計簡化了對 Catalog 的使用。若是須要切換默認的 Catalog,只須要調用 setCurrentCatalog 就能夠了。
在 TableEnvironment 層,提供了操做 Catalog 的方法,例如:
在 SQL Client 層,也作了必定的支持,可是功能有必定的限制。用戶不可以使用 Create 語句直接建立 Catalog,只能經過在 yarn 文件中,經過定義 Description 的方式去描述 Catalog,而後在啓動 SQL Client 的時候,經過傳入 -e +file_path 的方式,定義 Catalog。目前 SQL Client 支持列出已定義的 Catalog,使用一個已經存在的 Catalog 等操做。
有了 Catalog,就可使用 DDL 來操做 Catalog 的內容,可使用 TableEnvironment 的 sqlUpdate() 方法執行 DDL 語句,也能夠在 SQL Client 執行 DDL 語句。
sqlUpdate() 方法中,支持 Create Table、Create View、Drop Table、Drop View 四個命令。固然,inset into 這樣的語句也是支持的。
下面分別對 4 個命令進行說明:
須要注意的是,目前 DDL 中,還不支持計算列和 Watermark 的定義,後續的版本中將會繼續完善這部分。
Create Table [[catalog_name.]db_name.]table_name( a int comment 'column comment', b bigint, c varchar )comment 'table comment' [partitioned by(b)] With( update-mode='append', connector.type='kafka', ... )
CREATE VIEW view_name AS SELECT xxx
DROP TABLE [IF EXISTS] [[catalog_name.]db_name.]table_name
CREATE VIEW DROP VIEW SHOW CATALOGS/DATABASES/TABLES/FUNCTIONS l USE CATALOG xxx SET xxx=yyy DESCRIBE table_name EXPLAIN SELECT xxx
DDL 部分,在 Flink 1.9 中其實基本已經成型,只是還有一些特性,在將來須要逐漸的完善。
本節將主要從 SQL/Table API 如何轉化爲真正的 Job Graph 的流程開始,讓你們對 Blink Planner 有一個比較清晰的認識,但願對你們閱讀 Blink 代碼,或者使用 Blink 方面有所幫助。而後介紹 Blink Planner 的改進及優化。
圖4 主要流程
從上圖能夠很清楚的看到,解析的過程涉及到了三層:Table API/SQL,Blink Planner,Runtime,下面將對主要的步驟進行講解。
當 SQL 傳輸進來後,首先會去作 SQL 解析,SQL 解析完成以後,會獲得 SqlNode Tree(抽象語法樹),而後會緊接着去作 Validate(驗證),驗證時會去訪問 FunctionManger 和 CatalogManger,FunctionManger 主要是查詢用戶定義的 UDF,以及檢查 UDF 是否合法,CatalogManger 主要是檢查這個 Table 或者 Database 是否存在,若是驗證都經過,就會生成一個 Operation DAG(有向無環圖)。
從這一步能夠看出,Table API 和 SQL 在 Flink 中最終都會轉化爲統一的結構,即 Operation DAG。
優化:優化器會對 RelNode 作各類優化,優化器的輸入是各類優化的規則,以及各類統計信息。當前,在 Blink Planner 裏面,絕大部分的優化規則,Stream 和 Batch 是共享的。差別在於,對 Batch 而言,它沒有 state 的概念,而對於 Stream 而言,它是不支持 sort 的,因此目前 Blink Planner 中,仍是運行了兩套獨立的規則集(Rule Set),而後定義了兩套獨立的 Physical Rel:BatchPhysical Rel 和 StreamPhysical Rel。優化器優化的結果,就是具體的 Physical Rel DAG。
Blink Planner 功能方面改進主要包含以下幾個方面:
性能方面,主要包括如下部分:
本節中,將使用具體的示例進行講解,讓你深刻理解 Blink Planner 性能優化的設計。
create view MyView as select word, count(1) as freq from SourceTable group by word; insert into SinkTable1 select * from MyView where freq >10; insert into SinkTable2 select count(word) as freq2, freq from MyView group by freq;
上面的這幾個 SQL,轉化爲 RelNode DAG,大體圖形以下:
圖5 示例5 RelNode DAG
若是是使用 Flink Planner,通過優化層後,會生成以下執行層的 DAG:
圖6 示例 5 old planner DAG
能夠看到,old planner 只是簡單的從 Sink 出發,反向的遍歷到 Source,從而造成兩個獨立的執行鏈路,從上圖也能夠清楚的看到,Scan 和第一層 Aggregate 是有重複計算的。
在 Blink Planner 中,通過優化層以後,會生成以下執行層的 DAG:
圖7 示例 5 Blink Planner DAG
Blink Planner 不是在每次調用 insert into 的時候就開始優化,而是先將全部的 insert into 操做緩存起來,等到執行前才進行優化,這樣就能夠看到完整的執行圖,能夠知道哪些部分是重複計算的。Blink Planner 經過尋找能夠優化的最大公共子圖,找到這些重複計算的部分。通過優化後,Blink Planner 會將最大公共子圖的部分當作一個臨時表,供其餘部分直接使用。
這樣,上面的圖能夠分爲三部分,最大公共子圖部分(臨時表),臨時表與 Filter 和 SinkTable1 優化,臨時表與第二個 Aggregate 和 SinkTable 2 優化。
Blink Planner 實際上是經過聲明的 View 找到最大公共子圖的,所以在開發過程當中,若是須要複用某段邏輯,就將其定義爲 View,這樣就能夠充分利用 Blink Planner 的分段優化功能,減小重複計算。
固然,當前的優化也不是最完美的,由於提早對圖進行了切割,可能會致使一些優化丟失,從此會持續地對這部分算法進行改進。
總結一下,Blink Planner 的分段優化,其實解的是多 Sink 優化問題(DAG 優化),單 Sink 不是分段優化關心的問題,單 Sink 能夠在全部節點上優化,不須要分段。
insert into SinkTabl select freq from (select word, count(1) as freq from SourceTable group by word) t where word like 'T%' union all select count(word) as freq2 from (select word, count(1) as freq from SourceTable group by word) t group by freq;
這個示例的 SQL 和分段優化的 SQL 實際上是相似的,不一樣的是,沒有將結果 Sink 到兩個 Table 裏面,而是將結果 Union 起來,Sink 到一個結果表裏面。
下面看一下轉化爲 RelNode 的 DAG 圖:
圖 8 示例 6 RelNode DAG
從上圖能夠看出,Scan 和第一層的 Aggregate 也是有重複計算的,Blink Planner 其實也會將其找出來,變成下面的圖:
圖9 示例 6 Blink Planner DAG
Sub-Plan 優化的啓用,有兩個相關的配置:
這兩個配置,默認都是開啓的,用戶能夠根據本身的需求進行關閉。這裏主要說明一下 table.optimizer.reuse-source-enabled 這個參數。在 Batch 模式下,join 操做可能會致使死鎖,具體場景是在執行 hash-join 或者 nested-loop-join 時必定是先讀 build 端,而後再讀 probe 端,若是啓用 reuse-source-enabled,當數據源是同一個 Source 的時候,Source 的數據會同時發送給 build 和 probe 端。這時候,build 端的數據將不會被消費,致使 join 操做沒法完成,整個 join 就被卡住了。
爲了解決死鎖問題,Blink Planner 會先將 probe 端的數據落盤,這樣 build 端讀數據的操做纔會正常,等 build 端的數據所有讀完以後,再從磁盤中拉取 probe 端的數據,從而解決死鎖問題。可是,落盤會有額外的開銷,會多一次寫的操做;有時候,讀兩次 Source 的開銷,可能比一次寫的操做更快,這時候,能夠關閉 reuse-source,性能會更好。
固然,若是讀兩次 Source 的開銷,遠大於一次落盤的開銷,能夠保持 reuse-source 開啓。須要說明的是,Stream 模式是不存在死鎖問題的,由於 Stream 模式 join 不會有選邊的問題。
總結而言,sub-plan reuse 解的問題是優化結果的子圖複用問題,它和分段優化相似,但他們是一個互補的過程。
注:Hash Join:對於兩張待 join 的表 t1, t2。選取其中的一張表按照 join 條件給的列創建hash 表。而後掃描另一張表,一行一行去建好的 hash 表判斷是否有對應相等的行來完成 join 操做,這個操做稱之爲 probe (探測)。前一張表叫作 build 表,後一張表的叫作 probe 表。
Blink 中的 Aggregate 操做是很是豐富的:
下面主要對 Group Agg 優化進行講解,主要是兩類優化。
Local/Global Agg 主要是爲了減小網絡 Shuffle。要運用 Local/Global 的優化,必要條件以下:
select count(*) from t group by color
沒有優化的狀況下,下面的這個 Aggregate 會產生 10 次的 Shuffle 操做。
圖 10 示例 7 未作優化的 Count 操做
使用 Local/Global 優化後,會轉化爲下面的操做,會在本地先進行聚合,而後再進行 Shuffle 操做,整個 Shuffle 的數據剩下 6 條。在 Stream 模式下,Blink 其實會以 mini-batch 的維度對結果進行預聚合,而後將結果發送給 Global Agg 進行彙總。
圖 11 示例 7 通過 Local/Global 優化的 Count 操做
Distinct Agg 進行優化,主要是對 SQL 語句進行改寫,達到優化的目的。但 Batch 模式和 Stream 模式解決的問題是不一樣的:
第一層,求 distinct 的值和非 distinct agg function 的值,第二層求 distinct agg function 的值。
select color, count(distinct id), count(*) from t group by color
手工改寫成:
select color, count(id), min(cnt) from ( select color, id, count(*) filter (where $e=2) as cnt from ( select color, id, 1 as $e from t --for distinct id union all select color, null as id, 2 as $e from t -- for count(*) ) group by color, id, $e ) group by color
轉化的邏輯過程,以下圖所示:
圖 12 示例 8 Batch 模式 Distinct 改寫邏輯
Stream 模式的啓用有一些必要條件:
select color, count(distinct id), count(*) from t group by color
手工改寫成:
select color, sum(dcnt), sum(cnt) from ( select color, count(distinct id) as dcnt, count(*) as cnt from t group by color, mod(hash_code(id), 1024) ) group by color
改寫前,邏輯圖大概以下:
圖 13 示例 9 Stream 模式未優化 Distinct
改寫後,邏輯圖就會變爲下面這樣,熱點數據被打散到多箇中間節點上。
圖14 示例 9 Stream 模式優化 Distinct
須要注意的是,示例 5 的 SQL 中 mod(hash_code(id),1024)中的這個 1024 爲打散的維度,這個值建議設置大一些,設置過小產生的效果可能很差。
本文首先對新的 TableEnvironment 的總體設計進行了介紹,而且列舉了各類模式下TableEnvironment 的選擇,而後經過具體的示例,展現了各類模式下代碼的寫法,以及須要注意的事項。
在新的 Catalog 和 DDL 部分,對 Catalog 的總體設計、DDL 的使用部分也都以實例進行拆分講解。最後,對 Blink Planner 解析 SQL/Table API 的流程、Blink Planner 的改進以及優化的原理進行了講解,但願對你們探索和使用 Flink SQL 有所幫助。