Spark源碼分析之Checkpoint的過程

概述

checkpoint 的機制保證了須要訪問重複數據的應用 Spark 的DAG執行圖可能很龐大,task 中計算鏈可能會很長,這時若是 task 中途運行出錯,那麼 task 的整個須要重算很是耗時,所以,有必要將計算代價較大的 RDD checkpoint 一下,當下遊 RDD 計算出錯時,能夠直接從 checkpoint 過的 RDD 那裏讀取數據繼續算。apache

咱們先來看一個例子,checkpoint的使用:api

import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.SparkConf object CheckPointTest { def main(args: Array[String]) { val sc: SparkContext = SparkContext.getOrCreate(new SparkConf().setAppName("ck").setMaster("local[2]")) sc.setCheckpointDir("/Users/kinge/ck") val rdd: RDD[(String, Int)] = sc.textFile("").map{x=>(x,1) }.reduceByKey(_+_) rdd.checkpoint() rdd.count() rdd.groupBy(x=>x._2).collect().foreach(println) } }

checkpoint流程分析

checkpoint初始化

咱們能夠看到最早調用了SparkContextsetCheckpointDir 設置了一個checkpoint 目錄
咱們跟進這個方法看一下dom

/** * Set the directory under which RDDs are going to be checkpointed. The directory must * be a HDFS path if running on a cluster. */ def setCheckpointDir(directory: String) { // If we are running on a cluster, log a warning if the directory is local. // Otherwise, the driver may attempt to reconstruct the checkpointed RDD from // its own local file system, which is incorrect because the checkpoint files // are actually on the executor machines. if (!isLocal && Utils.nonLocalPaths(directory).isEmpty) { logWarning("Checkpoint directory must be non-local " + "if Spark is running on a cluster: " + directory) } //利用hadoop的api建立了一個hdfs目錄 checkpointDir = Option(directory).map { dir => val path = new Path(dir, UUID.randomUUID().toString) val fs = path.getFileSystem(hadoopConfiguration) fs.mkdirs(path) fs.getFileStatus(path).getPath.toString } }

這個方法挺簡單的,就建立了一個目錄,接下來咱們看RDD核心的checkpoint 方法,跟進去ide

def checkpoint(): Unit = RDDCheckpointData.synchronized { if (context.checkpointDir.isEmpty) { throw new SparkException("Checkpoint directory has not been set in the SparkContext") } else if (checkpointData.isEmpty) { checkpointData = Some(new ReliableRDDCheckpointData(this)) } }

這個方法沒有返回值,邏輯只有一個判斷,checkpointDir剛纔設置過了,不爲空,而後建立了一個ReliableRDDCheckpointData,咱們來看ReliableRDDCheckpointDataoop

/** * An implementation of checkpointing that writes the RDD data to reliable storage. * This allows drivers to be restarted on failure with previously computed state. */ private[spark] class ReliableRDDCheckpointData[T: ClassTag](@transient rdd: RDD[T]) extends RDDCheckpointData[T](rdd) with Logging { 。。。。。 }

這個ReliableRDDCheckpointData的父類RDDCheckpointData咱們再繼續看它的父類this

/** * RDD 須要通過 * [ Initialized --> CheckpointingInProgress--> Checkpointed ] * 這幾個階段才能被 checkpoint。 */ private[spark] object CheckpointState extends Enumeration { type CheckpointState = Value val Initialized, CheckpointingInProgress, Checkpointed = Value } private[spark] abstract class RDDCheckpointData[T: ClassTag](@transient rdd: RDD[T]) extends Serializable { import CheckpointState._ // The checkpoint state of the associated RDD. protected var cpState = Initialized 。。。。。。 }
RDD 須要通過
[ Initialized --> CheckpointingInProgress--> Checkpointed ]
這幾個階段才能被 checkpoint。
這類裏面有一個枚舉來標識CheckPoint的狀態,第一次初始化時是Initialized。
checkpoint這個一步已經完成了,回到咱們的RDD成員變量裏checkpointData這個變量指向的RDDCheckpointData的實例。
Checkpoint初始化時序圖:

checkpoint何時寫入數據

咱們知道一個spark job運行最終會調用SparkContextrunJob方法將任務提交給Executor去執行,咱們來看runJobatom

def runJob[T, U: ClassTag]( rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int], resultHandler: (Int, U) => Unit): Unit = { if (stopped.get()) { throw new IllegalStateException("SparkContext has been shutdown") } val callSite = getCallSite val cleanedFunc = clean(func) logInfo("Starting job: " + callSite.shortForm) if (conf.getBoolean("spark.logLineage", false)) { logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString) } dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get) progressBar.foreach(_.finishAll()) rdd.doCheckpoint() }

