深度剖析 Kafka Producer 的緩衝池機制【圖解 + 源碼分析】

如下文章來源於【後端進階】做者張乘輝java

上次跟你們分享的文章「Kafka Producer 異步發送消息竟然也會阻塞?」中提到了緩衝池,後面再通過一番閱讀源碼後,發現了這個緩衝池設計的很棒,被它的設計思想優雅到了,因此忍不住跟你們繼續分享一波。
web

在新版的 Kafka Producer 中,設計了一個消息緩衝池,在建立 Producer 時會默認建立一個大小爲 32M 的緩衝池,也能夠經過 buffer.memory 參數指定緩衝池的大小,同時緩衝池被切分紅多個內存塊,內存塊的大小就是咱們建立 Producer 時傳的 batch.size 大小,默認大小 16384,而每一個 Batch 都會包含一個 batch.size 大小的內存塊,消息就是存放在內存塊當中。整個緩衝池的結構以下圖所示:apache

客戶端將消息追加到對應主題分區的某個 Batch 中,若是 Batch 已經滿了,則會新建一個 Batch,同時向緩衝池(RecordAccumulator)申請一塊大小爲 batch.size 的內存塊用於存儲消息。後端

當 Batch 的消息被髮到了 Broker 後,Kafka Producer 就會移除該 Batch,既然 Batch 持有某個內存塊,那必然就會涉及到 GC 問題,以下:微信

以上,頻繁的申請內存,用完後就丟棄,必然致使頻繁的 GC,形成嚴重的性能問題。那麼,Kafka 是怎麼作到避免頻繁 GC 的呢?app

前面說過了,緩衝池在設計邏輯上面被切分紅一個個大小相等的內存塊,當消息發送完畢,歸還給緩衝池不就能夠避免被回收了嗎?框架

緩衝池的內存持有類是 BufferPool,咱們先來看下 BufferPool 都有哪些成員:異步

public class BufferPool {
  // 總的內存大小
  private final long totalMemory;
  // 每一個內存塊大小,即 batch.size
  private final int poolableSize;
  // 申請、歸還內存的方法的同步鎖
  private final ReentrantLock lock;
  // 空閒的內存塊
  private final Deque<ByteBuffer> free;
  // 須要等待空閒內存塊的事件
  private final Deque<Condition> waiters;
  /** Total available memory is the sum of nonPooledAvailableMemory and the number of byte buffers in free * poolableSize.  */
  // 緩衝池還未分配的空閒內存,新申請的內存塊就是從這裏獲取內存值
  private long nonPooledAvailableMemory;
 // ...
}

從 BufferPool 的成員可看出,緩衝池實際上由一個個 ByteBuffer 組成的,BufferPool 持有這些內存塊,並保存在成員 free 中,free 的總大小由 totalMemory 做限制,而 nonPooledAvailableMemory 則表示還剩下緩衝池還剩下多少內存還未被分配。編輯器

當 Batch 的消息發送完畢後,就會將它持有的內存塊歸還到 free 中,以便後面的 Batch 申請內存塊時再也不建立新的 ByteBuffer,從 free 中取就能夠了,從而避免了內存塊被 JVM 回收的問題。分佈式

接下來跟你們一塊兒分析申請內存和歸還內存是如何實現的。

一、申請內存

申請內存的入口:

org.apache.kafka.clients.producer.internals.BufferPool#allocate

1)內存足夠的狀況

當用戶請求申請內存時,若是發現 free 中有空閒的內存,則直接從中取:

if (size == poolableSize && !this.free.isEmpty()){
  return this.free.pollFirst(); 
}

這裏的 size 即申請的內存大小,它等於 Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));

即若是你的消息大小小於 batchSize,則申請的內存大小爲 batchSize,那麼上面的邏輯就是若是申請的內存大小等於 batchSize 而且 free 不空閒,則直接從 free 中獲取。

咱們不妨想一下,爲何 Kafka 必定要申請內存大小等於 batchSize,才能從 free 獲取空閒的內存塊呢?

前面也說過,緩衝池的內存塊大小是固定的,它等於 batchSize,若是申請的內存比 batchSize 還大,說明一條消息所須要存放的內存空間比內存塊的內存空間還要大,所以不知足需求,不滿組需求怎麼辦呢?咱們接着往下分析:

// now check if the request is immediately satisfiable with the
// memory on hand or if we need to block
int freeListSize = freeSize() * this.poolableSize;
if (this.nonPooledAvailableMemory + freeListSize >= size) {
  // we have enough unallocated or pooled memory to immediately
  // satisfy the request, but need to allocate the buffer
  freeUp(size);
  this.nonPooledAvailableMemory -= size;
}

freeListSize:指的是 free 中已經分配好而且已經回收的空閒內存塊總大小;

nonPooledAvailableMemory:緩衝池還未分配的空閒內存,新申請的內存塊就是從這裏獲取內存值;

this.nonPooledAvailableMemory + freeListSize:即緩衝池中總的空閒內存空間。

若是緩衝池的內存空間比申請內存大小要大,則調用  freeUp(size); 方法,接着將空閒的內存大小減去申請的內存大小。

