Netty內存池之PoolThreadCache詳解

       PoolThreadCahche是Netty內存管理中可以實現高效內存申請和釋放的一個重要緣由,Netty會爲每個線程都維護一個PoolThreadCache對象,當進行內存申請時,首先會嘗試從PoolThreadCache中申請,若是沒法從中申請到,則會嘗試從Netty的公共內存池中申請。本文首先會對PoolThreadCache的數據結構進行講解,而後會介紹Netty是如何初始化PoolThreadCache的,最後會介紹如何在PoolThreadCache中申請內存和如何將內存釋放到PoolThreadCache中。java

1. PoolThreadCache數據結構

       PoolThreadCache的數據結構與PoolArena的主要屬性結構很是類似,但細微位置有很大的不一樣。在PoolThreadCache中,其維護了三個數組(咱們以直接內存的緩存方式爲例進行講解),以下所示:數組

// 存儲tiny類型的內存緩存,該數組長度爲32,其中只有下標爲1~31的元素緩存了有效數據,第0號位空置。
// 這裏內存大小的存儲方式也與PoolSubpage相似,數組的每一號元素都存儲了不一樣等級的內存塊,每一個等級的
// 內存塊的內存大小差值爲16byte,好比第1號位維護了大小爲16byte的內存塊,第二號爲維護了大小爲32byte的
// 內存塊,依次類推,第31號位維護了大小爲496byte的內存塊。
private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches;
// 存儲small類型的內存緩存,該數組長度爲4,數組中每一個元素中維護的內存塊大小也是成等級遞增的,而且這裏
// 的遞增方式是按照2的指數次冪進行的,好比第0號爲維護的是大小爲512byte的內存塊,第1號位維護的是大小爲
// 1024byte的內存塊,第2號位維護的是大小爲2048byte的內存塊,第3號位維護的是大小爲4096byte的內存塊
private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
// 存儲normal類型的內存緩存。須要注意的是,這裏雖然說是維護的normal類型的緩存,可是其只維護2<<13,2<<14
// 和2<<15三個大小的內存塊,而該數組的大小也正好爲3,於是這三個大小的內存塊將被依次放置在該數組中。
// 若是申請的目標內存大於2<<15,那麼Netty會將申請動做交由PoolArena進行。
private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;

       這三個數組分別保存了tiny,small和normal類型的緩存數據,不一樣於PoolArena的使用PoolSubpage和PoolChunk進行內存的維護,這裏都是使用MemoryRegionCache進行的。另外,在MemoryRegionCache中保存了一個有界隊列,對於tiny類型的緩存,該隊列的長度爲512,對於small類型的緩存,該隊列的長度爲256,對於normal類型的緩存,該隊列的長度爲64。在進行內存釋放的時候,若是隊列已經滿了,那麼就會將該內存塊釋放回PoolArena中。這裏須要說明的是,這裏的隊列中的元素統一使用的是Entry這種數據結構,該結構的主要屬性以下:緩存

static final class Entry<T> {
  // 用於循環利用當前Entry對象的處理器,該處理器的實現原理,咱們後續將進行講解
  final Handle<Entry<?>> recyclerHandle;
  // 記錄了當前內存塊是從哪個PoolChunk中申請得來的
  PoolChunk<T> chunk;
  // 若是是直接內存,該屬性記錄了當前內存塊所在的ByteBuffer對象
  ByteBuffer nioBuffer;
  // 因爲當前申請的內存塊在PoolChunk以及PoolSubpage中的位置是能夠經過一個長整型參數來表示的,
  // 這個長整型參數就是這裏的handle,於是這裏直接將其記錄下來,以便後續須要將當前內存塊釋放到
  // PoolArena中時,可以快速獲取其所在的位置
  long handle = -1;
}

       PoolThreadCache中維護每個內存塊最終都是使用的一個Entry對象來進行的,從上面的屬性能夠看出,記錄該內存塊最重要的屬性是chunk和handle,chunk記錄了當前內存塊所在的PoolChunk對象,而handle則記錄了當前內存塊是在PoolChunk和PoolSubpage中的哪一個位置(關於PoolChunk,PoolSubpage和PoolArena的實現原理,建議讀者閱讀一下前面的文章,這樣有助於讀者快速理解相關原理)。如此,對於Netty使用的PoolThreadCache的存儲結構咱們就有了一個比較清晰的認識。下面咱們經過一幅圖來對PoolThreadCache的數據結構進行一個總體的演示:數據結構

