While using spark + parquet in practice, I ran into two things that puzzled me.
Both questions come down to how Spark splits a Parquet file into partitions, and which part of the data each partition ends up processing.
The conclusion first: in Spark, Parquet is splitable; see ParquetFileFormat#isSplitable. Does that mean the data gets chopped up arbitrarily? No, because Spark splits a Parquet file with the row group as the smallest unit. A consequence is that some partitions may end up with no data at all; in the extreme case of a file with a single row group, no matter how many partitions there are, only one of them will have data.
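Whether this bites you depends on how many row groups your files actually contain, which you can check directly with parquet-mr's footer API. A minimal sketch (the path is hypothetical; point it at one of your own files):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import scala.collection.JavaConverters._

// Hypothetical path; replace with a real parquet file.
val footer = ParquetFileReader.readFooter(new Configuration(), new Path("/tmp/part-00000.parquet"))
footer.getBlocks.asScala.zipWithIndex.foreach { case (rg, i) =>
  // Each BlockMetaData is one row group: its row count, uncompressed size, and file offset.
  println(s"row group $i: rows=${rg.getRowCount}, " +
    s"totalByteSize=${rg.getTotalByteSize}, startingPos=${rg.getStartingPos}")
}
```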
Now let's begin our tour of the source code.
Step 1: in FileSourceScanExec#createNonBucketedReadRDD, if the file is splitable it is sliced by maxSplitBytes, and the number of FilePartitions produced at the end is the number of RDD partitions. This answers puzzle 1. The code is as follows:
```scala
val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
  s"open cost is considered as scanning $openCostInBytes bytes.")

val splitFiles = selectedPartitions.flatMap { partition =>
  partition.files.flatMap { file =>
    val blockLocations = getBlockLocations(file)
    if (fsRelation.fileFormat.isSplitable(
        fsRelation.sparkSession, fsRelation.options, file.getPath)) {
      (0L until file.getLen by maxSplitBytes).map { offset =>
        val remaining = file.getLen - offset
        val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
        val hosts = getBlockHosts(blockLocations, offset, size)
        PartitionedFile(
          partition.values, file.getPath.toUri.toString, offset, size, hosts)
      }
    } else {
      val hosts = getBlockHosts(blockLocations, 0, file.getLen)
      Seq(PartitionedFile(
        partition.values, file.getPath.toUri.toString, 0, file.getLen, hosts))
    }
  }
}.toArray.sortBy(_.length)(implicitly[Ordering[Long]].reverse)

val partitions = new ArrayBuffer[FilePartition]
val currentFiles = new ArrayBuffer[PartitionedFile]
var currentSize = 0L

/** Close the current partition and move to the next. */
def closePartition(): Unit = {
  if (currentFiles.nonEmpty) {
    val newPartition =
      FilePartition(
        partitions.size,
        currentFiles.toArray.toSeq) // Copy to a new Array.
    partitions += newPartition
  }
  currentFiles.clear()
  currentSize = 0
}

// Assign files to partitions using "First Fit Decreasing" (FFD)
splitFiles.foreach { file =>
  if (currentSize + file.length > maxSplitBytes) {
    closePartition()
  }
  // Add the given file to the current partition.
  currentSize += file.length + openCostInBytes
  currentFiles += file
}
closePartition()

new FileScanRDD(fsRelation.sparkSession, readFile, partitions)
```
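To make the formula concrete, here is a back-of-the-envelope run of it. This is only a sketch: the 40 MB single file mirrors the example used later in this article, the config defaults are the stock ones (spark.sql.files.maxPartitionBytes = 128 MB, spark.sql.files.openCostInBytes = 4 MB), and a defaultParallelism of 4 is assumed.

```scala
val defaultMaxSplitBytes = 128L * 1024 * 1024              // spark.sql.files.maxPartitionBytes (default)
val openCostInBytes      = 4L * 1024 * 1024                // spark.sql.files.openCostInBytes (default)
val fileLen              = 40L * 1024 * 1024               // one 40 MB parquet file (assumed)
val defaultParallelism   = 4                               // sparkContext.defaultParallelism (assumed)
val totalBytes           = fileLen + openCostInBytes       // every file also "costs" one open
val bytesPerCore         = totalBytes / defaultParallelism // 11 MB

val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
// = min(128 MB, max(4 MB, 11 MB)) = 11 MB, so the 40 MB file is cut into
// 4 PartitionedFiles (11 + 11 + 11 + 7 MB) and the scan ends up with 4 RDD partitions.
```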
Step 2: in the implementation of ParquetFileFormat#buildReaderWithPartitionValues, the split is used to initialize the reader, and depending on configuration the reader is either vectorized or not:
```scala
vectorizedReader.initialize(split, hadoopAttemptContext)
reader.initialize(split, hadoopAttemptContext)
```
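Which branch is taken is governed by spark.sql.parquet.enableVectorizedReader (on by default; the vectorized path also requires a flat schema of supported atomic types). A quick way to flip it for an experiment, assuming a running session named spark and a hypothetical path:

```scala
// Force the non-vectorized (parquet-mr) reader for comparison; the default is "true".
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")
spark.read.parquet("/tmp/some.parquet").count()
```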
There is more detailed code for step 2 in the aside section, but it is not essential to the main flow of this article, so I will set it aside for now.
Step 1 divides the file into a number of partitions purely by size, but not all of those partitions will end up with data.
Back to the initialize call in step 2: in SpecificParquetRecordReaderBase#initialize, a RangeMetadataFilter is passed in when readFooter is called. The filter's range is taken from the boundaries of your split, and that range is then used to decide which split each row group belongs to:
```java
public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
    throws IOException, InterruptedException {
  ...
  footer = readFooter(configuration, file, range(inputSplit.getStart(), inputSplit.getEnd()));
  ...
}
```
Parquet's ParquetFileReader#readFooter ends up calling ParquetMetadataConverter#readParquetMetadata(f, filter); its handling of a RangeMetadataFilter is:
```java
@Override
public FileMetaData visit(RangeMetadataFilter filter) throws IOException {
  return filterFileMetaDataByMidpoint(readFileMetaData(from), filter);
}
```
We finally arrive at the crucial piece of the split logic, and it is this: whichever split owns the midpoint of a row group is the one that gets to process that row group.
Now suppose we have a 40 MB file with only one row group, cut every 10 MB. That yields 4 partitions, but only one of them contains the midpoint of the row group, so only that one partition has any data.
```java
long midPoint = startIndex + totalSize / 2;
if (filter.contains(midPoint)) {
  newRowGroups.add(rowGroup);
}
```
The complete code is as follows:
```java
static FileMetaData filterFileMetaDataByMidpoint(FileMetaData metaData, RangeMetadataFilter filter) {
  List<RowGroup> rowGroups = metaData.getRow_groups();
  List<RowGroup> newRowGroups = new ArrayList<RowGroup>();
  for (RowGroup rowGroup : rowGroups) {
    long totalSize = 0;
    long startIndex = getOffset(rowGroup.getColumns().get(0));
    for (ColumnChunk col : rowGroup.getColumns()) {
      totalSize += col.getMeta_data().getTotal_compressed_size();
    }
    long midPoint = startIndex + totalSize / 2;
    if (filter.contains(midPoint)) {
      newRowGroups.add(rowGroup);
    }
  }
  metaData.setRow_groups(newRowGroups);
  return metaData;
}
```
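You can watch this midpoint rule from outside Spark by asking parquet-mr for the footer with the same kind of range filter. A minimal sketch, assuming a 40 MB file with a single row group at a hypothetical path:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.format.converter.ParquetMetadataConverter
import org.apache.parquet.hadoop.ParquetFileReader

val conf = new Configuration()
val file = new Path("/tmp/one-row-group-40mb.parquet") // hypothetical file
val mb   = 1024L * 1024

// Read the footer once per 10 MB split, with the same range filter that
// SpecificParquetRecordReaderBase#initialize passes in.
(0L until 40 * mb by 10 * mb).foreach { start =>
  val footer = ParquetFileReader.readFooter(
    conf, file, ParquetMetadataConverter.range(start, start + 10 * mb))
  println(s"split [$start, ${start + 10 * mb}) -> ${footer.getBlocks.size()} row group(s)")
}
// Only the split containing the row group's midpoint (around the 20 MB mark here)
// reports 1 row group; the other three report 0, so their tasks read nothing.
```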
The code of step 2 is in fact the path by which Spark really reads a file, and it ends up returning a FileScanRDD, so it is worth a look while we are passing by. The complete code is as follows:
```scala
(file: PartitionedFile) => {
  assert(file.partitionValues.numFields == partitionSchema.size)

  val fileSplit =
    new FileSplit(new Path(new URI(file.filePath)), file.start, file.length, Array.empty)

  val split =
    new org.apache.parquet.hadoop.ParquetInputSplit(
      fileSplit.getPath,
      fileSplit.getStart,
      fileSplit.getStart + fileSplit.getLength,
      fileSplit.getLength,
      fileSplit.getLocations,
      null)

  val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
  val hadoopAttemptContext =
    new TaskAttemptContextImpl(broadcastedHadoopConf.value.value, attemptId)

  // Try to push down filters when filter push-down is enabled.
  // Notice: This push-down is RowGroups level, not individual records.
  if (pushed.isDefined) {
    ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
  }

  val parquetReader = if (enableVectorizedReader) {
    val vectorizedReader = new VectorizedParquetRecordReader()
    vectorizedReader.initialize(split, hadoopAttemptContext)
    logDebug(s"Appending $partitionSchema ${file.partitionValues}")
    vectorizedReader.initBatch(partitionSchema, file.partitionValues)
    if (returningBatch) {
      vectorizedReader.enableReturningBatches()
    }
    vectorizedReader
  } else {
    logDebug(s"Falling back to parquet-mr")
    // ParquetRecordReader returns UnsafeRow
    val reader = pushed match {
      case Some(filter) =>
        new ParquetRecordReader[UnsafeRow](
          new ParquetReadSupport,
          FilterCompat.get(filter, null))
      case _ =>
        new ParquetRecordReader[UnsafeRow](new ParquetReadSupport)
    }
    reader.initialize(split, hadoopAttemptContext)
    reader
  }

  val iter = new RecordReaderIterator(parquetReader)
  Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => iter.close()))

  // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy.
  if (parquetReader.isInstanceOf[VectorizedParquetRecordReader] && enableVectorizedReader) {
    iter.asInstanceOf[Iterator[InternalRow]]
  } else {
    val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
    val joinedRow = new JoinedRow()
    val appendPartitionColumns = GenerateUnsafeProjection.generate(fullSchema, fullSchema)

    // This is a horrible erasure hack... if we type the iterator above, then it actually check
    // the type in next() and we get a class cast exception. If we make that function return
    // Object, then we can defer the cast until later!
    if (partitionSchema.length == 0) {
      // There is no partition columns
      iter.asInstanceOf[Iterator[InternalRow]]
    } else {
      iter.asInstanceOf[Iterator[InternalRow]]
        .map(d => appendPartitionColumns(joinedRow(d, file.partitionValues)))
    }
  }
}
```
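One line in the block above deserves emphasis: the comment notes that Parquet filter push-down prunes at row group granularity, not per record. A small sketch of checking whether a predicate is pushed down (the session, column name, and path are assumptions):

```scala
import org.apache.spark.sql.functions.col

spark.conf.set("spark.sql.parquet.filterPushdown", "true") // already the default
val df = spark.read.parquet("/tmp/events.parquet").filter(col("id") === 42L)
df.explain()
// The physical plan lists the predicate under PushedFilters: whole row groups whose
// column statistics cannot match are skipped, but the surviving row groups are still
// decoded and filtered row by row afterwards.
```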
The (PartitionedFile) => Iterator[InternalRow] it returns is used in FileSourceScanExec#inputRDD:
```scala
private lazy val inputRDD: RDD[InternalRow] = {
  val readFile: (PartitionedFile) => Iterator[InternalRow] =
    relation.fileFormat.buildReaderWithPartitionValues(
      sparkSession = relation.sparkSession,
      dataSchema = relation.dataSchema,
      partitionSchema = relation.partitionSchema,
      requiredSchema = requiredSchema,
      filters = pushedDownFilters,
      options = relation.options,
      hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))

  relation.bucketSpec match {
    case Some(bucketing) if relation.sparkSession.sessionState.conf.bucketingEnabled =>
      createBucketedReadRDD(bucketing, readFile, selectedPartitions, relation)
    case _ =>
      createNonBucketedReadRDD(readFile, selectedPartitions, relation)
  }
}
```
FileScanRDD
```scala
class FileScanRDD(
    @transient private val sparkSession: SparkSession,
    readFunction: (PartitionedFile) => Iterator[InternalRow],
    @transient val filePartitions: Seq[FilePartition])
  extends RDD[InternalRow](sparkSession.sparkContext, Nil) {

  override def compute(split: RDDPartition, context: TaskContext): Iterator[InternalRow] = {
    private[this] val files = split.asInstanceOf[FilePartition].files.toIterator
    private[this] var currentFile: PartitionedFile = null
    // currentFile is advanced via currentFile = files.next(); I will not paste the full
    // implementation here -- read the source if you are interested.
    ...
    readFunction(currentFile)
    ...
  }
}
```
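To see the end-to-end effect described in this article, count the rows that each partition actually receives. A sketch (the path is a placeholder for a single-row-group file and spark is an active SparkSession):

```scala
val df = spark.read.parquet("/tmp/one-row-group-40mb.parquet")

println(s"RDD partitions: ${df.rdd.getNumPartitions}")
df.rdd
  .mapPartitionsWithIndex { case (i, rows) => Iterator(i -> rows.length) }
  .collect()
  .foreach { case (i, n) => println(s"partition $i -> $n rows") }
// With a single row group, every partition except one prints 0 rows.
```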
Takeaway: tune the threshold that decides how many rows (or bytes) go into a Parquet row group, which controls how many row groups a file contains and therefore how many of Spark's read partitions actually get data, i.e. the effective parallelism when reading it back.
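The knob parquet-mr actually exposes is byte-based: parquet.block.size is the target row group size in bytes (128 MB by default). A sketch of shrinking it at write time, assuming df is the DataFrame being written and the output path is hypothetical; the exact size is workload dependent:

```scala
// Smaller row groups -> more row groups per file -> more read splits that own a
// row group midpoint -> more partitions that actually carry data.
df.write
  .option("parquet.block.size", (16 * 1024 * 1024).toString) // ~16 MB row groups instead of 128 MB
  .parquet("/tmp/output-with-more-row-groups")
```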