This is my second article about Apache Spark architecture, and today I will be more specific and tell you about the shuffle, one of the most interesting topics in the overall Spark design. The previous part was mostly about general Spark architecture and its memory management; it can be accessed here: https://0x0fff.com/spark-architecture/. The next one is about Spark memory management, and it is available here: https://0x0fff.com/spark-memory-management/.
Original article: https://0x0fff.com/spark-architecture-shuffle/
[Figure: how the shuffle splits a job into stages]
What is the shuffle in general? Imagine that you have a list of phone call detail records in a table and you want to count the calls made each day. You would set the "day" as your key, and for each record (i.e. for each call) you would emit "1" as the value. After this you would sum up the values for each key, which answers your question: the total number of records for each day. But when the data is stored across the cluster, how can you sum up the values for the same key stored on different machines? The only way to do so is to bring all the values for the same key onto the same machine; after that you can sum them up.
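The call-counting example can be simulated in a few lines of plain Python. This is a toy sketch, not Spark code. The "machines" are just lists, and the dates and caller names are made up. Routing by `hash(key) % num_reducers` is an assumption that mimics a hash partitioner. Within one run, every value for a given day lands in the same reducer bucket, which is exactly what makes the per-key sum possible.

```python
from collections import defaultdict

# Hypothetical call records spread over three "machines" (plain lists).
# Each record is (day, caller); only the day matters for the count.
machines = [
    [("2015-08-01", "alice"), ("2015-08-02", "bob")],
    [("2015-08-01", "carol"), ("2015-08-01", "dave")],
    [("2015-08-02", "erin"), ("2015-08-03", "frank")],
]

NUM_REDUCERS = 2

def map_phase(records):
    """Emit (day, 1) for every call record."""
    return [(day, 1) for day, _caller in records]

def shuffle(mapped_outputs, num_reducers):
    """Send each (key, value) pair to reducer hash(key) % num_reducers,
    so all values for one key end up in the same reducer bucket."""
    buckets = [defaultdict(list) for _ in range(num_reducers)]
    for output in mapped_outputs:
        for key, value in output:
            buckets[hash(key) % num_reducers][key].append(value)
    return buckets

def reduce_phase(bucket):
    """Sum the values of every key held by one reducer."""
    return {key: sum(values) for key, values in bucket.items()}

mapped = [map_phase(m) for m in machines]
buckets = shuffle(mapped, NUM_REDUCERS)
calls_per_day = {}
for bucket in buckets:
    calls_per_day.update(reduce_phase(bucket))

print(dict(sorted(calls_per_day.items())))
# {'2015-08-01': 3, '2015-08-02': 2, '2015-08-03': 1}
```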
There are many different tasks that require shuffling the data across the cluster, for instance a table join. To join two tables on the field "id", you must be sure that all the data for the same values of "id" from both tables is stored in the same chunks. Imagine tables with integer keys ranging from 1 to 1'000'000. By storing the data in the same chunks I mean that, for instance, for both tables the values of keys 1-100 are stored in a single partition/chunk. This way, instead of going through the whole second table for each partition of the first one, we can join partition with partition directly, because we know that the key values 1-100 are stored only in these two partitions. To achieve this, both tables should be partitioned the same way, with the same number of partitions; then their join requires much less computation. So now you can understand how important shuffling is.
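The partition-by-partition join can be sketched in plain Python. This is a hedged illustration, not Spark's join implementation. It assumes range partitioning (keys 1-100 in partition 0, 101-200 in partition 1, and so on). It also uses two hypothetical tables, `users` and `orders`, and 1'000 keys instead of 1'000'000 for brevity. Because both tables share the same partitioning, partition `i` of one table is only ever joined with partition `i` of the other.

```python
from collections import defaultdict

NUM_KEYS = 1000          # keys 1..1000 (the text uses 1..1'000'000)
NUM_PARTITIONS = 10      # same partition count for both tables
KEYS_PER_PARTITION = NUM_KEYS // NUM_PARTITIONS  # 100 keys per partition

def partition_of(key):
    """Range partitioning: keys 1-100 -> 0, 101-200 -> 1, and so on."""
    return (key - 1) // KEYS_PER_PARTITION

def partition_table(rows):
    """Place every (id, value) row into the partition that owns its id."""
    parts = defaultdict(list)
    for key, value in rows:
        parts[partition_of(key)].append((key, value))
    return parts

def join_partitions(left, right):
    """Hash join of two partitions known to hold the same key range."""
    index = defaultdict(list)
    for key, value in right:
        index[key].append(value)
    return [(key, lv, rv) for key, lv in left for rv in index.get(key, [])]

# Hypothetical tables: every id appears once in each table.
users = [(i, f"user{i}") for i in range(1, NUM_KEYS + 1)]
orders = [(i, i * 10) for i in range(1, NUM_KEYS + 1)]

left_parts = partition_table(users)
right_parts = partition_table(orders)

# Partition p of one table only needs partition p of the other,
# instead of scanning the whole second table for every partition.
joined = []
for p in range(NUM_PARTITIONS):
    joined.extend(join_partitions(left_parts[p], right_parts[p]))

print(len(joined), joined[0])  # 1000 (1, 'user1', 10)
```

If the two tables were partitioned differently, this shortcut would be invalid. Every left partition would then have to be compared with every right partition, or the data would first have to be shuffled into matching partitions.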
Discussing this topic, I will follow the MapReduce naming convention. In the shuffle operation, the task that emits the data in the source executor is the "mapper", the task that consumes the data in the target executor is the "reducer", and what happens between them is the "shuffle".
Shuffling in general has 2 important compression parameters: spark.shuffle.compress (whether the engine compresses shuffle outputs) and spark.shuffle.spill.compress (whether to compress intermediate shuffle spill files). Both are "true" by default, and both use the codec set by spark.io.compression.codec to compress the data, which is snappy by default.
As you might know, there are several shuffle implementations available in Spark. Which one is used in your particular case is determined by the value of the spark.shuffle.manager parameter. The three possible options are hash, sort and tungsten-sort, and "sort" is the default starting from Spark 1.2.0.
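The following toy model shows how the settings described above combine. It is not Spark's configuration code. The helpers `default_shuffle_manager` and `effective_shuffle_settings` are hypothetical. The defaults are the ones stated in the text for that Spark era and may differ in the version you run. The point it illustrates is that both compression flags share the single spark.io.compression.codec setting, while spark.shuffle.manager independently picks the implementation.

```python
# Defaults as stated in the text (assumption: Spark 1.x era values).
DEFAULTS = {
    "spark.shuffle.compress": "true",
    "spark.shuffle.spill.compress": "true",
    "spark.io.compression.codec": "snappy",
}

def default_shuffle_manager(spark_version):
    """Per the text: "hash" before Spark 1.2.0, "sort" from 1.2.0 on."""
    major, minor = (int(x) for x in spark_version.split(".")[:2])
    return "sort" if (major, minor) >= (1, 2) else "hash"

def effective_shuffle_settings(user_conf, spark_version):
    """Hypothetical helper: overlay user settings on the defaults and report
    which codec (if any) applies to shuffle outputs and to spill files."""
    conf = {
        **DEFAULTS,
        "spark.shuffle.manager": default_shuffle_manager(spark_version),
        **user_conf,
    }
    codec = conf["spark.io.compression.codec"]

    def enabled(name):
        return conf[name].lower() == "true"

    return {
        "manager": conf["spark.shuffle.manager"],
        # Both flags reuse the same spark.io.compression.codec value.
        "shuffle_output_codec": codec if enabled("spark.shuffle.compress") else None,
        "spill_codec": codec if enabled("spark.shuffle.spill.compress") else None,
    }

print(effective_shuffle_settings({}, "1.4.1"))
# {'manager': 'sort', 'shuffle_output_codec': 'snappy', 'spill_codec': 'snappy'}
print(effective_shuffle_settings({"spark.shuffle.spill.compress": "false"}, "1.1.0"))
# {'manager': 'hash', 'shuffle_output_codec': 'snappy', 'spill_codec': None}
```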
Hash Shuffle
Prior to Spark 1.2.0 this was the default shuffle option (spark.shuffle.manager = hash). But it has many drawbacks, mostly caused by the number of files it creates. Each mapper task creates a separate file for each reducer, resulting in M * R total files on the cluster, where M is the number of "mappers" and R is the number of "reducers". With a large number of mappers and reducers this causes big problems with the output buffer size, the number of open files on the filesystem, and the speed of creating and dropping all these files. Here's a good example of how Yahoo faced all these problems, with 46k mappers and 46k reducers generating 2 billion files on the cluster.
The logic of this shuffler is pretty dumb. It takes the number of "reducers" to be the number of partitions on the "reduce" side and creates a separate file for each of them. Then, looping through the records it needs to output, it calculates the target partition for each record and writes the record to the corresponding file.
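That writer loop can be sketched as follows. This is a simplified model, not Spark's actual hash shuffle writer. It assumes a hash partitioner (`hash(key) % num_reducers`), and the file names are hypothetical. Each map task opens one file per reducer and appends every record to the file of its target partition, so M map tasks with R reducers leave M * R files behind.

```python
import os
import tempfile

def hash_shuffle_write(map_id, records, num_reducers, out_dir):
    """Toy hash shuffle writer: one output file per reducer for this map
    task; each record goes to the file of its target partition.
    File naming is hypothetical."""
    paths = [os.path.join(out_dir, f"map{map_id}_reduce{r}.txt")
             for r in range(num_reducers)]
    files = [open(p, "w") for p in paths]
    try:
        for key, value in records:
            target = hash(key) % num_reducers   # target reducer partition
            files[target].write(f"{key}\t{value}\n")
    finally:
        for f in files:
            f.close()
    return paths

num_mappers, num_reducers = 3, 4
with tempfile.TemporaryDirectory() as d:
    for m in range(num_mappers):
        records = [(f"key{i}", i) for i in range(10)]
        hash_shuffle_write(m, records, num_reducers, d)
    file_count = len(os.listdir(d))

print(file_count)  # 12, i.e. M * R files, even if some of them are empty
```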
Here is what it looks like:
[Figure: Hash Shuffle]
There is an optimization implemented for this shuffler, controlled by the parameter "spark.shuffle.consolidateFiles" (default is "false"). When it is set to "true", the "mapper" output files are consolidated. Suppose your cluster has E executors ("--num-executors" for YARN), each of them has C cores ("spark.executor.cores" or "--executor-cores" for YARN), and each task asks for T CPUs ("spark.task.cpus"). Then the number of execution slots on the cluster is E * C / T, and the number of files created during the shuffle is E * C / T * R. Take 100 executors with 10 cores each, 1 core allocated per task, and 46000 "reducers". In that case this optimization takes you from 2 billion files down to 46 million files, which is much better in terms of performance.
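The two file-count formulas can be checked with a small calculation. The function names are illustrative only. The slot count is computed per executor as C // T and then multiplied by E, which equals E * C / T whenever T divides C, as it does in the example.

```python
def files_without_consolidation(mappers, reducers):
    """Plain hash shuffle: every map task writes one file per reducer (M * R)."""
    return mappers * reducers

def files_with_consolidation(executors, cores, task_cpus, reducers):
    """Consolidated hash shuffle: one group of R files per execution slot.
    Slots = E * (C // T), equal to E * C / T when T divides C."""
    slots = executors * (cores // task_cpus)
    return slots * reducers

print(files_without_consolidation(46_000, 46_000))    # 2116000000 (~2 billion)
print(files_with_consolidation(100, 10, 1, 46_000))   # 46000000 (46 million)
```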
This feature is implemented in a rather straightforward way: instead of creating new files for each of the reducers, it creates a pool of output files. When a map task starts outputting data, it requests a group of R files from this pool. When it is finished, it returns this group of R files to the pool. As each executor can execute only C / T tasks in parallel, it creates only C / T groups of output files, each consisting of R files. After the first C / T parallel "map" tasks have finished, each subsequent "map" task reuses an existing group from this pool.
Here's a general diagram of how it works:
[Figure: Hash Shuffle with consolidated output files]
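The file-group pool on a single executor can be modeled with the toy sketch below. The class and function names are hypothetical and do not correspond to Spark's internal classes. The simplification here is that tasks run in fixed waves of C / T. Real tasks finish at different times, but the invariant is the same: no more than C / T groups are ever in use at once, so no more than C / T groups are ever created.

```python
class FileGroupPool:
    """Toy model of the consolidation pool: hands out groups of R output
    'files' (just names here) and takes them back when a map task ends."""

    def __init__(self, num_reducers):
        self.num_reducers = num_reducers
        self.free_groups = []
        self.groups_created = 0

    def acquire(self):
        # Reuse a returned group if one is free; otherwise create a new one.
        if self.free_groups:
            return self.free_groups.pop()
        group_id = self.groups_created
        self.groups_created += 1
        return [f"group{group_id}_reducer{r}" for r in range(self.num_reducers)]

    def release(self, group):
        self.free_groups.append(group)

def run_map_tasks(num_tasks, parallel_slots, pool):
    """Run map tasks in waves of `parallel_slots` (C / T) concurrent tasks."""
    for start in range(0, num_tasks, parallel_slots):
        wave = range(start, min(start + parallel_slots, num_tasks))
        groups = [pool.acquire() for _ in wave]  # each running task holds a group
        # ... each task would append its records to its group's R files here ...
        for group in groups:
            pool.release(group)                  # task finished: return its group

pool = FileGroupPool(num_reducers=4)
run_map_tasks(num_tasks=50, parallel_slots=5, pool=pool)  # C / T = 5
print(pool.groups_created, pool.groups_created * pool.num_reducers)  # 5 20
```

With 50 map tasks, the plain hash shuffle would have produced 50 * 4 = 200 files. Here only the first wave creates groups, and every later task reuses one.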