Kafka source code analysis 4: RecordAccumulator's supporting components BufferPool and CopyOnWriteMap (Part 1)

Overview

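The previous section walked through what RecordAccumulator does when the producer sends a message. This section looks at two other components RecordAccumulator relies on: BufferPool and CopyOnWriteMap.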

BufferPool: the buffer pool

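As covered in the previous section, RecordAccumulator requests a buffer during append. When each ProducerBatch packs messages, its MemoryRecordsBuilder writes into that ByteBuffer. (MemoryRecordsBuilder and message compression strike me as too peripheral. I have changed my approach and now only walk through the key flows and the parts I learned the most from. I may cover the rest in detail another time.)
Creating ByteBuffers at a high rate can trigger frequent GC in the JVM, so Kafka uses a BufferPool. It follows the same idea as a database connection pool: it is a pool capped at a maximum amount of memory. A thread borrows a ByteBuffer from the pool when it needs one, blocks and waits when there isn't enough, and returns it to the pool, cleared, when it is done.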

private final long totalMemory;         // buffer.memory: total capacity of the pool
private final int poolableSize;         // batch.size: only buffers of exactly this size are pooled
private final ReentrantLock lock;       // reentrant lock that keeps concurrent allocations consistent
private final Deque<ByteBuffer> free;   // queue of free (pooled) buffers
private final Deque<Condition> waiters; // queue of threads waiting for memory
/** Total available memory is the sum of nonPooledAvailableMemory and the number of byte buffers in free * poolableSize.  */
private long nonPooledAvailableMemory;  // available memory not held in the free list; available = nonPooledAvailableMemory + free.size() * poolableSize
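The Javadoc on nonPooledAvailableMemory states the key invariant. The minimal sketch below models that bookkeeping. It assumes single-threaded use. The class PoolAccounting and its availableMemory/allocatedMemory helpers are hypothetical names for illustration, not Kafka's code.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model of BufferPool's memory bookkeeping; not Kafka's class, no locking.
class PoolAccounting {
    final long totalMemory;   // hard cap, like buffer.memory
    final int poolableSize;   // size of pooled buffers, like batch.size
    final Deque<ByteBuffer> free = new ArrayDeque<>();
    long nonPooledAvailableMemory;

    PoolAccounting(long totalMemory, int poolableSize) {
        this.totalMemory = totalMemory;
        this.poolableSize = poolableSize;
        // Nothing is pooled yet, so every byte starts out as non-pooled memory.
        this.nonPooledAvailableMemory = totalMemory;
    }

    // Available = raw bytes not handed out + bytes parked in pooled buffers.
    long availableMemory() {
        return nonPooledAvailableMemory + (long) free.size() * poolableSize;
    }

    // Whatever is not available is currently held by callers.
    long allocatedMemory() {
        return totalMemory - availableMemory();
    }

    public static void main(String[] args) {
        PoolAccounting pool = new PoolAccounting(1024, 128);
        // Two pooled-size buffers are handed out: their bytes leave nonPooledAvailableMemory.
        pool.nonPooledAvailableMemory -= 2 * 128;
        ByteBuffer first = ByteBuffer.allocate(128);
        ByteBuffer second = ByteBuffer.allocate(128);
        // The first one comes back and is parked in free; the second is still in use.
        first.clear();
        pool.free.add(first);
        System.out.println(pool.availableMemory()); // prints 896 (768 non-pooled + 128 pooled)
        System.out.println(pool.allocatedMemory()); // prints 128 (held by `second`)
    }
}
```

Buffers parked in free still count as available memory even though they already occupy heap space. That is why allocate can turn them back into plain byte counts when a request of a different size arrives.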

allocate and deallocate

Since it is a buffer pool, BufferPool mainly offers two APIs: borrow (allocate) and return (deallocate). Both flows are fairly simple.

allocate

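  1. Acquire lock.
  2. If the requested size equals poolableSize and the free queue is not empty, pop a ByteBuffer from free and return it directly.
  3. If the bytes held in the free queue plus nonPooledAvailableMemory are at least the requested size, call freeUp to release ByteBuffers from the free queue until nonPooledAvailableMemory covers the request. Then deduct the requested size from nonPooledAvailableMemory. The ByteBuffer itself is only created at the end (step 7).
  4. If everything available (the free queue's total plus nonPooledAvailableMemory) is still less than the requested size, the condition in step 3 is not met and the thread has to block. It creates a Condition from lock, appends it to the waiters queue, and calls the Condition's await. Note that await releases the lock so other threads can take it, until this thread receives a signal. After the signal, the thread takes as much memory as is currently available. If that is still not enough, it blocks again. This loop repeats until the thread has the full size (or a pooled buffer) or the wait times out.
  5. Looking at this closely, a thread that requests a large amount of space can starve. While it is blocked, other threads keep running and can use up the remaining space. My personal understanding is that Kafka accepts this so that large ByteBuffer requests do not hold up normal-sized ones. If large requests are frequent, consider tuning buffer.memory and batch.size, or avoid such requests at the application level.
  6. The lock was acquired at the start, so never forget to release it in finally.
  7. Everything above only ensures there is enough capacity. The ByteBuffer is actually allocated at the very end, after the lock has been released.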
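Steps 2, 3 and 7 can be sketched without any locking. This is a hypothetical, single-threaded model: FastPathPool and tryAllocate are made-up names. The freeUp loop is my assumption of how the freeUp helper that allocate calls behaves, based on its name and on how allocate uses it.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch of allocate()'s non-blocking path (steps 2, 3 and 7).
// No lock and no waiting: where the real pool would block, this returns null.
class FastPathPool {
    final int poolableSize;
    final Deque<ByteBuffer> free = new ArrayDeque<>();
    long nonPooledAvailableMemory;

    FastPathPool(long totalMemory, int poolableSize) {
        this.poolableSize = poolableSize;
        this.nonPooledAvailableMemory = totalMemory;
    }

    // Assumed freeUp behavior: drop pooled buffers, turning them back into plain
    // byte counts, until non-pooled memory covers `size` or the free list runs out.
    void freeUp(int size) {
        while (!free.isEmpty() && nonPooledAvailableMemory < size)
            nonPooledAvailableMemory += free.pollLast().capacity();
    }

    ByteBuffer tryAllocate(int size) {
        // Step 2: exactly the pooled size and a free buffer exists, so reuse it.
        if (size == poolableSize && !free.isEmpty())
            return free.pollFirst();
        long freeListSize = (long) free.size() * poolableSize;
        if (nonPooledAvailableMemory + freeListSize >= size) {
            freeUp(size);                        // step 3: make room from the free list
            nonPooledAvailableMemory -= size;    // reserve the bytes
            return ByteBuffer.allocate(size);    // step 7: create the buffer last
        }
        return null;                             // step 4: the real pool would block here
    }

    public static void main(String[] args) {
        FastPathPool pool = new FastPathPool(512, 128);
        ByteBuffer a = pool.tryAllocate(128);
        ByteBuffer b = pool.tryAllocate(128);
        pool.free.add(a);                        // both come back as pooled buffers
        pool.free.add(b);
        ByteBuffer big = pool.tryAllocate(400);  // 256 + 256 >= 400, so freeUp drains the free list
        System.out.println(big.capacity() + " " + pool.free.size() + " " + pool.nonPooledAvailableMemory); // prints "400 0 112"
        System.out.println(pool.tryAllocate(128)); // prints "null": only 112 bytes are left
    }
}
```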
public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
    // The request is larger than the whole pool: throw right away
    if (size > this.totalMemory)
        throw new IllegalArgumentException("Attempt to allocate " + size
                                           + " bytes, but there is a hard limit of "
                                           + this.totalMemory
                                           + " on memory allocations.");

    ByteBuffer buffer = null;
    this.lock.lock();
    try {
        // A pooled buffer of exactly this size is free: hand it out directly
        if (size == poolableSize && !this.free.isEmpty())
            return this.free.pollFirst();

        // How much memory is sitting in the free list
        int freeListSize = freeSize() * this.poolableSize;
        if (this.nonPooledAvailableMemory + freeListSize >= size) {
            // Enough memory in total: release pooled buffers from the free list as needed
            freeUp(size);
            // Reserve the requested bytes from the non-pooled memory up front
            this.nonPooledAvailableMemory -= size;
        } else {
            // Not enough memory even in total: we have to block.
            // accumulated tracks how much has been reserved so far; if an exception occurs,
            // it must be given back in finally, otherwise that memory leaks
            int accumulated = 0;
            // Condition variable used to synchronize with threads that return memory
            Condition moreMemory = this.lock.newCondition();
            try {
                // Don't wait forever: the wait is bounded by maxTimeToBlockMs
                long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
                this.waiters.addLast(moreMemory);
                // Wait in a loop until we have a buffer or have reserved enough memory
                while (accumulated < size) {
                    long startWaitNs = time.nanoseconds();
                    long timeNs;
                    boolean waitingTimeElapsed;
                    try {
                        // await releases the lock and blocks until signaled or timed out
                        waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
                    } finally {
                        long endWaitNs = time.nanoseconds();
                        timeNs = Math.max(0L, endWaitNs - startWaitNs);
                        this.waitTime.record(timeNs, time.milliseconds());
                    }

                    if (waitingTimeElapsed) {
                        throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");
                    }

                    remainingTimeToBlockNs -= timeNs;

                    // check if we can satisfy this request from the free list,
                    // otherwise allocate memory
                    if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
                        // Another thread just returned a pooled buffer: take it from free
                        buffer = this.free.pollFirst();
                        // and record the full size as obtained
                        accumulated = size;
                    } else {
                        // Otherwise release pooled buffers into non-pooled memory
                        // and take as much as is available (possibly only part of the request)
                        freeUp(size - accumulated);
                        int got = (int) Math.min(size - accumulated, this.nonPooledAvailableMemory);
                        // Reserve what we got
                        this.nonPooledAvailableMemory -= got;
                        // Record how much has been reserved so far
                        accumulated += got;
                    }
                }
                // Don't reclaim memory on throwable since nothing was thrown:
                // reset to 0 so the finally block gives nothing back
                accumulated = 0;
            } finally {
                // When this loop was not able to successfully terminate don't lose available memory.
                // On an exception the reserved memory was never handed out, so add it back
                // (on success accumulated is 0)
                this.nonPooledAvailableMemory += accumulated;
                // Remove our condition from the waiters queue so the next waiter is up
                this.waiters.remove(moreMemory);
            }
        }
    } finally {
        // signal any additional waiters if there is more memory left
        // over for them
        try {
            // If no memory is left, don't bother signaling the next waiter; let it keep blocking.
            // Could that leave other waiters blocked forever? No, because deallocate also signals.
            if (!(this.nonPooledAvailableMemory == 0 && this.free.isEmpty()) && !this.waiters.isEmpty())
                this.waiters.peekFirst().signal();
        } finally {
            // Another finally... otherwise find bugs complains
            lock.unlock();
        }
    }

    // Capacity is guaranteed at this point; the ByteBuffer itself is created outside the lock
    if (buffer == null)
        return safeAllocateByteBuffer(size);
    else
        return buffer;
}
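The following simplified sketch isolates the wait/signal handshake. It makes several assumptions. The hypothetical WaitingPool has no free list and no metrics. It uses Condition.awaitNanos instead of Kafka's await plus manual time bookkeeping. It throws the JDK's java.util.concurrent.TimeoutException. Like Kafka's allocate, the fast path does not look at waiters, so a new small request can take memory that a blocked large request is waiting for. That is the starvation effect discussed in step 5.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified pool that keeps only allocate()'s wait/signal skeleton.
// Assumptions: no free list (every byte is "non-pooled"), no metrics, JDK TimeoutException.
class WaitingPool {
    private final ReentrantLock lock = new ReentrantLock();
    private final Deque<Condition> waiters = new ArrayDeque<>();
    private long available;

    WaitingPool(long totalMemory) {
        this.available = totalMemory;
    }

    ByteBuffer allocate(int size, long maxTimeToBlockMs)
            throws InterruptedException, TimeoutException {
        lock.lock();
        try {
            if (available >= size) {
                available -= size;                // fast path: does not check existing waiters
            } else {
                int accumulated = 0;
                Condition moreMemory = lock.newCondition();
                long remainingNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
                waiters.addLast(moreMemory);
                try {
                    while (accumulated < size) {
                        // awaitNanos releases the lock while blocked and returns the
                        // time left; a value <= 0 means the deadline has passed.
                        remainingNs = moreMemory.awaitNanos(remainingNs);
                        if (remainingNs <= 0)
                            throw new TimeoutException("no memory within " + maxTimeToBlockMs + " ms");
                        // Keep whatever is available now and wait for the rest.
                        int got = (int) Math.min(size - accumulated, available);
                        available -= got;
                        accumulated += got;
                    }
                    accumulated = 0;              // success: nothing to hand back
                } finally {
                    available += accumulated;     // timeout/interrupt: return the partial reservation
                    waiters.remove(moreMemory);
                }
            }
        } finally {
            // Pass any leftover memory on to the next waiter in FIFO order.
            if (available > 0 && !waiters.isEmpty())
                waiters.peekFirst().signal();
            lock.unlock();
        }
        return ByteBuffer.allocate(size);         // the buffer itself is created outside the lock
    }

    void deallocate(int size) {
        lock.lock();
        try {
            available += size;
            Condition first = waiters.peekFirst();
            if (first != null)
                first.signal();                   // wake the longest-waiting request
        } finally {
            lock.unlock();
        }
    }

    long availableMemory() {
        lock.lock();
        try { return available; } finally { lock.unlock(); }
    }

    public static void main(String[] args) throws Exception {
        WaitingPool pool = new WaitingPool(100);
        pool.allocate(100, 0);                    // take everything
        Thread releaser = new Thread(() -> {
            try { Thread.sleep(50); } catch (InterruptedException ignored) { }
            pool.deallocate(100);                 // wakes the blocked allocate below
        });
        releaser.start();
        ByteBuffer buf = pool.allocate(60, 5_000);
        releaser.join();
        System.out.println(buf.capacity() + " " + pool.availableMemory()); // prints "60 40"
    }
}
```

In main, the second allocate blocks until the releaser thread returns memory, then keeps 60 of the 100 bytes. If the wait times out instead, any partially reserved bytes go back to the pool in the finally block, mirroring the accumulated handling in Kafka's code.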

deallocate

There really isn't much to analyze in returning a buffer, so I'll keep it short.

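  1. Don't forget to lock. In a multithreaded environment, the pool will leak memory without a lock.
  2. Clear. If size equals poolableSize (and the buffer's capacity matches), the buffer is cleared with buffer.clear(), which resets its position and limit. It is then kept in the free queue instead of being released, so the next request of that size can reuse it without allocating a new one. Otherwise the size is simply added back to nonPooledAvailableMemory. The caller drops its reference, and the JVM garbage-collects the buffer.
  3. Signal a waiting thread. Only the head of waiters is signaled, so if the blocked requests are all large ones, this is a fair (FIFO) queue: first come, first served.
  4. Don't forget to unlock.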
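public void deallocate(ByteBuffer buffer, int size) {
    lock.lock();
    try {
        if (size == this.poolableSize && size == buffer.capacity()) {
            // A pooled-size buffer: reset it and keep it in free for reuse
            buffer.clear();
            this.free.add(buffer);
        } else {
            // Any other size: just give the bytes back; the JVM will collect the buffer
            this.nonPooledAvailableMemory += size;
        }
        // Wake up the longest-waiting allocator, if any
        Condition moreMem = this.waiters.peekFirst();
        if (moreMem != null)
            moreMem.signal();
    } finally {
        lock.unlock();
    }
}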
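The two branches of deallocate can be checked in isolation. ReturnPath below is a hypothetical class, and locking and signaling are left out. The sketch also shows a detail worth knowing: ByteBuffer.clear() only resets position and limit. It does not zero the old bytes.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model of deallocate()'s two branches; locking and signaling are omitted.
class ReturnPath {
    final int poolableSize;
    final Deque<ByteBuffer> free = new ArrayDeque<>();
    long nonPooledAvailableMemory; // starts at 0 here: only counts bytes returned in this demo

    ReturnPath(int poolableSize) {
        this.poolableSize = poolableSize;
    }

    void deallocate(ByteBuffer buffer, int size) {
        if (size == poolableSize && size == buffer.capacity()) {
            buffer.clear();                    // resets position/limit; the bytes are not zeroed
            free.add(buffer);                  // keep the object so the next allocate can reuse it
        } else {
            nonPooledAvailableMemory += size;  // only the byte count returns; the GC reclaims the buffer
        }
    }

    public static void main(String[] args) {
        ReturnPath pool = new ReturnPath(128);
        ByteBuffer pooled = ByteBuffer.allocate(128);
        pooled.putInt(42);
        pool.deallocate(pooled, 128);
        System.out.println(pooled.position() + " " + pooled.limit() + " " + pooled.getInt(0)); // prints "0 128 42"
        pool.deallocate(ByteBuffer.allocate(300), 300);
        System.out.println(pool.free.size() + " " + pool.nonPooledAvailableMemory); // prints "1 300"
    }
}
```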

Wrap-up

Man, I'm getting a bit tired, so I'll split this section into two parts. CopyOnWriteMap will be covered in the next part.
