/**
 * Maps short shuffle-manager names to fully-qualified implementation class names.
 * Both "sort" and "tungsten-sort" resolve to SortShuffleManager.
 */
val shortShuffleMgrNames = {
  val sortShuffleManagerClassName =
    classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName
  Map(
    "sort" -> sortShuffleManagerClassName,
    "tungsten-sort" -> sortShuffleManagerClassName)
}
/**
 * Runs this shuffle map task.
 *
 * Steps:
 *  - Deserializes the (RDD, ShuffleDependency) pair from the broadcast task binary.
 *  - Streams this partition's records (`rdd.iterator(partition, context)`) into a writer
 *    obtained from the shuffle manager.
 *  - Returns the MapStatus produced by stopping that writer with success = true.
 *
 * Timing: wall-clock deserialization time goes into `_executorDeserializeTime`. Thread CPU time
 * goes into `_executorDeserializeCpuTime` when the JVM supports it, and is 0 otherwise.
 *
 * Failure handling: if an exception is thrown inside the try block, the writer (if already
 * created) is stopped with success = false. A failure of that stop is only logged at debug
 * level, and the original exception is rethrown.
 */
override def runTask(context: TaskContext): MapStatus = {
  // Deserialize the RDD using the broadcast variable.
  val mxBean = ManagementFactory.getThreadMXBean
  val startMillis = System.currentTimeMillis()
  val startCpuTime =
    if (mxBean.isCurrentThreadCpuTimeSupported) mxBean.getCurrentThreadCpuTime else 0L
  val closureSer = SparkEnv.get.closureSerializer.newInstance()
  // Key step: rebuild the RDD and its shuffle dependency from the serialized task binary.
  val (rdd, dep) = closureSer.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
  _executorDeserializeTime = System.currentTimeMillis() - startMillis
  _executorDeserializeCpuTime =
    if (mxBean.isCurrentThreadCpuTimeSupported) mxBean.getCurrentThreadCpuTime - startCpuTime
    else 0L

  var writer: ShuffleWriter[Any, Any] = null
  try {
    val shuffleManager = SparkEnv.get.shuffleManager
    writer = shuffleManager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
    // rdd.iterator is where the upstream RDD's records for this partition are computed.
    // With SortShuffleWriter, write() buffers them in memory, possibly spilling to disk,
    // then merges everything into one data file plus an index of per-partition lengths.
    val records = rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]]
    writer.write(records)
    // Stopping with success = true yields this task's MapStatus.
    writer.stop(success = true).get
  } catch {
    case e: Exception =>
      try {
        if (writer != null) {
          writer.stop(success = false)
        }
      } catch {
        case stopFailure: Exception =>
          log.debug("Could not stop writer", stopFailure)
      }
      throw e
  }
}
/**
 * Get a writer for a given partition. Called on executors by map tasks.
 *
 * The first time a writer is requested for a shuffle, its map count is recorded in
 * `numMapsForShuffle` (putIfAbsent, so later calls do not overwrite it).
 *
 * The concrete writer depends on the handle type, checked in this order:
 *  - SerializedShuffleHandle      -> UnsafeShuffleWriter
 *  - BypassMergeSortShuffleHandle -> BypassMergeSortShuffleWriter
 *  - any other BaseShuffleHandle  -> SortShuffleWriter
 *
 * @param handle  shuffle handle describing the shuffle
 * @param mapId   map partition index being written
 * @param context task context of the running map task
 * @return a writer matched to the handle type
 */
override def getWriter[K, V](
    handle: ShuffleHandle,
    mapId: Int,
    context: TaskContext): ShuffleWriter[K, V] = {
  val shuffleId = handle.shuffleId
  val mapCount = handle.asInstanceOf[BaseShuffleHandle[_, _, _]].numMaps
  numMapsForShuffle.putIfAbsent(shuffleId, mapCount)

  val sparkEnv = SparkEnv.get
  // Case order matters: the more specific handle types are tested before the general one.
  handle match {
    case serializedHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
      new UnsafeShuffleWriter(
        sparkEnv.blockManager,
        shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
        context.taskMemoryManager(),
        serializedHandle,
        mapId,
        context,
        sparkEnv.conf)
    case bypassHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
      new BypassMergeSortShuffleWriter(
        sparkEnv.blockManager,
        shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
        bypassHandle,
        mapId,
        context,
        sparkEnv.conf)
    case baseHandle: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
      new SortShuffleWriter(shuffleBlockResolver, baseHandle, mapId, context)
  }
}
/**
 * Writes all records of this map task into one partition-sorted data file and its index.
 *
 * Steps:
 *  - Every record goes into an ExternalSorter, which may spill to disk along the way.
 *  - The sorter merges everything into a temporary data file.
 *  - `shuffleBlockResolver.writeIndexFileAndCommit` writes the index and commits the data file.
 *  - The resulting MapStatus (shuffle server id plus per-partition lengths) is stored in
 *    `mapStatus`.
 *
 * The temporary data file is deleted in a finally block if it still exists; a failed delete
 * is logged as an error.
 *
 * @param records this map partition's key/value records
 */
