


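Caching (cache) computes an RDD and keeps it in memory, but the RDD's dependency chain (the equivalent of a database's redo log) must still be kept: if an executor goes down, the RDDs cached on it are lost and have to be recomputed by replaying the dependency chain. Checkpointing, by contrast, saves the RDD to HDFS, which is reliable multi-replica storage, so the dependency chain can be dropped. A checkpoint therefore cuts the lineage and gets its fault tolerance from replication.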


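Checkpointing is most worthwhile in two situations:

1) The lineage in the DAG is so long that recomputing it would be too expensive (as in PageRank).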

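2) Checkpointing on a wide dependency brings a larger benefit, because rebuilding a lost partition behind a wide (shuffle) dependency requires recomputing every partition of the parent RDD, not just one.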
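Both points can be illustrated with a small cost model. This is a toy in plain Scala, not Spark code: the Dep/Narrow/Wide types and partitionsToRecompute are hypothetical, and it assumes every RDD in the chain has the same number of partitions. It counts how many partitions must be recomputed to rebuild one lost partition of the final RDD, stopping at a checkpointed RDD because that one is read back from storage instead.

```scala
// Toy model, not Spark code: the cost of rebuilding ONE lost partition of the
// final RDD, and how a checkpoint cuts that work short.
sealed trait Dep
case object Narrow extends Dep // child partition i needs only parent partition i
case object Wide extends Dep   // child partition needs every parent partition (shuffle)

object RecomputeCost {
  /** `lineage(i)` is the dependency of RDD i+1 on RDD i; RDD 0 is the input.
    * Every RDD is assumed to have `numPartitions` partitions.
    * Returns how many partitions must be recomputed; the RDD at index
    * `checkpointAt` (if any) is read back from storage, not recomputed. */
  def partitionsToRecompute(lineage: Vector[Dep], numPartitions: Int,
                            checkpointAt: Option[Int]): Int = {
    var needed = 1         // partitions of the current RDD we must produce
    var total = 0
    var i = lineage.length // start at the final RDD and walk back
    while (i > 0 && !checkpointAt.contains(i)) {
      total += needed
      needed = lineage(i - 1) match {
        case Narrow => needed        // same number of parent partitions
        case Wide   => numPartitions // a shuffle needs all parent partitions
      }
      i -= 1
    }
    total
  }
}
```

For the chain Narrow, Narrow, Wide, Narrow with 4 partitions per RDD, rebuilding one final partition costs 10 partition computations without a checkpoint, 2 with a checkpoint on the RDD just before the shuffle, and 1 with a checkpoint right after it.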





During checkpointing, an RDD goes through the following states:

[ Initialized → marked for checkpointing → checkpointing in progress → checkpointed ]










An RDD has to pass through each of these stages before it counts as checkpointed:

Initialized: First, the driver program calls rdd.checkpoint() to specify which RDDs need to be checkpointed; from then on, the RDD is managed by RDDCheckpointData. The user also has to set the checkpoint storage path, usually on HDFS.

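marked for checkpointing: After initialization, RDDCheckpointData marks the RDD as MarkedForCheckpoint. (Newer Spark versions, whose CheckpointState enumeration appears later in this post, no longer have this separate state and leave the RDD in Initialized at this point.)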

checkpointing in progress: After each job finishes, finalRdd.doCheckpoint() is called. finalRdd walks back along the computing chain, and whenever it meets an RDD that needs checkpointing it marks it CheckpointingInProgress. It then broadcasts the configuration needed to write to disk (for example core-site.xml when writing to HDFS) to the blockManager on the other worker nodes. Finally it launches a job that performs the checkpoint (rdd.context.runJob(rdd, CheckpointRDD.writeToFile(path.toString, broadcastedConf))).

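checkpointed: Once that job has written the checkpoint, all of the RDD's dependencies are cleared. They are cleared by setting the RDD's strong references to its parents to null; once the parents are garbage-collected, a listener in ContextCleaner fires and removes the corresponding data cached in the BlockManager. The RDD's state is then set to checkpointed. Finally, a dependency is forced onto the RDD: its parent is set to a CheckpointRDD, which from then on reads the checkpoint files from the file system and produces the RDD's partitions.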
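The stages above can be traced in a toy model. Everything below is hypothetical plain Scala rather than Spark's classes: ToyRDD, ToyStorage and afterJob() stand in for an RDD, HDFS and the doCheckpoint() that runs after a job, and only the three state names follow Spark's newer CheckpointState. It shows that checkpoint() merely marks the RDD, while the write and the lineage truncation happen later.

```scala
import scala.collection.mutable

// Toy model of the checkpoint lifecycle; all classes here are hypothetical,
// only the state names mirror Spark's CheckpointState.
sealed trait CpState
case object Initialized extends CpState
case object CheckpointingInProgress extends CpState
case object Checkpointed extends CpState

/** Stands in for reliable storage such as HDFS: RDD name -> saved data. */
object ToyStorage {
  val files: mutable.Map[String, Vector[Int]] = mutable.Map.empty
}

class ToyRDD(val name: String, var parents: List[ToyRDD], computeFn: () => Vector[Int]) {
  var state: Option[CpState] = None // None: never marked for checkpointing

  def compute(): Vector[Int] = computeFn()

  /** Like rdd.checkpoint(): only marks the RDD, nothing is written yet. */
  def checkpoint(): Unit = if (state.isEmpty) state = Some(Initialized)

  /** Like the doCheckpoint() run after a job: write the data, then cut the lineage. */
  def afterJob(): Unit = if (state.contains(Initialized)) {
    state = Some(CheckpointingInProgress)
    ToyStorage.files(name) = compute() // Spark runs a separate job for this write
    // Drop the old parents; the only parent now replays the saved data
    parents = List(new ToyRDD(s"checkpoint-of-$name", Nil, () => ToyStorage.files(name)))
    state = Some(Checkpointed)
  }
}
```

After afterJob(), the original parent is no longer referenced by the RDD, which is what allows it to be garbage-collected.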



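When runJob() is called, it first calls finalRDD.partitions() to determine how many tasks there will be. rdd.partitions() checks whether the RDD has already been checkpointed (through RDDCheckpointData, which manages checkpointed RDDs); if it has, it returns the partitions of the checkpoint (an Array[Partition]) directly instead of computing them again.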

When rdd.iterator() is called to compute one of the RDD's partitions, it calls computeOrReadCheckpoint(split: Partition) to check whether the RDD has been checkpointed. If it has, it calls the iterator() of the RDD's parent, that is, CheckpointRDD.iterator(); CheckpointRDD reads the files on the file system and produces the RDD's partition. This explains the seemingly tricky step of adding a parent CheckpointRDD to a checkpointed RDD.
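The read path described in the last two paragraphs can be sketched in a few lines. In this hypothetical toy (not Spark's API), partitions and iterator(split) delegate to a checkpoint parent once one exists, mirroring rdd.partitions and computeOrReadCheckpoint. computeCalls counts how often the RDD's own compute function runs, so it should stop growing once the checkpoint is in place.

```scala
// Toy model of the read path; class and member names are hypothetical.
class ToyRDD(numParts: Int, computeFn: Int => Vector[Int]) {
  var computeCalls = 0 // how often this RDD's own compute ran
  private var checkpointParent: Option[ToyRDD] = None

  /** Like rdd.partitions: once checkpointed, report the checkpoint's partitions. */
  def partitions: Int = checkpointParent.map(_.partitions).getOrElse(numParts)

  /** Like iterator/computeOrReadCheckpoint: read via the checkpoint parent if present. */
  def iterator(split: Int): Iterator[Int] = checkpointParent match {
    case Some(parent) => parent.iterator(split) // firstParent[T].iterator(split, context)
    case None =>
      computeCalls += 1
      computeFn(split).iterator
  }

  /** Simulates a finished checkpoint: the new parent just replays saved partitions. */
  def markCheckpointed(saved: Vector[Vector[Int]]): Unit =
    checkpointParent = Some(new ToyRDD(saved.length, i => saved(i)))
}
```

Computing both partitions once and handing the result to markCheckpointed leaves computeCalls at 2; later reads go through the parent and leave it unchanged.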


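    The checkpoint mechanism serves applications that access the same data repeatedly. Spark's DAG execution graph can be very large and the computation chain inside a task can be very long; if a task fails midway, recomputing the whole chain is very time-consuming. It is therefore worth checkpointing RDDs that are expensive to compute, so that when a downstream RDD fails, it can read data directly from the checkpointed RDD and carry on.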
  • Here is an example of using checkpoint:
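    import org.apache.spark.{SparkConf, SparkContext}
    object testCheckpoint {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("testCheckpoint").setMaster("local[*]"))
        sc.setCheckpointDir("file:///F:/spark/checkpoint") // hypothetical path; must be an HDFS path on a cluster
        val rdd = sc.textFile("file:///F:/spark/b.txt").flatMap { line => line.split(" ") }.map(word => (word, 1)).reduceByKey(_ + _)
        rdd.cache().checkpoint() // mark before any action; cache() spares the checkpoint job a recomputation
        println(rdd.count())     // the action runs the job, after which the checkpoint is written
      }
    }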


  • Analysis of the checkpoint flow


    We can see that the program first calls SparkContext's setCheckpointDir to set a checkpoint directory:

      /**
       * Set the directory under which RDDs are going to be checkpointed. The directory must
       * be a HDFS path if running on a cluster.
       */
      def setCheckpointDir(directory: String) {
        // If we are running on a cluster, log a warning if the directory is local.
        // Otherwise, the driver may attempt to reconstruct the checkpointed RDD from
        // its own local file system, which is incorrect because the checkpoint files
        // are actually on the executor machines.
        if (!isLocal && Utils.nonLocalPaths(directory).isEmpty) {
          logWarning("Spark is not running in local mode, therefore the checkpoint directory " +
            s"must not be on the local filesystem. Directory '$directory' " +
            "appears to be on the local filesystem.")
        }
        checkpointDir = Option(directory).map { dir =>
          val path = new Path(dir, UUID.randomUUID().toString)
          val fs = path.getFileSystem(hadoopConfiguration)
          fs.mkdirs(path)
          fs.getFileStatus(path).getPath.toString
        }
      }
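To see the effect of the UUID subdirectory, here is a sketch that mimics setCheckpointDir with java.nio on the local file system instead of the Hadoop FileSystem API; CheckpointDirSketch is a hypothetical name. Each call creates a fresh <directory>/<random UUID> directory, which keeps separate applications that share one base directory apart.

```scala
import java.nio.file.{Files, Path, Paths}
import java.util.UUID

// Sketch of setCheckpointDir's directory handling using java.nio instead of
// Hadoop's FileSystem; the object and method names are hypothetical.
object CheckpointDirSketch {
  /** Creates <directory>/<random UUID> and returns it. */
  def setCheckpointDir(directory: String): Path = {
    val path = Paths.get(directory, UUID.randomUUID().toString)
    Files.createDirectories(path) // counterpart of fs.mkdirs(path)
  }
}
```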

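    SparkContext.setCheckpointDir is quite simple: it just creates a directory (a random UUID subdirectory under the given path). Next, let's step into RDD's core checkpoint method: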

      /**
       * Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint
       * directory set with `SparkContext#setCheckpointDir` and all references to its parent
       * RDDs will be removed. This function must be called before any job has been
       * executed on this RDD. It is strongly recommended that this RDD is persisted in
       * memory, otherwise saving it on a file will require recomputation.
       */
      def checkpoint(): Unit = RDDCheckpointData.synchronized {
        // NOTE: we use a global lock here due to complexities downstream with ensuring
        // children RDD partitions point to the correct parent partitions. In the future
        // we should revisit this consideration.
        if (context.checkpointDir.isEmpty) {
          throw new SparkException("Checkpoint directory has not been set in the SparkContext")
        } else if (checkpointData.isEmpty) {
          checkpointData = Some(new ReliableRDDCheckpointData(this))
        }
      }


    checkpoint() only assigns checkpointData, which is a ReliableRDDCheckpointData:

    /**
     * An implementation of checkpointing that writes the RDD data to reliable storage.
     * This allows drivers to be restarted on failure with previously computed state.
     */
    private[spark] class ReliableRDDCheckpointData[T: ClassTag](@transient rdd: RDD[T])
      extends RDDCheckpointData[T](rdd) with Logging {
      // ... (class body omitted)
    }


    ReliableRDDCheckpointData extends RDDCheckpointData, whose progress is tracked by the CheckpointState enumeration:

    /**
     * An RDD must go through
     *   [ Initialized --> CheckpointingInProgress --> Checkpointed ]
     * before it is checkpointed.
     */
    private[spark] object CheckpointState extends Enumeration {
      type CheckpointState = Value
      val Initialized, CheckpointingInProgress, Checkpointed = Value
    }

    private[spark] abstract class RDDCheckpointData[T: ClassTag](@transient rdd: RDD[T])
      extends Serializable {
      import CheckpointState._
      // The checkpoint state of the associated RDD.
      protected var cpState = Initialized
      // ... (rest of the class omitted)
    }


  • We know that a Spark job ultimately calls SparkContext's runJob method to submit tasks to the executors. Let's look at runJob:
    def runJob[T, U: ClassTag](
        rdd: RDD[T],
        func: (TaskContext, Iterator[T]) => U,
        partitions: Seq[Int],
        resultHandler: (Int, U) => Unit): Unit = {
      if (stopped.get()) {
        throw new IllegalStateException("SparkContext has been shutdown")
      }
      val callSite = getCallSite
      val cleanedFunc = clean(func)
      logInfo("Starting job: " + callSite.shortForm)
      if (conf.getBoolean("spark.logLineage", false)) {
        logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
      }
      dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
      progressBar.foreach(_.finishAll())
      // Once the job has finished, checkpoint the marked RDDs along the final RDD's lineage
      rdd.doCheckpoint()
    }

    Only after dagScheduler.runJob() returns does runJob call rdd.doCheckpoint() on the final RDD:


      /**
       * Performs the checkpointing of this RDD by saving this. It is called after a job using this RDD
       * has completed (therefore the RDD has been materialized and potentially stored in memory).
       * doCheckpoint() is called recursively on the parent RDDs.
       */
      private[spark] def doCheckpoint(): Unit = {
        RDDOperationScope.withScope(sc, "checkpoint", allowNesting = false, ignoreParent = true) {
          if (!doCheckpointCalled) {
            doCheckpointCalled = true
            if (checkpointData.isDefined) {
              if (checkpointAllMarkedAncestors) {
                // TODO We can collect all the RDDs that needs to be checkpointed, and then checkpoint
                // them in parallel.
                // Checkpoint parents first because our lineage will be truncated after we
                // checkpoint ourselves
                dependencies.foreach(_.rdd.doCheckpoint())
              }
              checkpointData.get.checkpoint()
            } else {
              dependencies.foreach(_.rdd.doCheckpoint())
            }
          }
        }
      }
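The recursion in doCheckpoint() can be modeled in a few lines of plain Scala. Node and the log buffer are hypothetical; the control flow follows the Spark source above. With allMarkedAncestors off (Spark reads this flag from the spark.checkpoint.checkpointAllMarkedAncestors local property, which we believe defaults to false), the walk stops at the first marked RDD, so marked ancestors further up are not checkpointed by this job. With it on, marked parents are checkpointed first.

```scala
import scala.collection.mutable.ListBuffer

// Toy model of RDD.doCheckpoint(); Node is hypothetical, the control flow
// follows the Spark source above.
class Node(val name: String, val deps: List[Node], val marked: Boolean) {
  private var doCheckpointCalled = false

  def doCheckpoint(allMarkedAncestors: Boolean, log: ListBuffer[String]): Unit =
    if (!doCheckpointCalled) {
      doCheckpointCalled = true
      if (marked) {
        // Parents first, because our own lineage is truncated once we checkpoint
        if (allMarkedAncestors) deps.foreach(_.doCheckpoint(allMarkedAncestors, log))
        log += name // stands in for checkpointData.get.checkpoint()
      } else {
        deps.foreach(_.doCheckpoint(allMarkedAncestors, log))
      }
    }
}
```

For a chain a (marked) → b → c (marked), calling doCheckpoint on c checkpoints only c by default, and a then c when the flag is on.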
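    RDD.doCheckpoint() is recursive: it walks the RDD dependency chain, and when an RDD's checkpointData is defined it calls checkpointData's checkpoint() method. Remember what type checkpointData is? It is an RDDCheckpointData (concretely a ReliableRDDCheckpointData). Let's look at its checkpoint method: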
      /**
       * Materialize this RDD and persist its content.
       * This is called immediately after the first action invoked on this RDD has completed.
       */
      final def checkpoint(): Unit = {
        // Guard against multiple threads checkpointing the same RDD by
        // atomically flipping the state of this RDDCheckpointData
        RDDCheckpointData.synchronized {
          if (cpState == Initialized) {
            // Mark the current state as CheckpointingInProgress
            cpState = CheckpointingInProgress
          } else {
            return
          }
        }
        // This calls the subclass's doCheckpoint()
        val newRDD = doCheckpoint()
        // Update our state and truncate the RDD lineage
        RDDCheckpointData.synchronized {
          cpRDD = Some(newRDD)
          cpState = Checkpointed
          rdd.markCheckpointed()
        }
      }
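The synchronized state flip is what keeps two threads from writing the same checkpoint twice. The sketch below is a hypothetical plain-Scala version of that guard: the state is checked and flipped under a lock, the expensive write runs outside the lock, and the final state is published under the lock again. Unlike Spark, which locks on the RDDCheckpointData companion object (a global lock), it uses one lock per instance for simplicity.

```scala
// Toy version of the guard in RDDCheckpointData.checkpoint(); names are hypothetical.
sealed trait State
case object Initialized extends State
case object CheckpointingInProgress extends State
case object Checkpointed extends State

class ToyCheckpointData(write: () => String) {
  private val lock = new Object
  private var cpState: State = Initialized
  @volatile var result: Option[String] = None

  def state: State = lock.synchronized(cpState)

  def checkpoint(): Unit = {
    // Atomically claim the work; every later caller sees a non-Initialized state and returns
    val proceed = lock.synchronized {
      if (cpState == Initialized) { cpState = CheckpointingInProgress; true } else false
    }
    if (proceed) {
      val saved = write() // the expensive part runs outside the lock
      lock.synchronized {
        result = Some(saved)
        cpState = Checkpointed
      }
    }
  }
}
```

Starting eight threads that all call checkpoint() on the same instance results in exactly one call to write.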



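    • We know that a Task is the smallest unit of work Spark runs, and that Spark recomputes a Task when it fails. The place where a Task performs its computation is also the entry point for reading a checkpoint. Let's look at runTask, the compute method of ShuffleMapTask: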
      override def runTask(context: TaskContext): MapStatus = {
        // Deserialize the RDD using the broadcast variable.
        val threadMXBean = ManagementFactory.getThreadMXBean
        val deserializeStartTime = System.currentTimeMillis()
        val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
          threadMXBean.getCurrentThreadCpuTime
        } else 0L
        val ser = SparkEnv.get.closureSerializer.newInstance()
        val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
          ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
        _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
        _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
          threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
        } else 0L
        var writer: ShuffleWriter[Any, Any] = null
        try {
          val manager = SparkEnv.get.shuffleManager
          writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
          // rdd.iterator() is where a checkpointed partition is read back instead of recomputed
          writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
          writer.stop(success = true).get
        } catch {
          case e: Exception =>
            try {
              if (writer != null) {
                writer.stop(success = false)
              }
            } catch {
              case e: Exception =>
                log.debug("Could not stop writer", e)
            }
            throw e
        }
      }

      This is where Spark actually invokes the computation: runTask calls rdd.iterator() to compute the RDD's partition. Let's look at RDD's iterator():

        /**
         * Internal method to this RDD; will read from cache if applicable, or otherwise compute it.
         * This should ''not'' be called by users directly, but is available for implementors of custom
         * subclasses of RDD.
         */
        final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
          if (storageLevel != StorageLevel.NONE) {
            getOrCompute(split, context)
          } else {
            computeOrReadCheckpoint(split, context)
          }
        }
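    • iterator() goes on to call computeOrReadCheckpoint (for a persisted RDD, getOrCompute also falls back to computeOrReadCheckpoint when the block is not in the cache). Let's look at that method: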
        /**
         * Compute an RDD partition or read it from a checkpoint if the RDD is checkpointing.
         */
        private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] = {
          if (isCheckpointedAndMaterialized) {
            firstParent[T].iterator(split, context)
          } else {
            compute(split, context)
          }
        }
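      When rdd.iterator() is called to compute a partition, it calls computeOrReadCheckpoint(split: Partition) to check whether the RDD has been checkpointed. If so, it calls the iterator() of the RDD's parent, i.e. CheckpointRDD.iterator(); otherwise it calls the RDD's own compute. So let's follow compute in the CheckpointRDD (here ReliableCheckpointRDD):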
        /**
         * Read the content of the checkpoint file associated with the given partition.
         */
        override def compute(split: Partition, context: TaskContext): Iterator[T] = {
          val file = new Path(checkpointPath, ReliableCheckpointRDD.checkpointFileName(split.index))
          ReliableCheckpointRDD.readCheckpointFile(file, broadcastedConf, context)
        }

      compute delegates to ReliableCheckpointRDD.readCheckpointFile:


        /**
         * Read the content of the specified checkpoint file.
         */
        def readCheckpointFile[T](
            path: Path,
            broadcastedConf: Broadcast[SerializableConfiguration],
            context: TaskContext): Iterator[T] = {
          val env = SparkEnv.get
          val fs = path.getFileSystem(broadcastedConf.value.value)
          val bufferSize = env.conf.getInt("spark.buffer.size", 65536)
          val fileInputStream = fs.open(path, bufferSize)
          val serializer = env.serializer.newInstance()
          val deserializeStream = serializer.deserializeStream(fileInputStream)
          // Register an on-task-completion callback to close the input stream.
          context.addTaskCompletionListener(context => deserializeStream.close())
          deserializeStream.asIterator.asInstanceOf[Iterator[T]]
        }

      CheckpointRDD reads the files from the file system and produces the RDD's partitions. This is why an RDD on which checkpoint() was called is given a parent CheckpointRDD.
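The on-disk layout that compute() reads can be sketched with plain java.nio. The part-%05d file name matches checkpointFileName above; the rdd-<id> directory name is what ReliableRDDCheckpointData uses in the Spark versions we have looked at, but treat it as an assumption. DataOutputStream.writeUTF stands in for Spark's configured serializer, and the sketch reads a whole partition eagerly, whereas readCheckpointFile returns a lazy iterator and closes the stream when the task completes. CheckpointFilesSketch and its methods are hypothetical.

```scala
import java.io.{BufferedInputStream, BufferedOutputStream, DataInputStream, DataOutputStream}
import java.nio.file.{Files, Path}

// Sketch of <checkpointDir>/rdd-<id>/part-<index> plus a write/read round trip;
// DataOutputStream stands in for Spark's serializer.
object CheckpointFilesSketch {
  def checkpointFileName(partitionIndex: Int): String = "part-%05d".format(partitionIndex)

  def rddDir(checkpointDir: Path, rddId: Int): Path = checkpointDir.resolve(s"rdd-$rddId")

  /** Writes one partition: the element count, then each element. */
  def writePartition(dir: Path, index: Int, data: Seq[String]): Path = {
    Files.createDirectories(dir)
    val file = dir.resolve(checkpointFileName(index))
    val out = new DataOutputStream(new BufferedOutputStream(Files.newOutputStream(file)))
    try {
      out.writeInt(data.size)
      data.foreach(s => out.writeUTF(s))
    } finally out.close()
    file
  }

  /** Like readCheckpointFile: open with a 64 KB buffer, deserialize, close the stream. */
  def readPartition(dir: Path, index: Int): Vector[String] = {
    val file = dir.resolve(checkpointFileName(index))
    val in = new DataInputStream(new BufferedInputStream(Files.newInputStream(file), 65536))
    try Vector.fill(in.readInt())(in.readUTF()) finally in.close()
  }
}
```

Writing partition 3 of RDD 7 produces <base>/rdd-7/part-00003, and reading it back returns the same elements.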


