The previous articles explained clearly how Spark Streaming uses a timer to trace the DStream DAG back into a full RDD DAG for each batch.
A careful reader will surely have a question: as time passes, more and more RDDs are generated, so how does Spark Streaming manage their lifecycle?
Let's fast-forward straight to the JobGenerator: each generated job is handed to a JobHandler for execution, and JobHandler implements the Runnable interface.
We have already walked through the JobStarted event, so let's continue from there.
```scala
// JobScheduler.scala line 202, Spark 1.6.0
def run() {
  try {
    val formattedTime = UIUtils.formatBatchTime(
      job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
    val batchUrl = s"/streaming/batch/?id=${job.time.milliseconds}"
    val batchLinkText = s"[output operation ${job.outputOpId}, batch time ${formattedTime}]"

    ssc.sc.setJobDescription(
      s"""Streaming job from <a href="$batchUrl">$batchLinkText</a>""")
    ssc.sc.setLocalProperty(BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString)
    ssc.sc.setLocalProperty(OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)

    // We need to assign `eventLoop` to a temp variable. Otherwise, because
    // `JobScheduler.stop(false)` may set `eventLoop` to null when this method is running, then
    // it's possible that when `post` is called, `eventLoop` happens to null.
    var _eventLoop = eventLoop
    if (_eventLoop != null) {
      _eventLoop.post(JobStarted(job, clock.getTimeMillis())) // already covered earlier
      // Disable checks for existing output directories in jobs launched by the streaming
      // scheduler, since we may need to write output to an existing directory during checkpoint
      // recovery; see SPARK-4835 for more details.
      PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
        job.run()
      }
      _eventLoop = eventLoop
      if (_eventLoop != null) {
        _eventLoop.post(JobCompleted(job, clock.getTimeMillis())) // the focus of this article
      }
    } else {
      // JobScheduler has been stopped.
    }
  } finally {
    ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, null)
    ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, null)
  }
}
```
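The key point is the second `post`: once `job.run()` returns, a JobCompleted event is queued, and the event loop reacts by kicking off cleanup for that batch. The following is a minimal, hypothetical sketch of that flow; `MiniEventLoop`, `cleanedBatches`, and the simplified event case classes are my own names, not Spark's API, and the real JobScheduler processes events on a dedicated thread rather than synchronously.

```scala
import scala.collection.mutable

// Simplified model: JobHandler posts JobCompleted, and the event loop
// reacts by triggering cleanup for that batch time (in real Spark this
// would eventually call jobGenerator.onBatchCompletion).
sealed trait JobSchedulerEvent
case class JobStarted(time: Long) extends JobSchedulerEvent
case class JobCompleted(time: Long) extends JobSchedulerEvent

class MiniEventLoop {
  private val queue = mutable.Queue.empty[JobSchedulerEvent]
  val cleanedBatches = mutable.Buffer.empty[Long] // batches whose cleanup was triggered

  def post(event: JobSchedulerEvent): Unit = queue.enqueue(event)

  // Drain the queue, mimicking processEvent in JobScheduler.
  def processAll(): Unit = while (queue.nonEmpty) {
    queue.dequeue() match {
      case JobStarted(_)      => () // bookkeeping only
      case JobCompleted(time) => cleanedBatches += time
    }
  }
}
```

The temp-variable dance around `eventLoop` in the real code exists only because `stop()` can null the field concurrently; the sketch above ignores that concern.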
Cleaning up RDDs
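Each DStream keeps its generated RDDs in a time-indexed map and, during cleanup, drops every entry older than the remember duration. Here is a minimal, hypothetical sketch of that retention rule; `MiniDStream` and the string placeholder for an RDD are mine, while the `generatedRDDs`/`clearMetadata` names mirror the real DStream internals.

```scala
import scala.collection.mutable

// Sketch of DStream.clearMetadata's retention rule: keep only RDDs
// generated within the last `rememberDuration` milliseconds.
class MiniDStream(rememberDuration: Long) {
  val generatedRDDs = mutable.HashMap.empty[Long, String] // batch time -> RDD placeholder

  def generate(time: Long): Unit = generatedRDDs(time) = s"rdd@$time"

  // Remove (and return the times of) all RDDs older than the cutoff.
  def clearMetadata(time: Long): Seq[Long] = {
    val oldTimes = generatedRDDs.keys.filter(_ <= time - rememberDuration).toSeq
    oldTimes.foreach(generatedRDDs.remove) // real Spark also unpersists these RDDs
    oldTimes
  }
}
```

In real Spark the cutoff is `time - rememberDuration` exactly as above, and `rememberDuration` grows automatically for windowed operations so that window RDDs survive long enough.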
Cleaning up checkpoints
```scala
// Checkpoint.scala line 196
def run() {
  // ... code ...
      jobGenerator.onCheckpointCompletion(checkpointTime, clearCheckpointDataLater)
      return
    } catch {
      case ioe: IOException =>
        logWarning("Error in attempt " + attempts + " of writing checkpoint to "
          + checkpointFile, ioe)
        reset()
    }
  }
  logWarning("Could not write checkpoint for time " + checkpointTime
    + " to file " + checkpointFile + "'")
}
```
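The shape of this handler is a bounded retry loop: attempt the checkpoint write, and on success notify the JobGenerator via `onCheckpointCompletion` so it can later clear older checkpoint data. A minimal, hypothetical sketch of that pattern, with the write and the completion callback passed in as functions (the names `writeCheckpoint`, `write`, and `onSuccess` are mine, not Spark's):

```scala
import java.io.IOException

// Retry a checkpoint write up to maxAttempts times; on success, fire the
// completion callback (which in Spark schedules cleanup of old checkpoints).
def writeCheckpoint(maxAttempts: Int)(write: () => Unit)(onSuccess: () => Unit): Boolean = {
  var attempts = 0
  while (attempts < maxAttempts) {
    attempts += 1
    try {
      write()     // would persist checkpoint bytes to the checkpoint file
      onSuccess() // would call jobGenerator.onCheckpointCompletion(...)
      return true
    } catch {
      case _: IOException => () // log a warning and retry, as in Checkpoint.scala
    }
  }
  false // gave up, mirroring the final "Could not write checkpoint" warning
}
```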
Cleaning up the ReceiverTracker's blocks and batches
Cleaning up InputInfo objects
At this point, all of the cleanup work is done.
To summarize: just as the JVM garbage-collects objects that are no longer in use, Spark Streaming cleans up after itself. The objects that get cleaned up are as follows:
Finally, a screenshot of the corresponding logs: