源碼分析Kafka 消息拉取流程(文末兩張流程圖)

本節重點討論 Kafka 的消息拉起流程。java

@TOCnode

1、KafkaConsumer poll 詳解

消息拉起主要入口爲:KafkaConsumer#poll方法,其聲明以下:算法

public ConsumerRecords<k, v> poll(final Duration timeout) {  // [@1](https://my.oschina.net/u/1198)
    return poll(time.timer(timeout), true);                                     // @2
}

代碼@1:參數爲超時時間,使用 java 的 Duration 來定義。 代碼@2:調用內部的 poll 方法。api

KafkaConsumer#poll緩存

private ConsumerRecords<k, v> poll(final Timer timer, final boolean includeMetadataInTimeout) {  // [@1](https://my.oschina.net/u/1198)
    acquireAndEnsureOpen();                                                                                                               // @2
    try {
        if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {                                                  // [@3](https://my.oschina.net/u/2648711)
            throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
        }

        // poll for new data until the timeout expires
        do {                                       // @4
            client.maybeTriggerWakeup();                                                                                               //@5

            if (includeMetadataInTimeout) {                                           // @6                                                                               
                if (!updateAssignmentMetadataIfNeeded(timer)) {
                    return ConsumerRecords.empty();
                }
            } else {
                while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE))) {        
                    log.warn("Still waiting for metadata");
                }
            }

            final Map<topicpartition, list<consumerrecord<k, v>&gt;&gt; records = pollForFetches(timer);   // @7
            if (!records.isEmpty()) {                                                                                                           
                if (fetcher.sendFetches() &gt; 0 || client.hasPendingRequests()) {                                           // @8
                    client.pollNoWakeup();
                }
                return this.interceptors.onConsume(new ConsumerRecords&lt;&gt;(records));                         // @9
            }
        } while (timer.notExpired());                                                                                                         

        return ConsumerRecords.empty();
    } finally {
        release();
    }
}

代碼@1:首先先對其參數含義進行講解。安全

  • boolean includeMetadataInTimeout 拉取消息的超時時間是否包含更新元數據的時間,默認爲true,即包含。

代碼@2:檢查是否能夠拉取消息,其主要判斷依據以下:網絡

  • KafkaConsumer 是否有其餘線程再執行,若是有,則拋出異常,由於 - KafkaConsumer 是線程不安全的,同一時間只能一個線程執行。
  • KafkaConsumer 沒有被關閉。

代碼@3:若是當前消費者未訂閱任何主題或者沒有指定隊列,則拋出錯誤,結束本次消息拉取。session

代碼@4:使用 do while 結構循環拉取消息,直到超時或拉取到消息。數據結構

代碼@5:避免在禁止禁用wakeup時,有請求想喚醒時則拋出異常,例如在下面的@8時,會禁用wakeup。架構

代碼@6:更新相關元數據,爲真正向 broker 發送消息拉取請求作好準備,該方法將在下面詳細介紹,如今先簡單介紹其核心實現點:

  • 若有必要,先向 broker 端拉取最新的訂閱信息(包含消費組內的在線的消費客戶端)。
  • 執行已完成(異步提交)的 offset 提交請求的回調函數。
  • 維護與 broker 端的心跳請求,確保不會被「踢出」消費組。
  • 更新元信息。
  • 若是是自動提交消費偏移量,則自動提交偏移量。
  • 更新各個分區下次待拉取的偏移量。

這裏會有一個更新元數據是否佔用消息拉取的超時時間,默認爲 true。

代碼@7:調用 pollForFetches 向broker拉取消息,該方法將在下文詳細介紹。

代碼@8:若是拉取到的消息集合不爲空,再返回該批消息以前,若是還有擠壓的拉取請求,能夠繼續發送拉取請求,但此時會禁用warkup,主要的目的是用戶在處理消息時,KafkaConsumer 還能夠繼續向broker 拉取消息。

代碼@9:執行消費攔截器。

接下來對上文提到的代碼@6、@7進行詳細介紹。

1.1 KafkaConsumer updateAssignmentMetadataIfNeeded 詳解

KafkaConsumer#updateAssignmentMetadataIfNeeded

boolean updateAssignmentMetadataIfNeeded(final Timer timer) {
    if (coordinator != null &amp;&amp; !coordinator.poll(timer)) {                            // @1
        return false;
    }
    return updateFetchPositions(timer);                                                  // @2
}

要理解這個方法實現的用途,咱們就必須依次對 coordinator.poll 方法與 updateFetchPositions 方法。

