Netty5_內存管理_源碼講解

歡迎你們關注個人微博 http://weibo.com/hotbain 會將發佈的開源項目技術貼經過微博通知你們,但願你們可以互勉共進!謝謝!也很但願可以獲得你們對我博文的反饋,寫出更高質量的文章!!java

read事件發生,Netty須要從內核中讀取數據到本身內部能夠管轄的緩衝區,怎麼進行分配?使用完畢後,怎麼釋放?已經write方法調用,怎樣將相應數據進行緩衝區分配,以及write事件發生,flush完成後,怎樣將緩衝區釋放?數組

  1. read內存分配promise

要知道read是怎樣進行進行內存分配的首先要知道是什麼進行分配的,分配完以後,怎麼進行內存回收?每次分配新的ByteBuf大小是多少?緩存

  • 分配內存:假設是初次進行分配(同一個socket屢次進行分配的狀況,後面會講到.),咱們看一下是何時進行分配的.上代碼:安全

  •                 int byteBufCapacity = allocHandle.guess();
                    int totalReadAmount = 0;
                    do {
                        //多是 direct或者 heap  從與當前socket相關的allocator獲得byteBuf數組
    //                    byteBuf =allocHandle.allocate(allocator);
                        byteBuf = allocator.ioBuffer(byteBufCapacity);
                        int writable = byteBuf.writableBytes(); //分一個多大的內存就從socket中讀取多大的數據
                        int localReadAmount = doReadBytes(byteBuf);//從socket中讀取數據到bytebuf中
                        if (localReadAmount <= 0) {
                            // not was read release the buffer
                            byteBuf.release();//釋放到Thread Cache中
                            close = localReadAmount < 0;//是否進行關閉,關鍵要看讀取到的數據的長度是否爲-1;
                            break;
                        }
                        //發起讀取事件---若是是第一次積累數據的話,那麼就會將當前的bytebuf做爲累積對象,供繼續使用
                        pipeline.fireChannelRead(byteBuf);
                        byteBuf = null;//由pipeline進行byteBuf的釋放
                        //避免內存溢出,
                        if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) {
                            // Avoid overflow.
                            totalReadAmount = Integer.MAX_VALUE;
                            break;
                        }
    
                        totalReadAmount += localReadAmount;
                        if (localReadAmount < writable) {
                            // Read less than what the buffer can hold,
                            // which might mean we drained the recv buffer completely.
                            break;
                        }
                    } while (++ messages < maxMessagesPerRead);//每次讀取的消息的數量
                    //讀取完成---處理完一次 讀取事件
                    pipeline.fireChannelReadComplete();
                    allocHandle.record(totalReadAmount);
                    if (close) {
                        closeOnRead(pipeline);
                        close = false;
                    }
  • 從中能夠看出,就是經過ByteBufAllocator.alloc(capacity)進行分配的。(capacity參數的大小是不斷變化的。具體的咱們會稍後介紹.)。下面咱們看一下ByteBufAllocator.alloc(capacity)的具體實現:上代碼:數據結構

  •  public ByteBuf ioBuffer(int initialCapacity) {
            if (PlatformDependent.hasUnsafe()) {
                return directBuffer(initialCapacity);
            }
            return heapBuffer(initialCapacity);
        }
        public ByteBuf heapBuffer(int initialCapacity) {
            return heapBuffer(initialCapacity, Integer.MAX_VALUE);
        }
    
        public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) {
            if (initialCapacity == 0 && maxCapacity == 0) {
                return emptyBuf;
            }
            validate(initialCapacity, maxCapacity);
            return newHeapBuffer(initialCapacity, maxCapacity);//newHeapBuffer是個抽象方法
        }
  • 由此能夠看出,netty是鼓勵使用直接內存。newHeapBuffer是一個抽象方法,這裏咱們僅僅關注他在類PooledByteBufAllocator的實現(另外一個實現UnpooledByteBufAllocator咱們這就不講了,至於爲何本身去想--緣由很簡單,就是實際使用的狀況不多,好歹前一個仍是用了基於池的分配方式,避免了重複不斷的分配,能夠進行不斷重複的利用。)。上PooledByteBufAllocator的newHeapBuffer實現:less

  •  protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
            //若是在線程中已經存在一個cache 沒有的話,那麼就會調用initialValue進行初始化
            PoolThreadCache cache = threadCache.get();
            PoolArena<byte[]> heapArena = cache.heapArena;
            ByteBuf buf;
            if (heapArena != null) {
                buf = heapArena.allocate(cache, initialCapacity, maxCapacity);
            } else {//若是沒有的話,那麼就會分配一個不禁當前的allocator管理的bytebuf
                buf = new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
            }
    
            return toLeakAwareBuffer(buf);
        }
  • 這裏出現了一個基於ThreadLocal的變量,這個ThreadLocal存儲的變量類型是PoolThreadCache。PoolThreadCache有個heap和direct的兩個變量,這兩個變量是用來分配direct和heap內存的。咱們來看一下threadCache的初始化代碼:socket

  •     private final PoolArena<byte[]>[] heapArenas;
        private final PoolArena<ByteBuffer>[] directArenas;
        final ThreadLocal<PoolThreadCache> threadCache = new ThreadLocal<PoolThreadCache>() {
            private final AtomicInteger index = new AtomicInteger();//爲每個線程都會分配一個heapArena和directArena
            //在爲某個線程初次調用get方法時,會調用如下此方法,用於初始化爲當前線程要初始化的數據---一個線程中的內存池盛放的數據能夠是直接內存或者堆內存
            protected PoolThreadCache initialValue() {
                final int idx = index.getAndIncrement();
                final PoolArena<byte[]> heapArena;
                final PoolArena<ByteBuffer> directArena;
                //爲線程分配的區域能夠是direct和heap的組合
                if (heapArenas != null) {
                    heapArena = heapArenas[Math.abs(idx % heapArenas.length)];
                } else {
                    heapArena = null;
                }
    
                if (directArenas != null) {
                    directArena = directArenas[Math.abs(idx % directArenas.length)];
                } else {
                    directArena = null;
                }
    
                return new PoolThreadCache(heapArena, directArena);
            }
        };
  • initialValue方法就是爲當前的thread生成PoolThreadCache對象的初始化代碼。PoolThreadCache的的directArena和heapArena的賦值也是分別從數組directArenas和heapArenas中取摸獲得index,分別摘取兩個元素獲得的。由此能夠看出:ide

    • 假設是個socketChannel爲ABCD都有本身的PooledByteBufAllocator(就是在config()進行設置唄)。不過ABCDsocketChannel有哪個線程進行處理,他們的treadCache都是不可能相同的。由於treadCache沒有static修飾符。可是這裏須要注意一個問題,就是一個pipeline對應一個獨立的PooledByteBufAllocator的時候,PooledByteBufAllocator的成員變量heapArenas和direcArenas數組的長度爲1.不然會形成浪費。由於,threaCache一旦初始化完畢,就不會變化了,使用到的directArena和heapArena就是固定下來了,數組長度長度超過1,數組中的剩餘元素是不會被使用到的(由於每個pipeline對應一個PooledByteBufAllocator)。注意一下(能夠從PooledByteBufAllocator的源代碼中找到的): 怎麼去調整一個PooledByteBufAllocator的變量heapArenas和direcArenas的數組長度呢?咱們能夠經過設置io.netty.allocator.numHeapArenas和io.netty.allocator.numDirectArenas來設置PooledByteBufAllocator中的heapArenas和direcArenas的數組長度(固然,也能夠在初始化PooledByteBufAllocator調用構造函數,進行自定義)。這一點很重要哦!函數

    • A,B,C,D的pipeline使用同一個PooledByteBufAllocator,可是AB的事件有一個worker線程T1進行處理,可是cd的事件由另外一個worker線程T2處理,那麼cd和Ab使用的threadCache就是不一樣的,由於threadCache都是和線程進行綁定的。這個時候,就要將PooledByteBufAllocator中的heapArenas和direcArenas的數組長度設置的大一點。至於多少合適,具體應用具體對待。

  • 概述一下: 就是一個workerthread能夠管理多個socket的讀寫事件,那麼在進行內存分配時,內存的分配就要使用每個socektChannel的PooledByteBufAllocator對象,爲當前的thread分配的threadCache進行內存分配。PooledByteBufAllocator是基於內存池的形式進行使用的。至於好處,不進行多講了!!

  • 到目前爲止,咱們已經講述了內存分配的對象使用狀況,能夠當作是講述了一下read事件的過程當中,內存分配的對象圖狀況。下面咱們再來看看,PooledByteBufAllocator的heapArenas和directArenas的初始化狀況,上代碼:

  •  public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder) {
            super(preferDirect);//查看是否字節內存可用,若是可用,則生成一個空directMemory
    
            final int chunkSize = validateAndCalculateChunkSize(pageSize, maxOrder);
    
            int pageShifts = validateAndCalculatePageShifts(pageSize);
    
            if (nHeapArena > 0) {//
                heapArenas = newArenaArray(nHeapArena);
                for (int i = 0; i < heapArenas.length; i ++) {
                    heapArenas[i] = new PoolArena.HeapArena(this, pageSize, maxOrder, pageShifts, chunkSize);
                }
            } else {
                heapArenas = null;
            }
    
            if (nDirectArena > 0) {
                directArenas = newArenaArray(nDirectArena);
                for (int i = 0; i < directArenas.length; i ++) {
                    directArenas[i] = new PoolArena.DirectArena(this, pageSize, maxOrder, pageShifts, chunkSize);
                }
            } else {
                directArenas = null;
            }
        }
  • 從中能夠看到PooledByteBufAllocator對象的heapArenas和directArenas分配都是經過直接調用PoolArena.HeapArena和PoolArena.DirectArena進行分配的。至於這兩個靜態方法的具體實現,咱們這裏就不講了,網上資料也有很多。我粘貼一下我本身收集的材料:http://note.youdao.com/share/?id=beb551ba796db0741d4ff75755b70c4a&type=note  這是我本身的網易雲筆記收集的材料,應該夠用了,很簡單,不是多麼複雜,就是一個分大小按組分配 的過程。你們本身看看吧!共享會永遠存在的!!

