Spark SQL在100TB上的自適應執行實踐

Spark SQL是Apache Spark最普遍使用的一個組件,它提供了很是友好的接口來分佈式處理結構化數據,在不少應用領域都有成功的生產實踐,可是在超大規模集羣和數據集上,Spark SQL仍然遇到很多易用性和可擴展性的挑戰。爲了應對這些挑戰,英特爾大數據技術團隊和百度大數據基礎架構部工程師在Spark 社區版本的基礎上,改進並實現了自適應執行引擎。本文首先討論Spark SQL在大規模數據集上遇到的挑戰,而後介紹自適應執行的背景和基本架構,以及自適應執行如何應對Spark SQL這些問題,最後咱們將比較自適應執行和現有的社區版本Spark SQL在100 TB 規模TPC-DS基準測試碰到的挑戰和性能差別,以及自適應執行在Baidu Big SQL平臺的使用狀況。sql

挑戰1:關於shuffle partition數網絡

 

在Spark SQL中, shufflepartition數能夠經過參數spark.sql.shuffle.partition來設置,默認值是200。這個參數決定了SQL做業每一個reduce階段任務數量,對整個查詢性能有很大影響。假設一個查詢運行前申請了E個Executor,每一個Executor包含C個core(併發執行線程數),那麼該做業在運行時能夠並行執行的任務數就等於E x C個,或者說該做業的併發數是E x C。假設shuffle partition個數爲P,除了map stage的任務數和原始數據的文件數量以及大小相關,後續的每一個reduce stage的任務數都是P。因爲Spark做業調度是搶佔式的,E x C個併發任務執行單元會搶佔執行P個任務,「能者多勞」,直至全部任務完成,則進入到下一個Stage。但這個過程當中,若是有任務由於處理數據量過大(例如:數據傾斜致使大量數據被劃分到同一個reducer partition)或者其它緣由形成該任務執行時間過長,一方面會致使整個stage執行時間變長,另外一方面E x C個併發執行單元大部分可能都處於空閒等待狀態,集羣資源總體利用率急劇降低。數據結構

 

那麼spark.sql.shuffle.partition參數到底是多少比較合適?若是設置太小,分配給每個reduce任務處理的數據量就越多,在內存大小有限的狀況下,不得不溢寫(spill)到計算節點本地磁盤上。Spill會致使額外的磁盤讀寫,影響整個SQL查詢的性能,更差的狀況還可能致使嚴重的GC問題甚至是OOM。相反,若是shuffle partition設置過大。第一,每個reduce任務處理的數據量很小而且很快結束,進而致使Spark任務調度負擔變大。第二,每個mapper任務必須把本身的shuffle輸出數據分紅P個hash bucket,即肯定數據屬於哪個reduce partition,當shuffle partition數量太多時,hash bucket裏數據量會很小,在做業併發數很大時,reduce任務shuffle拉取數據會形成必定程度的隨機小數據讀操做,當使用機械硬盤做爲shuffle數據臨時存取的時候性能降低會更加明顯。最後,當最後一個stage保存數據時會寫出P個文件,也可能會形成HDFS文件系統中大量的小文件。多線程

 

從上,shuffle partition的設置既不能過小也不能太大。爲了達到最佳的性能,每每須要經屢次試驗才能肯定某個SQL查詢最佳的shuffle partition值。然而在生產環境中,每每SQL以定時做業的方式處理不一樣時間段的數據,數據量大小可能變化很大,咱們也沒法爲每個SQL查詢去作耗時的人工調優,這也意味這些SQL做業很難以最佳的性能方式運行。架構

 

Shuffle partition的另一個問題是,同一個shuffle partition數設置將應用到全部的stage。Spark在執行一個SQL做業時,會劃分紅多個stage。一般狀況下,每一個stage的數據分佈和大小可能都不太同樣,全局的shuffle partition設置最多隻能對某個或者某些stage最優,沒有辦法作到全局全部的stage設置最優。併發

 

這一系列關於shufflepartition的性能和易用性挑戰,促使咱們思考新的方法:咱們可否根據運行時獲取的shuffle數據量信息,例如數據塊大小,記錄行數等等,自動爲每個stage設置合適的shuffle partition值?app

 

挑戰2:Spark SQL最佳執行計劃框架

 

Spark SQL在執行SQL以前,會將SQL或者Dataset程序解析成邏輯計劃,而後經歷一系列的優化,最後肯定一個可執行的物理計劃。最終選擇的物理計劃的不一樣對性能有很大的影響。如何選擇最佳的執行計劃,這即是Spark SQL的Catalyst優化器的核心工做。Catalyst早期主要是基於規則的優化器(RBO),在Spark 2.2中又加入了基於代價的優化(CBO)。目前執行計劃的肯定是在計劃階段,一旦確認之後便再也不改變。然而在運行期間,當咱們獲取到更多運行時信息時,咱們將有可能獲得一個更佳的執行計劃。分佈式

 

以join操做爲例,在Spark中最多見的策略是BroadcastHashJoin和SortMergeJoin。BroadcastHashJoin屬於map side join,其原理是當其中一張表存儲空間大小小於broadcast閾值時,Spark選擇將這張小表廣播到每個Executor上,而後在map階段,每個mapper讀取大表的一個分片,而且和整張小表進行join,整個過程當中避免了把大表的數據在集羣中進行shuffle。而SortMergeJoin在map階段2張數據表都按相同的分區方式進行shuffle寫,reduce階段每一個reducer將兩張表屬於對應partition的數據拉取到同一個任務中作join。RBO根據數據的大小,儘量把join操做優化成BroadcastHashJoin。Spark中使用參數spark.sql.autoBroadcastJoinThreshold來控制選擇BroadcastHashJoin的閾值,默認是10MB。然而對於複雜的SQL查詢,它可能使用中間結果來做爲join的輸入,在計劃階段,Spark並不能精確地知道join中兩表的大小或者會錯誤地估計它們的大小,以至於錯失了使用BroadcastHashJoin策略來優化join執行的機會。可是在運行時,經過從shuffle寫獲得的信息,咱們能夠動態地選用BroadcastHashJoin。如下是一個例子,join一邊的輸入大小隻有600K,但Spark仍然規劃成SortMergeJoin。ide

 

圖1

 

這促使咱們思考第二個問題:咱們可否經過運行時收集到的信息,來動態地調整執行計劃?

 

挑戰3:數據傾斜

 

數據傾斜是常見的致使Spark SQL性能變差的問題。數據傾斜是指某一個partition的數據量遠遠大於其它partition的數據,致使個別任務的運行時間遠遠大於其它任務,所以拖累了整個SQL的運行時間。在實際SQL做業中,數據傾斜很常見,join key對應的hash bucket老是會出現記錄數不太平均的狀況,在極端狀況下,相同join key對應的記錄數特別多,大量的數據必然被分到同一個partition於是形成數據嚴重傾斜。如圖2,能夠看到大部分任務3秒左右就完成了,而最慢的任務卻花了4分鐘,它處理的數據量倒是其它任務的若干倍。

 

圖2

 

目前,處理join時數據傾斜的一些常見手段有: (1)增長shuffle partition數量,指望本來分在同一個partition中的數據能夠被分散到多個partition中,可是對於同key的數據沒有做用。(2)調大BroadcastHashJoin的閾值,在某些場景下能夠把SortMergeJoin轉化成BroadcastHashJoin而避免shuffle產生的數據傾斜。(3)手動過濾傾斜的key,而且對這些數據加入隨機的前綴,在另外一張表中這些key對應的數據也相應的膨脹處理,而後再作join。綜上,這些手段都有各自的侷限性而且涉及不少的人爲處理。基於此,咱們思考了第三個問題:Spark可否在運行時自動地處理join中的數據傾斜?

 

自適應執行背景和簡介

 

早在2015年,Spark社區就提出了自適應執行的基本想法,在Spark的DAGScheduler中增長了提交單個map stage的接口,而且在實現運行時調整shuffle partition數量上作了嘗試。但目前該實現有必定的侷限性,在某些場景下會引入更多的shuffle,即更多的stage,對於三表在同一個stage中作join等狀況也沒法很好的處理。因此該功能一直處於實驗階段,配置參數也沒有在官方文檔中說起。

 

