本章分析Nio Channel的數據讀取功能的實現。java
Channel讀取數據須要Channel和ChannelHandler配合使用,netty設計數據讀取功能包括三個要素:Channel, EventLoop和ChannelHandler。Channel有個read方法,這個方法不會直接讀取數據,它的做用是通知持有當前channel的eventLoop能夠從這個這個channel讀取數據了,這個方法被調用以後eventLoop會在channel有數據可讀的時候從channel讀出數據而後把數據放在channelRead事件中交給ChannelInboundHandler的channelRead方法處理,當eventLoop發現channel中暫時沒時間可讀會觸發一個channelReadComplete事件。git
read: Nio Channel通知eventLoop開始讀數據github
channel read方法的調用棧:less
1 io.netty.channel.AbstractChannel#read 2 io.netty.channel.DefaultChannelPipeline#read 3 io.netty.channel.AbstractChannelHandlerContext#read 4 io.netty.channel.AbstractChannelHandlerContext#invokeRead 5 io.netty.channel.DefaultChannelPipeline.HeadContext#read 6 io.netty.channel.AbstractChannel.AbstractUnsafe#beginRead 7 io.netty.channel.nio.AbstractNioChannel#doBeginRead
調用channel的read的方法,會觸發read事件,經過pipeline調用AbstractChannel unsafe的beginRead方法,這個方法的語義是通知eventLoop能夠從channel讀數據了,但他沒有實現具體功能,把具體功能留給doBeginRead實現。doBeginRead在AbstractChannel中定義,它是一個抽象方法。AbstractNioChannel實現了這個方法:socket
1 @Override 2 protected void doBeginRead() throws Exception { 3 // Channel.read() or ChannelHandlerContext.read() was called 4 if (inputShutdown) { 5 return; 6 } 7 8 final SelectionKey selectionKey = this.selectionKey; 9 if (!selectionKey.isValid()) { 10 return; 11 } 12 13 readPending = true; 14 15 final int interestOps = selectionKey.interestOps(); 16 if ((interestOps & readInterestOp) == 0) { 17 selectionKey.interestOps(interestOps | readInterestOp); 18 } 19 }
這裏的doBeginRead實現,只有第17行是核心代碼:把readInterestOps保存是的read操做標誌添加到SelectableChannel的SelectionKey中。這裏的readInterestOps是一個類的屬性,在AbstractNioChannel中,它沒有明確的定義,只有一個抽象的定義:NIO中的一個能夠能夠當成read操做的的標誌。在NIO中能夠當成read的有SelectionKey.OP_READ和SelectionKey.OP_ACCEPT。readInterestOps在AbstractNioChannel的構造方法中使用傳入的參數初始化,子類就能夠根據須要肯定interestOps的具體含義。ide
設置好beginRead以後,NioEventLoop就可使用Selector獲得檢測到channel上的read事件了,下面是NioEventLoop中處理read事件的代碼:oop
1 //io.netty.channel.nio.NioEventLoop#processSelectedKey(java.nio.channels.SelectionKey, io.netty.channel.nio.AbstractNioChannel) 2 if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { 3 unsafe.read(); 4 }
這裏調用了unsafe的read的方法,在Channel的Unsafe中並無定義這個方法,它在io.netty.channel.nio.AbstractNioChannel.NioUnsafe中定義,在io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe和io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe中有兩個不一樣的實現。這兩個實現的區別是:NioMessageUnsafe.read是把從channel中讀出的數據轉換成Object, NioByteUnsafe.read是從channel中讀出byte數據流。下面來詳解分析這兩種實現。this
AbstractNioChannel.NioUnsafe.read實現:從channel讀取數據spa
netty在NIO Channel的設計上,把讀數據設計成獨立的抽象層。之因此這樣設計有兩個方面的緣由:.net
接下來開始詳細分析NioUnsafe read方法的兩種不一樣的實現。
io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe.read實現: 從channel中讀出Object
這個實現是主要功能是調用doReadMessages方法,從channel中讀出Object消息,具體的類型這裏沒有限制,doReadMessages是一個抽象方法,留給子類實現, 下面是read方法的實現:
1 //io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe 2 @Override 3 public void read() { 4 assert eventLoop().inEventLoop(); 5 final ChannelConfig config = config(); 6 if (!config.isAutoRead() && !isReadPending()) { 7 // ChannelConfig.setAutoRead(false) was called in the meantime 8 removeReadOp(); 9 return; 10 } 11 12 final int maxMessagesPerRead = config.getMaxMessagesPerRead(); 13 final ChannelPipeline pipeline = pipeline(); 14 boolean closed = false; 15 Throwable exception = null; 16 try { 17 try { 18 for (;;) { 19 int localRead = doReadMessages(readBuf); 20 if (localRead == 0) { 21 break; 22 } 23 if (localRead < 0) { 24 closed = true; 25 break; 26 } 27 28 // stop reading and remove op 29 if (!config.isAutoRead()) { 30 break; 31 } 32 33 if (readBuf.size() >= maxMessagesPerRead) { 34 break; 35 } 36 } 37 } catch (Throwable t) { 38 exception = t; 39 } 40 setReadPending(false); 41 int size = readBuf.size(); 42 for (int i = 0; i < size; i ++) { 43 pipeline.fireChannelRead(readBuf.get(i)); 44 } 45 46 readBuf.clear(); 47 pipeline.fireChannelReadComplete(); 48 49 if (exception != null) { 50 closed = closeOnReadError(exception); 51 52 pipeline.fireExceptionCaught(exception); 53 } 54 55 if (closed) { 56 if (isOpen()) { 57 close(voidPromise()); 58 } 59 } 60 } finally { 61 // Check if there is a readPending which was not processed yet. 62 // This could be for two reasons: 63 // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method 64 // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method 65 // 66 // See https://github.com/netty/netty/issues/2254 67 if (!config.isAutoRead() && !isReadPending()) { 68 removeReadOp(); 69 } 70 } 71 }
第12行,獲得一次循環讀取消息的最大數量maxMessagesPerRead,這個配置的默認值因不一樣的channel類型而不一樣,io.netty.channel.ChannelConfig提供了setMaxMessagesPerRead方法設置這個配置的值。調節這個值的大小能夠影響I/O操做在eventLoop線程分配的執行時間,它的值越大,I/O操做站的時間越大。
18-36行,使用doReadMessages讀取消息,並把消息放到readBuf中,readBuf是List<Object>類型。20,21行,沒有可讀的數據結束循環。23-25行,socket已經關閉。33,34行,readBuf中的消息數量已經超過限制,跳出循環。
41-47行,對readBuf中的每個消息觸發一次channelRead事件,而後清空readBuf, 觸發channelReadComplete事件。
49-53行,處理異常。
55-59行,處理channel正常關閉。
doReadMessages方法有兩個實現。一個是io.netty.channel.socket.nio.NioServerSocketChannel#doReadMessages,這個實現中讀出的消息是NioSocketChannel。另外一個是io.netty.channel.socket.nio.NioDatagramChannel#doReadMessages,這個實現中讀出的消息時DatagramPacket。
io.netty.channel.socket.nio.NioServerSocketChannel#doReadMessages實現代碼:
1 @Override 2 protected int doReadMessages(List<Object> buf) throws Exception { 3 SocketChannel ch = SocketUtils.accept(javaChannel()); 4 5 try { 6 if (ch != null) { 7 buf.add(new NioSocketChannel(this, ch)); 8 return 1; 9 } 10 } catch (Throwable t) { 11 logger.warn("Failed to create a new channel from an accepted socket.", t); 12 13 try { 14 ch.close(); 15 } catch (Throwable t2) { 16 logger.warn("Failed to close a socket.", t2); 17 } 18 } 19 20 return 0; 21 }
第3行, 使用accept方法獲得一個新的SocketChannel。
7,8行,使用新的SocketChannel建立NioSocketChannel,並把它放到buf中。
11-20行,出現異常,關閉這個socket, 最後返回0.
io.netty.channel.socket.nio.NioDatagramChannel#doReadMessages實現代碼:
1 @Override 2 protected int doReadMessages(List<Object> buf) throws Exception { 3 DatagramChannel ch = javaChannel(); 4 DatagramChannelConfig config = config(); 5 RecvByteBufAllocator.Handle allocHandle = this.allocHandle; 6 if (allocHandle == null) { 7 this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle(); 8 } 9 ByteBuf data = allocHandle.allocate(config.getAllocator()); 10 boolean free = true; 11 try { 12 ByteBuffer nioData = data.internalNioBuffer(data.writerIndex(), data.writableBytes()); 13 int pos = nioData.position(); 14 InetSocketAddress remoteAddress = (InetSocketAddress) ch.receive(nioData); 15 if (remoteAddress == null) { 16 return 0; 17 } 18 19 int readBytes = nioData.position() - pos; 20 data.writerIndex(data.writerIndex() + readBytes); 21 allocHandle.record(readBytes); 22 23 buf.add(new DatagramPacket(data, localAddress(), remoteAddress)); 24 free = false; 25 return 1; 26 } catch (Throwable cause) { 27 PlatformDependent.throwException(cause); 28 return -1; 29 } finally { 30 if (free) { 31 data.release(); 32 } 33 } 34 }
4-12行,獲得接收數據的緩衝區data。
13-21行,從socket收到一個數據包,這個數據報包含兩部分: data中的二進制數據和發送端的地址remoteAddress(第14行)。而後設置data中的數據長度。
23-25行,把數據報轉換成DatagramPacket類型放到buf中返回。
io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read實現:從channel中讀byte流
這個實現的主要功能是調用doReadBytes讀取byte流。doReadBytes是一個抽象方法,留給子類實現。下面是這個read的實現。
1 @Override 2 public final void read() { 3 final ChannelConfig config = config(); 4 if (!config.isAutoRead() && !isReadPending()) { 5 // ChannelConfig.setAutoRead(false) was called in the meantime 6 removeReadOp(); 7 return; 8 } 9 10 final ChannelPipeline pipeline = pipeline(); 11 final ByteBufAllocator allocator = config.getAllocator(); 12 final int maxMessagesPerRead = config.getMaxMessagesPerRead(); 13 RecvByteBufAllocator.Handle allocHandle = this.allocHandle; 14 if (allocHandle == null) { 15 this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle(); 16 } 17 18 ByteBuf byteBuf = null; 19 int messages = 0; 20 boolean close = false; 21 try { 22 int totalReadAmount = 0; 23 boolean readPendingReset = false; 24 do { 25 byteBuf = allocHandle.allocate(allocator); 26 int writable = byteBuf.writableBytes(); 27 int localReadAmount = doReadBytes(byteBuf); 28 if (localReadAmount <= 0) { 29 // not was read release the buffer 30 byteBuf.release(); 31 byteBuf = null; 32 close = localReadAmount < 0; 33 if (close) { 34 // There is nothing left to read as we received an EOF. 35 setReadPending(false); 36 } 37 break; 38 } 39 if (!readPendingReset) { 40 readPendingReset = true; 41 setReadPending(false); 42 } 43 pipeline.fireChannelRead(byteBuf); 44 byteBuf = null; 45 46 if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) { 47 // Avoid overflow. 48 totalReadAmount = Integer.MAX_VALUE; 49 break; 50 } 51 52 totalReadAmount += localReadAmount; 53 54 // stop reading 55 if (!config.isAutoRead()) { 56 break; 57 } 58 59 if (localReadAmount < writable) { 60 // Read less than what the buffer can hold, 61 // which might mean we drained the recv buffer completely. 62 break; 63 } 64 } while (++ messages < maxMessagesPerRead); 65 66 pipeline.fireChannelReadComplete(); 67 allocHandle.record(totalReadAmount); 68 69 if (close) { 70 closeOnRead(pipeline); 71 close = false; 72 } 73 } catch (Throwable t) { 74 handleReadException(pipeline, byteBuf, t, close); 75 } finally { 76 // Check if there is a readPending which was not processed yet. 77 // This could be for two reasons: 78 // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method 79 // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method 80 // 81 // See https://github.com/netty/netty/issues/2254 82 if (!config.isAutoRead() && !isReadPending()) { 83 removeReadOp(); 84 } 85 } 86 }
10-16行,獲得一個接受緩衝區的分配器和分配器的的專用handle。這兩個東西的功能是高效的建立大量的接接收數據緩衝區,具體原理和實現會在後面buffer相關章節中詳細分析,這裏暫時略過。
24-64行,這是一個使用doReadBytes讀取數據並觸發channelRead事件的循環。25-27行,獲得一個接受數據的緩衝區,而後從socket中讀取數據。28-38行,沒有數據可讀了,或socket已經斷開了。43行,正確收到了數據,觸發channelRead事件。59-62行,讀出的數據小於緩衝區的長度,表示沒有socket中暫時沒有數據可讀了。 64行,讀取次數大於上限配置,跳出。
66行,讀循環完成,觸發channelReadComplete事件。
69-72, 處理socket正常關閉。
74,83行,處理其餘異常。
doReadBytes只有一個實現:
//io.netty.channel.socket.nio.NioSocketChannel#doWriteBytes @Override protected int doWriteBytes(ByteBuf buf) throws Exception { final int expectedWrittenBytes = buf.readableBytes(); return buf.readBytes(javaChannel(), expectedWrittenBytes); }
這個實現很是簡單,使用ByteBuf的能力從SocketChannel中讀取byte流。