spark源碼閱讀--shuffle過程分析

ShuffleManager(一)

本篇,咱們來看一下spark內核中另外一個重要的模塊,Shuffle管理器ShuffleManager。shuffle能夠說是分佈式計算中最重要的一個概念了,數據的join,聚合去重等操做都須要這個步驟。另外一方面,spark之因此比mapReduce的性能高其中一個主要的緣由就是對shuffle過程的優化,一方面spark的shuffle過程更好地利用內存(也就是咱們前面在分析內存管理時所說的執行內存),另外一方面對於shuffle過程當中溢寫的磁盤文件歸併排序和引入索引文件。固然,spark性能高的另外一個主要緣由還有對計算鏈的優化,把多步map類型的計算chain在一塊兒,大大減小中間過程的落盤,這也是spark顯著區別於mr的地方。
spark新版本的Shuffle管理器默認是SortShuffleManager。java

SparkEnv初始化部分的代碼:apache

val shortShuffleMgrNames = Map(
  "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
  "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)

ShuffleMapTask.runTask

看shuffle管理器的源碼,咱們首先應該ShuffleManager的調用時機。想一下shuffle的過程,無非就是兩個步驟,寫和讀。寫是在map階段,將數據按照必定的分區規則歸類到不一樣的分區中,讀是在reduce階段,每一個分區從map階段的輸出中拉取屬於本身的數據,因此咱們分析ShuffleManager源碼基本也能夠沿着這個思路。咱們先來分析寫的過程,由於對於一個完整的shuffle過程,確定是先寫而後纔讀的。
回顧一下以前的對做業運行過程的分析,咱們應該還記得做業被切分紅任務後是在executor端執行的,而Shuffle階段的的stage被切分紅了ShuffleMapTask,shuffle的寫過程正是在這個類中完成的,咱們看一下代碼:api

能夠看到經過ShuffleManager.getWriter獲取了一個shuffle寫入器,從而將rdd的計算數據寫入磁盤。數組

override def runTask(context: TaskContext): MapStatus = {
// Deserialize the RDD using the broadcast variable.
val threadMXBean = ManagementFactory.getThreadMXBean
val deserializeStartTime = System.currentTimeMillis()
val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
  threadMXBean.getCurrentThreadCpuTime
} else 0L
val ser = SparkEnv.get.closureSerializer.newInstance()
// 反序列化RDD和shuffle, 關鍵的步驟
// 這裏思考rdd和shuffle反序列化時,內部的SparkContext對象是怎麼反序列化的
val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
  ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
_executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
_executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
  threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
} else 0L

var writer: ShuffleWriter[Any, Any] = null
try {
  // shuffle管理器
  val manager = SparkEnv.get.shuffleManager
  // 獲取一個shuffle寫入器
  writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
  // 這裏能夠看到rdd計算的核心方法就是iterator方法
  // SortShuffleWriter的write方法能夠分爲幾個步驟:
  // 將上游rdd計算出的數據(經過調用rdd.iterator方法)寫入內存緩衝區,
  // 在寫的過程當中若是超過 內存閾值就會溢寫磁盤文件,可能會寫多個文件
  // 最後將溢寫的文件和內存中剩餘的數據一塊兒進行歸併排序後寫入到磁盤中造成一個大的數據文件
  // 這個排序是先按分區排序,在按key排序
  // 在最後歸併排序後寫的過程當中,沒寫一個分區就會手動刷寫一遍,並記錄下這個分區數據在文件中的位移
  // 因此實際上最後寫完一個task的數據後,磁盤上會有兩個文件:數據文件和記錄每一個reduce端partition數據位移的索引文件
  writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
  // 主要是刪除中間過程的溢寫文件,向內存管理器釋放申請的內存
  writer.stop(success = true).get
} catch {
  case e: Exception =>
    try {
      if (writer != null) {
        writer.stop(success = false)
      }
    } catch {
      case e: Exception =>
        log.debug("Could not stop writer", e)
    }
    throw e
}
}

SortShuffleManager.getWriter

這裏根據shuffle類型獲取不一樣的ShuffleWriter對象,大多數狀況下,都是SortShuffleWriter類型,因此咱們直接看SortShuffleWriter.write方法。數據結構

