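This blog series distills cases from real commercial environments and shares them, along with Spark source-code walkthroughs and guidance for production use. Please keep following the series. Copyright notice: this series on Spark source-code interpretation and commercial practice belongs to its author, Qin Kaixin (秦凱新). Reproduction is prohibited, but you are welcome to study it.
ShuffleExternalSorter is similar in function to the ExternalSorter external sorter, but the two also differ. Before analyzing ShuffleExternalSorter in detail, let's look at where it sits in the figure below. As the figure shows, its ultimate caller is UnsafeShuffleWriter, which I will analyze in detail in the next section. Its key fields are as follows.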
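ShuffleInMemorySorter: sorts the inserted records in memory by partition ID. It uses radix sort by default (spark.shuffle.sort.useRadixSort=true) and falls back to a TimSort-based sorter otherwise.
spills: the list of metadata entries for the spill files written so far.
numElementsForSpillThreshold: the number of records that forces a spill to disk. It can be configured with the spark.shuffle.spill.numElementsForceSpillThreshold property; the default is 1M.
taskMemoryManager: the TaskMemoryManager of the current task, which allocates memory pages and encodes record addresses as (page number, offset).
allocatedPages: the list of pages (MemoryBlock objects) allocated so far. Its source comment reads: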
```java
/**
 * Memory pages that hold the records being sorted. The pages in this list are freed when
 * spilling, although in principle we could recycle these pages across spills (on the other hand,
 * this might not be necessary if we maintained a pool of re-usable pages in the TaskMemoryManager
 * itself).
 */
```
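Because ShuffleInMemorySorter only has to order records by partition ID, a byte-wise radix sort fits this job well. The following is a minimal sketch, assuming the 24/13/27-bit pointer layout that inMemSorter.insertRecord uses (partition ID in the top 24 bits). PartitionRadixSortDemo and sortByPartition are hypothetical names, and the code illustrates a generic LSD radix sort rather than Spark's own RadixSort implementation.

```java
import java.util.Arrays;

/**
 * Hypothetical sketch: LSD radix sort of packed 64-bit record pointers by partition ID.
 * Assumes the partition ID occupies the top 24 bits (bytes 5..7 of the long).
 * Illustration only; this is not Spark's RadixSort class.
 */
final class PartitionRadixSortDemo {
    private static final int FIRST_BYTE = 5; // lowest byte of the partition ID
    private static final int LAST_BYTE = 7;  // highest byte of the partition ID

    /** Returns a new array, stably sorted by the unsigned 24-bit partition ID. */
    static long[] sortByPartition(long[] pointers) {
        long[] src = Arrays.copyOf(pointers, pointers.length);
        long[] dst = new long[pointers.length];
        for (int b = FIRST_BYTE; b <= LAST_BYTE; b++) {
            int shift = b * 8;
            // counts[v + 1] counts how many pointers have byte value v at position b.
            int[] counts = new int[257];
            for (long p : src) {
                counts[(int) ((p >>> shift) & 0xFF) + 1]++;
            }
            // Prefix sums: counts[v] becomes the first output slot for byte value v.
            for (int v = 0; v < 256; v++) {
                counts[v + 1] += counts[v];
            }
            // Stable scatter: pointers with equal bytes keep their relative order.
            for (long p : src) {
                dst[counts[(int) ((p >>> shift) & 0xFF)]++] = p;
            }
            long[] tmp = src;
            src = dst;
            dst = tmp;
        }
        return src;
    }
}
```

Each pass is a stable counting sort on one byte, so three passes over bytes 5 to 7 fully order the pointers by partition ID, and records of the same partition keep their insertion order.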
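insertRecord performs the following steps:
Spill check: if inMemSorter.numRecords() >= numElementsForSpillThreshold, the sorter spills immediately.
growPointerArrayIfNecessary: checks whether the in-memory sorter's pointer array has room for another record and grows it if not.
acquireNewPageIfNecessary: checks whether the current page has enough space for the record and, if not, allocates a new page.
Platform.copyMemory: after Platform.putInt has written the 4-byte record length, copies the record's bytes into the memory block represented by the current page.
inMemSorter.insertRecord: stores the record's metadata in the sorter's internal long array so that it can be sorted. Each long packs the partition ID into the high 24 bits, the page number into the middle 13 bits, and the offset within the page into the low 27 bits.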
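To make the 24/13/27 split concrete, here is a small sketch of packing and unpacking such a pointer. The class and method names are hypothetical; Spark's own implementation lives in PackedRecordPointer, whose details may differ from this simplified version.

```java
/**
 * Hypothetical sketch of the packed pointer layout:
 * [partition ID: 24 bits][page number: 13 bits][offset in page: 27 bits].
 * Names are illustrative; Spark's real implementation is PackedRecordPointer.
 */
final class PackedPointerDemo {
    static final int OFFSET_BITS = 27;
    static final int PAGE_BITS = 13;
    static final int PARTITION_SHIFT = OFFSET_BITS + PAGE_BITS; // 40

    static final long OFFSET_MASK = (1L << OFFSET_BITS) - 1;    // low 27 bits
    static final long PAGE_MASK = (1L << PAGE_BITS) - 1;        // 13 bits
    static final int MAX_PARTITION_ID = (1 << 24) - 1;          // 16,777,215

    /** Packs the three fields into one long, rejecting values that do not fit. */
    static long pack(int partitionId, int pageNumber, long offsetInPage) {
        if (partitionId < 0 || partitionId > MAX_PARTITION_ID) {
            throw new IllegalArgumentException("partition ID does not fit in 24 bits");
        }
        if (pageNumber < 0 || pageNumber > PAGE_MASK) {
            throw new IllegalArgumentException("page number does not fit in 13 bits");
        }
        if (offsetInPage < 0 || offsetInPage > OFFSET_MASK) {
            throw new IllegalArgumentException("offset does not fit in 27 bits");
        }
        return ((long) partitionId << PARTITION_SHIFT)
                | ((long) pageNumber << OFFSET_BITS)
                | offsetInPage;
    }

    static int partitionId(long packed) {
        return (int) (packed >>> PARTITION_SHIFT); // top 24 bits
    }

    static int pageNumber(long packed) {
        return (int) ((packed >>> OFFSET_BITS) & PAGE_MASK); // middle 13 bits
    }

    static long offsetInPage(long packed) {
        return packed & OFFSET_MASK; // low 27 bits
    }
}
```

For example, partitionId(pack(7, 3, 1024)) returns 7. The field widths also imply the limits of this layout: at most 2^24 = 16,777,216 partition IDs, 2^13 = 8,192 page numbers, and offsets below 2^27 bytes (128 MB) within a page.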
```java
/**
 * Write a record to the shuffle sorter.
 */
public void insertRecord(Object recordBase, long recordOffset, int length, int partitionId)
    throws IOException {
  // for tests
  assert(inMemSorter != null);
  // Key step: force a spill once the number of buffered records reaches the threshold.
  if (inMemSorter.numRecords() >= numElementsForSpillThreshold) {
    logger.info("Spilling data because number of spilledRecords crossed the threshold " +
      numElementsForSpillThreshold);
    spill();
  }
  // Key step: make sure the pointer array has room for one more entry.
  growPointerArrayIfNecessary();
  // Need 4 bytes to store the record length.
  final int required = length + 4;
  acquireNewPageIfNecessary(required);
  assert(currentPage != null);
  final Object base = currentPage.getBaseObject();
  final long recordAddress = taskMemoryManager.encodePageNumberAndOffset(currentPage, pageCursor);
  Platform.putInt(base, pageCursor, length);
  pageCursor += 4;
  // Key step: copy the record bytes into the current page.
  Platform.copyMemory(recordBase, recordOffset, base, pageCursor, length);
  pageCursor += length;
  // Key step: store the packed (partition ID, page, offset) pointer; sorting happens at spill time.
  inMemSorter.insertRecord(recordAddress, partitionId);
}
```
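The control flow of insertRecord can be modeled in plain Java. This is a heavily simplified sketch under stated assumptions: heap ByteBuffers stand in for memory pages, the pointer array simply doubles when full, and "spilling" only discards the buffered data instead of sorting it and writing a file. MiniShuffleSorter and all of its members are hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical, simplified model of ShuffleExternalSorter.insertRecord.
 * Heap ByteBuffers stand in for memory pages; spill() only clears state.
 */
final class MiniShuffleSorter {
    private final int pageSize;
    private final int spillThreshold;          // plays the role of numElementsForSpillThreshold
    private final List<ByteBuffer> pages = new ArrayList<>();
    private ByteBuffer currentPage;            // its position() plays the role of pageCursor
    private long[] pointers = new long[4];     // plays the role of the in-memory pointer array
    private int numRecords;
    int spillCount;

    MiniShuffleSorter(int pageSize, int spillThreshold) {
        this.pageSize = pageSize;
        this.spillThreshold = spillThreshold;
    }

    void insertRecord(byte[] record, int partitionId) {
        // 1. Force a spill once the record-count threshold is reached.
        if (numRecords >= spillThreshold) {
            spill();
        }
        // 2. Grow the pointer array (by doubling) before it overflows.
        if (numRecords == pointers.length) {
            pointers = Arrays.copyOf(pointers, pointers.length * 2);
        }
        // 3. A record needs 4 bytes for its length plus its payload.
        int required = record.length + 4;
        if (required > pageSize) {
            throw new IllegalArgumentException("record larger than a page");
        }
        if (currentPage == null || currentPage.remaining() < required) {
            currentPage = ByteBuffer.allocate(pageSize);
            pages.add(currentPage);
        }
        int pageNumber = pages.size() - 1;
        int offset = currentPage.position();
        // 4. Write the length prefix, then copy the payload into the page.
        currentPage.putInt(record.length);
        currentPage.put(record);
        // 5. Remember (partition, page, offset) in one long, using the 24/13/27 layout.
        pointers[numRecords++] = ((long) partitionId << 40) | ((long) pageNumber << 27) | offset;
    }

    private void spill() {
        // A real sorter would sort `pointers` by partition and write a spill file here.
        spillCount++;
        pages.clear();
        currentPage = null;
        numRecords = 0;
    }

    /** Reads back the payload of the i-th record inserted since the last spill. */
    byte[] readRecord(int i) {
        long p = pointers[i];
        ByteBuffer page = pages.get((int) ((p >>> 27) & 0x1FFF));
        int offset = (int) (p & ((1L << 27) - 1));
        int length = page.getInt(offset);
        byte[] out = new byte[length];
        for (int k = 0; k < length; k++) {
            out[k] = page.get(offset + 4 + k);
        }
        return out;
    }

    int numRecords() { return numRecords; }

    int numPages() { return pages.size(); }
}
```

For example, with 16-byte pages, inserting "hello" (4 + 5 = 9 bytes) and then "abc" (4 + 3 = 7 bytes) fills page 0 exactly, so the next record goes to a freshly allocated page 1.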
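writeSortedFile: sorts the in-memory records and writes them to a spill file on disk. The records are always ordered by partition ID; the sort itself uses either radix sort (the default) or the TimSort-based fallback. The spill() method below calls it and then releases the memory: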
```java
@Override
public long spill(long size, MemoryConsumer trigger) throws IOException {
  if (trigger != this || inMemSorter == null || inMemSorter.numRecords() == 0) {
    return 0L;
  }
  logger.info("Thread {} spilling sort data of {} to disk ({} {} so far)",
    Thread.currentThread().getId(),
    Utils.bytesToString(getMemoryUsage()),
    spills.size(),
    spills.size() > 1 ? " times" : " time");
  // Key step: sort the buffered records by partition ID and write them to a spill file.
  writeSortedFile(false);
  final long spillSize = freeMemory();
  inMemSorter.reset();
  // Reset the in-memory sorter's pointer array only after freeing up the memory pages holding the
  // records. Otherwise, if the task is over allocated memory, then without freeing the memory
  // pages, we might not be able to get memory for the pointer array.
  taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);
  return spillSize;
}
```
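The result of writeSortedFile can be pictured as records laid out back to back in partition order, plus the number of bytes each partition occupies, which is roughly what each entry in spills records. The sketch below is hypothetical (SortedSpillDemo, Rec and Spill are made-up names): it writes to an in-memory stream instead of a disk file and uses List.sort instead of radix sort, so it only illustrates the shape of the output.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch of a sorted spill: payloads written in partition order,
 * plus the byte length of each partition. No Spark classes are used.
 */
final class SortedSpillDemo {

    /** One buffered record: target partition plus serialized bytes. */
    static final class Rec {
        final int partitionId;
        final byte[] payload;

        Rec(int partitionId, byte[] payload) {
            this.partitionId = partitionId;
            this.payload = payload;
        }
    }

    /** The "spill file" contents and its per-partition metadata. */
    static final class Spill {
        final byte[] data;
        final long[] partitionLengths;

        Spill(byte[] data, long[] partitionLengths) {
            this.data = data;
            this.partitionLengths = partitionLengths;
        }
    }

    static Spill writeSorted(List<Rec> records, int numPartitions) {
        List<Rec> sorted = new ArrayList<>(records);
        // List.sort is stable, so records of one partition keep their insertion order.
        sorted.sort((a, b) -> Integer.compare(a.partitionId, b.partitionId));
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        long[] partitionLengths = new long[numPartitions];
        for (Rec r : sorted) {
            out.write(r.payload, 0, r.payload.length);
            partitionLengths[r.partitionId] += r.payload.length;
        }
        return new Spill(out.toByteArray(), partitionLengths);
    }
}
```

Keeping per-partition lengths is what lets the writer later merge several spill files partition by partition into the final map output.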
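There is still a lot in this class worth digging into, but reference material is scarce, so I will stop here for now and fill in more later.
Qin Kaixin, Shenzhen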