Spark Streaming Source Code Walkthrough: Data Cleanup Internals

As explained in the previous posts, Spark Streaming uses a timer to trace back along the DStream DAG and generate the full RDD DAG for each batch.

Careful readers will surely have a question: as time goes on, more and more RDDs are generated, so how does Spark Streaming manage the life cycle of all these RDDs?

Let's fast-forward straight to the JobGenerator.

The generated job is handed to a JobHandler for execution; JobHandler implements the Runnable interface.


We have already walked through the JobStarted event, so let's keep reading.

// JobScheduler.scala line 202 spark 1.6.0
    def run() {
      try {
        val formattedTime = UIUtils.formatBatchTime(
          job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
        val batchUrl = s"/streaming/batch/?id=${job.time.milliseconds}"
        val batchLinkText = s"[output operation ${job.outputOpId}, batch time ${formattedTime}]"

        ssc.sc.setJobDescription(
          s"""Streaming job from $batchLinkText""")
        ssc.sc.setLocalProperty(BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString)
        ssc.sc.setLocalProperty(OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)

        // We need to assign `eventLoop` to a temp variable. Otherwise, because
        // `JobScheduler.stop(false)` may set `eventLoop` to null when this method is running, then
        // it's possible that when `post` is called, `eventLoop` happens to null.
        var _eventLoop = eventLoop
        if (_eventLoop != null) {
          _eventLoop.post(JobStarted(job, clock.getTimeMillis())) // covered in an earlier post
          // Disable checks for existing output directories in jobs launched by the streaming
          // scheduler, since we may need to write output to an existing directory during checkpoint
          // recovery; see SPARK-4835 for more details.
          PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
            job.run()
          }
          _eventLoop = eventLoop
          if (_eventLoop != null) {
            _eventLoop.post(JobCompleted(job, clock.getTimeMillis())) // this is the focus of this post
          }
        } else {
          // JobScheduler has been stopped.
        }
      } finally {
        ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, null)
        ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, null)
      }
    }
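
Every output job posts JobCompleted once it finishes. The handling of that event on the JobScheduler's event loop is where cleanup begins: when every job of the batch's JobSet has completed, the scheduler notifies the JobGenerator, which posts a ClearMetadata event for that batch time. The following is an abridged sketch of that path, paraphrased from the Spark 1.6.0 source (method bodies are trimmed, so details may differ slightly from the actual code):

// JobScheduler.scala / JobGenerator.scala, Spark 1.6.0 (abridged sketch)
    private def processEvent(event: JobSchedulerEvent) {
      event match {
        case JobStarted(job, startTime) => handleJobStart(job, startTime)
        case JobCompleted(job, completedTime) => handleJobCompletion(job, completedTime)
        case ErrorReported(m, e) => handleError(m, e)
      }
    }

    private def handleJobCompletion(job: Job, completedTime: Long) {
      val jobSet = jobSets.get(job.time)
      jobSet.handleJobCompletion(job)
      // A batch is considered done only when every output job in its JobSet has completed.
      if (jobSet.hasCompleted) {
        jobSets.remove(jobSet.time)
        jobGenerator.onBatchCompletion(jobSet.time) // hand control back to the JobGenerator
      }
    }

    // JobGenerator.scala
    def onBatchCompletion(time: Time) {
      eventLoop.post(ClearMetadata(time)) // entry point of all the cleanup below
    }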

Cleaning up RDDs
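
ClearMetadata is handled by JobGenerator.clearMetadata, which asks the DStreamGraph, and transitively every output DStream and its dependencies, to drop the RDDs of batches older than the remember duration. A sketch paraphrased from the Spark 1.6.0 source (abridged; details may differ slightly):

// JobGenerator.scala / DStream.scala, Spark 1.6.0 (abridged sketch)
    private def clearMetadata(time: Time) {
      ssc.graph.clearMetadata(time) // walks every output DStream and its dependencies

      // If checkpointing is enabled, the rest of the cleanup is deferred until the
      // checkpoint of this batch has been written; otherwise it is done right away.
      if (shouldCheckpoint) {
        eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = true))
      } else {
        val maxRememberDuration = graph.getMaxInputStreamRememberDuration()
        jobScheduler.receiverTracker.cleanupOldBlocksAndBatches(time - maxRememberDuration)
        jobScheduler.inputInfoTracker.cleanup(time - maxRememberDuration)
        markBatchFullyProcessed(time)
      }
    }

    // DStream.scala: drop and unpersist RDDs older than (time - rememberDuration)
    private[streaming] def clearMetadata(time: Time) {
      val unpersistData = ssc.conf.getBoolean("spark.streaming.unpersist", true)
      val oldRDDs = generatedRDDs.filter(_._1 <= (time - rememberDuration))
      generatedRDDs --= oldRDDs.keys
      if (unpersistData) {
        oldRDDs.values.foreach { rdd =>
          rdd.unpersist(false)
          rdd match {
            case b: BlockRDD[_] => b.removeBlocks() // also free the receiver blocks backing a BlockRDD
            case _ =>
          }
        }
      }
      dependencies.foreach(_.clearMetadata(time))
    }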

Cleaning up checkpoints

// Checkpoint.scala line 196
    def run() {
      // ... code elided ...
          jobGenerator.onCheckpointCompletion(checkpointTime, clearCheckpointDataLater)
          return
        } catch {
          case ioe: IOException =>
            logWarning("Error in attempt " + attempts + " of writing checkpoint to "
              + checkpointFile, ioe)
            reset()
        }
      }
      logWarning("Could not write checkpoint for time " + checkpointTime + " to file "
        + checkpointFile + "'")
    }
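
After the checkpoint file has been written successfully, onCheckpointCompletion is called as shown above. When clearCheckpointDataLater is true it posts a ClearCheckpointData event, whose handler also triggers the block/batch and InputInfo cleanup covered in the next two sections. An abridged sketch paraphrased from the Spark 1.6.0 source (details may differ slightly):

// JobGenerator.scala, Spark 1.6.0 (abridged sketch)
    def onCheckpointCompletion(time: Time, clearCheckpointDataLater: Boolean) {
      if (clearCheckpointDataLater) {
        eventLoop.post(ClearCheckpointData(time))
      }
    }

    private def clearCheckpointData(time: Time) {
      ssc.graph.clearCheckpointData(time)

      // The state of this batch is now safely persisted in the checkpoint, so the
      // received-block metadata and the write-ahead-log (WAL) files backing it can go too.
      val maxRememberDuration = graph.getMaxInputStreamRememberDuration()
      jobScheduler.receiverTracker.cleanupOldBlocksAndBatches(time - maxRememberDuration)
      jobScheduler.inputInfoTracker.cleanup(time - maxRememberDuration)
      markBatchFullyProcessed(time)
    }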

Cleaning up the ReceiverTracker's blocks and batches
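
cleanupOldBlocksAndBatches receives the cleanup threshold (the batch time minus the maximum remember duration of the input streams). It drops the driver-side block and batch metadata, cleans the driver's write-ahead log, and, if the receiver WAL is enabled, asks the receivers to delete their old block data as well. Abridged sketch paraphrased from the Spark 1.6.0 source (details may differ slightly):

// ReceiverTracker.scala / ReceivedBlockTracker.scala, Spark 1.6.0 (abridged sketch)
    def cleanupOldBlocksAndBatches(cleanupThreshTime: Time) {
      // Drop the block and batch metadata tracked on the driver
      receivedBlockTracker.cleanupOldBatches(cleanupThreshTime, waitForCompletion = false)

      // If the receiver WAL is enabled, also tell the receivers to delete old block data
      if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
        synchronized {
          if (isTrackerStarted) {
            endpoint.send(CleanupOldBlocks(cleanupThreshTime))
          }
        }
      }
    }

    // ReceivedBlockTracker.scala
    def cleanupOldBatches(cleanupThreshTime: Time, waitForCompletion: Boolean): Unit = synchronized {
      val timesToCleanup = timeToAllocatedBlocks.keys.filter { _ < cleanupThreshTime }.toSeq
      // Record the cleanup in the driver-side WAL first, then drop the batch-to-block
      // metadata and delete the old WAL segments
      if (writeToLog(BatchCleanupEvent(timesToCleanup))) {
        timeToAllocatedBlocks --= timesToCleanup
        writeAheadLogOption.foreach(_.clean(cleanupThreshTime.milliseconds, waitForCompletion))
      }
    }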

Cleaning up InputInfo objects
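
The InputInfoTracker keeps per-batch input metadata (such as the number of records received), which is what the streaming UI reports. Its cleanup simply drops the entries for batches older than the threshold time. Abridged sketch paraphrased from the Spark 1.6.0 source (details may differ slightly):

// InputInfoTracker.scala, Spark 1.6.0 (abridged sketch)
    def cleanup(batchThreshTime: Time): Unit = synchronized {
      // Drop the per-batch input metadata for every batch older than the threshold time
      val timesToCleanup = batchTimeToInputInfos.keys.filter(_ < batchThreshTime)
      batchTimeToInputInfos --= timesToCleanup
    }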

 

At this point, all of the cleanup work is complete.

 

To summarize:

Just as the JVM garbage-collects objects that are no longer used, Spark Streaming cleans up the data and metadata it no longer needs.

The objects cleaned up are:

  1. Job
  2. RDD
  3. InputInfo
  4. Block and batch
  5. Checkpoint data
  6. WAL data

 

