How Spark splits Parquet files

While using Spark with Parquet in practice, I ran into two things I could not explain:

  1. We had only one Parquet file (smaller than the HDFS block size), yet Spark generated 4 tasks in one stage to process it (reproduced in the snippet below).
  2. Of those 4 tasks, only one processed all the data; the others processed nothing.
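Both observations are easy to reproduce with a couple of lines like the following (the path is hypothetical; getNumPartitions reflects the FilePartitions we will derive below):

// Hypothetical reproduction: one small parquet file, yet several partitions / tasks.
val df = spark.read.parquet("/path/to/single-file.parquet") // hypothetical path
println(df.rdd.getNumPartitions) // 4 in the scenario described above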

Both questions come down to how Spark splits a Parquet file into partitions, and which part of the data each partition ends up reading.

The conclusion first: in Spark, Parquet is splittable, see ParquetFileFormat#isSplitable. Does that mean the data gets chopped up arbitrarily? No: a Parquet file is split with the row group as the minimum unit, which is also why some partitions can end up with no data at all. In the extreme case of a single row group, no matter how many partitions there are, only one of them will have data.
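For reference, the override itself is trivial; it simply returns true. Reproduced here from memory of the Spark 2.x source, so treat the exact signature as approximate:

// ParquetFileFormat#isSplitable (as I remember it in Spark 2.x): parquet is always splittable.
override def isSplitable(
    sparkSession: SparkSession,
    options: Map[String, String],
    path: Path): Boolean = {
  true
}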

Now let's start our tour through the source code:

Processing flow

1. Splitting the Parquet file by size into partitions:

In FileSourceScanExec#createNonBucketedReadRDD, if the file format is splittable, the file is cut into chunks of at most maxSplitBytes; the number of resulting chunks is the number of RDD partitions, which explains puzzle 1. The code is as follows:

val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
      s"open cost is considered as scanning $openCostInBytes bytes.")

val splitFiles = selectedPartitions.flatMap { partition =>
  partition.files.flatMap { file =>
    val blockLocations = getBlockLocations(file)
    if (fsRelation.fileFormat.isSplitable(
        fsRelation.sparkSession, fsRelation.options, file.getPath)) {
      (0L until file.getLen by maxSplitBytes).map { offset =>
        val remaining = file.getLen - offset
        val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
        val hosts = getBlockHosts(blockLocations, offset, size)
        PartitionedFile(
          partition.values, file.getPath.toUri.toString, offset, size, hosts)
      }
    } else {
      val hosts = getBlockHosts(blockLocations, 0, file.getLen)
      Seq(PartitionedFile(
        partition.values, file.getPath.toUri.toString, 0, file.getLen, hosts))
    }
  }
}.toArray.sortBy(_.length)(implicitly[Ordering[Long]].reverse)

val partitions = new ArrayBuffer[FilePartition]
val currentFiles = new ArrayBuffer[PartitionedFile]
var currentSize = 0L

/** Close the current partition and move to the next. */
def closePartition(): Unit = {
  if (currentFiles.nonEmpty) {
    val newPartition =
      FilePartition(
        partitions.size,
        currentFiles.toArray.toSeq) // Copy to a new Array.
    partitions += newPartition
  }
  currentFiles.clear()
  currentSize = 0
}

// Assign files to partitions using "First Fit Decreasing" (FFD)
splitFiles.foreach { file =>
  if (currentSize + file.length > maxSplitBytes) {
    closePartition()
  }
  // Add the given file to the current partition.
  currentSize += file.length + openCostInBytes
  currentFiles += file
}
closePartition()

new FileScanRDD(fsRelation.sparkSession, readFile, partitions)
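To make puzzle 1 concrete, here is a back-of-the-envelope sketch of the split-size math for our single 40 MB file. The numbers are assumptions for illustration only: the defaults of spark.sql.files.maxPartitionBytes (128 MB) and spark.sql.files.openCostInBytes (4 MB), and a defaultParallelism of 4:

// Hypothetical reproduction of the maxSplitBytes math above; not Spark source.
object SplitSizeSketch {
  def main(args: Array[String]): Unit = {
    val defaultMaxSplitBytes = 128L * 1024 * 1024 // spark.sql.files.maxPartitionBytes default
    val openCostInBytes      = 4L * 1024 * 1024   // spark.sql.files.openCostInBytes default
    val fileLen              = 40L * 1024 * 1024  // our single 40 MB parquet file
    val defaultParallelism   = 4                  // assumed parallelism

    val totalBytes    = fileLen + openCostInBytes // each file is padded with the open cost
    val bytesPerCore  = totalBytes / defaultParallelism
    val maxSplitBytes = math.min(defaultMaxSplitBytes, math.max(openCostInBytes, bytesPerCore))

    // Same stride as (0L until file.getLen by maxSplitBytes) in createNonBucketedReadRDD.
    val numSplits = (0L until fileLen by maxSplitBytes).size
    println(s"maxSplitBytes = $maxSplitBytes, numSplits = $numSplits") // ~11 MB, 4 splits
  }
}

With those assumed numbers, maxSplitBytes works out to about 11 MB, so the 40 MB file is cut into 4 PartitionedFiles, which matches the 4 tasks observed in puzzle 1.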

2. Building the reader from a ParquetInputSplit:

In the ParquetFileFormat#buildReaderWithPartitionValues implementation, the split is used to initialize the reader, and depending on configuration the reader is either vectorized or not:

  • vectorizedReader.initialize(split, hadoopAttemptContext)
  • reader.initialize(split, hadoopAttemptContext)

The aside at the end of this post has the more complete code for step 2, but it is not essential to the main flow, so I will leave it for later.

3. Assigning the Parquet row groups to partitions

Step 1 carved out a number of partitions purely by file size, but not all of these partitions will end up with data.

Picking up from the initialization in step 2: SpecificParquetRecordReaderBase#initialize passes a RangeMetadataFilter into readFooter. The filter's range comes from the boundaries of your split, and this range is ultimately what decides which split a row group belongs to:

public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
    throws IOException, InterruptedException {
    ...
    footer = readFooter(configuration, file, range(inputSplit.getStart(), inputSplit.getEnd()));
    ...
}
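For orientation, range(start, end) here builds parquet-mr's RangeMetadataFilter; as far as I recall its contains check is a half-open interval test over file offsets, which can be modelled like this (a Scala paraphrase, not the actual Java class):

// Scala model of RangeMetadataFilter semantics (assumed half-open interval [from, to)).
final case class RangeFilterSketch(from: Long, to: Long) {
  def contains(offset: Long): Boolean = from <= offset && offset < to
}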

Parquet's ParquetFileReader#readFooter in turn calls ParquetMetadataConverter#readParquetMetadata(f, filter); for a RangeMetadataFilter, readParquetMetadata does the following:

@Override
public FileMetaData visit(RangeMetadataFilter filter) throws IOException {
  return filterFileMetaDataByMidpoint(readFileMetaData(from), filter);
}

Finally we arrive at the crucial part of the split, and it boils down to this: whichever split owns the midpoint of a row group gets to process that row group.

Now suppose we have a 40 MB file with a single row group, split every ~10 MB. That gives 4 partitions, but only one of them contains the row group's midpoint, so only that one partition will have any data.

long midPoint = startIndex + totalSize / 2;
if (filter.contains(midPoint)) {
  newRowGroups.add(rowGroup);
}

The full code:

static FileMetaData filterFileMetaDataByMidpoint(FileMetaData metaData, RangeMetadataFilter filter) {
  List<RowGroup> rowGroups = metaData.getRow_groups();
  List<RowGroup> newRowGroups = new ArrayList<RowGroup>();
  for (RowGroup rowGroup : rowGroups) {
    long totalSize = 0;
    long startIndex = getOffset(rowGroup.getColumns().get(0));
    for (ColumnChunk col : rowGroup.getColumns()) {
      totalSize += col.getMeta_data().getTotal_compressed_size();
    }
    long midPoint = startIndex + totalSize / 2;
    if (filter.contains(midPoint)) {
      newRowGroups.add(rowGroup);
    }
  }
  metaData.setRow_groups(newRowGroups);
  return metaData;
}
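Putting the two halves together, the following sketch (hypothetical code, neither Spark nor parquet-mr source) replays the 40 MB / single-row-group scenario and shows which of the four ~11 MB splits owns the row group's midpoint:

// Which split owns the midpoint of the only row group in a 40 MB file?
object MidpointSketch {
  def main(args: Array[String]): Unit = {
    val maxSplitBytes = 11L * 1024 * 1024       // from the step-1 sketch
    val fileLen       = 40L * 1024 * 1024
    val rowGroupStart = 4L                      // assume data starts right after the "PAR1" magic
    val rowGroupSize  = fileLen - rowGroupStart // assume one row group spanning the whole file
    val midpoint      = rowGroupStart + rowGroupSize / 2

    (0L until fileLen by maxSplitBytes).foreach { start =>
      val end  = math.min(start + maxSplitBytes, fileLen)
      val owns = start <= midpoint && midpoint < end // RangeMetadataFilter#contains semantics
      println(f"split [$start%10d, $end%10d) owns the row group: $owns")
    }
  }
}

Only the split covering the ~20 MB midpoint prints true, which is exactly why three of the four tasks finish without reading any data.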

Aside:

The code in step 2 is, in fact, how Spark really reads files; it ultimately returns a FileScanRDD and is well worth a look along the way. The full code:

(file: PartitionedFile) => {
      assert(file.partitionValues.numFields == partitionSchema.size)

      val fileSplit =
        new FileSplit(new Path(new URI(file.filePath)), file.start, file.length, Array.empty)

      val split =
        new org.apache.parquet.hadoop.ParquetInputSplit(
          fileSplit.getPath,
          fileSplit.getStart,
          fileSplit.getStart + fileSplit.getLength,
          fileSplit.getLength,
          fileSplit.getLocations,
          null)

      val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
      val hadoopAttemptContext =
        new TaskAttemptContextImpl(broadcastedHadoopConf.value.value, attemptId)

      // Try to push down filters when filter push-down is enabled.
      // Notice: This push-down is RowGroups level, not individual records.
      if (pushed.isDefined) {
        ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
      }
      val parquetReader = if (enableVectorizedReader) {
        val vectorizedReader = new VectorizedParquetRecordReader()
        vectorizedReader.initialize(split, hadoopAttemptContext)
        logDebug(s"Appending $partitionSchema ${file.partitionValues}")
        vectorizedReader.initBatch(partitionSchema, file.partitionValues)
        if (returningBatch) {
          vectorizedReader.enableReturningBatches()
        }
        vectorizedReader
      } else {
        logDebug(s"Falling back to parquet-mr")
        // ParquetRecordReader returns UnsafeRow
        val reader = pushed match {
          case Some(filter) =>
            new ParquetRecordReader[UnsafeRow](
              new ParquetReadSupport,
              FilterCompat.get(filter, null))
          case _ =>
            new ParquetRecordReader[UnsafeRow](new ParquetReadSupport)
        }
        reader.initialize(split, hadoopAttemptContext)
        reader
      }

      val iter = new RecordReaderIterator(parquetReader)
      Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => iter.close()))

      // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy.
      if (parquetReader.isInstanceOf[VectorizedParquetRecordReader] &&
          enableVectorizedReader) {
        iter.asInstanceOf[Iterator[InternalRow]]
      } else {
        val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
        val joinedRow = new JoinedRow()
        val appendPartitionColumns = GenerateUnsafeProjection.generate(fullSchema, fullSchema)

        // This is a horrible erasure hack...  if we type the iterator above, then it actually check
        // the type in next() and we get a class cast exception.  If we make that function return
        // Object, then we can defer the cast until later!
        if (partitionSchema.length == 0) {
          // There is no partition columns
          iter.asInstanceOf[Iterator[InternalRow]]
        } else {
          iter.asInstanceOf[Iterator[InternalRow]]
            .map(d => appendPartitionColumns(joinedRow(d, file.partitionValues)))
        }
      }
    }

The returned (PartitionedFile) => Iterator[InternalRow] is used in FileSourceScanExec#inputRDD:

private lazy val inputRDD: RDD[InternalRow] = {
  val readFile: (PartitionedFile) => Iterator[InternalRow] =
    relation.fileFormat.buildReaderWithPartitionValues(
      sparkSession = relation.sparkSession,
      dataSchema = relation.dataSchema,
      partitionSchema = relation.partitionSchema,
      requiredSchema = requiredSchema,
      filters = pushedDownFilters,
      options = relation.options,
      hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))

  relation.bucketSpec match {
    case Some(bucketing) if relation.sparkSession.sessionState.conf.bucketingEnabled =>
      createBucketedReadRDD(bucketing, readFile, selectedPartitions, relation)
    case _ =>
      createNonBucketedReadRDD(readFile, selectedPartitions, relation)
  }
}

FileScanRDD

class FileScanRDD(
    @transient private val sparkSession: SparkSession,
    readFunction: (PartitionedFile) => Iterator[InternalRow],
    @transient val filePartitions: Seq[FilePartition])
  extends RDD[InternalRow](sparkSession.sparkContext, Nil) {

  override def compute(split: RDDPartition, context: TaskContext): Iterator[InternalRow] = {

    // Simplified: in the real source these live inside an inner Iterator built by compute.
    val files = split.asInstanceOf[FilePartition].files.toIterator
    var currentFile: PartitionedFile = null // advanced via currentFile = files.next(); the full implementation is omitted here, see the source if interested.
    ...
    readFunction(currentFile)
    ...
  }
}

Conclusion

提高一個 parquet 中的 row group 中的行數閾值, 籍此提示 spark 並行度.
