歡迎你們關注個人微博 http://weibo.com/hotbain 會將發佈的開源項目技術貼經過微博通知你們,但願你們可以互勉共進!謝謝!也很但願可以獲得你們對我博文的反饋,寫出更高質量的文章!!java
read事件發生,Netty須要從內核中讀取數據到本身內部能夠管轄的緩衝區,怎麼進行分配?使用完畢後,怎麼釋放?已經write方法調用,怎樣將相應數據進行緩衝區分配,以及write事件發生,flush完成後,怎樣將緩衝區釋放?數組
read內存分配promise
要知道read是怎樣進行進行內存分配的首先要知道是什麼進行分配的,分配完以後,怎麼進行內存回收?每次分配新的ByteBuf大小是多少?緩存
分配內存:假設是初次進行分配(同一個socket屢次進行分配的狀況,後面會講到.),咱們看一下是何時進行分配的.上代碼:安全
int byteBufCapacity = allocHandle.guess(); int totalReadAmount = 0; do { //多是 direct或者 heap 從與當前socket相關的allocator獲得byteBuf數組 // byteBuf =allocHandle.allocate(allocator); byteBuf = allocator.ioBuffer(byteBufCapacity); int writable = byteBuf.writableBytes(); //分一個多大的內存就從socket中讀取多大的數據 int localReadAmount = doReadBytes(byteBuf);//從socket中讀取數據到bytebuf中 if (localReadAmount <= 0) { // not was read release the buffer byteBuf.release();//釋放到Thread Cache中 close = localReadAmount < 0;//是否進行關閉,關鍵要看讀取到的數據的長度是否爲-1; break; } //發起讀取事件---若是是第一次積累數據的話,那麼就會將當前的bytebuf做爲累積對象,供繼續使用 pipeline.fireChannelRead(byteBuf); byteBuf = null;//由pipeline進行byteBuf的釋放 //避免內存溢出, if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) { // Avoid overflow. totalReadAmount = Integer.MAX_VALUE; break; } totalReadAmount += localReadAmount; if (localReadAmount < writable) { // Read less than what the buffer can hold, // which might mean we drained the recv buffer completely. break; } } while (++ messages < maxMessagesPerRead);//每次讀取的消息的數量 //讀取完成---處理完一次 讀取事件 pipeline.fireChannelReadComplete(); allocHandle.record(totalReadAmount); if (close) { closeOnRead(pipeline); close = false; }從中能夠看出,就是經過ByteBufAllocator.alloc(capacity)進行分配的。(capacity參數的大小是不斷變化的。具體的咱們會稍後介紹.)。下面咱們看一下ByteBufAllocator.alloc(capacity)的具體實現:上代碼:數據結構
public ByteBuf ioBuffer(int initialCapacity) { if (PlatformDependent.hasUnsafe()) { return directBuffer(initialCapacity); } return heapBuffer(initialCapacity); } public ByteBuf heapBuffer(int initialCapacity) { return heapBuffer(initialCapacity, Integer.MAX_VALUE); } public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) { if (initialCapacity == 0 && maxCapacity == 0) { return emptyBuf; } validate(initialCapacity, maxCapacity); return newHeapBuffer(initialCapacity, maxCapacity);//newHeapBuffer是個抽象方法 }由此能夠看出,netty是鼓勵使用直接內存。newHeapBuffer是一個抽象方法,這裏咱們僅僅關注他在類PooledByteBufAllocator的實現(另外一個實現UnpooledByteBufAllocator咱們這就不講了,至於爲何本身去想--緣由很簡單,就是實際使用的狀況不多,好歹前一個仍是用了基於池的分配方式,避免了重複不斷的分配,能夠進行不斷重複的利用。)。上PooledByteBufAllocator的newHeapBuffer實現:less
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) { //若是在線程中已經存在一個cache 沒有的話,那麼就會調用initialValue進行初始化 PoolThreadCache cache = threadCache.get(); PoolArena<byte[]> heapArena = cache.heapArena; ByteBuf buf; if (heapArena != null) { buf = heapArena.allocate(cache, initialCapacity, maxCapacity); } else {//若是沒有的話,那麼就會分配一個不禁當前的allocator管理的bytebuf buf = new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity); } return toLeakAwareBuffer(buf); }這裏出現了一個基於ThreadLocal的變量,這個ThreadLocal存儲的變量類型是PoolThreadCache。PoolThreadCache有個heap和direct的兩個變量,這兩個變量是用來分配direct和heap內存的。咱們來看一下threadCache的初始化代碼:socket
private final PoolArena<byte[]>[] heapArenas; private final PoolArena<ByteBuffer>[] directArenas; final ThreadLocal<PoolThreadCache> threadCache = new ThreadLocal<PoolThreadCache>() { private final AtomicInteger index = new AtomicInteger();//爲每個線程都會分配一個heapArena和directArena //在爲某個線程初次調用get方法時,會調用如下此方法,用於初始化爲當前線程要初始化的數據---一個線程中的內存池盛放的數據能夠是直接內存或者堆內存 protected PoolThreadCache initialValue() { final int idx = index.getAndIncrement(); final PoolArena<byte[]> heapArena; final PoolArena<ByteBuffer> directArena; //爲線程分配的區域能夠是direct和heap的組合 if (heapArenas != null) { heapArena = heapArenas[Math.abs(idx % heapArenas.length)]; } else { heapArena = null; } if (directArenas != null) { directArena = directArenas[Math.abs(idx % directArenas.length)]; } else { directArena = null; } return new PoolThreadCache(heapArena, directArena); } };initialValue方法就是爲當前的thread生成PoolThreadCache對象的初始化代碼。PoolThreadCache的的directArena和heapArena的賦值也是分別從數組directArenas和heapArenas中取摸獲得index,分別摘取兩個元素獲得的。由此能夠看出:ide
假設是個socketChannel爲ABCD都有本身的PooledByteBufAllocator(就是在config()進行設置唄)。不過ABCDsocketChannel有哪個線程進行處理,他們的treadCache都是不可能相同的。由於treadCache沒有static修飾符。可是這裏須要注意一個問題,就是一個pipeline對應一個獨立的PooledByteBufAllocator的時候,PooledByteBufAllocator的成員變量heapArenas和direcArenas數組的長度爲1.不然會形成浪費。由於,threaCache一旦初始化完畢,就不會變化了,使用到的directArena和heapArena就是固定下來了,數組長度長度超過1,數組中的剩餘元素是不會被使用到的(由於每個pipeline對應一個PooledByteBufAllocator)。注意一下(能夠從PooledByteBufAllocator的源代碼中找到的): 怎麼去調整一個PooledByteBufAllocator的變量heapArenas和direcArenas的數組長度呢?咱們能夠經過設置io.netty.allocator.numHeapArenas和io.netty.allocator.numDirectArenas來設置PooledByteBufAllocator中的heapArenas和direcArenas的數組長度(固然,也能夠在初始化PooledByteBufAllocator調用構造函數,進行自定義)。這一點很重要哦!函數
A,B,C,D的pipeline使用同一個PooledByteBufAllocator,可是AB的事件有一個worker線程T1進行處理,可是cd的事件由另外一個worker線程T2處理,那麼cd和Ab使用的threadCache就是不一樣的,由於threadCache都是和線程進行綁定的。這個時候,就要將PooledByteBufAllocator中的heapArenas和direcArenas的數組長度設置的大一點。至於多少合適,具體應用具體對待。
概述一下: 就是一個workerthread能夠管理多個socket的讀寫事件,那麼在進行內存分配時,內存的分配就要使用每個socektChannel的PooledByteBufAllocator對象,爲當前的thread分配的threadCache進行內存分配。PooledByteBufAllocator是基於內存池的形式進行使用的。至於好處,不進行多講了!!
到目前爲止,咱們已經講述了內存分配的對象使用狀況,能夠當作是講述了一下read事件的過程當中,內存分配的對象圖狀況。下面咱們再來看看,PooledByteBufAllocator的heapArenas和directArenas的初始化狀況,上代碼:
public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder) { super(preferDirect);//查看是否字節內存可用,若是可用,則生成一個空directMemory final int chunkSize = validateAndCalculateChunkSize(pageSize, maxOrder); int pageShifts = validateAndCalculatePageShifts(pageSize); if (nHeapArena > 0) {// heapArenas = newArenaArray(nHeapArena); for (int i = 0; i < heapArenas.length; i ++) { heapArenas[i] = new PoolArena.HeapArena(this, pageSize, maxOrder, pageShifts, chunkSize); } } else { heapArenas = null; } if (nDirectArena > 0) { directArenas = newArenaArray(nDirectArena); for (int i = 0; i < directArenas.length; i ++) { directArenas[i] = new PoolArena.DirectArena(this, pageSize, maxOrder, pageShifts, chunkSize); } } else { directArenas = null; } }從中能夠看到PooledByteBufAllocator對象的heapArenas和directArenas分配都是經過直接調用PoolArena.HeapArena和PoolArena.DirectArena進行分配的。至於這兩個靜態方法的具體實現,咱們這裏就不講了,網上資料也有很多。我粘貼一下我本身收集的材料:http://note.youdao.com/share/?id=beb551ba796db0741d4ff75755b70c4a&type=note 這是我本身的網易雲筆記收集的材料,應該夠用了,很簡單,不是多麼複雜,就是一個分大小按組分配 的過程。你們本身看看吧!共享會永遠存在的!!
2. read內存回收:
若是觀察過netty的pipeline,確定會注意到的一點就是第一個ChannelHandler確定是ByteToMessageDecoder,每一次read事件發生,所以分配的byteBuf都是直接調用該Handler的channelRead()方法,至於handler對此bytebuf後續怎樣的處理,上層調用是不進行管理的。也就是說,ByteBuf的一些別的操做(例如釋放、合併等)都是在ByteToMessageDecoder內完成的。下面咱們來看一下ByteToMessageDecoder的channelRead方法的具體實現,看看對byteBuf進行了什麼操做(若是你們看過個人read事件處理的博客,那麼也會提早了解).上代碼
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof ByteBuf) { RecyclableArrayList out = RecyclableArrayList.newInstance(); try { ByteBuf data = (ByteBuf) msg; first = cumulation == null; if (first) { cumulation = data; } else { //緩衝區的大小沒有超過須要寫入的數據的大小 if (cumulation.writerIndex() > cumulation.maxCapacity() - data.readableBytes()) { expandCumulation(ctx, data.readableBytes()); } cumulation.writeBytes(data);//將數據寫入到積累對象中 data.release();//釋放bytebuffer(heap或者direct)--經過引用的方式進行釋放緩衝區 } //收集完畢以後解析收集到的字符串 callDecode(ctx, cumulation, out); } catch (DecoderException e) { throw e; } catch (Throwable t) { throw new DecoderException(t); } finally { if (cumulation != null && !cumulation.isReadable()) {//若是累積對象爲null或者沒有可讀內容的話,那麼就將累積對象釋放掉(由於空了或者爲null了) cumulation.release(); cumulation = null; } int size = out.size(); decodeWasNull = size == 0; //針對解析後的out結果,逐個調用message for (int i = 0; i < size; i ++) { ctx.fireChannelRead(out.get(i)); } out.recycle(); } } else { ctx.fireChannelRead(msg); } } private void expandCumulation(ChannelHandlerContext ctx, int readable) { ByteBuf oldCumulation = cumulation;//新的容量=舊的容量+可讀取的數量 ---在此處的擴展和初次的分配都是經過同一個allocator進行分配的 cumulation = ctx.alloc().buffer(oldCumulation.readableBytes() + readable); cumulation.writeBytes(oldCumulation);//複製的過程 oldCumulation.release();//釋放老的對象 }從前面read事件處理流程博文能夠知曉,ByteToMessageDecoder的cumulation對象起到bytebuf累積做用的對象。當前ByteToMessageDecoder.cumulation不能盛放傳過來的bytebuf,那麼就要調用expandCumulation進行緩衝擴展。在expandCumulation實現中,也是經過Allocator分配一個更大的(可以盛放原先的cumulation數據和即將添加的bytebuf數據之和),而後將原先老的cumulation中的數據複製一下就能夠了。
小總結: 在進行bytebuf累積的過程當中也要面臨着容器cumulation的不斷擴充。每個decoderHandler都會有一個cumulation對象。一個socket對應一個decoderHandler對象。
上面咱們講到解析完數據以後,須要對累積對象bytebuf是怎樣進行釋放? 在channelReadComplete(每一次Read事件發生讀取byte數據完成後)會調用cumulation.discardSomeReadBytes()釋放空間)。再就是要說的就是,經過建立出來的bytebuf msg都是堆類型的,不用了就不用管了,gc會進行垃圾回收的。這個問題你們要記住哦!! 其實byte Msg 的是否爲直接內存或者堆內存都要取決於decode的具體實現是怎樣實現的(我看了一下具體的實現都是heap類型的msg,固然也能夠經過使用(ByteBuf)msg.isDirect()進行判斷)。咱們本身想一下也應該知道,其實msg爲heap類型的是最好的,由於msg會被後續的各類各樣的handler使用。
講一下累積對象的釋放,其實講的就是bytebuf的release方法。
歡迎你們吐槽! 本文僅僅是我本人本身的總結,不是太權威,若是有不一樣意的地方在oschina博客中吐槽
總結一下: 目前爲止,講解了一下,read事件中的分配和釋放問題,總結一下,分配時,是從本pipeline的Allocator爲當前線程分配的treadcache中得到direct或者heap緩衝,取得適合大小的一塊,標記引用數目一下就好了,釋放時,減1! 。一個Allocator的heaps和directs數組長度能夠經過特定參數進行設置。累積對象會在channelReadComplete事件發生時,在ByteToMessageDecoder的channelReadComplete事件處理中調用discardSomeReadBytes(須要瞭解一下ByteBuf的數據結構,本身能夠查看一下是怎麼實現的!)釋放部分數據的。解析出來的msg不用進行手動釋放,由於都是基於非pool的heap類型的,由垃圾進行回收的,之因此netty這樣設計msg的返回類型,依我我的的見解,就是由於msg還會被不少的後續的handler進行訪問,二次解碼等。
write內存分配
其實write事件處理流程僅僅涉及到ms保存到entries數組中。沒有內存分配問題。若是非得說是有內存分配問題得話,那麼write(msg)的msg的類型能夠說成是內存分配問題。msg能夠是bytebuf或者對象類型。能夠調用工具類或者Allocator直接進行內存申請便可。
flush事件內存管理
若是調用write(msg)類型爲bytebuf而且此bytebuf爲堆類型的話,那麼就將其轉換成direct內存。
在分配直接內存的時候,若是爲當前socekt的Allocator爲isDirectBufferPooled的話,那麼就那麼就分配一個直接內存bytebuf。
不是得話,就從線程中綁定的ThreadLocalPooledByteBuf生成一個(此方式咱們後續還會講解)
分配完成後,將原先的數據寫入到此bytebuf,而後釋放老的bytebuf。將新的bytebuf添加到entry的末尾。
上代碼:
if (alloc.isDirectBufferPooled()) { directBuf = alloc.directBuffer(readableBytes); } else { directBuf = ThreadLocalPooledByteBuf.newInstance(); } //將非直接內存的數據寫入到直接內存中 directBuf.writeBytes(buf, readerIndex, readableBytes); buf.release();//釋放原先的非directbuffer entry.msg = directBuf;//轉換成direct類型的buffer // cache ByteBuffer ByteBuffer nioBuf = entry.buf = directBuf.internalNioBuffer(0, readableBytes); entry.count = 1; nioBuffers[nioBufferCount ++] = nioBuf; return nioBufferCount;
半包寫入成功,釋放必要空間的過程:先上代碼:
for (int i = msgCount; i > 0; i --) { final ByteBuf buf = (ByteBuf) in.current();//獲得當前正在刷新的數據緩衝區 final int readerIndex = buf.readerIndex(); final int readableBytes = buf.writerIndex() - readerIndex;//獲得當前的bytebuf中能夠讀取的數據字節數 if (readableBytes < writtenBytes) {//若是寫入的部分大於當前緩存指針的大小的話,那麼就將當前的對象進行釋放 in.progress(readableBytes); in.remove();//移動指針,移動到下一個buffer中,經過refCount,安全的進行釋放 writtenBytes -= readableBytes;//將變量進行變動,爲一下 } else if (readableBytes > writtenBytes) {//該bytebuf刷出了一部分,沒有所有刷出去 buf.readerIndex(readerIndex + (int) writtenBytes);//從新設置當前的buffer的大小 in.progress(writtenBytes); break; } else { //真好所有刷出 in.progress(readableBytes); in.remove();//直接移除(實際上是刪除引用個數) break; } }
public boolean remove() { if (isEmpty()) { return false; } Entry e = buffer[flushed]; Object msg = e.msg; if (msg == null) { return false; } ChannelPromise promise = e.promise; int size = e.pendingSize; e.clear(); flushed = flushed + 1 & buffer.length - 1; safeRelease(msg);//安全釋放,就是將此msg的引用設置爲0 promise.trySuccess(); decrementPendingOutboundBytes(size); return true; }
刷出數據時,調用 ch.write(nioBuffers, 0, nioBufferCnt) 不會對niobuffers中字節內容產生影響,因此刷出去以後,還要niobuffers中的已經刷出去的bytebuff的引用數設置爲0(in.remove()).恢復其使用。
若是這次flush將全部的數據都刷出去了得話,那麼就遍歷niobuffers,逐個恢復其中每個元素的nioBytebuff的引用狀況.
總結:
flush事件: 就是將entries解析成niobuffers;而且niobuffers中元素都必須是direct類型;若是不是,還用調用Allocator分配一個direct類型,將heap數據寫入到direct內存中,並添加到niobuffers中,恢復heap bytebuf的引用爲0;處理flush事件的時候,要根據寫入的數據量與niobuffers中的bytebuf的字節比較,判斷當前的bytebuf是否已經徹底刷出,若是刷出,就從niobuffers刪除,恢復引用。不然progress方法唄調用。若是沒有將全部的數據刷出得話,還有繼續監聽write事件。
--------------------------------------------------------------------------------------------------------------------------
博文評論回覆:
問題1:CShadow
write 時,用戶本身從池內分配的內存與釋放時不在同一線程,你怎麼釋放?
回答: 雖然再也不同一個線程,可是請記住一點就是 這些線程都會訪問ChannelOutBoundBuffer,其中的buffers中每一個元素都是經過refcount進行引用和釋放的。將refcount設置爲0以後,就釋放!!設置爲1,就被佔用了!Netty爲咱們作好了釋放工做! 當flush以後,remove一個bytebuf的時候,就會本身在覈心代碼中釋放bytebuf的引用個數了!!後期我也會寫一個Netty5中 promise和future的博文。但願CShadow兄弟能夠關注一下!! 最好是經過微博-http://weibo.com/hotbain ,貼一下相關代碼:
/**handler具體實現 */ public class DiscardServerHandler extends ChannelHandlerAdapter { @Override public void channelRead(final ChannelHandlerContext ctx,final Object msg) throws Exception { new Thread(new MyRunner(ctx)).start(); } public class MyRunner implements Runnable { private ChannelHandlerContext context; public MyRunner(ChannelHandlerContext context){ this.context =context; } @Override public void run() { final ByteBuf byteBuf =context.channel().config().getAllocator().ioBuffer(); if(context.channel().isOpen()){ ChannelFuture future = context.writeAndFlush(byteBuf.writeBytes("xxxxxxxxxxxx".getBytes())); future.addListener(new ChannelFutureListener() { public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { System.err.println("server write response error,client host is: " + ((InetSocketAddress) context.channel().remoteAddress()).getHostName()+":"+((InetSocketAddress) context.channel().remoteAddress()).getPort()); context.channel().close(); }else{ System.out.println("writeAndFlush is completed"); } /** * 由於在將outbuffer中刪除數據的時候已經調用saferelease方法了 具體請查看 ChannelOutboundBuffer.remove 方法 由於allcator是共享變量,因此在業務線程中釋放refcount也是對Allocator內存的釋放。 若是不想使用原socket的Allocator,能夠爲當前的thread自定義生成一個Allocator,放到threadLocal中 * */ // byteBuf.release(); 畫蛇添足 可是,若是byteBuf是本身手動建立的,那麼此處的釋放代碼就能夠根據bytebuf是否耗費資源決定是否須要手動釋放了!! } }); } } } /**ChannelOutboundBuffer.remove 方法實現**/ public boolean remove() { if (isEmpty()) { return false; } Entry e = buffer[flushed]; Object msg = e.msg; if (msg == null) { return false; } ChannelPromise promise = e.promise; int size = e.pendingSize; e.clear(); flushed = flushed + 1 & buffer.length - 1; safeRelease(msg);//自動釋放引用,是當前的msg bytebuf的引用恢復爲0 promise.trySuccess(); //調用operationComplete回調 decrementPendingOutboundBytes(size);//調節一下 緩衝大小 return true; }
本文僅僅表明本身我的對Netty5的見解,歡迎各位吐槽!!互相學習!!