/** * This class generates jobs from DStreams as well as drives checkpointing and cleaning * up DStream metadata. */ private[streaming] class JobGenerator(jobScheduler: JobScheduler) extends Logging {
/** Base type of the events exchanged with the job scheduler's event loop. */
private[scheduler] sealed trait JobSchedulerEvent

/** Signals that `job` has started; `startTime` is the timestamp supplied by the poster. */
private[scheduler] case class JobStarted(job: Job, startTime: Long)
  extends JobSchedulerEvent

/** Signals that `job` has finished; `completedTime` is the timestamp supplied by the poster. */
private[scheduler] case class JobCompleted(job: Job, completedTime: Long)
  extends JobSchedulerEvent

/** Carries an error message together with the throwable that caused it. */
private[scheduler] case class ErrorReported(msg: String, e: Throwable)
  extends JobSchedulerEvent
/**
 * Start generation of jobs.
 *
 * Does nothing when the event loop already exists, i.e. the generator has
 * already been started. Otherwise it creates and starts the event loop and then
 * either restarts from a checkpoint or starts for the first time.
 */
def start(): Unit = synchronized {
  if (eventLoop == null) {
    // Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock.
    // See SPARK-10125
    checkpointWriter

    // The event loop receives the generator's events
    // (e.g. job generation, metadata cleanup, DoCheckpoint).
    eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
      override protected def onReceive(event: JobGeneratorEvent): Unit = {
        processEvent(event)
      }

      override protected def onError(e: Throwable): Unit = {
        jobScheduler.reportError("Error in job generator", e)
      }
    }
    eventLoop.start()

    if (ssc.isCheckpointPresent) restart() else startFirstTime()
  }
}
/** Processes all events: dispatches each event to the handler for its type. */
private def processEvent(event: JobGeneratorEvent): Unit = {
  logDebug("Got event " + event)
  event match {
    case GenerateJobs(time) =>
      // Generate the jobs for the given point in time.
      generateJobs(time)
    case ClearMetadata(time) =>
      clearMetadata(time)
    case DoCheckpoint(time, clearCheckpointDataLater) =>
      doCheckpoint(time, clearCheckpointDataLater)
    case ClearCheckpointData(time) =>
      clearCheckpointData(time)
  }
}
這裏有四個事件,咱們接下來跟蹤 GenerateJobs 事件對應的 generateJobs 方法:
/**
 * Generate jobs and perform checkpoint for the given `time`.
 *
 * A failure during generation is reported to the job scheduler; a
 * `DoCheckpoint` event is posted in either case.
 */
private def generateJobs(time: Time): Unit = {
  // Set the SparkEnv in this thread, so that job generation code can access the environment
  // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
  // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
  SparkEnv.set(ssc.env)

  val generated = Try {
    // Allocate received blocks to this batch (the receiver tracker holds the block metadata).
    jobScheduler.receiverTracker.allocateBlocksToBatch(time)
    // Let the DStreamGraph, which records the DStream DAG, generate jobs from the allocated blocks.
    graph.generateJobs(time)
  }

  generated match {
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
  }

  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}
/**
 * Asks every registered output stream for its job at `time` and returns the
 * jobs that were produced, each tagged with its stream's creation call site.
 */
def generateJobs(time: Time): Seq[Job] = {
  logDebug("Generating jobs for time " + time)
  val jobs = this.synchronized {
    // outputStreams holds the output DStreams registered with this graph
    // (per the original note, one is registered for each foreachRDD call).
    outputStreams.flatMap { stream =>
      stream.generateJob(time).map { job =>
        job.setCallSite(stream.creationSite)
        job
      }
    }
  }
  logDebug("Generated " + jobs.length + " jobs for time " + time)
  jobs
}
繼續追蹤 outputStream.generateJob(time) 這個方法(注意這個 outputStream 就是 ForEachDStream 的一個實例):
/**
 * Generate a SparkStreaming job for the given time. This is an internal method that
 * should not be called directly. This default implementation creates a job
 * that materializes the corresponding RDD. Subclasses of DStream may override this
 * to generate their own jobs.
 *
 * @param time batch time to generate the job for
 * @return the job, or `None` when no RDD is available for `time`
 */
private[streaming] def generateJob(time: Time): Option[Job] = {
  // getOrCompute turns this DStream into the RDD for the batch.
  getOrCompute(time).map { rdd =>
    // The submission is wrapped in a function, so nothing runs until the Job itself is run.
    val materialize = () => {
      val noOp = { (iterator: Iterator[T]) => {} }
      context.sparkContext.runJob(rdd, noOp)
    }
    new Job(time, materialize)
  }
}
/**
 * Get the RDD corresponding to the given time; either retrieve it from cache
 * or compute-and-cache it.
 *
 * @param time batch time whose RDD is requested
 * @return the cached or newly computed RDD, or `None` when `time` is not valid
 *         for this stream or `compute` produced nothing
 */
