Contents of this session:
1. The internal implementation of JobScheduler
2. In-depth reflections on JobScheduler
Summary: JobScheduler is the core of all scheduling in Spark Streaming. Its role is equivalent to that of DAGScheduler, the scheduling center of Spark Core!
Q: Where is JobScheduler created?
A: JobScheduler is created when StreamingContext is instantiated, as line 183 of the StreamingContext source shows:
private[streaming] val scheduler = new JobScheduler(this)
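Q: Why does Spark Streaming need two threads?
A: The two threads specified in setMaster (for example local[2]) mean that the program needs at least two threads at run time. One thread receives data and has to loop continuously. The other is the processing thread, whose number we specify ourselves, and it processes the jobs. The start() method of StreamingContext is shown below; note that it starts the streaming scheduler in a new thread: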
def start(): Unit = synchronized {
  state match {
    case INITIALIZED =>
      startSite.set(DStream.getCreationSite())
      StreamingContext.ACTIVATION_LOCK.synchronized {
        StreamingContext.assertNoOtherContextIsActive()
        try {
          validate()

          // Start the streaming scheduler in a new thread, so that thread local properties
          // like call sites and job groups can be reset without affecting those of the
          // current thread.
          // Thread started inside Spark Streaming to schedule all of the jobs
          ThreadUtils.runInNewThread("streaming-start") {
            sparkContext.setCallSite(startSite.get)
            sparkContext.clearJobGroup()
            sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
            scheduler.start()
          }
          state = StreamingContextState.ACTIVE
        } catch {
          case NonFatal(e) =>
            logError("Error starting the context, marking it as stopped", e)
            scheduler.stop(false)
            state = StreamingContextState.STOPPED
            throw e
        }
        StreamingContext.setActiveContext(this)
      }
      shutdownHookRef = ShutdownHookManager.addShutdownHook(
        StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
      // Registering Streaming Metrics at the start of the StreamingContext
      assert(env.metricsSystem != null)
      env.metricsSystem.registerSource(streamingSource)
      uiTab.foreach(_.attach())
      logInfo("StreamingContext started")
    case ACTIVE =>
      logWarning("StreamingContext has already been started")
    case STOPPED =>
      throw new IllegalStateException("StreamingContext has already been stopped")
  }
}
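The two-thread requirement can be illustrated without Spark at all. The sketch below is a hypothetical plain-Scala model (TwoThreadModel and its queue hand-off are assumptions, not Spark code): one thread keeps looping to receive records, while a second thread processes them. If only a single thread were available, the receiver loop would occupy it and nothing would ever get processed, which is why local mode needs at least two.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable.ArrayBuffer

// Hypothetical plain-Scala model (no Spark involved): one thread plays the
// receiver, a second thread plays the job processor.
object TwoThreadModel {
  def run(records: Seq[Int]): List[Int] = {
    // Hand-off buffer between the two threads; None marks the end of the stream
    val queue = new LinkedBlockingQueue[Option[Int]]()
    val processed = ArrayBuffer.empty[Int]

    // Receiver thread: loops over incoming records and hands each one off
    val receiver = new Thread(() => {
      records.foreach(r => queue.put(Some(r)))
      queue.put(None)
    }, "receiver")

    // Processing thread: keeps taking records and "processes" them (doubles them here)
    val processor = new Thread(() => {
      var done = false
      while (!done) {
        queue.take() match {
          case Some(r) => processed += r * 2
          case None    => done = true
        }
      }
    }, "processor")

    receiver.start()
    processor.start()
    receiver.join()
    processor.join() // join makes the processor's writes visible to this thread
    processed.toList
  }
}
```

Calling TwoThreadModel.run(Seq(1, 2, 3)) returns the doubled records in arrival order, because a single FIFO queue feeds a single processor.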
Next, let's look at the JobScheduler source:
/**
 * JobScheduler is responsible for jobs at the logical level and has them run physically on Spark.
 * This class schedules jobs to be run on Spark. It uses the JobGenerator to generate
 * the jobs and runs them using a thread pool.
 */
private[streaming]
class JobScheduler(val ssc: StreamingContext) extends Logging {

  // The jobSets map keeps storing the JobSets it receives, keyed by batch time
  private val jobSets: java.util.Map[Time, JobSet] = new ConcurrentHashMap[Time, JobSet]

  // Sets the parallelism, 1 by default. To change how many jobs run concurrently,
  // set this value in the Spark conf or in the application
  private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)

  // Logical jobs are turned into physical jobs by running them on this
  // thread pool, created with newDaemonFixedThreadPool
  private val jobExecutor =
    ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor")

  // Instantiate the JobGenerator
  private val jobGenerator = new JobGenerator(this)
  val clock = jobGenerator.clock
  val listenerBus = new StreamingListenerBus()

  // (Author's note: the three fields below are all instantiated when JobScheduler starts)
  // These two are created only when scheduler starts.
  // eventLoop not being null means the scheduler has been started and not stopped
  var receiverTracker: ReceiverTracker = null
  // A tracker to track all the input stream information as well as processed record number
  var inputInfoTracker: InputInfoTracker = null

  private var eventLoop: EventLoop[JobSchedulerEvent] = null

  def start(): Unit = synchronized {
    if (eventLoop != null) return // scheduler has already been started

    logDebug("Starting JobScheduler")
    eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
      override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

      override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
    }
    eventLoop.start()

    // attach rate controllers of input streams to receive batch completion updates
    for {
      inputDStream <- ssc.graph.getInputStreams
      rateController <- inputDStream.rateController
    } ssc.addStreamingListener(rateController)

    listenerBus.start(ssc.sparkContext)
    receiverTracker = new ReceiverTracker(ssc)
    inputInfoTracker = new InputInfoTracker(ssc)
    receiverTracker.start()
    jobGenerator.start()
    logInfo("Started JobScheduler")
  }

  // ... (remaining members omitted)
}

Q: Why would we change the concurrency?
A: Sometimes an application has several output operations, which leads to several jobs being executed within the same batch duration. These jobs do not need to wait for one another, so setting this value lets them run concurrently! Likewise, when the thread pool has multiple threads, jobs from different batches can also run concurrently.
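JobScheduler.start() builds an EventLoop whose onReceive forwards to processEvent and whose onError forwards to reportError. As a rough sketch of that pattern, here is a hypothetical SimpleEventLoop (it is not Spark's EventLoop, whose internals may differ): post() only enqueues an event, and one daemon thread takes events off a blocking queue and dispatches them one at a time.

```scala
import java.util.concurrent.LinkedBlockingDeque
import java.util.concurrent.atomic.AtomicBoolean
import scala.util.control.NonFatal

// Hypothetical, simplified event loop in the spirit of the one JobScheduler
// creates in start(); it is NOT Spark's EventLoop class, just the same idea.
abstract class SimpleEventLoop[E](threadName: String) {
  private val eventQueue = new LinkedBlockingDeque[E]()
  private val stopped = new AtomicBoolean(false)

  // A single daemon thread drains the queue and dispatches each event
  private val eventThread = new Thread(threadName) {
    setDaemon(true)
    override def run(): Unit =
      try {
        while (!stopped.get) {
          val event = eventQueue.take() // blocks until post() adds an event
          try onReceive(event)
          catch { case NonFatal(e) => onError(e) } // one bad event does not kill the loop
        }
      } catch {
        case _: InterruptedException => () // stop() interrupts the blocking take()
      }
  }

  def start(): Unit = eventThread.start()

  def stop(): Unit = {
    stopped.set(true)
    eventThread.interrupt()
    eventThread.join()
  }

  // Callers just enqueue and return immediately
  def post(event: E): Unit = eventQueue.put(event)

  protected def onReceive(event: E): Unit
  protected def onError(e: Throwable): Unit
}
```

In JobScheduler terms, a subclass's onReceive would call processEvent and its onError would call reportError; because only one thread dispatches, events are handled strictly in the order they were posted.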
Next, starting from the application's output method print(), let's trace back how a Job is generated:
1. Jumping from print() in the application leads to DStream's print():
/**
* Print the first ten elements of each RDD generated in this DStream. This is an output
* operator, so this DStream will be registered as an output stream and there materialized.
*/
def print(): Unit = ssc.withScope {
print(10)
}
2. Next, follow the print(10) call above into print(num):
/**
 * Print the first num elements of each RDD generated in this DStream. This is an output
 * operator, so this DStream will be registered as an output stream and there materialized.
 */
def print(num: Int): Unit = ssc.withScope {
  def foreachFunc: (RDD[T], Time) => Unit = {
    (rdd: RDD[T], time: Time) => {
      val firstNum = rdd.take(num + 1)
      // scalastyle:off println
      println("-------------------------------------------")
      println("Time: " + time)
      println("-------------------------------------------")
      firstNum.take(num).foreach(println)
      if (firstNum.length > num) println("...")
      println()
      // scalastyle:on println
    }
  }
  foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)
}
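The foreachFunc above, in particular rdd.take(num + 1), shows that when Spark Streaming finally executes, it is still performing all kinds of logical-level operations on RDDs!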
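To see what the foreachFunc in print(num) does with each batch, here is a hypothetical helper that applies the same take(num + 1) trick to a plain Seq instead of an RDD. PrintLogic and formatBatch are made-up names, and the "<millis> ms" time format is an assumption based on how Spark's Time prints itself.

```scala
// Hypothetical helper mirroring the logic of DStream.print(num) on a plain Seq
// (no Spark): take num + 1 elements so we can tell whether more remain,
// show only num of them, and append "..." if there were more.
object PrintLogic {
  def formatBatch[T](batch: Seq[T], time: Long, num: Int): List[String] = {
    val firstNum = batch.take(num + 1) // one extra element detects "more data"
    val header = List(
      "-------------------------------------------",
      s"Time: $time ms",
      "-------------------------------------------")
    val body = firstNum.take(num).map(_.toString).toList
    val more = if (firstNum.length > num) List("...") else Nil
    header ++ body ++ more ++ List("") // trailing empty line, like println()
  }
}
```

Taking one element more than needed is what lets print decide whether to show "..." without counting the whole RDD.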
3. Next, follow foreachRDD into the foreachRDD method:
/**
* Apply a function to each RDD in this DStream. This is an output operator, so
* 'this' DStream will be registered as an output stream and therefore materialized.
* @param foreachFunc foreachRDD function
* @param displayInnerRDDOps Whether the detailed callsites and scopes of the RDDs generated
* in the `foreachFunc` to be displayed in the UI. If `false`, then
* only the scopes and callsites of `foreachRDD` will override those
* of the RDDs on the display.
*/
private def foreachRDD(
foreachFunc: (RDD[T], Time) => Unit,
displayInnerRDDOps: Boolean): Unit = {
new ForEachDStream(this,
context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()
}
4. Following ForEachDStream above leads into the ForEachDStream class, where we find the generateJob method:
/**
 * An internal DStream used to represent output operations like DStream.foreachRDD.
 * @param parent        Parent DStream
 * @param foreachFunc   Function to apply on each RDD generated by the parent DStream
 * @param displayInnerRDDOps Whether the detailed callsites and scopes of the RDDs generated
 *                           by `foreachFunc` will be displayed in the UI; only the scope and
 *                           callsite of `DStream.foreachRDD` will be displayed.
 */
private[streaming]
class ForEachDStream[T: ClassTag] (
    parent: DStream[T],
    foreachFunc: (RDD[T], Time) => Unit,
    displayInnerRDDOps: Boolean
  ) extends DStream[Unit](parent.ssc) {

  override def dependencies: List[DStream[_]] = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  override def compute(validTime: Time): Option[RDD[Unit]] = None

  // Keeps producing a Job for each batch interval
  override def generateJob(time: Time): Option[Job] = {
    parent.getOrCompute(time) match {
      case Some(rdd) =>
        val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
          // rdd is the RDD generated for this batch time; since this is an output
          // operation, it is the last RDD. Next we only need to find where
          // ForEachDStream.generateJob is called to see how the job is finally generated
          foreachFunc(rdd, time)
        }
        Some(new Job(time, jobFunc))
      case None => None
    }
  }
}
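The key point of generateJob is that it only wraps foreachFunc in a closure; nothing runs until the Job itself is executed. A hypothetical model of that shape, with SketchJob standing in for Spark Streaming's Job and a plain Seq standing in for the parent's RDD (all names here are illustrative):

```scala
// Hypothetical stand-in (not a Spark class): a "Job" is just a batch time plus
// a deferred function, like Spark Streaming's Job(time, func).
final case class SketchJob(time: Long, func: () => Unit) {
  def run(): Unit = func()
}

// Mirrors the shape of ForEachDStream.generateJob: if the parent produced data
// for this batch, wrap foreachFunc in a closure; nothing executes yet.
final class SketchForEach[T](
    parentCompute: Long => Option[Seq[T]], // stands in for parent.getOrCompute
    foreachFunc: (Seq[T], Long) => Unit) {

  def generateJob(time: Long): Option[SketchJob] =
    parentCompute(time) match {
      case Some(data) =>
        val jobFunc = () => foreachFunc(data, time) // logical job: deferred
        Some(SketchJob(time, jobFunc))
      case None => None // no data for this batch, so no job
    }
}
```

This is why the text calls the generated job "logical": generating it has no side effects until something calls run().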
5. In the previous lecture we derived the following flow:
StreamingContext.start --> JobScheduler.start --> ReceiverTracker.start() --> JobGenerator.start() --> EventLoop --> processEvent() --> generateJobs() --> jobScheduler.receiverTracker.allocateBlocksToBatch(time) --> graph.generateJobs(time)
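The JobGenerator part of that chain can be approximated by a small synchronous model: a timer produces one time per batch duration, each time becomes a GenerateJobs event, and processEvent dispatches it to job generation, followed by a checkpoint event. The classes below are hypothetical simplifications; the event names only echo those in the source (the real DoCheckpoint also carries a clearCheckpointDataLater flag), and aligning the first tick to the next multiple of the batch duration is an assumption of this sketch.

```scala
// Hypothetical, synchronous model of the JobGenerator side of the chain.
sealed trait GeneratorEvent
final case class GenerateJobs(time: Long) extends GeneratorEvent
final case class DoCheckpoint(time: Long) extends GeneratorEvent

object TimerSketch {
  // Assumed behaviour: ticks fall on whole multiples of the batch duration,
  // starting with the next multiple after "now"
  def tickTimes(nowMs: Long, batchMs: Long, n: Int): Seq[Long] = {
    val first = (nowMs / batchMs + 1) * batchMs
    (0 until n).map(i => first + i * batchMs)
  }
}

final class GeneratorSketch(generate: Long => Unit) {
  val log = scala.collection.mutable.ArrayBuffer.empty[String]

  // Dispatch on event type, the way processEvent() routes GenerateJobs to generateJobs()
  def processEvent(event: GeneratorEvent): Unit = event match {
    case GenerateJobs(time) =>
      generate(time)
      log += s"generated $time"
      processEvent(DoCheckpoint(time)) // the real code posts this back to its event loop instead
    case DoCheckpoint(time) =>
      log += s"checkpoint $time"
  }
}
```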
The last step, graph.generateJobs, is a method of DStreamGraph. Let's step into it:
def generateJobs(time: Time): Seq[Job] = {
  logDebug("Generating jobs for time " + time)
  val jobs = this.synchronized {
    // Each outputStream here is a ForEachDStream
    outputStreams.flatMap { outputStream =>
      val jobOption = outputStream.generateJob(time)
      jobOption.foreach(_.setCallSite(outputStream.creationSite))
      jobOption
    }
  }
  logDebug("Generated " + jobs.length + " jobs for time " + time)
  jobs
}

// outputStreams is a field of DStreamGraph:
private val outputStreams = new ArrayBuffer[DStream[_]]()
Examining DStream's subclass hierarchy together with ForEachDStream's generateJob above shows that, among DStream's subclasses, only ForEachDStream overrides DStream's generateJob!
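Putting register() and DStreamGraph.generateJobs together: every output operation adds one output stream to the graph, and for each batch the graph asks every output stream for an optional Job, with flatMap dropping the empty ones. GraphSketch and OutputStreamSketch are hypothetical names, and a Job is reduced to a string label.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical model: only output streams (like ForEachDStream) are registered
// with the graph, and generateJobs asks each of them for an optional Job.
trait OutputStreamSketch {
  def generateJob(time: Long): Option[String] // a Job is just a label here
}

final class GraphSketch {
  private val outputStreams = ArrayBuffer.empty[OutputStreamSketch]

  // What register() achieves: the output stream is appended to the graph
  def addOutputStream(s: OutputStreamSketch): Unit = synchronized { outputStreams += s }

  // flatMap drops output streams that produced no job (None) for this batch
  def generateJobs(time: Long): Seq[String] = synchronized {
    outputStreams.toList.flatMap(_.generateJob(time))
  }
}
```

With two output operations, a batch can yield two jobs, which is exactly the situation in which spark.streaming.concurrentJobs becomes relevant.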
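Final conclusion:
Jobs are really generated by ForEachDStream.generateJob, which graph.generateJobs calls for every output stream. At this stage the job exists only at the logical level; it is handed off for physical execution from JobGenerator: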
/** Generate jobs and perform checkpoint for the given `time`. */
private def generateJobs(time: Time) {
// Set the SparkEnv in this thread, so that job generation code can access the environment
// Example: BlockRDDs are created in this thread, and it needs to access BlockManager
// Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
SparkEnv.set(ssc.env)
Try {
jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
graph.generateJobs(time) // generate jobs using allocated block
} match {
case Success(jobs) =>
val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
case Failure(e) =>
jobScheduler.reportError("Error generating jobs for time " + time, e)
}
eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}
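The control flow of JobGenerator.generateJobs is: allocate blocks, build jobs, submit them on success or report the error on failure, and post a checkpoint request in both cases. A hypothetical sketch of just that flow, with every collaborator passed in as a function (the class and parameter names are made up for illustration):

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical model of the control flow in JobGenerator.generateJobs.
final class GenerateJobsSketch(
    allocateBlocks: Long => Unit,
    buildJobs: Long => Seq[String],
    submit: (Long, Seq[String]) => Unit,
    reportError: (String, Throwable) => Unit,
    postCheckpoint: Long => Unit) {

  def generateJobs(time: Long): Unit = {
    Try {
      allocateBlocks(time) // assign received blocks to this batch first
      buildJobs(time)      // then build jobs from those blocks
    } match {
      case Success(jobs) => submit(time, jobs)
      case Failure(e)    => reportError(s"Error generating jobs for time $time", e)
    }
    postCheckpoint(time)   // runs whether or not job generation failed
  }
}
```

Wrapping both steps in one Try means a failure in block allocation skips job building entirely, while the checkpoint request is still posted.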
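Step into jobScheduler.submitJobSet, which runs jobs on jobExecutor:
// Logical jobs are turned into physical jobs by running them on this
// thread pool, created with newDaemonFixedThreadPool
private val jobExecutor =
  ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor")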
def submitJobSet(jobSet: JobSet) {
if (jobSet.jobs.isEmpty) {
logInfo("No jobs added for time " + jobSet.time)
} else {
listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
jobSets.put(jobSet.time, jobSet)
jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
logInfo("Added jobs for time " + jobSet.time)
}
}
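submitJobSet stores the JobSet under its batch time and hands each job to jobExecutor, a fixed-size pool whose size comes from spark.streaming.concurrentJobs. The hypothetical SubmitSketch below models this with java.util.concurrent; its daemon thread factory mirrors the intent of newDaemonFixedThreadPool but is not Spark's code.

```scala
import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadFactory, TimeUnit}

// Hypothetical model of submitJobSet; names are illustrative only.
final class SubmitSketch(numConcurrentJobs: Int) {
  // Jobs of each batch, keyed by batch time (like JobScheduler.jobSets)
  private val jobSets = new ConcurrentHashMap[Long, Seq[Runnable]]()

  // Fixed-size pool of daemon threads; its size plays the role of
  // spark.streaming.concurrentJobs
  private val jobExecutor = Executors.newFixedThreadPool(numConcurrentJobs, new ThreadFactory {
    def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "sketch-job-executor")
      t.setDaemon(true) // daemon threads do not keep the JVM alive
      t
    }
  })

  def submitJobSet(time: Long, jobs: Seq[Runnable]): Unit =
    if (jobs.nonEmpty) { // the source only logs a message for an empty batch
      jobSets.put(time, jobs)
      jobs.foreach(job => jobExecutor.execute(job)) // each job goes to a pool thread
    }

  def trackedBatches: Int = jobSets.size

  def shutdown(): Unit = {
    jobExecutor.shutdown()
    jobExecutor.awaitTermination(10, TimeUnit.SECONDS)
  }
}
```

With a pool size of 1, two jobs from the same batch run one after the other; with a larger pool they can overlap, which is the effect of raising spark.streaming.concurrentJobs.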
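At this point the whole process of generating and executing jobs is clear. To summarize:
As we learned in the previous lecture, JobScheduler contains two core components, JobGenerator and ReceiverTracker, which are responsible for generating Jobs and for receiving source data, respectively.
Starting ReceiverTracker causes the Receivers running on the Executors to start and receive data, and ReceiverTracker records the metadata of the data the Receivers receive.
Starting JobGenerator causes DStreamGraph to be invoked every BatchDuration to generate the RDD graph and the Jobs.
The thread pool in JobScheduler then submits the encapsulated JobSet object (batch time, Jobs, and metadata of the data sources). Each Job encapsulates the business logic; running it triggers the action on the last RDD,
and DAGScheduler then actually schedules that Job for execution on the Spark cluster.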
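Special thanks to Wang Jialin for his distinctive lectures:
Wang Jialin's contact card:
China's foremost Spark expert
Sina Weibo: http://weibo.com/ilovepains
WeChat official account: DT_Spark
Blog: http://blog.sina.com.cn/ilovepains
QQ: 1740415547
YY Classroom: live lectures every day at 20:00, channel 68917580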