提效 7 倍,Apache Spark 自適應查詢優化在網易的深度實踐及改進

本文基於 Apahce Spark 3.1.1 版本,講述 AQE 自適應查詢優化的原理,以及網易數帆在 AQE 實踐中遇到的痛點和作出的思考。html

前言

自適應查詢優化(Adaptive Query Execution, AQE) 是 Spark 3.0 版本引入的重大特性之一,能夠在運行時動態的優化用戶的 SQL 執行計劃,很大程度上提升了 Spark 做業的性能和穩定性。AQE 包含動態分區合併、Join 數據傾斜自動優化、動態 Join 策略選擇等多個子特性,這些特性可讓用戶省去不少須要根據做業負載逐個手動調優,甚至修改業務邏輯的痛苦過程,極大的提高了 Spark 自身的易用性和靈活性。git

做爲網易大數據基礎軟件的締造者,網易數帆旗下網易有數團隊自 AQE 誕生起就關注其應用。第一個應用 AQE 的系統是 Kyuubi。 Kyuubi 是網易開源的一款企業級數據湖探索平臺,它基於 Spark SQL 實現了多租戶 SQL on Hadoop 查詢引擎。在網易內部,基於 Kyuubi 的 C/S 架構,在保證 SQL 兼容性的前提下,服務端能夠平滑地實現 Spark 版本升級,將社區和內部的最新優化和加強快速賦能用戶。從 Spark 3.0.2 開始,網易有數就在生產環境中逐步試用和推廣 AQE 的特性。而在 Spark 3.1.1 發佈後,AQE 在 Kyuubi 生產環境中已是用戶默認的執行方式。在這個過程當中,咱們還端到端地幫助某個業務遷移了 1500+ Hive 歷史任務到 Spark 3.1.1 上,不只實現了資源量減半,更將總執行時間縮短了 70%以上,綜合來看執行性能提高 7 倍多。github

固然,AQE 做爲一個「新」特性,在實踐過程當中咱們也發現它在不少方面不盡如人意,還有很大的優化空間。秉着堅持開源策略,網易有數努力將團隊遇到的問題和 Spark 社區分享,將咱們的優化努力合進社區。如下章節,咱們將展開介紹這半年多來 AQE 特性在網易的實踐經驗和優化改進。算法

AQE 的設計思路

首先明確一個核心概念,AQE 的設計和優化徹底圍繞着 shuffle,也就是說若是執行計劃裏不包含 shuffle,那麼 AQE 是無效的。常見的可能產生 shuffle 的算子好比 Aggregate(group by), Join, Repartition。sql

不一樣於傳統以整個執行計劃爲粒度進行調度的方式,AQE 會把執行計劃基於 shuffle 劃分紅若干個子計劃,每一個子計劃用一個新的葉子節點包裹起來,從而使得執行計劃的調度粒度細化到 stage 級別 (stage 也是基於 shuffle 劃分)。這樣拆解後,AQE 就能夠在某個子執行計劃完成後獲取到其 shuffle 的統計數據,並基於這些統計數據再對下一個子計劃動態優化。apache

圖片來自 databricks博客架構

有了這個調度流程以後,AQE 纔可能有接下來的優化策略,從宏觀上來看 AQE 優化執行計劃的策略有兩種:一是動態修改執行計劃;二是動態生成 shuffle reader。併發

動態修改執行計劃

動態修改執行計劃包括兩個部分:對其邏輯計劃從新優化,以及生成新的物理執行計劃。咱們知道通常的 SQL 執行流程是,邏輯執行計劃 -> 物理執行計劃,而 AQE 的執行邏輯是,子物理執行計劃 -> 父邏輯執行計劃 -> 父物理執行計劃,這樣的執行流程提供了更多優化的空間。好比在對 Join 算子選擇執行方式的時候可能有原來的 Sort Merge Join 優化爲 Broadcast Hash Join。執行計劃層面看起來是這樣:ide

 

動態生成 Shuffle Reader