/** Get a writer for a given partition. Called on executors by map tasks. */
// 獲取一個shuffle存儲器,在executor端被調用,在執行一個map task調用
override def getWriter[K, V](
  handle: ShuffleHandle,
  mapId: Int,
  context: TaskContext): ShuffleWriter[K, V] = {
numMapsForShuffle.putIfAbsent(
  handle.shuffleId, handle.asInstanceOf[BaseShuffleHandle[_, _, _]].numMaps)
val env = SparkEnv.get
handle match {
  case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
    new UnsafeShuffleWriter(
      env.blockManager,
      shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
      context.taskMemoryManager(),
      unsafeShuffleHandle,
      mapId,
      context,
      env.conf)
  case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
    new BypassMergeSortShuffleWriter(
      env.blockManager,
      shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
      bypassMergeSortHandle,
      mapId,
      context,
      env.conf)
  case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
    new SortShuffleWriter(shuffleBlockResolver, other, mapId, context)
}
}

SortShuffleWriter.write

總結一下這個方法的主要邏輯:app

  • 獲取一個排序器,根據是否須要map端聚合傳遞不一樣的參數
  • 將數據插入排序器中,這個過程或溢寫出多個磁盤文件
  • 根據shuffleid和分區id獲取一個磁盤文件名,
  • 將多個溢寫的磁盤文件和內存中的排序數據進行歸併排序,並寫到一個文件中,同時返回每一個reduce端分區的數據在這個文件中的位移
  • 將索引寫入一個索引文件,並將數據文件的文件名由臨時文件名改爲正式的文件名。
  • 最後封裝一個MapStatus對象,用於ShuffleMapTask.runTask的返回值。
  • 在stop方法中還會作一些收尾工做,統計磁盤io耗時,刪除中間溢寫文件分佈式

    override def write(records: Iterator[Product2[K, V]]): Unit = {
      sorter = if (dep.mapSideCombine) {
        // map端進行合併的狀況,此時用戶應該提供聚合器和順序
        require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
        new ExternalSorter[K, V, C](
          context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
      } else {
        // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
        // care whether the keys get sorted in each partition; that will be done on the reduce side
        // if the operation being run is sortByKey.
        new ExternalSorter[K, V, V](
          context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
      }
      // 將map數據所有寫入排序器中,
      // 這個過程當中可能會生成多個溢寫文件
      sorter.insertAll(records)
    
      // Don't bother including the time to open the merged output file in the shuffle write time,
      // because it just opens a single file, so is typically too fast to measure accurately
      // (see SPARK-3570).
      // mapId就是shuffleMap端RDD的partitionId
      // 獲取這個map分區的shuffle輸出文件名
      val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
      // 加一個uuid後綴
      val tmp = Utils.tempFileWith(output)
      try {
        val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
        // 這一步將溢寫到的磁盤的文件和內存中的數據進行歸併排序,
        // 並溢寫到一個文件中,這一步寫的文件是臨時文件名
        val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
        // 這一步主要是寫入索引文件,使用move方法原子第將臨時索引和臨時數據文件重命名爲正常的文件名
        shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
        // 返回一個狀態對象,包含shuffle服務Id和各個分區數據在文件中的位移
        mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
      } finally {
        if (tmp.exists() && !tmp.delete()) {
          logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")
        }
      }
      }

IndexShuffleBlockResolver

咱們首先看一下獲取shuffle輸出文件名,是經過IndexShuffleBlockResolver組件獲取的,而它的內部又是經過BlockManager內部的DiskBlockManager分配文件名的,這個DiskBlockManager我在以前分析塊管理器時提到過,它的做用就是管理文件名的分配,以及spark使用的目錄,子目錄的建立刪除等。咱們看到對於數據文件和索引文件的命名規則是不同的,他們的命名規則分別定義在ShuffleDataBlockId和ShuffleIndexBlockId中。ide

def getDataFile(shuffleId: Int, mapId: Int): File = {
  blockManager.diskBlockManager.getFile(ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID))
}

private def getIndexFile(shuffleId: Int, mapId: Int): File = {
  blockManager.diskBlockManager.getFile(ShuffleIndexBlockId(shuffleId, mapId, NOOP_REDUCE_ID))
}

ExternalSorter.insertAll

咱們根據SortShuffleWriter中的調用順序,首先看一下ExternalSorter.insertAll方法:性能

  • 首選根據是否在愛map端合併分爲兩種狀況,這兩種狀況使用的內存存儲結構也不同,對於在map端合併的狀況使用的是PartitionedAppendOnlyMap結構,不在map合併則使用PartitionedPairBuffer。其中,PartitionedAppendOnlyMap是用數組和線性探測法實現的map結構。
  • 而後將數據一條一條地循環插入內存的存儲結構中,同時考慮到map端合併的狀況優化

    def insertAll(records: Iterator[Product2[K, V]]): Unit = {
      // TODO: stop combining if we find that the reduction factor isn't high
      val shouldCombine = aggregator.isDefined
    
      // 在map端進行合併的狀況
      if (shouldCombine) {
        // Combine values in-memory first using our AppendOnlyMap
        val mergeValue = aggregator.get.mergeValue
        val createCombiner = aggregator.get.createCombiner
        var kv: Product2[K, V] = null
        val update = (hadValue: Boolean, oldValue: C) => {
          if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
        }
        while (records.hasNext) {
          addElementsRead()
          kv = records.next()
          // 向內存緩衝中插入一條數據
          map.changeValue((getPartition(kv._1), kv._1), update)
          // 若是緩衝超過閾值,就會溢寫到磁盤生成一個文件
          // 每寫入一條數據就檢查一遍內存
          maybeSpillCollection(usingMap = true)
        }
      } else {// 再也不map端合併的狀況
        // Stick values into our buffer
        while (records.hasNext) {
          addElementsRead()
          val kv = records.next()
          buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])
          maybeSpillCollection(usingMap = false)
        }
      }
      }

