The shuffle process consists of two phases, shuffle write and shuffle read. Since Spark 2.0, hash shuffle has been removed and only sort shuffle remains. The following walks through the code.
When Spark initializes SparkEnv, the ShuffleManager is created inside the create() method:
// Let the user specify short names for shuffle managers
val shortShuffleMgrNames = Map(
  "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
  "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
val shuffleMgrName = conf.get(config.SHUFFLE_MANAGER)
val shuffleMgrClass =
  shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT), shuffleMgrName)
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
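For reference, the implementation is selected through the spark.shuffle.manager configuration key read above. A minimal sketch of setting it on a session (the builder settings here are illustrative, not from the original text):

import org.apache.spark.sql.SparkSession

// Both "sort" and "tungsten-sort" resolve to SortShuffleManager, as the map above shows.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("shuffle-manager-demo")
  .config("spark.shuffle.manager", "sort")
  .getOrCreate()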
Two short names are registered here, sort and tungsten-sort, and the ShuffleManager is instantiated via reflection. ShuffleManager is a trait whose core methods are the following:
private[spark] trait ShuffleManager {

  /**
   * Register a shuffle and return a handle to it.
   */
  def registerShuffle[K, V, C](
      shuffleId: Int,
      dependency: ShuffleDependency[K, V, C]): ShuffleHandle

  /** Get a writer for a given partition; called on executors by map tasks. */
  def getWriter[K, V](
      handle: ShuffleHandle,
      mapId: Long,
      context: TaskContext,
      metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V]

  /**
   * Get a reader for a range of reduce partitions; called on executors by reduce tasks.
   */
  def getReader[K, C](
      handle: ShuffleHandle,
      startPartition: Int,
      endPartition: Int,
      context: TaskContext,
      metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C]
  ...
}
SortShuffleManager is the only implementation of ShuffleManager. Its implementations of the three methods above are as follows:
/**
 * Obtains a [[ShuffleHandle]] to pass to tasks.
 */
override def registerShuffle[K, V, C](
    shuffleId: Int,
    dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
  // 1. First check whether the bypass merge-sort path applies
  if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
    // If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't
    // need map-side aggregation, then write numPartitions files directly and just concatenate
    // them at the end. This avoids doing serialization and deserialization twice to merge
    // together the spilled files, which would happen with the normal code path. The downside is
    // having multiple files open at a time and thus more memory allocated to buffers.
    new BypassMergeSortShuffleHandle[K, V](
      shuffleId, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
  // 2. Otherwise check whether the serialized (tungsten-sort) path can be used
  } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
    // Otherwise, try to buffer map outputs in a serialized form, since this is more efficient:
    new SerializedShuffleHandle[K, V](
      shuffleId, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
  } else {
    // Otherwise, buffer map outputs in a deserialized form:
    new BaseShuffleHandle(shuffleId, dependency)
  }
}
1. First it checks whether the bypass merge-sort path applies. Two conditions must hold: the shuffle dependency has no map-side aggregation, and the number of partitions is no greater than spark.shuffle.sort.bypassMergeThreshold (200 by default). If both are satisfied, a BypassMergeSortShuffleHandle is returned and the bypass merge-sort shuffle mechanism is used (a concrete example follows point 3 below).
def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {
  // We cannot bypass sorting if we need to do map-side aggregation.
  if (dep.mapSideCombine) {
    false
  } else {
    // Default value is 200
    val bypassMergeThreshold: Int = conf.get(config.SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD)
    dep.partitioner.numPartitions <= bypassMergeThreshold
  }
}
2. Otherwise, canUseSerializedShuffle() is checked. If the three conditions inside that method are all met, a SerializedShuffleHandle is returned and the tungsten-sort shuffle mechanism is used.
def canUseSerializedShuffle(dependency: ShuffleDependency[_, _, _]): Boolean = {
  val shufId = dependency.shuffleId
  val numPartitions = dependency.partitioner.numPartitions
  // The serializer must support relocation of serialized objects
  if (!dependency.serializer.supportsRelocationOfSerializedObjects) {
    log.debug(s"Can't use serialized shuffle for shuffle $shufId because the serializer, " +
      s"${dependency.serializer.getClass.getName}, does not support object relocation")
    false
  // There must be no map-side aggregation
  } else if (dependency.mapSideCombine) {
    log.debug(s"Can't use serialized shuffle for shuffle $shufId because we need to do " +
      s"map-side aggregation")
    false
  // The number of partitions cannot exceed 16777215 + 1
  } else if (numPartitions > MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE) {
    log.debug(s"Can't use serialized shuffle for shuffle $shufId because it has more than " +
      s"$MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE partitions")
    false
  } else {
    log.debug(s"Can use serialized shuffle for shuffle $shufId")
    true
  }
}
3. If neither of the above applies, a BaseShuffleHandle is returned and the basic sort shuffle mechanism is used.
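To make the branches concrete, here is a hypothetical example (assuming a local SparkContext) of which handle two common operations would end up with:

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("handle-demo"))
val pairs = sc.parallelize(1 to 1000000).map(i => (i % 100, i))

// groupByKey performs no map-side combine, and 100 partitions <= 200, so this shuffle
// should register a BypassMergeSortShuffleHandle.
val grouped = pairs.groupByKey(100)

// reduceByKey sets mapSideCombine = true, so shouldBypassMergeSort() and
// canUseSerializedShuffle() both return false and a BaseShuffleHandle is used.
val reduced = pairs.reduceByKey(_ + _, 100)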
The getReader() implementation:

/**
 * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive).
 * Called on executors by reduce tasks.
 */
override def getReader[K, C](
    handle: ShuffleHandle,
    startPartition: Int,
    endPartition: Int,
    context: TaskContext,
    metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = {
  val blocksByAddress = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(
    handle.shuffleId, startPartition, endPartition)
  new BlockStoreShuffleReader(
    handle.asInstanceOf[BaseShuffleHandle[K, _, C]], blocksByAddress, context, metrics,
    shouldBatchFetch = canUseBatchFetch(startPartition, endPartition, context))
}
This always returns a BlockStoreShuffleReader.
/** Get a writer for a given partition. Called on executors by map tasks. */
override def getWriter[K, V](
    handle: ShuffleHandle,
    mapId: Long,
    context: TaskContext,
    metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V] = {
  val mapTaskIds = taskIdMapsForShuffle.computeIfAbsent(
    handle.shuffleId, _ => new OpenHashSet[Long](16))
  mapTaskIds.synchronized { mapTaskIds.add(context.taskAttemptId()) }
  val env = SparkEnv.get
  // Choose a different ShuffleWriter depending on the handle
  handle match {
    case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
      new UnsafeShuffleWriter(
        env.blockManager,
        context.taskMemoryManager(),
        unsafeShuffleHandle,
        mapId,
        context,
        env.conf,
        metrics,
        shuffleExecutorComponents)
    case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
      new BypassMergeSortShuffleWriter(
        env.blockManager,
        bypassMergeSortHandle,
        mapId,
        env.conf,
        metrics,
        shuffleExecutorComponents)
    case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
      new SortShuffleWriter(
        shuffleBlockResolver, other, mapId, context, shuffleExecutorComponents)
  }
}
Here a different ShuffleWriter is obtained depending on the handle: a SerializedShuffleHandle gets an UnsafeShuffleWriter, a BypassMergeSortShuffleHandle gets a BypassMergeSortShuffleWriter, and anything else gets a SortShuffleWriter.
As described above: when the bypass mechanism is enabled, BypassMergeSortShuffleWriter is used; when all three conditions hold (the serializer supports relocation, there is no map-side aggregation, and the number of partitions does not exceed 16777215 + 1), UnsafeShuffleWriter is used; otherwise SortShuffleWriter is used.
BypassMergeSortShuffleWriter extends ShuffleWriter and is implemented in Java. It merges the map task's multiple per-partition output files into a single file and produces an index file that records the starting offset of each partition. Its write() method is as follows:
@Override
public void write(Iterator<Product2<K, V>> records) throws IOException {
  assert (partitionWriters == null);
  // Create a new ShuffleMapOutputWriter
  ShuffleMapOutputWriter mapOutputWriter = shuffleExecutorComponents
      .createMapOutputWriter(shuffleId, mapId, numPartitions);
  try {
    // If there is no data
    if (!records.hasNext()) {
      // Return the written length of every partition
      partitionLengths = mapOutputWriter.commitAllPartitions();
      // Update mapStatus
      mapStatus = MapStatus$.MODULE$.apply(
        blockManager.shuffleServerId(), partitionLengths, mapId);
      return;
    }
    final SerializerInstance serInstance = serializer.newInstance();
    final long openStartTime = System.nanoTime();
    // Create as many DiskBlockObjectWriters and FileSegments as there are partitions
    partitionWriters = new DiskBlockObjectWriter[numPartitions];
    partitionWriterSegments = new FileSegment[numPartitions];
    // For every partition
    for (int i = 0; i < numPartitions; i++) {
      // Create a temporary block
      final Tuple2<TempShuffleBlockId, File> tempShuffleBlockIdPlusFile =
        blockManager.diskBlockManager().createTempShuffleBlock();
      // Get the temp block's file and id
      final File file = tempShuffleBlockIdPlusFile._2();
      final BlockId blockId = tempShuffleBlockIdPlusFile._1();
      // Create one DiskBlockObjectWriter per partition
      partitionWriters[i] =
        blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics);
    }
    // Creating the file to write to and creating a disk writer both involve interacting with
    // the disk, and can take a long time in aggregate when we open many files, so should be
    // included in the shuffle write time.
    writeMetrics.incWriteTime(System.nanoTime() - openStartTime);

    // If there is data
    while (records.hasNext()) {
      final Product2<K, V> record = records.next();
      final K key = record._1();
      // Write every record, by key, into the file of the corresponding partition
      partitionWriters[partitioner.getPartition(key)].write(key, record._2());
    }

    for (int i = 0; i < numPartitions; i++) {
      try (DiskBlockObjectWriter writer = partitionWriters[i]) {
        // Commit
        partitionWriterSegments[i] = writer.commitAndGet();
      }
    }

    // Concatenate all per-partition files into a single file
    partitionLengths = writePartitionedData(mapOutputWriter);
    // Update mapStatus
    mapStatus = MapStatus$.MODULE$.apply(
      blockManager.shuffleServerId(), partitionLengths, mapId);
  } catch (Exception e) {
    try {
      mapOutputWriter.abort(e);
    } catch (Exception e2) {
      logger.error("Failed to abort the writer after failing to write map output.", e2);
      e.addSuppressed(e2);
    }
    throw e;
  }
}
The file-merging method writePartitionedData() is shown below; by default it merges files using zero-copy:
private long[] writePartitionedData(ShuffleMapOutputWriter mapOutputWriter) throws IOException {
  // Track location of the partition starts in the output file
  if (partitionWriters != null) {
    // Start time
    final long writeStartTime = System.nanoTime();
    try {
      for (int i = 0; i < numPartitions; i++) {
        // Get each partition's file
        final File file = partitionWriterSegments[i].file();
        ShufflePartitionWriter writer = mapOutputWriter.getPartitionWriter(i);
        if (file.exists()) {
          // Use the zero-copy path
          if (transferToEnabled) {
            // Using WritableByteChannelWrapper to make resource closing consistent between
            // this implementation and UnsafeShuffleWriter.
            Optional<WritableByteChannelWrapper> maybeOutputChannel = writer.openChannelWrapper();
            // This calls Utils.copyFileStreamNIO, which ultimately copies the file via
            // FileChannel.transferTo
            if (maybeOutputChannel.isPresent()) {
              writePartitionedDataWithChannel(file, maybeOutputChannel.get());
            } else {
              writePartitionedDataWithStream(file, writer);
            }
          } else {
            // Otherwise copy via streams
            writePartitionedDataWithStream(file, writer);
          }
          if (!file.delete()) {
            logger.error("Unable to delete file for partition {}", i);
          }
        }
      }
    } finally {
      writeMetrics.incWriteTime(System.nanoTime() - writeStartTime);
    }
    partitionWriters = null;
  }
  return mapOutputWriter.commitAllPartitions();
}
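The zero-copy path ultimately relies on FileChannel.transferTo. As a rough illustration of what that NIO copy looks like (a standalone sketch, not Spark's Utils.copyFileStreamNIO):

import java.io.{File, FileInputStream, FileOutputStream}

def copyFileZeroCopy(src: File, dst: FileOutputStream): Unit = {
  val in = new FileInputStream(src).getChannel
  val out = dst.getChannel
  try {
    var position = 0L
    val size = in.size()
    // transferTo may copy fewer bytes than requested, so loop until the whole file is moved;
    // the copy happens inside the kernel without pulling the bytes into user space.
    while (position < size) {
      position += in.transferTo(position, size - position, out)
    }
  } finally {
    in.close()
  }
}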
UnsafeShuffleWriter also extends ShuffleWriter and is implemented in Java. Its write() method is shown below:
@Override
public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOException {
  // Keep track of success so we know if we encountered an exception
  // We do this rather than a standard try/catch/re-throw to handle
  // generic throwables.
  boolean success = false;
  try {
    while (records.hasNext()) {
      // Insert each record into the ShuffleExternalSorter for external sorting
      insertRecordIntoSorter(records.next());
    }
    // Merge and write the output files
    closeAndWriteOutput();
    success = true;
  } finally {
    if (sorter != null) {
      try {
        sorter.cleanupResources();
      } catch (Exception e) {
        // Only throw this error if we won't be masking another
        // error.
        if (success) {
          throw e;
        } else {
          logger.error("In addition to a failure during writing, we failed during " +
            "cleanup.", e);
        }
      }
    }
  }
}
Two methods do the main work here: insertRecordIntoSorter() and closeAndWriteOutput().
@VisibleForTesting
void insertRecordIntoSorter(Product2<K, V> record) throws IOException {
  assert(sorter != null);
  // Get the key and its partition
  final K key = record._1();
  final int partitionId = partitioner.getPartition(key);
  // Reset the serialization buffer
  serBuffer.reset();
  // Write the key and value into the buffer
  serOutputStream.writeKey(key, OBJECT_CLASS_TAG);
  serOutputStream.writeValue(record._2(), OBJECT_CLASS_TAG);
  serOutputStream.flush();
  // Size of the serialized data
  final int serializedRecordSize = serBuffer.size();
  assert (serializedRecordSize > 0);
  // Hand the serialized data to the ShuffleExternalSorter
  sorter.insertRecord(
    serBuffer.getBuf(), Platform.BYTE_ARRAY_OFFSET, serializedRecordSize, partitionId);
}
This method serializes each record and passes the serialized bytes to the external sorter through insertRecord(), shown below:
public void insertRecord(Object recordBase, long recordOffset, int length, int partitionId)
    throws IOException {

  // for tests
  assert(inMemSorter != null);
  // If the number of records exceeds the spill threshold, spill to disk directly
  if (inMemSorter.numRecords() >= numElementsForSpillThreshold) {
    logger.info("Spilling data because number of spilledRecords crossed the threshold " +
      numElementsForSpillThreshold);
    spill();
  }

  // Checks whether there is enough space to insert an additional record in to the sort pointer
  // array and grows the array if additional space is required. If the required space cannot be
  // obtained, then the in-memory data will be spilled to disk.
  growPointerArrayIfNecessary();
  final int uaoSize = UnsafeAlignedOffset.getUaoSize();
  // Need 4 or 8 extra bytes to store the record length.
  final int required = length + uaoSize;
  // If more memory is needed, request a new page from the TaskMemoryManager
  acquireNewPageIfNecessary(required);

  assert(currentPage != null);
  final Object base = currentPage.getBaseObject();
  // Given a memory page and offset within that page, encode this address into a 64-bit long.
  // This address will remain valid as long as the corresponding page has not been freed.
  final long recordAddress = taskMemoryManager.encodePageNumberAndOffset(currentPage, pageCursor);
  // Write the record length
  UnsafeAlignedOffset.putSize(base, pageCursor, length);
  // Advance the cursor
  pageCursor += uaoSize;
  // Write the data
  Platform.copyMemory(recordBase, recordOffset, base, pageCursor, length);
  // Advance the cursor
  pageCursor += length;
  // Pass the encoded logical address and the partition id to the ShuffleInMemorySorter for sorting
  inMemSorter.insertRecord(recordAddress, partitionId);
}
Note that buffering and spilling here do not rely on higher-level data structures; the writer operates directly on raw memory.
The growPointerArrayIfNecessary() method:
/**
 * Checks whether there is enough space to insert an additional record in to the sort pointer
 * array and grows the array if additional space is required. If the required space cannot be
 * obtained, then the in-memory data will be spilled to disk.
 */
private void growPointerArrayIfNecessary() throws IOException {
  assert(inMemSorter != null);
  // If there is no room for another record
  if (!inMemSorter.hasSpaceForAnotherRecord()) {
    // Current memory usage
    long used = inMemSorter.getMemoryUsage();
    LongArray array;
    try {
      // could trigger spilling
      // Allocate an array with twice the current capacity
      array = allocateArray(used / 8 * 2);
    } catch (TooLargePageException e) {
      // The pointer array is too big to fit in a single page, spill.
      // If the request exceeds one page, spill directly (the spill method is shown below).
      // A page is 128 MB, defined in PackedRecordPointer:
      // static final int MAXIMUM_PAGE_SIZE_BYTES = 1 << 27;  // 128 megabytes
      spill();
      return;
    } catch (SparkOutOfMemoryError e) {
      // should have trigger spilling
      if (!inMemSorter.hasSpaceForAnotherRecord()) {
        logger.error("Unable to grow the pointer array");
        throw e;
      }
      return;
    }
    // check if spilling is triggered or not
    if (inMemSorter.hasSpaceForAnotherRecord()) {
      // If there is free space now, growing is unnecessary; release the newly allocated array
      freeArray(array);
    } else {
      // Otherwise copy the old array into the new one
      inMemSorter.expandPointerArray(array);
    }
  }
}
The spill() method:
@Override
public long spill(long size, MemoryConsumer trigger) throws IOException {
  if (trigger != this || inMemSorter == null || inMemSorter.numRecords() == 0) {
    return 0L;
  }

  logger.info("Thread {} spilling sort data of {} to disk ({} {} so far)",
    Thread.currentThread().getId(),
    Utils.bytesToString(getMemoryUsage()),
    spills.size(),
    spills.size() > 1 ? " times" : " time");

  // Sorts the in-memory records and writes the sorted records to an on-disk file.
  // This method does not free the sort data structures.
  writeSortedFile(false);
  final long spillSize = freeMemory();
  // Reset the ShuffleInMemorySorter
  inMemSorter.reset();
  // Reset the in-memory sorter's pointer array only after freeing up the memory pages holding the
  // records. Otherwise, if the task is over allocated memory, then without freeing the memory
  // pages, we might not be able to get memory for the pointer array.
  taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);
  return spillSize;
}
The writeSortedFile() method:
private void writeSortedFile(boolean isLastFile) {

  // This call performs the actual sort.
  // Returns an iterator over the sorted records
  final ShuffleInMemorySorter.ShuffleSorterIterator sortedRecords =
    inMemSorter.getSortedIterator();

  // If there are no sorted records, so we don't need to create an empty spill file.
  if (!sortedRecords.hasNext()) {
    return;
  }

  final ShuffleWriteMetricsReporter writeMetricsToUse;

  // If true this is the final output file; otherwise it is a spill file
  if (isLastFile) {
    // We're writing the final non-spill file, so we _do_ want to count this as shuffle bytes.
    writeMetricsToUse = writeMetrics;
  } else {
    // We're spilling, so bytes written should be counted towards spill rather than write.
    // Create a dummy WriteMetrics object to absorb these metrics, since we don't want to count
    // them towards shuffle bytes written.
    writeMetricsToUse = new ShuffleWriteMetrics();
  }

  // Small writes to DiskBlockObjectWriter will be fairly inefficient. Since there doesn't seem to
  // be an API to directly transfer bytes from managed memory to the disk writer, we buffer
  // data through a byte array. This array does not need to be large enough to hold a single
  // record;
  // Create a byte buffer of diskWriteBufferSize (1 MB by default)
  final byte[] writeBuffer = new byte[diskWriteBufferSize];

  // Because this output will be read during shuffle, its compression codec must be controlled by
  // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use
  // createTempShuffleBlock here; see SPARK-3426 for more details.
  // Create a temporary shuffle block
  final Tuple2<TempShuffleBlockId, File> spilledFileInfo =
    blockManager.diskBlockManager().createTempShuffleBlock();
  // Get its file and id
  final File file = spilledFileInfo._2();
  final TempShuffleBlockId blockId = spilledFileInfo._1();
  final SpillInfo spillInfo = new SpillInfo(numPartitions, file, blockId);

  // Unfortunately, we need a serializer instance in order to construct a DiskBlockObjectWriter.
  // Our write path doesn't actually use this serializer (since we end up calling the `write()`
  // OutputStream methods), but DiskBlockObjectWriter still calls some methods on it. To work
  // around this, we pass a dummy no-op serializer.
  // A serializer that does no transformation; only needed to construct the DiskBlockObjectWriter
  final SerializerInstance ser = DummySerializerInstance.INSTANCE;

  int currentPartition = -1;
  final FileSegment committedSegment;
  try (DiskBlockObjectWriter writer =
      blockManager.getDiskWriter(blockId, file, ser, fileBufferSizeBytes, writeMetricsToUse)) {

    final int uaoSize = UnsafeAlignedOffset.getUaoSize();
    // Iterate over the sorted records
    while (sortedRecords.hasNext()) {
      sortedRecords.loadNext();
      final int partition = sortedRecords.packedRecordPointer.getPartitionId();
      assert (partition >= currentPartition);
      if (partition != currentPartition) {
        // Switch to the new partition
        // When switching to a new partition, commit the current one and record its length
        if (currentPartition != -1) {
          final FileSegment fileSegment = writer.commitAndGet();
          spillInfo.partitionLengths[currentPartition] = fileSegment.length();
        }
        // Then move on to the next partition
        currentPartition = partition;
      }

      // Get the record pointer, and from it the page and the offset within the page
      final long recordPointer = sortedRecords.packedRecordPointer.getRecordPointer();
      final Object recordPage = taskMemoryManager.getPage(recordPointer);
      final long recordOffsetInPage = taskMemoryManager.getOffsetInPage(recordPointer);
      // Remaining bytes of the record
      int dataRemaining = UnsafeAlignedOffset.getSize(recordPage, recordOffsetInPage);
      // Skip over the record length stored in front of the data
      long recordReadPosition = recordOffsetInPage + uaoSize; // skip over record length
      while (dataRemaining > 0) {
        final int toTransfer = Math.min(diskWriteBufferSize, dataRemaining);
        // Copy the data into the buffer array
        Platform.copyMemory(
          recordPage, recordReadPosition, writeBuffer, Platform.BYTE_ARRAY_OFFSET, toTransfer);
        // Then from the buffer array into the DiskBlockObjectWriter
        writer.write(writeBuffer, 0, toTransfer);
        // Advance the read position
        recordReadPosition += toTransfer;
        // Update the remaining size
        dataRemaining -= toTransfer;
      }
      writer.recordWritten();
    }

    // Commit
    committedSegment = writer.commitAndGet();
  }
  // If `writeSortedFile()` was called from `closeAndGetSpills()` and no records were inserted,
  // then the file might be empty. Note that it might be better to avoid calling
  // writeSortedFile() in that case.
  // Record the spill file in the list of spills
  if (currentPartition != -1) {
    spillInfo.partitionLengths[currentPartition] = committedSegment.length();
    spills.add(spillInfo);
  }

  // If this is a spill file, update the spill metrics
  if (!isLastFile) {
    writeMetrics.incRecordsWritten(
      ((ShuffleWriteMetrics)writeMetricsToUse).recordsWritten());
    taskContext.taskMetrics().incDiskBytesSpilled(
      ((ShuffleWriteMetrics)writeMetricsToUse).bytesWritten());
  }
}
The encodePageNumberAndOffset() method:
public long encodePageNumberAndOffset(MemoryBlock page, long offsetInPage) {
  // With off-heap memory the offset is an absolute address that may need a full 64 bits to
  // encode; because of the page size limit we subtract the page's base address to turn it into
  // a relative offset
  if (tungstenMemoryMode == MemoryMode.OFF_HEAP) {
    // In off-heap mode, an offset is an absolute address that may require a full 64 bits to
    // encode. Due to our page size limitation, though, we can convert this into an offset that's
    // relative to the page's base offset; this relative offset will fit in 51 bits.
    offsetInPage -= page.getBaseOffset();
  }
  return encodePageNumberAndOffset(page.pageNumber, offsetInPage);
}

@VisibleForTesting
public static long encodePageNumberAndOffset(int pageNumber, long offsetInPage) {
  assert (pageNumber >= 0) : "encodePageNumberAndOffset called with invalid page";
  // The upper 13 bits hold the page number and the lower 51 bits the offset:
  // shift the page number left by 51 bits, then OR in the offset masked with
  // 0x7FFFFFFFFFFFFL (lower 51 bits all set)
  return (((long) pageNumber) << OFFSET_BITS) | (offsetInPage & MASK_LONG_LOWER_51_BITS);
}
ShuffleInMemorySorter's insertRecord() method:
public void insertRecord(long recordPointer, int partitionId) {
  if (!hasSpaceForAnotherRecord()) {
    throw new IllegalStateException("There is no space for new record");
  }
  array.set(pos, PackedRecordPointer.packPointer(recordPointer, partitionId));
  pos++;
}
The PackedRecordPointer.packPointer() method:
public static long packPointer(long recordPointer, int partitionId) {
  assert (partitionId <= MAXIMUM_PARTITION_ID);
  // Note that without word alignment we can address 2^27 bytes = 128 megabytes per page.
  // Also note that this relies on some internals of how TaskMemoryManager encodes its addresses.
  // Shift the 13 page-number bits down by 24 and combine them with the lower 27 offset bits,
  // compressing the logical address into 40 bits
  final long pageNumber = (recordPointer & MASK_LONG_UPPER_13_BITS) >>> 24;
  final long compressedAddress = pageNumber | (recordPointer & MASK_LONG_LOWER_27_BITS);
  // Put the partition id in the upper 24 bits
  return (((long) partitionId) << 40) | compressedAddress;
}
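The two encoding steps can be hard to picture, so here is a standalone worked example (not Spark's code) that reproduces the bit layout described in the comments: a 13-bit page number plus a 51-bit in-page offset from the TaskMemoryManager, later compressed into a 24-bit partition id, 13-bit page number and 27-bit offset:

object PointerPackingDemo extends App {
  // TaskMemoryManager layout: [13-bit page number | 51-bit offset]
  def encodePageNumberAndOffset(pageNumber: Int, offsetInPage: Long): Long =
    (pageNumber.toLong << 51) | (offsetInPage & ((1L << 51) - 1))

  // PackedRecordPointer layout: [24-bit partition id | 13-bit page number | 27-bit offset]
  def packPointer(recordPointer: Long, partitionId: Int): Long = {
    // Keep the 13 page bits and move them right next to the 27 offset bits
    val pageNumber = (recordPointer & ~((1L << 51) - 1)) >>> 24
    val compressedAddress = pageNumber | (recordPointer & ((1L << 27) - 1))
    (partitionId.toLong << 40) | compressedAddress
  }

  val addr = encodePageNumberAndOffset(pageNumber = 3, offsetInPage = 1024L)
  val packed = packPointer(addr, partitionId = 7)
  println(f"address=0x$addr%016x packed=0x$packed%016x")
  // Decode back for verification: partition=7, page=3, offset=1024
  println(s"partition=${packed >>> 40}, page=${(packed >>> 27) & ((1 << 13) - 1)}, " +
    s"offset=${packed & ((1L << 27) - 1)}")
}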
The getSortedIterator() method:
public ShuffleSorterIterator getSortedIterator() {
  int offset = 0;
  // Sort the in-memory records by partition id with radix sort. Radix sort is much faster,
  // but it needs extra reserved memory while pointers are being added
  if (useRadixSort) {
    offset = RadixSort.sort(
      array, pos,
      PackedRecordPointer.PARTITION_ID_START_BYTE_INDEX,
      PackedRecordPointer.PARTITION_ID_END_BYTE_INDEX, false, false);
  // Otherwise fall back to TimSort
  } else {
    MemoryBlock unused = new MemoryBlock(
      array.getBaseObject(),
      array.getBaseOffset() + pos * 8L,
      (array.size() - pos) * 8L);
    LongArray buffer = new LongArray(unused);
    Sorter<PackedRecordPointer, LongArray> sorter =
      new Sorter<>(new ShuffleSortDataFormat(buffer));
    sorter.sort(array, 0, pos, SORT_COMPARATOR);
  }
  return new ShuffleSorterIterator(pos, array, offset);
}
The second of the two methods, closeAndWriteOutput(), merges the spill files and writes the final output:

@VisibleForTesting
void closeAndWriteOutput() throws IOException {
  assert(sorter != null);
  updatePeakMemoryUsed();
  serBuffer = null;
  serOutputStream = null;
  // Get the spill files
  final SpillInfo[] spills = sorter.closeAndGetSpills();
  sorter = null;
  final long[] partitionLengths;
  try {
    // Merge the spill files
    partitionLengths = mergeSpills(spills);
  } finally {
    // Delete the spill files
    for (SpillInfo spill : spills) {
      if (spill.file.exists() && !spill.file.delete()) {
        logger.error("Error while deleting spill file {}", spill.file.getPath());
      }
    }
  }
  // Update mapStatus
  mapStatus = MapStatus$.MODULE$.apply(
    blockManager.shuffleServerId(), partitionLengths, mapId);
}
The mergeSpills() method:
private long[] mergeSpills(SpillInfo[] spills) throws IOException {
  long[] partitionLengths;
  // If there are no spill files, create an empty output
  if (spills.length == 0) {
    final ShuffleMapOutputWriter mapWriter = shuffleExecutorComponents
        .createMapOutputWriter(shuffleId, mapId, partitioner.numPartitions());
    return mapWriter.commitAllPartitions();
  // If there is exactly one spill file, transfer it to the output
  } else if (spills.length == 1) {
    Optional<SingleSpillShuffleMapOutputWriter> maybeSingleFileWriter =
        shuffleExecutorComponents.createSingleFileMapOutputWriter(shuffleId, mapId);
    if (maybeSingleFileWriter.isPresent()) {
      // Here, we don't need to perform any metrics updates because the bytes written to this
      // output file would have already been counted as shuffle bytes written.
      partitionLengths = spills[0].partitionLengths;
      maybeSingleFileWriter.get().transferMapSpillFile(spills[0].file, partitionLengths);
    } else {
      partitionLengths = mergeSpillsUsingStandardWriter(spills);
    }
  // If there are several, merge them; the merge can use either NIO or regular streams (BIO)
  } else {
    partitionLengths = mergeSpillsUsingStandardWriter(spills);
  }
  return partitionLengths;
}
SortShuffleWriter sorts records in memory using a PartitionedAppendOnlyMap or a PartitionedPairBuffer. When the memory limit is exceeded, data is spilled to disk files. When the final globally ordered output is produced, all previously spilled files and the data still in memory are merge-sorted, and elements with the same key are combined using the user-defined function. The entry point is the write() method:
override def write(records: Iterator[Product2[K, V]]): Unit = {
  // Create an external sorter; if there is map-side aggregation pass in the aggregator and
  // key ordering, otherwise neither is needed
  sorter = if (dep.mapSideCombine) {
    new ExternalSorter[K, V, C](
      context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
  } else {
    // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
    // care whether the keys get sorted in each partition; that will be done on the reduce side
    // if the operation being run is sortByKey.
    new ExternalSorter[K, V, V](
      context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
  }
  // Feed the records into the ExternalSorter
  sorter.insertAll(records)

  // Don't bother including the time to open the merged output file in the shuffle write time,
  // because it just opens a single file, so is typically too fast to measure accurately
  // (see SPARK-3570).
  // Create the output writer
  val mapOutputWriter = shuffleExecutorComponents.createMapOutputWriter(
    dep.shuffleId, mapId, dep.partitioner.numPartitions)
  // Write the externally sorted data through the writer
  sorter.writePartitionedMapOutput(dep.shuffleId, mapId, mapOutputWriter)
  val partitionLengths = mapOutputWriter.commitAllPartitions()
  // Update mapStatus
  mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths, mapId)
}
The insertAll() method:
def insertAll(records: Iterator[Product2[K, V]]): Unit = {
  // TODO: stop combining if we find that the reduction factor isn't high
  // Whether map-side aggregation is needed
  val shouldCombine = aggregator.isDefined

  if (shouldCombine) {
    // Combine values in-memory first using our AppendOnlyMap
    // mergeValue() folds a new value into the current aggregated value
    val mergeValue = aggregator.get.mergeValue
    // createCombiner() builds the initial aggregated value
    val createCombiner = aggregator.get.createCombiner
    var kv: Product2[K, V] = null
    // If the key already has an aggregated value, merge into it; otherwise create the initial value
    val update = (hadValue: Boolean, oldValue: C) => {
      if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
    }
    // Iterate over the records
    while (records.hasNext) {
      // Bump the count of records read
      addElementsRead()
      kv = records.next()
      // map is a PartitionedAppendOnlyMap: (partition, key) is the key, the aggregate the value
      map.changeValue((getPartition(kv._1), kv._1), update)
      // Check whether we need to spill to disk
      maybeSpillCollection(usingMap = true)
    }
  // If no map-side aggregation is needed
  } else {
    // Stick values into our buffer
    while (records.hasNext) {
      addElementsRead()
      val kv = records.next()
      // buffer is a PartitionedPairBuffer; insert the partition, key and value
      buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])
      // Check whether we need to spill to disk
      maybeSpillCollection(usingMap = false)
    }
  }
}
This method mainly decides, while inserting records, whether map-side pre-aggregation is needed, and uses one of two data structures to buffer the data accordingly.
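For context, dep.aggregator carries the three functions that insertAll() uses above. A minimal sketch of what such an aggregator looks like for a reduceByKey-style sum (the value name is illustrative):

import org.apache.spark.Aggregator

val sumAggregator = Aggregator[String, Int, Int](
  createCombiner = (v: Int) => v,                  // first value seen for a key
  mergeValue = (c: Int, v: Int) => c + v,          // fold another value into the running sum
  mergeCombiners = (c1: Int, c2: Int) => c1 + c2)  // merge partial sums across spills/tasks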
maybeSpillCollection() calls maybeSpill() to check whether a spill is needed; if a spill happens, a fresh map or buffer is constructed and buffering starts over:
private def maybeSpillCollection(usingMap: Boolean): Unit = {
  var estimatedSize = 0L
  if (usingMap) {
    estimatedSize = map.estimateSize()
    // Check whether a spill is needed
    if (maybeSpill(map, estimatedSize)) {
      map = new PartitionedAppendOnlyMap[K, C]
    }
  } else {
    estimatedSize = buffer.estimateSize()
    // Check whether a spill is needed
    if (maybeSpill(buffer, estimatedSize)) {
      buffer = new PartitionedPairBuffer[K, C]
    }
  }

  if (estimatedSize > _peakMemoryUsedBytes) {
    _peakMemoryUsedBytes = estimatedSize
  }
}

protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
  var shouldSpill = false
  // If the number of records read is a multiple of 32 and the estimated size of the map or
  // buffer exceeds the current threshold (5 MB by default)
  if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
    // Claim up to double our current memory from the shuffle memory pool
    // Try to acquire 2 * currentMemory - 5 MB of additional memory
    val amountToRequest = 2 * currentMemory - myMemoryThreshold
    val granted = acquireMemory(amountToRequest)
    // Raise the threshold
    myMemoryThreshold += granted
    // If we were granted too little memory to grow further (either tryToAcquire returned 0,
    // or we already had more memory than myMemoryThreshold), spill the current collection
    // If memory is still insufficient, decide to spill
    shouldSpill = currentMemory >= myMemoryThreshold
  }
  // Even if shouldSpill is false, spill when the number of records read exceeds
  // numElementsForceSpillThreshold (Integer.MAX_VALUE by default)
  shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold
  // Actually spill
  if (shouldSpill) {
    // Increment the spill count
    _spillCount += 1
    logSpillage(currentMemory)
    // Spill the buffered collection
    spill(collection)
    _elementsRead = 0
    _memoryBytesSpilled += currentMemory
    // Release the memory
    releaseMemory()
  }
  shouldSpill
}
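A quick worked illustration of the arithmetic in maybeSpill(), assuming the default initial threshold of 5 MB (spark.shuffle.spill.initialMemoryThreshold) and a memory pool that can only grant part of the request:

val initialThreshold = 5L * 1024 * 1024            // assumed default: 5 MB
var myMemoryThreshold = initialThreshold

val currentMemory = 6L * 1024 * 1024               // estimated size of the map/buffer: 6 MB
val amountToRequest = 2 * currentMemory - myMemoryThreshold  // 2 * 6 MB - 5 MB = 7 MB
val granted = 1L * 1024 * 1024                     // suppose the pool grants only 1 MB
myMemoryThreshold += granted                       // threshold is now 6 MB

// currentMemory (6 MB) >= myMemoryThreshold (6 MB), so shouldSpill is true
// and the whole collection is spilled to disk.
val shouldSpill = currentMemory >= myMemoryThreshold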
maybeSpill() calls spill() to perform the actual spill:
override protected[this] def spill(collection: WritablePartitionedPairCollection[K, C]): Unit = {
  // Sort with the given comparator and return an iterator over the sorted result
  val inMemoryIterator = collection.destructiveSortedWritablePartitionedIterator(comparator)
  // Spill the iterator's contents to a disk file
  val spillFile = spillMemoryIteratorToDisk(inMemoryIterator)
  // spills is an ArrayBuffer recording every spilled file
  spills += spillFile
}
The spillMemoryIteratorToDisk() method:
private[this] def spillMemoryIteratorToDisk(inMemoryIterator: WritablePartitionedIterator)
    : SpilledFile = {
  // Because these files may be read during shuffle, their compression must be controlled by
  // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use
  // createTempShuffleBlock here; see SPARK-3426 for more context.
  // Create a temporary block
  val (blockId, file) = diskBlockManager.createTempShuffleBlock()

  // These variables are reset after each flush
  var objectsWritten: Long = 0
  val spillMetrics: ShuffleWriteMetrics = new ShuffleWriteMetrics
  // Create the DiskBlockObjectWriter for the spill file
  val writer: DiskBlockObjectWriter =
    blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, spillMetrics)

  // List of batch sizes (bytes) in the order they are written to disk
  // Records the size of every written batch
  val batchSizes = new ArrayBuffer[Long]

  // How many elements we have in each partition
  // Records the number of elements per partition
  val elementsPerPartition = new Array[Long](numPartitions)

  // Flush the disk writer's contents to disk, and update relevant variables.
  // The writer is committed at the end of this process.
  // Flushes the in-memory data to disk in batches
  def flush(): Unit = {
    val segment = writer.commitAndGet()
    batchSizes += segment.length
    _diskBytesSpilled += segment.length
    objectsWritten = 0
  }

  var success = false
  try {
    // Iterate over the records in the map or buffer
    while (inMemoryIterator.hasNext) {
      val partitionId = inMemoryIterator.nextPartition()
      require(partitionId >= 0 && partitionId < numPartitions,
        s"partition Id: ${partitionId} should be in the range [0, ${numPartitions})")
      // Write the record and update the counters
      inMemoryIterator.writeNext(writer)
      elementsPerPartition(partitionId) += 1
      objectsWritten += 1

      // Every 10000 records written, flush the batch to disk
      if (objectsWritten == serializerBatchSize) {
        flush()
      }
    }
    // After the iteration, flush whatever is left
    if (objectsWritten > 0) {
      flush()
    } else {
      writer.revertPartialWritesAndClose()
    }
    success = true
  } finally {
    if (success) {
      writer.close()
    } else {
      // This code path only happens if an exception was thrown above before we set success;
      // close our stuff and let the exception be thrown further
      writer.revertPartialWritesAndClose()
      if (file.exists()) {
        if (!file.delete()) {
          logWarning(s"Error deleting ${file}")
        }
      }
    }
  }

  // Return the spilled file
  SpilledFile(file, blockId, batchSizes.toArray, elementsPerPartition)
}
Next comes the sort-and-merge step, in ExternalSorter.writePartitionedMapOutput():
def writePartitionedMapOutput(
    shuffleId: Int,
    mapId: Long,
    mapOutputWriter: ShuffleMapOutputWriter): Unit = {
  var nextPartitionId = 0
  // If nothing was spilled
  if (spills.isEmpty) {
    // Case where we only have in-memory data
    val collection = if (aggregator.isDefined) map else buffer
    // Sort with the given comparator
    val it = collection.destructiveSortedWritablePartitionedIterator(comparator)
    while (it.hasNext()) {
      val partitionId = it.nextPartition()
      var partitionWriter: ShufflePartitionWriter = null
      var partitionPairsWriter: ShufflePartitionPairsWriter = null
      TryUtils.tryWithSafeFinally {
        partitionWriter = mapOutputWriter.getPartitionWriter(partitionId)
        val blockId = ShuffleBlockId(shuffleId, mapId, partitionId)
        partitionPairsWriter = new ShufflePartitionPairsWriter(
          partitionWriter,
          serializerManager,
          serInstance,
          blockId,
          context.taskMetrics().shuffleWriteMetrics)
        // Write out the records of this partition one by one
        while (it.hasNext && it.nextPartition() == partitionId) {
          it.writeNext(partitionPairsWriter)
        }
      } {
        if (partitionPairsWriter != null) {
          partitionPairsWriter.close()
        }
      }
      nextPartitionId = partitionId + 1
    }
  // If spills happened, merge-sort the spill files with the in-memory data, then write each
  // partition in turn through a ShufflePartitionPairsWriter
  } else {
    // We must perform merge-sort; get an iterator by partition and write everything directly.
    // partitionedIterator performs the merge sort
    for ((id, elements) <- this.partitionedIterator) {
      val blockId = ShuffleBlockId(shuffleId, mapId, id)
      var partitionWriter: ShufflePartitionWriter = null
      var partitionPairsWriter: ShufflePartitionPairsWriter = null
      TryUtils.tryWithSafeFinally {
        partitionWriter = mapOutputWriter.getPartitionWriter(id)
        partitionPairsWriter = new ShufflePartitionPairsWriter(
          partitionWriter,
          serializerManager,
          serInstance,
          blockId,
          context.taskMetrics().shuffleWriteMetrics)
        if (elements.hasNext) {
          for (elem <- elements) {
            partitionPairsWriter.write(elem._1, elem._2)
          }
        }
      } {
        if (partitionPairsWriter != null) {
          partitionPairsWriter.close()
        }
      }
      nextPartitionId = id + 1
    }
  }

  context.taskMetrics().incMemoryBytesSpilled(memoryBytesSpilled)
  context.taskMetrics().incDiskBytesSpilled(diskBytesSpilled)
  context.taskMetrics().incPeakExecutionMemory(peakMemoryUsedBytes)
}
The partitionedIterator method:
def partitionedIterator: Iterator[(Int, Iterator[Product2[K, C]])] = {
  val usingMap = aggregator.isDefined
  val collection: WritablePartitionedPairCollection[K, C] = if (usingMap) map else buffer
  if (spills.isEmpty) {
    // Special case: if we have only in-memory data, we don't need to merge streams, and perhaps
    // we don't even need to sort by anything other than partition ID
    // No spill and no key ordering: sort by partition id only
    if (ordering.isEmpty) {
      // The user hasn't requested sorted keys, so only sort by partition ID, not key
      groupByPartition(destructiveIterator(
        collection.partitionedDestructiveSortedIterator(None)))
    // No spill but a key ordering is defined: sort by partition id first, then by key
    } else {
      // We do need to sort by both partition ID and key
      groupByPartition(destructiveIterator(
        collection.partitionedDestructiveSortedIterator(Some(keyComparator))))
    }
  } else {
    // Merge spilled and in-memory data
    // If spills happened, merge-sort the spill files with the in-memory data
    merge(spills, destructiveIterator(
      collection.partitionedDestructiveSortedIterator(comparator)))
  }
}
The merge() method is as follows:
private def merge(spills: Seq[SpilledFile], inMemory: Iterator[((Int, K), C)])
    : Iterator[(Int, Iterator[Product2[K, C]])] = {
  // Readers for the spill files
  val readers = spills.map(new SpillReader(_))
  val inMemBuffered = inMemory.buffered
  // Iterate over the partitions
  (0 until numPartitions).iterator.map { p =>
    val inMemIterator = new IteratorForPartition(p, inMemBuffered)
    // Combine the spill-file iterators with the in-memory iterator
    val iterators = readers.map(_.readNextPartition()) ++ Seq(inMemIterator)
    // If there is aggregation logic, aggregate within the partition, ordering keys with keyComparator
    if (aggregator.isDefined) {
      // Perform partial aggregation across partitions
      (p, mergeWithAggregation(
        iterators, aggregator.get.mergeCombiners, keyComparator, ordering.isDefined))
    // If there is no aggregation but an ordering is defined, merge-sort by that ordering
    } else if (ordering.isDefined) {
      // No aggregator given, but we have an ordering (e.g. used by reduce tasks in sortByKey);
      // sort the elements without trying to merge them
      (p, mergeSort(iterators, ordering.get))
    // Otherwise just concatenate the iterators
    } else {
      (p, iterators.iterator.flatten)
    }
  }
}
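Conceptually, the per-partition merge above is a k-way merge of one iterator per spill file plus the in-memory iterator. A generic sketch of that idea using a priority queue (Spark's real implementation is ExternalSorter.mergeSort; this is only an illustration):

import scala.collection.mutable

def kWayMerge[T](iterators: Seq[Iterator[T]])(implicit ord: Ordering[T]): Iterator[T] =
  new Iterator[T] {
    // Order buffered iterators by their head element; reverse because PriorityQueue is a max-heap.
    private val heap = mutable.PriorityQueue.empty[BufferedIterator[T]](
      Ordering.by((it: BufferedIterator[T]) => it.head).reverse)
    iterators.map(_.buffered).filter(_.hasNext).foreach(heap.enqueue(_))

    override def hasNext: Boolean = heap.nonEmpty
    override def next(): T = {
      val it = heap.dequeue()
      val elem = it.next()
      if (it.hasNext) heap.enqueue(it)  // re-insert the iterator keyed by its new head
      elem
    }
  }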
Back in write(), commitAllPartitions() is called to commit the output, which in turn calls writeIndexFileAndCommit() to write the data file and the index file:
def writeIndexFileAndCommit(
    shuffleId: Int,
    mapId: Long,
    lengths: Array[Long],
    dataTmp: File): Unit = {
  // Create the index file and a temporary index file
  val indexFile = getIndexFile(shuffleId, mapId)
  val indexTmp = Utils.tempFileWith(indexFile)
  try {
    // Get the shuffle data file
    val dataFile = getDataFile(shuffleId, mapId)
    // There is only one IndexShuffleBlockResolver per executor, this synchronization make sure
    // the following check and rename are atomic.
    synchronized {
      // Check whether an index file already matches the data file
      val existingLengths = checkIndexAndDataFile(indexFile, dataFile, lengths.length)
      if (existingLengths != null) {
        // Another attempt for the same task has already written our map outputs successfully,
        // so just use the existing partition lengths and delete our temporary map outputs.
        // In that case the shuffle write has already finished; delete the temporary files
        System.arraycopy(existingLengths, 0, lengths, 0, lengths.length)
        if (dataTmp != null && dataTmp.exists()) {
          dataTmp.delete()
        }
      } else {
        // Otherwise create a BufferedOutputStream for the temporary index file
        // This is the first successful attempt in writing the map outputs for this task,
        // so override any existing index and data files with the ones we wrote.
        val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexTmp)))
        Utils.tryWithSafeFinally {
          // We take in lengths of each block, need to convert it to offsets.
          // Accumulate each partition's length into offsets and write them to the temp index file
          var offset = 0L
          out.writeLong(offset)
          for (length <- lengths) {
            offset += length
            out.writeLong(offset)
          }
        } {
          out.close()
        }

        // Delete any other index file that may exist
        if (indexFile.exists()) {
          indexFile.delete()
        }
        // Delete any other data file that may exist
        if (dataFile.exists()) {
          dataFile.delete()
        }
        // Rename the temporary files to their final names
        if (!indexTmp.renameTo(indexFile)) {
          throw new IOException("fail to rename file " + indexTmp + " to " + indexFile)
        }
        if (dataTmp != null && dataTmp.exists() && !dataTmp.renameTo(dataFile)) {
          throw new IOException("fail to rename file " + dataTmp + " to " + dataFile)
        }
      }
    }
  } finally {
    if (indexTmp.exists() && !indexTmp.delete()) {
      logError(s"Failed to delete temporary index file at ${indexTmp.getAbsolutePath}")
    }
  }
}
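To tie the index format back together: the index file written above is just numPartitions + 1 cumulative offsets, so on the read side the bytes of reduce partition i live in the range [offset(i), offset(i + 1)) of the data file. A rough sketch of reading that range (the file path and helper are illustrative, not Spark's API):

import java.io.{DataInputStream, FileInputStream}

def partitionRange(indexFilePath: String, reduceId: Int): (Long, Long) = {
  val in = new DataInputStream(new FileInputStream(indexFilePath))
  try {
    in.skipBytes(reduceId * 8)  // each offset is one 8-byte long
    val start = in.readLong()
    val end = in.readLong()
    (start, end)
  } finally {
    in.close()
  }
}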
To summarize: when Spark initializes SparkEnv, the ShuffleManager is created in the create() method, with two registered shuffles, sort and tungsten-sort.

ShuffleManager is a trait whose core methods are registerShuffle(), getReader() and getWriter().

SortShuffleManager is the only implementation of ShuffleManager. registerShuffle() chooses which shuffle mechanism to use, getReader() always returns a BlockStoreShuffleReader, and getWriter() picks one of three writers depending on the handle:

BypassMergeSortShuffleWriter: used when the shuffle dependency has no map-side aggregation and the number of partitions is no greater than spark.shuffle.sort.bypassMergeThreshold (200 by default), i.e. when the bypass mechanism kicks in. Core methods: write() and writePartitionedData() (concatenates all per-partition files into one, using zero-copy by default).

UnsafeShuffleWriter: used when all three conditions hold: the serializer supports relocation, there is no map-side aggregation, and the number of partitions does not exceed 16777215 + 1. Core methods: write(), insertRecordIntoSorter() (inserts records into the external sorter) and closeAndWriteOutput() (merges and writes the output). Inside the former: insertRecord() (inserts serialized data into the external sorter), growPointerArrayIfNecessary() (grows the pointer array when more space is needed, or spills to disk), spill() (spills to disk), writeSortedFile() (sorts the in-memory data and writes it to a disk file) and encodePageNumberAndOffset() (encodes a record's logical address into a long). Inside the latter: mergeSpills() (merges the spill files); the merge can be done either with NIO or with regular streams (BIO).

SortShuffleWriter: used when neither of the above applies. It sorts data in memory with a PartitionedAppendOnlyMap or PartitionedPairBuffer, spilling to files when the memory limit is exceeded; when the final ordered output is produced, all previous spill files and the data still in memory are merge-sorted globally, and elements with the same key are combined with the user-defined function. Core methods: write(), insertAll() (feeds records into the ExternalSorter), maybeSpillCollection() (checks whether to spill to disk), maybeSpill(), spill(), spillMemoryIteratorToDisk() (spills in-memory data to disk), writePartitionedMapOutput(), and commitAllPartitions(), which calls writeIndexFileAndCommit() to write the data and index files.