2. read內存回收:

若是觀察過netty的pipeline,確定會注意到的一點就是第一個ChannelHandler確定是ByteToMessageDecoder,每一次read事件發生,所以分配的byteBuf都是直接調用該Handler的channelRead()方法,至於handler對此bytebuf後續怎樣的處理,上層調用是不進行管理的。也就是說,ByteBuf的一些別的操做(例如釋放、合併等)都是在ByteToMessageDecoder內完成的。下面咱們來看一下ByteToMessageDecoder的channelRead方法的具體實現,看看對byteBuf進行了什麼操做(若是你們看過個人read事件處理的博客,那麼也會提早了解).上代碼

@Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof ByteBuf) {
            RecyclableArrayList out = RecyclableArrayList.newInstance();
            try {
                ByteBuf data = (ByteBuf) msg;
                first = cumulation == null;
                if (first) {
                    cumulation = data;
                } else {
                    //緩衝區的大小沒有超過須要寫入的數據的大小
                    if (cumulation.writerIndex() > cumulation.maxCapacity() - data.readableBytes()) {
                        expandCumulation(ctx, data.readableBytes());
                    }
                    cumulation.writeBytes(data);//將數據寫入到積累對象中
                    data.release();//釋放bytebuffer(heap或者direct)--經過引用的方式進行釋放緩衝區
                }
                //收集完畢以後解析收集到的字符串
                callDecode(ctx, cumulation, out);
            } catch (DecoderException e) {
                throw e;
            } catch (Throwable t) {
                throw new DecoderException(t);
            } finally {
                if (cumulation != null && !cumulation.isReadable()) {//若是累積對象爲null或者沒有可讀內容的話,那麼就將累積對象釋放掉(由於空了或者爲null了)
                    cumulation.release();
                    cumulation = null;
                }
                int size = out.size();
                decodeWasNull = size == 0;
                //針對解析後的out結果,逐個調用message
                for (int i = 0; i < size; i ++) {
                    ctx.fireChannelRead(out.get(i));
                }
                out.recycle();
            }
        } else {
            ctx.fireChannelRead(msg);
        }
    }

    private void expandCumulation(ChannelHandlerContext ctx, int readable) {
        ByteBuf oldCumulation = cumulation;//新的容量=舊的容量+可讀取的數量  ---在此處的擴展和初次的分配都是經過同一個allocator進行分配的
        cumulation = ctx.alloc().buffer(oldCumulation.readableBytes() + readable);
        cumulation.writeBytes(oldCumulation);//複製的過程
        oldCumulation.release();//釋放老的對象
    }

