簡介: 在 Flink 1.12 中,針對目前 operator chaining 沒法覆蓋的場景,推出了 multiple input operator 與 source chaining 優化。該優化將消除 Flink 做業中大多數冗餘 shuffle,進一步提升做業的執行效率。本文將以一個 SQL 做業爲例介紹上述優化,並展現 Flink 1.12 在 TPC-DS 測試集上取得的成果。
執行效率的優化一直是 Flink 追尋的目標。在大多數做業,特別是批做業中,數據經過網絡在 task 之間傳遞(稱爲數據 shuffle)的代價較大。正常狀況下一條數據通過網絡須要通過序列化、磁盤讀寫、socket 讀寫與反序列化等艱難險阻,才能從上游 task 傳輸到下游;而相同數據在內存中的傳輸,僅須要耗費幾個 CPU 週期傳輸一個八字節指針便可。算法
Flink 在早期版本中已經經過 operator chaining 機制,將併發相同的相鄰單輸入算子整合進同一個 task 中,消除了單輸入算子之間沒必要要的網絡傳輸。然而,join 等多輸入算子之間一樣存在額外的數據 shuffle 問題,shuffle 數據量最大的 source 節點與多輸入算子之間的數據傳輸也沒法利用 operator chaining 機制進行優化。api
在 Flink 1.12 中,咱們針對目前 operator chaining 沒法覆蓋的場景,推出了 multiple input operator 與 source chaining 優化。該優化將消除 Flink 做業中大多數冗餘 shuffle,進一步提升做業的執行效率。本文將以一個 SQL 做業爲例介紹上述優化,並展現 Flink 1.12 在 TPC-DS 測試集上取得的成果。網絡
咱們將以 TPC-DS q96 爲例子詳細介紹如何消除冗餘 shuffle,該 SQL 意在經過多路 join 篩選並統計符合特定條件的訂單量。併發
select count(*) from store_sales ,household_demographics ,time_dim, store where ss_sold_time_sk = time_dim.t_time_sk and ss_hdemo_sk = household_demographics.hd_demo_sk and ss_store_sk = s_store_sk and time_dim.t_hour = 8 and time_dim.t_minute >= 30 and household_demographics.hd_dep_count = 5 and store.s_store_name = 'ese'
圖 1 - 初始執行計劃框架
因爲部分算子對輸入數據的分佈有要求(如 hash join 算子要求同一併發內數據 join key 的 hash 值相同),數據在算子之間傳遞時可能須要通過從新排布與整理。與 map-reduce 的 shuffle 過程相似,Flink shuffle 將上游 task 產生的中間結果進行整理,並按需發送給須要這些中間結果的下游 task。但在一部分狀況下,上游產出的數據已經知足了數據分佈要求(如連續多個 join key 相同的 hash join 算子),此時對數據的整理便再也不必要,由此產生的 shuffle 也就成爲了冗餘 shuffle,在執行計劃中以 forward shuffle 表示。socket
圖 1 中的 hash join 算子是一種稱爲 broadcast hash join 的特殊算子。以 store_sales join time_dim 爲例,因爲 time_dim 表數據量很小,此時經過 broadcast shuffle 將該表的全量數據發送給 hash join 的每一個併發,就能讓任何併發接受 store_sales 表的任意數據而不影響 join 結果的正確性,同時提升 hash join 的執行效率。此時 store_sales 表向 join 算子的網絡傳輸也成爲了冗餘 shuffle。同理幾個 join 之間的 shuffle 也是沒必要要的。oop
圖 2 - 冗餘的shuffle(紅框標記)性能
除 hash join 與 broadcast hash join 外,產生冗餘 shuffle 的場景還有不少,例如 group key 與 join key 相同的 hash aggregate + hash join、group key 具備包含關係的多個 hash aggregate 等等,這裏再也不展開描述。測試
對 Flink 優化過程有必定了解的讀者可能會知道,爲了消除沒必要要的 forward shuffle,Flink 在早期就已經引入了 operator chaining 機制。該機制將併發相同的相鄰單輸入算子整合進同一個 task 中,並在同一個線程中一塊兒運算。Operator chaining 機制在圖 1 中其實已經在發揮做用,若是沒有它,作 broadcast shuffle 的三個 Source 節點名稱中被「->」分隔的算子將會被拆分至多個不一樣的 task,產生冗餘的數據 shuffle。圖 3 爲 Operator chaining 關閉是的執行計劃。優化
圖 3 - Operator chaining關閉後的執行計劃
減小數據在 TM 之間經過網絡和文件傳輸並將算子連接合併入 task 是很是有效的優化:它能減小線程之間的切換,減小消息的序列化與反序列化,減小數據在緩衝區的交換,並減小延遲的同時提升總體吞吐量。然而,operator chaining 對算子的整合有很是嚴格的限制,其中一條就是「下游算子的入度爲 1」,也就是說下游算子只能有一路輸入。這就將多路輸入的算子(如 join)排除在外。
若是咱們能仿照 operator chaining 的優化思路,引入新的優化機制並知足如下條件:
咱們就能夠將用 forward shuffle 鏈接的的多輸入算子放到一個 task 裏執行,從而消除沒必要要的 shuffle。Flink 社區很早就關注到了 operator chaining 的不足,在 Flink 1.11 中引入了 streaming api 層的
MultipleInputTransformation 以及對應的 MultipleInputStreamTask。這些 api 知足了上述條件 2,而 Flink 1.12 在此基礎上在 SQL 層中實現了知足條件 1 的新算子——multiple input operator,能夠參考 FLIP 文檔[1]。
Multiple input operator 是 table 層一個可插拔的優化。它位於 table 層優化的最後一步,遍歷生成的執行計劃並將不被 exchange 阻隔的相鄰算子整合進一個 multiple input operator 中。圖 4 展現了該優化對本來 SQL 優化步驟的修改。
圖 4 - 加入 multiple input operator 後的優化步驟
讀者可能會有疑問:爲何不在現有的 operator chaining 上進行修改,而要另起爐竈呢?實際上,multiple input operator 除了要完成 operator chaining 的工做以外,還須要對各個輸入的優先級進行排序。這是由於一部分多輸入算子(如 hash join 與 nested loop join)對輸入有嚴格的順序限制,若輸入優先級排序不當極可能形成死鎖。因爲算子輸入優先級的信息僅在 table 層的算子中有描述,更加天然的方式是在 table 層引入該優化機制。
值得注意的是,multiple input operator 不一樣於管理多個 operator 的 operator chaining,其自己就是一整個大 operator,而其內部運算在外界看來就是一個黑盒。Multiple input operator 的內部結構在 operator name 中徹底體現,讀者在運行包含該 operator 的做業時,能夠從 operator name 看到哪些算子以怎樣的拓撲結構被組合進了 multiple input operator 中。
圖 5 展現了通過 multiple input 優化後的算子的拓撲圖以及 multiple input operator 的透視圖。圖中三個 hash join 算子之間的冗餘的 shuffle 被移除後,它們能夠在一個 task 裏執行,只不過 operator chaining 無法處理這種多輸入的狀況,將它們放到 multiple input operator 裏執行,由 multiple input operator 管理各個算子的輸入順序和算子之間的調用關係。
圖 5 - 通過 multiple input 優化後的算子拓撲圖
Multiple input operator 的構建和運行過程較爲複雜,對此細節有興趣的讀者能夠參考設計文檔[2]。
通過 multiple input operator 的優化,咱們將圖 1 中的執行計劃優化爲圖 6,圖 3 通過 operator chaining 優化後就變爲圖 6 的執行圖。
圖 6 - 通過 multiple input operator 優化後的執行計劃
圖 6 中從 store_sales 表產生的 forward shuffle(如紅框所示)表示咱們仍有優化空間。正如序言中所說,在大部分做業中,從 source 直接產生的數據因爲沒有通過 join 等算子的篩選和加工,shuffle 的數據量是最大的。以 10T 數據下的 TPC-DS q96 爲例,若是不進行進一步優化,包含 store_sales 源表的 task 將向網絡中傳輸 1.03T 的數據,而通過一次 join 的篩選後,數據量急速降低至 16.5G。若是咱們能將源表的 forward shuffle 省去,做業總體執行效率又能前進一大步。
惋惜的是,multiple input operator 也不能覆蓋 source shuffle 的場景,這是由於 source 不一樣於其它任何算子,它沒有任何輸入。Flink 1.12 爲此給 operator chaining 新增了 source chaining 功能,將不被 shuffle 阻隔的 source 合併到 operator chaining 中,省去了 source 與下游算子之間的 forward shuffle。
目前僅有 FLIP-27 source 以及 multiple input operator 能夠利用 source chaining 功能,不過這已經足夠解決本文中的優化場景。
結合 multiple input operator 與 source chaining 以後,圖 7 展現了本文優化案例的最終執行方案。
圖 7 - 優化後的執行方案
Multiple input operator 與 source chaining 對大部分做業,特別是批做業有顯著的優化效果。咱們利用 TPC-DS 測試集對 Flink 1.12 的總體性能進行了測試,與 Flink 1.10 公佈的 12267s 總用時相比,Flink 1.12 的總用時僅爲 8708s,縮短了近 30% 的運行時間!
圖 8 - TPC-DS 測試集總用時對比
圖 9 - TPC-DS 部分測試點用時對比
經過 TPC-DS 的測試效果看到,source chaining + multiple input 可以給咱們帶來很大的性能提高。目前總體框架已完成,經常使用批算子已支持消除冗餘 exchange 的推導邏輯,後續咱們將支持更多的批算子和更精細的推導算法。
流做業的數據 shuffle 雖然不須要像批做業同樣將數據寫入磁盤,但將網絡傳輸變爲內存傳輸帶來的性能提高也是很是可觀的,所以流做業支持 source chaining + multiple input 也是一個很是使人期待的優化。同時,在流做業上支持該優化還須要不少工做,例如流算子上消除冗餘 exchange 的推導邏輯暫未支持,一些算子須要重構以消除輸入數據是 binary 的要求等等,這也是爲何 Flink 1.12 暫未在流做業中推出推出該優化的緣由。後續版本咱們將逐步完成這些工做,也但願更多社區的力量加入咱們一塊兒儘早的將更多的優化落地。
做者:賀小令、翁才智
原文連接本文爲阿里雲原創內容,未經容許不得轉載