SparkStreaming Execution Flow Analysis and Checkpoint Operations

This article uses the source code to trace how a Spark Streaming program runs, from job generation to job completion, together with the checkpoint operations performed along the way.

Note: the listings below show only the code relevant to this analysis; the rest is omitted.

##1 Execution Flow

Application entry point:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sparkConf = new SparkConf().setAppName("SparkStreaming")
val sc = new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, Seconds(batchDuration.toLong))
// At least one output operation must be registered before start(),
// e.g. (hypothetical input source):
ssc.socketTextStream("localhost", 9999).print()
ssc.start()
ssc.awaitTermination()

Once ssc.start() is called, the program truly begins to run.

Step 1: StreamingContext.start() performs the following main work:

  • Calls JobScheduler.start()
  • Posts a StreamingListenerStreamingStarted message
StreamingContext.start():

def start(): Unit = synchronized {
    state match {
      case INITIALIZED =>
        StreamingContext.ACTIVATION_LOCK.synchronized {
          StreamingContext.assertNoOtherContextIsActive()
          try {
            ...
            scheduler.start()
            state = StreamingContextState.ACTIVE
            scheduler.listenerBus.post(
              StreamingListenerStreamingStarted(System.currentTimeMillis()))
          } catch {
            ...
          }
          StreamingContext.setActiveContext(this)
        }
        ...
      case ACTIVE =>
        logWarning("StreamingContext has already been started")
      case STOPPED =>
        throw new IllegalStateException("StreamingContext has already been stopped")
    }
  }
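Note the ACTIVATION_LOCK and assertNoOtherContextIsActive(): only one StreamingContext may be active per JVM. If your application may already hold an active context, the public helper StreamingContext.getActiveOrCreate can be used instead of constructing one unconditionally; a minimal sketch (the creation function body is hypothetical):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("SparkStreaming")
  val ssc = new StreamingContext(conf, Seconds(10))
  ssc.socketTextStream("localhost", 9999).print() // hypothetical input and output
  ssc
}

// Returns the currently active context if one exists, otherwise creates a new one
val ssc = StreamingContext.getActiveOrCreate(createContext _)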

Step 2: JobScheduler.start() performs the following main operations:

  • Creates an EventLoop for handling incoming JobSchedulerEvents; processEvent holds the actual handling logic
  • Calls jobGenerator.start()
JobScheduler.start():

def start(): Unit = synchronized {
    if (eventLoop != null) return // scheduler has already been started

    logDebug("Starting JobScheduler")
    //Create an event listener (EventLoop) and start it
    eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
      override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

      override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
    }
    eventLoop.start()
    ...
    //Start the JobGenerator
    jobGenerator.start()
    ...
  }

Step 3: JobGenerator.start() performs the following main operations:

  • Creates an EventLoop[JobGeneratorEvent] for handling JobGeneratorEvents
  • Starts the job generation work:
    • A timer periodically runs eventLoop.post(GenerateJobs(new Time(longTime)))
    • When the EventLoop created in JobGenerator.start() receives a GenerateJobs event, it runs generateJobs(time)
    • generateJobs() builds a JobSet, submits it via jobScheduler.submitJobSet(), and then posts a DoCheckpoint event to trigger a checkpoint
JobGenerator.start()

def start(): Unit = synchronized {
    if (eventLoop != null) return // generator has already been started
    //Touch the lazy checkpointWriter so it is initialized; it persists checkpoint data
    checkpointWriter
    //Create an event listener (EventLoop) that handles JobGeneratorEvents
    eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
      override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

      override protected def onError(e: Throwable): Unit = {
        jobScheduler.reportError("Error in job generator", e)
      }
    }
    eventLoop.start()

    if (ssc.isCheckpointPresent) {
      //Recover from an existing checkpoint
      restart()
    } else {
      //First start
      startFirstTime()
    }
}
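The restart() branch is what makes driver fault tolerance work: whether ssc.isCheckpointPresent is true depends on how the context was built. The standard pattern is StreamingContext.getOrCreate, which rebuilds the context from the checkpoint directory if one exists, so that JobGenerator takes the restart() path after a driver restart. A sketch, with the directory path and setup function assumed:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "hdfs:///tmp/streaming-checkpoint" // hypothetical path

def functionToCreateContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("SparkStreaming")
  val ssc = new StreamingContext(conf, Seconds(10))
  ssc.checkpoint(checkpointDir) // enable checkpointing on first start
  ssc.socketTextStream("localhost", 9999).print() // hypothetical job
  ssc
}

// Recreated from the checkpoint if present; built fresh otherwise
val ssc = StreamingContext.getOrCreate(checkpointDir, functionToCreateContext _)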

On first start, startFirstTime() is called. Its main work is to start the already-initialized RecurringTimer, which periodically posts GenerateJobs events at an interval of ssc.graph.batchDuration.milliseconds, i.e. the batch interval you configured. When the EventLoop created in JobGenerator.start() receives a GenerateJobs event, it runs processEvent(), which calls generateJobs(time) to do the actual job generation (see the timer-alignment sketch after the code below).

private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
    longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")

  private def startFirstTime() {
    val startTime = new Time(timer.getStartTime())
    graph.start(startTime - graph.batchDuration)
    timer.start(startTime.milliseconds)
    logInfo("Started JobGenerator at " + startTime)
  }

  private def processEvent(event: JobGeneratorEvent) {
    logDebug("Got event " + event)
    event match {
      case GenerateJobs(time) => generateJobs(time)
      case ClearMetadata(time) => clearMetadata(time)
      case DoCheckpoint(time, clearCheckpointDataLater) =>
        doCheckpoint(time, clearCheckpointDataLater)
      case ClearCheckpointData(time) => clearCheckpointData(time)
    }
  }
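The timer aligns its first tick to the next whole multiple of the period, so batch times always fall on multiples of the batch interval. A standalone sketch of getStartTime()'s arithmetic (the clock value is made up for illustration):

object TimerAlignmentDemo {
  // Same formula as RecurringTimer.getStartTime()
  def getStartTime(nowMillis: Long, periodMillis: Long): Long =
    (math.floor(nowMillis.toDouble / periodMillis) + 1).toLong * periodMillis

  def main(args: Array[String]): Unit = {
    val period = 10000L         // a 10-second batch interval
    val now    = 1700000003456L // pretend "current" clock time
    // floor(1700000003456 / 10000) + 1 = 170000001, times 10000:
    println(getStartTime(now, period)) // 1700000010000
  }
}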

The main work of generateJobs:

  • Builds a JobSet and submits it via jobScheduler.submitJobSet()
  • Posts a DoCheckpoint event to trigger a checkpoint
private def generateJobs(time: Time) {
    ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")
    Try {
      jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
      graph.generateJobs(time) // generate jobs using allocated block
    } match {
      case Success(jobs) =>
        val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
        jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
      case Failure(e) =>
        jobScheduler.reportError("Error generating jobs for time " + time, e)
        PythonDStream.stopStreamingContextIfPythonProcessIsDead(e)
    }
    eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
  }
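For reference, graph.generateJobs(time) simply asks every registered output stream to generate a job for this batch; condensed from DStreamGraph (logging and call-site handling elided):

// DStreamGraph.generateJobs, condensed from the Spark source
def generateJobs(time: Time): Seq[Job] = {
  val jobs = this.synchronized {
    outputStreams.flatMap { outputStream =>
      // Each output operation (print, foreachRDD, ...) contributes at most one Job per batch
      outputStream.generateJob(time)
    }
  }
  jobs
}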

The first operation: jobScheduler.submitJobSet() iterates over the jobs in the JobSet, wraps each one in a JobHandler, and hands the JobHandler to the jobExecutor for execution. JobHandler implements Runnable, and its run() does three main things:

  • Posts a JobStarted event
  • Runs Job.run()
  • Posts a JobCompleted event
def submitJobSet(jobSet: JobSet) {
    if (jobSet.jobs.isEmpty) {
      logInfo("No jobs added for time " + jobSet.time)
    } else {
      listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
      jobSets.put(jobSet.time, jobSet)
      jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
      logInfo("Added jobs for time " + jobSet.time)
    }
}
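A note on concurrency here: jobExecutor is a fixed-size daemon thread pool whose size is read from the spark.streaming.concurrentJobs setting (default 1), so by default the jobs of different batches execute one at a time. Condensed from JobScheduler:

// JobScheduler (condensed): the pool that runs JobHandlers
private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)
private val jobExecutor =
  ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor")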

private class JobHandler(job: Job) extends Runnable with Logging {
    import JobScheduler._
    def run() {
      // Remember the caller's local properties so they can be restored afterwards
      val oldProps = ssc.sparkContext.getLocalProperties
      try {
        var _eventLoop = eventLoop
        if (_eventLoop != null) {
          _eventLoop.post(JobStarted(job, clock.getTimeMillis())) //post the JobStarted event
          SparkHadoopWriterUtils.disableOutputSpecValidation.withValue(true) {
            job.run()
          }
          _eventLoop = eventLoop
          if (_eventLoop != null) {
            _eventLoop.post(JobCompleted(job, clock.getTimeMillis())) //post the JobCompleted event
          }
        } else {
          // JobScheduler has been stopped, so do nothing
        }
      } finally {
        ssc.sparkContext.setLocalProperties(oldProps)
      }
    }
  }

The second operation: posting the DoCheckpoint event

Note that the EventLoop created in JobScheduler.start() only handles three event types: JobStarted, JobCompleted, and ErrorReported. The DoCheckpoint event posted at the end of generateJobs() is instead handled by the EventLoop created in JobGenerator.start(), whose processEvent() dispatches it to doCheckpoint():

//JobGenerator.processEvent()
  private def processEvent(event: JobGeneratorEvent) {
    logDebug("Got event " + event)
    event match {
      ...
      case DoCheckpoint(time, clearCheckpointDataLater) =>
        doCheckpoint(time, clearCheckpointDataLater)
      ...
    }
  }

doCheckpoint() calls graph.updateCheckpointData to update the checkpoint data, and checkpointWriter.write to persist it:

private def doCheckpoint(time: Time, clearCheckpointDataLater: Boolean) {
    if (shouldCheckpoint && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {
      logInfo("Checkpointing graph for time " + time)
      //Update the checkpoint data held by the DStream graph
      ssc.graph.updateCheckpointData(time)
      //Persist the checkpoint to the file system
      checkpointWriter.write(new Checkpoint(ssc, time), clearCheckpointDataLater)
    } else if (clearCheckpointDataLater) {
      markBatchFullyProcessed(time)
    }
  }
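Two conditions gate the write: shouldCheckpoint (true only when a checkpoint directory has been set via ssc.checkpoint(...)) and the requirement that (time - graph.zeroTime) be a whole multiple of ssc.checkpointDuration. A standalone sketch of that divisibility test (the times are illustrative):

// Mirrors the (time - zeroTime).isMultipleOf(ssc.checkpointDuration) guard
def shouldWrite(timeMs: Long, zeroTimeMs: Long, checkpointIntervalMs: Long): Boolean =
  (timeMs - zeroTimeMs) % checkpointIntervalMs == 0

// With zeroTime = 0, a 10 s batch and a 30 s checkpoint interval,
// only every third batch actually writes a checkpoint:
Seq(10000L, 20000L, 30000L, 40000L, 50000L, 60000L).foreach { t =>
  println(s"$t ms -> ${shouldWrite(t, 0L, 30000L)}")
}
// 10000 ms -> false, 20000 ms -> false, 30000 ms -> true, ...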

The checkpoint update mainly comes down to DStreamGraph.updateCheckpointData:

def updateCheckpointData(time: Time) {
    logInfo("Updating checkpoint data for time " + time)
    this.synchronized {
      outputStreams.foreach(_.updateCheckpointData(time))
    }
    logInfo("Updated checkpoint data for time " + time)
  }

outputStreams.foreach(_.updateCheckpointData(time)) invokes DStream.updateCheckpointData, which mainly calls checkpointData.update(currentTime) and then recursively updates the DStreams this stream depends on.

private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]]()

private[streaming] def updateCheckpointData(currentTime: Time) {
    logDebug(s"Updating checkpoint data for time $currentTime")
    checkpointData.update(currentTime)
    dependencies.foreach(_.updateCheckpointData(currentTime))
    logDebug(s"Updated checkpoint data for time $currentTime: $checkpointData")
  }

Next let's look at checkpointData.update(currentTime). In DStream we find the following:

private[streaming] val checkpointData = new DStreamCheckpointData(this)

This leads us to DStreamCheckpointData.update. DStreamCheckpointData also has subclasses that customize what is saved and how:

  //key: the given time; value: the checkpoint file path for that time
  @transient private var timeToCheckpointFile = new HashMap[Time, String]
  //key: batch time; value: the time of the oldest checkpointed RDD in that batch
  @transient private var timeToOldestCheckpointFileTime = new HashMap[Time, Time]
  protected[streaming] def currentCheckpointFiles = data.asInstanceOf[HashMap[Time, String]]

def update(time: Time) {
    //Collect the RDDs to checkpoint from the dstream; generatedRDDs is a HashMap[Time, RDD[T]]
    val checkpointFiles = dstream.generatedRDDs.filter(_._2.getCheckpointFile.isDefined)
                                       .map(x => (x._1, x._2.getCheckpointFile.get))
    logDebug("Current checkpoint files:\n" + checkpointFiles.toSeq.mkString("\n"))

    //Add the checkpoint files to the HashMap that will eventually be serialized
    if (!checkpointFiles.isEmpty) {
      currentCheckpointFiles.clear()
      currentCheckpointFiles ++= checkpointFiles
      //Update timeToCheckpointFile
      timeToCheckpointFile ++= currentCheckpointFiles
      //key: the given time; value: the time of the earliest checkpointed RDD
      timeToOldestCheckpointFileTime(time) = currentCheckpointFiles.keys.min(Time.ordering)
    }
  }
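Subclasses hook in here when the default "remember the checkpoint files" behaviour is not enough; FileInputDStream, for example, carries its own DStreamCheckpointData to record which files each batch selected. A minimal sketch of the extension points (this subclass is hypothetical):

import scala.reflect.ClassTag
import org.apache.spark.streaming.Time
import org.apache.spark.streaming.dstream.{DStream, DStreamCheckpointData}

// Hypothetical subclass showing the three hooks DStreamCheckpointData exposes
private[streaming] class MyCheckpointData[T: ClassTag](dstream: DStream[T])
  extends DStreamCheckpointData[T](dstream) {

  override def update(time: Time): Unit = {
    // record whatever must survive a driver restart for this batch
  }

  override def cleanup(time: Time): Unit = {
    // drop bookkeeping for batches that are no longer needed
  }

  override def restore(): Unit = {
    // rebuild dstream.generatedRDDs from the recorded state
  }
}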

Step 4: job completion. As mentioned above, jobScheduler.submitJobSet() (called from generateJobs) wraps each Job in a JobHandler and drops it into the JobExecutor. JobHandler's main work is to post a JobStarted event, run the job, and post a JobCompleted event once it finishes. When JobScheduler's EventLoop receives that event, it runs handleJobCompletion:

//JobScheduler.processEvent()
 private def processEvent(event: JobSchedulerEvent) {
    try {
      event match {
        case JobStarted(job, startTime) => handleJobStart(job, startTime)
        case JobCompleted(job, completedTime) => handleJobCompletion(job, completedTime)
        case ErrorReported(m, e) => handleError(m, e)
      }
    } catch {
      case e: Throwable =>
        reportError("Error in job scheduler", e)
    }
  }

handleJobCompletion calls jobSet.handleJobCompletion(job) and finally checks whether the whole jobSet has completed; if so, it runs jobGenerator.onBatchCompletion(jobSet.time):

private def handleJobCompletion(job: Job, completedTime: Long) {
    val jobSet = jobSets.get(job.time)
    jobSet.handleJobCompletion(job)
    job.setEndTime(completedTime)
    listenerBus.post(StreamingListenerOutputOperationCompleted(job.toOutputOperationInfo))
    logInfo("Finished job " + job.id + " from job set of time " + jobSet.time)
    if (jobSet.hasCompleted) {
      listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo))
    }
    job.result match {
      case Failure(e) =>
        reportError("Error running job " + job, e)
      case _ => //if every job in the set has completed, do the following
        if (jobSet.hasCompleted) {
          jobSets.remove(jobSet.time)
          jobGenerator.onBatchCompletion(jobSet.time)
          logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format(
            jobSet.totalDelay / 1000.0, jobSet.time.toString,
            jobSet.processingDelay / 1000.0
          ))
        }
    }
  }
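The two delays in the log line come from JobSet's bookkeeping. A worked example, assuming JobSet's definitions (processingDelay = processingEndTime - processingStartTime; totalDelay is measured from the batch time itself):

val batchTime           = 60000L // batch scheduled at t = 60 s
val processingStartTime = 60200L // first job started 200 ms later
val processingEndTime   = 61500L // last job finished at t = 61.5 s

val processingDelay = processingEndTime - processingStartTime // 1300 ms
val totalDelay      = processingEndTime - batchTime           // 1500 ms
println("Total delay: %.3f s (execution: %.3f s)".format(
  totalDelay / 1000.0, processingDelay / 1000.0))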

Back in JobGenerator, we find onBatchCompletion():

def onBatchCompletion(time: Time) {
    eventLoop.post(ClearMetadata(time))
}

JobGenerator.processEvent() then runs clearMetadata(time):

private def processEvent(event: JobGeneratorEvent) {
    logDebug("Got event " + event)
    event match {
      case GenerateJobs(time) => generateJobs(time)
      case ClearMetadata(time) => clearMetadata(time)
      case DoCheckpoint(time, clearCheckpointDataLater) =>
        doCheckpoint(time, clearCheckpointDataLater)
      case ClearCheckpointData(time) => clearCheckpointData(time)
    }
}

clearMetadata() either checkpoints the metadata or deletes it:

private def clearMetadata(time: Time) {
    ssc.graph.clearMetadata(time)

    // If checkpointing is enabled, then checkpoint,
    // else mark batch to be fully processed
    if (shouldCheckpoint) {
      //If checkpointing is enabled, post a DoCheckpoint event
      eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = true))
    } else {
      //Otherwise clean up the old metadata and mark the batch as fully processed
    }
}
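Underneath, ssc.graph.clearMetadata(time) has each output DStream forget (and by default unpersist) the generated RDDs that have fallen out of its remember window, then recurse into its dependencies; condensed from DStream.clearMetadata (spark.streaming.unpersist is a real setting, default true; BlockRDD cleanup elided):

// DStream.clearMetadata, condensed
private[streaming] def clearMetadata(time: Time): Unit = {
  val unpersistData = ssc.conf.getBoolean("spark.streaming.unpersist", true)
  // Forget RDDs that fall out of the remember window
  val oldRDDs = generatedRDDs.filter(_._1 <= (time - rememberDuration))
  generatedRDDs --= oldRDDs.keys
  if (unpersistData) {
    oldRDDs.values.foreach(_.unpersist())
  }
  dependencies.foreach(_.clearMetadata(time))
}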

##2 Summary

This covers Spark Streaming's startup, job generation, job completion, and checkpoint operations. To wrap up, the execution flow of a Spark Streaming program is as follows:

  • StreamingContext.start() starts the JobScheduler
  • JobScheduler startup:
    • JobScheduler creates an EventLoop[JobSchedulerEvent] to handle JobStarted and JobCompleted events
    • Starts the JobGenerator
  • JobGenerator startup:
    • JobGenerator creates an EventLoop[JobGeneratorEvent] to handle GenerateJobs, ClearMetadata, DoCheckpoint, and ClearCheckpointData events
    • Creates a RecurringTimer that periodically posts GenerateJobs events, driving the periodic creation and execution of jobs
  • Job generation in JobGenerator:
    • Calls generateJobs() to build a JobSet and submits it via JobScheduler.submitJobSet()
      • submitJobSet wraps each Job in a JobHandler and drops the JobHandler into the JobExecutor thread pool
    • Posts a DoCheckpoint event to checkpoint the metadata
  • Job completion:
    • When a job finishes, JobHandler posts a JobCompleted event; JobScheduler's EventLoop handles it, and once the whole JobSet has completed it posts a ClearMetadata event, leading to checkpointing or deletion of the metadata

Appendix: the source code of RecurringTimer and EventLoop, with brief comments.

The code of RecurringTimer:

private[streaming]
class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name: String)
  extends Logging {
  //Create a thread that runs loop()
  private val thread = new Thread("RecurringTimer - " + name) {
    setDaemon(true)
    override def run() { loop }
  }

  @volatile private var prevTime = -1L
  @volatile private var nextTime = -1L
  @volatile private var stopped = false

  def getStartTime(): Long = {
    (math.floor(clock.getTimeMillis().toDouble / period) + 1).toLong * period
  }

  def getRestartTime(originalStartTime: Long): Long = {
    val gap = clock.getTimeMillis() - originalStartTime
    (math.floor(gap.toDouble / period).toLong + 1) * period + originalStartTime
  }

  //start() mainly starts the thread, which runs loop()
  def start(startTime: Long): Long = synchronized {
    nextTime = startTime
    thread.start()
    logInfo("Started timer for " + name + " at time " + nextTime)
    nextTime
  }

  def start(): Long = {
    start(getStartTime())
  }

  def stop(interruptTimer: Boolean): Long = synchronized {
    if (!stopped) {
      stopped = true
      if (interruptTimer) {
        thread.interrupt()
      }
      thread.join()
      logInfo("Stopped timer for " + name + " after time " + prevTime)
    }
    prevTime
  }

  private def triggerActionForNextInterval(): Unit = {
    clock.waitTillTime(nextTime)
    callback(nextTime)
    prevTime = nextTime
    nextTime += period
    logDebug("Callback for " + name + " called at time " + prevTime)
  }

  //Periodically run the callback; in JobGenerator's case this posts GenerateJobs events
  private def loop() {
    try {
      while (!stopped) {
        triggerActionForNextInterval()
      }
      triggerActionForNextInterval()
    } catch {
      case e: InterruptedException =>
    }
  }
}
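RecurringTimer and the Clock implementations are private to Spark, so application code cannot instantiate them directly; the sketch below is illustrative only, as if written inside Spark's own packages:

// Illustrative only: these classes are private[spark]/private[streaming]
import org.apache.spark.streaming.util.RecurringTimer
import org.apache.spark.util.SystemClock

val timer = new RecurringTimer(new SystemClock(), 1000,
  ms => println(s"tick at $ms"), "demo-timer")
timer.start()                      // first tick aligned to the next whole second
Thread.sleep(5000)                 // let a few ticks fire
timer.stop(interruptTimer = false) // waits for the running tick; returns the last tick time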

The source of EventLoop: its main job is to run a background thread that waits for events to arrive and handles each one as it comes in:

private[spark] abstract class EventLoop[E](name: String) extends Logging {

  private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()

  private val stopped = new AtomicBoolean(false)

  private val eventThread = new Thread(name) {
    setDaemon(true)

    override def run(): Unit = {
      try {
        while (!stopped.get) {
          val event = eventQueue.take()
          try {
            onReceive(event)
          } catch {
            case NonFatal(e) =>
              try {
                onError(e)
              } catch {
                case NonFatal(e) => logError("Unexpected error in " + name, e)
              }
          }
        }
      } catch {
        case ie: InterruptedException => // exit even if eventQueue is not empty
        case NonFatal(e) => logError("Unexpected error in " + name, e)
      }
    }

  }

  def start(): Unit = {
    if (stopped.get) {
      throw new IllegalStateException(name + " has already been stopped")
    }
    // Call onStart before starting the event thread to make sure it happens before onReceive
    onStart()
    eventThread.start()
  }

  def stop(): Unit = {
    if (stopped.compareAndSet(false, true)) {
      eventThread.interrupt()
      var onStopCalled = false
      try {
        eventThread.join()
        // Call onStop after the event thread exits to make sure onReceive happens before onStop
        onStopCalled = true
        onStop()
      } catch {
        case ie: InterruptedException =>
          Thread.currentThread().interrupt()
          if (!onStopCalled) {
            // ie is thrown from `eventThread.join()`. Otherwise, we should not call `onStop` since
            // it's already called.
            onStop()
          }
      }
    } else {
      // Keep quiet to allow calling `stop` multiple times.
    }
  }

  //Put the event on the eventQueue; the eventThread will process it
  def post(event: E): Unit = {
    eventQueue.put(event)
  }

  //Whether the eventThread is alive
  def isActive: Boolean = eventThread.isAlive

  //Called before the eventThread starts
  protected def onStart(): Unit = {}

  //Called after the eventThread exits
  protected def onStop(): Unit = {}

  //Implemented by subclasses with the actual event-handling logic
  protected def onReceive(event: E): Unit

  //Error-handling logic
  protected def onError(e: Throwable): Unit

}
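EventLoop is likewise private[spark], but a minimal subclass shows the contract that JobScheduler and JobGenerator rely on (illustrative only):

// Illustrative only: EventLoop is private[spark]
sealed trait DemoEvent
case object Ping extends DemoEvent
case class Echo(msg: String) extends DemoEvent

val loop = new EventLoop[DemoEvent]("demo-loop") {
  override protected def onReceive(event: DemoEvent): Unit = event match {
    case Ping      => println("got ping")
    case Echo(msg) => println(s"echo: $msg")
  }
  override protected def onError(e: Throwable): Unit = e.printStackTrace()
}

loop.start()          // onStart(), then the event thread begins draining the queue
loop.post(Ping)       // handled asynchronously by onReceive
loop.post(Echo("hi"))
loop.stop()           // interrupts and joins the thread, then calls onStop()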