從前面read事件處理流程博文能夠知曉,ByteToMessageDecoder的cumulation對象起到bytebuf累積做用的對象。當前ByteToMessageDecoder.cumulation不能盛放傳過來的bytebuf,那麼就要調用expandCumulation進行緩衝擴展。在expandCumulation實現中,也是經過Allocator分配一個更大的(可以盛放原先的cumulation數據和即將添加的bytebuf數據之和),而後將原先老的cumulation中的數據複製一下就能夠了。

小總結: 在進行bytebuf累積的過程當中也要面臨着容器cumulation的不斷擴充。每個decoderHandler都會有一個cumulation對象。一個socket對應一個decoderHandler對象。

上面咱們講到解析完數據以後,須要對累積對象bytebuf是怎樣進行釋放?  在channelReadComplete(每一次Read事件發生讀取byte數據完成後)會調用cumulation.discardSomeReadBytes()釋放空間)。再就是要說的就是,經過建立出來的bytebuf msg都是堆類型的,不用了就不用管了,gc會進行垃圾回收的。這個問題你們要記住哦!! 其實byte Msg 的是否爲直接內存或者堆內存都要取決於decode的具體實現是怎樣實現的(我看了一下具體的實現都是heap類型的msg,固然也能夠經過使用(ByteBuf)msg.isDirect()進行判斷)。咱們本身想一下也應該知道,其實msg爲heap類型的是最好的,由於msg會被後續的各類各樣的handler使用。

