Kafka: multiple container factories with custom deserialization (kafkaListenerContainerFactory)


The business requirement: consume in batches, but pull the data directly as a List<ModelDTO> so each batch can be processed in one go.

 

private final String topic = "queue_notify";

@KafkaListener(topics = topic, containerFactory = "kafkaLiveListenerContainerFactory")
public void listen(List<PushLiveDTO> pushLiveDTOS) {
    long startTime = System.currentTimeMillis();
    // push the whole batch via Getui
    pushService.notifyLiveGetui(pushLiveDTOS);
    long endTime = System.currentTimeMillis();
    // endTime - startTime: processing time for the batch (worth logging)
}

 

private final String topic = "queue_push";

// containerFactory selects the container factory bean that builds this listener's container
@KafkaListener(topics = topic, containerFactory = "kafkaListenerContainerFactory")
public void listen(List<PushTestDTO> mallPushMongoDBS) {
    pushService.saveNoPush(mallPushMongoDBS);
}

As the code above shows, we pull batches from different topics as lists of DTOs and process each batch directly.

 

 

package com.dg.mall.push.config;

import com.dg.mall.push.kafka.PushJsonDeserializer;
import com.dg.mall.push.kafka.LiveJsonDeserializer;
import com.dg.mall.push.kafka.listen.LiveNotifyListener;
import com.dg.mall.push.kafka.listen.PushListener;
import com.dg.mall.push.model.message.PushLiveDTO;
import com.dg.mall.push.model.message.PushTestDTO; // package assumed, mirroring PushLiveDTO
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaConsumerConfig {


    @Value("${spring.kafka.consumer.bootstrap-servers}")
    private String servers;
    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private boolean enableAutoCommit;
    @Value("${spring.kafka.consumer.auto-commit-interval}")
    private String autoCommitInterval;
    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;
    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;
    @Value("${spring.kafka.consumer.concurrency}")
    private int concurrency;
    @Value("${spring.kafka.consumer.max-consumer-number}")
    private Integer maxConsumerNumber;

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, PushTestDTO>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, PushTestDTO> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // number of concurrent consumers
        factory.setConcurrency(concurrency);
        // batch mode: the listener receives a List of records per poll
        factory.setBatchListener(true);
        factory.getContainerProperties().setPollTimeout(1500);
        return factory;
    }

    public ConsumerFactory<String, PushTestDTO> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), new PushJsonDeserializer());
    }


    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // value deserializer: our custom PushJsonDeserializer
        // (redundant when a deserializer instance is passed to the factory, kept for completeness)
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, PushJsonDeserializer.class);
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        propsMap.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG,40000);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        // cap each poll at max-consumer-number records (at most 100 per batch)
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxConsumerNumber);
        return propsMap;
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, PushLiveDTO>> kafkaLiveListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, PushLiveDTO> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(liveConsumerFactory());
        // number of concurrent consumers
        factory.setConcurrency(concurrency);
        // batch mode: the listener receives a List of records per poll
        factory.setBatchListener(true);
        factory.getContainerProperties().setPollTimeout(1500);
        return factory;
    }

    public ConsumerFactory<String, PushLiveDTO> liveConsumerFactory() {
        return new DefaultKafkaConsumerFactory<>(liveConsumerConfigs(), new StringDeserializer(), new LiveJsonDeserializer());
    }


    public Map<String, Object> liveConsumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // value deserializer: our custom LiveJsonDeserializer
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LiveJsonDeserializer.class);
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        propsMap.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG,40000);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        // cap each poll at max-consumer-number records (at most 100 per batch)
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxConsumerNumber);
        return propsMap;
    }

    @Bean
    public PushListener listener() {
        return new PushListener();
    }


    @Bean
    public LiveNotifyListener liveListener() {
        return new LiveNotifyListener();
    }
}
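
For reference, the @Value placeholders above read consumer settings like the following from application.properties; the key names are taken straight from the annotations in this class, while the values shown are illustrative only:

spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=1000
spring.kafka.consumer.group-id=push-consumer-group
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.concurrency=3
spring.kafka.consumer.max-consumer-number=100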
// The custom deserializer for PushLiveDTO:
package com.dg.mall.push.kafka;

import com.dg.mall.push.model.message.PushLiveDTO;
import com.gexin.fastjson.JSON;
import org.apache.kafka.common.serialization.Deserializer;

import java.util.Map;

public class LiveJsonDeserializer implements Deserializer<PushLiveDTO> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // no configuration required
    }

    @Override
    public PushLiveDTO deserialize(String topic, byte[] data) {
        // guard against null payloads (tombstone records)
        if (data == null) {
            return null;
        }
        return JSON.parseObject(data, PushLiveDTO.class);
    }

    @Override
    public void close() {
        // nothing to release
    }
}
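
PushJsonDeserializer, wired into consumerFactory() above, is not shown in the original post. A minimal sketch of what it presumably looks like, mirroring LiveJsonDeserializer but targeting PushTestDTO (whose package is assumed here):

package com.dg.mall.push.kafka;

import com.dg.mall.push.model.message.PushTestDTO; // assumed package, mirroring PushLiveDTO
import com.gexin.fastjson.JSON;
import org.apache.kafka.common.serialization.Deserializer;

import java.util.Map;

public class PushJsonDeserializer implements Deserializer<PushTestDTO> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // no configuration required
    }

    @Override
    public PushTestDTO deserialize(String topic, byte[] data) {
        // guard against null payloads (tombstone records)
        if (data == null) {
            return null;
        }
        return JSON.parseObject(data, PushTestDTO.class);
    }

    @Override
    public void close() {
        // nothing to release
    }
}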
// The matching JSON serializer (note: this one is the producer-side Serializer, not a deserializer):
package com.dg.mall.push.kafka;

import com.dg.mall.push.model.message.MallPushMongoDB;
import com.gexin.fastjson.JSON;
import org.apache.kafka.common.serialization.Serializer;

import java.util.Map;

public class MallPushJsonSerializer implements Serializer<MallPushMongoDB> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // no configuration required
    }

    @Override
    public byte[] serialize(String topic, MallPushMongoDB data) {
        // serialize the DTO to JSON bytes with fastjson
        return JSON.toJSONBytes(data);
    }

    @Override
    public void close() {
        // nothing to release
    }
}
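
The post never shows the producer side that uses this serializer. A minimal sketch of how it could be registered, assuming a standard spring-kafka setup (the bean name mallPushKafkaTemplate is illustrative, not from the original project):

// in a @Configuration class; imports: ProducerConfig, StringSerializer,
// DefaultKafkaProducerFactory, KafkaTemplate, MallPushJsonSerializer
@Bean
public KafkaTemplate<String, MallPushMongoDB> mallPushKafkaTemplate() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
    // keys as plain strings, values turned into JSON bytes by MallPushJsonSerializer
    DefaultKafkaProducerFactory<String, MallPushMongoDB> producerFactory =
            new DefaultKafkaProducerFactory<>(props, new StringSerializer(), new MallPushJsonSerializer());
    return new KafkaTemplate<>(producerFactory);
}

A producer then calls mallPushKafkaTemplate.send(topic, dto), and the consumer side shown earlier deserializes the JSON bytes back into a DTO.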

 

To summarize the consumer side:

@KafkaListener(topics = topic, containerFactory = "kafkaLiveListenerContainerFactory")

A listener must always specify containerFactory, pointing at the matching container factory bean. Inside that factory sits the value deserializer; by default it is a String deserializer, and here we swap in one that targets our own DTO, e.g. LiveJsonDeserializer.

Finally, when the batch listener fires, the whole batch of data arrives in a single call, already deserialized into our DTOs.
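
As an aside, spring-kafka also ships a generic org.springframework.kafka.support.serializer.JsonDeserializer, so a hand-written deserializer per DTO is not strictly required. A sketch of the alternative wiring (this swaps fastjson for spring-kafka's Jackson-based mapping):

public ConsumerFactory<String, PushLiveDTO> liveConsumerFactory() {
    // JsonDeserializer(PushLiveDTO.class) stands in for the hand-written LiveJsonDeserializer
    return new DefaultKafkaConsumerFactory<>(liveConsumerConfigs(),
            new StringDeserializer(),
            new JsonDeserializer<>(PushLiveDTO.class));
}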

End!
