kafka原理和實踐(四)spring-kafka消費者源碼

系列目錄html

kafka原理和實踐(一)原理:10分鐘入門java

kafka原理和實踐(二)spring-kafka簡單實踐git

kafka原理和實踐(三)spring-kafka生產者源碼github

kafka原理和實踐(四)spring-kafka消費者源碼spring

kafka原理和實踐(五)spring-kafka配置詳解apache

kafka原理和實踐(六)總結昇華bootstrap

 

 

==============正文分割線=====================數組

1、kafkaConsumer消費者模型

如上圖所示,spring-kafka消費者模型主要流程:緩存

1.容器啓動,輪詢執行消費。安全

2.kafkaConsumer拉取消息流程:

1)Fetcher請求獲取器獲取請求並存儲在unset中

2)ConsumerNetworkClient網絡客戶端執行poll(),調用NetWlrikClient的send()方法從unset中獲取ClientRequest請求轉成RequestSend最終塞進Selector的KafkaChannel通道中,Seletcor.send()從kafka集羣拉取待消費數據ConsumerRecords

3. 消費者監聽器MessageListener.onMessage()執行用戶自定義的實際消費業務邏輯。

1、kafkaConsumer構造

  1 @SuppressWarnings("unchecked")
  2     private KafkaConsumer(ConsumerConfig config,
  3                           Deserializer<K> keyDeserializer,
  4                           Deserializer<V> valueDeserializer) {
  5         try {
  6             log.debug("Starting the Kafka consumer");
  7             this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
  8             int sessionTimeOutMs = config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG);
  9             int fetchMaxWaitMs = config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);
 10             if (this.requestTimeoutMs <= sessionTimeOutMs || this.requestTimeoutMs <= fetchMaxWaitMs)
 11                 throw new ConfigException(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG + " should be greater than " + ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG + " and " + ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);
 12             this.time = new SystemTime();
 13 
 14             String clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
 15             if (clientId.length() <= 0)
 16                 clientId = "consumer-" + CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement();
 17             this.clientId = clientId;
 18             Map<String, String> metricsTags = new LinkedHashMap<>();
 19             metricsTags.put("client-id", clientId);
 20             MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG))
 21                     .timeWindow(config.getLong(ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
 22                     .tags(metricsTags);
 23             List<MetricsReporter> reporters = config.getConfiguredInstances(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG,
 24                     MetricsReporter.class);
 25             reporters.add(new JmxReporter(JMX_PREFIX));
 26             this.metrics = new Metrics(metricConfig, reporters, time);
 27             this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
 28 
 29             // load interceptors and make sure they get clientId
 30             Map<String, Object> userProvidedConfigs = config.originals();
 31             userProvidedConfigs.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
 32             List<ConsumerInterceptor<K, V>> interceptorList = (List) (new ConsumerConfig(userProvidedConfigs)).getConfiguredInstances(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
 33                     ConsumerInterceptor.class);
 34             this.interceptors = interceptorList.isEmpty() ? null : new ConsumerInterceptors<>(interceptorList);
 35             if (keyDeserializer == null) {
 36                 this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
 37                         Deserializer.class);
 38                 this.keyDeserializer.configure(config.originals(), true);
 39             } else {
 40                 config.ignore(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
 41                 this.keyDeserializer = keyDeserializer;
 42             }
 43             if (valueDeserializer == null) {
 44                 this.valueDeserializer = config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
 45                         Deserializer.class);
 46                 this.valueDeserializer.configure(config.originals(), false);
 47             } else {
 48                 config.ignore(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
 49                 this.valueDeserializer = valueDeserializer;
 50             }
 51             ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keyDeserializer, valueDeserializer, reporters, interceptorList);
 52             this.metadata = new Metadata(retryBackoffMs, config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG), false, clusterResourceListeners);
 53             List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
 54             this.metadata.update(Cluster.bootstrap(addresses), 0);
 55             String metricGrpPrefix = "consumer";
 56             ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values());
 57             NetworkClient netClient = new NetworkClient(
 58                     new Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, channelBuilder),
 59                     this.metadata,
 60                     clientId,
 61                     100, // a fixed large enough value will suffice
 62                     config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG),
 63                     config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG),
 64                     config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG),
 65                     config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), time);
 66             this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs,
 67                     config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG));
 68             OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase(Locale.ROOT));
 69             this.subscriptions = new SubscriptionState(offsetResetStrategy);
 70             List<PartitionAssignor> assignors = config.getConfiguredInstances(
 71                     ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
 72                     PartitionAssignor.class);
 73             this.coordinator = new ConsumerCoordinator(this.client,
 74                     config.getString(ConsumerConfig.GROUP_ID_CONFIG),
 75                     config.getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG),
 76                     config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG),
 77                     config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG),
 78                     assignors,
 79                     this.metadata,
 80                     this.subscriptions,
 81                     metrics,
 82                     metricGrpPrefix,
 83                     this.time,
 84                     retryBackoffMs,
 85                     new ConsumerCoordinator.DefaultOffsetCommitCallback(),
 86                     config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG),
 87                     config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
 88                     this.interceptors,
 89                     config.getBoolean(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG));
 90             this.fetcher = new Fetcher<>(this.client,
 91                     config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG),
 92                     config.getInt(ConsumerConfig.FETCH_MAX_BYTES_CONFIG),
 93                     config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG),
 94                     config.getInt(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG),
 95                     config.getInt(ConsumerConfig.MAX_POLL_RECORDS_CONFIG),
 96                     config.getBoolean(ConsumerConfig.CHECK_CRCS_CONFIG),
 97                     this.keyDeserializer,
 98                     this.valueDeserializer,
 99                     this.metadata,
