Kafka 源碼解析之 Producer 單 Partition 順序性實現及配置說明(五)

歡迎你們關注 github.com/hsfxuebao/j… ,但願對你們有所幫助,要是以爲能夠的話麻煩給點一下Star哈html

今天把 Kafka Producer 最後一部分給講述一下,Producer 大部份內容都已經在前面幾篇文章介紹過了,這裏簡單作個收尾,但並非對前面的總結,本文從兩塊來說述:RecordAccumulator 類的實現、Kafka Producer 如何保證其順序性以及 Kafka Producer 的配置說明,每一個 Producer 線程都會有一個 RecordAccumulator 對象,它負責緩存要發送 RecordBatch、記錄發送的狀態而且進行相應的處理,這裏會詳細講述 Kafka Producer 如何保證單 Partition 的有序性。最後,簡單介紹一下 Producer 的參數配置說明,只有正確地理解 Producer 相關的配置參數,才能更好地使用 Producer,發揮其相應的做用。java

RecordAccumulator

這裏再看一下 RecordAccumulator 的數據結構,以下圖所示,每一個 topic-partition 都有一個對應的 deque,deque 中存儲的是 RecordBatch,它是發送的基本單位,只有這個 topic-partition 的 RecordBatch 達到大小或時間要求才會觸發發送操做(但並非只有達到這兩個條件之一纔會被髮送,這點要理解清楚)。node

RecordAccumulator 模型RecordAccumulator 模型git

再看一下 RecordAccumulator 類的主要方法介紹,以下圖所示。github

RecordAccumulator 主要方法及其說明RecordAccumulator 主要方法及其說明apache

這張圖基本上涵蓋了 RecordAccumulator 的主要方法,下面會選擇其中幾個方法詳細講述,會圍繞着 Kafka Producer 如何實現單 Partition 順序性這個主題來說述。緩存

mutePartition() 與 unmutePartition()

先看下 mutePartition()unmutePartition() 這兩個方法,它們是保證有序性關鍵之一,其主要作用就是將指定的 topic-partition 從 muted 集合中加入或刪除,後面會看到它們的做用。markdown

private final Set<TopicPartition> muted;

public void mutePartition(TopicPartition tp) {
    muted.add(tp);
}

public void unmutePartition(TopicPartition tp) {
    muted.remove(tp);
}
複製代碼

這裏先說一下這兩個方法調用的條件,這樣的話,下面在介紹其餘方法時纔會更容易理解:數據結構

  • mutePartition():若是要求保證順序性,那麼這個 tp 對應的 RecordBatch 若是要開始發送,就將這個 tp 加入到 muted 集合中;
  • unmutePartition():若是 tp 對應的 RecordBatch 發送完成,tp 將會從 muted 集合中移除。

也就是說,muted 是用來記錄這個 tp 是否有還有未完成的 RecordBatch。less

ready()

ready() 是在 Sender 線程中調用的,其做用選擇那些能夠發送的 node,也就是說,若是這個 tp 對應的 batch 能夠發送(達到時間或大小要求),就把 tp 對應的 leader 選出來。

public ReadyCheckResult ready(Cluster cluster, long nowMs) {
    Set<Node> readyNodes = new HashSet<>();
    long nextReadyCheckDelayMs = Long.MAX_VALUE;
    Set<String> unknownLeaderTopics = new HashSet<>();

    boolean exhausted = this.free.queued() > 0;
    for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
        TopicPartition part = entry.getKey();
        Deque<RecordBatch> deque = entry.getValue();

        Node leader = cluster.leaderFor(part);
        synchronized (deque) {
            if (leader == null && !deque.isEmpty()) {
                // This is a partition for which leader is not known, but messages are available to send.
                // Note that entries are currently not removed from batches when deque is empty.
                unknownLeaderTopics.add(part.topic());
            } else if (!readyNodes.contains(leader) && !muted.contains(part)) {//note: part 若是 mute 就不會遍歷
                RecordBatch batch = deque.peekFirst();
                if (batch != null) {
                    boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs;
                    //note: 是不是在重試
                    long waitedTimeMs = nowMs - batch.lastAttemptMs;
                    long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
                    long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
                    boolean full = deque.size() > 1 || batch.isFull(); //note: batch 滿了
                    boolean expired = waitedTimeMs >= timeToWaitMs; //note: batch 超時
                    boolean sendable = full || expired || exhausted || closed || flushInProgress();
                    if (sendable && !backingOff) {
                        readyNodes.add(leader);// note: 將能夠發送的 leader 添加到集合中
                    } else {
                        // Note that this results in a conservative estimate since an un-sendable partition may have
                        // a leader that will later be found to have sendable data. However, this is good enough
                        // since we'll just wake up and then sleep again for the remaining time.
                        nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
                    }
                }
            }
        }
    }

    return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);
}
複製代碼

能夠看到這一行 (!readyNodes.contains(leader) && !muted.contains(part)),若是 muted 集合包含這個 tp,那麼在遍歷時將不會處理它對應的 deque,也就是說,若是一個 tp 加入了 muted 集合中,即便它對應的 RecordBatch 能夠發送了,也不會觸發引發其對應的 leader 被選擇出來。

drain()

drain() 是用來遍歷可發送請求的 node,而後再遍歷在這個 node 上全部 tp,若是 tp 對應的 deque 有數據,將會被選擇出來直到超過一個請求的最大長度(max.request.size)爲止,也就說說即便 RecordBatch 沒有達到條件,但爲了保證每一個 request 儘快多地發送數據提升發送效率,這個 RecordBatch 依然會被提早選出來並進行發送。

