Netty 4 的 Channel 多了一個 autoread 參數, 它的用處是在讓 channel 在觸發某些事件之後(例如 channelActive, channelReadComplete)之後還會自動調用一次 read(), 代碼:java
DefaultChannelPipeline.javagit
@Override public ChannelPipeline fireChannelActive() { head.fireChannelActive(); if (channel.config().isAutoRead()) { channel.read(); } return this; } ----------------------------------- @Override public ChannelPipeline fireChannelReadComplete() { head.fireChannelReadComplete(); if (channel.config().isAutoRead()) { read(); } return this; }
另一個很重要的做用是在觸發某些事件(例如 socket 關閉)時, 在 NioEventloop 的 OP_READ 位爲 1 時, 是否真正的去 channel 讀取數據, 去 channel 讀取數據的意義還包括能夠更新 channel 的狀態, 例如改變 active 狀態等, 代碼:github
Eventloop.javaless
private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { final NioUnsafe unsafe = ch.unsafe(); if (!k.isValid()) { // close the channel if the key is not valid anymore unsafe.close(unsafe.voidPromise()); return; } try { int readyOps = k.readyOps(); // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead // to a spin loop if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); if (!ch.isOpen()) { // Connection already closed - no need to handle write. return; } } if ((readyOps & SelectionKey.OP_WRITE) != 0) { // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write ch.unsafe().forceFlush(); } if ((readyOps & SelectionKey.OP_CONNECT) != 0) { // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking // See https://github.com/netty/netty/issues/924 int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); } } catch (CancelledKeyException e) { unsafe.close(unsafe.voidPromise()); } }
AbstractNioByteChannel.java (unsafe.read())socket
public void read() { final ChannelConfig config = config();
// 判斷是不是 autoread, 若是不是直接移除 opRead 標誌位返回 if (!config.isAutoRead() && !isReadPending()) { // ChannelConfig.setAutoRead(false) was called in the meantime removeReadOp(); return; } final ChannelPipeline pipeline = pipeline(); final ByteBufAllocator allocator = config.getAllocator(); final int maxMessagesPerRead = config.getMaxMessagesPerRead(); RecvByteBufAllocator.Handle allocHandle = this.allocHandle; if (allocHandle == null) { this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle(); } ByteBuf byteBuf = null; int messages = 0; boolean close = false; try { int totalReadAmount = 0; boolean readPendingReset = false; do { byteBuf = allocHandle.allocate(allocator); int writable = byteBuf.writableBytes();
// 這裏會到 channel 中 read 一次, 若是返回 -1, 則表示 channel 關閉
// 假如說 autoread 爲 false, 則在 socket close 時永遠不會進入這裏, 從而改變 channel 的 active 狀態 int localReadAmount = doReadBytes(byteBuf); if (localReadAmount <= 0) { // not was read release the buffer byteBuf.release(); close = localReadAmount < 0; break; } if (!readPendingReset) { readPendingReset = true; setReadPending(false); } pipeline.fireChannelRead(byteBuf); byteBuf = null; if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) { // Avoid overflow. totalReadAmount = Integer.MAX_VALUE; break; } totalReadAmount += localReadAmount; // stop reading if (!config.isAutoRead()) { break; } if (localReadAmount < writable) { // Read less than what the buffer can hold, // which might mean we drained the recv buffer completely. break; } } while (++ messages < maxMessagesPerRead); pipeline.fireChannelReadComplete(); allocHandle.record(totalReadAmount); // 關閉 channel, 並改變 channel active 狀態 if (close) { closeOnRead(pipeline); close = false; } } catch (Throwable t) { handleReadException(pipeline, byteBuf, t, close); } finally { // Check if there is a readPending which was not processed yet. // This could be for two reasons: // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method // // See https://github.com/netty/netty/issues/2254 if (!config.isAutoRead() && !isReadPending()) { removeReadOp(); } } } }