寫數據是NIO Channel實現的另外一個比較複雜的功能。每個channel都有一個outboundBuffer,這是一個輸出緩衝區。當調用channel的write方法寫數據時,這個數據被一系列ChannelOutboundHandler處理以後,它被放進這個緩衝區中,並無真正把數據寫到socket channel中。而後再調用channel的flush方法,flush會把outboundBuffer中數據真正寫到socket channel。正常狀況下flush以後,數據已經真正寫完了。但使用Selector加非阻塞socket的方式寫數據,讓寫操做變得複雜了。操做系統爲每一個socket維護了一個數據發送緩衝區,它的長度SO_SNDBUF, 每次發送數據,先把數據寫到這個緩衝區中,操做系統負責把這個發送緩衝區中的數據發送出去,並清理這個緩衝區。當向緩衝區寫的速率大於系統的發送速率時,它會被填滿,在非阻塞模式下的表現爲: 調用socket的write方法寫入長度爲n數據,實際寫入的數據長度m的範圍是:0=<m<n。這個時候還剩下長度爲n-m的數據沒有寫入到socket,而數據必須以正確的順序完整地寫入到socket中。 outboundBuffer正是爲解決這個問題而設計的,沒寫進socket的剩餘數據會以正確的順序保存在outboundBuffer中,當發送緩衝區中有空間能夠寫時,能夠從outboundBuffer中取出剩餘的數據繼續寫入到socket中。java
Channel write實現: 把數據寫到outboundBuffer中git
write調用棧:github
1 io.netty.channel.AbstractChannel#write(java.lang.Object) 2 io.netty.channel.DefaultChannelPipeline#write(java.lang.Object) 3 io.netty.channel.AbstractChannelHandlerContext#write(java.lang.Object) 4 io.netty.channel.AbstractChannelHandlerContext#write(java.lang.Object, io.netty.channel.ChannelPromise) 5 io.netty.channel.AbstractChannelHandlerContext#write(java.lang.Object, boolean, io.netty.channel.ChannelPromise) 6 io.netty.channel.AbstractChannelHandlerContext#invokeWrite 7 io.netty.channel.DefaultChannelPipeline.HeadContext#write 8 io.netty.channel.AbstractChannel.AbstractUnsafe#write
write的主要邏輯在io.netty.channel.AbstractChannel.AbstractUnsafe#write中實現,這個方法把要寫的數據msg對象放到outboundBuffer中。在執行close時,netty不但願有但願寫新的數據,避免引發不可預料的錯誤,所以會把outboundBuffer置爲null。這裏在向outboundBuffer寫數據以前會把對它進行檢查,若是是null就拋出錯誤。下面是這個write方法的實現。promise
1 @Override 2 public final void write(Object msg, ChannelPromise promise) { 3 assertEventLoop(); 4 5 ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; 6 if (outboundBuffer == null) { 7 safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION); 8 ReferenceCountUtil.release(msg); 9 return; 10 } 11 12 int size; 13 try { 14 msg = filterOutboundMessage(msg); 15 size = pipeline.estimatorHandle().size(msg); 16 if (size < 0) { 17 size = 0; 18 } 19 } catch (Throwable t) { 20 safeSetFailure(promise, t); 21 ReferenceCountUtil.release(msg); 22 return; 23 } 24 25 outboundBuffer.addMessage(msg, size, promise); 26 }
第5-9行,對outboudBuffer進行檢查,若是是null拋出錯誤。這個裏有個小細節,用一個局部變量引用outboundBuffer,避免由其餘線程對this.outboundBuffer置空引起錯誤。socket
14行,調用filterOutboundMessage對msg進行過濾。這是一個protected方法,默認實現是什麼都沒作,返回輸入的msg參數。子類能夠覆蓋這個方法,把msg轉換成指望的類型。ide
15行,計算msg的長度。oop
25行,把放入到outboundBuffer中。this
Channel flush實現:把數據真正寫到channelspa
flush調用棧:操作系統
1 io.netty.channel.AbstractChannel#flush 2 io.netty.channel.DefaultChannelPipeline#flush 3 io.netty.channel.AbstractChannelHandlerContext#flush 4 io.netty.channel.AbstractChannelHandlerContext#invokeFlush 5 io.netty.channel.DefaultChannelPipeline.HeadContext#flush 6 io.netty.channel.AbstractChannel.AbstractUnsafe#flush 7 io.netty.channel.AbstractChannel.AbstractUnsafe#flush0 8 io.netty.channel.socket.nio.NioSocketChannel#doWrite 9 io.netty.channel.nio.AbstractNioByteChannel#doWrite 10 io.netty.channel.socket.nio.NioSocketChannel#doWriteBytes
以上是io.netty.channel.socket.nio.NioSocketChannel的flush調用棧,對於io.netty.channel.socket.nio.NioDatagramChannel來講,從第8行開始變得不一樣:
7 io.netty.channel.AbstractChannel.AbstractUnsafe#flush0 8 io.netty.channel.nio.AbstractNioMessageChannel#doWrite 9 io.netty.channel.socket.nio.NioDatagramChannel#doWriteMessage
把Byte數據流寫入channel
io.netty.channel.socket.nio.NioSocketChannel#doWrite是Byte數據流的寫邏輯,io.netty.channel.nio.AbstractNioByteChannel#doWrite也是,這二者不一樣的地方在於前者是在outboundBuffer能夠轉換成java.nio.ByteBuffer的狀況下執行,後者是在outboundBuffer中的msg是ByteBuf或FileRegin類型時執行。除此以外其餘邏輯都同樣:
下面來看看io.netty.channel.socket.nio.NioSocketChannel#doWrite的實現代碼:
1 @Override 2 protected void doWrite(ChannelOutboundBuffer in) throws Exception { 3 for (;;) { 4 int size = in.size(); 5 if (size == 0) { 6 // All written so clear OP_WRITE 7 clearOpWrite(); 8 break; 9 } 10 long writtenBytes = 0; 11 boolean done = false; 12 boolean setOpWrite = false; 13 14 // Ensure the pending writes are made of ByteBufs only. 15 ByteBuffer[] nioBuffers = in.nioBuffers(); 16 int nioBufferCnt = in.nioBufferCount(); 17 long expectedWrittenBytes = in.nioBufferSize(); 18 SocketChannel ch = javaChannel(); 19 20 // Always us nioBuffers() to workaround data-corruption. 21 // See https://github.com/netty/netty/issues/2761 22 switch (nioBufferCnt) { 23 case 0: 24 // We have something else beside ByteBuffers to write so fallback to normal writes. 25 super.doWrite(in); 26 return; 27 case 1: 28 // Only one ByteBuf so use non-gathering write 29 ByteBuffer nioBuffer = nioBuffers[0]; 30 for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) { 31 final int localWrittenBytes = ch.write(nioBuffer); 32 if (localWrittenBytes == 0) { 33 setOpWrite = true; 34 break; 35 } 36 expectedWrittenBytes -= localWrittenBytes; 37 writtenBytes += localWrittenBytes; 38 if (expectedWrittenBytes == 0) { 39 done = true; 40 break; 41 } 42 } 43 break; 44 default: 45 for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) { 46 final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt); 47 if (localWrittenBytes == 0) { 48 setOpWrite = true; 49 break; 50 } 51 expectedWrittenBytes -= localWrittenBytes; 52 writtenBytes += localWrittenBytes; 53 if (expectedWrittenBytes == 0) { 54 done = true; 55 break; 56 } 57 } 58 break; 59 } 60 61 // Release the fully written buffers, and update the indexes of the partially written buffer. 62 in.removeBytes(writtenBytes); 63 64 if (!done) { 65 // Did not write all buffers completely. 66 incompleteWrite(setOpWrite); 67 break; 68 } 69 } 70 }
第5-7行,若是outboundBuffer中已經沒有數據了,調用clearOpWrite方法清除channel SelectionKey上的OP_WRITE事件。
第15-17行,把outboundBuffer轉換成ByteBuffer類型,並獲得數據長度。
25行,outboundBuffer不能轉換成ByteBuffer, 調用io.netty.channel.nio.AbstractNioByteChannel#doWrite執行寫操做。
29-42,45-57的邏輯基本已經,都是儘可能把ByteBuffer中的數據寫到channel中,知足下列條件中的任意一個時,結束本次寫操做:
1. ByteBuffer中的數據已經寫完,正常結束。
2. channel已經不能寫入數據,須要在channel能夠寫是繼續執行寫操做。
3. 者超過channel config中寫入次數限制,須要選擇合適的實際繼續執行寫操做。
62行,把已經寫入到channel的數據從outboundBuffer中刪除。
64-66行, 若是數據沒寫完,調用incompleteWrite處理沒寫完的狀況。當setOpWrite==true時,在channel的SelectionKey上設置OP_WRITE事件,等eventLoop觸發這個事件時再繼續執行flush操做。不然,把flush包裝成task放到eventLoop中排隊執行。
當NioEventLoop檢測到OP_WRITE事件時,會調用processSelectedKey方法處理:
if ((readyOps & SelectionKey.OP_WRITE) != 0) { ch.unsafe().forceFlush(); }
forceFlush的調用棧以下:
1 io.netty.channel.nio.AbstractNioChannel.AbstractNioUnsafe#forceFlush 2 io.netty.channel.AbstractChannel.AbstractUnsafe#flush0 3 io.netty.channel.socket.nio.NioSocketChannel#doWrite 4 io.netty.channel.nio.AbstractNioByteChannel#doWrite 5 io.netty.channel.socket.nio.NioSocketChannel#doWriteBytes
把數據寫入UDP類型的channel
io.netty.channel.nio.AbstractNioMessageChannel#doWrite是數據報的寫邏輯。相較於Byte流類型的數據,數據報的寫邏輯簡單一些。它只是把outboundBuffer中的數據報依次寫入到channel中,若是channel寫滿了,在channel的SelectionKey上設置OP_WRITE事件隨後退出,其後OP_WRITE事件處理邏輯和Byte流寫邏輯同樣。 真正的寫操做在io.netty.channel.socket.nio.NioDatagramChannel#doWriteMessage中實現,這個方法的實現以下:
1 @Override 2 protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception { 3 final SocketAddress remoteAddress; 4 final ByteBuf data; 5 if (msg instanceof AddressedEnvelope) { 6 @SuppressWarnings("unchecked") 7 AddressedEnvelope<ByteBuf, SocketAddress> envelope = (AddressedEnvelope<ByteBuf, SocketAddress>) msg; 8 remoteAddress = envelope.recipient(); 9 data = envelope.content(); 10 } else { 11 data = (ByteBuf) msg; 12 remoteAddress = null; 13 } 14 15 final int dataLen = data.readableBytes(); 16 if (dataLen == 0) { 17 return true; 18 } 19 20 final ByteBuffer nioData = data.internalNioBuffer(data.readerIndex(), dataLen); 21 final int writtenBytes; 22 if (remoteAddress != null) { 23 writtenBytes = javaChannel().send(nioData, remoteAddress); 24 } else { 25 writtenBytes = javaChannel().write(nioData); 26 } 27 return writtenBytes > 0; 28 }
5-9行,處理AddressedEnvelope類型的數據報,獲得數據報的遠程地址和數據。
10-12行,發送的是一個ByteBuf。沒有指定遠程地址。這種狀況下須要先調用channel的connect方法。
20-26行,分別針對兩種狀況發送數據報. 23行指定了遠程地址,25行沒有指定遠程地址,但調用過了connect方法。