Kafka——SpringBoot整合(消費者位移的提交)

消費者位移的提交方式以及提交時機須要根據不一樣的業務場景進行選擇,能夠看以前的博客kafka消費者相關。 這裏只作應用相關,更多的使用場景,該怎麼用、什麼時候用要看前面的博客瞭解原理。java

參考博客:https://blog.csdn.net/yy756127197/article/details/103895810bootstrap

自動提交偏移量

// 自動提交偏移量
        // 若是設置成true,偏移量由auto.commit.interval.ms控制自動提交的頻率
        // 若是設置成false,不須要定時的提交offset,能夠本身控制offset,當消息認爲已消費過了,這個時候再去提交它們的偏移量。
        // 這個頗有用的,當消費的消息結合了一些處理邏輯,這個消息就不該該認爲是已經消費的,直到它完成了整個處理。
        configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        // 自動提交的頻率
        configProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");

手動提交偏移量

主要步驟: 1.消費者配置 configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 「false」); 2.消費者配置ack模式factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); 3.消費者手動提交 consumer.commitSync();異步

ConsumerConfig

@Configuration
@EnableKafka
public class ManualConsumerConfig {
    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${kafka.topic.manual}")
    private String topic;

    @Bean
    public KafkaListenerContainerFactory<?> manualKafkaListenerContainerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "manual-group");
        // 手動提交
        configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(configProps));
        // ack模式,詳細見下文註釋
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);

        return factory;
    }

    /**
     * AckMode針對ENABLE_AUTO_COMMIT_CONFIG=false時生效,有如下幾種:
     *
     * RECORD
     * 每處理一條commit一次
     *
     * BATCH(默認)
     * 每次poll的時候批量提交一次,頻率取決於每次poll的調用頻率
     *
     * TIME
     * 每次間隔ackTime的時間去commit(跟auto commit interval有什麼區別呢?)
     *
     * COUNT
     * 累積達到ackCount次的ack去commit
     *
     * COUNT_TIME
     * ackTime或ackCount哪一個條件先知足,就commit
     *
     * MANUAL
     * listener負責ack,可是背後也是批量上去
     *
     * MANUAL_IMMEDIATE
     * listner負責ack,每調用一次,就當即commit
     *
     */

}

Consumer

@Component
@Slf4j
public class ManualConsumer {

    @KafkaListener(topics = "${kafka.topic.manual}", containerFactory = "manualKafkaListenerContainerFactory")
    public void receive(@Payload String message,
                        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                        Consumer consumer,
                        Acknowledgment ack) {
        System.out.println(String.format("From partition %d : %s", partition, message));
        // 同步提交
        consumer.commitSync();

        // ack這種方式提交也能夠
        // ack.acknowledge();
    }

    /**
     * commitSync和commitAsync組合使用
     * <p>
     * 手工提交異步 consumer.commitAsync();
     * 手工同步提交 consumer.commitSync()
     * <p>
     * commitSync()方法提交最後一個偏移量。在成功提交或碰到無怯恢復的錯誤以前,
     * commitSync()會一直重試,可是commitAsync()不會。
     * <p>
     * 通常狀況下,針對偶爾出現的提交失敗,不進行重試不會有太大問題,由於若是提交失敗是由於臨時問題致使的,
     * 那麼後續的提交總會有成功的。但若是這是發生在關閉消費者或再均衡前的最後一次提交,就要確保可以提交成功。
     * 所以,在消費者關閉前通常會組合使用commitAsync()和commitSync()。
     */
//    @KafkaListener(topics = "${kafka.topic.manual}", containerFactory = "manualKafkaListenerContainerFactory")
    public void manual(@Payload String message,
                       @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                       @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                       Consumer consumer,
                       Acknowledgment ack) {
        try {
            System.out.println(String.format("From partition %d : %s", partition, message));
            // 同步提交
            consumer.commitSync();
        } catch (Exception e) {
            System.out.println("commit failed");
        } finally {
            try {
                consumer.commitSync();
            } finally {
                consumer.close();
            }
        }

    }


    /**
     * 手動提交,指定偏移量
     *
     * @param record
     * @param consumer
     */
//    @KafkaListener(topics = "${kafka.topic.manual}", containerFactory = "manualKafkaListenerContainerFactory")
    public void offset(ConsumerRecord record, Consumer consumer) {
        System.out.println(String.format("From partition %d : %s", record.partition(), record.value()));

        Map<TopicPartition, OffsetAndMetadata> currentOffset = new HashMap<>();
        currentOffset.put(new TopicPartition(record.topic(), record.partition()),
                new OffsetAndMetadata(record.offset() + 1));
        consumer.commitSync(currentOffset);
    }
    
}
相關文章
相關標籤/搜索