100                     this.subscriptions,
101                     metrics,
102                     metricGrpPrefix,
103                     this.time,
104                     this.retryBackoffMs);
105 
106             config.logUnused();
107             AppInfoParser.registerAppInfo(JMX_PREFIX, clientId);
108 
109             log.debug("Kafka consumer created");
110         } catch (Throwable t) {
111             // call close methods if internal objects are already constructed
112             // this is to prevent resource leak. see KAFKA-2121
113             close(true);
114             // now propagate the exception
115             throw new KafkaException("Failed to construct kafka consumer", t);
116         }
117     }

 從KafkaConsumer構造函數來看,核心組件有:

1.Metadata:封裝了元數據的一些邏輯的類。元數據僅保留一個主題的子集,隨着時間的推移能夠添加。當咱們請求一個主題的元數據時,咱們沒有任何元數據會觸發元數據更新。若是對元數據啓用了主題過時,那麼在更新以後,在過時時間間隔內未使用的任何主題都將從元數據刷新集中刪除。

2.ConsumerNetworkClient:高等級消費者訪問網絡層,爲請求Future任務提供基本支持。這個類是線程安全的,可是不提供響應回調的同步。這保證在調用它們時不會持有鎖。

3.SubscriptionState:訂閱的TopicPartition的offset狀態維護

4.ConsumerCoordinator:消費者的協調者,負責partitiion的分配,reblance

5.Fetcher:從brokers上按照配置獲取消息。

2、消費者容器啓動流程

kafka消費者有兩種常見的實現方式:

1.xml配置文件

2.基於註解實現

其實,無論哪一種方式,本質只是生成Spring Bean的方式不一樣而已。咱們就以xml的實現方式來追蹤源碼。

基於xml的整體配置以下:

 1 <!-- 1.定義consumer的參數 -->
 2 <bean id="consumerProperties" class="java.util.HashMap">  3 <constructor-arg>  4 <map>  5 <entry key="bootstrap.servers" value="${bootstrap.servers}" />  6 <entry key="group.id" value="${group.id}" />  7 <entry key="enable.auto.commit" value="${enable.auto.commit}" />  8 <entry key="session.timeout.ms" value="${session.timeout.ms}" />  9 <entry key="key.deserializer" 10 value="org.apache.kafka.common.serialization.StringDeserializer" /> 11 <entry key="value.deserializer" 12 value="org.apache.kafka.common.serialization.StringDeserializer" /> 13 </map> 14 </constructor-arg> 15 </bean> 16 17 <!-- 2.建立consumerFactory bean --> 18 <bean id="consumerFactory" 19 class="org.springframework.kafka.core.DefaultKafkaConsumerFactory" > 20 <constructor-arg> 21 <ref bean="consumerProperties" /> 22 </constructor-arg> 23 </bean> 24 25 <!-- 3.定義消費實現類 --> 26 <bean id="kafkaConsumerService" class="xxx.service.impl.KafkaConsumerSerivceImpl" /> 27 28 <!-- 4.消費者容器配置信息 --> 29 <bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties"> 30 <!-- topic --> 31 <constructor-arg name="topics"> 32 <list> 33 <value>${kafka.consumer.topic.credit.for.lease}</value> 34 <value>${loan.application.feedback.topic}</value> 35 <value>${templar.agreement.feedback.topic}</value> 36 <value>${templar.aggrement.active.feedback.topic}</value> 37 <value>${templar.aggrement.agreementRepaid.topic}</value> 38 <value>${templar.aggrement.agreementWithhold.topic}</value> 39 <value>${templar.aggrement.agreementRepayRemind.topic}</value> 40 </list> 41 </constructor-arg> 42 <property name="messageListener" ref="kafkaConsumerService" /> 43 </bean> 44 <!-- 5.消費者併發消息監聽容器,執行doStart()方法 --> 45 <bean id="messageListenerContainer" class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer" init-method="doStart" > 46 <constructor-arg ref="consumerFactory" /> 47 <constructor-arg ref="containerProperties" /> 48 <property name="concurrency" value="${concurrency}" /> 49 </bean>

 分爲5個步驟:

 2.1.定義消費參數bean

consumerProperties ,就是個map<key,value>

2.2.建立consumerFactory bean

DefaultKafkaConsumerFactory 實現了ConsumerFactory接口,提供建立消費者判斷是否自動提交2個方法。經過consumerProperties做爲參數構造。
1 public interface ConsumerFactory<K, V> {
2 
3     Consumer<K, V> createConsumer();
4 
5     boolean isAutoCommit();
6 
7 
8 }

