After the previous installments on the Kafka producer, is there anything left to optimize? The answer is yes. Here we introduce KIP-480, which was merged into Kafka 2.4.0, the latest release at the time of writing. Its core idea is that for a stream of keyless messages, the producer keeps sending to the same ("sticky") partition; only when the current partition's batch is full, or the linger.ms delay has expired and the batch is sent out, does it switch to a new partition (the new partition is chosen randomly, so over time traffic is still spread across partitions much as in the Round-Robin mode). We have sketched the two modes in the diagrams below.
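Before digging into the source, a quick way to observe the sticky behaviour is to send keyless records with a 2.4+ producer and watch which partitions the batches land on. The sketch below is only a minimal illustration; the broker address, topic name and batch/linger values are placeholders chosen for the example, not taken from the original article.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class StickyDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // a batch is sealed once it reaches this size...
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);     // ...or once this delay expires

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 1000; i++) {
                // Key is null, so the 2.4 DefaultPartitioner keeps the partition sticky
                // until the current batch is completed, then moves on to another partition.
                producer.send(new ProducerRecord<>("demo-topic", null, "message-" + i));
            }
        }
    }
}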
Now let's look at how this mode is implemented in the source code.
The implementation starts with a change to the Partitioner interface. In earlier versions the interface had only two methods:
public interface Partitioner extends Configurable, Closeable {

    /**
     * Compute the partition for the given record.
     *
     * @param topic The topic name
     * @param key The key to partition on (or null if no key)
     * @param keyBytes The serialized key to partition on (or null if no key)
     * @param value The value to partition on or null
     * @param valueBytes The serialized value to partition on or null
     * @param cluster The current cluster metadata
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);

    /**
     * This is called when partitioner is closed.
     */
    public void close();
}
The latest version of the Partitioner interface adds an onNewBatch method, which is invoked when a new batch is about to be created. Its definition is as follows:
public interface Partitioner extends Configurable, Closeable {

    /**
     * Compute the partition for the given record.
     *
     * @param topic The topic name
     * @param key The key to partition on (or null if no key)
     * @param keyBytes The serialized key to partition on (or null if no key)
     * @param value The value to partition on or null
     * @param valueBytes The serialized value to partition on or null
     * @param cluster The current cluster metadata
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);

    /**
     * This is called when partitioner is closed.
     */
    public void close();

    /**
     * Notifies the partitioner a new batch is about to be created. When using the sticky partitioner,
     * this method can change the chosen sticky partition for the new batch.
     * @param topic The topic name
     * @param cluster The current cluster metadata
     * @param prevPartition The partition previously selected for the record that triggered a new batch
     */
    default public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
    }
}
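Because onNewBatch is declared as a default method, existing custom Partitioner implementations keep compiling and behaving exactly as before; only partitioners that care about batch boundaries need to override it. As a purely hypothetical illustration (the class name and behaviour are ours, not Kafka's), the sketch below overrides the hook just to count how often the producer rolls over to a new batch per topic:

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.LongAdder;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

// Hypothetical example: a random partitioner that also counts batch rollovers per topic.
public class BatchCountingPartitioner implements Partitioner {

    private final ConcurrentMap<String, LongAdder> newBatchCounts = new ConcurrentHashMap<>();

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        // Simple random choice over all partitions of the topic (illustration only).
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        return partitions.get(ThreadLocalRandom.current().nextInt(partitions.size())).partition();
    }

    @Override
    public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
        // Called when the record that was just partitioned requires a brand-new batch.
        newBatchCounts.computeIfAbsent(topic, t -> new LongAdder()).increment();
    }

    @Override
    public void close() { }

    @Override
    public void configure(Map<String, ?> configs) { }
}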
We have already covered the old partitioning logic in earlier articles, so here we focus on the implementation behind the new method:
public class StickyPartitionCache {

    private final ConcurrentMap<String, Integer> indexCache;

    public StickyPartitionCache() {
        // Caches the current sticky partition for each topic.
        this.indexCache = new ConcurrentHashMap<>();
    }

    public int partition(String topic, Cluster cluster) {
        // If the cache has an entry, a sticky partition has already been chosen for this topic.
        Integer part = indexCache.get(topic);
        if (part == null) {
            // Otherwise pick a new partition.
            return nextPartition(topic, cluster, -1);
        }
        return part;
    }

    public int nextPartition(String topic, Cluster cluster, int prevPartition) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        Integer oldPart = indexCache.get(topic);
        Integer newPart = oldPart;
        // This method is entered in two cases: the topic has no cached sticky partition yet
        // (e.g. a topic seen for the first time), or a new batch was created and the sticky
        // partition has to change; the condition below covers both.
        if (oldPart == null || oldPart == prevPartition) {
            // The selection below is a random pick, similar in spirit to the old Round-Robin
            // spreading; note that it only ever runs for records without a key.
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() < 1) {
                Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
                newPart = random % partitions.size();
            } else if (availablePartitions.size() == 1) {
                newPart = availablePartitions.get(0).partition();
            } else {
                while (newPart == null || newPart.equals(oldPart)) {
                    Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
                    newPart = availablePartitions.get(random % availablePartitions.size()).partition();
                }
            }
            // If this topic had no sticky partition yet, just add the entry;
            // otherwise replace the old sticky partition with the new one.
            if (oldPart == null) {
                indexCache.putIfAbsent(topic, newPart);
            } else {
                indexCache.replace(topic, prevPartition, newPart);
            }
            return indexCache.get(topic);
        }
        return indexCache.get(topic);
    }
}
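For completeness, this cache is consumed by the built-in DefaultPartitioner in 2.4: keyless records are routed through the StickyPartitionCache, keyed records are still hashed with murmur2 as before, and onNewBatch simply asks the cache for a fresh sticky partition. The following is a lightly condensed sketch of that wiring (the comments are ours):

import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

public class DefaultPartitioner implements Partitioner {

    private final StickyPartitionCache stickyPartitionCache = new StickyPartitionCache();

    public void configure(Map<String, ?> configs) { }

    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        if (keyBytes == null) {
            // No key: stay on the current sticky partition for this topic.
            return stickyPartitionCache.partition(topic, cluster);
        }
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        // With a key: hash the serialized key to choose the partition, as before.
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    public void close() { }

    public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
        // A batch on the sticky partition was completed, so pick a new sticky partition.
        stickyPartitionCache.nextPartition(topic, cluster, prevPartition);
    }
}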
Having understood the new partitioning logic, one question remains: when is the switch to a new partition actually triggered? The answer lies in KafkaProducer's doSend method, which contains the following logic:
// Try to append the message to the batch of the previously chosen partition,
// aborting instead of creating a new batch (the last argument, abortOnNewBatch, is true).
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
        serializedValue, headers, interceptCallback, remainingWaitMs, true);

// The append did not succeed because a new batch would have to be created.
if (result.abortForNewBatch) {
    int prevPartition = partition;
    // Let the partitioner pick a new sticky partition.
    partitioner.onNewBatch(record.topic(), cluster, prevPartition);
    // Recompute the partition for this record.
    partition = partition(record, serializedKey, serializedValue, cluster);
    // Rebuild the TopicPartition.
    tp = new TopicPartition(record.topic(), partition);
    if (log.isTraceEnabled()) {
        log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}",
                record.topic(), partition, prevPartition);
    }
    // producer callback will make sure to call both 'callback' and interceptor callback
    interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
    // Append the message again, this time allowing a new batch to be created.
    result = accumulator.append(tp, timestamp, serializedKey, serializedValue, headers,
            interceptCallback, remainingWaitMs, false);
}
The biggest advantage of this mode is that each batch gets filled with as many messages as possible, and far fewer nearly empty batches are allocated up front. Under the old default mode, a stream of keyless messages is always scattered across all partitions, so individual batches stay small and the client ends up issuing far more requests; the sticky mode reduces the request rate and lowers the overall send latency. The two figures below show the latency comparison from the official benchmarks:
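As a side note, if you want sticky behaviour for every record, keyed ones included, Kafka 2.4 also ships a UniformStickyPartitioner that can be plugged in via partitioner.class; be aware that it ignores keys when choosing a partition, so per-key ordering is lost. Below is a minimal sketch with placeholder broker and topic settings (the fully qualified class name is as of 2.4):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class UniformStickyDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Route ALL records, keyed or not, through the sticky logic (per-key ordering is lost).
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,
                "org.apache.kafka.clients.producer.UniformStickyPartitioner");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("demo-topic", "some-key", "value"));
        }
    }
}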
Note: all the Kafka architecture and source-code analysis articles in this series are based on version 1.1.2; any exceptions will be called out explicitly.