1.1.1 ConsumerCoordinator#poll
public boolean poll(Timer timer) {
    invokeCompletedOffsetCommitCallbacks();  // @1
    if (subscriptions.partitionsAutoAssigned()) {  // @2
        pollHeartbeat(timer.currentTimeMs());       // @21
        if (coordinatorUnknown() &amp;&amp; !ensureCoordinatorReady(timer)) {   //@22
            return false;
        }
        if (rejoinNeededOrPending()) {                                                       // @23
            if (subscriptions.hasPatternSubscription()) {                              // @231
                if (this.metadata.timeToAllowUpdate(time.milliseconds()) == 0) {  
                    this.metadata.requestUpdate();
                }
                if (!client.ensureFreshMetadata(timer)) {                                  
                    return false;
                }
            }
            if (!ensureActiveGroup(timer)) {                                                // @232
                return false;
            }
        }
    } else {                                                            // @3
        if (metadata.updateRequested() &amp;&amp; !client.hasReadyNodes(timer.currentTimeMs())) {
            client.awaitMetadataUpdate(timer);
        }
    }
    maybeAutoCommitOffsetsAsync(timer.currentTimeMs());   // @4
    return true;
}

代碼@1:執行已完成的 offset (消費進度)提交請求的回調函數。

代碼@2:隊列負載算法爲自動分配(即 Kafka 根據消費者個數與分區書動態負載分區)的相關的處理邏輯。其實現關鍵點以下:

  • 代碼@21:更新發送心跳相關的時間,例如heartbeatTimer、sessionTimer、pollTimer 分別表明發送最新發送心跳的時間、會話最新活躍時間、最新拉取消息。
  • 代碼@22:若是不存在協調器或協調器已斷開鏈接,則返回 false,結束本次拉取。若是協調器就緒,則繼續往下走。
  • 代碼@23:判斷是否須要觸發重平衡,即消費組內的全部消費者從新分配topic中的分區信息,例如元數據發送變化,判斷是否須要從新重平衡的關鍵點以下:
    • 若是隊列負載是經過用戶指定的,則返回 false,表示無需重平衡。
    • 若是隊列是自動負載,topic 隊列元數據發生了變化,則須要重平衡。
    • 若是隊列是自動負載,訂閱關係發生了變化,則須要重平衡。 若是須要重重平衡,則同步更新元數據,此過程會阻塞。詳細的重平衡將單獨重點介紹,這裏暫時不深刻展開。

代碼@3:用戶手動爲消費組指定負載的隊列的相關處理邏輯,其實現關鍵以下:

  • 若是須要更新元數據,而且尚未分區準備好,則同步阻塞等待元數據更新完畢。

代碼@4:若是開啓了自動提交消費進度,而且已到下一次提交時間,則提交。Kafka 消費者能夠經過設置屬性 enable.auto.commit 來開啓自動提交,該參數默認爲 true,則默認會每隔 5s 提交一次消費進度,提交間隔能夠經過參數 auto.commit.interval.ms 設置。

接下來繼續探討 updateAssignmentMetadataIfNeeded (更新元數據)的第二個步驟,更新拉取位移。

1.1.2 updateFetchPositions 詳解

KafkaConsumer#updateFetchPositions

private boolean updateFetchPositions(final Timer timer) {
    cachedSubscriptionHashAllFetchPositions = subscriptions.hasAllFetchPositions();  
    if (cachedSubscriptionHashAllFetchPositions) {           // @1
        return true;
    }
    if (coordinator != null &amp;&amp; !coordinator.refreshCommittedOffsetsIfNeeded(timer))   // @2
        return false;
    subscriptions.resetMissingPositions();                         // @3
    fetcher.resetOffsetsIfNeeded();                                    // @4
    return true;
}

代碼@1:若是訂閱關係中的全部分區都有有效的位移,則返回 true。

代碼@2:若是存在任意一個分區沒有有效的位移信息,則須要向 broker 發送請求,從broker 獲取該消費組,該分區的消費進度。相關的實現細節將在後續文章【Kafka 消費進度】專題文章中詳細介紹。

代碼@3:若是通過第二步,訂閱關係中還某些分區仍是沒有獲取到有效的偏移量,則使用偏移量重置策略進行重置,若是未配置,則拋出異常。

代碼@4:發送一個異步請求去重置那些正等待重置位置的分區。有關 Kafka 消費消費進度、重平衡等知識將會在後續文章中深刻探討,本文只需瞭解 poll 消息的核心處理流程。

從 KafkaConsumer#poll 中流程能夠看到,經過 updateAssignmentMetadataIfNeeded 對元數據、重平衡,更新拉取偏移量等工做處理完成後,下一步就是須要向 broker 拉取消息了,其實現入口爲:KafkaConsumer 的 pollForFetches 方法。

1.2 消息拉取

KafkaConsumer#pollForFetches

