This blog series distills and shares cases drawn from real production environments and offers hands-on guidance for commercial Spark applications; please stay tuned. Copyright notice: the Spark commercial-application series belongs to its author (Qin Kaixin). Reproduction is prohibited; learning from it is welcome.
This article focuses on the most technically demanding part of the subject, algorithms for handling data skew; the methods below are offered for reference only.
Sample records from the raw dataset (source.txt, tab-separated):
20111230000005 57375476989eea12893c0c3811607bcf 奇藝高清 1 1 http://www.qiyi.com/
20111230000005 66c5bb7774e31d0a22278249b26bc83a 凡人修仙傳 3 1 http://www.booksky.org/BookDetail.aspx?BookID=1050804&Level=1
20111230000007 b97920521c78de70ac38e3713f524b50 本本聯盟 1 1 http://www.bblianmeng.com/
20111230000008 6961d0c97fe93701fc9c0d861d096cd9 華南師範大學圖書館 1 1 http://lib.scnu.edu.cn/
For more detail, this blog post is an excellent reference: https://blog.csdn.net/zyp13781913772/article/details/81428862
val sourceRdd = sc.textFile("hdfs://bd-master:9000/opendir/source.txt")
sourceRdd.zipWithIndex.take(2)
Array(
(20111230000005 57375476989eea12893c0c3811607bcf 奇藝高清 1 1 http://www.qiyi.com/, 0),
(20111230000005 66c5bb7774e31d0a22278249b26bc83a 凡人修仙傳 3 1 http://www.booksky.org/BookDetail.aspx?BookID=1050804&Level=1, 1)
)
ArrayBuffer notes: ++= appends a whole collection to the buffer, while +=: prepends a single element (a quick illustration follows).
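A quick illustration of the two operators on their own (the values here are purely illustrative):
val buf = scala.collection.mutable.ArrayBuffer[String]()
buf ++= Array("a", "b")    // append a collection: ArrayBuffer(a, b)
"idx" +=: buf              // prepend an element: ArrayBuffer(idx, a, b)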
// Prepend each record's index to its tab-separated fields
val sourceWithIndexRdd = sourceRdd.zipWithIndex.map { tuple =>
  val array = scala.collection.mutable.ArrayBuffer[String]()
  array ++= tuple._1.split("\t")
  tuple._2.toString +=: array
  array.toArray
}
Array(
Array(0, 20111230000005, 57375476989eea12893c0c3811607bcf, 奇藝高清, 1, 1, http://www.qiyi.com/),
Array(1, 20111230000005, 66c5bb7774e31d0a22278249b26bc83a, 凡人修仙傳, 3, 1, http://www.booksky.org/BookDetail.aspx?BookID=1050804&Level=1)
)
sourceWithIndexRdd.map(_.mkString("\t")).saveAsTextFile("hdfs://bd-master:9000/source_index")
The resulting source_index dataset:
Array[String] = Array(
0 20111230000005 57375476989eea12893c0c3811607bcf 奇藝高清 1 1 http://www.qiyi.com/,
1 20111230000005 66c5bb7774e31d0a22278249b26bc83a 凡人修仙傳 3 1 http://www.booksky.org/BookDetail.aspx?BookID=1050804&Level=1
)
Data simulation: build a large skewed dataset and a small uniform one:
val sourceRdd = sc.textFile("hdfs://bd-master:9000/source_index/p*")
val kvRdd = sourceRdd.map(x =>{ val parm=x.split("\t");(parm(0).trim().toInt,parm(1).trim()) })
// sample element: (Int, String) = (479936,20111230000005)
// the large, skewed dataset: all 1,000,000 records, with every id at or below 900000 collapsed onto key 900001
val kvRdd2 = kvRdd.map(x=>{if(x._1 < 900001) (900001,x._2) else x})
kvRdd2.map(x=>x._1 +","+x._2).saveAsTextFile("hdfs://bd-master:9000/big_data/")
// the small dataset: ids above 900000 (about 100,000 records)
val joinRdd2 = kvRdd.filter(_._1 > 900000)
joinRdd2.map(x=>x._1 +","+x._2).saveAsTextFile("hdfs://bd-master:9000/small_data/")
Reduce-side (shuffle) join baseline; the join call itself is sketched after the block:
val sourceRdd = sc.textFile("hdfs://bd-master:9000/big_data/p*")
val sourceRdd2 = sc.textFile("hdfs://bd-master:9000/small_data/p*")
val joinRdd = sourceRdd.map(x =>{ val parm=x.split(",");(parm(0).trim().toInt, parm(1).trim) })
val joinRdd2 = sourceRdd2.map(x =>{ val parm=x.split(",");(parm(0).trim().toInt, parm(1).trim) })
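A minimal sketch of the baseline join on the two prepared RDDs; this triggers a full shuffle, so every record carrying the skewed key lands on a single task:
joinRdd.join(joinRdd2).count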
Map-side (broadcast) join:
val sourceRdd = sc.textFile("hdfs://bd-master:9000/big_data/p*")
val sourceRdd2 = sc.textFile("hdfs://bd-master:9000/small_data/p*")
//100萬條數據集
val joinRdd = sourceRdd.map(x =>{ val parm=x.split(",");(parm(0).trim().toInt, parm(1).trim) })
//1萬條數據集
val joinRdd2 = sourceRdd2.map(x =>{ val parm=x.split(",");(parm(0).trim().toInt, parm(1).trim) })
val broadcastVar = sc.broadcast(joinRdd2.collectAsMap)
joinRdd.map(x => (x._1,(x._2,broadcastVar.value.getOrElse(x._1,"")))).count
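Two caveats with the broadcast variant: collectAsMap keeps only one value per key, and the collected map must fit in driver and executor memory, so this pattern only suits a genuinely small lookup-side dataset.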
Data simulation: every id below 900,000 is rewritten to a key of the form 8 + 12k, so when computing with a parallelism of 12 the skew concentrates on the task with taskId = 8:
val sourceRdd = sc.textFile("hdfs://bd-master:9000/source_index")
case class brower(id:Int, time:Long, uid:String, keyword:String, url_rank:Int, click_num:Int, click_url:String) extends Serializable
val ds = sourceRdd.map(_.split("\t")).map(attr => brower(attr(0).toInt, attr(1).toLong, attr(2), attr(3), attr(4).toInt, attr(5).toInt, attr(6))).toDS
ds.createOrReplaceTempView("sourceTable")
val newSource = spark.sql("SELECT CASE WHEN id < 900000 THEN (8 + (CAST (RAND() * 50000 AS bigint)) * 12 ) ELSE id END, time, uid, keyword, url_rank, click_num, click_url FROM sourceTable")
newSource.rdd.map(_.mkString("\t")).saveAsTextFile("hdfs://bd-master:9000/test_data")
val sourceRdd = sc.textFile("hdfs://bd-master:9000/test_data/p*")
val kvRdd = sourceRdd.map(x =>{ val parm=x.split("\t");(parm(0).trim().toInt,parm(1).trim()) })
kvRdd.groupByKey(12).count
With 12 partitions every key of the form 8 + 12k hashes to partition 8, so one task receives nearly all the data. Raising the parallelism to 17, which shares no factor with the key stride of 12, spreads those keys across all partitions:
kvRdd.groupByKey(17).count
val sourceRdd = sc.textFile("hdfs://bd-master:9000/source_index/p*",13)
val kvRdd = sourceRdd.map(x =>{ val parm=x.split("\t");(parm(0).trim().toInt,parm(4).trim().toInt) })
Simulate a skewed key: every id above 20000 collapses onto key 20001, about 980,000 records in total; the distribution can then be evened out by salting that id with a random offset:
val kvRdd2 = kvRdd.map(x=>{if(x._1 > 20000) (20001,x._2) else x})
kvRdd2.groupByKey().collect
val kvRdd3 = kvRdd2.map(x=>{if (x._1 ==20001) (x._1 + scala.util.Random.nextInt(100),x._2) else x})
kvRdd3.sortByKey(false).collect
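Salting only pays off when the aggregation is then run in two stages: first on the salted key, then on the original key once the salt is stripped. A minimal shell sketch of the two-phase idea, reusing kvRdd2 from above (all original keys are at most 20001, so any key above 20000 maps back to 20001); the Java implementation below packages the same technique:
val localAgg = kvRdd2.map(x => if (x._1 == 20001) (x._1 + scala.util.Random.nextInt(100), x._2) else x).reduceByKey(_ + _)  // stage 1: aggregate on the salted key
val globalAgg = localAgg.map(x => (if (x._1 > 20000) 20001 else x._1, x._2)).reduceByKey(_ + _)  // stage 2: strip the salt and aggregate again
globalAgg.count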
package skewTuring;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Random;
/**
 * Applicable scenario: aggregation shuffles such as reduceByKey over an RDD,
 * or GROUP BY aggregation in Spark SQL.
 * Two-phase aggregation (local aggregation + global aggregation).
 */
public class SkewTuring11 {
public static void main(String[] args) throws Exception{
// Build the Spark context
SparkConf conf = new SparkConf().setAppName("SkewTuring11");
conf.setMaster("local[8]");
JavaSparkContext sc = new JavaSparkContext(conf);
//0 20111230000005 57375476989eea12893c0c3811607bcf 奇藝高清 1 1 http://www.qiyi.com/
JavaRDD<String> sourceRdd = sc.textFile("hdfs://master:9000/skewtestdata/source_index");
// Step 1: tag every key in the RDD with a random prefix.
JavaPairRDD<String, Long> randomPrefixRdd = sourceRdd.mapToPair(new PairFunction<String,String,Long>() {
@Override
public Tuple2<String, Long> call(String s) throws Exception {
String[] splits = s.split("\t");
Random random = new Random();
int prefix = random.nextInt(10);
Long key = Long.valueOf(splits[0]);
if(key > 10000) {
return new Tuple2<String, Long>(prefix + "_" + 10001L, 1L);
} else {
return new Tuple2<String, Long>(prefix + "_" + key,1L);
}
}
});
// Step 2: locally aggregate the randomly prefixed keys.
JavaPairRDD<String, Long> localAggrRdd = randomPrefixRdd.reduceByKey(new Function2<Long, Long, Long>() {
@Override
public Long call(Long v1, Long v2) throws Exception {
return v1 + v2;
}
});
// Step 3: strip the random prefix from every key.
JavaPairRDD<Long, Long> removedRandomPrefixRdd = localAggrRdd.mapToPair(
new PairFunction<Tuple2<String,Long>, Long, Long>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<Long, Long> call(Tuple2<String, Long> tuple)
throws Exception {
long originalKey = Long.valueOf(tuple._1.split("_")[1]);
return new Tuple2<Long, Long>(originalKey, tuple._2);
}
});
// Step 4: globally aggregate the RDD with the prefixes removed.
JavaPairRDD<Long, Long> globalAggrRdd = removedRandomPrefixRdd.reduceByKey(
new Function2<Long, Long, Long>() {
private static final long serialVersionUID = 1L;
@Override
public Long call(Long v1, Long v2) throws Exception {
return v1 + v2;
}
});
System.out.println("*********************************************");
System.out.println(globalAggrRdd.first());
}
}
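Note that two-phase aggregation only helps aggregations that are associative and commutative, such as counts, sums, and max/min; it does nothing for joins, which the next two schemes address.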
package skewTuring;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
/**
 * Applicable scenario: the skew is caused by a handful of oversized keys in one RDD/Hive table,
 * while the keys in the other RDD/Hive table are fairly evenly distributed.
 * Sample the skewed keys and split the join.
 */
public class SkewTuring22 {
public static void main(String[] args) throws Exception{
// Build the Spark context
SparkConf conf = new SparkConf().setAppName("SkewTuring22");
conf.setMaster("local[8]");
JavaSparkContext sc = new JavaSparkContext(conf);
// Primary dataset: a few heavily skewed keys
//0 20111230000005 57375476989eea12893c0c3811607bcf 奇藝高清 1 1 http://www.qiyi.com/
JavaRDD<String> sourceRdd = sc.textFile("hdfs://master:9000/skewtestdata/source_index");
JavaPairRDD<Long, String> mapdSourceRdd= sourceRdd.mapToPair(new PairFunction<String,Long,String>() {
@Override
public Tuple2<Long,String> call(String s) throws Exception {
String[] splits = s.split("\t");
Long key = Long.valueOf(splits[0]);
String value = splits[6];
if(key > 10000) {
return new Tuple2<Long,String>(10001L, value);
} else {
return new Tuple2<Long,String>(key, value);
}
}
});
// Secondary dataset: evenly distributed keys
JavaPairRDD<Long,String> mapdSourceRdd2 = sourceRdd.mapToPair(new PairFunction<String,Long,String>() {
@Override
public Tuple2<Long,String> call(String s) throws Exception {
String[] splits = s.split("\t");
Long key = Long.valueOf(splits[0]);
String value = splits[6];
return new Tuple2<Long,String>(key, value);
}
});
// First, sample 10% of mapdSourceRdd, which contains the few keys that cause the skew.
JavaPairRDD<Long,String> sampledRDD = mapdSourceRdd.sample(false, 0.1);
System.out.println(" 隨機採樣:"+sampledRDD.first());
// Count each key's occurrences in the sample and sort by count, descending.
// From the sorted result take the top 1 (or top n) keys, i.e. the keys with the most records.
// How many keys to take is up to you; here we take just one as a demonstration.
JavaPairRDD<Long, Long> mappedSampledRDD = sampledRDD.mapToPair(
new PairFunction<Tuple2<Long,String>, Long, Long>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<Long, Long> call(Tuple2<Long, String> tuple)
throws Exception {
return new Tuple2<Long, Long>(tuple._1, 1L);
}
});
JavaPairRDD<Long, Long> countedSampledRDD = mappedSampledRDD.reduceByKey(
new Function2<Long, Long, Long>() {
private static final long serialVersionUID = 1L;
@Override
public Long call(Long v1, Long v2) throws Exception {
return v1 + v2;
}
});
JavaPairRDD<Long, Long> reversedSampledRDD = countedSampledRDD.mapToPair(
new PairFunction<Tuple2<Long,Long>, Long, Long>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<Long, Long> call(Tuple2<Long, Long> tuple)
throws Exception {
return new Tuple2<Long, Long>(tuple._2, tuple._1);
}
});
final Long skewedUserid = reversedSampledRDD.sortByKey(false).take(1).get(0)._2;
System.out.println("數據傾斜id"+skewedUserid);
/**
 * Primary dataset: filter the skewed key into its own RDD.
 */
JavaPairRDD<Long, String> skewedRDD = mapdSourceRdd.filter(
new Function<Tuple2<Long,String>, Boolean>() {
private static final long serialVersionUID = 1L;
@Override
public Boolean call(Tuple2<Long, String> tuple) throws Exception {
return tuple._1.equals(skewedUserid);
}
});
System.out.println("主源數據 傾斜數據 rdd:"+ skewedRDD.take(100));
// Split the ordinary keys, the ones not causing skew, out of mapdSourceRdd into a separate RDD.
JavaPairRDD<Long, String> commonRDD = mapdSourceRdd.filter(
new Function<Tuple2<Long,String>, Boolean>() {
private static final long serialVersionUID = 1L;
@Override
public Boolean call(Tuple2<Long, String> tuple) throws Exception {
return !tuple._1.equals(skewedUserid);
}
});
System.out.println("主源數據 常規數據 rdd:"+ commonRDD.take(100));
/**
 * sourceRdd2 (secondary dataset): filter out the skewed key and expand it with random prefixes.
 */
// rdd2 is the RDD whose keys are relatively evenly distributed.
// Filter the records matching the skewed key out of rdd2 into a separate RDD, then use
// flatMap to expand each record 10x, tagging each copy with a prefix 0-9.
JavaPairRDD<String, String> skewedRandomRDD2 = mapdSourceRdd2.filter(
new Function<Tuple2<Long,String>, Boolean>() {
private static final long serialVersionUID = 1L;
@Override
public Boolean call(Tuple2<Long, String> tuple) throws Exception {
return tuple._1.equals(skewedUserid);
}
}).flatMapToPair(new PairFlatMapFunction<Tuple2<Long, String>, String, String>() {
@Override
public Iterator<Tuple2<String, String>> call(Tuple2<Long, String> tuple) throws Exception {
List<Tuple2<String, String>> list = new ArrayList<Tuple2<String, String>>();
for (int i = 0; i < 10; i++) {
list.add(new Tuple2<String, String>(i + "_" + tuple._1, tuple._2));
}
return list.iterator();
}
});
System.out.println("副源數據 擴容表處理:" + skewedRandomRDD2.take(100));
/**
 * Primary skewed records: salt each key with a random prefix.
 */
// Tag every record of the skewed RDD split out of rdd1 with a random prefix below 10,
// then join it with the expanded RDD split out of rdd2 above.
final JavaPairRDD<String, String> skewedRandomRDD = skewedRDD.mapToPair(new PairFunction<Tuple2<Long, String>, String, String>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, String> call(Tuple2<Long, String> tuple)
throws Exception {
Random random = new Random();
int prefix = random.nextInt(10);
return new Tuple2<String, String>(prefix + "_" + tuple._1, tuple._2);
}
});
System.out.println("主源數據 隨機數處理:" + skewedRandomRDD.take(100));
JavaPairRDD<Long, Tuple2<String, String>> joinedRDD1 = skewedRandomRDD
.join(skewedRandomRDD2)
.mapToPair(new PairFunction<Tuple2<String,Tuple2<String,String>>, Long, Tuple2<String, String>>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<Long, Tuple2<String, String>> call(Tuple2<String, Tuple2<String, String>> tuple) throws Exception {
long key = Long.valueOf(tuple._1.split("_")[1]);
return new Tuple2<Long, Tuple2<String, String>>(key, tuple._2);
}
});
System.out.println("主 副源數據 傾斜數據 join 處理:" + joinedRDD1.take(100));
// Join the ordinary-key RDD split out of rdd1 directly with rdd2.
JavaPairRDD<Long, Tuple2<String, String>> joinedRDD2 = commonRDD.join(mapdSourceRdd2);
System.out.println("主 副源數據 常規數據 join 處理:" + joinedRDD2.take(100));
// Union the skewed-key join result with the ordinary-key join result;
// the union is the final join result.
JavaPairRDD<Long, Tuple2<String, String>> resultRDD = joinedRDD1.union(joinedRDD2);
System.out.println("最終join結果:"+ resultRDD.sample(false, 0.1).take(100));
}
}
One side of the join: when a large number of keys are skewed, splitting the worst offenders out one by one (the previous approach) gains little; it is simpler to salt the entire skewed dataset with random prefixes.
The other side of the join: expand the dataset without severe skew by taking its Cartesian product with the full set of prefixes, i.e. enlarging it N-fold.
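Before the full Java version below, a compact Scala sketch of the scheme, assuming rdd1: RDD[(Long, String)] is the heavily skewed side, rdd2: RDD[(Long, String)] the even side, and a prefix space of N = 100:
val N = 100
// salt every key on the skewed side with a random prefix in 0 until N
val salted = rdd1.map(x => (scala.util.Random.nextInt(N) + "_" + x._1, x._2))
// expand the even side N-fold, one copy per possible prefix
val expanded = rdd2.flatMap(x => (0 until N).map(i => (i + "_" + x._1, x._2)))
// join on the salted keys, then strip the prefix to recover the original key
val joined = salted.join(expanded).map(x => (x._1.split("_")(1).toLong, x._2))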
package skewTuring;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
/**
 * Applicable scenario: a join where a large number of keys in the RDD cause the skew.
 * Join using random prefixes and an expanded RDD.
 */
public class SkewTuring33 {
public static void main(String[] args) throws Exception{
// Build the Spark context
SparkConf conf = new SparkConf().setAppName("SkewTuring33");
conf.setMaster("local[8]");
JavaSparkContext sc = new JavaSparkContext(conf);
// Primary dataset: a large number of skewed keys
//0 20111230000005 57375476989eea12893c0c3811607bcf 奇藝高清 1 1 http://www.qiyi.com/
JavaRDD<String> sourceRdd = sc.textFile("hdfs://master:9000/skewtestdata/source_index");
JavaPairRDD<Long, String> mapdSourceRdd= sourceRdd.mapToPair(new PairFunction<String,Long,String>() {
@Override
public Tuple2<Long,String> call(String s) throws Exception {
String[] splits = s.split("\t");
Long key = Long.valueOf(splits[0]);
String value = splits[6];
if(key > 10000) {
return new Tuple2<Long,String>(10001L, value);
} else {
return new Tuple2<Long,String>(key, value);
}
}
});
// Secondary dataset: evenly distributed keys
JavaPairRDD<Long, String> mapdSourceRdd2 = sourceRdd.mapToPair(new PairFunction<String,Long,String>() {
@Override
public Tuple2<Long,String> call(String s) throws Exception {
String[] splits = s.split("\t");
Long key = Long.valueOf(splits[0]);
String value = splits[6];
return new Tuple2<Long,String>(key, value);
}
});
/**
 * Primary skewed records: salt every key with a random prefix.
 */
// Tag every record of rdd1 with a random prefix below 100,
// then join it with the 100-fold expanded rdd2 below.
final JavaPairRDD<String, String> skewedRandomRDD = mapdSourceRdd.mapToPair(new PairFunction<Tuple2<Long, String>, String, String>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, String> call(Tuple2<Long, String> tuple) throws Exception {
Random random = new Random();
int prefix = random.nextInt(100);
return new Tuple2<String, String>(prefix + "_" + tuple._1, tuple._2);
}
});
System.out.println("主源數據 傾斜數據 rdd:"+ skewedRandomRDD.take(100));
/**
 * sourceRdd2: evenly distributed keys, expanded N-fold.
 */
// rdd2 is the RDD whose keys are relatively evenly distributed.
// Unlike the previous scheme there is no filtering here: expand every record of rdd2
// 100-fold with flatMap, tagging each copy with a prefix 0-99.
JavaPairRDD<String, String> expandedRDD = mapdSourceRdd2.flatMapToPair(new PairFlatMapFunction<Tuple2<Long, String>, String, String>() {
@Override
public Iterator<Tuple2<String, String>> call(Tuple2<Long, String> tuple) throws Exception {
List<Tuple2<String, String>> list = new ArrayList<Tuple2<String, String>>();
for (int i = 0; i < 100; i++) {
list.add(new Tuple2<String, String>(i + "_" + tuple._1, tuple._2));
}
return list.iterator();
}
});
System.out.println("副源數據 擴容表處理 :" + expandedRDD.take(100));
// Join the two processed RDDs.
JavaPairRDD<String, Tuple2<String, String>> joinedRDD = skewedRandomRDD.join(expandedRDD);
System.out.println("最終join結果:"+ joinedRDD.take(100));
}
}
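One detail of this last variant: the joined result is still keyed by the salted String, so if downstream code needs the original Long key, strip the prefix with Long.valueOf(key.split("_")[1]) as scheme two does. The cost of the scheme is the N-fold expansion of the second dataset; it trades extra memory and shuffle volume for an even load.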
This article analyzed Spark tuning from the angle of data skew, summarizing and distilling the lessons through real test cases. A complete blog post takes real effort to write; please treat it with care!!
Qin Kaixin (秦凱新), Shenzhen