Spark數據傾斜案例測試及調優準則深刻剖析-Spark商業調優實戰

本套系列博客從真實商業環境抽取案例進行總結和分享,並給出Spark商業應用實戰指導,請持續關注本套博客。版權聲明:本套Spark商業應用實戰歸做者(秦凱新)全部,禁止轉載,歡迎學習。java

前言

本文重點介紹最具技術含量的數據傾斜處理算法,以下方法僅供參考。算法

  • 方案一:使用Hive ETL預處理
  • 方案二:過濾致使傾斜的key
  • 方案三:提升Shuffle操做並行度
  • 方案四:兩階段聚合(局部聚合+全局聚合)
  • 方案五:將reduce join轉爲map join
  • 方案六:採樣傾斜key並分拆join操做
  • 方案七:用隨機前綴和擴容RDD進行join
  • 方案九:自定義Partitioner
  • 方案八:多種方案組合

1 數據集

20111230000005	57375476989eea12893c0c3811607bcf	奇藝高清	1	1	http://www.qiyi.com/
20111230000005	66c5bb7774e31d0a22278249b26bc83a	凡人修仙傳	3	1	http://www.booksky.org/BookDetail.aspx?BookID=1050804&Level=1
20111230000007	b97920521c78de70ac38e3713f524b50	本本聯盟	1	1	http://www.bblianmeng.com/
20111230000008	6961d0c97fe93701fc9c0d861d096cd9	華南師範大學圖書館	1	1	http://lib.scnu.edu.cn/
複製代碼

2 scala前置知識

詳細請參考這個博客,很是好:https://blog.csdn.net/zyp13781913772/article/details/81428862
複製代碼

3 數據集預處理

