系列目錄
kafka原理和實踐(二)spring-kafka簡單實踐
kafka原理和實踐(三)spring-kafka生產者源碼
kafka原理和實踐(四)spring-kafka消費者源碼
kafka原理和實踐(五)spring-kafka配置詳解
kafka原理和實踐(六)總結昇華
==============正文分割線=====================
如上圖所示,spring-kafka消費者模型主要流程:
1.容器啓動,輪詢執行消費。
2.kafkaConsumer拉取消息流程:
1)Fetcher請求獲取器構造拉取請求並存儲在unsent中
2)ConsumerNetworkClient網絡客戶端執行poll(),調用NetworkClient的send()方法從unsent中獲取ClientRequest請求轉成RequestSend最終塞進Selector的KafkaChannel通道中,Selector.send()從kafka集羣拉取待消費數據ConsumerRecords
3. 消費者監聽器MessageListener.onMessage()執行用戶自定義的實際消費業務邏輯。
/**
 * Builds a consumer from the given configuration.
 *
 * <p>In order, the constructor validates the timeouts, resolves the client id,
 * sets up metrics, loads interceptors and deserializers, bootstraps the
 * {@code Metadata}, and then creates the {@code ConsumerNetworkClient},
 * {@code SubscriptionState}, {@code ConsumerCoordinator} and {@code Fetcher}.
 * Any failure along the way closes whatever was already built and is rethrown
 * wrapped in a {@code KafkaException}.
 *
 * @param config            the consumer configuration
 * @param keyDeserializer   key deserializer, or {@code null} to instantiate the one named in {@code config}
 * @param valueDeserializer value deserializer, or {@code null} to instantiate the one named in {@code config}
 */
@SuppressWarnings("unchecked")
private KafkaConsumer(ConsumerConfig config,
                      Deserializer<K> keyDeserializer,
                      Deserializer<V> valueDeserializer) {
    try {
        log.debug("Starting the Kafka consumer");
        this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
        int sessionTimeOutMs = config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG);
        int fetchMaxWaitMs = config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);
        // the request timeout must be strictly greater than both the session timeout and the fetch max wait
        if (this.requestTimeoutMs <= sessionTimeOutMs || this.requestTimeoutMs <= fetchMaxWaitMs)
            throw new ConfigException(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG + " should be greater than " + ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG + " and " + ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);
        this.time = new SystemTime();

        // fall back to a generated "consumer-<n>" id when none is configured
        String clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
        if (clientId.length() <= 0)
            clientId = "consumer-" + CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement();
        this.clientId = clientId;

        // metrics: tagged with the client id; a JMX reporter is always added to the configured reporters
        Map<String, String> metricsTags = new LinkedHashMap<>();
        metricsTags.put("client-id", clientId);
        MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG))
                .timeWindow(config.getLong(ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
                .tags(metricsTags);
        List<MetricsReporter> reporters = config.getConfiguredInstances(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG,
                MetricsReporter.class);
        reporters.add(new JmxReporter(JMX_PREFIX));
        this.metrics = new Metrics(metricConfig, reporters, time);
        this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);

        // load interceptors and make sure they get clientId
        Map<String, Object> userProvidedConfigs = config.originals();
        userProvidedConfigs.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
        List<ConsumerInterceptor<K, V>> interceptorList = (List) (new ConsumerConfig(userProvidedConfigs)).getConfiguredInstances(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
                ConsumerInterceptor.class);
        // no interceptor chain at all when none are configured
        this.interceptors = interceptorList.isEmpty() ? null : new ConsumerInterceptors<>(interceptorList);

        // deserializers: instantiate and configure from config when not passed in,
        // otherwise mark the config entry as ignored and use the supplied instance
        if (keyDeserializer == null) {
            this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                    Deserializer.class);
            this.keyDeserializer.configure(config.originals(), true);
        } else {
            config.ignore(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
            this.keyDeserializer = keyDeserializer;
        }
        if (valueDeserializer == null) {
            this.valueDeserializer = config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                    Deserializer.class);
            this.valueDeserializer.configure(config.originals(), false);
        } else {
            config.ignore(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
            this.valueDeserializer = valueDeserializer;
        }
        // NOTE(review): this passes the constructor arguments, which are null when the deserializers
        // were instantiated from config above - confirm that this is intended.
        ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keyDeserializer, valueDeserializer, reporters, interceptorList);

        // metadata, seeded with the bootstrap servers
        this.metadata = new Metadata(retryBackoffMs, config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG), false, clusterResourceListeners);
        List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
        this.metadata.update(Cluster.bootstrap(addresses), 0);

        // network layer: Selector -> NetworkClient -> ConsumerNetworkClient
        String metricGrpPrefix = "consumer";
        ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values());
        NetworkClient netClient = new NetworkClient(
                new Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, channelBuilder),
                this.metadata,
                clientId,
                100, // a fixed large enough value will suffice
                config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG),
                config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG),
                config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG),
                config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), time);
        this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs,
                config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG));

        // subscription state, created with the configured auto.offset.reset strategy
        OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase(Locale.ROOT));
        this.subscriptions = new SubscriptionState(offsetResetStrategy);

        // coordinator, built from the group/session/heartbeat and auto-commit settings
        List<PartitionAssignor> assignors = config.getConfiguredInstances(
                ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                PartitionAssignor.class);
        this.coordinator = new ConsumerCoordinator(this.client,
                config.getString(ConsumerConfig.GROUP_ID_CONFIG),
                config.getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG),
                config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG),
                config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG),
                assignors,
                this.metadata,
                this.subscriptions,
                metrics,
                metricGrpPrefix,
                this.time,
                retryBackoffMs,
                new ConsumerCoordinator.DefaultOffsetCommitCallback(),
                config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG),
                config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
                this.interceptors,
                config.getBoolean(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG));

        // fetcher, built from the fetch size/wait limits and the deserializers resolved above
        this.fetcher = new Fetcher<>(this.client,
                config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG),
                config.getInt(ConsumerConfig.FETCH_MAX_BYTES_CONFIG),
                config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG),
                config.getInt(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG),
                config.getInt(ConsumerConfig.MAX_POLL_RECORDS_CONFIG),
                config.getBoolean(ConsumerConfig.CHECK_CRCS_CONFIG),
                this.keyDeserializer,
                this.valueDeserializer,
                this.metadata,
                this.subscriptions,
                metrics,
                metricGrpPrefix,
                this.time,
                this.retryBackoffMs);

        config.logUnused();
        AppInfoParser.registerAppInfo(JMX_PREFIX, clientId);

        log.debug("Kafka consumer created");
    } catch (Throwable t) {
        // call close methods if internal objects are already constructed
        // this is to prevent resource leak. see KAFKA-2121
        close(true);
        // now propagate the exception
        throw new KafkaException("Failed to construct kafka consumer", t);
    }
}
從KafkaConsumer構造函數來看,核心組件有:
1.Metadata:封裝了元數據相關邏輯的類。元數據只爲一部分主題(主題的子集)維護,這個子集可以隨時間推移逐步增加。當我們請求某個主題的元數據、而本地還沒有該主題的任何元數據時,會觸發一次元數據更新。如果對元數據啓用了主題過期,那麼在一次更新之後,過期時間間隔內未被使用過的主題都會從元數據刷新集合中移除。
2.ConsumerNetworkClient:供消費者使用的高層網絡訪問層,爲請求Future提供基本支持。這個類是線程安全的,但不對響應回調做同步,以保證調用回調時不會持有任何鎖。
3.SubscriptionState:訂閱的TopicPartition的offset狀態維護
4.ConsumerCoordinator:消費者的協調者,負責partition的分配和rebalance(再平衡)
5.Fetcher:從brokers上按照配置獲取消息。
kafka消費者有兩種常見的實現方式:
1.xml配置文件
2.基於註解實現
其實,不管哪一種方式,本質只是生成Spring Bean的方式不同而已。我們就以xml的實現方式來追蹤源碼。
基於xml的總體配置如下:
<!-- 1. Define the consumer parameters (a plain map of Kafka consumer settings) -->
<bean id="consumerProperties" class="java.util.HashMap">
    <constructor-arg>
        <map>
            <entry key="bootstrap.servers" value="${bootstrap.servers}" />
            <entry key="group.id" value="${group.id}" />
            <entry key="enable.auto.commit" value="${enable.auto.commit}" />
            <entry key="session.timeout.ms" value="${session.timeout.ms}" />
            <entry key="key.deserializer"
                value="org.apache.kafka.common.serialization.StringDeserializer" />
            <entry key="value.deserializer"
                value="org.apache.kafka.common.serialization.StringDeserializer" />
        </map>
    </constructor-arg>
