Spark 解決數據傾斜的幾種經常使用方法

數據傾斜是大數據計算中一個最棘手的問題,出現數據傾斜後,Spark 做業的性能會比指望值差不少。數據傾斜的調優,就是利用各類技術方案解決不一樣類型的數據傾斜問題,保證 Spark 做業的性能。web

一,數據傾斜原理

一個 Spark 做業,會根據其內部的 Action 操做劃分紅多個 job,每一個 job 內部又會根據 shuffle 操做劃分紅多個 stage,而後每一個 stage 會分配多個 task 去執行任務。每一個 task 會領取一個 partition 的數據處理。sql

同一個 stage 內的 task 是能夠並行處理數據的,具備依賴關係的不一樣 stage 之間是串行處理的。因爲這個處理機制,假設某個 Spark 做業中的某個 job 有兩個 stage,分別爲 stage0 和 stage1,那麼 stage1 必需要等待 stage0 處理結束才能進行。若是 stage0 內部分配了 n 個 task 進行計算任務,其中有一個 task 領取的 partition 數據過大,執行了 1 個小時還沒結束,其他的 n-1 個 task 在半小時內就執行結束了,都在等這最後一個 task 執行結束才能進入下一個 stage。這種因爲某個 stage 內部的 task 領取的數據量過大的現象就是數據傾斜。session

下圖就是一個例子:hello 這個 key 對應了 7 條數據,映射到了同一個 task 去處理了,剩餘的 2 個 task 分別只處理了一個數據。ide

數據傾斜

二,數據傾斜發生的現象

1,絕大多數task執行得都很是快,但個別task執行極慢。好比,總共有1000個task,997個task都在1分鐘以內執行完了,可是剩餘兩三個task卻要一兩個小時。這種狀況很常見。性能

2,本來可以正常執行的Spark做業,某天忽然報出OOM(內存溢出)異常,觀察異常棧,是咱們寫的業務代碼形成的。這種狀況比較少見。大數據

三,如何定位數據傾斜的代碼

數據傾斜只會發生在 shuffle 過程當中。經常使用而且可能會觸發 shuffle 操做的算子有:distinct,groupByKey,reduceByKey,aggregateByKey,join,cogroup,repartition 等。出現數據傾斜,頗有可能就是使用了這些算子中的某一個致使的。spa

若是咱們是 yarn-client 模式提交,咱們能夠在本地直接查看 log,在 log 中定位到當前運行到了哪一個 stage;若是用的 yarn-cluster 模式提交的話,咱們能夠經過 spark web UI 來查看當前運行到了哪一個 stage。不管用的哪一種模式咱們均可以在 spark web UI 上面查看到當前這個 stage 的各個 task 的數據量和運行時間,從而可以進一步肯定是否是 task 的數據分配不均致使的數據傾斜。3d

當肯定了發生數據傾斜的 stage 後,咱們能夠找出會觸發 shuffle 的算子,推算出發生傾斜的那個 stage 對應代碼。觸發 shuffle 操做的除了上面提到的那些算子外,還要注意使用 spark sql 的某些 sql 語句,好比 group by 等。日誌

四,解決方法

解決數據傾斜的思路就是保證每一個 stage 內部的 task 領取的數據足夠均勻。一個是想辦法讓數據源在 Spark 內部計算粒度的這個維度上劃分足夠均勻,若是作不到這個,就要相辦法將讀取的數據源進行加工,儘可能保證均勻。大體有如下幾種方案。cdn

1,聚合源數據和過濾致使傾斜的 key

a,聚合源數據

假設咱們的某個 Spark 做業的數據源是天天 ETL 存儲到 Hive 中的數據,這些數據主要是電商平臺天天用戶的操做日誌。Spark 做業中分析的粒度是 session,那麼咱們能夠在往 Hive 中寫數據的時候就保證每條數據對應一個 session 的全部信息,也就是以 session 爲粒度的將數據寫入到 Hive 中。

這樣咱們能夠保證一點,咱們 Spark 做業中就沒有必要作一些 groupByKey + map 的操做了,能夠直接對每一個 key 的 value 進行 map 操做,計算咱們須要的數據。省去了 shuffle 操做,避免了 shuffle 時的數據傾斜。

可是,當咱們 Spark 做業中分析粒度不止一個粒度,好比除了 session 這個粒度外,還有 date 的粒度,userId 的粒度等等。這時候是沒法保證這些全部粒度的數據都能聚合到一條數據上的。這時候咱們能夠作個妥協,選擇一個相對比較大的粒度,進行聚合數據。好比咱們按照原來的存儲方式可能有 100W 條數據,但按照某個粒度,好比 date 這個粒度,進行聚合後存儲,這樣的話咱們的數據能夠降到 50W 條,能夠作到減輕數據傾斜的現象。

b,過濾致使傾斜的 key

好比說咱們的 Hive 中數據,共有 100W 個 key,其中有 5 個 key 對應的數據量很是的大,可能有幾十萬條數據(這種狀況在電商平臺上發生惡意刷單時候會出現),其它的 key 對應數據量都只有幾十。若是咱們業務上面可以接受這 5 個 key 對應的數據能夠捨棄。這種狀況下咱們能夠在用 sql 從 Hive 中取數據時過濾掉這個 5 個 key。從而避免了數據傾斜。