先明確一個簡單的概念 map 負責寫 shuffle 數據,reduce 負責讀取 shuffle 數據。而 shuffle reader 能夠理解爲在 reduce 裏負責拉 shuffle 數據的工具。標準的 shuffle reader 會根據預設定的分區數量 (也就是咱們常常改的 spark.sql.shuffle.partitions),在每一個 reduce 內拉取分配給它的 shuffle 數據。而動態生成的 shuffle reader 會根據運行時的 shuffle 統計數據來決定 reduce 的數量。下面舉兩個例子,分區合併和 Join 動態優化。工具

  • 分區合併是一個通用的優化,其思路是將多個讀取 shuffle 數據量少的 reduce 合併到 1 個 reduce。假若有一個極端狀況,shuffle 的數據量只有幾十 KB,可是分區數聲明瞭幾千,那麼這個任務就會極大的浪費調度資源。在這個背景下,AQE 在跑完 map 後,會感知到這個狀況,而後動態的合併 reduce 的數量,而在這個 case 下 reduce 的數量就會合併爲 1。這樣優化後能夠極大的節省 reduce 數量,並提升 reduce 吞吐量。

  • Join 傾斜優化相對於分區合併,Join 傾斜優化則只專一於 Join 的場景。若是咱們 Join 的某個 key 存在傾斜,那麼對應到 Spark 中就會出現某個 reduce 的分區出現傾斜。在這個背景下,AQE 在跑完 map 後,會預統計每一個 reduce 讀取到的 shuffle 數據量,而後把數據量大的 reduce 分區作切割,也就是把本來由 1 個 reduce 讀取的 shuffle 數據改成 n 個 reduce 讀取。這樣處理後就保證了每一個 reduce 處理的數據量是一致的,從而解決數據傾斜問題。

AQE 優化規則實現都是很是巧妙的,其餘更多優化細節就不展開了,推薦閱讀 Kyuubi與AQE

社區原生 AQE 的問題

看起來 AQE 已是萬能的,咱們常常遇到的問題點都被覆蓋到了,那麼實際用起來的時候真的有這麼絲滑嗎?這裏列舉一些網易在使用 AQE 過程當中遇到的痛點。

覆蓋場景不足

就拿 Join 傾斜優化來講,這真的是一個很是棒的 idea,什麼都很好可是有一個缺陷:覆蓋的場景有限。在網易的深度實踐過程當中,常常會遇到一些 Join 明明就是肉眼可見的傾斜,但卻沒有被優化成想象中的樣子。這種狀況對用戶來講會帶來極大的困擾,在成百上千行的 SQL 裏,哪些 Join 能被優化,哪些不能被優化?要花費很大一部分時間來去校驗確認。

廣播 Join 不可逆

廣播配置 spark.sql.autoBroadcastJoinThreshold 是咱們最常修改的配置之一,其優點是能夠把 Join 用廣播的形式實現,避免了數據 shuffle。可是廣播有個很嚴重的問題:斷定一張表是否能夠被廣播是基於靜態的統計數據,特別是在通過一系列的過濾操做後,再完美的代價估計都是不精確的。由這個問題引起的任務失敗報錯就很常見了,Driver 端的 OOM,廣播超時等。而 AQE 中的廣播是不可逆的,也就是說若是一個 Join 在進入 AQE 優化前已經被選定爲廣播 Join,那麼 AQE 沒法再將其轉換爲其餘 Join (好比 Sort Merge Join)。這對於一些因爲錯誤估計大小而致使被廣播的表是致命的。也是咱們遇到影響任務穩定性的一大因素。

配置不夠靈活

雖然 AQE 真的很好用,可是配置仍是不夠靈活。好比 stage 級別的配置隔離,咱們知道 AQE 是基於 stage 的調度,那麼更進一步的,SQL 的配置也能夠是 stage 級別的,這樣能夠最細粒度的優化每一次 shuffle。聽起來可能有點過猶不及的感受,可是最容易遇到的一個需求就是單獨設置最後一個 stage 的配置。最後一個 stage 是不同凡響的,它表明着寫操做,也就是說它決定了最終產生文件的數量。因此矛盾和痛點就這樣出現了,最後一個 stage 考慮的是存儲,是文件數,而過程當中的 stage 考慮的是計算性能,是併發。

網易數帆在 AQE 上的改進

網易是 AQE 這個特性的重度使用者,固然不該該放着這些痛點無論,基於社區版本的分支下咱們作了一系列的優化和加強,而且已經把其中的一部份內容 push 到了社區。在開源這個話題上,網易秉持着開放的理念

回合社區補丁

Spark 的發佈週期沒有那麼頻繁,就算小版本迭代通常也要小半年,那麼咱們不可能隻眼睜睜看着一系列的 bug 存在於舊分支。所以網易在 Spark 分支管理上的策略是:本身維護小版本,及時跟進大版本 (小版本多是從 3.0.1 到 3.0.2,大版本則是從 3.0 到 3.1)。在這個策略下,咱們能夠及時回合社區新發現的問題。好比 AQE 相關的補丁 SPARK-33933,這個補丁的做用是在執行子物理計劃的時候優先執行廣播其次 shuffle,從而減少在調度資源不足狀況下廣播超時的可能性。社區的這個補丁須要到 3.2.0 分支才能發佈,可是出於穩定性的考慮,網易內部把它回合到了 3.1.1 分支。

回饋社區

提升廣播 Join 的穩定性

