spak數據傾斜解決方案

數據傾斜解決方案

數據傾斜的解決,跟以前講解的性能調優,有一點殊途同歸之妙。sql

性能調優中最有效最直接最簡單的方式就是加資源加並行度,並注意RDD架構(複用同一個RDD,加上cache緩存)。相對於前面,shuffle、jvm等是次要的。緩存

6.一、原理以及現象分析

6.1.一、數據傾斜怎麼出現的

在執行shuffle操做的時候,是按照key,來進行values的數據的輸出、拉取和聚合的。架構

同一個key的values,必定是分配到一個reduce task進行處理的。jvm

多個key對應的values,好比一共是90萬。可能某個key對應了88萬數據,被分配到一個task上去面去執行。函數

另外兩個task,可能各分配到了1萬數據,多是數百個key,對應的1萬條數據。性能

這樣就會出現數據傾斜問題。spa

想象一下,出現數據傾斜之後的運行的狀況。很糟糕!對象

其中兩個task,各分配到了1萬數據,可能同時在10分鐘內都運行完了。另一個task有88萬條,88 * 10 =  880分鐘 = 14.5個小時。blog

你們看,原本另外兩個task很快就運行完畢了(10分鐘),可是因爲一個拖後腿的傢伙,第三個task,要14.5個小時才能運行完,就致使整個spark做業,也得14.5個小時才能運行完。內存

數據傾斜,一旦出現,是否是性能殺手?!

6.1.二、發生數據傾斜之後的現象

Spark數據傾斜,有兩種表現:

一、你的大部分的task,都執行的特別特別快,(你要用client模式,standalone client,yarn client,本地機器一執行spark-submit腳本,就會開始打印log),task175 finished,剩下幾個task,執行的特別特別慢,前面的task,通常1s能夠執行完5個,最後發現1000個task,998,999 task,要執行1個小時,2個小時才能執行完一個task。

出現以上loginfo,就代表出現數據傾斜了。

這樣還算好的,由於雖然老牛拉破車同樣很是慢,可是至少還能跑。

二、另外一種狀況是,運行的時候,其餘task都執行完了,也沒什麼特別的問題,可是有的task,就是會忽然間報了一個OOM,JVM Out Of Memory,內存溢出了,task failed,task lost,resubmitting task。反覆執行幾回都到了某個task就是跑不通,最後就掛掉。

某個task就直接OOM,那麼基本上也是由於數據傾斜了,task分配的數量實在是太大了!因此內存放不下,而後你的task每處理一條數據,還要建立大量的對象,內存爆掉了。

這樣也代表出現數據傾斜了。

這種就不太好了,由於你的程序若是不去解決數據傾斜的問題,壓根兒就跑不出來。

做業都跑不完,還談什麼性能調優這些東西?!

6.1.三、定位數據傾斜出現的緣由與出現問題的位置

根據log去定位

出現數據傾斜的緣由,基本只多是由於發生了shuffle操做,在shuffle的過程當中,出現了數據傾斜的問題。由於某個或者某些key對應的數據,遠遠的高於其餘的key。

一、你在本身的程序裏面找找,哪些地方用了會產生shuffle的算子,groupByKey、countByKey、reduceByKey、join

二、看log

log通常會報是在你的哪一行代碼,致使了OOM異常。或者看log,看看是執行到了第幾個stage。spark代碼,是怎麼劃分紅一個一個的stage的。哪個stage生成的task特別慢,就可以本身用肉眼去對你的spark代碼進行stage的劃分,就可以經過stage定位到你的代碼,到底哪裏發生了數據傾斜。

 一、使用Hive ETL預處理數據

方案適用場景:

若是致使數據傾斜的是Hive表。若是該Hive表中的數據自己很不均勻(好比某個key對應了100萬數據,其餘key纔對應了10條數據),並且業務場景須要頻繁使用Spark對Hive表執行某個分析操做,那麼比較適合使用這種技術方案。

方案實現思路:

