做者:
王道遠,花名健身,阿里雲EMR技術專家,Apache Spark活躍貢獻者,主要關注大數據計算優化相關工做。sql
Hive以及Spark SQL等大數據計算引擎爲咱們操做存儲在HDFS上結構化數據提供了易於上手的SQL接口,大大下降了ETL等操做的門檻,也所以在實際生產中有着普遍的應用。SQL是非過程化語言,咱們寫SQL的時候並不能控制具體的執行過程,它們依賴執行引擎決定。而Hive和Spark SQL做爲Map-Reduce模型的分佈式執行引擎,其執行過程首先就涉及到如何將輸入數據切分紅一個個任務,分配給不一樣的Map任務。在本文中,咱們就來說解Hive和Spark SQL是如何切分輸入路徑的。微信
Hive
Hive是起步較早的SQL on Hadoop項目,最先也是誕生於Hadoop中,因此輸入劃分這部分的代碼與Hadoop相關度很是高。如今Hive廣泛使用的輸入格式是CombineHiveInputFormat
,它繼承於HiveInputFormat
,而HiveInputFormat
實現了Hadoop的InputFormat
接口,其中的getSplits
方法用來獲取具體的劃分結果,劃分出的一份輸入數據被稱爲一個「Split」。在執行時,每一個Split對應到一個map任務。在劃分Split時,首先挑出不能合併到一塊兒的目錄——好比開啓了事務功能的路徑。這些不能合併的目錄必須單獨處理,剩下的路徑交給私有方法getCombineSplits
,這樣Hive的一個map task最多能夠處理多個目錄下的文件。在實際操做中,咱們通常只要經過set mapred.max.split.size=xx;
便可控制文件合併的大小。當一個文件過大時,父類的getSplits
也會幫咱們完成相應的切分工做。session
Spark SQL
Spark的表有兩種:DataSource表和Hive表。另外Spark後續版本中DataSource V2也將逐漸流行,目前還在不斷髮展中,暫時就不在這裏討論。咱們知道Spark SQL其實底層是Spark RDD,而RDD執行時,每一個map task會處理RDD的一個Partition中的數據(注意這裏的Partition是RDD的概念,要和表的Partition進行區分)。所以,Spark SQL做業的任務切分關鍵在於底層RDD的partition如何切分。併發
Data Source表
Spark SQL的DataSource表在最終執行的RDD類爲FileScanRDD
,由FileSourceScanExec
建立出來。在建立這種RDD的時候,具體的Partition直接做爲參數傳給了構造函數,所以劃分輸入的方法也在DataSourceScanExec.scala
文件中。具體分兩步:首先把文件劃分爲PartitionFile
,再將較小的PartitionFile
進行合併。分佈式
第一步部分代碼以下:ide
if (fsRelation.fileFormat.isSplitable( fsRelation.sparkSession, fsRelation.options, file.getPath)) { (0L until file.getLen by maxSplitBytes).map { offset => val remaining = file.getLen - offset val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining val hosts = getBlockHosts(blockLocations, offset, size) PartitionedFile( partition.values, file.getPath.toUri.toString, offset, size, partitionDeleteDeltas, hosts) } } else { val hosts = getBlockHosts(blockLocations, 0, file.getLen) Seq(PartitionedFile(partition.values, file.getPath.toUri.toString, 0, file.getLen, partitionDeleteDeltas, hosts)) }
咱們能夠看出,Spark SQL首先根據文件類型判斷單個文件是否可以切割,若是能夠則按maxSplitBytes
進行切割。若是一個文件剩餘部分沒法填滿maxSplitBytes
,也單獨做爲一個Partition。函數
第二部分代碼以下所示:oop
splitFiles.foreach { file =>
if (currentSize + file.length > maxSplitBytes) {
closePartition()
}
// Add the given file to the current partition.
currentSize += file.length + openCostInBytes
currentFiles += file
}
這樣咱們就能夠依次遍歷第一步切好的塊,再按照maxSplitBytes
進行合併。注意合併文件時還需加上打開文件的預估代價openCostInBytes
。那麼maxSplitBytes
和openCostInBytes
這兩個關鍵參數怎麼來的呢?佈局
val defaultMaxSplitBytes = fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum val bytesPerCore = totalBytes / defaultParallelism val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
不難看出,主要是spark.sql.files.maxPartitionBytes
、spark.sql.files.openCostInBytes
、調度器默認併發度以及全部輸入文件實際大小所控制。大數據
Hive表
Spark SQL中的Hive表底層的RDD類爲HadoopRDD
,由HadoopTableReader
類實現。不過此次,具體的Partition劃分仍是依賴HadoopRDD
的getPartitions
方法,具體實現以下:
override def getPartitions: Array[Partition] = { ... try { val allInputSplits = getInputFormat(jobConf).getSplits(jobConf, minPartitions) val inputSplits = if (ignoreEmptySplits) { allInputSplits.filter(_.getLength > 0) } else { allInputSplits } val array = new Array[Partition](inputSplits.size) for (i <- 0 until inputSplits.size) { array(i) = new HadoopPartition(id, i, inputSplits(i)) } array } catch { ... } }
不難看出,在處理Hive表的時候,Spark SQL把任務劃分又交給了Hadoop的InputFormat那一套。不過須要注意的是,並非全部Hive表都歸爲這一類,Spark SQL會默認對ORC和Parquet的表進行轉化,用本身的Data Source實現OrcFileFormat
和ParquetFileFormat
來把這兩種表做爲Data Source表來處理。
總結
切分輸入路徑只是大數據處理的第一步,雖然不起眼,可是也絕對不可或缺。低效的文件劃分可能會給端到端的執行速度帶來巨大的負面影響,更有可能影響到輸出做業的文件佈局,從而影響到整個數據流水線上全部做業的執行效率。萬事開頭難,爲程序輸入選擇合適的配置參數,能夠有效改善程序執行效率。
留個思考題給讀者們:如何設置參數完全關閉Spark SQL data source表的文件合併?
本文分享自微信公衆號 - Apache Spark技術交流社區(E-MapReduce_Spark)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。