private Map<topicpartition, list<consumerrecord<k, v>&gt;&gt; pollForFetches(Timer timer) {
        long pollTimeout = coordinator == null ? timer.remainingMs() :
                Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), timer.remainingMs());   // @1
        // if data is available already, return it immediately
        final Map<topicpartition, list<consumerrecord<k, v>&gt;&gt; records = fetcher.fetchedRecords();    // @2
        if (!records.isEmpty()) {
            return records;
        }
        fetcher.sendFetches();                               // @3
        // We do not want to be stuck blocking in poll if we are missing some positions
        // since the offset lookup may be backing off after a failure
        // NOTE: the use of cachedSubscriptionHashAllFetchPositions means we MUST call
        // updateAssignmentMetadataIfNeeded before this method.
        if (!cachedSubscriptionHashAllFetchPositions &amp;&amp; pollTimeout &gt; retryBackoffMs) {   // @4
            pollTimeout = retryBackoffMs;
        }
        Timer pollTimer = time.timer(pollTimeout);
        client.poll(pollTimer, () -&gt; {
            return !fetcher.hasCompletedFetches();
        });         // @5
        timer.update(pollTimer.currentTimeMs());   // @6
        if (coordinator != null &amp;&amp; coordinator.rejoinNeededOrPending()) {  // @7
            return Collections.emptyMap();
        }
        return fetcher.fetchedRecords();   // @8
    }

代碼@1:計算本次拉取的超時時間,其計算邏輯以下:

  • 若是協調器爲空,則返回當前定時器剩餘時間便可。
  • 若是協調器不爲空,其邏輯較爲複雜,爲下面返回的超時間與當前定時器剩餘時間相比取最小值。
  • 若是不開啓自動提交位移而且未加入消費組,則超時時間爲Long.MAX_VALUE。
  • 若是不開啓自動提交位移而且已加入消費組,則返回距離下一次發送心跳包還剩多少時間。
  • 若是開啓自動提交位移,則返回 距離下一次自動提交位移所需時間 與 距離下一次發送心跳包所需時間 之間的最小值。

代碼@2:若是數據已經拉回到本地,直接返回數據。將在下文詳細介紹 Fetcher 的 fetchedRecords 方法。

代碼@3:組裝發送請求,並將存儲在待發送請求列表中。

代碼@4:若是已緩存的分區信息中存在某些分區缺乏偏移量,若是拉取的超時時間大於失敗重試須要阻塞的時間,則更新這次拉取的超時時間爲失敗重試須要的間隔時間,主要的目的是不但願在 poll 過程當中被阻塞【後續會詳細介紹 Kafka 拉取消息的線程模型,再來回顧一下這裏】。

代碼@5:經過調用NetworkClient 的 poll 方法發起消息拉取操做(觸發網絡讀寫)。

代碼@6:更新本次拉取的時間。

代碼@7:檢查是須要重平衡。

代碼@8:將從 broker 讀取到的數據返回(即封裝成消息)。

從上面消息拉取流程來看,有幾個比較重要的方法,例如 Fetcher 類相關的方法,NetworkClient 的 poll 方法,那咱們接下來來重點探討。

咱們先用一張流程圖總結一下消息拉取的全過程: 在這裏插入圖片描述

接下來咱們將重點看一下 KafkaConsumer 的 pollForFetches 詳細過程,也就是須要詳細探究 Fetcher 類的實現細節。

二、Fetcher 類詳解

Fetcher 封裝消息拉取的方法,能夠當作是消息拉取的門面類。

2.1 類圖

在這裏插入圖片描述

咱們首先一一介紹一下 Fetcher 的核心屬性與核心方法。

  • ConsumerNetworkClient client 消費端網絡客戶端,Kafka 負責網絡通信實現類。
  • int minBytes 一次消息拉取須要拉取的最小字節數,若是不組,會阻塞,默認值爲1字節,若是增大這個值會增大吞吐,但會增長延遲,能夠通參數 fetch.min.bytes 改變其默認值。
  • int maxBytes 一次消息拉取容許拉取的最大字節數,但這不是絕對的,若是一個分區的第一批記錄超過了該值,也會返回。默認爲50M,可經過參數 fetch.max.bytes 改變其默認值。同時不能超過 broker的配置參數(message.max.bytes) 和 主題級別的配置(max.message.bytes)。
  • int maxWaitMs 在 broker 若是符合拉取條件的數據小於 minBytes 時阻塞的時間,默認爲 500ms ,可通屬性 fetch.max.wait.ms 進行定製。
  • int fetchSize 每個分區返回的最大消息字節數,若是分區中的第一批消息大於 fetchSize 也會返回。
  • long retryBackoffMs 失敗重試後須要阻塞的時間,默認爲 100 ms,可經過參數 retry.backoff.ms 定製。
  • long requestTimeoutMs 客戶端向 broker 發送請求最大的超時時間,默認爲 30s,能夠經過 request.timeout.ms 參數定製。
  • int maxPollRecords 單次拉取返回的最大記錄數,默認值 500,可經過參數 max.poll.records 進行定製。
  • boolean checkCrcs 是否檢查消息的 crcs 校驗和,默認爲 true,可經過參數 check.crcs 進行定製。
  • Metadata metadata 元數據。
  • FetchManagerMetrics sensors 消息拉取的統計服務類。
  • SubscriptionState subscriptions 訂閱信息狀態。
  • ConcurrentLinkedQueue< CompletedFetch> completedFetches 已完成的 Fetch 的請求結果,待消費端從中取出數據。
  • Deserializer< K> keyDeserializer key 的反序列化器。
  • Deserializer< V> valueDeserializer value 的飯序列化器。
  • IsolationLevel isolationLevel Kafka的隔離級別(與事務消息相關),後續在研究其事務相關時再進行探討。
  • Map<integer, fetchsessionhandler> sessionHandlers 拉取會話監聽器。

