本文講述一下如何自定義spring kafka的consumer線程池java
spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/listener/KafkaMessageListenerContainer.javaspring
protected void doStart() { if (isRunning()) { return; } ContainerProperties containerProperties = getContainerProperties(); if (!this.consumerFactory.isAutoCommit()) { AckMode ackMode = containerProperties.getAckMode(); if (ackMode.equals(AckMode.COUNT) || ackMode.equals(AckMode.COUNT_TIME)) { Assert.state(containerProperties.getAckCount() > 0, "'ackCount' must be > 0"); } if ((ackMode.equals(AckMode.TIME) || ackMode.equals(AckMode.COUNT_TIME)) && containerProperties.getAckTime() == 0) { containerProperties.setAckTime(5000); } } Object messageListener = containerProperties.getMessageListener(); Assert.state(messageListener != null, "A MessageListener is required"); if (messageListener instanceof GenericAcknowledgingMessageListener) { this.acknowledgingMessageListener = (GenericAcknowledgingMessageListener<?>) messageListener; } else if (messageListener instanceof GenericMessageListener) { this.listener = (GenericMessageListener<?>) messageListener; } else { throw new IllegalStateException("messageListener must be 'MessageListener' " + "or 'AcknowledgingMessageListener', not " + messageListener.getClass().getName()); } if (containerProperties.getConsumerTaskExecutor() == null) { SimpleAsyncTaskExecutor consumerExecutor = new SimpleAsyncTaskExecutor( (getBeanName() == null ? "" : getBeanName()) + "-C-"); containerProperties.setConsumerTaskExecutor(consumerExecutor); } if (containerProperties.getListenerTaskExecutor() == null) { SimpleAsyncTaskExecutor listenerExecutor = new SimpleAsyncTaskExecutor( (getBeanName() == null ? "" : getBeanName()) + "-L-"); containerProperties.setListenerTaskExecutor(listenerExecutor); } this.listenerConsumer = new ListenerConsumer(this.listener, this.acknowledgingMessageListener); setRunning(true); this.listenerConsumerFuture = containerProperties .getConsumerTaskExecutor() .submitListenable(this.listenerConsumer); }
這裏涉及到了兩個線程池,一個是ConsumerTaskExecutor,一個是ListenerTaskExecutor,都在containerProperties裏頭配置
若是沒有默認配置,則分別建立帶"-C-"和"-L-"的SimpleAsyncTaskExecutor
ConsumerTaskExecutor用來去poll kafka消息
ListenerTaskExecutor用來調用@KafkaListener標註的方法ide
自定義executor,將其託管給spring容器的好處就是能夠跟隨容器的生命週期,在容器銷燬以前優雅關閉線程池ui
能夠重寫spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/config/ConcurrentKafkaListenerContainerFactory.java的initializeContainer方法,而後進行設置this
public class CustomConcurrentKafkaListenerContainerFactory<K,V> extends ConcurrentKafkaListenerContainerFactory<K,V> { /** * The executor for threads that poll the consumer. */ private AsyncListenableTaskExecutor consumerTaskExecutor; /** * The executor for threads that invoke the listener. */ private AsyncListenableTaskExecutor listenerTaskExecutor; public CustomConcurrentKafkaListenerContainerFactory(AsyncListenableTaskExecutor consumerTaskExecutor, AsyncListenableTaskExecutor listenerTaskExecutor) { this.consumerTaskExecutor = consumerTaskExecutor; this.listenerTaskExecutor = listenerTaskExecutor; } @Override protected void initializeContainer(ConcurrentMessageListenerContainer<K, V> instance) { super.initializeContainer(instance); instance.getContainerProperties().setConsumerTaskExecutor(consumerTaskExecutor); instance.getContainerProperties().setListenerTaskExecutor(listenerTaskExecutor); } }
應用自定義kafkaListenerContainerFactory,替換爲本身擴展的ConcurrentKafkaListenerContainerFactory便可。線程
@Configuration @AutoConfigureBefore(KafkaAutoConfiguration.class) public class KafkaExecutorConfig { @Bean(name = "consumerTaskExecutor") public ThreadPoolTaskExecutor consumerTaskExecutor(){ ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(Runtime.getRuntime().availableProcessors()); executor.setMaxPoolSize(5); executor.setQueueCapacity(100); executor.setThreadNamePrefix("my-C-"); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.setWaitForTasksToCompleteOnShutdown(true); executor.initialize(); return executor; } @Bean(name = "listenerTaskExecutor") public ThreadPoolTaskExecutor listenerTaskExecutor(){ ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(5); executor.setMaxPoolSize(10); executor.setQueueCapacity(100); executor.setThreadNamePrefix("my-L-"); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.setWaitForTasksToCompleteOnShutdown(true); executor.initialize(); return executor; } @Bean("kafkaListenerContainerFactory") public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory( ConcurrentKafkaListenerContainerFactoryConfigurer configurer, ConsumerFactory<Object, Object> kafkaConsumerFactory) { ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new CustomConcurrentKafkaListenerContainerFactory<Object, Object>(consumerTaskExecutor(),listenerTaskExecutor()); configurer.configure(factory, kafkaConsumerFactory); return factory; } }
這樣就大功告成了code