ExternalSorter 外部排序器在Spark Shuffle過程當中的設計思路剖析-Spark商業環境實戰

本套系列博客從真實商業環境抽取案例進行總結和分享,並給出Spark源碼解讀及商業實戰指導,請持續關注本套博客。版權聲明:本套Spark源碼解讀及商業實戰歸做者(秦凱新)全部,禁止轉載,歡迎學習。算法

Spark商業環境實戰及調優進階系列

1 ShuffleExternalSorter 外部排序器江湖地位

1. Shuffle 前置數據結構基礎

1.1 內存緩衝區的衍生拓撲圖

上面圖展現了Spark基於任務緩存的數據結構,以下簡要介紹不一樣數據結構的用途:數組

  • AppendOnlyMap:封裝了任務Task基於內存進行插入,更新,聚合,排序等基本操做方法。緩存

  • SizeTrackingAppendOnlyMap:以自身的大小進行樣本採集和大小估算。數據結構

  • PartionedAppendOnlyMap:架構

    (1)重載了特質WritablePartitionedPairCollection的partitionedDestructiveSortedIterator方法,在該虛方法中調用了AppendOnlyMap的destructiveSortedIterator對底層數組進行整理和排序後得到迭代器。app

    (2)重載了特質WritablePartitionedPairCollection的insert方法,使其插入時,根據分區(partition, key)做爲key。框架

  • PartitionedPairBuffer:內存緩存結構,主要功能是插入值是有順序的,主要起緩衝做用,只有順序插入,沒有更新和聚合操做。也即沒有changeValue(聚合)和update(更新)操做。ide

2. ShuffleManager 的使命

2.1 ShuffleManager 邏輯架構設計

ShuffleManager是一個特質,其惟一的實現類是 SortShuffleManager,2.0移除了HashShuffleManager。那麼他的主要功能是什麼呢?主要是用來對shuffle進行管理。SortShuffleManager依賴於存儲體系,來完成shuffle過程當中MapTask任務數據到內存(AppendOnlyMap)到Spill到磁盤的過程,該過程會涉及到排序,聚合等複雜操做。函數

3. ExternalSorter 架構精妙設計

3.1 ExternalSorter 功能分析

由上圖2.1 邏輯架構所示, ExternalSorter 是SortShuffleManager的底層組件之一,提供的主要功能以下;oop

  • (1) 將Map任務的輸出按照分區存儲到JVM的堆中,若是指定了聚合函數,則還會對數據進行聚合,
  • (2) 使用分區計算器將key分組到各個分區,而後使用自定義比較器對每個分區的鍵進行可選的排序。
  • (3) 將每一個分區輸出到單個文件中,經過索引文件進行檢索,經過reduce端的shuffle獲取。

