This blog series distills and shares cases drawn from real production environments, together with Spark source-code walkthroughs and guidance for commercial practice; please keep following the series. Copyright notice: this Spark source-code and commercial-practice series belongs to the author (秦凱新); reprinting is prohibited, but you are welcome to learn from it.
There is one diagram I have used several times already; please bear with it, since these posts all share the same theme: shuffle. The English comments in the source are already very detailed, so only a brief introduction is given here.
The official English description is as follows:
* Pluggable interface for shuffle systems. A ShuffleManager is created in SparkEnv on the
* driver and on each executor, based on the spark.shuffle.manager setting. The driver
* registers shuffles with it, and executors (or tasks running locally in the driver) can ask
* to read and write data.
*
* NOTE: this will be instantiated by SparkEnv so its constructor can take a SparkConf and
* boolean isDriver as parameters.
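Since the manager is chosen through configuration, it may help to see where these knobs live. The following is a minimal sketch using real Spark configuration keys; the application name and class are purely illustrative.

import org.apache.spark.SparkConf;

public class ShuffleConfExample {
  public static void main(String[] args) {
    // spark.shuffle.manager selects the ShuffleManager implementation ("sort" is the default).
    // spark.shuffle.sort.bypassMergeThreshold caps the number of reduce partitions
    // for which the bypass (hash-style) write path may be chosen; the default is 200.
    SparkConf conf = new SparkConf()
        .setAppName("shuffle-conf-demo")          // illustrative app name
        .set("spark.shuffle.manager", "sort")
        .set("spark.shuffle.sort.bypassMergeThreshold", "200")
        .set("spark.shuffle.file.buffer", "32k")   // per-partition disk writer buffer
        .set("spark.file.transferTo", "true");     // use NIO transferTo when concatenating files
  }
}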
Judging by its name alone, this writer is an opportunist: a ShuffleWriter that simply sidesteps merging and sorting. Let us call it the shortcut-taker.
* This class implements sort-based shuffle's hash-style shuffle fallback path. This write path
* writes incoming records to separate files, one file per reduce partition, then concatenates these
* per-partition files to form a single output file, regions of which are served to reducers.
* Records are not buffered in memory. It writes output in a format
* that can be served / consumed via org.apache.spark.shuffle.IndexShuffleBlockResolver.
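Note that SortShuffleManager only falls back to this path under specific conditions. The Java method below is a paraphrase of the selection rule (the real check, SortShuffleWriter.shouldBypassMergeSort, lives in Spark's Scala sources), not the actual code:

// Paraphrase of the bypass selection rule; illustrative class, not Spark source.
public class BypassRuleSketch {
  static boolean shouldBypassMergeSort(boolean mapSideCombine,
                                       int numReducePartitions,
                                       int bypassMergeThreshold) {
    // Map-side aggregation needs the sorter, so it rules the bypass path out.
    if (mapSideCombine) {
      return false;
    }
    // Otherwise bypass only when the reduce-side partition count is small enough;
    // bypassMergeThreshold is spark.shuffle.sort.bypassMergeThreshold (default 200).
    return numReducePartitions <= bypassMergeThreshold;
  }
}

The threshold exists because, as the javadoc above notes, this path opens a separate file stream and serializer for every reduce partition at the same time, which becomes expensive as the partition count grows.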
BypassMergeSortShuffleWriter really does play on cheat mode: there is no sorter at all. Its inner monologue might as well be, "no sorter needed, I will handle everything myself."
private DiskBlockObjectWriter[] partitionWriters ==> initialized as an array; one DiskBlockObjectWriter per reduce partition, each handling that partition's records.
private final int fileBufferSize ==> file buffer size, configured via spark.shuffle.file.buffer, default 32KB.
private final boolean transferToEnabled ==> whether to use NIO's file-stream-to-file-stream copy (transferTo), configured via spark.file.transferTo, default true (a sketch of this copy appears at the end of this post).
private final int numPartitions ==> the number of reduce partitions.
private final BlockManager blockManager ==> the executor's block manager, used to obtain disk writers.
private final Partitioner partitioner ==> the partition calculator that maps each record's key to a partition (see the routing sketch right after this list).
private final ShuffleWriteMetrics writeMetrics ==> metrics for the shuffle write (bytes written and write time).
private final int shuffleId ==> the identity of the shuffle this write belongs to.
private final int mapId ==> the identity of the map task.
private final Serializer serializer ==> the serializer used to write records to disk.
private final IndexShuffleBlockResolver shuffleBlockResolver ==> resolves the data and index files for shuffle blocks.
private FileSegment[] partitionWriterSegments ==> an array of FileSegments; each DiskBlockObjectWriter corresponds to one partition, and therefore to one file segment it produced.
@Nullable private MapStatus mapStatus ==> the result reported back to the scheduler when the write completes.
private long[] partitionLengths ==> the byte length of each partition's region in the final output file.
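To make the partitioner field concrete, here is the routing sketch promised above: a hypothetical, simplified stand-in that mirrors the non-negative-mod behavior of Spark's HashPartitioner (the class and method are illustrative, not Spark source).

// Hypothetical, simplified key routing: maps a key to one of numPartitions writers.
public class RoutingSketch {
  static int getPartition(Object key, int numPartitions) {
    int h = (key == null) ? 0 : key.hashCode();
    int mod = h % numPartitions;
    return (mod < 0) ? mod + numPartitions : mod;  // result always in [0, numPartitions)
  }
}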
First, enjoy the code:
public void write(Iterator<Product2<K, V>> records) throws IOException {
  assert (partitionWriters == null);
  if (!records.hasNext()) {
    // Empty input: commit an all-zero index and report an empty MapStatus.
    partitionLengths = new long[numPartitions];
    shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, null);
    mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
    return;
  }
  final SerializerInstance serInstance = serializer.newInstance();
  final long openStartTime = System.nanoTime();
  partitionWriters = new DiskBlockObjectWriter[numPartitions];   // <= the masterstroke
  partitionWriterSegments = new FileSegment[numPartitions];      // <= the masterstroke
  for (int i = 0; i < numPartitions; i++) {                      // <= one segment written per partition
    final Tuple2<TempShuffleBlockId, File> tempShuffleBlockIdPlusFile =
      blockManager.diskBlockManager().createTempShuffleBlock();
    final File file = tempShuffleBlockIdPlusFile._2();
    final BlockId blockId = tempShuffleBlockIdPlusFile._1();
    partitionWriters[i] =                                        // <= a DiskBlockObjectWriter per partition
      blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics);
  }
  // Creating the file to write to and creating a disk writer both involve interacting with
  // the disk, and can take a long time in aggregate when we open many files, so should be
  // included in the shuffle write time.
  writeMetrics.incWriteTime(System.nanoTime() - openStartTime);

  while (records.hasNext()) {
    final Product2<K, V> record = records.next();
    final K key = record._1();
    partitionWriters[partitioner.getPartition(key)].write(key, record._2());
  }

  for (int i = 0; i < numPartitions; i++) {
    final DiskBlockObjectWriter writer = partitionWriters[i];
    partitionWriterSegments[i] = writer.commitAndGet();  // <= flush each partition's temp file to disk
    writer.close();
  }

  File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);  // <= the final data file location
  File tmp = Utils.tempFileWith(output);
  try {
    partitionLengths = writePartitionedFile(tmp);  // <= concatenate the per-partition temp files
    shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
    // <= write the index file and commit
  } finally {
    if (tmp.exists() && !tmp.delete()) {
      logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
    }
  }
  mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
}
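The write ends by committing an index file derived from partitionLengths. As a minimal sketch, assuming the usual IndexShuffleBlockResolver layout of numPartitions + 1 cumulative offsets (reducer i then reads bytes [offsets[i], offsets[i+1]) of the data file), the index contents can be pictured like this; IndexSketch and writeIndex are hypothetical names, not Spark API:

import java.io.DataOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;

// Hypothetical helper sketching the shuffle index layout:
// one long per cumulative offset, numPartitions + 1 entries in total.
public class IndexSketch {
  static void writeIndex(String indexPath, long[] partitionLengths) throws IOException {
    try (DataOutputStream out = new DataOutputStream(new FileOutputStream(indexPath))) {
      long offset = 0L;
      out.writeLong(offset);                 // start of partition 0
      for (long length : partitionLengths) {
        offset += length;
        out.writeLong(offset);               // end of partition i == start of partition i + 1
      }
    }
  }
}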
Aggregating the per-partition files into the formal block file. The method's own comment says it well:
* Concatenate all of the per-partition files into a single combined file.
private long[] writePartitionedFile(File outputFile) throws IOException {
  // Track location of the partition starts in the output file
  final long[] lengths = new long[numPartitions];
  if (partitionWriters == null) {
    // We were passed an empty iterator
    return lengths;
  }
  final FileOutputStream out = new FileOutputStream(outputFile, true);
  final long writeStartTime = System.nanoTime();
  boolean threwException = true;
  try {
    for (int i = 0; i < numPartitions; i++) {
      final File file = partitionWriterSegments[i].file();
      if (file.exists()) {
        final FileInputStream in = new FileInputStream(file);
        boolean copyThrewException = true;
        try {
          // Append this partition's segment to the combined file;
          // transferToEnabled selects the NIO transferTo copy path.
          lengths[i] = Utils.copyStream(in, out, false, transferToEnabled);
          copyThrewException = false;
        } finally {
          Closeables.close(in, copyThrewException);
        }
        if (!file.delete()) {
          logger.error("Unable to delete file for partition {}", i);
        }
      }
    }
    threwException = false;
  } finally {
    Closeables.close(out, threwException);
    writeMetrics.incWriteTime(System.nanoTime() - writeStartTime);
  }
  partitionWriters = null;
  return lengths;
}
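Recall transferToEnabled from the field list. When spark.file.transferTo is true, Utils.copyStream can copy file-to-file via NIO's FileChannel#transferTo, which avoids shuttling bytes through user space where the OS supports it. Below is a minimal sketch of that copy loop, using plain JDK APIs only (TransferToSketch is an illustrative class, not Spark source):

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;

// Sketch of an NIO file-to-file copy in the spirit of the transferTo path.
public class TransferToSketch {
  static long transferToCopy(FileInputStream in, FileOutputStream out) throws IOException {
    FileChannel inChannel = in.getChannel();
    FileChannel outChannel = out.getChannel();
    long size = inChannel.size();
    long position = 0L;
    // transferTo may copy fewer bytes than requested, so loop until done.
    while (position < size) {
      position += inChannel.transferTo(position, size - position, outChannel);
    }
    return size;  // number of bytes appended to the output file
  }
}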
No more words needed: the diagram is simply superb. If its original author sees this, please leave me a message.
This section is the result of a great deal of time the author spent polishing it, using the plainest possible language to dissect the design of BypassMergeSortShuffleWriter, the ShuffleManager's unified storage service.
秦凱新, Shenzhen, 0:53 AM