概述
Spark Application在遇到action算子時,SparkContext會生成Job,並將構成DAG圖將給DAG Scheduler解析成Stage。
Stage有兩種:
ShuffleMapStage
這種Stage是以Shuffle爲輸出邊界
其輸入邊界能夠是從外部獲取數據,也能夠是另外一個ShuffleMapStage的輸出
其輸出能夠是另外一個Stage的開始
ShuffleMapStage的最後Task就是ShuffleMapTask
在一個Job裏可能有該類型的Stage,也能夠能沒有該類型Stage。
ResultStage
這種Stage是直接輸出結果
其輸入邊界能夠是從外部獲取數據,也能夠是另外一個ShuffleMapStage的輸出
ResultStage的最後Task就是ResultTask
在一個Job裏一定有該類型Stage。
一個Job含有一個或多個Stage,但至少含有一個ResultStage。
Scheduler模塊總體架構
scheduler 模塊主要分爲兩大部分:
TaskSchedulerListener。TaskSchedulerListener部分的主要功能是監聽用戶提交的job,將job分解爲不一樣的類型的stage以及相應的task,並向TaskScheduler提交task。
TaskScheduler。TaskScheduler 接收用戶提交的task並執行。而TaskScheduler根據部署的不一樣又分爲三個子模塊:
ClusterScheduler
LocalScheduler
MesosScheduler
TaskSchedulerListener
Spark抽象了 TaskSchedulerListener 並在其上實現了 DAGScheduler 。DAGScheduler 的主要功能是接收用戶提交的job,將job根據類型劃分爲不一樣的stage,並在每個stage內產生一系列的task,向 TaskScheduler 提交task。下面咱們首先來看一下 TaskSchedulerListener 部分的類圖:
php
1.png (49.86 KB, 下載次數: 13)node
2014-7-26 14:40 上傳app
用戶所提交的job在獲得 DAGScheduler 的調度後,會被包裝成 ActiveJob,同時會啓動 JobWaiter 阻塞監聽job的完成情況。
於此同時依據job中 RDD 的dependency和dependency屬性(NarrowDependency , ShufflerDependecy ), DAGScheduler 會根據依賴關係的前後產生出不一樣的stage DAG(result stage, shuffle map stage)。
在每個stage內部,根據stage產生出相應的task,包括 ResultTask 或是ShuffleMapTask ,這些task會根據 RDD 中partition的數量和分佈,產生出一組相應的task,並將其包裝爲 TaskSet 提交到 TaskScheduler 上去。
RDD的依賴關係和Stage的分類
在Spark中,每個 RDD 是對於數據集在某一狀態下的表現形式,而這個狀態有多是從前一狀態轉換而來的,所以換句話說這一個 RDD 有可能與以前的RDD(s) 有依賴關係。根據依賴關係的不一樣,能夠將 RDD 分紅兩種不一樣的類型: Narrow Dependency 和 Wide Dependency 。
Narrow Dependency 指的是 child RDD 只依賴於 parent RDD(s) 固定數量的partition。
Wide Dependency 指的是 child RDD 的每個partition都依賴於parent RDD(s) 全部partition。
它們之間的區別可參看下圖:
框架
1.png (116.5 KB, 下載次數: 9)ide
2014-7-26 14:41 上傳大數據
根據 RDD 依賴關係的不一樣,Spark也將每個job分爲不一樣的stage,而stage之間的依賴關係則造成了DAG。對於 Narrow Dependency ,Spark會盡可能多地將 RDD 轉換放在同一個stage中;而對於 Wide Dependency ,因爲Wide Dependency 一般意味着shuffle操做,所以Spark會將此stage定義ShuffleMapStage ,以便於向 MapOutputTracker 註冊shuffle操做。對於stage的劃分可參看下圖,Spark一般將shuffle操做定義爲stage的邊界。
this
1.png (78.5 KB, 下載次數: 8)url
2014-7-26 14:42 上傳
DAGScheduler
在用戶建立 SparkContext 對象時,Spark會在內部建立 DAGScheduler 對象,並根據用戶的部署狀況,綁定不一樣的 TaskSechduler ,並啓動DAGcheduler
private var taskScheduler: TaskScheduler = {
//...
}
taskScheduler.start()
private var dagScheduler = new DAGScheduler(taskScheduler)
dagScheduler.start()
複製代碼
而 DAGScheduler 的啓動會在內部建立daemon線程,daemon線程調用run() 從block queue中取出event進行處理。
private def run() {
SparkEnv.set(env)
while (true) {
val event = eventQueue.poll(POLL_TIMEOUT, TimeUnit.MILLISECONDS)
if (event != null) {
logDebug("Got event of type " + event.getClass.getName)
}
if (event != null) {
if (processEvent(event)) {
return
}
}
val time = System.currentTimeMillis() // TODO: use a pluggable clock for testability
if (failed.size > 0 && time > lastFetchFailureTime + RESUBMIT_TIMEOUT) {
resubmitFailedStages()
} else {
submitWaitingStages()
}
}
}
複製代碼
而 run() 會調用 processEvent 來處理不一樣的event。
DAGScheduler 處理的event包括:
JobSubmitted
CompletionEvent
ExecutorLost
TaskFailed
StopDAGScheduler
根據event的不一樣調用不一樣的方法去處理。
本質上 DAGScheduler 是一個生產者-消費者模型,用戶和 TaskSchduler 產生event將其放入block queue,daemon線程消費event並處理相應事件。
Job的生與死
既然用戶提交的job最終會交由 DAGScheduler 去處理,那麼咱們就來研究一下DAGScheduler 處理job的整個流程。在這裏咱們分析兩種不一樣類型的job的處理流程。
1.沒有shuffle和reduce的job
val textFile = sc.textFile("README.md")
textFile.filter(line => line.contains("Spark")).count()
複製代碼
2.有shuffle和reduce的job
val textFile = sc.textFile("README.md")
textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
複製代碼
首先在對 RDD 的 count() 和 reduceByKey() 操做都會調用SparkContext 的 runJob() 來提交job,而 SparkContext 的 runJob() 最終會調用 DAGScheduler 的 runJob() :
def runJob[T, U: ClassManifest](
finalRdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
callSite: String,
allowLocal: Boolean,
resultHandler: (Int, U) => Unit)
{
if (partitions.size == 0) {
return
}
val (toSubmit, waiter) = prepareJob(
finalRdd, func, partitions, callSite, allowLocal, resultHandler)
eventQueue.put(toSubmit)
waiter.awaitResult() match {
case JobSucceeded => {}
case JobFailed(exception: Exception) =>
logInfo("Failed to run " + callSite)
throw exception
}
}
複製代碼
runJob() 會調用 prepareJob() 對job進行預處理,封裝成 JobSubmitted事件,放入queue中,並阻塞等待job完成。
當daemon線程的 processEvent() 從queue中取出 JobSubmitted 事件後,會根據job劃分出不一樣的stage,而且提交stage:
case JobSubmitted(finalRDD, func, partitions, allowLocal, callSite, listener) =>
val runId = nextRunId.getAndIncrement()
val finalStage = newStage(finalRDD, None, runId)
val job = new ActiveJob(runId, finalStage, func, partitions, callSite, listener)
clearCacheLocs()
if (allowLocal && finalStage.parents.size == 0 && partitions.length == 1) {
runLocally(job)
} else {
activeJobs += job
resultStageToJob(finalStage) = job
submitStage(finalStage)
}
複製代碼
首先,對於任何的job都會產生出一個 finalStage 來產生和提交task。其次對於某些簡單的job,它沒有依賴關係,而且只有一個partition,這樣的job會使用local thread處理而並不是提交到 TaskScheduler 上處理。
接下來產生 finalStage 後,須要調用 submitStage() ,它根據stage之間的依賴關係得出stage DAG,並以依賴關係進行處理:
private def submitStage(stage: Stage) {
if (!waiting(stage) && !running(stage) && !failed(stage)) {
val missing = getMissingParentStages(stage).sortBy(_.id)
if (missing == Nil) {
submitMissingTasks(stage)
running += stage
} else {
for (parent <- missing) {
submitStage(parent)
}
waiting += stage
}
}
}
複製代碼
對於新提交的job, finalStage 的parent stage還未得到,所以submitStage 會調用 getMissingParentStages() 來得到依賴關係:
private def getMissingParentStages(stage: Stage): List[Stage] = {
val missing = new HashSet[Stage]
val visited = new HashSet[RDD[_]]
def visit(rdd: RDD[_]) {
if (!visited(rdd)) {
visited += rdd
if (getCacheLocs(rdd).contains(Nil)) {
for (dep <- rdd.dependencies) {
dep match {
case shufDep: ShuffleDependency[_,_] =>
val mapStage = getShuffleMapStage(shufDep, stage.priority)
if (!mapStage.isAvailable) {
missing += mapStage
}
case narrowDep: NarrowDependency[_] =>
visit(narrowDep.rdd)
}
}
}
}
}
visit(stage.rdd)
missing.toList
}
複製代碼
這裏parent stage是經過 RDD 的依賴關係遞歸遍歷得到。對於Wide Dependecy 也就是 Shuffle Dependecy ,Spark會產生新的 mapStage做爲 finalStage 的parent,而對於 Narrow Dependecy Spark則不會產生新的stage。這裏對stage的劃分是按照上面提到的做爲劃分依據的,所以對於本段開頭提到的兩種job,第一種job只會產生一個 finalStage ,而第二種job會產生finalStage 和 mapStage 。
當stage DAG產生之後,針對每一個stage須要產生task去執行,故在這會調用submitMissingTasks() :
private def submitMissingTasks(stage: Stage) {
val myPending = pendingTasks.getOrElseUpdate(stage, new HashSet)
myPending.clear()
var tasks = ArrayBuffer[Task[_]]()
if (stage.isShuffleMap) {
for (p <- 0 until stage.numPartitions if stage.outputLocs(p) == Nil) {
val locs = getPreferredLocs(stage.rdd, p)
tasks += new ShuffleMapTask(stage.id, stage.rdd, stage.shuffleDep.get, p, locs)
}
} else {
val job = resultStageToJob(stage)
for (id <- 0 until job.numPartitions if (!job.finished(id))) {
val partition = job.partitions(id)
val locs = getPreferredLocs(stage.rdd, partition)
tasks += new ResultTask(stage.id, stage.rdd, job.func, partition, locs, id)
}
}
if (tasks.size > 0) {
myPending ++= tasks
taskSched.submitTasks(
new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.priority))
if (!stage.submissionTime.isDefined) {
stage.submissionTime = Some(System.currentTimeMillis())
}
} else {
running -= stage
}
}
複製代碼
首先根據stage所依賴的 RDD 的partition的分佈,會產生出與partition數量相等的task,這些task根據partition的locality進行分佈;其次對於 finalStage 或是mapStage 會產生不一樣的task;最後全部的task會封裝到 TaskSet 內提交到TaskScheduler 去執行。
至此job在 DAGScheduler 內的啓動過程所有完成,交由 TaskScheduler 執行task,當task執行完後會將結果返回給 DAGScheduler , DAGScheduler 調用handleTaskComplete() 處理task返回:
private def handleTaskCompletion(event: CompletionEvent) {
val task = event.task
val stage = idToStage(task.stageId)
def markStageAsFinished(stage: Stage) = {
val serviceTime = stage.submissionTime match {
case Some(t) => "%.03f".format((System.currentTimeMillis() - t) / 1000.0)
case _ => "Unkown"
}
logInfo("%s (%s) finished in %s s".format(stage, stage.origin, serviceTime))
running -= stage
}
event.reason match {
case Success =>
...
task match {
case rt: ResultTask[_, _] =>
...
case smt: ShuffleMapTask =>
...
}
case Resubmitted =>
...
case FetchFailed(bmAddress, shuffleId, mapId, reduceId) =>
...
case other =>
abortStage(idToStage(task.stageId), task + " failed: " + other)
}
}
複製代碼
每一個執行完成的task都會將結果返回給 DAGScheduler , DAGScheduler 根據返回結果來進行進一步的動做。
RDD的計算
RDD 的計算是在task中完成的。咱們以前提到task分爲 ResultTask 和ShuffleMapTask ,咱們分別來看一下這兩種task具體的執行過程。
ResultTask
override def run(attemptId: Long): U = {
val context = new TaskContext(stageId, partition, attemptId)
try {
func(context, rdd.iterator(split, context))
} finally {
context.executeOnCompleteCallbacks()
}
}
複製代碼
ShuffleMapTask
override def run(attemptId: Long): MapStatus = {
val numOutputSplits = dep.partitioner.numPartitions
val taskContext = new TaskContext(stageId, partition, attemptId)
try {
val buckets = Array.fill(numOutputSplits)(new ArrayBuffer[(Any, Any)])
for (elem <- rdd.iterator(split, taskContext)) {
val pair = elem.asInstanceOf[(Any, Any)]
val bucketId = dep.partitioner.getPartition(pair._1)
buckets(bucketId) += pair
}
val compressedSizes = new Array[Byte](numOutputSplits)
val blockManager = SparkEnv.get.blockManager
for (i <- 0 until numOutputSplits) {
val blockId = "shuffle_" + dep.shuffleId + "_" + partition + "_" + i
val iter: Iterator[(Any, Any)] = buckets(i).iterator
val size = blockManager.put(blockId, iter, StorageLevel.DISK_ONLY, false)
compressedSizes(i) = MapOutputTracker.compressSize(size)
}
return new MapStatus(blockManager.blockManagerId, compressedSizes)
} finally {
taskContext.executeOnCompleteCallbacks()
}
}
複製代碼
ResultTask 和 ShuffleMapTask 都會調用 RDD 的 iterator() 來計算和轉換 RDD ,不一樣的是: ResultTask 轉換完 RDD 後調用 func() 計算結果;而 ShufflerMapTask 則將其放入 blockManager 中用來shuffle。
RDD 的計算調用 iterator() , iterator() 在內部調用 compute() 從RDD 依賴關係的根開始計算:
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
if (storageLevel != StorageLevel.NONE) {
SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
} else {
computeOrReadCheckpoint(split, context)
}
}
private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] = {
if (isCheckpointed) {
firstParent[T].iterator(split, context)
} else {
compute(split, context)
}
}
複製代碼
至此大體分析了 TaskSchedulerListener ,包括 DAGScheduler 內部的結構,job生命週期內的活動, RDD 是什麼時候何地計算的。接下來咱們分析一下task在TaskScheduler 內幹了什麼。
TaskScheduler
前面也提到了Spark實現了三種不一樣的 TaskScheduler ,包括LocalSheduler 、 ClusterScheduler 和 MesosScheduler 。LocalSheduler 是一個在本地執行的線程池, DAGScheduler 提交的全部task會在線程池中被執行,並將結果返回給 DAGScheduler 。 MesosScheduler 依賴於Mesos進行調度,筆者對Mesos瞭解甚少,所以不作分析。故此章節主要分析ClusterScheduler 模塊。
ClusterScheduler 模塊與deploy模塊和executor模塊耦合較爲緊密,所以在分析 ClUsterScheduler 時也會順帶介紹deploy和executor模塊。
首先咱們來看一下 ClusterScheduler 的類圖:
1.png (41.13 KB, 下載次數: 5)
2014-7-26 14:47 上傳
ClusterScheduler 的啓動會伴隨 SparkDeploySchedulerBackend 的啓動,而backend會將本身分爲兩個角色:首先是driver,driver是一個local運行的actor,負責與remote的executor進行通行,提交任務,控制executor;其次是StandaloneExecutorBackend ,Spark會在每個slave node上啓動一個StandaloneExecutorBackend 進程,負責執行任務,返回執行結果。
ClusterScheduler的啓動
在 SparkContext 實例化的過程當中, ClusterScheduler 被隨之實例化,同時賦予其 SparkDeploySchedulerBackend :
master match {
...
case SPARK_REGEX(sparkUrl) =>
val scheduler = new ClusterScheduler(this)
val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, appName)
scheduler.initialize(backend)
scheduler
case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
...
case _ =>
...
}
}
taskScheduler.start()
複製代碼
ClusterScheduler 的啓動會啓動 SparkDeploySchedulerBackend ,同時啓動daemon進程來檢查speculative task:
override def start() {
backend.start()
if (System.getProperty("spark.speculation", "false") == "true") {
new Thread("ClusterScheduler speculation check") {
setDaemon(true)
override def run() {
while (true) {
try {
Thread.sleep(SPECULATION_INTERVAL)
} catch {
case e: InterruptedException => {}
}
checkSpeculatableTasks()
}
}
}.start()
}
}
複製代碼
SparkDeploySchedulerBacked 的啓動首先會調用父類的 start() ,接着它會啓動client,並由client鏈接到master向每個node的worker發送請求啓動StandaloneExecutorBackend 。這裏的client、master、worker涉及到了deploy模塊,暫時不作具體介紹。而 StandaloneExecutorBackend 則涉及到了executor模塊,它主要的功能是在每個node建立task能夠運行的環境,並讓task在其環境中運行。
override def start() {
super.start()
val driverUrl = "akka://spark@%s:%s/user/%s".format(
System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"),
StandaloneSchedulerBackend.ACTOR_NAME)
val args = Seq(driverUrl, "", "", "")
val command = Command("spark.executor.StandaloneExecutorBackend", args, sc.executorEnvs)
val sparkHome = sc.getSparkHome().getOrElse(
throw new IllegalArgumentException("must supply spark home for spark standalone"))
val appDesc = new ApplicationDescription(appName, maxCores, executorMemory, command, sparkHome)
client = new Client(sc.env.actorSystem, master, appDesc, this)
client.start()
}
複製代碼
在 StandaloneSchedulerBackend 中會建立 DriverActor ,它就是local的driver,以actor的方式與remote的executor進行通訊。
override def start() {
val properties = new ArrayBuffer[(String, String)]
val iterator = System.getProperties.entrySet.iterator
while (iterator.hasNext) {
val entry = iterator.next
val (key, value) = (entry.getKey.toString, entry.getValue.toString)
if (key.startsWith("spark.")) {
properties += ((key, value))
}
}
driverActor = actorSystem.actorOf(
Props(new DriverActor(properties)), name = StandaloneSchedulerBackend.ACTOR_NAME)
}
複製代碼
在client實例化以前,會將 StandaloneExecutorBackend 的啓動環境做爲參數傳遞給client,而client啓動時會將此提交給master,由master分發給全部node上的worker,worker會配置環境並建立進程啓動 StandaloneExecutorBackend 。
至此 ClusterScheduler 的啓動,local driver的建立,remote executor環境的啓動全部過程都已結束, ClusterScheduler 等待 DAGScheduler 提交任務。
ClusterScheduler提交任務
DAGScheduler 會調用 ClusterScheduler 提交任務,任務會被包裝成TaskSetManager 並等待調度:
override def submitTasks(taskSet: TaskSet) {
val tasks = taskSet.tasks
logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
this.synchronized {
val manager = new TaskSetManager(this, taskSet)
activeTaskSets(taskSet.id) = manager
activeTaskSetsQueue += manager
taskSetTaskIds(taskSet.id) = new HashSet[Long]()
if (hasReceivedTask == false) {
starvationTimer.scheduleAtFixedRate(new TimerTask() {
override def run() {
if (!hasLaunchedTask) {
logWarning("Initial job has not accepted any resources; " +
"check your cluster UI to ensure that workers are registered")
} else {
this.cancel()
}
}
}, STARVATION_TIMEOUT, STARVATION_TIMEOUT)
}
hasReceivedTask = true;
}
backend.reviveOffers()
}
複製代碼
在任務提交的同時會啓動定時器,若是任務還未被執行,定時器持續發出警告直到任務被執行。同時會調用 StandaloneSchedulerBackend 的reviveOffers() ,而它則會經過actor向driver發送 ReviveOffers ,driver收到 ReviveOffers 後調用 makeOffers() :
// Make fake resource offers on just one executor
def makeOffers(executorId: String) {
launchTasks(scheduler.resourceOffers(
Seq(new WorkerOffer(executorId, executorHost(executorId), freeCores(executorId)))))
}
// Launch tasks returned by a set of resource offers
def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
for (task <- tasks.flatten) {
freeCores(task.executorId) -= 1
executorActor(task.executorId) ! LaunchTask(task)
}
}
複製代碼
makeOffers() 會向 ClusterScheduler 申請資源,並向executor提交LauchTask 請求。
接下來 LaunchTask 會進入executor模塊, StandaloneExecutorBackend在收到 LaunchTask 請求後會調用 Executor 執行task:
override def receive = {
case RegisteredExecutor(sparkProperties) =>
...
case RegisterExecutorFailed(message) =>
...
case LaunchTask(taskDesc) =>
logInfo("Got assigned task " + taskDesc.taskId)
executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)
case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
...
}
def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) {
threadPool.execute(new TaskRunner(context, taskId, serializedTask))
}
複製代碼
Executor 內部是一個線程池,每個提交的task都會包裝爲 TaskRunner 交由threadpool執行:
class TaskRunner(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer)
extends Runnable {
override def run() {
SparkEnv.set(env)
Thread.currentThread.setContextClassLoader(urlClassLoader)
val ser = SparkEnv.get.closureSerializer.newInstance()
logInfo("Running task ID " + taskId)
context.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
try {
SparkEnv.set(env)
Accumulators.clear()
val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)
updateDependencies(taskFiles, taskJars)
val task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)
logInfo("Its generation is " + task.generation)
env.mapOutputTracker.updateGeneration(task.generation)
val value = task.run(taskId.toInt)
val accumUpdates = Accumulators.values
val result = new TaskResult(value, accumUpdates)
val serializedResult = ser.serialize(result)
logInfo("Serialized size of result for " + taskId + " is " + serializedResult.limit)
context.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
logInfo("Finished task ID " + taskId)
} catch {
case ffe: FetchFailedException => {
val reason = ffe.toTaskEndReason
context.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
}
case t: Throwable => {
val reason = ExceptionFailure(t)
context.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
// TODO: Should we exit the whole executor here? On the one hand, the failed task may
// have left some weird state around depending on when the exception was thrown, but on
// the other hand, maybe we could detect that when future tasks fail and exit then.
logError("Exception in task ID " + taskId, t)
//System.exit(1)
}
}
}
}
複製代碼
其中 task.run() 則真正執行了task中的任務,如前 RDD的計算 章節所述。返回值被包裝成 TaskResult 返回。
至此task在 ClusterScheduler 內運行的流程有了一個大體的介紹,固然這裏略掉了許多異常處理的分支,但這不影響咱們對主線的瞭解。
END至此對Spark的Scheduler模塊的主線作了一個順藤摸瓜式的介紹,Scheduler模塊做爲Spark最核心的模塊之一,充分體現了Spark與MapReduce的不一樣之處,體現了Spark DAG思想的精巧和設計的優雅。固然Spark的代碼仍然在積極開發之中,當前的源碼分析在過不久後可能會變得沒有意義,但重要的是體會Spark區別於MapReduce的設計理念,以及DAG思想的應用。DAG做爲對MapReduce框架的改進愈來愈受到大數據界的重視,hortonworks 也提出了相似DAG的框架 tez 做爲對MapReduce的改進。