Spark Task Scheduling Flow and Scheduling Strategy Analysis

Spark Task Scheduling

Entry points into TaskScheduler scheduling:

(1) When CoarseGrainedSchedulerBackend starts, it creates a DriverEndpoint. The DriverEndpoint runs a periodic task that triggers scheduling at a fixed interval (spark.scheduler.revive.interval, 1s by default) by sending itself a ReviveOffers message, which in turn calls makeOffers to perform the scheduling. The code is shown below:

 override def onStart() {
      // Periodically revive offers to allow delay scheduling to work
      val reviveIntervalMs = conf.getTimeAsMs("spark.scheduler.revive.interval", "1s")

      reviveThread.scheduleAtFixedRate(new Runnable {
        override def run(): Unit = Utils.tryLogNonFatalError {
          Option(self).foreach(_.send(ReviveOffers))
        }
      }, 0, reviveIntervalMs, TimeUnit.MILLISECONDS)
    }
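
The revive interval above is an ordinary Spark configuration key, so it can be tuned when the application builds its SparkConf. A minimal sketch, where the app name, master, and the 500ms value are only illustrative:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Minimal sketch: tune how often the driver re-offers resources to pending
// tasks. The 500ms value is only an illustration, not a recommendation.
val conf = new SparkConf()
  .setAppName("revive-interval-demo")
  .setMaster("local[*]")
  .set("spark.scheduler.revive.interval", "500ms")

val sc = new SparkContext(conf)
```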

 

(2) Whenever a task's state changes, CoarseGrainedExecutorBackend sends a StatusUpdate message to the Driver (DriverEndpoint). So when an Executor finishes an assigned task, the Driver receives the message and calls makeOffers(executorId) to schedule new tasks for that executor.

  override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
    val msg = StatusUpdate(executorId, taskId, state, data)
    driver match {
      case Some(driverRef) => driverRef.send(msg)
      case None => logWarning(s"Drop $msg because has not yet connected to driver")
    }
  }

On receiving the StatusUpdate message, the Driver triggers scheduling (makeOffers) and assigns tasks to the Executor that has just completed one.

override def receive: PartialFunction[Any, Unit] = {
      case StatusUpdate(executorId, taskId, state, data) =>
        scheduler.statusUpdate(taskId, state, data.value)
        if (TaskState.isFinished(state)) {
          executorDataMap.get(executorId) match {
            case Some(executorInfo) =>
              executorInfo.freeCores += scheduler.CPUS_PER_TASK
              makeOffers(executorId)
            case None =>
              // Ignoring the update since we don't know about the executor.
              logWarning(s"Ignored task status update ($taskId state $state) " +
                s"from unknown executor with ID $executorId")
          }
        }

      case ReviveOffers =>
        makeOffers()

      case KillTask(taskId, executorId, interruptThread) =>
        executorDataMap.get(executorId) match {
          case Some(executorInfo) =>
            executorInfo.executorEndpoint.send(KillTask(taskId, executorId, interruptThread))
          case None =>
            // Ignoring the task kill since the executor is not registered.
            logWarning(s"Attempted to kill task $taskId for unknown executor $executorId.")
        }

    }


The makeOffers method calls resourceOffers in TaskSchedulerImpl, which assigns suitable tasks to Executors according to the configured scheduling policy. The code is as follows:

a. Making resource offers for all executors

 // Make fake resource offers on all executors
    private def makeOffers() {
      // Filter out executors under killing
      val activeExecutors = executorDataMap.filterKeys(!executorsPendingToRemove.contains(_))
      val workOffers = activeExecutors.map { case (id, executorData) =>
        new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
      }.toSeq
      launchTasks(scheduler.resourceOffers(workOffers))
    }

b. Making a resource offer for a single executor

 // Make fake resource offers on just one executor
    private def makeOffers(executorId: String) {
      // Filter out executors under killing
      if (!executorsPendingToRemove.contains(executorId)) {
        val executorData = executorDataMap(executorId)
        val workOffers = Seq(
          new WorkerOffer(executorId, executorData.executorHost, executorData.freeCores))
        launchTasks(scheduler.resourceOffers(workOffers))
      }
    }

After tasks have been assigned, a LaunchTask message is sent to the corresponding Executors to launch the tasks and run the user's code:

 // Launch tasks returned by a set of resource offers
    private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
      for (task <- tasks.flatten) {
        val serializedTask = ser.serialize(task)
        if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
          scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
            try {
              var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
                "spark.akka.frameSize (%d bytes) - reserved (%d bytes). Consider increasing " +
                "spark.akka.frameSize or using broadcast variables for large values."
              msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize,
                AkkaUtils.reservedSizeBytes)
              taskSetMgr.abort(msg)
            } catch {
              case e: Exception => logError("Exception in error callback", e)
            }
          }
        }
        else {
          val executorData = executorDataMap(task.executorId)
          executorData.freeCores -= scheduler.CPUS_PER_TASK
          executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
        }
      }
    }

 

Spark Task Scheduling Strategies

FIFO

Under FIFO (first-in, first-out) scheduling, Jobs run in submission order. Each Job is split into multiple Stages; the first Job gets priority on all available resources, and the second Job then takes whatever resources remain, and so on. (Each Stage corresponds to one TaskSetManager.)

 

FAIR

Under FAIR scheduling, Spark allocates resources by polling across the running Jobs in a round-robin fashion, so all jobs have roughly equal priority and share the cluster's resources.
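
Which of the two strategies TaskSchedulerImpl uses is selected via the spark.scheduler.mode configuration. A minimal sketch of enabling FAIR mode (app name and master are placeholders):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Minimal sketch: switch the task scheduler from the default FIFO mode to FAIR.
val conf = new SparkConf()
  .setAppName("fair-mode-demo")
  .setMaster("local[*]")
  .set("spark.scheduler.mode", "FAIR")

val sc = new SparkContext(conf)
```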

 

 

The following examines the scheduling strategies from the source-code perspective.

When scheduling is triggered, TaskSchedulerImpl.resourceOffers is called. It selects the TaskSet to run according to the scheduling policy, then picks suitable tasks from it (taking locality into account) and hands them to Executors for execution. The code is as follows:

  /**
   * Called by cluster manager to offer resources on slaves. We respond by asking our active task
   * sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so
   * that tasks are balanced across the cluster.
   */
  def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
    // Mark each slave as alive and remember its hostname
    // Also track if new executor is added
    var newExecAvail = false
    for (o <- offers) {
      executorIdToHost(o.executorId) = o.host
      activeExecutorIds += o.executorId
      if (!executorsByHost.contains(o.host)) {
        executorsByHost(o.host) = new HashSet[String]()
        executorAdded(o.executorId, o.host)
        newExecAvail = true
      }
      for (rack <- getRackForHost(o.host)) {
        hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
      }
    }

    // Randomly shuffle offers to avoid always placing tasks on the same set of workers.
    val shuffledOffers = Random.shuffle(offers)
    // Build a list of tasks to assign to each worker.
    val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
    val availableCpus = shuffledOffers.map(o => o.cores).toArray
    val sortedTaskSets = rootPool.getSortedTaskSetQueue
    for (taskSet <- sortedTaskSets) {
      logDebug("parentName: %s, name: %s, runningTasks: %s".format(
        taskSet.parent.name, taskSet.name, taskSet.runningTasks))
      if (newExecAvail) {
        taskSet.executorAdded()
      }
    }

    // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
    // of locality levels so that it gets a chance to launch local tasks on all of them.
    // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
    var launchedTask = false
    for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
      do {
        launchedTask = resourceOfferSingleTaskSet(
            taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
      } while (launchedTask)
    }

    if (tasks.size > 0) {
      hasLaunchedTask = true
    }
    return tasks
  }


As the code shows, the TaskSets in the queue are sorted through rootPool.getSortedTaskSetQueue, whose implementation is as follows:

  override def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager] = {
    var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
    val sortedSchedulableQueue =
      schedulableQueue.asScala.toSeq.sortWith(taskSetSchedulingAlgorithm.comparator)
    for (schedulable <- sortedSchedulableQueue) {
      sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue
    }
    sortedTaskSetQueue
  }

As the code above shows, the TaskSets are sorted using a scheduling algorithm as the comparator. Two algorithms are available, FIFO and FAIR; each is described below.

FIFO

Priority: when the DAGScheduler creates a TaskSet, it uses the JobId as the priority value.

The FIFO scheduling algorithm is implemented as follows:

private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    val priority1 = s1.priority
    val priority2 = s2.priority
    var res = math.signum(priority1 - priority2)
    if (res == 0) {
      val stageId1 = s1.stageId
      val stageId2 = s2.stageId
      res = math.signum(stageId1 - stageId2)
    }
    if (res < 0) {
      true
    } else {
      false
    }
  }
}

As the source shows, FIFO picks the entry with the smaller JobId: jobs submitted earlier have smaller JobIds.

Within a single Job, Stages generated earlier have smaller StageIds. For Stages with dependencies, the DAGScheduler controls when each Stage is submitted to the scheduling queue (a Stage is not submitted until the Stages it depends on have finished), which preserves the dependency order. If a Job contains two Stages with no incoming dependencies, the one with the smaller StageId is scheduled first.
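
To make the ordering concrete, here is a small self-contained sketch that mirrors the comparator above on plain (priority, stageId) pairs; the Job and Stage IDs are made up for illustration:

```scala
// Self-contained sketch mirroring FIFOSchedulingAlgorithm.comparator on
// plain (jobId/priority, stageId) pairs. The IDs are made up for illustration.
case class FakeTaskSet(name: String, priority: Int, stageId: Int)

def fifoLessThan(s1: FakeTaskSet, s2: FakeTaskSet): Boolean = {
  var res = math.signum(s1.priority - s2.priority)   // compare JobId first
  if (res == 0) {
    res = math.signum(s1.stageId - s2.stageId)        // then StageId
  }
  res < 0
}

val taskSets = Seq(
  FakeTaskSet("job1-stage3", priority = 1, stageId = 3),
  FakeTaskSet("job0-stage2", priority = 0, stageId = 2),
  FakeTaskSet("job0-stage1", priority = 0, stageId = 1))

// Sorted order: job0-stage1, job0-stage2, job1-stage3
println(taskSets.sortWith(fifoLessThan).map(_.name))
```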

FAIR

FAIR scheduling queues are more complex than FIFO: there can be multiple scheduling pools, organized as a tree (at present Spark's FAIR scheduler supports only a two-level tree). A user can specify which pool a job should go into with sc.setLocalProperty("spark.scheduler.pool", "poolName"); by default jobs go into the default pool (built by buildDefaultPool). Each pool can also specify its own internal scheduling policy, and FAIR pools have a few special properties:

schedulingMode: the scheduling mode used inside the pool, FIFO or FAIR; the default is FIFO.

minShare: the minimum share of resources guaranteed to the pool. A pool that has not yet received its minimum share takes precedence over its sibling pools when resources are allocated.

weight: controls the pool's share of resources relative to other pools. By default resources are divided evenly in a round-robin fashion; this parameter skews that balance. For example, a pool configured with weight 2 receives twice the resources of a pool with weight 1.

These properties can be configured per pool in the conf/fairscheduler.xml file; a minimal usage sketch follows.
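
A minimal sketch of putting a job into a specific pool, assuming a pool named "production" is defined in the fairscheduler.xml pointed to by spark.scheduler.allocation.file (the pool name and file path are examples only):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Minimal sketch, assuming a pool named "production" is defined in the
// fairscheduler.xml referenced below; names and paths are only examples.
val conf = new SparkConf()
  .setAppName("fair-pool-demo")
  .setMaster("local[*]")
  .set("spark.scheduler.mode", "FAIR")
  .set("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml")

val sc = new SparkContext(conf)

// Jobs submitted from this thread go into the "production" pool.
sc.setLocalProperty("spark.scheduler.pool", "production")
sc.parallelize(1 to 100).count()

// Clearing the property sends subsequent jobs back to the default pool.
sc.setLocalProperty("spark.scheduler.pool", null)
```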

The FAIR scheduling algorithm is implemented as follows:

private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {
  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    val minShare1 = s1.minShare
    val minShare2 = s2.minShare
    val runningTasks1 = s1.runningTasks
    val runningTasks2 = s2.runningTasks
    val s1Needy = runningTasks1 < minShare1
    val s2Needy = runningTasks2 < minShare2
    val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0).toDouble
    val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0).toDouble
    val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
    val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble
    var compare: Int = 0

    if (s1Needy && !s2Needy) {
      return true
    } else if (!s1Needy && s2Needy) {
      return false
    } else if (s1Needy && s2Needy) {
      compare = minShareRatio1.compareTo(minShareRatio2)
    } else {
      compare = taskToWeightRatio1.compareTo(taskToWeightRatio2)
    }

    if (compare < 0) {
      true
    } else if (compare > 0) {
      false
    } else {
      s1.name < s2.name
    }
  }
}

As the source shows, a pool or TaskSet that has not yet received its minShare of resources is scheduled first. If both candidates are below their minShare, the one with the smaller runningTasks-to-minShare ratio is scheduled first; if neither is below its minShare, the one with the smaller runningTasks-to-weight ratio is scheduled first. If those are equal as well, the one with the smaller name wins (name = "TaskSet_" + taskSet.stageId.toString), i.e. the smaller StageId.

All TaskSets are sorted by this criterion, and the highest-ranked one is scheduled first.
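
The same ordering can be illustrated with a small self-contained sketch that mirrors the comparator above on made-up pool statistics (names, shares and task counts are purely illustrative):

```scala
// Self-contained sketch mirroring FairSchedulingAlgorithm.comparator on
// made-up pool statistics.
case class FakePool(name: String, minShare: Int, weight: Int, runningTasks: Int)

def fairLessThan(s1: FakePool, s2: FakePool): Boolean = {
  val s1Needy = s1.runningTasks < s1.minShare
  val s2Needy = s2.runningTasks < s2.minShare
  val minShareRatio1 = s1.runningTasks.toDouble / math.max(s1.minShare, 1.0)
  val minShareRatio2 = s2.runningTasks.toDouble / math.max(s2.minShare, 1.0)
  val taskToWeightRatio1 = s1.runningTasks.toDouble / s1.weight
  val taskToWeightRatio2 = s2.runningTasks.toDouble / s2.weight

  if (s1Needy && !s2Needy) true                  // below minShare wins outright
  else if (!s1Needy && s2Needy) false
  else if (s1Needy && s2Needy) {                 // both needy: smaller minShare ratio first
    if (minShareRatio1 != minShareRatio2) minShareRatio1 < minShareRatio2
    else s1.name < s2.name
  } else {                                       // neither needy: smaller task/weight ratio first
    if (taskToWeightRatio1 != taskToWeightRatio2) taskToWeightRatio1 < taskToWeightRatio2
    else s1.name < s2.name
  }
}

val pools = Seq(
  FakePool("etl",     minShare = 4, weight = 1, runningTasks = 2),  // still below minShare
  FakePool("adhoc",   minShare = 2, weight = 1, runningTasks = 6),  // ratio 6/1 = 6
  FakePool("reports", minShare = 2, weight = 2, runningTasks = 6))  // ratio 6/2 = 3

// "etl" comes first (under its minShare), then "reports", then "adhoc".
println(pools.sortWith(fairLessThan).map(_.name))
```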

Spark Task Scheduling: Task Locality

Once a TaskSet has been selected, tasks that suit each Executor are picked from it according to their locality preferences and launched on that Executor.

(For details, see the relevant section of http://www.cnblogs.com/barrenlake/p/4550800.html.)
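
Locality behaviour can additionally be tuned with the standard spark.locality.wait settings, which control how long the scheduler waits for a slot at a given locality level before falling back to a less local one. A minimal sketch with purely illustrative values:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Minimal sketch: how long to wait at each locality level before falling
// back to a less local one. The values below are only illustrative.
val conf = new SparkConf()
  .setAppName("locality-wait-demo")
  .setMaster("local[*]")
  .set("spark.locality.wait", "3s")        // baseline wait for all levels
  .set("spark.locality.wait.node", "5s")   // override for NODE_LOCAL
  .set("spark.locality.wait.rack", "1s")   // override for RACK_LOCAL

val sc = new SparkContext(conf)
```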

 

Original article: http://www.cnblogs.com/barrenlake/p/4891589.html
