帶你瞭解下Kafka的客戶端緩衝池技術

最近看kafka源碼,着實被它的客戶端緩衝池技術優雅到了。忍不住要寫篇文章讚美一下(哈哈)。java

注:本文用到的源碼來自kafka2.2.2版本。數據庫

背景

當咱們應用程序調用kafka客戶端 producer發送消息的時候,在kafka客戶端內部,會把屬於同一個topic分區的消息先彙總起來,造成一個batch。真正發往kafka服務器的消息都是以batch爲單位的。以下圖所示:api

在這裏插入圖片描述

這麼作的好處顯而易見。客戶端和服務端經過網絡通訊,這樣批量發送能夠減小網絡帶來的性能開銷,提升吞吐量。緩存

這個Batch的管理就很是值得探討了。可能有人會說,這不簡單嗎?用的時候分配一個塊內存,發送完了釋放不就好了嗎。服務器

kafka是用java語言編寫的(新版本大部分都是用java實現的了),用上面的方案就是使用的時候new一個空間而後賦值給一個引用,釋放的時候把引用置爲null等JVM GC處理就能夠了。網絡

看起來彷佛也沒啥問題。可是在併發量比較高的時候就會頻繁的進行GC。咱們都知道GC的時候有個stop the world,儘管最新的GC技術這個時間已經很是短,依然有可能成爲生產環境的性能瓶頸。併發

kafka的設計者固然能考慮到這一層。下面咱們就來學習下kafka是如何對batch進行管理的。app

緩衝池技術原理解析

kafka客戶端使用了緩衝池的概念,預先分配好真實的內存塊,放在池子裏。ide

每一個batch其實都對應了緩衝池中的一個內存空間,發送完消息以後,batch再也不使用了,就把內存塊歸還給緩衝池。oop

聽起來是否是很耳熟啊?不錯,數據庫鏈接池,線程池等池化技術其實差很少都是這樣的原理。經過池化技術下降建立和銷燬帶來的開銷,提高執行效率。

在這裏插入圖片描述

代碼是最好的文檔,,下面咱們就來擼下源碼。

咱們擼代碼的步驟採用的是從上往下的原則,先帶你看看緩衝池在哪裏使用,而後再深刻到緩存池內部深刻分析。

下面的代碼作了一些刪減,值保留了跟本文相關的部分便於分析。

public class KafkaProducer<K, V> implements Producer<K, V> {

    private final Logger log;
    private static final AtomicInteger PRODUCER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
    private static final String JMX_PREFIX = "kafka.producer";
    public static final String NETWORK_THREAD_PREFIX = "kafka-producer-network-thread";
    public static final String PRODUCER_METRIC_GROUP_NAME = "producer-metrics";
    
    @Override
    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        // intercept the record, which can be potentially modified; this method does not throw exceptions
        ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
        return doSend(interceptedRecord, callback);
    }
    
    private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {

            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                    serializedValue, headers, interceptCallback, remainingWaitMs);
                    ...
                    
    }

當咱們調用客戶端的發送消息的時候,底層會調用doSend,而後裏面使用一個記錄累計器RecordAccumulator把消息append進來。咱們繼續往下看看,

public final class RecordAccumulator {

    private final Logger log;
    private volatile boolean closed;
    private final AtomicInteger flushesInProgress;
    private final AtomicInteger appendsInProgress;
    private final int batchSize;
    private final CompressionType compression;
    private final int lingerMs;
    private final long retryBackoffMs;
    private final int deliveryTimeoutMs;
    private final BufferPool free;
    private final Time time;
    private final ApiVersions apiVersions;
    private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;
    private final IncompleteBatches incomplete;
    // The following variables are only accessed by the sender thread, so we don't need to protect them.
    private final Map<TopicPartition, Long> muted;
    private int drainIndex;
    private final TransactionManager transactionManager;
    private long nextBatchExpiryTimeMs = Long.MAX_VALUE; // the earliest time (absolute) a batch will expire.
    
    public RecordAppendResult append(TopicPartition tp,
                                     long timestamp,
                                     byte[] key,
                                     byte[] value,
                                     Header[] headers,
                                     Callback callback,
                                     long maxTimeToBlock) throws InterruptedException {
        // We keep track of the number of appending thread to make sure we do not miss batches in
        // abortIncompleteBatches().
        appendsInProgress.incrementAndGet();
        ByteBuffer buffer = null;
        buffer = free.allocate(size, maxTimeToBlock);
        
        synchronized (dq) {
                // Need to check if producer is closed again after grabbing the dequeue lock.
                if (closed)
                    throw new KafkaException("Producer closed while send in progress");

                RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
                if (appendResult != null) {
                    // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
                    return appendResult;
                }

                MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
                ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
                FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));

