關於SparkStreaming的checkpoint的弊端

框架版本html

spark2.1.0mysql

kafka0.9.0.0git

當使用sparkstreaming處理流式數據的時候,它的數據源搭檔大部分都是Kafka,尤爲是在互聯網公司頗爲常見。 當他們集成的時候咱們須要重點考慮就是若是程序發生故障,或者升級重啓,或者集羣宕機,它究竟可否作到數據不丟不重呢?github

也就是一般咱們所說的高可靠和穩定性,一般框架裏面都帶有不一樣層次的消息保證機制,通常來講有三種就是:redis

at most once 最多一次
at least once 最少一次
exactly once  準確一次

在storm裏面是經過ack和Trident,在sparkstreaming裏面,若是是1.3版本以前是經過Receiver方式讀取kafka數據,1.3以後經過Direct Approach方式直接讀取kafka的數據,直接分配每一個Batch及RDD最新的Topic partition offset,任務運行後使用kafka的Simple Consumer API去獲取那一段的offset的數據,這樣的好處是避免了原來Receiver接受數據宕機帶來的數據可靠性風險,至關於原來的數據是在內存中而如今的數據是在kafka的磁盤中,經過偏移量可隨時再次消費數據,從而實現了數據的Exactly Once處理,此外還有個不一樣之處在於1.3以後,使用的checkpoint保存當前消費的kafka的offset,而以前用zk保存的,這就是今天這篇文章重點吐槽的地方。sql

在sparkstreaming如何作到數據不丟失呢?apache

(1)使用checkpoint (2)本身維護kafka偏移量微信

checkpoint配合kafka可以在特定環境下保證不丟不重,注意爲何要加上特定環境呢,這裏有一些坑,checkpoint是對sparkstreaming運行過程當中的元數據和 每次rdds的數據狀態保存到一個持久化系統中,固然這裏面也包含了offset,通常是HDFS,S3,若是程序掛了,或者集羣掛了,下次啓動仍然可以從checkpoint中恢復,從而作到生產環境的7*24高可用。app

可是checkpoint的最大的弊端在於,一旦你的流式程序代碼或配置改變了,或者更新迭代新功能了,這個時候,你先停舊的sparkstreaming程序,而後新的程序打包編譯後執行運行,會發現兩種狀況: (1)啓動報錯,反序列化異常 (2)啓動正常,可是運行的代碼仍然是上一次的程序的代碼。框架

爲何會出現上面的兩種狀況,這是由於checkpoint第一次持久化的時候會把整個相關的jar給序列化成一個二進制文件,每次重啓都會從裏面恢復,可是當你新的 程序打包以後序列化加載的仍然是舊的序列化文件,這就會致使報錯或者依舊執行舊代碼。有的同窗可能會說,既然如此,直接把上次的checkpoint刪除了,不就能啓動了嗎? 確實是能啓動,可是一旦你刪除了舊的checkpoint,新啓動的程序,只能從kafka的smallest或者largest的偏移量消費,默認是從最新的,若是是最新的,而不是上一次程序中止的那個偏移量 就會致使有數據丟失,若是是老的,那麼就會致使數據重複。無論怎麼樣搞,都有問題。 https://spark.apache.org/docs/2.1.0/streaming-programming-guide.html#upgrading-application-code

針對這種問題,spark官網給出了2種解決辦法:

(1)舊的不停機,新的程序繼續啓動,兩個程序並存一段時間消費。 評價:仍然有丟重複消費的可能 (2)停機的時候,記錄下最後一次的偏移量,而後新恢復的程序讀取這個偏移量繼續工做,從而達到不丟消息。 評價:官網沒有給出具體怎麼操做,只是給了個思路

第二種思路是正確的,但還須要本身維護一個offset狀態,這樣以來checkpoint這個功能只能在程序寫好以後不容許再次變更,但能夠重啓的狀況保證高可靠。

但實際狀況是大多數公司的代碼都會頻繁迭代和升級,與checkpoint恰好相悖,這樣以來checkpoint的做用便顯的有點沒用了,既然仍是須要本身維護offset狀態, 那麼不用checkpoint也罷,徹底本身維護offset狀態到zk中便可。因此果斷棄用checkpoint,採用本身維護offset。其原理以下:

首次啓動,先從zk中找是否有上次存儲的偏移量,若是沒有就從最新的消費,而後保存偏移量至zk中

若是從zk中找到了偏移量,那麼就從指定的偏移量處開始消費處理,每一個批處理處理完畢後,都會更新新的offset到zk中, 這樣以來不管是程序故障,仍是宕機,再次啓動後都會從上次的消費的偏移量處繼續開始消費,並且程序的升級或功能改動新版本的發佈都能正常運行 並作到了消息不丟。

須要注意的是,雖然上游可以作到準確一次的消費,可是下游的落地存儲輸出,好比寫入hbase,redis,mysql,es等等若是失敗了,整條消息依舊會失敗,這個徹底要靠本身的設計了,要麼記錄log,針對特定數據記錄,若是失敗按期 從新打入kafka走程序恢復或者手動恢復。

或者設計存儲的時候,有複合主鍵,把偏移量提早,就算重複消費,但主鍵同樣,最終只會有一條數據落地,這個要分場景和具體業務結合使用了。

回到主題,本身維護kafka的offset狀態,如何作? github上已經有大神貢獻了,咱們只須要拿過來稍加改動便可,使用本身維護的offset以後,就沒有必要再使用 checkpoint,github鏈接以下,有興趣的朋友能夠了解下:

https://github.com/cpbaranwal/Spark-Streaming-DirectKafka-Examples/blob/master/src/main/scala/CustomDirectKafkaExample.scala

使用zk維護offset也是比較不錯的選擇,若是將checkpoint存儲在HDFS上,每隔幾秒都會向HDFS上進行一次寫入操做並且大部分都是小文件,且不說寫入性能怎麼樣,就小文件過多,對整個Hadoop集羣都不太友好。由於只記錄偏移量信息,因此數據量很是小,zk做爲一個分佈式高可靠的的內存文件系統,很是適合這種場景。

全部參考連接:

http://aseigneurin.github.io/

http://aseigneurin.github.io/2016/05/07/spark-kafka-achieving-zero-data-loss.html

http://why-not-learn-something.blogspot.jp/2016/08/upgrading-running-spark-streaming.html

http://www.binwang.me/2015-11-03-the-proper-way-to-use-spark-checkpoint.html

https://github.com/cpbaranwal/Spark-Streaming-DirectKafka-Examples/blob/master/src/main/scala/CustomDirectKafkaExample.scala

https://github.com/ippontech/spark-kafka-source

有什麼問題能夠掃碼關注微信公衆號:我是攻城師(woshigcs),在後臺留言諮詢。 技術債不能欠,健康債更不能欠, 求道之路,與君同行。

輸入圖片說明

相關文章
相關標籤/搜索