在上兩篇文章 spark 源碼分析之十九 -- DAG的生成和Stage的劃分 和 spark 源碼分析之二十 -- Stage的提交 中剖析了Spark的DAG的生成,Stage的劃分以及Stage轉換爲TaskSet後的提交。html
以下圖,咱們在前兩篇文章中剖析了DAG的構建,Stage的劃分以及Stage轉換爲TaskSet後的提交,本篇文章主要剖析TaskSet被TaskScheduler提交以後的Task的整個執行流程,關於具體Task是如何執行的兩種stage對應的Task的執行有本質的區別,咱們將在下一篇文章剖析。node
咱們先來剖析一下SchdulerBackend的子類實現。在yarn 模式下,它有兩個實現yarn-client 模式下的 org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend實現 和 yarn-cluster 模式下的 org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend 實現,以下圖。apache
這兩個類在spark 項目的 resource-managers 目錄下的 yarn 目錄下定義實現。後端
下面簡單看一下這幾個類的定義和實現。數組
簡單說明一下,這個類主要是負責想Cluster Master請求或殺掉executor。核心方法以下,不作過多解釋,能夠看源碼作進一步瞭解。安全
A backend interface for scheduling systems that allows plugging in different ones under TaskSchedulerImpl. We assume a Mesos-like model where the application gets resource offers as machines become available and can launch tasks on them.網絡
其定義的方法以下:多線程
killTask:請求 executor 殺掉正在運行的task併發
applicationId:獲取job的applicationIdapp
applicationAttemptId:獲取task的 attemptId
getDriverLogUrls:獲取驅動程序日誌的URL。這些URL用於顯示驅動程序的UI Executors選項卡中的連接。
maxNumConcurrentTasks:當前task的最大併發數
下面咱們來看一下它的子類。
A scheduler backend that waits for coarse-grained executors to connect. This backend holds onto each executor for the duration of the Spark job rather than relinquishing executors whenever a task is done and asking the scheduler to launch a new executor for each new task. Executors may be launched in a variety of ways, such as Mesos tasks for the coarse-grained Mesos mode or standalone processes for Spark's standalone deploy mode (spark.deploy.*).
調度程序後端,等待粗粒度執行程序進行鏈接。 此後端在Spark做業期間保留每一個執行程序,而不是在任務完成時放棄執行程序並要求調度程序爲每一個新任務啓動新的執行程序。 執行程序能夠以多種方式啓動,例如用於粗粒度Mesos模式的Mesos任務或用於Spark的獨立部署模式(spark.deploy。*)的獨立進程。
它是線程安全的。表明的是Driver的endpoint
以下,是它的類結構:
rpcEnv 是指的每一個節點上的NettyRpcEnv
executorsPendingLossReason:記錄了已經丟失的而且不知道緣由的executor
addressToExecutorId:記錄了每個executor的id和executor地址的映射關係
下面咱們看一下Task以及其繼承關係。
它是Task的基本單元。
run:運行Task,被executor調用,源碼以下:
runTask 運行Task,被run方法調用,它是一個抽象方法,由子類來實現。
kill:殺死Task。源碼以下:
下面看一下其繼承關係。
Task的繼承關係以下:
A unit of execution. We have two kinds of Task's in Spark:
- org.apache.spark.scheduler.ShuffleMapTask
- org.apache.spark.scheduler.ResultTask
A Spark job consists of one or more stages.
The very last stage in a job consists of multiple ResultTasks, while earlier stages consist of ShuffleMapTasks. A ResultTask executes the task and sends the task output back to the driver application. A ShuffleMapTask executes the task and divides the task output to multiple buckets (based on the task's partitioner).
下面分別看一下兩個Task的實現,是如何定義 runTask 方法的?
類名:org.apache.spark.scheduler.ResultTask
其runTask方法以下:
類名:org.apache.spark.scheduler.ShuffleMapTask
其runTask方法以下:
全稱:org.apache.spark.executor.Executor
Executor對象是Spark Executor的抽象,它背後有一個線程池用來執行任務。其實從源碼能夠看出,Spark的Executor這個術語,其實來自於Java線程池部分的Executors。
下面主要分析一下其內部的結構。
線程池定義以下:
Executor會不斷地向driver發送心跳來彙報其健康情況,以下:
EXECUTOR_HEARTBEAT_INTERVAL 值默認爲 10s, 能夠經過參數 spark.executor.heartbeatInterval 來進行調整。
startDriverHeartBeater方法以下:
其依賴方法 reportHeartBeat 方法源碼以下:
首先先來了解一下 TaskReaper。
類說明:
Supervises the killing / cancellation of a task by sending the interrupted flag, optionally sending a Thread.interrupt(), and monitoring the task until it finishes. Spark's current task cancellation / task killing mechanism is "best effort" because some tasks may not be interruptable or may not respond to their "killed" flags being set. If a significant fraction of a cluster's task slots are occupied by tasks that have been marked as killed but remain running then this can lead to a situation where new jobs and tasks are starved of resources that are being used by these zombie tasks. The TaskReaper was introduced in SPARK-18761 as a mechanism to monitor and clean up zombie tasks. For backwards-compatibility / backportability this component is disabled by default and must be explicitly enabled by setting spark.task.reaper.enabled=true. A TaskReaper is created for a particular task when that task is killed / cancelled. Typically a task will have only one TaskReaper, but it's possible for a task to have up to two reapers in case kill is called twice with different values for the interrupt parameter. Once created, a TaskReaper will run until its supervised task has finished running. If the TaskReaper has not been configured to kill the JVM after a timeout (i.e. if spark.task.reaper.killTimeout < 0) then this implies that the TaskReaper may run indefinitely if the supervised task never exits.
其源碼以下:
思路:發送kill信號,等待必定時間後,若是任務中止,則返回,不然yarn模式下拋出一場,對local模式沒有影響。
reaper機制默認是不啓用的,能夠經過參數 spark.task.reaper.enabled 來啓用。
它也是一個daemon的支持多個worker同時工做的線程池,也就是說能夠同時中止多個任務。
當kill任務的時候,會調用kill Task方法,源碼以下:
在上一篇文章spark 源碼分析之二十 -- Stage的提交中,提到SchedulerBackend接收到task請求後調用了 makeOffsers 方法,以下:
先調用TaskScheduler分配資源,並返回TaskDescription對象,而後拿着該對象去執行任務。
executorDataMap的定義以下:
其中ExecutorData 是記錄着executor的信息。包括 executor的address,port,可用cpu核數,總cpu核數等信息。
executorIsAlive方法定義以下:
即該executor既不在即將被回收的集合中也不在丟失的executor集合中。
WorkOffer對象表明着一個executor上的可用資源,類定義以下:
org.apache.spark.scheduler.TaskSchedulerImpl#resourceOffers 方法以下:
思路:先過濾掉不可用的WorkOffser對象,而後給每個TaskSet分配資源。若是taskSet是barrier的,須要初始化barrierCoordinator的rpc endpoint。
記錄hostname和executorId的映射關係,記錄executorId和taskId的映射關係,源碼以下:
1. 其中 executorAdded的源碼以下:
org.apache.spark.scheduler.DAGScheduler#executorAdded的映射關係以下:
通過eventProcessLoop異步消息隊列後,最終被以下分支處理:
最終處理邏輯以下,即把狀態健康的executor從失敗的epoch集合中移除。
2. 其中,獲取host的rack信息的方法沒有實現,返回None。
blacklistTrackerOpt 定義以下:
org.apache.spark.scheduler.BlacklistTracker#isBlacklistEnabled 方法以下:
即 BLACKLIST_ENABLED 能夠經過設置參數 spark.blacklist.enabled 來設定是否使用blacklist,默認沒有設置。若是設定了spark.scheduler.executorTaskBlacklistTime參數值大於 0 ,也啓用 blacklist。
BlacklistTracker 主要就是用來追蹤有問題的executor和host信息的,其類說明以下:
BlacklistTracker is designed to track problematic executors and nodes. It supports blacklisting executors and nodes across an entire application (with a periodic expiry). TaskSetManagers add additional blacklisting of executors and nodes for individual tasks and stages which works in concert with the blacklisting here. The tracker needs to deal with a variety of workloads, eg.:
bad user code -- this may lead to many task failures, but that should not count against individual executors
many small stages -- this may prevent a bad executor for having many failures within one stage, but still many failures over the entire application
"flaky" executors -- they don't fail every task, but are still faulty enough to merit blacklisting See the design doc on SPARK-8425 for a more in-depth discussion.
過濾掉host或executor在黑名單中的WorkOffer,對應源碼以下:
對應源碼以下:
首先對WorkOffer集合隨機打亂順序,而後獲取其可用core,可用slot的信息,而後獲取排序後的TaskSetManager隊列。rootPool是Pool對象,源碼在 TaskScheduler提交TaskSet 中有描述,再也不贅述。
CPUS_PER_TASK的核數默認是1,即一個task使用一個core,因此在spark算子中,儘可能不要使用多線程,由於就一個core,提升不了多少的性能。能夠經過spark.task.cpus參數進行調節。
org.apache.spark.scheduler.Pool#getSortedTaskSetQueue 源碼以下:
其中TaskSetManager的 getSortedTaskSetManager的源碼以下:
從新計算本地性:
org.apache.spark.scheduler.TaskSetManager#executorAdded 的源碼以下:
org.apache.spark.scheduler.TaskSetManager#computeValidLocalityLevels 源碼以下:
在這裏,能夠很好的理解五種數據本地性級別。先加入數據本地性數組的優先考慮使用。
對應源碼以下:
若是slot資源夠用或者TaskSet不是barrier的,開始爲TaskSet分配資源。
org.apache.spark.scheduler.TaskSchedulerImpl#resourceOfferSingleTaskSet 源碼以下:
思路:遍歷每個shuffledOffers,若是其可用cpu核數不小於一個slot所用的核數,則分配資源,分配資源完畢後,記錄taskId和taskSetManager的映射關係、taskId和executorId的映射關係、executorId和task的映射關係。最後可用核數減一個slot因此的cpu核數。
其依賴方法 org.apache.spark.scheduler.TaskSetManager#resourceOffer 源碼以下,思路:先檢查該executor和該executor所在的host都不在黑名單中,若在則返回None,不然開始分配資源。
分配資源步驟:
1. 計算數據本地性。
2. 每個task出隊並構建 TaskDescription 對象。
其依賴方法 org.apache.spark.scheduler.TaskSetManager#getAllowedLocalityLevel 源碼以下,目的就是計算該task 的容許的最大數據本地性。
若是任務資源分配成功而且TaskSet是barrier的,則初始化BarrierCoordinator,源碼以下:
依賴方法 org.apache.spark.scheduler.TaskSchedulerImpl#maybeInitBarrierCoordinator 以下:
org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.DriverEndpoint#makeOffers中,分配資源結束後,就能夠運行task了,源碼以下:
其依賴方法 lauchTasks 源碼以下:
org.apache.spark.scheduler.TaskDescription#encode 方法是一個序列化的操做,將內存中的Java Function對象序列化爲字節數組。源碼以下:
maxRpcMessageSize定義以下:
org.apache.spark.util.RpcUtils#maxMessageSizeBytes 源碼以下:
默認爲128MB,能夠經過參數 spark.rpc.message.maxSize 來調整。
executorData可用核數減去一個Slot所需的核數後,去調用executor運行task。
對應 lauchTasks 源碼以下:
通過底層RPC的傳輸,executorEndpoint的處理代碼receive方法處理分支爲:
其主要有兩步,反序列化TaskDescription字節數據爲Java對象。
調用executor來運行task。
下面詳細來看每一步。
思路:將經過RPC傳輸過來的ByteBuffer對象中的字節數據內容反序列化爲在內存中的Java對象,即TaskDescription對象。
Executor對象是Spark Executor的抽象,它背後有一個線程池用來執行任務。其實從源碼能夠看出,Spark的Executor這個術語,其實來自於Java線程池部分的Executors。
launchTasks方法源碼以下:
TaskRunner是一個Runnable的實現,worker線程池中的worker會去執行其run方法。
下面來看一下TaskRunner類。
它繼承了Runnable接口,worker線程池中的worker會去執行其run方法來執行任務,其主要方法以下:
run方法比較長,劃分爲四部分來講明。
對應源碼以下:
初始化環境,修改task的運行狀態爲RUNNING,初始化gc時間。
其源碼以下:
反序列化Task對象,而且設置Task的依賴。
記錄任務開始時間,開始使用cpu時間,運行task,最後釋放內存。
其依賴方法 org.apache.spark.util.Utils#tryWithSafeFinally 源碼以下:
從源碼能夠看出,第一個方法是執行的方法,第二個方法是finally方法體中須要執行的方法。即釋放內存。
源碼以下:
關於metrics的相關內容,不作過多介紹。源碼以下:
思路:將返回值序列化爲ByteBuffer對象。
org.apache.spark.executor.CoarseGrainedExecutorBackend#statusUpdate 方法以下:
通過rpc後,driver端org.apache.spark.executor.CoarseGrainedExecutorBackend 的 receive 方法以下:
思路:更新task的狀態,接着在同一個executor上分配資源,執行任務。
org.apache.spark.scheduler.TaskSchedulerImpl#statusUpdate 方法以下:
源碼以下,不作再深刻的剖析:
源碼以下:
其依賴方法 org.apache.spark.scheduler.TaskSchedulerImpl#handleSuccessfulTask 源碼以下:
org.apache.spark.scheduler.TaskSetManager#handleSuccessfulTask 源碼以下:
org.apache.spark.scheduler.TaskSchedulerImpl#markPartitionCompletedInAllTaskSets 源碼以下:
org.apache.spark.scheduler.TaskSetManager#markPartitionCompleted 的源碼以下:
org.apache.spark.scheduler.TaskSetManager#maybeFinishTaskSet 源碼以下:
在org.apache.spark.scheduler.TaskSetManager#handleSuccessfulTask 源碼中,最後調用了dagScheduler的taskEnded 方法,源碼以下:
即發送事件消息給eventProcessLoop隊列作異步處理:
在 org.apache.spark.scheduler.DAGSchedulerEventProcessLoop#doOnReceive 源碼中,處理該事件的分支爲:
即會調用 org.apache.spark.scheduler.DAGScheduler#handleTaskCompletion,源碼中處理成功的返回值的代碼以下:
咱們重點關注其返回值的處理,若是執行的是一個Action操做,則會進入第一個分支。若是執行的是shuffle操做,則會進入第二個分支。
先來看第一個分支:
跟返回值有關的代碼以下:
org.apache.spark.scheduler.JobWaiter#taskSucceeded源碼以下:
思路:調用RDD定義的resultHandler方法,取出返回值,若是該 task執行完畢以後,全部task都已經執行完畢了,那麼jobPromise能夠標誌爲成功,driver就能夠拿着action操做返回的值作進一步操做。
假設是collect方法,能夠根據 org.apache.spark.SparkContext#submitJob 依賴方法推出resultHandler的定義,以下:
能夠知道,resultHandler是在調用方法以前傳遞過來的方法參數。
咱們從collect 方法正向推:
其調用的SparkContext的幾個重載的runJob方法以下:
即,上圖中標紅的就是resultHandler方法,collect方法是應用於整個RDD的分區的。
也就是說,org.apache.spark.scheduler.JobWaiter#taskSucceeded的第一個參數其實就是partition,第二個參數就是該action在RDD的該partition上計算後的返回值。
該resultHandler方法將返回值,直接賦值給result的特定分區。最終,將全部分區的數據都返回給driver。注意,如今的返回值是數組套數組的形式,即二維數組。
最終collect方法中也定義了二維數組flatten爲一維數組的方法,以下:
這個方法內部是會生成一個ArrayBuilder對象的用來添加數組元素,最終構造新數組返回。這個方法是會內存溢出的,因此不建議使用這個方法獲取大量結果數據。
下面,咱們來看第二個分支。
shuffle做業的返回值是 MapStatus 類型。
先來聊一下MapStatus類。
主要方法以下:
location表示 shuffle的output數據由哪一個BlockManager管理着。
getSizeForBlock:獲取指定block的大小。
其繼承關係以下:
CompressedMapStatus 主要是實現了壓縮的MapStatus,即在網絡傳輸進行序列化的時候,能夠對MapStatus進行壓縮。
HighlyCompressedMapStatus 主要實現了大block的存儲,以及保存了block的平均大小以及block是否爲空的信息。
咱們只關注返回值的處理,org.apache.spark.scheduler.DAGScheduler#handleTaskCompletion方法中涉及值處理的源碼以下:
org.apache.spark.MapOutputTrackerMaster#registerMapOutput 的源碼以下,mapId就是partition的id:
其中,成員變量 shuffleStatuses 定義以下:
即shuffleStatuses在driver端保存了shuffleId和shuffleStatus的信息。便於後續stage能夠調用 MapOutputTrackerMasterEndpoint ref 來獲取該stage返回的MapStatus信息。具體內容,咱們將在下一節分析。
本篇文章主要介紹了跟Spark內部Task運行的細節流程,關於Task的運行部分沒有具體涉及,Task按照ResultStage和ShuffleStage劃分爲兩種Task,ResultStage任務和ShuffleStage分別對應的Task的執行流程有本質的區別,將在下一篇文章進行更加詳細的剖析。