In this installment:
1. In-depth thinking on Spark Streaming Job generation
2. Source-code analysis of Spark Streaming Job generation
Let us start with the JobGenerator class. Its constructor takes a JobScheduler object, and JobScheduler is the core class for generating Spark Streaming Jobs and submitting them to the cluster. JobGenerator generates Jobs based on the DStreamGraph. To stress the point again: a Job here is like a Java Runnable, a wrapper around business logic, and is not the same concept as a Job in Spark Core. A Spark Core Job is an actually running job; a Spark Streaming Job is a higher-level abstraction.
/**
 * This class generates jobs from DStreams as well as drives checkpointing and cleaning
 * up DStream metadata.
 */
private[streaming] class JobGenerator(jobScheduler: JobScheduler) extends Logging {

  private val ssc = jobScheduler.ssc
  private val conf = ssc.conf
  private val graph = ssc.graph
A Job in Spark Streaming is just a Java bean; the business logic lives in the func function.
/**
 * Class representing a Spark computation. It may contain multiple Spark jobs.
 */
private[streaming] class Job(val time: Time, func: () => _) {
  private var _id: String = _
  private var _outputOpId: Int = _
  private var isSet = false
  private var _result: Try[_] = null
  private var _callSite: CallSite = null
  private var _startTime: Option[Long] = None
  private var _endTime: Option[Long] = None
DStreams come in three kinds. The first is input streams built from different sources, such as Socket, Kafka, or Flume. The second is output: outputStreams are actions at the logical level, and since they are still at the Spark Streaming framework level, they must eventually be turned into physical-level actions. The third is DStreams produced by transformations, which turn one DStream into another, i.e. DStreams derived from other DStreams. The DStreamGraph class records the input (source) DStreams and the output DStreams.
// DStreamGraph is the static template of RDDs: it represents the concrete
// processing steps defined by the RDD dependency graph.
final private[streaming] class DStreamGraph extends Serializable with Logging {

  // Dynamic arrays of InputDStreams
  // Input streams: the data sources
  private val inputStreams = new ArrayBuffer[InputDStream[_]]()
  // Output streams: the concrete output (action) operations
  private val outputStreams = new ArrayBuffer[DStream[_]]()
Driven by the BatchDuration interval, JobGenerator keeps producing jobs as time goes by, and also drives checkpoint operations and the cleanup of earlier DStream data.
A thought on stream processing versus batch processing: if the batch interval is short enough, batch processing becomes stream processing. Spark Streaming's stream processing is triggered by time, whereas Storm's is triggered by events; compare scheduled tasks, stream processing, and J2EE-triggered jobs.
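To make the time trigger concrete, here is a minimal driver sketch (the local master and the socket source on localhost:9999 are assumptions for illustration): the BatchDuration passed to the StreamingContext is exactly the interval at which the timer analyzed below fires.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object TimeTriggeredExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("TimeTriggeredExample")
    // Seconds(1) is the BatchDuration: the time trigger for Job generation.
    val ssc = new StreamingContext(conf, Seconds(1))
    val lines = ssc.socketTextStream("localhost", 9999) // assumed local source
    lines.count().print() // an output operation; becomes one Job per interval
    ssc.start()
    ssc.awaitTermination()
  }
}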
Consider a question: when the logical-level DStreamGraph is translated into a physical-level RDD graph, the final operation is an RDD action. Will that action immediately trigger a Job?
The Job produced by JobGenerator is a wrapper around a Runnable. The dependencies between DStreams are translated into dependencies between RDDs, with the final operation being an action. Because these operations all sit inside a method that has not yet been invoked, translation does not trigger a Job. If Jobs were triggered at translation time, Job submission in Spark Streaming would be completely unmanaged.
When JobScheduler decides to schedule a Job, it takes a thread from its thread pool to execute the method that wraps the DStream-to-RDD translation.
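The deferral is easy to demonstrate outside Spark. A minimal sketch (SimpleJob is a hypothetical stand-in, not the Spark class): wrapping an action in a function value executes nothing; only an explicit call does.

// Illustrative only -- SimpleJob is a made-up stand-in for Spark's Job class.
object DeferredJobSketch {
  final class SimpleJob(val time: Long, func: () => Unit) {
    def run(): Unit = func() // the wrapped logic executes only here
  }

  def main(args: Array[String]): Unit = {
    // Constructing the closure triggers nothing, just as translating the
    // DStreamGraph into RDDs triggers no Spark job.
    val job = new SimpleJob(System.currentTimeMillis(),
      () => println("the action runs only when a scheduler thread calls run()"))
    job.run() // only now does the "action" fire
  }
}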
Next we examine Job generation from three angles: JobGenerator, JobScheduler, and ReceiverTracker. JobGenerator is responsible for generating Jobs, JobScheduler for scheduling them, and ReceiverTracker for tracking the data sources. Both JobGenerator and ReceiverTracker are members of JobScheduler.
/**
 * This class schedules jobs to be run on Spark. It uses the JobGenerator to generate
 * the jobs and runs them using a thread pool.
 */
private[streaming] class JobScheduler(val ssc: StreamingContext) extends Logging {

  // Use of ConcurrentHashMap.keySet later causes an odd runtime problem due to Java 7/8 diff
  // https://gist.github.com/AlainODea/1375759b8720a3f9f094
  private val jobSets: java.util.Map[Time, JobSet] = new ConcurrentHashMap[Time, JobSet]
  // The default number of concurrent Jobs is 1
  private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)
  // Jobs are executed on a thread pool
  private val jobExecutor =
    ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor")
  // Create the JobGenerator, analyzed in detail below
  private val jobGenerator = new JobGenerator(this)
  val clock = jobGenerator.clock
  val listenerBus = new StreamingListenerBus()

  // These two are created only when scheduler starts.
  // eventLoop not being null means the scheduler has been started and not stopped
  var receiverTracker: ReceiverTracker = null
  // A tracker to track all the input stream information as well as processed record number
  var inputInfoTracker: InputInfoTracker = null
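Because numConcurrentJobs defaults to 1, the Jobs of successive batches run one at a time. The pool size can be raised through configuration; a sketch (raising it trades in-order batch processing for parallelism):

import org.apache.spark.SparkConf

object ConcurrentJobsConfig {
  // Allowing 2 concurrent Jobs weakens in-order batch processing guarantees.
  val conf: SparkConf = new SparkConf()
    .setAppName("ConcurrentJobsExample")
    .set("spark.streaming.concurrentJobs", "2")
}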
In JobScheduler's start method, the start methods of ReceiverTracker and JobGenerator are each invoked.
def start(): Unit = synchronized {
  if (eventLoop != null) return // scheduler has already been started

  logDebug("Starting JobScheduler")
  // The message-driven event loop
  eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
    override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
  }
  // Start the message-loop processing thread
  eventLoop.start()

  // attach rate controllers of input streams to receive batch completion updates
  for {
    inputDStream <- ssc.graph.getInputStreams
    rateController <- inputDStream.rateController
  } ssc.addStreamingListener(rateController)

  listenerBus.start(ssc.sparkContext)
  receiverTracker = new ReceiverTracker(ssc)
  inputInfoTracker = new InputInfoTracker(ssc)
  // Start the receiverTracker
  receiverTracker.start()
  // Start the Job generator
  jobGenerator.start()
  logInfo("Started JobScheduler")
}
First look at JobGenerator's start method: it initializes the checkpoint machinery, instantiates and starts the EventLoop message loop, and starts the timer that periodically generates Jobs.
/** Start generation of jobs */
def start(): Unit = synchronized {
  if (eventLoop != null) return // generator has already been started

  // Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock.
  // See SPARK-10125
  checkpointWriter

  eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
    override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = {
      jobScheduler.reportError("Error in job generator", e)
    }
  }
  // Start the message-loop processing thread
  eventLoop.start()

  if (ssc.isCheckpointPresent) {
    restart()
  } else {
    // Start the timer that periodically generates Jobs
    startFirstTime()
  }
}
The EventLoop class contains a LinkedBlockingDeque for storing messages and a background thread. The background thread takes messages off the queue and calls onReceive to handle each one; here onReceive is overridden in the anonymous subclass to delegate to processEvent.
private[spark] abstract class EventLoop[E](name: String) extends Logging {

  private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()

  private val stopped = new AtomicBoolean(false)

  private val eventThread = new Thread(name) {
    setDaemon(true)

    override def run(): Unit = {
      try {
        while (!stopped.get) {
          val event = eventQueue.take()
          try {
            onReceive(event)
          } catch {
            case NonFatal(e) => {
              try {
                onError(e)
              } catch {
                case NonFatal(e) => logError("Unexpected error in " + name, e)
              }
            }
          }
        }
      } catch {
        case ie: InterruptedException => // exit even if eventQueue is not empty
        case NonFatal(e) => logError("Unexpected error in " + name, e)
      }
    }
  }

  def start(): Unit = {
    if (stopped.get) {
      throw new IllegalStateException(name + " has already been stopped")
    }
    // Call onStart before starting the event thread to make sure it happens before onReceive
    onStart()
    eventThread.start()
  }
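The pattern is easy to reproduce in isolation. A minimal, self-contained event loop modeled on the class above (illustrative only, not the Spark implementation):

import java.util.concurrent.LinkedBlockingDeque
import java.util.concurrent.atomic.AtomicBoolean

class MiniEventLoop[E](name: String)(handler: E => Unit) {
  private val queue = new LinkedBlockingDeque[E]()
  private val stopped = new AtomicBoolean(false)

  private val thread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit =
      try {
        while (!stopped.get) handler(queue.take()) // block until a message arrives
      } catch {
        case _: InterruptedException => // stop() interrupts a blocked take(); exit quietly
      }
  }

  def start(): Unit = thread.start()
  def post(event: E): Unit = queue.put(event)
  def stop(): Unit = { stopped.set(true); thread.interrupt() }
}

Usage is the same shape as in JobGenerator: create the loop with a handler, start it, then post messages, e.g. new MiniEventLoop[String]("demo")(msg => println(msg)).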
processEvent pattern-matches on the message type and routes each message to the method that handles it. The handling is generally handed off to another thread; the message loop itself must not run time-consuming business logic.
/** Processes all events */
private def processEvent(event: JobGeneratorEvent) {
  logDebug("Got event " + event)
  event match {
    case GenerateJobs(time) => generateJobs(time)
    case ClearMetadata(time) => clearMetadata(time)
    case DoCheckpoint(time, clearCheckpointDataLater) =>
      doCheckpoint(time, clearCheckpointDataLater)
    case ClearCheckpointData(time) => clearCheckpointData(time)
  }
}
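The routing relies on a sealed event hierarchy, so the match is exhaustive at compile time. A self-contained mirror of the pattern (the types and bodies here are illustrative, not the Spark definitions):

sealed trait DemoGeneratorEvent
final case class GenerateJobs(time: Long) extends DemoGeneratorEvent
final case class ClearMetadata(time: Long) extends DemoGeneratorEvent

object DemoRouter {
  def processEvent(event: DemoGeneratorEvent): Unit = event match {
    case GenerateJobs(t)  => println(s"generate jobs for batch $t")  // handed off in real code
    case ClearMetadata(t) => println(s"clear metadata up to $t")
  }
}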
Take generateJobs, the handler of the GenerateJobs message, as an example: after allocating the received data to the batch, it calls DStreamGraph's generateJobs method to generate the Jobs.
/** Generate jobs and perform checkpoint for the given `time`. */
private def generateJobs(time: Time) {
  // Set the SparkEnv in this thread, so that job generation code can access the environment
  // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
  // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
  SparkEnv.set(ssc.env)
  Try {
    // Fetch the concrete data for this particular time
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    // Call DStreamGraph's generateJobs to generate the Jobs
    graph.generateJobs(time) // generate jobs using allocated block
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}
In DStreamGraph's generateJobs, the outputStreams are the last DStreams in the whole DStream chain. The call outputStream.generateJob(time) works from back to front, much like RDD lineage is traced backwards from the final RDD.
def generateJobs(time: Time): Seq[Job] = {
  logDebug("Generating jobs for time " + time)
  val jobs = this.synchronized {
    outputStreams.flatMap { outputStream =>
      val jobOption = outputStream.generateJob(time)
      jobOption.foreach(_.setCallSite(outputStream.creationSite))
      jobOption
    }
  }
  logDebug("Generated " + jobs.length + " jobs for time " + time)
  jobs
}
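One consequence of flatMapping over outputStreams: each output operation registered on the graph yields one Job per batch. A sketch, reusing the ssc and lines from the earlier driver example:

// Two output operations on the same DStream yield two Jobs per batch.
lines.print()               // output operation 1 -> Job 1 each interval
lines.map(_.length).print() // output operation 2 -> Job 2 each interval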
In generateJob, jobFunc wraps context.sparkContext.runJob(rdd, emptyFunc).
private[streaming] def generateJob(time: Time): Option[Job] = {
  getOrCompute(time) match {
    case Some(rdd) => {
      // The Job itself is wrapped in a function; this method is not executed yet
      val jobFunc = () => {
        val emptyFunc = { (iterator: Iterator[T]) => {} }
        context.sparkContext.runJob(rdd, emptyFunc)
      }
      Some(new Job(time, jobFunc))
    }
    case None => None
  }
}
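The emptyFunc trick is worth noting: runJob with a function that discards each partition's iterator still computes every partition, which is how a framework-level output operation forces the whole lineage to run. A sketch, assuming an existing SparkContext sc:

// Forces all partitions of rdd to be evaluated without collecting any results.
val rdd = sc.parallelize(1 to 100).map(_ * 2)
sc.runJob(rdd, (iter: Iterator[Int]) => {}) // same shape as the emptyFunc above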
In the Job object, the run method causes the func that was passed in to be invoked.
/**
 * Class representing a Spark computation. It may contain multiple Spark jobs.
 */
private[streaming] class Job(val time: Time, func: () => _) {
  private var _id: String = _
  private var _outputOpId: Int = _
  private var isSet = false
  private var _result: Try[_] = null
  private var _callSite: CallSite = null
  private var _startTime: Option[Long] = None
  private var _endTime: Option[Long] = None

  def run() {
    _result = Try(func())
  }
getOrCompute first looks up the RDD for the given time in a HashMap. If it is not there, compute is called to produce the RDD; the RDD is then persisted if storageLevel requires it, marked for checkpointing if the time falls on a checkpoint interval, and finally put into the HashMap.
/**
 * Get the RDD corresponding to the given time; either retrieve it from cache
 * or compute-and-cache it.
 */
private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
  // If RDD was already generated, then retrieve it from HashMap,
  // or else compute the RDD
  generatedRDDs.get(time).orElse {
    // Compute the RDD if time is valid (e.g. correct time in a sliding window)
    // of RDD generation, else generate nothing.
    if (isTimeValid(time)) {

      val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
        // Disable checks for existing output directories in jobs launched by the streaming
        // scheduler, since we may need to write output to an existing directory during checkpoint
        // recovery; see SPARK-4835 for more details. We need to have this call here because
        // compute() might cause Spark jobs to be launched.
        PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
          compute(time)
        }
      }

      rddOption.foreach { case newRDD =>
        // Register the generated RDD for caching and checkpointing
        if (storageLevel != StorageLevel.NONE) {
          newRDD.persist(storageLevel)
          logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
        }
        if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
          newRDD.checkpoint()
          logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")
        }
        generatedRDDs.put(time, newRDD)
      }
      rddOption
    } else {
      None
    }
  }
}
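The core of the method is a memoize-on-demand pattern. A stripped-down sketch of just that pattern (illustrative; the real method also handles persist, checkpoint marking, and time validation):

import scala.collection.mutable

class TimeCache[T] {
  private val generated = mutable.HashMap[Long, T]()

  def getOrCompute(time: Long)(compute: Long => Option[T]): Option[T] =
    generated.get(time).orElse {
      val computed = compute(time)
      computed.foreach(v => generated.put(time, v)) // cache for later lookups at this time
      computed
    }
}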
Returning to the JobGenerator class: in the start method, after the message loop is started, it checks whether a checkpoint was made before. If so, state is read from the checkpoint directory and restart is called to restart the JobGenerator; if this is the first start, startFirstTime is called.
/** Start generation of jobs */
def start(): Unit = synchronized {
  if (eventLoop != null) return // generator has already been started

  // Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock.
  // See SPARK-10125
  checkpointWriter

  eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
    override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = {
      jobScheduler.reportError("Error in job generator", e)
    }
  }
  // Start the message-loop processing thread
  eventLoop.start()

  if (ssc.isCheckpointPresent) {
    restart()
  } else {
    // Start the timer that periodically generates Jobs
    startFirstTime()
  }
}
JobGenerator's startFirstTime method starts the timer that periodically generates Jobs.
/** Starts the generator for the first time */
private def startFirstTime() {
  val startTime = new Time(timer.getStartTime())
  graph.start(startTime - graph.batchDuration)
  timer.start(startTime.milliseconds)
  logInfo("Started JobGenerator at " + startTime)
}
The timer object is a RecurringTimer; its start method starts a thread that repeatedly calls triggerActionForNextInterval.
// A recurring timer that periodically calls back eventLoop.post(GenerateJobs(new Time(longTime))).
// This defines the timer-triggered function, which posts a "GenerateJobs" event.
// Note that only the callback is defined here.
// GenerateJobs messages are posted at the batchInterval passed when the
// StreamingContext was created.
private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
  longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")

/**
 * Start at the given start time.
 */
def start(startTime: Long): Long = synchronized {
  nextTime = startTime
  thread.start()
  logInfo("Started timer for " + name + " at time " + nextTime)
  nextTime
}

// A daemon thread is created here
private val thread = new Thread("RecurringTimer - " + name) {
  setDaemon(true)
  override def run() { loop }
}

/**
 * Repeatedly call the callback every interval.
 */
private def loop() {
  try {
    while (!stopped) {
      triggerActionForNextInterval()
    }
    triggerActionForNextInterval()
  } catch {
    case e: InterruptedException =>
  }
}
triggerActionForNextInterval waits for one BatchDuration and then invokes callback, the function passed in when the RecurringTimer was constructed, namely longTime => eventLoop.post(GenerateJobs(new Time(longTime))). It thus keeps posting GenerateJobs messages to the message loop.
private def triggerActionForNextInterval(): Unit = {
  clock.waitTillTime(nextTime)
  callback(nextTime)
  prevTime = nextTime
  nextTime += period
  logDebug("Callback for " + name + " called at time " + prevTime)
}

private[streaming] class RecurringTimer(clock: Clock, period: Long,
    callback: (Long) => Unit, name: String) extends Logging {
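The timing logic itself is compact: wait until the next boundary, fire the callback, advance by exactly one period so the schedule never drifts. A self-contained, illustrative version (not the Spark class):

class MiniRecurringTimer(period: Long, callback: Long => Unit) {
  @volatile private var stopped = false
  private var nextTime = 0L

  private val thread = new Thread("MiniRecurringTimer") {
    setDaemon(true)
    override def run(): Unit =
      while (!stopped) {
        val sleepMs = nextTime - System.currentTimeMillis()
        if (sleepMs > 0) Thread.sleep(sleepMs) // stands in for clock.waitTillTime
        callback(nextTime)                     // e.g. post GenerateJobs(nextTime)
        nextTime += period                     // advance to the next batch boundary
      }
  }

  def start(startTime: Long): Unit = { nextTime = startTime; thread.start() }
  def stop(): Unit = stopped = true
}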
Let us focus once more on the steps by which generateJobs produces a Job:
Step 1: allocate the data received in the current batch interval.
Step 2: generate the Jobs, i.e. the RDD dependency chain.
Step 3: obtain the input-stream (StreamId) information for the generated Jobs.
Step 4: wrap everything into a JobSet and hand it to JobScheduler.
Step 5: perform the checkpoint operation.
/** Generate jobs and perform checkpoint for the given `time`. */
private def generateJobs(time: Time) {
  // Set the SparkEnv in this thread, so that job generation code can access the environment
  // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
  // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
  SparkEnv.set(ssc.env)
  Try {
    // Step 1: allocate the data received in the current batch interval.
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    // Step 2: generate the Jobs, i.e. the RDD dependency chain.
    graph.generateJobs(time) // generate jobs using allocated block
  } match {
    case Success(jobs) =>
      // Step 3: obtain the input-stream (StreamId) information for the generated Jobs.
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      // Step 4: wrap everything into a JobSet and hand it to JobScheduler.
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  // Step 5: perform the checkpoint operation.
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}
The submitJobSet method merely puts the JobSet into the ConcurrentHashMap and wraps each Job in a JobHandler submitted to the jobExecutor thread pool.
def submitJobSet(jobSet: JobSet) {
  if (jobSet.jobs.isEmpty) {
    logInfo("No jobs added for time " + jobSet.time)
  } else {
    listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
    jobSets.put(jobSet.time, jobSet)
    jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
    logInfo("Added jobs for time " + jobSet.time)
  }
}

private val jobSets: java.util.Map[Time, JobSet] = new ConcurrentHashMap[Time, JobSet]
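A sketch of the submission pattern in isolation: each Job becomes a Runnable handed to a fixed thread pool whose size corresponds to spark.streaming.concurrentJobs (illustrative, not the Spark code):

import java.util.concurrent.Executors

object MiniSubmitter {
  private val jobExecutor = Executors.newFixedThreadPool(1) // default: 1 concurrent Job

  def submitJobSet(jobs: Seq[() => Unit]): Unit =
    jobs.foreach { job =>
      jobExecutor.execute(new Runnable {
        override def run(): Unit = job() // like JobHandler.run ending in job.run()
      })
    }
}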
JobHandler implements the Runnable interface; the Job's run method leads to func being called, i.e. the DStream-based business logic.
private class JobHandler(job: Job) extends Runnable with Logging {
  import JobScheduler._

  def run() {
    try {
      val formattedTime = UIUtils.formatBatchTime(
        job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
      val batchUrl = s"/streaming/batch/?id=${job.time.milliseconds}"
      val batchLinkText = s"[output operation ${job.outputOpId}, batch time ${formattedTime}]"

      ssc.sc.setJobDescription(
        s"""Streaming job from <a href="$batchUrl">$batchLinkText</a>""")
      ssc.sc.setLocalProperty(BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString)
      ssc.sc.setLocalProperty(OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)

      // We need to assign `eventLoop` to a temp variable. Otherwise, because
      // `JobScheduler.stop(false)` may set `eventLoop` to null when this method is running, then
      // it's possible that when `post` is called, `eventLoop` happens to null.
      var _eventLoop = eventLoop
      if (_eventLoop != null) {
        _eventLoop.post(JobStarted(job, clock.getTimeMillis()))
        // Disable checks for existing output directories in jobs launched by the streaming
        // scheduler, since we may need to write output to an existing directory during checkpoint
        // recovery; see SPARK-4835 for more details.
        PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
          job.run()
        }
        _eventLoop = eventLoop
        if (_eventLoop != null) {
          _eventLoop.post(JobCompleted(job, clock.getTimeMillis()))
        }
      } else {
        // JobScheduler has been stopped.
      }
    } finally {
      ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, null)
      ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, null)
    }
  }
}