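This blog series distills and shares cases drawn from real commercial environments, and offers Spark source-code walkthroughs together with practical guidance for production use; please keep following the series. Copyright notice: this series on Spark source-code analysis and commercial practice belongs to the author, Qin Kaixin (秦凱新). Reproduction is prohibited; you are welcome to study it.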
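Data locality in Spark means moving the computation to the data rather than moving the data to the computation. So that each task can launch at the best possible locality level (Locality Levels), Spark uses delay scheduling: when resources are insufficient, the scheduler keeps retrying at the current locality level for that level's configured wait time, and if the task still cannot launch once the wait expires, it drops to the next locality level and tries again.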
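The fallback rule described above can be sketched in a few lines of Scala. This is a simplified model, not Spark's actual code: the object name DelaySchedulingSketch, the function allowedLevel and the single fixed waitMs are assumptions made for illustration (Spark's spark.locality.wait defaults to 3s and can be overridden per level).

```scala
// Simplified model of delay scheduling; names here are hypothetical, not Spark's API.
object DelaySchedulingSketch {
  // Locality levels ordered from best to worst, mirroring Spark's TaskLocality names.
  val levels: Vector[String] =
    Vector("PROCESS_LOCAL", "NODE_LOCAL", "RACK_LOCAL", "ANY")

  // Assumed wait per level in milliseconds (same value for every level in this sketch).
  val waitMs: Long = 3000L

  /** Returns the index of the locality level allowed at time `now`, given the
    * level currently in force and the time of the last launch at that level. */
  def allowedLevel(currentIndex: Int, lastLaunchTime: Long, now: Long): Int = {
    var idx = currentIndex
    var last = lastLaunchTime
    // Each full wait period that passes without a launch relaxes locality by one level.
    while (idx < levels.size - 1 && now - last >= waitMs) {
      last += waitMs
      idx += 1
    }
    idx
  }
}
```

For example, with no launch since time 0, a call at 1000 ms still demands PROCESS_LOCAL, while a call at 6500 ms has already fallen back two levels to RACK_LOCAL.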
Note: CoarseGrainedSchedulerBackend.makeOffers is invoked whenever the state of any Executor changes. It is a common method shared by message handlers such as:
StatusUpdate
RegisterExecutor
First an Executor is selected, and the tasks that can run on that executor are picked and assigned at the best available locality level.
CoarseGrainedSchedulerBackend.makeOffers (common method, the trigger point for task assignment; filters for live Executors)
||
||
\||/
val tasks = TaskSchedulerImpl.resourceOffers(workOffers) (assignment starts)
||
||
\||/
(iterate over the locality levels of each TaskSet, assigning tasks starting from the best locality)
for (currentMaxLocality <- taskSet.myLocalityLevels)
||
||
\||/
(iterate over all available Executors, offering them one at a time)
for (i <- 0 until shuffledOffers.size)
||
||
\||/
(for the selected Executor, the TaskSetManager performs the actual task assignment)
for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
  tasks(i) += task
  val tid = task.taskId
  taskIdToTaskSetManager(tid) = taskSet
  taskIdToExecutorId(tid) = execId
  executorIdToRunningTaskIds(execId).add(tid)
  availableCpus(i) -= CPUS_PER_TASK
  assert(availableCpus(i) >= 0)
  launchedTask = true
}
||
||
\||/
(once the call into TaskSchedulerImpl returns the tasks, they are launched)
CoarseGrainedSchedulerBackend.launchTasks(taskS)
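The nested loops in the flow above can be approximated with a small self-contained sketch. Everything in it is hypothetical and simplified: Offer and PendingTask stand in for Spark's WorkerOffer and task types, only two locality passes are modelled (host-local, then any), and the offers are not shuffled as they are in Spark.

```scala
import scala.collection.mutable.ArrayBuffer

// Simplified sketch of the offer loop; types and names are hypothetical.
object OfferLoopSketch {
  final case class Offer(executorId: String, host: String, cores: Int)
  final case class PendingTask(id: Int, preferredHost: Option[String])

  val CpusPerTask = 1

  /** Assigns tasks to offers: first a pass that only accepts host-local matches,
    * then a pass that accepts any executor. Returns (taskId, executorId) pairs. */
  def assign(offers: IndexedSeq[Offer], tasks: Seq[PendingTask]): Seq[(Int, String)] = {
    val availableCpus = offers.map(_.cores).toArray
    val pending = ArrayBuffer(tasks: _*)
    val launched = ArrayBuffer.empty[(Int, String)]
    // Outer loop: locality passes from strictest to loosest.
    for (hostLocalOnly <- Seq(true, false)) {
      var launchedTask = true
      // Keep sweeping over the executors while the last sweep launched something.
      while (launchedTask) {
        launchedTask = false
        for (i <- offers.indices if availableCpus(i) >= CpusPerTask) {
          // Pick the first pending task acceptable at this locality pass.
          val candidate = pending.indexWhere { t =>
            if (hostLocalOnly) t.preferredHost.contains(offers(i).host) else true
          }
          if (candidate >= 0) {
            val task = pending.remove(candidate)
            launched += ((task.id, offers(i).executorId))
            availableCpus(i) -= CpusPerTask
            launchedTask = true
          }
        }
      }
    }
    launched.toSeq
  }
}
```

With two single-core executors on hostA and hostB, tasks preferring those hosts are placed in the first pass, and a task with no preference waits until a core frees up. The per-executor bookkeeping (decrementing the free cores and setting launchedTask) mirrors the snippet in the flow above.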
In CoarseGrainedSchedulerBackend, the makeOffers method uses its scheduler reference to call TaskSchedulerImpl.resourceOffers, which returns taskDescs (containing the placement information and the operators of every Task). It then calls launchTasks, which sends a message to the executor: executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
private def makeOffers() {
  val taskDescs = CoarseGrainedSchedulerBackend.this.synchronized {
    // Only offer resources from executors that are still alive
    val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
    val workOffers = activeExecutors.map {
      case (id, executorData) =>
        new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
    }.toIndexedSeq
    scheduler.resourceOffers(workOffers)
  }
  // Launch only if the scheduler actually returned tasks
  if (!taskDescs.isEmpty) {
    launchTasks(taskDescs)
  }
}
How does resourceOffer in TaskSetManager assign tasks internally?
-> allowedLocality = getAllowedLocalityLevel(curTime) (delay scheduling)
-> dequeueTask(execId, host, allowedLocality).map { case ((index, taskLocality,
speculative) (returns a TaskDescription, ready to be sent to the Executor later)
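As a rough illustration of these two steps, the sketch below caps the level permitted by delay scheduling at the maxLocality passed in by the caller, then dequeues the best pending task within that limit. It is a simplified model under stated assumptions: Candidate, effectiveLocality and dequeue are hypothetical names, the NO_PREF level is left out, and the real dequeueTask works on per-executor, per-host and per-rack pending lists rather than a single sequence.

```scala
// Simplified sketch of locality capping and dequeueing; names are hypothetical.
object ResourceOfferSketch {
  // Locality levels ordered from best (index 0) to worst.
  val Levels: Vector[String] =
    Vector("PROCESS_LOCAL", "NODE_LOCAL", "RACK_LOCAL", "ANY")

  final case class Candidate(index: Int, locality: String)

  /** The level actually used is the stricter of the caller's maxLocality
    * and the level that delay scheduling currently allows. */
  def effectiveLocality(maxLocality: String, allowedByDelay: String): String =
    if (Levels.indexOf(allowedByDelay) > Levels.indexOf(maxLocality)) maxLocality
    else allowedByDelay

  /** Returns the pending task with the best locality that is within the limit, if any. */
  def dequeue(pending: Seq[Candidate],
              maxLocality: String,
              allowedByDelay: String): Option[Candidate] = {
    val limit = Levels.indexOf(effectiveLocality(maxLocality, allowedByDelay))
    pending
      .filter(c => Levels.indexOf(c.locality) <= limit)
      .sortBy(c => Levels.indexOf(c.locality)) // stable sort keeps queue order within a level
      .headOption
  }
}
```

Returning an Option here reflects that an offer may yield no task at all when nothing pending satisfies the current locality limit; the scheduler then simply moves on to the next executor or the next level.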
This post still needs to be fleshed out with further analysis.
Qin Kaixin, Shenzhen