淺談Spark On Yarn 中的延遲調度問題

延遲調度算法思想十分簡單,爲了實現data locality(即該task所需數據就在其運行的機器上),會盡可能將task分佈到有其所需數據的機器或者jvm中去,若是機器或者jvm已被佔用就進行延遲等待,直到該機器或者jvm能夠運行該task或者超過等待時限則將task運行到其餘機器上。
這個想法基於如下幾點:
1.每每數據比程序要大得多,分佈式上處理的數據都是GB爲單位的,將程序放到數據所在機器去執行,大大減小網絡傳輸時間。
2.在集羣上面task通常都是運行時間較短的,即整個集羣上面不斷有task完成,釋放其佔用的資源,延遲調度的task可以有極大的機會得到分配。
總之,就是延遲調度節省的網絡傳輸時間遠遠大於task等待花費的時間。
延遲調度的思想是相通的,本文討論的是spark在yarn集羣上的延遲調度狀況,故分爲兩層,第一層是yarn的延遲調度,第二層則是spark內部的延遲調度。
1.yarn級別的Delay Scheduler
spark在yarn上面的Delay Scheduler其實就是以爲spark的executor分配在哪些NodeManager上面,這是由yarn根據application的輸入文件而定。儘可能將executor分佈到有數據的NodeManager上。由於,在這一層上若是executor沒法作到data locality,那麼到了spark的級別分配task到executor的時候,更加沒法實現data locality。
在yarn中配置yarn.scheduler.capacity.node-locality-delay配置延遲等待次數。(一般設置機架數量)。
2.spark內部Task的Delay Scheduler
這個級別的Delay Scheduler是面臨的問題,是將task分到有數據的executor上去,上面已經說了,這一層次的Delay Scheduler依賴於yarn對executor的分配。另外,在運算過程當中,有task 的Delay Scheduler是由於咱們在spark中對數據進行了cache或者persist。在shuffle中是不用考慮Delay Scheduler的,由於shuffle中的read task 是須要去全部的write task的disk上拉取數據的,故也就不存在經過延遲調度來選擇data locality的問題了。
在spark中會有3個配置項:
spark.locality.wait.process default 3000ms
spark.locality.wait.node default spark.locality.wait.process
spark.locality.wait.rack default spark.locality.wait.process
目前就還有最後一個問題,須要解決了,配置項該以什麼標準進行配置?
在這篇論文中Delay Scheduling: A Simple Technique for AchievingLocality and Fairness in Cluster Scheduling  有一個詳細的介紹,這裏我直接給出公式:
Job等待一次task實現data locality所花的最長時間 W= (D/S)*T=D/(L*M) * T
D是實現Data Locality,須要延遲等待的次數
M是本次計算用到的集羣節點數
L爲每一個節點能用的core數量
S即爲集羣能用的總的core
N爲本次job的task數量
R爲文件的備份數量(HDFS默認爲3)
λ爲指望本次job達到的數據本地率
T爲單個task運行所須要的時間。
經過上面兩個公式,咱們就能計算出yarn和spark中的延遲調度項如何配置了。
(D則爲yarn配置的延遲等待次數,W則爲spark中配置中的等待時間。)node

相關文章
相關標籤/搜索