3.2 ExternalSorter主要成員

  • aggregator :對map任務的輸出數據進行聚合的聚合器。

  • partioner:對map任務的輸出數據按照key計算分區的分區計算器Partioner。

  • ordering: 對map任務的輸出數據按照key進行排序的實現類

  • map: 發現什麼沒?聚合時使用。 new PartitionedAppendOnlyMap[K, C],當設置了聚合器時,map端將中間結果溢出到磁盤前,先利用此數據結構在內存中對中間結果進行聚合。

    Data structures to store in-memory objects before we spill. Depending on whether we have an
       Aggregator set, we either put objects into an AppendOnlyMap where we combine them, or we
       store them in an array buffer.
    複製代碼
  • buffer : 發現什麼沒?不聚合時使用,new PartitionedPairBuffer[K, C],當沒有設置聚合器時,map端將中間結果溢出到磁盤前,先利用此數據結構將中間結果存儲到內存中。

  • keyComparator : 用於在分區內按照key的hash值進行比較的默認比較器。

  • spills :緩存溢出的文件數組,new ArrayBuffer[SpilledFile],代碼段以下:

    private[this] case class SpilledFile(
     file: File,
     blockId: BlockId,
     serializerBatchSizes: Array[Long],
     elementsPerPartition: Array[Long])
    複製代碼
  • _peakMemoryUsedBytes : 內存中數據結構大小的峯值,用於maybeSpillCollection判斷。

    Peak size of the in-memory data structure observed so far, in bytes
    複製代碼
  • initialMemoryThreshold : 對集合的內存使用進行跟蹤的初始內存閾值,由SparkEnv.get.conf.getLong("spark.shuffle.spill.initialMemoryThreshold", 5 * 1024 * 1024)來決定。

  • myMemoryThreshold :初始時等於initialMemoryThreshold,用於maybe Spill的判斷。

    3.3 ExternalSorter核心設計-內存緩衝

    spark map任務在執行結束後,會將數據寫入磁盤,注意最終都會落盤。但在寫入磁盤以前,spark會對Map任務的輸出在內存中進行排序或者聚合。注意在內存中,不是在磁盤中。看下面源碼吧,清晰的說明了一切。

    • mergeValue 函數做用在於將新的Value值合併到聚合的結果中。

    • createCombiner 函數用於建立聚合的初始值。

    • update 函數做用是當有新的value值時,即(records.hasNext有值時),update函數調用mergeValue將新的value值合併到以前聚合的結果中。不然會調用createCombiner函數以value做爲聚合的初始值。

    • 將分區索引與key做爲調用AppendOnlyMap的changeValue方法的參數能夠。

    • maybeSpillCollection函數進行可能的磁盤溢出。

    • insertAll :可謂氣絕古今,一鼓作氣,若想實現內存緩衝,insertAll就是MapTask數據寫入入口。

      def insertAll(records: Iterator[Product2[K, V]]): Unit = {
           val shouldCombine = aggregator.isDefined
         if (shouldCombine) {
           // Combine values in-memory first using our AppendOnlyMap
           val mergeValue = aggregator.get.mergeValue
           val createCombiner = aggregator.get.createCombiner
           var kv: Product2[K, V] = null
           val update = (hadValue: Boolean, oldValue: C) => {
             if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
           }
           while (records.hasNext) {
             addElementsRead()
             kv = records.next()
      
             map.changeValue((getPartition(kv._1), kv._1), update)    ====>氣絕古今的亮點
             
             maybeSpillCollection(usingMap = true)
           }
         } else {
           // Stick values into our buffer
           while (records.hasNext) {
             addElementsRead()
             val kv = records.next()
             
             buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C]) ====>氣絕古今的亮點
             
             maybeSpillCollection(usingMap = false)
           }
         }
       }
      複製代碼

    3.4 ExternalSorter核心設計-內存溢出判斷

    • maybeSpillCollection 用因而否溢出的內存判斷,雖然ExternalSorter使用了PartitionedAppendOnlyMap和PartitionedPairBuffer,若數據量較少時不會有問題,一旦數據暴增時,將會引發系統OOM.

    • maybeSpillCollection控制了數據寫入磁盤的頻率,若shuffle寫入磁盤頻率太高,容易下降磁盤I/O的效率。

    • 使用了內存樣本採集和大小估算的PartitionedAppendOnlyMap和PartitionedPairBuffer。

      private def maybeSpillCollection(usingMap: Boolean): Unit = {
          var estimatedSize = 0L
          if (usingMap) {
            estimatedSize = map.estimateSize()
            if (maybeSpill(map, estimatedSize)) {
              map = new PartitionedAppendOnlyMap[K, C]
            }
          } else {
            estimatedSize = buffer.estimateSize()
            if (maybeSpill(buffer, estimatedSize)) {
              buffer = new PartitionedPairBuffer[K, C]
            }
          }
      
          if (estimatedSize > _peakMemoryUsedBytes) {
            _peakMemoryUsedBytes = estimatedSize
          }
        }
      複製代碼
  • 估算超過閾值,開始溢出操做,根據myMemoryThreshold進行判斷。

    * Spill some data to disk to release memory, which will be called by TaskMemoryManager
      * when there is not enough memory for the task.
    
       protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
         var shouldSpill = false
         if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
           // Claim up to double our current memory from the shuffle memory pool
           val amountToRequest = 2 * currentMemory - myMemoryThreshold
           val granted = acquireMemory(amountToRequest)
           myMemoryThreshold += granted
           // If we were granted too little memory to grow further (either tryToAcquire returned 0,
           // or we already had more memory than myMemoryThreshold), spill the current collection
           shouldSpill = currentMemory >= myMemoryThreshold
         }
         shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold
         // Actually spill
         if (shouldSpill) {
           _spillCount += 1
           logSpillage(currentMemory)
           
           spill(collection)  ====>氣絕古今的亮點
           
           _elementsRead = 0
           _memoryBytesSpilled += currentMemory
           releaseMemory()
         }
         shouldSpill
       }
    複製代碼

