Spark學習——數據傾斜

其餘更多java基礎文章:
java基礎學習(目錄)java


學習資料:數據傾斜是多麼痛?spark做業/面試/調優必備祕籍面試

1. 什麼是數據傾斜

數據傾斜是一種很常見的問題(依據二八定律),簡單來講,比方WordCount中某個Key對應的數據量很是大的話,就會產生數據傾斜,致使兩個後果:算法

  • OOM(單或少數的節點);
  • 拖慢整個Job執行時間(其餘已經完成的節點都在等這個還在作的節點)。

2. 解決數據傾斜須要

  • 搞定 Shuffle;
  • 搞定業務場景;
  • 搞定 CPU core 的使用狀況;
  • 搞定 OOM 的根本緣由等:通常都由於數據傾斜(某task任務的數據量過大,GC壓力大,和Kafka不一樣在於Kafka的內存不通過JVM,其基於Linux的Page)。

3. 致使Spark數據傾斜的本質

Shuffle時,需將各節點的相同key的數據拉取到某節點上的一個task來處理,若某個key對應的數據量很大就會發生數據傾斜。比方說大部分key對應10條數據,某key對應10萬條,大部分task只會被分配10條數據,很快作完,個別task分配10萬條數據,不只運行時間長,且整個stage的做業時間由最慢的task決定。sql

數據傾斜只會發生在Shuffle過程,如下算法可能觸發Shuffle操做: distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition等。函數

4. 定位最慢的Task所處的源碼位置

步驟一: 看數據傾斜發生在哪一個stage(也就是看以上算子出如今哪一個階段)。yarn-client模式下查看本地log或Spark Web UI中當前運行的是哪一個stage;yarn-cluster模式下,經過Spark Web UI查看運行到了哪一個Stage。 主要看最慢的Stage各task分配的數據量,來肯定是不是數據傾斜。post

步驟二:根據Stage劃分,推算傾斜發生的代碼(必然有Shuffle類算子)。簡單實用方法:只要看到shuffle類算子或Spark SQL的SQL語句會有Shuffle類的算子的句子,就能夠該地方劃分爲先後兩個Stage。(以前用Python的PySpark接口,Spark Web UI會查看task在源碼中的行數,Java或者Scala雖沒用過,但我想應該有)學習

5. 解決方案

方案一:使用Hive ETL預處理

場景:若Hive表中數據不均勻,且業務中會頻繁用Spark對Hive表分析;
思路:用Hive對數據預處理(對key聚合等操做),本來是Spark對Hive的原表操做,如今就是對Hive預處理後的表操做;
原理:從根源解決了數據傾斜,規避了了Spark進行Shuffle類算子操做。但Hive ETL中進行聚合等操做會發生數據傾斜,只是把慢轉移給了Hive ETL;
優勢:方便,效果好,規避了Spark數據傾斜;
缺點:治標不治本,Hive ETL會數據傾斜。spa

方案二:過濾致使傾斜的key

場景:發生傾斜的key不多且不重要;
思路:對發生傾斜的key過濾掉。比方在Spark SQL中用where子句或filter過濾,若每次做業執行,須要動態斷定可以使用sample算子對RDD採樣後取數據量最多的key過濾;
原理:對傾斜的key過濾後,這些key便不會參與後面的計算,從本質上消除數據傾斜;
優勢:簡單,效果明顯;
缺點:適用場景少,實際中致使傾斜的key不少。接口

方案三:提升Shuffle操做並行度

場景:任何場景均可以,優先選擇的最簡單方案;
思路:對RDD操做的Shuffle算子傳入一個參數,也就是設置Shuffle算子執行時的Shuffle read task數量。對於Spark SQL的Shuffle類語句(如group by,join)即spark.sql.shuffle.partitions,表明shuffle read task的並行度,默認值是200可修改;
原理:增大shuffle read task參數值,讓每一個task處理比原來更少的數據;
優勢:簡單,有效;
缺點:緩解的效果頗有限。內存

方案四:雙重聚合(局部聚合+全局聚合)

