This article walks through the Spark Streaming source code to understand the full execution flow of a streaming application, from job generation to job completion, together with the checkpoint operations that happen along the way.
Note: only the code relevant to this analysis is quoted below; everything else is omitted.
##1 Analysis flow

Application entry point:
```scala
val sparkConf = new SparkConf().setAppName("SparkStreaming")
val sc = new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, Seconds(batchDuration.toLong))
ssc.start()
ssc.awaitTermination()
```
Once ssc.start() is called, the program truly begins to run.
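For reference, a minimal runnable sketch of such a program might look like the following (the socket source, host, and port are illustrative assumptions, not taken from the text above); note that start() requires at least one output operation to have been registered:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingWordCount {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("SparkStreaming")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(5)) // 5-second batch interval (assumed value)

    // Hypothetical socket source; any input DStream would do
    val lines = ssc.socketTextStream("localhost", 9999)
    val counts = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    counts.print() // at least one output operation must be registered before start()

    ssc.start()
    ssc.awaitTermination()
  }
}
```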
Step 1: StreamingContext.start() performs the following main work; at its core it calls JobScheduler.start():
```scala
// StreamingContext.start()
def start(): Unit = synchronized {
  state match {
    case INITIALIZED =>
      StreamingContext.ACTIVATION_LOCK.synchronized {
        StreamingContext.assertNoOtherContextIsActive()
        try {
          ...
          scheduler.start()
          state = StreamingContextState.ACTIVE
          scheduler.listenerBus.post(
            StreamingListenerStreamingStarted(System.currentTimeMillis()))
        } catch {
          ...
        }
        StreamingContext.setActiveContext(this)
      }
      ...
    case ACTIVE =>
      logWarning("StreamingContext has already been started")
    case STOPPED =>
      throw new IllegalStateException("StreamingContext has already been stopped")
  }
}
```
Step 2: JobScheduler.start() then performs the following main operations:
```scala
// JobScheduler.start()
def start(): Unit = synchronized {
  if (eventLoop != null) return // scheduler has already been started

  logDebug("Starting JobScheduler")
  // Create an event loop for JobSchedulerEvents and start it
  eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
    override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
  }
  eventLoop.start()
  ...
  // Start the JobGenerator
  jobGenerator.start()
  ...
}
```
Step 3: JobGenerator.start() performs the following main operations:
```scala
// JobGenerator.start()
def start(): Unit = synchronized {
  if (eventLoop != null) return // generator has already been started

  // Instantiate checkpointWriter, which persists checkpoint data
  checkpointWriter

  // Create an event loop that listens for JobGeneratorEvents and processes them
  eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
    override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = {
      jobScheduler.reportError("Error in job generator", e)
    }
  }
  eventLoop.start()

  if (ssc.isCheckpointPresent) {
    // Restore from checkpoint
    restart()
  } else {
    // First start
    startFirstTime()
  }
}
```
On first start, startFirstTime() is called. It mainly starts the already-initialized RecurringTimer, which periodically posts GenerateJobs events; the period is ssc.graph.batchDuration.milliseconds, i.e. the batch interval you configured. When the EventLoop created in JobGenerator.start() receives a GenerateJobs event, it runs processEvent(), which in turn calls generateJobs(time) to generate the jobs for that batch.
```scala
private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
  longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")

private def startFirstTime() {
  val startTime = new Time(timer.getStartTime())
  graph.start(startTime - graph.batchDuration)
  timer.start(startTime.milliseconds)
  logInfo("Started JobGenerator at " + startTime)
}

private def processEvent(event: JobGeneratorEvent) {
  logDebug("Got event " + event)
  event match {
    case GenerateJobs(time) => generateJobs(time)
    case ClearMetadata(time) => clearMetadata(time)
    case DoCheckpoint(time, clearCheckpointDataLater) =>
      doCheckpoint(time, clearCheckpointDataLater)
    case ClearCheckpointData(time) => clearCheckpointData(time)
  }
}
```
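As a small worked example (the timestamps are made up for illustration), getStartTime() from the RecurringTimer source in the appendix rounds the current time up to the next period boundary, so the first GenerateJobs event always fires on an exact multiple of the batch interval:

```scala
// Sketch of the alignment done by RecurringTimer.getStartTime()
// (see the appendix for the real implementation).
val period = 5000L        // batch interval of 5 s, an assumed value
val now = 1700000003200L  // hypothetical current time in ms
val startTime = (math.floor(now.toDouble / period) + 1).toLong * period
// startTime == 1700000005000L: the next 5-second boundary after `now`
```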
The main work of generateJobs:
```scala
private def generateJobs(time: Time) {
  ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")
  Try {
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    graph.generateJobs(time) // generate jobs using allocated block
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
      PythonDStream.stopStreamingContextIfPythonProcessIsDead(e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}
```
First operation: jobScheduler.submitJobSet() mainly iterates over the jobs in the jobSet, wraps each one in a JobHandler, and hands the JobHandler to jobExecutor for execution. JobHandler implements Runnable, and its run() method mainly does three things: it posts a JobStarted event, runs the job via job.run(), and then posts a JobCompleted event.
```scala
def submitJobSet(jobSet: JobSet) {
  if (jobSet.jobs.isEmpty) {
    logInfo("No jobs added for time " + jobSet.time)
  } else {
    listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
    jobSets.put(jobSet.time, jobSet)
    jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
    logInfo("Added jobs for time " + jobSet.time)
  }
}

private class JobHandler(job: Job) extends Runnable with Logging {
  import JobScheduler._

  def run() {
    try {
      var _eventLoop = eventLoop
      if (_eventLoop != null) {
        _eventLoop.post(JobStarted(job, clock.getTimeMillis())) // post the JobStarted event
        SparkHadoopWriterUtils.disableOutputSpecValidation.withValue(true) {
          job.run()
        }
        _eventLoop = eventLoop
        if (_eventLoop != null) {
          _eventLoop.post(JobCompleted(job, clock.getTimeMillis())) // post the JobCompleted event
        }
      } else {
      }
    } finally {
      ssc.sparkContext.setLocalProperties(oldProps)
    }
  }
}
```
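For context, the jobExecutor that JobHandler instances are submitted to is a fixed-size daemon thread pool inside JobScheduler. A simplified sketch of how it is defined (paraphrased from memory of the Spark source, so names may differ slightly between versions) is shown below; with the default of one thread, the jobs of each batch run sequentially in submission order:

```scala
// Sketch of JobScheduler's executor (not copied verbatim from the Spark source).
// spark.streaming.concurrentJobs controls how many batches may run at once; default is 1.
private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)
private val jobExecutor =
  ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor")
```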
Second operation: posting the DoCheckpoint event.
The core of the EventLoop created in JobScheduler.start() is its processEvent(event) method, and its events come in three kinds: JobStarted, JobCompleted, and ErrorReported. The DoCheckpoint event, however, is posted to the EventLoop created in JobGenerator.start(); when that event loop receives DoCheckpoint, it executes doCheckpoint():
```scala
// JobGenerator.processEvent()
private def processEvent(event: JobGeneratorEvent) {
  logDebug("Got event " + event)
  event match {
    ...
    case DoCheckpoint(time, clearCheckpointDataLater) =>
      doCheckpoint(time, clearCheckpointDataLater)
    ...
  }
}
```
doCheckpoint() calls graph.updateCheckpointData to update the checkpoint data, and then calls checkpointWriter.write to persist it:
```scala
private def doCheckpoint(time: Time, clearCheckpointDataLater: Boolean) {
  if (shouldCheckpoint && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {
    logInfo("Checkpointing graph for time " + time)
    // Update the checkpoint data held by the DStream graph
    ssc.graph.updateCheckpointData(time)
    // Write the checkpoint to the file system
    checkpointWriter.write(new Checkpoint(ssc, time), clearCheckpointDataLater)
  } else if (clearCheckpointDataLater) {
    markBatchFullyProcessed(time)
  }
}
```
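Note that this branch only fires when checkpointing has been enabled in the application; a minimal sketch of doing so is below (the directory path, source, and intervals are assumptions for illustration):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sparkConf = new SparkConf().setAppName("SparkStreaming")
val ssc = new StreamingContext(sparkConf, Seconds(5))

// Setting a checkpoint directory is what makes shouldCheckpoint true;
// ssc.checkpointDuration defaults to the batch interval, so in this sketch
// doCheckpoint() would write a Checkpoint for every 5-second batch.
ssc.checkpoint("hdfs:///tmp/streaming-checkpoint") // hypothetical directory

val lines = ssc.socketTextStream("localhost", 9999) // hypothetical source
val counts = lines.map((_, 1)).reduceByKey(_ + _)
// Optionally checkpoint this DStream's RDDs as well, every 25 s (an assumed value)
counts.checkpoint(Seconds(25))
counts.print()
```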
The checkpoint update mainly calls DStreamGraph.updateCheckpointData:
```scala
def updateCheckpointData(time: Time) {
  logInfo("Updating checkpoint data for time " + time)
  this.synchronized {
    outputStreams.foreach(_.updateCheckpointData(time))
  }
  logInfo("Updated checkpoint data for time " + time)
}
```
outputStreams.foreach(_.updateCheckpointData(time)) in turn invokes DStream.updateCheckpointData, which mainly calls checkpointData.update(currentTime) to perform the update and then asks every DStream this DStream depends on to update as well.
```scala
private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]]()

private[streaming] def updateCheckpointData(currentTime: Time) {
  logDebug(s"Updating checkpoint data for time $currentTime")
  checkpointData.update(currentTime)
  dependencies.foreach(_.updateCheckpointData(currentTime))
  logDebug(s"Updated checkpoint data for time $currentTime: $checkpointData")
}
```
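To picture the recursion, consider a hypothetical chain of DStreams (the source and transformations are made up for illustration); updating the output stream walks back through dependencies until it reaches the input stream:

```scala
// Hypothetical DStream lineage: each transformed DStream keeps its parent(s) in `dependencies`
val lines  = ssc.socketTextStream("localhost", 9999) // input DStream, dependencies = Nil
val words  = lines.flatMap(_.split(" "))             // dependencies = List(lines)
val counts = words.map((_, 1)).reduceByKey(_ + _)    // depends on the mapped stream, and so on

// DStreamGraph.updateCheckpointData(time) calls updateCheckpointData on every output
// stream (here, the one registered by counts.print()); each DStream updates its own
// checkpointData and then recurses into each parent via `dependencies`.
counts.print()
```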
Next let's look at checkpointData.update(currentTime). In DStream we can see the following definition:
```scala
private[streaming] val checkpointData = new DStreamCheckpointData(this)
```
From there we find DStreamCheckpointData.update; DStreamCheckpointData also has subclasses that customize what gets saved and how:
```scala
// key is a batch time, value is the corresponding checkpoint file
@transient private var timeToCheckpointFile = new HashMap[Time, String]
// key is a batch time, value is the time of the oldest checkpointed RDD in that batch
@transient private var timeToOldestCheckpointFileTime = new HashMap[Time, Time]

protected[streaming] def currentCheckpointFiles = data.asInstanceOf[HashMap[Time, String]]

def update(time: Time) {
  // Collect the RDDs of this DStream that have been checkpointed;
  // generatedRDDs is a HashMap[Time, RDD[T]]
  val checkpointFiles = dstream.generatedRDDs.filter(_._2.getCheckpointFile.isDefined)
    .map(x => (x._1, x._2.getCheckpointFile.get))
  logDebug("Current checkpoint files:\n" + checkpointFiles.toSeq.mkString("\n"))

  // Add the checkpoint files to the HashMap that will later be serialized
  if (!checkpointFiles.isEmpty) {
    currentCheckpointFiles.clear()
    currentCheckpointFiles ++= checkpointFiles
    // Update timeToCheckpointFile
    timeToCheckpointFile ++= currentCheckpointFiles
    // key is the passed-in time, value is the time of the earliest checkpointed RDD
    timeToOldestCheckpointFileTime(time) = currentCheckpointFiles.keys.min(Time.ordering)
  }
}
```
Step 4: job completion. As mentioned above, the jobScheduler.submitJobSet() call in generateJobs wraps each Job in a JobHandler and hands it to the jobExecutor. The JobHandler posts a JobStarted event, runs the job, and posts a JobCompleted event once the job finishes; when JobScheduler's EventLoop receives that event, it executes handleJobCompletion:
```scala
// JobScheduler.processEvent()
private def processEvent(event: JobSchedulerEvent) {
  try {
    event match {
      case JobStarted(job, startTime) => handleJobStart(job, startTime)
      case JobCompleted(job, completedTime) => handleJobCompletion(job, completedTime)
      case ErrorReported(m, e) => handleError(m, e)
    }
  } catch {
    case e: Throwable => reportError("Error in job scheduler", e)
  }
}
```
handleJobCompletion calls jobSet.handleJobCompletion(job) and finally checks whether every job in the jobSet has completed; if so, it calls jobGenerator.onBatchCompletion(jobSet.time):
```scala
private def handleJobCompletion(job: Job, completedTime: Long) {
  val jobSet = jobSets.get(job.time)
  jobSet.handleJobCompletion(job)
  job.setEndTime(completedTime)
  listenerBus.post(StreamingListenerOutputOperationCompleted(job.toOutputOperationInfo))
  logInfo("Finished job " + job.id + " from job set of time " + jobSet.time)
  if (jobSet.hasCompleted) {
    listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo))
  }
  job.result match {
    case Failure(e) =>
      reportError("Error running job " + job, e)
    case _ =>
      // If all jobs in this set have completed, perform the following
      if (jobSet.hasCompleted) {
        jobSets.remove(jobSet.time)
        jobGenerator.onBatchCompletion(jobSet.time)
        logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format(
          jobSet.totalDelay / 1000.0, jobSet.time.toString,
          jobSet.processingDelay / 1000.0
        ))
      }
  }
}
```
Back in JobGenerator we find onBatchCompletion():
```scala
def onBatchCompletion(time: Time) {
  eventLoop.post(ClearMetadata(time))
}
```
JobGenerator.processEvent() then executes clearMetadata(time):
```scala
private def processEvent(event: JobGeneratorEvent) {
  logDebug("Got event " + event)
  event match {
    case GenerateJobs(time) => generateJobs(time)
    case ClearMetadata(time) => clearMetadata(time)
    case DoCheckpoint(time, clearCheckpointDataLater) =>
      doCheckpoint(time, clearCheckpointDataLater)
    case ClearCheckpointData(time) => clearCheckpointData(time)
  }
}
```
clearMetadata() either checkpoints or clears the batch's metadata:
```scala
private def clearMetadata(time: Time) {
  ssc.graph.clearMetadata(time)

  // If checkpointing is enabled, then checkpoint,
  // else mark batch to be fully processed
  if (shouldCheckpoint) {
    // If checkpointing is required, post a DoCheckpoint event
    eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = true))
  } else {
    // Otherwise delete the metadata directly
    ...
  }
}
```
##2 Summary

At this point we have covered Spark Streaming's startup, job generation, job completion, and checkpoint operations. To wrap up, the overall flow of a Spark Streaming program is as follows:

1. StreamingContext.start() starts the JobScheduler.
2. JobScheduler.start() creates and starts its EventLoop and then starts the JobGenerator.
3. JobGenerator.start() creates its own EventLoop and a RecurringTimer that posts a GenerateJobs event once per batch interval.
4. generateJobs(time) allocates the received blocks to the batch, generates the jobs, submits them via jobScheduler.submitJobSet(), and posts a DoCheckpoint event.
5. Each job is wrapped in a JobHandler and run on the jobExecutor, posting JobStarted before and JobCompleted after execution.
6. When all jobs of a batch have completed, handleJobCompletion() calls jobGenerator.onBatchCompletion(), which posts ClearMetadata; clearMetadata() then either triggers another checkpoint or clears the batch's metadata.
Appendix: the source code of RecurringTimer and EventLoop, with brief comments.
The RecurringTimer code is as follows:
```scala
private[streaming]
class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name: String)
  extends Logging {

  // Create a thread that runs loop()
  private val thread = new Thread("RecurringTimer - " + name) {
    setDaemon(true)
    override def run() { loop }
  }

  @volatile private var prevTime = -1L
  @volatile private var nextTime = -1L
  @volatile private var stopped = false

  def getStartTime(): Long = {
    (math.floor(clock.getTimeMillis().toDouble / period) + 1).toLong * period
  }

  def getRestartTime(originalStartTime: Long): Long = {
    val gap = clock.getTimeMillis() - originalStartTime
    (math.floor(gap.toDouble / period).toLong + 1) * period + originalStartTime
  }

  // start() mainly starts the thread, which then executes loop()
  def start(startTime: Long): Long = synchronized {
    nextTime = startTime
    thread.start()
    logInfo("Started timer for " + name + " at time " + nextTime)
    nextTime
  }

  def start(): Long = {
    start(getStartTime())
  }

  def stop(interruptTimer: Boolean): Long = synchronized {
    if (!stopped) {
      stopped = true
      if (interruptTimer) {
        thread.interrupt()
      }
      thread.join()
      logInfo("Stopped timer for " + name + " after time " + prevTime)
    }
    prevTime
  }

  private def triggerActionForNextInterval(): Unit = {
    clock.waitTillTime(nextTime)
    callback(nextTime)
    prevTime = nextTime
    nextTime += period
    logDebug("Callback for " + name + " called at time " + prevTime)
  }

  // Periodically invoke the callback until the timer is stopped
  private def loop() {
    try {
      while (!stopped) {
        triggerActionForNextInterval()
      }
      triggerActionForNextInterval()
    } catch {
      case e: InterruptedException =>
    }
  }
}
```
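As a rough usage sketch (the SystemClock and the 1-second period are assumptions for illustration; both classes are private to Spark, so this would only compile inside the Spark source tree), the timer simply invokes the callback once per period on its own daemon thread, which is exactly how JobGenerator posts GenerateJobs events:

```scala
import org.apache.spark.util.SystemClock

// Hypothetical standalone usage of RecurringTimer: print the trigger time every second.
val timer = new RecurringTimer(
  new SystemClock(),
  period = 1000L,
  callback = (time: Long) => println(s"tick at $time"),
  name = "demo")

timer.start()          // aligns the first tick to the next whole period via getStartTime()
Thread.sleep(5000)
timer.stop(interruptTimer = true)
```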
The EventLoop source: its main job is to create a background thread that waits for incoming events and, when one arrives, handles it accordingly.
```scala
private[spark] abstract class EventLoop[E](name: String) extends Logging {

  private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()

  private val stopped = new AtomicBoolean(false)

  private val eventThread = new Thread(name) {
    setDaemon(true)

    override def run(): Unit = {
      try {
        while (!stopped.get) {
          val event = eventQueue.take()
          try {
            onReceive(event)
          } catch {
            case NonFatal(e) =>
              try {
                onError(e)
              } catch {
                case NonFatal(e) => logError("Unexpected error in " + name, e)
              }
          }
        }
      } catch {
        case ie: InterruptedException => // exit even if eventQueue is not empty
        case NonFatal(e) => logError("Unexpected error in " + name, e)
      }
    }
  }

  def start(): Unit = {
    if (stopped.get) {
      throw new IllegalStateException(name + " has already been stopped")
    }
    // Call onStart before starting the event thread to make sure it happens before onReceive
    onStart()
    eventThread.start()
  }

  def stop(): Unit = {
    if (stopped.compareAndSet(false, true)) {
      eventThread.interrupt()
      var onStopCalled = false
      try {
        eventThread.join()
        // Call onStop after the event thread exits to make sure onReceive happens before onStop
        onStopCalled = true
        onStop()
      } catch {
        case ie: InterruptedException =>
          Thread.currentThread().interrupt()
          if (!onStopCalled) {
            // ie is thrown from `eventThread.join()`. Otherwise, we should not call `onStop` since
            // it's already called.
            onStop()
          }
      }
    } else {
      // Keep quiet to allow calling `stop` multiple times.
    }
  }

  // Put the event into eventQueue; the eventThread will process it
  def post(event: E): Unit = {
    eventQueue.put(event)
  }

  // Return whether the eventThread is alive
  def isActive: Boolean = eventThread.isAlive

  // Called before the eventThread starts
  protected def onStart(): Unit = {}

  // Called after the eventThread exits
  protected def onStop(): Unit = {}

  // Subclasses implement the event-handling logic here
  protected def onReceive(event: E): Unit

  // Error-handling logic invoked when onReceive throws
  protected def onError(e: Throwable): Unit
}
```
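To make the contract concrete, here is a minimal hypothetical subclass (the event type and handler are made up for illustration, and EventLoop is private[spark], so this is only a sketch); the JobScheduler and JobGenerator event loops shown earlier follow exactly this pattern:

```scala
// A toy event type and loop to illustrate how EventLoop is subclassed.
sealed trait DemoEvent
case class Message(text: String) extends DemoEvent

val loop = new EventLoop[DemoEvent]("demo-loop") {
  // Runs on the internal event thread for every posted event
  override protected def onReceive(event: DemoEvent): Unit = event match {
    case Message(text) => println(s"received: $text")
  }

  override protected def onError(e: Throwable): Unit =
    println(s"error in demo-loop: ${e.getMessage}")
}

loop.start()                // calls onStart(), then launches the event thread
loop.post(Message("hello")) // enqueued and handled asynchronously by onReceive
loop.stop()                 // interrupts the thread, joins it, then calls onStop()
```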