Spark 性能調優參數總結html
一、Shuffle 相關node
Shuffle 操做大概是對Spark 性能影響最大的步驟之一(由於可能涉及到排序,磁盤IO,網算法
絡IO 等衆多CPU 或IO 密集的操做),這也是爲何在Spark 1.1 的代碼中對整個Shuffleapache
框架代碼進行了重構,將Shuffle 相關讀寫操做抽象封裝到Pluggable 的Shuffle Manager網絡
中,便於試驗和實現不一樣的Shuffle 功能模塊。例如爲了解決Hash Based 的Shuffle Manager數據結構
在文件讀寫效率方面的問題而實現的Sort Base 的Shuffle Manager。併發
①spark.shuffle.managerapp
用來配置所使用的Shuffle Manager,目前可選的Shuffle Manager 包括默認的框架
org.apache.spark.shuffle.sort.HashShuffleManager(配置參數值爲hash)和新的oop
org.apache.spark.shuffle.sort.SortShuffleManager(配置參數值爲sort)。
這兩個ShuffleManager 如何選擇呢,首先須要瞭解他們在實現方式上的區別。
HashShuffleManager,故名思義也就是在Shuffle 的過程當中寫數據時不作排序操做,只是將
數據根據Hash 的結果,將各個Reduce 分區的數據寫到各自的磁盤文件中。帶來的問題就
是若是Reduce 分區的數量比較大的話,將會產生大量的磁盤文件。若是文件數量特別巨大,
對文件讀寫的性能會帶來比較大的影響,此外因爲同時打開的文件句柄數量衆多,序列化,
以及壓縮等操做須要分配的臨時內存空間也可能會迅速膨脹到沒法接受的地步,對內存的使
用和GC 帶來很大的壓力,在Executor 內存比較小的狀況下尤其突出,例如Spark on Yarn
模式。
SortShuffleManager,是1.1 版本以後實現的一個試驗性(也就是一些功能和接口還在開發
演變中)的ShuffleManager,它在寫入分區數據的時候,首先會根據實際狀況對數據採用
不一樣的方式進行排序操做,底線是至少按照Reduce 分區Partition 進行排序,這樣來至於
同一個Map 任務Shuffle 到不一樣的Reduce 分區中去的全部數據均可以寫入到同一個外部磁
盤文件中去,用簡單的Offset 標誌不一樣Reduce 分區的數據在這個文件中的偏移量。這樣
一個Map 任務就只須要生成一個shuffle 文件,從而避免了上述HashShuffleManager 可能
遇到的文件數量巨大的問題
二者的性能比較,取決於內存,排序,文件操做等因素的綜合影響。
對於不須要進行排序的Shuffle 操做來講,如repartition 等,若是文件數量不是特別巨大,
HashShuffleManager 面臨的內存問題不大,而SortShuffleManager 須要額外的根據
Partition 進行排序,顯然HashShuffleManager 的效率會更高。
而對於原本就須要在Map 端進行排序的Shuffle 操做來講,如ReduceByKey 等,使用
HashShuffleManager 雖然在寫數據時不排序,但在其它的步驟中仍然須要排序,而
SortShuffleManager 則能夠將寫數據和排序兩個工做合併在一塊兒執行,所以即便不考慮
HashShuffleManager 的內存使用問題,SortShuffleManager 依舊可能更快。
②spark.shuffle.sort.bypassMergeThreshold
這個參數僅適用於SortShuffleManager,如前所述,SortShuffleManager 在處理不須要排
序的Shuffle 操做時,因爲排序帶來性能的降低。這個參數決定了在這種狀況下,當Reduce
分區的數量小於多少的時候,在SortShuffleManager 內部不使用Merge Sort 的方式處理數
據,而是與Hash Shuffle 相似,直接將分區文件寫入單獨的文件,不一樣的是,在最後一步
仍是會將這些文件合併成一個單獨的文件。這樣經過去除Sort 步驟來加快處理速度,代價
是須要併發打開多個文件,因此內存消耗量增長,本質上是相對HashShuffleMananger 一
個折衷方案。這個參數的默認值是200 個分區,若是內存GC 問題嚴重,能夠下降這個值。
spark.shuffle.consolidateFiles
這個配置參數僅適用於HashShuffleMananger 的實現,一樣是爲了解決生成過多文件的問
題,採用的方式是在不一樣批次運行的Map 任務之間重用Shuffle 輸出文件,也就是說合並
的是不一樣批次的Map 任務的輸出數據,可是每一個Map 任務所須要的文件仍是取決於Reduce
分區的數量,所以,它並不減小同時打開的輸出文件的數量,所以對內存使用量的減小並沒
有幫助。只是HashShuffleManager 裏的一個折中的解決方案。
須要注意的是,這部分的代碼實現儘管原理上說很簡單,可是涉及到底層具體的文件系統的
實現和限制等因素,例如在併發訪問等方面,須要處理的細節不少,所以一直存在着這樣那
樣的bug 或者問題,致使在例如EXT3 上使用時,特定狀況下性能反而可能降低,所以從
Spark 0.8 的代碼開始,一直尚未被標誌爲Stable,不是默認採用的方式。此外由於並不
減小同時打開的輸出文件的數量,所以對性能具體能帶來多大的改善也取決於具體的文件數
量的狀況。因此即便你面臨着Shuffle 文件數量巨大的問題,這個配置參數是否使用,在什
麼版本中可使用,也最好仍是實際測試之後再決定。
③spark.shuffle.spill
shuffle 的過程當中,若是涉及到排序,聚合等操做,勢必會須要在內存中維護一些數據結構,
進而佔用額外的內存。若是內存不夠用怎麼辦,那只有兩條路能夠走,一就是out of memory
出錯了,二就是將部分數據臨時寫到外部存儲設備中去,最後再合併到最終的Shuffle 輸出
文件中去。
這裏spark.shuffle.spill 決定是否Spill 到外部存儲設備(默認打開),若是你的內存足夠使
用,或者數據集足夠小,固然也就不須要Spill,畢竟Spill 帶來了額外的磁盤操做。
spark.shuffle.memoryFraction / spark.shuffle.safetyFraction
在啓用Spill 的狀況下,spark.shuffle.memoryFraction(1.1 後默認爲0.2)決定了當Shuffle
過程當中使用的內存達到總內存多少比例的時候開始Spill。
經過spark.shuffle.memoryFraction 能夠調整Spill 的觸發條件,即Shuffle 佔用內存的大小,
進而調整Spill 的頻率和GC 的行爲。總的來講,若是Spill 太過頻繁,能夠適當增長
spark.shuffle.memoryFraction 的大小,增長用於Shuffle 的內存,減小Spill 的次數。固然
這樣一來爲了不內存溢出,對應的可能須要減小RDD cache 佔用的內存,即減少
spark.storage.memoryFraction 的值,這樣RDD cache 的容量減小,有可能帶來性能影響,
所以須要綜合考慮。
因爲Shuffle 數據的大小是估算出來的,一來爲了下降開銷,並非每增長一個數據項都完
整的估算一次,二來估算也會有偏差,因此實際暫用的內存可能比估算值要大,這裏
spark.shuffle.safetyFraction(默認爲0.8)用來做爲一個保險係數,下降實際Shuffle 使用
的內存閥值,增長必定的緩衝,下降實際內存佔用超過用戶配置值的機率。
④spark.shuffle.spill.compress / spark.shuffle.compress
這兩個配置參數都是用來設置Shuffle 過程當中是否使用壓縮算法對Shuffle 數據進行壓縮,
前者針對Spill 的中間數據,後者針對最終的shuffle 輸出文件,默認都是True
理論上說,spark.shuffle.compress 設置爲True 一般都是合理的,由於若是使用千兆如下
的網卡,網絡帶寬每每最容易成爲瓶頸。此外,目前的Spark 任務調度實現中,以Shuffle
劃分Stage,下一個Stage 的任務是要等待上一個Stage 的任務所有完成之後才能開始執
行,因此shuffle 數據的傳輸和CPU 計算任務之間一般不會重疊,這樣Shuffle 數據傳輸量
的大小和所需的時間就直接影響到了整個任務的完成速度。可是壓縮也是要消耗大量的
CPU 資源的,因此打開壓縮選項會增長Map 任務的執行時間,所以若是在CPU 負載的影
響遠大於磁盤和網絡帶寬的影響的場合下,也可能將spark.shuffle.compress 設置爲False
纔是最佳的方案
對於spark.shuffle.spill.compress 而言,狀況相似,可是spill 數據不會被髮送到網絡中,僅
僅是臨時寫入本地磁盤,並且在一個任務中同時須要執行壓縮和解壓縮兩個步驟,因此對
CPU 負載的影響會更大一些,而磁盤帶寬(若是標配12HDD 的話)可能每每不會成爲Spark
應用的主要問題,因此這個參數相對而言,或許更有機會須要設置爲False。
總之,Shuffle 過程當中數據是否應該壓縮,取決於CPU/DISK/NETWORK 的實際能力和負載,
應該綜合考慮。
二、Storage 相關配置參數
spark.local.dir
Spark 用於寫中間數據,如RDD Cache,Shuffle,Spill 等數據的位置,那麼有什麼能夠注
意的呢。
首先,最基本的固然是咱們能夠配置多個路徑(用逗號分隔)到多個磁盤上增長總體IO 帶
寬。
其次,目前的實現中,Spark 是經過對文件名採用hash 算法分佈到多個路徑下的目錄中去,
若是你的存儲設備有快有慢,好比SSD+HDD 混合使用,那麼你能夠經過在SSD 上配置更
多的目錄路徑來增大它被Spark 使用的比例,從而更好地利用SSD 的IO 帶寬能力。固然
這只是一種變通的方法,終極解決方案仍是應該像目前HDFS 的實現方向同樣,讓Spark
可以感知具體的存儲設備類型,針對性的使用。
須要注意的是,在Spark 1.0 之後,SPARK_LOCAL_DIRS (Standalone, Mesos) or
LOCAL_DIRS (YARN)參數會覆蓋這個配置。好比Spark On YARN 的時候,Spark Executor
的本地路徑依賴於Yarn 的配置,而不取決於這個參數。
spark.executor.memory
Executor 內存的大小,和性能自己固然並無直接的關係,可是幾乎全部運行時性能相關
的內容都或多或少間接和內存大小相關。這個參數最終會被設置到Executor 的JVM 的heap
尺寸上,對應的就是Xmx 和Xms 的值
理論上Executor 內存固然是多多益善,可是實際受機器配置,以及運行環境,資源共享,
JVM GC 效率等因素的影響,仍是有可能須要爲它設置一個合理的大小。多大算合理,要看
實際狀況
Executor 的內存基本上是Executor 內部全部任務共享的,而每一個Executor 上能夠支持的
任務的數量取決於Executor 所管理的CPU Core 資源的多少,所以你須要瞭解每一個任務的
數據規模的大小,從而推算出每一個Executor 大體須要多少內存便可知足基本的需求。
如何知道每一個任務所需內存的大小呢,這個很難統一的衡量,由於除了數據集自己的開銷,
還包括算法所需各類臨時內存空間的使用,而根據具體的代碼算法等不一樣,臨時內存空間的
開銷也不一樣。可是數據集自己的大小,對最終所需內存的大小仍是有必定的參考意義的。
一般來講每一個分區的數據集在內存中的大小,多是其在磁盤上源數據大小的若干倍(不考
慮源數據壓縮,Java 對象相對於原始裸數據也還要算上用於管理數據的數據結構的額外開
銷),須要準確的知道大小的話,能夠將RDD cache 在內存中,從BlockManager 的Log
輸出能夠看到每一個Cache 分區的大小(其實也是估算出來的,並不徹底準確)
如: BlockManagerInfo: Added rdd_0_1 on disk on sr438:41134 (size: 495.3 MB)
反過來講,若是你的Executor 的數量和內存大小受機器物理配置影響相對固定,那麼你就
須要合理規劃每一個分區任務的數據規模,例如採用更多的分區,用增長任務數量(進而須要
更多的批次來運算全部的任務)的方式來減少每一個任務所需處理的數據大小。
spark.storage.memoryFraction
如前面所說spark.executor.memory 決定了每一個Executor 可用內存的大小,而
spark.storage.memoryFraction 則決定了在這部份內存中有多少能夠用於Memory Store 管
理RDD Cache 數據,剩下的內存用來保證任務運行時各類其它內存空間的須要。
spark.executor.memoryFraction 默認值爲0.6,官方文檔建議這個比值不要超過JVM Old
Gen 區域的比值。這也很容易理解,由於RDD Cache 數據一般都是長期駐留內存的,理論
上也就是說最終會被轉移到Old Gen 區域(若是該RDD 尚未被刪除的話),若是這部分
數據容許的尺寸太大,勢必把Old Gen 區域佔滿,形成頻繁的FULL GC。
如何調整這個比值,取決於你的應用對數據的使用模式和數據的規模,粗略的來講,若是頻
繁發生Full GC,能夠考慮下降這個比值,這樣RDD Cache 可用的內存空間減小(剩下的
部分Cache 數據就須要經過Disk Store 寫到磁盤上了),會帶來必定的性能損失,可是騰
出更多的內存空間用於執行任務,減小Full GC 發生的次數,反而可能改善程序運行的總體
性能
spark.streaming.blockInterval
這個參數用來設置Spark Streaming 裏Stream Receiver 生成Block 的時間間隔,默認爲
200ms。具體的行爲表現是具體的Receiver 所接收的數據,每隔這裏設定的時間間隔,就
從Buffer 中生成一個StreamBlock 放進隊列,等待進一步被存儲到BlockManager 中供後
續計算過程使用。理論上來講,爲了每一個Streaming Batch 間隔裏的數據是均勻的,這個
時間間隔固然應該能被Batch 的間隔時間長度所整除。整體來講,若是內存大小夠用,
Streaming 的數據來得及處理,這個blockInterval 時間間隔的影響不大,固然,若是數據
Cache Level 是Memory+Ser,即作了序列化處理,那麼BlockInterval 的大小會影響序列化
後數據塊的大小,對於Java 的GC 的行爲會有一些影響。
此外spark.streaming.blockQueueSize 決定了在StreamBlock 被存儲到BlockMananger 之
前,隊列中最多能夠容納多少個StreamBlock。默認爲10,由於這個隊列Poll 的時間間隔
是100ms,因此若是CPU 不是特別繁忙的話,基本上應該沒有問題。
壓縮和序列化相關
spark.serializer
默認爲org.apache.spark.serializer.JavaSerializer, 可選
org.apache.spark.serializer.KryoSerializer, 實際上只要是org.apache.spark.serializer 的子
類就能夠了,不過若是隻是應用,大概你不會本身去實現一個的。
序列化對於spark 應用的性能來講,仍是有很大影響的,在特定的數據格式的狀況
下,KryoSerializer 的性能能夠達到JavaSerializer 的10 倍以上,固然放到整個Spark 程序中
來考量,比重就沒有那麼大了,可是以Wordcount 爲例,一般也很容易達到30%以上的性能
提高。而對於一些Int 之類的基本類型數據,性能的提高就幾乎能夠忽略了。KryoSerializer
依賴Twitter 的Chill 庫來實現,相對於JavaSerializer,主要的問題在於不是全部的Java
Serializable 對象都能支持。
須要注意的是,這裏可配的Serializer 針對的對象是Shuffle 數據,以及RDD Cache 等場
合,而Spark Task 的序列化是經過spark.closure.serializer 來配置,可是目前只支持
JavaSerializer,因此等於無法配置啦。
更多Kryo 序列化相關優化配置,能夠參
考http://spark.apache.org/docs/latest/tuning.html#data-serialization 一節
spark.rdd.compress
這個參數決定了RDD Cache 的過程當中,RDD 數據在序列化以後是否進一步進行壓縮再儲
存到內存或磁盤上。固然是爲了進一步減少Cache 數據的尺寸,對於Cache 在磁盤上而言,
絕對大小大概沒有太大關係,主要是考慮Disk 的IO 帶寬。而對於Cache 在內存中,那主
要就是考慮尺寸的影響,是否可以Cache 更多的數據,是否能減少Cache 數據對GC 形成
的壓力等。
這二者,前者一般不會是主要問題,尤爲是在RDD Cache 自己的目的就是追求速度,減小
重算步驟,用IO 換CPU 的狀況下。然後者,GC 問題固然是須要考量的,數據量小,佔用
空間少,GC 的問題大概會減輕,可是是否真的須要走到RDD Cache 壓縮這一步,或許用
其它方式來解決可能更加有效。
因此這個值默認是關閉的,可是若是在磁盤IO 的確成爲問題或者GC 問題真的沒有其它更
好的解決辦法的時候,能夠考慮啓用RDD 壓縮。
spark.broadcast.compress
是否對Broadcast 的數據進行壓縮,默認值爲True。
Broadcast 機制是用來減小運行每一個Task 時,所須要發送給TASK 的RDD 所使用到的相
關數據的尺寸,一個Executor 只須要在第一個Task 啓動時,得到一份Broadcast 數據,
以後的Task 都從本地的BlockManager 中獲取相關數據。在1.1 版本之後的代碼中,RDD
自己也改成以Broadcast 的形式發送給Executor(以前的實現RDD 自己是隨每一個任務發送
的),所以基本上不太須要顯式的決定哪些數據須要broadcast 了。
由於Broadcast 的數據須要經過網絡發送,而在Executor 端又須要存儲在本地
BlockMananger 中,加上最新的實現,默認RDD 經過Boradcast 機制發送,所以大大增長
了Broadcast 變量的比重,因此經過壓縮減少尺寸,來減小網絡傳輸開銷和內存佔用,一般
都是有利於提升總體性能的。
什麼狀況可能不壓縮更好呢,大體上我的以爲一樣仍是在網絡帶寬和內存不是問題的時候,
若是Driver 端CPU 資源很成問題(畢竟壓縮的動做基本都在Driver 端執行),那或許有調
整的必要。
spark.io.compression.codec
RDD Cache 和Shuffle 數據壓縮所採用的算法Codec,默認值曾經是使用LZF 做爲默認
Codec,最近由於LZF 的內存開銷的問題,默認的Codec 已經改成Snappy。
LZF 和Snappy 相比較,前者壓縮率比較高(固然要看具體數據內容了,一般要高20%左
右),可是除了內存問題之外,CPU 代價也大一些(大概也差20%~50%?)
在用於Shuffle 數據的場合下,內存方面,應該主要是在使用HashShuffleManager 的時候
有可能成爲問題,由於若是Reduce 分區數量巨大,須要同時打開大量的壓縮數據流用於寫
文件,進而在Codec 方面須要大量的buffer。可是若是使用SortShuffleManager,因爲shuffle
文件數量大大減小,不會產生大量的壓縮數據流,因此內存開銷大概不會成爲主要問題。
剩下的就是CPU 和壓縮率的權衡取捨,和前面同樣,取決於CPU/網絡/磁盤的能力和負載,
我的認爲CPU 一般更容易成爲瓶頸。因此要調整性能,要不不壓縮,要不使用Snappy 可
能性大一些?
對於RDD Cache 的場合來講,絕大多數場合都是內存操做或者本地IO,因此CPU 負載的
問題可能比IO 的問題更加突出,這也是爲何spark.rdd.compress 自己默認爲不壓縮,
若是要壓縮,大概也是Snappy 合適一些?
schedule 調度相關
大概會是你針對本身的集羣第一步就會配置的參數,這裏多少就其內部機制作一些解釋。
三、spark.cores.max
一個集羣最重要的參數之一,固然就是CPU 計算資源的數量。spark.cores.max 這個參數
決定了在Standalone 和Mesos 模式下,一個Spark 應用程序所能申請的CPU Core 的數
量。若是你沒有併發跑多個Spark 應用程序的需求,那麼能夠不須要設置這個參數,默認
會使用spark.deploy.defaultCores 的值(而spark.deploy.defaultCores 的值默認爲Int.Max,
也就是不限制的意思)從而應用程序可使用全部當前能夠得到的CPU 資源。
針對這個參數須要注意的是,這個參數對Yarn 模式不起做用,YARN 模式下,資源由Yarn
統一調度管理,一個應用啓動時所申請的CPU 資源的數量由另外兩個直接配置Executor
的數量和每一個Executor 中core 數量的參數決定。
SPARK_EXECUTOR_INSTANCES/SPARK_EXECUTOR_CORES
--num-executors / --executor-cores
(歷史緣由形成,不一樣運行模式下的一些啓動參數我的認爲還有待進一步整合)
此外,在Standalone 模式等後臺分配CPU 資源時,目前的實現中,在spark.cores.max
容許的範圍內,基本上是優先從每一個Worker 中申請所能獲得的最大數量的CPU core 給每
個Executor,所以若是人工限制了所申請的Max Core 的數量小於Standalone 和Mesos
模式所管理的CPU 數量,可能發生應用只運行在集羣中部分節點上的狀況(由於部分節點
所能提供的最大CPU 資源數量已經知足應用的要求),而不是平均分佈在集羣中。一般這
不會是太大的問題,可是若是涉及數據本地性的場合,有可能就會帶來必定的必須進行遠程
數據讀取的狀況發生。理論上,這個問題能夠經過兩種途徑解決:一是Standalone 和Mesos
的資源管理模塊自動根據節點資源狀況,均勻分配和啓動Executor,二是和Yarn 模式同樣,
容許用戶指定和限制每一個Executor 的Core 的數量。
spark.task.cpus
這個參數在字面上的意思就是分配給每一個任務的CPU 的數量,默認爲1。實際上,這個參
數並不能真的控制每一個任務實際運行時所使用的CPU 的數量,好比你能夠經過在任務內部
建立額外的工做線程來使用更多的CPU(至少目前爲止,未來任務的執行環境是否能經過
LXC 等技術來控制還很差說)。它所發揮的做用,只是在做業調度時,每分配出一個任務
時,對已使用的CPU 資源進行計數。也就是說只是理論上用來統計資源的使用狀況,便於
安排調度。所以,若是你指望經過修改這個參數來加快任務的運行,那仍是趕忙換個思路吧。
這個參數的意義,我的以爲仍是在你真的在任務內部本身經過任何手段,佔用了更多的CPU
資源時,讓調度行爲更加準確的一個輔助手段。
spark.scheduler.mode
這個參數決定了單個Spark 應用內部調度的時候使用FIFO 模式仍是Fair 模式。是的,你
沒有看錯,這個參數只管理一個Spark 應用內部的多個沒有依賴關係的Job 做業的調度策
略。
若是你須要的是多個Spark 應用之間的調度策略,那麼在Standalone 模式下,這取決於每
個應用所申請和得到的CPU 資源的數量(暫時沒有得到資源的應用就Pending 在那裏了),
基本上就是FIFO 形式的,誰先申請和得到資源,誰就佔用資源直到完成。而在Yarn 模式
下,則多個Spark 應用間的調度策略由Yarn 本身的策略配置文件所決定。
那麼這個內部的調度邏輯有什麼用呢?若是你的Spark 應用是經過服務的形式,爲多個用
戶提交做業的話,那麼能夠經過配置Fair 模式相關參數來調整不一樣用戶做業的調度和資源
分配優先級。
spark.locality.wait
spark.locality.wait 和spark.locality.wait.process,spark.locality.wait.node,
spark.locality.wait.rack 這幾個參數影響了任務分配時的本地性策略的相關細節。
Spark 中任務的處理須要考慮所涉及的數據的本地性的場合,基本就兩種,一是數據的來源
是HadoopRDD; 二是RDD 的數據來源來自於RDD Cache(即由CacheManager 從
BlockManager 中讀取,或者Streaming 數據源RDD)。其它狀況下,若是不涉及shuffle
操做的RDD,不構成劃分Stage 和Task 的基準,不存在判斷Locality 本地性的問題,而
若是是ShuffleRDD,其本地性始終爲No Prefer,所以其實也無所謂Locality。
在理想的狀況下,任務固然是分配在能夠從本地讀取數據的節點上時(同一個JVM 內部或
同一臺物理機器內部)的運行時性能最佳。可是每一個任務的執行速度沒法準確估計,因此很
難在事先得到全局最優的執行策略,當Spark 應用獲得一個計算資源的時候,若是沒有可
以知足最佳本地性需求的任務能夠運行時,是退而求其次,運行一個本地性條件稍差一點的
任務呢,仍是繼續等待下一個可用的計算資源已指望它能更好的匹配任務的本地性呢?
這幾個參數一塊兒決定了Spark 任務調度在獲得分配任務時,選擇暫時不分配任務,而是等
待得到知足進程內部/節點內部/機架內部這樣的不一樣層次的本地性資源的最長等待時間。默
認都是3000 毫秒。
基本上,若是你的任務數量較大和單個任務運行時間比較長的狀況下,單個任務是否在數據
本地運行,代價區別可能比較顯著,若是數據本地性不理想,那麼調大這些參數對於性能優
化可能會有必定的好處。反之若是等待的代價超過帶來的收益,那就不要考慮了。
特別值得注意的是:在處理應用剛啓動後提交的第一批任務時,因爲看成業調度模塊開始工
做時,處理任務的Executors 可能尚未徹底註冊完畢,所以一部分的任務會被放置到No
Prefer 的隊列中,這部分任務的優先級僅次於數據本地性知足Process 級別的任務,從而
被優先分配到非本地節點執行,若是的確沒有Executors 在對應的節點上運行,或者的確是
No Prefer 的任務(如shuffleRDD),這樣作確實是比較優化的選擇,可是這裏的實際狀況
只是這部分Executors 還沒來得及註冊上而已。這種狀況下,即便加大本節中這幾個參數的
數值也沒有幫助。針對這個狀況,有一些已經完成的和正在進行中的PR 經過例如動態調整
No Prefer 隊列,監控節點註冊比例等等方式試圖來給出更加智能的解決方案。不過,你也
能夠根據自身集羣的啓動狀況,經過在建立SparkContext 以後,主動Sleep 幾秒的方式來
簡單的解決這個問題。
spark.speculation
spark.speculation 以及spark.speculation.interval, spark.speculation.quantile,
spark.speculation.multiplier 等參數調整Speculation 行爲的具體細節,Speculation 是在任
務調度的時候,若是沒有適合當前本地性要求的任務可供運行,將跑得慢的任務在空閒計算
資源上再度調度的行爲,這些參數調整這些行爲的頻率和判斷指標,默認是不使用
Speculation 的。
一般來講很難正確的判斷是否須要Speculation,能真正發揮Speculation 用處的場合,往
往是某些節點因爲運行環境緣由,好比CPU 資源因爲某種緣由被佔用,磁盤損壞致使IO
緩慢形成任務執行速度異常的狀況,固然前提是你的分區任務不存在僅能被執行一次,或者
不能同時執行多個拷貝等狀況。Speculation 任務參照的指標一般是其它任務的執行時間,
而實際的任務可能因爲分區數據尺寸不均勻,原本就會有時間差別,加上必定的調度和IO
的隨機性,因此若是一致性指標定得過嚴,Speculation 可能並不能真的發現問題,反而增
加了沒必要要的任務開銷,定得過寬,大概又基本至關於沒用。
我的以爲,若是你的集羣規模比較大,運行環境複雜,的確可能常常發生執行異常,加上數
據分區尺寸差別不大,爲了程序運行時間的穩定性,那麼能夠考慮仔細調整這些參數。不然
仍是考慮如何排除形成任務執行速度異常的因數比較靠鋪一些。