A Detailed Source-Code Walkthrough of the Spark Shuffle Mechanism

The shuffle process consists of two phases, shuffle write and shuffle read. Since version 2.0 the hash shuffle has been removed and only sort shuffle remains. The following walks through the code.

1.ShuffleManager

When Spark initializes SparkEnv, it creates the ShuffleManager inside the create() method:

// Let the user specify short names for shuffle managers
    val shortShuffleMgrNames = Map(
      "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
      "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
    val shuffleMgrName = conf.get(config.SHUFFLE_MANAGER)
    val shuffleMgrClass =
      shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT), shuffleMgrName)
    val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
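
For reference, here is a minimal sketch (my own example, not from the Spark source; the app name and master are arbitrary) of how an application would pin the shuffle manager through the spark.shuffle.manager setting read above. Both short names resolve to SortShuffleManager, so this only makes the default explicit:

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical application setup; "sort" is already the default value.
val conf = new SparkConf()
  .setAppName("shuffle-manager-demo")
  .setMaster("local[*]")
  .set("spark.shuffle.manager", "sort") // or "tungsten-sort", or a fully qualified class name

val sc = new SparkContext(conf)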

As the shortShuffleMgrNames map shows, the two short names sort and tungsten-sort both point to the same class, and the ShuffleManager is created via reflection. ShuffleManager is a trait whose core methods are the following:

private[spark] trait ShuffleManager {

  /**
   * Register a shuffle with the manager and return a handle for it.
   */
  def registerShuffle[K, V, C](
      shuffleId: Int,
      dependency: ShuffleDependency[K, V, C]): ShuffleHandle

  /** Get a writer for a given partition. Called on executors by map tasks. */
  def getWriter[K, V](
      handle: ShuffleHandle,
      mapId: Long,
      context: TaskContext,
      metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V]

  /**
   * Get a reader for a range of reduce partitions. Called on executors by reduce tasks.
   */
  def getReader[K, C](
      handle: ShuffleHandle,
      startPartition: Int,
      endPartition: Int,
      context: TaskContext,
      metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C]
	...
}

2.SortShuffleManager

SortShuffleManager is the only implementation of ShuffleManager. Its implementations of the three methods above are as follows:

2.1 registerShuffle

/**
   * Obtains a [[ShuffleHandle]] to pass to tasks.
   */
  override def registerShuffle[K, V, C](
      shuffleId: Int,
      dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
    // 1. First check whether the bypass merge-sort path applies
    if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
      // If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't
      // need map-side aggregation, then write numPartitions files directly and just concatenate
      // them at the end. This avoids doing serialization and deserialization twice to merge
      // together the spilled files, which would happen with the normal code path. The downside is
      // having multiple files open at a time and thus more memory allocated to buffers.
      new BypassMergeSortShuffleHandle[K, V](
        shuffleId, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
      // 2. Otherwise check whether the serialized (tungsten-sort) shuffle can be used
    } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
      // Otherwise, try to buffer map outputs in a serialized form, since this is more efficient:
      new SerializedShuffleHandle[K, V](
        shuffleId, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
    } else {
      // Otherwise, buffer map outputs in a deserialized form:
      new BaseShuffleHandle(shuffleId, dependency)
    }
  }

1. First it checks whether bypass merge-sort applies. Two conditions must hold: the current shuffle dependency has no map-side aggregation, and the number of partitions is no greater than spark.shuffle.sort.bypassMergeThreshold (200 by default). If both hold, a BypassMergeSortShuffleHandle is returned and the bypass merge-sort shuffle mechanism is enabled.

def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {
  // We cannot bypass sorting if we need to do map-side aggregation.
  if (dep.mapSideCombine) {
    false
  } else {
    // Defaults to 200
    val bypassMergeThreshold: Int = conf.get(config.SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD)
    dep.partitioner.numPartitions <= bypassMergeThreshold
  }
}

2. If the conditions above are not met, it checks canUseSerializedShuffle(). If the three conditions in that method hold, a SerializedShuffleHandle is returned and the tungsten-sort shuffle mechanism is enabled.

def canUseSerializedShuffle(dependency: ShuffleDependency[_, _, _]): Boolean = {
  val shufId = dependency.shuffleId
  val numPartitions = dependency.partitioner.numPartitions
  // The serializer must support relocation of serialized objects
  if (!dependency.serializer.supportsRelocationOfSerializedObjects) {
    log.debug(s"Can't use serialized shuffle for shuffle $shufId because the serializer, " +
      s"${dependency.serializer.getClass.getName}, does not support object relocation")
    false
    // There must be no map-side aggregation
  } else if (dependency.mapSideCombine) {
    log.debug(s"Can't use serialized shuffle for shuffle $shufId because we need to do " +
      s"map-side aggregation")
    false
    // The number of partitions must not exceed 16777215 + 1 (2^24)
  } else if (numPartitions > MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE) {
    log.debug(s"Can't use serialized shuffle for shuffle $shufId because it has more than " +
      s"$MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE partitions")
    false
  } else {
    log.debug(s"Can use serialized shuffle for shuffle $shufId")
    true
  }
}

3. If neither of the above applies, a BaseShuffleHandle is returned and the basic sort shuffle mechanism is used.

2.2 getReader

/**
 * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive).
 * Called on executors by reduce tasks.
 */
override def getReader[K, C](
    handle: ShuffleHandle,
    startPartition: Int,
    endPartition: Int,
    context: TaskContext,
    metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = {
  val blocksByAddress = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(
    handle.shuffleId, startPartition, endPartition)
  new BlockStoreShuffleReader(
    handle.asInstanceOf[BaseShuffleHandle[K, _, C]], blocksByAddress, context, metrics,
    shouldBatchFetch = canUseBatchFetch(startPartition, endPartition, context))
}

This always returns a BlockStoreShuffleReader.

2.3 getWriter

/** Get a writer for a given partition. Called on executors by map tasks. */
override def getWriter[K, V](
    handle: ShuffleHandle,
    mapId: Long,
    context: TaskContext,
    metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V] = {
  val mapTaskIds = taskIdMapsForShuffle.computeIfAbsent(
    handle.shuffleId, _ => new OpenHashSet[Long](16))
  mapTaskIds.synchronized { mapTaskIds.add(context.taskAttemptId()) }
  val env = SparkEnv.get
  // Choose a different ShuffleWriter depending on the handle
  handle match {
    case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
      new UnsafeShuffleWriter(
        env.blockManager,
        context.taskMemoryManager(),
        unsafeShuffleHandle,
        mapId,
        context,
        env.conf,
        metrics,
        shuffleExecutorComponents)
    case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
      new BypassMergeSortShuffleWriter(
        env.blockManager,
        bypassMergeSortHandle,
        mapId,
        env.conf,
        metrics,
        shuffleExecutorComponents)
    case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
      new SortShuffleWriter(
        shuffleBlockResolver, other, mapId, context, shuffleExecutorComponents)
  }
}

Here a different ShuffleWriter is chosen based on the handle: a SerializedShuffleHandle gets an UnsafeShuffleWriter, a BypassMergeSortShuffleHandle gets a BypassMergeSortShuffleWriter, and otherwise a SortShuffleWriter is used.

3. The three Writer implementations

As described above, when the bypass mechanism is enabled, BypassMergeSortShuffleWriter is used; if all three conditions hold (the serializer supports relocation, there is no map-side aggregation, and the number of partitions does not exceed 16777215 + 1), UnsafeShuffleWriter is used; otherwise SortShuffleWriter is used.
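
As an illustration (my own sketch, not from the post; it assumes the sc context from the earlier sketch, Kryo as the configured serializer with its default settings, and the default threshold of 200), here is which handle and writer some typical shuffles would end up with:

import org.apache.spark.HashPartitioner

val pairs = sc.parallelize(1 to 1000000).map(i => (i % 1000, i))

// Map-side combine -> BaseShuffleHandle -> SortShuffleWriter
pairs.reduceByKey(_ + _, 100)

// No combine, 100 <= 200 partitions -> BypassMergeSortShuffleHandle -> BypassMergeSortShuffleWriter
pairs.partitionBy(new HashPartitioner(100))

// No combine, 1000 > 200 partitions, Kryo supports relocation -> SerializedShuffleHandle -> UnsafeShuffleWriter
pairs.partitionBy(new HashPartitioner(1000))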

3.1 BypassMergeSortShuffleWriter

BypassMergeSortShuffleWriter extends ShuffleWriter and is implemented in Java. It merges the map side's per-partition output files into a single file and also produces an index file recording the start offset of each partition. Its write() method is as follows:

@Override
public void write(Iterator<Product2<K, V>> records) throws IOException {
  assert (partitionWriters == null);
  // Create a new ShuffleMapOutputWriter
  ShuffleMapOutputWriter mapOutputWriter = shuffleExecutorComponents
  .createMapOutputWriter(shuffleId, mapId, numPartitions);
  try {
    // If there are no records at all
    if (!records.hasNext()) {
      // Commit and get the lengths written for every partition
      partitionLengths = mapOutputWriter.commitAllPartitions();
      // Update mapStatus
      mapStatus = MapStatus$.MODULE$.apply(
        blockManager.shuffleServerId(), partitionLengths, mapId);
      return;
    }
    final SerializerInstance serInstance = serializer.newInstance();
    final long openStartTime = System.nanoTime();
    // Create one DiskBlockObjectWriter and one FileSegment per partition
    partitionWriters = new DiskBlockObjectWriter[numPartitions];
    partitionWriterSegments = new FileSegment[numPartitions];
    // For every partition
    for (int i = 0; i < numPartitions; i++) {
      // Create a temporary shuffle block
      final Tuple2<TempShuffleBlockId, File> tempShuffleBlockIdPlusFile =
      blockManager.diskBlockManager().createTempShuffleBlock();
      // Get the temp block's file and id
      final File file = tempShuffleBlockIdPlusFile._2();
      final BlockId blockId = tempShuffleBlockIdPlusFile._1();
      // Create a DiskBlockObjectWriter for this partition
      partitionWriters[i] =
      blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics);
    }
    // Creating the file to write to and creating a disk writer both involve interacting with
    // the disk, and can take a long time in aggregate when we open many files, so should be
    // included in the shuffle write time.
    // As noted above, creating files and disk writers takes time and is counted as shuffle write time
    writeMetrics.incWriteTime(System.nanoTime() - openStartTime);

    // Write out every record
    while (records.hasNext()) {
      final Product2<K, V> record = records.next();
      final K key = record._1();
      // Each record is written to the file of the partition its key maps to
      partitionWriters[partitioner.getPartition(key)].write(key, record._2());
    }

    for (int i = 0; i < numPartitions; i++) {
      try (DiskBlockObjectWriter writer = partitionWriters[i]) {
        // Commit this partition's writes and get its file segment
        partitionWriterSegments[i] = writer.commitAndGet();
      }
    }

    // Merge all per-partition files into a single output file
    partitionLengths = writePartitionedData(mapOutputWriter);
    // Update mapStatus
    mapStatus = MapStatus$.MODULE$.apply(
      blockManager.shuffleServerId(), partitionLengths, mapId);
  } catch (Exception e) {
    try {
      mapOutputWriter.abort(e);
    } catch (Exception e2) {
      logger.error("Failed to abort the writer after failing to write map output.", e2);
      e.addSuppressed(e2);
    }
    throw e;
  }
}

The file-merging method writePartitionedData() is shown below; by default it merges the files using zero-copy:

private long[] writePartitionedData(ShuffleMapOutputWriter mapOutputWriter) throws IOException {
  // Track location of the partition starts in the output file
  if (partitionWriters != null) {
    // Start time for the write-time metrics
    final long writeStartTime = System.nanoTime();
    try {
      for (int i = 0; i < numPartitions; i++) {
        // Get the file for this partition's segment
        final File file = partitionWriterSegments[i].file();
        ShufflePartitionWriter writer = mapOutputWriter.getPartitionWriter(i);
        if (file.exists()) {
          // 採起零拷貝方式
          if (transferToEnabled) {
            // Using WritableByteChannelWrapper to make resource closing consistent between
            // this implementation and UnsafeShuffleWriter.
            Optional<WritableByteChannelWrapper> maybeOutputChannel = writer.openChannelWrapper();
            // This ends up in Utils.copyFileStreamNIO, which copies the file via FileChannel.transferTo
            if (maybeOutputChannel.isPresent()) {
              writePartitionedDataWithChannel(file, maybeOutputChannel.get());
            } else {
              writePartitionedDataWithStream(file, writer);
            }
          } else {
            // 不然採起流的方式拷貝
            writePartitionedDataWithStream(file, writer);
          }
          if (!file.delete()) {
            logger.error("Unable to delete file for partition {}", i);
          }
        }
      }
    } finally {
      writeMetrics.incWriteTime(System.nanoTime() - writeStartTime);
    }
    partitionWriters = null;
  }
  return mapOutputWriter.commitAllPartitions();
}
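
The zero-copy path above ultimately rests on FileChannel.transferTo. Below is a minimal standalone sketch (my own, with simplified error handling, not the Spark implementation) of appending one partition file onto an open output stream in that style:

import java.io.{File, FileInputStream, FileOutputStream}

def appendWithTransferTo(src: File, out: FileOutputStream): Long = {
  val in = new FileInputStream(src).getChannel
  try {
    var position = 0L
    val size = in.size()
    // transferTo lets the kernel move the bytes without staging them in a user-space buffer
    while (position < size) {
      position += in.transferTo(position, size - position, out.getChannel)
    }
    position
  } finally {
    in.close()
  }
}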

3.2 UnsafeShuffleWriter

UnsafeShuffleWriter also extends ShuffleWriter and is implemented in Java. Its write() method is as follows:

@Override
public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOException {
  // Keep track of success so we know if we encountered an exception
  // We do this rather than a standard try/catch/re-throw to handle
  // generic throwables.
  // Track whether we succeeded, so exceptions are handled correctly
  boolean success = false;
  try {
    while (records.hasNext()) {
      // Insert each record into the ShuffleExternalSorter for external sorting
      insertRecordIntoSorter(records.next());
    }
    // Merge the spills and write the final output file
    closeAndWriteOutput();
    success = true;
  } finally {
    if (sorter != null) {
      try {
        sorter.cleanupResources();
      } catch (Exception e) {
        // Only throw this error if we won't be masking another
        // error.
        if (success) {
          throw e;
        } else {
          logger.error("In addition to a failure during writing, we failed during " +
                       "cleanup.", e);
        }
      }
    }
  }
}

Two methods do the main work here:

3.2.1 insertRecordIntoSorter()

@VisibleForTesting
void insertRecordIntoSorter(Product2<K, V> record) throws IOException {
  assert(sorter != null);
  // Get the key and its partition
  final K key = record._1();
  final int partitionId = partitioner.getPartition(key);
  // Reset the serialization buffer
  serBuffer.reset();
  // Serialize the key and the value into the buffer
  serOutputStream.writeKey(key, OBJECT_CLASS_TAG);
  serOutputStream.writeValue(record._2(), OBJECT_CLASS_TAG);
  serOutputStream.flush();

  // Size of the serialized record
  final int serializedRecordSize = serBuffer.size();
  assert (serializedRecordSize > 0);

  // Hand the serialized bytes to the ShuffleExternalSorter
  sorter.insertRecord(
    serBuffer.getBuf(), Platform.BYTE_ARRAY_OFFSET, serializedRecordSize, partitionId);
}

This method serializes the record and inserts the serialized bytes into the external sorter via insertRecord(), shown below:

public void insertRecord(Object recordBase, long recordOffset, int length, int partitionId)
  throws IOException {

  // for tests
  assert(inMemSorter != null);
  // If the number of records exceeds the spill threshold, spill to disk right away
  if (inMemSorter.numRecords() >= numElementsForSpillThreshold) {
    logger.info("Spilling data because number of spilledRecords crossed the threshold " +
      numElementsForSpillThreshold);
    spill();
  }

  // Checks whether there is enough space to insert an additional record in to the sort pointer
  // array and grows the array if additional space is required. If the required space cannot be
  // obtained, then the in-memory data will be spilled to disk.
  // Check whether the sort pointer array has room for another record, growing it if needed; if the space cannot be obtained, the in-memory data is spilled to disk
  growPointerArrayIfNecessary();
  final int uaoSize = UnsafeAlignedOffset.getUaoSize();
  // Need 4 or 8 bytes to store the record length.
  // An extra 4 or 8 bytes are needed to store the record length
  final int required = length + uaoSize;
  // If more memory is needed, acquire a new page from the TaskMemoryManager
  acquireNewPageIfNecessary(required);

  assert(currentPage != null);
  final Object base = currentPage.getBaseObject();
  //Given a memory page and offset within that page, encode this address into a 64-bit long.
  //This address will remain valid as long as the corresponding page has not been freed.
  // Encode the record's logical address (the memory page plus the offset within it) into a single long
  final long recordAddress = taskMemoryManager.encodePageNumberAndOffset(currentPage, pageCursor);
  // Write the length prefix
  UnsafeAlignedOffset.putSize(base, pageCursor, length);
  // Advance the cursor
  pageCursor += uaoSize;
  // Copy the record bytes
  Platform.copyMemory(recordBase, recordOffset, base, pageCursor, length);
  // Advance the cursor
  pageCursor += length;
  // Pass the encoded address and the partition id to the ShuffleInMemorySorter for sorting
  inMemSorter.insertRecord(recordAddress, partitionId);
}

Note that buffering and spilling here do not rely on higher-level data structures; the writer operates on raw memory directly.
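
As a rough illustration of that layout (a standalone sketch of mine, using a ByteBuffer in place of a Tungsten page), each insert writes a length prefix followed by the serialized bytes, and the sorter only keeps an 8-byte packed pointer to that position:

import java.nio.ByteBuffer

val page = ByteBuffer.allocate(1 << 20)            // stand-in for one memory page
val record = "some serialized key+value".getBytes("UTF-8")

val recordOffset = page.position()                 // what pageCursor points at before the write
page.putInt(record.length)                         // the 4-byte (or 8-byte) length prefix
page.put(record)                                   // the serialized bytes themselves

println(s"record of ${record.length} bytes stored at offset $recordOffset")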

The growPointerArrayIfNecessary() method is as follows:

/**
 * Checks whether there is enough space to insert an additional record in to the sort pointer
 * array and grows the array if additional space is required. If the required space cannot be
 * obtained, then the in-memory data will be spilled to disk.
 */
private void growPointerArrayIfNecessary() throws IOException {
  assert(inMemSorter != null);
  // If there is no room for another record
  if (!inMemSorter.hasSpaceForAnotherRecord()) {
    // Current memory usage of the pointer array
    long used = inMemSorter.getMemoryUsage();
    LongArray array;
    try {
      // could trigger spilling
      // Allocate an array with twice the current capacity
      array = allocateArray(used / 8 * 2);
    } catch (TooLargePageException e) {
      // The pointer array is too big to fix in a single page, spill.
      // The pointer array no longer fits in a single page, so spill (see the spill method below);
      // a page is at most 128 MB, defined in PackedRecordPointer:
      // static final int MAXIMUM_PAGE_SIZE_BYTES = 1 << 27;  // 128 megabytes
      spill();
      return;
    } catch (SparkOutOfMemoryError e) {
      // should have trigger spilling
      if (!inMemSorter.hasSpaceForAnotherRecord()) {
        logger.error("Unable to grow the pointer array");
        throw e;
      }
      return;
    }
    // check if spilling is triggered or not
    if (inMemSorter.hasSpaceForAnotherRecord()) {
      // Spilling freed up space, so the new array is not needed; release it
      freeArray(array);
    } else {
      // Otherwise copy the old array into the new, larger one
      inMemSorter.expandPointerArray(array);
    }
  }
}
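
A back-of-the-envelope sketch (my own, standalone, with made-up record counts) of when the doubled pointer array no longer fits in a single 128 MB page and therefore forces a spill:

val maxPageBytes = 1L << 27                  // PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES, 128 MB
val pointersHeld = 10L * 1000 * 1000         // records currently tracked, 8 bytes of pointer each
val usedBytes = pointersHeld * 8             // roughly what inMemSorter.getMemoryUsage() reports
val requestedBytes = usedBytes / 8 * 2 * 8   // the doubled array that allocateArray is asked for

// 160 MB > 128 MB, so allocateArray would throw TooLargePageException and spill() runs instead
println(s"requesting ${requestedBytes >> 20} MB, spill = ${requestedBytes > maxPageBytes}")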

The spill() method is as follows:

@Override
public long spill(long size, MemoryConsumer trigger) throws IOException {
  if (trigger != this || inMemSorter == null || inMemSorter.numRecords() == 0) {
    return 0L;
  }

  logger.info("Thread {} spilling sort data of {} to disk ({} {} so far)",
    Thread.currentThread().getId(),
    Utils.bytesToString(getMemoryUsage()),
    spills.size(),
    spills.size() > 1 ? " times" : " time");

  // Sorts the in-memory records and writes the sorted records to an on-disk file.
  // This method does not free the sort data structures.
  // Sort the in-memory records and write them to a disk file; this does not free the sort data structures
  writeSortedFile(false);
  final long spillSize = freeMemory();
  // Reset the ShuffleInMemorySorter
  inMemSorter.reset();
  // Reset the in-memory sorter's pointer array only after freeing up the memory pages holding the
  // records. Otherwise, if the task is over allocated memory, then without freeing the memory
  // pages, we might not be able to get memory for the pointer array.
  taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);
  return spillSize;
}

The writeSortedFile() method:

private void writeSortedFile(boolean isLastFile) {

  // This call performs the actual sort.
  // Returns an iterator over the sorted records
  final ShuffleInMemorySorter.ShuffleSorterIterator sortedRecords =
    inMemSorter.getSortedIterator();

  // If there are no sorted records, so we don't need to create an empty spill file.
  if (!sortedRecords.hasNext()) {
    return;
  }

  final ShuffleWriteMetricsReporter writeMetricsToUse;

  // If true we are writing the final output file, otherwise a spill file
  if (isLastFile) {
    // We're writing the final non-spill file, so we _do_ want to count this as shuffle bytes.
    writeMetricsToUse = writeMetrics;
  } else {
    // We're spilling, so bytes written should be counted towards spill rather than write.
    // Create a dummy WriteMetrics object to absorb these metrics, since we don't want to count
    // them towards shuffle bytes written.
    writeMetricsToUse = new ShuffleWriteMetrics();
  }

  // Small writes to DiskBlockObjectWriter will be fairly inefficient. Since there doesn't seem to
  // be an API to directly transfer bytes from managed memory to the disk writer, we buffer
  // data through a byte array. This array does not need to be large enough to hold a single
  // record;
  // Create a byte buffer for copying; 1 MB by default
  final byte[] writeBuffer = new byte[diskWriteBufferSize];

  // Because this output will be read during shuffle, its compression codec must be controlled by
  // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use
  // createTempShuffleBlock here; see SPARK-3426 for more details.
  // Create a temporary shuffle block
  final Tuple2<TempShuffleBlockId, File> spilledFileInfo =
    blockManager.diskBlockManager().createTempShuffleBlock();
  // Get its file and block id
  final File file = spilledFileInfo._2();
  final TempShuffleBlockId blockId = spilledFileInfo._1();
  final SpillInfo spillInfo = new SpillInfo(numPartitions, file, blockId);

  // Unfortunately, we need a serializer instance in order to construct a DiskBlockObjectWriter.
  // Our write path doesn't actually use this serializer (since we end up calling the `write()`
  // OutputStream methods), but DiskBlockObjectWriter still calls some methods on it. To work
  // around this, we pass a dummy no-op serializer.
  // A no-op serializer; an instance is only needed to construct the DiskBlockObjectWriter
  final SerializerInstance ser = DummySerializerInstance.INSTANCE;

  int currentPartition = -1;
  final FileSegment committedSegment;
  try (DiskBlockObjectWriter writer =
      blockManager.getDiskWriter(blockId, file, ser, fileBufferSizeBytes, writeMetricsToUse)) {

    final int uaoSize = UnsafeAlignedOffset.getUaoSize();
    // Iterate over the sorted records
    while (sortedRecords.hasNext()) {
      sortedRecords.loadNext();
      final int partition = sortedRecords.packedRecordPointer.getPartitionId();
      assert (partition >= currentPartition);
      if (partition != currentPartition) {
        // Switch to the new partition
        // We reached a new partition: commit the current one and record its length
        if (currentPartition != -1) {
          final FileSegment fileSegment = writer.commitAndGet();
          spillInfo.partitionLengths[currentPartition] = fileSegment.length();
        }
        // Then switch to the new partition
        currentPartition = partition;
      }

      // Get the packed pointer and resolve it to a page and an offset within that page
      final long recordPointer = sortedRecords.packedRecordPointer.getRecordPointer();
      final Object recordPage = taskMemoryManager.getPage(recordPointer);
      final long recordOffsetInPage = taskMemoryManager.getOffsetInPage(recordPointer);
      // Bytes remaining in this record
      int dataRemaining = UnsafeAlignedOffset.getSize(recordPage, recordOffsetInPage);
      // Skip over the record length stored in front of the data
      long recordReadPosition = recordOffsetInPage + uaoSize; // skip over record length
      while (dataRemaining > 0) {
        final int toTransfer = Math.min(diskWriteBufferSize, dataRemaining);
        // Copy a chunk of the record into the buffer array
        Platform.copyMemory(
          recordPage, recordReadPosition, writeBuffer, Platform.BYTE_ARRAY_OFFSET, toTransfer);
        // Push it from the buffer into the DiskBlockObjectWriter
        writer.write(writeBuffer, 0, toTransfer);
        // Advance the read position
        recordReadPosition += toTransfer;
        // Decrease the remaining byte count
        dataRemaining -= toTransfer;
      }
      writer.recordWritten();
    }

    // Commit the last partition
    committedSegment = writer.commitAndGet();
  }
  // If `writeSortedFile()` was called from `closeAndGetSpills()` and no records were inserted,
  // then the file might be empty. Note that it might be better to avoid calling
  // writeSortedFile() in that case.
  // Record this spill in the list of spill files
  if (currentPartition != -1) {
    spillInfo.partitionLengths[currentPartition] = committedSegment.length();
    spills.add(spillInfo);
  }

  // If this was a spill file, update the spill metrics
  if (!isLastFile) {  
    writeMetrics.incRecordsWritten(
      ((ShuffleWriteMetrics)writeMetricsToUse).recordsWritten());
    taskContext.taskMetrics().incDiskBytesSpilled(
      ((ShuffleWriteMetrics)writeMetricsToUse).bytesWritten());
  }
}

The encodePageNumberAndOffset() method is as follows:

public long encodePageNumberAndOffset(MemoryBlock page, long offsetInPage) {
  // In off-heap mode the offset is an absolute address that could need all 64 bits; thanks to the page size limit it can be turned into an offset relative to the page's base address
  if (tungstenMemoryMode == MemoryMode.OFF_HEAP) {
    // In off-heap mode, an offset is an absolute address that may require a full 64 bits to
    // encode. Due to our page size limitation, though, we can convert this into an offset that's
    // relative to the page's base offset; this relative offset will fit in 51 bits.
    offsetInPage -= page.getBaseOffset();
  }
  return encodePageNumberAndOffset(page.pageNumber, offsetInPage);
}

@VisibleForTesting
public static long encodePageNumberAndOffset(int pageNumber, long offsetInPage) {
  assert (pageNumber >= 0) : "encodePageNumberAndOffset called with invalid page";
  // The upper 13 bits hold the page number and the lower 51 bits the offset:
  // shift the page number left by 51 bits, then OR in the offset masked with 0x7FFFFFFFFFFFFL (lower 51 bits all set)
  return (((long) pageNumber) << OFFSET_BITS) | (offsetInPage & MASK_LONG_LOWER_51_BITS);
}

ShuffleInMemorySorter's insertRecord() method is as follows:

public void insertRecord(long recordPointer, int partitionId) {
  if (!hasSpaceForAnotherRecord()) {
    throw new IllegalStateException("There is no space for new record");
  }
  array.set(pos, PackedRecordPointer.packPointer(recordPointer, partitionId));
  pos++;
}

The PackedRecordPointer.packPointer() method:

public static long packPointer(long recordPointer, int partitionId) {
  assert (partitionId <= MAXIMUM_PARTITION_ID);
  // Note that without word alignment we can address 2^27 bytes = 128 megabytes per page.
  // Also note that this relies on some internals of how TaskMemoryManager encodes its addresses.
  // Shift the page-number bits right by 24 and combine them with the lower 27 bits, compressing the logical address into 40 bits
  final long pageNumber = (recordPointer & MASK_LONG_UPPER_13_BITS) >>> 24;
  final long compressedAddress = pageNumber | (recordPointer & MASK_LONG_LOWER_27_BITS);
  // Put the partition id in the upper 24 bits
  return (((long) partitionId) << 40) | compressedAddress;
}
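
Putting the two encodings together, here is a small standalone sketch (mine, mirroring the constants above with made-up values) that packs a page number, an in-page offset and a partition id, then recovers all three the way the getters on PackedRecordPointer do:

val pageNumber   = 5L       // fits in 13 bits
val offsetInPage = 1234L    // fits in 27 bits once a page is capped at 128 MB
val partitionId  = 42L      // fits in 24 bits

// TaskMemoryManager-style address: 13-bit page number on top, 51-bit offset below
val address = (pageNumber << 51) | (offsetInPage & 0x7FFFFFFFFFFFFL)

// PackedRecordPointer-style packing: partition id in bits 40..63, page number in 27..39, offset in 0..26
val compressed = ((address & 0xFFF8000000000000L) >>> 24) | (address & 0x7FFFFFFL)
val packed = (partitionId << 40) | compressed

println((packed >>> 40) & 0xFFFFFF)   // 42, the partition id
println((packed >>> 27) & 0x1FFF)     // 5, the page number
println(packed & 0x7FFFFFF)           // 1234, the offset within the page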

The getSortedIterator() method:

public ShuffleSorterIterator getSortedIterator() {
  int offset = 0;
  // Use radix sort on the partition ids; radix sort is much faster, but it requires extra memory to be reserved as pointers are added
  if (useRadixSort) {
    offset = RadixSort.sort(
      array, pos,
      PackedRecordPointer.PARTITION_ID_START_BYTE_INDEX,
      PackedRecordPointer.PARTITION_ID_END_BYTE_INDEX, false, false);
    // Otherwise fall back to Tim sort
  } else {
    MemoryBlock unused = new MemoryBlock(
      array.getBaseObject(),
      array.getBaseOffset() + pos * 8L,
      (array.size() - pos) * 8L);
    LongArray buffer = new LongArray(unused);
    Sorter<PackedRecordPointer, LongArray> sorter =
      new Sorter<>(new ShuffleSortDataFormat(buffer));

    sorter.sort(array, 0, pos, SORT_COMPARATOR);
  }
  return new ShuffleSorterIterator(pos, array, offset);
}
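
A tiny standalone sketch (mine, and simplified: it sorts the whole 64-bit values rather than radix-sorting only the partition-id bytes) of why ordering the packed longs groups records by partition while the low 40 bits still locate each record:

val packed = Seq(
  (3L << 40) | 7L,   // partition 3, compressed address 7
  (1L << 40) | 9L,   // partition 1, compressed address 9
  (1L << 40) | 2L)   // partition 1, compressed address 2

// Sorting the longs orders primarily by the partition id living in the upper 24 bits
val byPartition = packed.sorted.map(p => (p >>> 40, p & ((1L << 40) - 1)))
println(byPartition)   // List((1,2), (1,9), (3,7))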

3.2.2 closeAndWriteOutput()

@VisibleForTesting
void closeAndWriteOutput() throws IOException {
  assert(sorter != null);
  updatePeakMemoryUsed();
  serBuffer = null;
  serOutputStream = null;
  // Get the spill files
  final SpillInfo[] spills = sorter.closeAndGetSpills();
  sorter = null;
  final long[] partitionLengths;
  try {
    // Merge the spill files
    partitionLengths = mergeSpills(spills);
  } finally {
    // Delete the spill files
    for (SpillInfo spill : spills) {
      if (spill.file.exists() && !spill.file.delete()) {
        logger.error("Error while deleting spill file {}", spill.file.getPath());
      }
    }
  }
  // Update mapStatus
  mapStatus = MapStatus$.MODULE$.apply(
    blockManager.shuffleServerId(), partitionLengths, mapId);
}

The mergeSpills() method:

private long[] mergeSpills(SpillInfo[] spills) throws IOException {
  long[] partitionLengths;
  // If there are no spill files, commit an empty output
  if (spills.length == 0) {
    final ShuffleMapOutputWriter mapWriter = shuffleExecutorComponents
        .createMapOutputWriter(shuffleId, mapId, partitioner.numPartitions());
    return mapWriter.commitAllPartitions();
    // If there is exactly one spill file, transfer it directly as the output
  } else if (spills.length == 1) {
    Optional<SingleSpillShuffleMapOutputWriter> maybeSingleFileWriter =
        shuffleExecutorComponents.createSingleFileMapOutputWriter(shuffleId, mapId);
    if (maybeSingleFileWriter.isPresent()) {
      // Here, we don't need to perform any metrics updates because the bytes written to this
      // output file would have already been counted as shuffle bytes written.
      partitionLengths = spills[0].partitionLengths;
      maybeSingleFileWriter.get().transferMapSpillFile(spills[0].file, partitionLengths);
    } else {
      partitionLengths = mergeSpillsUsingStandardWriter(spills);
    }
    // If there are several, merge them; the merge can go through either NIO (transferTo) or regular streams
  } else {
    partitionLengths = mergeSpillsUsingStandardWriter(spills);
  }
  return partitionLengths;
}

3.3 SortShuffleWriter

SortShuffleWriter sorts records in memory using a PartitionedAppendOnlyMap or a PartitionedPairBuffer and spills to files when the memory limit is exceeded. When producing the globally ordered output file, it performs a global merge-sort over all previously spilled files plus the data currently in memory, aggregating elements with the same key using the defined function. The entry point is the write() method:

override def write(records: Iterator[Product2[K, V]]): Unit = {
  // Create an ExternalSorter; if there is map-side combine, pass the aggregator and key ordering, otherwise pass neither
  sorter = if (dep.mapSideCombine) {
    new ExternalSorter[K, V, C](
      context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
  } else {
    // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
    // care whether the keys get sorted in each partition; that will be done on the reduce side
    // if the operation being run is sortByKey.
    new ExternalSorter[K, V, V](
      context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
  }
  // Feed the records into the ExternalSorter
  sorter.insertAll(records)

  // Don't bother including the time to open the merged output file in the shuffle write time,
  // because it just opens a single file, so is typically too fast to measure accurately
  // (see SPARK-3570).
  // Create a map output writer
  val mapOutputWriter = shuffleExecutorComponents.createMapOutputWriter(
    dep.shuffleId, mapId, dep.partitioner.numPartitions)
  // Write the externally sorted data through the writer
  sorter.writePartitionedMapOutput(dep.shuffleId, mapId, mapOutputWriter)
  val partitionLengths = mapOutputWriter.commitAllPartitions()
  // Update mapStatus
  mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths, mapId)
}

The insertAll() method:

def insertAll(records: Iterator[Product2[K, V]]): Unit = {
  // TODO: stop combining if we find that the reduction factor isn't high
  val shouldCombine = aggregator.isDefined

  // Whether map-side aggregation is needed
  if (shouldCombine) {
    // Combine values in-memory first using our AppendOnlyMap
    // Aggregate values in memory using the AppendOnlyMap
    // mergeValue() merges a new value into the current aggregate
    val mergeValue = aggregator.get.mergeValue
    // createCombiner() creates the initial aggregate from the first value
    val createCombiner = aggregator.get.createCombiner
    var kv: Product2[K, V] = null
    // If the key already has an aggregate, merge into it; otherwise create the initial value
    val update = (hadValue: Boolean, oldValue: C) => {
      if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
    }
    // Iterate over the input records
    while (records.hasNext) {
      // Bump the count of records read
      addElementsRead()
      kv = records.next()
      // map is a PartitionedAppendOnlyMap keyed by (partition, key), with the aggregate as the value
      map.changeValue((getPartition(kv._1), kv._1), update)
      // Spill to disk if necessary
      maybeSpillCollection(usingMap = true)
    }
    // If map-side aggregation is not needed
  } else {
    // Stick values into our buffer
    while (records.hasNext) {
      addElementsRead()
      val kv = records.next()
      // buffer is a PartitionedPairBuffer; insert the record keyed by (partition, key)
      buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])
      // Spill to disk if necessary
      maybeSpillCollection(usingMap = false)
    }
  }
}

This method decides, as records are inserted, whether map-side pre-aggregation is needed, and uses one of two data structures to hold the data accordingly.
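
For example (my own sketch, reusing the assumed sc; it also assumes both shuffles fall through to SortShuffleWriter, e.g. more than 200 reduce partitions and a serializer without relocation support), a map-side combine steers records into the PartitionedAppendOnlyMap, and its absence into the PartitionedPairBuffer:

val words = sc.parallelize(Seq("a" -> 1, "b" -> 2, "a" -> 3))

// reduceByKey declares mapSideCombine = true -> aggregator defined -> PartitionedAppendOnlyMap
val combined = words.reduceByKey(_ + _, 300)

// groupByKey declares mapSideCombine = false -> no aggregator -> PartitionedPairBuffer
val grouped = words.groupByKey(300)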

maybeSpillCollection() calls maybeSpill() to check whether a spill is needed; if a spill happens, a fresh map or buffer is constructed and buffering starts over:

private def maybeSpillCollection(usingMap: Boolean): Unit = {
  var estimatedSize = 0L
  if (usingMap) {
    estimatedSize = map.estimateSize()
    // Check whether a spill is needed
    if (maybeSpill(map, estimatedSize)) {
      map = new PartitionedAppendOnlyMap[K, C]
    }
  } else {
    estimatedSize = buffer.estimateSize()
    // Check whether a spill is needed
    if (maybeSpill(buffer, estimatedSize)) {
      buffer = new PartitionedPairBuffer[K, C]
    }
  }

  if (estimatedSize > _peakMemoryUsedBytes) {
    _peakMemoryUsedBytes = estimatedSize
  }
}


  protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
    var shouldSpill = false
    // Every 32 records read, check whether the estimated size of the map or buffer exceeds the current threshold (5 MB by default)
    if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
      // Claim up to double our current memory from the shuffle memory pool
      // Try to acquire 2 * currentMemory - myMemoryThreshold more memory
      val amountToRequest = 2 * currentMemory - myMemoryThreshold
      val granted = acquireMemory(amountToRequest)
      // Raise the threshold by whatever was granted
      myMemoryThreshold += granted
      // If we were granted too little memory to grow further (either tryToAcquire returned 0,
      // or we already had more memory than myMemoryThreshold), spill the current collection
      // If that is still not enough, decide to spill
      shouldSpill = currentMemory >= myMemoryThreshold
    }
    // Even if shouldSpill is false, spill when the number of records read exceeds the forced-spill threshold (Integer.MAX_VALUE by default)
    shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold
    // Actually spill
    if (shouldSpill) {
      // Increment the spill count
      _spillCount += 1
      logSpillage(currentMemory)
      // Spill the in-memory collection
      spill(collection)
      _elementsRead = 0
      _memoryBytesSpilled += currentMemory
      // Release the memory
      releaseMemory()
    }
    shouldSpill
  }
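
A worked example of that threshold logic (a standalone sketch with made-up numbers; the 5 MB starting point is the default of spark.shuffle.spill.initialMemoryThreshold):

var myMemoryThreshold = 5L * 1024 * 1024          // initial threshold: 5 MB
val currentMemory     = 12L * 1024 * 1024         // estimated size of the map/buffer

val amountToRequest = 2 * currentMemory - myMemoryThreshold   // ask for 19 MB more
val granted         = 3L * 1024 * 1024                        // suppose only 3 MB is granted
myMemoryThreshold  += granted                                 // threshold is now 8 MB

val shouldSpill = currentMemory >= myMemoryThreshold          // 12 MB >= 8 MB, so spill
println(shouldSpill)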

maybeSpill() calls spill() to perform the spill, as follows:

override protected[this] def spill(collection: WritablePartitionedPairCollection[K, C]): Unit = {
    // Sort with the given comparator and return an iterator over the sorted results
    val inMemoryIterator = collection.destructiveSortedWritablePartitionedIterator(comparator)
    // Spill the iterator's contents to a disk file
    val spillFile = spillMemoryIteratorToDisk(inMemoryIterator)
    // The spills ArrayBuffer tracks every spilled file
    spills += spillFile
  }

The spillMemoryIteratorToDisk() method is as follows:

private[this] def spillMemoryIteratorToDisk(inMemoryIterator: WritablePartitionedIterator)
    : SpilledFile = {
  // Because these files may be read during shuffle, their compression must be controlled by
  // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use
  // createTempShuffleBlock here; see SPARK-3426 for more context.
  // Create a temporary shuffle block
  val (blockId, file) = diskBlockManager.createTempShuffleBlock()

  // These variables are reset after each flush
  var objectsWritten: Long = 0
  val spillMetrics: ShuffleWriteMetrics = new ShuffleWriteMetrics
  // Create the DiskBlockObjectWriter for the spill file
  val writer: DiskBlockObjectWriter =
    blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, spillMetrics)

  // List of batch sizes (bytes) in the order they are written to disk
  // Record the size of each batch written
  val batchSizes = new ArrayBuffer[Long]

  // How many elements we have in each partition
  // Record how many elements each partition has
  val elementsPerPartition = new Array[Long](numPartitions)

  // Flush the disk writer's contents to disk, and update relevant variables.
  // The writer is committed at the end of this process.
  // Flush the in-memory data to disk batch by batch
  def flush(): Unit = {
    val segment = writer.commitAndGet()
    batchSizes += segment.length
    _diskBytesSpilled += segment.length
    objectsWritten = 0
  }

  var success = false
  try {
    // Iterate over the records from the map or buffer
    while (inMemoryIterator.hasNext) {
      val partitionId = inMemoryIterator.nextPartition()
      require(partitionId >= 0 && partitionId < numPartitions,
        s"partition Id: ${partitionId} should be in the range [0, ${numPartitions})")
      // Write the record and update the counters
      inMemoryIterator.writeNext(writer)
      elementsPerPartition(partitionId) += 1
      objectsWritten += 1

      // Once 10000 records (the serializer batch size) have been written, flush the batch to disk
      if (objectsWritten == serializerBatchSize) {
        flush()
      }
    }
    // After the iteration, flush whatever is left
    if (objectsWritten > 0) {
      flush()
    } else {
      writer.revertPartialWritesAndClose()
    }
    success = true
  } finally {
    if (success) {
      writer.close()
    } else {
      // This code path only happens if an exception was thrown above before we set success;
      // close our stuff and let the exception be thrown further
      writer.revertPartialWritesAndClose()
      if (file.exists()) {
        if (!file.delete()) {
          logWarning(s"Error deleting ${file}")
        }
      }
    }
  }

  // Return the spilled file
  SpilledFile(file, blockId, batchSizes.toArray, elementsPerPartition)
}

Next comes the sort-and-merge step, which calls ExternalSorter.writePartitionedMapOutput():

def writePartitionedMapOutput(
    shuffleId: Int,
    mapId: Long,
    mapOutputWriter: ShuffleMapOutputWriter): Unit = {
  var nextPartitionId = 0
  // If nothing was spilled
  if (spills.isEmpty) {
    // Case where we only have in-memory data
    val collection = if (aggregator.isDefined) map else buffer
    // Sort with the given comparator
    val it = collection.destructiveSortedWritablePartitionedIterator(comparator)
    while (it.hasNext()) {
      val partitionId = it.nextPartition()
      var partitionWriter: ShufflePartitionWriter = null
      var partitionPairsWriter: ShufflePartitionPairsWriter = null
      TryUtils.tryWithSafeFinally {
        partitionWriter = mapOutputWriter.getPartitionWriter(partitionId)
        val blockId = ShuffleBlockId(shuffleId, mapId, partitionId)
        partitionPairsWriter = new ShufflePartitionPairsWriter(
          partitionWriter,
          serializerManager,
          serInstance,
          blockId,
          context.taskMetrics().shuffleWriteMetrics)
        // Write out this partition's records one by one
        while (it.hasNext && it.nextPartition() == partitionId) {
          it.writeNext(partitionPairsWriter)
        }
      } {
        if (partitionPairsWriter != null) {
          partitionPairsWriter.close()
        }
      }
      nextPartitionId = partitionId + 1
    }
    // If spills happened, merge-sort the spill files with the in-memory data, then write partition by partition through ShufflePartitionPairsWriter
  } else {
    // We must perform merge-sort; get an iterator by partition and write everything directly.
    // The merge-sort happens inside partitionedIterator
    for ((id, elements) <- this.partitionedIterator) {
      val blockId = ShuffleBlockId(shuffleId, mapId, id)
      var partitionWriter: ShufflePartitionWriter = null
      var partitionPairsWriter: ShufflePartitionPairsWriter = null
      TryUtils.tryWithSafeFinally {
        partitionWriter = mapOutputWriter.getPartitionWriter(id)
        partitionPairsWriter = new ShufflePartitionPairsWriter(
          partitionWriter,
          serializerManager,
          serInstance,
          blockId,
          context.taskMetrics().shuffleWriteMetrics)
        if (elements.hasNext) {
          for (elem <- elements) {
            partitionPairsWriter.write(elem._1, elem._2)
          }
        }
      } {
        if (partitionPairsWriter != null) {
          partitionPairsWriter.close()
        }
      }
      nextPartitionId = id + 1
    }
  }

  context.taskMetrics().incMemoryBytesSpilled(memoryBytesSpilled)
  context.taskMetrics().incDiskBytesSpilled(diskBytesSpilled)
  context.taskMetrics().incPeakExecutionMemory(peakMemoryUsedBytes)
}

The partitionedIterator() method:

def partitionedIterator: Iterator[(Int, Iterator[Product2[K, C]])] = {
  val usingMap = aggregator.isDefined
  val collection: WritablePartitionedPairCollection[K, C] = if (usingMap) map else buffer
  if (spills.isEmpty) {
    // Special case: if we have only in-memory data, we don't need to merge streams, and perhaps
    // we don't even need to sort by anything other than partition ID
    // No spills and no key ordering requested: sort by partition id only
    if (ordering.isEmpty) {
      // The user hasn't requested sorted keys, so only sort by partition ID, not key
      groupByPartition(destructiveIterator(collection.partitionedDestructiveSortedIterator(None)))
      // No spills but a key ordering was requested: sort by partition id first, then by key
    } else {
      // We do need to sort by both partition ID and key
      groupByPartition(destructiveIterator(
        collection.partitionedDestructiveSortedIterator(Some(keyComparator))))
    }
  } else {
    // Merge spilled and in-memory data
    // Spills exist: merge-sort the spilled files with the in-memory data
    merge(spills, destructiveIterator(
      collection.partitionedDestructiveSortedIterator(comparator)))
  }
}

The merge method is as follows:

private def merge(spills: Seq[SpilledFile], inMemory: Iterator[((Int, K), C)])
    : Iterator[(Int, Iterator[Product2[K, C]])] = {
  // Open a reader for each spill file
  val readers = spills.map(new SpillReader(_))
  val inMemBuffered = inMemory.buffered
  // Iterate over the partitions
  (0 until numPartitions).iterator.map { p =>
    val inMemIterator = new IteratorForPartition(p, inMemBuffered)
    // Merge the spill files and the in-memory data for this partition
    val iterators = readers.map(_.readNextPartition()) ++ Seq(inMemIterator)
    // If there is aggregation logic, aggregate within the partition, ordering keys with keyComparator
    if (aggregator.isDefined) {
      // Perform partial aggregation across partitions
      (p, mergeWithAggregation(
        iterators, aggregator.get.mergeCombiners, keyComparator, ordering.isDefined))
      // No aggregation but an ordering is defined: merge-sort by that ordering
    } else if (ordering.isDefined) {
      // No aggregator given, but we have an ordering (e.g. used by reduce tasks in sortByKey);
      // sort the elements without trying to merge them
      (p, mergeSort(iterators, ordering.get))
      // Neither aggregation nor ordering: simply concatenate the iterators
    } else {
      (p, iterators.iterator.flatten)
    }
  }
}

In write(), commitAllPartitions() is called to produce the output; it in turn calls writeIndexFileAndCommit() to write the data file and the index file, as follows:

def writeIndexFileAndCommit(
    shuffleId: Int,
    mapId: Long,
    lengths: Array[Long],
    dataTmp: File): Unit = {
  // Create the index file and a temporary index file
  val indexFile = getIndexFile(shuffleId, mapId)
  val indexTmp = Utils.tempFileWith(indexFile)
  try {
    // Get the shuffle data file
    val dataFile = getDataFile(shuffleId, mapId)
    // There is only one IndexShuffleBlockResolver per executor, this synchronization make sure
    // the following check and rename are atomic.
    // There is only one IndexShuffleBlockResolver per executor; synchronize to keep the check and rename atomic
    synchronized {
      // Check whether the index file already matches the data file
      val existingLengths = checkIndexAndDataFile(indexFile, dataFile, lengths.length)
      if (existingLengths != null) {
        // Another attempt for the same task has already written our map outputs successfully,
        // so just use the existing partition lengths and delete our temporary map outputs.
        // If it does, another attempt already finished this shuffle write, so delete the temporary outputs
        System.arraycopy(existingLengths, 0, lengths, 0, lengths.length)
        if (dataTmp != null && dataTmp.exists()) {
          dataTmp.delete()
        }
      } else {
        // Otherwise, write a new index through a BufferedOutputStream
        // This is the first successful attempt in writing the map outputs for this task,
        // so override any existing index and data files with the ones we wrote.
        val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexTmp)))
        Utils.tryWithSafeFinally {
          // We take in lengths of each block, need to convert it to offsets.
          // Take each partition's length, accumulate the offsets, and write them to the temporary index file
          var offset = 0L
          out.writeLong(offset)
          for (length <- lengths) {
            offset += length
            out.writeLong(offset)
          }
        } {
          out.close()
        }

        // Delete any existing index file
        if (indexFile.exists()) {
          indexFile.delete()
        }
        // Delete any existing data file
        if (dataFile.exists()) {
          dataFile.delete()
        }
        // Rename the temporary files to their final names
        if (!indexTmp.renameTo(indexFile)) {
          throw new IOException("fail to rename file " + indexTmp + " to " + indexFile)
        }
        if (dataTmp != null && dataTmp.exists() && !dataTmp.renameTo(dataFile)) {
          throw new IOException("fail to rename file " + dataTmp + " to " + dataFile)
        }
      }
    }
  } finally {
    if (indexTmp.exists() && !indexTmp.delete()) {
      logError(s"Failed to delete temporary index file at ${indexTmp.getAbsolutePath}")
    }
  }
}
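
The resulting index file is just numPartitions + 1 cumulative offsets stored as longs. Below is a minimal standalone sketch (mine, with simplified error handling, not the Spark reader code) of how a reader can turn it into the byte range of one reduce partition inside the data file:

import java.io.{DataInputStream, File, FileInputStream}

def partitionRange(indexFile: File, reduceId: Int): (Long, Long) = {
  val in = new DataInputStream(new FileInputStream(indexFile))
  try {
    in.skipBytes(reduceId * 8)   // each offset is an 8-byte long
    val start = in.readLong()    // offset where this partition begins
    val end = in.readLong()      // offset where the next partition begins
    (start, end)
  } finally {
    in.close()
  }
}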

4. Summary

  • When Spark initializes SparkEnv, the create() method initializes the ShuffleManager; the two short names sort and tungsten-sort are both supported

  • ShuffleManager is a trait whose core methods are registerShuffle(), getReader(), and getWriter()

  • SortShuffleManager is the only implementation of ShuffleManager. registerShuffle() decides which shuffle mechanism to use, getReader() always returns a BlockStoreShuffleReader, and getWriter() chooses one of three writers based on the handle

  • BypassMergeSortShuffleWriter: used when the shuffle dependency has no map-side aggregation and the number of partitions is no greater than spark.shuffle.sort.bypassMergeThreshold (200 by default), i.e. when the bypass mechanism is enabled. Core methods: write() and writePartitionedData() (merges all partition files, using zero-copy by default)

  • UnsafeShuffleWriter: used when all three conditions hold: the serializer supports relocation, there is no map-side aggregation, and the number of partitions is at most 16777215 + 1. Core methods: write(), insertRecordIntoSorter() (inserts records into the external sorter), and closeAndWriteOutput() (merges and writes the output file). Inside insertRecordIntoSorter() the key methods are insertRecord() (inserts the serialized data into the external sorter), growPointerArrayIfNecessary() (grows the pointer array when more space is needed, or spills to disk), spill() (spills to disk), writeSortedFile() (sorts the in-memory data and writes it to a disk file), and encodePageNumberAndOffset() (encodes the record's logical address into a long); inside closeAndWriteOutput() the key method is mergeSpills() (merges the spill files), which can copy via either BIO (streams) or NIO (transferTo)

  • SortShuffleWriter: used when neither of the above applies. It sorts in memory with a PartitionedAppendOnlyMap or a PartitionedPairBuffer and spills to files when the memory limit is exceeded; when producing the globally ordered output file it merge-sorts all previous spill files together with the data still in memory, aggregating elements with the same key using the defined function. Core methods: write(), insertAll() (feeds records into the ExternalSorter), maybeSpillCollection() (checks whether to spill), maybeSpill(), spill(), spillMemoryIteratorToDisk() (spills the in-memory data to disk), writePartitionedMapOutput(), and commitAllPartitions(), which calls writeIndexFileAndCommit() to write the data and index files