接下來咱們將按照消息流程,一塊兒來看一下 Fetcher 的核心方法。

2.2 Fetcher 核心方法

2.2.1 Fetcher#fetchedRecords

Fetcher#fetchedRecords

public Map<topicpartition, list<consumerrecord<k, v>&gt;&gt; fetchedRecords() {
    Map<topicpartition, list<consumerrecord<k, v>&gt;&gt; fetched = new HashMap&lt;&gt;();   // @1
    int recordsRemaining = maxPollRecords;                                                              
    try {
        while (recordsRemaining &gt; 0) {                                                                                  // @2
            if (nextInLineRecords == null || nextInLineRecords.isFetched) {                           // @3
                CompletedFetch completedFetch = completedFetches.peek();
                if (completedFetch == null) break;
                try {
                    nextInLineRecords = parseCompletedFetch(completedFetch);
                } catch (Exception e) {
                    FetchResponse.PartitionData partition = completedFetch.partitionData;
                    if (fetched.isEmpty() &amp;&amp; (partition.records == null || partition.records.sizeInBytes() == 0)) {
                        completedFetches.poll();
                    }
                    throw e;
                }
                completedFetches.poll();
             } else {                                                                                                                         // @4
                List<consumerrecord<k, v>&gt; records = fetchRecords(nextInLineRecords, recordsRemaining);
                TopicPartition partition = nextInLineRecords.partition;
                if (!records.isEmpty()) {
                    List<consumerrecord<k, v>&gt; currentRecords = fetched.get(partition);
                    if (currentRecords == null) {
                        fetched.put(partition, records);
                    } else {
                        List<consumerrecord<k, v>&gt; newRecords = new ArrayList&lt;&gt;(records.size() + currentRecords.size());
                        newRecords.addAll(currentRecords);
                        newRecords.addAll(records);
                        fetched.put(partition, newRecords);
                    }
                    recordsRemaining -= records.size();
                }
            }
        }
    } catch (KafkaException e) {
        if (fetched.isEmpty())
            throw e;
    }
    return fetched;
}

代碼@1:首先先解釋兩個局部變量的含義:

  • Map<topicpartition, list<consumerrecord<k, v>>> fetched 按分區存放已拉取的消息,返回給客戶端進行處理。
  • recordsRemaining:剩餘可拉取的消息條數。

代碼@2:循環去取已經完成了 Fetch 請求的消息,該 while 循環有兩個跳出條件:

  • 若是拉取的消息已經達到一次拉取的最大消息條數,則跳出循環。
  • 緩存中全部拉取結果已處理。

代碼@三、@4 主要完成從緩存中解析數據的兩個步驟,初次運行的時候,會進入分支@3,而後從 調用 parseCompletedFetch 解析成 PartitionRecords 對象,而後代碼@4的職責就是從解析 PartitionRecords ,將消息封裝成 ConsumerRecord,返回給消費端線程處理。

代碼@3的實現要點以下:

  • 首先從 completedFetches (Fetch請求的返回結果) 列表中獲取一個 Fetcher 請求,主要使用的 Queue 的 peek()方法,並不會從該隊列中移除該元素。
  • 而後調用 parseCompletedFetch 對處理結果進行解析返回 PartitionRecords。
  • 處理成功後,調用 Queue 的方法將已處理過的 Fetcher結果移除。

從上面可知,上述方法的核心方法是:parseCompletedFetch。

代碼@4的實現要點無非就是調用 fetchRecords 方法,按分區組裝成 Map<topicpartition, list<consumerrecord<k, v>>>,供消費者處理,例如供業務處理。

接下來將重點探討上述兩個方法的實現細節。

2.2.1.1 Fetcher#parseCompletedFetch

在嘗試探討該方法以前,咱們首先對其入參進行一個梳理,特別是先認識其主要數據結構。

一、CompletedFetch 相關類圖 在這裏插入圖片描述

從上圖能夠看出,CompleteFetch 核心屬性主要以下:

  • TopicPartition partition 分區信息,返回結果都是以分區爲緯度。
  • long fetchedOffset 本次拉取的開始偏移量。
  • FetchResponse.PartitionData partitionData 返回的分區數據。
  • FetchResponseMetricAgregator metricAggregator 統計指標相關。
  • short responseVersion broker 端的版本號。

分區的數據是使用 PartitionData 來進行封裝的。咱們也來簡單的瞭解一下其內部數據結果。

  • Errors error 分區拉取的相應結果,Errors.NONE 表示請求成功。
  • long highWatermark broker 端關於該分區的高水位線,即小於該偏移量的消息對於消費端是可見的。
  • long lastStableOffset 分區中小於該偏移量的消息的事務狀態已獲得確認,要麼是已提交,要麼是已回滾,與事務相關,後面會專門探討。
  • List< AbortedTransaction> abortedTransactions 已拒絕的事物。
  • T records 分區數據,是 BaseRecords 的子類。