override def write(records: Iterator[Product2[K, V]]): Unit = {
  sorter =
    if (!dep.mapSideCombine) {
      // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
      // care whether the keys get sorted in each partition; that will be done on the reduce side
      // if the operation being run is sortByKey.
      new ExternalSorter[K, V, V](
        context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
    } else {
      // Map-side combine: the dependency must supply an aggregator (and its key ordering).
      require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
      new ExternalSorter[K, V, C](
        context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
    }

  // Feed every record into the sorter; this may create several spill files.
  sorter.insertAll(records)

  // Don't bother including the time to open the merged output file in the shuffle write time,
  // because it just opens a single file, so is typically too fast to measure accurately
  // (see SPARK-3570).
  // mapId is the partition id of the map-side RDD; this is its final data file.
  val dataFile = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
  // Write to a temporary sibling first (the original notes describe a UUID suffix).
  val tmpDataFile = Utils.tempFileWith(dataFile)
  try {
    // Merge spills and in-memory data into the temporary file, one segment per partition.
    val partitionLengths = sorter.writePartitionedFile(
      ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID),
      tmpDataFile)
    // Write the index file and commit the temporary data file under its final name.
    shuffleBlockResolver.writeIndexFileAndCommit(
      dep.shuffleId, mapId, partitionLengths, tmpDataFile)
    mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
  } finally {
    val deleteFailed = tmpDataFile.exists() && !tmpDataFile.delete()
    if (deleteFailed) {
      logError(s"Error while deleting temp file ${tmpDataFile.getAbsolutePath}")
    }
  }
}
/**
 * Location of the shuffle data file for (shuffleId, mapId).
 * It is resolved through the disk block manager using the NOOP reduce id.
 */
def getDataFile(shuffleId: Int, mapId: Int): File =
  blockManager.diskBlockManager.getFile(ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID))

/**
 * Location of the shuffle index file for (shuffleId, mapId).
 * It is resolved through the disk block manager using the NOOP reduce id.
 */
private def getIndexFile(shuffleId: Int, mapId: Int): File = {
  val indexBlockId = ShuffleIndexBlockId(shuffleId, mapId, NOOP_REDUCE_ID)
  blockManager.diskBlockManager.getFile(indexBlockId)
}
/**
 * Inserts every record into this sorter's in-memory collection.
 *
 * Without an aggregator, records are appended as-is to `buffer`. With an aggregator, values are
 * combined per (partition, key) in `map`. After each record, `maybeSpillCollection` is called so
 * the active collection can be spilled to disk when it grows too large.
 *
 * @param records records to insert; consumed fully
 */
def insertAll(records: Iterator[Product2[K, V]]): Unit = {
  // TODO: stop combining if we find that the reduction factor isn't high
  if (aggregator.isEmpty) {
    // Stick values into our buffer
    while (records.hasNext) {
      addElementsRead()
      val record = records.next()
      buffer.insert(getPartition(record._1), record._1, record._2.asInstanceOf[C])
      maybeSpillCollection(usingMap = false)
    }
  } else {
    // Combine values in-memory first using our AppendOnlyMap
    val agg = aggregator.get
    val mergeValue = agg.mergeValue
    val createCombiner = agg.createCombiner
    // `current` is captured by `update`, so a single closure instance serves every record.
    var current: Product2[K, V] = null
    val update = (hadValue: Boolean, oldValue: C) =>
      if (hadValue) mergeValue(oldValue, current._2) else createCombiner(current._2)
    while (records.hasNext) {
      addElementsRead()
      current = records.next()
      // Insert or merge this record's value under its (partition, key).
      map.changeValue((getPartition(current._1), current._1), update)
      // Check memory after every record; the collection may be spilled to disk.
      maybeSpillCollection(usingMap = true)
    }
  }
}
/**
 * Inserts or updates the value stored for `key` and returns the new value.
 *
 * `updateFunc(hadValue, oldValue)` computes the new value. It receives (false, null) when the key
 * is absent, and (true, currentValue) when the key is present.
 *
 * Storage:
 *  - A null key is kept outside the array, in `nullValue` / `haveNullValue`.
 *  - All other keys are stored in `data` as adjacent slots: key at 2*pos, value at 2*pos + 1.
 *
 * Collisions are resolved by open addressing: the probe step starts at 1 and grows by one after
 * each collision (1, 2, 3, ...), so the offset from the initial slot grows quadratically.
 * NOTE(review): `mask` is presumably capacity - 1 with a power-of-two capacity — confirm.
 */
