Spark Streaming數據清理內幕完全解密

本講從二個方面闡述:微信

  1. 數據清理緣由和現象
  2. 數據清理代碼解析

 

Spark Core從技術研究的角度講 對Spark Streaming研究的完全,沒有你搞不定的Spark應用程序。數據結構

Spark Streaming一直在運行,不斷計算,每一秒中在不斷運行都會產生大量的累加器、廣播變量,因此須要對對象及app

元數據須要按期清理。每一個batch duration運行時不斷觸發job後須要清理rdd和元數據。Clinet模式jvm

能夠看到打印的日誌,從文件日誌也能夠看到清理日誌內容。ide

如今要看其背後的事情:oop

Spark運行在jvm上,jvm會產生對象,jvm須要對對象進行回收工做,若是源碼分析

咱們無論理gc(對象產生和回收),jvm很快耗盡。如今研究的是Spark Streaming的Spark GCpost

。Spark Streaming對rdd的數據管理、元數據管理至關jvm對gc管理。this

數據、元數據是操做DStream時產生的,數據、元數據的回收則須要研究DStream的產生和回收。spa

看下DStream的繼承結構:

接收數據靠InputDStream,數據輸入、數據操做、數據輸出,整個生命週期都是基於DStream構建的;得出結論:DStream負責rdd的生命週期,rrd是DStream產生的,對rdd的操做也是對DStream的操做,因此不斷產生batchDuration的循環,因此研究對rdd的操做也就是研究對DStream的操做。

源碼分析:

經過對DirectKafkaInputDStream 會產生kafkardd:
override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
  val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
  val rdd = KafkaRDD[K, V, U, T, R](
    context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)
  // Report the record number and metadata of this batch interval to InputInfoTracker.
  val offsetRanges = currentOffsets.map { case (tp, fo) =>
    val uo = untilOffsets(tp)
    OffsetRange(tp.topic, tp.partition, fo, uo.offset)
  }
  val description = offsetRanges.filter { offsetRange =>
    // Don't display empty ranges.
    offsetRange.fromOffset != offsetRange.untilOffset
  }.map { offsetRange =>
    s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" +
      s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"
  }.mkString("\n")
  // Copy offsetRanges to immutable.List to prevent from being modified by the user
  val metadata = Map(
    "offsets" -> offsetRanges.toList,
    StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)
  val inputInfo = StreamInputInfo(id, rdd.count, metadata)
  ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
  currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
  Some(rdd)
}

foreachRDD會觸發ForEachDStream:

/**
 * An internal DStream used to represent output operations like DStream.foreachRDD.
 * @param parent        Parent DStream
 * @param foreachFunc   Function to apply on each RDD generated by the parent DStream
 * @param displayInnerRDDOps Whether the detailed callsites and scopes of the RDDs generated
 *                           by
`foreachFunc` will be displayed in the UI; only the scope and
 *                           callsite of
`DStream.foreachRDD` will be displayed.
 */
private[streaming]
class ForEachDStream[T: ClassTag] (
    parent: DStream[T],
    foreachFunc: (RDD[T], Time) => Unit,
    displayInnerRDDOps: Boolean
  ) extends DStream[Unit](parent.ssc) {
  override def dependencies: List[DStream[_]] = List(parent)
  override def slideDuration: Duration = parent.slideDuration
  override def compute(validTime: Time): Option[RDD[Unit]] = None
  override def generateJob(time: Time): Option[Job] = {
    parent.getOrCompute(time) match {
      case Some(rdd) =>
        val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
          foreachFunc(rdd, time)
        }
        Some(new Job(time, jobFunc))
      case None => None
    }
  }
}

再看DStream源碼foreachRDD:

/**
 * Apply a function to each RDD in this DStream. This is an output operator, so
 * 'this' DStream will be registered as an output stream and therefore materialized.
 * @param foreachFunc foreachRDD function
 * @param displayInnerRDDOps Whether the detailed callsites and scopes of the RDDs generated
 *                           in the
`foreachFunc` to be displayed in the UI. If `false`, then
 *                           only the scopes and callsites of
`foreachRDD` will override those
 *                           of the RDDs on the display.
 */
