正常狀況下,一個spark task要處理一個partition即128M的數據,因處理過程較耗時而成爲任務瓶頸。算法
大的方向是進行任務拆分,增大並行度。apache
優勢:RDD中定義的算子,能夠直接使用
缺點:使用以上算子來增大並行度,必定會進行shuffle操做
結論:測試發現,雖然增大了業務處理的並行度,但shuffle操做的開銷比較大,所以總體的耗時沒有明顯減小。bash
初始化SparkSession時進行以下代碼設置:oop
.config("mapreduce.input.fileinputformat.split.minsize","67108864") // 即爲想設置的分片大小:64M
.config("mapreduce.job.maps","1000") // 確保分片足夠大
複製代碼
用以實現spark讀取hive時,一個task處理一個64M的數據塊。
優勢:理論來講,並行度擴大一倍,耗時將減小一半。
結論:測試發下,耗時確實大幅度降低。源碼分析
調用鏈: HadoopTableReader#createHadoopRdd性能
HadoopRDD#getPartitions
FileInputFormat#getSplits
FileInputFormat#computeSplitSize測試
核心代碼片斷ui
private val _minSplitsPerRDD = if (sparkSession.sparkContext.isLocal) {
0 // will splitted based on block by default.
} else {
math.max(hadoopConf.getInt("mapreduce.job.maps", 1),
sparkSession.sparkContext.defaultMinPartitions)
}
複製代碼
val rdd = new HadoopRDD(
sparkSession.sparkContext,
_broadcastedHadoopConf.asInstanceOf[Broadcast[SerializableConfiguration]],
Some(initializeJobConfFunc),
inputFormatClass,
classOf[Writable],
classOf[Writable],
_minSplitsPerRDD)
複製代碼
由HadoopTableReader生成HadoopRDD,參數:_minSplitsPerRDD在非local模式下可經過mapreduce.job.maps設置spa
public InputSplit[] getSplits(JobConf job, int numSplits)
throws IOException {
// 多處省略
long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);
long blockSize = file.getBlockSize();
long splitSize = computeSplitSize(goalSize, minSize, blockSize);
return splits.toArray(new FileSplit[splits.size()]);
}
複製代碼
protected long computeSplitSize(long goalSize, long minSize,
long blockSize) {
return Math.max(minSize, Math.min(goalSize, blockSize));
}
複製代碼
public static final String SPLIT_MINSIZE =
"mapreduce.input.fileinputformat.split.minsize";
複製代碼
最終數據分片的大小由Math.max(minSize, Math.min(goalSize, blockSize))
計算獲得,根據源碼可知:code
blockSize:hdfs實際存儲的blockSize,128M不可變
goalSize:totalSize / (numSplits == 0 ? 1 : numSplits)
numSplits local模式下爲0;其餘模式可經過:mapreduce.job.maps 配置
minSize:SPLIT_MINSIZE與minSplitSize的最大值
SPLIT_MINSIZE:默認爲1,可經過mapreduce.input.fileinputformat.split.minsize 配置
minSplitSize:默認爲1
複製代碼
默認狀況下, 返回結果爲128M。爲了讓計算結果爲減少,好比64M,只須要 minSize
爲64M,Math.min(goalSize, blockSize)
足夠小便可,即:
Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize)
,便可實現 minSize 爲64M結合spark分片機制進行參數設置,既提升任務並行度又避免shuffle的性能損耗。