kafka生產者的消息發送機制

開篇一張圖,讀者更幸福,很少說上架構圖。
kafka生產者的消息發送機制
這個架構圖咱們在前面一篇文章《kafka生產者的蓄水池機制》裏面介紹過,上一篇咱們是介紹了這個圖裏面的消息收集過程(咱們成爲「蓄水池」機制),這裏咱們就介紹它的另一部分,消息的發送機制。
kafka生產者的消息發送機制 node

Sender運行過程

全部的消息發送,都是從Sender線程開始,它是一個守護線程,因此咱們首先就須要來看一下Sender的run方法,最外層的run方式是一個主循環不斷調用具體邏輯運行方法run,咱們看下它的具體邏輯處理run方法:api

void run(long now) {
    //生產者事務管理相關處理,本章節不作具體分析,後面專門章節再作分析,你們先了解一下
        if (transactionManager != null) {
            try {
                if (transactionManager.shouldResetProducerStateAfterResolvingSequences())
                    // Check if the previous run expired batches which requires a reset of the producer state.
                    transactionManager.resetProducerId();

                if (!transactionManager.isTransactional()) {
                    // this is an idempotent producer, so make sure we have a producer id
                    maybeWaitForProducerId();
                } else if (transactionManager.hasUnresolvedSequences() && !transactionManager.hasFatalError()) {
                    transactionManager.transitionToFatalError(new KafkaException("The client hasn't received acknowledgment for " +
                            "some previously sent messages and can no longer retry them. It isn't safe to continue."));
                } else if (transactionManager.hasInFlightTransactionalRequest() || maybeSendTransactionalRequest(now)) {
                    // as long as there are outstanding transactional requests, we simply wait for them to return
                    client.poll(retryBackoffMs, now);
                    return;
                }

                // do not continue sending if the transaction manager is in a failed state or if there
                // is no producer id (for the idempotent case).
                if (transactionManager.hasFatalError() || !transactionManager.hasProducerId()) {
                    RuntimeException lastError = transactionManager.lastError();
                    if (lastError != null)
                        maybeAbortBatches(lastError);
                    client.poll(retryBackoffMs, now);
                    return;
                } else if (transactionManager.hasAbortableError()) {
                    accumulator.abortUndrainedBatches(transactionManager.lastError());
                }
            } catch (AuthenticationException e) {
                // This is already logged as error, but propagated here to perform any clean ups.
                log.trace("Authentication exception while processing transactional request: {}", e);
                transactionManager.authenticationFailed(e);
            }
        }
    //實際的數據發送請求,並處理服務端響應
        long pollTimeout = sendProducerData(now);
        client.poll(pollTimeout, now);
    }

接下來咱們從兩個層面來看,一個是消息發送,一個是消息返回響應處理。緩存

kafka生產者的消息發送機制
消息的發送markdown

先看下sendProducerData的具體邏輯:網絡

private long sendProducerData(long now) {
    //獲取集羣信息
        Cluster cluster = metadata.fetch();

        // 獲取那些能夠發送消息的分區列表信息
        RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

        // 若是這些分區沒有對應的leader,就須要強制對metadata信息進行更新
        if (!result.unknownLeaderTopics.isEmpty()) {
            // 沒有leader的場景例如leader選舉,或者topic已失效,這些都須要將topic從新加入,發送到服務端請求更新,由於如今還須要往這些topic發送消息
            for (String topic : result.unknownLeaderTopics)
                this.metadata.add(topic);
            this.metadata.requestUpdate();
        }

        // 遍歷全部獲取到的網絡節點,基於網絡鏈接狀態來檢測這些節點是否可用,若是不可用則剔除
        Iterator<Node> iter = result.readyNodes.iterator();
        long notReadyTimeout = Long.MAX_VALUE;
        while (iter.hasNext()) {
            Node node = iter.next();
      //節點鏈接狀態檢查,若是容許鏈接,則從新建立鏈接
            if (!this.client.ready(node, now)) {
          //未準備好的節點刪除
                iter.remove();
                notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
            }
        }

        // 獲取全部待發送的批量消息以及其對應的leader節點集合
        Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes,
                this.maxRequestSize, now);

    //若是須要保證消息的強順序性,則緩存對應 topic 分區對象,防止同一時間往同一個 topic 分區發送多條處於未完成狀態的消息
        if (guaranteeMessageOrder) {
            // 將每一個batch的分區對象信息加入到mute集合,採起Set實現,重複的topicpartition信息不會被加入
            for (List<ProducerBatch> batchList : batches.values()) {
                for (ProducerBatch batch : batchList)
                    this.accumulator.mutePartition(batch.topicPartition);
            }
        }

        // 獲取本地過時的消息,返回 TimeoutException,並釋放空間
        List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(this.requestTimeout, now);
        // 過時的batch消息處理
        if (!expiredBatches.isEmpty())
            log.trace("Expired {} batches in accumulator", expiredBatches.size());
        for (ProducerBatch expiredBatch : expiredBatches) {
            failBatch(expiredBatch, -1, NO_TIMESTAMP, expiredBatch.timeoutException(), false);
            if (transactionManager != null && expiredBatch.inRetry()) {
                // This ensures that no new batches are drained until the current in flight batches are fully resolved.
                transactionManager.markSequenceUnresolved(expiredBatch.topicPartition);
            }
        }
    //更新度量信息
        sensors.updateProduceRequestMetrics(batches);

        // 設置pollTimeout,若是存在待發送的消息,則設置 pollTimeout 等於 0,這樣能夠當即發送請求,從而可以縮短剩餘消息的緩存時間,避免堆積
        long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
        if (!result.readyNodes.isEmpty()) {
            log.trace("Nodes with data ready to send: {}", result.readyNodes);
            pollTimeout = 0;
        }
    //調用NetWorkClient將消息發送到服務端
        sendProduceRequests(batches, now);

        return pollTimeout;
}

