WordCount能夠說是分佈式數據處理框架的」Hello World」,咱們能夠以它爲例來剖析一個Spark Job的執行全過程。編程
咱們要執行的代碼爲:數組
只有一行,很簡單也很經典的代碼。這裏的collect
做爲一個action,將觸發一個Job,如今咱們從源碼開始剖析這個Job執行的所有過程。我此次讀的源碼是Spark 1.4.1的release版本。網絡
爲了方便描述,咱們把上面的代碼先進行一下拆分,這樣能夠清晰的看到每一步生成的RDD及其依賴關係,並方便下面分析時進行引用:閉包
首先,collect調用了SparkContext上的runJob方法。這個方法是一個阻塞方法,會在Job完成以前一直阻塞等待,直到Job執行完成以後返回所得的結果:框架
RDD.collect
分佈式
須要注意的是這裏傳入了一個函數,這個函數就是這個Job的要執行的任務。後面咱們能夠看到,它將會被包裝並序列化後發送到要執行它的executor上,並在要處理的RDD上的每一個分區上被調用執行。ide
SparkContext的runJob被調用以後,這個Job的信息被遞傳給了SparkContext持有的一個DAGScheduler上。DAGScheduler自己維護着一個消息隊列,在收到這個Job以後,將給本身的消息隊列發送一個JobSubmitted消息。這個消息中包含了新生成的一個JobId, 觸發action的RDD,通過清理後的閉包函數,要處理的各個分區的在RDD中的索引,以及一些其餘信息。函數
DAGScheduler的消息隊列在收到JobSubmitted消息後,將觸發調用handleJobSubmitted方法。在這個方法中,首先會根據這個觸發action的RDD的依賴信息計算出這個Job的全部Stage。在這個WordCount中,咱們是在reduceByKey生成的shuffledRDD3(其生成的過程涉及到通用的combineByKey方法,具體能夠參考這篇文章)上觸發的action,因此咱們的ResultStage所對應的finalRDD就是shuffledRDD3,ResultStage所要執行的就是shuffledRDD3的全部分區。shuffledRDD3有一個ShuffleDependency,指向mapPartitionsRDD2,據此ShuffleDependency會生成一個ShuffleMapStage,它是ResultStage的父Stage。oop
在分析出全部的Stage以後,DAGScheduler會根據ResultStage建立出一個ActiveJob對象,用來表示這個活躍的Job。而後提交ResultStage,可是在真正執行這個Stage以前,先遞歸的判斷它有沒有父Stage,如有的話先提交它的父Stage,並將當前Stage加入等待隊列;若沒有父Stage,纔會真正的開始執行這個Stage。等待隊列中的Stage,會在父Stage都執行完成以後再被執行。post
由此能夠看出,在一個Job中,Stage之間必須按序執行,後一個Stage的執行將依賴前一個Stage的結果。一個Job只會有一個ResultStage,而且這個ResultStage必定會是整個Job的最後一個Stage,因此ResultStage執行的結束也就標誌着整個Job的結束。
按照以前的分析,咱們的Job一共有兩個Stage,一個ShuffleMapStage,一個ResultStage,並將先執行ShuffleMapStage。在執行Stage的時候,會按此Stage對應的RDD的分區數量,對應每個分區建立一個Task。若是是ShuffleMapStage則建立ShuffleMapTask,若是是ResultStage則建立ResultTask。這些Task在後面將會被序列化後發到其餘的executor上面去運行。
在這裏分析一下每一個Task包含哪些信息
兩種Task都會包含的信息有 (1)當前Stage對應的RDD對象(輕量級) (2)當前Stage的ID (3)要處理的那個分區信息(輕量級),以及該任務可能的最優執行位置(例如,對於hdfs上的文件,HadoopRDD中會記錄其每個分區存儲在集羣的位置,並將這個位置經過依賴繼承到其子RDD)除此以外,ShuffleMapTask還包含了對應的ShuffleDependency的對象(這其中實際上有分區的方法,數據合併的方法等計算時所需的信息);ResultTask還包含了當前這個Job最終要執行在每一個數據上的函數(在此狀況下就是collect傳給SparkContext的那個函數)。
在對每一個要處理的分區建立出各個Task以後,DAGScheduler會將同一個Stage的各個Task合併成一個TaskSet,並將其提交給TaskScheduler。至此,調度這些Task的工做就交給了TaskScheduler來進行。
TaskScheduler在收到這個TaskSet以後,首先爲其建立一個TaskSetManager,這個TaskSetManager將輔助任務的調度。而後TaskScheduler將會調用SchedulerBackend上的reviveOffers方法去申請可用的資源。
SchedulerBackend是一個接口,它在不一樣的部署模式下會有不一樣的實現(實際上TaskScheduler也是這樣)。SchedulerBackend的做用是調度和控制整個集羣裏面的資源(我是這麼理解的,這裏的資源指的是可用的executors),當reviveOffers方法被調用後,它會將當前可用的全部資源信息,經過調用TaskScheduler的resourceOffers提供給TaskScheduler(實際上這個過程是經過另外一個EndPoint類以消息隊列的方式實現的,這樣能夠保證同時只會進行一個對資源的申請或釋放過程)。
TaskScheduler在收到當前全部可用的資源信息後,會將這些資源信息按序提供給當前正在執行的多個TaskSet,每一個TaskSet再根據這些資源信息將當前能夠執行的Task序列化後包裝到一個TaskDescription對象中返回(這個TaskDescription對象中也包含了這個任務將要運行在哪一個executor上),最終經過TaskScheduler將全部當前的資源狀況能夠執行的Task對應的TaskDescription返回給SchedulerBackend。
SchedulerBackend這時才根據每一個TaskDescription將executors資源真正的分配給這些Task,並記錄已分配掉的資源和剩餘的資源,而後將TaskDescription中序列化後的Task經過網絡(Spark使用akka框架)發送給它對應的executor。
集羣中的executor在收到Task後,申請一個線程開始運行這個Task。這是整個Job中最核心的部分了,真正的計算都在這一步發生。首先將其反序列化,而後調用這個Task對象上的runTask方法。在這裏對於ShuffleMapTask和ResultTask,runTask方法有着不一樣的實現,並將返回不一樣的內容。咱們分別來分別分析。
對於ShuffleMapTask,runTask首先獲取對應的RDD和ShuffleDependency。在這裏對應的RDD是mapPartitionsRDD2,ShuffleDependency中則有着合併的計算信息。而後調用RDD的iterator方法獲取一個對應分區數據的迭代器。若是當前RDD分區的數據已經在以前計算過了,則會直接去內存或磁盤中獲取,不然在此時就會調用mapPartitionsRDD2的compute方法,根據其依賴去計算它的分區數據。若是ShuffleDependency中的mapSideCombine標記爲true,就會將iterator方法返回的分區數據在這裏(也就是map端)進行合併(此時要求ShuffleDependency中的aggregator不爲空,aggregator中包含了如何將數據進行合併的信息)。而後根據ShuffleDependency中的partitioner(默認是一個HashPartitioner)計算出每條數據在其結果端(就是shuffleRDD3中)的分區,並將其寫入到本地磁盤中對應的文件中去(在這裏寫入方法有多種實現方式,1.4.1的版本默認是用了SortShuffleManager,還有的其餘實現是HashShuffleManager和UnsafeShuffleManager,具體的實現方法在此處就不詳說了)。當分區的每條數據都處理完後,runTask會返回一個MapStatus,這其中包含了一個BlockManagerId(標記了這個任務被執行的位置,也就是Map後的數據存儲的位置)以及每一個結果分區(每一個reduceId)的數據的大小信息。最後這個MapStatus將經過網絡發回給driver,dirver將其記錄。
ShuffleMapTask.runTask
對於ResultTask,runTask首先也是獲取對應的RDD和要在數據上執行的函數func。在這裏對應的RDD應該是shuffleRDD3,而後調用RDD上的iterator獲取這個分區的數據,並將其傳入func函數中,將func函數的返回值做爲runTask的返回值返回。過程看似簡單,實際上在shuffleRDD3上調用iterator時就對應了shuffle的reduce端的合併。從shuffleRDD3的compute方法的實現能夠看出,它的每一個分區的數據都要去執行了ShuffleMapTask的executor上面獲取,因此會產生大量的網絡流量和磁盤IO。這個過程就是MapReduce範式中的shuffle過程,這裏面還有不少的細節我並無詳述,可是這個過程十分關鍵,它的實現效率直接決定了分佈式大數據處理的效率。
ResultTask.runTask
在runTask計算結束返回數據後,executor將其返回的數據進行序列化,而後根據序列化後數據的大小進行判斷:若是數據大與某個值,就將其寫入本地的內存或磁盤(若是內存不夠),而後將數據的位置blockId和數據大小封裝到一個IndirectTaskResult中,並將其序列化;若是數據不是很大,則直接將其封裝入一個DirectTaskResult並進行序列化。最終將序列化後的DirectTaskResult或者IndirectTaskResult遞傳給executor上運行的一個ExecutorBackend上(經過statusUpdate方法)。
ExecutorBackend如上面的SchedulerBackend有着類似的功能(實際上,對於local模式,這兩個類都由一個LocalBackend實現),將結果封入一個StatusUpdate消息透傳給一個對應的EndPoint類,EndPoint類中收到這個消息後將該消息再經過網絡發送給driver。
在driver端的SchedulerBackend收到這個StatusUpdate消息以後,將結果續傳給TaskScheduler,並進行資源的釋放,在釋放資源後再調用一次reviveOffers,這樣又能夠重複上面所描述的過程,將釋放出來的資源安排給其餘的Task來執行。
TaskScheduler在收到任務結果後,將這個任務標記爲結束,而後使用一個TaskResultGetter類來進行結果的解析。TaskResultGetter將結果反序列化,判斷若是其是一個DirectTaskResult則直接抽取出其中的結果;若是是一個IndirectTaskResult則須要根據其中的blockId信息去對應的機器上拉取結果。最終都是將結果拉取到driver的內存中(這就是咱們最好不要在大數據集上執行相似collect的方法的緣由,它會將全部的數據拉入driver的內存中,形成大量的內存開銷,甚至內存不足)。而後TaskResultGetter會將拉取到的結果遞交給TaskScheduler,TaskScheduler再將此結果遞交給DAGScheduler。
DAGScheduler在收到Task完成的消息後,先判斷這完成的是一個什麼任務。若是是一個ShuffleMapTask則須要將返回的結果(MapStatus)記錄到driver中,並判斷若是當前的ShuffleMapStage如果已經完成,則去提交下一個Stage。若是是一個ResultTask完成了, 則將其結果遞交給JobWaiter,並標記這個任務以完成。
JobWaiter是DAGScheduler在最開始submitJob的時候建立的一個對象,用於阻塞等待任務的完成,並進行結果的處理。JobWaiter在每收到一個ResultTask的結果時,都將結果在resultHandler上執行。這個resultHandler則是由SparkContext傳進來的一個函數,其做用是將數據放入一個數組中,這個數組最終將做爲SparkContext.runJob方法的返回值,被最開始的collect方法接收而後返回。若JobWaiter收到了每一個ResultTask的結果,則表示整個Job已經完成,此時就中止阻塞等待,因而SparkContext.runJob返回一個結果的數組,並由collect接收後返回給用戶程序。
至此,一個Spark的WordCount執行結束。
本文從源碼的角度詳細分析了一個Spark Job的整個執行、調度的過程,不過不少東西還只是淺嘗輒止,並未徹底深刻。儘管如此,通過連續好幾天的分析,我仍是以爲收穫頗豐,對Spark的實現原理有了更加深刻的理解,甚至對MapReduce的編程範式以及其shuffle過程也增長了很多理解。PS:其實從一開始我到分析結束都是沒有作任何記錄的,只由於一直只知其一;不知其二實在不知道如何來作記錄,因此只是去查閱一些資料和使勁兒的閱讀源碼。在我自認爲分析結束後,我纔開始寫這篇記錄,可是在寫的過程當中我才發現我分析的過程有一些並非很清晰,而後從新去看,才真正弄的比較清晰了。可見寫博文是很重要的過程,不只是將學到的知識分享出來,並且對自身的知識也有很好的加固做用。