private def foreachRDD(
    foreachFunc: (RDD[T], Time) => Unit,
    displayInnerRDDOps: Boolean): Unit = {
  new ForEachDStream(this,
    context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()
}

 /**

 * Get the RDD corresponding to the given time; either retrieve it from cache
 * or compute-and-cache it.
 */
private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
  // If RDD was already generated, then retrieve it from HashMap,
  // or else compute the RDD
  generatedRDDs.get(time).orElse {
    // Compute the RDD if time is valid (e.g. correct time in a sliding window)
    // of RDD generation, else generate nothing.
    if (isTimeValid(time)) {
      val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
        // Disable checks for existing output directories in jobs launched by the streaming
        // scheduler, since we may need to write output to an existing directory during checkpoint
        // recovery; see SPARK-4835 for more details. We need to have this call here because
        // compute() might cause Spark jobs to be launched.
        PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
          compute(time)
        }
      }
      rddOption.foreach { case newRDD =>
        // Register the generated RDD for caching and checkpointing
        if (storageLevel != StorageLevel.NONE) {
          newRDD.persist(storageLevel)
          logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
        }
        if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
          newRDD.checkpoint()
          logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")
        }
        generatedRDDs.put(time, newRDD)
      }
      rddOption
    } else {
      None
    }
  }

DStream隨着時間進行,不斷在內存數據結構,generatorRDD中時間窗口和窗口下的rdd實例,

按照batchDuration存儲rdd以及刪除掉rdd的。有時候會調用DStream的cache操做,cache就是persist操做,實際上是對rdd的cache操做。

Rdd自己釋放,產生rdd有數據源和元數據,釋放rdd時山方面都須要考慮。數據週期性產生和週期性釋放,須要找到時鐘,須要找jobGenerator下的時鐘:

private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
  longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")

根據時間發給eventloop,這邊receive的時候不斷的有generatorjobs產生:

/** Generate jobs and perform checkpoint for the given `time`.  */
private def generateJobs(time: Time) {
  // Set the SparkEnv in this thread, so that job generation code can access the environment
  // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
  // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
  SparkEnv.set(ssc.env)
  Try {
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    graph.generateJobs(time) // generate jobs using allocated block
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}

短短几行代碼把整個做業的生命週期處理的清清楚楚。

/** Processes all events */
private def processEvent(event: JobGeneratorEvent) {
  logDebug("Got event " + event)
  event match {
    case GenerateJobs(time) => generateJobs(time)
    case ClearMetadata(time) => clearMetadata(time)
    case DoCheckpoint(time, clearCheckpointDataLater) =>
      doCheckpoint(time, clearCheckpointDataLater)
    case ClearCheckpointData(time) => clearCheckpointData(time)
  }
}

看下clearMetadata方法:

/** Clear DStream metadata for the given `time`. */
private def clearMetadata(time: Time) {
  ssc.graph.clearMetadata(time)
  // If checkpointing is enabled, then checkpoint,
  // else mark batch to be fully processed
  if (shouldCheckpoint) {
    eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = true))
  } else {
    // If checkpointing is not enabled, then delete metadata information about
    // received blocks (block data not saved in any case). Otherwise, wait for
    // checkpointing of this batch to complete.
    val maxRememberDuration = graph.getMaxInputStreamRememberDuration()
 jobScheduler.receiverTracker.cleanupOldBlocksAndBatches(time - maxRememberDuration)
    jobScheduler.inputInfoTracker.cleanup(time - maxRememberDuration)
    markBatchFullyProcessed(time)
  }
}

Inputinfotracker裏面是保存了元數據。

defclearMetadata(time: Time) {
  logDebug("Clearing metadata for time " + time)
  this.synchronized {
    outputStreams.foreach(_.clearMetadata(time))
  }
  logDebug("Cleared old metadata for time " + time)
}

清理完成後輸出日誌。

有不少類型數據輸出,先清理outputds的內容,有不一樣的outputds,其實就是foreachds。

繼續跟蹤ds類的清理方法:

/**
 * Clear metadata that are older than
`rememberDuration` of this DStream.
 * This is an internal method that should not be called directly. This default
 * implementation clears the old generated RDDs. Subclasses of DStream may override
 * this to clear their own metadata along with the generated RDDs.
 */
