Spark TaskSchedulerImpl Task Scheduling (FIFO)

More resources

Video: //player.bilibili.com/player.html?aid=37442139&cid=66005253&page=19 (Bilibili)

Diagram

FIFO (the original figure is not reproduced here)

TaskSchedulerImpl submits the task set

  • In DAGScheduler.scala, the submitMissingTasks() method calls taskScheduler.submitTasks
  • The task set is submitted through the task scheduler:
taskScheduler.submitTasks(new TaskSet(
        tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
  • The task scheduler's implementation:
override def submitTasks(taskSet: TaskSet) {
    val tasks = taskSet.tasks
    logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
    this.synchronized {
      // Wrap the task set in a TaskSetManager, which tracks its lifecycle
      val manager = createTaskSetManager(taskSet, maxTaskFailures)
      val stage = taskSet.stageId
      val stageTaskSets =
        taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
      stageTaskSets(taskSet.stageAttemptId) = manager
      // At most one non-zombie task set may be active per stage
      val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
        ts.taskSet != taskSet && !ts.isZombie
      }
      if (conflictingTaskSet) {
        throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
          s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
      }
      // Hand the manager to the scheduling pool (FIFO or FAIR)
      schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

      if (!isLocal && !hasReceivedTask) {
        // Warn periodically until the first task actually launches
        starvationTimer.scheduleAtFixedRate(new TimerTask() {
          override def run() {
            if (!hasLaunchedTask) {
              logWarning("Initial job has not accepted any resources; " +
                "check your cluster UI to ensure that workers are registered " +
                "and have sufficient resources")
            } else {
              this.cancel()
            }
          }
        }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
      }
      hasReceivedTask = true
    }
    backend.reviveOffers()
  }
  • The task set is wrapped in a TaskSetManager (the task set manager)
  • TaskSetManager extends Schedulable, a schedulable element, i.e., one entry in the scheduling pool's queue that the scheduler works with (see the abridged trait after the next snippet)
val manager = createTaskSetManager(taskSet, maxTaskFailures)
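  • For reference, the Schedulable trait looks roughly as follows (abridged from Spark's Schedulable.scala; exact members vary across versions). Both Pool and TaskSetManager implement it, which is why pools can nest:
private[spark] trait Schedulable {
  var parent: Pool
  // child entries: nested Pools or TaskSetManagers
  def schedulableQueue: ConcurrentLinkedQueue[Schedulable]
  def schedulingMode: SchedulingMode
  def weight: Int
  def minShare: Int
  def runningTasks: Int
  def priority: Int  // used by FIFO ordering: the job ID
  def stageId: Int   // FIFO tie-breaker
  def name: String
  def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager]
}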
  • The TaskSetManager is added to the scheduling pool for the configured scheduling mode (FIFO or FAIR), i.e., appended to the pool's scheduling queue schedulableQueue (see the abridged builder below)
  • At this point the tasks to be scheduled are in place, sitting in the pool; next, a concrete scheduling algorithm drains the pool in the specified order
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
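  • For FIFO, schedulableBuilder is a FIFOSchedulableBuilder, which builds no extra pools and simply appends every TaskSetManager to the root pool's queue (abridged from Spark's SchedulableBuilder.scala, comments added):
private[spark] class FIFOSchedulableBuilder(val rootPool: Pool)
  extends SchedulableBuilder with Logging {

  override def buildPools() {
    // nothing to build: FIFO schedules directly out of the root pool
  }

  override def addTaskSetManager(manager: Schedulable, properties: Properties) {
    // appended to rootPool.schedulableQueue in arrival order
    rootPool.addSchedulable(manager)
  }
}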
  • The task scheduler's submitTasks() method then calls backend.reviveOffers(). Here backend is a SparkDeploySchedulerBackend, which extends CoarseGrainedSchedulerBackend, so the call resolves to reviveOffers() in CoarseGrainedSchedulerBackend
backend.reviveOffers()
  • This is equivalent to sending the driver a ReviveOffers message
override def reviveOffers() {
    driverEndpoint.send(ReviveOffers)
  }
  • The receive() method of driverEndpoint handles the message and calls makeOffers()
case ReviveOffers =>
        makeOffers()
  • scheduler.resourceOffers(workOffers) computes the sequence of tasks that need to be launched (a simplified sketch follows the snippet below)
  • Inside resourceOffers(), rootPool.getSortedTaskSetQueue() is called to obtain the scheduling queue in the specified order
  • launchTasks() sends the launch-task messages to the executors
// Make fake resource offers on all executors
    private def makeOffers() {
      // Filter out executors under killing
      val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
      val workOffers = activeExecutors.map { case (id, executorData) =>
        new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
      }.toIndexedSeq
      launchTasks(scheduler.resourceOffers(workOffers))
    }
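  • To make the offer-matching step concrete, here is a heavily simplified, hypothetical sketch of what resourceOffers does; the real method also shuffles offers, honors locality preferences, and skips blacklisted executors:
import scala.collection.mutable.ArrayBuffer

object ResourceOffersSketch {
  // Hypothetical simplified stand-ins for WorkerOffer and TaskDescription
  case class SimpleOffer(executorId: String, var freeCores: Int)
  case class SimpleTask(taskId: Long, executorId: String)

  // Walk the FIFO-sorted task sets and greedily fill each offer's free cores
  def resourceOffersSketch(
      sortedTaskSets: Seq[ArrayBuffer[Long]], // pending task IDs, FIFO order
      offers: Seq[SimpleOffer],
      cpusPerTask: Int = 1): Seq[SimpleTask] = {
    val launched = ArrayBuffer[SimpleTask]()
    for (pending <- sortedTaskSets; offer <- offers) {
      while (offer.freeCores >= cpusPerTask && pending.nonEmpty) {
        launched += SimpleTask(pending.remove(0), offer.executorId)
        offer.freeCores -= cpusPerTask
      }
    }
    launched
  }
}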
  • The tasks in the scheduling pool are sorted with the configured scheduling algorithm
  • The sorted scheduling queue is returned:
override def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager] = {
    var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
    // Sort the direct children with this pool's algorithm (FIFO here)
    val sortedSchedulableQueue =
      schedulableQueue.asScala.toSeq.sortWith(taskSetSchedulingAlgorithm.comparator)
    // Recurse: a child may itself be a Pool containing further schedulables
    for (schedulable <- sortedSchedulableQueue) {
      sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue
    }
    sortedTaskSetQueue
  }

Implementation of the FIFO scheduling algorithm

  • FIFO is the default scheduling algorithm
  • It compares by job ID (the priority field): the smaller ID sorts first, so the job that arrived first is processed first
  • If the job IDs are equal, it compares by stageId: the smaller stage ID sorts first, so stages are ordered starting from the first stage (a runnable demo follows the snippet below)
private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    // priority is the job ID: smaller means the job was submitted earlier
    val priority1 = s1.priority
    val priority2 = s2.priority
    var res = math.signum(priority1 - priority2)
    if (res == 0) {
      // Same job: break the tie by stage ID, earlier stages first
      val stageId1 = s1.stageId
      val stageId2 = s2.stageId
      res = math.signum(stageId1 - stageId2)
    }
    // true means s1 is ordered before s2
    if (res < 0) {
      true
    } else {
      false
    }
  }
}
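  • A self-contained demo of the same ordering logic, using a simplified stand-in for Schedulable (hypothetical names, for illustration only):
object FifoDemo extends App {
  // Minimal stand-in carrying just the two fields FIFO compares on
  case class DemoSchedulable(priority: Int, stageId: Int)

  // Mirrors FIFOSchedulingAlgorithm.comparator: job ID first, then stage ID
  def fifoBefore(s1: DemoSchedulable, s2: DemoSchedulable): Boolean =
    if (s1.priority != s2.priority) s1.priority < s2.priority
    else s1.stageId < s2.stageId

  val queue = Seq(
    DemoSchedulable(priority = 2, stageId = 0), // job 2, stage 0
    DemoSchedulable(priority = 1, stageId = 3), // job 1, stage 3
    DemoSchedulable(priority = 1, stageId = 1)) // job 1, stage 1

  // Prints job 1's stages (1, then 3) ahead of job 2's stage 0
  println(queue.sortWith(fifoBefore))
}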

A timer periodically processes the tasks in the scheduling pool

  • The onStart() method of DriverEndpoint schedules the pool-processing logic to run once per second
  • It triggers the processing by sending the driver the ReviveOffers message:
override def onStart() {
      // Periodically revive offers to allow delay scheduling to work
      val reviveIntervalMs = conf.getTimeAsMs("spark.scheduler.revive.interval", "1s")

      reviveThread.scheduleAtFixedRate(new Runnable {
        override def run(): Unit = Utils.tryLogNonFatalError {
          Option(self).foreach(_.send(ReviveOffers))
        }
      }, 0, reviveIntervalMs, TimeUnit.MILLISECONDS)
    }
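  • As a usage note, the interval read above can be tuned through Spark configuration (a hypothetical example; "1s" is the default shown in onStart()):
import org.apache.spark.SparkConf

// Hypothetical example: revive offers every 500ms instead of the 1s default.
// "spark.scheduler.revive.interval" is the key read in onStart() above.
val conf = new SparkConf()
  .setAppName("revive-interval-demo")
  .set("spark.scheduler.revive.interval", "500ms")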