Please credit the original source when reposting: http://www.cnblogs.com/dongxiao-yang/p/7994357.html
Spark Streaming periodically checkpoints the DStreamGraph and the JobScheduler state to record changes to the DStreamGraph and the completion status of each batch's jobs. By default the checkpoint interval is the same as the batchDuration: a checkpoint is made each time a batch fires and its jobs have been submitted, and another checkpoint is made when the jobs complete and the task status is updated.
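As a point of reference, here is a minimal sketch of enabling checkpointing (the app name, checkpoint path, and durations are illustrative, not from the original post). With a 10-second batchDuration, metadata checkpoints end up on the same 10-second cadence unless a different checkpoint interval is configured explicitly.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("checkpoint-cadence-demo")
val ssc  = new StreamingContext(conf, Seconds(10))   // batchDuration = 10 s
ssc.checkpoint("hdfs:///tmp/streaming-checkpoint")   // enables metadata checkpointing to HDFS
// The checkpoint duration falls back to the batchDuration by default, so a
// Checkpoint object is written for every batch once its jobs have been submitted.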
Job generation
private def generateJobs(time: Time) {
  // Checkpoint all RDDs marked for checkpointing to ensure their lineages are
  // truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847).
  ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")
  Try {
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    graph.generateJobs(time) // generate jobs using allocated block
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
      PythonDStream.stopStreamingContextIfPythonProcessIsDead(e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}
Job completion
private def clearMetadata(time: Time) {
  ssc.graph.clearMetadata(time)

  // If checkpointing is enabled, then checkpoint,
  // else mark batch to be fully processed
  if (shouldCheckpoint) {
    eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = true))
  } else {
    // If checkpointing is not enabled, then delete metadata information about
    // received blocks (block data not saved in any case). Otherwise, wait for
    // checkpointing of this batch to complete.
    val maxRememberDuration = graph.getMaxInputStreamRememberDuration()
    jobScheduler.receiverTracker.cleanupOldBlocksAndBatches(time - maxRememberDuration)
    jobScheduler.inputInfoTracker.cleanup(time - maxRememberDuration)
    markBatchFullyProcessed(time)
  }
}
The eventLoop referenced above is a wrapper around a message/event queue inside JobGenerator. A background thread inside the eventLoop continuously consumes events, so a DoCheckpoint event flows through processEvent -> doCheckpoint, where checkpointWriter writes the generated Checkpoint object out to external storage:
/** Processes all events */
private def processEvent(event: JobGeneratorEvent) {
  logDebug("Got event " + event)
  event match {
    case GenerateJobs(time) => generateJobs(time)
    case ClearMetadata(time) => clearMetadata(time)
    case DoCheckpoint(time, clearCheckpointDataLater) =>
      doCheckpoint(time, clearCheckpointDataLater)
    case ClearCheckpointData(time) => clearCheckpointData(time)
  }
}

/** Perform checkpoint for the give `time`. */
private def doCheckpoint(time: Time, clearCheckpointDataLater: Boolean) {
  if (shouldCheckpoint && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {
    logInfo("Checkpointing graph for time " + time)
    ssc.graph.updateCheckpointData(time)
    checkpointWriter.write(new Checkpoint(ssc, time), clearCheckpointDataLater)
  }
}
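For intuition, the following is a simplified sketch of that event-loop pattern (it is not Spark's actual org.apache.spark.util.EventLoop class): a daemon thread drains a queue of posted events and hands each one to a handler such as processEvent.

import java.util.concurrent.LinkedBlockingQueue

// A toy event loop: post() enqueues an event, a background thread dispatches it.
class SimpleEventLoop[E](name: String)(handler: E => Unit) {
  private val queue = new LinkedBlockingQueue[E]()
  private val thread = new Thread(name) {
    override def run(): Unit =
      try {
        while (true) handler(queue.take())   // blocks until an event (e.g. DoCheckpoint) arrives
      } catch {
        case _: InterruptedException => ()   // stop() interrupts the blocked take()
      }
  }
  thread.setDaemon(true)

  def start(): Unit = thread.start()
  def post(event: E): Unit = queue.put(event)   // analogous to eventLoop.post(DoCheckpoint(...))
  def stop(): Unit = thread.interrupt()
}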
Before doCheckpoint calls checkpointWriter to write data to HDFS, it first runs ssc.graph.updateCheckpointData(time). The main job of this method is to update the checkpointData attribute of every input and output stream in the DStreamGraph; the call chain is DStreamGraph.updateCheckpointData -> DStream.updateCheckpointData -> checkpointData.update:
def updateCheckpointData(time: Time) {
  logInfo("Updating checkpoint data for time " + time)
  this.synchronized {
    outputStreams.foreach(_.updateCheckpointData(time))
  }
  logInfo("Updated checkpoint data for time " + time)
}

private[streaming] def updateCheckpointData(currentTime: Time) {
  logDebug(s"Updating checkpoint data for time $currentTime")
  checkpointData.update(currentTime)
  dependencies.foreach(_.updateCheckpointData(currentTime))
  logDebug(s"Updated checkpoint data for time $currentTime: $checkpointData")
}

private[streaming] class DirectKafkaInputDStreamCheckpointData extends DStreamCheckpointData(this) {
  def batchForTime: mutable.HashMap[Time, Array[(String, Int, Long, Long)]] = {
    data.asInstanceOf[mutable.HashMap[Time, Array[OffsetRange.OffsetRangeTuple]]]
  }

  override def update(time: Time): Unit = {
    batchForTime.clear()
    generatedRDDs.foreach { kv =>
      val a = kv._2.asInstanceOf[KafkaRDD[K, V]].offsetRanges.map(_.toTuple).toArray
      batchForTime += kv._1 -> a
    }
  }

  override def cleanup(time: Time): Unit = { }

  override def restore(): Unit = {
    batchForTime.toSeq.sortBy(_._1)(Time.ordering).foreach { case (t, b) =>
      logInfo(s"Restoring KafkaRDD for time $t ${b.mkString("[", ", ", "]")}")
      generatedRDDs += t -> new KafkaRDD[K, V](
        context.sparkContext,
        executorKafkaParams,
        b.map(OffsetRange(_)),
        getPreferredHosts,
        // during restore, it's possible same partition will be consumed from multiple
        // threads, so dont use cache
        false
      )
    }
  }
}
Take DirectKafkaInputDStream as an example. It overrides checkpointData's update (and related) methods, so before each checkpoint DirectKafkaInputDStream stores the topic, partition, fromOffset and untilOffset of every currently running KafkaRDD into the data HashMap held by checkpointData, and this map is what gets serialized when the checkpoint is written.
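For illustration only (the batch time and offsets below are made up), the per-batch data kept by DirectKafkaInputDStreamCheckpointData has roughly this shape: one (topic, partition, fromOffset, untilOffset) tuple per partition, keyed by batch time.

import scala.collection.mutable

// (topic, partition, fromOffset, untilOffset), keyed by the batch time in ms
val batchForTime = mutable.HashMap[Long, Array[(String, Int, Long, Long)]]()
batchForTime += 1512000000000L -> Array(
  ("events", 0, 1000L, 1500L),   // partition 0: offsets [1000, 1500) belong to this batch
  ("events", 1,  980L, 1490L))   // partition 1: offsets [980, 1490)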
A Checkpoint object contains the following fields:
class Checkpoint(ssc: StreamingContext, val checkpointTime: Time)
  extends Logging with Serializable {
  val master = ssc.sc.master
  val framework = ssc.sc.appName
  val jars = ssc.sc.jars
  val graph = ssc.graph
  val checkpointDir = ssc.checkpointDir
  val checkpointDuration = ssc.checkpointDuration
  val pendingTimes = ssc.scheduler.getPendingTimes().toArray
  val sparkConfPairs = ssc.conf.getAll
To enable checkpointing, a Spark Streaming application must use StreamingContext strictly in the structure of the official demo: all streaming logic goes into a createContext method that returns a StreamingContext, and initialization goes through StreamingContext.getOrCreate. When CheckpointReader.read finds checkpoint files and successfully deserializes a Checkpoint object, getOrCreate returns a StreamingContext built from that checkpoint and the program's createContext is never called; conversely, on a first start the StreamingContext is initialized through createContext.
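A minimal sketch of that structure (the function name, app name, and checkpoint path here are illustrative, not from the original post):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "hdfs:///tmp/streaming-checkpoint"

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("checkpoint-demo")
  val ssc  = new StreamingContext(conf, Seconds(10))
  ssc.checkpoint(checkpointDir)
  // build the full DStream lineage here (e.g. createDirectStream, transformations, output ops)
  ssc
}

// First run: createContext is invoked. Subsequent runs: the context is rebuilt
// from the deserialized Checkpoint and createContext is skipped.
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()

The getOrCreate and CheckpointReader.read implementations look like this: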
def getOrCreate(
    checkpointPath: String,
    creatingFunc: () => StreamingContext,
    hadoopConf: Configuration = SparkHadoopUtil.get.conf,
    createOnError: Boolean = false
  ): StreamingContext = {
  val checkpointOption = CheckpointReader.read(
    checkpointPath, new SparkConf(), hadoopConf, createOnError)
  checkpointOption.map(new StreamingContext(null, _, null)).getOrElse(creatingFunc())
}

def read(
    checkpointDir: String,
    conf: SparkConf,
    hadoopConf: Configuration,
    ignoreReadError: Boolean = false): Option[Checkpoint] = {
  val checkpointPath = new Path(checkpointDir)
  val fs = checkpointPath.getFileSystem(hadoopConf)

  // Try to find the checkpoint files
  val checkpointFiles = Checkpoint.getCheckpointFiles(checkpointDir, Some(fs)).reverse
  if (checkpointFiles.isEmpty) {
    return None
  }

  // Try to read the checkpoint files in the order
  logInfo(s"Checkpoint files found: ${checkpointFiles.mkString(",")}")
  var readError: Exception = null
  checkpointFiles.foreach { file =>
    logInfo(s"Attempting to load checkpoint from file $file")
    try {
      val fis = fs.open(file)
      val cp = Checkpoint.deserialize(fis, conf)
      logInfo(s"Checkpoint successfully loaded from file $file")
      logInfo(s"Checkpoint was generated at time ${cp.checkpointTime}")
      return Some(cp)
    } catch {
      case e: Exception =>
        readError = e
        logWarning(s"Error reading checkpoint from file $file", e)
    }
  }

  // If none of checkpoint files could be read, then throw exception
  if (!ignoreReadError) {
    throw new SparkException(
      s"Failed to read checkpoint from directory $checkpointPath", readError)
  }
  None
}
During recovery from a checkpoint, the DStreamGraph itself is restored from the checkpoint. The call path through the code below is StreamingContext.graph -> DStreamGraph.restoreCheckpointData -> DStream.restoreCheckpointData -> checkpointData.restore:
private[streaming] val graph: DStreamGraph = {
  if (isCheckpointPresent) {
    _cp.graph.setContext(this)
    _cp.graph.restoreCheckpointData()
    _cp.graph
  } else {
    require(_batchDur != null, "Batch duration for StreamingContext cannot be null")
    val newGraph = new DStreamGraph()
    newGraph.setBatchDuration(_batchDur)
    newGraph
  }
}

def restoreCheckpointData() {
  logInfo("Restoring checkpoint data")
  this.synchronized {
    outputStreams.foreach(_.restoreCheckpointData())
  }
  logInfo("Restored checkpoint data")
}

private[streaming] def restoreCheckpointData() {
  if (!restoredFromCheckpointData) {
    // Create RDDs from the checkpoint data
    logInfo("Restoring checkpoint data")
    checkpointData.restore()
    dependencies.foreach(_.restoreCheckpointData())
    restoredFromCheckpointData = true
    logInfo("Restored checkpoint data")
  }
}

override def restore(): Unit = {
  batchForTime.toSeq.sortBy(_._1)(Time.ordering).foreach { case (t, b) =>
    logInfo(s"Restoring KafkaRDD for time $t ${b.mkString("[", ", ", "]")}")
    generatedRDDs += t -> new KafkaRDD[K, V](
      context.sparkContext,
      executorKafkaParams,
      b.map(OffsetRange(_)),
      getPreferredHosts,
      // during restore, it's possible same partition will be consumed from multiple
      // threads, so dont use cache
      false
    )
  }
}
Again using DirectKafkaInputDStreamCheckpointData as the example, this restore method takes the RDD runtime information saved in the checkpoint's data map described above and rebuilds the KafkaRDDs that were in flight when the application stopped.
private def restart() {
  // If manual clock is being used for testing, then
  // either set the manual clock to the last checkpointed time,
  // or if the property is defined set it to that time
  if (clock.isInstanceOf[ManualClock]) {
    val lastTime = ssc.initialCheckpoint.checkpointTime.milliseconds
    val jumpTime = ssc.sc.conf.getLong("spark.streaming.manualClock.jump", 0)
    clock.asInstanceOf[ManualClock].setTime(lastTime + jumpTime)
  }

  val batchDuration = ssc.graph.batchDuration

  // Batches when the master was down, that is,
  // between the checkpoint and current restart time
  val checkpointTime = ssc.initialCheckpoint.checkpointTime
  val restartTime = new Time(timer.getRestartTime(graph.zeroTime.milliseconds))
  val downTimes = checkpointTime.until(restartTime, batchDuration)
  logInfo("Batches during down time (" + downTimes.size + " batches): "
    + downTimes.mkString(", "))

  // Batches that were unprocessed before failure
  val pendingTimes = ssc.initialCheckpoint.pendingTimes.sorted(Time.ordering)
  logInfo("Batches pending processing (" + pendingTimes.length + " batches): " +
    pendingTimes.mkString(", "))

  // Reschedule jobs for these times
  val timesToReschedule = (pendingTimes ++ downTimes).filter { _ < restartTime }
    .distinct.sorted(Time.ordering)
  logInfo("Batches to reschedule (" + timesToReschedule.length + " batches): " +
    timesToReschedule.mkString(", "))
  timesToReschedule.foreach { time =>
    // Allocate the related blocks when recovering from failure, because some blocks that were
    // added but not allocated, are dangling in the queue after recovering, we have to allocate
    // those blocks to the next batch, which is the batch they were supposed to go.
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    jobScheduler.submitJobSet(JobSet(time, graph.generateJobs(time)))
  }

  // Restart the timer
  timer.start(restartTime.milliseconds)
  logInfo("Restarted JobGenerator at " + restartTime)
}
Finally, during restart(), the JobGenerator uses the current time and the time the program last stopped to work out how many batches were never generated while the program was down, adds the jobs that were still pending when the previous run died, and reschedules all of them so the missed batches are re-run.
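As a concrete, made-up example of that arithmetic, assume a 10-second batch interval, a last checkpoint at t = 100000 ms, and a restart where the timer's next batch fires at t = 135000 ms; the sketch below mirrors the checkpointTime.until(restartTime, batchDuration) computation above.

val batchMs      = 10000L    // batchDuration
val checkpointMs = 100000L   // ssc.initialCheckpoint.checkpointTime
val restartMs    = 135000L   // restartTime returned by the timer

// Time.until is start-inclusive and end-exclusive, stepping by the batch interval
val downTimes = (checkpointMs until restartMs by batchMs).toSeq
// -> 100000, 110000, 120000, 130000 (start-inclusive, so the checkpointed batch time appears too)

// Combined with pendingTimes from the checkpoint, everything < restartMs is
// deduplicated, sorted, and resubmitted, so the batches missed while the driver
// was down get re-run before normal scheduling resumes at 135000.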