Batch message consumption and manual ack with @KafkaListener in Spring Boot

I previously described how to integrate Kafka with Spring Boot, but by default @KafkaListener hands over messages one at a time. If you want to consume a whole batch in one go, recall that with the native Kafka API a single call to poll() returns a batch of records, bounded by the max.poll.records setting:
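For reference, a minimal sketch of batch polling with the plain Kafka consumer (the bootstrap server, group id and topic name below are placeholders, not from the original post):

// Plain Kafka consumer: one poll() returns up to max.poll.records records.
// Server address, group id and topic name are placeholders.
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);   // cap each batch at 100 records
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("demo-topic"));
ConsumerRecords<String, String> records = consumer.poll(100);  // returns up to 100 records per call
System.out.println("fetched " + records.count() + " records in one poll");
consumer.close();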

So can @KafkaListener in Spring Boot be used for batch listening as well?
According to the spring-kafka reference documentation, batch consumption has been supported since version 1.1: you only need to set the container factory's batchListener flag to true.
https://docs.spring.io/spring-kafka/reference/html/_reference.html

Below is the approach I used in my project:

 

 


@Bean
public KafkaListenerContainerFactory<?> batchFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(1);
    // Enable batch consumption; the size of each batch is bounded by
    // ConsumerConfig.MAX_POLL_RECORDS_CONFIG in the consumer properties.
    factory.setBatchListener(true);
    // Commit offsets manually; acknowledge() commits immediately.
    factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
    return factory;
}

 

 


@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    // Offsets are committed manually through the Acknowledgment, so disable auto-commit.
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    // Maximum number of records returned per poll, i.e. the batch size.
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
    return props;
}
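The batchFactory bean above also calls consumerFactory(), which the original post does not show. A minimal sketch, assuming the usual DefaultKafkaConsumerFactory wired to the config map above:

// Sketch of the consumerFactory() bean referenced by batchFactory(); not shown
// in the original post, so this assumes the standard spring-kafka setup.
@Bean
public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}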

Now the listener itself. After each batch is consumed, the records are written to the database in bulk through JPA. Note that containerFactory = "batchFactory" must be specified so the listener uses the batch-enabled factory:

 


@KafkaListener(topics = "${tgs.kafka.topics}", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {
    logger.info("records.size: " + records.size() + " in all");
    List<B_ZDRYGK_ZDRYXX_FJ_HCB1> list = new ArrayList<>();
    ObjectMapper mapper = new ObjectMapper();
    // Deserialize each record's JSON value into the JPA entity; skip records that fail to parse.
    for (ConsumerRecord<?, ?> record : records) {
        B_ZDRYGK_ZDRYXX_FJ_HCB1 b_zdrygk_zdryxx_fj_hcb1 = null;
        try {
            b_zdrygk_zdryxx_fj_hcb1 = mapper.readValue(record.value().toString(), B_ZDRYGK_ZDRYXX_FJ_HCB1.class);
        } catch (IOException e) {
            logger.error("failed to parse record: " + record.value(), e);
        }
        if (null != b_zdrygk_zdryxx_fj_hcb1) {
            list.add(b_zdrygk_zdryxx_fj_hcb1);
        }
    }
    try {
        // Batch insert via JPA, then flush so the statements hit the database now.
        List<B_ZDRYGK_ZDRYXX_FJ_HCB1> hcb1List = b_ZDRYGK_ZDRYXX_FJ_HCB1Repository.save(list);
        b_ZDRYGK_ZDRYXX_FJ_HCB1Repository.flush();
        logger.info("flush size: " + hcb1List.size());
    } catch (Exception e) {
        logger.error("failed to save batch", e);
    } finally {
        logger.info("start commit offset");
        ack.acknowledge(); // manually commit the offsets for this batch
        logger.info("stop commit offset");
    }
}
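b_ZDRYGK_ZDRYXX_FJ_HCB1Repository is an ordinary Spring Data JPA repository. It is not shown in the original post; a minimal sketch (the entity's ID type Long is an assumption):

// Sketch of the JPA repository used above; the ID type (Long) is an assumption,
// since the original post does not show the repository or the entity class.
public interface B_ZDRYGK_ZDRYXX_FJ_HCB1Repository extends JpaRepository<B_ZDRYGK_ZDRYXX_FJ_HCB1, Long> {
}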

 

Running this, the log shows each batch consuming 100 messages (screenshot in the original post).
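That batch size of 100 comes from the maxPollRecords value injected into consumerConfigs(). A minimal sketch of how those fields might be bound; the property names below are guesses, not from the original post (only tgs.kafka.topics appears there, in the listener annotation):

// Assumed @Value bindings for the fields used in consumerConfigs();
// the property names are illustrative only.
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;

@Value("${spring.kafka.consumer.group-id}")
private String groupId;

@Value("${spring.kafka.consumer.max-poll-records}")   // 100 in this run
private int maxPollRecords;

@Value("${spring.kafka.consumer.auto-offset-reset}")  // e.g. "earliest"
private String autoOffsetReset;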