2.3.定義消費實現類

自定義一個類實現MessageListener接口,接口設計以下:

實現onMessage方法,去消費接收到的消息。兩種方案:

1)MessageListener 消費完消息後自動提交offset(enable.auto.commit=true時),可提升效率,存在消費失敗但移動了偏移量的風險。

2)AcknowledgingMessageListener 消費完消息後手動提交offset(enable.auto.commit=false時)效率下降,無消費失敗但移動偏移量的風險。

2.4.監聽容器配置信息

ContainerProperties:包含了一個監聽容器的運行時配置信息,主要定義了監聽的主題、分區、初始化偏移量,還有消息監聽器。
  1 public class ContainerProperties {
  2 
  3     private static final int DEFAULT_SHUTDOWN_TIMEOUT = 10000;
  4 
  5     private static final int DEFAULT_QUEUE_DEPTH = 1;
  6 
  7     private static final int DEFAULT_PAUSE_AFTER = 10000;
  8 
  9     /**
 10      * Topic names.監聽的主題字符串數組
 11      */
 12     private final String[] topics;
 13 
 14     /**
 15      * Topic pattern.監聽的主題模板
 16      */
 17     private final Pattern topicPattern;
 18 
 19     /**
 20      * Topics/partitions/initial offsets.
 21      */
 22     private final TopicPartitionInitialOffset[] topicPartitions;
 23 
 24     /**
 25      * 確認模式(自動確認屬性爲false時使用)
 26      * <ul>
 27      * <li>1.RECORD逐條確認: 每條消息被髮送給監聽者後確認</li>
 28      * <li>2.BATCH批量確認: 當批量消息記錄被消費者接收到並傳送給監聽器時確認</li>
 30      * <li>3.TIME超時確認:當超過設置的超時時間毫秒數時確認(should be greater than
 31      * {@code #setPollTimeout(long) pollTimeout}.</li>
 32      * <li>4.COUNT計數確認: 當接收到指定數量以後確認</li>
 33      * <li>5.MANUAL手動確認:由監聽器負責確認(AcknowledgingMessageListener</ul>
 36      */
 37     private AbstractMessageListenerContainer.AckMode ackMode = AckMode.BATCH;
 38 
 39     /**
 40      * The number of outstanding record count after which offsets should be
 41      * committed when {@link AckMode#COUNT} or {@link AckMode#COUNT_TIME} is being
 42      * used.
 43      */
 44     private int ackCount;
 45 
 46     /**
 47      * The time (ms) after which outstanding offsets should be committed when
 48      * {@link AckMode#TIME} or {@link AckMode#COUNT_TIME} is being used. Should be
 49      * larger than
 50      */
 51     private long ackTime;
 52 
 53     /**
 54      * 消息監聽器,必須是 MessageListener或者AcknowledgingMessageListener二者中的一個 55      * 56      */
 57     private Object messageListener;
 58 
 59     /**
 60      * The max time to block in the consumer waiting for records.
 61      */
 62     private volatile long pollTimeout = 1000;
 63 
 64     /**
 65      * 線程執行器:輪詢消費者
 66      */
 67     private AsyncListenableTaskExecutor consumerTaskExecutor;
 68 
 69     /**
 70      * 線程執行器:調用監聽器
 71      */
 72     private AsyncListenableTaskExecutor listenerTaskExecutor;
 73 
 74     /**
 75      * 錯誤回調,當監聽器拋出異常時
 76      */
 77     private GenericErrorHandler<?> errorHandler;
 78 
 79     /**
 80      * When using Kafka group management and {@link #setPauseEnabled(boolean)} is
 81      * true, the delay after which the consumer should be paused. Default 10000.
 82      */
 83     private long pauseAfter = DEFAULT_PAUSE_AFTER;
 84 
 85     /**
 86      * When true, avoids rebalancing when this consumer is slow or throws a
 87      * qualifying exception - pauses the consumer. Default: true.
 88      * @see #pauseAfter
 89      */
 90     private boolean pauseEnabled = true;
 91 
 92     /**
 93      * Set the queue depth for handoffs from the consumer thread to the listener
 94      * thread. Default 1 (up to 2 in process).
 95      */
 96     private int queueDepth = DEFAULT_QUEUE_DEPTH;
 97 
 98     /**
 99      * 中止容器超時時間    */
103     private long shutdownTimeout = DEFAULT_SHUTDOWN_TIMEOUT;
104 
105     /**
106      * 用戶定義的消費者再平衡監聽器實現類     */
108     private ConsumerRebalanceListener consumerRebalanceListener;
109 
110     /**
111      * 提交回調,默認記錄日誌。      */
114     private OffsetCommitCallback commitCallback;
115 
116     /**
117      * Whether or not to call consumer.commitSync() or commitAsync() when the
118      * container is responsible for commits. Default true. See
119      * https://github.com/spring-projects/spring-kafka/issues/62 At the time of
120      * writing, async commits are not entirely reliable.
121      */
122     private boolean syncCommits = true;
123 
124     private boolean ackOnError = true;
125 
126     private Long idleEventInterval;
127 
128     public ContainerProperties(String... topics) {
129         Assert.notEmpty(topics, "An array of topicPartitions must be provided");
130         this.topics = Arrays.asList(topics).toArray(new String[topics.length]);
131         this.topicPattern = null;
132         this.topicPartitions = null;
133     }
134 
135     public ContainerProperties(Pattern topicPattern) {
136         this.topics = null;
137         this.topicPattern = topicPattern;
138         this.topicPartitions = null;
139     }
140 
141     public ContainerProperties(TopicPartitionInitialOffset... topicPartitions) {
142         this.topics = null;
143         this.topicPattern = null;
144         Assert.notEmpty(topicPartitions, "An array of topicPartitions must be provided");
145         this.topicPartitions = new LinkedHashSet<>(Arrays.asList(topicPartitions))
146                 .toArray(new TopicPartitionInitialOffset[topicPartitions.length]);
147     }
148 ...省略各類set、get
149 
150 }

 