//note: 返回該 node 對應的能夠發送的 RecordBatch 的 batches,並從 queue 中移除(最大的大小爲maxSize,超過的話,下次再發送)
public Map<Integer, List<RecordBatch>> drain(Cluster cluster,
                                             Set<Node> nodes,
                                             int maxSize,
                                             long now) {
    if (nodes.isEmpty())
        return Collections.emptyMap();

    Map<Integer, List<RecordBatch>> batches = new HashMap<>();
    for (Node node : nodes) {
        int size = 0;
        List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
        List<RecordBatch> ready = new ArrayList<>();
        /* to make starvation less likely this loop doesn't start at 0 */
        int start = drainIndex = drainIndex % parts.size();
        do {
            PartitionInfo part = parts.get(drainIndex);
            TopicPartition tp = new TopicPartition(part.topic(), part.partition());
            // Only proceed if the partition has no in-flight batches.
            if (!muted.contains(tp)) {//note: 被 mute 的 tp 依然不會被遍歷
                Deque<RecordBatch> deque = getDeque(new TopicPartition(part.topic(), part.partition()));
                if (deque != null) {
                    synchronized (deque) {
                        RecordBatch first = deque.peekFirst();
                        if (first != null) {
                            boolean backoff = first.attempts > 0 && first.lastAttemptMs + retryBackoffMs > now;
                            // Only drain the batch if it is not during backoff period.
                            if (!backoff) {
                                if (size + first.sizeInBytes() > maxSize && !ready.isEmpty()) {
                                    // there is a rare case that a single batch size is larger than the request size due
                                    // to compression; in this case we will still eventually send this batch in a single
                                    // request
                                    break;
                                } else {
                                    RecordBatch batch = deque.pollFirst();
                                    batch.close();
                                    size += batch.sizeInBytes();
                                    ready.add(batch);
                                    batch.drainedMs = now;
                                }
                            }
                        }
                    }
                }
            }
            this.drainIndex = (this.drainIndex + 1) % parts.size();
        } while (start != drainIndex);
        batches.put(node.id(), ready);
    }
    return batches;
}
複製代碼

在遍歷 node 的全部 tp 時,能夠看到是有條件的 —— !muted.contains(tp),若是這個 tp 被添加到 muted 集合中,那麼它將不會被遍歷,也就不會做爲 request 一部分被髮送出去,這也就保證了 tp 若是還有未完成的 RecordBatch,那麼其對應 deque 中其餘 RecordBatch 即便達到條件也不會被髮送,就保證了 tp 在任什麼時候刻只有一個 RecordBatch 在發送。

順序性如何保證?

是否保證順序性,仍是在 Sender 線程中實現的,mutePartition()unmutePartition() 也都是在 Sender 中調用的,這裏看一下 KafkaProducer 是如何初始化一個 Sender 對象的。

// from KafkaProducer
this.sender = new Sender(client,
                         this.metadata,
                         his.accumulator,
                         config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) == 1,
                         config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
                         (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)),
                         config.getInt(ProducerConfig.RETRIES_CONFIG),
                         this.metrics,
                         Time.SYSTEM,
                         this.requestTimeoutMs);//NOTE: Sender 實例,發送請求的後臺線程

// from Sender
public Sender(KafkaClient client,
              Metadata metadata,
              RecordAccumulator accumulator,
              boolean guaranteeMessageOrder,
              int maxRequestSize,
              short acks,
              int retries,
              Metrics metrics,
              Time time,
              int requestTimeout) {
        this.client = client;
        this.accumulator = accumulator;
        this.metadata = metadata;
        this.guaranteeMessageOrder = guaranteeMessageOrder;
        this.maxRequestSize = maxRequestSize;
        this.running = true; //note: 默認爲 true
        this.acks = acks;
        this.retries = retries;
        this.time = time;
        this.sensors = new SenderMetrics(metrics);
        this.requestTimeout = requestTimeout;
}
複製代碼

對於上述過程能夠這樣進行解讀

this.guaranteeMessageOrder = (config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) == 1)

若是 KafkaProducer 的 max.in.flight.requests.per.connection 設置爲1,那麼就能夠保證其順序性,不然的話,就不保證順序性,從下面這段代碼也能夠看出。

//from Sender
//note: max.in.flight.requests.per.connection 設置爲1時會保證
if (guaranteeMessageOrder) {
    // Mute all the partitions draine
    for (List<RecordBatch> batchList : batches.values()) {
         for (RecordBatch batch : batchList)
             this.accumulator.mutePartition(batch.topicPartition);
    }
}
複製代碼

也就是說,若是要保證單 Partition 的順序性,須要在 Producer 中配置 max.in.flight.requests.per.connection=1,而其實現機制則是在 RecordAccumulator 中實現的。

Producer Configs

這裏是關於 Kafka Producer 一些配置的說明,內容來自官方文檔Producer Configs以及本身的一些我的理解,這裏以官方文檔保持一致,按其重要性分爲三個級別進行講述(涉及到權限方面的參數,這裏先不介紹)。

high importance

medium importance

下面的這些參數雖然被描述爲 medium,但實際上對 Producer 的吞吐量等影響也一樣很大,在實踐中跟 high 參數的重要性基本同樣。

low importance

至此,Kafka Producer 部分的源碼分析已經結束,從下週開始將開始對 Kafka Consumer 部分進行分析。對於不一樣的場景,合理配置相應的 Kafka Producer 參數。

kafka源碼註釋分析

轉自:Kafka 源碼分析系列

相關文章
相關標籤/搜索