從源碼剖析一個 Spark WordCount Job 執行的全過程 | Mz's Blog編程
WordCount 能夠說是分佈式數據處理框架的」Hello World」,咱們能夠以它爲例來剖析一個 Spark Job 的執行全過程。數組
咱們要執行的代碼爲:網絡
sc.textFile("hdfs://...").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_).collect
複製代碼
只有一行,很簡單也很經典的代碼。這裏的collect
做爲一個 action,將觸發一個 Job,如今咱們從源碼開始剖析這個 Job 執行的所有過程。我此次讀的源碼是 Spark 1.4.1 的 release 版本。閉包
爲了方便描述,咱們把上面的代碼先進行一下拆分,這樣能夠清晰的看到每一步生成的 RDD 及其依賴關係,並方便下面分析時進行引用:框架
val hadoopRDD0 = sc.textFile("hdfs://...") // HadoopRDD[0]
val mapPartitionsRDD1 = hadoopRDD0.flatMap(_.split(" ")) // MapPartitionsRDD[2]
val mapPartitionsRDD2 = mapPartitionsRDD1.map((_, 1)) // MapPartitionsRDD[2]
val shuffledRDD3 = mapPartitionsRDD2.reduceByKey(_+_) // ShuffledRDD[3]
shuffledRDD3.collect // action
複製代碼
首先,collect 調用了 SparkContext 上的 runJob 方法。這個方法是一個阻塞方法,會在 Job 完成以前一直阻塞等待,直到 Job 執行完成以後返回所得的結果:分佈式
RDD.collectide
def collect(): Array[T] = withScope {
val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
Array.concat(results: _*)
}
複製代碼
須要注意的是這裏傳入了一個函數,這個函數就是這個 Job 的要執行的任務。後面咱們能夠看到,它將會被包裝並序列化後發送到要執行它的 executor 上,並在要處理的 RDD 上的每一個分區上被調用執行。函數
SparkContext 的 runJob 被調用以後,這個 Job 的信息被遞傳給了 SparkContext 持有的一個 DAGScheduler 上。DAGScheduler 自己維護着一個消息隊列,在收到這個 Job 以後,將給本身的消息隊列發送一個 JobSubmitted 消息。這個消息中包含了新生成的一個 JobId, 觸發 action 的 RDD,通過清理後的閉包函數,要處理的各個分區的在 RDD 中的索引,以及一些其餘信息。oop
DAGScheduler 的消息隊列在收到 JobSubmitted 消息後,將觸發調用 handleJobSubmitted 方法。在這個方法中,首先會根據這個觸發 action 的 RDD 的依賴信息計算出這個 Job 的全部 Stage。在這個 WordCount 中,咱們是在 reduceByKey 生成的 shuffledRDD3(其生成的過程涉及到通用的 combineByKey 方法,具體能夠參考這篇文章)上觸發的 action,因此咱們的 ResultStage 所對應的 finalRDD 就是 shuffledRDD3,ResultStage 所要執行的就是 shuffledRDD3 的全部分區。shuffledRDD3 有一個 ShuffleDependency,指向 mapPartitionsRDD2,據此 ShuffleDependency 會生成一個 ShuffleMapStage,它是 ResultStage 的父 Stage。大數據
在分析出全部的 Stage 以後,DAGScheduler 會根據 ResultStage 建立出一個 ActiveJob 對象,用來表示這個活躍的 Job。而後提交 ResultStage,可是在真正執行這個 Stage 以前,先遞歸的判斷它有沒有父 Stage,如有的話先提交它的父 Stage,並將當前 Stage 加入等待隊列;若沒有父 Stage,纔會真正的開始執行這個 Stage。等待隊列中的 Stage,會在父 Stage 都執行完成以後再被執行。
由此能夠看出,在一個 Job 中,Stage 之間必須按序執行,後一個 Stage 的執行將依賴前一個 Stage 的結果。一個 Job 只會有一個 ResultStage,而且這個 ResultStage 必定會是整個 Job 的最後一個 Stage,因此 ResultStage 執行的結束也就標誌着整個 Job 的結束。
按照以前的分析,咱們的 Job 一共有兩個 Stage,一個 ShuffleMapStage,一個 ResultStage,並將先執行 ShuffleMapStage。在執行 Stage 的時候,會按此 Stage 對應的 RDD 的分區數量,對應每個分區建立一個 Task。若是是 ShuffleMapStage 則建立 ShuffleMapTask,若是是 ResultStage 則建立 ResultTask。這些 Task 在後面將會被序列化後發到其餘的 executor 上面去運行。
在這裏分析一下每一個 Task 包含哪些信息 兩種 Task 都會包含的信息有 (1) 當前 Stage 對應的 RDD 對象(輕量級) (2) 當前 Stage 的 ID (3) 要處理的那個分區信息(輕量級),以及該任務可能的最優執行位置(例如,對於 hdfs 上的文件,HadoopRDD 中會記錄其每個分區存儲在集羣的位置,並將這個位置經過依賴繼承到其子 RDD)
除此以外,ShuffleMapTask 還包含了對應的 ShuffleDependency 的對象(這其中實際上有分區的方法,數據合併的方法等計算時所需的信息);ResultTask 還包含了當前這個 Job 最終要執行在每一個數據上的函數(在此狀況下就是 collect 傳給 SparkContext 的那個函數)。
在對每一個要處理的分區建立出各個 Task 以後,DAGScheduler 會將同一個 Stage 的各個 Task 合併成一個 TaskSet,並將其提交給 TaskScheduler。至此,調度這些 Task 的工做就交給了 TaskScheduler 來進行。
TaskScheduler 在收到這個 TaskSet 以後,首先爲其建立一個 TaskSetManager,這個 TaskSetManager 將輔助任務的調度。而後 TaskScheduler 將會調用 SchedulerBackend 上的 reviveOffers 方法去申請可用的資源。
SchedulerBackend 是一個接口,它在不一樣的部署模式下會有不一樣的實現(實際上 TaskScheduler 也是這樣)。SchedulerBackend 的做用是調度和控制整個集羣裏面的資源(我是這麼理解的,這裏的資源指的是可用的 executors),當 reviveOffers 方法被調用後,它會將當前可用的全部資源信息,經過調用 TaskScheduler 的 resourceOffers 提供給 TaskScheduler(實際上這個過程是經過另外一個 EndPoint 類以消息隊列的方式實現的,這樣能夠保證同時只會進行一個對資源的申請或釋放過程)。
TaskScheduler 在收到當前全部可用的資源信息後,會將這些資源信息按序提供給當前正在執行的多個 TaskSet,每一個 TaskSet 再根據這些資源信息將當前能夠執行的 Task 序列化後包裝到一個 TaskDescription 對象中返回(這個 TaskDescription 對象中也包含了這個任務將要運行在哪一個 executor 上),最終經過 TaskScheduler 將全部當前的資源狀況能夠執行的 Task 對應的 TaskDescription 返回給 SchedulerBackend。
SchedulerBackend 這時才根據每一個 TaskDescription 將 executors 資源真正的分配給這些 Task,並記錄已分配掉的資源和剩餘的資源,而後將 TaskDescription 中序列化後的 Task 經過網絡(Spark 使用 akka 框架)發送給它對應的 executor。
集羣中的 executor 在收到 Task 後,申請一個線程開始運行這個 Task。這是整個 Job 中最核心的部分了,真正的計算都在這一步發生。首先將其反序列化,而後調用這個 Task 對象上的 runTask 方法。在這裏對於 ShuffleMapTask 和 ResultTask,runTask 方法有着不一樣的實現,並將返回不一樣的內容。咱們分別來分別分析。
對於 ShuffleMapTask,runTask 首先獲取對應的 RDD 和 ShuffleDependency。在這裏對應的 RDD 是 mapPartitionsRDD2,ShuffleDependency 中則有着合併的計算信息。而後調用 RDD 的 iterator 方法獲取一個對應分區數據的迭代器。若是當前 RDD 分區的數據已經在以前計算過了,則會直接去內存或磁盤中獲取,不然在此時就會調用 mapPartitionsRDD2 的 compute 方法,根據其依賴去計算它的分區數據。若是 ShuffleDependency 中的 mapSideCombine 標記爲 true,就會將 iterator 方法返回的分區數據在這裏(也就是 map 端)進行合併(此時要求 ShuffleDependency 中的 aggregator 不爲空,aggregator 中包含了如何將數據進行合併的信息)。而後根據 ShuffleDependency 中的 partitioner(默認是一個 HashPartitioner)計算出每條數據在其結果端(就是 shuffleRDD3 中)的分區,並將其寫入到本地磁盤中對應的文件中去(在這裏寫入方法有多種實現方式,1.4.1 的版本默認是用了 SortShuffleManager,還有的其餘實現是 HashShuffleManager 和 UnsafeShuffleManager,具體的實現方法在此處就不詳說了)。當分區的每條數據都處理完後,runTask 會返回一個 MapStatus,這其中包含了一個 BlockManagerId(標記了這個任務被執行的位置,也就是 Map 後的數據存儲的位置)以及每一個結果分區(每一個 reduceId)的數據的大小信息。最後這個 MapStatus 將經過網絡發回給 driver,dirver 將其記錄。
ShuffleMapTask.runTask
override def runTask(context: TaskContext): MapStatus = {
// Deserialize the RDD using the broadcast variable.
val deserializeStartTime = System.currentTimeMillis()
val ser = SparkEnv.get.closureSerializer.newInstance()
val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
_executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
metrics = Some(context.taskMetrics)
var writer: ShuffleWriter[Any, Any] = null
try {
val manager = SparkEnv.get.shuffleManager
writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
return writer.stop(success = true).get
} catch {
case e: Exception =>
try {
if (writer != null) {
writer.stop(success = false)
}
} catch {
case e: Exception =>
log.debug("Could not stop writer", e)
}
throw e
}
}
複製代碼
對於 ResultTask,runTask 首先也是獲取對應的 RDD 和要在數據上執行的函數 func。在這裏對應的 RDD 應該是 shuffleRDD3,而後調用 RDD 上的 iterator 獲取這個分區的數據,並將其傳入 func 函數中,將 func 函數的返回值做爲 runTask 的返回值返回。過程看似簡單,實際上在 shuffleRDD3 上調用 iterator 時就對應了 shuffle 的 reduce 端的合併。從 shuffleRDD3 的 compute 方法的實現能夠看出,它的每一個分區的數據都要去執行了 ShuffleMapTask 的 executor 上面獲取,因此會產生大量的網絡流量和磁盤 IO。這個過程就是 MapReduce 範式中的 shuffle 過程,這裏面還有不少的細節我並無詳述,可是這個過程十分關鍵,它的實現效率直接決定了分佈式大數據處理的效率。
ResultTask.runTask
override def runTask(context: TaskContext): U = {
// Deserialize the RDD and the func using the broadcast variables.
val deserializeStartTime = System.currentTimeMillis()
val ser = SparkEnv.get.closureSerializer.newInstance()
val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
_executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
metrics = Some(context.taskMetrics)
func(context, rdd.iterator(partition, context))
}
複製代碼
在 runTask 計算結束返回數據後,executor 將其返回的數據進行序列化,而後根據序列化後數據的大小進行判斷:若是數據大與某個值,就將其寫入本地的內存或磁盤(若是內存不夠),而後將數據的位置 blockId 和數據大小封裝到一個 IndirectTaskResult 中,並將其序列化;若是數據不是很大,則直接將其封裝入一個 DirectTaskResult 並進行序列化。最終將序列化後的 DirectTaskResult 或者 IndirectTaskResult 遞傳給 executor 上運行的一個 ExecutorBackend 上(經過 statusUpdate 方法)。
ExecutorBackend 如上面的 SchedulerBackend 有着類似的功能(實際上,對於 local 模式,這兩個類都由一個 LocalBackend 實現),將結果封入一個 StatusUpdate 消息透傳給一個對應的 EndPoint 類,EndPoint 類中收到這個消息後將該消息再經過網絡發送給 driver。
在 driver 端的 SchedulerBackend 收到這個 StatusUpdate 消息以後,將結果續傳給 TaskScheduler,並進行資源的釋放,在釋放資源後再調用一次 reviveOffers,這樣又能夠重複上面所描述的過程,將釋放出來的資源安排給其餘的 Task 來執行。
TaskScheduler 在收到任務結果後,將這個任務標記爲結束,而後使用一個 TaskResultGetter 類來進行結果的解析。TaskResultGetter 將結果反序列化,判斷若是其是一個 DirectTaskResult 則直接抽取出其中的結果;若是是一個 IndirectTaskResult 則須要根據其中的 blockId 信息去對應的機器上拉取結果。最終都是將結果拉取到 driver 的內存中(這就是咱們最好不要在大數據集上執行相似 collect 的方法的緣由,它會將全部的數據拉入 driver 的內存中,形成大量的內存開銷,甚至內存不足)。而後 TaskResultGetter 會將拉取到的結果遞交給 TaskScheduler,TaskScheduler 再將此結果遞交給 DAGScheduler。
DAGScheduler 在收到 Task 完成的消息後,先判斷這完成的是一個什麼任務。若是是一個 ShuffleMapTask 則須要將返回的結果(MapStatus)記錄到 driver 中,並判斷若是當前的 ShuffleMapStage 如果已經完成,則去提交下一個 Stage。若是是一個 ResultTask 完成了, 則將其結果遞交給 JobWaiter,並標記這個任務以完成。
JobWaiter 是 DAGScheduler 在最開始 submitJob 的時候建立的一個對象,用於阻塞等待任務的完成,並進行結果的處理。JobWaiter 在每收到一個 ResultTask 的結果時,都將結果在 resultHandler 上執行。這個 resultHandler 則是由 SparkContext 傳進來的一個函數,其做用是將數據放入一個數組中,這個數組最終將做爲 SparkContext.runJob 方法的返回值,被最開始的 collect 方法接收而後返回。若 JobWaiter 收到了每一個 ResultTask 的結果,則表示整個 Job 已經完成,此時就中止阻塞等待,因而 SparkContext.runJob 返回一個結果的數組,並由 collect 接收後返回給用戶程序。
至此,一個 Spark 的 WordCount 執行結束。
本文從源碼的角度詳細分析了一個 Spark Job 的整個執行、調度的過程,不過不少東西還只是淺嘗輒止,並未徹底深刻。儘管如此,通過連續好幾天的分析,我仍是以爲收穫頗豐,對 Spark 的實現原理有了更加深刻的理解,甚至對 MapReduce 的編程範式以及其 shuffle 過程也增長了很多理解。PS:其實從一開始我到分析結束都是沒有作任何記錄的,只由於一直只知其一;不知其二實在不知道如何來作記錄,因此只是去查閱一些資料和使勁兒的閱讀源碼。在我自認爲分析結束後,我纔開始寫這篇記錄,可是在寫的過程當中我才發現我分析的過程有一些並非很清晰,而後從新去看,才真正弄的比較清晰了。可見寫博文是很重要的過程,不只是將學到的知識分享出來,並且對自身的知識也有很好的加固做用。