二、parseCompletedFetch 詳解

private PartitionRecords parseCompletedFetch(CompletedFetch completedFetch) {
    TopicPartition tp = completedFetch.partition;
    FetchResponse.PartitionData<records> partition = completedFetch.partitionData;
    long fetchOffset = completedFetch.fetchedOffset;
    PartitionRecords partitionRecords = null;
    Errors error = partition.error;
    try {
        if (!subscriptions.isFetchable(tp)) {       // @1
            log.debug("Ignoring fetched records for partition {} since it is no longer fetchable", tp);
        } else if (error == Errors.NONE) {         // @2
            Long position = subscriptions.position(tp);
            if (position == null || position != fetchOffset) {    // @21
                log.debug("Discarding stale fetch response for partition {} since its offset {} does not match " +
                            "the expected offset {}", tp, fetchOffset, position);
                return null;
            }
            log.trace("Preparing to read {} bytes of data for partition {} with offset {}",
                        partition.records.sizeInBytes(), tp, position);
            Iterator<!--? extends RecordBatch--> batches = partition.records.batches().iterator();   // @22
            partitionRecords = new PartitionRecords(tp, completedFetch, batches);

            if (!batches.hasNext() &amp;&amp; partition.records.sizeInBytes() &gt; 0) {   // @23
                if (completedFetch.responseVersion &lt; 3) {
                    Map<topicpartition, long> recordTooLargePartitions = Collections.singletonMap(tp, fetchOffset);
                    throw new RecordTooLargeException("There are some messages at [Partition=Offset]: " +
                                recordTooLargePartitions + " whose size is larger than the fetch size " + this.fetchSize +
                                " and hence cannot be returned. Please considering upgrading your broker to 0.10.1.0 or " +
                                "newer to avoid this issue. Alternately, increase the fetch size on the client (using " +
                                ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG + ")",
                                recordTooLargePartitions);
                } else {
                    // This should not happen with brokers that support FetchRequest/Response V3 or higher (i.e. KIP-74)
                    throw new KafkaException("Failed to make progress reading messages at " + tp + "=" +
                            fetchOffset + ". Received a non-empty fetch response from the server, but no " +
                            "complete records were found.");
               }
            }

            if (partition.highWatermark &gt;= 0) {   // @24
                log.trace("Updating high watermark for partition {} to {}", tp, partition.highWatermark);
                subscriptions.updateHighWatermark(tp, partition.highWatermark);
            }

            if (partition.logStartOffset &gt;= 0) {    // @25
                log.trace("Updating log start offset for partition {} to {}", tp, partition.logStartOffset);
                    subscriptions.updateLogStartOffset(tp, partition.logStartOffset);
            }

            if (partition.lastStableOffset &gt;= 0) { // @26
                log.trace("Updating last stable offset for partition {} to {}", tp, partition.lastStableOffset);
                    subscriptions.updateLastStableOffset(tp, partition.lastStableOffset);
            }
        } else if (error == Errors.NOT_LEADER_FOR_PARTITION ||
                       error == Errors.REPLICA_NOT_AVAILABLE ||
                       error == Errors.KAFKA_STORAGE_ERROR) {                       // @3
                log.debug("Error in fetch for partition {}: {}", tp, error.exceptionName());
            this.metadata.requestUpdate();
        } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {          // @4
            log.warn("Received unknown topic or partition error in fetch for partition {}", tp);
            this.metadata.requestUpdate();
        } else if (error == Errors.OFFSET_OUT_OF_RANGE) {                        // @5
            if (fetchOffset != subscriptions.position(tp)) {
                log.debug("Discarding stale fetch response for partition {} since the fetched offset {} " +
                            "does not match the current offset {}", tp, fetchOffset, subscriptions.position(tp));
            } else if (subscriptions.hasDefaultOffsetResetPolicy()) {
                log.info("Fetch offset {} is out of range for partition {}, resetting offset", fetchOffset, tp);
                    subscriptions.requestOffsetReset(tp);
            } else {
                throw new OffsetOutOfRangeException(Collections.singletonMap(tp, fetchOffset));
            }
        } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {             // @6
            log.warn("Not authorized to read from topic {}.", tp.topic());
                throw new TopicAuthorizationException(Collections.singleton(tp.topic()));
        } else if (error == Errors.UNKNOWN_SERVER_ERROR) {                
            log.warn("Unknown error fetching data for topic-partition {}", tp);
        } else {
            throw new IllegalStateException("Unexpected error code " + error.code() + " while fetching data");
        }
    } finally {   // @7
        if (partitionRecords == null)
            completedFetch.metricAggregator.record(tp, 0, 0);

        if (error != Errors.NONE)
           // we move the partition to the end if there was an error. This way, it's more likely that partitions for
           // the same topic can remain together (allowing for more efficient serialization).
           subscriptions.movePartitionToEnd(tp);
    }
    return partitionRecords;
}

上面的代碼雖然比較長,其實總體仍是比較簡單,只是須要針對各類異常處理,打印對應的日誌,接下來詳細介紹該方法的實現關鍵點。