</bean>

<!-- 2. Create the consumerFactory bean from the consumer parameters -->
<bean id="consumerFactory"
    class="org.springframework.kafka.core.DefaultKafkaConsumerFactory" >
    <constructor-arg>
        <ref bean="consumerProperties" />
    </constructor-arg>
</bean>

<!-- 3. Define the class that implements the consumption logic -->
<bean id="kafkaConsumerService" class="xxx.service.impl.KafkaConsumerSerivceImpl" />

<!-- 4. Consumer container configuration: subscribed topics and the message listener -->
<bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
    <!-- topic -->
    <constructor-arg name="topics">
        <list>
            <value>${kafka.consumer.topic.credit.for.lease}</value>
            <value>${loan.application.feedback.topic}</value>
            <value>${templar.agreement.feedback.topic}</value>
            <value>${templar.aggrement.active.feedback.topic}</value>
            <value>${templar.aggrement.agreementRepaid.topic}</value>
            <value>${templar.aggrement.agreementWithhold.topic}</value>
            <value>${templar.aggrement.agreementRepayRemind.topic}</value>
        </list>
    </constructor-arg>
    <property name="messageListener" ref="kafkaConsumerService" />
</bean>
<!-- 5. Concurrent message listener container; doStart() is declared as its init method -->
<bean id="messageListenerContainer" class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer" init-method="doStart" >
    <constructor-arg ref="consumerFactory" />
    <constructor-arg ref="containerProperties" />
    <property name="concurrency" value="${concurrency}" />
