kafka生產者分區優化

通過前面幾篇kafka生產者專題講解,咱們還能夠找出哪些地方進一步來對它進行優化的嗎?答案是確定的,這裏咱們介紹一個kafka當前最新版本2.4.0合入的一個KIP-480,它的核心邏輯就是當存在無key的序列消息時,咱們消息發送的分區優先保持粘連,若是當前分區下的batch已經滿了或者 linger.ms延遲時間已到開始發送,就會從新啓動一個新的分區(邏輯仍是按照Round-Robin模式),咱們先把兩種模式的示意圖整理以下:
kafka生產者分區優化
那咱們也來看下這種模式的源碼實現:算法

它的源碼實現是從Partitioner的接口開始修改的,以前的版本這個接口只有兩個方法:緩存

public interface Partitioner extends Configurable, Closeable {

    /**
     * Compute the partition for the given record.
     *
     * @param topic The topic name
     * @param key The key to partition on (or null if no key)
     * @param keyBytes The serialized key to partition on( or null if no key)
     * @param value The value to partition on or null
     * @param valueBytes The serialized value to partition on or null
     * @param cluster The current cluster metadata
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);

    /**
     * This is called when partitioner is closed.
     */
    public void close();

}

最新的Partitioner接口添加了一個onNewBatch方法,用來在新建了一個Batch的場景下進行觸發,它的源碼以下:markdown

public interface Partitioner extends Configurable, Closeable {

    /**
     * Compute the partition for the given record.
     *
     * @param topic The topic name
     * @param key The key to partition on (or null if no key)
     * @param keyBytes The serialized key to partition on( or null if no key)
     * @param value The value to partition on or null
     * @param valueBytes The serialized value to partition on or null
     * @param cluster The current cluster metadata
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);

    /**
     * This is called when partitioner is closed.
     */
    public void close();

    /**
     * Notifies the partitioner a new batch is about to be created. When using the sticky partitioner,
     * this method can change the chosen sticky partition for the new batch. 
     * @param topic The topic name
     * @param cluster The current cluster metadata
     * @param prevPartition The partition previously selected for the record that triggered a new batch
     */
    default public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
    }
}

老的分區模式,咱們在以前已經講解過,這邊主要講解下這個新的方法實現:架構

public class StickyPartitionCache {
    private final ConcurrentMap<String, Integer> indexCache;
    public StickyPartitionCache() {
      //用來緩存全部的分區信息
        this.indexCache = new ConcurrentHashMap<>();
    }

    public int partition(String topic, Cluster cluster) {
      //若是緩存能夠獲取,說明以前已經有過該topic的分區信息
        Integer part = indexCache.get(topic);
        if (part == null) {
    //不然觸發獲取新的分區算法
            return nextPartition(topic, cluster, -1);
        }
        return part;
    }

    public int nextPartition(String topic, Cluster cluster, int prevPartition) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        Integer oldPart = indexCache.get(topic);
        Integer newPart = oldPart;
        // 因爲該方法有兩種觸發場景,一種是該topic下沒有任何分區緩存信息(例如新增topic);另一種就是新的Batch產生了,須要觸發新的分區,因此它的進入條件也是這兩種模式
        if (oldPart == null || oldPart == prevPartition) {
        //接下來全部分區邏輯採起的是和老的Roud-Robin模式一致,邏輯不一樣的地方是在於這裏都是無Key的場景
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() < 1) {
                Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
                newPart = random % partitions.size();
            } else if (availablePartitions.size() == 1) {
                newPart = availablePartitions.get(0).partition();
            } else {
                while (newPart == null || newPart.equals(oldPart)) {
                    Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
                    newPart = availablePartitions.get(random % availablePartitions.size()).partition();
                }
            }
            // 當時新增topic分區場景,那就直接添加,不然就是更換分區場景,將新的分區替換老的分區
            if (oldPart == null) {
                indexCache.putIfAbsent(topic, newPart);
            } else {
                indexCache.replace(topic, prevPartition, newPart);
            }
            return indexCache.get(topic);
        }
        return indexCache.get(topic);
    }
}

瞭解完新的分區模式邏輯以後,咱們會有一個疑問,那是在何時觸發的新分區邏輯呢?是在KafkaProducer的doSend方法裏面有以下一段邏輯:併發

//嘗試向以前的分區裏面append消息
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                    serializedValue, headers, interceptCallback, remainingWaitMs, true);
//因爲須要新建立Batch append沒有成功
if (result.abortForNewBatch) {
                int prevPartition = partition;
        //觸發新的分區
                partitioner.onNewBatch(record.topic(), cluster, prevPartition);
        //再次獲取新的分區值
                partition = partition(record, serializedKey, serializedValue, cluster);
        //封裝TopicPartition
                tp = new TopicPartition(record.topic(), partition);
                if (log.isTraceEnabled()) {
                    log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition);
                }
                // producer callback will make sure to call both 'callback' and interceptor callback
                interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
          //再次append消息
                result = accumulator.append(tp, timestamp, serializedKey,
                    serializedValue, headers, interceptCallback, remainingWaitMs, false);
            }

這種模式一個最大的優點在於能夠最大限度的保障每一個batch的消息足夠多,而且不至於會有過多的空batch提早申請,由於默認分區模式下,一組序列消息老是會被分散到各個分區中,會致使每一個batch的消息不夠大,最終會致使客戶端請求頻次過多,而Sticky的模式能夠下降請求頻次,提高總體發送遲延。以下兩個圖示官方壓測時延對比:
kafka生產者分區優化
kafka生產者分區優化
Note:本公衆號全部kafka系列的架構及源碼分析文章都是基於1.1.2版本,若有特殊會進行額外聲明。app

推薦閱讀

kafka系列:dom

  1. kafka是如何作到百萬級高併發低遲延的?
  2. kafka生產者的蓄水池機制
  3. kafka生產者的消息發送機制

kafka生產者分區優化
kafka生產者分區優化
掃碼關注咱們
互聯網架構師之路ide

過濾技術雜質,只爲精品呈現高併發

若是喜歡,請關注加星喔源碼分析

相關文章
相關標籤/搜索