代碼@1:判斷該分區是否可拉取,若是不可拉取,則忽略這批拉取的消息,判斷是可拉取的要點以下:

  • 當前消費者負載的隊列包含該分區。
  • 當前消費者針對該隊列並無被用戶設置爲暫停(消費端限流)。
  • 當前消費者針對該隊列有有效的拉取偏移量。

代碼@2:該分支是處理正常返回的相關邏輯。其關鍵點以下:

  • 若是當前針對該隊列的消費位移 與 發起 fetch 請求時的 偏移量不一致,則認爲本次拉取非法,直接返回 null ,如代碼@21。
  • 從返回結構中獲取本次拉取的數據,使用數據迭代器,其基本數據單位爲 RecordBatch,即一個發送批次,如代碼@22。
  • 若是返回結果中沒有包含至少一個批次的消息,可是 sizeInBytes 又大於0,則直接拋出錯誤,根據服務端的版本,其錯誤信息有所不一樣,但主要是建議咱們如何處理,若是 broker 的版本低於 0.10.1.0,則建議升級 broker 版本,或增大客戶端的 fetch size,這種錯誤是由於一個批次的消息已經超過了本次拉取容許的最大拉取消息大小,如代碼@23。
  • 依次更新消費者本地關於該隊列的訂閱緩存信息的 highWatermark、logStartOffset、lastStableOffset。

從代碼@3到@8 是多種異常信息的處理。 代碼@3:若是出現以下3種錯誤碼,則使用 debug 打印錯誤日誌,而且向服務端請求元數據並更新本地緩存。

  • NOT_LEADER_FOR_PARTITION 請求的節點上不是該分區的 Leader 分區。
  • REPLICA_NOT_AVAILABLE 該分區副本之間沒法複製
  • KAFKA_STORAGE_ERROR 存儲異常。

Kafka 認爲上述錯誤是可恢復的,並且對消費不會形成太大影響,故只是用 debug 打印日誌,而後更新本地緩存便可。

代碼@4:若是出現 UNKNOWN_TOPIC_OR_PARTITION 未知主題與分區時,則使用 warn 級別輸出錯誤日誌,並更新元數據。

代碼@5:針對 OFFSET_OUT_OF_RANGE 偏移量超過範圍異常的處理邏輯,其實現關鍵點以下:

  • 若是這次拉取的開始偏移量與消費者本地緩存的偏移量不一致,則丟棄,說明該消息已過時,打印錯誤日誌。
  • 若是這次拉取的開始偏移量與消費者本地緩存的偏移量一致,說明此時的偏移量非法,若是有配置重置偏移量策略,則使用重置偏移量,不然拋出 OffsetOutOfRangeException 錯誤。

代碼@6:若是是 TOPIC_AUTHORIZATION_FAILED 沒有權限(ACL)則拋出異常。

代碼@7:若是本次拉取的結果不是NONE(成功),而且是可恢復的,將該隊列的訂閱關係移動到消費者緩存列表的末尾。若是成功,則返回拉取到的分區數據,其封裝對象爲 PartitionRecords。

接下來咱們再來看看 2.1.1 fetchedRecords 中的另一個核心方法。

2.2.1.2 fetchRecords()

在介紹該方法以前一樣先來看一下參數 PartitionRecords 的內部結構。

一、PartitionRecords 類圖 在這裏插入圖片描述

主要的核心屬性以下:

  • TopicPartition partition 分區信息。
  • CompletedFetch completedFetch Fetch請求完成結果
  • Iterator<!--? extends RecordBatch--> batches 本次 Fetch 操做獲取的結果集。
  • Set< Long> abortedProducerIds 與事物相關,後續會專門的章節詳細介紹。
  • PriorityQueue<fetchresponse.abortedtransaction> abortedTransactions 與事物相關,後續會專門的章節詳細介紹。
  • int recordsRead 已讀取的記錄條數。
  • int bytesRead 已讀取的記錄字節數。
  • RecordBatch currentBatch 當前遍歷的批次。
  • Record lastRecord 該迭代器最後一條消息。
  • long nextFetchOffset 下次待拉取的偏移量。

二、fetchRecords 詳解