基於這些社區的工做,英特爾大數據技術團隊對自適應執行作了從新的設計,實現了一個更爲靈活的自適性執行框架。在這個框架下面,咱們能夠添加額外的規則,來實現更多的功能。目前,已實現的特性包括:自動設置shuffle partition數,動態調整執行計劃,動態處理數據傾斜等等。

 

自適應執行架構

 

在Spark SQL中,當Spark肯定最後的物理執行計劃後,根據每個operator對RDD的轉換定義,它會生成一個RDD的DAG圖。以後Spark基於DAG圖靜態劃分stage而且提交執行,因此一旦執行計劃肯定後,在運行階段沒法再更新。自適應執行的基本思路是在執行計劃中事先劃分好stage,而後按stage提交執行,在運行時收集當前stage的shuffle統計信息,以此來優化下一個stage的執行計劃,而後再提交執行後續的stage。

 

圖3

 

從圖3中咱們能夠看出自適應執行的工做方法,首先以Exchange節點做爲分界將執行計劃這棵樹劃分紅多個QueryStage(Exchange節點在Spark SQL中表明shuffle)。每個QueryStage都是一棵獨立的子樹,也是一個獨立的執行單元。在加入QueryStage的同時,咱們也加入一個QueryStageInput的葉子節點,做爲父親QueryStage的輸入。例如對於圖中兩表join的執行計劃來講咱們會建立3個QueryStage。最後一個QueryStage中的執行計劃是join自己,它有2個QueryStageInput表明它的輸入,分別指向2個孩子的QueryStage。在執行QueryStage時,咱們首先提交它的孩子stage,而且收集這些stage運行時的信息。當這些孩子stage運行完畢後,咱們能夠知道它們的大小等信息,以此來判斷QueryStage中的計劃是否能夠優化更新。例如當咱們獲知某一張表的大小是5M,它小於broadcast的閾值時,咱們能夠將SortMergeJoin轉化成BroadcastHashJoin來優化當前的執行計劃。咱們也能夠根據孩子stage產生的shuffle數據量,來動態地調整該stage的reducer個數。在完成一系列的優化處理後,最終咱們爲該QueryStage生成RDD的DAG圖,而且提交給DAG Scheduler來執行。

 

自動設置reducer個數

 

假設咱們設置的shufflepartition個數爲5,在map stage結束以後,咱們知道每個partition的大小分別是70MB,30MB,20MB,10MB和50MB。假設咱們設置每個reducer處理的目標數據量是64MB,那麼在運行時,咱們能夠實際使用3個reducer。第一個reducer處理partition 0 (70MB),第二個reducer處理連續的partition 1 到3,共60MB,第三個reducer處理partition 4 (50MB),如圖4所示。

 

圖4

 

在自適應執行的框架中,由於每一個QueryStage都知道本身全部的孩子stage,所以在調整reducer個數時,能夠考慮到全部的stage輸入。另外,咱們也能夠將記錄條數做爲一個reducer處理的目標值。由於shuffle的數據每每都是通過壓縮的,有時partition的數據量並不大,但解壓後記錄條數確遠遠大於其它partition,形成數據不均。因此同時考慮數據大小和記錄條數能夠更好地決定reducer的個數。

 

動態調整執行計劃

 

目前咱們支持在運行時動態調整join的策略,在知足條件的狀況下,即一張表小於Broadcast閾值,能夠將SortMergeJoin轉化成BroadcastHashJoin。因爲SortMergeJoin和BroadcastHashJoin輸出的partition狀況並不相同,隨意轉換可能在下一個stage引入額外的shuffle操做。所以咱們在動態調整join策略時,遵循一個規則,即在不引入額外shuffle的前提下才進行轉換。

 