def changeValue(key: K, updateFunc: (Boolean, V) => V): V = {
  assert(!destroyed, destructionMessage)
  val k = key.asInstanceOf[AnyRef]
  // Null key: handled separately from the hash table.
  if (k.eq(null)) {
    // The first time a null key is inserted, the size grows by one.
    if (!haveNullValue) {
      incrementSize()
    }
    nullValue = updateFunc(haveNullValue, nullValue)
    haveNullValue = true
    return nullValue
  }
  var pos = rehash(k.hashCode) & mask
  // Resolve collisions by probing forward: the step is 1 on the first collision,
  // 2 on the second, 3 on the third, and so on.
  var i = 1
  while (true) {
    val curKey = data(2 * pos)
    if (curKey.eq(null)) {// Empty slot: the key is absent, insert it here.
      val newValue = updateFunc(false, null.asInstanceOf[V])
      data(2 * pos) = k
      data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
      incrementSize()
      return newValue
    } else if (k.eq(curKey) || k.equals(curKey)) {// Key present: update its value in place.
      val newValue = updateFunc(true, data(2 * pos + 1).asInstanceOf[V])
      data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
      return newValue
    } else {// Collision with a different key: jump ahead by a growing step.
      val delta = i
      pos = (pos + delta) & mask
      i += 1
    }
  }
  null.asInstanceOf[V] // Never reached but needed to keep compiler happy
}
/**
 * Estimates the size of the active in-memory collection: `map` when `usingMap` is true,
 * otherwise `buffer`.
 *
 * `maybeSpill` then decides whether to spill it. If it was spilled, a fresh empty collection of
 * the same kind replaces it. The estimate also updates `_peakMemoryUsedBytes` when it exceeds
 * the recorded peak.
 *
 * @param usingMap whether the aggregating map (true) or the plain pair buffer (false) is active
 */
private def maybeSpillCollection(usingMap: Boolean): Unit = {
  val estimatedSize =
    if (!usingMap) {
      val bufferSize = buffer.estimateSize()
      if (maybeSpill(buffer, bufferSize)) {
        buffer = new PartitionedPairBuffer[K, C]
      }
      bufferSize
    } else {
      val mapSize = map.estimateSize()
      if (maybeSpill(map, mapSize)) {
        map = new PartitionedAppendOnlyMap[K, C]
      }
      mapSize
    }

  if (estimatedSize > _peakMemoryUsedBytes) {
    _peakMemoryUsedBytes = estimatedSize
  }
}
/**
 * Decides whether `collection` should be spilled to disk, and spills it if so.
 *
 * Memory-based spill:
 *  - Checked only when `elementsRead` is a multiple of 32 and `currentMemory` has reached
 *    `myMemoryThreshold`.
 *  - Before spilling, it asks for more memory so the threshold can grow to about twice
 *    `currentMemory`; it spills only if the threshold is still not above `currentMemory`.
 *
 * Forced spill: happens whenever `_elementsRead` exceeds `numElementsForceSpillThreshold`.
 *
 * On spill, the spill count and spilled-bytes metrics are updated, `_elementsRead` is reset and
 * the held memory is released.
 *
 * @param collection    the in-memory collection to possibly spill
 * @param currentMemory estimated size of `collection` in bytes
 * @return true if the collection was spilled
 */
protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
  val overBudget =
    elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold && {
      // Claim up to double our current memory from the shuffle memory pool
      val granted = acquireMemory(2 * currentMemory - myMemoryThreshold)
      myMemoryThreshold += granted
      // If we were granted too little memory to grow further (either tryToAcquire returned 0,
      // or we already had more memory than myMemoryThreshold), spill the current collection
      currentMemory >= myMemoryThreshold
    }
  val shouldSpill = overBudget || _elementsRead > numElementsForceSpillThreshold

  // Actually spill
  if (shouldSpill) {
    _spillCount += 1
    logSpillage(currentMemory)
    spill(collection)
    _elementsRead = 0
    _memoryBytesSpilled += currentMemory
    releaseMemory()
  }
  shouldSpill
}
/**
 * Spills `collection` to disk.
 *
 * Its contents are drained through a destructive, comparator-sorted partitioned iterator, written
 * with `spillMemoryIteratorToDisk`, and the resulting spill is appended to `spills`.
 */
override protected[this] def spill(collection: WritablePartitionedPairCollection[K, C]): Unit = {
  val spilled = spillMemoryIteratorToDisk(
    collection.destructiveSortedWritablePartitionedIterator(comparator))
  spills += spilled
}
/**
 * Returns an iterator over this map's entries, ordered by `keyComparator`.
 *
 * The sort is destructive: the non-null entries are compacted to the front of `data` and sorted
 * in place there. The map is then marked `destroyed`, and changeValue asserts `!destroyed`, so the
 * map must not be updated afterwards.
 *
 * If a null key is present (stored outside `data` in `nullValue`), its entry is returned first,
 * followed by the sorted non-null entries.
 * NOTE(review): next() does not guard against being called after hasNext is false.
 */
