Spark Streaming如何使用checkpoint容錯

在互聯網場景下,常常會有各類實時的數據處理,這種處理方式也就是流式計算,延遲一般也在毫秒級或者秒級,比較有表明性的幾個開源框架,分別是Storm,Spark Streaming和Filnk。html

曾經在一個項目裏面用過阿里改造後的JStrom,總體感覺就是編程略複雜,在不使用Trident Api的時候是不能保證準確一次的數據處理的,可是能保證不丟數據,可是不保證數據重複,咱們在使用期間也出現過幾回問題,bolt或者worker重啓時候會致使大量數據重複計算,這個問無法解決,若是想解決就得使用Trident來保證,使用比較繁瑣。git

最近在作一個實時流計算的項目,採用的是Spark Steaming,主要是對接Spark方便,固然後續有機會也會嘗試很是具備潛力的Filnk,大體流程,就是消費kafka的數據,而後中間作業務上的一些計算,中間須要讀取redis,計算的結果會落地在Hbase中,Spark2.x的Streaming能保證準確一次的數據處理,經過spark自己維護kafka的偏移量,可是也須要啓用checkpoint來支持,由於你無法預料到可能出現的故障,好比斷電,系統故障,或者JVM崩潰等等。github

鑑於上面的種種可能,Spark Streaming須要經過checkpoint來容錯,以便於在任務失敗的時候能夠從checkpoint裏面恢復。redis

在Spark Streaming裏面有兩種類型的數據須要作checkpoint:apache

A :元數據信息checkpoint 主要是驅動程序的恢復編程

(1)配置 構建streaming應用程序的配置服務器

(2)Dstream操做 streaming程序中的一系列Dstream操做微信

(3)沒有完成的批處理 在運行隊列中的批處理可是沒有完成app

B:消費數據的checkpoint框架

保存生成的RDD到一個可靠的存儲系統中,經常使用的HDFS,一般有狀態的數據橫跨多個batch流的時候,須要作checkpoint

總結下:

元數據的checkpoint是用來恢復當驅動程序失敗的場景下 而數據自己或者RDD的checkpoint一般是用來容錯有狀態的數據處理失敗的場景

大多數場景下沒有狀態的數據或者不重要的數據是不須要激活checkpoint的,固然這會面臨丟失少數數據的風險(一些已經消費了,可是沒有處理的數據)

如何在代碼裏面激活checkpoint?

// 經過函數來建立或者從已有的checkpoint裏面構建StreamingContext
def functionToCreateContext(): StreamingContext = {
  val ssc = new StreamingContext(...)   // new context
  val rdds = ssc.socketTextStream(...) // create DStreams
  ...
  ssc.checkpoint("/spark/kmd/checkpoint")   // 設置在HDFS上的checkpoint目錄
  //設置經過間隔時間,定時持久checkpoint到hdfs上
  rdds.checkpoint(Seconds(batchDuration*5))
  
  rdds.foreachRDD(rdd=>{
  //能夠針對rdd每次調用checkpoint
  //注意上面設置了,定時持久checkpoint下面這個地方能夠不用寫
  rdd.checkpoint()
  
  }
  )
  //返回ssc
  ssc
}

def main(args:Array){
// 建立context
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
// 啓動流計算
context.start()
context.awaitTermination()
}

啓動項目以後,咱們能在HDFS上看到對應目錄下面的checkpoint內容 image

這裏有有兩個坑:

(1)處理的邏輯必須寫在functionToCreateContext函數中,你要是直接寫在main方法中,在首次啓動後,kill關閉,再啓動就會報錯

關閉命令

yarn application -kill application_1482996264071_34284

再次啓動後報錯信息

has not been initialized when recovery from checkpoint

解決方案:將邏輯寫在函數中,不要寫main方法中,

(2)首次編寫Spark Streaming程序中,由於處理邏輯沒放在函數中,所有放在main函數中,雖然能正常運行,也能記錄checkpoint數據,可是再次啓動先報(1)的錯誤,而後你解決了,打包編譯從新上傳服務器運行,會發現依舊報錯,此次的錯誤和(1)不同:

xxxx classs ClassNotFoundException

但令你疑惑的是明明打的jar包中包含了,這個類,上一次還能正常運行此次爲啥就不能了,問題就出在checkpoint上,由於checkpoint的元數據會記錄jar的序列化的二進制文件,由於你改動過代碼,而後從新編譯,新的序列化jar文件,在checkpoint的記錄中並不存在,因此就致使了上述錯誤,如何解決:

也很是簡單,刪除checkpoint開頭的的文件便可,不影響數據自己的checkpoint

hadoop fs -rm /spark/kmd/check_point/checkpoint*

而後再次啓動,發現一切ok,能從checkpoint恢復數據,而後kill掉又一次啓動 就能正常工做了。

最後注意的是,雖然數據可靠性獲得保障了,可是要謹慎的設置刷新間隔,這可能會影響吞吐量,由於每隔固定時間都要向HDFS上寫入checkpoint數據,spark streaming官方推薦checkpoint定時持久的刷新間隔通常爲批處理間隔的5到10倍是比較好的一個方式。

參考連接:

https://issues.apache.org/jira/browse/SPARK-6770

http://www.jianshu.com/p/807b0767953a

http://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing

https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala

有什麼問題能夠掃碼關注微信公衆號:我是攻城師(woshigcs),在後臺留言諮詢。 技術債不能欠,健康債更不能欠, 求道之路,與君同行。

輸入圖片說明

相關文章
相關標籤/搜索