2.5.啓動併發消息監聽容器

核心類ConcurrentMessageListenerContainer,繼承自抽象類AbstractMessageListenerContainer,類圖以下:

 

看上圖可知AbstractMessageListenerContainer有2個實現類分別對應單線程和多線程,建議採用多線程消費。下面分析一下主要ConcurrentMessageListenerContainer類,注意2個方法:

1.構造函數,入參:消費者工廠ConsumerFactory+容器配置ContainerProperties

2.doStart():核心方法KafkaMessageListenerContainer的start()方法。源碼以下:

  1 public class ConcurrentMessageListenerContainer<K, V> extends AbstractMessageListenerContainer<K, V> {
  2 
  3     private final ConsumerFactory<K, V> consumerFactory;
  4 
  5     private final List<KafkaMessageListenerContainer<K, V>> containers = new ArrayList<>();
  6 
  7     private int concurrency = 1;
  8 
  9     /**
 10      * Construct an instance with the supplied configuration properties.
 11      * The topic partitions are distributed evenly across the delegate
 12      * {@link KafkaMessageListenerContainer}s.
 13      * @param consumerFactory the consumer factory.
 14      * @param containerProperties the container properties.
 15      */
 16     public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
 17             ContainerProperties containerProperties) {
 18         super(containerProperties);
 19         Assert.notNull(consumerFactory, "A ConsumerFactory must be provided");
 20         this.consumerFactory = consumerFactory;
 21     }
 22 
 23     public int getConcurrency() {
 24         return this.concurrency;
 25     }
 26 
 27     /**
 28      * The maximum number of concurrent {@link KafkaMessageListenerContainer}s running.
 29      * Messages from within the same partition will be processed sequentially.
 30      * @param concurrency the concurrency.
 31      */
 32     public void setConcurrency(int concurrency) {
 33         Assert.isTrue(concurrency > 0, "concurrency must be greater than 0");
 34         this.concurrency = concurrency;
 35     }
 36 
 37     /**
 38      * Return the list of {@link KafkaMessageListenerContainer}s created by
 39      * this container.
 40      * @return the list of {@link KafkaMessageListenerContainer}s created by
 41      * this container.
 42      */
 43     public List<KafkaMessageListenerContainer<K, V>> getContainers() {
 44         return Collections.unmodifiableList(this.containers);
 45     }
 46 
 47     /*
 48      * Under lifecycle lock.
 49      */
 50     @Override
 51     protected void doStart() {
 52         if (!isRunning()) {
 53             ContainerProperties containerProperties = getContainerProperties();
 54             TopicPartitionInitialOffset[] topicPartitions = containerProperties.getTopicPartitions();
 55             if (topicPartitions != null//校驗併發數>分區數,報錯。
 56                     && this.concurrency > topicPartitions.length) {
 57                 this.logger.warn("When specific partitions are provided, the concurrency must be less than or "
 58                         + "equal to the number of partitions; reduced from " + this.concurrency + " to "
 59                         + topicPartitions.length);
 60                 this.concurrency = topicPartitions.length;//併發數最大隻能=分區數
 61             }
 62             setRunning(true);
 63             //遍歷建立監聽器容器
 64             for (int i = 0; i < this.concurrency; i++) {
 65                 KafkaMessageListenerContainer<K, V> container;
 66                 if (topicPartitions == null) {
 67                     container = new KafkaMessageListenerContainer<>(this.consumerFactory, containerProperties);
 68                 }
 69                 else {
 70                     container = new KafkaMessageListenerContainer<>(this.consumerFactory, containerProperties,
 71                             partitionSubset(containerProperties, i));
 72                 }
 73                 if (getBeanName() != null) {
 74                     container.setBeanName(getBeanName() + "-" + i);
 75                 }
 76                 if (getApplicationEventPublisher() != null) {
 77                     container.setApplicationEventPublisher(getApplicationEventPublisher());
 78                 }
 79                 container.setClientIdSuffix("-" + i);
 80                 container.start();//核心方法,啓動容器
 81                 this.containers.add(container);
 82             }
 83         }
 84     }146 ...省略
