springboot手動提交kafka offset

enable.auto.commit參數設置成了false

可是測試發現enable.auto.commit參數設置成了false,kafka的offset依然提交了(也沒有進行人工提交offset)。spring

查看源碼springboot

若是咱們enable.auto.commit設置爲false,那麼就會走標紅的if語句。並且下面有個stopInvokerAndCommitManualAcks()方法,看名字就知道是人工提交的意思。那麼咱們進去stopInvokerAndCommitManualAcks()方法瞅瞅。 測試

如上圖所示有個processCommits()方法,那麼繼續追進去: spa

單單看標紅的方法是否是就知道這方法裏面是更新offset和提交offset的方法。那麼咱們繼續追進去:code


結論:若是咱們把enable.auto.commit參數設置成true。那麼offset交給kafka來管理,offset進行默認的提交模式。 
enable.auto.commit參數設置成false。那麼就是Spring來替爲咱們作人工提交,從而簡化了人工提交的方式。 
因此kafka和springboot結合中的enable.auto.commit爲false爲spring的人工提交模式。enable.auto.commit爲true是採用kafka的默認提交模式。 blog

手動提交

spring.kafka.consumer.enable-auto-commit設置爲false,設置AckMode的值get

/**
     * The offset commit behavior enumeration.
     */
    public enum AckMode {

        /**
         * Commit after each record is processed by the listener.
         */
        RECORD,

        /**
         * Commit whatever has already been processed before the next poll.
         */
        BATCH,

        /**
         * Commit pending updates after
         * {@link ContainerProperties#setAckTime(long) ackTime} has elapsed.
         */
        TIME,

        /**
         * Commit pending updates after
         * {@link ContainerProperties#setAckCount(int) ackCount} has been
         * exceeded.
         */
        COUNT,

        /**
         * Commit pending updates after
         * {@link ContainerProperties#setAckCount(int) ackCount} has been
         * exceeded or after {@link ContainerProperties#setAckTime(long)
         * ackTime} has elapsed.
         */
        COUNT_TIME,

        /**
         * User takes responsibility for acks using an
         * {@link AcknowledgingMessageListener}.
         */
        MANUAL,

        /**
         * User takes responsibility for acks using an
         * {@link AcknowledgingMessageListener}. The consumer is woken to
  • RECORD
    每處理一條commit一次
  • BATCH(默認)
    每次poll的時候批量提交一次,頻率取決於每次poll的調用頻率
  • TIME 
    每次間隔ackTime的時間去commit
  • COUNT 
    累積達到ackCount次的ack去commit
  • COUNT_TIME
    ackTime或ackCount哪一個條件先知足,就commit
  • MANUAL
    listener負責ack,可是背後也是批量上去
  • MANUAL_IMMEDIATE
    listner負責ack,每調用一次,就當即commit

manual commit

@KafkaListener(topics = "k010")
    public void listen(ConsumerRecord<?, ?> cr,Acknowledgment ack) throws Exception {
        LOGGER.info(cr.toString());
        ack.acknowledge();
    }

方法參數裏頭傳遞Acknowledgment,而後手工ackkafka

若是隻添加上面語句會報錯:源碼

the listener container must have a MANUAL Ackmode to populate the Acknowledgment

咱們要配置AckMode爲MANUAL Ackmodeit

factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
相關文章
相關標籤/搜索