圖解Spark系列:Checkpoint高可用與容錯機制分析

CheckpointSpark中比較高級的功能,適合整個處理鏈特別長,轉換算子特別多的場景下。特別是對於那些運算時間長、須要較大運算量才能獲得的RDD數據,以及計算鏈過長或大量依賴其餘RDDRDD數據。由於對於這類複雜應用,處理過程當中可能會反覆調用某些RDD數據,一旦節點發生故障致使持久化數據丟失,則須要從新計算數據,致使整個計算耗時的上升,所以該機制可以有效提高Spark應用的容錯性。緩存

 

Checkpoint本質上是對於複雜RDD計算鏈中,相對核心的,可能因爲節點故障致使持久化數據丟失的數據,經過高可靠文件系統(例如HDFS)進行存儲來實現容錯與高可用性。ide

 

啓用Checkpoint能夠經過調用SparkContextsetCheckpointDir()方法,設置容錯文件系統目錄;而後經過RDD對象調用checkpoint()方法。當RDD對應的Job運行結束後,會啓動單獨Job將標記爲checkpointRDD數據寫入設置的文件系統中,從而實現高可用與持久化容錯。當發生節點故障致使數據丟失,此時仍然能夠從Checkpoint對應的文件系統中直接讀取數據,而再也不須要從新計算數據。性能

 

與上一章介紹的Cache緩存機制不一樣,當應用Cache緩存時,每計算一個須要緩存的分區數據會直接將其寫入內存中。而如上所述,Checkpoint會在Job結束以後再啓動新的Job執行持久化操做;也就意味着,若是此時數據沒有緩存過,則會從新再計算一次用於Checkpoint持久化,無疑從性能開銷上是不划算的。所以,建議在須要Checkpoint處理RDD上經過rdd.cache()rdd.persist(StorageLevel.DISK_ONLY)方法事先執行緩存,避免二次計算的性能開銷。spa

 

Checkpoint的實現過程包括4個階段:初始化、標記數據、Checkpoint進行中以及Checkpoint完成。對象

  • 初始化階段:應用程序調用rdd.checkpoint()標記哪些RDD須要Checkpoint,標記後則代表該RDDRDDCheckpointData管理。此外還須要設定Checkpoint存儲路徑(如HDFS)。blog

  • 標記數據階段:初始化以後RDDCheckpointData會將RDD數據逐一標記爲MarkedForCheckpoint內存

  • 持久化處理階段:每一個Job運行結束後會調用finalRdd.doCheckpoint()方法,該方法會沿着計算鏈回溯掃描直到發現須要CheckpointRDD數據爲止,將其標記爲CheckpointingInProgress並將持久化須要的配置廣播到其餘工做節點上的BlockManager。最後啓動單獨Job來完成Checkpoint操做寫入磁盤文件系統。it

  • Checkpointed完成階段Job執行完Checkpoint操做後,會將對應RDD的血緣關係所有清除,並將RDD狀態設置爲checkpointed。而後將該RDD的父RDD設置爲CheckpointRDDio

 

image.png

1Checkpoint執行過程class

 

如何讀取Checkpoint的數據?

讀取RDD數據(rdd.partitions()方法)會經過RDDCheckpointData檢查checkpointedRDD數據,若是獲取到則直接返回Array[Partition]。此外,當調用rdd.iterator()方法計算RDD的分區數據時,也會調用computeOrReadCheckpoint(split: Partition)查看該RDD作過Checkpoint;有則調用CheckpointRDD.iterator()方法直接讀取數據。

 

Cache機制與Checkpoint機制二者最大的差別在於血緣關係(lineages)。Cache機制緩存數據到內存(大機率)或磁盤(不多)中,但不會改變RDD數據的血緣關係,一旦節點故障致使持久化失敗,則會經過血緣關係從新計算RDD;而Checkpoint機制通常緩存數據到高可用的文件系統,如HDFS,並抹除RDD的血緣,將CheckpointRDD設置爲其父RDD

相關文章
相關標籤/搜索