147 }

 繼續追蹤,調用AbstractMessageListenerContainer的doStart(),值得注意的是start()和stop方法加了同一把鎖,用於鎖住生命週期。

 1 private final Object lifecycleMonitor = new Object();
 2 
 3 @Override
 4     public final void start() {
 5         synchronized (this.lifecycleMonitor) {
 6             Assert.isTrue(
 7                     this.containerProperties.getMessageListener() instanceof KafkaDataListener,
 8                     "A " + KafkaDataListener.class.getName() + " implementation must be provided");
 9  doStart();
10         }
11     }
12 
13     protected abstract void doStart();

最終調用的是KafkaMessageListenerContainer的doStart()

 1 @Override
 2     protected void doStart() {
 3         if (isRunning()) {
 4             return;
 5         }
 6         ContainerProperties containerProperties = getContainerProperties();
 7 
 8         if (!this.consumerFactory.isAutoCommit()) {
 9             AckMode ackMode = containerProperties.getAckMode();
10             if (ackMode.equals(AckMode.COUNT) || ackMode.equals(AckMode.COUNT_TIME)) {
11                 Assert.state(containerProperties.getAckCount() > 0, "'ackCount' must be > 0");
12             }
13             if ((ackMode.equals(AckMode.TIME) || ackMode.equals(AckMode.COUNT_TIME))
14                     && containerProperties.getAckTime() == 0) {
15                 containerProperties.setAckTime(5000);
16             }
17         }
18 
19         Object messageListener = containerProperties.getMessageListener();
20         Assert.state(messageListener != null, "A MessageListener is required");
21         if (messageListener instanceof GenericAcknowledgingMessageListener) {
22             this.acknowledgingMessageListener = (GenericAcknowledgingMessageListener<?>) messageListener;
23         }
24         else if (messageListener instanceof GenericMessageListener) {
25             this.listener = (GenericMessageListener<?>) messageListener;
26         }
27         else {
28             throw new IllegalStateException("messageListener must be 'MessageListener' "
29                     + "or 'AcknowledgingMessageListener', not " + messageListener.getClass().getName());
30         }
31         if (containerProperties.getConsumerTaskExecutor() == null) {
32             SimpleAsyncTaskExecutor consumerExecutor = new SimpleAsyncTaskExecutor(
33                     (getBeanName() == null ? "" : getBeanName()) + "-C-");
34             containerProperties.setConsumerTaskExecutor(consumerExecutor);
35         }
36         if (containerProperties.getListenerTaskExecutor() == null) {
37             SimpleAsyncTaskExecutor listenerExecutor = new SimpleAsyncTaskExecutor(
38                     (getBeanName() == null ? "" : getBeanName()) + "-L-");
39             containerProperties.setListenerTaskExecutor(listenerExecutor);
40         }//1.構建 監聽消費者
41         this.listenerConsumer = new ListenerConsumer(this.listener, this.acknowledgingMessageListener);
42         setRunning(true);
      //2.異步提交 監聽消費者任務,返回Future並賦值。
43 this.listenerConsumerFuture = containerProperties 44 .getConsumerTaskExecutor() 45 .submitListenable(this.listenerConsumer); 46 }

 

doStart主要包含2個操做:構建內部類ListenerConsumer提交 監聽消費者任務,返回Future並賦值。

1.構建內部類ListenerConsumer

ListenerConsumer類圖以下:

