In the previous post we analyzed, from the source code, how the data received by the Receiver is handed over to the BlockManager; the whole data-reception flow is now up and running. Let us now return to the analysis of the JobScheduler.
// JobScheduler.scala line 62
def start(): Unit = synchronized {
  if (eventLoop != null) return // scheduler has already been started

  logDebug("Starting JobScheduler")
  eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
    override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
  }
  eventLoop.start()

  // attach rate controllers of input streams to receive batch completion updates
  for {
    inputDStream <- ssc.graph.getInputStreams
    rateController <- inputDStream.rateController
  } ssc.addStreamingListener(rateController)

  listenerBus.start(ssc.sparkContext)
  receiverTracker = new ReceiverTracker(ssc)
  inputInfoTracker = new InputInfoTracker(ssc)
  receiverTracker.start()
  jobGenerator.start()
  logInfo("Started JobScheduler")
}
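One small Scala point in the snippet above: the for-comprehension that attaches rate controllers mixes a collection with an Option, so the body only runs for input streams that actually define a rateController. A self-contained toy sketch of how such a for-comprehension desugars (the stream names and stand-in values below are made up):

// Toy stand-ins: `for { a <- seq; b <- option } body` runs the body only when the Option is defined.
object ForDesugarDemo {
  def main(args: Array[String]): Unit = {
    val inputStreams = Seq(("stream-0", Some("rate-controller-0")), ("stream-1", None))

    for {
      (name, rateControllerOpt) <- inputStreams
      rateController <- rateControllerOpt
    } println(s"registering $rateController for $name")

    // Roughly equivalent desugared form:
    inputStreams.foreach { case (name, rateControllerOpt) =>
      rateControllerOpt.foreach(rateController => println(s"registering $rateController for $name"))
    }
  }
}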
The previous several posts all expanded from receiverTracker.start(). With that thread of analysis finished, let's continue with the next step.
// JobScheduler.scala line 83
jobGenerator.start()
The instantiation of jobGenerator was analyzed earlier. Let's dive into its start() in the source code.
// JobGenerator.scala line 78
/** Start generation of jobs */
def start(): Unit = synchronized {
  if (eventLoop != null) return // generator has already been started

  // Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock.
  // See SPARK-10125
  checkpointWriter

  eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
    override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = {
      jobScheduler.reportError("Error in job generator", e)
    }
  }
  eventLoop.start()

  if (ssc.isCheckpointPresent) {
    restart()
  } else {
    startFirstTime()
  }
}
// JobGenerator.scala line 189
/** Starts the generator for the first time */
private def startFirstTime() {
  val startTime = new Time(timer.getStartTime())
  graph.start(startTime - graph.batchDuration)
  timer.start(startTime.milliseconds)
  logInfo("Started JobGenerator at " + startTime)
}
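For intuition, the startTime used here is aligned to the next batch boundary. A small self-contained sketch of that alignment (assuming the round-up-to-the-next-multiple behavior of RecurringTimer.getStartTime(); the clock value below is made up):

// Sketch: align the first tick to the next multiple of the batch duration.
object StartTimeAlignment {
  def main(args: Array[String]): Unit = {
    val period = 5000L    // batchDuration = 5 seconds, as in the example job
    val now = 1234567L    // pretend current wall-clock time in milliseconds
    val startTime = (math.floor(now.toDouble / period) + 1).toLong * period
    println(startTime)    // 1235000: the first GenerateJobs event fires on this boundary
    // graph.start(startTime - batchDuration) then sets zeroTime one batch earlier (1230000).
  }
}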
Next, DStreamGraph.start is called:
// DStreamGraph.scala line 39
def start(time: Time) {
  this.synchronized {
    require(zeroTime == null, "DStream graph computation already started")
    zeroTime = time
    startTime = time
    outputStreams.foreach(_.initialize(zeroTime))
    outputStreams.foreach(_.remember(rememberDuration))
    outputStreams.foreach(_.validateAtStart)
    inputStreams.par.foreach(_.start())
  }
}
Up to this point only some straightforward initialization has been done; data processing has not actually started. Back in JobGenerator, the recurring timer is now started:
// JobGenerator.scala line 193
timer.start(startTime.milliseconds)
The recurring timer starts. Does this look familiar? We have already met this recurring timer: in BlockGenerator.scala lines 105 and 109 there are two threads, one of which is exactly such a recurring timer that periodically moves received data into the queue waiting to be pushed.
// RecurringTimer.scala line 59
def start(startTime: Long): Long = synchronized {
  nextTime = startTime
  thread.start()
  logInfo("Started timer for " + name + " at time " + nextTime)
  nextTime
}
The actual logic is the function passed in at construction time: longTime => eventLoop.post(GenerateJobs(new Time(longTime))). Its input is a Long, and its body is eventLoop.post(GenerateJobs(new Time(longTime))).
// JobGenerator.scala line 58
private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
  longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")
As long as the thread has not been stopped, it keeps looping.
private[streaming] class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name: String)
  extends Logging {

  // RecurringTimer.scala line 27
  private val thread = new Thread("RecurringTimer - " + name) {
    setDaemon(true)
    override def run() { loop }
  }

  // RecurringTimer.scala line 56
  /**
   * Start at the given start time.
   */
  def start(startTime: Long): Long = synchronized {
    nextTime = startTime
    thread.start()
    logInfo("Started timer for " + name + " at time " + nextTime)
    nextTime
  }

  // RecurringTimer.scala line 92
  private def triggerActionForNextInterval(): Unit = {
    clock.waitTillTime(nextTime)
    callback(nextTime)
    prevTime = nextTime
    nextTime += period
    logDebug("Callback for " + name + " called at time " + prevTime)
  }

  // RecurringTimer.scala line 100
  /**
   * Repeatedly call the callback every interval.
   */
  private def loop() {
    try {
      while (!stopped) {
        triggerActionForNextInterval()
      }
      triggerActionForNextInterval()
    } catch {
      case e: InterruptedException =>
    }
  }

  // ... other members omitted
}
So a GenerateJobs event is posted on every tick; eventLoop.post simply puts the event into the eventQueue.
// EventLoop.scala line 102
def post(event: E): Unit = {
  eventQueue.put(event)
}
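The other half of the picture is the consuming side: a dedicated thread drains this queue. Here is a minimal standalone sketch of that pattern (a simplification, not Spark's actual EventLoop, which also handles errors and orderly shutdown):

import java.util.concurrent.LinkedBlockingDeque

// Simplified event loop: post() enqueues, a daemon thread dequeues and calls onReceive.
abstract class SimpleEventLoop[E](name: String) {
  private val eventQueue = new LinkedBlockingDeque[E]()

  private val eventThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      try {
        while (true) {
          onReceive(eventQueue.take()) // take() blocks until an event is available
        }
      } catch {
        case _: InterruptedException => // stop() interrupts the thread to end the loop
      }
    }
  }

  def start(): Unit = eventThread.start()
  def stop(): Unit = eventThread.interrupt()
  def post(event: E): Unit = eventQueue.put(event)

  protected def onReceive(event: E): Unit
}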
In the real EventLoop, its other key member, eventThread, keeps taking events off the queue and invokes onReceive with each event as its argument; and onReceive was overridden when the EventLoop was instantiated:
// JobGenerator.scala line 86
eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
  override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

  override protected def onError(e: Throwable): Unit = {
    jobScheduler.reportError("Error in job generator", e)
  }
}
eventLoop.start()
That onReceive delegates to:
// JobGenerator.scala line 177
/** Processes all events */
private def processEvent(event: JobGeneratorEvent) {
  logDebug("Got event " + event)
  event match {
    case GenerateJobs(time) => generateJobs(time)
    // other case classes omitted
  }
}
A GenerateJobs event is matched and handled by generateJobs(time: Time):
// JobGenerator.scala line 240
/** Generate jobs and perform checkpoint for the given `time`. */
private def generateJobs(time: Time) {
  // Set the SparkEnv in this thread, so that job generation code can access the environment
  // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
  // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
  SparkEnv.set(ssc.env)
  Try {
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    graph.generateJobs(time) // generate jobs using allocated block
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}
The code above is not entirely obvious, so let's break it down. At first glance it looks like try {} catch { case ... }, but a closer look shows it is actually Try {} match {}. Tracing the code, the capitalized Try is a companion object whose apply method takes a by-name parameter and returns a Try instance. The code in scala.util.Try.scala is as follows:
// scala.util.Try.scala line 155
object Try {
  /** Constructs a `Try` using the by-name parameter. This
   * method will ensure any non-fatal exception is caught and a
   * `Failure` object is returned.
   */
  def apply[T](r: => T): Try[T] =
    try Success(r) catch {
      case NonFatal(e) => Failure(e)
    }
}
Try has two subclasses, Success and Failure, both case classes.
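A quick usage sketch of the same Try { ... } match { ... } shape as generateJobs, with a made-up computation standing in for graph.generateJobs(time):

import scala.util.{Failure, Success, Try}

object TryDemo {
  def main(args: Array[String]): Unit = {
    val attempt = Try {
      val words = Seq("spark", "streaming") // pretend this is the generated work
      words.map(_.length).sum               // the last expression becomes the Success value
    }

    attempt match {
      case Success(total) => println(s"generated, total = $total") // prints 14
      case Failure(e)     => println(s"failed: ${e.getMessage}")   // any NonFatal exception lands here
    }
  }
}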
Back at the call site, the last expression executed inside the Try block is graph.generateJobs(time). Tracing into it, the jobs it returns come from outputStream.generateJob(time) for each output stream:
// DStreamGraph.scala line 111
def generateJobs(time: Time): Seq[Job] = {
  logDebug("Generating jobs for time " + time)
  val jobs = this.synchronized {
    outputStreams.flatMap { outputStream =>
      val jobOption = outputStream.generateJob(time)
      jobOption.foreach(_.setCallSite(outputStream.creationSite))
      jobOption
    }
  }
  logDebug("Generated " + jobs.length + " jobs for time " + time)
  jobs
}
As noted earlier, the outputStreams here are all ForEachDStream instances. Looking into ForEachDStream, it overrides generateJob:
// ForEachDStream.scala line 46
override def generateJob(time: Time): Option[Job] = {
  parent.getOrCompute(time) match {
    case Some(rdd) =>
      val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
        foreachFunc(rdd, time)
      }
      Some(new Job(time, jobFunc))
    case None => None
  }
}
So what is the parent of the ForEachDStream? Let's look at our example:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Durations, StreamingContext}

object StreamingWordCountSelfScala {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setMaster("spark://master:7077").setAppName("StreamingWordCountSelfScala")
    val ssc = new StreamingContext(sparkConf, Durations.seconds(5)) // harvest the data every 5 seconds
    val lines = ssc.socketTextStream("localhost", 9999) // listen on local socket port 9999
    val words = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _) // flatMap, then reduce
    words.print() // print the results
    ssc.start() // start the computation
    ssc.awaitTermination()
    ssc.stop(true)
  }
}
As described earlier, the DStream lineage in this example is SocketInputDStream << FlatMappedDStream << MappedDStream << ShuffledDStream << ForEachDStream.
Scanning DStream and all of its subclasses shows that only DStream defines getOrCompute; no subclass overrides it. So the call here is ShuffledDStream's inherited getOrCompute, i.e. DStream.getOrCompute. In the normal case the RDD for this batch does not exist yet, so the orElse block is executed:
// DStream.scala line 338
/**
 * Get the RDD corresponding to the given time; either retrieve it from cache
 * or compute-and-cache it.
 */
private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
  // If RDD was already generated, then retrieve it from HashMap,
  // or else compute the RDD
  generatedRDDs.get(time).orElse {
    // Compute the RDD if time is valid (e.g. correct time in a sliding window)
    // of RDD generation, else generate nothing.
    if (isTimeValid(time)) {

      val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
        // Disable checks for existing output directories in jobs launched by the streaming
        // scheduler, since we may need to write output to an existing directory during checkpoint
        // recovery; see SPARK-4835 for more details. We need to have this call here because
        // compute() might cause Spark jobs to be launched.
        PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
          compute(time) // line 352
        }
      }

      rddOption.foreach { case newRDD =>
        // Register the generated RDD for caching and checkpointing
        if (storageLevel != StorageLevel.NONE) {
          newRDD.persist(storageLevel)
          logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
        }
        if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
          newRDD.checkpoint()
          logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")
        }
        generatedRDDs.put(time, newRDD)
      }
      rddOption
    } else {
      None
    }
  }
}
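The generatedRDDs.get(time).orElse { ... } shape is just per-key memoization built on Option. A tiny standalone sketch of the same idiom (hypothetical names, strings standing in for RDDs):

import scala.collection.mutable

object GetOrComputeIdiom {
  private val cache = mutable.HashMap[Long, String]()

  private def expensiveCompute(time: Long): String = {
    println(s"computing for $time") // only printed on a cache miss
    s"rdd-for-$time"
  }

  def getOrCompute(time: Long): Option[String] =
    cache.get(time).orElse {
      val value = expensiveCompute(time)
      cache.put(time, value)
      Some(value)
    }

  def main(args: Array[String]): Unit = {
    println(getOrCompute(5000L)) // computes and caches
    println(getOrCompute(5000L)) // served straight from the cache
  }
}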
ShuffledDStream.compute in turn calls parent.getOrCompute:
// ShuffledDStream.scala line 40
override def compute(validTime: Time): Option[RDD[(K, C)]] = {
  parent.getOrCompute(validTime) match {
    case Some(rdd) => Some(rdd.combineByKey[C](
      createCombiner, mergeValue, mergeCombiner, partitioner, mapSideCombine))
    case None => None
  }
}
MappedDStream's compute again calls its parent's getOrCompute, which in turn calls compute, and so on up the chain.
// MappedDStream.scala line 34
override def compute(validTime: Time): Option[RDD[U]] = {
  parent.getOrCompute(validTime).map(_.map[U](mapFunc))
}
FlatMappedDStream's compute likewise calls its parent's getOrCompute, which again calls compute, and so on.
// FlatMappedDStream.scala line 34
override def compute(validTime: Time): Option[RDD[U]] = {
  parent.getOrCompute(validTime).map(_.flatMap(flatMapFunc))
}
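To make the recursion explicit, here is a heavily simplified, hypothetical sketch of the template: each DStream-like node asks its parent via getOrCompute and then applies its own transformation, until an input node at the root produces the data itself (Spark's real DStream also caches, checkpoints and validates times):

// Minimal, hypothetical model of the getOrCompute/compute recursion.
abstract class MiniDStream[T] {
  // No caching here, unlike the real DStream.getOrCompute.
  final def getOrCompute(time: Long): Option[List[T]] = compute(time)
  def compute(time: Long): Option[List[T]]
}

class MiniInput(data: Map[Long, List[String]]) extends MiniDStream[String] {
  // Root of the chain: produces the batch data itself, like ReceiverInputDStream.compute.
  def compute(time: Long): Option[List[String]] = data.get(time)
}

class MiniMapped[T, U](parent: MiniDStream[T], f: T => U) extends MiniDStream[U] {
  // Same shape as MappedDStream.compute: ask the parent, then map over its result.
  def compute(time: Long): Option[List[U]] = parent.getOrCompute(time).map(_.map(f))
}

object MiniDStreamDemo {
  def main(args: Array[String]): Unit = {
    val input  = new MiniInput(Map(5000L -> List("a", "b")))
    val mapped = new MiniMapped[String, (String, Int)](input, s => (s, 1))
    println(mapped.getOrCompute(5000L)) // Some(List((a,1), (b,1))): recursion bottoms out at the input
    println(mapped.getOrCompute(9999L)) // None: no batch data for that time
  }
}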
This continues until the DStream is the SocketInputDStream, i.e. the input stream, whose compute is inherited from its parent class ReceiverInputDStream. Ignore the logic in the if branch for now and go straight to the else block, which ends by calling createBlockRDD:
// ReceiverInputDStream.scala line 69
override def compute(validTime: Time): Option[RDD[T]] = {
  val blockRDD = {

    if (validTime < graph.startTime) {
      // If this is called for any time before the start time of the context,
      // then this returns an empty RDD. This may happen when recovering from a
      // driver failure without any write ahead log to recover pre-failure data.
      new BlockRDD[T](ssc.sc, Array.empty)
    } else {
      // Otherwise, ask the tracker for all the blocks that have been allocated to this stream
      // for this batch
      val receiverTracker = ssc.scheduler.receiverTracker
      val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)

      // Register the input blocks information into InputInfoTracker
      val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum)
      ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

      // Create the BlockRDD
      createBlockRDD(validTime, blockInfos)
    }
  }
  Some(blockRDD)
}
At line 127, new BlockRDD[T](ssc.sc, validBlockIds) instantiates the RDD:
// ReceiverInputDStream.scala line 94
private[streaming] def createBlockRDD(time: Time, blockInfos: Seq[ReceivedBlockInfo]): RDD[T] = {

  if (blockInfos.nonEmpty) {
    val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray

    // Are WAL record handles present with all the blocks
    val areWALRecordHandlesPresent = blockInfos.forall { _.walRecordHandleOption.nonEmpty }

    if (areWALRecordHandlesPresent) {
      // If all the blocks have WAL record handle, then create a WALBackedBlockRDD
      val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray
      val walRecordHandles = blockInfos.map { _.walRecordHandleOption.get }.toArray
      new WriteAheadLogBackedBlockRDD[T](
        ssc.sparkContext, blockIds, walRecordHandles, isBlockIdValid)
    } else {
      // Else, create a BlockRDD. However, if there are some blocks with WAL info but not
      // others then that is unexpected and log a warning accordingly.
      if (blockInfos.find(_.walRecordHandleOption.nonEmpty).nonEmpty) {
        if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
          logError("Some blocks do not have Write Ahead Log information; " +
            "this is unexpected and data may not be recoverable after driver failures")
        } else {
          logWarning("Some blocks have Write Ahead Log information; this is unexpected")
        }
      }
      val validBlockIds = blockIds.filter { id =>
        ssc.sparkContext.env.blockManager.master.contains(id)
      }
      if (validBlockIds.size != blockIds.size) {
        logWarning("Some blocks could not be recovered as they were not found in memory. " +
          "To prevent such data loss, enabled Write Ahead Log (see programming guide " +
          "for more details.")
      }
      new BlockRDD[T](ssc.sc, validBlockIds) // line 127
    }
  } else {
    // If no block is ready now, creating WriteAheadLogBackedBlockRDD or BlockRDD
    // according to the configuration
    if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
      new WriteAheadLogBackedBlockRDD[T](
        ssc.sparkContext, Array.empty, Array.empty, Array.empty)
    } else {
      new BlockRDD[T](ssc.sc, Array.empty)
    }
  }
}
This BlockRDD is a subclass of Spark Core's RDD, with no parent RDD dependencies. At this point, the RDD instantiation is complete.
// BlockRDD.scala line 30
private[spark] class BlockRDD[T: ClassTag](sc: SparkContext, @transient val blockIds: Array[BlockId])
  extends RDD[T](sc, Nil)

// RDD.scala line 74
abstract class RDD[T: ClassTag](
    @transient private var _sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]
  ) extends Serializable with Logging
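To see what "no dependencies" means in practice: an RDD constructed with Nil as its deps is a lineage root and must supply its own partitions and data. A hedged sketch of such a root RDD (a toy that serves hard-coded numbers, where BlockRDD instead looks blocks up in the BlockManager):

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// Toy lineage-root RDD: extends RDD[T](sc, Nil) and provides its own partitions and data.
class RootRDD(sc: SparkContext) extends RDD[Int](sc, Nil) {

  override protected def getPartitions: Array[Partition] =
    Array(new Partition { override def index: Int = 0 }) // a single, trivial partition

  override def compute(split: Partition, context: TaskContext): Iterator[Int] =
    (1 to 5).iterator // BlockRDD would instead fetch the block for this partition
}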
Putting it all together, the RDD that is ultimately reconstructed is the following (composing the compute bodies above; the outer .map calls operate on the Option returned by getOrCompute, so the underlying RDD lineage is BlockRDD -> flatMap -> map -> combineByKey):

new BlockRDD[T](ssc.sc, validBlockIds)
  .map(_.flatMap(flatMapFunc))
  .map(_.map[U](mapFunc))
  .combineByKey[C](createCombiner, mergeValue, mergeCombiner, partitioner, mapSideCombine)
In our example, this becomes:

new BlockRDD[T](ssc.sc, validBlockIds)
  .map(_.flatMap(t => t.split(" ")))
  .map(_.map[U](t => (t, 1)))
  .combineByKey[C](t => t, (t1, t2) => t1 + t2, (t1, t2) => t1 + t2, partitioner, true)
And the job function produced for the final print() becomes:

() => foreachFunc(
  new BlockRDD[T](ssc.sc, validBlockIds)
    .map(_.flatMap(t => t.split(" ")))
    .map(_.map[U](t => (t, 1)))
    .combineByKey[C](t => t, (t1, t2) => t1 + t2, (t1, t2) => t1 + t2, partitioner, true),
  time)
where foreachFunc is defined at DStream.scala line 766.
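Stripping away the Option wrapper, what each batch actually executes is equivalent to an ordinary Spark Core job over that batch's BlockRDD. A self-contained sketch (local master and parallelize stand in for the real cluster and BlockRDD; take(11)/println mirrors roughly what print()'s foreachFunc does):

import org.apache.spark.{SparkConf, SparkContext}

object OneBatchEquivalent {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("OneBatchEquivalent"))

    // Stand-in for the BlockRDD built from the blocks received in one 5-second batch.
    val blockRDD = sc.parallelize(Seq("spark streaming", "spark core"))

    val counts = blockRDD
      .flatMap(_.split(" ")) // what FlatMappedDStream contributes
      .map((_, 1))           // what MappedDStream contributes
      .reduceByKey(_ + _)    // what ShuffledDStream's combineByKey boils down to here

    counts.take(11).foreach(println) // roughly what print()'s foreachFunc does for each batch
    sc.stop()
  }
}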
At this point the RDD has been fully instantiated through the DStream chain. Looking back now, it should be clearer why a DStream is described as a template for RDDs. But hold on: back at ForEachDStream.scala line 46, the function above is passed to Job as a constructor argument.
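Conceptually, a streaming Job is little more than a named thunk that the JobScheduler runs later. A simplified, hypothetical sketch of that idea (Spark's real Job also carries ids, output-op ids and result bookkeeping):

// Hypothetical, stripped-down version of what ForEachDStream hands to the scheduler.
class SimpleJob(val time: Long, func: () => Unit) {
  def run(): Unit = func() // the real Job wraps this in Try and records the outcome
}

object SimpleJobDemo {
  def main(args: Array[String]): Unit = {
    // The jobFunc built in generateJob closes over the batch's RDD and foreachFunc.
    val jobFunc = () => println("pretend we ran foreachFunc(rdd, time) here")
    val job = new SimpleJob(1235000L, jobFunc)

    // Later, JobScheduler.submitJobSet hands such jobs to its thread pool, which calls run().
    job.run()
  }
}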
----------------------------------
Supplementary: a flow chart of the Job creation process, taken from a student blog of the customized-edition course, with minor modifications.
Supplementary: a flow chart of how the RDD DAG is built by tracing the lineage backwards from the OutputDStream, taken from a student blog of the customized-edition course.
Supplementary: the same backwards-tracing flow chart for the RDD DAG of this example, taken from a student blog of the customized-edition course.
The next post will analyze Job submission from the source code. Stay tuned.