1、Checkpoint相關源碼分爲四個部分安全
一、Checkpoint的基本使用:spark_core & spark_streamingspa
二、初始化的源碼線程
三、Checkpoint的job生成及執行的過程3d
四、讀Checkpoint的過程rest
2、Checkpoint的基本使用blog
Checkpoint能夠是還原藥水。輔助Spark應用從故障中恢復。SparkStreaming宕機恢復,適合調度器有自動重試功能的。對於 SparkCore 則適合那些計算鏈條超級長或者計算耗時的
關鍵點進行 Checkpoint, 便於故障恢復 。kafka
Checkpoint和persist從根本上是不同的:
源碼
一、Cache or persist:it
Cache or persist保存了RDD的血統關係,假若有部分cache的數據丟失能夠根據血緣關係從新生成。spark
二、Checkpoint
會將RDD數據寫到hdfs這種安全的文件系統裏面,而且拋棄了RDD血緣關係的記錄。即便persist存儲到了磁盤裏面,在driver停掉以後會被刪除,而checkpoint能夠被下次啓動使用。
Checkpoint基本使用
對於spark_streaming的checkpoint:
spark streaming有一個單獨的線程CheckpointWriteHandler,每generate一個batch interval的RDD數據都會觸發checkpoint操做。對於kafka的DirectKafkaInputDStreamCheckpointData,實質是重寫DStreamCheckpointData的update和restore方法,這樣checkpoint的數據就是topic,partition,fromOffset和untilOffset。更多請參考源碼例子RecoverableNetworkWordCount
對於spark_core的checkpoint:
docheckpoint:
recover:
2、Checkpoint的初始化源碼
一、設置Checkpoint目錄
二、調用Checkpoint方法,構建checkpointData
3、DoCheckpoint源碼
在SparkContext的runjob方法中
進入以後
RDDCheckpointData中真正作Checkpoint返回一個新的RDD並清除掉依賴關係
ReliableRDDCheckpointData中真正進行Checkpoint操做
在該方法中
一、獲取sc
二、建立輸出目錄
三、以Job的方式進行Checkpoint操做
四、將分區策略寫入Checkpoint目錄
4、讀取Checkpoint數據
三個方法:
一、同一個Spark任務,共有了Checkpoint的RDD,在該RDD的iterator方法中
進入 computeOrReadCheckpoint
若是進行了 Checkpoint, 條件爲真firstParent[T].iterator(split, context)其中, firstParent 爲
/** Returns the first parent RDD */
接着是獲取依賴
假如進行了Checkpoint,那麼CheckpointRDD就是存在
在初始化Checkpoint的時候,咱們已經初始化了CheckpointData了。
二、RDD的計算鏈條失敗,主動去讀Checkpoint文件的過程
這個要求咱們的入口類在下面這個包
三、SparkStreaming的故障恢復
首先,看一下SteamingContext的須要
而後去讀取Checkpoint
分兩個步驟:
A、獲取最新的Checkpoint目錄
B、迭代找到最新的Checkpoint就返回
最後就是使用獲取的Checkpoint去構建ssc
主要是作了一下動做