咱們在Netty學習系列五的最後提出了一些問題還沒獲得回答,今天來經過學習NioServerSocketChannel的源碼來幫咱們找到以前問題的答案。html
先看一下NioServerSocketChannel的繼承結構。java
AttributeMap接口及DefaultAttributeMap主要是提供了體檢屬性和獲取屬性的能力,便於咱們爲Channel綁定額外的屬性。git
AbstractChannel實現了Channel接口,實現了Channel通用的行爲和方法,咱們在Netty學習系列四中已經介紹過了。github
AbstractNioChannel抽象類關聯了Channel接口與JDK的NIOChannel,也就是讓底層的通訊交給Nio來實現。promise
簡單介紹下源碼:app
1 public abstract class AbstractNioChannel extends AbstractChannel { 2 3 private static final InternalLogger logger = 4 InternalLoggerFactory.getInstance(AbstractNioChannel.class); 5 6 private static final ClosedChannelException DO_CLOSE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace( 7 new ClosedChannelException(), AbstractNioChannel.class, "doClose()"); 8 9 //和Java NIO的Channel綁定 10 private final SelectableChannel ch; 11 //爲SelectableChannel註冊的時間 12 protected final int readInterestOp; 13 volatile SelectionKey selectionKey; 14 15 boolean readPending; 16 17 private final Runnable clearReadPendingRunnable = new Runnable() { 18 @Override 19 public void run() { 20 clearReadPending0(); 21 } 22 }; 23 24 /** 25 * The future of the current connection attempt. If not null, subsequent 26 * connection attempts will fail. 27 */ 28 private ChannelPromise connectPromise; 29 private ScheduledFuture<?> connectTimeoutFuture; 30 private SocketAddress requestedRemoteAddress; 31 32 //構造函數,參數分別爲父Channel,要封裝的SelectableChannel和註冊的感興趣的事件 33 protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { 34 super(parent); 35 this.ch = ch; 36 this.readInterestOp = readInterestOp; 37 try { 38 //將SelectableChannel設置爲非阻塞 39 ch.configureBlocking(false); 40 } catch (IOException e) { 41 try { 42 ch.close(); 43 } catch (IOException e2) { 44 if (logger.isWarnEnabled()) { 45 logger.warn( 46 "Failed to close a partially initialized socket.", e2); 47 } 48 } 49 50 throw new ChannelException("Failed to enter non-blocking mode.", e); 51 } 52 } 53 54 //通道是否打開 55 @Override 56 public boolean isOpen() { 57 return ch.isOpen(); 58 } 59 60 //返回更具體的Unsafe子類 61 @Override 62 public NioUnsafe unsafe() { 63 return (NioUnsafe) super.unsafe(); 64 } 65 66 //返回內部封裝的SelectableChannel 67 protected SelectableChannel javaChannel() { 68 return ch; 69 } 70 71 //返回EventLoop更具體的子類 72 @Override 73 public NioEventLoop eventLoop() { 74 return (NioEventLoop) super.eventLoop(); 75 } 76 77 //返回SelectionKey 78 protected SelectionKey selectionKey() { 79 assert selectionKey != null; 80 return selectionKey; 81 } 82 83 //已廢棄方法 84 @Deprecated 85 protected boolean isReadPending() { 86 return readPending; 87 } 88 89 //已廢棄方法 90 @Deprecated 91 protected void setReadPending(final boolean readPending) { 92 if (isRegistered()) { 93 EventLoop eventLoop = eventLoop(); 94 if (eventLoop.inEventLoop()) { 95 setReadPending0(readPending); 96 } else { 97 eventLoop.execute(new Runnable() { 98 @Override 99 public void run() { 100 setReadPending0(readPending); 101 } 102 }); 103 } 104 } else { 105 // Best effort if we are not registered yet clear readPending. 106 // NB: We only set the boolean field instead of calling clearReadPending0(), because the SelectionKey is 107 // not set yet so it would produce an assertion failure. 108 this.readPending = readPending; 109 } 110 } 111 112 /** 113 * Set read pending to {@code false}. 114 */ 115 protected final void clearReadPending() { 116 if (isRegistered()) { 117 EventLoop eventLoop = eventLoop(); 118 if (eventLoop.inEventLoop()) { 119 clearReadPending0(); 120 } else { 121 eventLoop.execute(clearReadPendingRunnable); 122 } 123 } else { 124 // Best effort if we are not registered yet clear readPending. This happens during channel initialization. 125 // NB: We only set the boolean field instead of calling clearReadPending0(), because the SelectionKey is 126 // not set yet so it would produce an assertion failure. 127 readPending = false; 128 } 129 } 130 131 private void setReadPending0(boolean readPending) { 132 this.readPending = readPending; 133 if (!readPending) { 134 ((AbstractNioUnsafe) unsafe()).removeReadOp(); 135 } 136 } 137 138 private void clearReadPending0() { 139 readPending = false; 140 ((AbstractNioUnsafe) unsafe()).removeReadOp(); 141 } 142 143 //Unsafe的具體子類,增長了一些和NioChannel相關的特性 144 public interface NioUnsafe extends Unsafe { 145 //返回內部的SelectableChannel 146 SelectableChannel ch(); 147 148 //鏈接完成 149 void finishConnect(); 150 151 //讀方法 152 void read(); 153 154 //強制刷新 155 void forceFlush(); 156 } 157 158 //NioUnsafe的抽象實現 159 protected abstract class AbstractNioUnsafe extends AbstractUnsafe implements NioUnsafe { 160 161 protected final void removeReadOp() { 162 SelectionKey key = selectionKey(); 163 // Check first if the key is still valid as it may be canceled as part of the deregistration 164 // from the EventLoop 165 // See https://github.com/netty/netty/issues/2104 166 if (!key.isValid()) { 167 return; 168 } 169 int interestOps = key.interestOps(); 170 if ((interestOps & readInterestOp) != 0) { 171 // only remove readInterestOp if needed 172 key.interestOps(interestOps & ~readInterestOp); 173 } 174 } 175 176 //返回內部封裝的Channel 177 @Override 178 public final SelectableChannel ch() { 179 return javaChannel(); 180 } 181 182 //connect方法,實際在使用時NioServerSocket是不支持connect的,可是NioSocket會支持 183 @Override 184 public final void connect( 185 final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { 186 if (!promise.setUncancellable() || !ensureOpen(promise)) { 187 return; 188 } 189 190 try { 191 if (connectPromise != null) { 192 // Already a connect in process. 193 throw new ConnectionPendingException(); 194 } 195 196 boolean wasActive = isActive(); 197 //調用具體子類的doConnect方法 198 if (doConnect(remoteAddress, localAddress)) { 199 //鏈接成功設置fulfillConnectPromise 200 fulfillConnectPromise(promise, wasActive); 201 } else { 202 //鏈接未成功 203 connectPromise = promise; 204 requestedRemoteAddress = remoteAddress; 205 206 //根據配置的超時時間,設置超時任務,一旦到達超時時間則拋出鏈接失敗的異常 207 int connectTimeoutMillis = config().getConnectTimeoutMillis(); 208 if (connectTimeoutMillis > 0) { 209 connectTimeoutFuture = eventLoop().schedule(new Runnable() { 210 @Override 211 public void run() { 212 ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise; 213 ConnectTimeoutException cause = 214 new ConnectTimeoutException("connection timed out: " + remoteAddress); 215 if (connectPromise != null && connectPromise.tryFailure(cause)) { 216 close(voidPromise()); 217 } 218 } 219 }, connectTimeoutMillis, TimeUnit.MILLISECONDS); 220 } 221 222 //添加監聽器,若是期間操做成功了,則取消掉超超時任務 223 promise.addListener(new ChannelFutureListener() { 224 @Override 225 public void operationComplete(ChannelFuture future) throws Exception { 226 if (future.isCancelled()) { 227 if (connectTimeoutFuture != null) { 228 connectTimeoutFuture.cancel(false); 229 } 230 connectPromise = null; 231 close(voidPromise()); 232 } 233 } 234 }); 235 } 236 } catch (Throwable t) { 237 //運行出現異常,則設置Promise爲失敗 238 promise.tryFailure(annotateConnectException(t, remoteAddress)); 239 closeIfClosed(); 240 } 241 } 242 243 private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) { 244 if (promise == null) { 245 // Closed via cancellation and the promise has been notified already. 246 return; 247 } 248 249 // Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel. 250 // We still need to ensure we call fireChannelActive() in this case. 251 boolean active = isActive(); 252 253 // trySuccess() will return false if a user cancelled the connection attempt. 254 boolean promiseSet = promise.trySuccess(); 255 256 257 //active狀態發生改變,如今已經鏈接成功 258 if (!wasActive && active) { 259 //pipeline產生Active事件在通道中流傳 260 pipeline().fireChannelActive(); 261 } 262 263 // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive(). 264 if (!promiseSet) { 265 close(voidPromise()); 266 } 267 } 268 269 private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) { 270 if (promise == null) { 271 // Closed via cancellation and the promise has been notified already. 272 return; 273 } 274 275 // Use tryFailure() instead of setFailure() to avoid the race against cancel(). 276 promise.tryFailure(cause); 277 closeIfClosed(); 278 } 279 280 //鏈接完成,該方法會在鏈接成功後,由EventLoop調用 281 @Override 282 public final void finishConnect() { 283 284 assert eventLoop().inEventLoop(); 285 286 try { 287 boolean wasActive = isActive(); 288 doFinishConnect(); 289 fulfillConnectPromise(connectPromise, wasActive); 290 } catch (Throwable t) { 291 fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress)); 292 } finally { 293 // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used 294 // See https://github.com/netty/netty/issues/1770 295 if (connectTimeoutFuture != null) { 296 connectTimeoutFuture.cancel(false); 297 } 298 connectPromise = null; 299 } 300 } 301 302 @Override 303 protected final void flush0() { 304 // Flush immediately only when there's no pending flush. 305 // If there's a pending flush operation, event loop will call forceFlush() later, 306 // and thus there's no need to call it now. 307 if (!isFlushPending()) { 308 super.flush0(); 309 } 310 } 311 312 @Override 313 public final void forceFlush() { 314 // directly call super.flush0() to force a flush now 315 super.flush0(); 316 } 317 318 private boolean isFlushPending() { 319 SelectionKey selectionKey = selectionKey(); 320 return selectionKey.isValid() && (selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0; 321 } 322 } 323 324 //判斷EventLoop和Channel是否匹配 325 @Override 326 protected boolean isCompatible(EventLoop loop) { 327 return loop instanceof NioEventLoop; 328 } 329 330 //註冊 331 @Override 332 protected void doRegister() throws Exception { 333 boolean selected = false; 334 for (;;) { 335 try { 336 //讓內部的javaChannel先註冊的interestOps爲0 337 selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); 338 return; 339 } catch (CancelledKeyException e) { 340 if (!selected) { 341 // Force the Selector to select now as the "canceled" SelectionKey may still be 342 // cached and not removed because no Select.select(..) operation was called yet. 343 eventLoop().selectNow(); 344 selected = true; 345 } else { 346 // We forced a select operation on the selector before but the SelectionKey is still cached 347 // for whatever reason. JDK bug ? 348 throw e; 349 } 350 } 351 } 352 } 353 354 @Override 355 protected void doDeregister() throws Exception { 356 eventLoop().cancel(selectionKey()); 357 } 358 359 //doBeginRead由read方法調用 360 @Override 361 protected void doBeginRead() throws Exception { 362 final SelectionKey selectionKey = this.selectionKey; 363 if (!selectionKey.isValid()) { 364 return; 365 } 366 367 readPending = true; 368 //從新註冊感興趣的事件 369 final int interestOps = selectionKey.interestOps(); 370 if ((interestOps & readInterestOp) == 0) { 371 selectionKey.interestOps(interestOps | readInterestOp); 372 } 373 } 374 375 /** 376 * Connect to the remote peer 377 */ 378 protected abstract boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception; 379 380 /** 381 * Finish the connect 382 */ 383 protected abstract void doFinishConnect() throws Exception; 384 385 //分配直接內存 386 protected final ByteBuf newDirectBuffer(ByteBuf buf) { 387 final int readableBytes = buf.readableBytes(); 388 if (readableBytes == 0) { 389 ReferenceCountUtil.safeRelease(buf); 390 return Unpooled.EMPTY_BUFFER; 391 } 392 393 final ByteBufAllocator alloc = alloc(); 394 if (alloc.isDirectBufferPooled()) { 395 ByteBuf directBuf = alloc.directBuffer(readableBytes); 396 directBuf.writeBytes(buf, buf.readerIndex(), readableBytes); 397 ReferenceCountUtil.safeRelease(buf); 398 return directBuf; 399 } 400 401 final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer(); 402 if (directBuf != null) { 403 directBuf.writeBytes(buf, buf.readerIndex(), readableBytes); 404 ReferenceCountUtil.safeRelease(buf); 405 return directBuf; 406 } 407 408 // Allocating and deallocating an unpooled direct buffer is very expensive; give up. 409 return buf; 410 } 411 412 //分配直接內存 413 protected final ByteBuf newDirectBuffer(ReferenceCounted holder, ByteBuf buf) { 414 final int readableBytes = buf.readableBytes(); 415 if (readableBytes == 0) { 416 ReferenceCountUtil.safeRelease(holder); 417 return Unpooled.EMPTY_BUFFER; 418 } 419 420 final ByteBufAllocator alloc = alloc(); 421 if (alloc.isDirectBufferPooled()) { 422 ByteBuf directBuf = alloc.directBuffer(readableBytes); 423 directBuf.writeBytes(buf, buf.readerIndex(), readableBytes); 424 ReferenceCountUtil.safeRelease(holder); 425 return directBuf; 426 } 427 428 final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer(); 429 if (directBuf != null) { 430 directBuf.writeBytes(buf, buf.readerIndex(), readableBytes); 431 ReferenceCountUtil.safeRelease(holder); 432 return directBuf; 433 } 434 435 // Allocating and deallocating an unpooled direct buffer is very expensive; give up. 436 if (holder != buf) { 437 // Ensure to call holder.release() to give the holder a chance to release other resources than its content. 438 buf.retain(); 439 ReferenceCountUtil.safeRelease(holder); 440 } 441 442 return buf; 443 } 444 445 //關閉方法 446 @Override 447 protected void doClose() throws Exception { 448 ChannelPromise promise = connectPromise; 449 if (promise != null) { 450 // Use tryFailure() instead of setFailure() to avoid the race against cancel(). 451 promise.tryFailure(DO_CLOSE_CLOSED_CHANNEL_EXCEPTION); 452 connectPromise = null; 453 } 454 455 ScheduledFuture<?> future = connectTimeoutFuture; 456 if (future != null) { 457 future.cancel(false); 458 connectTimeoutFuture = null; 459 } 460 } 461 }
AbstractNioChannel又有兩個子類,分別是AbstractNioMessageChannel和AbstractNioByteChannel。二者的區別是前者的通道中封裝處理的是Object,然後者的通道中封裝處理的是ByteBuf(或FileRegion)。socket
對於NioServerSocketChannel而言,須要處理的是NioSocketChannel。所以它集成了AbstractNioMessageChannel。ide
AbstractNioMessageChannel源碼:函數
1 public abstract class AbstractNioMessageChannel extends AbstractNioChannel { 2 boolean inputShutdown; 3 4 //構造函數 5 protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) { 6 //設置父Channel, 內部封裝的JDKchannel和註冊interestOp 7 super(parent, ch, readInterestOp); 8 } 9 10 //返回Unsafe對象 11 @Override 12 protected AbstractNioUnsafe newUnsafe() { 13 return new NioMessageUnsafe(); 14 } 15 16 //讀 17 @Override 18 protected void doBeginRead() throws Exception { 19 if (inputShutdown) { 20 return; 21 } 22 super.doBeginRead(); 23 } 24 25 //AbstractNioUnsafe對象的 26 private final class NioMessageUnsafe extends AbstractNioUnsafe { 27 28 private final List<Object> readBuf = new ArrayList<Object>(); 29 30 @Override 31 public void read() { 32 assert eventLoop().inEventLoop(); 33 final ChannelConfig config = config(); 34 final ChannelPipeline pipeline = pipeline(); 35 final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); 36 allocHandle.reset(config); 37 38 boolean closed = false; 39 Throwable exception = null; 40 41 42 try { 43 try { 44 //開始讀操做,主要是調用子類的doReadMessages實現,從SelectableChannel中讀取數據,並封裝到readBuf 45 do { 46 int localRead = doReadMessages(readBuf); 47 if (localRead == 0) { 48 break; 49 } 50 if (localRead < 0) { 51 closed = true; 52 break; 53 } 54 55 allocHandle.incMessagesRead(localRead); 56 } while (allocHandle.continueReading()); 57 } catch (Throwable t) { 58 exception = t; 59 } 60 61 //將讀到的readBuf經過pipline,在通道中流通,便於被通道中的Handler處理 62 int size = readBuf.size(); 63 for (int i = 0; i < size; i ++) { 64 readPending = false; 65 pipeline.fireChannelRead(readBuf.get(i)); 66 } 67 //清空 68 readBuf.clear(); 69 //讀完成,產生readCompleate事件 70 allocHandle.readComplete(); 71 pipeline.fireChannelReadComplete(); 72 73 //若是有異常,則產生異常事件 74 if (exception != null) { 75 closed = closeOnReadError(exception); 76 77 pipeline.fireExceptionCaught(exception); 78 } 79 80 //若是被關閉,則調用關閉 81 if (closed) { 82 inputShutdown = true; 83 if (isOpen()) { 84 close(voidPromise()); 85 } 86 } 87 } finally { 88 // Check if there is a readPending which was not processed yet. 89 // This could be for two reasons: 90 // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method 91 // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method 92 // 93 // See https://github.com/netty/netty/issues/2254 94 if (!readPending && !config.isAutoRead()) { 95 removeReadOp(); 96 } 97 } 98 } 99 } 100 101 102 103 //寫操做,NioServerSocketChannel不支持寫 104 @Override 105 protected void doWrite(ChannelOutboundBuffer in) throws Exception { 106 final SelectionKey key = selectionKey(); 107 final int interestOps = key.interestOps(); 108 109 for (;;) { 110 Object msg = in.current(); 111 if (msg == null) { 112 //若是註冊了寫事件,則移除寫事件 113 if ((interestOps & SelectionKey.OP_WRITE) != 0) { 114 key.interestOps(interestOps & ~SelectionKey.OP_WRITE); 115 } 116 break; 117 } 118 try { 119 boolean done = false; 120 for (int i = config().getWriteSpinCount() - 1; i >= 0; i--) { 121 //具體的寫操做交給子類實現(NioServerSocketChannel不支持寫操做) 122 if (doWriteMessage(msg, in)) { 123 done = true; 124 break; 125 } 126 } 127 128 if (done) { 129 in.remove(); 130 } else { 131 // Did not write all messages. 132 if ((interestOps & SelectionKey.OP_WRITE) == 0) { 133 key.interestOps(interestOps | SelectionKey.OP_WRITE); 134 } 135 break; 136 } 137 } catch (Exception e) { 138 if (continueOnWriteError()) { 139 in.remove(e); 140 } else { 141 throw e; 142 } 143 } 144 } 145 } 146 147 /** 148 * Returns {@code true} if we should continue the write loop on a write error. 149 */ 150 protected boolean continueOnWriteError() { 151 return false; 152 } 153 154 155 protected boolean closeOnReadError(Throwable cause) { 156 if (!isActive()) { 157 // If the channel is not active anymore for whatever reason we should not try to continue reading. 158 return true; 159 } 160 if (cause instanceof PortUnreachableException) { 161 return false; 162 } 163 if (cause instanceof IOException) { 164 // ServerChannel should not be closed even on IOException because it can often continue 165 // accepting incoming connections. (e.g. too many open files) 166 return !(this instanceof ServerChannel); 167 } 168 return true; 169 } 170 171 //讀和寫的具體操做交給子類去實現 172 protected abstract int doReadMessages(List<Object> buf) throws Exception; 173 174 protected abstract boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception; 175 }
最後來看NioServerSocketChannel源碼:oop
1 public class NioServerSocketChannel extends AbstractNioMessageChannel 2 implements io.netty.channel.socket.ServerSocketChannel { 3 4 private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16); 5 private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider(); 6 7 private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioServerSocketChannel.class); 8 9 //產生NIOServerSocketChannel的方法 10 private static ServerSocketChannel newSocket(SelectorProvider provider) { 11 try { 12 /** 13 * Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in 14 * {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise. 15 * 16 * See <a href="https://github.com/netty/netty/issues/2308">#2308</a>. 17 */ 18 return provider.openServerSocketChannel(); 19 } catch (IOException e) { 20 throw new ChannelException( 21 "Failed to open a server socket.", e); 22 } 23 } 24 25 private final ServerSocketChannelConfig config; 26 27 //默認構造函數, ReflectivaChannelFactory利用反射建立Channel時,便是調用了這個方法 28 public NioServerSocketChannel() { 29 this(newSocket(DEFAULT_SELECTOR_PROVIDER)); 30 } 31 32 /** 33 * Create a new instance using the given {@link SelectorProvider}. 34 */ 35 public NioServerSocketChannel(SelectorProvider provider) { 36 this(newSocket(provider)); 37 } 38 39 //將NIO中的ServerSocketChannel封裝成Netty的NioServerSocketChannel 40 public NioServerSocketChannel(ServerSocketChannel channel) { 41 //調用父類的構造函數,注意設置了interestOps爲OP_ACCEPT 42 super(null, channel, SelectionKey.OP_ACCEPT); 43 //建立配置 44 config = new NioServerSocketChannelConfig(this, javaChannel().socket()); 45 } 46 47 //返回以太網地址 48 @Override 49 public InetSocketAddress localAddress() { 50 return (InetSocketAddress) super.localAddress(); 51 } 52 53 //返回元數據信息 54 @Override 55 public ChannelMetadata metadata() { 56 return METADATA; 57 } 58 59 //返回配置 60 @Override 61 public ServerSocketChannelConfig config() { 62 return config; 63 } 64 65 //Channel是否活躍 66 @Override 67 public boolean isActive() { 68 //經過socket的bound狀態來肯定是否爲active 69 return javaChannel().socket().isBound(); 70 } 71 72 //返回遠端地址,ServerSocketChannel沒有對應的遠端地址 73 @Override 74 public InetSocketAddress remoteAddress() { 75 return null; 76 } 77 78 //內部封裝的JDK自帶的Channel 79 @Override 80 protected ServerSocketChannel javaChannel() { 81 return (ServerSocketChannel) super.javaChannel(); 82 } 83 84 @Override 85 protected SocketAddress localAddress0() { 86 return SocketUtils.localSocketAddress(javaChannel().socket()); 87 } 88 89 //經過調用內部封裝的JDK中的NIO channel來綁定地址 90 @Override 91 protected void doBind(SocketAddress localAddress) throws Exception { 92 if (PlatformDependent.javaVersion() >= 7) { 93 javaChannel().bind(localAddress, config.getBacklog()); 94 } else { 95 javaChannel().socket().bind(localAddress, config.getBacklog()); 96 } 97 } 98 99 //關閉通道 100 @Override 101 protected void doClose() throws Exception { 102 javaChannel().close(); 103 } 104 105 //讀消息 106 @Override 107 protected int doReadMessages(List<Object> buf) throws Exception { 108 //其實就是調用ServerSocketChannel的accept方法監聽accept事件,返回SocketChannel 109 SocketChannel ch = SocketUtils.accept(javaChannel()); 110 111 try { 112 //將JDK NIO中的channel封裝成Netty的NioSocketChannel對象,添加進buf中,使其在Pipeline中傳遞 113 if (ch != null) { 114 buf.add(new NioSocketChannel(this, ch)); 115 return 1;//返回數量 116 } 117 } catch (Throwable t) { 118 logger.warn("Failed to create a new channel from an accepted socket.", t); 119 120 try { 121 ch.close(); 122 } catch (Throwable t2) { 123 logger.warn("Failed to close a socket.", t2); 124 } 125 } 126 127 return 0; 128 } 129 130 //NIOServerSocketChannel不支持的部分操做 返回null 或者 UnsuppotedOperationException異常 131 @Override 132 protected boolean doConnect( 133 SocketAddress remoteAddress, SocketAddress localAddress) throws Exception { 134 throw new UnsupportedOperationException(); 135 } 136 137 @Override 138 protected void doFinishConnect() throws Exception { 139 throw new UnsupportedOperationException(); 140 } 141 142 @Override 143 protected SocketAddress remoteAddress0() { 144 return null; 145 } 146 147 @Override 148 protected void doDisconnect() throws Exception { 149 throw new UnsupportedOperationException(); 150 } 151 152 @Override 153 protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception { 154 throw new UnsupportedOperationException(); 155 } 156 157 @Override 158 protected final Object filterOutboundMessage(Object msg) throws Exception { 159 throw new UnsupportedOperationException(); 160 } 161 162 /********************************************************************/ 163 164 165 private final class NioServerSocketChannelConfig extends DefaultServerSocketChannelConfig { 166 private NioServerSocketChannelConfig(NioServerSocketChannel channel, ServerSocket javaSocket) { 167 super(channel, javaSocket); 168 } 169 170 @Override 171 protected void autoReadCleared() { 172 clearReadPending(); 173 } 174 } 175 176 // Override just to to be able to call directly via unit tests. 177 @Override 178 protected boolean closeOnReadError(Throwable cause) { 179 return super.closeOnReadError(cause); 180 } 181 }
分析完源碼,再來看看上一篇文章中提出的問題:
爲何一開始register中註冊的interestOps值爲0,而非OP_ACCEPT?又是什麼時候會註冊OP_ACCEPT呢?
首先咱們經過分析NioServerSocketChannel的源碼能夠看到:
channelFactory會經過發射建立NioServerSocketChannel對象。而發射調用的構造函數中設置了readInterestOps的值爲OP_ACCEPT。而在AbstractNioChannel的doBeginRead方法中又會將readInterestOps註冊到channel。
根據方法名咱們能夠猜想在開始讀以前,selectableChannel的interestOps會從0被改成OP_ACCEPT。
爲了證明這點,咱們須要弄清楚開始時register interestOps爲0的時機和調用doBeginRead的時機。
首先註冊interestOps爲0是在AbstractNioChannel的doRegister方法中。咱們知道這個方法發生在channel的註冊階段。
再看doBeginRead的函數調用:
以前已經介紹過了註冊或者綁定成功後,會調用pipeline.fireChannelActive事件。此時的DefaultChannelPipeline除了傳遞channelActive事件以外,還會調用readIfAutoRead()。
這個方法會根據Config配置的AutoRead屬性來決定是否調用read方法。
而這個屬性默認是自動讀的。因而就能夠調用read方法,並最終爲channel註冊OP_ACCEPT事件。