將SortMergeJoin轉化成BroadcastHashJoin有哪些好處呢?由於數據已經shuffle寫到磁盤上,咱們仍然須要shuffle讀取這些數據。咱們能夠看看圖5的例子,假設A表和B表join,map階段2張表各有2個map任務,而且shuffle partition個數爲5。若是作SortMergeJoin,在reduce階段須要啓動5個reducer,每一個reducer經過網絡shuffle讀取屬於本身的數據。然而,當咱們在運行時發現B表能夠broadcast,而且將其轉換成BroadcastHashJoin以後,咱們只須要啓動2個reducer,每個reducer讀取一個mapper的整個shuffle output文件。當咱們調度這2個reducer任務時,能夠優先將其調度在運行mapper的Executor上,所以整個shuffle讀變成了本地讀取,沒有數據經過網絡傳輸。而且讀取一個文件這樣的順序讀,相比原先shuffle時隨機的小文件讀,效率也更勝一籌。另外,SortMergeJoin過程當中每每會出現不一樣程度的數據傾斜問題,拖慢總體的運行時間。而轉換成BroadcastHashJoin後,數據量通常比較均勻,也就避免了傾斜,咱們能夠在下文實驗結果中看到更具體的信息。

 

圖5

 

動態處理數據傾斜

 

在自適應執行的框架下,咱們能夠在運行時很容易地檢測出有數據傾斜的partition。當執行某個stage時,咱們收集該stage每一個mapper 的shuffle數據大小和記錄條數。若是某一個partition的數據量或者記錄條數超過中位數的N倍,而且大於某個預先配置的閾值,咱們就認爲這是一個數據傾斜的partition,須要進行特殊的處理。

 

圖6

 

假設咱們A表和B表作inner join,而且A表中第0個partition是一個傾斜的partition。通常狀況下,A表和B表中partition 0的數據都會shuffle到同一個reducer中進行處理,因爲這個reducer須要經過網絡拉取大量的數據而且進行處理,它會成爲一個最慢的任務拖慢總體的性能。在自適應執行框架下,一旦咱們發現A表的partition 0發生傾斜,咱們隨後使用N個任務去處理該partition。每一個任務只讀取若干個mapper的shuffle 輸出文件,而後讀取B表partition 0的數據作join。最後,咱們將N個任務join的結果經過Union操做合併起來。爲了實現這樣的處理,咱們對shuffle read的接口也作了改變,容許它只讀取部分mapper中某一個partition的數據。在這樣的處理中,B表的partition 0會被讀取N次,雖然這增長了必定的額外代價,可是經過N個任務處理傾斜數據帶來的收益仍然大於這樣的代價。若是B表中partition 0也發生傾斜,對於inner join來講咱們也能夠將B表的partition 0分紅若干塊,分別與A表的partition 0進行join,最終union起來。但對於其它的join類型例如Left Semi Join咱們暫時不支持將B表的partition 0拆分。

 

自適應執行和Spark SQL在100TB上的性能比較

 

咱們使用99臺機器搭建了一個集羣,使用Spark2.2在TPC-DS 100TB的數據集進行了實驗,比較原版Spark和自適應執行的性能。如下是集羣的詳細信息:

 

 

 

圖7

 

實驗結果顯示,在自適應執行模式下,103條SQL中有92條都獲得了明顯的性能提高,其中47條SQL的性能提高超過10%,最大的性能提高達到了3.8倍,而且沒有出現性能降低的狀況。另外在原版Spark中,有5條SQL由於OOM等緣由沒法順利運行,在自適應模式下咱們也對這些問題作了優化,使得103條SQL在TPC-DS 100TB數據集上所有成功運行。如下是具體的性能提高比例和性能提高最明顯的幾條SQL。

 

圖8

圖9

 

經過仔細分析了這些性能提高的SQL,咱們能夠看到自適應執行帶來的好處。首先是自動設置reducer個數,原版Spark使用10976做爲shuffle partition數,在自適應執行時,如下SQL的reducer個數自動調整爲1064和1079,能夠明顯看到執行時間上也提高了不少。這正是由於減小了調度的負擔和任務啓動的時間,以及減小了磁盤IO請求。

 

原版Spark:

 

圖10

 

自適應執行:

 

圖11

 

在運行時動態調整執行計劃,將SortMergeJoin轉化成BroadcastHashJoin在某些SQL中也帶來了很大的提高。例如在如下的例子中,本來使用SortMergeJoin由於數據傾斜等問題花費了2.5分鐘。在自適應執行時,由於其中一張表的大小隻有2.5k因此在運行時轉化成了BroadcastHashJoin,執行時間縮短爲10秒。