2,shuffle 操做提升 reduce 端的並行度

Spark 在作 shuffle 操做時,默認使用的是 HashPartitioner 對數據進行分區。若是 reduce 端的並行度設置的不合適,極可能形成大量不一樣的 key 被分配到同一個 task 上去,形成某個 task 上處理的數據量大於其餘 task,形成數據傾斜。

若是調整 reduce 端的並行度,可讓 reduce 端的每一個 task 處理的數據減小,從而緩解數據傾斜。

設置方法:在對 RDD 執行 shuffle 算子時,給 shuffle 算子傳入一個參數,好比 reduceByKey(1000) ,該參數就設置了這個 shuffle 算子執行時 shuffle read task 的數量。對於 Spark SQL 中的 shuffle 類語句,好比 group by、join 等,須要設置一個參數,即 spark.sql.shuffle.partitions ,該參數表明了 shuffle read task 的並行度,該值默認是 200 ,對於不少場景來講都有點太小。

3,使用隨機 key 進行雙重聚合

在 Spark 中使用 groupByKey 和 reduceByKey 這兩個算子會進行 shuffle 操做。這時候若是 map 端的文件每一個 key 的數據量誤差很大,很容易會形成數據傾斜。

咱們能夠先對須要操做的數據中的 key 拼接上隨機數進行打散分組,這樣原來是一個 key 的數據可能會被分到多個 key 上,而後進行一次聚合,聚合完以後將原來拼在 key 上的隨機數去掉,再進行聚合,這樣對數據傾斜會有比較好的效果。

具體能夠看下圖:

隨機 key 雙重聚合

這種方案對 RDD 執行 reduceByKey 等聚合類 shuffle 算子或者在 Spark SQL 中使用 group by 語句進行分組聚合時,比較適用這種方案。

4, 將 reduce-side join 轉換成 map-side join

兩個 RDD 在進行 join 時會有 shuffle 操做,若是每一個 key 對應的數據分佈不均勻也會有數據傾斜發生。

這種狀況下,若是兩個 RDD 中某個 RDD 的數據量不大,能夠將該 RDD 的數據提取出來,而後作成廣播變量,將數據量大的那個 RDD 作 map 算子操做,而後在 map 算子內和廣播變量進行 join,這樣能夠避免了 join 過程當中的 shuffle,也就避免了 shuffle 過程當中可能會出現的數據傾斜現象。

5,採樣傾斜 key 並拆分 join 操做

當碰到這種狀況:兩個 RDD 進行 join 的時候,其中某個 RDD 中少數的幾個 key 對應的數據量很大,致使了數據傾斜,而另一個 RDD 數據相對分佈均勻。這時候咱們能夠採用這種方法。

1,對包含少數幾個數據量過大的 key 的那個 RDD,經過 sample 算子採樣出一份樣原本,而後統計一下每一個 key 的數量,計算出來數據量最大的是哪幾個 key。

2,而後將這幾個 key 對應的數據從原來的 RDD 中拆分出來,造成一個單獨的 RDD,並給每一個 key 都打上 n 之內的隨機數做爲前綴,而不會致使傾斜的大部分 key 造成另一個 RDD。

3,接着將須要 join 的另外一個 RDD,也過濾出來那幾個傾斜 key 對應的數據並造成一個單獨的 RDD,將每條數據膨脹成 n 條數據,這 n 條數據都按順序附加一個 0~n 的前綴,不會致使傾斜的大部分 key 也造成另一個 RDD。

4,再將附加了隨機前綴的獨立 RDD 與另外一個膨脹 n 倍的獨立 RDD 進行 join,此時就能夠將原先相同的 key 打散成 n 份,分散到多個 task 中去進行 join 了。

5, 而另外兩個普通的 RDD 就照常 join 便可。 最後將兩次 join 的結果使用 union 算子合併起來便可,就是最終的 join 結果。

具體能夠看下圖:

採樣傾斜 key 分別 join

6,使用隨機前綴和擴容 RDD 進行 join

若是在進行 join 操做時,RDD 中有大量的 key 致使數據傾斜,那麼進行分拆 key 也沒什麼意義,這時候能夠採起這種方案。

該方案的實現思路基本和上一種相似,首先查看 RDD 中數據分佈狀況,找到那個形成數據傾斜的 RDD,好比有多個 key 映射數據都很大。而後咱們將該 RDD 沒調數據都打上一個 n 之內的隨機前綴。同時對另外一個 RDD 進行擴容,將其沒調數據擴容成 n 條數據,擴容出來的每條數據依次打上 0~n 的前綴。而後將這兩個 RDD 進行 join。

這種方案和上一種比,少了取樣的過程,由於上一種是針對某個 RDD 中只有很是少的幾個 key 發生數據傾斜,須要針對這幾個 key 特殊處理。而這個方案是針對某個 RDD 有大量的 key 發生數據傾斜,這時候就不必取樣了。

相關文章
相關標籤/搜索