public class DiscardServerHandler extends ChannelHandlerAdapter { @Override public void channelRead(final ChannelHandlerContext ctx,final Object msg) throws Exception { ByteBuf bufferBuf =(ByteBuf)msg; System.out.println(new String(bufferBuf.array())); ctx.channel().write(bufferBuf); }
追蹤一下,ctx.channel().write(bufferBuf)的實現(假設out pipeline中沒有其餘的encode handler了,),咱們會看到,最終會由AbstractUnsafe(AbstractUnsafe是channel的一個內部類對象)的write方法(很好找,順着找就好了,記住,默認pipeline一定會有tail和head兩個handler)進行處理,上代碼:promise
public void write(Object msg, ChannelPromise promise) { if (!isActive()) { // Mark the write request as failure if the channel is inactive. if (isOpen()) { promise.tryFailure(NOT_YET_CONNECTED_EXCEPTION); } else { promise.tryFailure(CLOSED_CHANNEL_EXCEPTION); } // release message now to prevent resource-leak ReferenceCountUtil.release(msg); } else {//往緩存中添加一個消息對象 outboundBuffer.addMessage(msg, promise); } }
這裏關注一下outboundBuffer.addMessage() 到此處,你們就會恍然大悟,知道怎麼回事兒了,就是這樣,僅僅將要寫入的message object寫入到一個buffer中。下面咱們來看一下outboundBuffer.addmessage的實現。緩存
注意: outboundBuffer是一個ChannelOutboundBuffer類型的兌現,每個channel都會一個ChannelOutboundBuffer對象與之關聯,用來盛放欲發送的消息.上代碼證實一切:網絡
protected abstract class AbstractUnsafe implements Unsafe { private ChannelOutboundBuffer outboundBuffer = ChannelOutboundBuffer.newInstance(AbstractChannel.this); private boolean inFlush0; }public abstract class AbstractChannel extends DefaultAttributeMap implements Channel { private MessageSizeEstimator.Handle estimatorHandle; private final Channel parent; private final ChannelId id = DefaultChannelId.newInstance(); private final Unsafe unsafe;//channel中有Unsafe引用 private final DefaultChannelPipeline pipeline; private final ChannelFuture succeededFuture = new SucceededChannelFuture(this, null); /**省略部分代碼***/ private final EventLoop eventLoop;Unsafe對象裏有一個outboundBuffer ,而channel裏有個unsafe引用,因此能夠說,channel與outboundBuffer有has-a關係
看一下ChannelOutboundBuffer outboundBuffer的addMessage實現:
void addMessage(Object msg, ChannelPromise promise) { //預測message的size int size = channel.estimatorHandle().size(msg); if (size < 0) { size = 0; } //建立一個Entry對象,一個entry就是一個欲發送的message以及描述信息 Entry e = buffer[tail++]; e.msg = msg; e.pendingSize = size; e.promise = promise; e.total = total(msg); //由此能夠看出total和pendingSize是相等的 tail &= buffer.length - 1; if (tail == flushed) {//擴展容量 addCapacity(); } // increment pending bytes after adding message to the unflushed arrays. // See https://github.com/netty/netty/issues/1619 incrementPendingOutboundBytes(size);//更新一下,欲發送的字節大小。 }
private void addCapacity() { //更新鏈表的標誌位 int p = flushed; int n = buffer.length; int r = n - p; // number of elements to the right of p 剩餘沒有刷出去的 int s = size(); int newCapacity = n << 1;//擴大一倍 注意 哈哈 if (newCapacity < 0) { throw new IllegalStateException(); } Entry[] e = new Entry[newCapacity];//建立對象數組。擴容後的哦! System.arraycopy(buffer, p, e, 0, r); System.arraycopy(buffer, 0, e, r, p);//拷貝的和爲拷貝的數據(不包括二進制bytebuf數據哦)都要進行復制 for (int i = n; i < e.length; i++) {//將e數組中n到 e[i] = new Entry(); } buffer = e; flushed = 0; unflushed = s; tail = n; }
哈哈 很容易理解的,就是對數組進行擴展。而後複製
到目前爲止,咱們已經講解完了,write的的處理流程。咱們把message放入到buffer中,目的是爲了將其發送到目的socket的內核緩衝區中,何時發送(固然是對應的socket發送write可寫事件的時候)呢? 當writer事件發送的時候,就是咱們將緩衝起來的message flush到socket的內核緩衝區的時候了!!如今開始下一主題:
private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { final NioUnsafe unsafe = ch.unsafe(); if (!k.isValid()) { // close the channel if the key is not valid anymore unsafe.close(unsafe.voidPromise()); return; } try { int readyOps = k.readyOps(); // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead // to a spin loop /**簡潔起見,省略部分代碼**/ if ((readyOps & SelectionKey.OP_WRITE) != 0) {//對於半包消息進行輸出操做 // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write ch.unsafe().forceFlush(); } /**簡潔起見,省略部分代碼*/ } } catch (CancelledKeyException e) { unsafe.close(unsafe.voidPromise()); } }
protected void flush0() { if (inFlush0) { //若是對於一個channel正在進行刷新 // Avoid re-entrance return; } final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null || outboundBuffer.isEmpty()) { return;//若是緩存爲空,則直接返回 } inFlush0 = true;//設置正在刷新標誌位 // Mark all pending write requests as failure if the channel is inactive. if (!isActive()) { try { if (isOpen()) { outboundBuffer.failFlushed(NOT_YET_CONNECTED_EXCEPTION); } else { outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION); } } finally { inFlush0 = false; } return; } try { doWrite(outboundBuffer); } catch (Throwable t) { outboundBuffer.failFlushed(t); } finally { inFlush0 = false; } }
不用多說,若是write事件發生,可是緩衝區爲空得話,那麼就會直接返回,若是不是的話,再去調用doWrite方法。直接上代碼(doWrite是個抽象方法,由NioSocketChannel實現,本身想爲何選這個類!!能夠看源代碼!從鏈接的建立開始看哦!我有寫鏈接建立的博文哦! 本身看吧! ):
@Override protected void doWrite(ChannelOutboundBuffer in) throws Exception { for (;;) {//不斷寫入---將全部的數據flush到內核網絡緩衝區中 // Do non-gathering write for a single buffer case. final int msgCount = in.size(); //對於只有一個message的狀況,直接寫便可 if (msgCount <= 1) { super.doWrite(in); return; } // Ensure the pending writes are made of ByteBufs only. ByteBuffer[] nioBuffers = in.nioBuffers();//將全部的消息轉換成java.nio.ByteBuf數組 if (nioBuffers == null) {//msg不是ByteBuf類型,則也不須要採用gathering write的方式,能夠直接調用父類AbstractNioByteChannel的doWrite方法 super.doWrite(in); return; } int nioBufferCnt = in.nioBufferCount(); //buffer的個數 long expectedWrittenBytes = in.nioBufferSize();//總的buffer的byte的數量 final SocketChannel ch = javaChannel(); long writtenBytes = 0; boolean done = false; boolean setOpWrite = false; for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) { //雖然是每次都是針對的messag進行遍歷,可是寫入的時候確實針對的全體內部的buffer進行遍歷 final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);//localWrittenBytes寫入的字節數目可能超過了本次遍歷的msg的buffer的承載的個數,因此若是寫回半包得話,須要比較current.msg的大小與此值的大小 if (localWrittenBytes == 0) { setOpWrite = true;//中止寫入,多是由於socket如今不可寫了 break; } expectedWrittenBytes -= localWrittenBytes; writtenBytes += localWrittenBytes; if (expectedWrittenBytes == 0) {//若是但願寫入的數量爲0,代表所有寫入完畢 done = true; break; } } if (done) {//若是所有發送完畢 // Release all buffers for (int i = msgCount; i > 0; i --) { in.remove();//刪除buffer數組中的元素--挨個刪除 代碼 888 } // Finish the write loop if no new messages were flushed by in.remove(). if (in.isEmpty()) {//若是所有完成的話,,那麼就將其寫半包標誌刪除 clearOpWrite(); break; } } else { // Did not write all buffers completely. // Release the fully written buffers and update the indexes of the partially written buffer. //若是沒有所有刷寫完成,那麼就釋放已經寫入的buffer,更新部分寫入的buffer的索引 for (int i = msgCount; i > 0; i --) { //代碼 999 final ByteBuf buf = (ByteBuf) in.current();//獲得當前的直接內存,current()方法調用的不是很深,由於當前的msg已經通過niobuffers轉換成直接類型的了,因此基本上會直接返回。 final int readerIndex = buf.readerIndex(); final int readableBytes = buf.writerIndex() - readerIndex;//獲得可寫的數據字節數 if (readableBytes < writtenBytes) {//若是寫入的部分大於當前緩存指針的大小的話,那麼就將其釋放 in.progress(readableBytes); in.remove();//移動指針,移動到下一個buffer中 writtenBytes -= readableBytes; } else if (readableBytes > writtenBytes) { buf.readerIndex(readerIndex + (int) writtenBytes);//從新設置當前的buffer的大小 in.progress(writtenBytes); break; } else { // readableBytes == writtenBytes 寫入的部分 in.progress(readableBytes); in.remove();//直接移除(實際上是刪除引用個數) break; }//代碼 1000 } incompleteWrite(setOpWrite);//註冊write興趣事件 break; } } }
調用java的原生API進行數據寫入,ch.write(nioBuffers, 0, nioBufferCnt).注意 寫入次數是能夠經過WriteSpinCount配置項限制的哦!! 該配置項咱們能夠在初始化或者運行的過程當中經過調用ctx.channel().config()進行配置或者更改。
若是全發送完成,那麼就將緩衝區中的緩衝對象依次清空 見 代碼 888
若是沒有所有發送,就將所有刷出的bytebuf釋放(至於怎樣釋放,會寫一個專門的Netty內存管理的文章),僅僅發送一部分的byte的bytebuf的readerindex進行更改。 見代碼 999和1000之間的代碼
public ByteBuffer[] nioBuffers() { long nioBufferSize = 0;//用來記錄全部須要發送的數據的字節大小 int nioBufferCount = 0;//用來記錄最底層的buffer的個數多少 final int mask = buffer.length - 1; final ByteBufAllocator alloc = channel.alloc();//用於將heap緩衝轉換成direct類型緩衝 ByteBuffer[] nioBuffers = this.nioBuffers; //底層的niobuffer,會對buffer的進行一個解包操做 Object m; int i = flushed; while (i != unflushed && (m = buffer[i].msg) != null) {//逐個遍歷即將發送的bytebuf數據 if (!(m instanceof ByteBuf)) { this.nioBufferCount = 0; this.nioBufferSize = 0; return null; } // Entry entry = buffer[i]; ByteBuf buf = (ByteBuf) m; final int readerIndex = buf.readerIndex(); final int readableBytes = buf.writerIndex() - readerIndex;//能夠讀取的字節數組 if (readableBytes > 0) { nioBufferSize += readableBytes; int count = entry.count;//獲得低層的buffer的個數 if (count == -1) { entry.count = count = buf.nioBufferCount(); } //總的buffer的個數 int neededSpace = nioBufferCount + count; if (neededSpace > nioBuffers.length) {//若是buffer的個數超過了nioBuffers的length進行擴張,按照2倍的係數擴張 this.nioBuffers = nioBuffers = expandNioBufferArray(nioBuffers, neededSpace, nioBufferCount); } if (buf.isDirect() || threadLocalDirectBufferSize <= 0) { if (count == 1) {//沒有封裝內部的緩存 ByteBuffer nioBuf = entry.buf; if (nioBuf == null) { // cache ByteBuffer as it may need to create a new ByteBuffer instance if its a // derived buffer entry.buf = nioBuf = buf.internalNioBuffer(readerIndex, readableBytes); } nioBuffers[nioBufferCount ++] = nioBuf; } else {//內部有多個buffer ByteBuffer[] nioBufs = entry.buffers; if (nioBufs == null) { // cached ByteBuffers as they may be expensive to create in terms of Object allocation entry.buffers = nioBufs = buf.nioBuffers();//獲得內部緩存 } //進行解壓,並返回內部的全部的緩存的個數(解壓後的哦) nioBufferCount = fillBufferArray(nioBufs, nioBuffers, nioBufferCount); } } else { nioBufferCount = fillBufferArrayNonDirect(entry, buf, readerIndex,//將heap緩存轉換層direct類型 readableBytes, alloc, nioBuffers, nioBufferCount); } } i = i + 1 & mask; } this.nioBufferCount = nioBufferCount; this.nioBufferSize = nioBufferSize; return nioBuffers;
線面咱們來看一下 current()方法的實現:
public Object current() { return current(true); } /** * 將當前即將要刷出的數據寫入到directByteBuf中 * @param preferDirect * @return */ public Object current(boolean preferDirect) { if (isEmpty()) { //若是緩存爲空,則直接返回 return null; } else { // TODO: Think of a smart way to handle ByteBufHolder messages Object msg = buffer[flushed].msg;//獲得即將要刷新的數據緩衝區--buffer[flushed]表示即將要刷新的數據緩衝去 if (threadLocalDirectBufferSize <= 0 || !preferDirect) {//若是線程中沒有直接內存緩衝區可用,不喜歡用堆外緩存 return msg; } if (msg instanceof ByteBuf) { //由此能夠看出message必須是bytebuf類修的 ByteBuf buf = (ByteBuf) msg; if (buf.isDirect()) {//是否是直接內存中分配的 //對於nioBuffers以後,已經將全部bytebuf所有轉換成direct類型的了!! return buf; } else { int readableBytes = buf.readableBytes(); if (readableBytes == 0) { //若是沒有了的話,就直接返回 return buf; } // Non-direct buffers are copied into JDK's own internal direct buffer on every I/O. // We can do a better job by using our pooled allocator. If the current allocator does not // pool a direct buffer, we use a ThreadLocal based pool. /** * 非直接內存會被拷貝的jdk本身的內部的直接緩衝區中。咱們能夠經過具備池子功能的分配器來將工做作的更好。 * 若是當前的分配器沒有起到一個緩衝直接內存的做用得話,那麼咱們就會使用基於線程的threadLocal的池子 * */ ByteBufAllocator alloc = channel.alloc();//獲得內存分配器 ByteBuf directBuf; if (alloc.isDirectBufferPooled()) {//是否爲直接內存---分配的內存是否爲直接緩衝做用的 directBuf = alloc.directBuffer(readableBytes); } else {//不然的話,就用與線程綁定的ThreadLocalPooledByteBuf進行二進制數據分配 directBuf = ThreadLocalPooledByteBuf.newInstance(); //從當前線程棧中獲取一個bytebuf--ByteBuffer.allocateDirect(initialCapacity) } //進行必要的數據拷貝--將堆內的數據拷貝到直接內存中 directBuf.writeBytes(buf, buf.readerIndex(), readableBytes); current(directBuf);//將原先非direct類型的內存釋放,而且替換成 direct的bytebuf return directBuf; } } return msg; } } /** * Replace the current msg with the given one. * The replaced msg will automatically be released 用一個指定的buffer去替換原先的buffer msg對象。唄替換的對象會自動釋放 */ public void current(Object msg) { Entry entry = buffer[flushed]; safeRelease(entry.msg);// entry.msg = msg; }