val sourceRdd = sc.textFile("hdfs://bd-master:9000/opendir/source.txt")
sourceRdd.zipWithIndex.take(1)

 Array(
 (20111230000005	57375476989eea12893c0c3811607bcf	奇藝高清	1	1	http://www.qiyi.com/, 0),
 (20111230000005	66c5bb7774e31d0a22278249b26bc83a	凡人修仙傳	3	1	http://www.booksky.org/BookDetail.aspx?BookID=1050804&Level=1, 1)
 )

  ++  => ++= 數組追加
  +=  => +=: 在數組前面追加元素

val sourceWithIndexRdd = sourceRdd.zipWithIndex.map(tuple => 
{val array = scala.collection.mutable.ArrayBuffer[String](); 
array++=(tuple._1.split("\t")); 
tuple._2.toString +=: array; 
array.toArray})

Array(
Array(0, 20111230000005, 57375476989eea12893c0c3811607bcf, 奇藝高清, 1, 1, http://www.qiyi.com/),
Array(1, 20111230000005, 66c5bb7774e31d0a22278249b26bc83a, 凡人修仙傳, 3, 1, http://www.booksky.org/BookDetail.aspx?BookID=1050804&Level=1)
)

sourceWithIndexRdd.map(_.mkString("\t")).saveAsTextFile("hdfs://bd-master:9000/source_index")
複製代碼

4 MapSide Join 性能測試

  • 方案適用場景:在對RDD使用join類操做,或者是在Spark SQL中使用join語句時,並且join操做中的一個RDD或表的數據量比較小(好比幾百M),比較適用此方案。
  • 方案實現原理:普通的join是會走shuffle過程的,而一旦shuffle,就至關於會將相同key的數據拉取到一個shuffle read task中再進行join,此時就是reduce join。可是若是一個RDD是比較小的,則能夠採用廣播小RDD全量數據+map算子來實現與join一樣的效果,也就是map join,此時就不會發生shuffle操做,也就不會發生數據傾斜。
  • 方案優勢:對join操做致使的數據傾斜,效果很是好,由於根本就不會發生shuffle,也就根本不會發生數據傾斜。
  • 方案缺點:適用場景較少,由於這個方案只適用於一個大表和一個小表的狀況。

4.1 數據準備

source_index:sql

Array[String] = Array(
    0	20111230000005	57375476989eea12893c0c3811607bcf	奇藝高清	1	1	http://www.qiyi.com/, 
    1	20111230000005	66c5bb7774e31d0a22278249b26bc83a	凡人修仙傳	3	1	http://www.booksky.org/BookDetail.aspx?BookID=1050804&Level=1
    )
複製代碼

數據模擬:apache

val sourceRdd = sc.textFile("hdfs://bd-master:9000/source_index/p*")
    val kvRdd = sourceRdd.map(x =>{ val parm=x.split("\t");(parm(0).trim().toInt,parm(1).trim()) })
    (Int, String) = (479936,20111230000005)

    //100萬條數據集
    val kvRdd2 = kvRdd.map(x=>{if(x._1 < 900001) (900001,x._2) else x})
    kvRdd2.map(x=>x._1 +","+x._2).saveAsTextFile("hdfs://bd-master:9000/big_data/")
    
    //1萬條數據集
    val joinRdd2 = kvRdd.filter(_._1 > 900000)
    joinRdd2.map(x=>x._1 +","+x._2).saveAsTextFile("hdfs://bd-master:9000/small_data/")
複製代碼

4.2 直接join出現數據傾斜

map reduce:
val sourceRdd = sc.textFile("hdfs://bd-master:9000/big_data/p*")
val sourceRdd2 = sc.textFile("hdfs://bd-master:9000/small_data/p*")
val joinRdd = sourceRdd.map(x =>{ val parm=x.split(",");(parm(0).trim().toInt, parm(1).trim) })
val joinRdd2 = sourceRdd2.map(x =>{ val parm=x.split(",");(parm(0).trim().toInt, parm(1).trim) })
複製代碼

4.3 基於MapSide join解決數據傾斜

mapSide:
val sourceRdd = sc.textFile("hdfs://bd-master:9000/big_data/p*")
val sourceRdd2 = sc.textFile("hdfs://bd-master:9000/small_data/p*")
//100萬條數據集
val joinRdd = sourceRdd.map(x =>{ val parm=x.split(",");(parm(0).trim().toInt, parm(1).trim) })
//1萬條數據集
val joinRdd2 = sourceRdd2.map(x =>{ val parm=x.split(",");(parm(0).trim().toInt, parm(1).trim) })
val broadcastVar = sc.broadcast(joinRdd2.collectAsMap)
joinRdd.map(x => (x._1,(x._2,broadcastVar.value.getOrElse(x._1,"")))).count
複製代碼

5 並行度提高測試:

  • 實現原理:增長shuffle read task的數量,可讓本來分配給一個task的多個key分配給多個task,從而讓每一個task處理比原來更少的數據。
  • 方案優勢:實現起來比較簡單,能夠有效緩解和減輕數據傾斜的影響。
  • 方案缺點:只是緩解了數據傾斜而已,沒有完全根除問題,根據實踐經驗來看,其效果有限。
  • 實踐經驗:該方案一般沒法完全解決數據傾斜,由於若是出現一些極端狀況,好比某個key對應的數據量有100萬,那麼不管你的task數量增長到多少,都沒法處理。

5.1 數據準備

數據模擬--90萬如下的id統一改成8的倍數,所以以並行度爲12的計算,數據傾斜在taskid=8的任務上:api

val sourceRdd = sc.textFile("hdfs://bd-master:9000/source_index")
case class brower(id:Int, time:Long, uid:String, keyword:String, url_rank:Int, click_num:Int, click_url:String) extends Serializable

val ds = sourceRdd.map(_.split("\t")).map(attr => brower(attr(0).toInt, attr(1).toLong, attr(2), attr(3), attr(4).toInt, attr(5).toInt, attr(6))).toDS

ds.createOrReplaceTempView("sourceTable")

val newSource = spark.sql("SELECT CASE WHEN id < 900000 THEN (8  + (CAST (RAND() * 50000 AS bigint)) * 12 ) ELSE id END, time, uid, keyword, url_rank, click_num, click_url FROM sourceTable")
newSource.rdd.map(_.mkString("\t")).saveAsTextFile("hdfs://bd-master:9000/test_data")
複製代碼

5.2 數據傾斜:

val sourceRdd = sc.textFile("hdfs://bd-master:9000/test_data/p*")
val kvRdd = sourceRdd.map(x =>{ val parm=x.split("\t");(parm(0).trim().toInt,parm(1).trim()) })
kvRdd.groupByKey(12).count
複製代碼

5.3 基於並行度提高解決數據傾斜

kvRdd.groupByKey(17).count
複製代碼

6 Spark隨機前綴提高測試

6.1 數據準備

val sourceRdd = sc.textFile("hdfs://bd-master:9000/source_index/p*",13)
val kvRdd = sourceRdd.map(x =>{ val parm=x.split("\t");(parm(0).trim().toInt,parm(4).trim().toInt) })
數據傾斜的key爲20001,總共980000個,所以能夠經過隨機id達到均勻id:
val kvRdd2 = kvRdd.map(x=>{if(x._1 > 20000) (20001,x._2) else x})
複製代碼

6.2 數據傾斜

kvRdd2.groupByKey().collect
複製代碼

6.3 解決數據傾斜

val kvRdd3 = kvRdd2.map(x=>{if (x._1 ==20001) (x._1 + scala.util.Random.nextInt(100),x._2) else x})
kvRdd3.sortByKey(false).collect
複製代碼

7 兩階段聚合(局部聚合+全局聚合)測試

7.1 兩階段聚合(局部聚合+全局聚合)理論知識

  • 方案適用場景:對RDD執行reduceByKey等聚合類shuffle算子或者在Spark SQL中使用group by語句進行分組聚合時,比較適用這種方案。
  • 方案實現原理:將本來相同的key經過附加隨機前綴的方式,變成多個不一樣的key,就可讓本來被一個task處理的數據分散到多個task上去作局部聚合,進而解決單個task處理數據量過多的問題。接着去除掉隨機前綴,再次進行全局聚合,就能夠獲得最終的結果。具體原理見下圖。
  • 方案優勢:對於聚合類的shuffle操做致使的數據傾斜,效果是很是不錯的。一般均可以解決掉數據傾斜,或者至少是大幅度緩解數據傾斜,將Spark做業的性能提高數倍以上。
  • 方案缺點:僅僅適用於聚合類的shuffle操做,適用範圍相對較窄。若是是join類的shuffle操做,還得用其餘的解決方案

7.2 兩階段聚合(局部聚合+全局聚合)代碼實現:

package skewTuring;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import scala.Tuple2;
    import java.util.Random;
    
    /**
     * 方案適用場景:RDD執行reduceByKey等聚合類shuffle算子或者在Spark SQL中使用group by語句進行分組聚合時,比較適用這種方案。
     * 兩階段聚合(局部聚合+全局聚合)
     */

    public class SkewTuring11 {
        public static void main(String[] args) throws Exception{
            // 構建Spark上下文
            SparkConf conf = new SparkConf().setAppName("SkewTuring11");
            conf.setMaster("local[8]");
            JavaSparkContext sc = new JavaSparkContext(conf);
            //0	20111230000005	57375476989eea12893c0c3811607bcf	奇藝高清	1	1	http://www.qiyi.com/
            JavaRDD<String> sourceRdd = sc.textFile("hdfs://master:9000/skewtestdata/source_index");
    
            // 第一步,給RDD中的每一個key都打上一個隨機前綴。
            JavaPairRDD<String, Long> randomPrefixRdd = sourceRdd.mapToPair(new PairFunction<String,String,Long>() {
                @Override
                public Tuple2<String, Long> call(String s) throws Exception {
                    String[] splits = s.split("\t");
                    Random random = new Random();
                    int prefix = random.nextInt(10);
                    Long key = Long.valueOf(splits[0]);
                    if(key > 10000) {
                        return new Tuple2<String, Long>(prefix + "_" + 10001L, 1L);
                    } else {
                        return new Tuple2<String, Long>(prefix + "_" + key,1L);
                    }
                }
            });
    
            // 第二步,對打上隨機前綴的key進行局部聚合。
            JavaPairRDD<String, Long> localAggrRdd = randomPrefixRdd.reduceByKey(new Function2<Long, Long, Long>() {
                @Override
                public Long call(Long v1, Long v2) throws Exception {
                    return v1 + v2;
                }
            });
            // 第三步,去除RDD中每一個key的隨機前綴。
            JavaPairRDD<Long, Long> removedRandomPrefixRdd = localAggrRdd.mapToPair(
                    new PairFunction<Tuple2<String,Long>, Long, Long>() {
                        private static final long serialVersionUID = 1L;
                        @Override
                        public Tuple2<Long, Long> call(Tuple2<String, Long> tuple)
                                throws Exception {
                            long originalKey = Long.valueOf(tuple._1.split("_")[1]);
                            return new Tuple2<Long, Long>(originalKey, tuple._2);
                        }
                    });
            // 第四步,對去除了隨機前綴的RDD進行全局聚合。
            JavaPairRDD<Long, Long> globalAggrRdd = removedRandomPrefixRdd.reduceByKey(
                    new Function2<Long, Long, Long>() {
                        private static final long serialVersionUID = 1L;
                        @Override
                        public Long call(Long v1, Long v2) throws Exception {
                            return v1 + v2;
                        }
            });
    System.out.println("*********************************************");
    System.out.println(globalAggrRdd.first());
 }
複製代碼

}數組

8 採樣傾斜key並分拆join操做測試

8.1 採樣傾斜key並分拆join操做理論基礎

  • 採樣
  • Join一側的數據中,爲數據量特別大的Key增長隨機前/後綴,使得原來Key相同的數據變爲Key不相同的數據,從而使傾斜的數據集分散到不一樣的Task中,完全解決數據傾斜問題。
  • Join另外一側的數據中,與傾斜Key對應的部分數據(進行擴容N倍),與隨機前綴集做笛卡爾乘積,從而保證不管數據傾斜側傾斜Key如何加前綴,都能與之正常Join。

8.2 採樣傾斜key並分拆join操做代碼實現

package skewTuring;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFlatMapFunction;
    import org.apache.spark.api.java.function.PairFunction;
    import scala.Tuple2;
    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    
    import java.util.Random;
    
    /**
     * 方案適用場景:若是出現數據傾斜,是由於其中某一個RDD/Hive表中的少數幾個key的數據量過大,而另外一個RDD/Hive表中的全部key都分佈比較均勻
     * 採樣傾斜key並分拆join操做
     */
    public class SkewTuring22 {
        public static void main(String[] args) throws Exception{
            // 構建Spark上下文
            SparkConf conf = new SparkConf().setAppName("SkewTuring11");
            conf.setMaster("local[8]");
            JavaSparkContext sc = new JavaSparkContext(conf);
    
            //主源數據--少許傾斜key
            //0	20111230000005	57375476989eea12893c0c3811607bcf	奇藝高清	1	1	http://www.qiyi.com/
            JavaRDD<String> sourceRdd = sc.textFile("hdfs://master:9000/skewtestdata/source_index");
            JavaPairRDD<Long, String> mapdSourceRdd= sourceRdd.mapToPair(new PairFunction<String,Long,String>() {
                @Override
                public Tuple2<Long,String> call(String s) throws Exception {
                    String[] splits = s.split("\t");
                    Long key = Long.valueOf(splits[0]);
                    String value = splits[6];
                    if(key > 10000) {
                        return new Tuple2<Long,String>(10001L, value);
                    } else {
                        return new Tuple2<Long,String>(key, value);
                    }
                }
            });
    
            //副源數據 -均勻key
            JavaPairRDD<Long,String> mapdSourceRdd2 = sourceRdd.mapToPair(new PairFunction<String,Long,String>() {
                @Override
                public Tuple2<Long,String> call(String s) throws Exception {
                    String[] splits = s.split("\t");
                    Long key = Long.valueOf(splits[0]);
                    String value = splits[6];
                    return new Tuple2<Long,String>(key, value);
                }
            });
    
            //首先從包含了少數幾個致使數據傾斜key的randomPrefixRdd中,採樣10%的樣本數據。
            JavaPairRDD<Long,String> sampledRDD = mapdSourceRdd.sample(false, 0.1);
    
            System.out.println(" 隨機採樣:"+sampledRDD.first());
    
            // 對樣本數據RDD統計出每一個key的出現次數,並按出現次數降序排序。
            // 對降序排序後的數據,取出top 1或者top 100的數據,也就是key最多的前n個數據。
            // 具體取出多少個數據量最多的key,由你們本身決定,咱們這裏就取1個做爲示範。
            JavaPairRDD<Long, Long> mappedSampledRDD = sampledRDD.mapToPair(
                    new PairFunction<Tuple2<Long,String>, Long, Long>() {
                        private static final long serialVersionUID = 1L;
                        @Override
                        public Tuple2<Long, Long> call(Tuple2<Long, String> tuple)
                                throws Exception {
                            return new Tuple2<Long, Long>(tuple._1, 1L);
                        }
                    });
    
            JavaPairRDD<Long, Long> countedSampledRDD = mappedSampledRDD.reduceByKey(
                    new Function2<Long, Long, Long>() {
                        private static final long serialVersionUID = 1L;
                        @Override
                        public Long call(Long v1, Long v2) throws Exception {
                            return v1 + v2;
                        }
                    });
    
            JavaPairRDD<Long, Long> reversedSampledRDD = countedSampledRDD.mapToPair(
                    new PairFunction<Tuple2<Long,Long>, Long, Long>() {
                        private static final long serialVersionUID = 1L;
                        @Override
                        public Tuple2<Long, Long> call(Tuple2<Long, Long> tuple)
                                throws Exception {
                            return new Tuple2<Long, Long>(tuple._2, tuple._1);
                        }
                    });
    
            final Long skewedUserid = reversedSampledRDD.sortByKey(false).take(1).get(0)._2;
    
            System.out.println("數據傾斜id"+skewedUserid);
    
    
            /**
             * 主源數據  過濾傾斜key 造成獨立的RDD
             */
            JavaPairRDD<Long, String> skewedRDD = mapdSourceRdd.filter(
                    new Function<Tuple2<Long,String>, Boolean>() {
                        private static final long serialVersionUID = 1L;
                        @Override
                        public Boolean call(Tuple2<Long, String> tuple) throws Exception {
                            return tuple._1.equals(skewedUserid);
                        }
                    });
            System.out.println("主源數據 傾斜數據  rdd:"+ skewedRDD.take(100));
    
            // 從mapdSourceRdd中分拆出不致使數據傾斜的普通key,造成獨立的RDD。
            JavaPairRDD<Long, String> commonRDD = mapdSourceRdd.filter(
                    new Function<Tuple2<Long,String>, Boolean>() {
                        private static final long serialVersionUID = 1L;
                        @Override
                        public Boolean call(Tuple2<Long, String> tuple) throws Exception {
                            return !tuple._1.equals(skewedUserid);
                        }
                    });
            System.out.println("主源數據 常規數據  rdd:"+ commonRDD.take(100));
    
            /**
             * sourceRdd2  副源數據  過濾傾斜數據  隨機擴容N倍
             */
            // rdd2,就是那個全部key的分佈相對較爲均勻的rdd。
            // 這裏將rdd2中,前面獲取到的key對應的數據,過濾出來,分拆成單獨的rdd,並對rdd中的數據使用flatMap算子都擴容100倍。
            // 對擴容的每條數據,都打上0~100的前綴。
            JavaPairRDD<String, String> skewedRandomRDD2 = mapdSourceRdd2.filter(
                    new Function<Tuple2<Long,String>, Boolean>() {
                        private static final long serialVersionUID = 1L;
                        @Override
                        public Boolean call(Tuple2<Long, String> tuple) throws Exception {
                            return tuple._1.equals(skewedUserid);
                        }
                    }).flatMapToPair(new PairFlatMapFunction<Tuple2<Long, String>, String, String>() {
                        @Override
                        public Iterator<Tuple2<String, String>> call(Tuple2<Long, String> tuple) throws Exception {
                            List<Tuple2<String, String>> list = new ArrayList<Tuple2<String, String>>();
                            for (int i = 0; i < 10; i++) {
                                list.add(new Tuple2<String, String>(i + "_" + tuple._1, tuple._2));
                            }
                            return list.iterator();
                        }
            });
    
            System.out.println("副源數據 擴容表處理:" + skewedRandomRDD2.take(100));
    
            /**
             * 主源傾斜數據  key+隨機數
             */
             // 將rdd1中分拆出來的致使傾斜的key的獨立rdd,每條數據都打上100之內的隨機前綴。
            // 而後將這個rdd1中分拆出來的獨立rdd,與上面rdd2中分拆出來的獨立rdd,進行join。
            final JavaPairRDD<String, String> skewedRandomRDD = skewedRDD.mapToPair(new PairFunction<Tuple2<Long, String>, String, String>() {
                private static final long serialVersionUID = 1L;
                @Override
                public Tuple2<String, String> call(Tuple2<Long, String> tuple)
                        throws Exception {
                    Random random = new Random();
                    int prefix = random.nextInt(10);
                    return new Tuple2<String, String>(prefix + "_" + tuple._1, tuple._2);
                }
            });
    
            System.out.println("主源數據 隨機數處理:" + skewedRandomRDD.take(100));
    
            JavaPairRDD<Long, Tuple2<String, String>> joinedRDD1 = skewedRandomRDD
                    .join(skewedRandomRDD2)
                    .mapToPair(new PairFunction<Tuple2<String,Tuple2<String,String>>, Long, Tuple2<String, String>>() {
                        private static final long serialVersionUID = 1L;
                        @Override
                        public Tuple2<Long, Tuple2<String, String>> call(Tuple2<String, Tuple2<String, String>> tuple) throws Exception {
                            long key = Long.valueOf(tuple._1.split("_")[1]);
                            return new Tuple2<Long, Tuple2<String, String>>(key, tuple._2);
                        }
                    });
            System.out.println("主 副源數據 傾斜數據 join 處理:" + joinedRDD1.take(100));
    
            // 將rdd1中分拆出來的包含普通key的獨立rdd,直接與rdd2進行join。
            JavaPairRDD<Long, Tuple2<String, String>> joinedRDD2 = commonRDD.join(mapdSourceRdd2);
    
            System.out.println("主 副源數據 常規數據 join 處理:" + joinedRDD2.take(100));
            // 將傾斜key join後的結果與普通key join後的結果,uinon起來。
            // 就是最終的join結果。
            JavaPairRDD<Long, Tuple2<String, String>> resultRDD = joinedRDD1.union(joinedRDD2);
    
            System.out.println("最終join結果:"+ resultRDD.sample(false, 0.1).take(100));
        }
    }
複製代碼

9 使用隨機前綴和擴容RDD進行join(大表隨機添加N種隨機前綴,小表擴大N倍)

  • Join一側:若是出現數據傾斜的Key比較多,上一種方法將這些大量的傾斜Key分拆出來,意義不大。此時更適合直接對存在數據傾斜的數據集所有加上隨機前綴app

  • Join另一側:對另一個不存在嚴重數據傾斜的數據集總體與隨機前綴集做笛卡爾乘積(即將數據量擴大N倍)。dom

    package skewTuring;
     
     import org.apache.spark.SparkConf;
     import org.apache.spark.api.java.JavaPairRDD;
     import org.apache.spark.api.java.JavaRDD;
     import org.apache.spark.api.java.JavaSparkContext;
     import org.apache.spark.api.java.function.Function;
     import org.apache.spark.api.java.function.Function2;
     import org.apache.spark.api.java.function.PairFlatMapFunction;
     import org.apache.spark.api.java.function.PairFunction;
     import scala.Tuple2;
     
     import java.util.ArrayList;
     import java.util.Iterator;
     import java.util.List;
     import java.util.Random;
     
     /**
      * 方案適用場景:若是在進行join操做時,RDD中有大量的key致使數據傾斜
      * 使用隨機前綴和擴容RDD進行join
      */
     
     public class SkewTuring33 {
     
         public static void main(String[] args) throws Exception{
             // 構建Spark上下文
             SparkConf conf = new SparkConf().setAppName("SkewTuring11");
             conf.setMaster("local[8]");
             JavaSparkContext sc = new JavaSparkContext(conf);
     
             //主源數據1--存在大量傾斜key
             JavaRDD<String> sourceRdd = sc.textFile("hdfs://master:9000/skewtestdata/source_index");
     
             //主源數據--大量傾斜key
             //0	20111230000005	57375476989eea12893c0c3811607bcf	奇藝高清	1	1	http://www.qiyi.com/
             JavaRDD<String> sourceRdd1 = sc.textFile("hdfs://master:9000/skewtestdata/source_index");
             JavaPairRDD<Long, String> mapdSourceRdd= sourceRdd.mapToPair(new PairFunction<String,Long,String>() {
                 @Override
                 public Tuple2<Long,String> call(String s) throws Exception {
                     String[] splits = s.split("\t");
                     Long key = Long.valueOf(splits[0]);
                     String value = splits[6];
                     if(key > 10000) {
                         return new Tuple2<Long,String>(10001L, value);
                     } else {
                         return new Tuple2<Long,String>(key, value);
                     }
                 }
             });
     
             //副源數據2--均勻key
             JavaPairRDD<Long, String> mapdSourceRdd2 = sourceRdd.mapToPair(new PairFunction<String,Long,String>() {
                 @Override
                 public Tuple2<Long,String> call(String s) throws Exception {
                     String[] splits = s.split("\t");
                     Long key = Long.valueOf(splits[0]);
                     String value = splits[6];
                     return new Tuple2<Long,String>(key, value);
                 }
             });
     
             /**
              * 主源傾斜數據  key+隨機數
              */
             // 將rdd1中分拆出來的致使傾斜的key的獨立rdd,每條數據都打上100之內的隨機前綴。
             // 而後將這個rdd1中分拆出來的獨立rdd,與上面rdd2中分拆出來的獨立rdd,進行join。
             final JavaPairRDD<String, String> skewedRandomRDD = mapdSourceRdd.mapToPair(new PairFunction<Tuple2<Long, String>, String, String>() {
                 private static final long serialVersionUID = 1L;
                 @Override
                 public Tuple2<String, String> call(Tuple2<Long, String> tuple) throws Exception {
                     Random random = new Random();
                     int prefix = random.nextInt(100);
                     return new Tuple2<String, String>(prefix + "_" + tuple._1, tuple._2);
                 }
             });
             System.out.println("主源數據 傾斜數據  rdd:"+ skewedRandomRDD.take(100));
     
             /**
              * sourceRdd2  均勻key 擴容N倍
              */
             // rdd2,就是那個全部key的分佈相對較爲均勻的rdd。
             // 這裏將rdd2中,前面獲取到的key對應的數據,過濾出來,分拆成單獨的rdd,並對rdd中的數據使用flatMap算子都擴容100倍。
             // 對擴容的每條數據,都打上0~100的前綴。
             JavaPairRDD<String, String> expandedRDD = mapdSourceRdd2.flatMapToPair(new PairFlatMapFunction<Tuple2<Long, String>, String, String>() {
                 @Override
                 public Iterator<Tuple2<String, String>> call(Tuple2<Long, String> tuple) throws Exception {
                     List<Tuple2<String, String>> list = new ArrayList<Tuple2<String, String>>();
                     for (int i = 0; i < 100; i++) {
                         list.add(new Tuple2<String, String>(i + "_" + tuple._1, tuple._2));
                     }
                     return list.iterator();
                 }
             });
     
             System.out.println("副源數據 擴容表處理 :" + expandedRDD.take(100));
             // 將兩個處理後的RDD進行join便可。
             JavaPairRDD<String, Tuple2<String, String>> joinedRDD = skewedRandomRDD.join(expandedRDD);
             System.out.println("最終join結果:"+ joinedRDD.take(100));
         }
     }
    複製代碼

10 自定義Partitioner

  • 適用場景: 大量不一樣的Key被分配到了相同的Task形成該Task數據量過大。
  • 解決方案: 使用自定義的Partitioner實現類代替默認的HashPartitioner,儘可能將全部不一樣的Key均勻分配到不一樣的Task中。
  • 優點: 不影響原有的並行度設計。若是改變並行度,後續Stage的並行度也會默認改變,可能會影響後續Stage。
  • 劣勢: 適用場景有限,只能將不一樣Key分散開,對於同一Key對應數據集很是大的場景不適用。效果與調整並行度相似,只能緩解數據傾斜而不能徹底消除數據傾斜。並且須要根據數據特色自定義專用的Partitioner,不夠靈活。

11 總結

本文主要從數據傾斜的角度進行了分析,經過實際的案例測試進行了總結和昇華,一片成文的博客實屬不易,但願各自珍惜!!ide

秦凱新 於深圳post

相關文章
相關標籤/搜索