def destructiveSortedIterator(keyComparator: Comparator[K]): Iterator[(K, V)] = {
  destroyed = true
  // Pack KV pairs into the front of the underlying array
  // (move entries out of the sparse hash slots so they occupy pairs 0 until newIndex).
  var keyIndex, newIndex = 0
  while (keyIndex < capacity) {
    if (data(2 * keyIndex) != null) {
      data(2 * newIndex) = data(2 * keyIndex)
      data(2 * newIndex + 1) = data(2 * keyIndex + 1)
      newIndex += 1
    }
    keyIndex += 1
  }
  // Sanity check: every counted entry was found; the null key, if any, is not in `data`.
  assert(curSize == newIndex + (if (haveNullValue) 1 else 0))

  // Sort the compacted key/value pairs in place by key using the given comparator.
  new Sorter(new KVArraySortDataFormat[K, AnyRef]).sort(data, 0, newIndex, keyComparator)

  new Iterator[(K, V)] {
    var i = 0
    var nullValueReady = haveNullValue
    def hasNext: Boolean = (i < newIndex || nullValueReady)
    def next(): (K, V) = {
      // The null-key entry (if present) is emitted before the sorted entries.
      if (nullValueReady) {
        nullValueReady = false
        (null.asInstanceOf[K], nullValue)
      } else {
        val item = (data(2 * i).asInstanceOf[K], data(2 * i + 1).asInstanceOf[V])
        i += 1
        item
      }
    }
  }
}
回到 ExternalSorter.spill 方法中：在獲取了已經排序的迭代器之後，我們就可以將數據溢寫到磁盤上了。
/**
 * Writes all data held by this sorter into `outputFile`, grouped by partition, and returns the
 * byte length of each partition's segment.
 *
 * Data sources:
 *  - If nothing was spilled, the in-memory collection is sorted and written directly.
 *  - Otherwise `partitionedIterator` merges the spill files with the in-memory data.
 *
 * A segment is committed (`writer.commitAndGet()`) after each partition. The returned lengths
 * therefore describe consecutive segments of the file, which the reduce side uses to locate its
 * data.
 *
 * The disk writer is always released. On success it is closed normally. If an exception escapes
 * while writing, `revertPartialWritesAndClose()` is called before the exception propagates.
 *
 * @param blockId    block ID associated with the disk writer
 * @param outputFile file to write the partitioned data to
 * @return array indexed by partition ID with the length in bytes of that partition's data;
 *         partitions with no records keep 0
 */
def writePartitionedFile(
    blockId: BlockId,
    outputFile: File): Array[Long] = {

  // Track location of each range in the output file
  val lengths = new Array[Long](numPartitions)
  val writer = blockManager.getDiskWriter(blockId, outputFile, serInstance, fileBufferSize,
    context.taskMetrics().shuffleWriteMetrics)

  var success = false
  try {
    if (spills.isEmpty) {
      // Case where we only have in-memory data: sort the active collection and stream it out.
      val collection = if (aggregator.isDefined) map else buffer
      val it = collection.destructiveSortedWritablePartitionedIterator(comparator)
      while (it.hasNext) {
        val partitionId = it.nextPartition()
        while (it.hasNext && it.nextPartition() == partitionId) {
          it.writeNext(writer)
        }
        // Commit once per partition and record that partition's segment length.
        val segment = writer.commitAndGet()
        lengths(partitionId) = segment.length
      }
    } else {
      // We must perform merge-sort; get an iterator by partition and write everything directly.
      // partitionedIterator merges the spill files with the remaining in-memory data.
      for ((id, elements) <- this.partitionedIterator) {
        if (elements.hasNext) {
          for (elem <- elements) {
            writer.write(elem._1, elem._2)
          }
          // Commit per partition; the recorded length lets readers locate this partition's data.
          val segment = writer.commitAndGet()
          lengths(id) = segment.length
        }
      }
    }
    success = true
  } finally {
    if (success) {
      writer.close()
    } else {
      // An exception escaped while writing. Previously the writer (and its open file handle)
      // leaked here. Discard uncommitted bytes and close it; the exception keeps propagating.
      writer.revertPartialWritesAndClose()
    }
  }

  // Update task-level spill and memory metrics once the file is complete.
  context.taskMetrics().incMemoryBytesSpilled(memoryBytesSpilled)
  context.taskMetrics().incDiskBytesSpilled(diskBytesSpilled)
  context.taskMetrics().incPeakExecutionMemory(peakMemoryUsedBytes)

  lengths
}