原創文章,同步首發自做者我的博客轉載請務必在文章開頭處註明出處。java
本文結合實例詳細闡明瞭Spark數據傾斜的幾種場景以及對應的解決方案,包括避免數據源傾斜,調整並行度,使用自定義Partitioner,使用Map側Join代替Reduce側Join,給傾斜Key加上隨機前綴等。sql
對Spark/Hadoop這樣的大數據系統來說,數據量大並不可怕,可怕的是數據傾斜。緩存
何謂數據傾斜?數據傾斜指的是,並行處理的數據集中,某一部分(如Spark或Kafka的一個Partition)的數據顯著多於其它部分,從而使得該部分的處理速度成爲整個數據集處理的瓶頸。bash
在Spark中,同一個Stage的不一樣Partition能夠並行處理,而具備依賴關係的不一樣Stage之間是串行處理的。假設某個Spark Job分爲Stage 0和Stage 1兩個Stage,且Stage 1依賴於Stage 0,那Stage 0徹底處理結束以前不會處理Stage 1。而Stage 0可能包含N個Task,這N個Task能夠並行進行。若是其中N-1個Task都在10秒內完成,而另一個Task卻耗時1分鐘,那該Stage的總時間至少爲1分鐘。換句話說,一個Stage所耗費的時間,主要由最慢的那個Task決定。架構
因爲同一個Stage內的全部Task執行相同的計算,在排除不一樣計算節點計算能力差別的前提下,不一樣Task之間耗時的差別主要由該Task所處理的數據量決定。併發
Stage的數據來源主要分爲以下兩類app
以Spark Stream經過DirectStream方式讀取Kafka數據爲例。因爲Kafka的每個Partition對應Spark的一個Task(Partition),因此Kafka內相關Topic的各Partition之間數據是否平衡,直接決定Spark處理該數據時是否會產生數據傾斜。dom
如《Kafka設計解析(一)- Kafka背景及架構介紹》一文所述,Kafka某一Topic內消息在不一樣Partition之間的分佈,主要由Producer端所使用的Partition實現類決定。若是使用隨機Partitioner,則每條消息會隨機發送到一個Partition中,從而從機率上來說,各Partition間的數據會達到平衡。此時源Stage(直接讀取Kafka數據的Stage)不會產生數據傾斜。ide
但不少時候,業務場景可能會要求將具有同一特徵的數據順序消費,此時就須要將具備相同特徵的數據放於同一個Partition中。一個典型的場景是,須要將同一個用戶相關的PV信息置於同一個Partition中。此時,若是產生了數據傾斜,則須要經過其它方式處理。oop
Spark在作Shuffle時,默認使用HashPartitioner(非Hash Shuffle)對數據進行分區。若是並行度設置的不合適,可能形成大量不相同的Key對應的數據被分配到了同一個Task上,形成該Task所處理的數據遠大於其它Task,從而形成數據傾斜。
若是調整Shuffle時的並行度,使得本來被分配到同一Task的不一樣Key發配到不一樣Task上處理,則可下降原Task所需處理的數據量,從而緩解數據傾斜問題形成的短板效應。
現有一張測試表,名爲student_external,內有10.5億條數據,每條數據有一個惟一的id值。現從中取出id取值爲9億到10.5億的共1.5條數據,並經過一些處理,使得id爲9億到9.4億間的全部數據對12取模後餘數爲8(即在Shuffle並行度爲12時該數據集所有被HashPartition分配到第8個Task),其它數據集對其id除以100取整,從而使得id大於9.4億的數據在Shuffle時可被均勻分配到全部Task中,而id小於9.4億的數據所有分配到同一個Task中。處理過程以下
INSERT OVERWRITE TABLE test SELECT CASE WHEN id < 940000000 THEN (9500000 + (CAST (RAND() * 8 AS INTEGER)) * 12 ) ELSE CAST(id/100 AS INTEGER) END, name FROM student_external WHERE id BETWEEN 900000000 AND 1050000000;
經過上述處理,一份可能形成後續數據傾斜的測試數據即以準備好。接下來,使用Spark讀取該測試數據,並經過groupByKey(12)
對id分組處理,且Shuffle並行度爲12。代碼以下
public class SparkDataSkew { public static void main(String[] args) { SparkSession sparkSession = SparkSession.builder() .appName("SparkDataSkewTunning") .config("hive.metastore.uris", "thrift://hadoop1:9083") .enableHiveSupport() .getOrCreate(); Dataset<Row> dataframe = sparkSession.sql( "select * from test"); dataframe.toJavaRDD() .mapToPair((Row row) -> new Tuple2<Integer, String>(row.getInt(0),row.getString(1))) .groupByKey(12) .mapToPair((Tuple2<Integer, Iterable<String>> tuple) -> { int id = tuple._1(); AtomicInteger atomicInteger = new AtomicInteger(0); tuple._2().forEach((String name) -> atomicInteger.incrementAndGet()); return new Tuple2<Integer, Integer>(id, atomicInteger.get()); }).count(); sparkSession.stop(); sparkSession.close(); } }
本次實驗所使用集羣節點數爲4,每一個節點可被Yarn使用的CPU核數爲16,內存爲16GB。使用以下方式提交上述應用,將啓動4個Executor,每一個Executor可以使用核數爲12(該配置並不是生產環境下的最優配置,僅用於本文實驗),可用內存爲12GB。
spark-submit --queue ambari --num-executors 4 --executor-cores 12 --executor-memory 12g --class com.jasongj.spark.driver.SparkDataSkew --master yarn --deploy-mode client SparkExample-with-dependencies-1.0.jar
GroupBy Stage的Task狀態以下圖所示,Task 8處理的記錄數爲4500萬,遠大於(9倍於)其它11個Task處理的500萬記錄。而Task 8所耗費的時間爲38秒,遠高於其它11個Task的平均時間(16秒)。整個Stage的時間也爲38秒,該時間主要由最慢的Task 8決定。
在這種狀況下,能夠經過調整Shuffle並行度,使得原來被分配到同一個Task(即該例中的Task 8)的不一樣Key分配到不一樣Task,從而下降Task 8所需處理的數據量,緩解數據傾斜。
經過groupByKey(48)
將Shuffle並行度調整爲48,從新提交到Spark。新的Job的GroupBy Stage全部Task狀態以下圖所示。
從上圖可知,記錄數最多的Task 20處理的記錄數約爲1125萬,相比於並行度爲12時Task 8的4500萬,下降了75%左右,而其耗時從原來Task 8的38秒降到了24秒。
在這種場景下,調整並行度,並不意味着必定要增長並行度,也多是減少並行度。若是經過groupByKey(11)
將Shuffle並行度調整爲11,從新提交到Spark。新Job的GroupBy Stage的全部Task狀態以下圖所示。
從上圖可見,處理記錄數最多的Task 6所處理的記錄數約爲1045萬,耗時爲23秒。處理記錄數最少的Task 1處理的記錄數約爲545萬,耗時12秒。
適用場景
大量不一樣的Key被分配到了相同的Task形成該Task數據量過大。
解決方案
調整並行度。通常是增大並行度,但有時如本例減少並行度也可達到效果。
優點
實現簡單,可在須要Shuffle的操做算子上直接設置並行度或者使用spark.default.parallelism
設置。若是是Spark SQL,還可經過SET spark.sql.shuffle.partitions=[num_tasks]
設置並行度。可用最小的代價解決問題。通常若是出現數據傾斜,均可以經過這種方法先試驗幾回,若是問題未解決,再嘗試其它方法。
劣勢
適用場景少,只能將分配到同一Task的不一樣Key分散開,但對於同一Key傾斜嚴重的狀況該方法並不適用。而且該方法通常只能緩解數據傾斜,沒有完全消除問題。從實踐經驗來看,其效果通常。
使用自定義的Partitioner(默認爲HashPartitioner),將本來被分配到同一個Task的不一樣Key分配到不一樣Task。
以上述數據集爲例,繼續將併發度設置爲12,可是在groupByKey
算子上,使用自定義的Partitioner
(實現以下)
.groupByKey(new Partitioner() { @Override public int numPartitions() { return 12; } @Override public int getPartition(Object key) { int id = Integer.parseInt(key.toString()); if(id >= 9500000 && id <= 9500084 && ((id - 9500000) % 12) == 0) { return (id - 9500000) / 12; } else { return id % 12; } } })
由下圖可見,使用自定義Partition後,耗時最長的Task 6處理約1000萬條數據,用時15秒。而且各Task所處理的數據集大小至關。
適用場景
大量不一樣的Key被分配到了相同的Task形成該Task數據量過大。
解決方案
使用自定義的Partitioner實現類代替默認的HashPartitioner,儘可能將全部不一樣的Key均勻分配到不一樣的Task中。
優點
不影響原有的並行度設計。若是改變並行度,後續Stage的並行度也會默認改變,可能會影響後續Stage。
劣勢
適用場景有限,只能將不一樣Key分散開,對於同一Key對應數據集很是大的場景不適用。效果與調整並行度相似,只能緩解數據傾斜而不能徹底消除數據傾斜。並且須要根據數據特色自定義專用的Partitioner,不夠靈活。
經過Spark的Broadcast機制,將Reduce側Join轉化爲Map側Join,避免Shuffle從而徹底消除Shuffle帶來的數據傾斜。
經過以下SQL建立一張具備傾斜Key且總記錄數爲1.5億的大表test。
INSERT OVERWRITE TABLE test SELECT CAST(CASE WHEN id < 980000000 THEN (95000000 + (CAST (RAND() * 4 AS INT) + 1) * 48 ) ELSE CAST(id/10 AS INT) END AS STRING), name FROM student_external WHERE id BETWEEN 900000000 AND 1050000000;
使用以下SQL建立一張數據分佈均勻且總記錄數爲50萬的小表test_new。
INSERT OVERWRITE TABLE test_new SELECT CAST(CAST(id/10 AS INT) AS STRING), name FROM student_delta_external WHERE id BETWEEN 950000000 AND 950500000;
直接經過Spark Thrift Server提交以下SQL將表test與表test_new進行Join並將Join結果存於表test_join中。
INSERT OVERWRITE TABLE test_join SELECT test_new.id, test_new.name FROM test JOIN test_new ON test.id = test_new.id;
該SQL對應的DAG以下圖所示。從該圖可見,該執行過程總共分爲三個Stage,前兩個用於從Hive中讀取數據,同時兩者進行Shuffle,經過最後一個Stage進行Join並將結果寫入表test_join中。
從下圖可見,最近Join Stage各Task處理的數據傾斜嚴重,處理數據量最大的Task耗時7.1分鐘,遠高於其它無數據傾斜的Task約2s秒的耗時。
接下來,嘗試經過Broadcast實現Map側Join。實現Map側Join的方法,並不是直接經過CACHE TABLE test_new
將小表test_new進行cache。現經過以下SQL進行Join。
CACHE TABLE test_new; INSERT OVERWRITE TABLE test_join SELECT test_new.id, test_new.name FROM test JOIN test_new ON test.id = test_new.id;
經過以下DAG圖可見,該操做仍分爲三個Stage,且仍然有Shuffle存在,惟一不一樣的是,小表的讀取再也不直接掃描Hive表,而是掃描內存中緩存的表。
而且數據傾斜仍然存在。以下圖所示,最慢的Task耗時爲7.1分鐘,遠高於其它Task的約2秒。
正確的使用Broadcast實現Map側Join的方式是,經過SET spark.sql.autoBroadcastJoinThreshold=104857600;
將Broadcast的閾值設置得足夠大。
再次經過以下SQL進行Join。
SET spark.sql.autoBroadcastJoinThreshold=104857600; INSERT OVERWRITE TABLE test_join SELECT test_new.id, test_new.name FROM test JOIN test_new ON test.id = test_new.id;
經過以下DAG圖可見,該方案只包含一個Stage。
而且從下圖可見,各Task耗時至關,無明顯數據傾斜現象。而且總耗時爲1.5分鐘,遠低於Reduce側Join的7.3分鐘。
適用場景
參與Join的一邊數據集足夠小,可被加載進Driver並經過Broadcast方法廣播到各個Executor中。
解決方案
在Java/Scala代碼中將小數據集數據拉取到Driver,而後經過broadcast方案將小數據集的數據廣播到各Executor。或者在使用SQL前,將broadcast的閾值調整得足夠多,從而使用broadcast生效。進而將Reduce側Join替換爲Map側Join。
優點
避免了Shuffle,完全消除了數據傾斜產生的條件,可極大提高性能。
劣勢
要求參與Join的一側數據集足夠小,而且主要適用於Join的場景,不適合聚合的場景,適用條件有限。
爲數據量特別大的Key增長隨機前/後綴,使得原來Key相同的數據變爲Key不相同的數據,從而使傾斜的數據集分散到不一樣的Task中,完全解決數據傾斜問題。Join另外一則的數據中,與傾斜Key對應的部分數據,與隨機前綴集做笛卡爾乘積,從而保證不管數據傾斜側傾斜Key如何加前綴,都能與之正常Join。
經過以下SQL,將id爲9億到9.08億共800萬條數據的id轉爲9500048或者9500096,其它數據的id除以100取整。從而該數據集中,id爲9500048和9500096的數據各400萬,其它id對應的數據記錄數均爲100條。這些數據存於名爲test的表中。
對於另一張小表test_new,取出50萬條數據,並將id(遞增且惟一)除以100取整,使得全部id都對應100條數據。
INSERT OVERWRITE TABLE test SELECT CAST(CASE WHEN id < 908000000 THEN (9500000 + (CAST (RAND() * 2 AS INT) + 1) * 48 ) ELSE CAST(id/100 AS INT) END AS STRING), name FROM student_external WHERE id BETWEEN 900000000 AND 1050000000; INSERT OVERWRITE TABLE test_new SELECT CAST(CAST(id/100 AS INT) AS STRING), name FROM student_delta_external WHERE id BETWEEN 950000000 AND 950500000;
經過以下代碼,讀取test表對應的文件夾內的數據並轉換爲JavaPairRDD存於leftRDD中,一樣讀取test表對應的數據存於rightRDD中。經過RDD的join算子對leftRDD與rightRDD進行Join,並指定並行度爲48。
public class SparkDataSkew{ public static void main(String[] args) { SparkConf sparkConf = new SparkConf(); sparkConf.setAppName("DemoSparkDataFrameWithSkewedBigTableDirect"); sparkConf.set("spark.default.parallelism", parallelism + ""); JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf); JavaPairRDD<String, String> leftRDD = javaSparkContext.textFile("hdfs://hadoop1:8020/apps/hive/warehouse/default/test/") .mapToPair((String row) -> { String[] str = row.split(","); return new Tuple2<String, String>(str[0], str[1]); }); JavaPairRDD<String, String> rightRDD = javaSparkContext.textFile("hdfs://hadoop1:8020/apps/hive/warehouse/default/test_new/") .mapToPair((String row) -> { String[] str = row.split(","); return new Tuple2<String, String>(str[0], str[1]); }); leftRDD.join(rightRDD, parallelism) .mapToPair((Tuple2<String, Tuple2<String, String>> tuple) -> new Tuple2<String, String>(tuple._1(), tuple._2()._2())) .foreachPartition((Iterator<Tuple2<String, String>> iterator) -> { AtomicInteger atomicInteger = new AtomicInteger(); iterator.forEachRemaining((Tuple2<String, String> tuple) -> atomicInteger.incrementAndGet()); }); javaSparkContext.stop(); javaSparkContext.close(); } }
從下圖可看出,整個Join耗時1分54秒,其中Join Stage耗時1.7分鐘。
經過分析Join Stage的全部Task可知,在其它Task所處理記錄數爲192.71萬的同時Task 32的處理的記錄數爲992.72萬,故它耗時爲1.7分鐘,遠高於其它Task的約10秒。這與上文準備數據集時,將id爲9500048爲9500096對應的數據量設置很是大,其它id對應的數據集很是均勻相符合。
現經過以下操做,實現傾斜Key的分散處理
具體實現代碼以下
public class SparkDataSkew{ public static void main(String[] args) { int parallelism = 48; SparkConf sparkConf = new SparkConf(); sparkConf.setAppName("SolveDataSkewWithRandomPrefix"); sparkConf.set("spark.default.parallelism", parallelism + ""); JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf); JavaPairRDD<String, String> leftRDD = javaSparkContext.textFile("hdfs://hadoop1:8020/apps/hive/warehouse/default/test/") .mapToPair((String row) -> { String[] str = row.split(","); return new Tuple2<String, String>(str[0], str[1]); }); JavaPairRDD<String, String> rightRDD = javaSparkContext.textFile("hdfs://hadoop1:8020/apps/hive/warehouse/default/test_new/") .mapToPair((String row) -> { String[] str = row.split(","); return new Tuple2<String, String>(str[0], str[1]); }); String[] skewedKeyArray = new String[]{"9500048", "9500096"}; Set<String> skewedKeySet = new HashSet<String>(); List<String> addList = new ArrayList<String>(); for(int i = 1; i <=24; i++) { addList.add(i + ""); } for(String key : skewedKeyArray) { skewedKeySet.add(key); } Broadcast<Set<String>> skewedKeys = javaSparkContext.broadcast(skewedKeySet); Broadcast<List<String>> addListKeys = javaSparkContext.broadcast(addList); JavaPairRDD<String, String> leftSkewRDD = leftRDD .filter((Tuple2<String, String> tuple) -> skewedKeys.value().contains(tuple._1())) .mapToPair((Tuple2<String, String> tuple) -> new Tuple2<String, String>((new Random().nextInt(24) + 1) + "," + tuple._1(), tuple._2())); JavaPairRDD<String, String> rightSkewRDD = rightRDD.filter((Tuple2<String, String> tuple) -> skewedKeys.value().contains(tuple._1())) .flatMapToPair((Tuple2<String, String> tuple) -> addListKeys.value().stream() .map((String i) -> new Tuple2<String, String>( i + "," + tuple._1(), tuple._2())) .collect(Collectors.toList()) .iterator() ); JavaPairRDD<String, String> skewedJoinRDD = leftSkewRDD .join(rightSkewRDD, parallelism) .mapToPair((Tuple2<String, Tuple2<String, String>> tuple) -> new Tuple2<String, String>(tuple._1().split(",")[1], tuple._2()._2())); JavaPairRDD<String, String> leftUnSkewRDD = leftRDD.filter((Tuple2<String, String> tuple) -> !skewedKeys.value().contains(tuple._1())); JavaPairRDD<String, String> unskewedJoinRDD = leftUnSkewRDD.join(rightRDD, parallelism).mapToPair((Tuple2<String, Tuple2<String, String>> tuple) -> new Tuple2<String, String>(tuple._1(), tuple._2()._2())); skewedJoinRDD.union(unskewedJoinRDD).foreachPartition((Iterator<Tuple2<String, String>> iterator) -> { AtomicInteger atomicInteger = new AtomicInteger(); iterator.forEachRemaining((Tuple2<String, String> tuple) -> atomicInteger.incrementAndGet()); }); javaSparkContext.stop(); javaSparkContext.close(); } }
從下圖可看出,整個Join耗時58秒,其中Join Stage耗時33秒。
經過分析Join Stage的全部Task可知
實際上,因爲傾斜Key與非傾斜Key的操做徹底獨立,可並行進行。而本實驗受限於可用總核數爲48,可同時運行的總Task數爲48,故而該方案只是將總耗時減小一半(效率提高一倍)。若是資源充足,可併發執行Task數增多,該方案的優點將更爲明顯。在實際項目中,該方案每每可提高數倍至10倍的效率。
適用場景
兩張表都比較大,沒法使用Map則Join。其中一個RDD有少數幾個Key的數據量過大,另一個RDD的Key分佈較爲均勻。
解決方案
將有數據傾斜的RDD中傾斜Key對應的數據集單獨抽取出來加上隨機前綴,另一個RDD每條數據分別與隨機前綴結合造成新的RDD(至關於將其數據增到到原來的N倍,N即爲隨機前綴的總個數),而後將兩者Join並去掉前綴。而後將不包含傾斜Key的剩餘數據進行Join。最後將兩次Join的結果集經過union合併,便可獲得所有Join結果。
優點
相對於Map則Join,更能適應大數據集的Join。若是資源充足,傾斜部分數據集與非傾斜部分數據集可並行進行,效率提高明顯。且只針對傾斜部分的數據作數據擴展,增長的資源消耗有限。
劣勢
若是傾斜Key很是多,則另外一側數據膨脹很是大,此方案不適用。並且此時對傾斜Key與非傾斜Key分開處理,須要掃描數據集兩遍,增長了開銷。
若是出現數據傾斜的Key比較多,上一種方法將這些大量的傾斜Key分拆出來,意義不大。此時更適合直接對存在數據傾斜的數據集所有加上隨機前綴,而後對另一個不存在嚴重數據傾斜的數據集總體與隨機前綴集做笛卡爾乘積(即將數據量擴大N倍)。
這裏給出示例代碼,讀者可參考上文中分拆出少數傾斜Key添加隨機前綴的方法,自行測試。
public class SparkDataSkew { public static void main(String[] args) { SparkConf sparkConf = new SparkConf(); sparkConf.setAppName("ResolveDataSkewWithNAndRandom"); sparkConf.set("spark.default.parallelism", parallelism + ""); JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf); JavaPairRDD<String, String> leftRDD = javaSparkContext.textFile("hdfs://hadoop1:8020/apps/hive/warehouse/default/test/") .mapToPair((String row) -> { String[] str = row.split(","); return new Tuple2<String, String>(str[0], str[1]); }); JavaPairRDD<String, String> rightRDD = javaSparkContext.textFile("hdfs://hadoop1:8020/apps/hive/warehouse/default/test_new/") .mapToPair((String row) -> { String[] str = row.split(","); return new Tuple2<String, String>(str[0], str[1]); }); List<String> addList = new ArrayList<String>(); for(int i = 1; i <=48; i++) { addList.add(i + ""); } Broadcast<List<String>> addListKeys = javaSparkContext.broadcast(addList); JavaPairRDD<String, String> leftRandomRDD = leftRDD.mapToPair((Tuple2<String, String> tuple) -> new Tuple2<String, String>(new Random().nextInt(48) + "," + tuple._1(), tuple._2())); JavaPairRDD<String, String> rightNewRDD = rightRDD .flatMapToPair((Tuple2<String, String> tuple) -> addListKeys.value().stream() .map((String i) -> new Tuple2<String, String>( i + "," + tuple._1(), tuple._2())) .collect(Collectors.toList()) .iterator() ); JavaPairRDD<String, String> joinRDD = leftRandomRDD .join(rightNewRDD, parallelism) .mapToPair((Tuple2<String, Tuple2<String, String>> tuple) -> new Tuple2<String, String>(tuple._1().split(",")[1], tuple._2()._2())); joinRDD.foreachPartition((Iterator<Tuple2<String, String>> iterator) -> { AtomicInteger atomicInteger = new AtomicInteger(); iterator.forEachRemaining((Tuple2<String, String> tuple) -> atomicInteger.incrementAndGet()); }); javaSparkContext.stop(); javaSparkContext.close(); } }
適用場景
一個數據集存在的傾斜Key比較多,另一個數據集數據分佈比較均勻。
優點
對大部分場景都適用,效果不錯。
劣勢
須要將一個數據集總體擴大N倍,會增長資源消耗。
對於數據傾斜,並沒有一個統一的一勞永逸的方法。更多的時候,是結合數據特色(數據集大小,傾斜Key的多少等)綜合使用上文所述的多種方法。