<iframe src="//player.bilibili.com/player.html?aid=37442139&cid=66006637&page=20" scrolling="no" border="0" frameborder="no" framespacing="0" allowfullscreen="true"> </iframe>html
<iframe src="//player.bilibili.com/player.html?aid=37442139&cid=66008946&page=21" scrolling="no" border="0" frameborder="no" framespacing="0" allowfullscreen="true"> </iframe>node
taskScheduler.submitTasks(new TaskSet( tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
override def submitTasks(taskSet: TaskSet) { val tasks = taskSet.tasks logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks") this.synchronized { val manager = createTaskSetManager(taskSet, maxTaskFailures) val stage = taskSet.stageId val stageTaskSets = taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager]) stageTaskSets(taskSet.stageAttemptId) = manager val conflictingTaskSet = stageTaskSets.exists { case (_, ts) => ts.taskSet != taskSet && !ts.isZombie } if (conflictingTaskSet) { throw new IllegalStateException(s"more than one active taskSet for stage $stage:" + s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}") } schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties) if (!isLocal && !hasReceivedTask) { starvationTimer.scheduleAtFixedRate(new TimerTask() { override def run() { if (!hasLaunchedTask) { logWarning("Initial job has not accepted any resources; " + "check your cluster UI to ensure that workers are registered " + "and have sufficient resources") } else { this.cancel() } } }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS) } hasReceivedTask = true } backend.reviveOffers() }
val manager = createTaskSetManager(taskSet, maxTaskFailures)
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
backend.reviveOffers()
override def reviveOffers() { driverEndpoint.send(ReviveOffers) }
case ReviveOffers => makeOffers()
// Make fake resource offers on all executors private def makeOffers() { // Filter out executors under killing val activeExecutors = executorDataMap.filterKeys(executorIsAlive) val workOffers = activeExecutors.map { case (id, executorData) => new WorkerOffer(id, executorData.executorHost, executorData.freeCores) }.toIndexedSeq launchTasks(scheduler.resourceOffers(workOffers)) }
/** * Called by cluster manager to offer resources on slaves. We respond by asking our active task * sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so * that tasks are balanced across the cluster. */ def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized { // Mark each slave as alive and remember its hostname // Also track if new executor is added var newExecAvail = false for (o <- offers) { if (!hostToExecutors.contains(o.host)) { hostToExecutors(o.host) = new HashSet[String]() } if (!executorIdToTaskCount.contains(o.executorId)) { hostToExecutors(o.host) += o.executorId executorAdded(o.executorId, o.host) executorIdToHost(o.executorId) = o.host executorIdToTaskCount(o.executorId) = 0 newExecAvail = true } for (rack <- getRackForHost(o.host)) { hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host } } // Before making any offers, remove any nodes from the blacklist whose blacklist has expired. Do // this here to avoid a separate thread and added synchronization overhead, and also because // updating the blacklist is only relevant when task offers are being made. blacklistTrackerOpt.foreach(_.applyBlacklistTimeout()) val filteredOffers = blacklistTrackerOpt.map { blacklistTracker => offers.filter { offer => !blacklistTracker.isNodeBlacklisted(offer.host) && !blacklistTracker.isExecutorBlacklisted(offer.executorId) } }.getOrElse(offers) // Randomly shuffle offers to avoid always placing tasks on the same set of workers. val shuffledOffers = Random.shuffle(filteredOffers) // Build a list of tasks to assign to each worker. val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores)) val availableCpus = shuffledOffers.map(o => o.cores).toArray val sortedTaskSets = rootPool.getSortedTaskSetQueue for (taskSet <- sortedTaskSets) { logDebug("parentName: %s, name: %s, runningTasks: %s".format( taskSet.parent.name, taskSet.name, taskSet.runningTasks)) if (newExecAvail) { taskSet.executorAdded() } } // Take each TaskSet in our scheduling order, and then offer it each node in increasing order // of locality levels so that it gets a chance to launch local tasks on all of them. // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY for (taskSet <- sortedTaskSets) { var launchedAnyTask = false var launchedTaskAtCurrentMaxLocality = false for (currentMaxLocality <- taskSet.myLocalityLevels) { do { launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet( taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks) launchedAnyTask |= launchedTaskAtCurrentMaxLocality } while (launchedTaskAtCurrentMaxLocality) } if (!launchedAnyTask) { taskSet.abortIfCompletelyBlacklisted(hostToExecutors) } } if (tasks.size > 0) { hasLaunchedTask = true } return tasks }
for (taskSet <- sortedTaskSets) { var launchedAnyTask = false var launchedTaskAtCurrentMaxLocality = false for (currentMaxLocality <- taskSet.myLocalityLevels) { do { launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet( taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks) launchedAnyTask |= launchedTaskAtCurrentMaxLocality } while (launchedTaskAtCurrentMaxLocality) } if (!launchedAnyTask) { taskSet.abortIfCompletelyBlacklisted(hostToExecutors) } }
private def resourceOfferSingleTaskSet( taskSet: TaskSetManager, maxLocality: TaskLocality, shuffledOffers: Seq[WorkerOffer], availableCpus: Array[Int], tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : Boolean = { var launchedTask = false // nodes and executors that are blacklisted for the entire application have already been // filtered out by this point for (i <- 0 until shuffledOffers.size) { val execId = shuffledOffers(i).executorId val host = shuffledOffers(i).host if (availableCpus(i) >= CPUS_PER_TASK) { try { for (task <- taskSet.resourceOffer(execId, host, maxLocality)) { tasks(i) += task val tid = task.taskId taskIdToTaskSetManager(tid) = taskSet taskIdToExecutorId(tid) = execId executorIdToTaskCount(execId) += 1 availableCpus(i) -= CPUS_PER_TASK assert(availableCpus(i) >= 0) launchedTask = true } } catch { case e: TaskNotSerializableException => logError(s"Resource offer failed, task set ${taskSet.name} was not serializable") // Do not offer resources for this task, but don't throw an error to allow other // task sets to be submitted. return launchedTask } } } return launchedTask }
/** * Respond to an offer of a single executor from the scheduler by finding a task * * NOTE: this function is either called with a maxLocality which * would be adjusted by delay scheduling algorithm or it will be with a special * NO_PREF locality which will be not modified * * @param execId the executor Id of the offered resource * @param host the host Id of the offered resource * @param maxLocality the maximum locality we want to schedule the tasks at */ @throws[TaskNotSerializableException] def resourceOffer( execId: String, host: String, maxLocality: TaskLocality.TaskLocality) : Option[TaskDescription] = { val offerBlacklisted = taskSetBlacklistHelperOpt.exists { blacklist => blacklist.isNodeBlacklistedForTaskSet(host) || blacklist.isExecutorBlacklistedForTaskSet(execId) } if (!isZombie && !offerBlacklisted) { val curTime = clock.getTimeMillis() var allowedLocality = maxLocality if (maxLocality != TaskLocality.NO_PREF) { allowedLocality = getAllowedLocalityLevel(curTime) if (allowedLocality > maxLocality) { // We're not allowed to search for farther-away tasks allowedLocality = maxLocality } } dequeueTask(execId, host, allowedLocality).map { case ((index, taskLocality, speculative)) => // Found a task; do some bookkeeping and return a task description val task = tasks(index) val taskId = sched.newTaskId() // Do various bookkeeping copiesRunning(index) += 1 val attemptNum = taskAttempts(index).size val info = new TaskInfo(taskId, index, attemptNum, curTime, execId, host, taskLocality, speculative) taskInfos(taskId) = info taskAttempts(index) = info :: taskAttempts(index) // Update our locality level for delay scheduling // NO_PREF will not affect the variables related to delay scheduling if (maxLocality != TaskLocality.NO_PREF) { currentLocalityIndex = getLocalityIndex(taskLocality) lastLaunchTime = curTime } // Serialize and return the task val startTime = clock.getTimeMillis() val serializedTask: ByteBuffer = try { Task.serializeWithDependencies(task, sched.sc.addedFiles, sched.sc.addedJars, ser) } catch { // If the task cannot be serialized, then there's no point to re-attempt the task, // as it will always fail. So just abort the whole task-set. case NonFatal(e) => val msg = s"Failed to serialize task $taskId, not attempting to retry it." logError(msg, e) abort(s"$msg Exception during serialization: $e") throw new TaskNotSerializableException(e) } if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 && !emittedTaskSizeWarning) { emittedTaskSizeWarning = true logWarning(s"Stage ${task.stageId} contains a task of very large size " + s"(${serializedTask.limit / 1024} KB). The maximum recommended task size is " + s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.") } addRunningTask(taskId) // We used to log the time it takes to serialize the task, but task size is already // a good proxy to task serialization time. // val timeTaken = clock.getTime() - startTime val taskName = s"task ${info.id} in stage ${taskSet.id}" logInfo(s"Starting $taskName (TID $taskId, $host, executor ${info.executorId}, " + s"partition ${task.partitionId}, $taskLocality, ${serializedTask.limit} bytes)") sched.dagScheduler.taskStarted(task, info) new TaskDescription(taskId = taskId, attemptNumber = attemptNum, execId, taskName, index, serializedTask) } } else { None } }
for (i <- (0 until numTasks).reverse) { addPendingTask(i) }
/** * Dequeue a pending task for a given node and return its index and locality level. * Only search for tasks matching the given locality constraint. * * @return An option containing (task index within the task set, locality, is speculative?) */ private def dequeueTask(execId: String, host: String, maxLocality: TaskLocality.Value) : Option[(Int, TaskLocality.Value, Boolean)] = { for (index <- dequeueTaskFromList(execId, host, getPendingTasksForExecutor(execId))) { return Some((index, TaskLocality.PROCESS_LOCAL, false)) } if (TaskLocality.isAllowed(maxLocality, TaskLocality.NODE_LOCAL)) { for (index <- dequeueTaskFromList(execId, host, getPendingTasksForHost(host))) { return Some((index, TaskLocality.NODE_LOCAL, false)) } } if (TaskLocality.isAllowed(maxLocality, TaskLocality.NO_PREF)) { // Look for noPref tasks after NODE_LOCAL for minimize cross-rack traffic for (index <- dequeueTaskFromList(execId, host, pendingTasksWithNoPrefs)) { return Some((index, TaskLocality.PROCESS_LOCAL, false)) } } if (TaskLocality.isAllowed(maxLocality, TaskLocality.RACK_LOCAL)) { for { rack <- sched.getRackForHost(host) index <- dequeueTaskFromList(execId, host, getPendingTasksForRack(rack)) } { return Some((index, TaskLocality.RACK_LOCAL, false)) } } if (TaskLocality.isAllowed(maxLocality, TaskLocality.ANY)) { for (index <- dequeueTaskFromList(execId, host, allPendingTasks)) { return Some((index, TaskLocality.ANY, false)) } } // find a speculative task if all others tasks have been scheduled dequeueSpeculativeTask(execId, host, maxLocality).map { case (taskIndex, allowedLocality) => (taskIndex, allowedLocality, true)} }
/** * Dequeue a pending task from the given list and return its index. * Return None if the list is empty. * This method also cleans up any tasks in the list that have already * been launched, since we want that to happen lazily. */ private def dequeueTaskFromList( execId: String, host: String, list: ArrayBuffer[Int]): Option[Int] = { var indexOffset = list.size while (indexOffset > 0) { indexOffset -= 1 val index = list(indexOffset) if (!isTaskBlacklistedOnExecOrNode(index, execId, host)) { // This should almost always be list.trimEnd(1) to remove tail list.remove(indexOffset) if (copiesRunning(index) == 0 && !successful(index)) { return Some(index) } } } None }
// Launch tasks returned by a set of resource offers private def launchTasks(tasks: Seq[Seq[TaskDescription]]) { for (task <- tasks.flatten) { val serializedTask = ser.serialize(task) if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) { scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr => try { var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " + "spark.akka.frameSize (%d bytes) - reserved (%d bytes). Consider increasing " + "spark.akka.frameSize or using broadcast variables for large values." msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize, AkkaUtils.reservedSizeBytes) taskSetMgr.abort(msg) } catch { case e: Exception => logError("Exception in error callback", e) } } } else { val executorData = executorDataMap(task.executorId) executorData.freeCores -= scheduler.CPUS_PER_TASK executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask))) } } }
case LaunchTask(data) => if (executor == null) { exitExecutor(1, "Received LaunchTask command but executor was null") } else { val taskDesc = ser.deserialize[TaskDescription](data.value) logInfo("Got assigned task " + taskDesc.taskId) executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber, taskDesc.name, taskDesc.serializedTask) }
def launchTask( context: ExecutorBackend, taskId: Long, attemptNumber: Int, taskName: String, serializedTask: ByteBuffer): Unit = { val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName, serializedTask) runningTasks.put(taskId, tr) threadPool.execute(tr) }
execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) { val msg = StatusUpdate(executorId, taskId, state, data) driver match { case Some(driverRef) => driverRef.send(msg) case None => logWarning(s"Drop $msg because has not yet connected to driver") } }
override def removeSchedulable(schedulable: Schedulable) { schedulableQueue.remove(schedulable) schedulableNameToSchedulable.remove(schedulable.name) }
/** * Called to indicate that all task attempts (including speculated tasks) associated with the * given TaskSetManager have completed, so state associated with the TaskSetManager should be * cleaned up. */ def taskSetFinished(manager: TaskSetManager): Unit = synchronized { taskSetsByStageIdAndAttempt.get(manager.taskSet.stageId).foreach { taskSetsForStage => taskSetsForStage -= manager.taskSet.stageAttemptId if (taskSetsForStage.isEmpty) { taskSetsByStageIdAndAttempt -= manager.taskSet.stageId } } manager.parent.removeSchedulable(manager) logInfo(s"Removed TaskSet ${manager.taskSet.id}, whose tasks have all completed, from pool" + s" ${manager.parent.name}") }
private def maybeFinishTaskSet() { if (isZombie && runningTasks == 0) { sched.taskSetFinished(this) if (tasksSuccessful == numTasks) { blacklistTracker.foreach(_.updateBlacklistForSuccessfulTaskSet( taskSet.stageId, taskSet.stageAttemptId, taskSetBlacklistHelperOpt.get.execToFailures)) } } }
/** * Marks the task as successful and notifies the DAGScheduler that a task has ended. */ def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit = { val info = taskInfos(tid) val index = info.index info.markSuccessful() removeRunningTask(tid) // This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the // "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not // "deserialize" the value when holding a lock to avoid blocking other threads. So we call // "result.value()" in "TaskResultGetter.enqueueSuccessfulTask" before reaching here. // Note: "result.value()" only deserializes the value when it's called at the first time, so // here "result.value()" just returns the value and won't block other threads. sched.dagScheduler.taskEnded( tasks(index), Success, result.value(), result.accumUpdates, info, result.metrics) if (!successful(index)) { tasksSuccessful += 1 logInfo(s"Finished task ${info.id} in stage ${taskSet.id} (TID ${info.taskId}) in" + s" ${info.duration} ms on ${info.host} (executor ${info.executorId})" + s" ($tasksSuccessful/$numTasks)") // Mark successful and stop if all the tasks have succeeded. successful(index) = true if (tasksSuccessful == numTasks) { isZombie = true } } else { logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id + " because task " + index + " has already completed successfully") } maybeFinishTaskSet() }
override def receive: PartialFunction[Any, Unit] = { case StatusUpdate(executorId, taskId, state, data) => scheduler.statusUpdate(taskId, state, data.value) if (TaskState.isFinished(state)) { executorDataMap.get(executorId) match { case Some(executorInfo) => executorInfo.freeCores += scheduler.CPUS_PER_TASK makeOffers(executorId) case None => // Ignoring the update since we don't know about the executor. logWarning(s"Ignored task status update ($taskId state $state) " + s"from unknown executor with ID $executorId") } }
def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) { var failedExecutor: Option[String] = None synchronized { try { if (state == TaskState.LOST && taskIdToExecutorId.contains(tid)) { // We lost this entire executor, so remember that it's gone val execId = taskIdToExecutorId(tid) if (executorIdToTaskCount.contains(execId)) { removeExecutor(execId, SlaveLost(s"Task $tid was lost, so marking the executor as lost as well.")) failedExecutor = Some(execId) } } taskIdToTaskSetManager.get(tid) match { case Some(taskSet) => if (TaskState.isFinished(state)) { taskIdToTaskSetManager.remove(tid) taskIdToExecutorId.remove(tid).foreach { execId => if (executorIdToTaskCount.contains(execId)) { executorIdToTaskCount(execId) -= 1 } } } if (state == TaskState.FINISHED) { taskSet.removeRunningTask(tid) taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData) } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) { taskSet.removeRunningTask(tid) taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData) } case None => logError( ("Ignoring update with state %s for TID %s because its task set is gone (this is " + "likely the result of receiving duplicate task finished status updates)") .format(state, tid)) } } catch { case e: Exception => logError("Exception in statusUpdate", e) } } // Update the DAGScheduler without holding a lock on this, since that can deadlock if (failedExecutor.isDefined) { dagScheduler.executorLost(failedExecutor.get) backend.reviveOffers() } }