爲了解決靜態估計執行計劃的統計數據不許確以及廣播在 AQE 中不可逆的問題,咱們支持了在 AQE 本身的廣播配置 SPARK-35264。這個方案的思路是增長一個新的廣播配置 spark.sql.adaptive.autoBroadcastJoinThreshold 和已有的廣播配置隔離,再基於 AQE 運行時的統計數據來判斷是否能夠用廣播來完成 Join,保證被廣播表的數據量是可信的。在這個條件下,咱們能夠禁用基於靜態估計的廣播 Join,只開啓 AQE 的廣播,這樣咱們就能夠在享受廣播 Join 性能的同時兼顧穩定性。

增長 Join 傾斜優化覆蓋維度

咱們對 Join 傾斜優化作了不少加強,這個 case 是其中之一。在描述內容以前,咱們先簡單介紹一個 SHJ 和 SMJ (Shuffled Hash Join 簡稱爲 SHJ,Sort Merge Join 簡稱 SMJ)。SMJ 的實現原理是經過先把相同 key shuffle 到同一 reduce,而後作分區內部排序,最後完成 Join。而 SHJ 相對於 SMJ 有着優秀的時間複雜度,經過構建一個 hash map 作數據配對,節省了排序的時間,但缺點也一樣明顯,容易 OOM。

一直以來 SHJ 是一個很容易被遺忘的 Join 實現,這是由於默認配置 spark.sql.preferSortMerge 的存在,並且社區版本里觸發 SHJ 的條件真的很苛刻。但自從 Spark 3.0 全面地支持了全部類型的 Join Hint SPARK-27225,SHJ 又逐漸進入了咱們的視野。回到正題,社區版本的 AQE 目前只對 SMJ 作了傾斜優化,這對於顯式聲明瞭 Join Hint 爲 SHJ 的任務來講很不友好。在這個背景下,咱們增長了 AQE 對 SHJ 傾斜優化的支持 SPARK-35214,使得 Join 傾斜優化在覆蓋維度上獲得了提高。

一些瑣碎的訂正

因爲 Spark 在網易內部的使用場景是很是多的,包括但不限於數倉,ETL,Add hoc,所以咱們須要最大程度減小負面的和誤導用戶的 case。

  • SPARK-35239,這個 issue 能夠描述爲當輸入的 RDD 分區是空的時候沒法對其 shuffle 的分區合併。看起來影響並不大,若是是空表的話那麼就算空跑一些任務也是很是快的。可是在 Add hoc 場景下,默認的 spark.sql.shuffle.partitions 配置調整很大,這就會形成嚴重的 task 資源浪費,而且加劇 Driver 的負擔

  • SPARK-34899,當咱們發現某些 shuffle 分區在被 AQE 的分區合併規則成功優化後,分區數竟然沒有降低,一度懷疑是沒有找到正確使用 AQE 的姿式

  • SPARK-35168,一些 Hive 轉過來的同窗可能會遇到的 issue,理論上 MapReduce 中 reduce 的數量等價於 Spark 的 shuffle 分區數,因此 Spark 作了一些配置映射。可是在映射中出現了 bug 這確定是不能容忍的。

內部優化(已開源)

除了和社區保持交流以外,網易數帆也作了許多基於 AQE 的優化,這些優化都在咱們的開源項目 Kyuubi 裏。

支持複雜場景下 Join 傾斜優化

社區版本對 AQE 的優化比較謹慎,只對標準的 Sort Merge Join 作了傾斜優化,也就是每一個 Join 下的子算子必須包含 Sort 和 Shuffle,這個策略極大的限制了 Join 傾斜優化的覆蓋率。舉例來講,有一個執行計劃先 Aggregate 再 Join,而且這兩個算子之間沒有出現 shuffle。咱們能夠猜到,在沒有 AQE 的介入下,Aggregate 和 Join 之間的 shuffle 被剪枝了,這是一種常見的優化策略,通常是因爲 Aggregate 的 key 和 Join 的 key 存在重複引發的。可是因爲沒有擊中規則,AQE 沒法優化這個場景的 Join。有一些能夠繞過去的方法,好比手動在 Aggregate 和 Join 之間插入一個 shuffle,獲得的執行計劃長這樣子:

咱們在這種思路下,以增長規則的方式能夠在不入侵 AQE 代碼的前提下,自動增長 shuffle 來知足 Join 傾斜優化的觸發條件。選擇這樣處理的理由有 3 個

  • 增長 shuffle 能夠帶來另外一個優秀的反作用,就是支持多表 Join 場景下的優化,能夠說是一箭雙鵰

  • 不用魔改 AQE 的代碼,能夠獨立於咱們內部的 Spark 分支快速迭代

  • 固然這不是最終的解決方案,和社區的交流還在繼續

小文件合併以及 stage 級別的配置隔離

Spark 的小文件問題已經存在不少年了,解決方案也有不少。而 AQE 的出現看起來能夠自然的解決小文件問題,所以網易內部基於 AQE 的分區合併優化規則,對每一個涉及寫操做的 SQL,在其執行計劃的頂端動態插入一個 shuffle 節點,從執行計劃的角度看起來是這樣的:

 