此時能夠評估一下,是否能夠經過Hive來進行數據預處理(即經過Hive ETL預先對數據按照key進行聚合,或者是預先和其餘表進行join),而後在Spark做業中針對的數據源就不是原來的Hive表了,而是預處理後的Hive表。此時因爲數據已經預先進行過聚合或join操做了,那麼在Spark做業中也就不須要使用原先的shuffle類算子執行這類操做了。

方案實現原理:

這種方案從根源上解決了數據傾斜,由於完全避免了在Spark中執行shuffle類算子,那麼確定就不會有數據傾斜的問題了。可是這裏也要提醒一下你們,這種方式屬於治標不治本。由於畢竟數據自己就存在分佈不均勻的問題,因此Hive ETL中進行group by或者join等shuffle操做時,仍是會出現數據傾斜,致使Hive ETL的速度很慢。咱們只是把數據傾斜的發生提早到了Hive ETL中,避免Spark程序發生數據傾斜而已。

 

二、過濾少數致使傾斜的key

方案適用場景:

若是發現致使傾斜的key就少數幾個,並且對計算自己的影響並不大的話,那麼很適合使用這種方案。好比99%的key就對應10條數據,可是隻有一個key對應了100萬數據,從而致使了數據傾斜。

方案實現思路:

若是咱們判斷那少數幾個數據量特別多的key,對做業的執行和計算結果不是特別重要的話,那麼幹脆就直接過濾掉那少數幾個key。好比,在Spark SQL中可使用where子句過濾掉這些key或者在Spark Core中對RDD執行filter算子過濾掉這些key。若是須要每次做業執行時,動態斷定哪些key的數據量最多而後再進行過濾,那麼可使用sample算子對RDD進行採樣,而後計算出每一個key的數量,取數據量最多的key過濾掉便可。

方案實現原理:

將致使數據傾斜的key給過濾掉以後,這些key就不會參與計算了,天然不可能產生數據傾斜。

 

三、提升shuffle操做的並行度

方案實現思路:

在對RDD執行shuffle算子時,給shuffle算子傳入一個參數,好比reduceByKey(1000),該參數就設置了這個shuffle算子執行時shuffle read task的數量。對於Spark SQL中的shuffle類語句,好比group by、join等,須要設置一個參數,即spark.sql.shuffle.partitions,該參數表明了shuffle read task的並行度,該值默認是200,對於不少場景來講都有點太小。

方案實現原理:

增長shuffle read task的數量,可讓本來分配給一個task的多個key分配給多個task,從而讓每一個task處理比原來更少的數據。舉例來講,若是本來有5個不一樣的key,每一個key對應10條數據,這5個key都是分配給一個task的,那麼這個task就要處理50條數據。而增長了shuffle read task之後,每一個task就分配到一個key,即每一個task就處理10條數據,那麼天然每一個task的執行時間都會變短了。

 

四、雙重聚合

方案適用場景:

對RDD執行reduceByKey等聚合類shuffle算子或者在Spark SQL中使用group by語句進行分組聚合時,比較適用這種方案。

方案實現思路:

