Spark Streaming Source Code Walkthrough: Dynamic Job Generation and a Deeper Look

1. Deeper Thoughts on Spark Streaming Job Generation

A scheduled task is really a form of stream processing too: it is time plus a timer (or possibly some other triggering condition). In that sense, all processing can be seen as stream processing.
We take JobGenerator as the entry point: it encapsulates dynamic job generation. Jobs are generated from the DStream dependency graph according to the batchDuration. This is also where Spark's stream processing differs from Storm: Storm computes each record as it streams in, while Spark Streaming does batch processing over time intervals.

/**
 * This class generates jobs from DStreams as well as drives checkpointing and cleaning
 * up DStream metadata.
 */
private[streaming]
class JobGenerator(jobScheduler: JobScheduler) extends Logging {

JobGenerator is only responsible for generating jobs; it does not execute them. Submission of the actual work is triggered through the RDDs.
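
To make the walkthrough concrete, here is a minimal WordCount driver; it is only a sketch, assuming a socket source on localhost:9999 (fed by, say, nc -lk 9999). The batchDuration passed to StreamingContext is what drives job generation, and each output operation such as print() or foreachRDD registers a ForEachDStream with the DStreamGraph.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WordCountSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("WordCountSketch").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))       // batchDuration = 5 seconds
    val lines = ssc.socketTextStream("localhost", 9999)    // a ReceiverInputDStream
    val counts = lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
    counts.print()                                         // output operation -> ForEachDStream
    ssc.start()                                            // starts JobScheduler / JobGenerator
    ssc.awaitTermination()
  }
}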

2. Source Code Analysis of Spark Streaming Job Generation

  1. JobGenerator is responsible for generating jobs
  2. JobScheduler is responsible for scheduling the jobs
  3. RecurringTimer periodically fires the job-generation event (a simplified sketch of how the three collaborate follows this list)
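
Before diving into the real source, the collaboration among the three can be illustrated with a deliberately simplified sketch. None of the classes below are Spark's; this is just the general shape of the pattern: a timer thread posts events into a queue, and a dedicated event-loop thread consumes and handles them.

import java.util.concurrent.LinkedBlockingQueue

object TimerEventLoopSketch extends App {
  case class GenerateJobs(time: Long)
  val queue = new LinkedBlockingQueue[GenerateJobs]()

  // Plays the role of JobGenerator's eventLoop: take events off the queue and handle them.
  val eventLoop = new Thread("event-loop") {
    override def run(): Unit = while (true) println(s"handling ${queue.take()}")
  }
  // Plays the role of RecurringTimer: post a GenerateJobs event once per batchDuration.
  val timer = new Thread("recurring-timer") {
    override def run(): Unit = while (true) {
      queue.put(GenerateJobs(System.currentTimeMillis()))
      Thread.sleep(5000)                                   // a hypothetical 5-second batchDuration
    }
  }
  eventLoop.setDaemon(true); timer.setDaemon(true)
  eventLoop.start(); timer.start()
  Thread.sleep(16000)                                      // observe a few ticks, then let the JVM exit
}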

We start the analysis from JobGenerator.
Note that JobScheduler also has an eventLoop message thread of its own; it is mainly used by JobHandler to notify the JobScheduler of events such as job start and job completion:

private[scheduler] sealed trait JobSchedulerEvent
private[scheduler] case class JobStarted(job: Job, startTime: Long) extends JobSchedulerEvent
private[scheduler] case class JobCompleted(job: Job, completedTime: Long) extends JobSchedulerEvent
private[scheduler] case class ErrorReported(msg: String, e: Throwable) extends JobSchedulerEvent

The message thread in JobGenerator:

/** Start generation of jobs */
  def start(): Unit = synchronized {
    if (eventLoop != null) return // generator has already been started

    // Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock.
    // See SPARK-10125
    checkpointWriter
    // Receive the various job-related events (e.g. GenerateJobs, ClearMetadata, DoCheckpoint)
    eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
      override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)
      
      override protected def onError(e: Throwable): Unit = {
        jobScheduler.reportError("Error in job generator", e)
      }
    }
    eventLoop.start()

    if (ssc.isCheckpointPresent) {
      restart()
    } else {
      startFirstTime()
    }
  }

We focus on the processEvent method:

/** Processes all events */
  private def processEvent(event: JobGeneratorEvent) {
    logDebug("Got event " + event)
    event match {
      // Generate jobs for the given batch time
      case GenerateJobs(time) => generateJobs(time)
      case ClearMetadata(time) => clearMetadata(time)
      case DoCheckpoint(time, clearCheckpointDataLater) =>
        doCheckpoint(time, clearCheckpointDataLater)
      case ClearCheckpointData(time) => clearCheckpointData(time)
    }
  }

There are four kinds of events here; let's follow the generateJobs method:

/** Generate jobs and perform checkpoint for the given `time`.  */
  private def generateJobs(time: Time) {
    // Set the SparkEnv in this thread, so that job generation code can access the environment
    // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
    // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
    SparkEnv.set(ssc.env)
    Try {
      // Allocate the received blocks to this batch time (receiverTracker holds the block metadata)
      jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
      // DStreamGraph generates the jobs; DStreamGraph records the DAG of DStreams
      graph.generateJobs(time) // generate jobs using allocated block
    } match {
      case Success(jobs) =>
        val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
        jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
      case Failure(e) =>
        jobScheduler.reportError("Error generating jobs for time " + time, e)
    }
    eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
  }

Now let's go to DStreamGraph's generateJobs method.
flatMap is used here to drop the None jobs and flatten the return value:

def generateJobs(time: Time): Seq[Job] = {
    logDebug("Generating jobs for time " + time)
    val jobs = this.synchronized {
      // outputStreams holds the output streams registered with DStreamGraph each time we call foreachRDD
      // private val outputStreams = new ArrayBuffer[DStream[_]]()
      outputStreams.flatMap { outputStream =>
        val jobOption = outputStream.generateJob(time)
        jobOption.foreach(_.setCallSite(outputStream.creationSite))
        jobOption
      }
    }
    logDebug("Generated " + jobs.length + " jobs for time " + time)
    jobs
  }
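
As a tiny illustration of that point (plain Scala, not Spark code): flatMap over a sequence of Options drops the Nones and unwraps the Somes, which is exactly how the None jobs returned by generateJob(time) disappear.

val maybeJobs: Seq[Option[String]] = Seq(Some("job-a"), None, Some("job-b"))
val jobs: Seq[String] = maybeJobs.flatMap(opt => opt)     // Seq("job-a", "job-b")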

Let's keep tracing the outputStream.generateJob(time) method (note that this outputStream is an instance of ForEachDStream):

/**
   * Generate a SparkStreaming job for the given time. This is an internal method that
   * should not be called directly. This default implementation creates a job
   * that materializes the corresponding RDD. Subclasses of DStream may override this
   * to generate their own jobs.
   */
  private[streaming] def generateJob(time: Time): Option[Job] = {
    // This method generates the RDD for this DStream
    getOrCompute(time) match {
      case Some(rdd) => {
        val jobFunc = () => {
          val emptyFunc = { (iterator: Iterator[T]) => {} }
          // Submission is based on the RDD: the actual submission to the DAG scheduler is wrapped inside a function, so it does not run immediately
          context.sparkContext.runJob(rdd, emptyFunc)
        }
        Some(new Job(time, jobFunc))
      }
      case None => None
    }
  }
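
The key point in the comment above is that runJob is only captured inside jobFunc, a () => Unit; nothing is submitted when the Job object is created. A trivial illustration of that deferral (plain Scala, the names are only for the example):

val jobFunc: () => Unit = () => println("submitting the job now")  // not executed at this point
// ... later, JobHandler eventually calls job.run(), which invokes the wrapped function:
jobFunc()                                                           // only now does it actually run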

Next we look at DStream's getOrCompute method to trace how the RDD is generated:

/**
   * Get the RDD corresponding to the given time; either retrieve it from cache
   * or compute-and-cache it.
   */
  private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
    // If RDD was already generated, then retrieve it from HashMap,
    // or else compute the RDD
    // Note: DStream is an abstract class, so each DStream implementation has its own generatedRDDs map.
    // The DStream transformations we write ultimately act on the initial RDD; each DStream holds its own
    // RDD instances, and at computation time only the last RDD is needed.
    generatedRDDs.get(time).orElse {
      // Compute the RDD if time is valid (e.g. correct time in a sliding window)
      // of RDD generation, else generate nothing.
      if (isTimeValid(time)) {

        val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
          // Disable checks for existing output directories in jobs launched by the streaming
          // scheduler, since we may need to write output to an existing directory during checkpoint
          // recovery; see SPARK-4835 for more details. We need to have this call here because
          // compute() might cause Spark jobs to be launched.
          PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
            compute(time)
          }
        }

        rddOption.foreach { case newRDD =>
          // Register the generated RDD for caching and checkpointing
          if (storageLevel != StorageLevel.NONE) {
            newRDD.persist(storageLevel)
            logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
          }
          if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
            newRDD.checkpoint()
            logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")
          }
          // Each DStream implementation holds its own RDDs!
          generatedRDDs.put(time, newRDD)
        }
        rddOption
      } else {
        None
      }
    }
  }

From the code above we can see that everything ultimately comes down to the key compute(time) method. Since this method is abstract, we need to look for implementations in the subclasses. Taking a WordCount program as an example, the last DStream in the chain is ForEachDStream:

// ForEachDStream, line 46
override def generateJob(time: Time): Option[Job] = {
    // The parent here is the ShuffledDStream
    parent.getOrCompute(time) match {
      case Some(rdd) =>
        val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
          foreachFunc(rdd, time)
        }
        Some(new Job(time, jobFunc))
      case None => None
    }
  }
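
The foreachFunc captured here is whatever function the user passed to foreachRDD (print() is itself implemented on top of foreachRDD with such a function). As a sketch of a user-level output operation, reusing the counts DStream from the WordCount sketch above:

counts.foreachRDD { (rdd, time) =>
  // This (RDD, Time) => Unit function is what ends up stored as foreachFunc in the ForEachDStream.
  println(s"Batch $time produced ${rdd.count()} distinct words")
}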

Each DStream first computes its parent DStream, which is how the RDD chain keeps getting built up. Eventually we reach the ReceiverInputDStream class:

/**
   * Generates RDDs with blocks received by the receiver of this stream. */
  override def compute(validTime: Time): Option[RDD[T]] = {
    val blockRDD = {

      if (validTime < graph.startTime) {
        // If this is called for any time before the start time of the context,
        // then this returns an empty RDD. This may happen when recovering from a
        // driver failure without any write ahead log to recover pre-failure data.
        new BlockRDD[T](ssc.sc, Array.empty)
      } else {
        // Otherwise, ask the tracker for all the blocks that have been allocated to this stream
        // for this batch
        val receiverTracker = ssc.scheduler.receiverTracker
        val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)

        // Register the input blocks information into InputInfoTracker
        val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum)
        ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

        // Create the BlockRDD
        createBlockRDD(validTime, blockInfos)
      }
    }
    Some(blockRDD)
  }

The code above is where the very first RDD of a batch is generated. Every transformation we call applies a function to an RDD, producing a new RDD; each DStream holds its own RDDs, and when we finally invoke an action, it operates on the RDD of the last DStream!
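
The chain described above can be modelled with a deliberately tiny toy; none of these classes are Spark's, they only illustrate the recursion: each node memoizes its per-batch result and derives it from its parent's result for the same time, just as getOrCompute first checks generatedRDDs and then lets compute(time) ask the parent.

import scala.collection.mutable

abstract class ToyStream[T] {
  private val generated = mutable.Map[Long, T]()            // plays the role of generatedRDDs
  protected def compute(time: Long): Option[T]
  final def getOrCompute(time: Long): Option[T] =
    generated.get(time).orElse {
      val value = compute(time)
      value.foreach(generated.put(time, _))                 // cache it, like generatedRDDs.put(time, newRDD)
      value
    }
}

// Plays the role of ReceiverInputDStream: the source node creates the initial data for the batch.
class ToySource extends ToyStream[Seq[String]] {
  protected def compute(time: Long) = Some(Seq(s"record@$time", s"record@$time"))
}

// Plays the role of a derived DStream such as MappedDStream: it asks its parent first.
class ToyMapped(parent: ToyStream[Seq[String]], f: String => String) extends ToyStream[Seq[String]] {
  protected def compute(time: Long) = parent.getOrCompute(time).map(_.map(f))
}

object ToyChainDemo extends App {
  val chain = new ToyMapped(new ToySource, _.toUpperCase)
  println(chain.getOrCompute(5000L))                        // Some(List(RECORD@5000, RECORD@5000))
}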

Now let's return to event handling and ask: where does the job-generation event come from? Jobs are generated continuously, so there must be a timer that keeps posting messages to the eventLoop (JobGenerator, line 58):

private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
    longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")

Here RecurringTimer receives a function of type callback: (Long) => Unit.

RecurringTimer contains a thread that keeps posting job messages to the queue:

private[streaming]
class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name: String)
  extends Logging {

  private val thread = new Thread("RecurringTimer - " + name) {
    setDaemon(true)
    override def run() { loop }
  }

It repeatedly triggers the event and invokes the callback we passed in, longTime => eventLoop.post(GenerateJobs(new Time(longTime))), i.e. it keeps posting messages to the queue (RecurringTimer, line 103):

/**
   * Repeatedly call the callback every interval.
   */
  private def loop() {
    try {
      while (!stopped) {
        triggerActionForNextInterval()
      }
      triggerActionForNextInterval()
    } catch {
      case e: InterruptedException =>
    }
  }
}

Let's see how triggerActionForNextInterval generates the job message:

private def triggerActionForNextInterval(): Unit = {
    clock.waitTillTime(nextTime)
    callback(nextTime)
    prevTime = nextTime
    nextTime += period
    logDebug("Callback for " + name + " called at time " + prevTime)
  }
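
A quick worked example of that arithmetic, assuming a batchDuration of 5000 ms and a hypothetical aligned start time of 1,000,000 ms: the loop blocks until nextTime, fires the callback with that batch time, then advances by one period, so GenerateJobs is posted for 1000000, 1005000, 1010000, and so on. The simulation below is illustration only, with no real waiting and no Spark classes:

object TimerArithmeticSketch extends App {
  val period   = 5000L                                     // hypothetical batchDuration in ms
  var nextTime = 1000000L                                   // hypothetical aligned start time
  for (_ <- 1 to 3) {
    // clock.waitTillTime(nextTime) would block here until the wall clock reaches nextTime
    println(s"post GenerateJobs(Time($nextTime))")          // callback(nextTime)
    nextTime += period                                      // 1000000 -> 1005000 -> 1010000
  }
}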

Summary: JobGenerator receives the events sent by RecurringTimer, such as the job-generation event, and handles each kind of event accordingly. This approach allows code reuse, with different modules responsible for different functions: it gives both decoupling and modularity.

Finally, let's focus on one more point: how are the jobs submitted to the cluster?
We go back to JobGenerator's generateJobs method (line 241):

/** Generate jobs and perform checkpoint for the given `time`.  */
  private def generateJobs(time: Time) {
    SparkEnv.set(ssc.env)
    Try {
      jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
      // This is the job generation process we analyzed above
      graph.generateJobs(time) // generate jobs using allocated block
    } match {
      case Success(jobs) =>
        // What happens after the jobs are generated?
        val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
        jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
      case Failure(e) =>
        jobScheduler.reportError("Error generating jobs for time " + time, e)
    }
    eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
  }

Here we see that once JobGenerator has generated the jobs, it hands them to jobScheduler to handle:

def submitJobSet(jobSet: JobSet) {
    if (jobSet.jobs.isEmpty) {
      logInfo("No jobs added for time " + jobSet.time)
    } else {
      listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
      jobSets.put(jobSet.time, jobSet)
      // foreach is passed a function that handles each job: job => jobExecutor.execute(new JobHandler(job))
      // Note that a thread pool (jobExecutor) is used to execute the jobs
      jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
      logInfo("Added jobs for time " + jobSet.time)
    }
  }
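
As far as I recall, jobExecutor is a fixed-size daemon thread pool whose size comes from the spark.streaming.concurrentJobs setting (default 1), so by default the output operations of a batch run one after another. The snippet below is only a plain-executor illustration of that pattern, not Spark's code:

import java.util.concurrent.Executors

object ThreadPoolSketch extends App {
  val pool = Executors.newFixedThreadPool(1)               // one job at a time, like the default jobExecutor
  (1 to 3).foreach { i =>
    pool.execute(new Runnable {                            // much like jobExecutor.execute(new JobHandler(job))
      override def run(): Unit = println(s"running job $i on ${Thread.currentThread().getName}")
    })
  }
  pool.shutdown()                                          // let the queued jobs finish, then stop the pool
}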

Let's see how JobHandler handles the job we passed in:

private class JobHandler(job: Job) extends Runnable with Logging {
    import JobScheduler._

    def run() {
      try {
        val formattedTime = UIUtils.formatBatchTime(
          job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
        val batchUrl = s"/streaming/batch/?id=${job.time.milliseconds}"
        val batchLinkText = s"[output operation ${job.outputOpId}, batch time ${formattedTime}]"

        ssc.sc.setJobDescription(
          s"""Streaming job from <a href="$batchUrl">$batchLinkText</a>""")
        ssc.sc.setLocalProperty(BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString)
        ssc.sc.setLocalProperty(OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)
        var _eventLoop = eventLoop
        if (_eventLoop != null) {
          // Notify the JobScheduler that the job has started
          _eventLoop.post(JobStarted(job, clock.getTimeMillis()))
          PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
            // Finally job.run() is invoked to execute the job; think about what job.run does
            job.run()
          }
          _eventLoop = eventLoop
          if (_eventLoop != null) {
          // Notify the JobScheduler that the job has completed
            _eventLoop.post(JobCompleted(job, clock.getTimeMillis()))
          }
        } else {
          // JobScheduler has been stopped.
        }
      } finally {
        ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, null)
        ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, null)
      }
    }
  }

Let's trace the job.run method:

def run() {
    _result = Try(func())
  }
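
Because run() wraps the call in scala.util.Try, an exception thrown by the job does not escape the JobHandler thread; it is captured as a Failure in _result. A tiny illustration:

import scala.util.Try

val ok     = Try { 21 * 2 }                               // Success(42)
val failed = Try { throw new RuntimeException("boom") }   // Failure(java.lang.RuntimeException: boom)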

A func is executed here. Where does this function come from, and what does it do?
As we analyzed earlier, the job generated in DStream only wraps a function and does not execute it. Let's return once more to DStream's generateJob:

/**
   * Generate a SparkStreaming job for the given time. This is an internal method that
   * should not be called directly. This default implementation creates a job
   * that materializes the corresponding RDD. Subclasses of DStream may override this
   * to generate their own jobs.
   */
  private[streaming] def generateJob(time: Time): Option[Job] = {
    getOrCompute(time) match {
      case Some(rdd) => {
        val jobFunc = () => {
          val emptyFunc = { (iterator: Iterator[T]) => {} }
          context.sparkContext.runJob(rdd, emptyFunc)
        }
        Some(new Job(time, jobFunc))
      }
      case None => None
    }
  }

That is, the func() inside the job we generated is the jobFunc shown above:
      val jobFunc = () => {
          val emptyFunc = { (iterator: Iterator[T]) => {} }
          context.sparkContext.runJob(rdd, emptyFunc)
        }
(for ForEachDStream the overridden jobFunc invokes foreachFunc(rdd, time) instead, and the RDD actions inside it submit jobs the same way). Either way, the function ultimately acts on the RDD and submits a job to the DAG scheduler through SparkContext.runJob:

/**
   * Run a job on all partitions in an RDD and return the results in an array.
   */
  def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
    runJob(rdd, func, 0 until rdd.partitions.length)
  }
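
As a toy usage of this runJob overload (assuming an already created SparkContext named sc), the supplied function runs once per partition and the results come back as an array:

val rdd = sc.parallelize(1 to 10, numSlices = 4)
val perPartitionCounts: Array[Int] = sc.runJob(rdd, (iter: Iterator[Int]) => iter.size)
println(perPartitionCounts.mkString(", "))                // e.g. 2, 3, 2, 3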

We will not trace any further here; at this point, how jobs are generated and how they are submitted have both been fully analyzed.
Finally, a simple diagram of dynamic job generation is attached:
