導讀:本文所述內容均基於 2018 年 9 月 17 日 Spark 最新 Spark Release 2.3.1 版本,以及截止到 2018 年 10 月 21 日 Adaptive Execution 最新開發代碼。自動設置 Shuffle Partition 個數已進入 Spark Release 2.3.1 版本,動態調整執行計劃與處理數據傾斜還沒有進入 Spark Release 2.3.1
1 背 景node
Spark SQL / Catalyst 和 CBO 的優化,從查詢自己與目標數據的特色的角度儘量保證了最終生成的執行計劃的高效性。可是sql
執行計劃一旦生成,便不可更改,即便執行過程當中發現後續執行計劃能夠進一步優化,也只能按原計劃執行;性能優化
CBO 基於統計信息生成最優執行計劃,須要提早生成統計信息,成本較大,且不適合數據更新頻繁的場景;網絡
CBO 基於基礎表的統計信息與操做對數據的影響推測中間結果的信息,只是估算,不夠精確。app
本文介紹的 Adaptive Execution 將能夠根據執行過程當中的中間數據優化後續執行,從而提升總體執行效率。核心在於兩點:分佈式
執行計劃可動態調整oop
調整的依據是中間結果的精確統計信息post
2 動態設置 Shuffle Partition2.1 Spark Shuffle 原理性能
Spark Shuffle 通常用於將上游 Stage 中的數據按 Key 分區,保證來自不一樣 Mapper (表示上游 Stage 的 Task)的相同的 Key 進入相同的 Reducer (表示下游 Stage 的 Task)。通常用於 group by 或者 Join 操做。大數據
如上圖所示,該 Shuffle 總共有 2 個 Mapper 與 5 個 Reducer。每一個 Mapper 會按相同的規則(由 Partitioner 定義)將本身的數據分爲五份。每一個 Reducer 從這兩個 Mapper 中拉取屬於本身的那一份數據。
2.2 原有 Shuffle 的問題
使用 Spark SQL 時,可經過spark.sql.shuffle.partitions
指定 Shuffle 時 Partition 個數,也即 Reducer 個數。
該參數決定了一個 Spark SQL Job 中包含的全部 Shuffle 的 Partition 個數。以下圖所示,當該參數值爲 3 時,全部 Shuffle 中 Reducer 個數都爲 3。
這種方法有以下問題:
Partition 個數不宜設置過大;
Reducer(代指 Spark Shuffle 過程當中執行 Shuffle Read 的 Task) 個數過多,每一個 Reducer 處理的數據量太小。大量小 Task 形成沒必要要的 Task 調度開銷與可能的資源調度開銷(若是開啓了 Dynamic Allocation);
Reducer 個數過大,若是 Reducer 直接寫 HDFS 會生成大量小文件,從而形成大量 addBlock RPC,Name node 可能成爲瓶頸,並影響其它使用 HDFS 的應用;
過多 Reducer 寫小文件,會形成後面讀取這些小文件時產生大量 getBlock RPC,對 Name node 產生衝擊;
Partition 個數不宜設置太小
每一個 Reducer 處理的數據量太大,Spill 到磁盤開銷增大;
Reducer GC 時間增加;
Reducer 若是寫 HDFS,每一個 Reducer 寫入數據量較大,沒法充分發揮並行處理優點;
很難保證全部 Shuffle 都最優
不一樣的 Shuffle 對應的數據量不同,所以最優的 Partition 個數也不同。使用統一的 Partition 個數很難保證全部 Shuffle 都最優;
定時任務不一樣時段數據量不同,相同的 Partition 數設置沒法保證全部時間段執行時都最優;
2.3 自動設置 Shuffle Partition 原理
如 Spark Shuffle 原理 一節圖中所示,Stage 1 的 5 個 Partition 數據量分別爲 60MB,40MB,1MB,2MB,50MB。其中 1MB 與 2MB 的 Partition 明顯太小(實際場景中,部分小 Partition 只有幾十 KB 及至幾十字節)。
開啓 Adaptive Execution 後:
Spark 在 Stage 0 的 Shuffle Write 結束後,根據各 Mapper 輸出,統計獲得各 Partition 的數據量,即 60MB,40MB,1MB,2MB,50MB;
經過 ExchangeCoordinator 計算出合適的 post-shuffle Partition 個數(即 Reducer)個數(本例中 Reducer 個數設置爲 3);
啓動相應個數的 Reducer 任務;
每一個 Reducer 讀取一個或多個 Shuffle Write Partition 數據(以下圖所示,Reducer 0 讀取 Partition 0,Reducer 1 讀取 Partition 一、二、3,Reducer 2 讀取 Partition 4)。
三個 Reducer 這樣分配是由於:
targetPostShuffleInputSize 默認爲 64MB,每一個 Reducer 讀取數據量不超過 64MB;
若是 Partition 0 與 Partition 2 結合,Partition 1 與 Partition 3 結合,雖然也都不超過 64 MB。但讀完 Partition 0 再讀 Partition 2,對於同一個 Mapper 而言,若是每一個 Partition 數據比較少,跳着讀多個 Partition 至關於隨機讀,在HDD 上性能不高;
目前的作法是隻結合相臨的 Partition,從而保證順序讀,提升磁盤 IO 性能;
該方案只會合併多個小的 Partition,不會將大的 Partition 拆分,由於拆分過程須要引入一輪新的 Shuffle;
基於上面的緣由,默認 Partition 個數(本例中爲 5)能夠大一點,而後由 ExchangeCoordinator 合併。若是設置的 Partition 個數過小,Adaptive Execution 在此場景下沒法發揮做用。
由上圖可見,Reducer 1 從每一個 Mapper 讀取 Partition 一、二、3 都有三根線,是由於原來的 Shuffle 設計中,每一個 Reducer 每次經過 Fetch 請求從一個特定 Mapper 讀數據時,只能讀一個 Partition 的數據。也即在上圖中,Reducer 1 讀取 Mapper 0 的數據,須要 3 輪 Fetch 請求。對於 Mapper 而言,須要讀三次磁盤,至關於隨機 IO。
爲了解決這個問題,Spark 新增接口,一次 Shuffle Read 能夠讀多個 Partition 的數據。以下圖所示,Task 1 經過一輪請求便可同時讀取 Task 0 內 Partition 0、1 和 2 的數據,減小了網絡請求數量。同時 Mapper 0 一次性讀取並返回三個 Partition 的數據,至關於順序 IO,從而提高了性能。
因爲 Adaptive Execution 的自動設置 Reducer 是由 ExchangeCoordinator 根據 Shuffle Write 統計信息決定的,所以即便在同一個 Job 中不一樣 Shuffle 的 Reducer 個數均可以不同,從而使得每次 Shuffle 都儘量最優。
上文 原有 Shuffle 的問題 一節中的例子,在啓用 Adaptive Execution 後,三次 Shuffle 的 Reducer 個數從原來的所有爲 3 變爲 二、四、3。
2.4 使用與優化方法
可經過spark.sql.adaptive.enabled=true
啓用 Adaptive Execution 從而啓用自動設置 Shuffle Reducer 這一特性。
經過spark.sql.adaptive.shuffle.targetPostShuffleInputSize
可設置每一個 Reducer 讀取的目標數據量,其單位是字節,默認值爲 64 MB。上文例子中,若是將該值設置爲 50 MB,最終效果仍然如上文所示,而不會將 Partition 0 的 60MB 拆分。具體緣由上文已說明。
3 動態調整執行計劃3.1 固定執行計劃的不足
在不開啓 Adaptive Execution 以前,執行計劃一旦肯定,即便發現後續執行計劃能夠優化,也不可更改。以下圖所示,SortMergJoin 的 Shuffle Write 結束後,發現 Join 一方的 Shuffle 輸出只有 46.9KB,仍然繼續執行 SortMergeJoin。
此時徹底可將 SortMergeJoin 變動爲 BroadcastJoin 從而提升總體執行效率。
3.2 SortMergeJoin 原理
SortMergeJoin 是經常使用的分佈式 Join 方式,它幾乎可以使用於全部須要 Join 的場景。但有些場景下,它的性能並非最好的。
SortMergeJoin 的原理以下圖所示:
將 Join 雙方以 Join Key 爲 Key 按照 HashPartitioner 分區,且保證分區數一致;
Stage 0 與 Stage 1 的全部 Task 在 Shuffle Write 時,都將數據分爲 5 個 Partition,而且每一個 Partition 內按 Join Key 排序;
Stage 2 啓動 5 個 Task 分別去 Stage 0 與 Stage 1 中全部包含 Partition 分區數據的 Task 中取對應 Partition 的數據。(若是某個 Mapper 不包含該 Partition 的數據,則 Redcuer 無須向其發起讀取請求);
Stage 2 的 Task 2 分別從 Stage 0 的 Task 0、一、2 中讀取 Partition 2 的數據,而且經過 MergeSort 對其進行排序;
Stage 2 的 Task 2 分別從 Stage 1 的 Task 0、1 中讀取 Partition 2 的數據,且經過 MergeSort 對其進行排序;
Stage 2 的 Task 2 在上述兩步 MergeSort 的同時,使用 SortMergeJoin 對兩者進行 Join。
3.3 BroadcastJoin 原理
當參與 Join 的一方足夠小,可所有置於 Executor 內存中時,可以使用 Broadcast 機制將整個 RDD 數據廣播到每個 Executor 中,該 Executor 上運行的全部 Task 皆可直接讀取其數據。(本文中,後續配圖,爲了方便展現,會將整個 RDD 的數據置於 Task 框內,而隱藏 Executor)。
對於大 RDD,按正常方式,每一個 Task 讀取並處理一個 Partition 的數據,同時讀取 Executor 內的廣播數據,該廣播數據包含了小 RDD 的全量數據,所以可直接與每一個 Task 處理的大 RDD 的部分數據直接 Join。
根據 Task 內具體的 Join 實現的不一樣,又可分爲 BroadcastHashJoin 與 BroadcastNestedLoopJoin。後文不區分這兩種實現,統稱爲 BroadcastJoin。
與 SortMergeJoin 相比,BroadcastJoin 不須要 Shuffle,減小了 Shuffle 帶來的開銷,同時也避免了 Shuffle 帶來的數據傾斜,從而極大地提高了 Job 執行效率。
同時,BroadcastJoin 帶來了廣播小 RDD 的開銷。另外,若是小 RDD 過大,沒法存於 Executor 內存中,則沒法使用 BroadcastJoin。
對於基礎表的 Join,可在生成執行計劃前,直接經過 HDFS 獲取各表的大小,從而判斷是否適合使用 BroadcastJoin。但對於中間表的 Join,沒法提早準確判斷中間表大小從而精確判斷是否適合使用 BroadcastJoin。
《Spark SQL 性能優化再進一步 CBO 基於代價的優化》一文介紹的 CBO 可經過表的統計信息與各操做對數據統計信息的影響,推測出中間表的統計信息,可是該方法獲得的統計信息不夠準確。同時該方法要求提早分析表,具備較大開銷。
而開啓 Adaptive Execution 後,可直接根據 Shuffle Write 數據判斷是否適用 BroadcastJoin。
3.4 動態調整執行計劃原理
如上文 SortMergeJoin 原理 中配圖所示,SortMergeJoin 須要先對 Stage 0 與 Stage 1 按一樣的 Partitioner 進行 Shuffle Write。
Shuffle Write 結束後,可從每一個 ShuffleMapTask 的 MapStatus 中統計獲得按原計劃執行時 Stage 2 各 Partition 的數據量以及 Stage 2 須要讀取的總數據量。(通常來講,Partition 是 RDD 的屬性而非 Stage 的屬性,本文爲了方便,不區分 Stage 與 RDD。能夠簡單認爲一個 Stage 只有一個 RDD,此時 Stage 與 RDD 在本文討論範圍內等價)。
若是其中一個 Stage 的數據量較小,適合使用 BroadcastJoin,無須繼續執行 Stage 2 的 Shuffle Read。相反,可利用 Stage 0 與 Stage 1 的數據進行 BroadcastJoin,以下圖所示。
具體作法是:
將 Stage 1 所有 Shuffle Write 結果廣播出去
啓動 Stage 2,Partition 個數與 Stage 0 同樣,都爲 3
每一個 Stage 2 每一個 Task 讀取 Stage 0 每一個 Task 的 Shuffle Write 數據,同時與廣播獲得的 Stage 1 的全量數據進行 Join
注:廣播數據存於每一個 Executor 中,其上全部 Task 共享,無須爲每一個 Task 廣播一份數據。上圖中,爲了更清晰展現爲何可以直接 Join 而將 Stage 2 每一個 Task 方框內都放置了一份 Stage 1 的全量數據。
雖然 Shuffle Write 已完成,將後續的 SortMergeJoin 改成 Broadcast 仍然能提高執行效率:
SortMergeJoin 須要在 Shuffle Read 時對來自 Stage 0 與 Stage 1 的數據進行 Merge Sort,而且可能須要 Spill 到磁盤,開銷較大;
SortMergeJoin 時,Stage 2 的全部 Task 須要取 Stage 0 與 Stage 1 的全部 Task 的輸出數據(若是有它要的數據 ),會形成大量的網絡鏈接。且當 Stage 2 的 Task 較多時,會形成大量的磁盤隨機讀操做,效率不高,且影響相同機器上其它 Job 的執行效率;
SortMergeJoin 時,Stage 2 每一個 Task 須要從幾乎全部 Stage 0 與 Stage 1 的 Task 取數據,沒法很好利用 Locality;
Stage 2 改用 Broadcast,每一個 Task 直接讀取 Stage 0 的每一個 Task 的數據(一對一),可很好利用 Locality 特性。最好在 Stage 0 使用的 Executor 上直接啓動 Stage 2 的 Task。若是 Stage 0 的 Shuffle Write 數據並未 Spill 而是在內存中,則 Stage 2 的 Task 可直接讀取內存中的數據,效率很是高。若是有 Spill,那可直接從本地文件中讀取數據,且是順序讀取,效率遠比經過網絡隨機讀數據效率高。
3.5 使用與優化方法
該特性的使用方式以下:
當spark.sql.adaptive.enabled
與spark.sql.adaptive.join.enabled
都設置爲true
時,開啓 Adaptive Execution 的動態調整 Join 功能;
spark.sql.adaptiveBroadcastJoinThreshold
設置了 SortMergeJoin 轉 BroadcastJoin 的閾值。若是不設置該參數,該閾值與spark.sql.autoBroadcastJoinThreshold
的值相等;
除了本文所述 SortMergeJoin 轉 BroadcastJoin,Adaptive Execution 還可提供其它 Join 優化策略。部分優化策略可能會須要增長 Shuffle。spark.sql.adaptive.allowAdditionalShuffle
參數決定了是否容許爲了優化 Join 而增長 Shuffle。其默認值爲 false。
4 自動處理數據傾斜4.1 解決數據傾斜典型方案
《Spark 性能優化之道——解決 Spark 數據傾斜(Data Skew)的 N 種姿式》一文講述了數據傾斜的危害,產生緣由,以及典型解決方法。
保證文件可 Split 從而避免讀 HDFS 時數據傾斜;
保證 Kafka 各 Partition 數據均衡從而避免讀 Kafka 引發的數據傾斜;
調整並行度或自定義 Partitioner 從而分散分配給同一 Task 的大量不一樣 Key;
使用 BroadcastJoin 代替 ReduceJoin 消除 Shuffle 從而避免 Shuffle 引發的數據傾斜;
對傾斜 Key 使用隨機前綴或後綴從而分散大量傾斜 Key,同時將參與 Join 的小表擴容,從而保證 Join 結果的正確性。
4.2 自動解決數據傾斜
目前 Adaptive Execution 可解決 Join 時數據傾斜問題。其思路可理解爲將部分傾斜的 Partition (傾斜的判斷標準爲該 Partition 數據是全部 Partition Shuffle Write 中位數的 N 倍) 進行單獨處理,相似於 BroadcastJoin,以下圖所示。
在上圖中,左右兩邊分別是參與 Join 的 Stage 0 與 Stage 1 (實際應該是兩個 RDD 進行 Join,但如同上文所述,這裏不區分 RDD 與 Stage),中間是獲取 Join 結果的 Stage 2。
明顯 Partition 0 的數據量較大,這裏假設 Partition 0 符合「傾斜」的條件,其它 4 個 Partition 未傾斜。
以 Partition 對應的 Task 2 爲例,它需獲取 Stage 0 的三個 Task 中全部屬於 Partition 2 的數據,並使用 MergeSort 排序。同時獲取 Stage 1 的兩個 Task 中全部屬於 Partition 2 的數據並使用 MergeSort 排序。而後對兩者進行 SortMergeJoin。
對於 Partition 0,可啓動多個 Task:
在上圖中,啓動了兩個 Task 處理 Partition 0 的數據,分別名爲 Task 0-0 與 Task 0-1
Task 0-0 讀取 Stage 0 Task 0 中屬於 Partition 0 的數據
Task 0-1 讀取 Stage 0 Task 1 與 Task 2 中屬於 Partition 0 的數據,並進行 MergeSort
Task 0-0 與 Task 0-1 都從 Stage 1 的兩個 Task 中全部屬於 Partition 0 的數據
Task 0-0 與 Task 0-1 使用 Stage 0 中屬於 Partition 0 的部分數據與 Stage 1中屬於 Partition 0 的全量數據進行 Join
經過該方法,本來由一個 Task 處理的 Partition 0 的數據由多個 Task 共同處理,每一個 Task 需處理的數據量減小,從而避免了 Partition 0 的傾斜。
對於 Partition 0 的處理,有點相似於 BroadcastJoin 的作法。但區別在於,Stage 2 的 Task 0-0 與 Task 0-1 同時獲取 Stage 1 中屬於 Partition 0 的全量數據,是經過正常的 Shuffle Read 機制實現,而非 BroadcastJoin 中的變量廣播實現。
4.3 使用與優化方法
開啓與調優該特性的方法以下:
將spark.sql.adaptive.skewedJoin.enabled
設置爲 true 便可自動處理 Join 時數據傾斜;
spark.sql.adaptive.skewedPartitionMaxSplits
控制處理一個傾斜 Partition 的 Task 個數上限,默認值爲 5;
spark.sql.adaptive.skewedPartitionRowCountThreshold
設置了一個 Partition 被視爲傾斜 Partition 的行數下限,也即行數低於該值的 Partition 不會被看成傾斜 Partition 處理。其默認值爲 10L * 1000 * 1000 即一千萬;
spark.sql.adaptive.skewedPartitionSizeThreshold
設置了一個 Partition 被視爲傾斜 Partition 的大小下限,也即大小小於該值的 Partition 不會被視做傾斜 Partition。其默認值爲 64 * 1024 * 1024 也即 64MB;
spark.sql.adaptive.skewedPartitionFactor
該參數設置了傾斜因子。若是一個 Partition 的大小大於spark.sql.adaptive.skewedPartitionSizeThreshold
的同時大於各 Partition 大小中位數與該因子的乘積,或者行數大於spark.sql.adaptive.skewedPartitionRowCountThreshold
的同時大於各 Partition 行數中位數與該因子的乘積,則它會被視爲傾斜的 Partition。
猜你喜歡
歡迎關注本公衆號:iteblog_hadoop:
回覆 spark_summit_201806 下載 Spark Summit North America 201806 所有PPT
回覆 spark_summit_eu_2018 下載 Spark+AI Summit europe 2018 所有PPT
0、回覆 電子書 獲取 本站全部可下載的電子書
二、Elasticsearch 6.3 發佈,大家要的 SQL 功能來了
三、即將發佈的 Apache Spark 2.4 都有哪些新功能
四、乾貨 | 深刻理解 Spark Structured Streaming
五、Apache Spark 黑名單(Blacklist)機制介紹
六、Kafka分區分配策略(Partition Assignment Strategy)
八、乾貨 | Apache Spark 2.0 做業優化技巧
十一、更多大數據文章歡迎訪問https://www.iteblog.com及本公衆號(iteblog_hadoop)十二、Flink中文文檔:http://flink.iteblog.com1三、Carbondata 中文文檔:http://carbondata.iteblog.com