先貼下案例源碼:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Durations, StreamingContext}

/**
 * Streaming word count used as the running example of this article.
 *
 * Reads text lines from a socket on localhost:9999, splits each line on single spaces,
 * and prints the per-batch word counts.
 */
object StreamingWordCountSelfScala {

  /**
   * Entry point: builds the streaming job, starts it, and blocks until it terminates.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .setMaster("spark://master:7077")
      .setAppName("StreamingWordCountSelfScala")

    // Collect the received data into one batch every 5 seconds.
    val ssc = new StreamingContext(sparkConf, Durations.seconds(5))

    try {
      // Listen on the local socket port 9999.
      val lines = ssc.socketTextStream("localhost", 9999)

      // flatMap into words, pair each word with 1, then reduce by key to get the counts.
      val wordCounts = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

      // Print the result of every batch.
      wordCounts.print()

      // Start the streaming computation and block until it is stopped or fails.
      ssc.start()
      ssc.awaitTermination()
    } finally {
      // Release the streaming context and the underlying SparkContext even when
      // awaitTermination() rethrows an error from the streaming computation.
      ssc.stop(stopSparkContext = true)
    }
  }
}
上文已經從源碼分析到將Receiver作爲RDD提交給Spark,高層調度器將處理此JobSubmitted事件,本文將聚焦於Receiver在Spark集羣中執行的運行時。
// DAGScheduler.scala line 1605
// Dispatches a DAGSchedulerEvent by pattern match. Only the JobSubmitted case is shown
// in this excerpt; it forwards all event fields to dagScheduler.handleJobSubmitted.
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
  case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
    dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
  ... // handling of the other case classes (elided in this excerpt)
}
追蹤。有Spark Core基礎的讀者必定知道:Spark的RDD在定義時是從前往後寫邏輯的;計算時,是按照血統從後往前推依賴的,並且按照依賴的寬窄劃分stage。
而本例中的RDD的數據只是一個很單純的Receiver,所以沒有依賴。只有一個RDD,並且是ParallelCollectionRDD類型的。
// DAGScheduler.scala line 826
// Handles a submitted job: creates the final ResultStage, registers an ActiveJob for it,
// posts SparkListenerJobStart to the listener bus, and submits the final stage.
// If creating the stage throws, the listener is told the job failed and the method returns.
private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties) {
  var finalStage: ResultStage = null
  try {
    // New stage creation may throw an exception if, for example, jobs are run on a
    // HadoopRDD whose underlying HDFS files have been deleted.
    finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite) // obtain the final stage
  } catch {
    case e: Exception =>
      logWarning("Creating new stage failed due to exception - job: " + jobId, e)
      listener.jobFailed(e)
      return
  }

  val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
  clearCacheLocs()
  logInfo("Got job %s (%s) with %d output partitions".format(
    job.jobId, callSite.shortForm, partitions.length))
  logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
  logInfo("Parents of final stage: " + finalStage.parents)
  logInfo("Missing parents: " + getMissingParentStages(finalStage))

  // Record the job in the scheduler's bookkeeping before anything is submitted.
  val jobSubmissionTime = clock.getTimeMillis()
  jobIdToActiveJob(jobId) = job
  activeJobs += job
  finalStage.setActiveJob(job)
  val stageIds = jobIdToStageIds(jobId).toArray
  val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
  listenerBus.post(
    SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
  submitStage(finalStage) // submit the final stage

  submitWaitingStages()
}
追蹤
本例中,只有一個stage,那麼這個stage就是ResultStage。
// DAGScheduler.scala line 911
/** Submits stage, but first recursively submits any missing parents. */
private def submitStage(stage: Stage) {
  val jobId = activeJobForStage(stage)
  if (jobId.isDefined) {
    logDebug("submitStage(" + stage + ")")
    // Only act on stages that are not already waiting, running, or failed.
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      // In the article's example the stage has no dependencies, so nothing is missing.
      val missing = getMissingParentStages(stage).sortBy(_.id)
      logDebug("missing: " + missing)
      if (missing.isEmpty) {
        // No missing parents: in the article's example this is the final stage, so it is
        // submitted to run. The log message below confirms this.
        logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
        submitMissingTasks(stage, jobId.get)
      } else {
        // Side note: if the stage has parent dependencies, submitStage is called recursively
        // on each of them and this stage is parked in waitingStages.
        // Article author's note: since a JVM has only one DAGScheduler, keeping
        // waitingStages as a member variable is not a problem.
        for (parent <- missing) {
          submitStage(parent)
        }
        waitingStages += stage
      }
    }
  } else {
    abortStage(stage, "No active job for stage " + stage.id, None)
  }
}
submitMissingTasks
本例中stage是ResultStage。
計算partitions個數 ; DAGScheduler line 941
計算數據本地性 ; DAGScheduler line 966
stage信息序列化 ; DAGScheduler line 996
將stage 二進制信息廣播至Executor ; DAGScheduler line 1006
taskScheduler提交任務 ; DAGScheduler line 1052
// DAGScheduler.scala line 934
/** Called when stage's parents are available and we can now do its task. */
private def submitMissingTasks(stage: Stage, jobId: Int) {
  logDebug("submitMissingTasks(" + stage + ")")
  // Get our pending tasks and remember them in our pendingTasks entry
  stage.pendingPartitions.clear()

  // First figure out the indexes of partition ids to compute.
  // In the article's example there is 1 partition; interested readers can trace into this call.
  val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()

  // ... some steps are skipped in this excerpt

  var taskBinary: Broadcast[Array[Byte]] = null
  try {
    // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
    // For ResultTask, serialize and broadcast (rdd, func).
    val taskBinaryBytes: Array[Byte] = stage match {
      case stage: ShuffleMapStage =>
        closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef).array()
      case stage: ResultStage =>
        closureSerializer.serialize((stage.rdd, stage.func): AnyRef).array()
    }

    // The serialized stage information is handed to sc.broadcast here; per the article,
    // this is how it reaches the Executors on the workers.
    taskBinary = sc.broadcast(taskBinaryBytes)
  } catch {
    // In the case of a failure during serialization, abort the stage.
    case e: NotSerializableException =>
      abortStage(stage, "Task not serializable: " + e.toString, Some(e))
      runningStages -= stage

      // Abort execution
      return
    case NonFatal(e) =>
      abortStage(stage, s"Task serialization failed: $e\n${e.getStackTraceString}", Some(e))
      runningStages -= stage
      return
  }

  // Build one task per partition to compute.
  val tasks: Seq[Task[_]] = try {
    stage match {
      case stage: ShuffleMapStage =>
        // not applicable to the article's example (body elided in this excerpt)

      case stage: ResultStage =>
        val job = stage.activeJob.get
        partitionsToCompute.map { id =>
          val p: Int = stage.partitions(id)
          val part = stage.rdd.partitions(p)
          val locs = taskIdToLocations(id)
          new ResultTask(stage.id, stage.latestInfo.attemptId,
            taskBinary, part, locs, id, stage.internalAccumulators)
        }
    }
  } catch {
    case NonFatal(e) =>
      abortStage(stage, s"Task creation failed: $e\n${e.getStackTraceString}", Some(e))
      runningStages -= stage
      return
  }

  if (tasks.size > 0) {
    logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
    stage.pendingPartitions ++= tasks.map(_.partitionId)
    logDebug("New pending partitions: " + stage.pendingPartitions)
    taskScheduler.submitTasks(new TaskSet( // handed off through the TaskScheduler
      tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
    stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
  } else {
    // Because we posted SparkListenerStageSubmitted earlier, we should mark
    // the stage as completed here in case there are no tasks to run
    markStageAsFinished(stage, None)

    val debugString = stage match {
      case stage: ShuffleMapStage =>
        // ignored (body elided in this excerpt)
      case stage : ResultStage =>
        s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})"
    }
    logDebug(debugString)
  }
}
任務提交:taskScheduler.submitTasks。TaskScheduler也是在SparkContext的實例化時建立的,具體實現是TaskSchedulerImpl。見SparkContext.scala line 522。
submitTasks中,最後調用的是backend.reviveOffers。在Standalone集羣模式下,backend是CoarseGrainedSchedulerBackend
//TaskSchedulerImpl.scala line 160
// Submits a task set; in this excerpt only the final call, backend.reviveOffers(), is shown.
override def submitTasks(taskSet: TaskSet) {
  // ... operations omitted in this excerpt
  backend.reviveOffers()
}
reviveOffers(),
// CoarseGrainedSchedulerBackend.scala line 344
// Sends a ReviveOffers message to the driver endpoint.
override def reviveOffers() {
  driverEndpoint.send(ReviveOffers)
}
driver處理消息
// CoarseGrainedSchedulerBackend.scala line 122 case ReviveOffers => makeOffers()
makeOffers
// CoarseGrainedSchedulerBackend.scala line 189
// Builds one WorkerOffer (id, host, free cores) per alive executor, passes the offers to
// scheduler.resourceOffers, and launches whatever tasks come back.
private def makeOffers() {
  // Filter out executors under killing
  val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
  val workOffers = activeExecutors.map { case (id, executorData) =>
    new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
  }.toSeq
  launchTasks(scheduler.resourceOffers(workOffers))
}
launchTasks
// CoarseGrainedSchedulerBackend.scala line 224
// Serializes each task description and sends it to its executor as a LaunchTask message.
// A task whose serialized size reaches akkaFrameSize - AkkaUtils.reservedSizeBytes is not
// sent; its task set manager (if any) is aborted with an explanatory message instead.
private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
  for (task <- tasks.flatten) {
    val serializedTask = ser.serialize(task)
    if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
      scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
        try {
          var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
            "spark.akka.frameSize (%d bytes) - reserved (%d bytes). Consider increasing " +
            "spark.akka.frameSize or using broadcast variables for large values."
          msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize,
            AkkaUtils.reservedSizeBytes)
          taskSetMgr.abort(msg)
        } catch {
          // A failure inside the abort callback is only logged.
          case e: Exception => logError("Exception in error callback", e)
        }
      }
    }
    else {
      val executorData = executorDataMap(task.executorId)
      // Reserve the cores this task will use on the chosen executor.
      executorData.freeCores -= scheduler.CPUS_PER_TASK
      // Send the message to the executor, which handles the LaunchTask event.
      executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
    }
  }
}
至此,上述全部的操做都是在Driver端執行。
下面就刺激了。要轉到Worker上的Executor了