[Spark][Translation] Spark Architecture: Shuffle Process Analysis

This is my second article about Apache Spark architecture, and today I will be more specific and tell you about the shuffle, one of the most interesting topics in the overall Spark design. The previous part was mostly about general Spark architecture and its memory management. It can be accessed here: https://0x0fff.com/spark-architecture/. The next one is about Spark memory management, and it is available here: https://0x0fff.com/spark-memory-management/. Original article: https://0x0fff.com/spark-architecture-shuffle/

[Figure: splitting stages at the shuffle]

What is the shuffle in general? Imagine that you have a list of phone call detail records in a table and you want to calculate the number of calls made each day. You would set the "day" as your key, and for each record (i.e. for each call) you would emit "1" as the value. After this you would sum up the values for each key, which would answer your question: the total number of records for each day. But when you store the data across the cluster, how can you sum up the values for the same key stored on different machines? The only way to do so is to bring all the values for the same key onto the same machine; after that you can sum them up.
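A minimal pure-Python sketch of this idea, assuming each "machine" is just a list of `(day, call_id)` records. The names `reducer_for` and `count_calls_per_day` are hypothetical, and `zlib.crc32` is only a deterministic stand-in for Spark's own partitioner (Python's built-in string hash is randomized per process). The point it shows: once every value for a key is routed to the same reducer, each reducer's local sum is already the final answer.

```python
import zlib
from collections import defaultdict


def reducer_for(day, num_reducers):
    # Deterministic hash partitioning: every record with the same key goes to the same reducer.
    # zlib.crc32 stands in for Spark's partitioner; Python's str hash is randomized per process.
    return zlib.crc32(day.encode("utf-8")) % num_reducers


def count_calls_per_day(mapper_inputs, num_reducers):
    """mapper_inputs: one list of (day, call_id) records per machine."""
    # Map side: emit (day, 1) for every call and route it to its reducer's bucket (the shuffle).
    buckets = [[] for _ in range(num_reducers)]
    for records in mapper_inputs:
        for day, _call_id in records:
            buckets[reducer_for(day, num_reducers)].append((day, 1))

    # Reduce side: each bucket holds all values for its keys, so local sums are final.
    totals = {}
    for bucket in buckets:
        sums = defaultdict(int)
        for day, one in bucket:
            sums[day] += one
        totals.update(sums)
    return totals
```

For example, `count_calls_per_day([[("2016-01-01", "c1"), ("2016-01-02", "c2")], [("2016-01-01", "c3")]], 4)` returns `{"2016-01-01": 2, "2016-01-02": 1}` regardless of how many reducers are used.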


Many different tasks require shuffling data across the cluster. Take a table join, for instance: to join two tables on the field "id", you must be sure that, for both tables, all the data for the same value of "id" is stored in the same chunks. Imagine tables with integer keys ranging from 1 to 1,000,000. By storing the data in the same chunks I mean that, for instance, for both tables the rows with keys 1-100 are stored in a single partition/chunk. This way, instead of going through the whole second table for each partition of the first one, we can join partition with partition directly, because we know that the key values 1-100 are stored only in these two partitions. To achieve this, both tables should be partitioned the same way, with the same number of partitions; this way their join requires much less computation. So now you can understand how important shuffling is.
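A small sketch of the partition-by-partition join described above, assuming integer keys from 1 to `max_key` split into equal-width ranges (with 1,000,000 keys and 10,000 partitions, keys 1-100 land in partition 0). `range_partition` and `copartitioned_join` are hypothetical helpers, not Spark APIs; they only illustrate why identical partitioning lets partition i be joined with partition i alone.

```python
from collections import defaultdict


def range_partition(rows, num_partitions, max_key):
    # Split integer keys 1..max_key into num_partitions contiguous ranges of equal width.
    width = -(-max_key // num_partitions)  # ceiling division
    parts = [[] for _ in range(num_partitions)]
    for key, value in rows:
        parts[(key - 1) // width].append((key, value))
    return parts


def copartitioned_join(left, right, num_partitions, max_key):
    # Both tables use the same partitioning, so key k lives in the same partition index in each.
    left_parts = range_partition(left, num_partitions, max_key)
    right_parts = range_partition(right, num_partitions, max_key)
    joined = []
    # Partition i of one table can only match partition i of the other,
    # so we never scan the whole second table for each left partition.
    for lpart, rpart in zip(left_parts, right_parts):
        index = defaultdict(list)
        for key, value in rpart:
            index[key].append(value)
        for key, lvalue in lpart:
            for rvalue in index.get(key, []):
                joined.append((key, lvalue, rvalue))
    return joined
```

If the two tables were partitioned differently, the inner loop would have to look at every partition of the right table, which is exactly the extra work a shuffle into matching partitions avoids.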


In discussing this topic, I will follow the MapReduce naming convention. In a shuffle operation, the task that emits the data in the source executor is the "mapper", the task that consumes the data in the target executor is the "reducer", and what happens between them is the "shuffle".

Shuffling in general has two important compression parameters: spark.shuffle.compress, which controls whether the engine compresses shuffle outputs, and spark.shuffle.spill.compress, which controls whether intermediate shuffle spill files are compressed. Both are "true" by default, and both use the codec set in spark.io.compression.codec to compress the data, which is snappy by default.


As you might know, there are a number of shuffle implementations available in Spark. Which implementation is used in your particular case is determined by the value of the spark.shuffle.manager parameter. The three possible options are hash, sort and tungsten-sort, and "sort" has been the default since Spark 1.2.0.
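The shuffle parameters mentioned so far are ordinary Spark configuration keys, so they can be passed to spark-submit with its `--conf key=value` flag. The hypothetical helper `shuffle_conf_args` below only assembles those flags and rejects manager names other than the three listed above; its defaults mirror the ones stated in the text (Spark 1.x era), and newer Spark releases may accept different values.

```python
# The three spark.shuffle.manager options named in the text (Spark 1.x).
SHUFFLE_MANAGERS = ("hash", "sort", "tungsten-sort")


def shuffle_conf_args(manager="sort", compress=True, spill_compress=True, codec="snappy"):
    """Hypothetical helper: build spark-submit --conf flags for the shuffle settings."""
    if manager not in SHUFFLE_MANAGERS:
        raise ValueError(f"unknown shuffle manager: {manager!r}")
    settings = {
        "spark.shuffle.manager": manager,
        "spark.shuffle.compress": str(compress).lower(),              # compress shuffle outputs
        "spark.shuffle.spill.compress": str(spill_compress).lower(),  # compress spill files
        "spark.io.compression.codec": codec,                          # codec used by both
    }
    args = []
    for key, value in settings.items():
        args += ["--conf", f"{key}={value}"]
    return args
```

For example, `shuffle_conf_args("hash")` starts with `["--conf", "spark.shuffle.manager=hash"]`, and the resulting list can be appended to a spark-submit command line.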

Hash Shuffle

Prior to Spark 1.2.0 this was the default shuffle option (spark.shuffle.manager = hash). But it has many drawbacks, mostly caused by the number of files it creates: each mapper task creates a separate file for each reducer, resulting in M * R files in total on the cluster, where M is the number of "mappers" and R is the number of "reducers". With a large number of mappers and reducers this causes big problems with the output buffer size, the number of open files on the filesystem, and the speed of creating and dropping all these files. Here’s a good example of how Yahoo faced all these problems, with 46k mappers and 46k reducers generating 2 billion files on the cluster.
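Plugging the Yahoo figures (taking 46k as exactly 46,000) into the M * R formula shows where the 2 billion comes from:

$$
M \cdot R = 46\,000 \times 46\,000 = 2.116 \times 10^{9} \approx 2 \text{ billion files}
$$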

The logic of this shuffler is pretty dumb: it takes the number of "reducers" to be the number of partitions on the "reduce" side, creates a separate file for each of them, and then, looping through the records it needs to output, calculates the target partition for each record and writes the record to the corresponding file.
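A sketch of what a single hash-shuffle map task does, under these assumptions: `hash_shuffle_write` and `target_partition` are hypothetical names, the file naming is illustrative only, and `zlib.crc32` stands in for Spark's partitioner. Note that all R files are opened up front, even those that end up empty, which is where the M * R file count and the open-file pressure come from.

```python
import os
import zlib


def target_partition(key, num_reducers):
    # Illustrative hash partitioning: non-negative hash of the key modulo R.
    # crc32 replaces Python's built-in str hash, which is randomized per process.
    return zlib.crc32(key.encode("utf-8")) % num_reducers


def hash_shuffle_write(map_id, records, num_reducers, out_dir):
    """Write one map task's output: one file per reducer, as in the hash shuffle."""
    paths = [os.path.join(out_dir, f"shuffle_0_{map_id}_{r}.data") for r in range(num_reducers)]
    files = [open(p, "w", encoding="utf-8") for p in paths]  # R files open at the same time
    try:
        for key, value in records:
            # Route each record to the file of its target reducer.
            files[target_partition(key, num_reducers)].write(f"{key}\t{value}\n")
    finally:
        for f in files:
            f.close()
    return paths
```

Running this for 3 map tasks with 4 reducers into one directory leaves 3 * 4 = 12 files, one per (mapper, reducer) pair.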


Here is what it looks like:


There is an optimization implemented for this shuffler, controlled by the parameter "spark.shuffle.consolidateFiles" (default "false"). When it is set to "true", the "mapper" output files are consolidated. If your cluster has E executors ("--num-executors" for YARN), each of them has C cores ("spark.executor.cores" or "--executor-cores" for YARN), and each task asks for T CPUs ("spark.task.cpus"), then the number of execution slots on the cluster is E * C / T, and the number of files created during the shuffle is E * C / T * R. With 100 executors of 10 cores each, 1 core allocated per task and 46,000 "reducers", this takes you from 2 billion files down to 46 million files, which is much better in terms of performance.
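The two file-count formulas side by side, as plain arithmetic. The function names are hypothetical; the slot count is computed per executor as E * (C // T), which equals E * C / T whenever T divides C, as in the example.

```python
def hash_shuffle_files(mappers, reducers):
    # Without consolidation: one file per (map task, reducer) pair, i.e. M * R.
    return mappers * reducers


def consolidated_shuffle_files(executors, cores_per_executor, cpus_per_task, reducers):
    # With spark.shuffle.consolidateFiles=true: one group of R files per execution slot.
    slots = executors * (cores_per_executor // cpus_per_task)  # E * C / T when T divides C
    return slots * reducers
```

With the numbers from the text, `hash_shuffle_files(46_000, 46_000)` gives 2,116,000,000 and `consolidated_shuffle_files(100, 10, 1, 46_000)` gives 46,000,000.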

This feature is implemented in a rather straightforward way: instead of creating a new file for each of the reducers, it creates a pool of output files. When a map task starts outputting data, it requests a group of R files from this pool. When it finishes, it returns this group of R files to the pool. As each executor can execute only C / T tasks in parallel, it creates only C / T groups of output files, each consisting of R files. After the first C / T parallel "map" tasks have finished, each subsequent "map" task reuses an existing group from this pool.
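A toy model of the pool for one executor. `FileGroupPool` and `run_map_tasks` are hypothetical names, a "file" is just a `(group_id, reducer)` pair, and tasks are assumed to run in lock-step waves of C / T; that is a simplification, since real tasks finish at different times, but with reuse-first acquisition the pool still never creates more groups than there are concurrently running tasks.

```python
class FileGroupPool:
    """Hypothetical per-executor pool of shuffle output file groups (R files per group)."""

    def __init__(self, num_reducers):
        self.num_reducers = num_reducers
        self.free_groups = []
        self.groups_created = 0

    def acquire(self):
        # Reuse a group released by a finished map task; only create one if none is free.
        if self.free_groups:
            return self.free_groups.pop()
        group_id = self.groups_created
        self.groups_created += 1
        # A group is modelled as R (group_id, reducer) file identifiers.
        return [(group_id, r) for r in range(self.num_reducers)]

    def release(self, group):
        # A finished map task hands its R files back so the next task can append to them.
        self.free_groups.append(group)


def run_map_tasks(num_tasks, slots, num_reducers):
    """Simulate map tasks on one executor running in waves of `slots` (= C / T) tasks."""
    pool = FileGroupPool(num_reducers)
    for start in range(0, num_tasks, slots):
        running = min(slots, num_tasks - start)
        groups = [pool.acquire() for _ in range(running)]  # tasks in this wave run in parallel
        for group in groups:  # ...and each returns its group when it finishes
            pool.release(group)
    return pool
```

Running 50 map tasks with 4 slots and 3 reducers creates only 4 groups (12 files), instead of 50 * 3 = 150 files without consolidation.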

Here’s a general diagram of how it works:

[Figure: hash shuffle with consolidated output files]
