A Deep Dive into the Internals of Spark's Shuffle Manager, SortShuffleManager (Spark in Production Series)

This blog series distills cases from real production environments and offers Spark source-code walkthroughs together with production guidance; please keep following the series. Copyright notice: this Spark source-reading and production-practice series belongs to the author (Qin Kaixin). Reproduction is prohibited; you are welcome to learn from it.

1 Starting from ShuffleManager

I have used this diagram several times before, so bear with me; after all, these posts share one theme, shuffle. The English comments in the source are already quite detailed, so here is just a brief overview:

  • There is currently only one implementation: SortShuffleManager.
  • SortShuffleManager relies on ShuffleWriter: through the contract ShuffleWriter defines, the intermediate results of a map task are persisted to disk in the agreed-upon format.
  • ShuffleWriter has three concrete subclasses used for the shuffle write path: UnsafeShuffleWriter, SortShuffleWriter, and BypassMergeSortShuffleWriter.
  • SortShuffleManager uses BlockStoreShuffleReader for the shuffle read path (see the BlockStoreShuffleReader post for details).
  • SortShuffleManager relies on the ShuffleHandle case classes, whose main job is still to pass shuffle metadata to tasks: one marks the serialized shuffle path, another marks when the merge-and-sort shuffle path can be bypassed.

The official English description is as follows; a sketch of the trait itself is shown right after it:

    * Pluggable interface for shuffle systems. A ShuffleManager is created in SparkEnv on the
    * driver and on each executor, based on the spark.shuffle.manager setting. The driver
    * registers shuffles with it, and executors (or tasks running locally in the driver) can ask
    * to read and write data.
    *
    * NOTE: this will be instantiated by SparkEnv so its constructor can take a SparkConf and
    * boolean isDriver as parameters.
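For reference, the trait being described looks like this. The sketch below is abridged from the Spark 2.x source of org.apache.spark.shuffle.ShuffleManager (logging and some comments dropped; signatures shift slightly across Spark versions), and it assumes the Spark types (ShuffleDependency, TaskContext, and so on) are on the classpath:

private[spark] trait ShuffleManager {

  // Called by the driver to register a shuffle and obtain a handle,
  // which is later shipped to tasks inside the ShuffleDependency.
  def registerShuffle[K, V, C](
      shuffleId: Int,
      numMaps: Int,
      dependency: ShuffleDependency[K, V, C]): ShuffleHandle

  // Called on executors by map tasks.
  def getWriter[K, V](
      handle: ShuffleHandle,
      mapId: Int,
      context: TaskContext): ShuffleWriter[K, V]

  // Called on executors by reduce tasks to read partitions
  // [startPartition, endPartition).
  def getReader[K, C](
      handle: ShuffleHandle,
      startPartition: Int,
      endPartition: Int,
      context: TaskContext): ShuffleReader[K, C]

  // Drops a shuffle's metadata; returns true on success.
  def unregisterShuffle(shuffleId: Int): Boolean

  // Resolves logical shuffle blocks to physical file segments.
  def shuffleBlockResolver: ShuffleBlockResolver

  def stop(): Unit
}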

2 A Closer Look at SortShuffleManager

  • Manages sort-based shuffle: incoming records are sorted by their target partition ID and are ultimately written out as a single consolidated file on disk. There are two sides to this, a write path and a read path.

  • Write path: for example, SortShuffleWriter's write() method takes a map task's records through a buffer, aggregates and sorts them, and writes them to disk; intermediate steps such as spilling and merging happen along the way before the final file lands.

  • Read path: for example, BlockStoreShuffleReader's read() method uses a ShuffleBlockFetcherIterator to iterate over the fetched data, buffering, aggregating, and sorting it in memory, spilling part of it to disk when needed.

  • The English comment in the source puts it precisely:

    * In sort-based shuffle, incoming records are sorted according to their target partition ids, then
    * written to a single map output file. Reducers fetch contiguous regions of this file in order to
    * read their portion of the map output. In cases where the map output data is too large to fit in
    * memory, sorted subsets of the output can be spilled to disk and those on-disk files are merged
    * to produce the final output file.
  • numMapsForShuffle: a member field holding the mapping from each shuffle ID to the number of map tasks for that shuffle.

  • shuffleBlockResolver: an IndexShuffleBlockResolver; a small sketch of its data/index file layout follows the quoted comment below.

    * Create and maintain the shuffle blocks' mapping between logic block and physical file location.
    * Data of shuffle blocks from the same map task are stored in a single consolidated data file.
    * The offsets of the data blocks in the data file are stored in a separate index file.
    *
    * We use the name of the shuffle data's shuffleBlockId with reduce ID set to 0 and add ".data"
    * as the filename postfix for data file, and ".index" as the filename postfix for index file.
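To make the data/index layout concrete, here is a minimal, self-contained sketch of how a reducer's byte range would be located. The file naming follows the real ShuffleDataBlockId/ShuffleIndexBlockId convention (shuffle_<shuffleId>_<mapId>_0.data and .index); the segmentFor helper and its names are hypothetical, written for illustration rather than taken from Spark:

import java.io.{DataInputStream, FileInputStream}

object IndexLookupSketch {
  // Spark names the per-map output files with the reduce ID fixed to 0:
  //   shuffle_<shuffleId>_<mapId>_0.data  and  shuffle_<shuffleId>_<mapId>_0.index
  def dataFileName(shuffleId: Int, mapId: Int): String  = s"shuffle_${shuffleId}_${mapId}_0.data"
  def indexFileName(shuffleId: Int, mapId: Int): String = s"shuffle_${shuffleId}_${mapId}_0.index"

  // The index file stores (numPartitions + 1) longs; the i-th long is the byte
  // offset at which reduce partition i begins in the data file.
  // Hypothetical lookup helper:
  def segmentFor(indexFile: String, reduceId: Int): (Long, Long) = {
    val in = new DataInputStream(new FileInputStream(indexFile))
    try {
      in.skipBytes(reduceId * 8)        // jump to this partition's offset entry
      val offset = in.readLong()        // start of the reducer's region
      val nextOffset = in.readLong()    // start of the next region
      (offset, nextOffset - offset)     // (offset, length) within the .data file
    } finally in.close()
  }
}

Because the index file stores numPartitions + 1 cumulative offsets, the length of partition i is simply offset(i + 1) - offset(i), which is why a single consolidated data file per map task suffices.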

3 The Core Methods of SortShuffleManager

3.1 registerShuffle

override def registerShuffle[K, V, C](
      shuffleId: Int,
      numMaps: Int,
      dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
    if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
      // If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't
      // need map-side aggregation, then write numPartitions files directly and just concatenate
      // them at the end. This avoids doing serialization and deserialization twice to merge
      // together the spilled files, which would happen with the normal code path. The downside is
      // having multiple files open at a time and thus more memory allocated to buffers.
      new BypassMergeSortShuffleHandle[K, V](
        shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
    } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
      // Otherwise, try to buffer map outputs in a serialized form, since this is more efficient:
      new SerializedShuffleHandle[K, V](
        shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
    } else {
      // Otherwise, buffer map outputs in a deserialized form:
      new BaseShuffleHandle(shuffleId, numMaps, dependency)
    }
  }
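The two predicates that drive this three-way choice deserve a closer look. Both sketches below are abridged from the Spark 2.x source (logging and require checks dropped), so treat them as illustrative rather than verbatim:

// Abridged from SortShuffleWriter (Spark 2.x): bypass is only possible when
// there is no map-side aggregation and the partition count is small.
def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {
  if (dep.mapSideCombine) {
    false  // map-side combine requires buffering and aggregating records
  } else {
    val bypassMergeThreshold = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
    dep.partitioner.numPartitions <= bypassMergeThreshold
  }
}

// Abridged from SortShuffleManager (Spark 2.x): the serialized (Tungsten) path
// needs a relocatable serializer, no aggregator, and at most 2^24 partitions.
def canUseSerializedShuffle(dependency: ShuffleDependency[_, _, _]): Boolean = {
  val numPartitions = dependency.partitioner.numPartitions
  if (!dependency.serializer.supportsRelocationOfSerializedObjects) false
  else if (dependency.aggregator.isDefined) false
  else if (numPartitions > (1 << 24)) false  // MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE
  else true
}

In short: the bypass path wins when the shuffle is small and needs no map-side combine; the serialized (Tungsten) path wins when records can be sorted in their serialized form; everything else falls back to the deserialized sort path.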

3.2 getReader

Returns a reader (BlockStoreShuffleReader) for the data in partitions startPartition through endPartition - 1 of the map tasks' output files.

override def getReader[K, C](
      handle: ShuffleHandle,
      startPartition: Int,
      endPartition: Int,
      context: TaskContext): ShuffleReader[K, C] = {
    new BlockStoreShuffleReader(
      handle.asInstanceOf[BaseShuffleHandle[K, _, C]], startPartition, endPartition, context)
  }
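On the reduce side the caller is ShuffledRDD.compute: each reduce task asks for a reader covering exactly its own partition. The snippet below is abridged from the Spark 2.x source:

// Abridged from ShuffledRDD.compute (Spark 2.x): a reduce task reads
// exactly one partition, i.e. the range [split.index, split.index + 1).
override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
  val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
  SparkEnv.get.shuffleManager
    .getReader(dep.shuffleHandle, split.index, split.index + 1, context)
    .read()
    .asInstanceOf[Iterator[(K, C)]]
}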

3.3 getWriter

Returns the ShuffleWriter that matches the given ShuffleHandle; note that the handle type chosen back in registerShuffle is what selects the writer here.

override def getWriter[K, V](
    handle: ShuffleHandle,
    mapId: Int,
    context: TaskContext): ShuffleWriter[K, V] = {
  numMapsForShuffle.putIfAbsent(
    handle.shuffleId, handle.asInstanceOf[BaseShuffleHandle[_, _, _]].numMaps)
  val env = SparkEnv.get
  handle match {
    case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
      // <= the key point: a serialized handle selects the Tungsten writer
      new UnsafeShuffleWriter(
        env.blockManager,
        shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
        context.taskMemoryManager(),
        unsafeShuffleHandle,
        mapId,
        context,
        env.conf)
    case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
      // <= the key point: a bypass handle selects the bypass-merge-sort writer
      new BypassMergeSortShuffleWriter(
        env.blockManager,
        shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
        bypassMergeSortHandle,
        mapId,
        context,
        env.conf)
    case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
      // <= the key point: the base handle falls back to SortShuffleWriter
      new SortShuffleWriter(shuffleBlockResolver, other, mapId, context)
  }
}
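On the map side the caller of getWriter is ShuffleMapTask.runTask. The following is abridged from the Spark 2.x source, with error handling omitted:

// Abridged from ShuffleMapTask.runTask (Spark 2.x), error handling omitted.
val manager = SparkEnv.get.shuffleManager
// Pick the writer that matches the handle created at registration time.
val writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
// Push the map task's RDD iterator through the writer; buffering, sorting,
// spilling, and merging all happen inside the writer.
writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
// stop(success = true) finalizes the output files and returns the MapStatus.
writer.stop(success = true).get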

4 SortShuffleManager Summary

  • Registers a shuffle (registerShuffle)
  • Provides the ShuffleWriter for a shuffle (getWriter)
  • Provides the ShuffleReader for a shuffle (getReader)

Qin Kaixin, Shenzhen