概括起來sendProducerData的核心流程以下:架構

1.經過accumulator.ready方法獲取可發送的分區列表信息;併發

2.調用client.ready對獲取到的全部網絡節點進行連通性檢測;負載均衡

3.經過.accumulator.drain獲取全部待發送的批量消息以及其對應的leader節點集合;ide

4.在須要保障分區消息的強順序性的場景下調用accumulator.mutePartition將分區信息添加到mute集合;函數

5.調用sendProduceRequests發送生產消息請求。

下面逐個流程講解:

經過accumulator.ready方法獲取可發送的分區列表信息:

public ReadyCheckResult ready(Cluster cluster, long nowMs) {
        //可接受消息的節點集合
        Set<Node> readyNodes = new HashSet<>();
        long nextReadyCheckDelayMs = Long.MAX_VALUE;
    //記錄未找到leader副本的Topic信息集合
        Set<String> unknownLeaderTopics = new HashSet<>();

    // 是否有線程在等待 BufferPool 分配空間
        boolean exhausted = this.free.queued() > 0;
    //遍歷待發送的batch裏面的每一個分區信息,對其leader執行斷定
        for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) {
            TopicPartition part = entry.getKey();
            Deque<ProducerBatch> deque = entry.getValue();

      // 獲取當前 topic 分區 leader 副本所在的節點
            Node leader = cluster.leaderFor(part);
            synchronized (deque) {
                if (leader == null && !deque.isEmpty()) {
                    // 該分區下的leader未知,可是存在往該分區發送的消息,須要記錄下,在後面的流程當發現有未知leader須要強制向服務端發送metadata的信息更新請求
                    unknownLeaderTopics.add(part.topic());
                } 
        //全部可發送的節點須要不能在mute集合裏面,保障消息有序性,當mute裏面還有消息未發送完成不能繼續追加發送
        else if (!readyNodes.contains(leader) && !muted.contains(part)) {
                    ProducerBatch batch = deque.peekFirst();
                    if (batch != null) {
                        long waitedTimeMs = batch.waitedTimeMs(nowMs);
            //是否處於重試操做判斷
                        boolean backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs;
                        long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
                        boolean full = deque.size() > 1 || batch.isFull();
                        boolean expired = waitedTimeMs >= timeToWaitMs;
            //標記當前leader是否可發送
                        boolean sendable = full // 1. 隊列中有多個 RecordBatch,或第一個 RecordBatch 已滿
            || expired // 2. 當前等待重試的時間過長
            || exhausted // 3. 有其餘線程在等待 BufferPoll 分配空間,即本地消息緩存已滿
            || closed // 4. producer 已經關閉
            || flushInProgress();// 5. 有線程正在等待 flush 操做完成
                        if (sendable && !backingOff) {
            //知足可發送狀態,而且沒有處於重試操做的狀態下,將當前leader加入可發送節點
                            readyNodes.add(leader);
                        } else {
                            long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
              // 更新下次執行 ready 斷定的時間間隔
                            nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
                        }
                    }
                }
            }
        }
    //返回檢查結果
        return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);
}