</bean>
分爲5個步驟:
consumerProperties ,就是個map<key,value>
DefaultKafkaConsumerFactory 實現了ConsumerFactory接口,提供創建消費者和判斷是否自動提交2個方法。通過consumerProperties作爲參數構造。
1 public interface ConsumerFactory<K, V> { 2 3 Consumer<K, V> createConsumer(); 4 5 boolean isAutoCommit(); 6 7 8 }
自定義一個類實現MessageListener接口,接口設計以下:
實現onMessage方法,去消費接收到的消息。兩種方案:
1)MessageListener:消費完消息後自動提交offset(enable.auto.commit=true時),效率較高,但存在消費失敗而偏移量已經前移的風險。
2)AcknowledgingMessageListener:消費完消息後手動提交offset(enable.auto.commit=false時),效率有所降低,但不存在消費失敗而偏移量已經前移的風險。
ContainerProperties:包含了一個監聽容器的運行時配置信息,主要定義了監聽的主題、分區、初始化偏移量,還有消息監聽器。
1 public class ContainerProperties { 2 3 private static final int DEFAULT_SHUTDOWN_TIMEOUT = 10000; 4 5 private static final int DEFAULT_QUEUE_DEPTH = 1; 6 7 private static final int DEFAULT_PAUSE_AFTER = 10000; 8 9 /** 10 * Topic names.監聽的主題字符串數組 11 */ 12 private final String[] topics; 13 14 /** 15 * Topic pattern.監聽的主題模板 16 */ 17 private final Pattern topicPattern; 18 19 /** 20 * Topics/partitions/initial offsets. 21 */ 22 private final TopicPartitionInitialOffset[] topicPartitions; 23 24 /** 25 * 確認模式(自動確認屬性爲false時使用) 26 * <ul> 27 * <li>1.RECORD逐條確認: 每條消息被髮送給監聽者後確認</li> 28 * <li>2.BATCH批量確認: 當批量消息記錄被消費者接收到並傳送給監聽器時確認</li> 30 * <li>3.TIME超時確認:當超過設置的超時時間毫秒數時確認(should be greater than 31 * {@code #setPollTimeout(long) pollTimeout}.</li> 32 * <li>4.COUNT計數確認: 當接收到指定數量以後確認</li> 33 * <li>5.MANUAL手動確認:由監聽器負責確認(AcknowledgingMessageListener)</ul> 36 */ 37 private AbstractMessageListenerContainer.AckMode ackMode = AckMode.BATCH; 38 39 /** 40 * The number of outstanding record count after which offsets should be 41 * committed when {@link AckMode#COUNT} or {@link AckMode#COUNT_TIME} is being 42 * used. 43 */ 44 private int ackCount; 45 46 /** 47 * The time (ms) after which outstanding offsets should be committed when 48 * {@link AckMode#TIME} or {@link AckMode#COUNT_TIME} is being used. Should be 49 * larger than 50 */ 51 private long ackTime; 52 53 /** 54 * 消息監聽器,必須是 MessageListener或者AcknowledgingMessageListener二者中的一個 55 * 56 */ 57 private Object messageListener; 58 59 /** 60 * The max time to block in the consumer waiting for records. 
61 */ 62 private volatile long pollTimeout = 1000; 63 64 /** 65 * 線程執行器:輪詢消費者 66 */ 67 private AsyncListenableTaskExecutor consumerTaskExecutor; 68 69 /** 70 * 線程執行器:調用監聽器 71 */ 72 private AsyncListenableTaskExecutor listenerTaskExecutor; 73 74 /** 75 * 錯誤回調,當監聽器拋出異常時 76 */ 77 private GenericErrorHandler<?> errorHandler; 78 79 /** 80 * When using Kafka group management and {@link #setPauseEnabled(boolean)} is 81 * true, the delay after which the consumer should be paused. Default 10000. 82 */ 83 private long pauseAfter = DEFAULT_PAUSE_AFTER; 84 85 /** 86 * When true, avoids rebalancing when this consumer is slow or throws a 87 * qualifying exception - pauses the consumer. Default: true. 88 * @see #pauseAfter 89 */ 90 private boolean pauseEnabled = true; 91 92 /** 93 * Set the queue depth for handoffs from the consumer thread to the listener 94 * thread. Default 1 (up to 2 in process). 95 */ 96 private int queueDepth = DEFAULT_QUEUE_DEPTH; 97 98 /** 99 * 中止容器超時時間 */ 103 private long shutdownTimeout = DEFAULT_SHUTDOWN_TIMEOUT; 104 105 /** 106 * 用戶定義的消費者再平衡監聽器實現類 */ 108 private ConsumerRebalanceListener consumerRebalanceListener; 109 110 /** 111 * 提交回調,默認記錄日誌。 */ 114 private OffsetCommitCallback commitCallback; 115 116 /** 117 * Whether or not to call consumer.commitSync() or commitAsync() when the 118 * container is responsible for commits. Default true. See 119 * https://github.com/spring-projects/spring-kafka/issues/62 At the time of 120 * writing, async commits are not entirely reliable. 121 */ 122 private boolean syncCommits = true; 123 124 private boolean ackOnError = true; 125 126 private Long idleEventInterval; 127 128 public ContainerProperties(String... 
topics) { 129 Assert.notEmpty(topics, "An array of topicPartitions must be provided"); 130 this.topics = Arrays.asList(topics).toArray(new String[topics.length]); 131 this.topicPattern = null; 132 this.topicPartitions = null; 133 } 134 135 public ContainerProperties(Pattern topicPattern) { 136 this.topics = null; 137 this.topicPattern = topicPattern; 138 this.topicPartitions = null; 139 } 140 141 public ContainerProperties(TopicPartitionInitialOffset... topicPartitions) { 142 this.topics = null; 143 this.topicPattern = null; 144 Assert.notEmpty(topicPartitions, "An array of topicPartitions must be provided"); 145 this.topicPartitions = new LinkedHashSet<>(Arrays.asList(topicPartitions)) 146 .toArray(new TopicPartitionInitialOffset[topicPartitions.length]); 147 } 148 ...省略各類set、get 149 150 }
核心類ConcurrentMessageListenerContainer,繼承自抽象類AbstractMessageListenerContainer,類圖如下:
看上圖可知AbstractMessageListenerContainer有2個實現類,分別對應單線程和多線程,建議採用多線程消費。下面主要分析ConcurrentMessageListenerContainer類,注意2個方法:
1.構造函數,入參:消費者工廠ConsumerFactory+容器配置ContainerProperties
2.doStart():按併發數創建KafkaMessageListenerContainer,並逐個調用其start()方法(核心)。源碼如下:
1 public class ConcurrentMessageListenerContainer<K, V> extends AbstractMessageListenerContainer<K, V> { 2 3 private final ConsumerFactory<K, V> consumerFactory; 4 5 private final List<KafkaMessageListenerContainer<K, V>> containers = new ArrayList<>(); 6 7 private int concurrency = 1; 8 9 /** 10 * Construct an instance with the supplied configuration properties. 11 * The topic partitions are distributed evenly across the delegate 12 * {@link KafkaMessageListenerContainer}s. 13 * @param consumerFactory the consumer factory. 14 * @param containerProperties the container properties. 15 */ 16 public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory, 17 ContainerProperties containerProperties) { 18 super(containerProperties); 19 Assert.notNull(consumerFactory, "A ConsumerFactory must be provided"); 20 this.consumerFactory = consumerFactory; 21 } 22 23 public int getConcurrency() { 24 return this.concurrency; 25 } 26 27 /** 28 * The maximum number of concurrent {@link KafkaMessageListenerContainer}s running. 29 * Messages from within the same partition will be processed sequentially. 30 * @param concurrency the concurrency. 31 */ 32 public void setConcurrency(int concurrency) { 33 Assert.isTrue(concurrency > 0, "concurrency must be greater than 0"); 34 this.concurrency = concurrency; 35 } 36 37 /** 38 * Return the list of {@link KafkaMessageListenerContainer}s created by 39 * this container. 40 * @return the list of {@link KafkaMessageListenerContainer}s created by 41 * this container. 42 */ 43 public List<KafkaMessageListenerContainer<K, V>> getContainers() { 44 return Collections.unmodifiableList(this.containers); 45 } 46 47 /* 48 * Under lifecycle lock. 
49 */ 50 @Override 51 protected void doStart() { 52 if (!isRunning()) { 53 ContainerProperties containerProperties = getContainerProperties(); 54 TopicPartitionInitialOffset[] topicPartitions = containerProperties.getTopicPartitions(); 55 if (topicPartitions != null//校驗併發數>分區數,報錯。 56 && this.concurrency > topicPartitions.length) { 57 this.logger.warn("When specific partitions are provided, the concurrency must be less than or " 58 + "equal to the number of partitions; reduced from " + this.concurrency + " to " 59 + topicPartitions.length); 60 this.concurrency = topicPartitions.length;//併發數最大隻能=分區數 61 } 62 setRunning(true); 63 //遍歷建立監聽器容器 64 for (int i = 0; i < this.concurrency; i++) { 65 KafkaMessageListenerContainer<K, V> container; 66 if (topicPartitions == null) { 67 container = new KafkaMessageListenerContainer<>(this.consumerFactory, containerProperties); 68 } 69 else { 70 container = new KafkaMessageListenerContainer<>(this.consumerFactory, containerProperties, 71 partitionSubset(containerProperties, i)); 72 } 73 if (getBeanName() != null) { 74 container.setBeanName(getBeanName() + "-" + i); 75 } 76 if (getApplicationEventPublisher() != null) { 77 container.setApplicationEventPublisher(getApplicationEventPublisher()); 78 } 79 container.setClientIdSuffix("-" + i); 80 container.start();//核心方法,啓動容器 81 this.containers.add(container); 82 } 83 } 84 }146 ...省略 147 }
繼續追蹤,container.start()調用的是AbstractMessageListenerContainer的start():它在持有生命週期鎖的情況下校驗監聽器類型,然後調用抽象方法doStart()。值得注意的是start()和stop方法加了同一把鎖,用於鎖住生命週期。
1 private final Object lifecycleMonitor = new Object(); 2 3 @Override 4 public final void start() { 5 synchronized (this.lifecycleMonitor) { 6 Assert.isTrue( 7 this.containerProperties.getMessageListener() instanceof KafkaDataListener, 8 "A " + KafkaDataListener.class.getName() + " implementation must be provided"); 9 doStart(); 10 } 11 } 12 13 protected abstract void doStart();
最終調用的是KafkaMessageListenerContainer的doStart()
1 @Override 2 protected void doStart() { 3 if (isRunning()) { 4 return; 5 } 6 ContainerProperties containerProperties = getContainerProperties(); 7 8 if (!this.consumerFactory.isAutoCommit()) { 9 AckMode ackMode = containerProperties.getAckMode(); 10 if (ackMode.equals(AckMode.COUNT) || ackMode.equals(AckMode.COUNT_TIME)) { 11 Assert.state(containerProperties.getAckCount() > 0, "'ackCount' must be > 0"); 12 } 13 if ((ackMode.equals(AckMode.TIME) || ackMode.equals(AckMode.COUNT_TIME)) 14 && containerProperties.getAckTime() == 0) { 15 containerProperties.setAckTime(5000); 16 } 17 } 18 19 Object messageListener = containerProperties.getMessageListener(); 20 Assert.state(messageListener != null, "A MessageListener is required"); 21 if (messageListener instanceof GenericAcknowledgingMessageListener) { 22 this.acknowledgingMessageListener = (GenericAcknowledgingMessageListener<?>) messageListener; 23 } 24 else if (messageListener instanceof GenericMessageListener) { 25 this.listener = (GenericMessageListener<?>) messageListener; 26 } 27 else { 28 throw new IllegalStateException("messageListener must be 'MessageListener' " 29 + "or 'AcknowledgingMessageListener', not " + messageListener.getClass().getName()); 30 } 31 if (containerProperties.getConsumerTaskExecutor() == null) { 32 SimpleAsyncTaskExecutor consumerExecutor = new SimpleAsyncTaskExecutor( 33 (getBeanName() == null ? "" : getBeanName()) + "-C-"); 34 containerProperties.setConsumerTaskExecutor(consumerExecutor); 35 } 36 if (containerProperties.getListenerTaskExecutor() == null) { 37 SimpleAsyncTaskExecutor listenerExecutor = new SimpleAsyncTaskExecutor( 38 (getBeanName() == null ? "" : getBeanName()) + "-L-"); 39 containerProperties.setListenerTaskExecutor(listenerExecutor); 40 }//1.構建 監聽消費者 41 this.listenerConsumer = new ListenerConsumer(this.listener, this.acknowledgingMessageListener); 42 setRunning(true);
//2.異步提交 監聽消費者任務,返回Future並賦值。 43 this.listenerConsumerFuture = containerProperties 44 .getConsumerTaskExecutor() 45 .submitListenable(this.listenerConsumer); 46 }
doStart主要包含2個操作:構建內部類ListenerConsumer,以及提交監聽消費者任務、返回Future並賦值。
ListenerConsumer類圖以下:
ListenerConsumer構造函數源碼以下:
/**
 * Creates the Kafka consumer for this container, subscribes it (by topic
 * pattern or topic list) or assigns it the explicitly configured partitions,
 * and then classifies the listener into one of the four supported kinds and
 * selects the matching error handler.
 *
 * @param listener    the plain listener; when null, {@code ackListener} is used instead
 * @param ackListener the acknowledging listener
 */
@SuppressWarnings("unchecked")
ListenerConsumer(GenericMessageListener<?> listener, GenericAcknowledgingMessageListener<?> ackListener) {
    // a manual ack mode cannot be combined with auto commit
    Assert.state(!this.isAnyManualAck || !this.autoCommit,
            "Consumer cannot be configured for auto commit for ackMode " + this.containerProperties.getAckMode());
    // use the client-id-suffix aware factory method when the factory supports it
    @SuppressWarnings("deprecation")
    final Consumer<K, V> consumer =
            KafkaMessageListenerContainer.this.consumerFactory instanceof
                    org.springframework.kafka.core.ClientIdSuffixAware
                    ? ((org.springframework.kafka.core.ClientIdSuffixAware<K, V>) KafkaMessageListenerContainer
                            .this.consumerFactory)
                            .createConsumer(KafkaMessageListenerContainer.this.clientIdSuffix)
                    : KafkaMessageListenerContainer.this.consumerFactory.createConsumer();

    this.theListener = listener == null ? ackListener : listener;
    ConsumerRebalanceListener rebalanceListener = createRebalanceListener(consumer);

    if (KafkaMessageListenerContainer.this.topicPartitions == null) {
        // no explicit partitions: subscribe by pattern if one is set, otherwise by topic names
        if (this.containerProperties.getTopicPattern() != null) {
            consumer.subscribe(this.containerProperties.getTopicPattern(), rebalanceListener);
        }
        else {
            consumer.subscribe(Arrays.asList(this.containerProperties.getTopics()), rebalanceListener);
        }
    }
    else {
        // explicit partitions: record each partition's initial offset, then assign them directly
        List<TopicPartitionInitialOffset> topicPartitions =
                Arrays.asList(KafkaMessageListenerContainer.this.topicPartitions);
        this.definedPartitions = new HashMap<>(topicPartitions.size());
        for (TopicPartitionInitialOffset topicPartition : topicPartitions) {
            this.definedPartitions.put(topicPartition.topicPartition(),
                    new OffsetMetadata(topicPartition.initialOffset(), topicPartition.isRelativeToCurrent()));
        }
        consumer.assign(new ArrayList<>(this.definedPartitions.keySet()));
    }
    this.consumer = consumer;
    GenericErrorHandler<?> errHandler = this.containerProperties.getGenericErrorHandler();
    this.genericListener = listener;
    // (1) batch listener that acknowledges
    if (this.theListener instanceof BatchAcknowledgingMessageListener) {
        this.listener = null;
        this.batchListener = null;
        this.acknowledgingMessageListener = null;
        this.batchAcknowledgingMessageListener = (BatchAcknowledgingMessageListener<K, V>) this.theListener;
        this.isBatchListener = true;
    }
    // (2) record listener that acknowledges
    else if (this.theListener instanceof AcknowledgingMessageListener) {
        this.listener = null;
        this.acknowledgingMessageListener = (AcknowledgingMessageListener<K, V>) this.theListener;
        this.batchListener = null;
        this.batchAcknowledgingMessageListener = null;
        this.isBatchListener = false;
    }
    // (3) batch listener
    else if (this.theListener instanceof BatchMessageListener) {
        this.listener = null;
        this.batchListener = (BatchMessageListener<K, V>) this.theListener;
        this.acknowledgingMessageListener = null;
        this.batchAcknowledgingMessageListener = null;
        this.isBatchListener = true;
    }
    // (4) plain record listener
    else if (this.theListener instanceof MessageListener) {
        this.listener = (MessageListener<K, V>) this.theListener;
        this.batchListener = null;
        this.acknowledgingMessageListener = null;
        this.batchAcknowledgingMessageListener = null;
        this.isBatchListener = false;
    }
    else {
        throw new IllegalArgumentException("Listener must be one of 'MessageListener', "
                + "'BatchMessageListener', 'AcknowledgingMessageListener', "
                + "'BatchAcknowledgingMessageListener', not " + this.theListener.getClass().getName());
    }
    // the configured error handler is used for the matching listener kind;
    // the other kind always gets its logging default
    if (this.isBatchListener) {
        validateErrorHandler(true);
        this.errorHandler = new LoggingErrorHandler();
        this.batchErrorHandler = errHandler == null ? new BatchLoggingErrorHandler()
                : (BatchErrorHandler) errHandler;
    }
    else {
        validateErrorHandler(false);
        this.errorHandler = errHandler == null ? new LoggingErrorHandler() : (ErrorHandler) errHandler;
        this.batchErrorHandler = new BatchLoggingErrorHandler();
    }
    Assert.state(!this.isBatchListener || !this.isRecordAck, "Cannot use AckMode.RECORD with a batch listener");
}
1.定義消費者訂閱topic或者指定分區
2.設置監聽器,支持4種:
1)BatchAcknowledgingMessageListener批量需確認消息監聽器
2)AcknowledgingMessageListener需確認消息監聽器
3)BatchMessageListener批量消息監聽器
4)MessageListener消息監聽器(用的最多,一次消費一條)
這裏我們看一下任務Runnable接口的run方法,分兩步:
1.如果自定義了分區,就不需要等再平衡來分配分區,直接初始化分區;若非自動提交,還會在這裏啓動ListenerInvoker
2.無論是否指定分區,只要容器處於運行狀態,都會進入循環輪詢消費
/**
 * Consumer thread body: performs optional one-time setup for explicitly
 * assigned partitions, then polls in a loop for as long as the container is
 * running, and finally stops the invoker (if any), unsubscribes and closes
 * the consumer.
 */
