Job 物理執行圖
在 Overview 裏咱們初步介紹了 DAG 型的物理執行圖,裏面包含 stages 和 tasks。這一章主要解決的問題是:
給定 job 的邏輯執行圖,如何生成物理執行圖(也就是 stages 和 tasks)?
一個複雜 job 的邏輯執行圖
代碼貼在本章最後。
給定這樣一個複雜數據依賴圖,如何合理劃分 stage,並肯定 task 的類型和個數? 一個直觀想法是將先後關聯的 RDDs 組成一個 stage,每一個箭頭生成一個 task。對於兩個 RDD 聚合成一個 RDD 的狀況,這三個 RDD 組成一個 stage。這樣雖然能夠解決問題,但顯然效率不高。除了效率問題,這個想法還有一個更嚴重的問題:
大量中間數據須要存儲。對於 task 來講,其執行結果要麼要存到磁盤,要麼存到內存,或者二者皆有。若是每一個箭頭都是 task 的話,每一個 RDD 裏面的數據都須要存起來,佔用空間可想而知。 仔細觀察一下邏輯執行圖會發現:在每一個 RDD 中,每一個 partition 是獨立的,也就是說在 RDD 內部,每一個 partition 的數據依賴各自不會相互干擾。所以,一個大膽的想法是將整個流程圖當作一個 stage,爲最後一個 finalRDD 中的每一個 partition 分配一個 task。圖示以下:
全部的粗箭頭組合成第一個 task,該 task 計算結束後順便將 CoGroupedRDD 中已經計算獲得的第二個和第三個 partition 存起來。以後第二個 task(細實線)只需計算兩步,第三個 task(細虛線)也只須要計算兩步,最後獲得結果。 這個想法有兩個不靠譜的地方:
- 第一個 task 太大,碰到 ShuffleDependency 後,不得不計算 shuffle 依賴的 RDDs 的全部 partitions,並且都在這一個 task 裏面計算。
- 須要設計巧妙的算法來判斷哪一個 RDD 中的哪些 partition 須要 cache。並且 cache 會佔用存儲空間。
雖然這是個不靠譜的想法,但有一個可取之處,即
pipeline 思想:數據用的時候再算,並且數據是流到要計算的位置的。好比在第一個 task 中,從 FlatMappedValuesRDD 中的 partition 向前推算,只計算要用的(依賴的) RDDs 及 partitions。在第二個 task 中,從 CoGroupedRDD 到 FlatMappedValuesRDD 計算過程當中,不須要存儲中間結果(MappedValuesRDD 中 partition 的所有數據)。 更進一步,從 record 粒度來說,以下圖中,第一個 pattern 中先算 g(f(record1)),而後原始的 record1 和 f(record1) 均可以丟掉,而後再算 g(f(record2)),丟掉中間結果,最後算 g(f(record3))。對於第二個 pattern 中的 g,record1 進入 g 後,理論上能夠丟掉(除非被手動 cache)。其餘 pattern 同理。
回到 stage 和 task 的劃分問題,上面不靠譜想法的主要問題是碰到 ShuffleDependency 後沒法進行 pipeline。那麼只要在 ShuffleDependency 處斷開,就只剩 NarrowDependency,而 NarrowDependency chain 是能夠進行 pipeline 的。按照此思想,上面 ComplexJob 的劃分圖以下:
因此劃分算法就是:
從後往前推算,遇到 ShuffleDependency 就斷開,遇到 NarrowDependency 就將其加入該 stage。每一個 stage 裏面 task 的數目由該 stage 最後一個 RDD 中的 partition 個數決定。 粗箭頭表示 task。由於是從後往前推算,所以最後一個 stage 的 id 是 0,stage 1 和 stage 2 都是 stage 0 的 parents。
若是 stage 最後要產生 result,那麼該 stage 裏面的 task 都是 ResultTask,不然都是 ShuffleMapTask。之因此稱爲 ShuffleMapTask 是由於其計算結果須要 shuffle 到下一個 stage,本質上至關於 MapReduce 中的 mapper。ResultTask 至關於 MapReduce 中的 reducer(若是須要從 parent stage 那裏 shuffle 數據),也至關於普通 mapper(若是該 stage 沒有 parent stage)。 還有一個問題:算法中提到 NarrowDependency chain 能夠 pipeline,但是這裏的
ComplexJob 只展現了 OneToOneDependency 和 RangeDependency 的 pipeline,普通 NarrowDependency 如何 pipeline? 回想上一章裏面 cartesian(otherRDD) 裏面複雜的 NarrowDependency,圖示以下:
通過算法劃分後結果以下:
圖中粗箭頭展現了第一個 ResultTask,其餘的 task 依此類推。因爲該 stage 的 task 直接輸出 result,因此這個圖包含 6 個 ResultTasks。與 OneToOneDependency 不一樣的是這裏每一個 ResultTask 須要計算 3 個 RDD,讀取兩個 data block,而整個讀取和計算這三個 RDD 的過程在一個 task 裏面完成。當計算 CartesianRDD 中的 partition 時,須要從兩個 RDD 獲取 records,因爲都在一個 task 裏面,不須要 shuffle。這個圖說明:
不論是 1:1 仍是 N:1 的 NarrowDependency,只要是 NarrowDependency chain,就能夠進行 pipeline,生成的 task 個數與該 stage 最後一個 RDD 的 partition 個數相同。
物理圖的執行
生成了 stage 和 task 之後,下一個問題就是
task 如何執行來生成最後的 result? 回到 ComplexJob 的物理執行圖,若是按照 MapReduce 的邏輯,從前到後執行,map() 產生中間數據 map outpus,通過 partition 後放到本地磁盤。再通過 shuffle-sort-aggregate 後生成 reduce inputs,最後 reduce() 執行獲得 result。執行流程以下:
整個執行流程沒有問題,但不能直接套用在 Spark 的物理執行圖上,由於 MapReduce 的流程圖簡單、固定,並且沒有 pipeline。 回想 pipeline 的思想是
數據用的時候再算,並且數據是流到要計算的位置的。Result 產生的地方的就是要計算的位置,要肯定 「須要計算的數據」,咱們能夠從後往前推,須要哪一個 partition 就計算哪一個 partition,若是 partition 裏面沒有數據,就繼續向前推,造成 computing chain。這樣推下去,結果就是:須要首先計算出每一個 stage 最左邊的 RDD 中的某些 partition。
對於沒有 parent stage 的 stage,該 stage 最左邊的 RDD 是能夠當即計算的,並且每計算出一個 record 後即可以流入 f 或 g(見前面圖中的 patterns)。若是 f 中的 record 關係是 1:1 的,那麼 f(record1) 計算結果能夠當即順着 computing chain 流入 g 中。若是 f 的 record 關係是 N:1,record1 進入 f() 後也能夠被回收。總結一下,computing chain 從後到前創建,而實際計算出的數據從前到後流動,並且計算出的第一個 record 流動到不能再流動後,再計算下一個 record。這樣,雖然是要計算後續 RDD 的 partition 中的 records,但並非要求當前 RDD 的 partition 中全部 records 計算獲得後再總體向後流動。 對於有 parent stage 的 stage,先等着全部 parent stages 中 final RDD 中數據計算好,而後通過 shuffle 後,問題就又回到了計算 「沒有 parent stage 的 stage」。
代碼實現:每一個 RDD 包含的 getDependency() 負責確立 RDD 的數據依賴,compute() 方法負責接收 parent RDDs 或者 data block 流入的 records,進行計算,而後輸出 record。常常能夠在 RDD 中看到這樣的代碼
firstParent[T].iterator(split, context).map(f)
。firstParent 表示該 RDD 依賴的第一個 parent RDD,iterator() 表示 parentRDD 中的 records 是一個一個流入該 RDD 的,map(f) 表示每流入一個 recod 就對其進行 f(record) 操做,輸出 record。爲了統一接口,這段 compute() 仍然返回一個 iterator,來迭代 map(f) 輸出的 records。
總結一下:
整個 computing chain 根據數據依賴關係自後向前創建,遇到 ShuffleDependency 後造成 stage。在每一個 stage 中,每一個 RDD 中的 compute() 調用 parentRDD.iter() 來將 parent RDDs 中的 records 一個個 fetch 過來。 若是要本身設計一個 RDD,那麼須要注意的是 compute() 只負責定義 parent RDDs => output records 的計算邏輯,具體依賴哪些 parent RDDs 由
getDependency()
定義,具體依賴 parent RDD 中的哪些 partitions 由
dependency.getParents()
定義。 例如,在 CartesianRDD 中,
// RDD x = (RDD a).cartesian(RDD b)
// 定義 RDD x 應該包含多少個 partition,每一個 partition 是什麼類型
override def getPartitions: Array[Partition] = {
// create the cross product split
val array = new Array[Partition](rdd1.partitions.size * rdd2.partitions.size)
for (s1 <- rdd1.partitions; s2 <- rdd2.partitions) {
val idx = s1.index * numPartitionsInRdd2 + s2.index
array(idx) = new CartesianPartition(idx, rdd1, rdd2, s1.index, s2.index)
}
array
}
// 定義 RDD x 中的每一個 partition 怎麼計算獲得
override def compute(split: Partition, context: TaskContext) = {
val currSplit = split.asInstanceOf[CartesianPartition]
// s1 表示 RDD x 中的 partition 依賴 RDD a 中的 partitions(這裏只依賴一個)
// s2 表示 RDD x 中的 partition 依賴 RDD b 中的 partitions(這裏只依賴一個)
for (x <- rdd1.iterator(currSplit.s1, context);
y <- rdd2.iterator(currSplit.s2, context)) yield (x, y)
}
// 定義 RDD x 中的 partition i 依賴於哪些 RDD 中的哪些 partitions
//
// 這裏 RDD x 依賴於 RDD a,同時依賴於 RDD b,都是 NarrowDependency
// 對於第一個依賴,RDD x 中的 partition i 依賴於 RDD a 中的
// 第 List(i / numPartitionsInRdd2) 個 partition
// 對於第二個依賴,RDD x 中的 partition i 依賴於 RDD b 中的
// 第 List(id % numPartitionsInRdd2) 個 partition
override def getDependencies: Seq[Dependency[_]] = List(
new NarrowDependency(rdd1) {
def getParents(id: Int): Seq[Int] = List(id / numPartitionsInRdd2)
},
new NarrowDependency(rdd2) {
def getParents(id: Int): Seq[Int] = List(id % numPartitionsInRdd2)
}
)
生成 job
前面介紹了邏輯和物理執行圖的生成原理,那麼,
怎麼觸發 job 的生成?已經介紹了 task,那麼 job 是什麼? 下表列出了能夠觸發執行圖生成的典型
action(),其中第二列是
processPartition()
,定義如何計算 partition 中的 records 獲得 result。第三列是
resultHandler()
,定義如何對從各個 partition 收集來的 results 進行計算來獲得最終結果。
Action |
finalRDD(records) => result |
compute(results) |
reduce(func) |
(record1, record2) => result, (result, record i) => result |
(result1, result 2) => result, (result, result i) => result |
collect() |
Array[records] => result |
Array[result] |
count() |
count(records) => result |
sum(result) |
foreach(f) |
f(records) => result |
Array[result] |
take(n) |
record (i<=n) => result |
Array[result] |
first() |
record 1 => result |
Array[result] |
takeSample() |
selected records => result |
Array[result] |
takeOrdered(n, [ordering]) |
TopN(records) => result |
TopN(results) |
saveAsHadoopFile(path) |
records => write(records) |
null |
countByKey() |
(K, V) => Map(K, count(K)) |
(Map, Map) => Map(K, count(K)) |
用戶的 driver 程序中一旦出現 action(),就會生成一個 job,好比
foreach()
會調用
sc.runJob(this, (iter: Iterator[T]) => iter.foreach(f))
,向 DAGScheduler 提交 job。若是 driver 程序後面還有 action(),那麼其餘 action() 也會生成 job 提交。因此,driver 有多少個 action(),就會生成多少個 job。這就是 Spark 稱 driver 程序爲 application(可能包含多個 job)而不是 job 的緣由。 每個 job 包含 n 個 stage,最後一個 stage 產生 result。好比,第一章的 GroupByTest 例子中存在兩個 job,一共產生了兩組 result。在提交 job 過程當中,DAGScheduler 會首先劃分 stage,而後先提交
無 parent stage 的 stages,並在提交過程當中肯定該 stage 的 task 個數及類型,並提交具體的 task。無 parent stage 的 stage 提交完後,依賴該 stage 的 stage 纔可以提交。從 stage 和 task 的執行角度來說,一個 stage 的 parent stages 執行完後,該 stage 才能執行。
提交 job 的實現細節
下面簡單分析下 job 的生成和提交代碼,提交過程在 Architecture 那一章也會有圖文並茂的分析:
- rdd.action() 會調用
DAGScheduler.runJob(rdd, processPartition, resultHandler)
來生成 job。
- runJob() 會首先經過
rdd.getPartitions()
來獲得 finalRDD 中應該存在的 partition 的個數和類型:Array[Partition]。而後根據 partition 個數 new 出來未來要持有 result 的數組 Array[Result](partitions.size)
。
- 最後調用 DAGScheduler 的
runJob(rdd, cleanedFunc, partitions, allowLocal, resultHandler)
來提交 job。cleanedFunc 是 processParittion 通過閉包清理後的結果,這樣能夠被序列化後傳遞給不一樣節點的 task。
- DAGScheduler 的 runJob 繼續調用
submitJob(rdd, func, partitions, allowLocal, resultHandler)
來提交 job。
- submitJob() 首先獲得一個 jobId,而後再次包裝 func,向 DAGSchedulerEventProcessActor 發送 JobSubmitted 信息,該 actor 收到信息後進一步調用
dagScheduler.handleJobSubmitted()
來處理提交的 job。之因此這麼麻煩,是爲了符合事件驅動模型。
- handleJobSubmmitted() 首先調用 finalStage = newStage() 來劃分 stage,而後submitStage(finalStage)。因爲 finalStage 可能有 parent stages,實際先提交 parent stages,等到他們執行完,finalStage 須要再次提交執行。再次提交由 handleJobSubmmitted() 最後的 submitWaitingStages() 負責。
分析一下 newStage() 如何劃分 stage:
- 該方法在 new Stage() 的時候會調用 finalRDD 的 getParentStages()。
- getParentStages() 從 finalRDD 出發,反向 visit 邏輯執行圖,遇到 NarrowDependency 就將依賴的 RDD 加入到 stage,遇到 ShuffleDependency 切開 stage,並遞歸到 ShuffleDepedency 依賴的 stage。
- 一個 ShuffleMapStage(不是最後造成 result 的 stage)造成後,會將該 stage 最後一個 RDD 註冊到
MapOutputTrackerMaster.registerShuffle(shuffleDep.shuffleId, rdd.partitions.size)
,這一步很重要,由於 shuffle 過程須要 MapOutputTrackerMaster 來指示 ShuffleMapTask 輸出數據的位置。
分析一下 submitStage(stage) 如何提交 stage 和 task:
- 先肯定該 stage 的 missingParentStages,使用
getMissingParentStages(stage)
。若是 parentStages 均可能已經執行過了,那麼就爲空了。
- 若是 missingParentStages 不爲空,那麼先遞歸提交 missing 的 parent stages,並將本身加入到 waitingStages 裏面,等到 parent stages 執行結束後,會觸發提交 waitingStages 裏面的 stage。
- 若是 missingParentStages 爲空,說明該 stage 能夠當即執行,那麼就調用
submitMissingTasks(stage, jobId)
來生成和提交具體的 task。若是 stage 是 ShuffleMapStage,那麼 new 出來與該 stage 最後一個 RDD 的 partition 數相同的 ShuffleMapTasks。若是 stage 是 ResultStage,那麼 new 出來與 stage 最後一個 RDD 的 partition 個數相同的 ResultTasks。一個 stage 裏面的 task 組成一個 TaskSet,最後調用taskScheduler.submitTasks(taskSet)
來提交一整個 taskSet。
- 這個 taskScheduler 類型是 TaskSchedulerImpl,在 submitTasks() 裏面,每個 taskSet 被包裝成 manager: TaskSetMananger,而後交給
schedulableBuilder.addTaskSetManager(manager)
。schedulableBuilder 能夠是 FIFOSchedulableBuilder 或者 FairSchedulableBuilder 調度器。submitTasks() 最後一步是通知backend.reviveOffers()
去執行 task,backend 的類型是 SchedulerBackend。若是在集羣上運行,那麼這個 backend 類型是 SparkDeploySchedulerBackend。
- SparkDeploySchedulerBackend 是 CoarseGrainedSchedulerBackend 的子類,
backend.reviveOffers()
實際上是向 DriverActor 發送 ReviveOffers 信息。SparkDeploySchedulerBackend 在 start() 的時候,會啓動 DriverActor。DriverActor 收到 ReviveOffers 消息後,會調用launchTasks(scheduler.resourceOffers(Seq(new WorkerOffer(executorId, executorHost(executorId), freeCores(executorId)))))
來 launch tasks。scheduler 就是 TaskSchedulerImpl。scheduler.resourceOffers()
從 FIFO 或者 Fair 調度器那裏得到排序後的 TaskSetManager,並通過TaskSchedulerImpl.resourceOffer()
,考慮 locality 等因素來肯定 task 的所有信息 TaskDescription。調度細節這裏暫不討論。
- DriverActor 中的 launchTasks() 將每一個 task 序列化,若是序列化大小不超過 Akka 的 akkaFrameSize,那麼直接將 task 送到 executor 那裏執行
executorActor(task.executorId) ! LaunchTask(new SerializableBuffer(serializedTask))
。
Discussion
至此,咱們討論了:
- driver 程序如何觸發 job 的生成
- 如何從邏輯執行圖獲得物理執行圖
- pipeline 思想與實現
- 生成與提交 job 的實際代碼
還有不少地方沒有深刻討論,如:
- 鏈接 stage 的 shuffle 過程
- task 運行過程及運行位置
下一章重點討論 shuffle 過程。 從邏輯執行圖的創建,到將其轉換成物理執行圖的過程很經典,過程當中的 dependency 劃分,pipeline,stage 分割,task 生成 都是有條不紊,有理有據的。
ComplexJob 的源代碼
package internals
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.HashPartitioner
object complexJob {
def main(args: Array[String]) {
val sc = new SparkContext("local", "ComplexJob test")
val data1 = Array[(Int, Char)](
(1, 'a'), (2, 'b'),
(3, 'c'), (4, 'd'),
(5, 'e'), (3, 'f'),
(2, 'g'), (1, 'h'))
val rangePairs1 = sc.parallelize(data1, 3)
val hashPairs1 = rangePairs1.partitionBy(new HashPartitioner(3))
val data2 = Array[(Int, String)]((1, "A"), (2, "B"),
(3, "C"), (4, "D"))
val pairs2 = sc.parallelize(data2, 2)
val rangePairs2 = pairs2.map(x => (x._1, x._2.charAt(0)))
val data3 = Array[(Int, Char)]((1, 'X'), (2, 'Y'))
val rangePairs3 = sc.parallelize(data3, 2)
val rangePairs = rangePairs2.union(rangePairs3)
val result = hashPairs1.join(rangePairs)
result.foreachWith(i => i)((x, i) => println("[result " + i + "] " + x))
println(result.toDebugString)
}
}