提交和偏移量數據庫
每次調用poll 方法,老是返回生產者寫入Kafka但尚未被消費者讀取過的記錄咱們所以能夠追蹤到哪些記錄時被羣組裏的哪一個消費者讀取過的。安全
咱們把更新分區當前位置的操做叫作提交。服務器
那麼消費者時如何提交偏移量的呢?消費者往一個叫作_consumer_offset的特殊主題發送消息,消息裏包含每一個分區的偏移量。若是消費者一直處於運行狀態,那麼偏移量沒有什麼用處。不過若是消費者發生崩潰或者有新的消費者加入羣組,就會觸發在均衡,完成再均衡以後,每一個消費者可能分配到新的分區,而不是以前那一個,爲了可以繼續以前的工做,消費者須要讀取每一個分區最後一次提交的偏移量,而後從偏移量指定的地方繼續處理。異步
若是提交的偏移量小於客戶端處理的最後一個消息的偏移量,那麼處於兩個偏移量之間的消息就會被重複處理。線程
若是提交的偏移量大於客戶端處理的最後一個消息的偏移量,那麼處於兩個偏移量之間的消息將會丟失。事務
KafkaConsumer API提供了不少方式來提交偏移量。開發
自動提交it
最簡單的方式時讓消費者本身提交偏移量,若是enable.auto.commit被設爲true,那麼每過5s,消費者自動把從poll方法接收到的最大偏移量提交上去。提交間隔時間由auto.commit.interval.ms控制,默認值時5s,若是這個5s內發生了再均衡,那麼就會有消息被重複處理。自動提交雖然方便,可是並無爲開發者留有餘地來避免重複處理消息。io
手動提交List
開發者能夠在必要的時候提交當前偏移量,而不是基於時間間隔。把enable.auto.commit設爲false,讓應用程序決定什麼時候提交偏移量。使用commitSync方法提交偏移量最簡單也最可靠。這個API會提交由poll方法返回的最新偏移量,提交成功後立刻返回,若是提交失敗就會拋出異常。
commitSync方法將會提交由poll方法返回的最新偏移量,因此處理完全部記錄後要確保調用了commitSync,不然仍是會有丟失消息的風險。若是發生了再均衡,從最近一批消息到發生在均衡之間的全部消息都將會被重複處理。
異步提交
手動提交有一個不足之處,在broker對提交請求做出迴應以前,應用程序會一直阻塞,這會影響應用程序的吞吐量,咱們能夠經過下降提交頻率來提高吞吐量,但若是發生了再均衡,會增長重複消息的數量。
這個時候能夠考慮使用異步提交API,咱們只管發送提交請求,無需等待broker的響應。
在成功提交或碰到沒法恢復的錯誤以前,commitSync方法會一直重試,可是commitAsync不會,他之因此不進行重試,是由於它在收到服務器響應的時候,可能有一個更大的偏移量已經提交成功。
咱們之因此提到這個問題的複雜性和提交順序的重要性,是由於commitAsync方法支持回調,在broker做出響應時會執行回調。回調常常被用於記錄提交錯誤或者生成度量指標,不過若是用於重試,必定注意提交的順序。
咱們可使用一個單調遞增的序列號來維護異步提交的順序,在每次提交偏移量以後或在回調裏提交偏移量時遞增序列號。在進行重試前,先檢查回調的序列號和即將提交的偏移量是否相等,若是相等,說明沒有新的提交,那麼能夠安全地進行重試,若是序列號比較大,說明有一個新的提交意見發送,應該中止重試。
提交特定的偏移量
提交偏移量的頻率和處理消息的頻率時同樣的,但若是向頻繁提交怎麼辦?
消費者API容許在調用commitSync和commitAsync方法時傳進去但願提交的分區和偏移量map。
再均衡監聽器
在爲消費者分配新分區或者移除舊分區時,能夠經過消費者API執行一些應用程序代碼,在調用subscribe方法時傳進去一個ConsumerRebalanceListener實例就能夠了,它有兩個須要實現的方法:
onPartitionRevoked方法會在再均衡開始以前和消費者中止讀取消息以後被調用。若是在這裏提交偏移量,下一個接管分區的消費者就知道該從哪裏開始讀取。
onPartitionAssigned方法會在從新分配分區以後和消費者開始讀取消息以前被調用。
從特定偏移量開始處理記錄
有時候須要從特定的偏移量開始讀取消息。若是想從分區的起始位置開始讀取消息,或者直接跳到分區的末尾開始讀取消息,可使用seekToBeginning方法和seekToEnd方法。
若是保存記錄和偏移量能夠在一個原子操做裏完成,就能夠避免重複處理。若是在同一個事務裏把記錄和偏移量都寫到數據庫裏,那麼咱們就會知道記錄和偏移量要麼都成功,要麼都失敗,而後從新處理記錄。
如今的問題時:若是偏移量時保存在數據庫裏而不是Kafka裏,那麼消費者在獲得新分區時怎麼知道該從哪裏開始讀取?這個時候可使用seek方法,在消費者啓動或分配到新分區時,可使用seek方法查找保存到數據庫裏的偏移量。如何退出
若是肯定要退出循環,須要經過另一個線程調用consumer.wakeup方法。