淺析Hive/Spark SQL讀文件時的輸入任務劃分

做者:
王道遠,花名健身,阿里雲EMR技術專家,Apache Spark活躍貢獻者,主要關注大數據計算優化相關工做。sql


Hive以及Spark SQL等大數據計算引擎爲咱們操做存儲在HDFS上結構化數據提供了易於上手的SQL接口,大大下降了ETL等操做的門檻,也所以在實際生產中有着普遍的應用。SQL是非過程化語言,咱們寫SQL的時候並不能控制具體的執行過程,它們依賴執行引擎決定。而Hive和Spark SQL做爲Map-Reduce模型的分佈式執行引擎,其執行過程首先就涉及到如何將輸入數據切分紅一個個任務,分配給不一樣的Map任務。在本文中,咱們就來說解Hive和Spark SQL是如何切分輸入路徑的。微信

Hive

Hive是起步較早的SQL on Hadoop項目,最先也是誕生於Hadoop中,因此輸入劃分這部分的代碼與Hadoop相關度很是高。如今Hive廣泛使用的輸入格式是CombineHiveInputFormat,它繼承於HiveInputFormat,而HiveInputFormat實現了Hadoop的InputFormat接口,其中的getSplits方法用來獲取具體的劃分結果,劃分出的一份輸入數據被稱爲一個「Split」。在執行時,每一個Split對應到一個map任務。在劃分Split時,首先挑出不能合併到一塊兒的目錄——好比開啓了事務功能的路徑。這些不能合併的目錄必須單獨處理,剩下的路徑交給私有方法getCombineSplits,這樣Hive的一個map task最多能夠處理多個目錄下的文件。在實際操做中,咱們通常只要經過set mapred.max.split.size=xx;便可控制文件合併的大小。當一個文件過大時,父類的getSplits也會幫咱們完成相應的切分工做。session

Spark SQL

Spark的表有兩種:DataSource表和Hive表。另外Spark後續版本中DataSource V2也將逐漸流行,目前還在不斷髮展中,暫時就不在這裏討論。咱們知道Spark SQL其實底層是Spark RDD,而RDD執行時,每一個map task會處理RDD的一個Partition中的數據(注意這裏的Partition是RDD的概念,要和表的Partition進行區分)。所以,Spark SQL做業的任務切分關鍵在於底層RDD的partition如何切分。併發

Data Source表

Spark SQL的DataSource表在最終執行的RDD類爲FileScanRDD,由FileSourceScanExec建立出來。在建立這種RDD的時候,具體的Partition直接做爲參數傳給了構造函數,所以劃分輸入的方法也在DataSourceScanExec.scala文件中。具體分兩步:首先把文件劃分爲PartitionFile,再將較小的PartitionFile進行合併。分佈式

第一步部分代碼以下:ide

if (fsRelation.fileFormat.isSplitable(
    fsRelation.sparkSession, fsRelation.options, file.getPath)) {
    (0L until file.getLen by maxSplitBytes).map { offset =>
    val remaining = file.getLen - offset
    val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
    val hosts = getBlockHosts(blockLocations, offset, size)
    PartitionedFile(
      partition.values, file.getPath.toUri.toString,
      offset, size, partitionDeleteDeltas, hosts)
    }
  } else {
    val hosts = getBlockHosts(blockLocations, 0, file.getLen)
    Seq(PartitionedFile(partition.values, file.getPath.toUri.toString,
    0, file.getLen, partitionDeleteDeltas, hosts))
  }

咱們能夠看出,Spark SQL首先根據文件類型判斷單個文件是否可以切割,若是能夠則按maxSplitBytes進行切割。若是一個文件剩餘部分沒法填滿maxSplitBytes,也單獨做爲一個Partition。函數

第二部分代碼以下所示:oop

splitFiles.foreach { file =>
    if (currentSize + file.length > maxSplitBytes) {
      closePartition()
    }
    // Add the given file to the current partition.
    currentSize += file.length + openCostInBytes
    currentFiles += file
  }

這樣咱們就能夠依次遍歷第一步切好的塊,再按照maxSplitBytes進行合併。注意合併文件時還需加上打開文件的預估代價openCostInBytes。那麼maxSplitBytesopenCostInBytes這兩個關鍵參數怎麼來的呢?佈局

val defaultMaxSplitBytes =
    fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes
  val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
  val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism
  val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
  val bytesPerCore = totalBytes / defaultParallelism

  val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))

不難看出,主要是spark.sql.files.maxPartitionBytesspark.sql.files.openCostInBytes、調度器默認併發度以及全部輸入文件實際大小所控制。大數據

Hive表

Spark SQL中的Hive表底層的RDD類爲HadoopRDD,由HadoopTableReader類實現。不過此次,具體的Partition劃分仍是依賴HadoopRDDgetPartitions方法,具體實現以下:

override def getPartitions: Array[Partition] = {
    ...
    try {
      val allInputSplits = getInputFormat(jobConf).getSplits(jobConf, minPartitions)
      val inputSplits = if (ignoreEmptySplits) {
        allInputSplits.filter(_.getLength > 0)
      } else {
        allInputSplits
      }
      val array = new Array[Partition](inputSplits.size)
      for (i <- 0 until inputSplits.size) {
        array(i) = new HadoopPartition(id, i, inputSplits(i))
      }
      array
    } catch {
      ...
    }
  }

不難看出,在處理Hive表的時候,Spark SQL把任務劃分又交給了Hadoop的InputFormat那一套。不過須要注意的是,並非全部Hive表都歸爲這一類,Spark SQL會默認對ORC和Parquet的表進行轉化,用本身的Data Source實現OrcFileFormatParquetFileFormat來把這兩種表做爲Data Source表來處理。

總結

切分輸入路徑只是大數據處理的第一步,雖然不起眼,可是也絕對不可或缺。低效的文件劃分可能會給端到端的執行速度帶來巨大的負面影響,更有可能影響到輸出做業的文件佈局,從而影響到整個數據流水線上全部做業的執行效率。萬事開頭難,爲程序輸入選擇合適的配置參數,能夠有效改善程序執行效率。

留個思考題給讀者們:如何設置參數完全關閉Spark SQL data source表的文件合併?

 

本文分享自微信公衆號 - Apache Spark技術交流社區(E-MapReduce_Spark)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。

相關文章
相關標籤/搜索