1.數據傾斜發生時的現象sql
絕大多數task執行得都很是快,但個別task執行極慢。好比,總共有1000個task,997個task都在1分鐘以內執行完了,可是剩餘兩三個task卻要一兩個小時。這種狀況很常見。
本來可以正常執行的Spark做業,某天忽然報出OOM(內存溢出)異常,觀察異常棧,是咱們寫的業務代碼形成的。這種狀況比較少見。性能
2.數據傾斜發生的原理spa
數據傾斜的原理很簡單:在進行shuffle的時候,必須將各個節點上相同的key拉取到某個節點上的一個task來進行處理,好比按照key進行 聚合或join等操做。此時若是某個key對應的數據量特別大的話,就會發生數據傾斜。好比大部分key對應10條數據,可是個別key卻對應了100萬 條數據,那麼大部分task可能就只會分配到10條數據,而後1秒鐘就運行完了;可是個別task可能分配到了100萬數據,要運行一兩個小時。所以,整 個Spark做業的運行進度是由運行時間最長的那個task決定的。內存
3.解決方案一:使用Hive ETL預處理數據it
方案適用場景:致使數據傾斜的是Hive表。若是該Hive表中的數據自己很不均勻(好比某個key對應了100萬數據,其餘key纔對應了10條數據),並且業務場景須要頻繁使用Spark對Hive表執行某個分析操做,那麼比較適合使用這種技術方案。spark
方案實現思路:此時能夠評估一下,是否能夠經過Hive來進行數據預處理(即經過Hive ETL預先對數據按照key進行聚合,或者是預先和其餘表進行join),而後在Spark做業中針對的數據源就不是原來的Hive表了,而是預處理後的 Hive表。此時因爲數據已經預先進行過聚合或join操做了,那麼在Spark做業中也就不須要使用原先的shuffle類算子執行這類操做了。io
方案實現原理:這種方案從根源上解決了數據傾斜,由於完全避免了在Spark中執行shuffle類算子,那麼 確定就不會有數據傾斜的問題了。可是這裏也要提醒一下你們,這種方式屬於治標不治本。由於畢竟數據自己就存在分佈不均勻的問題,因此Hive ETL中進行group by或者join等shuffle操做時,仍是會出現數據傾斜,致使Hive ETL的速度很慢。咱們只是把數據傾斜的發生提早到了Hive ETL中,避免Spark程序發生數據傾斜而已。原理
方案優勢:實現起來簡單便捷,效果還很是好,徹底規避掉了數據傾斜,Spark做業的性能會大幅度提高。隨機數
方案缺點:治標不治本,Hive ETL中仍是會發生數據傾斜。程序
4.解決方案二:過濾少數致使傾斜的key
方案適用場景:若是發現致使傾斜的key就少數幾個,並且對計算自己的影響並不大的話,那麼很適合使用這種方案。好比99%的key就對應10條數據,可是隻有一個key對應了100萬數據,從而致使了數據傾斜。
方案實現思路:若是咱們判斷那少數幾個數據量特別多的key,對做業的執行和計算結果不是特別重要的話,那麼幹 脆就直接過濾掉那少數幾個key。好比,在Spark SQL中可使用where子句過濾掉這些key或者在Spark Core中對RDD執行filter算子過濾掉這些key。若是須要每次做業執行時,動態斷定哪些key的數據量最多而後再進行過濾,那麼可使用 sample算子對RDD進行採樣,而後計算出每一個key的數量,取數據量最多的key過濾掉便可。
方案實現原理:將致使數據傾斜的key給過濾掉以後,這些key就不會參與計算了,天然不可能產生數據傾斜。
方案優勢:實現簡單,並且效果也很好,能夠徹底規避掉數據傾斜。
方案缺點:適用場景很少,大多數狀況下,致使傾斜的key仍是不少的,並非只有少數幾個。
5.解決方案三:提升shuffle操做的並行度
方案適用場景:若是咱們必需要對數據傾斜迎難而上,那麼建議優先使用這種方案,由於這是處理數據傾斜最簡單的一種方案。
方案實現思路:在對RDD執行shuffle算子時,給shuffle算子傳入一個參數,好比 reduceByKey(1000),該參數就設置了這個shuffle算子執行時shuffle read task的數量。對於Spark SQL中的shuffle類語句,好比group by、join等,須要設置一個參數,即spark.sql.shuffle.partitions,該參數表明了shuffle read task的並行度,該值默認是200,對於不少場景來講都有點太小。
方案實現原理:增長shuffle read task的數量,可讓本來分配給一個task的多個key分配給多個task,從而讓每一個task處理比原來更少的數據。舉例來講,若是本來有5個 key,每一個key對應10條數據,這5個key都是分配給一個task的,那麼這個task就要處理50條數據。而增長了shuffle read task之後,每一個task就分配到一個key,即每一個task就處理10條數據,那麼天然每一個task的執行時間都會變短了。具體原理以下圖所示。
方案優勢:實現起來比較簡單,能夠有效緩解和減輕數據傾斜的影響。
方案缺點:只是緩解了數據傾斜而已,沒有完全根除問題,根據實踐經驗來看,其效果有限。
6.解決方案四:兩階段聚合(局部聚合+全局聚合)
方案適用場景:對RDD執行reduceByKey等聚合類shuffle算子或者在Spark SQL中使用group by語句進行分組聚合時,比較適用這種方案。
方案實現思路:這個方案的核心實現思路就是進行兩階段聚合。第一次是局部聚合,先給每一個key都打上一個隨機 數,好比10之內的隨機數,此時原先同樣的key就變成不同的了,好比(hello, 1) (hello, 1) (hello, 1) (hello, 1),就會變成(1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。接着對打上隨機數後的數據,執行reduceByKey等聚合操做,進行局部聚合,那麼局部聚合結果,就會變成了(1_hello, 2) (2_hello, 2)。而後將各個key的前綴給去掉,就會變成(hello,2)(hello,2),再次進行全局聚合操做,就能夠獲得最終結果了,好比(hello, 4)。
方案實現原理:將本來相同的key經過附加隨機前綴的方式,變成多個不一樣的key,就可讓本來被一個task處理的數據分散到多個task上去作局部聚合,進而解決單個task處理數據量過多的問題。接着去除掉隨機前綴,再次進行全局聚合,就能夠獲得最終的結果。具體原理見下圖。
方案優勢:對於聚合類的shuffle操做致使的數據傾斜,效果是很是不錯的。一般均可以解決掉數據傾斜,或者至少是大幅度緩解數據傾斜,將Spark做業的性能提高數倍以上。
方案缺點:僅僅適用於聚合類的shuffle操做,適用範圍相對較窄。若是是join類的shuffle操做,還得用其餘的解決方案。