Spark ShuffleManager In-Memory Buffering: SortShuffleWriter Design Analysis - Spark in Production Practice

This series of blog posts distills cases from real production environments and shares them together with Spark source-code walkthroughs and practical guidance. Please keep following the series. Copyright notice: this series of Spark source-code readings and production case studies belongs to the author (Qin Kaixin); reproduction is prohibited, but you are welcome to learn from it.

1 Starting with ShuffleManager

I have used this diagram several times already, so please bear with me; after all, these posts share one theme, shuffle. The English comments in the source are already quite detailed, so here is just a brief summary:

  • There is currently only one implementation: SortShuffleManager.
  • SortShuffleManager relies on ShuffleWriter to do the actual work; the contract defined by ShuffleWriter is what persists a map task's intermediate output to disk in the agreed format.
  • ShuffleWriter has three concrete implementations: UnsafeShuffleWriter, SortShuffleWriter, and BypassMergeSortShuffleWriter.

The official English description reads as follows:

    * Pluggable interface for shuffle systems. A ShuffleManager is created in SparkEnv on the
    * driver and on each executor, based on the spark.shuffle.manager setting. The driver
    * registers shuffles with it, and executors (or tasks running locally in the driver) can ask
    * to read and write data.
    *
    * NOTE: this will be instantiated by SparkEnv so its constructor can take a SparkConf and
    * boolean isDriver as parameters.
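
As the comment notes, the manager is chosen through the spark.shuffle.manager setting. A minimal configuration sketch (as of Spark 2.x, "sort" is the default and maps to SortShuffleManager, so setting it explicitly mostly documents intent or swaps in a custom class name; the application name here is hypothetical):

    import org.apache.spark.SparkConf

    // "shuffle-demo" is just a hypothetical application name.
    val conf = new SparkConf()
      .setAppName("shuffle-demo")
      .set("spark.shuffle.manager", "sort")   // default; a fully qualified class name also works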

A look at the ShuffleManager code: as you can see, it only defines the contract:

    private[spark] trait ShuffleManager {

      /**
       * Register a shuffle with the manager and obtain a handle for it to pass to tasks.
       */
      def registerShuffle[K, V, C](
          shuffleId: Int,
          numMaps: Int,
          dependency: ShuffleDependency[K, V, C]): ShuffleHandle

      /** Get a writer for a given partition. Called on executors by map tasks. */
      def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context: TaskContext): ShuffleWriter[K, V]

      /**
       * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive).
       * Called on executors by reduce tasks.
       */
      def getReader[K, C](
          handle: ShuffleHandle,
          startPartition: Int,
          endPartition: Int,
          context: TaskContext): ShuffleReader[K, C]

      /**
       * Remove a shuffle's metadata from the ShuffleManager.
       * @return true if the metadata removed successfully, otherwise false.
       */
      def unregisterShuffle(shuffleId: Int): Boolean

      /**
       * Return a resolver capable of retrieving shuffle block data based on block coordinates.
       */
      def shuffleBlockResolver: ShuffleBlockResolver

      /** Shut down this ShuffleManager. */
      def stop(): Unit
    }
  • SortShuffleManager relies on the ShuffleHandle case classes to pass shuffle information to tasks: SerializedShuffleHandle marks a shuffle whose records can be handled in serialized form, while BypassMergeSortShuffleHandle marks a shuffle that can skip merging and sorting altogether, as sketched below.
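
For reference, here is a paraphrased sketch of how SortShuffleManager.registerShuffle (Spark 2.x; the exact code varies slightly between versions) decides which handle, and therefore which writer, a shuffle gets:

    override def registerShuffle[K, V, C](
        shuffleId: Int,
        numMaps: Int,
        dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
      if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
        // Few partitions and no map-side combine: write one file per partition and concatenate.
        new BypassMergeSortShuffleHandle[K, V](
          shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
      } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
        // Serializer supports relocation and there is no aggregation: sort serialized records.
        new SerializedShuffleHandle[K, V](
          shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
      } else {
        // Otherwise buffer deserialized records, which is the SortShuffleWriter path.
        new BaseShuffleHandle(shuffleId, numMaps, dependency)
      }
    }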

2 Next, MapStatus

MapStatus is the result that a ShuffleMapTask returns to the scheduler, telling it where the map output lives and how large each reducer's slice is. The source comment reads:

* Result returned by a ShuffleMapTask to a scheduler. Includes the block manager address that the
 * task ran on as well as the sizes of outputs for each reducer, for passing on to the reduce tasks.
  • The MapStatus trait: location is the BlockManagerId where the task ran, and getSizeForBlock is the estimated size of the output block for a given reducer.

    private[spark] sealed trait MapStatus {
      /** Location where this task was run. */
      def location: BlockManagerId

      /**
       * Estimated size for the reduce block, in bytes.
       *
       * If a block is non-empty, then this method MUST return a non-zero size.  This invariant is
       * necessary for correctness, since block fetchers are allowed to skip zero-size blocks.
       */
      def getSizeForBlock(reduceId: Int): Long
    }
  • The companion object picks a compressed representation based on the number of partitions:

    private[spark] object MapStatus {
      def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): MapStatus = {
        if (uncompressedSizes.length > 2000) {
          HighlyCompressedMapStatus(loc, uncompressedSizes)
        } else {
          new CompressedMapStatus(loc, uncompressedSizes)
        }
      }
    }
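
To keep MapStatus compact, CompressedMapStatus stores each block size in a single byte on a log scale. The sketch below captures the idea; the method names follow Spark's MapStatus helpers, but treat the details as approximate and version dependent:

    // Sketch: sizes become ceil(log_1.1(size)), capped at 255, so one byte per reducer;
    // decompressSize recovers only an upper-bound estimate, never the exact size.
    val LOG_BASE = 1.1

    def compressSize(size: Long): Byte = {
      if (size == 0) 0
      else if (size <= 1L) 1
      else math.min(255, math.ceil(math.log(size.toDouble) / math.log(LOG_BASE)).toInt).toByte
    }

    def decompressSize(compressedSize: Byte): Long = {
      if (compressedSize == 0) 0L
      else math.pow(LOG_BASE, compressedSize & 0xFF).toLong
    }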

3 Third, SortShuffleWriter (the main event)

  • It provides sorting, aggregation, and in-memory buffering for the map task's output.
  • Under the hood, SortShuffleWriter (via its ExternalSorter) relies on PartitionedAppendOnlyMap and PartitionedPairBuffer to buffer records in memory and to sort and aggregate them there.

3.1 SortShuffleWriter's core members

  • blockManager: obtained from SparkEnv.get.blockManager; the component that provides the unified, externally exposed data-storage service.

  • sorter: the workhorse member, an ExternalSorter, which implements in-memory buffering, sorting, and aggregation.

  • mapStatus: describes the map output (its location and per-partition sizes) so that reducers can find and fetch it.

  • dep: passed in via handle.dependency; mainly the ShuffleDependency and its related properties.

  • shuffleBlockResolver: the generator of the data and index files (see the reading sketch after the excerpt below):

    * Create and maintain the shuffle blocks' mapping between logic block and physical file location.
    * Data of shuffle blocks from the same map task are stored in a single consolidated data file.
    * The offsets of the data blocks in the data file are stored in a separate index file.
    *
    * We use the name of the shuffle data's shuffleBlockId with reduce ID set to 0 and add ".data"
    * as the filename postfix for data file, and ".index" as the filename postfix for index file.
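
The index file written here has a very simple layout: numPartitions + 1 Long offsets into the .data file, where reducer i's bytes are the range [offsets(i), offsets(i + 1)). A small sketch of reading it back (readShuffleIndex is a hypothetical helper, not a Spark API):

    import java.io.{DataInputStream, FileInputStream}

    // Hypothetical helper: read the numPartitions + 1 offsets stored in a .index file.
    def readShuffleIndex(indexFilePath: String, numPartitions: Int): Array[Long] = {
      val in = new DataInputStream(new FileInputStream(indexFilePath))
      try Array.fill(numPartitions + 1)(in.readLong()) finally in.close()
    }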

3.2 SortShuffleWriter's write method (a stroke of genius)

  • When ShuffleDependency.mapSideCombine is true, the aggregator and keyOrdering properties are passed to the sorter so it can aggregate and sort; otherwise neither is passed. This is also what decides whether the sorter buffers records in a PartitionedAppendOnlyMap or in a PartitionedPairBuffer.
  • insertAll inserts the map task's output records into the in-memory buffer, spilling to disk when necessary (see the sketch after this list).
  • ShuffleBlockId: identifies the shuffle data file, i.e. the handle for this map task's output file.
  • writePartitionedFile: the real workhorse; it iterates over the map-side buffered data, merge-sorts it with any intermediate data already spilled to disk, and streams the result into the final block file.
  • writeIndexFileAndCommit: builds the index for the final block file, recording the offset of each partition id so that reducer tasks can come and fetch their slices.
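
Here is a paraphrased sketch of ExternalSorter.insertAll (Spark 2.x; names and details vary slightly by version), showing exactly where the choice between PartitionedAppendOnlyMap and PartitionedPairBuffer is made:

    // Paraphrased from ExternalSorter.insertAll; not verbatim Spark source.
    def insertAll(records: Iterator[Product2[K, V]]): Unit = {
      val shouldCombine = aggregator.isDefined
      if (shouldCombine) {
        // Map-side combine: merge values into a PartitionedAppendOnlyMap keyed by (partition, key).
        val mergeValue = aggregator.get.mergeValue
        val createCombiner = aggregator.get.createCombiner
        var kv: Product2[K, V] = null
        val update = (hadValue: Boolean, oldValue: C) =>
          if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
        while (records.hasNext) {
          addElementsRead()
          kv = records.next()
          map.changeValue((getPartition(kv._1), kv._1), update)
          maybeSpillCollection(usingMap = true)   // spill the map to disk if it grows too large
        }
      } else {
        // No aggregation: simply append (partition, key, value) into a PartitionedPairBuffer.
        while (records.hasNext) {
          addElementsRead()
          val kv = records.next()
          buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])
          maybeSpillCollection(usingMap = false)  // spill the buffer to disk if it grows too large
        }
      }
    }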

3.3 A close look at SortShuffleWriter's key code

override def write(records: Iterator[Product2[K, V]]): Unit = {
  sorter = if (dep.mapSideCombine) {
    require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
    new ExternalSorter[K, V, C](
      context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
  } else {
    // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
    // care whether the keys get sorted in each partition; that will be done on the reduce side
    // if the operation being run is sortByKey.
    new ExternalSorter[K, V, V](
      context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
  }
  sorter.insertAll(records)                                       // ======> a stroke of genius

  // Don't bother including the time to open the merged output file in the shuffle write time,
  // because it just opens a single file, so is typically too fast to measure accurately
  // (see SPARK-3570).
  val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
  val tmp = Utils.tempFileWith(output)
  try {
    val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)  // ======> a stroke of genius

    val partitionLengths = sorter.writePartitionedFile(blockId, tmp)                              // ======> a stroke of genius

    shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)     // ======> a stroke of genius

    mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
  } finally {
    if (tmp.exists() && !tmp.delete()) {
      logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")
    }
  }
}
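
Two details worth calling out. First, the data is written to a temp file next to the final output, and writeIndexFileAndCommit is what commits both the data and index files, so a retried or speculative attempt of the same map task cannot corrupt already-committed output. Second, writePartitionedFile returns per-partition lengths, which the index file turns into cumulative offsets; a toy sketch of that conversion (not Spark code):

    // Toy sketch: turn per-partition lengths into the offsets stored in the .index file.
    def lengthsToOffsets(partitionLengths: Seq[Long]): Seq[Long] =
      partitionLengths.scanLeft(0L)(_ + _)   // numPartitions + 1 offsets, starting at 0

    // e.g. lengthsToOffsets(Seq(10L, 0L, 5L)) == Seq(0L, 10L, 10L, 15L)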

3.4 A diagram of SortShuffleWriter's write method: one picture says it all

No further commentary needed; this diagram is simply excellent. If its original author sees this, please leave me a message.

3.5 The SortShuffleWriter companion object's shouldBypassMergeSort

Decides whether aggregation and merge-sorting can be bypassed. The default value of spark.shuffle.sort.bypassMergeThreshold is 200.

    private[spark] object SortShuffleWriter {
      def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {
        // We cannot bypass sorting if we need to do map-side aggregation.
        if (dep.mapSideCombine) {
          require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
          false
        } else {
          val bypassMergeThreshold: Int = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
          dep.partitioner.numPartitions <= bypassMergeThreshold
        }
      }
    }
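
To make the condition concrete, a hypothetical example (assuming a SparkContext named sc): groupByKey does no map-side combine, so with the default threshold of 200 a 100-partition shuffle is eligible for the bypass path, whereas reduceByKey defines a map-side aggregator and can never bypass, no matter how few partitions it has.

    val pairs = sc.parallelize(1 to 1000).map(i => (i % 10, i))

    // No map-side combine and 100 <= 200 partitions: shouldBypassMergeSort returns true.
    val grouped = pairs.groupByKey(100)

    // reduceByKey sets mapSideCombine = true, so shouldBypassMergeSort returns false.
    val reduced = pairs.reduceByKey(_ + _, 100)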

4 How SortShuffleManager runs the show

Based on the handle it receives, it picks UnsafeShuffleWriter, BypassMergeSortShuffleWriter, or SortShuffleWriter as appropriate, and the chosen writer then carries out the in-memory buffering and sorting.

SortShuffleManager is therefore the organizer, the manager exposed to the outside world:
/** Get a writer for a given partition. Called on executors by map tasks. */
  override def getWriter[K, V](
      handle: ShuffleHandle,
      mapId: Int,
      context: TaskContext): ShuffleWriter[K, V] = {
    numMapsForShuffle.putIfAbsent(
      handle.shuffleId, handle.asInstanceOf[BaseShuffleHandle[_, _, _]].numMaps)
    val env = SparkEnv.get
    handle match {
      case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
        new UnsafeShuffleWriter(
          env.blockManager,
          shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
          context.taskMemoryManager(),
          unsafeShuffleHandle,
          mapId,
          context,
          env.conf)
      case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
        new BypassMergeSortShuffleWriter(
          env.blockManager,
          shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
          bypassMergeSortHandle,
          mapId,
          context,
          env.conf)
      case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
        new SortShuffleWriter(shuffleBlockResolver, other, mapId, context)
    }
  }
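
In short, the handle created at registration time fully determines the writer:

  • SerializedShuffleHandle → UnsafeShuffleWriter
  • BypassMergeSortShuffleHandle → BypassMergeSortShuffleWriter
  • BaseShuffleHandle → SortShuffleWriter, the focus of this article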

5 Summary

The author has invested a great deal of time polishing this section, using the plainest possible language to analyze the design of SortShuffleWriter, the writer behind ShuffleManager's unified shuffle storage service.

Qin Kaixin, in Shenzhen
