如何管理Spark Streaming消費Kafka的偏移量(二)

上篇文章,討論了在spark streaming中管理消費kafka的偏移量的方式,本篇就接着聊聊上次說升級失敗的案例。併發

事情發生一個月前,因爲當時咱們想提升spark streaming程序的並行處理性能,因而須要增長kafka分區個數,,這裏須要說下,在新版本spark streaming和kafka的集成中,按照官網的建議 spark streaming的executors的數量要和kafka的partition的個數保持相等,這樣每個executor處理一個kafka partition的數據,效率是最高的。若是executors的數量大於kafka的分區個數,其實多餘的executors至關因而不會處理任何數據,這部分的進程實際上是白白浪費性能。運維

若是executor的個數小於kafka partition的個數,那麼其實有一些executors進程是須要處理多個partition分區的數據的,因此官網建議spark executors的進程數和kafka partition的個數要保持一致。性能

那麼問題來了,若是想要提升spark streaming的並行處理性能,只能增長kafka的分區了,給kafka增長分區比較容易,直接執行一個命令便可,不過這裏須要注意,kafka的分區只能增長不能減小,因此添加分區要考慮到底多少個才合適。測試

接下來咱們便增長了kafka分區的數量,同時修改了spark streaming的executors的個數和kafka的分區個數一一對應,而後就啓動了流程序,結果出現了比較詭異的問題,表現以下:spa

造幾條測試數據打入kafka中,發現程序老是隻能處理其中的一部分數據,而每次總有一些數據丟失。按理說代碼沒有任何改動,只是增長kafka的分區和spark streaming的executors的個數,應該不會出現問題纔對,因而又從新測了原來的舊分區和程序,發現沒有問題,通過對比發現問題只會出如今kafka新增分區後,而後出現這種丟數據的狀況。而後和運維同窗一塊兒看了新增的kafka的分區的磁盤目錄是否有數據落入,經查詢發現新的分區確實已經有數據進入了,這就很奇怪了丟的數據究竟是怎麼丟的?進程

最後我又檢查了咱們本身保存的kafka的offset,發現裏面的偏移量居然沒有新增kafka的分區的偏移量,至此,終於找到問題所在,也就是說,若是沒有新增分區的偏移量,那麼程序運行時是不會處理新增分區的數據,而咱們新增的分區確確實實有數據落入了,這就是爲啥前面說的詭異的丟失數據的緣由,實際上是由於新增kafka的分區的數據程序並無處理過而這個緣由正是咱們的本身保存offset中沒有記錄新增分區的偏移量。kafka

問題找到了,那麼如何修復線上丟失的數據呢?源碼

當時想了一個比較笨的方法,由於咱們的kafka線上默認是保留7天的數據,舊分區的數據已經處理過,就是新增的分區數據沒有處理,因此咱們刪除了已經處理過的舊的分區的數據,而後在業務流量底峯時期,從新啓了流程序,讓其從最先的數據開始消費處理,這樣以來由於舊的分區被刪除,只有新分區有數據,因此至關因而把丟失的那部分數據給修復了。修復完成後,又把程序中止,而後配置從最新的偏移量開始處理,這樣偏移量裏面就能識別到新增的分區,而後就繼續正常處理便可。it

注意這裏面的刪除kafka舊分區的數據,是一個比較危險的操做,它要求kafka的節點須要所有重啓才能生效,因此除非特殊狀況,不要使用這麼危險的方式。spark

後來,仔細分析了咱們使用的一個開源程序管理offset的源碼,發現這個程序有一點bug,沒有考慮到kafka新增分區的狀況,也就是說若是你的kafka分區增長了,你的程序在重啓後是識別不到新增的分區的,因此若是新增的分區還有數據進入,那麼你的程序必定會丟數據,由於擴展kafka分區這個操做,並不常見,因此這個bug比較難易觸發。

知道緣由後,解決起來比較容易了,就是每次啓動流程序前,對比一下當前咱們本身保存的kafka的分區的個數和從zookeeper裏面的存的topic的分區個數是否一致,若是不一致,就把新增的分區給添加到咱們本身保存的信息中,併發偏移量初始化成0,這樣以來在程序啓動後,就會自動識別新增分區的數據。

因此,回過頭來看上面的那個問題,最簡單優雅的解決方法就是,直接手動修改咱們本身的保存的kafka的分區偏移量信息,把新增的分區給加入進去,而後重啓流程序便可。

這個案例也就是我上篇文章所說的第三個場景的case,若是是本身手動管理kafka的offset必定要注意兼容新增分區後的這種狀況,不然程序可能會出現丟失數據的問題。

相關文章
相關標籤/搜索