Contents of this installment:

1. Inside the JobScheduler implementation
2. Deeper thoughts on the JobScheduler
DStream's foreachRDD method instantiates a ForEachDStream object and passes the user-defined function foreachFunc into it. foreachRDD is an output operation, and foreachFunc is applied to every RDD in this DStream.
```scala
/**
 * Apply a function to each RDD in this DStream. This is an output operator, so
 * 'this' DStream will be registered as an output stream and therefore materialized.
 * @param foreachFunc foreachRDD function
 * @param displayInnerRDDOps Whether the detailed callsites and scopes of the RDDs generated
 *                           in the `foreachFunc` to be displayed in the UI. If `false`, then
 *                           only the scopes and callsites of `foreachRDD` will override those
 *                           of the RDDs on the display.
 */
private def foreachRDD(
    foreachFunc: (RDD[T], Time) => Unit,
    displayInnerRDDOps: Boolean): Unit = {
  new ForEachDStream(this,
    context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()
}
```
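For context, here is a minimal driver-side sketch of how the public foreachRDD overload (which delegates to the private one above) is typically used; the socket source, host, and port are placeholders chosen for this example:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ForeachRDDExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ForeachRDDExample").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))

    // "localhost:9999" is just a placeholder data source for this sketch.
    val lines = ssc.socketTextStream("localhost", 9999)

    // Registering this output operation is what makes the DStream materialize:
    // under the covers a ForEachDStream is created and register()-ed.
    lines.foreachRDD { (rdd, time) =>
      println(s"Batch at $time contains ${rdd.count()} records")
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```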
ForEachDStream overrides the generateJob method: it calls the parent DStream's getOrCompute method to generate the RDD for the batch and wraps it in a Job, passing in the time and the function foreachFunc to be applied to that RDD. Its dependencies method is defined as the list containing the parent DStream.
```scala
/**
 * An internal DStream used to represent output operations like DStream.foreachRDD.
 * @param parent      Parent DStream
 * @param foreachFunc Function to apply on each RDD generated by the parent DStream
 * @param displayInnerRDDOps Whether the detailed callsites and scopes of the RDDs generated
 *                           by `foreachFunc` will be displayed in the UI; only the scope and
 *                           callsite of `DStream.foreachRDD` will be displayed.
 */
private[streaming]
class ForEachDStream[T: ClassTag] (
    parent: DStream[T],
    foreachFunc: (RDD[T], Time) => Unit,
    displayInnerRDDOps: Boolean
  ) extends DStream[Unit](parent.ssc) {

  override def dependencies: List[DStream[_]] = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  override def compute(validTime: Time): Option[RDD[Unit]] = None

  override def generateJob(time: Time): Option[Job] = {
    parent.getOrCompute(time) match {
      case Some(rdd) =>
        val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
          foreachFunc(rdd, time)
        }
        Some(new Job(time, jobFunc))
      case None => None
    }
  }
}
```
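generateJob leans on the parent's getOrCompute, which returns the RDD already generated for this batch time if there is one, and otherwise computes and records a new one. A simplified sketch of that logic follows; the RDD scoping, persistence, and checkpointing handled by the real method are omitted:

```scala
// Simplified sketch of DStream.getOrCompute: generatedRDDs is the DStream's
// internal HashMap[Time, RDD[T]] of already-generated RDDs.
private[streaming] def getOrCompute(time: Time): Option[RDD[T]] = {
  // Reuse the RDD if this batch time was already computed ...
  generatedRDDs.get(time).orElse {
    // ... otherwise compute a new one, provided `time` is a valid batch time.
    if (isTimeValid(time)) {
      val rddOption = compute(time)
      rddOption.foreach(rdd => generatedRDDs.put(time, rdd))
      rddOption
    } else {
      None
    }
  }
}
```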
DStreamGraph's generateJobs method calls generateJob on every output stream, which here means calling ForEachDStream's generateJob.
```scala
def generateJobs(time: Time): Seq[Job] = {
  logDebug("Generating jobs for time " + time)
  val jobs = this.synchronized {
    outputStreams.flatMap { outputStream =>
      val jobOption = outputStream.generateJob(time)
      jobOption.foreach(_.setCallSite(outputStream.creationSite))
      jobOption
    }
  }
  logDebug("Generated " + jobs.length + " jobs for time " + time)
  jobs
}
```
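One level up, the JobGenerator's recurring timer fires once per batch interval and ends up in a method along these lines. This is a condensed sketch of JobGenerator.generateJobs, with checkpointing and some bookkeeping trimmed:

```scala
import scala.util.{Failure, Success, Try}

// Condensed sketch of JobGenerator.generateJobs, invoked by the recurring
// timer once per batch interval.
private def generateJobs(time: Time): Unit = {
  Try {
    // Allocate the blocks the receivers reported to this batch ...
    jobScheduler.receiverTracker.allocateBlocksToBatch(time)
    // ... then ask the DStreamGraph for this batch's jobs (the method above).
    graph.generateJobs(time)
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
}
```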
DStream's generateJob is defined as follows; among its subclasses, only ForEachDStream overrides it.
```scala
/**
 * Generate a SparkStreaming job for the given time. This is an internal method that
 * should not be called directly. This default implementation creates a job
 * that materializes the corresponding RDD. Subclasses of DStream may override this
 * to generate their own jobs.
 */
private[streaming] def generateJob(time: Time): Option[Job] = {
  getOrCompute(time) match {
    case Some(rdd) => {
      val jobFunc = () => {
        val emptyFunc = { (iterator: Iterator[T]) => {} }
        context.sparkContext.runJob(rdd, emptyFunc)
      }
      Some(new Job(time, jobFunc))
    }
    case None => None
  }
}
```
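The Job object created in both branches is only a thin wrapper: it pairs the batch time with the closure to run, and nothing executes until a scheduler thread calls run(). A minimal sketch of that wrapper (the real private[streaming] class also tracks an id, the output operation id, and call-site information):

```scala
import scala.util.Try
import org.apache.spark.streaming.Time

// Minimal sketch of the streaming Job wrapper.
class Job(val time: Time, func: () => _) {
  private var _result: Try[_] = null

  // Runs on a JobScheduler thread-pool thread; this is the moment the wrapped
  // action (e.g. the runJob call above) actually executes.
  def run(): Unit = {
    _result = Try(func())
  }

  def result: Try[_] = _result
}
```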
DStream's print method is itself implemented on top of foreachRDD, passing in an internal foreachFunc that takes num + 1 elements and prints the first num of them.
```scala
/**
 * Print the first num elements of each RDD generated in this DStream. This is an output
 * operator, so this DStream will be registered as an output stream and therefore materialized.
 */
def print(num: Int): Unit = ssc.withScope {
  def foreachFunc: (RDD[T], Time) => Unit = {
    (rdd: RDD[T], time: Time) => {
      val firstNum = rdd.take(num + 1)
      // scalastyle:off println
      println("-------------------------------------------")
      println("Time: " + time)
      println("-------------------------------------------")
      firstNum.take(num).foreach(println)
      if (firstNum.length > num) println("...")
      println()
      // scalastyle:on println
    }
  }
  foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)
}
```
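The take(num + 1) trick deserves a note: the extra element is never printed, it only reveals whether more data exists so that "..." can be appended. A self-contained illustration of the same pattern on a plain RDD (the values and app name are made up for this sketch):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object TakePlusOneDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("TakePlusOneDemo").setMaster("local[2]")
    val sc = new SparkContext(conf)

    val rdd = sc.parallelize(1 to 5)
    val num = 3
    // Fetch one element more than we intend to print, purely as a probe.
    val firstNum = rdd.take(num + 1)          // Array(1, 2, 3, 4)
    firstNum.take(num).foreach(println)       // prints 1, 2, 3
    if (firstNum.length > num) println("...") // the 4th element => more data

    sc.stop()
  }
}
```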
Summary: JobScheduler is the center of all job scheduling in Spark Streaming. It has two important members:

JobGenerator, which is responsible for generating jobs, and ReceiverTracker, which records information about the input data sources.

Starting the JobScheduler starts both the ReceiverTracker and the JobGenerator. Starting the ReceiverTracker causes the Receivers running on the Executors to start and receive data, and the ReceiverTracker records the meta information of the data those Receivers receive. Starting the JobGenerator causes the DStreamGraph to be invoked once every BatchDuration to generate an RDD graph and the corresponding Jobs. The thread pool inside JobScheduler then submits the encapsulated JobSet objects (batch time, Jobs, and the meta information of the data sources). Each Job wraps the business logic, which triggers the action on the last RDD, and the DAGScheduler then actually schedules the Job for execution on the Spark cluster.
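The thread-pool submission mentioned in the summary can be seen in JobScheduler.submitJobSet; here is a condensed sketch with the listener-bus notifications omitted:

```scala
// Condensed sketch of JobScheduler.submitJobSet: each Job in the JobSet is
// handed to the jobExecutor thread pool, whose size is controlled by
// spark.streaming.concurrentJobs (1 by default, so a batch's jobs run serially).
def submitJobSet(jobSet: JobSet): Unit = {
  if (jobSet.jobs.isEmpty) {
    logInfo("No jobs added for time " + jobSet.time)
  } else {
    jobSets.put(jobSet.time, jobSet)
    // JobHandler.run() eventually calls job.run(), which triggers the RDD action.
    jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
    logInfo("Added jobs for time " + jobSet.time)
  }
}
```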