@Override
public void run() {
    if (this.genericListener instanceof ConsumerSeekAware) {
        ((ConsumerSeekAware) this.genericListener).registerSeekCallback(this);
    }
    this.count = 0;
    this.last = System.currentTimeMillis();
    // 1. running with explicitly defined partitions: initialise them here; when not
    //    auto-committing, also start the listener invoker thread at this point
    if (isRunning() && this.definedPartitions != null) {
        initPartitionsIfNeeded();
        if (!this.autoCommit) {
            startInvoker();
        }
    }
    long lastReceive = System.currentTimeMillis();
    long lastAlertAt = lastReceive;
    // 2. poll loop; entered whether or not partitions were explicitly defined
    while (isRunning()) {
        try {
            if (!this.autoCommit) {
                // not auto-committing: process pending commits first
                processCommits();
            }
            // apply pending seek requests before the next poll
            processSeeks();
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("Polling (paused=" + this.paused + ")...");
            }
            // 1) fetch records
            ConsumerRecords<K, V> records = this.consumer.poll(this.containerProperties.getPollTimeout());
            if (records != null && this.logger.isDebugEnabled()) {
                this.logger.debug("Received: " + records.count() + " records");
            }
            if (records != null && records.count() > 0) {
                if (this.containerProperties.getIdleEventInterval() != null) {
                    lastReceive = System.currentTimeMillis();
                }
                // 2) auto commit: invoke the listener directly on this thread
                if (this.autoCommit) {
                    invokeListener(records);
                }
                else {
                    // 3) otherwise hand the records over via sendToListener
                    // NOTE(review): presumably sendToListener returns true when the hand-off
                    // could not be completed - confirm against its implementation
                    if (sendToListener(records)) {
                        if (this.assignedPartitions != null) {
                            // avoid group management rebalance due to a slow
                            // consumer
                            this.consumer.pause(this.assignedPartitions);
                            this.paused = true;
                            this.unsent = records;
                        }
                    }
                }
            }
            else {
                // nothing received: publish an idle event once the idle interval has passed
                // since both the last receive and the last idle event
                if (this.containerProperties.getIdleEventInterval() != null) {
                    long now = System.currentTimeMillis();
                    if (now > lastReceive + this.containerProperties.getIdleEventInterval()
                            && now > lastAlertAt + this.containerProperties.getIdleEventInterval()) {
                        publishIdleContainerEvent(now - lastReceive);
                        lastAlertAt = now;
                        if (this.genericListener instanceof ConsumerSeekAware) {
                            seekPartitions(getAssignedPartitions(), true);
                        }
                    }
                }
            }
            this.unsent = checkPause(this.unsent);
        }
        catch (WakeupException e) {
            // a wakeup only triggers the pause check; the loop continues
            this.unsent = checkPause(this.unsent);
        }
        catch (Exception e) {
            // any other failure goes to the configured error handler, or is logged
            if (this.containerProperties.getGenericErrorHandler() != null) {
                this.containerProperties.getGenericErrorHandler().handle(e, null);
            }
            else {
                this.logger.error("Container exception", e);
            }
        }
    }
    // shutting down: stop the invoker thread (if one was started) and commit manual acks
    if (this.listenerInvokerFuture != null) {
        stopInvoker();
        commitManualAcks();
    }
    try {
        this.consumer.unsubscribe();
    }
    catch (WakeupException e) {
        // No-op. Continue process
    }
    this.consumer.close();
    if (this.logger.isInfoEnabled()) {
        this.logger.info("Consumer stopped");
    }
}
1.若是用戶自定義了分區且非自動提交,那麼開啓異步線程執行ListenerInvoker任務,源碼以下:
1 private void startInvoker() { 2 ListenerConsumer.this.invoker = new ListenerInvoker(); 3 ListenerConsumer.this.listenerInvokerFuture = this.containerProperties.getListenerTaskExecutor() 4 .submit(ListenerConsumer.this.invoker); 5 }
執行ListenerInvoker的run方法:只要active爲true,就循環從阻塞隊列中取記錄並消費;exitLatch(CountDownLatch,初始值爲1)只在run退出時countDown一次,用來標記該任務已經結束,並不是說run只執行一遍。
/**
 * Task that repeatedly takes batches of records from
 * {@code recordsToProcess} and passes them to the listener until it is
 * deactivated.
 */
