When developing a spark-streaming application, we generally start with code like the following:
val sparkConf = new SparkConf()
  .set("xxx", "")
  ...
val sc = new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, Seconds(5))
// then we use ssc to create an InputDStream
val stream = ssc.socketTextStream("localhost", 9090)
stream.map(...)
  .filter(...)
  .reduce(...)
  .foreachRDD(...)
ssc.start()
ssc.awaitTermination()
We know that this chain of calls on stream is really the process of building a DAG. Drilling into the foreachRDD code, we find:
private def foreachRDD(
    foreachFunc: (RDD[T], Time) => Unit,
    displayInnerRDDOps: Boolean): Unit = {
  new ForEachDStream(this,
    context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()
}
DStream::register
private[streaming] def register(): DStream[T] = {
  ssc.graph.addOutputStream(this)
  this
}
DStreamGraph::addOutputStream
def addOutputStream(outputStream: DStream[_]) {
  this.synchronized {
    outputStream.setGraph(this)
    outputStreams += outputStream
  }
}
Here the DStream registers itself with the DStreamGraph; this outputStreams collection is what gets traversed later when jobs are generated. At this point the DAG has been built and registered. Next, let's look at how the StreamingContext, once started via start(), generates a job every batch interval.
def start(): Unit = synchronized {
  state match {
    case INITIALIZED =>
      startSite.set(DStream.getCreationSite())
      StreamingContext.ACTIVATION_LOCK.synchronized {
        StreamingContext.assertNoOtherContextIsActive()
        try {
          validate()

          // Start the streaming scheduler in a new thread, so that thread local properties
          // like call sites and job groups can be reset without affecting those of the
          // current thread.
          ThreadUtils.runInNewThread("streaming-start") {
            sparkContext.setCallSite(startSite.get)
            sparkContext.clearJobGroup()
            sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
            savedProperties.set(SerializationUtils.clone(sparkContext.localProperties.get()))
            scheduler.start()
          }
          state = StreamingContextState.ACTIVE
        } catch {
          case NonFatal(e) =>
            logError("Error starting the context, marking it as stopped", e)
            scheduler.stop(false)
            state = StreamingContextState.STOPPED
            throw e
        }
        StreamingContext.setActiveContext(this)
      }
      shutdownHookRef = ShutdownHookManager.addShutdownHook(
        StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
      // Registering Streaming Metrics at the start of the StreamingContext
      assert(env.metricsSystem != null)
      env.metricsSystem.registerSource(streamingSource)
      uiTab.foreach(_.attach())
      logInfo("StreamingContext started")
    case ACTIVE =>
      logWarning("StreamingContext has already been started")
    case STOPPED =>
      throw new IllegalStateException("StreamingContext has already been stopped")
  }
}
The core of this method is the call to JobScheduler.start(); the JobScheduler itself was initialized when the StreamingContext was created.
// JobScheduler::start
def start(): Unit = synchronized {
  if (eventLoop != null) return // scheduler has already been started

  logDebug("Starting JobScheduler")
  eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
    override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
  }
  eventLoop.start()

  // attach rate controllers of input streams to receive batch completion updates
  for {
    inputDStream <- ssc.graph.getInputStreams
    rateController <- inputDStream.rateController
  } ssc.addStreamingListener(rateController)

  listenerBus.start()
  receiverTracker = new ReceiverTracker(ssc)
  inputInfoTracker = new InputInfoTracker(ssc)

  executorAllocationManager = ExecutorAllocationManager.createIfEnabled(
    ssc.sparkContext,
    receiverTracker,
    ssc.conf,
    ssc.graph.batchDuration.milliseconds,
    clock)
  executorAllocationManager.foreach(ssc.addStreamingListener)
  receiverTracker.start()
  jobGenerator.start()
  executorAllocationManager.foreach(_.start())
  logInfo("Started JobScheduler")
}
Here we focus on starting the job generator. Let's look at JobGenerator.start:
def start(): Unit = synchronized {
  if (eventLoop != null) return // generator has already been started

  // Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock.
  // See SPARK-10125
  checkpointWriter

  eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
    override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = {
      jobScheduler.reportError("Error in job generator", e)
    }
  }
  eventLoop.start()

  if (ssc.isCheckpointPresent) {
    restart()
  } else {
    startFirstTime()
  }
}
We see that here, too, an event-loop thread is initialized and started, this one accepting events of type JobGeneratorEvent. EventLoop is a small internal utility: a daemon thread that drains a queue of events and dispatches each one to onReceive. A minimal sketch of that idea is shown below (the names are illustrative, not Spark's actual EventLoop internals).
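import java.util.concurrent.LinkedBlockingQueue

// Minimal event-loop sketch: a daemon thread drains a queue and hands
// each event to a handler. Illustrative only; Spark's EventLoop also
// handles error reporting and clean shutdown.
class SimpleEventLoop[E](name: String)(handle: E => Unit) {
  private val queue = new LinkedBlockingQueue[E]()
  @volatile private var stopped = false

  private val thread = new Thread(name) {
    override def run(): Unit =
      try {
        while (!stopped) handle(queue.take()) // blocks until an event arrives
      } catch {
        case _: InterruptedException => // stopped while waiting; just exit
      }
  }
  thread.setDaemon(true)

  def start(): Unit = thread.start()
  def post(event: E): Unit = queue.put(event)
  def stop(): Unit = { stopped = true; thread.interrupt() }
}

Back in JobGenerator.start: on the first start, when no checkpoint is present, startFirstTime is called: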
private def startFirstTime() {
  val startTime = new Time(timer.getStartTime())
  graph.start(startTime - graph.batchDuration)
  timer.start(startTime.milliseconds)
  logInfo("Started JobGenerator at " + startTime)
}
This method starts both the DStreamGraph and the timer. Let's see what this timer actually is:
private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
  longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")
It turns out to be a timer that posts a GenerateJobs event to the event-loop thread at a fixed interval, and that interval is exactly the batch duration we passed in when creating the StreamingContext. Conceptually it boils down to the loop sketched below (illustrative only; Spark's RecurringTimer additionally supports a pluggable clock and graceful stop).
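// Illustrative sketch of a recurring timer: a daemon thread that, every
// `periodMs`, invokes a callback with the tick time, aligning ticks to
// the wall clock. Not Spark's actual RecurringTimer.
class SimpleRecurringTimer(periodMs: Long)(callback: Long => Unit) {
  @volatile private var stopped = false

  private val thread = new Thread("simple-recurring-timer") {
    override def run(): Unit = {
      // first tick: the next multiple of periodMs, as RecurringTimer.getStartTime does
      var nextTime = (System.currentTimeMillis() / periodMs + 1) * periodMs
      while (!stopped) {
        val sleepMs = nextTime - System.currentTimeMillis()
        if (sleepMs > 0) Thread.sleep(sleepMs)
        callback(nextTime) // e.g. eventLoop.post(GenerateJobs(new Time(nextTime)))
        nextTime += periodMs
      }
    }
  }
  thread.setDaemon(true)

  def start(): Unit = thread.start()
  def stop(): Unit = stopped = true
}

Now let's look at how the event-loop thread handles the GenerateJobs event: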
private def processEvent(event: JobGeneratorEvent) {
  logDebug("Got event " + event)
  event match {
    case GenerateJobs(time) => generateJobs(time)
    case ClearMetadata(time) => clearMetadata(time)
    case DoCheckpoint(time, clearCheckpointDataLater) =>
      doCheckpoint(time, clearCheckpointDataLater)
    case ClearCheckpointData(time) => clearCheckpointData(time)
  }
}

private def generateJobs(time: Time) {
  // Checkpoint all RDDs marked for checkpointing to ensure their lineages are
  // truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847).
  ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")
  Try {
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    graph.generateJobs(time) // generate jobs using allocated block
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
      PythonDStream.stopStreamingContextIfPythonProcessIsDead(e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}
The points to focus on here are the call to DStreamGraph's generateJobs method, which returns the batch's collection of jobs, and the subsequent call to JobScheduler.submitJobSet.
// DStreamGraph::generateJobs
def generateJobs(time: Time): Seq[Job] = {
  logDebug("Generating jobs for time " + time)
  val jobs = this.synchronized {
    outputStreams.flatMap { outputStream =>
      val jobOption = outputStream.generateJob(time)
      jobOption.foreach(_.setCallSite(outputStream.creationSite))
      jobOption
    }
  }
  logDebug("Generated " + jobs.length + " jobs for time " + time)
  jobs
}
Here we see it traverses the outputStreams mentioned earlier, calling generateJob on each registered DStream to produce a job. What actually gets invoked is the generateJob of the DStream subclass ForEachDStream, because the underlying implementation of every action operator ultimately bottoms out in a ForEachDStream. To make that concrete, the sketch below builds a print-like output operator from the outside purely on top of the public foreachRDD (a hypothetical helper, not Spark API); the real DStream.print works in essentially this way internally.
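import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Time
import org.apache.spark.streaming.dstream.DStream

// Hypothetical helper: a print-like output operator expressed entirely
// via the public foreachRDD, showing that output operators are just
// sugar over foreachRDD (and thus over ForEachDStream).
def printFirst[T](stream: DStream[T], num: Int): Unit =
  stream.foreachRDD { (rdd: RDD[T], time: Time) =>
    val firstNum = rdd.take(num + 1)
    println(s"Time: $time")
    firstNum.take(num).foreach(println)
    if (firstNum.length > num) println("...")
  }

So every action operator ends up registering a ForEachDStream, whose generateJob looks like this: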
// ForEachDStream::generateJob
override def generateJob(time: Time): Option[Job] = {
  parent.getOrCompute(time) match {
    case Some(rdd) =>
      val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
        foreachFunc(rdd, time)
      }
      Some(new Job(time, jobFunc))
    case None => None
  }
}
Internally, getOrCompute calls the compute method to produce an RDD. compute is an abstract method that subclasses must implement: in general, an InputDStream creates an RDD directly inside compute, while the compute of other transform operators derives a new RDD by transforming the RDDs produced by the upstream operators it depends on. If you are interested, take a look at how DirectInputDStream in the Kafka integration module implements it. As a toy illustration, a minimal InputDStream that hands back the same RDD every batch might implement compute as in the sketch below (hypothetical code, not from Spark, though Spark ships a similar ConstantInputDStream).
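import scala.reflect.ClassTag

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.dstream.InputDStream

// Hypothetical minimal InputDStream: compute() simply returns the same
// pre-built RDD for every batch. Real sources (e.g. the Kafka direct
// stream) compute the batch's partitions/offsets here instead.
class ConstantRDDInputDStream[T: ClassTag](
    _ssc: StreamingContext,
    rdd: RDD[T]) extends InputDStream[T](_ssc) {

  override def start(): Unit = {} // nothing to set up
  override def stop(): Unit = {}  // nothing to tear down
  override def compute(validTime: Time): Option[RDD[T]] = Some(rdd)
}

Once an RDD has been returned here, a closure is created and wrapped in a Job object; inside that closure runs the SparkContext job-submission logic. So when does the closure inside the Job get executed? Climbing back up the call stack, we return to the JobScheduler.submitJobSet call in JobGenerator.generateJobs: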
def submitJobSet(jobSet: JobSet) {
  if (jobSet.jobs.isEmpty) {
    logInfo("No jobs added for time " + jobSet.time)
  } else {
    listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
    jobSets.put(jobSet.time, jobSet)
    jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
    logInfo("Added jobs for time " + jobSet.time)
  }
}
In submitJobSet, each job is wrapped in a JobHandler and handed to the jobExecutor for execution. JobHandler is an implementation of the Runnable interface, and the core logic of its run method is as follows:
var _eventLoop = eventLoop
if (_eventLoop != null) {
  _eventLoop.post(JobStarted(job, clock.getTimeMillis()))
  PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
    job.run()
  }
  _eventLoop = eventLoop
  if (_eventLoop != null) {
    _eventLoop.post(JobCompleted(job, clock.getTimeMillis()))
  }
}
Here job.run is called, and what actually executes inside run is the closure we handed to the Job object. Running a job is a blocking operation that occupies one thread of the jobExecutor; the JobExecutor's thread count is set at initialization from the spark.streaming.concurrentJobs parameter, which defaults to 1. If you want the jobs of several batches to be able to run concurrently, you can raise that parameter when building the SparkConf, as in the minimal sketch below (the value 2 is just an example).
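import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Example only: allow up to 2 streaming jobs to run at the same time
// (the default of 1 means each batch's jobs execute serially).
val conf = new SparkConf()
  .setMaster("local[*]") // for a runnable local test
  .setAppName("concurrent-jobs-example")
  .set("spark.streaming.concurrentJobs", "2")
val ssc = new StreamingContext(conf, Seconds(5))

And with that, the streaming module's job-generation logic has been analyzed in broad strokes.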