3.4 ExternalSorter核心設計-內存數據溢出落盤

==>接下來的內容會至關精彩,由於我會很是詳細的介紹spark在內存使用上的獨孤九劍,Come on :

3.4.1 PartitionedAppendOnlyMap 第一絕招

由於PartitionedAppendOnlyMap的底層是散列存儲,所以溢出過程當中:

  • (1)首先會調用partitionedDestructiveSortedIterator方法,實現數組中元素向低索引端整理。
  • (2)而後根據指定的比較器對元素進行排序,先按照分區ID進行排序,而後會按照key進行排序。
  • (3)最後返回Iterator數據迭代器,用於內存數據向外迭代。

3.4.2 ExternalSorter 第二絕招

發現了什麼,調用了PartitionedAppendOnlyMap.destructiveSortedWritablePartitionedIterator,用到了獨孤九劍第一式。==>整理==>排序==>返回迭代器

override protected[this] def spill(collection: WritablePartitionedPairCollection[K, C]): Unit = {
        val inMemoryIterator = collection.destructiveSortedWritablePartitionedIterator(comparator)
        val spillFile = spillMemoryIteratorToDisk(inMemoryIterator)
        spills += spillFile
      } 
複製代碼

3.4.3 spillMemoryIteratorToDisk 第三絕招

  • (1)迭代寫入磁盤

  • (2)建立惟一的blockId和文件,調用diskBlockManager開始寫入文件。

  • (3)按照分區順序排序。

    private[this] def spillMemoryIteratorToDisk(inMemoryIterator: WritablePartitionedIterator)
          : SpilledFile = {
        // Because these files may be read during shuffle, their compression must be controlled by
        // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use
        // createTempShuffleBlock here; see SPARK-3426 for more context.
        val (blockId, file) = diskBlockManager.createTempShuffleBlock()
    
        // These variables are reset after each flush
        var objectsWritten: Long = 0
        val spillMetrics: ShuffleWriteMetrics = new ShuffleWriteMetrics
        val writer: DiskBlockObjectWriter =
          blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, spillMetrics)
    
        // List of batch sizes (bytes) in the order they are written to disk
        val batchSizes = new ArrayBuffer[Long]
    
        // How many elements we have in each partition
        val elementsPerPartition = new Array[Long](numPartitions)
    
        // Flush the disk writer's contents to disk, and update relevant variables.
        // The writer is committed at the end of this process.
        def flush(): Unit = {
          val segment = writer.commitAndGet()
          batchSizes += segment.length
          _diskBytesSpilled += segment.length
          objectsWritten = 0
        }
    
        var success = false
        try {
          while (inMemoryIterator.hasNext) {
    
            val partitionId = inMemoryIterator.nextPartition()
            require(partitionId >= 0 && partitionId < numPartitions,
              s"partition Id: ${partitionId} should be in the range [0, ${numPartitions})")
            inMemoryIterator.writeNext(writer)
            elementsPerPartition(partitionId) += 1
            objectsWritten += 1
    
            if (objectsWritten == serializerBatchSize) {
              flush()
            }
          }
          if (objectsWritten > 0) {
            flush()
          } else {
            writer.revertPartialWritesAndClose()
          }
          success = true
        } finally {
          if (success) {
            writer.close()
          } else {
            // This code path only happens if an exception was thrown above before we set success;
            // close our stuff and let the exception be thrown further
            writer.revertPartialWritesAndClose()
            if (file.exists()) {
              if (!file.delete()) {
                logWarning(s"Error deleting ${file}")
              }
            }
          }
    }
    
    SpilledFile(file, blockId, batchSizes.toArray, elementsPerPartition)
    複製代碼

    }

3.5 ExternalSorter核心設計-內存緩衝與溢出磁盤文件最終合併交付

