上節解析了生產者發送消息時RecordAccumulator的相關操做,本節解析下RecordAccumulator用到的其餘組件BufferPool,CopyOnWriteMap數據庫
上節講到RecordAccumulator在append的時候會申請緩衝區,每一個批次ProducerBatch在封裝消息時MemoryRecordsBuilder會用到緩衝區ByteBuffer(MemoryRecordsBuilder和消息壓縮這一塊我以爲太邊緣了,策略調整之後只解析關鍵流程和收穫比較多的部分,其餘的有機會再仔細分享)。
若是高頻率的建立ByteBuffer有可能會形成虛擬機的頻繁GC,這裏kafka使用了BufferPool,BufferPool和數據庫鏈接池的設計理念同樣,是一個池子的概念,池子受一個最大使用內存的限制,須要就從池子申請一個ByteBuffer,不夠用了就阻塞等待,用好了再還給池子並清空。api
private final long totalMemory; //buffer.memory buffer容量 private final int poolableSize; //batch.size 批量大小 private final ReentrantLock lock; //防止多線程申請空間出現併發問題的重入鎖 private final Deque<ByteBuffer> free; //空閒buffer隊列 private final Deque<Condition> waiters; //等待分配buffer的隊列 /** Total available memory is the sum of nonPooledAvailableMemory and the number of byte buffers in free * poolableSize. */ private long nonPooledAvailableMemory;//沒分配到空閒隊列的可用容量,totalMemory = poolableSize * free
既然是個緩衝池,那麼BufferPool主要提供兩個api:借(allocate)和還(deallocate),這兩個api流程比較簡單。多線程
public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException { //申請的容量比pool總容量還大,拋異常 if (size > this.totalMemory) throw new IllegalArgumentException("Attempt to allocate " + size + " bytes, but there is a hard limit of " + this.totalMemory + " on memory allocations."); ByteBuffer buffer = null; this.lock.lock(); try { // 有分配好的直接返回 if (size == poolableSize && !this.free.isEmpty()) return this.free.pollFirst(); // 看看剩餘空閒容量多少 int freeListSize = freeSize() * this.poolableSize; if (this.nonPooledAvailableMemory + freeListSize >= size) { // 還夠的話釋放掉空閒的 freeUp(size); // 沒分配的容量先預扣掉 this.nonPooledAvailableMemory -= size; } else { // 剩餘也不夠 // accumulated 用來標記如今已經準備好了多少,若是異常了已經準備多少了得在finally里加回去,要不就泄露了 int accumulated = 0; // 設置一個條件變量,用來線程間同步用 Condition moreMemory = this.lock.newCondition(); try { //一直不夠也不會一直等,設置一個最大等待時間 long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs); this.waiters.addLast(moreMemory); // loop over and over until we have a buffer or have reserved // 循環等待 while (accumulated < size) { long startWaitNs = time.nanoseconds(); long timeNs; boolean waitingTimeElapsed; try { //不夠就先釋放鎖,而後阻塞 waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS); } finally { long endWaitNs = time.nanoseconds(); timeNs = Math.max(0L, endWaitNs - startWaitNs); this.waitTime.record(timeNs, time.milliseconds()); } if (waitingTimeElapsed) { throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms."); } remainingTimeToBlockNs -= timeNs; // check if we can satisfy this request from the free list, // otherwise allocate memory if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) { // 若是有線程正好釋放掉了,那就直接從free彈出來一個 buffer = this.free.pollFirst(); // accumulated也得標記一下拿到了 accumulated = size; } else { // free不夠,可是總剩餘夠了,先釋放掉free裏的 freeUp(size - accumulated); int got = (int) Math.min(size - accumulated, this.nonPooledAvailableMemory); // 剩餘的先扣除掉 this.nonPooledAvailableMemory -= got; // 標記一下,拿到了須要的容量 accumulated += got; } } // Don't reclaim memory on throwable since nothing was thrown //重置爲0 accumulated = 0; } finally { // When this loop was not able to successfully terminate don't loose available memory //異常狀況每分出去,準備好的要加回去,若是每一場的話accumulated是0 this.nonPooledAvailableMemory += accumulated; //把放在隊列的條件變量彈出去,該下一個條件變量準備分配了 this.waiters.remove(moreMemory); } } } finally { // signal any additional waiters if there is more memory left // over for them try { //要是如今剩餘容量沒了那也別費勁signal下一個了,繼續阻塞着吧 //這樣會不會致使其餘阻塞線程一直等下去呢?不會,由於歸還的時候也會signal的 if (!(this.nonPooledAvailableMemory == 0 && this.free.isEmpty()) && !this.waiters.isEmpty()) this.waiters.peekFirst().signal(); } finally { // Another finally... otherwise find bugs complains lock.unlock(); } } if (buffer == null) return safeAllocateByteBuffer(size); else return buffer; }
還實在是沒啥可解析的,簡單點。併發
public void deallocate(ByteBuffer buffer, int size) { lock.lock(); try { if (size == this.poolableSize && size == buffer.capacity()) { buffer.clear(); this.free.add(buffer); } else { this.nonPooledAvailableMemory += size; } Condition moreMem = this.waiters.peekFirst(); if (moreMem != null) moreMem.signal(); } finally { lock.unlock(); } }
臥槽有點累了,這節分個上下吧,CopyOnWriteMap下節講。app