本節重點討論 Kafka 的消息拉起流程。java
@TOCnode
消息拉起主要入口爲:KafkaConsumer#poll方法,其聲明以下:算法
public ConsumerRecords<k, v> poll(final Duration timeout) { // [@1](https://my.oschina.net/u/1198) return poll(time.timer(timeout), true); // @2 }
代碼@1:參數爲超時時間,使用 java 的 Duration 來定義。 代碼@2:調用內部的 poll 方法。api
KafkaConsumer#poll緩存
private ConsumerRecords<k, v> poll(final Timer timer, final boolean includeMetadataInTimeout) { // [@1](https://my.oschina.net/u/1198) acquireAndEnsureOpen(); // @2 try { if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) { // [@3](https://my.oschina.net/u/2648711) throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions"); } // poll for new data until the timeout expires do { // @4 client.maybeTriggerWakeup(); //@5 if (includeMetadataInTimeout) { // @6 if (!updateAssignmentMetadataIfNeeded(timer)) { return ConsumerRecords.empty(); } } else { while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE))) { log.warn("Still waiting for metadata"); } } final Map<topicpartition, list<consumerrecord<k, v>>> records = pollForFetches(timer); // @7 if (!records.isEmpty()) { if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) { // @8 client.pollNoWakeup(); } return this.interceptors.onConsume(new ConsumerRecords<>(records)); // @9 } } while (timer.notExpired()); return ConsumerRecords.empty(); } finally { release(); } }
代碼@1:首先先對其參數含義進行講解。安全
代碼@2:檢查是否能夠拉取消息,其主要判斷依據以下:網絡
代碼@3:若是當前消費者未訂閱任何主題或者沒有指定隊列,則拋出錯誤,結束本次消息拉取。session
代碼@4:使用 do while 結構循環拉取消息,直到超時或拉取到消息。數據結構
代碼@5:避免在禁止禁用wakeup時,有請求想喚醒時則拋出異常,例如在下面的@8時,會禁用wakeup。架構
代碼@6:更新相關元數據,爲真正向 broker 發送消息拉取請求作好準備,該方法將在下面詳細介紹,如今先簡單介紹其核心實現點:
這裏會有一個更新元數據是否佔用消息拉取的超時時間,默認爲 true。
代碼@7:調用 pollForFetches 向broker拉取消息,該方法將在下文詳細介紹。
代碼@8:若是拉取到的消息集合不爲空,再返回該批消息以前,若是還有擠壓的拉取請求,能夠繼續發送拉取請求,但此時會禁用warkup,主要的目的是用戶在處理消息時,KafkaConsumer 還能夠繼續向broker 拉取消息。
代碼@9:執行消費攔截器。
接下來對上文提到的代碼@6、@7進行詳細介紹。
KafkaConsumer#updateAssignmentMetadataIfNeeded
boolean updateAssignmentMetadataIfNeeded(final Timer timer) { if (coordinator != null && !coordinator.poll(timer)) { // @1 return false; } return updateFetchPositions(timer); // @2 }
要理解這個方法實現的用途,咱們就必須依次對 coordinator.poll 方法與 updateFetchPositions 方法。
public boolean poll(Timer timer) { invokeCompletedOffsetCommitCallbacks(); // @1 if (subscriptions.partitionsAutoAssigned()) { // @2 pollHeartbeat(timer.currentTimeMs()); // @21 if (coordinatorUnknown() && !ensureCoordinatorReady(timer)) { //@22 return false; } if (rejoinNeededOrPending()) { // @23 if (subscriptions.hasPatternSubscription()) { // @231 if (this.metadata.timeToAllowUpdate(time.milliseconds()) == 0) { this.metadata.requestUpdate(); } if (!client.ensureFreshMetadata(timer)) { return false; } } if (!ensureActiveGroup(timer)) { // @232 return false; } } } else { // @3 if (metadata.updateRequested() && !client.hasReadyNodes(timer.currentTimeMs())) { client.awaitMetadataUpdate(timer); } } maybeAutoCommitOffsetsAsync(timer.currentTimeMs()); // @4 return true; }
代碼@1:執行已完成的 offset (消費進度)提交請求的回調函數。
代碼@2:隊列負載算法爲自動分配(即 Kafka 根據消費者個數與分區書動態負載分區)的相關的處理邏輯。其實現關鍵點以下:
代碼@3:用戶手動爲消費組指定負載的隊列的相關處理邏輯,其實現關鍵以下:
代碼@4:若是開啓了自動提交消費進度,而且已到下一次提交時間,則提交。Kafka 消費者能夠經過設置屬性 enable.auto.commit 來開啓自動提交,該參數默認爲 true,則默認會每隔 5s 提交一次消費進度,提交間隔能夠經過參數 auto.commit.interval.ms 設置。
接下來繼續探討 updateAssignmentMetadataIfNeeded (更新元數據)的第二個步驟,更新拉取位移。
KafkaConsumer#updateFetchPositions
private boolean updateFetchPositions(final Timer timer) { cachedSubscriptionHashAllFetchPositions = subscriptions.hasAllFetchPositions(); if (cachedSubscriptionHashAllFetchPositions) { // @1 return true; } if (coordinator != null && !coordinator.refreshCommittedOffsetsIfNeeded(timer)) // @2 return false; subscriptions.resetMissingPositions(); // @3 fetcher.resetOffsetsIfNeeded(); // @4 return true; }
代碼@1:若是訂閱關係中的全部分區都有有效的位移,則返回 true。
代碼@2:若是存在任意一個分區沒有有效的位移信息,則須要向 broker 發送請求,從broker 獲取該消費組,該分區的消費進度。相關的實現細節將在後續文章【Kafka 消費進度】專題文章中詳細介紹。
代碼@3:若是通過第二步,訂閱關係中還某些分區仍是沒有獲取到有效的偏移量,則使用偏移量重置策略進行重置,若是未配置,則拋出異常。
代碼@4:發送一個異步請求去重置那些正等待重置位置的分區。有關 Kafka 消費消費進度、重平衡等知識將會在後續文章中深刻探討,本文只需瞭解 poll 消息的核心處理流程。
從 KafkaConsumer#poll 中流程能夠看到,經過 updateAssignmentMetadataIfNeeded 對元數據、重平衡,更新拉取偏移量等工做處理完成後,下一步就是須要向 broker 拉取消息了,其實現入口爲:KafkaConsumer 的 pollForFetches 方法。
KafkaConsumer#pollForFetches
private Map<topicpartition, list<consumerrecord<k, v>>> pollForFetches(Timer timer) { long pollTimeout = coordinator == null ? timer.remainingMs() : Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), timer.remainingMs()); // @1 // if data is available already, return it immediately final Map<topicpartition, list<consumerrecord<k, v>>> records = fetcher.fetchedRecords(); // @2 if (!records.isEmpty()) { return records; } fetcher.sendFetches(); // @3 // We do not want to be stuck blocking in poll if we are missing some positions // since the offset lookup may be backing off after a failure // NOTE: the use of cachedSubscriptionHashAllFetchPositions means we MUST call // updateAssignmentMetadataIfNeeded before this method. if (!cachedSubscriptionHashAllFetchPositions && pollTimeout > retryBackoffMs) { // @4 pollTimeout = retryBackoffMs; } Timer pollTimer = time.timer(pollTimeout); client.poll(pollTimer, () -> { return !fetcher.hasCompletedFetches(); }); // @5 timer.update(pollTimer.currentTimeMs()); // @6 if (coordinator != null && coordinator.rejoinNeededOrPending()) { // @7 return Collections.emptyMap(); } return fetcher.fetchedRecords(); // @8 }
代碼@1:計算本次拉取的超時時間,其計算邏輯以下:
代碼@2:若是數據已經拉回到本地,直接返回數據。將在下文詳細介紹 Fetcher 的 fetchedRecords 方法。
代碼@3:組裝發送請求,並將存儲在待發送請求列表中。
代碼@4:若是已緩存的分區信息中存在某些分區缺乏偏移量,若是拉取的超時時間大於失敗重試須要阻塞的時間,則更新這次拉取的超時時間爲失敗重試須要的間隔時間,主要的目的是不但願在 poll 過程當中被阻塞【後續會詳細介紹 Kafka 拉取消息的線程模型,再來回顧一下這裏】。
代碼@5:經過調用NetworkClient 的 poll 方法發起消息拉取操做(觸發網絡讀寫)。
代碼@6:更新本次拉取的時間。
代碼@7:檢查是須要重平衡。
代碼@8:將從 broker 讀取到的數據返回(即封裝成消息)。
從上面消息拉取流程來看,有幾個比較重要的方法,例如 Fetcher 類相關的方法,NetworkClient 的 poll 方法,那咱們接下來來重點探討。
咱們先用一張流程圖總結一下消息拉取的全過程:
接下來咱們將重點看一下 KafkaConsumer 的 pollForFetches 詳細過程,也就是須要詳細探究 Fetcher 類的實現細節。
Fetcher 封裝消息拉取的方法,能夠當作是消息拉取的門面類。
咱們首先一一介紹一下 Fetcher 的核心屬性與核心方法。
接下來咱們將按照消息流程,一塊兒來看一下 Fetcher 的核心方法。
Fetcher#fetchedRecords
public Map<topicpartition, list<consumerrecord<k, v>>> fetchedRecords() { Map<topicpartition, list<consumerrecord<k, v>>> fetched = new HashMap<>(); // @1 int recordsRemaining = maxPollRecords; try { while (recordsRemaining > 0) { // @2 if (nextInLineRecords == null || nextInLineRecords.isFetched) { // @3 CompletedFetch completedFetch = completedFetches.peek(); if (completedFetch == null) break; try { nextInLineRecords = parseCompletedFetch(completedFetch); } catch (Exception e) { FetchResponse.PartitionData partition = completedFetch.partitionData; if (fetched.isEmpty() && (partition.records == null || partition.records.sizeInBytes() == 0)) { completedFetches.poll(); } throw e; } completedFetches.poll(); } else { // @4 List<consumerrecord<k, v>> records = fetchRecords(nextInLineRecords, recordsRemaining); TopicPartition partition = nextInLineRecords.partition; if (!records.isEmpty()) { List<consumerrecord<k, v>> currentRecords = fetched.get(partition); if (currentRecords == null) { fetched.put(partition, records); } else { List<consumerrecord<k, v>> newRecords = new ArrayList<>(records.size() + currentRecords.size()); newRecords.addAll(currentRecords); newRecords.addAll(records); fetched.put(partition, newRecords); } recordsRemaining -= records.size(); } } } } catch (KafkaException e) { if (fetched.isEmpty()) throw e; } return fetched; }
代碼@1:首先先解釋兩個局部變量的含義:
代碼@2:循環去取已經完成了 Fetch 請求的消息,該 while 循環有兩個跳出條件:
代碼@三、@4 主要完成從緩存中解析數據的兩個步驟,初次運行的時候,會進入分支@3,而後從 調用 parseCompletedFetch 解析成 PartitionRecords 對象,而後代碼@4的職責就是從解析 PartitionRecords ,將消息封裝成 ConsumerRecord,返回給消費端線程處理。
代碼@3的實現要點以下:
從上面可知,上述方法的核心方法是:parseCompletedFetch。
代碼@4的實現要點無非就是調用 fetchRecords 方法,按分區組裝成 Map<topicpartition, list<consumerrecord<k, v>>>,供消費者處理,例如供業務處理。
接下來將重點探討上述兩個方法的實現細節。
在嘗試探討該方法以前,咱們首先對其入參進行一個梳理,特別是先認識其主要數據結構。
一、CompletedFetch 相關類圖
從上圖能夠看出,CompleteFetch 核心屬性主要以下:
分區的數據是使用 PartitionData 來進行封裝的。咱們也來簡單的瞭解一下其內部數據結果。
二、parseCompletedFetch 詳解
private PartitionRecords parseCompletedFetch(CompletedFetch completedFetch) { TopicPartition tp = completedFetch.partition; FetchResponse.PartitionData<records> partition = completedFetch.partitionData; long fetchOffset = completedFetch.fetchedOffset; PartitionRecords partitionRecords = null; Errors error = partition.error; try { if (!subscriptions.isFetchable(tp)) { // @1 log.debug("Ignoring fetched records for partition {} since it is no longer fetchable", tp); } else if (error == Errors.NONE) { // @2 Long position = subscriptions.position(tp); if (position == null || position != fetchOffset) { // @21 log.debug("Discarding stale fetch response for partition {} since its offset {} does not match " + "the expected offset {}", tp, fetchOffset, position); return null; } log.trace("Preparing to read {} bytes of data for partition {} with offset {}", partition.records.sizeInBytes(), tp, position); Iterator<!--? extends RecordBatch--> batches = partition.records.batches().iterator(); // @22 partitionRecords = new PartitionRecords(tp, completedFetch, batches); if (!batches.hasNext() && partition.records.sizeInBytes() > 0) { // @23 if (completedFetch.responseVersion < 3) { Map<topicpartition, long> recordTooLargePartitions = Collections.singletonMap(tp, fetchOffset); throw new RecordTooLargeException("There are some messages at [Partition=Offset]: " + recordTooLargePartitions + " whose size is larger than the fetch size " + this.fetchSize + " and hence cannot be returned. Please considering upgrading your broker to 0.10.1.0 or " + "newer to avoid this issue. Alternately, increase the fetch size on the client (using " + ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG + ")", recordTooLargePartitions); } else { // This should not happen with brokers that support FetchRequest/Response V3 or higher (i.e. KIP-74) throw new KafkaException("Failed to make progress reading messages at " + tp + "=" + fetchOffset + ". Received a non-empty fetch response from the server, but no " + "complete records were found."); } } if (partition.highWatermark >= 0) { // @24 log.trace("Updating high watermark for partition {} to {}", tp, partition.highWatermark); subscriptions.updateHighWatermark(tp, partition.highWatermark); } if (partition.logStartOffset >= 0) { // @25 log.trace("Updating log start offset for partition {} to {}", tp, partition.logStartOffset); subscriptions.updateLogStartOffset(tp, partition.logStartOffset); } if (partition.lastStableOffset >= 0) { // @26 log.trace("Updating last stable offset for partition {} to {}", tp, partition.lastStableOffset); subscriptions.updateLastStableOffset(tp, partition.lastStableOffset); } } else if (error == Errors.NOT_LEADER_FOR_PARTITION || error == Errors.REPLICA_NOT_AVAILABLE || error == Errors.KAFKA_STORAGE_ERROR) { // @3 log.debug("Error in fetch for partition {}: {}", tp, error.exceptionName()); this.metadata.requestUpdate(); } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) { // @4 log.warn("Received unknown topic or partition error in fetch for partition {}", tp); this.metadata.requestUpdate(); } else if (error == Errors.OFFSET_OUT_OF_RANGE) { // @5 if (fetchOffset != subscriptions.position(tp)) { log.debug("Discarding stale fetch response for partition {} since the fetched offset {} " + "does not match the current offset {}", tp, fetchOffset, subscriptions.position(tp)); } else if (subscriptions.hasDefaultOffsetResetPolicy()) { log.info("Fetch offset {} is out of range for partition {}, resetting offset", fetchOffset, tp); subscriptions.requestOffsetReset(tp); } else { throw new OffsetOutOfRangeException(Collections.singletonMap(tp, fetchOffset)); } } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) { // @6 log.warn("Not authorized to read from topic {}.", tp.topic()); throw new TopicAuthorizationException(Collections.singleton(tp.topic())); } else if (error == Errors.UNKNOWN_SERVER_ERROR) { log.warn("Unknown error fetching data for topic-partition {}", tp); } else { throw new IllegalStateException("Unexpected error code " + error.code() + " while fetching data"); } } finally { // @7 if (partitionRecords == null) completedFetch.metricAggregator.record(tp, 0, 0); if (error != Errors.NONE) // we move the partition to the end if there was an error. This way, it's more likely that partitions for // the same topic can remain together (allowing for more efficient serialization). subscriptions.movePartitionToEnd(tp); } return partitionRecords; }
上面的代碼雖然比較長,其實總體仍是比較簡單,只是須要針對各類異常處理,打印對應的日誌,接下來詳細介紹該方法的實現關鍵點。
代碼@1:判斷該分區是否可拉取,若是不可拉取,則忽略這批拉取的消息,判斷是可拉取的要點以下:
代碼@2:該分支是處理正常返回的相關邏輯。其關鍵點以下:
從代碼@3到@8 是多種異常信息的處理。 代碼@3:若是出現以下3種錯誤碼,則使用 debug 打印錯誤日誌,而且向服務端請求元數據並更新本地緩存。
Kafka 認爲上述錯誤是可恢復的,並且對消費不會形成太大影響,故只是用 debug 打印日誌,而後更新本地緩存便可。
代碼@4:若是出現 UNKNOWN_TOPIC_OR_PARTITION 未知主題與分區時,則使用 warn 級別輸出錯誤日誌,並更新元數據。
代碼@5:針對 OFFSET_OUT_OF_RANGE 偏移量超過範圍異常的處理邏輯,其實現關鍵點以下:
代碼@6:若是是 TOPIC_AUTHORIZATION_FAILED 沒有權限(ACL)則拋出異常。
代碼@7:若是本次拉取的結果不是NONE(成功),而且是可恢復的,將該隊列的訂閱關係移動到消費者緩存列表的末尾。若是成功,則返回拉取到的分區數據,其封裝對象爲 PartitionRecords。
接下來咱們再來看看 2.1.1 fetchedRecords 中的另一個核心方法。
在介紹該方法以前一樣先來看一下參數 PartitionRecords 的內部結構。
一、PartitionRecords 類圖
主要的核心屬性以下:
二、fetchRecords 詳解
private List<consumerrecord<k, v>> fetchRecords(PartitionRecords partitionRecords, int maxRecords) { if (!subscriptions.isAssigned(partitionRecords.partition)) { // @1 // this can happen when a rebalance happened before fetched records are returned to the consumer's poll call log.debug("Not returning fetched records for partition {} since it is no longer assigned", partitionRecords.partition); } else if (!subscriptions.isFetchable(partitionRecords.partition)) { // @2 // this can happen when a partition is paused before fetched records are returned to the consumer's // poll call or if the offset is being reset log.debug("Not returning fetched records for assigned partition {} since it is no longer fetchable", partitionRecords.partition); } else { long position = subscriptions.position(partitionRecords.partition); // @3 if (partitionRecords.nextFetchOffset == position) { // @4 List<consumerrecord<k, v>> partRecords = partitionRecords.fetchRecords(maxRecords); long nextOffset = partitionRecords.nextFetchOffset; log.trace("Returning fetched records at offset {} for assigned partition {} and update " + "position to {}", position, partitionRecords.partition, nextOffset); subscriptions.position(partitionRecords.partition, nextOffset); Long partitionLag = subscriptions.partitionLag(partitionRecords.partition, isolationLevel); if (partitionLag != null) this.sensors.recordPartitionLag(partitionRecords.partition, partitionLag); Long lead = subscriptions.partitionLead(partitionRecords.partition); if (lead != null) { this.sensors.recordPartitionLead(partitionRecords.partition, lead); } return partRecords; } else { // @5 // these records aren't next in line based on the last consumed position, ignore them // they must be from an obsolete request log.debug("Ignoring fetched records for {} at offset {} since the current position is {}", partitionRecords.partition, partitionRecords.nextFetchOffset, position); } } partitionRecords.drain(); return emptyList(); }
代碼@1:從 PartitionRecords 中提取消息以前,再次判斷訂閱消息中是否包含當前分區,若是不包含,則使用 debug 打印日誌,頗有多是發生了重平衡。
代碼@2:是否容許拉取,若是用戶主動暫停消費,則忽略本次拉取的消息。備註:Kafka 消費端若是消費太快,能夠進行限流。
代碼@3:從本地消費者緩存中獲取該隊列已消費的偏移量,在發送拉取消息時,就是從該偏移量開始拉取的。
代碼@4:若是本地緩存已消費偏移量與從服務端拉回的起始偏移量相等的話,則認爲是一個有效拉取,不然則認爲是一個過時的拉取,該批消息已被消費,見代碼@5。若是是一個有效請求,則使用 sensors 收集統計信息,並返回拉取到的消息, 返回結果被封裝在 List<consumerrecord<k, v>> 。
「發送」 fetch 請求,注意這裏並不會觸發網絡操做,而是組裝拉取請求,將其放入網絡緩存區。
Fetcher#sendFetches
public synchronized int sendFetches() { Map<node, fetchsessionhandler.fetchrequestdata> fetchRequestMap = prepareFetchRequests(); // @1 for (Map.Entry<node, fetchsessionhandler.fetchrequestdata> entry : fetchRequestMap.entrySet()) { // @2 final Node fetchTarget = entry.getKey(); final FetchSessionHandler.FetchRequestData data = entry.getValue(); final FetchRequest.Builder request = FetchRequest.Builder .forConsumer(this.maxWaitMs, this.minBytes, data.toSend()) .isolationLevel(isolationLevel) .setMaxBytes(this.maxBytes) .metadata(data.metadata()) .toForget(data.toForget()); // @3 client.send(fetchTarget, request) // @4 .addListener(new RequestFutureListener<clientresponse>() { @Override public void onSuccess(ClientResponse resp) { // @5 synchronized (Fetcher.this) { @SuppressWarnings("unchecked") FetchResponse<records> response = (FetchResponse<records>) resp.responseBody(); FetchSessionHandler handler = sessionHandler(fetchTarget.id()); if (handler == null) { log.error("Unable to find FetchSessionHandler for node {}. Ignoring fetch response.", fetchTarget.id()); return; } if (!handler.handleResponse(response)) { return; } Set<topicpartition> partitions = new HashSet<>(response.responseData().keySet()); FetchResponseMetricAggregator metricAggregator = new FetchResponseMetricAggregator(sensors, partitions); for (Map.Entry<topicpartition, fetchresponse.partitiondata<records>> entry : response.responseData().entrySet()) { TopicPartition partition = entry.getKey(); long fetchOffset = data.sessionPartitions().get(partition).fetchOffset; FetchResponse.PartitionData<records> fetchData = entry.getValue(); completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator, resp.requestHeader().apiVersion())); } // @6 sensors.fetchLatency.record(resp.requestLatencyMs()); } } public void onFailure(RuntimeException e) { // @7 synchronized (Fetcher.this) { FetchSessionHandler handler = sessionHandler(fetchTarget.id()); if (handler != null) { handler.handleError(e); } } } }); } return fetchRequestMap.size(); } ~~~java 上面的方法比較長,其實現的關鍵點以下: 代碼@1:經過調用 Fetcher 的 prepareFetchRequests 方法按節點組裝拉取請求,將在後面詳細介紹。 代碼@2:遍歷上面的待發請求,進一步組裝請求。下面就是分節點發送拉取請求。 代碼@3:構建 FetchRequest 拉取請求對象。 代碼@4:調用 NetworkClient 的 send 方法將其發送到發送緩存區,本文不會詳細介紹網絡方面的實現,但下文會截圖說明拉取請求發送緩存區的一個關鍵點。 代碼@5:這裏會註冊事件監聽器,當消息從 broker 拉取到本地後觸發回調,即消息拉取請求收到返回結果後會將返回結果放入到completedFetches 中(代碼@6),這就和上文消息拉取時 Fetcher 的 fetchedRecords 方法造成閉環。 代碼@7:消息拉取一次處理。 接下來詳細介紹 prepareFetchRequests 方法。 ###### 2.2.2.1 Fetcher prepareFetchRequests 方法詳解 ~~~java private Map<node, fetchsessionhandler.fetchrequestdata> prepareFetchRequests() { Map<node, fetchsessionhandler.builder> fetchable = new LinkedHashMap<>(); for (TopicPartition partition : fetchablePartitions()) { // @1 Node node = metadata.partitionInfoIfCurrent(partition).map(PartitionInfo::leader).orElse(null); // @2 if (node == null) { // @3 metadata.requestUpdate(); } else if (client.isUnavailable(node)) { // @4 client.maybeThrowAuthFailure(node); log.trace("Skipping fetch for partition {} because node {} is awaiting reconnect backoff", partition, node); } else if (client.hasPendingRequests(node)) { // @5 log.trace("Skipping fetch for partition {} because there is an in-flight request to {}", partition, node); } else { // if there is a leader and no in-flight requests, issue a new fetch FetchSessionHandler.Builder builder = fetchable.get(node); // @7 if (builder == null) { FetchSessionHandler handler = sessionHandler(node.id()); if (handler == null) { handler = new FetchSessionHandler(logContext, node.id()); sessionHandlers.put(node.id(), handler); } builder = handler.newBuilder(); fetchable.put(node, builder); } long position = this.subscriptions.position(partition); builder.add(partition, new FetchRequest.PartitionData(position, FetchRequest.INVALID_LOG_START_OFFSET, this.fetchSize, Optional.empty())); log.debug("Added {} fetch request for partition {} at offset {} to node {}", isolationLevel, partition, position, node); } } Map<node, fetchsessionhandler.fetchrequestdata> reqs = new LinkedHashMap<>(); for (Map.Entry<node, fetchsessionhandler.builder> entry : fetchable.entrySet()) { reqs.put(entry.getKey(), entry.getValue().build()); } return reqs; }
代碼@1:首先經過調用 fetchablePartitions() 獲取可發起拉取任務的分區信息,下文簡單介紹一下。
代碼@2:若是該分區在客戶端本地緩存中獲取該分區的 Leader 節點信息。
代碼@3:若是其 Leader 節點信息爲空,則發起更新元數據請求,本次拉取任務將不會包含該分區。
代碼@4:若是客戶端與該分區的 Leader 鏈接爲完成,若是是由於權限的緣由則拋出ACL相關異常,不然打印日誌,本次拉取請求不會包含該分區。
代碼@5:判斷該節點是否有掛起的拉取請求,即發送緩存區中是待發送的請求,若是有,本次將不會被拉取。
代碼@6:構建拉取請求,分節點組織請求。
NetworkClient 的 send 方法只是將其放入 unsent 中。
與上文的 client.hasPendingRequests(node) 方法遙相呼應。
三、總結 上面的源碼分析有點長,也有點枯燥,咱們仍是畫一張流程圖來進行總結。
Kafka 的消息拉取流程仍是比較複雜的,後面會基於上述流程,重點進行拆解,例如消費進度提交,負載隊列重平衡等等。
做者介紹:丁威,《RocketMQ技術內幕》做者,RocketMQ 社區佈道師,公衆號:中間件興趣圈 維護者,目前已陸續發表源碼分析Java集合、Java 併發包(JUC)、Netty、Mycat、Dubbo、RocketMQ、Mybatis等源碼專欄。能夠點擊連接:中間件知識星球,一塊兒探討高併發、分佈式服務架構,交流源碼。
</node,></node,></node,></node,></records></topicpartition,></topicpartition></records></records></clientresponse></node,></node,></consumerrecord<k,></consumerrecord<k,></consumerrecord<k,></fetchresponse.abortedtransaction></topicpartition,></records></topicpartition,></topicpartition,></consumerrecord<k,></consumerrecord<k,></consumerrecord<k,></topicpartition,></topicpartition,></integer,></topicpartition,></topicpartition,></topicpartition,></k,></k,>