Spark的checkpoint源碼講解

1、Checkpoint相關源碼分爲四個部分安全

一、Checkpoint的基本使用:spark_core   &   spark_streamingspa

二、初始化的源碼線程

三、Checkpoint的job生成及執行的過程3d

四、讀Checkpoint的過程rest

2、Checkpoint的基本使用blog

Checkpoint能夠是還原藥水。輔助Spark應用從故障中恢復。SparkStreaming宕機恢復,適合調度器有自動重試功能的。對於 SparkCore 則適合那些計算鏈條超級長或者計算耗時的
關鍵點進行 Checkpoint, 便於故障恢復 。
kafka


Checkpoint和persist從根本上是不同的:
源碼

  一、Cache or persist:it

    Cache or persist保存了RDD的血統關係,假若有部分cache的數據丟失能夠根據血緣關係從新生成。spark

  二、Checkpoint

    會將RDD數據寫到hdfs這種安全的文件系統裏面,而且拋棄了RDD血緣關係的記錄。即便persist存儲到了磁盤裏面,在driver停掉以後會被刪除,而checkpoint能夠被下次啓動使用。

 

Checkpoint基本使用

  對於spark_streaming的checkpoint:

    spark streaming有一個單獨的線程CheckpointWriteHandler,每generate一個batch interval的RDD數據都會觸發checkpoint操做。對於kafka的DirectKafkaInputDStreamCheckpointData,實質是重寫DStreamCheckpointData的update和restore方法,這樣checkpoint的數據就是topic,partition,fromOffset和untilOffset。更多請參考源碼例子RecoverableNetworkWordCount

  對於spark_core的checkpoint: 

  docheckpoint: 

     

       recover:

  

2、Checkpoint的初始化源碼

一、設置Checkpoint目錄

二、調用Checkpoint方法,構建checkpointData

 

3、DoCheckpoint源碼

在SparkContext的runjob方法中

進入以後

RDDCheckpointData中真正作Checkpoint返回一個新的RDD並清除掉依賴關係

ReliableRDDCheckpointData中真正進行Checkpoint操做

在該方法中

一、獲取sc

二、建立輸出目錄

三、以Job的方式進行Checkpoint操做

四、將分區策略寫入Checkpoint目錄

 

4、讀取Checkpoint數據

三個方法:

一、同一個Spark任務,共有了Checkpoint的RDD,在該RDD的iterator方法中

進入 computeOrReadCheckpoint

若是進行了 Checkpoint, 條件爲真firstParent[T].iterator(split, context)其中, firstParent

/** Returns the first parent RDD */

接着是獲取依賴

假如進行了Checkpoint,那麼CheckpointRDD就是存在

在初始化Checkpoint的時候,咱們已經初始化了CheckpointData了。

二、RDD的計算鏈條失敗,主動去讀Checkpoint文件的過程

這個要求咱們的入口類在下面這個包

三、SparkStreaming的故障恢復

首先,看一下SteamingContext的須要

而後去讀取Checkpoint

分兩個步驟:

A、獲取最新的Checkpoint目錄

B、迭代找到最新的Checkpoint就返回

最後就是使用獲取的Checkpoint去構建ssc

主要是作了一下動做

相關文章
相關標籤/搜索