AppendOnlyMap.changeValue

咱們看一個稍微複雜一點的結構,AppendOnlyMap,

  • 首先考慮空值的狀況
  • 計算key的hash,而後對容量取餘。注意,這裏因爲容量是2的整數次冪,因此對容量取餘的操做等同於和容量-1進行位與操做,java HashMap中的操做。
  • 若是,不存在舊值,那麼直接插入,
  • 若是存在舊值,更新舊值
  • 若是發生hash碰撞,那麼須要向後探測,而且是跳躍性的探測,

能夠看出,這個結構設計仍是很精良的,這裏面有個很重的方法,incrementSize方法中會判斷當前數據量的大小,若是超過閾值就會擴容,這個擴容的方法比較複雜,就是一個從新hash再分佈的過程,不過有一點,發不管是在插入新數據仍是從新hash再分佈的過程當中,對於hash碰撞的處理策略必定要相同,不然可能形成不一致。

// 向數組中插入一個kv對,
def changeValue(key: K, updateFunc: (Boolean, V) => V): V = {
assert(!destroyed, destructionMessage)
val k = key.asInstanceOf[AnyRef]
// 處理key爲空的狀況
if (k.eq(null)) {
  // 若是是第一次插入空值,那麼須要將大小增長1
  if (!haveNullValue) {
    incrementSize()
  }
  nullValue = updateFunc(haveNullValue, nullValue)
  haveNullValue = true
  return nullValue
}
var pos = rehash(k.hashCode) & mask
// 線性探測法處理hash碰撞
// 這裏是一個加速的線性探測,即第一次碰撞時走1步,
// 第二次碰撞時走2步,第三次碰撞時走3步
var i = 1
while (true) {
  val curKey = data(2 * pos)
  if (curKey.eq(null)) {// 若是舊值不存在,直接插入
    val newValue = updateFunc(false, null.asInstanceOf[V])
    data(2 * pos) = k
    data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
    incrementSize()
    return newValue
  } else if (k.eq(curKey) || k.equals(curKey)) {// 若是舊值存在,須要更新
    val newValue = updateFunc(true, data(2 * pos + 1).asInstanceOf[V])
    data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
    return newValue
  } else {// 發生hash碰撞,向後探測,跳躍性的探測
    val delta = i
    pos = (pos + delta) & mask
    i += 1
  }
}
null.asInstanceOf[V] // Never reached but needed to keep compiler happy
}

ExternalSorter.maybeSpillCollection

咱們回到ExternalSorter的插入方法中,沒插入一條數據都要檢查內存佔用,判斷是否須要溢寫到磁盤,若是須要就溢寫到磁盤。
這個方法裏調用了map.estimateSize來估算當前插入的數據的內存佔用大小,對於內存佔用的追蹤和估算的功能是在SizeTracker特質中實現的,這個特質我在以前分析MemoryStore時提到過,在將對象類型的數據插入內存中時使用了一箇中間態的數據結構DeserializedValuesHolder,它的內部有一個SizeTrackingVector,這個類就是經過繼承SizeTracker特徵從而實現對象大小的追蹤和估算。

private def maybeSpillCollection(usingMap: Boolean): Unit = {
var estimatedSize = 0L
if (usingMap) {
  estimatedSize = map.estimateSize()
  if (maybeSpill(map, estimatedSize)) {
    map = new PartitionedAppendOnlyMap[K, C]
  }
} else {
  estimatedSize = buffer.estimateSize()
  if (maybeSpill(buffer, estimatedSize)) {
    buffer = new PartitionedPairBuffer[K, C]
  }
}

if (estimatedSize > _peakMemoryUsedBytes) {
  _peakMemoryUsedBytes = estimatedSize
}
}