原版Spark:

 

 

圖12

自適應執行:

 

 

圖13

 

100 TB的挑戰及優化

 

成功運行TPC-DS 100 TB數據集中的全部SQL,對於Apache Spark來講也是一大挑戰。雖然SparkSQL官方表示支持TPC-DS全部的SQL,但這是基於小數據集。在100TB這個量級上,Spark暴露出了一些問題致使有些SQL執行效率不高,甚至沒法順利執行。在作實驗的過程當中,咱們在自適應執行框架的基礎上,對Spark也作了其它的優化改進,來確保全部SQL在100TB數據集上能夠成功運行。如下是一些典型的問題。

 

統計map端輸出數據時driver單點瓶頸的優化(SPARK-22537)

 

在每一個map任務結束後,會有一個表示每一個partition大小的數據結構(即下面提到的CompressedMapStatus或HighlyCompressedMapStatus)返回給driver。而在自適應執行中,當一次shuffle的map stage結束後,driver會聚合每一個mapper給出的partition大小信息,獲得在各個partition上全部mapper輸出的數據總大小。該統計由單線程完成,若是mapper的數量是M,shuffle partition的數量爲S,那麼統計的時間複雜度在O(M x S) ~ O (M x S x log(M x S)) 之間,當CompressedMapStatus被使用時,複雜度爲這個區間的下限,當HighlyCompressedMapStatus被使用時,空間有所節省,時間會更長,在幾乎全部的partition數據都爲空時,複雜度會接近該區間的上限。

 

在M x S增大時,咱們會遇到driver上的單點瓶頸,一個明顯的表現是UI上map stage和reduce stage之間的停頓。爲了解決這個單點瓶頸,咱們將任務儘可能均勻地劃分給多個線程,線程之間不相交地爲scala Array中的不一樣元素賦聚合值。

在這項優化中,新的spark.shuffle.mapOutput.parallelAggregationThreshold(簡稱threshold)被引入,用於配置使用多線程聚合的閾值,聚合的並行度由JVM中可用core數和M * S / threshold + 1中的小值決定。

 

Shuffle讀取連續partition時的優化 (SPARK-9853)

 

在自適應執行的模式下,一個reducer可能會從一個mapoutput文件中讀取諾幹個連續的數據塊。目前的實現中,它須要拆分紅許多獨立的getBlockData調用,每次調用分別從硬盤讀取一小塊數據,這樣就須要不少的磁盤IO。咱們對這樣的場景作了優化,使得Spark能夠一次性地把這些連續數據塊都讀上來,這樣就大大減小了磁盤的IO。在小的基準測試程序中,咱們發現shuffle read的性能能夠提高3倍。

 

BroadcastHashJoin中避免沒必要要的partition讀的優化

 

自適應執行能夠爲現有的operator提供更多優化的可能。在SortMergeJoin中有一個基本的設計:每一個reducetask會先讀取左表中的記錄,若是左表的 partition爲空,則右表中的數據咱們無需關注(對於非anti join的狀況),這樣的設計在左表有一些partition爲空時能夠節省沒必要要的右表讀取,在SortMergeJoin中這樣的實現很天然。

 

BroadcastHashJoin中不存在按照join key分區的過程,因此缺失了這項優化。然而在自適應執行的一些狀況中,利用stage間的精確統計信息,咱們能夠找回這項優化:若是SortMergeJoin在運行時被轉換成了BroadcastHashJoin,且咱們能獲得各個partition key對應partition的精確大小,則新轉換成的BroadcastHashJoin將被告知:無需去讀那些小表中爲空的partition,由於不會join出任何結果。

 

Baidu真實產品線試用狀況

 