image.png

       如上圖所示展現的就是PoolThreadCache的結構示意圖。從圖中能夠看出在一個PoolThreadCache中,主要有三個MemoryRegionCache數組用於存儲tiny,small和normal類型的內存塊。每一個MemoryRegionCache中有一個隊列,隊列中的元素類型爲Entry。Entry的做用就是存儲緩存的內存塊的,其存儲的方式主要是經過記錄當前內存塊所在的PoolChunk和標誌其在PoolChunk中位置的handle參數。對於不一樣類型的數組,隊列的長度是不同的,tiny類型的是512,small類型的是256,normal類型的則是64。ide

2. PoolThreadCache初始化

       對於PoolThreadCache的初始化,這裏單獨拿出來說解的緣由是,其初始化過程是與PoolThreadLocalCache所綁定的。PoolThreadLocalCache的做用與Java中的ThreadLocal的做用很是相似,其有一個initialValue()方法,用於在沒法從PoolThreadLocalCache中獲取數據時,經過調用該方法初始化一個。另外其提供了一個get()方法和和remove()方法,分別用於從PoolThreadLocalCache中將當前綁定的數據給清除。這裏咱們首先看看獲取PoolThreadCache的入口代碼:this

@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
  // 從PoolThreadLocalCache中嘗試獲取一個PoolThreadCache對象,
  // 若是不存在,則自行初始化一個返回
  PoolThreadCache cache = threadCache.get();
  // 因爲當前方法是須要返回一個direct buffer,於是這裏直接使用cache中的directArena
  PoolArena<ByteBuffer> directArena = cache.directArena;

  final ByteBuf buf;
  if (directArena != null) {
    // 若是directArena不爲空,則直接調用其allocate()方法申請內存
    buf = directArena.allocate(cache, initialCapacity, maxCapacity);
  } else {
    // 若是當前緩存中因爲某種緣由沒法獲取到directArena,則直接建立一個存有直接內存的ByteBuf,
    // 通常狀況下不會走到這一步
    buf = PlatformDependent.hasUnsafe() ?
      UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
    new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
  }

  // 爲ByteBuf設置內存泄露檢測功能
  return toLeakAwareBuffer(buf);
}

       從上面的代碼中能夠看出,在最開始的時候,就會經過PoolThreadLocalCache嘗試獲取一個PoolThreadCache對象,若是不存在,其會自行初始化一個。這裏咱們直接看其是如何初始化的,以下是PoolThreadLocalCache.initialValue()方法的源碼:線程

@Override
protected synchronized PoolThreadCache initialValue() {
  // 這裏leastUsedArena()就是獲取對應的PoolArena數組中最少被使用的那個Arena,將其返回。
  // 這裏的判斷方式是經過比較PoolArena.numThreadCaches屬性來進行的,該屬性記錄了當前PoolArena被
  // 多少個線程所佔用了。這裏採用的思想就是,找到最少被使用的那個PoolArena,將其存入新的線程緩存中
  final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);
  final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);

  Thread current = Thread.currentThread();
  // 只有在指定了爲每一個線程使用緩存,或者當前線程是FastThreadLocalThread的子類型時,纔會使用線程緩存
  if (useCacheForAllThreads || current instanceof FastThreadLocalThread) {
    return new PoolThreadCache(
      heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,
      DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
  }
  // 若是指定了不使用緩存,或者線程換粗對象不是FastThreadLocalThread類型的,則建立一個PoolThreadCache
  // 對象,該對象中是不作任何緩存的,由於初始化數據都是0
  return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0, 0);
}

private <T> PoolArena<T> leastUsedArena(PoolArena<T>[] arenas) {
  if (arenas == null || arenas.length == 0) {
    return null;
  }

  // 在PoolArena數組中找到被最少線程佔用的對象,將其返回。這樣作的目的是,因爲內存池是多個線程均可以
  // 訪問的公共區域,於是當這裏就須要對內存池進行劃分,以減小線程之間的競爭。
  PoolArena<T> minArena = arenas[0];
  for (int i = 1; i < arenas.length; i++) {
    PoolArena<T> arena = arenas[i];
    if (arena.numThreadCaches.get() < minArena.numThreadCaches.get()) {
      minArena = arena;
    }
  }

  return minArena;
}

       從上述代碼能夠看出,對於PoolThreadCache的初始化,其首先會查找PoolArena數組中被最少線程佔用的那個arena,而後將其封裝到一個新建的PoolThreadCache中。code