這個方案的核心實現思路就是進行兩階段聚合。第一次是局部聚合,先給每一個key都打上一個隨機數,好比10之內的隨機數,此時原先同樣的key就變成不同的了,好比(hello, 1) (hello, 1) (hello, 1) (hello, 1),就會變成(1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。接着對打上隨機數後的數據,執行reduceByKey等聚合操做,進行局部聚合,那麼局部聚合結果,就會變成了(1_hello, 2) (2_hello, 2)。而後將各個key的前綴給去掉,就會變成(hello,2)(hello,2),再次進行全局聚合操做,就能夠獲得最終結果了,好比(hello, 4)。

方案實現原理:

將本來相同的key經過附加隨機前綴的方式,變成多個不一樣的key,就可讓本來被一個task處理的數據分散到多個task上去作局部聚合,進而解決單個task處理數據量過多的問題。接着去除掉隨機前綴,再次進行全局聚合,就能夠獲得最終的結果。若是一個RDD中有一個key致使數據傾斜,同時還有其餘的key,那麼通常先對數據集進行抽樣,而後找出傾斜的key,再使用filter對原始的RDD進行分離爲兩個RDD,一個是由傾斜的key組成的RDD1,一個是由其餘的key組成的RDD2,那麼對於RDD1可使用加隨機前綴進行多分區多task計算,對於另外一個RDD2正常聚合計算,最後將結果再合併起來。

隨機前綴加幾,ReduceByKey分幾個區。

 

五、將reduce join轉爲map join(完全避免數據傾斜)

BroadCast+filter(或者map)

方案適用場景:

在對RDD使用join類操做,或者是在Spark SQL中使用join語句時,並且join操做中的一個RDD或表的數據量比較小(好比幾百M或者一兩G),比較適用此方案。

方案實現思路:

不使用join算子進行鏈接操做,而使用Broadcast變量與map類算子實現join操做,進而徹底規避掉shuffle類的操做,完全避免數據傾斜的發生和出現。將較小RDD中的數據直接經過collect算子拉取到Driver端的內存中來,而後對其建立一個Broadcast變量;接着對另一個RDD執行map類算子,在算子函數內,從Broadcast變量中獲取較小RDD的全量數據,與當前RDD的每一條數據按照鏈接key進行比對,若是鏈接key相同的話,那麼就將兩個RDD的數據用你須要的方式鏈接起來。

方案實現原理:

普通的join是會走shuffle過程的,而一旦shuffle,就至關於會將相同key的數據拉取到一個shuffle read task中再進行join,此時就是reduce join。可是若是一個RDD是比較小的,則能夠採用廣播小RDD全量數據+map算子來實現與join一樣的效果,也就是map join,此時就不會發生shuffle操做,也就不會發生數據傾斜。

 

六、採樣傾斜key並分拆join操做

方案適用場景:

兩個RDD/Hive表進行join的時候,若是數據量都比較大,沒法採用「解決方案五」,那麼此時能夠看一下兩個RDD/Hive表中的key分佈狀況。若是出現數據傾斜,是由於其中某一個RDD/Hive表中的少數幾個key的數據量過大,而另外一個RDD/Hive表中的全部key都分佈比較均勻,那麼採用這個解決方案是比較合適的。

方案實現思路:

對包含少數幾個數據量過大的key的那個RDD,經過sample算子採樣出一份樣原本,而後統計一下每一個key的數量,計算出來數據量最大的是哪幾個key。而後將這幾個key對應的數據從原來的RDD中拆分出來,造成一個單獨的RDD,並給每一個key都打上n之內的隨機數做爲前綴,而不會致使傾斜的大部分key造成另一個RDD。接着將須要join的另外一個RDD,也過濾出來那幾個傾斜key對應的數據並造成一個單獨的RDD,將每條數據膨脹成n條數據,這n條數據都按順序附加一個0~n的前綴,不會致使傾斜的大部分key也造成另一個RDD。再將附加了隨機前綴的獨立RDD與另外一個膨脹n倍的獨立RDD進行join,此時就能夠將原先相同的key打散成n份,分散到多個task中去進行join了。而另外兩個普通的RDD就照常join便可。最後將兩次join的結果使用union算子合併起來便可,就是最終的join結果 。

 

七、使用隨機前綴和擴容RDD進行join

 

方案適用場景:

若是在進行join操做時,RDD中有大量的key致使數據傾斜,那麼進行分拆key也沒什麼意義,此時就只能使用最後一種方案來解決問題了。

方案實現思路:

該方案的實現思路基本和「解決方案六」相似,首先查看RDD/Hive表中的數據分佈狀況,找到那個形成數據傾斜的RDD/Hive表,好比有多個key都對應了超過1萬條數據。而後將該RDD的每條數據都打上一個n之內的隨機前綴。同時對另一個正常的RDD進行擴容,將每條數據都擴容成n條數據,擴容出來的每條數據都依次打上一個0~n的前綴。最後將兩個處理後的RDD進行join便可。

相關文章
相關標籤/搜索