前段時間上線一個新服務,咱們的運營在測試的時候,導入了一批數據,結果目標表裏的數據是預期數量的2倍,有大量的重複數據,一開始我認爲多是我沒有過濾數據類型致使的,我所消費的數據是經過監聽數據庫的binlog解析後推送到Kafka的數據,我收到kafka消息通過反序列化獲得多條數據庫表的變更記錄,每一條記錄都有一個類型:INSERT
,UPDATE
,DELETE
,其實也就是對應SQL的類型,我一開始並無判斷這個類型,而是收到的全部數據都進行後續的處理,運營也說多是有UPDATE
操做的,因而我加了過濾,只處理INSERT
類型數據,改好以後想着確定沒問題了,因而讓運營再刪掉原有數據從新導入一遍,次日再看結果。html
然而次日,問題依舊......這下可難到我了,想了一會猜想會不會是前面的環節推送的數據就是重複的呢?可是也不能瞎猜,因而我在處理一條數據以前打印上offset,發版後開始觀察日誌,不一下子庫裏出現了重複數據了,我拿到重複的訂單號以後去日誌中搜索,結果,兩條數據的offset是同樣的?數據庫
看來這真是我消費的問題了,可是以前相似的項目也是一樣的消費方式,歷來沒出現太重複消費呀,這就讓我很是納悶,因而找了幾條重複數據,觀察了一下插入時間,發現時間間隔還挺有規律,基本都是五分鐘左右重複插入一次,在日誌上發現重複消費是兩臺機器交替進行的,我所消費的Topic是隻有一個分區的,因此只會有一臺機器消費,從日誌上看出來兩臺機器在交替消費,於是產生了重複消費,可是爲何呢,因而再搜了下Exception關鍵詞,發現了一個CommitFailedException
:apache
能夠看出來,是當前消費端被踢出了消費組,隨後offset提交失敗,而後換了一臺機器從新消費這個offset,致使了重複消費,可是爲何被被剔除出消費組我仍然沒法解釋,由於日誌上再也沒其餘錯誤信息了,好在後來又讓我發現了點蛛絲馬跡,那會恰好使用了tail -f
命令,不經意間發現過了很久,接收到的數據offset都是同樣的,一條kafka消息解壓後有這麼多?(後來在本地試了下,果真一條Kafka消息,反序列化後都有3000到9000條不等,這也太多了吧。。)通過一番Google以後注意到了Kafka的兩個配置項:max.poll.interval.ms
和max.poll.records
分別表明拉取消息的間隔時間和拉取的最大條數,個人配置是:緩存
max.poll.interval.ms = 600000 # 默認5分鐘 max.poll.records = 20 # 這個是我最開始寫的20條
也就是說,最快要5分鐘內處理完20個offset,不然將認爲客戶端不在消費了,也就產生了上面的異常,被踢出消費組,然後又commit失敗。從打印的日誌來看,這個時間明顯不夠,處理一個offset的消息都要好久,更別說最大20個了。知道緣由後,我改了參數:運維
max.poll.interval.ms = 1200000 max.poll.records = 1
也就是一條十分鐘的一個的下限速度,可是後來證實,這個時間依舊遠遠不夠(最後這個時間已經改成了1個小時...),通過一段時間的觀察,一條Kafka消息反序列化最多會有10000條數據,處理時間最長大約40多分鐘,我最開始確實沒有想到這個Topic中的一條消息包含的數據會這麼多,致使了這一系列的問題,時間改成1小時後連續兩天再沒出現太重複消費offset的問題。性能
可是可是,這個程序的處理速度慢,也是致使此問題的一個緣由,固然後來發現代碼中,處理一條數據,平均查詢與插入數據庫的次數有五六十次!這固然快不起來啊,因而我在大部分的SQL查詢部分使用WeakHashMap
進行了數據緩存,由於這部分查詢的數據基本是不會有變更的,極大的減小了數據庫查詢次數,處理速度提高了將近10倍!再後來,因爲這個Topic只有一個partition,徹底沒辦法用到多臺機器的性能,並且據運維反饋,這個TiDB binlog監聽的中間件只支持一個分區發送, 因而我本身在程序中增長了一道轉發,即消費到Kafka消息反序列化以後,將反序列化以後的數據先不作處理,直接一條一條轉發至RocketMQ,這個過程是很是快的,rocketMQ再發送至各個機器上(也可新建一個多partition的Kafka Topic用於轉發),這樣就能充分利用集羣的優點,進一步極大地提升處理速度,這一塊說這麼多其實偏題了,屬於後續的一個優化。測試
後來發現,其實每一次在消費端即將離開以前,都會有一條日誌:fetch
提示向服務端發送了離開消費組的請求,由於客戶端poll操做已超時,並建議增大最大拉取間隔時間或者減少最大拉取數量(這個不行,我都改到1了 T_T )。優化
而後在處理完一個offset提交的時候會提示請求失敗,當前消費者再也不是此消費組的一部分。指針
可是日誌級別是INFO,確實不太容易發現,至此問題已徹底解決,緣由也徹底清楚了。
總的來講這次遇到的Kafka重複消費的緣由,第一是Kafka的消息太大(後來解析到最大有包含25萬條數據的一條消息。。。這都是使用protobuf序列化的消息),可是這部分咱們沒法變更,第二是一開始處理速度也比較慢,默認間隔時間徹底不夠,綜合致使頻繁從新消費。
通過這次問題對Kafka參數有了更深的一些認識,除上面兩個用到的同時也是比較重要參數以外,還有請求的超時時間、會話超時時間、心跳檢測事件、拉取消息的超時時間等,在本地Debug期間還遇到過一個老是拉取不到消息而後報空指針異常的問題,而後查到緣由是拉取Kafka消息超時了,一條消息可能有十幾兆,那會恰好我電腦的網速很是慢,就超時了,後來加大了fetch.max.wait.ms
和request.timeout.ms
就沒問題了。頗有意義的一次Kafka問題排查經歷