Spark Streaming如何使用checkpoint容錯

在互聯網場景下,常常會有各類實時的數據處理,這種處理方式也就是流式計算,延遲一般也在毫秒級或者秒級,比較有表明性的幾個開源框架,分別是Storm,Spark Streaming和Filnk。 

倫理片 http://www.dotdy.com/ 
曾經在一個項目裏面用過阿里改造後的JStrom,總體感覺就是編程略複雜,在不使用Trident Api的時候是不能保證準確一次的數據處理的,可是能保證不丟數據,可是不保證數據重複,咱們在使用期間也出現過幾回問題,bolt或者worker重啓時候會致使大量數據重複計算,這個問無法解決,若是想解決就得使用Trident來保證,使用比較繁瑣。 



最近在作一個實時流計算的項目,採用的是Spark Steaming,主要是對接Spark方便,固然後續有機會也會嘗試很是具備潛力的Filnk,大體流程,就是消費kafka的數據,而後中間作業務上的一些計算,中間須要讀取redis,計算的結果會落地在Hbase中,Spark2.x的Streaming能保證準確一次的數據處理,經過spark自己維護kafka的偏移量,可是也須要啓用checkpoint來支持,由於你無法預料到可能出現的故障,好比斷電,系統故障,或者JVM崩潰等等。 

鑑於上面的種種可能,Spark Streaming須要經過checkpoint來容錯,以便於在任務失敗的時候能夠從checkpoint裏面恢復。 

在Spark Streaming裏面有兩種類型的數據須要作checkpoint: 

A :元數據信息checkpoint 主要是驅動程序的恢復 

(1)配置 構建streaming應用程序的配置 

(2)Dstream操做 streaming程序中的一系列Dstream操做 

(3)沒有完成的批處理 在運行隊列中的批處理可是沒有完成 

B:消費數據的checkpoint 

保存生成的RDD到一個可靠的存儲系統中,經常使用的HDFS,一般有狀態的數據橫跨多個batch流的時候,須要作checkpoint 


總結下: 

元數據的checkpoint是用來恢復當驅動程序失敗的場景下 
而數據自己或者RDD的checkpoint一般是用來容錯有狀態的數據處理失敗的場景 


大多數場景下沒有狀態的數據或者不重要的數據是不須要激活checkpoint的,固然這會面臨丟失少數數據的風險(一些已經消費了,可是沒有處理的數據) 


如何在代碼裏面激活checkpoint? 
 redis

Java代碼  收藏代碼編程

  1. // 經過函數來建立或者從已有的checkpoint裏面構建StreamingContext  
  2. def functionToCreateContext(): StreamingContext = {  
  3.   val ssc = new StreamingContext(...)   // new context  
  4.   val rdds = ssc.socketTextStream(...) // create DStreams  
  5.   ...  
  6.   ssc.checkpoint("/spark/kmd/checkpoint")   // 設置在HDFS上的checkpoint目錄  
  7.   //設置經過間隔時間,定時持久checkpoint到hdfs上  
  8.   rdds.checkpoint(Seconds(batchDuration*5))  
  9.     
  10.   rdds.foreachRDD(rdd=>{  
  11.   //能夠針對rdd每次調用checkpoint  
  12.   //注意上面設置了,定時持久checkpoint下面這個地方能夠不用寫  
  13.   rdd.checkpoint()  
  14.     
  15.   }  
  16.   )  
  17.   //返回ssc  
  18.   ssc  
  19. }  
  20.   
  21. def main(args:Array){  
  22. // 建立context  
  23. val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)  
  24. // 啓動流計算  
  25. context.start()  
  26. context.awaitTermination()  
  27. }  



啓動項目以後,咱們能在HDFS上看到對應目錄下面的checkpoint內容 
 


這裏有有兩個坑: 


(1)處理的邏輯必須寫在functionToCreateContext函數中,你要是直接寫在main方法中,在首次啓動後,kill關閉,再啓動就會報錯 

關閉命令 服務器

Java代碼  收藏代碼app

  1. yarn application -kill application_1482996264071_34284  


再次啓動後報錯信息 框架

Java代碼  收藏代碼socket

  1. has not been initialized when recovery from checkpoint  



解決方案:將邏輯寫在函數中,不要寫main方法中, 

(2)首次編寫Spark Streaming程序中,由於處理邏輯沒放在函數中,所有放在main函數中,雖然能正常運行,也能記錄checkpoint數據,可是再次啓動先報(1)的錯誤,而後你解決了,打包編譯從新上傳服務器運行,會發現依舊報錯,此次的錯誤和(1)不同: ide

Java代碼  收藏代碼函數

  1. xxxx classs ClassNotFoundException  


但令你疑惑的是明明打的jar包中包含了,這個類,上一次還能正常運行此次爲啥就不能了,問題就出在checkpoint上,由於checkpoint的元數據會記錄jar的序列化的二進制文件,由於你改動過代碼,而後從新編譯,新的序列化jar文件,在checkpoint的記錄中並不存在,因此就致使了上述錯誤,如何解決: 

也很是簡單,刪除checkpoint開頭的的文件便可,不影響數據自己的checkpoint oop

Java代碼  收藏代碼spa

  1. hadoop fs -rm /spark/kmd/check_point/checkpoint*  

而後再次啓動,發現一切ok,能從checkpoint恢復數據,而後kill掉又一次啓動  就能正常工做了。  最後注意的是,雖然數據可靠性獲得保障了,可是要謹慎的設置刷新間隔,這可能會影響吞吐量,由於每隔固定時間都要向HDFS上寫入checkpoint數據,spark streaming官方推薦checkpoint定時持久的刷新間隔通常爲批處理間隔的5到10倍是比較好的一個方式。

相關文章
相關標籤/搜索