再結合能夠控制每一個分區大小的相關配置,看起來一切都是這麼美好。但問題仍是來了,其中有兩個最明顯的問題:

  • 簡單添加一個 shuffle 節點沒法知足動態分區寫的場景

假設咱們最終產生 1k 個分區,動態插入的分區值的數量也是 1k,那麼最終會產生的文件數是 1k x 1k = 1m。這確定是不能被接受的,所以咱們須要對動態分區字段作重分區,讓包含相同分區值的數據落在同一個分區內,這樣 1k 個分區生成的文件數最多也是 1k。可是這樣處理後還有有一個潛在的風險點,不一樣分區值的分佈是不均勻的,也就是說可能出現數據傾斜問題。對於這樣狀況,咱們又額外增長了與業務無關的重分區字段,並經過配置的方式幫助用戶快速應對不一樣的業務場景。

  • 單分區處理的數據量過大致使性能瓶頸

成也蕭何,敗也蕭何。把 spark.sql.adaptive.advisoryPartitionSizeInBytes 調大後小文件的問題是解決了,可是過程當中每一個分區處理的數據量也隨之增長,這致使過程當中的併發度沒法達到預期的要求。所以 stage 級別的配置隔離出現了。咱們直接把整個 SQL 配置劃分爲兩部分,最後一個 stage 以及以前的 stage,而後把這兩個部分之間的配置作了隔離。拿上面這個配置來講,在最後一個 stage 的樣子是 spark.sql.finalStage.adaptive.advisoryPartitionSizeInBytes。在配置隔離的幫助下,咱們能夠完美解決小文件和計算性能不能兼得的問題,用戶能夠更加優雅地使用 AQE。

案例分享

多表 Join 傾斜

下面這兩張圖爲 3 表 Join 的執行計劃,因爲長度的限制咱們只截取到 Join 相關的片斷,而且沒有被優化的任務因爲數據傾斜問題沒有執行成功。能夠明顯看到社區版本沒法對這類多表 Join 作傾斜優化,而咱們在動態插入 shuffle 以後,兩次 Join 都成功的被優化。在這個特性的幫助下,Join 傾斜優化的覆蓋場景相對於社區有明顯提高。

社區版本

 

內部版本

Stage 配置隔離

在支持了 stage 級別的配置隔離後,咱們單獨設置了最後一個 stage 的參數,下面兩張圖是某個線上任務先後兩天的執行狀況,能夠明顯看到在配置隔離後,在保證最終產出的文件數一致的狀況下,過程當中 stage 的併發度獲得了提高,從而使任務性能獲得提高。

配置隔離前

 

配置隔離後

任務性能對比

這張圖展現了咱們部分遷移任務的資源成本以及性能對比,其中藍線是遷移前的數據,紅線是遷移後的數據。能夠很是明顯看到,在資源成本大幅降低的同時任務性能有不一樣程度的提高。

總結與展望

首先得感謝一下 Apache Spark 社區,在引入了 AQE 以後,咱們的線上任務獲得了不一樣程度的性能提高,也使得咱們在遇到問題的時候能夠有更多解決問題的思路。在深度實踐的過程當中,咱們也發現了一些能夠優化的點:

  • 在優化細節上的角度,能夠增長命中 AQE 優化的 case,好比 Join 傾斜優化加強,讓用戶不用逐個檢查不能被優化的執行計劃

  • 在業務使用上的角度,能夠同時支持 ETL,Add hoc 等側重點不同的場景,好比 stage 配置隔離這個特性,讓關注寫和讀的業務都有良好的體驗

在完成這個階段性的優化後,接下來咱們會繼續深耕在 AQE 的覆蓋場景上,好比支持 Union 算子的細粒度優化,加強 AQE 的代價估計算法等。除此以外,還有一些潛在的性能迴歸問題也是值得咱們注意的,好比在作分區合併優化後會放大某些高時間複雜度算子的性能瓶頸。

做爲多是最快在線上使用 Apache Spark 3.1.1 的用戶,網易在享受社區技術福利的同時也在反哺社區。這也是網易對技術的思考和理念:

  • 由於開放,咱們擁抱開源,深刻社區

  • 由於熱愛,咱們快速接收新的理論,實踐新的技術

做者簡介:尤夕多,目前就任於網易數帆-有數事業部,專一於開源大數據領域。網易數帆開源項目 Kyuubi Committer / Apache Spark Contributor。

 

相關閱讀:

大數據實戰:Kyuubi 與 Spark ThriftServer 的全面對比分析

網易數帆開源Kyuubi:基於Spark的高性能JDBC和SQL執行引擎

擁抱開源,咱們是認真的-網易易數2020年Apache Spark貢獻總結

相關文章
相關標籤/搜索