private[streaming] def clearMetadata(time: Time) {
  val unpersistData = ssc.conf.getBoolean("spark.streaming.unpersist", true)
  val oldRDDs = generatedRDDs.filter(_._1 <= (time - rememberDuration))//batchdration的倍數
  logDebug("Clearing references to old RDDs: [" +
    oldRDDs.map(x => s"${x._1} -> ${x._2.id}").mkString(", ") + "]")
  generatedRDDs --= oldRDDs.keys
  if (unpersistData) {
    logDebug("Unpersisting old RDDs: " + oldRDDs.values.map(_.id).mkString(", "))
    oldRDDs.values.foreach { rdd =>
      rdd.unpersist(false)
      // Explicitly remove blocks of BlockRDD
      rdd match {
        case b: BlockRDD[_] =>
          logInfo("Removing blocks of RDD " + b + " of time " + time)
          b.removeBlocks()
        case _ =>
      }
    }
  }
  logDebug("Cleared " + oldRDDs.size + " RDDs that were older than " +
    (time - rememberDuration) + ": " + oldRDDs.keys.mkString(", "))
  dependencies.foreach(_.clearMetadata(time))
}

除了清理rdd還須要清理元數據。

隨着時間推移,不斷收到清理的消息,不用擔憂driver內存問題。

接下來須要刪除RDD:

/**
 * Remove the data blocks that this BlockRDD is made from. NOTE: This is an
 * irreversible operation, as the data in the blocks cannot be recovered back
 * once removed. Use it with caution.
 */
private[spark] def removeBlocks() {
  blockIds.foreach { blockId =>
    sparkContext.env.blockManager.master.removeBlock(blockId)
  }
  _isValid = false
}

基於rdd確定背blockmanager,須要刪除block的話須要告訴blockmanager master來作。

接下來須要處理depanedcied foreach須要把依賴的父ds都會被清理掉。

最後一個問題:清理是在何時被觸發的?

根據源碼分析,做業產生的jobGenerator類中有下面的方法:

/**
 * Callback called when a batch has been completely processed.
 */
def onBatchCompletion(time: Time) {
  eventLoop.post(ClearMetadata(time))
}
/**
 * Callback called when the checkpoint of a batch has been written.
 */
def onCheckpointCompletion(time: Time, clearCheckpointDataLater: Boolean) {
  if (clearCheckpointDataLater) {
    eventLoop.post(ClearCheckpointData(time))
  }
}

 每一個batchDuration處理完成後都會被回調、發消息,checkpoint完成以後也會調用checkpointdata,須要從做業運行來分析:JobScheduler類下的jobHandler方法:private def processEvent(event: JobSchedulerEvent) {

  try {
    event match {
      case JobStarted(job, startTime) => handleJobStart(job, startTime)
      case JobCompleted(job, completedTime) => handleJobCompletion(job, completedTime)
      case ErrorReported(m, e) => handleError(m, e)
    }
  } catch {
    case e: Throwable =>
      reportError("Error in job scheduler", e)
  }
}

 

private def handleJobCompletion(job: Job, completedTime: Long) {
  val jobSet = jobSets.get(job.time)
  jobSet.handleJobCompletion(job)
  job.setEndTime(completedTime)
  listenerBus.post(StreamingListenerOutputOperationCompleted(job.toOutputOperationInfo))
  logInfo("Finished job " + job.id + " from job set of time " + jobSet.time)
  if (jobSet.hasCompleted) {
    jobSets.remove(jobSet.time)
    jobGenerator.onBatchCompletion(jobSet.time)
    logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format(
      jobSet.totalDelay / 1000.0, jobSet.time.toString,
      jobSet.processingDelay / 1000.0
    ))
    listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo))
  }
  job.result match {
    case Failure(e) =>
      reportError("Error running job " + job, e)
    case _ =>
  }
}

完成後調用onBatchCompletion:

/**
 * Callback called when a batch has been completely processed.
 */
def onBatchCompletion(time: Time) {
  eventLoop.post(ClearMetadata(time))
}

總結:

Spark Streaming在batchDuration處理完成後都會對產生的信息作清理,對輸出DStream清理、依賴關係進行清理、清理默認也會清理rdd數據信息、元數據清理。

感謝王家林老師的知識分享

Spark Streaming發行版筆記16

新浪微博:http://weibo.com/ilovepains

微信公衆號:DT_Spark

博客:http://blog.sina.com.cn/ilovepains

手機:18610086859

QQ:1740415547

郵箱:18610086859@vip.126.com

相關文章
相關標籤/搜索