場景:對RDD進行reduceByKey等聚合類shuffle算子,SparkSQL的groupBy作分組聚合這兩種狀況
思路:首先經過map給每一個key打上n之內的隨機數的前綴並進行局部聚合,即(hello, 1) (hello, 1) (hello, 1) (hello, 1)變爲(1_hello, 1) (1_hello, 1) (2_hello, 1),並進行reduceByKey的局部聚合,而後再次map將key的前綴隨機數去掉再次進行全局聚合;
原理:對本來相同的key進行隨機數附加,變成不一樣key,讓本來一個task處理的數據分攤到多個task作局部聚合,規避單task數據過量。以後再去隨機前綴進行全局聚合;
優勢:效果很是好(對聚合類Shuffle操做的傾斜問題);
缺點:範圍窄(僅適用於聚合類的Shuffle操做,join類的Shuffle還需其它方案)。

方案五:將reduce join轉爲map join

場景:對RDD或Spark SQL使用join類操做或語句,且join操做的RDD或表比較小(百兆或1,2G); 思路:使用broadcast和map類算子實現join的功能替代本來的join,完全規避shuffle。對較小RDD直接collect到內存,並建立broadcast變量;並對另一個RDD執行map類算子,在該算子的函數中,從broadcast變量(collect出的較小RDD)與當前RDD中的每條數據依次比對key,相同的key執行你須要方式的join; 原理:若RDD較小,可採用廣播小的RDD,並對大的RDD進行map,來實現與join一樣的效果。簡而言之,用broadcast-map代替join,規避join帶來的shuffle(無Shuffle無傾斜); 優勢:效果很好(對join操做致使的傾斜),根治; 缺點:適用場景小(大表+小表),廣播(driver和executor節點都會駐留小表數據)小表也耗內存。

方案六:採樣傾斜key並分拆join操做

場景:兩個較大的(沒法採用方案五)RDD/Hive表進行join時,且一個RDD/Hive表中少數key數據量過大,另外一個RDD/Hive表的key分佈較均勻(RDD中二者之一有一個更傾斜);
思路:

  1. 對更傾斜rdd1進行採樣(RDD.sample)並統計出數據量最大的幾個key;
  2. 對這幾個傾斜的key從本來rdd1中拆出造成一個單獨的rdd1_1,並打上0~n的隨機數前綴,被拆分的原rdd1的另外一部分(不包含傾斜key)又造成一個新rdd1_2;
  3. 對rdd2過濾出rdd1傾斜的key,獲得rdd2_1,並將其中每條數據擴n倍,對每條數據按順序附加0~n的前綴,被拆分出key的rdd2也獨立造成另外一個rdd2_2; 【我的認爲,這裏擴了n倍,最後union完還須要將每一個傾斜key對應的value減去(n-1)】
  4. 將加了隨機前綴的rdd1_1和rdd2_1進行join(此時本來傾斜的key被打散n份並被分散到更多的task中進行join); 【我的認爲,這裏應該作兩次join,兩次join中間有一個map去前綴】
  5. 另外兩個普通的RDD(rdd1_二、rdd2_2)照常join;
  6. 最後將兩次join的結果用union結合獲得最終的join結果。 原理:對join致使的傾斜是由於某幾個key,可將本來RDD中的傾斜key拆分出原RDD獲得新RDD,並以加隨機前綴的方式打散n份作join,將傾斜key對應的大量數據分攤到更多task上來規避傾斜;

優勢:前提是join致使的傾斜(某幾個key傾斜),避免佔用過多內存(只需對少數傾斜key擴容n倍);
缺點:對過多傾斜key不適用。

方案七:用隨機前綴和擴容RDD進行join

場景:RDD中有大量key致使傾斜; 思路:與方案六相似。

  1. 查看RDD/Hive表中數據分佈並找到形成傾斜的RDD/表;
  2. 對傾斜RDD中的每條數據打上n之內的隨機數前綴;
  3. 對另一個正常RDD的每條數據擴容n倍,擴容出的每條數據依次打上0到n的前綴;
  4. 對處理後的兩個RDD進行join。

原理:與方案六隻有惟一不一樣在於這裏對不傾斜RDD中全部數據進行擴大n倍,而不是找出傾斜key進行擴容(這是方案六);
優勢:對join類的數據傾斜均可處理,效果很是顯著;
缺點:緩解,擴容須要大內存。
【我的認爲,這裏和方案六同樣,也須要對擴容的key對應的value最後減去(n-1),除非只需大小關係,對值沒有要求】

方案八:多種方案組合

實際中,需綜合着對業務全盤考慮,可先用方案一和二進行預處理,同時在須要Shuffle的操做提高Shuffle的並行度,最後針對數據分佈選擇後面方案中的一種或多種。實際中須要對數據和方案思路理解靈活應用。

相關文章
相關標籤/搜索