Kafka中位移提交那些事兒

Kafka中位移提交那些事兒.png

本文已收錄GitHub,更有互聯網大廠面試真題,面試攻略,高效學習資料等git


以前咱們說過,Consumer 端有個位移的概念,它和消息在分區中的位移不是一回事兒,雖然它們的英文都是 Offset。今天咱們要聊的位移是 Consumer 的消費位移,它記錄了Consumer 要消費的下一條消息的位移。這可能和你之前瞭解的有些出入,不過切記是下一條消息的位移,而不是目前最新消費消息的位移。github

我來舉個例子說明一下。假設一個分區中有 10 條消息,位移分別是 0 到 9。某個Consumer 應用已消費了 5 條消息,這就說明該 Consumer 消費了位移爲 0 到 4 的 5 條消息,此時 Consumer 的位移是 5,指向了下一條消息的位移。面試

Consumer 須要向 Kafka 彙報本身的位移數據,這個彙報過程被稱爲提交位移(Committing Offsets)。由於 Consumer 可以同時消費多個分區的數據,因此位移的提交其實是在分區粒度上進行的,即Consumer 須要爲分配給它的每一個分區提交各自的位移數據。數據庫

提交位移主要是爲了表徵 Consumer 的消費進度,這樣當 Consumer 發生故障重啓以後,就可以從 Kafka 中讀取以前提交的位移值,而後從相應的位移處繼續消費,從而避免整個消費過程重來一遍。換句話說,位移提交是 Kafka 提供給你的一個工具或語義保障,你負責維持這個語義保障,即若是你提交了位移 X,那麼 Kafka 會認爲全部位移值小於 X 的消息你都已經成功消費了。apache

這一點特別關鍵。由於位移提交很是靈活,你徹底能夠提交任何位移值,但由此產生的後果你也要一併承擔。假設你的 Consumer 消費了 10 條消息,你提交的位移值倒是 20,那麼從理論上講,位移介於 11~19 之間的消息是有可能丟失的;相反地,若是你提交的位移值是 5,那麼位移介於 5~9 之間的消息就有可能被重複消費。因此,我想再強調一下,位移提交的語義保障是由你來負責的,Kafka 只會「無腦」地接受你提交的位移。你對位移提交的管理直接影響了你的 Consumer 所能提供的消息語義保障。bootstrap

鑑於位移提交甚至是位移管理對 Consumer 端的巨大影響,Kafka,特別是KafkaConsumer API,提供了多種提交位移的方法。從用戶的角度來講,位移提交分爲自動提交和手動提交;從 Consumer 端的角度來講,位移提交分爲同步提交和異步提交網絡

咱們先來講說自動提交和手動提交。所謂自動提交,就是指 Kafka Consumer 在後臺默默地爲你提交位移,做爲用戶的你徹底沒必要操心這些事;而手動提交,則是指你要本身提交位移,Kafka Consumer 壓根無論。異步

開啓自動提交位移的方法很簡單。Consumer 端有個參數 enable.auto.commit,把它設置爲 true 或者壓根不設置它就能夠了。由於它的默認值就是 true,即 Java Consumer 默認就是自動提交位移的。若是啓用了自動提交,Consumer 端還有個參數就派上用場了:auto.commit.interval.ms。它的默認值是 5 秒,代表 Kafka 每 5 秒會爲你自動提交一次位移。ide

爲了把這個問題說清楚,我給出了完整的 Java 代碼。這段代碼展現了設置自動提交位移的方法。有了這段代碼作基礎,今天后面的講解我就再也不展現完整的代碼了。函數

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "2000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserialprops.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeseriKafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while(true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(),
}

上面的橙色粗體部分,就是開啓自動提交位移的方法。整體來講,仍是很簡單的吧。

和自動提交相反的,就是手動提交了。開啓手動提交位移的方法就是設置enable.auto.commit 爲 false。可是,僅僅設置它爲 false 還不夠,由於你只是告訴Kafka Consumer 不要自動提交位移而已,你還須要調用相應的 API 手動提交位移。

最簡單的 API 就是KafkaConsumer#commitSync()。該方法會提交KafkaConsumer#poll() 返回的最新位移。從名字上來看,它是一個同步操做,即該方法會一直等待,直到位移被成功提交纔會返回。若是提交過程當中出現異常,該方法會將異常信息拋出。下面這段代碼展現了 commitSync() 的使用方法:

while(true) {
ConsumerRecords<String, String>records=
consumer.poll(Duration.ofSeconds(1));
process(records);
//處理消息
try {
 consumer.commitSync();
}
catch (CommitFailedException e) {
 handle(e);
 //處理提交失敗異常
}
}

可見,調用 consumer.commitSync() 方法的時機,是在你處理完了 poll() 方法返回的全部消息以後。若是你莽撞地過早提交了位移,就可能會出現消費數據丟失的狀況。那麼你可能會問,自動提交位移就不會出現消費數據丟失的狀況了嗎?它能恰到好處地把握時機進行位移提交嗎?爲了搞清楚這個問題,咱們必需要深刻地瞭解一下自動提交位移的順序。

一旦設置了 enable.auto.commit 爲 true,Kafka 會保證在開始調用 poll 方法時,提交上次 poll 返回的全部消息。從順序上來講,poll 方法的邏輯是先提交上一批消息的位移,再處理下一批消息,所以它能保證不出現消費丟失的狀況。但自動提交位移的一個問題在於,它可能會出現重複消費

在默認狀況下,Consumer 每 5 秒自動提交一次位移。如今,咱們假設提交位移以後的 3秒發生了 Rebalance 操做。在 Rebalance 以後,全部 Consumer 從上一次提交的位移處繼續消費,但該位移已是 3 秒前的位移數據了,故在 Rebalance 發生前 3 秒消費的全部數據都要從新再消費一次。雖然你可以經過減小 auto.commit.interval.ms 的值來提升提交頻率,但這麼作只能縮小重複消費的時間窗口,不可能徹底消除它。這是自動提交機制的一個缺陷。

反觀手動提交位移,它的好處就在於更加靈活,你徹底可以把控位移提交的時機和頻率。可是,它也有一個缺陷,就是在調用 commitSync() 時,Consumer 程序會處於阻塞狀態,直到遠端的 Broker 返回提交結果,這個狀態纔會結束。在任何系統中,由於程序而非資源限制而致使的阻塞均可能是系統的瓶頸,會影響整個應用程序的 TPS。固然,你能夠選擇拉長提交間隔,但這樣作的後果是 Consumer 的提交頻率降低,在下次 Consumer 重啓回來後,會有更多的消息被從新消費。

鑑於這個問題,Kafka 社區爲手動提交位移提供了另外一個 API 方法:

KafkaConsumer#commitAsync()。從名字上來看它就不是同步的,而是一個異步操做。調用 commitAsync() 以後,它會當即返回,不會阻塞,所以不會影響 Consumer 應用的 TPS。因爲它是異步的,Kafka 提供了回調函數(callback),供你實現提交以後的邏輯,好比記錄日誌或處理異常等。下面這段代碼展現了調用 commitAsync() 的方法:

while(true) {
ConsumerRecords<String, String>records=
consumer.poll(Duration.ofSeconds(1));
process(records);
//處理消息
consumer.commitAsync((offsets, exception) -> {
 if(exception != null)
   handle(exception);
}
);

commitAsync 是否可以替代 commitSync 呢?答案是不能。commitAsync 的問題在於,出現問題時它不會自動重試。由於它是異步操做,假若提交失敗後自動重試,那麼它重試時提交的位移值可能早已經「過時」或不是最新值了。所以,異步提交的重試其實沒有意義,因此 commitAsync 是不會重試的。

顯然,若是是手動提交,咱們須要將 commitSync 和 commitAsync 組合使用才能到達最理想的效果,緣由有兩個:

  1. 咱們能夠利用 commitSync 的自動重試來規避那些瞬時錯誤,好比網絡的瞬時抖動,Broker 端 GC 等。由於這些問題都是短暫的,自動重試一般都會成功,所以,咱們不想本身重試,而是但願 Kafka Consumer 幫咱們作這件事。

  2. 咱們不但願程序總處於阻塞狀態,影響 TPS。

咱們來看一下下面這段代碼,它展現的是如何將兩個 API 方法結合使用進行手動提交。

try{
while(true) {
 ConsumerRecords<String, String> records =
 consumer.poll(Duration.ofSeconds(1));
 process(records);
 //處理消息
 commitAysnc();
 //使用異步提交規避阻塞
}
}
catch(Exception e) {
handle(e);
//處理異常
}
finally{
try{
 consumer.commitSync();
 //最後一次提交使用同步阻塞式提交
}
finally{
 consumer.close();

這段代碼同時使用了 commitSync() 和 commitAsync()。對於常規性、階段性的手動提交,咱們調用 commitAsync() 避免程序阻塞,而在 Consumer 要關閉前,咱們調用commitSync() 方法執行同步阻塞式的位移提交,以確保 Consumer 關閉前可以保存正確的位移數據。將二者結合後,咱們既實現了異步無阻塞式的位移管理,也確保了Consumer 位移的正確性,因此,若是你須要自行編寫代碼開發一套 Kafka Consumer 應用,那麼我推薦你使用上面的代碼範例來實現手動的位移提交。

咱們說了自動提交和手動提交,也說了同步提交和異步提交,這些就是 Kafka 位移提交的所有了嗎?其實,咱們還差一部分。

實際上,Kafka Consumer API 還提供了一組更爲方便的方法,能夠幫助你實現更精細化的位移管理功能。剛剛咱們聊到的全部位移提交,都是提交 poll 方法返回的全部消息的位移,好比 poll 方法一次返回了 500 條消息,當你處理完這 500 條消息以後,前面咱們提到的各類方法會一次性地將這 500 條消息的位移一併處理。簡單來講,就是直接提交最新一條消息的位移。但若是我想更加細粒度化地提交位移,該怎麼辦呢?

設想這樣一個場景:你的 poll 方法返回的不是 500 條消息,而是 5000 條。那麼,你確定不想把這 5000 條消息都處理完以後再提交位移,由於一旦中間出現差錯,以前處理的所有都要重來一遍。這相似於咱們數據庫中的事務處理。不少時候,咱們但願將一個大事務分割成若干個小事務分別提交,這可以有效減小錯誤恢復的時間。

在 Kafka 中也是相同的道理。對於一次要處理不少消息的 Consumer 而言,它會關心社區有沒有方法容許它在消費的中間進行位移提交。好比前面這個 5000 條消息的例子,你可能但願每處理完 100 條消息就提交一次位移,這樣可以避免大批量的消息從新消費。

慶幸的是,Kafka Consumer API 爲手動提交提供了這樣的方法:

commitSync(Map<TopicPartition, OffsetAndMetadata>) 和commitAsync(Map<TopicPartition, OffsetAndMetadata>)。它們的參數是一個 Map對象,鍵就是 TopicPartition,即消費的分區,而值是一個 OffsetAndMetadata 對象,保存的主要是位移數據。

就拿剛剛提過的那個例子來講,如何每處理 100 條消息就提交一次位移呢?在這裏,我以commitAsync 爲例,展現一段代碼,實際上,commitSync 的調用方法和它是如出一轍的。

privateMap<TopicPartition, OffsetAndMetadata> offsets =newHashMap<>();
int count = 0;
……
while(true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record: records) {
 process(record);
 //處理消息
 offsets.put(newTopicPartition(record.topic(), record.partition
 newOffsetAndMetadata(record.offset() + 1);
 if(count % 100 == 0)
 consumer.commitAsync(offsets, null);
 //回調處理邏輯是
 count++;
}
}

簡單解釋一下這段代碼。程序先是建立了一個 Map 對象,用於保存 Consumer 消費處理過程當中要提交的分區位移,以後開始逐條處理消息,並構造要提交的位移值。還記得以前我說過要提交下一條消息的位移嗎?這就是這裏構造 OffsetAndMetadata 對象時,使用當前消息位移加 1 的緣由。代碼的最後部分是作位移的提交。我在這裏設置了一個計數器,每累計 100 條消息就統一提交一次位移。與調用無參的 commitAsync 不一樣,這裏調用了帶 Map 對象參數的 commitAsync 進行細粒度的位移提交。這樣,這段代碼就可以實現每處理 100 條消息就提交一次位移,不用再受 poll 方法返回的消息總數的限制了。

總結

Kafka Consumer 的位移提交,是實現 Consumer 端語義保障的重要手段。位移提交分爲自動提交和手動提交,而手動提交又分爲同步提交和異步提交。在實際使用過程當中,推薦你使用手動提交機制,由於它更加可控,也更加靈活。另外,建議你同時採用同步提交和異步提交兩種方式,這樣既不影響 TPS,又支持自動重試,改善 Consumer 應用的高可用性。總之,Kafka Consumer API 提供了多種靈活的提交方法,方便你根據本身的業務場景定製你的提交策略。

相關文章
相關標籤/搜索