ListenerConsumer構造函數源碼以下:

 1 @SuppressWarnings("unchecked")
 2         ListenerConsumer(GenericMessageListener<?> listener, GenericAcknowledgingMessageListener<?> ackListener) {
 3             Assert.state(!this.isAnyManualAck || !this.autoCommit,
 4                     "Consumer cannot be configured for auto commit for ackMode " + this.containerProperties.getAckMode());
 5             @SuppressWarnings("deprecation")
 6             final Consumer<K, V> consumer =
 7                     KafkaMessageListenerContainer.this.consumerFactory instanceof
 8                                     org.springframework.kafka.core.ClientIdSuffixAware
 9                             ? ((org.springframework.kafka.core.ClientIdSuffixAware<K, V>) KafkaMessageListenerContainer
10                                     .this.consumerFactory)
11                                         .createConsumer(KafkaMessageListenerContainer.this.clientIdSuffix)
12                             : KafkaMessageListenerContainer.this.consumerFactory.createConsumer();
13 
14             this.theListener = listener == null ? ackListener : listener;
15             ConsumerRebalanceListener rebalanceListener = createRebalanceListener(consumer);
16 
17             if (KafkaMessageListenerContainer.this.topicPartitions == null) {
18                 if (this.containerProperties.getTopicPattern() != null) {
19                     consumer.subscribe(this.containerProperties.getTopicPattern(), rebalanceListener);
20                 }
21                 else {
22                     consumer.subscribe(Arrays.asList(this.containerProperties.getTopics()), rebalanceListener);
23                 }
24             }
25             else {
26                 List<TopicPartitionInitialOffset> topicPartitions =
27                         Arrays.asList(KafkaMessageListenerContainer.this.topicPartitions);
28                 this.definedPartitions = new HashMap<>(topicPartitions.size());
29                 for (TopicPartitionInitialOffset topicPartition : topicPartitions) {
30                     this.definedPartitions.put(topicPartition.topicPartition(),
31                             new OffsetMetadata(topicPartition.initialOffset(), topicPartition.isRelativeToCurrent()));
32                 }
33                 consumer.assign(new ArrayList<>(this.definedPartitions.keySet()));
34             }
35             this.consumer = consumer;
36             GenericErrorHandler<?> errHandler = this.containerProperties.getGenericErrorHandler();
37             this.genericListener = listener; 
//1.
if (this.theListener instanceof BatchAcknowledgingMessageListener) { 38 this.listener = null; 39 this.batchListener = null; 40 this.acknowledgingMessageListener = null; 41 this.batchAcknowledgingMessageListener = (BatchAcknowledgingMessageListener<K, V>) this.theListener; 42 this.isBatchListener = true; 43 }//2. 44 else if (this.theListener instanceof AcknowledgingMessageListener) { 45 this.listener = null; 46 this.acknowledgingMessageListener = (AcknowledgingMessageListener<K, V>) this.theListener; 47 this.batchListener = null; 48 this.batchAcknowledgingMessageListener = null; 49 this.isBatchListener = false; 50 }//3. 51 else if (this.theListener instanceof BatchMessageListener) { 52 this.listener = null; 53 this.batchListener = (BatchMessageListener<K, V>) this.theListener; 54 this.acknowledgingMessageListener = null; 55 this.batchAcknowledgingMessageListener = null; 56 this.isBatchListener = true; 57 }//4. 58 else if (this.theListener instanceof MessageListener) { 59 this.listener = (MessageListener<K, V>) this.theListener; 60 this.batchListener = null; 61 this.acknowledgingMessageListener = null; 62 this.batchAcknowledgingMessageListener = null; 63 this.isBatchListener = false; 64 } 65 else { 66 throw new IllegalArgumentException("Listener must be one of 'MessageListener', " 67 + "'BatchMessageListener', 'AcknowledgingMessageListener', " 68 + "'BatchAcknowledgingMessageListener', not " + this.theListener.getClass().getName()); 69 } 70 if (this.isBatchListener) { 71 validateErrorHandler(true); 72 this.errorHandler = new LoggingErrorHandler(); 73 this.batchErrorHandler = errHandler == null ? new BatchLoggingErrorHandler() 74 : (BatchErrorHandler) errHandler; 75 } 76 else { 77 validateErrorHandler(false); 78 this.errorHandler = errHandler == null ? new LoggingErrorHandler() : (ErrorHandler) errHandler; 79 this.batchErrorHandler = new BatchLoggingErrorHandler(); 80 } 81 Assert.state(!this.isBatchListener || !this.isRecordAck, "Cannot use AckMode.RECORD with a batch listener"); 82 }

1.定義消費者訂閱topic或者指定分區

2.設置監聽器,支持4種:

  1)BatchAcknowledgingMessageListener批量需確認消息監聽器

  2)AcknowledgingMessageListener需確認消息監聽器

  3)BatchMessageListener批量消息監聽器

  4)MessageListener消息監聽器(用的最多,一次消費一條)

 

2.提交 監聽消費者任務(ListenerConsumer),返回Future並賦值。

這裏咱們看一下任務Runnable接口的run方法,分兩種狀況

1.若是自定義了分區,不必再平衡分配分區了,直接回調