private final class ListenerInvoker implements SchedulingAwareRunnable {

    // counted down once, when run() exits
    private final CountDownLatch exitLatch = new CountDownLatch(1);

    // loop flag; run() keeps going while this is true
    private volatile boolean active = true;

    // the thread currently executing run()
    private volatile Thread executingThread;

    ListenerInvoker() {
        super();
    }

    @Override
    public void run() {
        Assert.isTrue(this.active, "This instance is not active anymore");
        if (ListenerConsumer.this.theListener instanceof ConsumerSeekAware) {
            ((ConsumerSeekAware) ListenerConsumer.this.theListener).registerSeekCallback(ListenerConsumer.this);
        }
        try {
            this.executingThread = Thread.currentThread();
            while (this.active) {
                try {
                    // take the next batch from the blocking queue recordsToProcess, waiting up to 1 second
                    ConsumerRecords<K, V> records = ListenerConsumer.this.recordsToProcess.poll(1,
                            TimeUnit.SECONDS);
                    // re-check the flag: it may have been cleared while waiting on the queue
                    if (this.active) {
                        if (records != null) {
                            // consume the batch
                            invokeListener(records);
                        }
                        else {
                            if (ListenerConsumer.this.logger.isTraceEnabled()) {
                                ListenerConsumer.this.logger.trace("No records to process");
                            }
                        }
                    }
                }
                catch (InterruptedException e) {
                    // no longer active: restore the interrupt flag (the loop then exits);
                    // still active: ignore the interrupt and keep polling
                    if (!this.active) {
                        Thread.currentThread().interrupt();
                    }
                    else {
                        ListenerConsumer.this.logger.debug("Interrupt ignored");
                    }
                }
            }
        }
        finally {
            // mark this invoker finished and release the exit latch
            this.active = false;
            this.exitLatch.countDown();
        }
    }

