摘要:Kafka中的位移是個極其重要的概念,由於數據一致性、準確性是一個很重要的語義,咱們都不但願消息重複消費或者丟失。而位移就是控制消費進度的大佬。本文就詳細聊聊kafka消費位移的那些事,包括:java
關於位移(Offset),其實在kafka的世界裏有兩種位移:數據庫
分區位移:生產者向分區寫入消息,每條消息在分區中的位置信息由一個叫offset的數據來表徵。假設一個生產者向一個空分區寫入了 10 條消息,那麼這 10 條消息的位移依次是 0、一、…、9;編程
消費位移:消費者須要記錄消費進度,即消費到了哪一個分區的哪一個位置上,這是消費者位移(Consumer Offset)。bootstrap
注意,這和上面所說的消息在分區上的位移徹底不是一個概念。上面的「位移」表徵的是分區內的消息位置,它是不變的,即一旦消息被成功寫入到一個分區上,它的位移值就是固定的了。而消費者位移則不一樣,它多是隨時變化的,畢竟它是消費者消費進度的指示器。api
消費位移,記錄的是 Consumer 要消費的下一條消息的位移,切記,是下一條消息的位移! 而不是目前最新消費消息的位移異步
假設一個分區中有 10 條消息,位移分別是 0 到 9。某個 Consumer 應用已消費了 5 條消息,這就說明該 Consumer 消費了位移爲 0 到 4 的 5 條消息,此時 Consumer 的位移是 5,指向了下一條消息的位移。函數
至於爲何要有消費位移,很好理解,當 Consumer 發生故障重啓以後,就可以從 Kafka 中讀取以前提交的位移值,而後從相應的位移處繼續消費,從而避免整個消費過程重來一遍。就好像書籤同樣,須要書籤你才能夠快速找到你上次讀書的位置。學習
那麼瞭解了位移是什麼以及它的重要性,咱們天然而然會有一個疑問,kafka是怎麼記錄、怎麼保存、怎麼管理位移的呢?fetch
Consumer 須要上報本身的位移數據,這個彙報過程被稱爲位移提交。由於 Consumer 可以同時消費多個分區的數據,因此位移的提交其實是在分區粒度上進行的,即Consumer 須要爲分配給它的每一個分區提交各自的位移數據。優化
鑑於位移提交甚至是位移管理對 Consumer 端的巨大影響,KafkaConsumer API提供了多種提交位移的方法,每一種都有各自的用途,這些都是本文將要談到的方案。
void commitSync(Duration timeout); void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets); void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets, final Duration timeout); void commitAsync(); void commitAsync(OffsetCommitCallback callback); void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback);
先粗略的總結一下。位移提交分爲自動提交和手動提交;而手動提交又分爲同步提交和異步提交。
當消費配置enable.auto.commit=true
的時候表明自動提交位移。
自動提交位移是發生在何時呢?auto.commit.interval.ms
默認值是50000ms。即kafka每隔5s會幫你自動提交一次位移。自動位移提交的動做是在 poll()方法的邏輯裏完成的,在每次真正向服務端發起拉取請求以前會檢查是否能夠進行位移提交,若是能夠,那麼就會提交上一次輪詢的位移。假如消費數據量特別大,能夠設置的短一點。
越簡單的東西功能越不足,自動提交位移省事的同時確定會帶來一些問題。自動提交帶來重複消費和消息丟失的問題:
重複消費: 在默認狀況下,Consumer 每 5 秒自動提交一次位移。如今,咱們假設提交位移以後的 3 秒發生了 Rebalance 操做。在 Rebalance 以後,全部 Consumer 從上一次提交的位移處繼續消費,但該位移已是 3 秒前的位移數據了,故在 Rebalance 發生前 3 秒消費的全部數據都要從新再消費一次。雖然你可以經過減小 auto.commit.interval.ms
的值來提升提交頻率,但這麼作只能縮小重複消費的時間窗口,不可能徹底消除它。這是自動提交機制的一個缺陷。
消息丟失: 假設拉取了100條消息,正在處理第50條消息的時候,到達了自動提交窗口期,自動提交線程將拉取到的每一個分區的最大消息位移進行提交,若是此時消費服務掛掉,消息並未處理結束,但卻提交了最大位移,下次重啓就從100條那消費,即發生了50-100條的消息丟失。
當消費配置enable.auto.commit=false
的時候表明手動提交位移。用戶必須在適當的時機(通常是處理完業務邏輯後),手動的調用相關api方法提交位移。好比在下面的案例中,我須要確認個人業務邏輯返回true以後再手動提交位移
while (true) { try { ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMinutes(KafkaConfig.pollTimeoutOfMinutes)); if (!consumerRecords.isEmpty()) { for (ConsumerRecord<String, String> record : consumerRecords) { KafkaMessage kafkaMessage = JSON.parseObject(record.value(), KafkaMessage.class); // 處理業務 boolean handleResult = handle(kafkaMessage); if (handleResult) { log.info(" handle success, kafkaMessage={}" ,kafkaMessage); } else { log.info(" handle failed, kafkaMessage={}" ,kafkaMessage); } } // 手動提交offset consumer.commitSync(Duration.ofMinutes(KafkaConfig.pollTimeoutOfMinutes)); } } catch (Exception e) { log.info("kafka consume error." ,e); } }
手動提交明顯能解決消息丟失的問題,由於你是處理完業務邏輯後再提交的,假如此時消費服務掛掉,消息並未處理結束,那麼重啓的時候還會從新消費。
可是對於業務層面的失敗致使消息未消費成功,是沒法處理的。由於業務層的邏輯變幻無窮、好比格式不正確,你叫Kafka消費端程序怎麼去處理?應該要業務層面本身處理,記錄失敗日誌作好監控等。
可是手動提交不能解決消息重複的問題,也很好理解,假如消費0-100條消息,50條時掛了,重啓後因爲沒有提交這一批消息的offset,是會從0開始從新消費。至於如何避免重複消費的問題,在這篇文章有說。
手動提交又分爲異步提交和同步提交。
上面案例代碼使用的是commitSync() ,顧名思義,是同步提交位移的方法。同步提交位移Consumer 程序會處於阻塞狀態,等待 Broker 返回提交結果。同步模式下提交失敗的時候一直嘗試提交,直到遇到沒法重試的狀況下才會結束。在任何系統中,由於程序而非資源限制而致使的阻塞均可能是系統的瓶頸,會影響整個應用程序的 TPS。固然,你能夠選擇拉長提交間隔,但這樣作的後果是 Consumer 的提交頻率降低,在下次 Consumer 重啓回來後,會有更多的消息被從新消費。所以,爲了解決這些不足,kafka還提供了異步提交方法。
異步提交會當即返回,不會阻塞,所以不會影響 Consumer 應用的 TPS。因爲它是異步的,Kafka 提供了回調函數,供你實現提交以後的邏輯,好比記錄日誌或處理異常等。下面這段代碼展現了調用 commitAsync() 的方法
consumer.commitAsync((offsets, exception) -> { if (exception != null) handleException(exception); });
可是異步提交會有一個問題,那就是它沒有重試機制,不過通常狀況下,針對偶爾出現的提交失敗,不進行重試不會有太大問題,由於若是提交失敗是由於臨時問題致使的,那麼後續的提交總會有成功的。因此消息也是不會丟失和重複消費的。
但若是這是發生在關閉消費者或再均衡前的最後一次提交,就要確保可以提交成功。所以,組合使用commitAsync()
和commitSync()
是最佳的方式。
try { while (true) { ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMinutes(KafkaConfig.pollTimeoutOfMinutes)); if (!consumerRecords.isEmpty()) { for (ConsumerRecord<String, String> record : consumerRecords) { KafkaMessage kafkaMessage = JSON.parseObject(record.value(), KafkaMessage.class); boolean handleResult = handle(kafkaMessage); } //異步提交位移 consumer.commitAsync((offsets, exception) -> { if (exception != null) handleException(exception); }); } } } catch (Exception e) { System.out.println("kafka consumer error:" + e.toString()); } finally { try { //最後同步提交位移 consumer.commitSync(); } finally { consumer.close(); } }
若是細心的閱讀了上面全部demo的代碼,那麼你會發現這樣幾個問題:
一、全部的提交,都是提交 poll 方法返回的全部消息的位移,poll 方法一次返回1000 條消息,則一次性地將這 1000 條消息的位移一併提交。可這樣一旦中間出現問題,位移沒有提交,下次會從新消費已經處理成功的數據。因此我想作到細粒度控制,好比每次提交100條,該怎麼辦?
答:能夠經過commitSync(Map<TopicPartition, OffsetAndMetadata>)
和 commitAsync(Map<TopicPartition, OffsetAndMetadata>)
對位移進行精確控制。
二、poll和commit方法對於普通的開發人員而言是一個黑盒,沒法精確地掌控其消費的具體位置。我都不知道此次的提交,是針對哪一個partition,提交上去的offset是多少。
答:能夠經過record.topic()
獲取topic信息, record.partition()
獲取分區信息,record.offset() + 1
獲取消費位移,記住消費位移是指示下一條消費的位移,因此要加一。
三、我想本身管理offset怎麼辦?一方面更加保險,一方面下次重啓以後能夠精準的從數據庫讀取最後的offset就不存在丟失和重複消費了。
答:能夠將消費位移保存在數據庫中。消費端程序使用comsumer.seek
方法指定從某個位移開始消費。
綜合以上幾個可優化點,並結合全文,能夠給出一個比較完美且完整的demo:聯合異步提交和同步提交,對處理過程當中全部的異常都進行了處理。細粒度的控制了消費位移的提交,而且保守的將消費位移記錄到了數據庫中,從新啓動消費端程序的時候會從數據庫讀取位移。這也是咱們消費端程序位移提交的最佳實踐方案。你只要繼承這個抽象類,實現你具體的業務邏輯就能夠了。
public abstract class PrefectCosumer { private Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(); int count = 0; public final void consume() { Properties properties = PropertiesConfig.getConsumerProperties(); properties.put("group.id", getGroupId()); Consumer<String, String> consumer = new KafkaConsumer<>(properties); consumer.subscribe(getTopics()); consumer.poll(0); // 把offset記錄到數據庫中 從指定的offset處消費 consumer.partitionsFor(getTopics()).stream().map(info -> new TopicPartition(getTopics(), info.partition())) .forEach(tp -> { consumer.seek(tp, JdbcUtils.queryOffset().get(tp.partition())); }); try { while (true) { ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMinutes(KafkaConfig.pollTimeoutOfMinutes)); if (!consumerRecords.isEmpty()) { for (ConsumerRecord<String, String> record : consumerRecords) { KafkaMessage kafkaMessage = JSON.parseObject(record.value(), KafkaMessage.class); boolean handleResult = handle(kafkaMessage); if (handleResult) { //注意:提交的是下一條消息的位移。因此OffsetAndMetadata 對象時,必須使用當前消息位移加 1。 offsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1)); // 細粒度控制提交 每10條提交一次offset if (count % 10 == 0) { // 異步提交offset consumer.commitAsync(offsets, (offsets, exception) -> { if (exception != null) { handleException(exception); } // 將消費位移再記錄一份到數據庫中 offsets.forEach((k, v) -> { String s = "insert into kafka_offset(`topic`,`group_id`,`partition_id`,`offset`) values" + " ('" + k.topic() + "','" + getGroupId() + "'," + k.partition() + "," + v.offset() + ")" + " on duplicate key update offset=values(offset);"; JdbcUtils.insertTable(s); }); }); } count++; } else { System.out.println("消費消息失敗 kafkaMessage={}" + getTopics() + getGroupId() + kafkaMessage.toString()); } } } } } catch (Exception e) { System.out.println("kafka consumer error:" + e.toString()); } finally { try { // 最後一次提交 使用同步提交offset consumer.commitSync(); } finally { consumer.close(); } } } /** * 具體的業務邏輯 * * @param kafkaMessage * @return */ public abstract boolean handle(KafkaMessage kafkaMessage); public abstract List<String> getTopics(); public abstract String getGroupId(); void handleException(Exception e) { //異常處理 } }
剛剛咱們說本身控制位移,使用seek方法能夠指定offset消費。那到底怎麼控制位移?怎麼重設消費組位移?seek是什麼?如今就來仔細說說。
並非全部的消息隊列均可以重設消費者組位移達到從新消費的目的。好比傳統的RabbitMq,它們處理消息是一次性的,即一旦消息被成功消費,就會被刪除。而Kafka消費消息是能夠重演的,由於它是基於日誌結構(log-based)的消息引擎,消費者在消費消息時,僅僅是從磁盤文件上讀取數據而已,因此消費者不會刪除消息數據。同時,因爲位移數據是由消費者控制的,所以它可以很容易地修改位移的值,實現重複消費歷史數據的功能。
瞭解如何重設位移是很重要的。假設這麼一個場景,我已經消費了1000條消息後,我發現處理邏輯錯了,因此我須要從新消費一下,但是位移已經提交了,我到底該怎麼從新消費這1000條呢??假設我想從某個時間點開始消費,我又該如何處理呢?
首先說個誤區:auto.offset.reset=earliest/latest
這個參數你們都很熟悉,可是初學者很容易誤會它。大部分朋友都以爲在任何狀況下把這兩個值設置爲earliest
或者latest
,消費者就能夠從最先或者最新的offset開始消費,但實際上並非那麼回事,他們生效都有一個前提條件,那就是對於同一個groupid的消費者,若是這個topic某個分區有已經提交的offset,那麼不管是把auto.offset.reset=earliest仍是latest,都將失效,消費者會從已經提交的offset開始消費。所以這個參數並不能解決用戶想重設消費位移的需求。
kafka有七種控制消費組消費offset的策略,主要分爲位移維度和時間維度,包括:
位移維度。這是指根據位移值來重設。也就是說,直接把消費者的位移值重設成咱們給定的位移值。包括Earliest/Latest/Current/Specified-Offset/Shift-By-N策略
時間維度。咱們能夠給定一個時間,讓消費者把位移調整成大於該時間的最小位移;也能夠給出一段時間間隔,好比 30 分鐘前,而後讓消費者直接將位移調回 30 分鐘以前的位移值。包括DateTime和Duration策略
說完了重設策略,咱們就來看一下具體應該如何實現,能夠從兩個角度,API方式和命令行方式。
API方式只要記住用seek方法就能夠了,包括seek,seekToBeginning 和 seekToEnd。
void seek(TopicPartition partition, long offset); void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata); void seekToBeginning(Collection<TopicPartition> partitions); void seekToEnd(Collection<TopicPartition> partitions);
從方法簽名咱們能夠看出seekToBeginning
和seekToEnd
是能夠一次性重設n個分區的位移,而seek 只容許重設指定分區的位移,即爲每一個分區都單獨設置位移,由於不可貴出,若是要自定義每一個分區的位移值則用seek,若是但願kafka幫你批量重設全部分區位移,好比從最新數據消費或者從最先數據消費,那麼用seekToEnd和seekToBeginning。
Earliest 策略:從最先的數據開始消費
從主題當前最先位移處開始消費,這個最先位移不必定就是 0 ,由於好久遠的消息會被 Kafka 自動刪除,主要取決於你的刪除配置。
代碼以下:
Properties properties = PropertiesConfig.getConsumerProperties(); properties.put("group.id", getGroupId()); Consumer<String, String> consumer = new KafkaConsumer<>(properties); consumer.subscribe(getTopics()); consumer.poll(0); consumer.seekToBeginning( consumer.partitionsFor(getTopics()).stream().map(partitionInfo -> new TopicPartition(getTopics(), partitionInfo.partition())) .collect(Collectors.toList()));
首先是構造consumer對象,這樣咱們能夠經過partitionsFor
獲取到分區的信息,而後咱們就能夠構造出TopicPartition
集合,傳給seekToBegining方法。須要注意的一個地方是:須要用consumer.poll(0)
,而不能用consumer.poll(Duration.ofMillis(0))
在poll(0)中consumer會一直阻塞直到它成功獲取了所需的元數據信息,以後它纔會發起fetch請求去獲取數據。而poll(Duration)會把元數據獲取也計入整個超時時間。因爲本例中使用的是0,即瞬時超時,所以consumer根本沒法在這麼短的時間內鏈接上coordinator,因此只能趕在超時前返回一個空集合。
Latest策略:從最新的數據開始消費
consumer.seekToEnd( consumer.partitionsFor(getTopics().get(0)).stream().map(partitionInfo -> new TopicPartition(getTopics().get(0), partitionInfo.partition())) .collect(Collectors.toList()));
Current策略:從當前已經提交的offset處消費
consumer.partitionsFor(getTopics().get(0)).stream().map(info -> new TopicPartition(getTopics().get(0), info.partition())) .forEach(tp -> { long committedOffset = consumer.committed(tp).offset(); consumer.seek(tp, committedOffset); });
**Special-offset策略:從指定的offset處消費 **
該策略使用的方法和current策略同樣,區別在於,current策略是直接從kafka元信息中讀取中已經提交的offset值,而special策略須要用戶本身爲每個分區指定offset值,咱們通常是把offset記錄到數據庫中而後能夠從數據庫去讀取這個值
consumer.partitionsFor(getTopics().get(0)).stream().map(info -> new TopicPartition(getTopics().get(0), info.partition())) .forEach(tp -> { try { consumer.seek(tp, JdbcUtils.queryOffset().get(tp.partition())); } catch (SQLException e) { e.printStackTrace(); } });
以上演示了用API方式重設位移,演示了四種常見策略的代碼,另外三種沒有演示,一方面是大同小異,另外一方面在實際生產中,用API的方式不太可能去作時間維度的重設,而基本都是用命令行方式。
命令行方式重設位移是經過 kafka-consumer-groups 腳本。比起 API 的方式,用命令行重設位移要簡單得多。
Earliest 策略指定–to-earliest。
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-earliest –execute
Latest 策略指定–to-latest。
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-latest --execute
Current 策略指定–to-current。
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-current --execute
Specified-Offset 策略指定–to-offset。
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-offset <offset> --execute
Shift-By-N 策略指定–shift-by N。
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --shift-by <offset_N> --execute
DateTime 策略指定–to-datetime。
DateTime 容許你指定一個時間,而後將位移重置到該時間以後的最先位移處。常見的使用場景是,你想從新消費昨天的數據,那麼你可使用該策略重設位移到昨天 0 點。
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --to-datetime 2019-06-20T20:00:00.000 --execute
Duration 策略指定–by-duration。
Duration 策略則是指給定相對的時間間隔,而後將位移調整到距離當前給定時間間隔的位移處,具體格式是 PnDTnHnMnS
。若是你熟悉 Java 8 引入的 Duration 類的話,你應該不會對這個格式感到陌生。它就是一個符合 ISO-8601 規範的 Duration 格式,以字母 P 開頭,後面由 4 部分組成,即 D、H、M 和 S
,分別表示天、小時、分鐘和秒。舉個例子,若是你想將位移調回到 15 分鐘前,那麼你就能夠指定 PT0H15M0S
。
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --by-duration PT0H30M0S --execute
經過上面那幾部分的內容,咱們已經搞懂了位移提交的方方面面,那麼提交的位移它保存在哪裏呢?這就要去位移主題的的世界裏一探究竟了。kafka把位移保存在一個叫作__consumer_offsets的內部主題中,叫作位移主題。
注意:老版本的kafka實際上是把位移保存在zookeeper中的,可是zookeeper並不適合這種高頻寫的場景。因此新版本已是改進了這個方案,直接保存到kafka。畢竟kafka自己就適合高頻寫的場景,而且kafka也能夠保證高可用性和高持久性。
既然它也是主題,那麼離不開分區和副本這兩個機制。咱們並無手動建立這個主題而且指定,因此是kafka自動建立的, 分區的數量取決於Broker 端參數 offsets.topic.num.partitions,默認是50個分區,而副本參數取決於offsets.topic.replication.factor,默認是3。
既然也是主題,確定會有消息,那麼消息格式是什麼呢?參考前面咱們手動設計將位移寫入數據庫的方案,咱們保存了topic,group_id,partition,offset四個字段。topic,group_id,partition無疑是數據表中的聯合主鍵,而offset是不斷更新的。無疑kafka的位移主題消息也是相似這種設計。key也是那三個字段,而消息體其實很複雜,你能夠先簡單理解爲就是offset。
既然也是主題,確定也會有刪除策略,不然消息會無限膨脹。可是位移主題的刪除策略和其餘主題刪除策略又不太同樣。咱們知道普通主題的刪除是能夠經過配置刪除時間或者大小的。而位移主題的刪除,叫作 Compaction。Kafka 使用Compact 策略來刪除位移主題中的過時消息,對於同一個 Key 的兩條消息 M1 和 M2,若是 M1 的發送時間早於 M2,那麼 M1 就是過時消息。Compact 的過程就是掃描日誌的全部消息,剔除那些過時的消息,而後把剩下的消息整理在一塊兒。
Kafka 提供了專門的後臺線程按期地巡檢待 Compact 的主題,看看是否存在知足條件的可刪除數據。這個後臺線程叫 Log Cleaner。不少實際生產環境中都出現過位移主題無限膨脹佔用過多磁盤空間的問題,若是你的環境中也有這個問題,我建議你去檢查一下 Log Cleaner 線程的狀態,一般都是這個線程掛掉了致使的。
kafka的位移是個極其重要的概念,控制着消費進度,也即控制着消費的準確性,完整性,爲了保證消息不重複和不丟失。咱們最好作到如下幾點:
手動提交位移。
手動提交有異步提交和同步提交兩種方式,既然二者有利也有弊,那麼咱們能夠結合起來使用。
細粒度的控制消費位移的提交,這樣能夠避免重複消費的問題。
保守的將消費位移再記錄到了數據庫中,從新啓動消費端程序的時候從數據庫讀取位移。
獲取Kafka全套原創學習資料及思惟導圖,關注【胖滾豬學編程】公衆號,回覆"kafka"。
本文來源於公衆號:【胖滾豬學編程】。一枚集顏值與才華於一身,不算聰明卻足夠努力的女程序媛。用漫畫形式讓編程so easy and interesting!求關注!