ExternalSorter.maybeSpill

首先檢查當前內存佔用是否超過閾值,若是超過會申請一次執行內存,若是沒有申請到足夠的執行內存,那麼依然須要溢寫到磁盤

protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
var shouldSpill = false
// 每寫入32條數據檢查一次
if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
  // Claim up to double our current memory from the shuffle memory pool
  val amountToRequest = 2 * currentMemory - myMemoryThreshold
  // 向內存管理器申請執行內存
  val granted = acquireMemory(amountToRequest)
  myMemoryThreshold += granted
  // If we were granted too little memory to grow further (either tryToAcquire returned 0,
  // or we already had more memory than myMemoryThreshold), spill the current collection
  // 若是內存佔用超過了閾值,那麼就須要溢寫
  shouldSpill = currentMemory >= myMemoryThreshold
}
shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold
// Actually spill
if (shouldSpill) {
  _spillCount += 1
  logSpillage(currentMemory)
  // 溢寫到磁盤
  spill(collection)
  _elementsRead = 0
  _memoryBytesSpilled += currentMemory
  // 釋放內存
  releaseMemory()
}
shouldSpill
}

ExternalSorter.spill

接着上面的方法,

override protected[this] def spill(collection: WritablePartitionedPairCollection[K, C]): Unit = {
// 獲取一個排序後的迭代器
val inMemoryIterator = collection.destructiveSortedWritablePartitionedIterator(comparator)
// 將數據寫入磁盤文件中
val spillFile = spillMemoryIteratorToDisk(inMemoryIterator)
spills += spillFile
}

WritablePartitionedPairCollection.destructiveSortedWritablePartitionedIterator

這個方法返回按照分區和key排序過的迭代器,其具體的排序邏輯在AppendOnlyMap.destructiveSortedIterator中

AppendOnlyMap.destructiveSortedIterator

這段代碼分爲兩塊,首先對數組進行壓緊,是的稀疏的數據所有轉移到數組的頭部;
而後對數組按照比較器進行排序,比較器首先是按照分區進行比較,若是分區相同才按照key進行比較;
而後返回一個迭代器,這個迭代器僅僅是對數組的封裝。經過這個方法,咱們大概知道了AppendonlyMap的排序邏輯。

def destructiveSortedIterator(keyComparator: Comparator[K]): Iterator[(K, V)] = {
destroyed = true
// Pack KV pairs into the front of the underlying array
// 這段代碼將稀疏的數據所有轉移到數組頭部,將數據壓緊
var keyIndex, newIndex = 0
while (keyIndex < capacity) {
  if (data(2 * keyIndex) != null) {
    data(2 * newIndex) = data(2 * keyIndex)
    data(2 * newIndex + 1) = data(2 * keyIndex + 1)
    newIndex += 1
  }
  keyIndex += 1
}
assert(curSize == newIndex + (if (haveNullValue) 1 else 0))

// 根據比較器對數據進行排序
new Sorter(new KVArraySortDataFormat[K, AnyRef]).sort(data, 0, newIndex, keyComparator)

new Iterator[(K, V)] {
  var i = 0
  var nullValueReady = haveNullValue
  def hasNext: Boolean = (i < newIndex || nullValueReady)
  def next(): (K, V) = {
    if (nullValueReady) {
      nullValueReady = false
      (null.asInstanceOf[K], nullValue)
    } else {
      val item = (data(2 * i).asInstanceOf[K], data(2 * i + 1).asInstanceOf[V])
      i += 1
      item
    }
  }
}
}

ExternalSorter.spillMemoryIteratorToDisk

回到ExternalSorter.spill方法中,在獲取了通過排序後 的迭代器以後,咱們就能夠將數據溢寫到磁盤上了。
這個方法的代碼我不貼了,總結一下主要步驟:

  • 首先經過DiskBlockManager獲取一個臨時塊的BlockId和臨時文件名
  • 經過blockManager獲取一個磁盤寫入器,即DiskBlockObjectWriter對象,內部封裝了調用java流api寫文件的邏輯
  • 循環將每條數據寫入磁盤,並按期進行刷寫(每隔必定的數據條數將內存中的數據刷寫到磁盤上)
  • 若是發生異常,則會對以前寫入的文件進行回滾

小結

