/**
 * Pluggable interface for shuffle systems. A ShuffleManager is created in SparkEnv on the
 * driver and on each executor, based on the spark.shuffle.manager setting. The driver
 * registers shuffles with it, and executors (or tasks running locally in the driver) can ask
 * to read and write data.
 *
 * NOTE: this will be instantiated by SparkEnv so its constructor can take a SparkConf and
 * boolean isDriver as parameters.
 */
/**
 * Register a shuffle with the manager and obtain a handle for it to pass to tasks.
 *
 * @param shuffleId  identifier of the shuffle being registered
 * @param numMaps    presumably the number of map tasks of this shuffle (TODO confirm)
 * @param dependency the [[ShuffleDependency]] describing this shuffle
 * @return a [[ShuffleHandle]] that is passed on to tasks
 */
def registerShuffle[K, V, C](
    shuffleId: Int,
    numMaps: Int,
    dependency: ShuffleDependency[K, V, C]): ShuffleHandle
/**
 * Obtain the writer for a given partition.
 *
 * Called on executors by map tasks.
 */
def getWriter[K, V](
    handle: ShuffleHandle,
    mapId: Int,
    context: TaskContext): ShuffleWriter[K, V]
/**
 * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive).
 * Called on executors by reduce tasks.
 *
 * @param startPartition first reduce partition to read (inclusive)
 * @param endPartition   end of the range; the last partition read is endPartition-1
 */
def getReader[K, C](
    handle: ShuffleHandle,
    startPartition: Int,
    endPartition: Int,
    context: TaskContext): ShuffleReader[K, C]
/**
 * Remove a shuffle's metadata from the ShuffleManager.
 *
 * @param shuffleId identifier of the shuffle whose metadata should be removed
 * @return true if the metadata was removed successfully, otherwise false.
 */
def unregisterShuffle(shuffleId: Int): Boolean
/**
 * Return a resolver capable of retrieving shuffle block data based on block coordinates.
 */
def shuffleBlockResolver: ShuffleBlockResolver
/**
 * Shut down this ShuffleManager.
 *
 * NOTE(review): what is released on shutdown is up to the concrete implementation and is
 * not visible from this declaration.
 */
def stop(): Unit
/**
 * Result returned by a ShuffleMapTask to a scheduler. Includes the block manager address that the
 * task ran on as well as the sizes of outputs for each reducer, for passing on to the reduce tasks.
 */
private[spark] sealed trait MapStatus {
  /** Location where this task was run. */
  def location: BlockManagerId

  /**
   * Estimated size for the reduce block, in bytes.
   *
   * If a block is non-empty, then this method MUST return a non-zero size. This invariant is
   * necessary for correctness, since block fetchers are allowed to skip zero-size blocks.
   *
   * @param reduceId id of the reduce block whose size is requested
   */
  def getSizeForBlock(reduceId: Int): Long
}
private[spark] object MapStatus {

  /**
   * Build a [[MapStatus]] for a map task's output.
   *
   * When more than 2000 block sizes are supplied a HighlyCompressedMapStatus is produced;
   * otherwise a CompressedMapStatus is used.
   *
   * @param loc               block manager the task ran on
   * @param uncompressedSizes one size entry per reduce block
   */
  def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): MapStatus = {
    if (uncompressedSizes.length > 2000) {
      HighlyCompressedMapStatus(loc, uncompressedSizes)
    } else {
      new CompressedMapStatus(loc, uncompressedSizes)
    }
  }
}
blockManager :SparkEnv.get.blockManager 子組件,實現數據存儲服務的統一對外管理器。
sorter :內部成員 ExternalSorter,實現內存中的緩衝、排序、聚合功能。
mapStatus :數據輸出的規範,方便 reducer 查找。
dep :由 handle.dependency 傳入,主要是 ShuffleDependency 相關屬性。
shuffleBlockResolver :索引文件生成器。
/**
 * Create and maintain the shuffle blocks' mapping between logical blocks and physical file
 * locations. Data of shuffle blocks from the same map task are stored in a single consolidated
 * data file. The offsets of the data blocks in the data file are stored in a separate index file.
 *
 * We use the name of the shuffle data's shuffleBlockId with reduce ID set to 0 and add ".data"
 * as the filename postfix for the data file, and ".index" as the filename postfix for the index
 * file.
 */