private List<consumerrecord<k, v>&gt; fetchRecords(PartitionRecords partitionRecords, int maxRecords) {
    if (!subscriptions.isAssigned(partitionRecords.partition)) {   // @1
            // this can happen when a rebalance happened before fetched records are returned to the consumer's poll call
        log.debug("Not returning fetched records for partition {} since it is no longer assigned",
                    partitionRecords.partition);
    } else if (!subscriptions.isFetchable(partitionRecords.partition)) { // @2
        // this can happen when a partition is paused before fetched records are returned to the consumer's
        // poll call or if the offset is being reset
        log.debug("Not returning fetched records for assigned partition {} since it is no longer fetchable",
                    partitionRecords.partition);
    } else {
        long position = subscriptions.position(partitionRecords.partition);       // @3
        if (partitionRecords.nextFetchOffset == position) {      // @4
            List<consumerrecord<k, v>&gt; partRecords = partitionRecords.fetchRecords(maxRecords);
            long nextOffset = partitionRecords.nextFetchOffset;
            log.trace("Returning fetched records at offset {} for assigned partition {} and update " +
                        "position to {}", position, partitionRecords.partition, nextOffset);
            subscriptions.position(partitionRecords.partition, nextOffset);

            Long partitionLag = subscriptions.partitionLag(partitionRecords.partition, isolationLevel);  
            if (partitionLag != null)
                this.sensors.recordPartitionLag(partitionRecords.partition, partitionLag);

            Long lead = subscriptions.partitionLead(partitionRecords.partition);
            if (lead != null) {
                this.sensors.recordPartitionLead(partitionRecords.partition, lead);
            }

            return partRecords;
        } else {   // @5
            // these records aren't next in line based on the last consumed position, ignore them
            // they must be from an obsolete request
            log.debug("Ignoring fetched records for {} at offset {} since the current position is {}",
                        partitionRecords.partition, partitionRecords.nextFetchOffset, position);
        }
    }

    partitionRecords.drain();
    return emptyList();
}

代碼@1:從 PartitionRecords 中提取消息以前,再次判斷訂閱消息中是否包含當前分區,若是不包含,則使用 debug 打印日誌,頗有多是發生了重平衡。

代碼@2:是否容許拉取,若是用戶主動暫停消費,則忽略本次拉取的消息。備註:Kafka 消費端若是消費太快,能夠進行限流。

代碼@3:從本地消費者緩存中獲取該隊列已消費的偏移量,在發送拉取消息時,就是從該偏移量開始拉取的。

代碼@4:若是本地緩存已消費偏移量與從服務端拉回的起始偏移量相等的話,則認爲是一個有效拉取,不然則認爲是一個過時的拉取,該批消息已被消費,見代碼@5。若是是一個有效請求,則使用 sensors 收集統計信息,並返回拉取到的消息, 返回結果被封裝在 List<consumerrecord<k, v>> 。

2.2.2 sendFetches

「發送」 fetch 請求,注意這裏並不會觸發網絡操做,而是組裝拉取請求,將其放入網絡緩存區。

Fetcher#sendFetches

public synchronized int sendFetches() {
    Map<node, fetchsessionhandler.fetchrequestdata> fetchRequestMap = prepareFetchRequests();  // @1
    for (Map.Entry<node, fetchsessionhandler.fetchrequestdata> entry : fetchRequestMap.entrySet()) {   // @2
        final Node fetchTarget = entry.getKey();
        final FetchSessionHandler.FetchRequestData data = entry.getValue();
        final FetchRequest.Builder request = FetchRequest.Builder
            .forConsumer(this.maxWaitMs, this.minBytes, data.toSend())
            .isolationLevel(isolationLevel)
            .setMaxBytes(this.maxBytes)
            .metadata(data.metadata())
            .toForget(data.toForget());   // @3
 
        client.send(fetchTarget, request)    // @4
            .addListener(new RequestFutureListener<clientresponse>() {
                @Override
                public void onSuccess(ClientResponse resp) {  // @5
                    synchronized (Fetcher.this) {
                        @SuppressWarnings("unchecked")
                        FetchResponse<records> response = (FetchResponse<records>) resp.responseBody();
                        FetchSessionHandler handler = sessionHandler(fetchTarget.id());
                        if (handler == null) {
                            log.error("Unable to find FetchSessionHandler for node {}. Ignoring fetch response.",
                                fetchTarget.id());
                            return;
                        }
                        if (!handler.handleResponse(response)) {
                            return;
                        }

                        Set<topicpartition> partitions = new HashSet&lt;&gt;(response.responseData().keySet());
                        FetchResponseMetricAggregator metricAggregator = new FetchResponseMetricAggregator(sensors, partitions);
                        for (Map.Entry<topicpartition, fetchresponse.partitiondata<records>&gt; entry : 
                                 response.responseData().entrySet()) {
                            TopicPartition partition = entry.getKey();
                            long fetchOffset = data.sessionPartitions().get(partition).fetchOffset;
                            FetchResponse.PartitionData<records> fetchData = entry.getValue();
                            completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator,
                                resp.requestHeader().apiVersion()));
                            }    // @6

                            sensors.fetchLatency.record(resp.requestLatencyMs());
                        }
                  }
                  public void onFailure(RuntimeException e) {  // @7
                    synchronized (Fetcher.this) {
                        FetchSessionHandler handler = sessionHandler(fetchTarget.id());
                        if (handler != null) {
                            handler.handleError(e);
                        }
                    }
                  }
        });
    }
    return fetchRequestMap.size();
}
~~~java
上面的方法比較長,其實現的關鍵點以下:
代碼@1:經過調用 Fetcher 的 prepareFetchRequests 方法按節點組裝拉取請求,將在後面詳細介紹。

代碼@2:遍歷上面的待發請求,進一步組裝請求。下面就是分節點發送拉取請求。