總結一下數據經過ExternalSorter向磁盤溢寫的全過程:

  • 首先,數據會被一條一條地向內部的map結構中插入
  • 每插入一條數據都會檢查內存佔用狀況,若是內存佔用超過閾值,而且申請不到足夠的執行內存,就會將目前內存中的數據溢寫到磁盤
  • 對於溢寫的過程:首先會將數據按照分區和key進行排序,相同分區的數據排在一塊兒,而後根據提供的排序器按照key的順序排;而後經過DiskBlockManager和BlockManager獲取DiskBlockWriter將數據寫入磁盤造成一個文件。,並將溢寫的文件信息
  • 在整個寫入過程當中,會溢寫多個文件

ExternalSorter.writePartitionedFile

總結一下主要的步驟:

  • 仍然是經過blockManager獲取一個磁盤寫入器
  • 將內部溢寫的多個磁盤文件和滯留在內存的數據進行歸併排序,並分裝成一個按照分區歸類的迭代器
  • 循環將數據寫入磁盤,每當一個分區的數據寫完後,進行一次刷寫,將數據從os的文件緩衝區同步到磁盤上,而後獲取此時的文件長度,記錄下每一個分區在文件中的位移

    def writePartitionedFile(
        blockId: BlockId,
        outputFile: File): Array[Long] = {
    
      // Track location of each range in the output file
      val lengths = new Array[Long](numPartitions)
      val writer = blockManager.getDiskWriter(blockId, outputFile, serInstance, fileBufferSize,
        context.taskMetrics().shuffleWriteMetrics)
    
      // 若是前面沒有數據溢寫到磁盤中,
      // 則只須要將內存中的數據溢寫到磁盤
      if (spills.isEmpty) {
        // Case where we only have in-memory data
        val collection = if (aggregator.isDefined) map else buffer
        // 返回排序後的迭代器
        val it = collection.destructiveSortedWritablePartitionedIterator(comparator)
        while (it.hasNext) {
          val partitionId = it.nextPartition()
          while (it.hasNext && it.nextPartition() == partitionId) {
            it.writeNext(writer)
          }
          // 寫完一個分區刷寫一次
          val segment = writer.commitAndGet()
          // 記錄下分區的數據在文件中的位移
          lengths(partitionId) = segment.length
        }
      } else {// 有溢寫到磁盤的文件
        // We must perform merge-sort; get an iterator by partition and write everything directly.
        // 封裝一個用於歸併各個溢寫文件以及內存緩衝區數據的迭代器
        // TODO 這個封裝的迭代器是實現歸併排序的關鍵
        for ((id, elements) <- this.partitionedIterator) {
          if (elements.hasNext) {
            for (elem <- elements) {
              writer.write(elem._1, elem._2)
            }
            // 每寫完一個分區,主動刷寫一次,獲取文件位移,
            // 這個位移就是寫入的分區的位移,
            // reduce端在拉取數據時就會根據這個位移直接找到應該拉取的數據的位置
            val segment = writer.commitAndGet()
            lengths(id) = segment.length
          }
        }
      }
    
      writer.close()
      // 寫完後更新一些統計信息
      context.taskMetrics().incMemoryBytesSpilled(memoryBytesSpilled)
      context.taskMetrics().incDiskBytesSpilled(diskBytesSpilled)
      context.taskMetrics().incPeakExecutionMemory(peakMemoryUsedBytes)
    
      // 返回每一個reduce端分區數據在文件中的位移信息
      lengths
      }

IndexShuffleBlockResolver.writeIndexFileAndCommit

仍然回到SortShuffleWriter.write方法,最後一步調用了IndexShuffleBlockResolver.writeIndexFileAndCommit方法,
這個方法的做用主要是將每一個的分區的位移值寫入到一個索引文件中,並將臨時的索引文件和臨時的數據文件重命名爲正常的文件名(重命名操做是一個原子操做)

總結

我總結shuffle寫數據的過程,能夠分爲兩個主要的步驟:

  • 一是在數據寫入的過程當中會因爲內存不足從而溢寫多個數據文件到磁盤中,而全部的文件都是按照分區和key排序的,這爲第二部歸併排序打下基礎
  • 第二部就是將這些溢寫的小文件和最後內存中剩下的數據進行歸併排序,而後寫入一個大文件中,而且在寫入的過程當中記錄每一個分區數據在文件中的位移,
  • 最後還要寫入一個索引文件,索引文件即記錄了每一個reduce端分區在數據文件中的位移,這樣reduce在拉取數據的時候才能很快定位到本身分區所須要的數據
相關文章
相關標籤/搜索