講一下累積對象的釋放,其實講的就是bytebuf的release方法。

歡迎你們吐槽! 本文僅僅是我本人本身的總結,不是太權威,若是有不一樣意的地方在oschina博客中吐槽

總結一下: 目前爲止,講解了一下,read事件中的分配和釋放問題,總結一下,分配時,是從本pipeline的Allocator爲當前線程分配的treadcache中得到direct或者heap緩衝,取得適合大小的一塊,標記引用數目一下就好了,釋放時,減1! 。一個Allocator的heaps和directs數組長度能夠經過特定參數進行設置。累積對象會在channelReadComplete事件發生時,在ByteToMessageDecoder的channelReadComplete事件處理中調用discardSomeReadBytes(須要瞭解一下ByteBuf的數據結構,本身能夠查看一下是怎麼實現的!)釋放部分數據的。解析出來的msg不用進行手動釋放,由於都是基於非pool的heap類型的,由垃圾進行回收的,之因此netty這樣設計msg的返回類型,依我我的的見解,就是由於msg還會被不少的後續的handler進行訪問,二次解碼等。

    write內存分配

其實write事件處理流程僅僅涉及到ms保存到entries數組中。沒有內存分配問題。若是非得說是有內存分配問題得話,那麼write(msg)的msg的類型能夠說成是內存分配問題。msg能夠是bytebuf或者對象類型。能夠調用工具類或者Allocator直接進行內存申請便可。