最後一行代碼調用了doCheckpoint,在dagScheduler將任務提交給集羣運行以後,我來看這個doCheckpoint方法spa

private[spark] def doCheckpoint(): Unit = { RDDOperationScope.withScope(sc, "checkpoint", allowNesting = false, ignoreParent = true) { if (!doCheckpointCalled) { doCheckpointCalled = true if (checkpointData.isDefined) { checkpointData.get.checkpoint() } else { //遍歷依賴的rdd,調用每一個rdd的doCheckpoint方法 dependencies.foreach(_.rdd.doCheckpoint()) } } } }
這個是一個遞歸,遍歷RDD依賴鏈條,當rdd是checkpointData不爲空時,調用checkpointDatacheckpoint()方法。還記得checkpointData類型是什麼嗎?就是RDDCheckpointData ,咱們來看它的checkpoint方法,如下
final def checkpoint(): Unit = { // Guard against multiple threads checkpointing the same RDD by // atomically flipping the state of this RDDCheckpointData  RDDCheckpointData.synchronized { if (cpState == Initialized) { //一、標記當前狀態爲正在checkpoint中 cpState = CheckpointingInProgress } else { return } } //2 這裏調用的是子類的doCheckpoint() val newRDD = doCheckpoint() // 3 標記checkpoint已完成,清空RDD依賴  RDDCheckpointData.synchronized { cpRDD = Some(newRDD) cpState = Checkpointed rdd.markCheckpointed() } }

這個方法開始作checkpoint操做了,將doCheckpoint交給子類去實現checkpoint的邏輯,咱們去看子類怎麼實現doCheckpointdebug

protected override def doCheckpoint(): CheckpointRDD[T] = { // Create the output path for the checkpoint val path = new Path(cpDir) val fs = path.getFileSystem(rdd.context.hadoopConfiguration) if (!fs.mkdirs(path)) { throw new SparkException(s"Failed to create checkpoint path $cpDir") } //須要的配置文件(如 core-site.xml 等)broadcast 到其餘 worker 節點的 blockManager。  val broadcastedConf = rdd.context.broadcast( new SerializableConfiguration(rdd.context.hadoopConfiguration)) //向集羣提交一個Job去執行checkpoint操做,將RDD序列化到HDFS目錄上  rdd.context.runJob(rdd, ReliableCheckpointRDD.writeCheckpointFile[T](cpDir, broadcastedConf) _) // 爲該 rdd 生成一個新的依賴,設置該 rdd 的 parent rdd 爲 //CheckpointRDD,該 CheckpointRDD 負責之後讀取在文件系統上的 //checkpoint 文件,生成該 rdd 的 partition。 val newRDD = new ReliableCheckpointRDD[T](rdd.context, cpDir) if (newRDD.partitions.length != rdd.partitions.length) { throw new SparkException( s"Checkpoint RDD $newRDD(${newRDD.partitions.length}) has different " + s"number of partitions from original RDD $rdd(${rdd.partitions.length})") } // 是否清除checkpoint文件若是超出引用的資源範圍 if (rdd.conf.getBoolean("spark.cleaner.referenceTracking.cleanCheckpoints", false)) { rdd.context.cleaner.foreach { cleaner => cleaner.registerRDDCheckpointDataForCleanup(newRDD, rdd.id) } } logInfo(s"Done checkpointing RDD ${rdd.id} to $cpDir, new parent is RDD ${newRDD.id}") // 將新產生的RDD返回給父類  newRDD }

上面的代碼最終會返回新的CheckpointRDD ,父類將它賦值給成員變量cpRDD,最終標記當前狀態爲Checkpointed並清空當前RDD的依賴鏈。到此Checkpoint的數據就被序列化到HDFS上了。rest

 Checkpoint 寫數據時序圖

checkpoint何時讀取數據

咱們知道Task是spark運行任務的最小單元,當Task執行失敗的時候spark會從新計算,這裏Task進行計算的地方就是讀取checkpoint的入口。咱們能夠看一下ShuffleMapTask 裏的計算方法runTask,以下

override def runTask(context: TaskContext): MapStatus = { 。。。。。。。 try { val manager = SparkEnv.get.shuffleManager writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context) //調用rdd.iterator,迭代每一個partition裏的數據,計算並寫入磁盤 writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]]) writer.stop(success = true).get } catch { case e: Exception => try { if (writer != null) { writer.stop(success = false) } } catch { case e: Exception => log.debug("Could not stop writer", e) } throw e } }

這是spark真正調用計算方法的邏輯runTask調用 rdd.iterator() 去計算該 rdd 的 partition 的,咱們來看RDD的iterator()

final def iterator(split: Partition, context: TaskContext): Iterator[T] = { if (storageLevel != StorageLevel.NONE) { SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel) } else { computeOrReadCheckpoint(split, context) } }

這裏會繼續調用computeOrReadCheckpoint,咱們看該方法

** * Compute an RDD partition or read it from a checkpoint if the RDD is checkpointing. */ private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] = { if (isCheckpointedAndMaterialized) { firstParent[T].iterator(split, context) } else { compute(split, context) } }
當調用rdd.iterator()去計算該 rdd 的 partition 的時候,會調用 computeOrReadCheckpoint(split: Partition)去查看該 rdd 是否被 checkpoint 過了,若是是,就調用該 rdd 的 parent rdd 的 iterator() 也就是 CheckpointRDD.iterator(),不然直接調用該RDD的compute, 那麼咱們就跟進CheckpointRDDcompute
/** * Read the content of the checkpoint file associated with the given partition. */ override def compute(split: Partition, context: TaskContext): Iterator[T] = { val file = new Path(checkpointPath, ReliableCheckpointRDD.checkpointFileName(split.index)) ReliableCheckpointRDD.readCheckpointFile(file, broadcastedConf, context) }

這裏就兩行代碼,意思是從Path上讀取咱們的CheckPoint數據,看一下readCheckpointFile

/** * Read the content of the specified checkpoint file. */ def readCheckpointFile[T]( path: Path, broadcastedConf: Broadcast[SerializableConfiguration], context: TaskContext): Iterator[T] = { val env = SparkEnv.get // 用hadoop API 讀取HDFS上的數據 val fs = path.getFileSystem(broadcastedConf.value.value) val bufferSize = env.conf.getInt("spark.buffer.size", 65536) val fileInputStream = fs.open(path, bufferSize) val serializer = env.serializer.newInstance() val deserializeStream = serializer.deserializeStream(fileInputStream) // Register an on-task-completion callback to close the input stream. context.addTaskCompletionListener(context => deserializeStream.close()) //反序列化數據後轉換爲一個Iterator deserializeStream.asIterator.asInstanceOf[Iterator[T]]

CheckpointRDD 負責讀取文件系統上的文件,生成該 rdd 的 partition。這就解釋了爲何要爲調用了checkpoint的RDD 添加一個 parent CheckpointRDD的緣由。
到此,整個checkpoint的流程就結束了。

Checkpoint 讀取數據時序圖

相關文章
相關標籤/搜索