3. 內存申請

       須要注意的是,PoolThreadCache申請內存並非說其會建立一塊內存,或者說其會到PoolArena中申請內存,而是指,其自己已經緩存有內存塊,而當前申請的內存塊大小正好與其一致,就會將該內存塊返回;PoolThreadCache中的內存塊都是在當前線程使用完建立的ByteBuf對象後,經過調用其release()方法釋放內存時直接緩存到當前PoolThreadCache中的,其並不會直接將內存塊返回給PoolArena。這裏咱們直接看一下其allocate()方法是如何實現的:orm

// 申請tiny類型的內存塊
boolean allocateTiny(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
  return allocate(cacheForTiny(area, normCapacity), buf, reqCapacity);
}

// 申請small類型的內存塊
boolean allocateSmall(PoolArena<?> area, PooledByteBuf<?> buf, 
     int reqCapacity, int normCapacity) {
  return allocate(cacheForSmall(area, normCapacity), buf, reqCapacity);
}

// 申請normal類型的內存塊
boolean allocateNormal(PoolArena<?> area, PooledByteBuf<?> buf, 
     int reqCapacity, int normCapacity) {
  return allocate(cacheForNormal(area, normCapacity), buf, reqCapacity);
}

// 從MemoryRegionCache中申請內存
private boolean allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity) {
  if (cache == null) {
    return false;
  }

  // 從MemoryRegionCache中申請內存,本質上就是從其隊列中申請,若是存在,則初始化申請到的內存塊
  boolean allocated = cache.allocate(buf, reqCapacity);
  // 這裏是若是當前PoolThreadCache中申請內存的次數達到了8192次,則對內存塊進行一次trim()操做,
  // 對使用較少的內存塊,將其返還給PoolArena,以供給其餘線程使用
  if (++allocations >= freeSweepAllocationThreshold) {
    allocations = 0;
    trim();
  }
  return allocated;
}

       這裏對於內存塊的申請,咱們能夠看到,PoolThreadCache是將其分爲tiny,small和normal三種不一樣的方法來調用的,而具體大小的區分實際上是在PoolArena中進行區分的(讀者能夠閱讀本人前面的關於PoolArena介紹的文章)。在對應的內存數組中找到MemoryRegionCache對象以後,經過調用allocate()方法來申請內存,申請完以後還會檢查當前緩存申請次數是否達到了8192次,達到了則對緩存中使用的內存塊進行檢測,將較少使用的內存塊返還給PoolArena。這裏咱們首先看一下獲取MemoryRegionCache的代碼是如何實現的,也即cacheForTiny(),cacheForSmall()和cacheForNormal()的代碼:對象

private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) {
  // 計算當前數組下標索引,因爲tiny類型的內存塊每一層級相差16byte,於是這裏的計算方式就是
  // 將目標內存大小除以16
  int idx = PoolArena.tinyIdx(normCapacity);
  // 返回tiny類型的數組中對應位置的MemoryRegionCache
  if (area.isDirect()) {
    return cache(tinySubPageDirectCaches, idx);
  }
  return cache(tinySubPageHeapCaches, idx);
}

private MemoryRegionCache<?> cacheForSmall(PoolArena<?> area, int normCapacity) {
  // 計算當前數組下標的索引,因爲small類型的內存塊大小都是2的指數次冪,於是這裏就是將目標內存大小
  // 除以1024以後計算其偏移量
  int idx = PoolArena.smallIdx(normCapacity);
  // 返回small類型的數組中對應位置的MemoryRegionCache
  if (area.isDirect()) {
    return cache(smallSubPageDirectCaches, idx);
  }
  return cache(smallSubPageHeapCaches, idx);
}

private MemoryRegionCache<?> cacheForNormal(PoolArena<?> area, int normCapacity) {
  // 對於normal類型的緩存,這裏也是首先將其向右位移13位,也就是8192,而後取2的對數,這樣就
  // 能夠獲得其在數組中的位置,而後返回normal類型的數組中對應位置的MemoryRegionCache
  if (area.isDirect()) {
    int idx = log2(normCapacity >> numShiftsNormalDirect);
    return cache(normalDirectCaches, idx);
  }
  int idx = log2(normCapacity >> numShiftsNormalHeap);
  return cache(normalHeapCaches, idx);
}

       這裏對於數組位置的計算,主要是根據各個數組數據存儲方式的不一樣而進行的,而它們最終都是經過一個MemoryRegionCache存儲的,於是只須要返回該緩存對象便可。下面咱們繼續看一下MemoryRegionCache.allocate()方法是如何申請內存的:

public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity) {
  // 嘗試從隊列中獲取,若是隊列中不存在,說明沒有對應的內存塊,則返回false,表示申請失敗
  Entry<T> entry = queue.poll();
  if (entry == null) {
    return false;
  }
  
  // 走到這裏說明隊列中存在對應的內存塊,那麼經過其存儲的Entry對象來初始化ByteBuf對象,
  // 如此即表示申請內存成功
  initBuf(entry.chunk, entry.nioBuffer, entry.handle, buf, reqCapacity);
  // 對entry對象進行循環利用
  entry.recycle();

  // 更新當前已經申請的內存數量
  ++allocations;
  return true;
}

       能夠看到,MemoryRegionCache申請內存的方式主要是從隊列中取,若是取到了,則使用該內存塊初始化一個ByteBuf對象。

       前面咱們講到,PoolThreadCache會對其內存塊使用次數進行計數,這麼作的目的在於,若是一個ThreadPoolCache所緩存的內存塊使用較少,那麼就能夠將其釋放到PoolArena中,以便於其餘線程能夠申請使用。PoolThreadCache會在其內存總的申請次數達到8192時遍歷其全部的MemoryRegionCache,而後調用其trim()方法進行內存釋放,以下是該方法的源碼:

public final void trim() {
  // size表示當前MemoryRegionCache中隊列的最大可存儲容量,allocations表示當前MemoryRegionCache
  // 的內存申請次數,size-allocations的含義就是判斷當前申請的次數是否連隊列的容量都沒達到
  int free = size - allocations;
  allocations = 0;

  // 若是申請的次數連隊列的容量都沒達到,則釋放該內存塊
  if (free > 0) {
    free(free);
  }
}

private int free(int max) {
  int numFreed = 0;
  // 依次從隊列中取出Entry數據,調用freeEntry()方法釋放該Entry
  for (; numFreed < max; numFreed++) {
    Entry<T> entry = queue.poll();
    if (entry != null) {
      freeEntry(entry);
    } else {
      return numFreed;
    }
  }
  return numFreed;
}

private void freeEntry(Entry entry) {
  // 經過當前Entry中保存的PoolChunk和handle等數據釋放當前內存塊
  PoolChunk chunk = entry.chunk;
  long handle = entry.handle;
  ByteBuffer nioBuffer = entry.nioBuffer;
  entry.recycle();
  chunk.arena.freeChunk(chunk, handle, sizeClass, nioBuffer);
}

4. 內存釋放

       對於內存的釋放,其原理比較簡單,通常的釋放內存的入口在ByteBuf對象中。當調用ByteBuf.release()方法的時候,其首先會將釋放動做委託給PoolChunk的free()方法,PoolChunk則會判斷當前是不是池化的ByteBuf,若是是池化的ByteBuf,則調用PoolThreadCache.add()方法將其添加到PoolThreadCache中,也就是說在釋放內存時,其其實是釋放到當前線程的PoolThreadCache中的。以下是add()方法的源碼:

boolean add(PoolArena<?> area, PoolChunk chunk, ByteBuffer nioBuffer,
    long handle, int normCapacity, SizeClass sizeClass) {
  // 經過當前釋放的內存塊的大小計算其應該放到哪一個等級的MemoryRegionCache中
  MemoryRegionCache<?> cache = cache(area, normCapacity, sizeClass);
  if (cache == null) {
    return false;
  }
  
  // 將內存塊釋放到目標MemoryRegionCache中
  return cache.add(chunk, nioBuffer, handle);
}

public final boolean add(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle) {
  // 這裏會嘗試從緩存中獲取一個Entry對象,若是沒獲取到則建立一個
  Entry<T> entry = newEntry(chunk, nioBuffer, handle);
  // 將實例化的Entry對象放到隊列裏
  boolean queued = queue.offer(entry);
  if (!queued) {
    entry.recycle();
  }

  return queued;
}

5. 小結

       本文首先詳細講解了PoolThreadCache的數據結構,而且說明了其中須要注意的點,而後介紹了PoolThreadCache的實例化方式,接着從申請和釋放內存兩個角度介紹了PoolThreadCache源碼的實現方式。

相關文章
相關標籤/搜索