    @Override
    public boolean isLongLived() {
        return true;
    }

    // ... remaining members omitted in this excerpt
}
1 private void invokeListener(final ConsumerRecords<K, V> records) { 2 if (this.isBatchListener) { 3 invokeBatchListener(records); 4 } 5 else { 6 invokeRecordListener(records); 7 } 8 }
如上圖,從阻塞隊列中取得待消費記錄,用迭代器iterator逐條消費,根據監聽器類型,用不同的listener來執行onMessage方法(即用戶自定義的MessageListener接口的onMessage方法,實現用戶自己的消費業務邏輯)
/**
 * Passes the records to the listener one at a time. Iteration continues only
 * while auto commit is on or the invoker exists and is still active. When not
 * auto-committing, processed records are queued in {@code acks} unless a
 * manual ack mode is in use.
 *
 * @param records the records to process
 */
private void invokeRecordListener(final ConsumerRecords<K, V> records) {
    Iterator<ConsumerRecord<K, V>> iterator = records.iterator();
    while (iterator.hasNext() && (this.autoCommit || (this.invoker != null && this.invoker.active))) {
        final ConsumerRecord<K, V> record = iterator.next();
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Processing " + record);
        }
        try {
            if (this.acknowledgingMessageListener != null) {
                // the user-supplied listener's onMessage: this is where the application's
                // consumption logic runs; an acknowledgment is passed only in a manual ack mode
                this.acknowledgingMessageListener.onMessage(record,
                        this.isAnyManualAck
                                ? new ConsumerAcknowledgment(record, this.isManualImmediateAck)
                                : null);
            }
            else {
                // the user-supplied listener's onMessage: this is where the application's
                // consumption logic runs
                this.listener.onMessage(record);
            }
            if (!this.isAnyManualAck && !this.autoCommit) {
                this.acks.add(record);
            }
        }
        catch (Exception e) {
            // with ackOnError set, the failed record is still queued for acknowledgment
            if (this.containerProperties.isAckOnError() && !this.autoCommit) {
                this.acks.add(record);
            }
            try {
                this.errorHandler.handle(e, record);
            }
            catch (Exception ee) {
                this.logger.error("Error handler threw an exception", ee);
            }
            catch (Error er) { //NOSONAR
                this.logger.error("Error handler threw an error", er);
                throw er;
            }
        }
    }
}
2.進入輪詢循環(while (isRunning()),無論是否指定分區都會執行)
// 1)拉取消費記錄 ConsumerRecords<K, V> records = this.consumer.poll(this.containerProperties.getPollTimeout());
2)若是設置了自動提交,直接在當前線程執行
invokeListener(records);
// 3)不然發送消息進緩存隊列 sendToListener(records)
1)每次輪詢時,消費者都會嘗試以最後一次消費到的偏移量作爲起始偏移量,並按順序拉取。這個偏移量可以通過seek(TopicPartition, long)手動設置,也可以自動設置爲所訂閱分區列表最後一次已提交的偏移量。
/**
 * Fetches records, retrying {@code pollOnce} until records arrive or the
 * timeout is used up.
 *
 * @param timeout maximum time to wait, in milliseconds; must not be negative
 * @return the fetched records (passed through the interceptors when configured),
 *         or an empty set when the timeout expires first
 * @throws IllegalArgumentException if {@code timeout} is negative
 * @throws IllegalStateException if the consumer has no subscription and no assigned partitions
 */
