「Kafka」Kafka中offset偏移量提交

在消費Kafka中分區的數據時,咱們須要跟蹤哪些消息是讀取過的、哪些是沒有讀取過的。這是讀取消息不丟失的關鍵所在。數據庫

Kafka是經過offset順序讀取事件的。若是一個消費者退出,再重啓的時候,它知道從哪兒繼續讀取消息進行處理。因此,消費者須要「提交」屬於它們本身的偏移量。若是消費者已經提交了偏移量,但消息沒有獲得有效處理,此時就會形成消費者消息丟失。因此,咱們應該重視偏移量提交的時間點以及提交的方式。緩存


Kafka消費者的可靠性配置

一、group.id性能

  • 若是兩個消費者有相同的 group.id,而且定義同一個主題,那麼每一個消費者都會消費一個分區的數據

二、auto.offset.reset線程

  • 這個參數的做用是:當沒有偏移量提交(例如:消費者第一次啓動、或者請求的偏移量在broker上不存在時),消費者會如何處理
  • earliest:消費者從分區的開始位置讀取大量的重複數據,能夠保證個最少的數據丟失
  • latest:消費者會從分區的末尾開始讀取數據,能夠減小重複讀,但頗有可能會錯過一些消息

三、enable.auto.commit事件

  • 能夠設置自動提交偏移量,能夠在代碼中手動提交偏移量
  • 自動提交,可讓消費者邏輯更簡單
  • 但它沒法控制重複處理消息、或者若是消息交給另一個後臺線程去處理,自動提交機制可能會在消息尚未處理完就提交了偏移量

四、auto.commit.interval.ms事務

  • 經過該參數,能夠配置提交的頻率。默認:每5秒鐘提交一次
  • 提交的頻率高,也是會增長額外的開銷的


顯示提交偏移量

若是咱們但願可以更有效地控制偏移量提交的時間點,就須要顯示地提交偏移量。同步

一、老是在處理完事件後再提交偏移量it

若是全部的處理都是在輪詢裏完成,無需在輪詢之間維護狀態,那麼可使用自動提交,或者在輪詢結束後進行手動提交。ast


二、提交頻率是性能和重複消息數量之間的權衡test

這個意思是:提交頻率越高,重複消息處理的數量越少,性能也是比較低的。提交頻率越低,重複消息處理的數量越多,性能是比較好的。因此,要根據實際的狀況,來衡量在什麼時機,來提交偏移量。即便是在最簡單的場景你,也須要在一個循環中屢次提交偏移量。


三、確保對提交的偏移量內心有數

必定要在處理完消息後,再提交偏移量,不然會出現某些消息會被處理。


四、消費者可能須要重試

但處理消息出現問題時,例如:把Kafka中的數據寫入到HBase中,此時HBase臨時不可用。咱們想要重試。假設這條消息是:#30,#30處理失敗了。那你們想一想?#31能提交嗎?

顯然是不能的,若是#31提交了,那麼#31以前的全部數據,都不會被處理了。咱們可使用如下幾種模式來處理:

模式一

① 但遇到可重試錯誤時,提交最後一個處理成功的偏移量

② 把沒有處理好的消息保存到緩衝區

③ 調用 pause() 方法,確保其餘的輪詢不會返回數據

④ 嘗試從新處理緩存中的數據,若是重試成功,或者重試次數達到上限並決定放棄,把錯誤記錄下來並丟棄消息

⑤ 調用 resume() 方法讓消費者繼續從輪詢裏獲取新數據

模式二

① 遇到可重試錯誤時,把錯誤寫入一個獨立的主題,而後繼續

② 用一個獨立的消費者組負責從該主題上讀取錯誤消息,並進行重試


五、長時間處理

有時候要進行比較複雜的處理,暫停輪詢的時間不能超過幾秒鐘。要保持輪詢,由於只有在輪詢過程當中,才能往broker發送心跳。可使用一個線程池來處理數據,可讓輪詢不獲取新的數據,直到工做縣好吃呢個處理完成。消費者一直保持輪詢,心跳正常,就不會發生再均衡。


八、僅一次傳遞

有的程序不只是須要「至少一次」(at least-once語義)(意味着沒有數據丟失),還須要僅一次(exactly-once)語義。實現一次性語義,最經常使用的辦法就是把結果寫入到一個支持惟一鍵的系統裏,好比:k-v存儲、關係數據庫、ES或者其餘數據存儲。可使用主題、分區和偏移量來做爲主鍵,這樣,能夠碰巧讀取到同一個相同的消息,直接覆蓋寫入就能夠了。這種稱爲冪等性寫入。


還有一種,就是使用關係型數據庫,HDFS中一些被定義過的原子操做也常常用來達到相同的目的。把消息和偏移量放在同一個事務裏,這樣讓它們保持同步。消費者啓動,獲取最近處理過的偏移量,調用seek()方法從偏移量位置繼續讀取數據


參考文件:

「Kafka權威指南」

相關文章
相關標籤/搜索