Shuffle,翻譯成中文就是洗牌。之因此須要Shuffle,仍是由於具備某種共同特徵的一類數據須要最終匯聚(aggregate)到一個計算節點上進行計算。這些數據分佈在各個存儲節點上而且由不一樣節點的計算單元處理。以最簡單的Word Count爲例,其中數據保存在Node一、Node2和Node3;apache
通過處理後,這些數據最終會匯聚到Nodea、Nodeb處理,以下圖所示:緩存
這個數據從新打亂而後匯聚到不一樣節點的過程就是Shuffle。可是實際上,Shuffle過程可能會很是複雜,有如下幾個問題:網絡
1)數據量會很大,好比單位爲TB或PB的數據分散到幾百甚至數千、數萬臺機器上。架構
2)爲了將這個數據匯聚到正確的節點,須要將這些數據放入正確的Partition,由於數據大小已經大於節點的內存,所以這個過程當中可能會發生屢次硬盤續寫。app
3)爲了節省帶寬,這個數據可能須要壓縮,如何在壓縮率和壓縮解壓時間中間作一個比較好的選擇?oop
4)數據須要經過網絡傳輸,所以數據的序列化和反序列化也變得相對複雜。性能
通常來講,每一個Task處理的數據能夠徹底載入內存(若是不能,能夠減少每一個Partition的大小),所以Task能夠作到在內存中計算。可是對於Shuffle來講,若是不持久化這個中間結果,一旦數據丟失,就須要從新計算依賴的所有RDD,所以有必要持久化這個中間結果。因此這就是爲何Shuffle過程會產生文件的緣由。測試
Shuffle Write數據是如何持久化到文件中,以使得下游的Task能夠獲取到其須要處理的數據的(即Shuffle Read)。在Spark 0.8以前,Shuffle Write是持久化到緩存的,但後來發現實際應用中,shuffle過程帶來的數據一般是巨量的,因此常常會發生內存溢出的狀況,因此在Spark 0.8之後,Shuffle Write會將數據持久化到硬盤,再以後Shuffle Write不斷進行演進優化,可是數據落地到本地文件系統的實現並無改變。優化
在Spark1.0之前,Spark只支持Hash Based Shuffle。由於在不少運算場景中並不須要排序,所以多餘的排序只能使性能變差,好比Hadoop的Map Reduce就是這麼實現的,也就是Reducer拿到的數據都是已經排好序的。spa
實際上Spark的實現很簡單:每一個Shuffle Map Task根據key的哈希值,計算出每一個key須要寫入的Partition而後將數據單獨寫入一個文件,這個Partition實際上就對應了下游的一個Shuffle Map Task或者Result Task。所以下游的Task在計算時會經過網絡(若是該Task與上游的Shuffle Map Task運行在同一個節點上,那麼此時就是一個本地的硬盤讀寫)讀取這個文件並進行計算。
Hash Based Shuffle Write存在的問題:
因爲每一個Shuffle Map Task須要爲每一個下游的Task建立一個單獨的文件,所以文件的數量就是:number(shuffle_map_task)*number(following_task)。
若是Shuffle Map Task是1000,下游的Task是500,那麼理論上會產生500000個文件(對於size爲0的文件Spark有特殊的處理)。生產環境中Task的數量實際上會更多,所以這個簡單的實現會帶來如下問題:
1)每一個節點可能會同時打開多個文件,每次打開文件都會佔用必定內存。假設每一個Write Handler的默認須要100KB的內存,那麼同時打開這些文件須要50GB的內存,對於一個集羣來講,仍是有必定的壓力的。尤爲是若是Shuffle Map Task和下游的Task同時增大10倍,那麼總體的內存就增加到5TB。
2)從總體的角度來看,打開多個文件對於系統來講意味着隨機讀,尤爲是每一個文件比較小可是數量很是多的狀況。而如今機械硬盤在隨機讀方面的性能特別差,很是容易成爲性能的瓶頸。若是集羣依賴的是固態硬盤,也許狀況會改善不少,可是隨機寫的性能確定不如順序寫的。
在Spark1.2.0中,Spark Core的一個重要的升級就是將默認的Hash Based Shuffle換成了Sort Based Shuffle,即spark.shuffle.manager從Hash換成了Sort,對應的實現類分別是以下兩個類:
org.apache.spark.shuffle.hash.HashShuffleManager
org.apache.spark.shuffle.sort.SortShuffleManager
那麼Sort Based Shuffle「取代」Hash Based Shuffle做爲默認選項的緣由是什麼?
正如前面提到的,Hash Based Shuffle的每一個Mapper都須要爲每一個Reducer寫一個文件,供Reducer讀取,即須要產生M*R個數量的文件,若是Mapper和Reducer的數量比較大,產生的文件數會很是多。
而Sort Based Shuffle的模式是:每一個Shuffle Map Task不會爲每一個Reducer生成一個單獨的文件;相反,它會將全部的結果寫到一個文件裏,同時會生成一個Index文件,
Reducer能夠經過這個Index文件取得它須要處理的數據。避免產生大量文件的直接收益就是節省了內存的使用和順序Disk IO帶來的低延時。節省內存的使用能夠減小GC的風險和頻率。而減小文件的數量能夠避免同時寫多個文件給系統帶來的壓力。
Sort Based Write實現詳解:
Shuffle Map Task會按照key相對應的Partition ID進行Sort,其中屬於同一個Partition的key不會Sort。由於對於不須要Sort的操做來講,這個Sort是負收益的;要知道以前Spark剛開始使用Hash Based的Shuffle而不是Sort Based就是爲了不Hadoop Map Reduce對於全部計算都會Sort的性能損耗。對於那些須要Sort的運算,好比sortByKey,這個Sort在Spark 1.2.0裏仍是由Reducer完成的。
Shuffle是Spark Core比較複雜的模塊,它也是很是影響性能的操做之一。所以,在這裏整理了會影響Shuffle性能的各項配置。
Spark 1.2.0官方版本支持兩種方式的Shuffle,即Hash Based Shuffle和Sort Based Shuffle。其中在Spark 1.0以前僅支持Hash Based Shuffle。Spark 1.1引入了Sort Based Shuffle。Spark 1.2的默認Shuffle機制從Hash變成了Sort。若是須要Hash Based Shuffle,只需將spark.shuffle.manager設置成「hash」便可。
①進入spark安裝目錄的conf目錄
②cp spark-defaults.conf.template spark-defaults.conf
③spark.shuffle.manager=hash
當產生的臨時文件不是不少時,性能可能會比sort shuffle要好。
若是對性能有比較苛刻的要求,那麼就要理解這兩種不一樣的Shuffle機制的原理,結合具體的應用場景進行選擇。
對於不須要進行排序且Shuffle產生的文件數量不是特別多時,Hash Based Shuffle多是更好的選擇;由於Sort Based Shuffle會按照Reducer的Partition進行排序。
而Sort Based Shuffle的優點就在於可擴展性,它的出現實際上很大程度上是解決Hash Based Shuffle的可擴展性的問題。因爲Sort Based Shuffle還在不斷地演進中,所以它的性能會獲得不斷改善。
對於選擇哪一種Shuffle,若是性能要求苛刻,最好仍是經過實際測試後再作決定。不過選擇默認的Sort,能夠知足大部分的場景須要。
這個參數的默認值是true,用於指定Shuffle過程當中若是內存中的數據超過閾值(參考spark.shuffle.memoryFraction的設置)時是否須要將部分數據臨時寫入外部存儲。若是設置爲false,那麼這個過程就會一直使用內存,會有內存溢出的風險。所以只有在肯定內存足夠使用時,才能夠將這個選項設置爲false。
在啓用spark.shuffle.spill的狀況下,spark.shuffle.memoryFraction決定了當Shuffle過程當中使用的內存達到總內存多少比例的時候開始spill。在Spark 1.2.0裏,這個值是0.2。經過這個參數能夠設置Shuffle過程佔用內存的大小,它直接影響了寫入到外部存儲的頻率和垃圾回收的頻率。能夠適當調大此值,能夠減小磁盤I/O次數。
這個配置的默認值是200,用於設置在Reducer的Partition數目少於多少的時候,Sort Based Shuffle內部不使用歸併排序的方式處理數據,而是直接將每一個Partition寫入單獨的文件。這個方式和Hash Based的方式相似,區別就是在最後這些文件仍是會合併成一個單獨的文件,並經過一個Index索引文件來標記不一樣Partition的位置信息。
這個能夠看做Sort Based Shuffle在Shuffle量比較小的時候對於Hash Based Shuffle的一種折中。固然了它和Hash Based Shuffle同樣,也存在同時打開文件過多致使內存佔用增長的問題。所以若是GC比較嚴重或者內存比較緊張,能夠適當下降這個值。
在Spark 1.2.0中這個配置的默認值是netty,而在以前的版本中是nio。它主要是用於在各個Executor之間傳輸Shuffle數據。netty的實現更加簡潔,但實際上用戶不用太關心這個選項。除非有特殊需求,不然採用默認配置便可。
這個配置的默認值是false。主要是爲了解決在Hash Based Shuffle過程當中產生過多文件的問題。若是配置選項爲true,那麼對於同一個Core上運行的Shuffle Map Task不會產生一個新的Shuffle文件而是重用原來的。
可是consolidateFiles的機制在Spark 0.8.1就引入了,到Spark 1.2.0仍是沒有穩定下來。從源碼實現的角度看,實現源碼是很是簡單的,可是因爲涉及本地文件系統等限制,這個策略可能會帶來各類各樣的問題。通常不建議開啓。
這兩個參數的默認配置都是true。spark.shuffle.compress和spark.shuffle.spill.compress都是用來設置Shuffle過程當中是否對Shuffle數據進行壓縮。其中,前者針對最終寫入本地文件系統的輸出文件;後者針對在處理過程須要寫入到外部存儲的中間數據,即針對最終的shuffle輸出文件。
須要評估壓縮解壓時間帶來的時間消耗和由於數據壓縮帶來的時間節省。若是網絡成爲瓶頸,好比集羣廣泛使用的是千兆網絡,那麼將這個選項設置爲true可能更合理;若是計算是CPU密集型的,那麼將這個選項設置爲false可能更好。
若是設置爲true,表明處理的中間結果在spill到本地硬盤時都會進行壓縮,在將中間結果取回進行merge的時候,要進行解壓。所以要綜合考慮CPU因爲引入壓縮、解壓的消耗時間和Disk IO由於壓縮帶來的節省時間的比較。在Disk IO成爲瓶頸的場景下,設置爲true可能比較合適;若是本地硬盤是SSD,那麼設置爲false可能比較合適。
這個參數用於限制一個Reducer Task向其餘的Executor請求Shuffle數據時所佔用的最大內存數,尤爲是若是網卡是千兆和千兆如下的網卡時。默認值是 設置這個值須要綜合考慮網卡帶寬和內存。
上一篇:Spark的架構
下一篇:SparkSQL簡介及入門