flush事件內存管理

  1. 若是調用write(msg)類型爲bytebuf而且此bytebuf爲堆類型的話,那麼就將其轉換成direct內存。

    1. 在分配直接內存的時候,若是爲當前socekt的Allocator爲isDirectBufferPooled的話,那麼就那麼就分配一個直接內存bytebuf。

    2. 不是得話,就從線程中綁定的ThreadLocalPooledByteBuf生成一個(此方式咱們後續還會講解)

    3. 分配完成後,將原先的數據寫入到此bytebuf,而後釋放老的bytebuf。將新的bytebuf添加到entry的末尾。

    4. 上代碼:

    5. if (alloc.isDirectBufferPooled()) {
                  directBuf = alloc.directBuffer(readableBytes);
              } else {
                  directBuf = ThreadLocalPooledByteBuf.newInstance();
              }
              //將非直接內存的數據寫入到直接內存中
              directBuf.writeBytes(buf, readerIndex, readableBytes);
              buf.release();//釋放原先的非directbuffer
              entry.msg = directBuf;//轉換成direct類型的buffer
              // cache ByteBuffer
              ByteBuffer nioBuf = entry.buf = directBuf.internalNioBuffer(0, readableBytes);
              entry.count = 1;
              nioBuffers[nioBufferCount ++] = nioBuf;
              return nioBufferCount;
  2. 半包寫入成功,釋放必要空間的過程:先上代碼:

    1.  for (int i = msgCount; i > 0; i --) {
                          final ByteBuf buf = (ByteBuf) in.current();//獲得當前正在刷新的數據緩衝區
                          final int readerIndex = buf.readerIndex();
                          final int readableBytes = buf.writerIndex() - readerIndex;//獲得當前的bytebuf中能夠讀取的數據字節數
      
                          if (readableBytes < writtenBytes) {//若是寫入的部分大於當前緩存指針的大小的話,那麼就將當前的對象進行釋放
                              in.progress(readableBytes);
                              in.remove();//移動指針,移動到下一個buffer中,經過refCount,安全的進行釋放
                              writtenBytes -= readableBytes;//將變量進行變動,爲一下
                          } else if (readableBytes > writtenBytes) {//該bytebuf刷出了一部分,沒有所有刷出去
                              buf.readerIndex(readerIndex + (int) writtenBytes);//從新設置當前的buffer的大小
                              in.progress(writtenBytes);
                              break;
                          } else { //真好所有刷出
                              in.progress(readableBytes);
                              in.remove();//直接移除(實際上是刪除引用個數)
                              break;
                          }
                      }
    2. public boolean remove() {
              if (isEmpty()) {
                  return false;
              }
      
              Entry e = buffer[flushed];
              Object msg = e.msg;
              if (msg == null) {
                  return false;
              }
      
              ChannelPromise promise = e.promise;
              int size = e.pendingSize;
      
              e.clear();
      
              flushed = flushed + 1 & buffer.length - 1;
      
              safeRelease(msg);//安全釋放,就是將此msg的引用設置爲0
      
              promise.trySuccess();
              decrementPendingOutboundBytes(size);
      
              return true;
          }

刷出數據時,調用 ch.write(nioBuffers, 0, nioBufferCnt)  不會對niobuffers中字節內容產生影響,因此刷出去以後,還要niobuffers中的已經刷出去的bytebuff的引用數設置爲0(in.remove()).恢復其使用。

若是這次flush將全部的數據都刷出去了得話,那麼就遍歷niobuffers,逐個恢復其中每個元素的nioBytebuff的引用狀況.

總結:

flush事件: 就是將entries解析成niobuffers;而且niobuffers中元素都必須是direct類型;若是不是,還用調用Allocator分配一個direct類型,將heap數據寫入到direct內存中,並添加到niobuffers中,恢復heap bytebuf的引用爲0;處理flush事件的時候,要根據寫入的數據量與niobuffers中的bytebuf的字節比較,判斷當前的bytebuf是否已經徹底刷出,若是刷出,就從niobuffers刪除,恢復引用。不然progress方法唄調用。若是沒有將全部的數據刷出得話,還有繼續監聽write事件。

--------------------------------------------------------------------------------------------------------------------------

博文評論回覆:

問題1:CShadow
write 時,用戶本身從池內分配的內存與釋放時不在同一線程,你怎麼釋放?

回答: 雖然再也不同一個線程,可是請記住一點就是 這些線程都會訪問ChannelOutBoundBuffer,其中的buffers中每一個元素都是經過refcount進行引用和釋放的。將refcount設置爲0以後,就釋放!!設置爲1,就被佔用了!Netty爲咱們作好了釋放工做! 當flush以後,remove一個bytebuf的時候,就會本身在覈心代碼中釋放bytebuf的引用個數了!!後期我也會寫一個Netty5中 promise和future的博文。但願CShadow兄弟能夠關注一下!! 最好是經過微博-http://weibo.com/hotbain ,貼一下相關代碼:

/**handler具體實現  */
public class DiscardServerHandler extends ChannelHandlerAdapter {

    @Override
    public void channelRead(final ChannelHandlerContext ctx,final  Object msg) throws Exception {
         new Thread(new MyRunner(ctx)).start();
}

public class MyRunner implements Runnable {

    private ChannelHandlerContext context;
    
    public MyRunner(ChannelHandlerContext context){
        this.context =context;
    }
    
    @Override
    public void run() {
        final ByteBuf byteBuf =context.channel().config().getAllocator().ioBuffer();
        
        if(context.channel().isOpen()){
            ChannelFuture future = context.writeAndFlush(byteBuf.writeBytes("xxxxxxxxxxxx".getBytes()));
            future.addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                  System.err.println("server write response error,client  host is: " + ((InetSocketAddress) context.channel().remoteAddress()).getHostName()+":"+((InetSocketAddress) context.channel().remoteAddress()).getPort());
                  context.channel().close();
                }else{
                    System.out.println("writeAndFlush is completed");
                }
                /**
                 * 由於在將outbuffer中刪除數據的時候已經調用saferelease方法了 
                具體請查看 ChannelOutboundBuffer.remove 方法  由於allcator是共享變量,因此在業務線程中釋放refcount也是對Allocator內存的釋放。
                    若是不想使用原socket的Allocator,能夠爲當前的thread自定義生成一個Allocator,放到threadLocal中
                 * */
               // byteBuf.release(); 畫蛇添足 可是,若是byteBuf是本身手動建立的,那麼此處的釋放代碼就能夠根據bytebuf是否耗費資源決定是否須要手動釋放了!!
              }
            });
        }
        
    }
}
/**ChannelOutboundBuffer.remove 方法實現**/
public boolean remove() {
        if (isEmpty()) {
            return false;
        }

        Entry e = buffer[flushed];
        Object msg = e.msg;
        if (msg == null) {
            return false;
        }

        ChannelPromise promise = e.promise;
        int size = e.pendingSize;

        e.clear();

        flushed = flushed + 1 & buffer.length - 1;
        
        safeRelease(msg);//自動釋放引用,是當前的msg bytebuf的引用恢復爲0

        promise.trySuccess(); //調用operationComplete回調
        decrementPendingOutboundBytes(size);//調節一下 緩衝大小

        return true;
    }

本文僅僅表明本身我的對Netty5的見解,歡迎各位吐槽!!互相學習!!

相關文章
相關標籤/搜索