自定義spring kafka consumer 線程池

本文講述一下如何自定義spring kafka的consumer線程池java

KafkaMessageListenerContainer

spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/listener/KafkaMessageListenerContainer.javaspring

protected void doStart() {
        if (isRunning()) {
            return;
        }
        ContainerProperties containerProperties = getContainerProperties();

        if (!this.consumerFactory.isAutoCommit()) {
            AckMode ackMode = containerProperties.getAckMode();
            if (ackMode.equals(AckMode.COUNT) || ackMode.equals(AckMode.COUNT_TIME)) {
                Assert.state(containerProperties.getAckCount() > 0, "'ackCount' must be > 0");
            }
            if ((ackMode.equals(AckMode.TIME) || ackMode.equals(AckMode.COUNT_TIME))
                    && containerProperties.getAckTime() == 0) {
                containerProperties.setAckTime(5000);
            }
        }

        Object messageListener = containerProperties.getMessageListener();
        Assert.state(messageListener != null, "A MessageListener is required");
        if (messageListener instanceof GenericAcknowledgingMessageListener) {
            this.acknowledgingMessageListener = (GenericAcknowledgingMessageListener<?>) messageListener;
        }
        else if (messageListener instanceof GenericMessageListener) {
            this.listener = (GenericMessageListener<?>) messageListener;
        }
        else {
            throw new IllegalStateException("messageListener must be 'MessageListener' "
                    + "or 'AcknowledgingMessageListener', not " + messageListener.getClass().getName());
        }
        if (containerProperties.getConsumerTaskExecutor() == null) {
            SimpleAsyncTaskExecutor consumerExecutor = new SimpleAsyncTaskExecutor(
                    (getBeanName() == null ? "" : getBeanName()) + "-C-");
            containerProperties.setConsumerTaskExecutor(consumerExecutor);
        }
        if (containerProperties.getListenerTaskExecutor() == null) {
            SimpleAsyncTaskExecutor listenerExecutor = new SimpleAsyncTaskExecutor(
                    (getBeanName() == null ? "" : getBeanName()) + "-L-");
            containerProperties.setListenerTaskExecutor(listenerExecutor);
        }
        this.listenerConsumer = new ListenerConsumer(this.listener, this.acknowledgingMessageListener);
        setRunning(true);
        this.listenerConsumerFuture = containerProperties
                .getConsumerTaskExecutor()
                .submitListenable(this.listenerConsumer);
    }

這裏涉及到了兩個線程池,一個是ConsumerTaskExecutor,一個是ListenerTaskExecutor,都在containerProperties裏頭配置
若是沒有默認配置,則分別建立帶"-C-"和"-L-"的SimpleAsyncTaskExecutor
ConsumerTaskExecutor用來去poll kafka消息
ListenerTaskExecutor用來調用@KafkaListener標註的方法ide

自定義

自定義executor,將其託管給spring容器的好處就是能夠跟隨容器的生命週期,在容器銷燬以前優雅關閉線程池ui

擴展ConcurrentKafkaListenerContainerFactory

能夠重寫spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/config/ConcurrentKafkaListenerContainerFactory.java的initializeContainer方法,而後進行設置this

public class CustomConcurrentKafkaListenerContainerFactory<K,V> extends ConcurrentKafkaListenerContainerFactory<K,V> {

    /**
     * The executor for threads that poll the consumer.
     */
    private AsyncListenableTaskExecutor consumerTaskExecutor;

    /**
     * The executor for threads that invoke the listener.
     */
    private AsyncListenableTaskExecutor listenerTaskExecutor;

    public CustomConcurrentKafkaListenerContainerFactory(AsyncListenableTaskExecutor consumerTaskExecutor, AsyncListenableTaskExecutor listenerTaskExecutor) {
        this.consumerTaskExecutor = consumerTaskExecutor;
        this.listenerTaskExecutor = listenerTaskExecutor;
    }

    @Override
    protected void initializeContainer(ConcurrentMessageListenerContainer<K, V> instance) {
        super.initializeContainer(instance);
        instance.getContainerProperties().setConsumerTaskExecutor(consumerTaskExecutor);
        instance.getContainerProperties().setListenerTaskExecutor(listenerTaskExecutor);
    }
}

設置

應用自定義kafkaListenerContainerFactory,替換爲本身擴展的ConcurrentKafkaListenerContainerFactory便可。線程

@Configuration
@AutoConfigureBefore(KafkaAutoConfiguration.class)
public class KafkaExecutorConfig {

    @Bean(name = "consumerTaskExecutor")
    public ThreadPoolTaskExecutor consumerTaskExecutor(){
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(Runtime.getRuntime().availableProcessors());
        executor.setMaxPoolSize(5);
        executor.setQueueCapacity(100);
        executor.setThreadNamePrefix("my-C-");
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.initialize();
        return executor;
    }

    @Bean(name = "listenerTaskExecutor")
    public ThreadPoolTaskExecutor listenerTaskExecutor(){
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(10);
        executor.setQueueCapacity(100);
        executor.setThreadNamePrefix("my-L-");
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.initialize();
        return executor;
    }

    @Bean("kafkaListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new CustomConcurrentKafkaListenerContainerFactory<Object, Object>(consumerTaskExecutor(),listenerTaskExecutor());
        configurer.configure(factory, kafkaConsumerFactory);
        return factory;
    }
}

這樣就大功告成了code

相關文章
相關標籤/搜索