在消費Kafka中分區的數據時,咱們須要跟蹤哪些消息是讀取過的、哪些是沒有讀取過的。這是讀取消息不丟失的關鍵所在。數據庫
Kafka是經過offset順序讀取事件的。若是一個消費者退出,再重啓的時候,它知道從哪兒繼續讀取消息進行處理。因此,消費者須要「提交」屬於它們本身的偏移量。若是消費者已經提交了偏移量,但消息沒有獲得有效處理,此時就會形成消費者消息丟失。因此,咱們應該重視偏移量提交的時間點以及提交的方式。緩存
一、group.id性能
二、auto.offset.reset線程
三、enable.auto.commit事件
四、auto.commit.interval.ms事務
若是咱們但願可以更有效地控制偏移量提交的時間點,就須要顯示地提交偏移量。同步
一、老是在處理完事件後再提交偏移量it
若是全部的處理都是在輪詢裏完成,無需在輪詢之間維護狀態,那麼可使用自動提交,或者在輪詢結束後進行手動提交。ast
二、提交頻率是性能和重複消息數量之間的權衡test
這個意思是:提交頻率越高,重複消息處理的數量越少,性能也是比較低的。提交頻率越低,重複消息處理的數量越多,性能是比較好的。因此,要根據實際的狀況,來衡量在什麼時機,來提交偏移量。即便是在最簡單的場景你,也須要在一個循環中屢次提交偏移量。
三、確保對提交的偏移量內心有數
必定要在處理完消息後,再提交偏移量,不然會出現某些消息會被處理。
四、消費者可能須要重試
但處理消息出現問題時,例如:把Kafka中的數據寫入到HBase中,此時HBase臨時不可用。咱們想要重試。假設這條消息是:#30,#30處理失敗了。那你們想一想?#31能提交嗎?
顯然是不能的,若是#31提交了,那麼#31以前的全部數據,都不會被處理了。咱們可使用如下幾種模式來處理:
模式一
① 但遇到可重試錯誤時,提交最後一個處理成功的偏移量
② 把沒有處理好的消息保存到緩衝區
③ 調用 pause() 方法,確保其餘的輪詢不會返回數據
④ 嘗試從新處理緩存中的數據,若是重試成功,或者重試次數達到上限並決定放棄,把錯誤記錄下來並丟棄消息
⑤ 調用 resume() 方法讓消費者繼續從輪詢裏獲取新數據
模式二
① 遇到可重試錯誤時,把錯誤寫入一個獨立的主題,而後繼續
② 用一個獨立的消費者組負責從該主題上讀取錯誤消息,並進行重試
五、長時間處理
有時候要進行比較複雜的處理,暫停輪詢的時間不能超過幾秒鐘。要保持輪詢,由於只有在輪詢過程當中,才能往broker發送心跳。可使用一個線程池來處理數據,可讓輪詢不獲取新的數據,直到工做縣好吃呢個處理完成。消費者一直保持輪詢,心跳正常,就不會發生再均衡。
八、僅一次傳遞
有的程序不只是須要「至少一次」(at least-once語義)(意味着沒有數據丟失),還須要僅一次(exactly-once)語義。實現一次性語義,最經常使用的辦法就是把結果寫入到一個支持惟一鍵的系統裏,好比:k-v存儲、關係數據庫、ES或者其餘數據存儲。可使用主題、分區和偏移量來做爲主鍵,這樣,能夠碰巧讀取到同一個相同的消息,直接覆蓋寫入就能夠了。這種稱爲冪等性寫入。
還有一種,就是使用關係型數據庫,HDFS中一些被定義過的原子操做也常常用來達到相同的目的。把消息和偏移量放在同一個事務裏,這樣讓它們保持同步。消費者啓動,獲取最近處理過的偏移量,調用seek()方法從偏移量位置繼續讀取數據
參考文件:
「Kafka權威指南」