@Override
public ConsumerRecords<K, V> poll(long timeout) {
    acquire();
    try {
        if (timeout < 0)
            throw new IllegalArgumentException("Timeout must not be negative");

        if (this.subscriptions.hasNoSubscriptionOrUserAssignment())
            throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");

        // poll for new data until the timeout expires
        long start = time.milliseconds();
        long remaining = timeout;
        do {
            Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(remaining);
            if (!records.isEmpty()) {
                // before returning the fetched records, send off the next round of fetches
                // without blocking on their responses, so fetching is pipelined with the
                // user's processing of the records being returned
                fetcher.sendFetches();
                // the consumed position has already been updated, so no wakeups or other
                // errors may be triggered before the fetched records are returned
                client.pollNoWakeup();

                if (this.interceptors == null)
                    return new ConsumerRecords<>(records);
                else
                    // consumer interceptors are configured: let them process the records
                    return this.interceptors.onConsume(new ConsumerRecords<>(records));
            }

            long elapsed = time.milliseconds() - start;
            remaining = timeout - elapsed;
        } while (remaining > 0);

        return ConsumerRecords.empty();
    } finally {
        release();
    }
}
pollOnce:
/**
 * Performs a single poll iteration: lets the coordinator do its work, fills
 * in missing fetch positions, returns already-fetched records if there are
 * any, and otherwise sends fetch requests and performs network I/O for up to
 * {@code timeout} milliseconds.
 *
 * @param timeout upper bound, in milliseconds, for the blocking network poll
 * @return the fetched records per partition; empty when nothing was fetched
 *         or when the coordinator reports that a rejoin is needed
 */