2.未指定分區,進入自旋消費

 1 @Override
 2         public void run() {
 3             if (this.genericListener instanceof ConsumerSeekAware) {
 4                 ((ConsumerSeekAware) this.genericListener).registerSeekCallback(this);
 5             }
 6             this.count = 0;
 7             this.last = System.currentTimeMillis();
 8             if (isRunning() && this.definedPartitions != null) {// 1.若是運行中且自定義了分區,不必再平衡分配分區了,直接回調
 9                 initPartitionsIfNeeded();// 有須要就初始化分區
10                 // 回調
13                 if (!this.autoCommit) {
14  startInvoker();
15                 }
16             }
17             long lastReceive = System.currentTimeMillis();
18             long lastAlertAt = lastReceive;
19             while (isRunning()) {//2.未指定分區,進入自旋消費 20                 try {
21                     if (!this.autoCommit) {
22                         processCommits();// 若是手動提交,處理提交
23                     }
24                     processSeeks();// 從新定位偏移量,下一次消費時使用
25                     if (this.logger.isTraceEnabled()) {
26                         this.logger.trace("Polling (paused=" + this.paused + ")...");
27                     }// 1)拉取消費記錄
28                     ConsumerRecords<K, V> records = this.consumer.poll(this.containerProperties.getPollTimeout());
29                     if (records != null && this.logger.isDebugEnabled()) {
30                         this.logger.debug("Received: " + records.count() + " records");
31                     }
32                     if (records != null && records.count() > 0) {
33                         if (this.containerProperties.getIdleEventInterval() != null) {
34                             lastReceive = System.currentTimeMillis();
35                         }// 2)若是設置了自動提交,直接在當前線程執行
39                         if (this.autoCommit) {
40  invokeListener(records);
41                         }
42                         else {// 3)不然發送消息進緩存隊列
43                             if (sendToListener(records)) {
44                                 if (this.assignedPartitions != null) {
45                                     // avoid group management rebalance due to a slow
46                                     // consumer
47                                     this.consumer.pause(this.assignedPartitions);
48                                     this.paused = true;
49                                     this.unsent = records;
50                                 }
51                             }
52                         }
53                     }
54                     else {
55                         if (this.containerProperties.getIdleEventInterval() != null) {
56                             long now = System.currentTimeMillis();
57                             if (now > lastReceive + this.containerProperties.getIdleEventInterval()
58                                     && now > lastAlertAt + this.containerProperties.getIdleEventInterval()) {
59                                 publishIdleContainerEvent(now - lastReceive);
60                                 lastAlertAt = now;
61                                 if (this.genericListener instanceof ConsumerSeekAware) {
62                                     seekPartitions(getAssignedPartitions(), true);
63                                 }
64                             }
65                         }
66                     }
67                     this.unsent = checkPause(this.unsent);
68                 }
69                 catch (WakeupException e) {
70                     this.unsent = checkPause(this.unsent);
71                 }
72                 catch (Exception e) {
73                     if (this.containerProperties.getGenericErrorHandler() != null) {
74                         this.containerProperties.getGenericErrorHandler().handle(e, null);
75                     }
76                     else {
77                         this.logger.error("Container exception", e);
78                     }
79                 }
80             }
81             if (this.listenerInvokerFuture != null) {
82                 stopInvoker();
83                 commitManualAcks();
84             }
85             try {
86                 this.consumer.unsubscribe();
87             }
88             catch (WakeupException e) {
89                 // No-op. Continue process
90             }
91             this.consumer.close();
92             if (this.logger.isInfoEnabled()) {
93                 this.logger.info("Consumer stopped");
94             }
95         }

1.若是用戶自定義了分區且非自動提交,那麼開啓異步線程執行ListenerInvoker任務,源碼以下:

1 private void startInvoker() {
2             ListenerConsumer.this.invoker = new ListenerInvoker();
3             ListenerConsumer.this.listenerInvokerFuture = this.containerProperties.getListenerTaskExecutor()
4                     .submit(ListenerConsumer.this.invoker);
5         }

執行ListenerInvoker的run方法,實際上就執行一遍,由於CountDownLatch初始化爲1

 1 private final class ListenerInvoker implements SchedulingAwareRunnable {
 2 
 3             private final CountDownLatch exitLatch = new CountDownLatch(1);
 4 
 5             private volatile boolean active = true;
 6 
 7             private volatile Thread executingThread;
 8 
 9             ListenerInvoker() {
10                 super();
11             }
12 
13             @Override
14             public void run() {
15                 Assert.isTrue(this.active, "This instance is not active anymore");
16                 if (ListenerConsumer.this.theListener instanceof ConsumerSeekAware) {
17                     ((ConsumerSeekAware) ListenerConsumer.this.theListener).registerSeekCallback(ListenerConsumer.this);
18                 }
19                 try {
20                     this.executingThread = Thread.currentThread();
21                     while (this.active) {
22                         try {// 從阻塞隊列LinkedBlockingQueue recordsToProcess中拉取 待消費記錄
23                             ConsumerRecords<K, V> records = ListenerConsumer.this.recordsToProcess.poll(1,
24                                     TimeUnit.SECONDS);
25                             if (this.active) {
26                                 if (records != null) {
27  invokeListener(records);// 消費
28                                 }
29                                 else {
30                                     if (ListenerConsumer.this.logger.isTraceEnabled()) {
31                                         ListenerConsumer.this.logger.trace("No records to process");
32                                     }
33                                 }
34                             }
35                         }
36                         catch (InterruptedException e) {
37                             if (!this.active) {
38                                 Thread.currentThread().interrupt();
39                             }
40                             else {
41                                 ListenerConsumer.this.logger.debug("Interrupt ignored");
42                             }
43                         }
44                     }
45                 }
46                 finally {
47                     this.active = false;
48                     this.exitLatch.countDown();
49                 }
50             }
51 
52             @Override
53             public boolean isLongLived() {
54                 return true;
55             }
581         }
1 private void invokeListener(final ConsumerRecords<K, V> records) {
2             if (this.isBatchListener) {
3                 invokeBatchListener(records);
4             }
5             else {
6  invokeRecordListener(records);
7             }
8         }