調用client.ready對獲取到的全部網絡節點進行連通性檢測:

public boolean ready(Node node, long now) {
        if (node.isEmpty())
            throw new IllegalArgumentException("Cannot connect to empty node " + node);
    //connectionStates已就緒,直接返回可鏈接
        if (isReady(node, now))
            return true;

    //鏈接狀態顯示可鏈接
        if (connectionStates.canConnect(node.idString(), now))
            // 則調用selector初始化該鏈接
            initiateConnect(node, now);

        return false;
}

經過.accumulator.drain獲取全部待發送的批量消息以及其對應的leader節點集合:

public Map<Integer, List<ProducerBatch>> drain(Cluster cluster,
                                                   Set<Node> nodes,
                                                   int maxSize,
                                                   long now) {
        if (nodes.isEmpty())
            return Collections.emptyMap();

        //返回的nodeid對應的發送消息batch信息
        Map<Integer, List<ProducerBatch>> batches = new HashMap<>();
    //遍歷每個可連通的node
        for (Node node : nodes) {
            int size = 0;
            List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
            List<ProducerBatch> ready = new ArrayList<>();
            /* drainIndex 用於記錄上次發送中止的位置,本次繼續從當前位置開始發送,
      * 若是每次都是從 0 位置開始,可能會致使排在後面的分區餓死,這是一個簡單的負載均衡策略
      */
            int start = drainIndex = drainIndex % parts.size();
            do {
                PartitionInfo part = parts.get(drainIndex);
                TopicPartition tp = new TopicPartition(part.topic(), part.partition());
                //若是是須要保障消息的強順序性,則不能將消息添加進目標分區,不然會致使消息亂序
                if (!muted.contains(tp)) {
          // 獲取當前分區對應的 RecordBatch 集合
                    Deque<ProducerBatch> deque = getDeque(tp);
                    if (deque != null) {
                        synchronized (deque) {
                            ProducerBatch first = deque.peekFirst();
                            if (first != null) {
                //當前第一個batch是否處於重試狀態或者已重試過
                                boolean backoff = first.attempts() > 0 && first.waitedTimeMs(now) < retryBackoffMs;
                                // 沒有重試過,或者重試已超時
                                if (!backoff) {
                                    if (size + first.estimatedSizeInBytes() > maxSize && !ready.isEmpty()) {
                                         // 單次消息數據量已達到上限,結束循環,通常對應一個請求的大小,防止請求消息過大
                                        break;
                                    } 
                  //處理處於重試狀態下的消息
                  else {
                                        //省略在重試狀態下的事務處理流程
                    //遍歷每一個節點,節點的起始位置也以一個輪訓方式來遍歷,而且每一個隊列裏面的batch也都是隻取第一個,每一個隊列輪訓着取,全部這些操做都是爲了對消息發送的均衡處理,保障消息公平發送
                    ProducerBatch batch = deque.pollFirst();
                    //close表明着消息batch通道被關閉,只能讀取,沒法寫入
                                        batch.close();
                                        size += batch.records().sizeInBytes();
                                        ready.add(batch);
                                        batch.drained(now);
                                    }
                                }
                            }
                        }
                    }
                }
        //更新本次drainIndex
                this.drainIndex = (this.drainIndex + 1) % parts.size();
            } while (start != drainIndex);
            batches.put(node.id(), ready);
        }
        return batches;
}

調用accumulator.mutePartition將分區信息添加到mute集合,這個過程比較簡單就是將遍歷待發送的batch消息,若是設置了保障消息時序強一致性,那就將這個分區信息保存在mute集合之中,每次發送消息以前都會去檢查這個隊列是否包含已有的分區,若是有則本次不作發送,每發送完成以後都會調用mute集合去除所在的分區信息,以即可以放入下一個消息進行發送。

調用sendProduceRequests發送生產消息請求:

private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {
        if (batches.isEmpty())
            return;

        Map<TopicPartition, MemoryRecords> produceRecordsByPartition = new HashMap<>(batches.size());
        final Map<TopicPartition, ProducerBatch> recordsByPartition = new HashMap<>(batches.size());

        // 遍歷全部batch消息,找到最小的版本號信息
        byte minUsedMagic = apiVersions.maxUsableProduceMagic();
        for (ProducerBatch batch : batches) {
            if (batch.magic() < minUsedMagic)
                minUsedMagic = batch.magic();
        }

        // 遍歷 RecordBatch 集合,整理成 produceRecordsByPartition 和 recordsByPartition
        for (ProducerBatch batch : batches) {
            TopicPartition tp = batch.topicPartition;
            MemoryRecords records = batch.records();

            // 進行消息的向下兼容轉換操做,例如分區消息的遷移,從一個高版本遷移到低版本,就須要額外從新構造MemoryRecords
            if (!records.hasMatchingMagic(minUsedMagic))
                records = batch.records().downConvert(minUsedMagic, 0, time).records();
            produceRecordsByPartition.put(tp, records);
            recordsByPartition.put(tp, batch);
        }

        String transactionalId = null;
        if (transactionManager != null && transactionManager.isTransactional()) {
            transactionalId = transactionManager.transactionalId();
        }

    // 建立 ProduceRequest 請求構造器,produceRecordsByPartition用於構造請求器
        ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forMagic(minUsedMagic, acks, timeout,
                produceRecordsByPartition, transactionalId);
    // 建立回調對象,用於處理響應,recordsByPartition用於響應回調處理
        RequestCompletionHandler callback = new RequestCompletionHandler() {
            public void onComplete(ClientResponse response) {
                handleProduceResponse(response, recordsByPartition, time.milliseconds());
            }
        };

        String nodeId = Integer.toString(destination);
    // 建立 ClientRequest 請求對象,若是 acks 不等於 0 則表示會等待服務端的響應
        ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0, callback);
    //調用NetWorkClient發送消息
        client.send(clientRequest, now);
        log.trace("Sent produce request to {}: {}", nodeId, requestBuilder);
}

接下來就須要瞭解一下NetWorkClient的發送流程,它的發送最終都是調用doSend函數完成:

private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {
        //獲取目標節點id
    String nodeId = clientRequest.destination();
        RequestHeader header = clientRequest.makeHeader(request.version());
        //省略日誌信息打印
        Send send = request.toSend(nodeId, header);
    //新建InFlightRequest,並將請求添加進去
        InFlightRequest inFlightRequest = new InFlightRequest(
                header,
                clientRequest.createdTimeMs(),
                clientRequest.destination(),
                clientRequest.callback(),
                clientRequest.expectResponse(),
                isInternalRequest,
                request,
                send,
                now);
        this.inFlightRequests.add(inFlightRequest);
    //網絡消息發送
        selector.send(inFlightRequest.send);
}

至此,咱們消息發送講解完成,接下來說解一下消息的響應拉取過程。

kafka生產者的消息發送機制

消息的響應拉取

消息的響應拉取是從NetworkClient的poll方法開始的,它的邏輯解析以下:

public List<ClientResponse> poll(long timeout, long now) {
        ensureActive();

        if (!abortedSends.isEmpty()) {
            // 當鏈接斷開,或者版本不支持,須要優先處理這些響應
            List<ClientResponse> responses = new ArrayList<>();
            handleAbortedSends(responses);
            completeResponses(responses);
            return responses;
        }
    //metada信息的響應處理
        long metadataTimeout = metadataUpdater.maybeUpdate(now);
        try {
      //該poll過程處理全部的網絡鏈接、斷開鏈接,初始化新的發送以及處理過程總的發送和接收請求,接收的信息最終會放到completedReceives中
            this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
        } catch (IOException e) {
            log.error("Unexpected error during I/O", e);
        }

        // 處理全部的完成操做及響應
        long updatedNow = this.time.milliseconds();
        List<ClientResponse> responses = new ArrayList<>();
        handleCompletedSends(responses, updatedNow);
        handleCompletedReceives(responses, updatedNow);
        handleDisconnections(responses, updatedNow);
        handleConnections();
        handleInitiateApiVersionRequests(updatedNow);
        handleTimedOutRequests(responses, updatedNow);
        completeResponses(responses);

        return responses;
}

響應操做的核心處理幾個函數就是handle*的幾個函數,咱們分別介紹一下:

handleCompletedSends該方法遍歷全部發送完成的對象,對於那些不但願接收響應的請求,建立本地響應隊列並添加進去:

private void handleCompletedSends(List<ClientResponse> responses, long now) {
        // 遍歷全部的發送完成的send對象
        for (Send send : this.selector.completedSends()) {
        //找出最近一次在inFlightRequests的發送請求信息
            InFlightRequest request = this.inFlightRequests.lastSent(send.destination());
      //對於發送成功,可是不指望服務端響應的請求,建立本地響應隊列並將其添加進去
            if (!request.expectResponse) {
        //inFlightRequests在發送的時候添加,接收完成後去除
                this.inFlightRequests.completeLastSent(send.destination());
        // 添加到本地響應隊列中
                responses.add(request.completed(null, now));
            }
        }
    }