private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
  // Each DStream keeps its own generatedRDDs map; an RDD that was already
  // generated for this time is returned from it, otherwise it is computed below.
  generatedRDDs.get(time).orElse {
    if (!isTimeValid(time)) {
      // Not a valid time for RDD generation (e.g. off the sliding-window grid).
      None
    } else {
      val computed = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
        // Disable checks for existing output directories in jobs launched by the streaming
        // scheduler, since we may need to write output to an existing directory during checkpoint
        // recovery; see SPARK-4835 for more details. We need to have this call here because
        // compute() might cause Spark jobs to be launched.
        PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
          compute(time)
        }
      }

      computed.foreach { newRDD =>
        // Register the generated RDD for caching and checkpointing
        if (storageLevel != StorageLevel.NONE) {
          newRDD.persist(storageLevel)
          logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
        }
        if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
          newRDD.checkpoint()
          logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")
        }
        // Remember the RDD in this stream's own map.
        generatedRDDs.put(time, newRDD)
      }

      computed
    }
  }
}
從上面的代碼中咱們看到最終落在了 compute(time) 這個關鍵的方法上面。因爲這個方法是個抽象方法,咱們須要從子類中找實現。以 WordCount 程序爲例,咱們最後一個 DStream 是 ForEachDStream:
// ForEachDStream, line 46
/**
 * Builds the job for `time` from the parent's RDD: the job applies
 * `foreachFunc` to that RDD. Returns `None` when the parent has no RDD
 * for `time`.
 */
override def generateJob(time: Time): Option[Job] = {
  // Per the original walkthrough, the parent here is the ShuffledDStream.
  parent.getOrCompute(time).map { rdd =>
    val applyForeach = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
      foreachFunc(rdd, time)
    }
    new Job(time, applyForeach)
  }
}
每一個Dstream都是先計算ParentDstream也就是不斷生成RDD鏈條的過程,最終咱們到ReceiverInputDStream 這個類
/**
 * Generates RDDs with blocks received by the receiver of this stream.
 *
 * @param validTime batch time to build the RDD for
 * @return always `Some`; the RDD is empty for times before the graph's start time
 */
override def compute(validTime: Time): Option[RDD[T]] = {
  val blockRDD =
    if (validTime < graph.startTime) {
      // If this is called for any time before the start time of the context,
      // then this returns an empty RDD. This may happen when recovering from a
      // driver failure without any write ahead log to recover pre-failure data.
      new BlockRDD[T](ssc.sc, Array.empty)
    } else {
      // Otherwise, ask the tracker for all the blocks that have been allocated to this stream
      // for this batch
      val tracker = ssc.scheduler.receiverTracker
      val allocated = tracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)

      // Register the input blocks information into InputInfoTracker
      val recordCount = allocated.flatMap(_.numRecords).sum
      ssc.scheduler.inputInfoTracker.reportInfo(validTime, StreamInputInfo(id, recordCount))

      // Create the BlockRDD
      createBlockRDD(validTime, allocated)
    }
  Some(blockRDD)
}
/**
 * Recurring timer named "JobGenerator" whose period is the batch duration in
 * milliseconds; its callback posts a `GenerateJobs` event for the time it is given.
 */
private val timer = new RecurringTimer(
  clock,
  ssc.graph.batchDuration.milliseconds,
  longTime => eventLoop.post(GenerateJobs(new Time(longTime))),
  "JobGenerator")
這裏RecurringTimer 接收了一個 callback: (Long) => Unit類型的函數
這裏RecurringTimer 中有一個線程不斷地往隊列中post任務消息:
private[streaming] class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name: String) extends Logging { private val thread = new Thread("RecurringTimer - " + name) { setDaemon(true) override def run() { loop } }
重複地觸發事件而且回調咱們傳入的函數:longTime => eventLoop.post(GenerateJobs(new Time(longTime))),即不斷地往隊列中 post 消息(RecurringTimer 的103行):
/**
 * Repeatedly call the callback every interval.
 *
 * Keeps triggering until `stopped` is observed, then triggers once more.
 * An `InterruptedException` ends the loop without being rethrown.
 */
private def loop(): Unit = {
  try {
    while (!stopped) {
      triggerActionForNextInterval()
    }
    // One further trigger after the stop flag has been seen.
    triggerActionForNextInterval()
  } catch {
    case _: InterruptedException => // deliberately ignored
  }
}
}
咱們看看triggerActionForNextInterval 如何生成任務消息:
/**
 * Waits until `nextTime`, invokes the callback with it, then advances
 * `prevTime` and `nextTime` by one period.
 */