/**
 * Sort the given records and write them out as this map task's shuffle output.
 *
 * Steps, in order:
 *  1. build an ExternalSorter (with the dependency's aggregator and key ordering only when
 *     map-side combine is requested) and insert every record into it;
 *  2. write the sorter's partitioned output to a temp file derived from the data file;
 *  3. hand the temp file and partition lengths to the block resolver to write the index file
 *     and commit;
 *  4. record the resulting MapStatus.
 *
 * The temp file is deleted in `finally` if it still exists; a failed delete is only logged.
 *
 * @param records key/value pairs produced by the map task
 */
override def write(records: Iterator[Product2[K, V]]): Unit = {
  sorter = if (dep.mapSideCombine) {
    require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
    new ExternalSorter[K, V, C](
      context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
  } else {
    // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
    // care whether the keys get sorted in each partition; that will be done on the reduce side
    // if the operation being run is sortByKey.
    new ExternalSorter[K, V, V](
      context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
  }
  // Key step: feed all records into the sorter.
  sorter.insertAll(records)

  // Don't bother including the time to open the merged output file in the shuffle write time,
  // because it just opens a single file, so is typically too fast to measure accurately
  // (see SPARK-3570).
  val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
  val tmp = Utils.tempFileWith(output)
  try {
    // Key step: one block id per map output, using the no-op reduce id.
    val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
    // Key step: write the partitioned data to the temp file and collect per-partition lengths.
    val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
    // Key step: write the index file from the partition lengths and commit the temp file.
    shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
    mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
  } finally {
    if (tmp.exists() && !tmp.delete()) {
      logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")
    }
  }
}
private[spark] object SortShuffleWriter {

  /**
   * Decide whether the merge-sort path can be bypassed for a shuffle.
   *
   * We cannot bypass sorting if we need to do map-side aggregation. Otherwise bypassing is
   * allowed when the number of partitions does not exceed
   * `spark.shuffle.sort.bypassMergeThreshold` (read with a default of 200).
   *
   * @param conf configuration from which the threshold is read
   * @param dep  the shuffle dependency being written
   * @return true if merge sort may be bypassed, false otherwise
   */
  def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {
    // We cannot bypass sorting if we need to do map-side aggregation.
    if (dep.mapSideCombine) {
      require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
      false
    } else {
      val bypassMergeThreshold: Int = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
      dep.partitioner.numPartitions <= bypassMergeThreshold
    }
  }
}
根據需要選擇 UnsafeShuffleWriter、BypassMergeSortShuffleWriter 或 SortShuffleWriter,然後執行內存緩衝、排序與聚合。
/**
 * Get a writer for a given partition. Called on executors by map tasks.
 *
 * The writer implementation is selected by matching on the runtime type of `handle`.
 */
override def getWriter[K, V](
handle: ShuffleHandle,
mapId: Int,
context: TaskContext): ShuffleWriter[K, V] = {
// NOTE(review): the next line looks like the tail of a call whose opening line is missing
// from this excerpt — confirm against the full source before relying on it.
handle.shuffleId, handle.asInstanceOf[BaseShuffleHandle[_, _, _]].numMaps)
// `env` is not referenced in the visible lines; presumably it is used by the constructor
// arguments that are elided below — TODO confirm.
val env = SparkEnv.get
handle match {
// SerializedShuffleHandle -> UnsafeShuffleWriter (constructor arguments elided in this excerpt).
case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
new UnsafeShuffleWriter(
// BypassMergeSortShuffleHandle -> BypassMergeSortShuffleWriter (constructor arguments elided).
case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
new BypassMergeSortShuffleWriter(
// Any other BaseShuffleHandle -> SortShuffleWriter.
case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
new SortShuffleWriter(shuffleBlockResolver, other, mapId, context)
本節內容是作者投入大量時間優化後的內容,採用最平實的語言來剖析 ShuffleManager 之統一存儲服務 SortShuffleWriter 的設計思路。