handleCompletedReceives該方法獲取服務端響應,並依據響應分類處理,分別是metadata、apiversion

private void handleCompletedReceives(List<ClientResponse> responses, long now) {
      //從completedReceives中遍歷全部的接收信息,completedReceives中的信息是在上一層的selector.poll中添加進去的
        for (NetworkReceive receive : this.selector.completedReceives()) {
        //獲取返回響應的節點 ID
            String source = receive.source();
      //從 inFlightRequests 集合中獲取緩存的 request 對象
            InFlightRequest req = inFlightRequests.completeNext(source);
      //解析響應信息
            Struct responseStruct = parseStructMaybeUpdateThrottleTimeMetrics(receive.payload(), req.header,
                throttleTimeSensor, now);
           //省略日誌
            AbstractResponse body = AbstractResponse.parseResponse(req.header.apiKey(), responseStruct);
            if (req.isInternalRequest && body instanceof MetadataResponse)
          //處理metadata的更新響應信息
                metadataUpdater.handleCompletedMetadataResponse(req.header, now, (MetadataResponse) body);
            else if (req.isInternalRequest && body instanceof ApiVersionsResponse)
           // 若是是更新 API 版本的響應,則更新本地緩存的目標節點支持的 API 版本信息
                handleApiVersionsResponse(responses, req, now, (ApiVersionsResponse) body);
            else
          //添加到本地響應隊列
                responses.add(req.completed(body, now));
        }
    }

handleDisconnections該方法會最終調用 Selector#disconnected方法獲取斷開鏈接的節點 ID 集合,並更新相應節點的鏈接狀態爲 DISCONNECTED,同時會清空本地緩存的與該節點相關的數據,最終建立一個 disconnected 類型的 ClientResponse 對象添加到結果集合中。若是這一步確實發現了已斷開的鏈接,則標記須要更新本地緩存的節點元數據信息。

handleConnections該方法會調用Selector#connected方法獲取鏈接正常的節點 ID 集合,若是當前節點是第一次創建鏈接,則需獲取節點支持的 API 版本信息,方法會將當前節點的鏈接狀態設置爲CHECKING_API_VERSIONS,並將節點 ID 添加到 NetworkClient#nodesNeedingApiVersionsFetch 集合中,對於其它節點,則更新相應鏈接狀態爲 READY。

handleInitiateApiVersionRequests該方法用於處理NetworkClient#handleConnections 方法中標記的須要獲取支持的API版本信息的節點,即記錄到 NetworkClient#nodesNeedingApiVersionsFetch 集合中的節點。方法會遍歷處理集合中的節點,並在判斷目標節點容許接收請求的狀況下,構建 ApiVersionsRequest 請求以獲取目標節點支持的 API 版本信息,該請求會被包裝成 ClientRequest 對象,並在下次 Selector#poll操做時一併送出。

handleTimedOutRequests該方法會遍歷緩存在 inFlightRequests 中已經超時的相關請求對應的節點集合,針對此類節點將其視做斷開鏈接進行處理。方法會建立一個 disconnected 類型的 ClientResponse 對象添加到結果集合中,並標記須要更新本地緩存的集羣元數據信息。

最後一個是completeResponses,它的流程很簡單,觸發生產者的回調函數,通知服務端的響應信息:

private void completeResponses(List<ClientResponse> responses) {
        for (ClientResponse response : responses) {
            try {
        //遍歷以前全部階段的handle*處理過程當中添加的response,並回調其callback方法,這樣生產者就收到服務端響應信息了
                response.onComplete();
            } catch (Exception e) {
                log.error("Uncaught error in request completion:", e);
            }
        }
    }

Note:本公衆號全部kafka系列的架構及源碼分析文章都是基於1.1.2版本,若有特殊會進行額外聲明。

推薦閱讀

kafka系列:

  1. kafka是如何作到百萬級高併發低遲延的?
  2. kafka生產者的蓄水池機制

kafka生產者的消息發送機制
kafka生產者的消息發送機制
掃碼關注咱們
互聯網架構師之路

過濾技術雜質,只爲精品呈現

若是喜歡,請關注加星喔

相關文章
相關標籤/搜索