                dq.addLast(batch);
        ...

RecordAccumulator其實就是管理一個batch隊列,咱們看到append方法實現實際上是調用BufferPool的free方法申請(allocate)了一塊內存空間(ByteBuffer), 而後把這個內存空空間包裝成batch添加到隊列後面。

當消息發送完成不在使用batch的時候,RecordAccumulator會調用deallocate方法歸還內存,內部實際上是調用BufferPooldeallocate方法。

public void deallocate(ProducerBatch batch) {
        incomplete.remove(batch);
        // Only deallocate the batch if it is not a split batch because split batch are allocated outside the
        // buffer pool.
        if (!batch.isSplitBatch())
            free.deallocate(batch.buffer(), batch.initialCapacity());
    }

很明顯,BufferPool就是緩衝池管理的類,也是咱們今天要討論的重點。咱們先來看看分配內存塊的方法。

public class BufferPool {

    static final String WAIT_TIME_SENSOR_NAME = "bufferpool-wait-time";

    private final long totalMemory;
    private final int poolableSize;
    private final ReentrantLock lock;
    private final Deque<ByteBuffer> free;
    private final Deque<Condition> waiters;
    /** Total available memory is the sum of nonPooledAvailableMemory and the number of byte buffers in free * poolableSize.  */
    private long nonPooledAvailableMemory;
    private final Metrics metrics;
    private final Time time;
    private final Sensor waitTime;

    public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
        if (size > this.totalMemory)
            throw new IllegalArgumentException("Attempt to allocate " + size
                                               + " bytes, but there is a hard limit of "
                                               + this.totalMemory
                                               + " on memory allocations.");

        ByteBuffer buffer = null;
        this.lock.lock();
        try {
            // check if we have a free buffer of the right size pooled
            if (size == poolableSize && !this.free.isEmpty())
                return this.free.pollFirst();

            // now check if the request is immediately satisfiable with the
            // memory on hand or if we need to block
            int freeListSize = freeSize() * this.poolableSize;
            if (this.nonPooledAvailableMemory + freeListSize >= size) {
                // we have enough unallocated or pooled memory to immediately
                // satisfy the request, but need to allocate the buffer
                freeUp(size);
                this.nonPooledAvailableMemory -= size;
            } else {
                // we are out of memory and will have to block
                int accumulated = 0;
                Condition moreMemory = this.lock.newCondition();
                try {
                    long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
                    this.waiters.addLast(moreMemory);
                    // loop over and over until we have a buffer or have reserved
                    // enough memory to allocate one
                    while (accumulated < size) {
                        long startWaitNs = time.nanoseconds();
                        long timeNs;
                        boolean waitingTimeElapsed;
                        try {
                            waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
                        } finally {
                            long endWaitNs = time.nanoseconds();
                            timeNs = Math.max(0L, endWaitNs - startWaitNs);
                            recordWaitTime(timeNs);
                        }

                        if (waitingTimeElapsed) {
                            throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");
                        }

                        remainingTimeToBlockNs -= timeNs;

                        // check if we can satisfy this request from the free list,
                        // otherwise allocate memory
                        if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
                            // just grab a buffer from the free list
                            buffer = this.free.pollFirst();
                            accumulated = size;
                        } else {
                            // we'll need to allocate memory, but we may only get
                            // part of what we need on this iteration
                            freeUp(size - accumulated);
                            int got = (int) Math.min(size - accumulated, this.nonPooledAvailableMemory);
                            this.nonPooledAvailableMemory -= got;
                            accumulated += got;
                        }
                        ...

首先整個方法是加鎖操做的,因此支持併發分配內存。

邏輯是這樣的,當申請的內存大小等於poolableSize,則從緩存池中獲取。這個poolableSize能夠理解成是緩衝池的頁大小,做爲緩衝池分配的基本單位。從緩存池獲取其實就是從ByteBuffer隊列取出一個元素返回。

若是申請的內存不等於特定的數值,則向非緩存池申請。同時會從緩衝池中取一些內存併入到非緩衝池中。這個nonPooledAvailableMemory指的就是非緩衝池的可用內存大小。非緩衝池分配內存,其實就是調用ByteBuffer.allocat分配真實的JVM內存。

在這裏插入圖片描述

緩存池的內存通常都不多回收。而非緩存池的內存是使用後丟棄,而後等待GC回收。

繼續來看看batch釋放的代碼,

public void deallocate(ByteBuffer buffer, int size) {
        lock.lock();
        try {
            if (size == this.poolableSize && size == buffer.capacity()) {
                buffer.clear();
                this.free.add(buffer);
            } else {
                this.nonPooledAvailableMemory += size;
            }
            Condition moreMem = this.waiters.peekFirst();
            if (moreMem != null)
                moreMem.signal();
        } finally {
            lock.unlock();
        }
    }

很簡單,也是分爲兩種狀況。要麼直接歸還緩衝池,要麼就是更新非緩衝池部分的能夠內存。而後通知等待隊列裏的第一個元素。

相關文章
相關標籤/搜索