代碼@3:構建 FetchRequest 拉取請求對象。

代碼@4:調用 NetworkClient 的 send 方法將其發送到發送緩存區,本文不會詳細介紹網絡方面的實現,但下文會截圖說明拉取請求發送緩存區的一個關鍵點。

代碼@5:這裏會註冊事件監聽器,當消息從 broker 拉取到本地後觸發回調,即消息拉取請求收到返回結果後會將返回結果放入到completedFetches 中(代碼@6),這就和上文消息拉取時 Fetcher 的 fetchedRecords 方法造成閉環。
代碼@7:消息拉取一次處理。

接下來詳細介紹 prepareFetchRequests 方法。

###### 2.2.2.1 Fetcher prepareFetchRequests 方法詳解
~~~java
private Map<node, fetchsessionhandler.fetchrequestdata> prepareFetchRequests() {
    Map<node, fetchsessionhandler.builder> fetchable = new LinkedHashMap&lt;&gt;();  
    for (TopicPartition partition : fetchablePartitions()) {    // @1
        Node node = metadata.partitionInfoIfCurrent(partition).map(PartitionInfo::leader).orElse(null);  // @2
        if (node == null) {    // @3
            metadata.requestUpdate();
        } else if (client.isUnavailable(node)) {   // @4
           client.maybeThrowAuthFailure(node);
           log.trace("Skipping fetch for partition {} because node {} is awaiting reconnect backoff", partition, node);
        } else if (client.hasPendingRequests(node)) {   // @5
            log.trace("Skipping fetch for partition {} because there is an in-flight request to {}", partition, node);
        } else {
            // if there is a leader and no in-flight requests, issue a new fetch
            FetchSessionHandler.Builder builder = fetchable.get(node);    // @7
            if (builder == null) {
                FetchSessionHandler handler = sessionHandler(node.id());
                if (handler == null) {
                    handler = new FetchSessionHandler(logContext, node.id());
                    sessionHandlers.put(node.id(), handler);
                }
                builder = handler.newBuilder();
                fetchable.put(node, builder);
            }
            long position = this.subscriptions.position(partition);
            builder.add(partition, new FetchRequest.PartitionData(position, FetchRequest.INVALID_LOG_START_OFFSET,
            this.fetchSize, Optional.empty()));
            log.debug("Added {} fetch request for partition {} at offset {} to node {}", isolationLevel,
                    partition, position, node);
        }
    }
    Map<node, fetchsessionhandler.fetchrequestdata> reqs = new LinkedHashMap&lt;&gt;();  
    for (Map.Entry<node, fetchsessionhandler.builder> entry : fetchable.entrySet()) {
        reqs.put(entry.getKey(), entry.getValue().build());
    }
    return reqs;
}

代碼@1:首先經過調用 fetchablePartitions() 獲取可發起拉取任務的分區信息,下文簡單介紹一下。

代碼@2:若是該分區在客戶端本地緩存中獲取該分區的 Leader 節點信息。

代碼@3:若是其 Leader 節點信息爲空,則發起更新元數據請求,本次拉取任務將不會包含該分區。

代碼@4:若是客戶端與該分區的 Leader 鏈接爲完成,若是是由於權限的緣由則拋出ACL相關異常,不然打印日誌,本次拉取請求不會包含該分區。

代碼@5:判斷該節點是否有掛起的拉取請求,即發送緩存區中是待發送的請求,若是有,本次將不會被拉取。

代碼@6:構建拉取請求,分節點組織請求。

2.2.2.2 NetworkClient send 方法關鍵點

在這裏插入圖片描述 NetworkClient 的 send 方法只是將其放入 unsent 中。 在這裏插入圖片描述

與上文的 client.hasPendingRequests(node) 方法遙相呼應。

三、總結 上面的源碼分析有點長,也有點枯燥,咱們仍是畫一張流程圖來進行總結。 在這裏插入圖片描述 在這裏插入圖片描述

Kafka 的消息拉取流程仍是比較複雜的,後面會基於上述流程,重點進行拆解,例如消費進度提交,負載隊列重平衡等等。


做者介紹:丁威,《RocketMQ技術內幕》做者,RocketMQ 社區佈道師,公衆號:中間件興趣圈 維護者,目前已陸續發表源碼分析Java集合、Java 併發包(JUC)、Netty、Mycat、Dubbo、RocketMQ、Mybatis等源碼專欄。能夠點擊連接:中間件知識星球,一塊兒探討高併發、分佈式服務架構,交流源碼。

</node,></node,></node,></node,></records></topicpartition,></topicpartition></records></records></clientresponse></node,></node,></consumerrecord<k,></consumerrecord<k,></consumerrecord<k,></fetchresponse.abortedtransaction></topicpartition,></records></topicpartition,></topicpartition,></consumerrecord<k,></consumerrecord<k,></consumerrecord<k,></topicpartition,></topicpartition,></integer,></topicpartition,></topicpartition,></topicpartition,></k,></k,>

相關文章
相關標籤/搜索