netty---------write flush兩個方法到底作了什麼?

上一篇已經看到:netty的讀,是調用unsafe的read方法,把channel中的數據read到byteBuff中的byteBuffer裏,也是封裝了nio的讀。java

那麼根據猜測,netty的寫應該也是調用nio 的 channel的write(byteBuffer),將用戶空間的數據,寫到內核空間。並且nio的write方法對應的netty應該是flush方法,看看是否是這樣。數組

@Override
-----------ChannelHandlerContext的write方法實際上是調用的pipeline的write方法,到最後調用的是tail的write方法,
public final ChannelFuture write(Object msg) { return tail.write(msg); }
--------------------這裏根據傳入的boolean值,決定是write仍是writeAndflush,先看write
private
void write(Object msg, boolean flush, ChannelPromise promise) {
-----------------由於這裏執行的是tail的write方法, 因此要看tail的findContextOutbound的邏輯,tail這個context在初始化的時候inbound是true,可是outbound屬性是false,因此要從tail往前,找outboundHandler,而head的outbound屬性是true的,看它的write方
法,調用的是unsafe的write方法,是AbstractChannel的write方法。看下一段
AbstractChannelHandlerContext next
= findContextOutbound(); final Object m = pipeline.touch(msg, next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { if (flush) { next.invokeWriteAndFlush(m, promise); } else { next.invokeWrite(m, promise); } } else { AbstractWriteTask task; if (flush) { task = WriteAndFlushTask.newInstance(next, m, promise); } else { task = WriteTask.newInstance(next, m, promise); } safeExecute(executor, task, promise, m); } }

 

@Override
-----------------unsafe的write方法
public final void write(Object msg, ChannelPromise promise) { assertEventLoop();
----------------ChannelOutboundBuffer這個類,維護這一個由他本身的內部類entry組成的一個單鏈表,entry有着指向byteBuffer和byteBuffer數組的指針,本方法的最下面的addMessage,就是新建一個entry,而後放到鏈表裏,ChannelOutboundBuffer本身有三個屬性:
----------------flushedEntry :指向第一個已經被flush的entry
----------------unflushedEntry :指向第一個沒有被flush的entry
----------------tailEntry : 隊尾的指針。這三個其實很簡單
ChannelOutboundBuffer outboundBuffer
= this.outboundBuffer; if (outboundBuffer == null) { safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION); ReferenceCountUtil.release(msg); return; } int size; try { msg = filterOutboundMessage(msg); size = pipeline.estimatorHandle().size(msg); if (size < 0) { size = 0; } } catch (Throwable t) { safeSetFailure(promise, t); ReferenceCountUtil.release(msg); return; } outboundBuffer.addMessage(msg, size, promise); }

 因此能夠看到,所謂的netty的channelHandlerContext的write方法,其實並非向內核寫入數據,而是把msg放入一個鏈表中,等待flush,而flush方法也是在headContext中,調用的unsafe的flush方法,promise

注意這裏的unsafe是socket

@Override public final void flush() { assertEventLoop(); ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) { return; }
------------這個方法是把flushedEntry指針置空,把unFlushedEntry指針放到隊列頭部 outboundBuffer.addFlush();
------------下文 flush0(); }
@Override
-----------------從flush0到這一步的邏輯比較複雜,涉及到各類判斷,簡單起見,直接看這一步NioSocketChannel的doWriteBytes方法,netty的寫,對應的是buf的readBytes,這其實只是方法名字取的問題。回憶以前的doReadBytes對應的是buf的writeBytes方法,
當時是把channel的數據讀到nio
的ByteBuffer中,如今應該是把byteBuffer中的數據write到channel中了,寫到內核
protected int doWriteBytes(ByteBuf buf) throws Exception { final int expectedWrittenBytes = buf.readableBytes(); return buf.readBytes(javaChannel(), expectedWrittenBytes); }
---------------仍是找到以前的那個byteBuf的實現類,能夠看出,仍是調用nio的socketChannel的write方法,把tmpBuf(一個Nio的ByteBuffer)寫到channel中。
private int getBytes(int index, GatheringByteChannel out, int length, boolean internal) throws IOException { ensureAccessible(); if (length == 0) { return 0; } ByteBuffer tmpBuf; if (internal) { tmpBuf = internalNioBuffer(); } else { tmpBuf = buffer.duplicate(); } tmpBuf.clear().position(index).limit(index + length); return out.write(tmpBuf); }
相關文章
相關標籤/搜索