Analysis of Kafka producer memory management

1 Overview

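The Kafka producer calls RecordAccumulator#append to store messages in local memory. Messages are grouped by TopicPartition, and each TopicPartition maps to one Deque<RecordBatch>. The messages of a RecordBatch are actually stored in a MemoryRecords object. MemoryRecords has two main fields, a Compressor and a ByteBuffer: the messages live in the ByteBuffer, and the Compressor is what writes them into it. The in-memory model of messages in the producer looks roughly like this: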

[Figure: producer memory model]
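The grouping described above can be sketched in a few lines. This is only an illustrative model, not Kafka's code: the class AccumulatorModel, its nested Batch type, and the use of a String key in place of TopicPartition are all assumptions made for the example.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical model of the accumulator: one deque of batches per partition.
final class AccumulatorModel {
    /** Hypothetical stand-in for RecordBatch: a single fixed-capacity ByteBuffer. */
    static final class Batch {
        final ByteBuffer buffer;

        Batch(int capacity) {
            this.buffer = ByteBuffer.allocate(capacity);
        }

        /** Returns false when the payload does not fit in the remaining space. */
        boolean tryAppend(byte[] payload) {
            if (buffer.remaining() < payload.length)
                return false;
            buffer.put(payload);
            return true;
        }
    }

    // The String key stands in for TopicPartition.
    private final ConcurrentMap<String, Deque<Batch>> batches = new ConcurrentHashMap<>();
    private final int batchSize;

    AccumulatorModel(int batchSize) {
        this.batchSize = batchSize;
    }

    void append(String topicPartition, byte[] payload) {
        Deque<Batch> dq = batches.computeIfAbsent(topicPartition, k -> new ArrayDeque<>());
        synchronized (dq) {
            Batch last = dq.peekLast();
            // Open a new batch when there is none yet or the last one is full.
            if (last == null || !last.tryAppend(payload)) {
                Batch batch = new Batch(Math.max(batchSize, payload.length));
                batch.tryAppend(payload);
                dq.addLast(batch);
            }
        }
    }

    int batchCount(String topicPartition) {
        Deque<Batch> dq = batches.get(topicPartition);
        if (dq == null)
            return 0;
        synchronized (dq) {
            return dq.size();
        }
    }
}
```

Messages for the same partition keep filling the last batch until it runs out of space, at which point a new batch is appended to that partition's deque.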

2 RecordAccumulator#append

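When the producer's send method is called, it calls RecordAccumulator#append to place the message in memory. Note that append acquires the lock twice; this keeps the locked region as small as possible.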

public RecordAppendResult append(TopicPartition tp,
                                     long timestamp,
                                     byte[] key,
                                     byte[] value,
                                     Callback callback,
                                     long maxTimeToBlock) throws InterruptedException {

        appendsInProgress.incrementAndGet();
        try {
            Deque<RecordBatch> dq = getOrCreateDeque(tp); // get the Deque<RecordBatch> for tp

            synchronized (dq) { // key point: operate only while holding the lock on the Deque<RecordBatch>
                if (closed)
                    throw new IllegalStateException("Cannot send after the producer is closed.");
                RecordBatch last = dq.peekLast(); // append to the last RecordBatch
                if (last != null) { // try to append; covered in detail later
                    FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());
                    if (future != null) // appended successfully, so return
                        return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
                }
            }

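            // The append did not succeed: either the last RecordBatch is out of space or last == null

            // Key point: if the record is larger than batchSize, the RecordBatch is sized to the record, so a RecordBatch is not necessarily batchSize bytes
            int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
            
            // Allocate memory from the BufferPool; covered in detail later
            ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
            synchronized (dq) { // re-acquire the lock; allocate does not need the lock on dq, which keeps the lock granularity small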
                if (closed)
                    throw new IllegalStateException("Cannot send after the producer is closed.");
                RecordBatch last = dq.peekLast();
                if (last != null) { // another thread may have made room before the lock was re-acquired, so try the last batch again
                    FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());
                    if (future != null) {
                        free.deallocate(buffer);
                        return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
                    }
                }

                // Still no usable RecordBatch, so build a new one on the buffer just allocated
                MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
                RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
                FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds()));

                dq.addLast(batch);
                incomplete.add(batch);
                return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
            }
        } finally {
            appendsInProgress.decrementAndGet();
        }
    }
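The two lock acquisitions in append can be isolated into a small sketch. The class TwoPhaseAppend, the IntFunction allocator hook (a stand-in for BufferPool#allocate), and the returnedBuffers counter are hypothetical names introduced only for illustration; batches are modelled as plain ByteBuffers.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.IntFunction;

// Hypothetical sketch of "try under lock, allocate outside lock, re-check under lock".
final class TwoPhaseAppend {
    private final Deque<ByteBuffer> dq = new ArrayDeque<>();
    private final int batchSize;
    private final IntFunction<ByteBuffer> allocator; // stand-in for BufferPool#allocate
    int returnedBuffers = 0;                         // buffers handed back unused

    TwoPhaseAppend(int batchSize, IntFunction<ByteBuffer> allocator) {
        this.batchSize = batchSize;
        this.allocator = allocator;
    }

    // Must be called while holding the lock on dq.
    private boolean tryAppendToLast(byte[] payload) {
        ByteBuffer last = dq.peekLast();
        if (last == null || last.remaining() < payload.length)
            return false;
        last.put(payload);
        return true;
    }

    void append(byte[] payload) {
        synchronized (dq) { // phase 1: cheap attempt under the lock
            if (tryAppendToLast(payload))
                return;
        }
        // Allocation may block for a long time, so it runs without the lock.
        ByteBuffer buffer = allocator.apply(Math.max(batchSize, payload.length));
        synchronized (dq) { // phase 2: the deque may have changed in the meantime
            if (tryAppendToLast(payload)) {
                returnedBuffers++; // the original code calls free.deallocate(buffer) here
                return;
            }
            buffer.put(payload);
            dq.addLast(buffer);
        }
    }

    void addBatch(ByteBuffer batch) {
        synchronized (dq) {
            dq.addLast(batch);
        }
    }

    int batchCount() {
        synchronized (dq) {
            return dq.size();
        }
    }
}
```

The second check is what makes releasing the lock safe: if another thread has added a batch with room while this thread was allocating, the freshly allocated buffer is given back instead of creating a redundant batch.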

3 Compressor

The code above calls RecordBatch#tryAppend to try to put the message into a RecordBatch, and RecordBatch#tryAppend in turn calls MemoryRecords#append.

public long append(long offset, long timestamp, byte[] key, byte[] value) {
        if (!writable)
            throw new IllegalStateException("Memory records is not writable");

        int size = Record.recordSize(key, value);
        compressor.putLong(offset);
        compressor.putInt(size);
        long crc = compressor.putRecord(timestamp, key, value);
        compressor.recordWritten(size + Records.LOG_OVERHEAD);
        return crc;
}
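The framing written by MemoryRecords#append (an 8-byte offset, a 4-byte size, then the record itself) can be illustrated directly on a ByteBuffer. This is a sketch under simplifying assumptions: the class LogEntryFraming is hypothetical, the record body is treated as opaque bytes, and compression is ignored.

```java
import java.nio.ByteBuffer;

// Hypothetical illustration of the per-entry framing: offset, size, record bytes.
final class LogEntryFraming {
    // 8 bytes for the offset (long) plus 4 bytes for the size (int).
    static final int LOG_OVERHEAD = 8 + 4;

    /** Writes one entry and returns the number of bytes it occupies. */
    static int write(ByteBuffer buffer, long offset, byte[] recordBytes) {
        buffer.putLong(offset);            // corresponds to compressor.putLong(offset)
        buffer.putInt(recordBytes.length); // corresponds to compressor.putInt(size)
        buffer.put(recordBytes);           // the record body, opaque in this sketch
        return LOG_OVERHEAD + recordBytes.length;
    }
}
```

This is also why append in section 2 adds Records.LOG_OVERHEAD to the record size when it computes how much memory to request.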

The key piece here is the compressor. Taking putInt as an example, Compressor simply delegates to DataOutputStream#writeInt:

public void putInt(final int value) {
        try {
            appendStream.writeInt(value); // appendStream is a DataOutputStream
        } catch (IOException e) {
            throw new KafkaException("I/O exception when writing to the append stream, closing", e);
        }
    }

Here is how Compressor is initialized:

public Compressor(ByteBuffer buffer, CompressionType type) {
        
        // ...
        // create the stream
        bufferStream = new ByteBufferOutputStream(buffer);
        appendStream = wrapForOutput(bufferStream, type, COMPRESSION_DEFAULT_BUFFER_SIZE);
}

static public DataOutputStream wrapForOutput(ByteBufferOutputStream buffer, CompressionType type, int bufferSize) {
        try {
            switch (type) {
                case NONE:
                    return new DataOutputStream(buffer); // wraps the ByteBufferOutputStream directly
                case GZIP:
                    return new DataOutputStream(new GZIPOutputStream(buffer, bufferSize));
                case SNAPPY:
                    try {
                        OutputStream stream = (OutputStream) snappyOutputStreamSupplier.get().newInstance(buffer, bufferSize);
                        return new DataOutputStream(stream);
                    } catch (Exception e) {
                        throw new KafkaException(e);
                    }
                case LZ4:
                    try {
                        OutputStream stream = (OutputStream) lz4OutputStreamSupplier.get().newInstance(buffer);
                        return new DataOutputStream(stream);
                    } catch (Exception e) {
                        throw new KafkaException(e);
                    }
                default:
                    throw new IllegalArgumentException("Unknown compression type: " + type);
            }
        } catch (IOException e) {
            throw new KafkaException(e);
        }
    }

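As the code above shows, ByteBufferOutputStream is the stream tied directly to the ByteBuffer. The DataOutputStream wraps the ByteBufferOutputStream, with a compressing stream in between when a compression type is set, so data is compressed on its way into the buffer. Intuitively it looks like this: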

[Figure: Compressor]
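The same layering can be reproduced with JDK classes alone. In this sketch a ByteArrayOutputStream stands in for Kafka's ByteBufferOutputStream, and the class StreamWrapping and its boolean gzip flag are hypothetical simplifications of wrapForOutput.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.zip.GZIPOutputStream;

// Hypothetical sketch: DataOutputStream -> (optional GZIPOutputStream) -> byte sink.
final class StreamWrapping {
    static DataOutputStream wrap(OutputStream sink, boolean gzip) throws IOException {
        return new DataOutputStream(gzip ? new GZIPOutputStream(sink) : sink);
    }

    /** Writes an offset and a size through the wrapped stream and returns the sink's bytes. */
    static byte[] encode(long offset, int size, boolean gzip) {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        try (DataOutputStream out = wrap(sink, gzip)) {
            out.writeLong(offset);
            out.writeInt(size);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // Closing the stream flushes any compressor state into the sink.
        return sink.toByteArray();
    }
}
```

The caller only ever sees the DataOutputStream API; whether the bytes reach the sink raw or compressed depends solely on which stream was placed in the middle.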

4 BufferPool

BufferPool manages the producer's buffer pool. The buffer.memory setting specifies the size of the pool; the default is 32 MB.

4.1 allocate

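BufferPool#allocate requests memory from the pool. BufferPool maintains a deque of ByteBuffers named free, which holds the idle buffers. Only a request of exactly batch.size bytes is served from free; in other words, every ByteBuffer kept in free is batch.size bytes.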

Key fields of BufferPool:

public final class BufferPool {
    private final long totalMemory;
    private final int poolableSize; // size of one pooled buffer, equal to batch.size
    private final ReentrantLock lock; 
    private final Deque<ByteBuffer> free; // idle ByteBuffers, each batch.size bytes; only requests of exactly batch.size are served from here
    private final Deque<Condition> waiters;
    private long availableMemory; // memory still unallocated, i.e. buffer.memory minus memory in use (buffers sitting in free are not counted here)
    // ...
}
public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
        if (size > this.totalMemory)
            throw new IllegalArgumentException("Attempt to allocate " + size
                                               + " bytes, but there is a hard limit of "
                                               + this.totalMemory
                                               + " on memory allocations.");

        this.lock.lock();
        try {
            if (size == poolableSize && !this.free.isEmpty()) // key point: only a request of exactly batch.size is served from free
                return this.free.pollFirst();

            int freeListSize = this.free.size() * this.poolableSize;

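            // Enough memory remains in total but the request cannot be served from free: release some buffers from free, then allocate the requested size
            if (this.availableMemory + freeListSize >= size) {
                freeUp(size); // release pooled buffers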
                this.availableMemory -= size;
                lock.unlock();
                return ByteBuffer.allocate(size);
            } else {
                // Key point: not enough memory remains, so block until enough becomes available
                int accumulated = 0;
                ByteBuffer buffer = null;
                Condition moreMemory = this.lock.newCondition();
                long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
                this.waiters.addLast(moreMemory); // append to the tail of the wait queue
                while (accumulated < size) {
                    long startWaitNs = time.nanoseconds();
                    long timeNs;
                    boolean waitingTimeElapsed;
                    try {
                        waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS); // block
                    } catch (InterruptedException e) {
                        this.waiters.remove(moreMemory);
                        throw e;
                    } finally {
                        long endWaitNs = time.nanoseconds();
                        timeNs = Math.max(0L, endWaitNs - startWaitNs);
                        this.waitTime.record(timeNs, time.milliseconds());
                    }

                    if (waitingTimeElapsed) {
                        this.waiters.remove(moreMemory);
                        throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");
                    }

                    remainingTimeToBlockNs -= timeNs;
                    // check if we can satisfy this request from the free list,
                    // otherwise allocate memory
                    if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
                        // just grab a buffer from the free list
                        buffer = this.free.pollFirst();
                        accumulated = size;
                    } else {
                        freeUp(size - accumulated);
                        int got = (int) Math.min(size - accumulated, this.availableMemory);
                        this.availableMemory -= got;
                        accumulated += got;
                    }
                }

                Condition removed = this.waiters.removeFirst(); // removed from the head; explained below
                if (removed != moreMemory)
                    throw new IllegalStateException("Wrong condition: this shouldn't happen.");

                // if memory is still left after this allocation, wake up the next waiting thread
                if (this.availableMemory > 0 || !this.free.isEmpty()) {
                    if (!this.waiters.isEmpty())
                        this.waiters.peekFirst().signal(); // wake the head of the queue
                }

                // unlock and return the buffer
                lock.unlock();
                if (buffer == null)
                    return ByteBuffer.allocate(size);
                else
                    return buffer;
            }
        } finally {
            if (lock.isHeldByCurrentThread())
                lock.unlock();
        }
    }

A few points about allocate are worth noting:

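  1. Only a request of exactly batch.size bytes is served from free, so messages should preferably not be larger than batch.size; only then is the pool used to full effect. Why would a request ever differ from batch.size? Because RecordAccumulator#append contains the line int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value)), which means that a message larger than batch.size is allocated at the message's own size.
  2. The code below may look puzzling: moreMemory is added to the tail of waiters, so why is it removed from the head? The reason is that a wake-up only ever signals the thread at the head of waiters. When a thread is woken, it must therefore already be at the head, which means every thread queued ahead of it was woken earlier and has already removed itself from waiters.
    Condition removed = this.waiters.removeFirst(); // removed from the head
                 if (removed != moreMemory)
                     throw new IllegalStateException("Wrong condition: this shouldn't happen.");
  3. A single request larger than buffer.memory throws IllegalArgumentException immediately. When the total memory requested would exceed buffer.memory, the caller blocks until memory is released, and a TimeoutException is thrown if that takes longer than maxTimeToBlockMs.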

4.2 deallocate

BufferPool#deallocate releases memory and returns it to the pool. As with allocate, only a block of exactly batch.size bytes is placed back into free.

public void deallocate(ByteBuffer buffer) {
        deallocate(buffer, buffer.capacity());
}

public void deallocate(ByteBuffer buffer, int size) {
        lock.lock();
        try {
            if (size == this.poolableSize && size == buffer.capacity()) {
                buffer.clear();
                this.free.add(buffer); // only a block of exactly batch.size goes back into free
            } else { // otherwise only availableMemory changes; the unused ByteBuffer is left for the GC to reclaim
                this.availableMemory += size;
            }
            Condition moreMem = this.waiters.peekFirst(); // wake the head of waiters
            if (moreMem != null)
                moreMem.signal();
        } finally {
            lock.unlock();
        }
}
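The interplay between free and availableMemory in allocate and deallocate can be condensed into a non-blocking sketch. This is not Kafka's implementation: the class SimplePool is hypothetical, it throws where the real pool would block on a Condition, and it uses a plain synchronized monitor instead of a ReentrantLock.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical, non-blocking simplification of the pool's bookkeeping.
final class SimplePool {
    private final int poolableSize;                         // plays the role of batch.size
    private final Deque<ByteBuffer> free = new ArrayDeque<>();
    private long availableMemory;                           // unallocated memory, excluding free

    SimplePool(long totalMemory, int poolableSize) {
        this.availableMemory = totalMemory;
        this.poolableSize = poolableSize;
    }

    synchronized ByteBuffer allocate(int size) {
        // Fast path: a request of exactly poolableSize reuses an idle buffer.
        if (size == poolableSize && !free.isEmpty())
            return free.pollFirst();
        // Otherwise drop idle buffers until the request fits (the role of freeUp).
        while (availableMemory < size && !free.isEmpty())
            availableMemory += free.pollLast().capacity();
        if (availableMemory < size)
            throw new IllegalStateException("not enough memory"); // the real pool blocks here
        availableMemory -= size;
        return ByteBuffer.allocate(size);
    }

    synchronized void deallocate(ByteBuffer buffer) {
        if (buffer.capacity() == poolableSize) {
            buffer.clear();
            free.add(buffer);                       // pooled for reuse
        } else {
            availableMemory += buffer.capacity();   // only the accounting changes
        }
    }

    synchronized long availableMemory() {
        return availableMemory;
    }

    synchronized int freeCount() {
        return free.size();
    }
}
```

A buffer of the poolable size makes a round trip through free and comes back as the same instance, while an odd-sized buffer only moves the availableMemory counter and is left to the garbage collector.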