咱們將自適應執行優化應用在Baidu內部基於Spark SQL的即席查詢服務BaiduBig SQL之上,作了進一步的落地驗證,經過選取單日全天真實用戶查詢,按照原有執行順序回放重跑和分析,獲得以下幾點結論:

 

  1. 對於秒級的簡單查詢,自適應版本的性能提高並不明顯,這主要是由於它們的瓶頸和主要耗時集中在了IO上面,而這不是自適應執行的優化點。

  2. 按照查詢複雜度維度考量測試結果發現:查詢中迭代次數越多,多表join場景越複雜的狀況下自適應執行效果越好。咱們簡單按照group by, sort, join, 子查詢等操做個數來將查詢分類,如上關鍵詞大於3的查詢有明顯的性能提高,優化比從50%~200%不等,主要優化點來源於shuffle的動態併發數調整及join優化。

  3. 從業務使用角度來分析,前文所述SortMergeJoin轉BroadcastHashJoin的優化在Big SQL場景中命中了多種典型的業務SQL模板,試考慮以下計算需求:用戶指望從兩張不一樣維度的計費信息中撈取感興趣的user列表在兩個維度的總體計費。收入信息原表大小在百T級別,用戶列表只包含對應用戶的元信息,大小在10M之內。兩張計費信息表字段基本一致,因此咱們將兩張表與用戶列表作inner join後union作進一步分析,SQL表達以下:

 

select t.c1, t.id, t.c2, t.c3, t.c4,  sum(t.num1), sum(t.num2), sum(t.num3) from

(

select  c1, t1.id as id, c2, c3, c4, sum(num1s) as num1, sum(num2) as num2, sum(num3)  as num3 from basedata.shitu_a t1 INNER JOIN basedata.user_82_1512023432000 t2  ON (t1.id = t2.id)  where  (event_day=20171107)  and flag !=  'true'  group by c1, t1.id, c2, c3, c4

union  all

select  c1, t1.id as id, c2, c3, c4, sum(num1s) as num1, sum(num2) as num2, sum(num3)  as num3 from basedata.shitu_b t1 INNER JOIN basedata.user_82_1512023432000 t2  ON (t1.id = t2.id)  where  (event_day=20171107)  and flag !=  'true'  group by c1, t1.id, c2, c3, c4

) t group by t.c1, t.id, t.c2, t.c3, c4

 

 

對應的原版Spark執行計劃以下:

 

圖14

 

針對於此類用戶場景,能夠所有命中自適應執行的join優化邏輯,執行過程當中屢次SortMergeJoin轉爲BroadcastHashJoin,減小了中間內存消耗及多輪sort,獲得了近200%的性能提高。

 

結合上述3點,下一步自適應執行在Baidu內部的優化落地工做將進一步集中在大數據量、複雜查詢的例行批量做業之上,並考慮與用戶查詢複雜度關聯進行動態的開關控制。對於數千臺的大規模集羣上運行的複雜查詢,自適應執行能夠動態調整計算過程當中的並行度,能夠幫助大幅提高集羣的資源利用率。另外,自適應執行能夠獲取到多輪stage之間更完整的統計信息,下一步咱們也考慮將對應數據及Strategy接口開放給Baidu Spark平臺上層用戶,針對特殊做業進行進一步的定製化Strategy策略編寫。

 

總結

 

隨着Spark SQL普遍的使用以及業務規模的不斷增加,在大規模數據集上遇到的易用性和性能方面的挑戰將日益明顯。本文討論了三個典型的問題,包括調整shuffle partition數量,選擇最佳執行計劃和數據傾斜。這些問題在現有的框架下並不容易解決,而自適應執行能夠很好地應對這些問題。咱們介紹了自適應執行的基本架構以及解決這些問題的具體方法。最後咱們在TPC-DS 100TB數據集上驗證了自適應執行的優點,相比較原版Spark SQL,103個SQL查詢中,90%的查詢都獲得了明顯的性能提高,最大的提高達到3.8倍,而且原先失敗的5個查詢在自適應執行下也順利完成。咱們在百度的Big SQL平臺也作了進一步的驗證,對於複雜的真實查詢能夠達到2倍的性能提高。總之,自適應執行解決了Spark SQL在大數據規模上遇到的不少挑戰,而且很大程度上改善了Spark SQL的易用性和性能,提升了超大集羣中多租戶多併發做業狀況下集羣的資源利用率。未來,咱們考慮在自適應執行的框架之下,提供更多運行時能夠優化的策略,而且將咱們的工做貢獻回饋給社區,也但願有更多的朋友能夠參與進來,將其進一步完善。

相關文章
相關標籤/搜索