private def triggerActionForNextInterval(): Unit = {
  // Block on the clock until the next scheduled time.
  clock.waitTillTime(nextTime)
  callback(nextTime)

  // Slide the window: the time just fired becomes the previous time.
  prevTime = nextTime
  nextTime = nextTime + period

  logDebug("Callback for " + name + " called at time " + prevTime)
}
/** Generate jobs and perform checkpoint for the given `time`. */ SparkEnv.set(ssc.env) Try { jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch //這裏就是咱們前面分析的任務怎麼生成的 graph.generateJobs(time) // generate jobs using allocated block } match { case Success(jobs) => //任務生成後怎麼處理? val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time) jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos)) case Failure(e) => jobScheduler.reportError("Error generating jobs for time " + time, e) } eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false)) }
/**
 * Submits every job of `jobSet` to the job executor.
 *
 * An empty job set is only logged. Otherwise a batch-submitted event is posted
 * to the listener bus, the set is recorded under its time, and each job is
 * handed to `jobExecutor` wrapped in a `JobHandler`.
 */
def submitJobSet(jobSet: JobSet): Unit = {
  if (jobSet.jobs.nonEmpty) {
    listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
    jobSets.put(jobSet.time, jobSet)
    // Each job is passed to the executor (described as a thread pool in the
    // original notes) as a JobHandler runnable.
    jobSet.jobs.foreach { job =>
      jobExecutor.execute(new JobHandler(job))
    }
    logInfo("Added jobs for time " + jobSet.time)
  } else {
    logInfo("No jobs added for time " + jobSet.time)
  }
}
/**
 * Runnable that executes a single streaming `job`.
 *
 * It sets the job description and the batch-time / output-op-id local
 * properties on the SparkContext, posts `JobStarted` and `JobCompleted` to the
 * event loop around the run, and clears the two local properties afterwards.
 */
private class JobHandler(job: Job) extends Runnable with Logging {
  import JobScheduler._

  def run(): Unit = {
    try {
      val formattedTime = UIUtils.formatBatchTime(
        job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
      val batchUrl = s"/streaming/batch/?id=${job.time.milliseconds}"
      val batchLinkText = s"[output operation ${job.outputOpId}, batch time ${formattedTime}]"

      ssc.sc.setJobDescription(
        s"""Streaming job from <a href="$batchUrl">$batchLinkText</a>""")
      ssc.sc.setLocalProperty(BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString)
      ssc.sc.setLocalProperty(OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)

      // Snapshot the event loop; a null value means the JobScheduler has been stopped.
      val loopBeforeRun = eventLoop
      if (loopBeforeRun != null) {
        // Notify the JobScheduler that the job has started.
        loopBeforeRun.post(JobStarted(job, clock.getTimeMillis()))
        PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
          // The actual work happens in job.run().
          job.run()
        }
        // Re-read the field: it may have changed while the job was running.
        val loopAfterRun = eventLoop
        if (loopAfterRun != null) {
          // Notify the JobScheduler that the job has completed.
          loopAfterRun.post(JobCompleted(job, clock.getTimeMillis()))
        }
      }
    } finally {
      ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, null)
      ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, null)
    }
  }
}
/** Invokes `func` and stores its outcome, success or failure, in `_result`. */
def run(): Unit = {
  _result = Try(func())
}
/**
 * Generate a SparkStreaming job for the given time. This is an internal method that
 * should not be called directly. This default implementation creates a job
 * that materializes the corresponding RDD. Subclasses of DStream may override this
 * to generate their own jobs.
 *
 * @param time batch time to generate the job for
 * @return the job, or `None` when no RDD is available for `time`
 */
private[streaming] def generateJob(time: Time): Option[Job] = {
  for (rdd <- getOrCompute(time)) yield {
    // This function is what the resulting Job invokes when it is run.
    val runOnAllPartitions = () => {
      val discard = { (iterator: Iterator[T]) => {} }
      context.sparkContext.runJob(rdd, discard)
    }
    new Job(time, runOnAllPartitions)
  }
}
即咱們生成的 JOB 中的那個 func() 就是:
val jobFunc = () => {
val emptyFunc = { (iterator: Iterator[T]) => {} }
context.sparkContext.runJob(rdd, emptyFunc)
/**
 * Run a job on all partitions in an RDD and return the results in an array.
 *
 * @param rdd  the RDD to run the job on
 * @param func function applied to the iterator of each partition
 * @return one result per partition
 */
def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
  // Every partition index of the RDD, from 0 up to (excluding) the partition count.
  val everyPartition = 0 until rdd.partitions.length
  runJob(rdd, func, everyPartition)
}