private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
    coordinator.poll(time.milliseconds());

    // if some partitions have no fetch position yet, update them (this involves the
    // coordinator refreshing committed offsets and the fetcher updating fetch positions)
    if (!subscriptions.hasAllFetchPositions())
        updateFetchPositions(this.subscriptions.missingFetchPositions());

    // return records that have already been fetched
    Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
    if (!records.isEmpty())
        return records;

    // send fetch requests
    fetcher.sendFetches();

    long now = time.milliseconds();
    // never block past the coordinator's next scheduled poll
    long pollTimeout = Math.min(coordinator.timeToNextPoll(now), timeout);
    // perform the network I/O that pulls the data
    client.poll(pollTimeout, now, new PollCondition() {
        @Override
        public boolean shouldBlock() {
            // since a fetch might be completed by the background thread, we need this poll condition
            // to ensure that we do not block unnecessarily in poll()
            return !fetcher.hasCompletedFetches();
        }
    });
    // when the coordinator reports that a rejoin is needed, return no records
    if (coordinator.needRejoin())
        return Collections.emptyMap();

    return fetcher.fetchedRecords();
}
遍歷全部的TopicPartition
好吧,再往下就涉及到通訊IO層了,這裏不再多說。將來補全了kafka通訊協議相關文章後再補上鍊接。
2)invokeListener和分支1一樣,最終調用的是用戶自定義的MessageListener接口的onMessage方法,不再重複。
3) sendToListener,這裏塞進緩存隊列LinkedBlockingQueue<ConsumerRecords<K, V>> recordsToProcess,塞進隊列後,什麼時候再消費?ListenerInvoker的run方法執行了recordsToProcess.poll進行了消費,