最近遇到一個springboot整合kafka設置手動提交不生效的問題,後來發現是本身的方法不對,走了一些彎路,這裏記錄一下。java
新建一個topic,topic名是 spring-kafka-demo4
,以下:spring
bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 2 --topic spring-kafka-demo3
topic設置了兩個分區。springboot
消費者工程的設置以下:工具
spring.kafka.consumer.group-id=test-consumer-group spring.kafka.consumer.auto-offset-reset=earliest spring.kafka.consumer.enable-auto-commit=false spring.kafka.consumer.auto-commit-interval=100
消費的邏輯使用springboot註解,以下:源碼分析
public class KafkaReceiver { @KafkaListener(clientIdPrefix = "consumer-1", topics = {"spring-kafka-demo4"}) public void listen(ConsumerRecord<?, ?> record) { Optional<?> kafkaMessage = Optional.ofNullable(record.value()); if (kafkaMessage.isPresent()) { Object message = kafkaMessage.get(); log.info("receive ------------------ message =" + message); } } }
我啓動一個生產者生產了5條消息,而後啓動消費者。從日誌上看消費正常,而後我使用kafka-consumer-groups.sh
工具查看了一下消費的狀況,以下:spa
從圖上看不對呀,不是設置了不自動提交offset嗎? 個人消費者邏輯裏也沒有手動提交的代碼,爲啥看到的兩個分區消費者提交了offset呢?日誌
爲了防止有些人不明白,我簡單對每列進行說明:code
後來查了一些資料後,發現還須要設置 ack-mode = manual
才能夠,完整的設置以下:blog
spring.kafka.consumer.group-id=test-consumer-group spring.kafka.consumer.auto-offset-reset=earliest spring.kafka.consumer.enable-auto-commit=false spring.kafka.consumer.auto-commit-interval=100 spring.kafka.listener.ack-mode=manual
而後咱們再從新生產5條新的消息,而後一樣的邏輯消費,再看下消費的狀況:圖片
此次就正常了,能夠看到消費者在兩個分區上都沒有提交offset。
那麼既然咱們設置了手動提交,如何在代碼中手動提交呢?其實也很是簡單,
@KafkaListener(clientIdPrefix = "consumer-1", topics = {"spring-kafka-demo4"}) public void listen(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment) { Optional<?> kafkaMessage = Optional.ofNullable(record.value()); if (kafkaMessage.isPresent()) { Object message = kafkaMessage.get(); log.info("receive ------------------ message =" + message); } acknowledgment.acknowledge(); }
如今咱們從源碼層面分析下,爲啥只設置enable-auto-commit=false
時,spring默認自動提交了offset。
咱們在依賴包里加入斷點分析下。
注意到 isManualAck
, isAnyManualAck
以及 isManualImmediateAck
這三個變量,當配置文件配置 ack-mode=manual
時,前兩個變量是true,最後一個是false。
繼續往下看,
processCommit方法裏有個 updatePendingOffsets
方法,這個方法就是用來提交offset的。很明顯當isManualAck
是true,isManualImmediateAck
是false時, 這個方法不會執行。