大數據中有一個頗有名的概念就是「移動數據不如移動計算」,之因此有數據本地性就是由於數據在網絡中傳輸會有不小的I/O消耗,若是可以想辦法儘可能減小這個I/O消耗就可以提高效率。那麼如何減小I/O消耗呢,固然是儘可能不讓數據在網絡上傳輸,即便沒法避免數據在網絡上傳輸,也要儘可能縮短傳輸距離,這個數據須要傳輸多遠的距離(實際意味着數據傳輸的代價)就是數據本地性,數據本地性根據傳輸距離分爲幾個級別,不在網絡上傳輸確定是最好的級別,其它級別劃分依據傳輸距離越遠級別越低,Spark在分配任務的時候會考慮到數據本地性,優先將任務分配給數據本地性最好的Executor執行。node
在執行任務時查看Task的執行狀況,常常可以看到Task的狀態中有這麼一列: 緩存
這一列就是在說這個Task任務讀取數據的本地性是哪一個級別,數據本地性共分爲五個級別:網絡
PROCESS_LOCAL:顧名思義,要處理的數據就在同一個本地進程中,即數據和Task在同一個Executor JVM中,這種狀況就是RDD的數據在以前就已經被緩存過了,由於BlockManager是以Executor爲單位的,因此只要Task所須要的Block在所屬的Executor的BlockManager上已經被緩存,這個數據本地性就是PROCESS_LOCAL,這種是最好的locality,這種狀況下數據不須要在網絡中傳輸。jvm
NODE_LOCAL:數據在同一臺節點上,可是並不不在同一個jvm中,好比數據在同一臺節點上的另一個Executor上,速度要比PROCESS_LOCAL略慢。還有一種狀況是讀取HDFS的塊就在當前節點上,數據本地性也是NODE_LOCAL。socket
NO_PREF:數據從哪裏訪問都同樣,表示數據本地性無心義,看起來很奇怪,其實指的是從MySQL、MongoDB之類的數據源讀取數據。性能
RACK_LOCAL:數據在同一機架上的其它節點,須要通過網絡傳輸,速度要比NODE_LOCAL慢。大數據
ANY:數據在其它更遠的網絡上,甚至都不在同一個機架上,比RACK_LOCAL更慢,通常狀況下不會出現這種級別,萬一出現了多是有什麼異常須要排查下緣由。優化
使用一張圖來表示五個傳輸級別:spa
Spark在調度程序的時候並不必定老是能按照計算出的數據本地性執行,由於即便計算出在某個Executor上執行時數據本地性最好,可是Executor的core也是有限的,有可能計算出TaskFoo在ExecutorBar上執行數據本地性最好,可是發現ExecutorBar的全部core都一直被用着騰不出資源來執行新來的TaskFoo,因此當TaskFoo等待一段時間以後發現仍然等不到資源的話就嘗試下降數據本地性級別讓其它的Executor去執行。.net
好比當前有一個RDD,有四個分區,稱爲A、B、C、D,當前Stage中這個RDD的每一個分區對應的Task分別稱爲TaskA、TaskB、TaskC、TaskD,在以前的Stage中將這個RDD cache在了一臺機器上的兩個Executor上,稱爲ExecutorA、ExecutorB,每一個Executor的core是2,ExecutorA上緩存了RDD的A、B、C分區,ExecutorB上緩存了RDD的D分區,而後分配Task的時候會把TaskA、TaskB、TaskC分配給ExecutorA,TaskD分配給ExecutorB,可是由於每一個Executor只有兩個core,只能同時執行兩個Task,因此ExecutorA可以執行TaskA和TaskB,可是TaskC就只能等着,儘管它在ExecutorA上執行的數據本地性是PROCESS_LOCAL,可是人家沒有資源啊,因而TaskC就等啊等,可是等了一下子它發現不太對勁,搞這個數據本地性不就是爲了加快Task的執行速度以提升Stage的總體執行速度嗎,我擱這裏乾等着可不能加快Stage的總體速度,我要看下邊上有沒有其它的Executor是閒着的,假設我在ExecutorA須要再排隊10秒才能拿到core資源執行,拿到資源以後我須要執行30秒,那麼我只須要找到一個其它的Executor,即便由於數據本地性很差可是若是我可以在40秒內執行完的話仍是要比在這邊繼續傻等要快的,因此TaskC就給本身設定了一個時間,當超過n毫秒以後還等不到就放棄PROCESS_LOCAL級別,轉而嘗試NODE_LOCAL級別的Executor,而後它看到了ExecutorB,ExecutorB和ExecutorA在同一臺機器上,只是兩個不一樣的jvm,因此在ExecutorB上執行須要從ExecutorA上拉取數據,經過BlockManager的getRemote,底層經過BlockTransferService去把數據拉取過來,由於是在同一臺機器上的兩個進程之間使用socket數據傳輸,走的應該是迴環地址,速度會很是快,因此對於這種數據存儲在同一臺機器上的不一樣Executor上由於降級致使的NODE_LOCAL的狀況,理論上並不會比PROCESS_LOCAL慢多少,TaskC在ExecutorB上執行並不會比ExecutorA上執行慢多少。可是對於好比HDFS塊存儲在此節點因此將Task分配到此節點的狀況致使的NODE_LOCAL,由於要跟HDFS交互,還要讀取磁盤文件,涉及到了一些I/O操做,這種狀況就會耗費較長時間,相比較於PROCESS_LOCAL級別就慢上很多了。
上面舉的例子中提到了TaskC會等待一段時間,根據數據本地性不一樣,等待的時間間隔也不一致,不一樣數據本地性的等待時間設置參數:
spark.locality.wait:設置全部級別的數據本地性,默認是3000毫秒
spark.locality.wait.process:多長時間等不到PROCESS_LOCAL就降級,默認爲${spark.locality.wait}
spark.locality.wait.node:多長時間等不到NODE_LOCAL就降級,默認爲${spark.locality.wait}
spark.locality.wait.rack:多長時間等不到RACK_LOCAL就降級,默認爲${spark.locality.wait}
總結一下數據延遲調度策略:當使用當前的數據本地性級別等待一段時間以後仍然沒有資源執行時,嘗試下降數據本地性級別使用更低的數據本地性對應的Executor執行,這個就是Task的延遲調度策略。
最後探討一下什麼樣的Task能夠針對數據本地性延遲調度的等待時間作優化?
若是Task的輸入數據比較大,那麼耗費在數據讀取上的時間會比較長,一個好的數據本地性可以節省很長時間,因此這種狀況下最好仍是將延遲調度的降級等待時間調長一些。而對於輸入數據比較小的,即便數據本地性很差也只是多花一點點時間,那麼便沒必要在延遲調度上耗費太長時間。總結一下就是若是數據本地性對任務的執行時間影響較大的話就稍稍調高延遲調度的降級等待時間。
相關資料:
1. spark on yarn 中的延遲調度(delay scheduler)
.