【Spark-core學習之八】 SparkShuffle & Spark內存管理
環境
虛擬機:VMware 10
Linux版本:CentOS-6.5-x86_64
客戶端:Xshell4
FTP:Xftp4
jdk1.8
scala-2.10.4(依賴jdk1.8)
spark-1.6html
1、SparkShuffle
1. SparkShuffle概念
reduceByKey會將上一個RDD中的每個key對應的全部value聚合成一個value,而後生成一個新的RDD,元素類型是<key,value>對的形式,這樣每個key對應一個聚合起來的value。問題:聚合以前,每個key對應的value不必定都是在一個partition中,也不太可能在同一個節點上,由於RDD是分佈式的彈性的數據集,RDD的partition極有可能分佈在各個節點上。shell
如何聚合?兩個過程:
–Shuffle Write:上一個stage的每一個map task就必須保證將本身處理的當前分區的數據相同的key寫入一個分區文件中,不一樣的key寫入多個不一樣的分區文件中。緩存
–Shuffle Read:reduce task就會從上一個stage的全部task所在的機器上尋找屬於己的那些分區文件,這樣就能夠保證每個key所對應的value都會匯聚到同一個節點上去處理和聚合。網絡
Spark中有兩種Shuffle類型,HashShuffle和SortShuffle,Spark1.2以前是HashShuffle默認的分區器是HashPartitioner,Spark1.2引入SortShuffle默認的分區器是RangePartitioner。數據結構
2. HashShuffle
1)普通機制
1.1)普通機制示意圖
1.2)執行流程
a) 每個map task將不一樣結果寫到不一樣的buffer中,每一個buffer的大小爲32K。buffer起到數據緩存的做用。
b) 每一個buffer文件最後對應一個磁盤小文件。
c) reduce task來拉取對應的磁盤小文件。架構
1.3)總結
①map task的計算結果會根據分區器(默認是hashPartitioner)來決定寫入到哪個磁盤小文件中去。ReduceTask會去Map端拉取相應的磁盤小文件。
②產生的磁盤小文件的個數:M(map task的個數)*R(reduce task的個數)分佈式
1.4)存在的問題
產生的磁盤小文件過多,會致使如下問題:
a) 在Shuffle Write過程當中會產生不少寫磁盤小文件的對象。
b) 在Shuffle Read過程當中會產生不少讀取磁盤小文件的對象。
c) 在JVM堆內存中對象過多會形成頻繁的gc,gc還沒法解決運行所須要的內存 的話,就會OOM。
d)在數據傳輸過程當中會有頻繁的網絡通訊,頻繁的網絡通訊出現通訊故障的可能性大大增長,一旦網絡通訊出現了故障會致使shuffle file cannot find 因爲這個錯誤致使的task失敗,TaskScheduler不負責重試,由DAGScheduler負責重試Stage。ide
2)合併機制(優化)
2.1)合併機制示意圖
2.2)總結:產生磁盤小文件的個數:C(core的個數)*R(reduce的個數)性能
三、SortShuffle
1)普通機制
1.1)普通機制示意圖
1.2)執行流程
a)map task 的計算結果會寫入到一個內存數據結構裏面,內存數據結構默認是5M
b)在shuffle的時候會有一個定時器,不按期的去估算這個內存結構的大小,當內存結構中的數據超過5M時,好比如今內存結構中的數據爲5.01M,那麼他會申請5.01*2-5=5.02M內存給內存數據結構。
c)若是申請成功不會進行溢寫,若是申請不成功,這時候會發生溢寫磁盤。
d)在溢寫以前內存結構中的數據會進行排序分區
e)而後開始溢寫磁盤,寫磁盤是以batch的形式去寫,一個batch是1萬條數據,
f)map task執行完成後,會將這些磁盤小文件合併成一個大的磁盤文件,同時生成一個索引文件。
g)reduce task去map端拉取數據的時候,首先解析索引文件,根據索引文件再去拉取對應的數據。學習
1.3)總結
產生磁盤小文件的個數:2*M(map task的個數)
2)bypass機制(適用於不須要排序的場景)
2.1)bypass機制示意圖
2.2)總結
①bypass運行機制的觸發條件以下:
shuffle reduce task的數量小於spark.shuffle.sort.bypassMergeThreshold的參數值。這個值默認是200。
②產生的磁盤小文件爲:2*M(map task的個數)
四、Shuffle文件尋址
1)MapOutputTracker
MapOutputTracker是Spark架構中的一個模塊,是一個主從架構。管理磁盤小文件的地址。
MapOutputTrackerMaster是主對象,存在於Driver中。MapOutputTrackerWorker是從對象,存在於Excutor中。
2)BlockManager
BlockManager塊管理者,是Spark架構中的一個模塊,也是一個主從架構。
BlockManagerMaster,主對象,存在於Driver中。BlockManagerMaster會在集羣中有用到廣播變量和緩存數據或者刪除緩存數據的時候,通知BlockManagerSlave傳輸或者刪除數據。
BlockManagerWorker,從對象,存在於Excutor中。
BlockManagerWorker會與BlockManagerWorker之間通訊。
不管在Driver端的BlockManager仍是在Excutor端的BlockManager都含有四個對象:
①DiskStore:負責磁盤的管理。
②MemoryStore:負責內存的管理。
③ConnectionManager:負責鏈接其餘的BlockManagerWorker。
④BlockTransferService:負責數據的傳輸。
3)Shuffle文件尋址圖
4)Shuffle文件尋址流程
a)當map task執行完成後,會將task的執行狀況和磁盤小文件的地址封裝到MpStatus對象中,經過MapOutputTrackerWorker對象向Driver中的MapOutputTrackerMaster彙報。
b)在全部的map task執行完畢後,Driver中就掌握了全部的磁盤小文件的地址。
c)在reduce task執行以前,會經過Excutor中MapOutPutTrackerWorker向Driver端的MapOutputTrackerMaster獲取磁盤小文件的地址。
d) 獲取到磁盤小文件的地址後,會經過BlockManager中的ConnectionManager鏈接數據所在節點上的ConnectionManager,而後經過BlockTransferService進行數據的傳輸。
e)BlockTransferService默認啓動5個task去節點拉取數據。默認狀況下,5個task拉取數據量不能超過48M。
2、Shuffle調優
一、SparkShuffle調優配置項如何使用?
1)在代碼中,不推薦使用硬編碼。
new SparkConf().set(「spark.shuffle.file.buffer」,」64」)
2)在提交spark任務的時候,推薦使用。
spark-submit --conf spark.shuffle.file.buffer=64 –conf ….
3)在conf下的spark-default.conf配置文件中,不推薦,由於是寫死後全部應用程序都要用。
二、調優參數
(1)spark.shuffle.file.buffer 默認值:32k 參數說明:該參數用於設置shuffle write task的BufferedOutputStream的buffer緩衝大小。將數據寫到磁盤文件以前,會先寫入buffer緩衝中,待緩衝寫滿以後,纔會溢寫到磁盤。 調優建議:若是做業可用的內存資源較爲充足的話,能夠適當增長這個參數的大小(好比64k),從而減小shuffle write過程當中溢寫磁盤文件的次數,也就能夠減小磁盤IO次數,進而提高性能。在實踐中發現,合理調節該參數,性能會有1%~5%的提高。 (2)spark.reducer.maxSizeInFlight 默認值:48m 參數說明:該參數用於設置shuffle read task的buffer緩衝大小,而這個buffer緩衝決定了每次可以拉取多少數據。 調優建議:若是做業可用的內存資源較爲充足的話,能夠適當增長這個參數的大小(好比96m),從而減小拉取數據的次數,也就能夠減小網絡傳輸的次數,進而提高性能。在實踐中發現,合理調節該參數,性能會有1%~5%的提高。 (3)spark.shuffle.io.maxRetries 默認值:3 參數說明:shuffle read task從shuffle write task所在節點拉取屬於本身的數據時,若是由於網絡異常致使拉取失敗,是會自動進行重試的。該參數就表明了能夠重試的最大次數。若是在指定次數以內拉取仍是沒有成功,就可能會致使做業執行失敗。 調優建議:對於那些包含了特別耗時的shuffle操做的做業,建議增長重試最大次數(好比60次),以免因爲JVM的fullgc或者網絡不穩定等因素致使的數據拉取失敗。在實踐中發現,對於針對超大數據量(數十億~上百億)的shuffle過程,調節該參數能夠大幅度提高穩定性。 shuffle file not find taskScheduler不負責重試task,由DAGScheduler負責重試stage (4)spark.shuffle.io.retryWait 默認值:5s 參數說明:具體解釋同上,該參數表明了每次重試拉取數據的等待間隔,默認是5s。 調優建議:建議加大間隔時長(好比60s),以增長shuffle操做的穩定性。 (5)spark.shuffle.memoryFraction 默認值:0.2 參數說明:該參數表明了Executor內存中,分配給shuffle read task進行聚合操做的內存比例,默認是20%。 調優建議:若是內存充足,並且不多使用持久化操做,建議調高這個比例,給shuffle read的聚合操做更多內存,以免因爲內存不足致使聚合過程當中頻繁讀寫磁盤。在實踐中發現,合理調節該參數能夠將性能提高10%左右。 (6)spark.shuffle.manager 默認值:sort|hash 參數說明:該參數用於設置ShuffleManager的類型。Spark 1.5之後,有三個可選項:hash、sort和tungsten-sort。HashShuffleManager是Spark 1.2之前的默認選項,可是Spark 1.2以及以後的版本默認都是SortShuffleManager了。tungsten-sort與sort相似,可是使用了tungsten計劃中的堆外內存管理機制,內存使用效率更高。 調優建議:因爲SortShuffleManager默認會對數據進行排序,所以若是你的業務邏輯中須要該排序機制的話,則使用默認的SortShuffleManager就能夠;而若是你的業務邏輯不須要對數據進行排序,那麼建議參考後面的幾個參數調優,經過bypass機制或優化的HashShuffleManager來避免排序操做,同時提供較好的磁盤讀寫性能。這裏要注意的是,tungsten-sort要慎用,由於以前發現了一些相應的bug。 (7)spark.shuffle.sort.bypassMergeThreshold----針對SortShuffle 默認值:200 參數說明:當ShuffleManager爲SortShuffleManager時,若是shuffle read task的數量小於這個閾值(默認是200),則shuffle write過程當中不會進行排序操做,而是直接按照未經優化的HashShuffleManager的方式去寫數據,可是最後會將每一個task產生的全部臨時磁盤文件都合併成一個文件,並會建立單獨的索引文件。 調優建議:當你使用SortShuffleManager時,若是的確不須要排序操做,那麼建議將這個參數調大一些,大於shuffle read task的數量。那麼此時就會自動啓用bypass機制,map-side就不會進行排序了,減小了排序的性能開銷。可是這種方式下,依然會產生大量的磁盤文件,所以shuffle write性能有待提升。 (8)spark.shuffle.consolidateFiles----針對HashShuffle 默認值:false 參數說明:若是使用HashShuffleManager,該參數有效。若是設置爲true,那麼就會開啓consolidate機制,會大幅度合併shuffle write的輸出文件,對於shuffle read task數量特別多的狀況下,這種方法能夠極大地減小磁盤IO開銷,提高性能。 調優建議:若是的確不須要SortShuffleManager的排序機制,那麼除了使用bypass機制,還能夠嘗試將spark.shffle.manager參數手動指定爲hash,使用HashShuffleManager,同時開啓consolidate機制。在實踐中嘗試過,發現其性能比開啓了bypass機制的SortShuffleManager要高出10%~30%。
3、Spark內存管理
Spark執行應用程序時,Spark集羣會啓動Driver和Executor兩種JVM進程,Driver負責建立SparkContext上下文,提交任務,task的分發等。Executor負責task的計算任務,並將結果返回給Driver。同時須要爲須要持久化的RDD提供儲存。Driver端的內存管理比較簡單,這裏所說的Spark內存管理針對Executor端的內存管理。
Spark內存管理分爲靜態內存管理和統一內存管理,Spark1.6以前使用的是靜態內存管理,Spark1.6以後引入了統一內存管理。
一、靜態內存管理
(1)靜態內存管理中存儲內存、執行內存和其餘內存的大小在Spark應用程序運行期間均爲固定的,但用戶能夠應用程序啓動前進行配置。Spark1.6以上版本默認使用的是統一內存管理,能夠經過參數spark.memory.useLegacyMode設置爲true(默認爲false)使用靜態內存管理。
(2)靜態內存管理分佈圖
二、統一內存管理
(1)統一內存管理與靜態內存管理的區別在於儲存內存和執行內存共享同一塊空間,能夠互相借用對方的空間。
(2)統一內存管理分佈圖
三、reduce 中OOM如何處理?
1) 減小每次拉取的數據量
2) 提升shuffle聚合的內存比例
3) 提升Excutor的總內存
參考: