Spark讀hive text表之非shuffle方式增大並行度

背景介紹:

  • cdh集羣、hadoop2.6.0、spark2.3.0
  • hive表:text格式存儲
  • 數據塊:128M
  • 處理過程:讀取hive表 -> 業務處理(無聚合操做) -> 寫入hive、es

問題描述:

正常狀況下,一個spark task要處理一個partition即128M的數據,因處理過程較耗時而成爲任務瓶頸。算法

解決過程:

大的方向是進行任務拆分,增大並行度。apache

  • 方法一:使用spark提供的repartition/coalesce

優勢:RDD中定義的算子,能夠直接使用
缺點:使用以上算子來增大並行度,必定會進行shuffle操做
結論:測試發現,雖然增大了業務處理的並行度,但shuffle操做的開銷比較大,所以總體的耗時沒有明顯減小。bash

  • 方法二:基於spark讀text格式文件的分片算法,從源頭減少數據塊以增大並行度

初始化SparkSession時進行以下代碼設置:oop

.config("mapreduce.input.fileinputformat.split.minsize","67108864") // 即爲想設置的分片大小:64M
.config("mapreduce.job.maps","1000")  // 確保分片足夠大

複製代碼

用以實現spark讀取hive時,一個task處理一個64M的數據塊。
優勢:理論來講,並行度擴大一倍,耗時將減小一半。
結論:測試發下,耗時確實大幅度降低。源碼分析

源碼分析

調用鏈: HadoopTableReader#createHadoopRdd性能

HadoopRDD#getPartitions
  FileInputFormat#getSplits
    FileInputFormat#computeSplitSize測試

核心代碼片斷ui

private val _minSplitsPerRDD = if (sparkSession.sparkContext.isLocal) {
    0 // will splitted based on block by default.
  } else {
    math.max(hadoopConf.getInt("mapreduce.job.maps", 1),
      sparkSession.sparkContext.defaultMinPartitions)
  }

複製代碼
val rdd = new HadoopRDD(
      sparkSession.sparkContext,
      _broadcastedHadoopConf.asInstanceOf[Broadcast[SerializableConfiguration]],
      Some(initializeJobConfFunc),
      inputFormatClass,
      classOf[Writable],
      classOf[Writable],
      _minSplitsPerRDD)

複製代碼

由HadoopTableReader生成HadoopRDD,參數:_minSplitsPerRDD在非local模式下可經過mapreduce.job.maps設置spa

public InputSplit[] getSplits(JobConf job, int numSplits)
    throws IOException {
    // 多處省略

    long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
    long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
      FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);

   long blockSize = file.getBlockSize();
          long splitSize = computeSplitSize(goalSize, minSize, blockSize); 
    

   return splits.toArray(new FileSplit[splits.size()]);
  }
複製代碼
protected long computeSplitSize(long goalSize, long minSize,
                                       long blockSize) {
    return Math.max(minSize, Math.min(goalSize, blockSize));
  }

複製代碼
public static final String SPLIT_MINSIZE = 
    "mapreduce.input.fileinputformat.split.minsize";

複製代碼

最終數據分片的大小由Math.max(minSize, Math.min(goalSize, blockSize))計算獲得,根據源碼可知:code

blockSize:hdfs實際存儲的blockSize,128M不可變
goalSize:totalSize / (numSplits == 0 ? 1 : numSplits)
numSplits local模式下爲0;其餘模式可經過:mapreduce.job.maps 配置

minSize:SPLIT_MINSIZE與minSplitSize的最大值
SPLIT_MINSIZE:默認爲1,可經過mapreduce.input.fileinputformat.split.minsize 配置
minSplitSize:默認爲1
複製代碼

默認狀況下, 返回結果爲128M。爲了讓計算結果爲減少,好比64M,只須要 minSize爲64M,Math.min(goalSize, blockSize)足夠小便可,即:

  • 設置 numSplits 足夠大好比1000(參數:mapreduce.job.maps),就能保證goalSize足夠小,進而保證Math.min(goalSize, blockSize)足夠小
  • 設置 SPLIT_MINSIZE 爲64M(參數:mapreduce.input.fileinputformat.split.minsize),根據 Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize),便可實現 minSize 爲64M

結論總結

結合spark分片機制進行參數設置,既提升任務並行度又避免shuffle的性能損耗。

相關文章
相關標籤/搜索