摘自:https://www.cnblogs.com/qingyunzong/p/8945933.htmlhtml
(1)Application:表示你的應用程序node
(2)Driver:表示main()函數,建立SparkContext。由SparkContext負責與ClusterManager通訊,進行資源的申請,任務的分配和監控等。程序執行完畢後關閉SparkContextweb
(3)Executor:某個Application運行在Worker節點上的一個進程,該進程負責運行某些task,而且負責將數據存在內存或者磁盤上。在Spark on Yarn模式下,其進程名稱爲 CoarseGrainedExecutor Backend,一個CoarseGrainedExecutor Backend進程有且僅有一個executor對象,它負責將Task包裝成taskRunner,並從線程池中抽取出一個空閒線程運行Task,這樣,每一個CoarseGrainedExecutorBackend能並行運行Task的數據就取決於分配給它的CPU的個數。算法
(4)Worker:集羣中能夠運行Application代碼的節點。在Standalone模式中指的是經過slave文件配置的worker節點,在Spark on Yarn模式中指的就是NodeManager節點。shell
(5)Task:在Executor進程中執行任務的工做單元,多個Task組成一個Stageapache
(6)Job:包含多個Task組成的並行計算,是由Action行爲觸發的安全
(7)Stage:每一個Job會被拆分不少組Task,做爲一個TaskSet,其名稱爲Stage網絡
(8)DAGScheduler:根據Job構建基於Stage的DAG,並提交Stage給TaskScheduler,其劃分Stage的依據是RDD之間的依賴關係多線程
(9)TaskScheduler:將TaskSet提交給Worker(集羣)運行,每一個Executor運行什麼Task就是在此處分配的。架構
(1)構建Spark Application的運行環境(啓動SparkContext),SparkContext向資源管理器(能夠是Standalone、Mesos或YARN)註冊並申請運行Executor資源;
(2)資源管理器分配Executor資源並啓動StandaloneExecutorBackend,Executor運行狀況將隨着心跳發送到資源管理器上;
(3)SparkContext構建成DAG圖,將DAG圖分解成Stage,並把Taskset發送給Task Scheduler。Executor向SparkContext申請Task
(4)Task Scheduler將Task發放給Executor運行同時SparkContext將應用程序代碼發放給Executor。
(5)Task在Executor上運行,運行完畢釋放全部資源。
(1)每一個Application獲取專屬的executor進程,該進程在Application期間一直駐留,並以多線程方式運行tasks。這種Application隔離機制有其優點的,不管是從調度角度看(每一個Driver調度它本身的任務),仍是從運行角度看(來自不一樣Application的Task運行在不一樣的JVM中)。固然,這也意味着Spark Application不能跨應用程序共享數據,除非將數據寫入到外部存儲系統。
(2)Spark與資源管理器無關,只要可以獲取executor進程,並能保持相互通訊就能夠了。
(3)提交SparkContext的Client應該靠近Worker節點(運行Executor的節點),最好是在同一個Rack裏,由於Spark Application運行過程當中SparkContext和Executor之間有大量的信息交換;若是想在遠程集羣中運行,最好使用RPC將SparkContext提交給集羣,不要遠離Worker運行SparkContext。
(4)Task採用了數據本地性和推測執行的優化機制。數據本地性是儘可能將計算移到數據所在的節點上進行,即「計算向數據靠攏」,由於移動計算比移動數據所佔的網絡資源要少得多。並且,Spark採用了延時調度機制,能夠在更大的程度上實現執行過程優化。好比,擁有數據的節點當前正被其餘的任務佔用,那麼,在這種狀況下是否須要將數據移動到其餘的空閒節點呢?答案是不必定。由於,若是通過預測發現當前節點結束當前任務的時間要比移動數據的時間還要少,那麼,調度就會等待,直到當前節點可用。
Job=多個stage,Stage=多個同種task, Task分爲ShuffleMapTask和ResultTask,Dependency分爲ShuffleDependency和NarrowDependency
面向stage的切分,切分依據爲寬依賴
維護waiting jobs和active jobs,維護waiting stages、active stages和failed stages,以及與jobs的映射關係
主要職能:
一、接收提交Job的主入口,
submitJob(rdd, ...)
或runJob(rdd, ...)
。在SparkContext
裏會調用這兩個方法。
- 生成一個Stage並提交,接着判斷Stage是否有父Stage未完成,如有,提交併等待父Stage,以此類推。結果是:DAGScheduler裏增長了一些waiting stage和一個running stage。
- running stage提交後,分析stage裏Task的類型,生成一個Task描述,即TaskSet。
- 調用
TaskScheduler.submitTask(taskSet, ...)
方法,把Task描述提交給TaskScheduler。TaskScheduler依據資源量和觸發分配條件,會爲這個TaskSet分配資源並觸發執行。DAGScheduler
提交job後,異步返回JobWaiter
對象,可以返回job運行狀態,可以cancel job,執行成功後會處理並返回結果二、處理
TaskCompletionEvent
- 若是task執行成功,對應的stage裏減去這個task,作一些計數工做:
- 若是task是ResultTask,計數器
Accumulator
加一,在job裏爲該task置true,job finish總數加一。加完後若是finish數目與partition數目相等,說明這個stage完成了,標記stage完成,從running stages裏減去這個stage,作一些stage移除的清理工做- 若是task是ShuffleMapTask,計數器
Accumulator
加一,在stage里加上一個output location,裏面是一個MapStatus
類。MapStatus
是ShuffleMapTask
執行完成的返回,包含location信息和block size(能夠選擇壓縮或未壓縮)。同時檢查該stage完成,向MapOutputTracker
註冊本stage裏的shuffleId和location信息。而後檢查stage的output location裏是否存在空,若存在空,說明一些task失敗了,整個stage從新提交;不然,繼續從waiting stages裏提交下一個須要作的stage- 若是task是重提交,對應的stage裏增長這個task
- 若是task是fetch失敗,立刻標記對應的stage完成,從running stages裏減去。若是不容許retry,abort整個stage;不然,從新提交整個stage。另外,把這個fetch相關的location和map任務信息,從stage裏剔除,從
MapOutputTracker
註銷掉。最後,若是此次fetch的blockManagerId對象不爲空,作一次ExecutorLost
處理,下次shuffle會換在另外一個executor上去執行。- 其餘task狀態會由
TaskScheduler
處理,如Exception, TaskResultLost, commitDenied等。三、其餘與job相關的操做還包括:cancel job, cancel stage, resubmit failed stage等
其餘職能:
cacheLocations 和 preferLocation
維護task和executor對應關係,executor和物理資源對應關係,在排隊的task和正在跑的task。
內部維護一個任務隊列,根據FIFO或Fair策略,調度任務。
TaskScheduler
自己是個接口,spark裏只實現了一個TaskSchedulerImpl
,理論上任務調度能夠定製。
主要功能:
一、submitTasks(taskSet)
,接收DAGScheduler
提交來的tasks
- 爲tasks建立一個
TaskSetManager
,添加到任務隊列裏。TaskSetManager
跟蹤每一個task的執行情況,維護了task的許多具體信息。- 觸發一次資源的索要。
- 首先,
TaskScheduler
對照手頭的可用資源和Task隊列,進行executor分配(考慮優先級、本地化等策略),符合條件的executor會被分配給TaskSetManager
。- 而後,獲得的Task描述交給
SchedulerBackend
,調用launchTask(tasks)
,觸發executor上task的執行。task描述被序列化後發給executor,executor提取task信息,調用task的run()
方法執行計算。
二、cancelTasks(stageId)
,取消一個stage的tasks
- 調用
SchedulerBackend
的killTask(taskId, executorId, ...)
方法。taskId和executorId在TaskScheduler
裏一直維護着。
三、resourceOffer(offers: Seq[Workers])
,這是很是重要的一個方法,調用者是SchedulerBacnend
,用途是底層資源SchedulerBackend
把空餘的workers資源交給TaskScheduler
,讓其根據調度策略爲排隊的任務分配合理的cpu和內存資源,而後把任務描述列表傳回給SchedulerBackend
- 從worker offers裏,蒐集executor和host的對應關係、active executors、機架信息等等
- worker offers資源列表進行隨機洗牌,任務隊列裏的任務列表依據調度策略進行一次排序
- 遍歷每一個taskSet,按照進程本地化、worker本地化、機器本地化、機架本地化的優先級順序,爲每一個taskSet提供可用的cpu核數,看是否知足
- 默認一個task須要一個cpu,設置參數爲
"spark.task.cpus=1"
- 爲taskSet分配資源,校驗是否知足的邏輯,最終在
TaskSetManager
的resourceOffer(execId, host, maxLocality)
方法裏- 知足的話,會生成最終的任務描述,而且調用
DAGScheduler
的taskStarted(task, info)
方法,通知DAGScheduler
,這時候每次會觸發DAGScheduler
作一次submitMissingStage
的嘗試,即stage的tasks都分配到了資源的話,立刻會被提交執行
四、statusUpdate(taskId, taskState, data)
,另外一個很是重要的方法,調用者是SchedulerBacnend
,用途是SchedulerBacnend
會將task執行的狀態彙報給TaskScheduler
作一些決定
- 若
TaskLost
,找到該task對應的executor,從active executor裏移除,避免這個executor被分配到其餘task繼續失敗下去。- task finish包括四種狀態:finished, killed, failed, lost。只有finished是成功執行完成了。其餘三種是失敗。
- task成功執行完,調用
TaskResultGetter.enqueueSuccessfulTask(taskSet, tid, data)
,不然調用TaskResultGetter.enqueueFailedTask(taskSet, tid, state, data)
。TaskResultGetter
內部維護了一個線程池,負責異步fetch task執行結果並反序列化。默認開四個線程作這件事,可配參數"spark.resultGetter.threads"=4
。
TaskResultGetter取task result的邏輯
一、對於success task,若是taskResult裏的數據是直接結果數據,直接把data反序列出來獲得結果;若是不是,會調用
blockManager.getRemoteBytes(blockId)
從遠程獲取。若是遠程取回的數據是空的,那麼會調用TaskScheduler.handleFailedTask
,告訴它這個任務是完成了的可是數據是丟失的。不然,取到數據以後會通知BlockManagerMaster
移除這個block信息,調用TaskScheduler.handleSuccessfulTask
,告訴它這個任務是執行成功的,而且把result data傳回去。二、對於failed task,從data裏解析出fail的理由,調用
TaskScheduler.handleFailedTask
,告訴它這個任務失敗了,理由是什麼。
在TaskScheduler
下層,用於對接不一樣的資源管理系統,SchedulerBackend
是個接口,須要實現的主要方法以下:
def start(): Unit def stop(): Unit def reviveOffers(): Unit // 重要方法:SchedulerBackend把本身手頭上的可用資源交給TaskScheduler,TaskScheduler根據調度策略分配給排隊的任務嗎,返回一批可執行的任務描述,SchedulerBackend負責launchTask,即最終把task塞到了executor模型上,executor裏的線程池會執行task的run() def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit = throw new UnsupportedOperationException
粗粒度:進程常駐的模式,典型表明是standalone模式,mesos粗粒度模式,yarn
細粒度:mesos細粒度模式
這裏討論粗粒度模式,更好理解:CoarseGrainedSchedulerBackend
。
維護executor相關信息(包括executor的地址、通訊端口、host、總核數,剩餘核數),手頭上executor有多少被註冊使用了,有多少剩餘,總共還有多少核是空的等等。
主要職能
一、Driver端主要經過actor監聽和處理下面這些事件:
RegisterExecutor(executorId, hostPort, cores, logUrls)
。這是executor添加的來源,一般worker拉起、重啓會觸發executor的註冊。CoarseGrainedSchedulerBackend
把這些executor維護起來,更新內部的資源信息,好比總核數增長。最後調用一次makeOffer()
,即把手頭資源丟給TaskScheduler
去分配一次,返回任務描述回來,把任務launch起來。這個makeOffer()
的調用會出如今任何與資源變化相關的事件中,下面會看到。StatusUpdate(executorId, taskId, state, data)
。task的狀態回調。首先,調用TaskScheduler.statusUpdate
上報上去。而後,判斷這個task是否執行結束了,結束了的話把executor上的freeCore加回去,調用一次makeOffer()
。ReviveOffers
。這個事件就是別人直接向SchedulerBackend
請求資源,直接調用makeOffer()
。KillTask(taskId, executorId, interruptThread)
。這個killTask的事件,會被髮送給executor的actor,executor會處理KillTask
這個事件。StopExecutors
。通知每個executor,處理StopExecutor
事件。RemoveExecutor(executorId, reason)
。從維護信息中,那這堆executor涉及的資源數減掉,而後調用TaskScheduler.executorLost()
方法,通知上層我這邊有一批資源不能用了,你處理下吧。TaskScheduler
會繼續把executorLost
的事件上報給DAGScheduler
,緣由是DAGScheduler
關心shuffle任務的output location。DAGScheduler
會告訴BlockManager
這個executor不可用了,移走它,而後把全部的stage的shuffleOutput信息都遍歷一遍,移走這個executor,而且把更新後的shuffleOutput信息註冊到MapOutputTracker
上,最後清理下本地的CachedLocations
Map。
二、reviveOffers()
方法的實現。直接調用了makeOffers()
方法,獲得一批可執行的任務描述,調用launchTasks
。
三、launchTasks(tasks: Seq[Seq[TaskDescription]])
方法。
- 遍歷每一個task描述,序列化成二進制,而後發送給每一個對應的executor這個任務信息
- 若是這個二進制信息太大,超過了9.2M(默認的akkaFrameSize 10M 減去 默認 爲akka留空的200K),會出錯,abort整個taskSet,並打印提醒增大akka frame size
- 若是二進制數據大小可接受,發送給executor的actor,處理
LaunchTask(serializedTask)
事件。
Executor是spark裏的進程模型,能夠套用到不一樣的資源管理系統上,與SchedulerBackend
配合使用。
內部有個線程池,有個running tasks map,有個actor,接收上面提到的由SchedulerBackend
發來的事件。
事件處理
launchTask
。根據task描述,生成一個TaskRunner
線程,丟盡running tasks map裏,用線程池執行這個TaskRunner
killTask
。從running tasks map裏拿出線程對象,調它的kill方法。Spark注重創建良好的生態系統,它不只支持多種外部文件存儲系統,提供了多種多樣的集羣運行模式。部署在單臺機器上時,既能夠用本地(Local)模式運行,也可使用僞分佈式模式來運行;當以分佈式集羣部署的時候,能夠根據本身集羣的實際狀況選擇Standalone模式(Spark自帶的模式)、YARN-Client模式或者YARN-Cluster模式。Spark的各類運行模式雖然在啓動方式、運行位置、調度策略上各有不一樣,但它們的目的基本都是一致的,就是在合適的位置安全可靠的根據用戶的配置和Job的須要運行和管理Task。
Standalone-client模式是Spark實現的資源調度框架,其主要的節點有Client節點、Master節點和Worker節點。其中Driver既能夠運行在Master節點上中,也能夠運行在本地Client端。當用spark-shell交互式工具提交Spark的Job時,Driver在Master節點上運行;當使用spark-submit工具提交Job或者在Eclips、IDEA等開發平臺上使用」new SparkConf().setMaster(「spark://master:7077」)」方式運行Spark任務時,Driver是運行在本地Client端上的。
一、咱們提交一個任務,任務就叫Application
二、初始化程序的入口SparkContext,
2.1 初始化DAG Scheduler
2.2 初始化Task Scheduler
三、Task Scheduler向master去進行註冊並申請資源(CPU Core和Memory)
四、Master根據SparkContext的資源申請要求和Worker心跳週期內報告的信息決定在哪一個Worker上分配資源,而後在該Worker上獲取資源,而後啓動StandaloneExecutorBackend;順便初
始化好了一個線程池
五、StandaloneExecutorBackend向Driver(SparkContext)註冊,這樣Driver就知道哪些Executor爲他進行服務了。
到這個時候其實咱們的初始化過程基本完成了,咱們開始執行transformation的代碼,可是代碼並不會真正的運行,直到咱們遇到一個action操做。生產一個job任務,進行stage的劃分
六、SparkContext將Applicaiton代碼發送給StandaloneExecutorBackend;而且SparkContext解析Applicaiton代碼,構建DAG圖,並提交給DAG Scheduler分解成Stage(當碰到Action操做 時,就會催生Job;每一個Job中含有1個或多個Stage,Stage通常在獲取外部數據和shuffle以前產生)。
七、將Stage(或者稱爲TaskSet)提交給Task Scheduler。Task Scheduler負責將Task分配到相應的Worker,最後提交給StandaloneExecutorBackend執行;
八、對task進行序列化,並根據task的分配算法,分配task
九、對接收過來的task進行反序列化,把task封裝成一個線程
十、開始執行Task,並向SparkContext報告,直至Task完成。
十一、資源註銷
./spark-submit --master spark://node01:7077 --class org.apache.spark.examples.SparkPi ../lib/spark-examples-1.6.0-hadoop2.6.0.jar 100
client模式適用於測試調試程序。Driver進程是在客戶端啓動的,這裏的客戶端就是指提交應用程序的當前節點。在Driver端能夠看到task執行的狀況。生產環境下不能使用client模式,是由於:假設要提交100個application到集羣運行,Driver每次都會在client端啓動,那麼就會致使客戶端100次網卡流量暴增的問題
- cluster模式提交應用程序後,會向Master請求啓動Driver.
- Master接受請求,隨機在集羣一臺節點啓動Driver進程。
- Driver啓動後爲當前的應用程序申請資源。
- Driver端發送task到worker節點上執行。
- worker將執行狀況和執行結果返回給Driver端。
Driver進程是在集羣某一臺Worker上啓動的,在客戶端是沒法查看task的執行狀況的。假設要提交100個application到集羣運行,每次Driver會隨機在集羣中某一臺Worker上啓動,那麼這100次網卡流量暴增的問題就散佈在集羣上。
./spark-submit --master spark://node01:7077 --deploy-mode cluster --class org.apache.spark.examples.SparkPi ../lib/spark-examples-1.6.0-hadoop2.6.0.jar 100
1. Driver負責應用程序資源的申請
2. 任務的分發。
3. 結果的回收。
4. 監控task執行狀況。
YARN是一種統一資源管理機制,在其上面能夠運行多套計算框架。目前的大數據技術世界,大多數公司除了使用Spark來進行數據計算,因爲歷史緣由或者單方面業務處理的性能考慮而使用着其餘的計算框架,好比MapReduce、Storm等計算框架。Spark基於此種狀況開發了Spark on YARN的運行模式,因爲藉助了YARN良好的彈性資源管理機制,不只部署Application更加方便,並且用戶在YARN集羣中運行的服務和Application的資源也徹底隔離,更具實踐應用價值的是YARN能夠經過隊列的方式,管理同時運行在集羣中的多個服務。
Spark on YARN模式根據Driver在集羣中的位置分爲兩種模式:一種是YARN-Client模式,另外一種是YARN-Cluster(或稱爲YARN-Standalone模式)。
任何框架與YARN的結合,都必須遵循YARN的開發模式。在分析Spark on YARN的實現細節以前,有必要先分析一下YARN框架的一些基本原理。
參考:http://www.cnblogs.com/qingyunzong/p/8615096.html
Yarn-Client模式中,Driver在客戶端本地運行,這種模式可使得Spark Application和客戶端進行交互,由於Driver在客戶端,因此能夠經過webUI訪問Driver的狀態,默認是http://hadoop1:4040訪問,而YARN經過http:// hadoop1:8088訪問。
YARN-client的工做流程分爲如下幾個步驟:
1.Spark Yarn Client向YARN的ResourceManager申請啓動Application Master。同時在SparkContent初始化中將建立DAGScheduler和TASKScheduler等,因爲咱們選擇的是Yarn-Client模式,程序會選擇YarnClientClusterScheduler和YarnClientSchedulerBackend;
2.ResourceManager收到請求後,在集羣中選擇一個NodeManager,爲該應用程序分配第一個Container,要求它在這個Container中啓動應用程序的ApplicationMaster,與YARN-Cluster區別的是在該ApplicationMaster不運行SparkContext,只與SparkContext進行聯繫進行資源的分派;
3.Client中的SparkContext初始化完畢後,與ApplicationMaster創建通信,向ResourceManager註冊,根據任務信息向ResourceManager申請資源(Container);
4.一旦ApplicationMaster申請到資源(也就是Container)後,便與對應的NodeManager通訊,要求它在得到的Container中啓動啓動CoarseGrainedExecutorBackend,CoarseGrainedExecutorBackend啓動後會向Client中的SparkContext註冊並申請Task;
5.Client中的SparkContext分配Task給CoarseGrainedExecutorBackend執行,CoarseGrainedExecutorBackend運行Task並向Driver彙報運行的狀態和進度,以讓Client隨時掌握各個任務的運行狀態,從而能夠在任務失敗時從新啓動任務;
6.應用程序運行完成後,Client的SparkContext向ResourceManager申請註銷並關閉本身。
測試代碼:
./spark-submit --master yarn --class org.apache.spark.examples.SparkPi ../lib/spark-examples-1.6.0-hadoop2.6.0.jar 100
Yarn-client模式一樣是適用於測試,由於Driver運行在本地,Driver會與yarn集羣中的Executor進行大量的通訊,會形成客戶機網卡流量的大量增長.
在YARN-Cluster模式中,當用戶向YARN中提交一個應用程序後,YARN將分兩個階段運行該應用程序:第一個階段是把Spark的Driver做爲一個ApplicationMaster在YARN集羣中先啓動;第二個階段是由ApplicationMaster建立應用程序,而後爲它向ResourceManager申請資源,並啓動Executor來運行Task,同時監控它的整個運行過程,直到運行完成。
YARN-cluster的工做流程分爲如下幾個步驟:
1. Spark Yarn Client向YARN中提交應用程序,包括ApplicationMaster程序、啓動ApplicationMaster的命令、須要在Executor中運行的程序等;
2. ResourceManager收到請求後,在集羣中選擇一個NodeManager,爲該應用程序分配第一個Container,要求它在這個Container中啓動應用程序的ApplicationMaster,其中ApplicationMaster進行SparkContext等的初始化;
3. ApplicationMaster向ResourceManager註冊,這樣用戶能夠直接經過ResourceManage查看應用程序的運行狀態,而後它將採用輪詢的方式經過RPC協議爲各個任務申請資源,並監控它們的運行狀態直到運行結束;
4. 一旦ApplicationMaster申請到資源(也就是Container)後,便與對應的NodeManager通訊,要求它在得到的Container中啓動啓動CoarseGrainedExecutorBackend,CoarseGrainedExecutorBackend啓動後會向ApplicationMaster中的SparkContext註冊並申請Task。這一點和Standalone模式同樣,只不過SparkContext在Spark Application中初始化時,使用CoarseGrainedSchedulerBackend配合YarnClusterScheduler進行任務的調度,其中YarnClusterScheduler只是對TaskSchedulerImpl的一個簡單包裝,增長了對Executor的等待邏輯等;
5. ApplicationMaster中的SparkContext分配Task給CoarseGrainedExecutorBackend執行,CoarseGrainedExecutorBackend運行Task並向ApplicationMaster彙報運行的狀態和進度,以讓ApplicationMaster隨時掌握各個任務的運行狀態,從而能夠在任務失敗時從新啓動任務;
6. 應用程序運行完成後,ApplicationMaster向ResourceManager申請註銷並關閉本身。
測試代碼:
./spark-submit --master yarn --deploy-mode cluster --class org.apache.spark.examples.SparkPi ../lib/spark-examples-1.6.0-hadoop2.6.0.jar 100
Yarn-Cluster主要用於生產環境中,由於Driver運行在Yarn集羣中某一臺nodeManager中,每次提交任務的Driver所在的機器都是隨機的,不會產生某一臺機器網卡流量激增的現象,缺點是任務提交後不能看到日誌。只能經過yarn查看日誌。
理解YARN-Client和YARN-Cluster深層次的區別以前先清楚一個概念:Application Master。在YARN中,每一個Application實例都有一個ApplicationMaster進程,它是Application啓動的第一個容器。它負責和ResourceManager打交道並請求資源,獲取資源以後告訴NodeManager爲其啓動Container。從深層次的含義講YARN-Cluster和YARN-Client模式的區別其實就是ApplicationMaster進程的區別。
一、YARN-Cluster模式下,Driver運行在AM(Application Master)中,它負責向YARN申請資源,並監督做業的運行情況。當用戶提交了做業以後,就能夠關掉Client,做業會繼續在YARN上運行,於是YARN-Cluster模式不適合運行交互類型的做業;
二、YARN-Client模式下,Application Master僅僅向YARN請求Executor,Client會和請求的Container通訊來調度他們工做,也就是說Client不能離開。