// Per-batch input metadata: batch time -> (Int key -> StreamInputInfo).
// NOTE(review): the Int key is presumably the input stream id; confirm against the caller.
// Entries whose batch time is below a threshold are dropped by `cleanup`.
private val batchTimeToInputInfos =
mutable.HashMap.empty[Time, mutable.HashMap[Int, StreamInputInfo]]
// Blocks allocated to each batch time; entries older than the cleanup threshold
// are removed by `cleanupOldBatches`.
private val timeToAllocatedBlocks = mutable.HashMap.empty[Time, AllocatedBlocks]
ssc.graph.clearMetadata:基於 outputStreams 清除舊的 RDD,並透過 BlockManager 清除 Block 數據緩存。
jobScheduler.receiverTracker.cleanupOldBlocksAndBatches:基於 receiverTracker 清除緩存中的 timeToAllocatedBlocks。
jobScheduler.inputInfoTracker.cleanup:基於 inputInfoTracker 清除 batchTimeToInputInfos 中的元數據。
private def clearMetadata(time: Time) {
ssc.graph.clearMetadata(time) <- 核心之處
// If checkpointing is enabled, then checkpoint,
// else mark batch to be fully processed
if (shouldCheckpoint) {
eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = true)) <- 核心之處
} else {
// If checkpointing is not enabled, then delete metadata information about
// received blocks (block data not saved in any case). Otherwise, wait for
// checkpointing of this batch to complete.
val maxRememberDuration = graph.getMaxInputStreamRememberDuration()
jobScheduler.receiverTracker.cleanupOldBlocksAndBatches(time - maxRememberDuration) <- 核心之處
jobScheduler.inputInfoTracker.cleanup(time - maxRememberDuration) <- 核心之處
/**
 * Clear metadata that is older than `rememberDuration` of this DStream.
 * This is an internal method that should not be called directly. This default
 * implementation clears the old generated RDDs. Subclasses of DStream may override
 * this to clear their own metadata along with the generated RDDs.
 */
private[streaming] def clearMetadata(time: Time) {
val unpersistData = ssc.conf.getBoolean("spark.streaming.unpersist", true)
val oldRDDs = generatedRDDs.filter(_._1 <= (time - rememberDuration))
logDebug("Clearing references to old RDDs: [" +
oldRDDs.map(x => s"${x._1} -> ${x._2.id}").mkString(", ") + "]")
generatedRDDs --= oldRDDs.keys <- 核心之處
if (unpersistData) {
logDebug(s"Unpersisting old RDDs: ${oldRDDs.values.map(_.id).mkString(", ")}")
oldRDDs.values.foreach { rdd =>
rdd.unpersist(false) <- 核心之處
// Explicitly remove blocks of BlockRDD
rdd match {
case b: BlockRDD[_] =>
logInfo(s"Removing blocks of RDD $b of time $time")
case _ =>
private[spark] def unpersistRDD(rddId: Int, blocking: Boolean = true) {
env.blockManager.master.removeRdd(rddId, blocking)
/**
 * Clean up block information of old batches. If waitForCompletion is true, this method
 * returns only after the files are cleaned up.
 */
def cleanupOldBatches(cleanupThreshTime: Time, waitForCompletion: Boolean): Unit = synchronized {
require(cleanupThreshTime.milliseconds < clock.getTimeMillis())
val timesToCleanup = timeToAllocatedBlocks.keys.filter { _ < cleanupThreshTime }.toSeq
logInfo(s"Deleting batches: ${timesToCleanup.mkString(" ")}")
if (writeToLog(BatchCleanupEvent(timesToCleanup))) {
timeToAllocatedBlocks --= timesToCleanup
writeAheadLogOption.foreach(_.clean(cleanupThreshTime.milliseconds, waitForCompletion))
} else {
logWarning("Failed to acknowledge batch clean up in the Write Ahead Log.")
def cleanup(batchThreshTime: Time): Unit = synchronized {
val timesToCleanup = batchTimeToInputInfos.keys.filter(_ < batchThreshTime)
logInfo(s"remove old batch metadata: ${timesToCleanup.mkString(" ")}")
batchTimeToInputInfos --= timesToCleanup