ExternalSorter的writePartitionedFile閃亮登場,經過整合內存和多個溢寫文件,最終每一個MapTask只會生成一份正式的Block文件。

注意此時各個分區的數據按照分區ID和key的順序輸出的最終的正式文件中。也即每個MapTask只會生成一個磁盤文件。在Spark1.6以前的版本,由於HashSorter的存在,一個MapTask會生成N個bucket文件(數量有reduce Task而定)。

3.5.1 咱們接下來看看writePartitionedFile的代碼片斷吧:

  • 注意磁盤中若沒有溢出文件,spills數組爲空,則直接按照分區順序寫入磁盤便可。

  • 注意磁盤中如有溢出文件,spills數組不爲空,則開始讀取磁盤文件到內存,進行聚合排序後統一寫成正式的Block文件。

    * Write all the data added into this ExternalSorter into a file in the disk store. This is
       * called by the SortShuffleWriter.
    
         def writePartitionedFile(
              blockId: BlockId,
              outputFile: File): Array[Long] = {
        
            // Track location of each range in the output file
            val lengths = new Array[Long](numPartitions)
            val writer = blockManager.getDiskWriter(blockId, outputFile, serInstance, fileBufferSize,
              context.taskMetrics().shuffleWriteMetrics)
        
            if (spills.isEmpty) {
              // Case where we only have in-memory data
              val collection = if (aggregator.isDefined) map else buffer
              val it = collection.destructiveSortedWritablePartitionedIterator(comparator)
              while (it.hasNext) {
                val partitionId = it.nextPartition()                 ====>氣絕古今的亮點
                while (it.hasNext && it.nextPartition() == partitionId) {
                  it.writeNext(writer)
                }
                val segment = writer.commitAndGet()
                lengths(partitionId) = segment.length
              }
            } else {
            
              // We must perform merge-sort; get an iterator by partition and write everything directly.
              
              for ((id, elements) <- this.partitionedIterator) {      ====>氣絕古今的亮點
              
                if (elements.hasNext) {
                  for (elem <- elements) {
                    writer.write(elem._1, elem._2)
                  }
                  val segment = writer.commitAndGet()
                  lengths(id) = segment.length              
                }
              }
            }
        
            writer.close()
            context.taskMetrics().incMemoryBytesSpilled(memoryBytesSpilled)
            context.taskMetrics().incDiskBytesSpilled(diskBytesSpilled)
            context.taskMetrics().incPeakExecutionMemory(peakMemoryUsedBytes)
        
            lengths
          }
    複製代碼

3.5.2 spills數組不爲空,讓咱們接下來看看partitionedIterator的代碼片斷吧,氣絕古今的亮點在於封裝了merge方法,直接給出一個迭代器,根據此迭代器,能夠寫正式Block文件:

先看看Spark官方的解釋吧,經過groupByPartition會給每個分區生成一個IteratorForPartition迭代器:

* Return an iterator over all the data written to this object, grouped by partition and
       * aggregated by the requested aggregator. For each partition we then have an iterator over its
       * contents, and these are expected to be accessed in order (you can't "skip ahead" to one
       * partition without reading the previous one). Guaranteed to return a key-value pair for each
       * partition, in order of partition ID.
       *
       * For now, we just merge all the spilled files in once pass, but this can be modified to
       * support hierarchical merging.
       * Exposed for testing.



        def partitionedIterator: Iterator[(Int, Iterator[Product2[K, C]])] = {
        val usingMap = aggregator.isDefined
        val collection: WritablePartitionedPairCollection[K, C] = if (usingMap) map else buffer
        if (spills.isEmpty) {      ====>氣絕古今的亮點,從新判斷一次
        
          * Special case: if we have only in-memory data, we don't need to merge streams, 
          *and perhaps we don't even need to sort by anything other than partition ID
          
          if (!ordering.isDefined) {
            // The user hasn't requested sorted keys, so only sort by partition ID, not key
            groupByPartition(destructiveIterator(collection.partitionedDestructiveSortedIterator(None)))
          } else {
            // We do need to sort by both partition ID and key
            groupByPartition(destructiveIterator(                            ====>氣絕古今的亮點
              collection.partitionedDestructiveSortedIterator(Some(keyComparator))))
          }
        } else {
          // Merge spilled and in-memory data
          merge(spills, destructiveIterator(
            collection.partitionedDestructiveSortedIterator(comparator)))     ====>氣絕古今的亮點
        }
      }
複製代碼

經過groupByPartition會給每個分區生成一個IteratorForPartition迭代器,實際上使用的都是相同的數據源,重點在這裏IteratorForPartition,成精了:

* Given a stream of ((partition, key), combiner) pairs 
       * assumed to be sorted by partition ID*,group together the pairs for each partition 
       *  into a sub-iterator.param data an iterator of elements, assumed to already be sorted
       * by partition ID

          private def groupByPartition(data: Iterator[((Int, K), C)])
              : Iterator[(Int, Iterator[Product2[K, C]])] =
          {
            val buffered = data.buffered
            (0 until numPartitions).iterator.map(p => (p, new IteratorForPartition(p, buffered)))
          }
複製代碼

IteratorForPartition可謂是煞費苦心,直接經過判斷給定的partitionId和數據源中的分區Id是否對應的上,來過濾源數據:

* An iterator that reads only the elements for a given partition ID from an 
       * underlying buffered,stream, assuming this partition is the next one to be read. Used to
       * make it easier to return partitioned iterators from our in-memory collection.

      private[this] class IteratorForPartition(partitionId: Int, data: BufferedIterator[((Int, K), C)])
        extends Iterator[Product2[K, C]]
      {
        override def hasNext: Boolean = data.hasNext && data.head._1._1 == partitionId
    
        override def next(): Product2[K, C] = {
          if (!hasNext) {
            throw new NoSuchElementException
          }
          val elem = data.next()
          (elem._1._2, elem._2)
        }
      }
複製代碼

3.5.2 spills數組不爲空,讓咱們接下來看看merge的代碼片斷吧:

* Merge a sequence of sorted files, giving an iterator over partitions and then over elements
   * inside each partition. This can be used to either write out a new file or return data to
   * the user.
   *
   * Returns an iterator over all the data written to this object, grouped by partition. For each
   * partition we then have an iterator over its contents, and these are expected to be accessed
   * in order (you can't "skip ahead" to one partition without reading the previous one).
   * Guaranteed to return a key-value pair for each partition, in order of partition ID.
複製代碼

桃李不言,下自成蹊。merge最終返回一個迭代器,方便按照分區順序寫正式的文件:

private def merge(spills: Seq[SpilledFile], inMemory: Iterator[((Int, K), C)])
      : Iterator[(Int, Iterator[Product2[K, C]])] = {
    val readers = spills.map(new SpillReader(_))           ====>氣絕古今的亮點,讀磁盤到內存
    val inMemBuffered = inMemory.buffered
    (0 until numPartitions).iterator.map { p =>
      val inMemIterator = new IteratorForPartition(p, inMemBuffered)
      
      val iterators = readers.map(_.readNextPartition()) ++ Seq(inMemIterator) ====>氣絕古今的亮點
      
      if (aggregator.isDefined) {
        // Perform partial aggregation across partitions 
        (p, mergeWithAggregation(
          iterators, aggregator.get.mergeCombiners, keyComparator, ordering.isDefined)) ====>氣絕古今的亮點
      } else if (ordering.isDefined) {
        // No aggregator given, but we have an ordering (e.g. used by reduce tasks in sortByKey);
        // sort the elements without trying to merge them
        (p, mergeSort(iterators, ordering.get))             ====>氣絕古今的亮點
      } else {
        (p, iterators.iterator.flatten)                      ====>氣絕古今的亮點,簡單累加,輸出
      }
    }
  }
複製代碼

3.6 超級總結

spark map任務在執行結束後,須要進行持久化過程,所以會出現兩種狀況:

  • 有溢出文件 將溢出文件與內存中的文件合併後寫入磁盤。
  • 無溢出文件 將內存中的數據進行整理,排序後寫入磁盤。 實際過程以下:

4 最後

幾經易稿,終於成文,需進一步完善。

秦凱新 於深圳

相關文章
相關標籤/搜索