最近工做有點忙,因此更新文章頻率低了點,在這裏給你們說聲抱歉,前面已經寫過在spark streaming中管理offset,但當時只知道怎麼用,並非很瞭解爲什麼要那樣用,最近一段時間又抽空看了一個github開源程序本身管理offset的源碼,基本已經理解透徹了,固然這裏面還包含了因爲理解不透徹致使升級失敗的一個案例,這個在下篇文章會分享出來。本篇咱們先從理論的角度聊聊在Spark Streaming集成Kafka時的offset狀態如何管理。git
spark streaming 版本 2.1github
kafka 版本0.9.0.0微信
在這以前,先重述下spark streaming裏面管理偏移量的策略,默認的spark streaming它自帶管理的offset的方式是經過checkpoint來記錄每一個批次的狀態持久化到HDFS中,若是機器發生故障,或者程序故障中止,下次啓動時候,仍然能夠從checkpoint的目錄中讀取故障時候rdd的狀態,便能接着上次處理的數據繼續處理,但checkpoint方式最大的弊端是若是代碼升級,新版本的jar不能複用舊版本的序列化狀態,致使兩個版本不能平滑過渡,結果就是要麼丟數據,要麼數據重複,因此官網搞的這個東西,幾乎沒有人敢在生產環境運行很是重要的流式項目。spa
因此比較通用的解決辦法就是本身寫代碼管理spark streaming集成kafka時的offset,本身寫代碼管理offset,其實就是把每批次offset存儲到一個外部的存儲系統裏面包括(Hbase,HDFS,Zookeeper,Kafka,DB等等),不用的什麼存儲系統, 都須要考慮到三種時刻的offset的狀態,不然offset的狀態不完整,就可能致使一些bug出現。圖片
場景一:kafka
當一個新的spark streaming+kafka的流式項目第一次啓動的時候,這個時候發現外部的存儲系統並無記錄任何有關這個topic全部分區的偏移量,因此就從 KafkaUtils.createDirectStream直接建立InputStream流,默認是從最新的偏移量消費,若是是第一次其實最新和最舊的偏移量時相等的都是0,而後在之後的每一個批次中都會把最新的offset給存儲到外部存儲系統中,不斷的作更新。源碼
場景二:it
當流式項目中止後再次啓動,會首先從外部存儲系統讀取是否記錄的有偏移量,若是有的話,就讀取這個偏移量,而後把偏移量集合傳入到KafkaUtils.createDirectStream中進行構建InputSteam,這樣的話就能夠接着上次中止後的偏移量繼續處理,而後每一個批次中仍然的不斷更新外部存儲系統的偏移量,這樣以來就可以無縫銜接了,不管是故障中止仍是升級應用,都是透明的處理。spark
場景三:後臺
對正在運行的一個spark streaming+kafka的流式項目,咱們在程序運行期間增長了kafka的分區個數,請注意:這個時候新增的分區是不能被正在運行的流式項目感應到的,若是想要程序可以識別新增的分區,那麼spark streaming應用程序必須得重啓,同時若是你還使用的是本身寫代碼管理的offset就千萬要注意,對已經存儲的分區偏移量,也要把新增的分區插入進去,不然你運行的程序仍然讀取的是原來的分區偏移量,這樣就會丟失一部分數據。
總結:
若是本身管理kafka的偏移量,必定要注意上面的三個場景,若是考慮不全,就有可能出現詭異的問題。
有什麼問題能夠掃碼關注微信公衆號:我是攻城師(woshigcs),在後臺留言諮詢。 技術債不能欠,健康債更不能欠, 求道之路,與君同行。