An Analysis of the Design of BypassMergeSortShuffleWriter, the Spark ShuffleManager In-Memory Buffer - Spark in Commercial Production


1 Starting with ShuffleManager


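  • ShuffleManager currently has only one implementation, SortShuffleManager.
  • SortShuffleManager relies on ShuffleWriter to do its work. Following the contract that ShuffleWriter defines, the intermediate results of a map task are persisted to disk in the agreed format.
  • SortShuffleManager chooses among three ShuffleWriter implementations (they are writers, not subclasses of the manager): UnsafeShuffleWriter, SortShuffleWriter, and BypassMergeSortShuffleWriter.
  • SortShuffleManager also relies on ShuffleHandle, whose main job is to pass shuffle information to tasks. One handle (SerializedShuffleHandle) marks the serialized shuffle path; another (BypassMergeSortShuffleHandle) marks when to take the shuffle path that bypasses merging and sorting.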
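
To make "when to bypass merging and sorting" concrete, here is a minimal sketch of the decision as I understand it from Spark's sort-based shuffle: the bypass path is taken only when there is no map-side combine and the number of reduce partitions does not exceed spark.shuffle.sort.bypassMergeThreshold (default 200). The class name BypassDecision and the plain boolean/int parameters are assumptions made for illustration; the real check works on a ShuffleDependency and a SparkConf.

```java
// Illustrative sketch only: BypassDecision is a hypothetical class, not Spark code.
final class BypassDecision {

    // Returns true when the bypass (hash-style) write path would be chosen.
    static boolean shouldBypassMergeSort(boolean mapSideCombine,
                                         int numPartitions,
                                         int bypassMergeThreshold) {
        if (mapSideCombine) {
            // Map-side aggregation needs the sorting/aggregating writer.
            return false;
        }
        // One open file per reduce partition is only reasonable for few partitions.
        return numPartitions <= bypassMergeThreshold;
    }

    public static void main(String[] args) {
        int threshold = 200; // assumed default of spark.shuffle.sort.bypassMergeThreshold
        System.out.println(shouldBypassMergeSort(false, 100, threshold));
        System.out.println(shouldBypassMergeSort(false, 1000, threshold));
        System.out.println(shouldBypassMergeSort(true, 100, threshold));
    }
}
```

The threshold exists because this path keeps one file and one writer open per reduce partition, which becomes expensive as the partition count grows.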


 * Pluggable interface for shuffle systems. A ShuffleManager is created in SparkEnv on the
 * driver and on each executor, based on the spark.shuffle.manager setting. The driver
 * registers shuffles with it, and executors (or tasks running locally in the driver) can ask
 * to read and write data.
 * NOTE: this will be instantiated by SparkEnv so its constructor can take a SparkConf and
 * boolean isDriver as parameters.

1 Clash of Masters: BypassMergeSortShuffleWriter


* This class implements sort-based shuffle's hash-style shuffle fallback path. This write path
 * writes incoming records to separate files, one file per reduce partition, then concatenates these
 * per-partition files to form a single output file, regions of which are served to reducers.
 * Records are not buffered in memory. It writes output in a format

2 Clash of Masters: The Members


2.1 Members of BypassMergeSortShuffleWriter:

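  • partitionWriters : declared as an array ==> private DiskBlockObjectWriter[] partitionWriters; each DiskBlockObjectWriter handles the data of one partition.

  • private final int fileBufferSize ==> file buffer size, configured through the spark.shuffle.file.buffer property; the default is 32KB.

  • private final boolean transferToEnabled => whether to copy from file stream to file stream using NIO, configured through the spark.file.transferTo property; the default is true.

  • private final int numPartitions => number of partitions

  • private final BlockManager blockManager

  • private final Partitioner partitioner => the partitioner that computes the target partition of each record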

  • private final ShuffleWriteMetrics writeMetrics

  • private final int shuffleId;

  • private final int mapId ==> identifier of the map task.

  • private final Serializer serializer;

  • private final IndexShuffleBlockResolver shuffleBlockResolver

  • private FileSegment[] partitionWriterSegments ==> array of FileSegment; each DiskBlockObjectWriter corresponds to one partition, and therefore to one file segment that it produces.

  • @Nullable private MapStatus mapStatus;

  • private long[] partitionLengths;

2 Core Method of BypassMergeSortShuffleWriter: write


public void write(Iterator<Product2<K, V>> records) throws IOException {
    assert (partitionWriters == null);
    if (!records.hasNext()) {
        // Empty input: commit an all-zero index and return early.
        partitionLengths = new long[numPartitions];
        shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, null);
        mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
        return;
    }
    final SerializerInstance serInstance = serializer.newInstance();
    final long openStartTime = System.nanoTime();
    partitionWriters = new DiskBlockObjectWriter[numPartitions];    // key step
    partitionWriterSegments = new FileSegment[numPartitions];       // key step
    for (int i = 0; i < numPartitions; i++) {                       // key step: one segment per partition
        final Tuple2<TempShuffleBlockId, File> tempShuffleBlockIdPlusFile =
            blockManager.diskBlockManager().createTempShuffleBlock();
        final File file = tempShuffleBlockIdPlusFile._2();
        final BlockId blockId = tempShuffleBlockIdPlusFile._1();
        partitionWriters[i] =                                       // key step: a DiskBlockObjectWriter for each partition
            blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics);
    }
    // Creating the file to write to and creating a disk writer both involve interacting with
    // the disk, and can take a long time in aggregate when we open many files, so should be
    // included in the shuffle write time.
    writeMetrics.incWriteTime(System.nanoTime() - openStartTime);

    while (records.hasNext()) {
        final Product2<K, V> record = records.next();
        final K key = record._1();
        // Route each record to the writer of its target partition.
        partitionWriters[partitioner.getPartition(key)].write(key, record._2());
    }

    for (int i = 0; i < numPartitions; i++) {
        final DiskBlockObjectWriter writer = partitionWriters[i];
        partitionWriterSegments[i] = writer.commitAndGet();  // flush and commit each temporary file to disk
        writer.close();
    }

    File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);  // final data file for this map task
    File tmp = Utils.tempFileWith(output);
    try {
        partitionLengths = writePartitionedFile(tmp);   // merge the per-partition files
        shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
    } finally {
        if (tmp.exists() && !tmp.delete()) {
            logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
        }
    }
    mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
}
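
The routing step of write can be tried outside Spark. The sketch below is an assumption-laden stand-in, not Spark code: PerPartitionWriteSketch, partitionFor, and writePerPartition are hypothetical names, a BufferedWriter replaces DiskBlockObjectWriter, and a non-negative hash modulo replaces the Partitioner. It only shows the idea of opening one temporary file per partition and sending each record to the file of its partition.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Illustrative sketch only: all names here are hypothetical.
final class PerPartitionWriteSketch {

    // Stand-in for Partitioner.getPartition: non-negative hash modulo partition count.
    static int partitionFor(Object key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Opens one temp file per partition and appends each record to its partition's file.
    static Path[] writePerPartition(List<Map.Entry<String, String>> records,
                                    int numPartitions, Path dir) throws IOException {
        Path[] files = new Path[numPartitions];
        BufferedWriter[] writers = new BufferedWriter[numPartitions];
        try {
            for (int i = 0; i < numPartitions; i++) {
                files[i] = Files.createTempFile(dir, "temp_shuffle_", ".part");
                writers[i] = Files.newBufferedWriter(files[i], StandardCharsets.UTF_8);
            }
            for (Map.Entry<String, String> record : records) {
                int p = partitionFor(record.getKey(), numPartitions);
                writers[p].write(record.getKey() + "=" + record.getValue());
                writers[p].newLine();
            }
        } finally {
            for (BufferedWriter w : writers) {
                if (w != null) {
                    w.close(); // flushes buffered data to the temp file
                }
            }
        }
        return files;
    }

    public static void main(String[] args) throws IOException {
        List<Map.Entry<String, String>> records = new ArrayList<>();
        records.add(new SimpleEntry<>("a", "1"));
        records.add(new SimpleEntry<>("b", "2"));
        records.add(new SimpleEntry<>("c", "3"));
        Path dir = Files.createTempDirectory("bypass-sketch");
        for (Path p : writePerPartition(records, 3, dir)) {
            System.out.println(p + " -> " + Files.readAllLines(p));
        }
    }
}
```

Note that no sorting or in-memory aggregation happens anywhere in this path, which matches the "records are not buffered in memory" remark in the class comment.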

2 Core Method of BypassMergeSortShuffleWriter: writePartitionedFile


Concatenate all of the per-partition files into a single combined file.

    private long[] writePartitionedFile(File outputFile) throws IOException {
        // Track location of the partition starts in the output file
        final long[] lengths = new long[numPartitions];
        if (partitionWriters == null) {
            // We were passed an empty iterator
            return lengths;
        }

        final FileOutputStream out = new FileOutputStream(outputFile, true);
        final long writeStartTime = System.nanoTime();
        boolean threwException = true;
        try {
            for (int i = 0; i < numPartitions; i++) {
                final File file = partitionWriterSegments[i].file();
                if (file.exists()) {
                    final FileInputStream in = new FileInputStream(file);
                    boolean copyThrewException = true;
                    try {
                        // Append this partition's file and record how many bytes it contributed.
                        lengths[i] = Utils.copyStream(in, out, false, transferToEnabled);
                        copyThrewException = false;
                    } finally {
                        Closeables.close(in, copyThrewException);
                    }
                    if (!file.delete()) {
                        logger.error("Unable to delete file for partition {}", i);
                    }
                }
            }
            threwException = false;
        } finally {
            Closeables.close(out, threwException);
            writeMetrics.incWriteTime(System.nanoTime() - writeStartTime);
        }
        partitionWriters = null;
        return lengths;
    }
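
As a rough illustration of what the transferToEnabled branch buys, the sketch below concatenates per-partition files with java.nio's FileChannel.transferTo, which lets the operating system move bytes between files without copying them through a user-space buffer. ConcatSketch and concatenate are hypothetical names, and this is a simplified stand-in for the method above rather than Spark's Utils.copyStream.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Illustrative sketch only: ConcatSketch is a hypothetical class, not Spark code.
final class ConcatSketch {

    // Appends every part file to output in order and returns the byte length of each part.
    static long[] concatenate(Path[] parts, Path output) throws IOException {
        long[] lengths = new long[parts.length];
        try (FileChannel out = FileChannel.open(output,
                StandardOpenOption.CREATE,
                StandardOpenOption.WRITE,
                StandardOpenOption.TRUNCATE_EXISTING)) {
            for (int i = 0; i < parts.length; i++) {
                try (FileChannel in = FileChannel.open(parts[i], StandardOpenOption.READ)) {
                    long size = in.size();
                    long copied = 0;
                    // transferTo may move fewer bytes than requested, so loop until done.
                    while (copied < size) {
                        copied += in.transferTo(copied, size - copied, out);
                    }
                    lengths[i] = size;
                }
                // The per-partition file is no longer needed once it has been merged.
                Files.delete(parts[i]);
            }
        }
        return lengths;
    }
}
```

The returned array plays the same role as partitionLengths: it is the only information needed later to locate each partition inside the merged file.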

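3 Core Shuffle Write Flow of BypassMergeSortShuffleWriter

  • Create one DiskBlockObjectWriter per partition, indexed by partition ID, and write each record to the writer of its partition.
  • Append the per-partition files to the final shuffle data file in ascending partition-ID order.
  • Finally, build the index of the map task's output through writeIndexFileAndCommit.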
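
The last step can be illustrated with a small sketch. To my understanding, the index file stores cumulative byte offsets (one more entry than there are partitions, starting at 0), so a reducer can find its region of the data file from two adjacent entries. IndexSketch, offsetsFromLengths, and regionFor are hypothetical names used only for this illustration; the actual on-disk writing is done by IndexShuffleBlockResolver.

```java
import java.util.Arrays;

// Illustrative sketch only: IndexSketch is a hypothetical class, not Spark code.
final class IndexSketch {

    // Turns per-partition lengths into cumulative offsets: offsets[0] = 0,
    // offsets[i + 1] = offsets[i] + partitionLengths[i].
    static long[] offsetsFromLengths(long[] partitionLengths) {
        long[] offsets = new long[partitionLengths.length + 1];
        for (int i = 0; i < partitionLengths.length; i++) {
            offsets[i + 1] = offsets[i] + partitionLengths[i];
        }
        return offsets;
    }

    // Returns {startOffset, length} of the region that belongs to one reduce partition.
    static long[] regionFor(long[] offsets, int reduceId) {
        return new long[] {offsets[reduceId], offsets[reduceId + 1] - offsets[reduceId]};
    }

    public static void main(String[] args) {
        long[] offsets = offsetsFromLengths(new long[] {2L, 0L, 3L});
        System.out.println(Arrays.toString(offsets));               // [0, 2, 2, 5]
        System.out.println(Arrays.toString(regionFor(offsets, 2))); // [2, 3]
    }
}
```

Because the files are appended in ascending partition order, the lengths alone are enough to reconstruct every offset, which is why write only needs to pass partitionLengths along.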


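4 Summary

This section is the result of a great deal of time the author spent on refinement. It uses the plainest language possible to analyze the design of BypassMergeSortShuffleWriter, the unified storage service of ShuffleManager.

Qin Kaixin (秦凱新), in Shenzhen, 0:53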
