A Source-Level Deep Dive into Spark Streaming's Data Cleanup Mechanism Under Continuous Operation - Spark in Production Series

This blog series distills and shares cases drawn from real production environments, together with Spark source code walkthroughs and practical guidance. Please keep following the series. Copyright notice: this Spark source code analysis and production practice series belongs to the author (Qin Kaixin). Reproduction is prohibited; you are welcome to learn from it.

1 He who will not sweep a single room cannot sweep the world

  • A Spark Streaming application runs continuously. If data only ever flows in and nothing is ever released, then no matter how capable Spark's memory management is, the application will eventually fall over. Cleaning up in-memory data and on-disk data in a timely fashion is therefore critically important.
  • So how are the objects, data, and metadata of a Spark Streaming application actually reclaimed? Let's hold that question and trace the code.

2 When does the sweeping happen?

2.1 JobScheduler: from job submission to job completion

  • JobGenerator triggers generateJobs on each batch interval
  • JobGenerator -> jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
  • submitJobSet -> jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
  • JobHandler (inside JobScheduler) -> _eventLoop.post(JobCompleted(job, clock.getTimeMillis()))
  • jobScheduler -> handleJobCompletion(job, completedTime)
  • jobScheduler -> jobGenerator.onBatchCompletion(jobSet.time)
  • jobGenerator -> eventLoop.post(ClearMetadata(time)) (the last two hops are sketched below)
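The last two hops are worth seeing in code. Roughly, the relevant handlers look like this in the Spark 2.x source (abridged here: listener notifications, logging and failure handling are omitted):

    // JobScheduler: once every job of a batch's JobSet has finished,
    // notify the JobGenerator that the whole batch is complete.
    private def handleJobCompletion(job: Job, completedTime: Long) {
      val jobSet = jobSets.get(job.time)
      jobSet.handleJobCompletion(job)
      job.setEndTime(completedTime)
      if (jobSet.hasCompleted) {
        jobSets.remove(jobSet.time)
        jobGenerator.onBatchCompletion(jobSet.time)
      }
    }

    // JobGenerator: batch completion is what finally posts the cleanup event.
    def onBatchCompletion(time: Time) {
      eventLoop.post(ClearMetadata(time))
    }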

2.2 clearMetadata: the dragon that shows its head but hides its tail

  • The main cached metadata, held in InputInfoTracker and ReceivedBlockTracker respectively (how these maps get populated is sketched at the end of this subsection):

    private val batchTimeToInputInfos =
      new mutable.HashMap[Time, mutable.HashMap[Int, StreamInputInfo]]

    private val timeToAllocatedBlocks = new mutable.HashMap[Time, AllocatedBlocks]
  • ssc.graph.clearMetadata: walks the outputStreams to clear old generated RDDs, and removes their cached block data through the BlockManager.

  • jobScheduler.receiverTracker.cleanupOldBlocksAndBatches: clears the cached timeToAllocatedBlocks entries through the ReceivedBlockTracker.

  • jobScheduler.inputInfoTracker.cleanup: clears the batchTimeToInputInfos metadata held by the InputInfoTracker.

    private def clearMetadata(time: Time) {

      ssc.graph.clearMetadata(time)               // <- key step

      // If checkpointing is enabled, then checkpoint,
      // else mark batch to be fully processed
      if (shouldCheckpoint) {
        eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = true))  // <- key step

      } else {
        // If checkpointing is not enabled, then delete metadata information about
        // received blocks (block data not saved in any case). Otherwise, wait for
        // checkpointing of this batch to complete.
        val maxRememberDuration = graph.getMaxInputStreamRememberDuration()

        jobScheduler.receiverTracker.cleanupOldBlocksAndBatches(time - maxRememberDuration)  // <- key step
        jobScheduler.inputInfoTracker.cleanup(time - maxRememberDuration)    // <- key step
        markBatchFullyProcessed(time)
      }
    }
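For context on why timeToAllocatedBlocks needs this cleanup at all: every batch adds an entry to it when the JobGenerator asks the ReceivedBlockTracker to allocate all not-yet-allocated receiver blocks to the new batch time. Roughly (abridged from ReceivedBlockTracker; WAL failure and recovery handling omitted):

    // Called once per batch (from JobGenerator.generateJobs via the ReceiverTracker).
    // Drains the per-stream queues of received blocks and records the allocation,
    // so timeToAllocatedBlocks grows by one entry per batch until cleanupOldBatches runs.
    def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
      if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
        val streamIdToBlocks = streamIds.map { streamId =>
          (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
        }.toMap
        val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
        if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
          timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
          lastAllocatedBatchTime = batchTime
        }
      }
    }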

2.3 ssc.graph.clearMetadata: goodbye, old RDDs

  /**
   * Clear metadata that are older than `rememberDuration` of this DStream.
   * This is an internal method that should not be called directly. This default
   * implementation clears the old generated RDDs. Subclasses of DStream may override
   * this to clear their own metadata along with the generated RDDs.
   */
  private[streaming] def clearMetadata(time: Time) {
    val unpersistData = ssc.conf.getBoolean("spark.streaming.unpersist", true)
    val oldRDDs = generatedRDDs.filter(_._1 <= (time - rememberDuration))
    logDebug("Clearing references to old RDDs: [" +
      oldRDDs.map(x => s"${x._1} -> ${x._2.id}").mkString(", ") + "]")

    generatedRDDs --= oldRDDs.keys        // <- key step: drop the references

    if (unpersistData) {
      logDebug(s"Unpersisting old RDDs: ${oldRDDs.values.map(_.id).mkString(", ")}")
      oldRDDs.values.foreach { rdd =>

        rdd.unpersist(false)              // <- key step: release the cached data

        // Explicitly remove blocks of BlockRDD
        rdd match {
          case b: BlockRDD[_] =>
            logInfo(s"Removing blocks of RDD $b of time $time")
            b.removeBlocks()
          case _ =>
        }
      }
    }
  }
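The b.removeBlocks() call above is what actually frees the received data: it asks the BlockManagerMaster to drop every block backing the BlockRDD on the executors and then invalidates the RDD. A simplified view of BlockRDD (abridged):

    // BlockRDD: remove the backing blocks cluster-wide through the
    // BlockManagerMaster, then mark this RDD as no longer usable.
    private[spark] def removeBlocks() {
      blockIds.foreach { blockId =>
        sparkContext.env.blockManager.master.removeBlock(blockId)
      }
      _isValid = false
    }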
RDD removal detail (SparkContext.unpersistRDD, reached from rdd.unpersist above):

  private[spark] def unpersistRDD(rddId: Int, blocking: Boolean = true) {
    env.blockManager.master.removeRdd(rddId, blocking)
    persistentRdds.remove(rddId)
    listenerBus.post(SparkListenerUnpersistRDD(rddId))
  }
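From the application side, the knobs that feed this path are the spark.streaming.unpersist flag read above and the graph's remember duration. A minimal user-side sketch (the socket source, durations and app name below are illustrative assumptions, not taken from the original post):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}

    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("streaming-cleanup-demo")
      // default is already true; setting it to false would keep old RDDs cached
      // and skip the unpersist/removeBlocks path shown above
      .set("spark.streaming.unpersist", "true")

    val ssc = new StreamingContext(conf, Seconds(10))
    // keep generated RDDs around for at least 2 minutes, which delays
    // clearMetadata for batches younger than that (useful for ad-hoc inspection)
    ssc.remember(Minutes(2))

    val lines = ssc.socketTextStream("localhost", 9999)
    lines.count().print()

    ssc.start()
    ssc.awaitTermination()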

2.4 cleanupOldBlocksAndBatches: goodbye, old batch block data

  /**
   * Clean up block information of old batches. If waitForCompletion is true, this method
   * returns only after the files are cleaned up.
   */
  def cleanupOldBatches(cleanupThreshTime: Time, waitForCompletion: Boolean): Unit = synchronized {
    require(cleanupThreshTime.milliseconds < clock.getTimeMillis())

    val timesToCleanup = timeToAllocatedBlocks.keys.filter { _ < cleanupThreshTime }.toSeq
    logInfo(s"Deleting batches: ${timesToCleanup.mkString(" ")}")

    if (writeToLog(BatchCleanupEvent(timesToCleanup))) {
      timeToAllocatedBlocks --= timesToCleanup
      writeAheadLogOption.foreach(_.clean(cleanupThreshTime.milliseconds, waitForCompletion))
    } else {
      logWarning("Failed to acknowledge batch clean up in the Write Ahead Log.")
    }
  }
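The method above is reached through ReceiverTracker.cleanupOldBlocksAndBatches, which first delegates to the ReceivedBlockTracker and then, if a receiver-side write-ahead log is enabled, tells the running receivers to delete their old block data as well. Roughly (abridged; the exact shape varies slightly across Spark versions):

    def cleanupOldBlocksAndBatches(cleanupThreshTime: Time) {
      // Clean up the tracked block and batch metadata
      receivedBlockTracker.cleanupOldBatches(cleanupThreshTime, waitForCompletion = false)
      // Signal the receivers to delete old block data from their write-ahead logs
      if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
        synchronized {
          if (isTrackerStarted) {
            endpoint.send(CleanupOldBlocks(cleanupThreshTime))
          }
        }
      }
    }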

2.5 inputInfoTracker.cleanup: goodbye, old batch input-info metadata

  def cleanup(batchThreshTime: Time): Unit = synchronized {
    val timesToCleanup = batchTimeToInputInfos.keys.filter(_ < batchThreshTime)
    logInfo(s"remove old batch metadata: ${timesToCleanup.mkString(" ")}")
    batchTimeToInputInfos --= timesToCleanup
  }
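For symmetry with the cleanup above, the entries being removed are created once per input stream per batch through InputInfoTracker.reportInfo, called while the jobs for a batch are being generated. Roughly (abridged):

    // Record how many records each input stream contributed to a batch;
    // one batchTimeToInputInfos entry per batch time until cleanup() removes it.
    def reportInfo(batchTime: Time, inputInfo: StreamInputInfo): Unit = synchronized {
      val inputInfos = batchTimeToInputInfos.getOrElseUpdate(batchTime,
        new mutable.HashMap[Int, StreamInputInfo]())
      if (inputInfos.contains(inputInfo.inputStreamId)) {
        throw new IllegalStateException(s"Input stream ${inputInfo.inputStreamId} for batch " +
          s"$batchTime is already added into InputInfoTracker, this is an illegal state")
      }
      inputInfos += ((inputInfo.inputStreamId, inputInfo))
    }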

3 Summary

He who will not sweep a single room cannot sweep the world: the final chapter.

Qin Kaixin, Shenzhen, 1:13 a.m., 2018
