spark shuffle原理

1.spark中窄依賴的時候不須要shuffle,只有寬依賴的時候須要shuffle,mapreduce中map到reduce必須通過shufflegit

2.spark中的shuffle fetch的時候進行merge操做利用aggregator來進行,其實是個hashmap,放在內存中github

1 // Map: "cat" -> c, cat
2 val rdd1 = rdd.Map(x => (x.charAt(0), x))
3 // groupby same key and count
4 val rdd2 = rdd1.groupBy(x => x._1).
5                 Map(x => (x._1, x._2.toList.length))

第一個 Map 操做將 RDD 裏的各個元素進行映射, RDD 的各個數據元素之間不存在依賴,能夠在集羣的各個內存中獨立計算,也就是並行化,第二個 groupby 以後的 Map 操做,爲了計算相同 key 下的元素個數,須要把相同 key 的元素彙集到同一個 partition 下,因此形成了數據在內存中的從新分佈,即 shuffle 操做.shuffle 操做是 spark 中最耗時的操做,應儘可能避免沒必要要的 shuffle算法

  寬依賴主要有兩個過程: shuffle write 和 shuffle fetch. 相似 Hadoop 的 Map 和 Reduce 階段.shuffle write 將 ShuffleMapTask 任務產生的中間結果緩存到內存中, shuffle fetch 得到 ShuffleMapTask 緩存的中間結果進行 ShuffleReduceTask 計算,這個過程容易形成OutOfMemory
  shuffle 過程內存分配使用 ShuffleMemoryManager 類管理,會針對每一個 Task 分配內存,Task 任務完成後經過 Executor 釋放空間.這裏能夠把 Task 理解成不一樣 key 的數據對應一個 Task. 早期的內存分配機制使用公平分配,即不一樣 Task 分配的內存是同樣的,可是這樣容易形成內存需求過多的 Task 的 OutOfMemory, 從而形成多餘的 磁盤 IO 過程,影響總體的效率.(例:某一個 key 下的數據明顯偏多,但由於你們內存都同樣,這一個 key 的數據就容易 OutOfMemory).1.5版之後 Task 共用一個內存池,內存池的大小默認爲 JVM 最大運行時內存容量的16%,分配機制以下:假若有 N 個 Task,ShuffleMemoryManager 保證每一個 Task 溢出以前至少能夠申請到1/2N 內存,且至多申請到1/N,N 爲當前活動的 shuffle Task 數,由於N 是一直變化的,因此 manager 會一直追蹤 Task 數的變化,從新計算隊列中的1/N 和1/2N.可是這樣仍然容易形成內存須要多的 Task 任務溢出,因此最近有不少相關的研究是針對 shuffle 過程內存優化的.apache

spark shuffle process

早期的shuffle write有兩個比較大的問題:緩存

  1. Map的輸出必須先所有存儲到內存中,而後寫入磁盤。這對內存是一個很是大的開銷,當內存不足以存儲全部的Map output時就會出現OOM。
  2. 每個Mapper都會產生Reducer number個shuffle文件,若是Mapper個數是1k,Reducer個數也是1k,那麼就會產生1M個shuffle文件,這對於文件系統是一個很是大的負擔。同時在shuffle數據量不大而shuffle文件又很是多的狀況下,隨機寫也會嚴重下降IO的性能。

Spark 0.8顯著減小了shuffle的內存壓力,如今Map output不須要先所有存儲在內存中,再flush到硬盤,而是record-by-record寫入到磁盤中。網絡

爲了解決shuffle文件過多的狀況,Spark 0.8.1引入了新的shuffle consolidation,以期顯著減小shuffle文件的數量。app

spark shuffle  consolidation process

假定該job有4個Mapper和4個Reducer,有2個core,也就是能並行運行兩個task。咱們能夠算出Spark的shuffle write共須要16個bucket,也就有了16個write handler。在以前的Spark版本中,每個bucket對應的是一個文件,所以在這裏會產生16個shuffle文件。框架

而在shuffle consolidation中每個bucket並不是對應一個文件,而是對應文件中的一個segment,同時shuffle consolidation所產生的shuffle文件數量與Spark core的個數也有關係。在上面的圖例中,job的4個Mapper分爲兩批運行,在第一批2個Mapper運行時會申請8個bucket,產生8個shuffle文件;而在第二批Mapper運行時,申請的8個bucket並不會再產生8個新的文件,而是追加寫到以前的8個文件後面,這樣一共就只有8個shuffle文件,而在文件內部這有16個不一樣的segment。所以從理論上講shuffle consolidation所產生的shuffle文件數量爲C×R,其中C是Spark集羣的core number,R是Reducer的個數。socket

須要注意的是當 M=C時shuffle consolidation所產生的文件數和以前的實現是同樣的。oop

Shuffle consolidation顯著減小了shuffle文件的數量,解決了以前版本一個比較嚴重的問題,可是writer handler的buffer開銷過大依然沒有減小,若要減小writer handler的buffer開銷,咱們只能減小Reducer的數量,可是這又會引入新的問題,下文將會有詳細介紹。

Shuffle Fetch and Aggregator

Shuffle write寫出去的數據要被Reducer使用,就須要shuffle fetcher將所需的數據fetch過來,這裏的fetch包括本地和遠端,由於shuffle數據有可能一部分是存儲在本地的。Spark對shuffle fetcher實現了兩套不一樣的框架:NIO經過socket鏈接去fetch數據;OIO經過netty server去fetch數據。分別對應的類是BasicBlockFetcherIteratorNettyBlockFetcherIterator

在Spark 0.7和更早的版本中,只支持BasicBlockFetcherIterator,而BasicBlockFetcherIterator在shuffle數據量比較大的狀況下performance始終不是很好,沒法充分利用網絡帶寬,爲了解決這個問題,添加了新的shuffle fetcher來試圖取得更好的性能。對於早期shuffle性能的評測能夠參看Spark usergroup。固然如今BasicBlockFetcherIterator的性能也已經好了不少,使用的時候能夠對這兩種實現都進行測試比較。

接下來講一下aggregator。咱們都知道在Hadoop MapReduce的shuffle過程當中,shuffle fetch過來的數據會進行merge sort,使得相同key下的不一樣value按序歸併到一塊兒供Reducer使用,這個過程能夠參看下圖:

mapreduce shuffle process

全部的merge sort都是在磁盤上進行的,有效地控制了內存的使用,可是代價是更多的磁盤IO。

那麼Spark是否也有merge sort呢,仍是以別的方式實現,下面咱們就細細說明。

首先雖然Spark屬於MapReduce體系,可是對傳統的MapReduce算法進行了必定的改變。Spark假定在大多數用戶的case中,shuffle數據的sort不是必須的,好比word count,強制地進行排序只會使性能變差,所以Spark並不在Reducer端作merge sort。既然沒有merge sort那Spark是如何進行reduce的呢?這就要說到aggregator了。

aggregator本質上是一個hashmap,它是以map output的key爲key,以任意所要combine的類型爲value的hashmap。當咱們在作word count reduce計算count值的時候,它會將shuffle fetch到的每個key-value pair更新或是插入到hashmap中(若在hashmap中沒有查找到,則插入其中;若查找到則更新value值)。這樣就不須要預先把全部的key-value進行merge sort,而是來一個處理一個,省下了外部排序這一步驟。但同時須要注意的是reducer的內存必須足以存放這個partition的全部key和count值,所以對內存有必定的要求。

在上面word count的例子中,由於value會不斷地更新,而不須要將其所有記錄在內存中,所以內存的使用仍是比較少的。考慮一下若是是group by key這樣的操做,Reducer須要獲得key對應的全部value。在Hadoop MapReduce中,因爲有了merge sort,所以給予Reducer的數據已是group by key了,而Spark沒有這一步,所以須要將key和對應的value所有存放在hashmap中,並將value合併成一個array。能夠想象爲了可以存放全部數據,用戶必須確保每個partition足夠小到內存可以容納,這對於內存是一個很是嚴峻的考驗。所以Spark文檔中建議用戶涉及到這類操做的時候儘可能增長partition,也就是增長Mapper和Reducer的數量。

增長Mapper和Reducer的數量當然能夠減少partition的大小,使得內存能夠容納這個partition。可是咱們在shuffle write中提到,bucket和對應於bucket的write handler是由Mapper和Reducer的數量決定的,task越多,bucket就會增長的更多,由此帶來write handler所需的buffer也會更多。在一方面咱們爲了減小內存的使用採起了增長task數量的策略,另外一方面task數量增多又會帶來buffer開銷更大的問題,所以陷入了內存使用的兩難境地。

爲了減小內存的使用,只能將aggregator的操做從內存移到磁盤上進行,Spark社區也意識到了Spark在處理數據規模遠遠大於內存大小時所帶來的問題。所以PR303提供了外部排序的實現方案,相信在Spark 0.9 release的時候,這個patch應該能merge進去,到時候內存的使用量能夠顯著地減小。

相關文章
相關標籤/搜索