原文連接:https://cloud.tencent.com/developer/article/1462432javascript
在新消費者客戶端中,消費位移是存儲在Kafka內部的主題 __consumer_offsets 中。把消費位移存儲起來(持久化)的動做稱爲 「提交」 ,消費者在消費完消息以後須要執行消費位移的提交。java
參考下圖的消費位移,x 表示某一次拉取操做中此分區消息的最大偏移量,假設當前消費者已經消費了 x 位置的消息,那麼咱們就能夠說消費者的消費位移爲 x ,圖中也用了 lastConsumedOffset 這個單詞來標識它。git
不過須要很是明確的是,當前消費者須要提交的消費位移並非 x ,而是 x+1 ,對應上圖中的 position ,它表示下一條須要拉取的消息的位置。github
KafkaConsumer 類提供了 partition(TopicPartition) 和 committed(TopicPartition) 兩個方法來分別獲取上面所說的 postion 和 committed offset 的值。這兩個方法的定義以下所示:異步
可經過 TestOffsetAndPosition.java 來測試consumed offset、committed offset、position之間的關係。該 TestOffsetAndPosition.java 文件的地址爲:函數
https://github.com/841809077/hdpproject/blob/master/src/main/java/com/hdp/project/kafka/consumer/TestOffsetAndPosition.javapost
在 Kafka 中默認的消費位移的提交方式爲自動提交,這個由消費者客戶端參數 enable.auto.commit 配置,默認值爲 true 。這個默認的自動提交不是每消費一條消息就提交一次,而是按期提交,這個按期的週期時間由客戶端 auto.commit.interval.ms 配置,默認值爲 5 秒,此參數生效的前提是 enable.auto.commit 參數爲 true 。性能
在默認的配置下,消費者每隔 5 秒會將拉取到的每一個分區中最大的消息位移進行提交。自動位移提交的動做是在 poll() 方法的邏輯裏完成的,在每次真正向服務端發起拉取請求以前會檢查是否能夠進行位移提交,若是能夠,那麼就會提交上一次輪詢的位移。測試
Kafka 自動提交消費位移的方式很是簡便,它免去了複雜的位移提交邏輯,但並無爲開發者留有餘地來處理重複消費和消息丟失的問題。自動位移提交沒法作到精確的位移管理,因此Kafka還提供了手動位移提交的方式,這樣就可使得開發人員對消費位移的管理控制更加靈活。開啓手動提交功能的前提是消費者客戶端參數 enable.auto.commit 配置爲 false 。示例以下:spa
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
手動提交又分爲同步提交和異步提交,對應於 KafkaConsumer 中的 commitSync() 和 commitAsync() 兩種類型的方法。
消費者能夠調用 commitSync() 方法,來實現位移的同步提交。
commitSync() 方法會根據 poll() 方法拉取的最新位移來進行提交,只要沒有發生不可回覆的錯誤,它就會阻塞消費者線程直至位移提交完成。
對於採用 commitSync() 的無參方法而言,它提交消費位移的頻率和拉取批次消息、處理批次消息的頻率是同樣的。若是想尋求更細粒度的、更精準的提交,那麼就須要使用 commitSync() 的另外一個含參方法,具體定義以下:
public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets)
該方法提供了一個 offsets 參數,用來提交指定分區的位移。
與 commitSync() 方法相反,異步提交的方式在執行的時候消費者線程不會被阻塞,能夠在提交消費位移的結果還未返回以前就開始新一次的拉取操做。異步提交可使消費者的性能獲得必定的加強。commitAsync() 方法有三個不一樣的重載方法:
public void commitAsync() public void commitAsync(OffsetCommitCallback callback) public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback)
第一個無參方法和第三個方法中的 offsets 都很好理解,對照 commitSync() 方法便可。關鍵是第二個方法與第三個方法的 callback 參數,它提供了一個異步提交的回調方法,當位移提交完成後會回調 OffsetCommitCallback 中的 onComplete() 方法。以下圖所示:
發送提交請求後能夠繼續作其它事情。若是提交失敗,錯誤信息和偏移量會被記錄下來。
通常狀況下,針對偶爾出現的提交失敗,不進行重試不會有太大問題,由於若是提交失敗是由於臨時問題致使的,那麼後續的提交總會有成功的。但若是這是發生在 關閉消費者 或 再均衡(分區的所屬權從一個消費者轉移到另外一個消費者的行爲) 前的最後一次提交,就要確保可以提交成功。
所以,在消費者關閉前通常會組合使用 commitAsync() 和 commitSync() 。使用 commitAsync() 方式來作每條消費信息的提交(由於該種方式速度更快),最後再使用 commitSync() 方式來作位移提交最後的保證。
try { while (true) { // 消費者poll而且執行一些操做 // ... // 異步提交,也可以使用有回調函數的異步提交。較同步提交速度更快。 consumer.commitAsync(); } } catch (Exception e) { logger.error("Unexpected error" , e); } finally { try { // 同步提交,來作位移提交最後的保證。 consumer.commitSync(); } finally { consumer.close(); } }
本文主要講解了消費者提交消息位移的兩種方式,分爲:
而 手動提交 又分爲:
而在通常狀況下,建議使用手動的方式:異步和同步組合提交消息位移。由於異步提交不須要等待提交的反饋結果,便可進行新一次的拉取消息操做,速度較同步提交更快。但在最後一次提交消息位移以前,爲了保證位移提交成功,仍是須要再作一次同步提交操做。
本文參考《Kafka權威指南》與《深刻理解Kafka:核心設計與實踐原理》,也推薦你們閱讀這兩本書。