This blog series draws its cases from real production environments, summarizes and shares them, and provides Spark source-code analysis together with guidance for commercial practice. Please keep following the series. Copyright notice: this Spark source-code analysis and commercial practice series belongs to the author (秦凱新); reproduction is prohibited, but you are welcome to learn from it.
I have used this one diagram several times before, so please bear with me; after all, it is the same topic, shuffle. The English comments in the source are already quite detailed, so here is only a brief introduction.
The official English description is as follows:
* Pluggable interface for shuffle systems. A ShuffleManager is created in SparkEnv on the
* driver and on each executor, based on the spark.shuffle.manager setting. The driver
* registers shuffles with it, and executors (or tasks running locally in the driver) can ask
* to read and write data.
* NOTE: this will be instantiated by SparkEnv so its constructor can take a SparkConf and
* boolean isDriver as parameters.
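To make the description above concrete, the sketch below outlines what a pluggable shuffle manager looks like from the caller's point of view. It is written in Java purely for illustration and the names are simplified stand-ins; the real org.apache.spark.shuffle.ShuffleManager is a Scala trait with richer signatures (ShuffleHandle, ShuffleWriter, ShuffleReader, a TaskContext parameter, and so on), so do not read this as the actual API.

// Simplified, hypothetical outline of a pluggable shuffle manager.
// The real ShuffleManager is a Scala trait; all names here are stand-ins.
public interface SimpleShuffleManager {

  // Driver side: register a shuffle and get back a handle that tasks will carry around.
  Object registerShuffle(int shuffleId, int numMaps);

  // Executor side (map task): obtain a writer for one map output of this shuffle.
  MapOutputWriter getWriter(Object handle, int mapId);

  // Executor side (reduce task): obtain a reader for a range of reduce partitions.
  ReduceInputReader getReader(Object handle, int startPartition, int endPartition);

  // Remove all state belonging to a shuffle once it is no longer needed.
  boolean unregisterShuffle(int shuffleId);

  interface MapOutputWriter { /* write(records), stop(success) ... */ }
  interface ReduceInputReader { /* read() ... */ }
}

With that interface in mind, here are the main member fields of UnsafeShuffleWriter, which this post walks through: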
static final int DEFAULT_INITIAL_SORT_BUFFER_SIZE = 4096;
static final int DEFAULT_INITIAL_SER_BUFFER_SIZE = 1024 * 1024;
private final BlockManager blockManager;
private final IndexShuffleBlockResolver shuffleBlockResolver;
private final TaskMemoryManager memoryManager;
private final SerializerInstance serializer;
private final Partitioner partitioner;
private final ShuffleWriteMetrics writeMetrics;
private final int shuffleId;
private final int mapId;
private final TaskContext taskContext;
private final SparkConf sparkConf;
private final boolean transferToEnabled => whether to copy data with NIO's file-stream-to-file-stream transferTo when merging files; configured by the spark.file.transferTo property, default true.
private final int initialSortBufferSize => initial size of the sort buffer; can be set with the spark.shuffle.sort.initialBuffer.size property, default 4096.
private final int inputBufferSizeInBytes;
private final int outputBufferSizeInBytes;
@Nullable private MapStatus mapStatus;
@Nullable private ShuffleExternalSorter sorter;
private long peakMemoryUsedBytes = 0; => peak memory usage observed so far
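As a small usage note, the two properties mentioned above are ordinary Spark configuration entries, so they can be tuned on the SparkConf when the application is built. A minimal sketch, using only the property names already described (the values shown are simply the defaults):

import org.apache.spark.SparkConf;

public class ShuffleConfDemo {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf()
        .setAppName("unsafe-shuffle-demo")
        // Use NIO transferTo when merging spill files (default: true).
        .set("spark.file.transferTo", "true")
        // Initial size of the in-memory sort buffer (default: 4096).
        .set("spark.shuffle.sort.initialBuffer.size", "4096");
    System.out.println(conf.toDebugString());
  }
}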
Now let's look at the most interesting part of the code:
public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOException {
  // Keep track of success so we know if we encountered an exception
  // We do this rather than a standard try/catch/re-throw to handle
  // generic throwables.
  boolean success = false;
  try {
    while (records.hasNext()) {
      insertRecordIntoSorter(records.next());   // <= the key line (feed each mapTask record into the sorter)
    }
    closeAndWriteOutput();                      // <= the key line (persist the mapTask output to disk)
    success = true;
  } finally {
    if (sorter != null) {
      try {
        sorter.cleanupResources();
      } catch (Exception e) {
        // Only throw this error if we won't be masking another
        // error.
        if (success) {
          throw e;
        } else {
          logger.error("In addition to a failure during writing, we failed during " +
            "cleanup.", e);
        }
      }
    }
  }
}
Write the mapTask records into the sorter: the records are sorted in memory, but no aggregation is performed.
void insertRecordIntoSorter(Product2<K, V> record) throws IOException {
  assert(sorter != null);
  final K key = record._1();
  final int partitionId = partitioner.getPartition(key);   // <= the key line (compute the target reduce partition for this key)
  serBuffer.reset();
  serOutputStream.writeKey(key, OBJECT_CLASS_TAG);
  serOutputStream.writeValue(record._2(), OBJECT_CLASS_TAG);
  serOutputStream.flush();

  final int serializedRecordSize = serBuffer.size();
  assert (serializedRecordSize > 0);

  // <= the key line (hand the serBuffer byte array over to the Tungsten-backed sorter)
  sorter.insertRecord(
    serBuffer.getBuf(), Platform.BYTE_ARRAY_OFFSET, serializedRecordSize, partitionId);
}
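Why does insertRecord take the partitionId alongside the raw bytes? Because ShuffleExternalSorter does not sort the serialized data itself; it keeps an array of 8-byte record pointers whose high bits carry the partition id, so sorting by partition only has to touch the pointer array. The sketch below illustrates that packing idea in isolation; the field widths mirror Spark's PackedRecordPointer (24-bit partition, 13-bit page, 27-bit offset), but the class itself is a standalone illustration, not Spark code.

// Standalone illustration of packing a partition id into the high bits of a 64-bit
// record pointer, in the spirit of PackedRecordPointer (24 | 13 | 27 bits).
public class PackedPointerDemo {

  static long pack(int partitionId, int pageNumber, long offsetInPage) {
    return (((long) partitionId) << 40)        // bits 63..40: partition id (24 bits)
         | (((long) pageNumber) << 27)         // bits 39..27: memory page number (13 bits)
         | (offsetInPage & ((1L << 27) - 1));  // bits 26..0 : offset within the page (27 bits)
  }

  static int partitionOf(long packed) {
    // Sorting the packed longs therefore orders records by partition id first.
    return (int) (packed >>> 40);
  }

  public static void main(String[] args) {
    long p = pack(7, 3, 123456L);
    System.out.println(partitionOf(p));        // prints 7
  }
}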
Persist the mapTask output to disk:
void closeAndWriteOutput() throws IOException {
  assert(sorter != null);
  updatePeakMemoryUsed();
  serBuffer = null;
  serOutputStream = null;
  final SpillInfo[] spills = sorter.closeAndGetSpills();
  sorter = null;
  final long[] partitionLengths;
  final File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
  final File tmp = Utils.tempFileWith(output);
  try {
    try {
      partitionLengths = mergeSpills(spills, tmp);   // <= the key line (merge all spill files into the final block data file)
    } finally {
      for (SpillInfo spill : spills) {
        if (spill.file.exists() && ! spill.file.delete()) {
          logger.error("Error while deleting spill file {}", spill.file.getPath());
        }
      }
    }
    // <= the key line (write the index file and commit the data file)
    shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
  } finally {
    if (tmp.exists() && !tmp.delete()) {
      logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
    }
  }
  mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
}
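What does writeIndexFileAndCommit actually put into the index file? To the best of my understanding it stores cumulative byte offsets: numPartitions + 1 longs starting at 0, so that reducer i can read its block from offset[i] to offset[i+1] in the data file. Below is a minimal sketch of that layout, with partitionLengths standing in for the array produced by mergeSpills above; the file name and helper class are hypothetical, only the offset layout is the point.

import java.io.DataOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;

// Sketch of a shuffle index layout: numPartitions + 1 cumulative offsets into the data file.
public class IndexLayoutDemo {
  static void writeIndex(String indexPath, long[] partitionLengths) throws IOException {
    try (DataOutputStream out = new DataOutputStream(new FileOutputStream(indexPath))) {
      long offset = 0L;
      out.writeLong(offset);                   // partition 0 starts at byte 0
      for (long length : partitionLengths) {
        offset += length;
        out.writeLong(offset);                 // end of partition i == start of partition i+1
      }
    }
  }

  public static void main(String[] args) throws IOException {
    // e.g. three reduce partitions of 100, 0 and 250 bytes -> offsets 0, 100, 100, 350
    writeIndex("shuffle_0_0_0.index.demo", new long[] {100L, 0L, 250L});
  }
}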
Internally, UnsafeShuffleWriter buffers data mainly in Tungsten-managed memory, although it may also use JVM heap memory. This is a clear difference from the ordinary sort-based writer (SortShuffleWriter with its ExternalSorter).
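If the on-heap versus off-heap distinction feels abstract, the tiny JDK-only sketch below is a loose analogy for the two kinds of memory Tungsten can hand out; it is not Spark code, just a way to see the difference between heap-backed and native buffers.

import java.nio.ByteBuffer;

// Analogy only: a heap buffer lives inside the JVM heap (backed by a byte[]),
// while a direct buffer lives in native, off-heap memory.
public class HeapVsOffHeapDemo {
  public static void main(String[] args) {
    ByteBuffer onHeap  = ByteBuffer.allocate(1024 * 1024);       // byte[] on the JVM heap
    ByteBuffer offHeap = ByteBuffer.allocateDirect(1024 * 1024); // native (off-heap) memory

    onHeap.putLong(42L);
    offHeap.putLong(42L);

    System.out.println("onHeap hasArray  = " + onHeap.hasArray());   // true
    System.out.println("offHeap hasArray = " + offHeap.hasArray());  // false
  }
}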
秦凱新, in Shenzhen, 1:19 a.m.