Spark ShuffleManager In-Memory Buffering: A Look at the Design of UnsafeShuffleWriter

This blog series summarizes and shares cases drawn from real production environments, together with Spark source-code analysis and practical guidance; please keep following the series. Copyright notice: this Spark source-code analysis and practice series belongs to the author (Qin Kaixin). Reproduction is prohibited; you are welcome to study it.

1 Starting from ShuffleManager

I have used the same diagram several times before; don't be surprised, since these posts all share one theme, shuffle. The English comments in the source are already very detailed, so here is just a brief overview:

  • Currently there is only one implementation, SortShuffleManager.
  • SortShuffleManager relies on ShuffleWriter for its service: through the contract that ShuffleWriter defines, a map task's intermediate output can be persisted to disk in the agreed layout.
  • SortShuffleManager works with three ShuffleWriter implementations: UnsafeShuffleWriter, SortShuffleWriter, and BypassMergeSortShuffleWriter.
  • SortShuffleManager also relies on the ShuffleHandle case classes, which mainly carry shuffle information to tasks: one handle marks the serialized (Tungsten) shuffle path, another marks when the merge-and-sort path can be bypassed (a minimal selection sketch follows this list).
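
To make the writer/handle relationship concrete, below is a minimal, standalone Java sketch of the selection logic SortShuffleManager applies when a shuffle is registered. The thresholds (a bypass-merge threshold of 200 partitions and a 2^24 partition cap for the serialized path) reflect Spark's documented defaults, but WriterSelectionSketch, selectWriter and its parameters are invented for illustration; this is not Spark's actual code.

// Minimal sketch (not Spark source): how the registration step chooses among
// BypassMergeSortShuffleWriter, UnsafeShuffleWriter and SortShuffleWriter.
public class WriterSelectionSketch {

  enum WriterKind { BYPASS_MERGE_SORT, UNSAFE, SORT }

  static WriterKind selectWriter(boolean mapSideCombine,
                                 boolean hasAggregator,
                                 boolean serializerSupportsRelocation,
                                 int numPartitions,
                                 int bypassMergeThreshold) {
    if (!mapSideCombine && numPartitions <= bypassMergeThreshold) {
      // Few partitions and no map-side combine: write one file per partition
      // and simply concatenate them.
      return WriterKind.BYPASS_MERGE_SORT;
    } else if (serializerSupportsRelocation
        && !hasAggregator
        && numPartitions <= (1 << 24)) {
      // Serialized (Tungsten) shuffle: sort binary records in Tungsten memory.
      return WriterKind.UNSAFE;
    } else {
      // General sort-based path with optional aggregation and ordering.
      return WriterKind.SORT;
    }
  }

  public static void main(String[] args) {
    // 1000 reduce partitions, relocatable serialization (e.g. Kryo), no aggregation.
    System.out.println(selectWriter(false, false, true, 1000, 200)); // prints UNSAFE
  }
}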

The official English description (from the ShuffleManager Javadoc) is as follows:

Pluggable interface for shuffle systems. A ShuffleManager is created in SparkEnv on the
driver and on each executor, based on the spark.shuffle.manager setting. The driver
registers shuffles with it, and executors (or tasks running locally in the driver) can ask
to read and write data.

NOTE: this will be instantiated by SparkEnv so its constructor can take a SparkConf and
boolean isDriver as parameters.
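
The NOTE above describes a simple plugin pattern: the shuffle manager class is named in configuration and constructed reflectively with a (conf, isDriver) constructor. The snippet below is a standalone sketch of that pattern only; it uses a plain Map in place of SparkConf, and com.example.MyShuffleManager is a hypothetical class name, so this is not SparkEnv's actual instantiation code (which also resolves short aliases such as "sort").

import java.lang.reflect.Constructor;
import java.util.Map;

// Illustrative only: instantiate a pluggable component from a config entry using a
// (conf, isDriver) constructor, mirroring the NOTE in the Javadoc above.
public class PluggableInstantiationSketch {

  static Object createShuffleManager(Map<String, String> conf, boolean isDriver) throws Exception {
    // Hypothetical default class name; Spark's real default resolves to SortShuffleManager.
    String className = conf.getOrDefault("spark.shuffle.manager", "com.example.MyShuffleManager");
    Class<?> cls = Class.forName(className);
    // Constructor contract from the NOTE: (conf, isDriver). Here conf is a plain Map.
    Constructor<?> ctor = cls.getConstructor(Map.class, boolean.class);
    return ctor.newInstance(conf, isDriver);
  }
}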

2 The key members of UnsafeShuffleWriter

static final int DEFAULT_INITIAL_SORT_BUFFER_SIZE = 4096;
static final int DEFAULT_INITIAL_SER_BUFFER_SIZE = 1024 * 1024;

  • private final BlockManager blockManager;

  • private final IndexShuffleBlockResolver shuffleBlockResolver;

  • private final TaskMemoryManager memoryManager;

  • private final SerializerInstance serializer;

  • private final Partitioner partitioner;

  • private final ShuffleWriteMetrics writeMetrics;

  • private final int shuffleId;

  • private final int mapId;

  • private final TaskContext taskContext;

  • private final SparkConf sparkConf;

  • private final boolean transferToEnabled => whether to use NIO's transferTo (file-stream-to-file-stream) copy when merging spill files; configured via the spark.file.transferTo property, default true.

  • private final int initialSortBufferSize => the initial size of the sort buffer; configurable via the spark.shuffle.sort.initialBufferSize property, default 4096 (see the config-reading sketch after this list).

  • private final int inputBufferSizeInBytes;

  • private final int outputBufferSizeInBytes;

  • @Nullable private MapStatus mapStatus;

  • @Nullable private ShuffleExternalSorter sorter;

  • private long peakMemoryUsedBytes = 0; => the peak memory used by this writer.
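
As referenced above, here is a minimal sketch of how the two tunable fields could be derived from configuration. A plain Map stands in for SparkConf; the property names and defaults (spark.file.transferTo = true, spark.shuffle.sort.initialBufferSize = 4096) match the descriptions above, but the WriterConfigSketch class itself is illustrative, not Spark code.

import java.util.Map;

// Minimal sketch: derive the writer's tunable fields from configuration with defaults.
public class WriterConfigSketch {
  final boolean transferToEnabled;
  final int initialSortBufferSize;

  WriterConfigSketch(Map<String, String> conf) {
    this.transferToEnabled =
        Boolean.parseBoolean(conf.getOrDefault("spark.file.transferTo", "true"));
    this.initialSortBufferSize =
        Integer.parseInt(conf.getOrDefault("spark.shuffle.sort.initialBufferSize", "4096"));
  }

  public static void main(String[] args) {
    WriterConfigSketch c = new WriterConfigSketch(Map.of()); // empty conf -> defaults
    System.out.println(c.transferToEnabled + " / " + c.initialSortBufferSize);
  }
}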

3 UnsafeShuffleWriter's core method write

Here is the key code:

public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOException {
        // Keep track of success so we know if we encountered an exception
        // We do this rather than a standard try/catch/re-throw to handle
        // generic throwables.
        boolean success = false;
        try {
          while (records.hasNext()) {
          
            insertRecordIntoSorter(records.next());          // key step: feed each map-task record into the sorter
            
          }
          
          
          closeAndWriteOutput();          // key step: persist the sorted map output to disk
          
          
          success = true;
        } finally {
          if (sorter != null) {
            try {
              sorter.cleanupResources();
            } catch (Exception e) {
              // Only throw this error if we won't be masking another
              // error.
              if (success) {
                throw e;
              } else {
                logger.error("In addition to a failure during writing, we failed during " +
                             "cleanup.", e);
              }
            }
          }
        }
      }
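
The comment at the top of write() points at a deliberate pattern: track success with a flag so that an exception thrown during cleanup never masks the original write failure. The standalone sketch below isolates just that pattern; runWithCleanup, doWork and cleanup are made-up names, not Spark APIs.

// Minimal sketch of the success-flag cleanup pattern used by write().
public class CleanupMaskingSketch {
  static void runWithCleanup(Runnable doWork, Runnable cleanup) {
    boolean success = false;
    try {
      doWork.run();
      success = true;
    } finally {
      try {
        cleanup.run();
      } catch (RuntimeException e) {
        if (success) {
          throw e;  // nothing else went wrong, so surface the cleanup error
        } else {
          // the original exception from doWork is already propagating; just log this one
          System.err.println("In addition to a failure during writing, cleanup failed: " + e);
        }
      }
    }
  }

  public static void main(String[] args) {
    runWithCleanup(() -> System.out.println("work done"),
                   () -> System.out.println("resources released"));
  }
}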

4 UnsafeShuffleWriter's core method insertRecordIntoSorter

Writes a map-task record into the sorter; records are sorted in memory, but no aggregation is performed.

void insertRecordIntoSorter(Product2<K, V> record) throws IOException {
    assert(sorter != null);
    final K key = record._1();
    
    final int partitionId = partitioner.getPartition(key);   // key step: compute the target reduce partition from the key
    
    serBuffer.reset();
    serOutputStream.writeKey(key, OBJECT_CLASS_TAG);
    serOutputStream.writeValue(record._2(), OBJECT_CLASS_TAG);
    serOutputStream.flush();

    final int serializedRecordSize = serBuffer.size();
    assert (serializedRecordSize > 0);

    sorter.insertRecord(                              // key step: copy the serialized bytes in serBuffer into Tungsten-managed memory
      serBuffer.getBuf(), Platform.BYTE_ARRAY_OFFSET, serializedRecordSize, partitionId);
  }
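
Per record, insertRecordIntoSorter does three things: compute the target partition from the key, serialize key and value into a reusable in-memory buffer, and hand the raw bytes plus the partition id to the sorter. The standalone sketch below mirrors that flow with JDK classes only; FakeSorter, the hash-mod partitioner and the DataOutputStream serialization are stand-ins for ShuffleExternalSorter, Partitioner and serOutputStream, not Spark code.

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Minimal sketch: partition, serialize into a reusable buffer, hand bytes to a sorter.
public class InsertRecordSketch {
  interface FakeSorter {
    void insertRecord(byte[] buf, int offset, int length, int partitionId);
  }

  static int getPartition(Object key, int numPartitions) {
    // Non-negative hash modulo, as a HashPartitioner-style partitioner would compute.
    int mod = key.hashCode() % numPartitions;
    return mod < 0 ? mod + numPartitions : mod;
  }

  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream serBuffer = new ByteArrayOutputStream(1024 * 1024);
    FakeSorter sorter = (buf, off, len, pid) ->
        System.out.println("record of " + len + " bytes -> partition " + pid);

    String key = "user-42", value = "clicked";
    int partitionId = getPartition(key, 200);

    serBuffer.reset();                          // reuse the same buffer for every record
    DataOutputStream out = new DataOutputStream(serBuffer);
    out.writeUTF(key);                          // stand-in for serOutputStream.writeKey
    out.writeUTF(value);                        // stand-in for serOutputStream.writeValue
    out.flush();

    sorter.insertRecord(serBuffer.toByteArray(), 0, serBuffer.size(), partitionId);
  }
}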

5 UnsafeShuffleWriter's core method closeAndWriteOutput

Persists the map task's output to disk.

void closeAndWriteOutput() throws IOException {
    assert(sorter != null);
    updatePeakMemoryUsed();
    serBuffer = null;
    serOutputStream = null;
    final SpillInfo[] spills = sorter.closeAndGetSpills();
    sorter = null;
    final long[] partitionLengths;
    final File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
    final File tmp = Utils.tempFileWith(output);
    try {
      try {
      
        partitionLengths = mergeSpills(spills, tmp);         // key step: merge all spill files into the final data (block) file
        
      } finally {
        for (SpillInfo spill : spills) {
          if (spill.file.exists() && ! spill.file.delete()) {
            logger.error("Error while deleting spill file {}", spill.file.getPath());
          }
        }
      }
      shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);   // key step: write the index file and commit
      
    } finally {
      if (tmp.exists() && !tmp.delete()) {
        logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
      }
    }
    mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
  }
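
writeIndexFileAndCommit is what lets reducers find their slice of the single data file: for partition lengths [L0, L1, ...] it records the cumulative offsets [0, L0, L0+L1, ...], and partition i is then the byte range [offsets[i], offsets[i+1]). The sketch below shows only that offset arithmetic with made-up names; the real method also writes the index file and commits the temporary data file atomically.

// Minimal sketch of the offset arithmetic behind the shuffle index file.
public class IndexOffsetsSketch {
  static long[] cumulativeOffsets(long[] partitionLengths) {
    long[] offsets = new long[partitionLengths.length + 1];
    for (int i = 0; i < partitionLengths.length; i++) {
      offsets[i + 1] = offsets[i] + partitionLengths[i];
    }
    return offsets;
  }

  public static void main(String[] args) {
    long[] lengths = {10, 0, 25, 7};              // bytes written per reduce partition
    long[] offsets = cumulativeOffsets(lengths);
    // partition 2 occupies bytes [10, 35) of the data file
    System.out.println(offsets[2] + " .. " + offsets[3]);
  }
}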

6 Summary

Internally, UnsafeShuffleWriter relies mainly on Tungsten-managed memory (which may of course also be on-heap JVM memory). This is a clear difference from the ExternalSorter-based SortShuffleWriter.

Qin Kaixin, Shenzhen, 1:19