private void freeUp(int size) {
  while (!this.free.isEmpty() && this.nonPooledAvailableMemory < size)
    this.nonPooledAvailableMemory += this.free.pollLast().capacity();
}

freeUp 這個方法頗有趣,它的思想是這樣的:

若是未分配的內存大小比申請的內存還要小,那隻能從已分配的內存列表 free 中將內存空間要回來,直到 nonPooledAvailableMemory 比申請內存大爲止。

2)內存不足的狀況

在個人「Kafka Producer 異步發送消息竟然也會阻塞?」這篇文章當中也提到了,當緩衝池的內存塊用完後,消息追加調用將會被阻塞,直到有空閒的內存塊。

阻塞等待的邏輯是怎麼實現的呢?

// we are out of memory and will have to block
int accumulated = 0;
Condition moreMemory = this.lock.newCondition();
try {
  long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
  this.waiters.addLast(moreMemory);
  // loop over and over until we have a buffer or have reserved
  // enough memory to allocate one
  while (accumulated < size) {
    long startWaitNs = time.nanoseconds();
    long timeNs;
    boolean waitingTimeElapsed;
    try {
      waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
    } finally {
      long endWaitNs = time.nanoseconds();
      timeNs = Math.max(0L, endWaitNs - startWaitNs);
      recordWaitTime(timeNs);
    }

    if (waitingTimeElapsed) {
      throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");
    }

    remainingTimeToBlockNs -= timeNs;

    // check if we can satisfy this request from the free list,
    // otherwise allocate memory
    if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
      // just grab a buffer from the free list
      buffer = this.free.pollFirst();
      accumulated = size;
    } else {
      // we'll need to allocate memory, but we may only get
      // part of what we need on this iteration
      freeUp(size - accumulated);
      int got = (int) Math.min(size - accumulated, this.nonPooledAvailableMemory);
      this.nonPooledAvailableMemory -= got;
      accumulated += got;
    }
  }

以上源碼的大體邏輯:

首先建立一個本次等待 Condition,而且把它添加到類型爲 Deque 的 waiters 中(後面在歸還內存中會喚醒),while 循環不斷收集空閒的內存,直到內存比申請內存大時退出,在 while 循環過程當中,調用 Condition#await 方法進行阻塞等待,歸還內存時會被喚醒,喚醒後會判斷當前申請內存是否大於 batchSize,若是等與 batchSize 則直接將歸還的內存返回便可,若是當前申請的內存大於 大於 batchSize,則須要調用 freeUp 方法從 free 中釋放空閒的內存出來,而後進行累加,直到大於申請的內存爲止。

二、歸還內存

申請內存的入口:

org.apache.kafka.clients.producer.internals.BufferPool#deallocate(java.nio.ByteBuffer, int)

public void deallocate(ByteBuffer buffer, int size) {
  lock.lock();
  try {
    if (size == this.poolableSize && size == buffer.capacity()) {
      buffer.clear();
      this.free.add(buffer);
    } else {
      this.nonPooledAvailableMemory += size;
    }
    Condition moreMem = this.waiters.peekFirst();
    if (moreMem != null)
      moreMem.signal();
  } finally {
    lock.unlock();
  }
}

歸還內存塊的邏輯比較簡單:

若是歸還的內存塊大小等於 batchSize,則將其清空後添加到緩衝池的 free 中,即將其歸還給緩衝池,避免了 JVM GC 回收該內存塊。若是不等於呢?直接將內存大小累加到未分配而且空閒的內存大小值中便可,內存就無需歸還了,等待 JVM GC 回收掉,最後喚醒正在等待空閒內存的線程。

通過以上的源碼分析以後,給你們指出須要注意的一個問題,若是設置不當,會給 Producer 端帶來嚴重的性能影響:

若是你的消息大小比 batchSize 還要大,則不會從 free 中循環獲取已分配好的內存塊,而是從新建立一個新的 ByteBuffer,而且該 ByteBuffer 不會被歸還到緩衝池中(JVM GC 回收),若是此時 nonPooledAvailableMemory 比消息體還要小,還會將 free 中空閒的內存塊銷燬(JVM GC 回收),以便緩衝池中有足夠的內存空間提供給用戶申請,這些動做都會致使頻繁 GC 的問題出現。

所以,須要根據業務消息的大小,適當調整 batch.size 的大小,避免頻繁 GC。




做者張乘輝,擅長消息中間件技能,負責公司百萬 TPS 級別 Kafka 集羣的維護,公號不按期分享 Kafka、RocketMQ 系列不講概念直接真刀真槍的實戰總結以及細節上的源碼分析;同時做者也是阿里開源分佈式事務框架 Seata Contributor,所以也會不按期分享關於 Seata 的相關知識;固然公號也會不按期發表 WEB 相關知識好比 Spring 全家桶等。不必定面面俱到,但必定讓你感覺到做者對於技術的追求是認真的!


本文分享自微信公衆號 - 瓜農老梁(gh_01130ae30a83)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。

相關文章
相關標籤/搜索