如上圖,從阻塞隊列中取得待消費記錄,用迭代器iterator消費,根據自定義消費類型,用不一樣listener來執行onMessage方法(用戶自定義MessageListener接口的onMessage方法,實現用戶本身的消費業務邏輯

 1 private void invokeRecordListener(final ConsumerRecords<K, V> records) {
 2             Iterator<ConsumerRecord<K, V>> iterator = records.iterator();
 3             while (iterator.hasNext() && (this.autoCommit || (this.invoker != null && this.invoker.active))) {
 4                 final ConsumerRecord<K, V> record = iterator.next();
 5                 if (this.logger.isTraceEnabled()) {
 6                     this.logger.trace("Processing " + record);
 7                 }
 8                 try {
 9                     if (this.acknowledgingMessageListener != null) {
10                         this.acknowledgingMessageListener.onMessage(record,// 終極核心方法,用戶自定義的MessageListener接口的onMessage方法
11                                 this.isAnyManualAck
12                                         ? new ConsumerAcknowledgment(record, this.isManualImmediateAck)
13                                         : null);
14                     }
15                     else {
16                         this.listener.onMessage(record);// 終極核心方法,用戶自定義的MessageListener接口的onMessage方法
17                     }
18                     if (!this.isAnyManualAck && !this.autoCommit) {
19                         this.acks.add(record);
20                     }
21                 }
22                 catch (Exception e) {
23                     if (this.containerProperties.isAckOnError() && !this.autoCommit) {
24                         this.acks.add(record);
25                     }
26                     try {
27                         this.errorHandler.handle(e, record);
28                     }
29                     catch (Exception ee) {
30                         this.logger.error("Error handler threw an exception", ee);
31                     }
32                     catch (Error er) { //NOSONAR
33                         this.logger.error("Error handler threw an error", er);
34                         throw er;
35                     }
36                 }
37             }
38         }

2.未指定分區,進入自旋

// 1)拉取消費記錄 ConsumerRecords<K, V> records = this.consumer.poll(this.containerProperties.getPollTimeout());
 2)若是設置了自動提交,直接在當前線程執行
invokeListener(records);
// 3)不然發送消息進緩存隊列 sendToListener(records)

1)在每一個輪詢中,消費者將嘗試使用最後一個被使用的偏移量做爲起始偏移量,並按順序提取。最後一個被消費的偏移量能夠經過 seek(TopicPartition,long)或自動設置爲最後一個被訂閱的分區列表的偏移量得到。

 1 @Override
 2     public ConsumerRecords<K, V> poll(long timeout) {
 3         acquire();
 4         try {
 5             if (timeout < 0)
 6                 throw new IllegalArgumentException("Timeout must not be negative");
 7 
 8             if (this.subscriptions.hasNoSubscriptionOrUserAssignment())
 9                 throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
10 
11             // poll for new data until the timeout expires
12             long start = time.milliseconds();
13             long remaining = timeout;
14             do {
15                 Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(remaining);
16                 if (!records.isEmpty()) {
23                     fetcher.sendFetches();// 在返回所獲取的記錄以前,咱們能夠發送下一輪的fetches並避免阻塞等待它們的響應,以便在用戶處理獲取的記錄時進行流水線操做。
24                     client.pollNoWakeup();//因爲已經更新了所使用的位置,因此咱們不容許在返回所獲取的記錄以前觸發wakeups或任何其餘錯誤。
25 
26                     if (this.interceptors == null)
27                         return new ConsumerRecords<>(records);
28                     else// 若是存在消費者攔截器執行攔截
29                         return this.interceptors.onConsume(new ConsumerRecords<>(records));
30                 }
31 
32                 long elapsed = time.milliseconds() - start;
33                 remaining = timeout - elapsed;
34             } while (remaining > 0);
35 
36             return ConsumerRecords.empty();
37         } finally {
38             release();
39         }
40     }

pollOnce:

 1 private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
 2         coordinator.poll(time.milliseconds());
 3 
 4         // ,若是有未知偏移量(分區的),那麼更新。涉及coordinator刷新已提交分區偏移量+fetcher更新獲取位置
 6         if (!subscriptions.hasAllFetchPositions())
 7             updateFetchPositions(this.subscriptions.missingFetchPositions());
 8 
 9         // 返回已獲取到的記錄
10         Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
11         if (!records.isEmpty())
12             return records;
13 
14         // 發送fetch請求
15         fetcher.sendFetches();
16 
17         long now = time.milliseconds();
18         long pollTimeout = Math.min(coordinator.timeToNextPoll(now), timeout);
19         // 執行IO,拉取數據
20         client.poll(pollTimeout, now, new PollCondition() {
21             @Override
22             public boolean shouldBlock() {
23                 // since a fetch might be completed by the background thread, we need this poll condition
24                 // to ensure that we do not block unnecessarily in poll()
25                 return !fetcher.hasCompletedFetches();
26             }
27         });
31         if (coordinator.needRejoin())
32             return Collections.emptyMap();
33 
34         return fetcher.fetchedRecords();
35     }遍歷全部的TopicPartition

 好吧,再往下涉及到通訊IO層了,這裏再也不多說。未來補全了kafka通訊協議相關文章後再加上飛機票。

2)invokeListener和分支1同樣最終調用的是用戶自定義的MessageListener接口的onMessage方法,再也不重複。

3) sendToListener,這裏塞進緩存隊列LinkedBlockingQueue<ConsumerRecords<K, V>> recordsToProcess,塞進隊列後,什麼時候再消費?ListenerInvoker的run方法執行了recordsToProcess.poll進行了消費,

相關文章
相關標籤/搜索