This article analyzes how Spring for Kafka wraps and integrates the native Kafka client consumer.
spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/core/DefaultKafkaConsumerFactory.java
protected KafkaConsumer<K, V> createKafkaConsumer(String clientIdSuffix) {
    if (!this.configs.containsKey(ConsumerConfig.CLIENT_ID_CONFIG) || clientIdSuffix == null) {
        return createKafkaConsumer();
    }
    else {
        Map<String, Object> modifiedClientIdConfigs = new HashMap<>(this.configs);
        modifiedClientIdConfigs.put(ConsumerConfig.CLIENT_ID_CONFIG,
                modifiedClientIdConfigs.get(ConsumerConfig.CLIENT_ID_CONFIG) + clientIdSuffix);
        return createKafkaConsumer(modifiedClientIdConfigs);
    }
}

protected KafkaConsumer<K, V> createKafkaConsumer(Map<String, Object> configs) {
    return new KafkaConsumer<K, V>(configs, this.keyDeserializer, this.valueDeserializer);
}
spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/config/ConcurrentKafkaListenerContainerFactory.java
The main wrapping of the consumer lives around ConcurrentKafkaListenerContainerFactory. Kafka's own KafkaConsumer is not thread-safe and cannot be shared across threads, so Spring adds another layer here and, according to the configured spring.kafka.listener.concurrency, creates multiple concurrent KafkaMessageListenerContainer instances.
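For context, here is a minimal hand-wired sketch of such a factory (not taken from the spring-kafka sources; the broker address, group id, client id, topic-free class and bean names, and the concurrency value 3 are all made up for illustration). With Spring Boot the same effect comes from the spring.kafka.consumer.* properties plus spring.kafka.listener.concurrency:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");              // assumed group id
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "demo-client");            // assumed client id
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // equivalent of spring.kafka.listener.concurrency=3: the ConcurrentMessageListenerContainer
        // creates three KafkaMessageListenerContainer instances, each with its own KafkaConsumer
        factory.setConcurrency(3);
        return factory;
    }
}

As the ConcurrentMessageListenerContainer code below shows, each of those child containers also gets a client id suffix ("-0", "-1", "-2") appended to the configured client.id via the createKafkaConsumer(clientIdSuffix) method above.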
spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java
public class ConcurrentMessageListenerContainer<K, V> extends AbstractMessageListenerContainer<K, V> {

    private final ConsumerFactory<K, V> consumerFactory;

    private final List<KafkaMessageListenerContainer<K, V>> containers = new ArrayList<>();

    @Override
    protected void doStart() {
        if (!isRunning()) {
            ContainerProperties containerProperties = getContainerProperties();
            TopicPartitionInitialOffset[] topicPartitions = containerProperties.getTopicPartitions();
            if (topicPartitions != null && this.concurrency > topicPartitions.length) {
                this.logger.warn("When specific partitions are provided, the concurrency must be less than or "
                        + "equal to the number of partitions; reduced from " + this.concurrency + " to "
                        + topicPartitions.length);
                this.concurrency = topicPartitions.length;
            }
            setRunning(true);
            for (int i = 0; i < this.concurrency; i++) {
                KafkaMessageListenerContainer<K, V> container;
                if (topicPartitions == null) {
                    container = new KafkaMessageListenerContainer<>(this.consumerFactory, containerProperties);
                }
                else {
                    container = new KafkaMessageListenerContainer<>(this.consumerFactory, containerProperties,
                            partitionSubset(containerProperties, i));
                }
                if (getBeanName() != null) {
                    container.setBeanName(getBeanName() + "-" + i);
                }
                if (getApplicationEventPublisher() != null) {
                    container.setApplicationEventPublisher(getApplicationEventPublisher());
                }
                container.setClientIdSuffix("-" + i);
                container.start();
                this.containers.add(container);
            }
        }
    }

    //......
}
spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListenerContainer<K, V> {

    private final ConsumerFactory<K, V> consumerFactory;

    private final TopicPartitionInitialOffset[] topicPartitions;

    private ListenerConsumer listenerConsumer;

    private ListenableFuture<?> listenerConsumerFuture;

    private GenericMessageListener<?> listener;

    private GenericAcknowledgingMessageListener<?> acknowledgingMessageListener;

    private String clientIdSuffix;

    @Override
    protected void doStart() {
        if (isRunning()) {
            return;
        }
        ContainerProperties containerProperties = getContainerProperties();

        if (!this.consumerFactory.isAutoCommit()) {
            AckMode ackMode = containerProperties.getAckMode();
            if (ackMode.equals(AckMode.COUNT) || ackMode.equals(AckMode.COUNT_TIME)) {
                Assert.state(containerProperties.getAckCount() > 0, "'ackCount' must be > 0");
            }
            if ((ackMode.equals(AckMode.TIME) || ackMode.equals(AckMode.COUNT_TIME))
                    && containerProperties.getAckTime() == 0) {
                containerProperties.setAckTime(5000);
            }
        }

        Object messageListener = containerProperties.getMessageListener();
        Assert.state(messageListener != null, "A MessageListener is required");
        if (messageListener instanceof GenericAcknowledgingMessageListener) {
            this.acknowledgingMessageListener = (GenericAcknowledgingMessageListener<?>) messageListener;
        }
        else if (messageListener instanceof GenericMessageListener) {
            this.listener = (GenericMessageListener<?>) messageListener;
        }
        else {
            throw new IllegalStateException("messageListener must be 'MessageListener' "
                    + "or 'AcknowledgingMessageListener', not " + messageListener.getClass().getName());
        }
        if (containerProperties.getConsumerTaskExecutor() == null) {
            SimpleAsyncTaskExecutor consumerExecutor = new SimpleAsyncTaskExecutor(
                    (getBeanName() == null ? "" : getBeanName()) + "-C-");
            containerProperties.setConsumerTaskExecutor(consumerExecutor);
        }
        if (containerProperties.getListenerTaskExecutor() == null) {
            SimpleAsyncTaskExecutor listenerExecutor = new SimpleAsyncTaskExecutor(
                    (getBeanName() == null ? "" : getBeanName()) + "-L-");
            containerProperties.setListenerTaskExecutor(listenerExecutor);
        }
        this.listenerConsumer = new ListenerConsumer(this.listener, this.acknowledgingMessageListener);
        setRunning(true);
        this.listenerConsumerFuture = containerProperties
                .getConsumerTaskExecutor()
                .submitListenable(this.listenerConsumer);
    }

    @Override
    protected void doStop(final Runnable callback) {
        if (isRunning()) {
            this.listenerConsumerFuture.addCallback(new ListenableFutureCallback<Object>() {

                @Override
                public void onFailure(Throwable e) {
                    KafkaMessageListenerContainer.this.logger.error("Error while stopping the container: ", e);
                    if (callback != null) {
                        callback.run();
                    }
                }

                @Override
                public void onSuccess(Object result) {
                    if (KafkaMessageListenerContainer.this.logger.isDebugEnabled()) {
                        KafkaMessageListenerContainer.this.logger
                                .debug(KafkaMessageListenerContainer.this + " stopped normally");
                    }
                    if (callback != null) {
                        callback.run();
                    }
                }
            });
            setRunning(false);
            this.listenerConsumer.consumer.wakeup();
        }
    }

    //......
}
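As a side note on the ack-mode checks and the acknowledgingMessageListener branch above, here is a hedged sketch of a manually acknowledging listener (the listener id and topic are made up; it assumes enable.auto.commit=false on the consumer and AckMode.MANUAL set on the container factory's ContainerProperties):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class ManualAckDemo {

    // Declaring an Acknowledgment parameter makes Spring register an acknowledging listener,
    // i.e. the acknowledgingMessageListener branch seen in doStart() above; the offset is only
    // committed after ack.acknowledge() is called.
    @KafkaListener(id = "manualAckDemo", topics = "demo-topic")
    public void onMessage(ConsumerRecord<String, String> record, Acknowledgment ack) {
        // process the record, then acknowledge so the container commits the offset
        ack.acknowledge();
    }
}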
Each KafkaMessageListenerContainer creates its own ListenerConsumer, which in turn creates its own independent Kafka consumer; each ListenerConsumer runs on the task executor (a thread pool), and this is how concurrency is achieved.
Each ListenerConsumer holds a recordsToProcess queue. Records polled from the underlying Kafka consumer are put into this queue, and a ListenerInvoker thread loops with a timed poll on recordsToProcess, takes the records out, and hands them to the application method annotated with @KafkaListener.
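To make that hand-off easier to picture, here is a deliberately simplified sketch of the pattern (plain Java, not the actual spring-kafka classes; all names are invented):

import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// One thread keeps polling the (non thread-safe) consumer and enqueues the batches;
// another thread dequeues them and invokes the application listener.
public class QueueHandOffSketch {

    private final BlockingQueue<List<String>> recordsToProcess = new LinkedBlockingQueue<>();

    private volatile boolean running = true;

    // "consumer thread": the only thread that touches the underlying consumer
    void pollLoop() throws InterruptedException {
        while (this.running) {
            List<String> batch = pollFromKafka();   // stands in for consumer.poll(timeout)
            if (!batch.isEmpty()) {
                this.recordsToProcess.put(batch);    // hand the batch over to the invoker thread
            }
        }
    }

    // "listener invoker thread": waits up to 1s for a batch, like ListenerInvoker.run()
    void invokerLoop() throws InterruptedException {
        while (this.running) {
            List<String> batch = this.recordsToProcess.poll(1, TimeUnit.SECONDS);
            if (batch != null) {
                invokeListener(batch);               // ends up in the @KafkaListener-annotated method
            }
        }
    }

    private List<String> pollFromKafka() {
        return Collections.emptyList();              // placeholder for the real poll
    }

    private void invokeListener(List<String> batch) {
        // application logic
    }
}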
private final class ListenerInvoker implements SchedulingAwareRunnable {

    private final CountDownLatch exitLatch = new CountDownLatch(1);

    private volatile boolean active = true;

    private volatile Thread executingThread;

    ListenerInvoker() {
        super();
    }

    @Override
    public void run() {
        Assert.isTrue(this.active, "This instance is not active anymore");
        if (ListenerConsumer.this.theListener instanceof ConsumerSeekAware) {
            ((ConsumerSeekAware) ListenerConsumer.this.theListener).registerSeekCallback(ListenerConsumer.this);
        }
        try {
            this.executingThread = Thread.currentThread();
            while (this.active) {
                try {
                    ConsumerRecords<K, V> records = ListenerConsumer.this.recordsToProcess.poll(1,
                            TimeUnit.SECONDS);
                    if (this.active) {
                        if (records != null) {
                            invokeListener(records);
                        }
                        else {
                            if (ListenerConsumer.this.logger.isTraceEnabled()) {
                                ListenerConsumer.this.logger.trace("No records to process");
                            }
                        }
                    }
                }
                catch (InterruptedException e) {
                    if (!this.active) {
                        Thread.currentThread().interrupt();
                    }
                    else {
                        ListenerConsumer.this.logger.debug("Interrupt ignored");
                    }
                }
            }
        }
        finally {
            this.active = false;
            this.exitLatch.countDown();
        }
    }

    @Override
    public boolean isLongLived() {
        return true;
    }

    private void stop() {
        if (ListenerConsumer.this.logger.isDebugEnabled()) {
            ListenerConsumer.this.logger.debug("Stopping invoker");
        }
        this.active = false;
        try {
            if (!this.exitLatch.await(getContainerProperties().getShutdownTimeout(), TimeUnit.MILLISECONDS)
                    && this.executingThread != null) {
                if (ListenerConsumer.this.logger.isDebugEnabled()) {
                    ListenerConsumer.this.logger.debug("Interrupting invoker");
                }
                this.executingThread.interrupt();
            }
        }
        catch (InterruptedException e) {
            if (this.executingThread != null) {
                this.executingThread.interrupt();
            }
            Thread.currentThread().interrupt();
        }
        if (ListenerConsumer.this.logger.isDebugEnabled()) {
            ListenerConsumer.this.logger.debug("Invoker stopped");
        }
    }
}
The invokeListener here ends up calling the listener's onMessage method.
Now let's look at how a method annotated with @KafkaListener ends up being wrapped into the listener that this ListenerInvoker invokes.
spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java
This class scans beans for the @KafkaListener annotation and registers the extracted metadata with the KafkaListenerEndpointRegistrar.
@Override
public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
    if (!this.nonAnnotatedClasses.contains(bean.getClass())) {
        Class<?> targetClass = AopUtils.getTargetClass(bean);
        Collection<KafkaListener> classLevelListeners = findListenerAnnotations(targetClass);
        final boolean hasClassLevelListeners = classLevelListeners.size() > 0;
        final List<Method> multiMethods = new ArrayList<Method>();
        Map<Method, Set<KafkaListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
                new MethodIntrospector.MetadataLookup<Set<KafkaListener>>() {

                    @Override
                    public Set<KafkaListener> inspect(Method method) {
                        Set<KafkaListener> listenerMethods = findListenerAnnotations(method);
                        return (!listenerMethods.isEmpty() ? listenerMethods : null);
                    }

                });
        if (hasClassLevelListeners) {
            Set<Method> methodsWithHandler = MethodIntrospector.selectMethods(targetClass,
                    new ReflectionUtils.MethodFilter() {

                        @Override
                        public boolean matches(Method method) {
                            return AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null;
                        }

                    });
            multiMethods.addAll(methodsWithHandler);
        }
        if (annotatedMethods.isEmpty()) {
            this.nonAnnotatedClasses.add(bean.getClass());
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("No @KafkaListener annotations found on bean type: " + bean.getClass());
            }
        }
        else {
            // Non-empty set of methods
            for (Map.Entry<Method, Set<KafkaListener>> entry : annotatedMethods.entrySet()) {
                Method method = entry.getKey();
                for (KafkaListener listener : entry.getValue()) {
                    processKafkaListener(listener, method, bean, beanName);
                }
            }
            if (this.logger.isDebugEnabled()) {
                this.logger.debug(annotatedMethods.size() + " @KafkaListener methods processed on bean '"
                        + beanName + "': " + annotatedMethods);
            }
        }
        if (hasClassLevelListeners) {
            processMultiMethodListeners(classLevelListeners, multiMethods, bean, beanName);
        }
    }
    return bean;
}

protected void processKafkaListener(KafkaListener kafkaListener, Method method, Object bean, String beanName) {
    Method methodToUse = checkProxy(method, bean);
    MethodKafkaListenerEndpoint<K, V> endpoint = new MethodKafkaListenerEndpoint<K, V>();
    endpoint.setMethod(methodToUse);
    endpoint.setBeanFactory(this.beanFactory);
    processListener(endpoint, kafkaListener, bean, methodToUse, beanName);
}

protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaListener kafkaListener,
        Object bean, Object adminTarget, String beanName) {
    endpoint.setBean(bean);
    endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
    endpoint.setId(getEndpointId(kafkaListener));
    endpoint.setTopicPartitions(resolveTopicPartitions(kafkaListener));
    endpoint.setTopics(resolveTopics(kafkaListener));
    endpoint.setTopicPattern(resolvePattern(kafkaListener));
    String group = kafkaListener.containerGroup();
    if (StringUtils.hasText(group)) {
        Object resolvedGroup = resolveExpression(group);
        if (resolvedGroup instanceof String) {
            endpoint.setGroup((String) resolvedGroup);
        }
    }

    KafkaListenerContainerFactory<?> factory = null;
    String containerFactoryBeanName = resolve(kafkaListener.containerFactory());
    if (StringUtils.hasText(containerFactoryBeanName)) {
        Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
        try {
            factory = this.beanFactory.getBean(containerFactoryBeanName, KafkaListenerContainerFactory.class);
        }
        catch (NoSuchBeanDefinitionException ex) {
            throw new BeanInitializationException("Could not register Kafka listener endpoint on [" + adminTarget
                    + "] for bean " + beanName + ", no " + KafkaListenerContainerFactory.class.getSimpleName()
                    + " with id '" + containerFactoryBeanName + "' was found in the application context", ex);
        }
    }

    this.registrar.registerEndpoint(endpoint, factory);
}
spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/config/KafkaListenerEndpointRegistrar.java
/**
 * Register a new {@link KafkaListenerEndpoint} alongside the
 * {@link KafkaListenerContainerFactory} to use to create the underlying container.
 * <p>The {@code factory} may be {@code null} if the default factory has to be
 * used for that endpoint.
 * @param endpoint the {@link KafkaListenerEndpoint} instance to register.
 * @param factory the {@link KafkaListenerContainerFactory} to use.
 */
public void registerEndpoint(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory) {
    Assert.notNull(endpoint, "Endpoint must be set");
    Assert.hasText(endpoint.getId(), "Endpoint id must be set");
    // Factory may be null, we defer the resolution right before actually creating the container
    KafkaListenerEndpointDescriptor descriptor = new KafkaListenerEndpointDescriptor(endpoint, factory);
    synchronized (this.endpointDescriptors) {
        if (this.startImmediately) { // Register and start immediately
            this.endpointRegistry.registerListenerContainer(descriptor.endpoint,
                    resolveContainerFactory(descriptor), true);
        }
        else {
            this.endpointDescriptors.add(descriptor);
        }
    }
}
Here the KafkaListenerEndpoint is wrapped into a KafkaListenerEndpointDescriptor and added to the endpointDescriptors collection of KafkaListenerEndpointDescriptor instances.
spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/config/KafkaListenerEndpointRegistrar.java
public class KafkaListenerEndpointRegistrar implements BeanFactoryAware, InitializingBean {

    @Override
    public void afterPropertiesSet() {
        registerAllEndpoints();
    }

    protected void registerAllEndpoints() {
        synchronized (this.endpointDescriptors) {
            for (KafkaListenerEndpointDescriptor descriptor : this.endpointDescriptors) {
                this.endpointRegistry.registerListenerContainer(
                        descriptor.endpoint, resolveContainerFactory(descriptor));
            }
            this.startImmediately = true;  // trigger immediate startup
        }
    }
}
This class implements the InitializingBean interface's afterPropertiesSet method (executed when the bean is initialized), and there it iterates over endpointDescriptors, calling registerListenerContainer for each one.
spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/config/KafkaListenerEndpointRegistry.java
public class KafkaListenerEndpointRegistry implements DisposableBean, SmartLifecycle, ApplicationContextAware,
        ApplicationListener<ContextRefreshedEvent> {

    protected final Log logger = LogFactory.getLog(getClass()); //NOSONAR

    private final Map<String, MessageListenerContainer> listenerContainers =
            new ConcurrentHashMap<String, MessageListenerContainer>();

    //......

    public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory,
            boolean startImmediately) {
        Assert.notNull(endpoint, "Endpoint must not be null");
        Assert.notNull(factory, "Factory must not be null");

        String id = endpoint.getId();
        Assert.hasText(id, "Endpoint id must not be empty");
        synchronized (this.listenerContainers) {
            Assert.state(!this.listenerContainers.containsKey(id),
                    "Another endpoint is already registered with id '" + id + "'");
            MessageListenerContainer container = createListenerContainer(endpoint, factory);
            this.listenerContainers.put(id, container);
            if (StringUtils.hasText(endpoint.getGroup()) && this.applicationContext != null) {
                List<MessageListenerContainer> containerGroup;
                if (this.applicationContext.containsBean(endpoint.getGroup())) {
                    containerGroup = this.applicationContext.getBean(endpoint.getGroup(), List.class);
                }
                else {
                    containerGroup = new ArrayList<MessageListenerContainer>();
                    this.applicationContext.getBeanFactory().registerSingleton(endpoint.getGroup(), containerGroup);
                }
                containerGroup.add(container);
            }
            if (startImmediately) {
                startIfNecessary(container);
            }
        }
    }

    protected MessageListenerContainer createListenerContainer(KafkaListenerEndpoint endpoint,
            KafkaListenerContainerFactory<?> factory) {
        MessageListenerContainer listenerContainer = factory.createListenerContainer(endpoint);

        if (listenerContainer instanceof InitializingBean) {
            try {
                ((InitializingBean) listenerContainer).afterPropertiesSet();
            }
            catch (Exception ex) {
                throw new BeanInitializationException("Failed to initialize message listener container", ex);
            }
        }

        int containerPhase = listenerContainer.getPhase();
        if (containerPhase < Integer.MAX_VALUE) {  // a custom phase value
            if (this.phase < Integer.MAX_VALUE && this.phase != containerPhase) {
                throw new IllegalStateException("Encountered phase mismatch between container factory definitions: "
                        + this.phase + " vs " + containerPhase);
            }
            this.phase = listenerContainer.getPhase();
        }

        return listenerContainer;
    }

    @Override
    public void start() {
        for (MessageListenerContainer listenerContainer : getListenerContainers()) {
            startIfNecessary(listenerContainer);
        }
    }

    @Override
    public void stop() {
        for (MessageListenerContainer listenerContainer : getListenerContainers()) {
            listenerContainer.stop();
        }
    }
}
During registration the endpoint is converted into a MessageListenerContainer, which is then put into the listenerContainers map.
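Because each endpoint id maps to exactly one container in that map, application code can also look containers up through the registry and control their lifecycle. A hedged sketch, assuming a listener registered with id "orderListener" as in the earlier example:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Component;

@Component
public class ListenerLifecycleDemo {

    @Autowired
    private KafkaListenerEndpointRegistry registry;

    // pause consumption by stopping the container created for the "orderListener" endpoint
    public void stopOrderListener() {
        MessageListenerContainer container = this.registry.getListenerContainer("orderListener");
        if (container != null && container.isRunning()) {
            container.stop();
        }
    }

    // resume consumption later
    public void startOrderListener() {
        MessageListenerContainer container = this.registry.getListenerContainer("orderListener");
        if (container != null && !container.isRunning()) {
            container.start();
        }
    }
}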
spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java
public C createListenerContainer(KafkaListenerEndpoint endpoint) {
    C instance = createContainerInstance(endpoint);

    if (this.autoStartup != null) {
        instance.setAutoStartup(this.autoStartup);
    }
    if (this.phase != null) {
        instance.setPhase(this.phase);
    }
    if (this.applicationEventPublisher != null) {
        instance.setApplicationEventPublisher(this.applicationEventPublisher);
    }
    if (endpoint.getId() != null) {
        instance.setBeanName(endpoint.getId());
    }

    if (endpoint instanceof AbstractKafkaListenerEndpoint) {
        AbstractKafkaListenerEndpoint<K, V> aklEndpoint = (AbstractKafkaListenerEndpoint<K, V>) endpoint;
        if (this.recordFilterStrategy != null) {
            aklEndpoint.setRecordFilterStrategy(this.recordFilterStrategy);
        }
        if (this.ackDiscarded != null) {
            aklEndpoint.setAckDiscarded(this.ackDiscarded);
        }
        if (this.retryTemplate != null) {
            aklEndpoint.setRetryTemplate(this.retryTemplate);
        }
        if (this.recoveryCallback != null) {
            aklEndpoint.setRecoveryCallback(this.recoveryCallback);
        }
        if (this.batchListener != null) {
            aklEndpoint.setBatchListener(this.batchListener);
        }
    }

    endpoint.setupListenerContainer(instance, this.messageConverter);
    initializeContainer(instance);

    return instance;
}
The main thing to look at here is the setupMessageListener method, reached via endpoint.setupListenerContainer.
spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java
private void setupMessageListener(MessageListenerContainer container, MessageConverter messageConverter) {
    Object messageListener = createMessageListener(container, messageConverter);
    Assert.state(messageListener != null, "Endpoint [" + this + "] must provide a non null message listener");
    if (this.retryTemplate != null) {
        if (messageListener instanceof AcknowledgingMessageListener) {
            messageListener = new RetryingAcknowledgingMessageListenerAdapter<>(
                    (AcknowledgingMessageListener<K, V>) messageListener, this.retryTemplate,
                    this.recoveryCallback);
        }
        else {
            messageListener = new RetryingMessageListenerAdapter<>((MessageListener<K, V>) messageListener,
                    this.retryTemplate, (RecoveryCallback<Object>) this.recoveryCallback);
        }
    }
    if (this.recordFilterStrategy != null) {
        if (messageListener instanceof AcknowledgingMessageListener) {
            messageListener = new FilteringAcknowledgingMessageListenerAdapter<>(
                    (AcknowledgingMessageListener<K, V>) messageListener, this.recordFilterStrategy,
                    this.ackDiscarded);
        }
        else if (messageListener instanceof MessageListener) {
            messageListener = new FilteringMessageListenerAdapter<>((MessageListener<K, V>) messageListener,
                    this.recordFilterStrategy);
        }
        else if (messageListener instanceof BatchAcknowledgingMessageListener) {
            messageListener = new FilteringBatchAcknowledgingMessageListenerAdapter<>(
                    (BatchAcknowledgingMessageListener<K, V>) messageListener, this.recordFilterStrategy,
                    this.ackDiscarded);
        }
        else if (messageListener instanceof BatchMessageListener) {
            messageListener = new FilteringBatchMessageListenerAdapter<>(
                    (BatchMessageListener<K, V>) messageListener, this.recordFilterStrategy);
        }
    }

    container.setupMessageListener(messageListener);
}
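The retrying and filtering adapters above are only applied when the endpoint carries a RetryTemplate or RecordFilterStrategy, which the container factory copies onto it in createListenerContainer. A hedged sketch of switching them on (the filter condition and bean names are made up; consumerFactory is assumed to be defined elsewhere, as in the earlier configuration):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
import org.springframework.retry.support.RetryTemplate;

@Configuration
public class FilteringRetryConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> filteringFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // causes the listener to be wrapped in a RetryingMessageListenerAdapter
        factory.setRetryTemplate(new RetryTemplate());
        // causes the listener to be wrapped in a FilteringMessageListenerAdapter;
        // returning true discards the record before it reaches the @KafkaListener method
        factory.setRecordFilterStrategy(new RecordFilterStrategy<String, String>() {
            @Override
            public boolean filter(ConsumerRecord<String, String> record) {
                return record.value() == null || record.value().isEmpty();
            }
        });
        return factory;
    }
}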
The messageListener assembled in setupMessageListener contains the bean carried by the original endpoint together with an InvocableHandlerMethod converted from its method; it is then injected into the MessageListenerContainer (a ConcurrentMessageListenerContainer). This connects back to the ConcurrentMessageListenerContainer discussed above, which creates multiple concurrent KafkaMessageListenerContainer instances according to the configured spring.kafka.listener.concurrency.
The createMessageListener method takes the original @KafkaListener-annotated bean and method contained in the endpoint, wraps them into an InvocableHandlerMethod, and injects that into a MessagingMessageListenerAdapter.
public class MethodKafkaListenerEndpoint<K, V> extends AbstractKafkaListenerEndpoint<K, V> {

    private Object bean;

    private Method method;

    private MessageHandlerMethodFactory messageHandlerMethodFactory;

    //......

    @Override
    protected MessagingMessageListenerAdapter<K, V> createMessageListener(MessageListenerContainer container,
            MessageConverter messageConverter) {
        Assert.state(this.messageHandlerMethodFactory != null,
                "Could not create message listener - MessageHandlerMethodFactory not set");
        MessagingMessageListenerAdapter<K, V> messageListener = createMessageListenerInstance(messageConverter);
        messageListener.setHandlerMethod(configureListenerAdapter(messageListener));
        return messageListener;
    }

    /**
     * Create a {@link HandlerAdapter} for this listener adapter.
     * @param messageListener the listener adapter.
     * @return the handler adapter.
     */
    protected HandlerAdapter configureListenerAdapter(MessagingMessageListenerAdapter<K, V> messageListener) {
        InvocableHandlerMethod invocableHandlerMethod =
                this.messageHandlerMethodFactory.createInvocableHandlerMethod(getBean(), getMethod());
        return new HandlerAdapter(invocableHandlerMethod);
    }

}
So this class wraps the original bean and method into an InvocableHandlerMethod and injects it into the MessagingMessageListenerAdapter.
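Put differently, what the adapter ultimately holds is a bean/method pair that gets invoked reflectively with the converted payload, which is why the @KafkaListener method never has to implement MessageListener itself. A deliberately simplified sketch (not the real MessagingMessageListenerAdapter or HandlerAdapter; all names are invented):

import java.lang.reflect.Method;

public class ReflectiveListenerSketch {

    private final Object bean;      // the @KafkaListener-annotated bean

    private final Method method;    // the annotated method, wrapped as InvocableHandlerMethod in spring-kafka

    public ReflectiveListenerSketch(Object bean, Method method) {
        this.bean = bean;
        this.method = method;
    }

    // stands in for onMessage(ConsumerRecord): the record is converted elsewhere,
    // then the stored handler method is invoked with the converted payload
    public void onMessage(Object convertedPayload) throws Exception {
        this.method.invoke(this.bean, convertedPayload);
    }
}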
對於消費者來講,因爲spring是採用註解的形式去標註消息處理方法的,因此這裏稍微費勁一點:
即KafkaListener註解標準的方法
)ListenerConsumer是重點,裏頭還有包括offset的提交,這裏改天再詳解一下。