Netty(六):NioServerSocketChannel源碼解析

咱們在Netty學習系列五的最後提出了一些問題還沒獲得回答,今天來經過學習NioServerSocketChannel的源碼來幫咱們找到以前問題的答案。html

先看一下NioServerSocketChannel的繼承結構。java

 

AttributeMap接口及DefaultAttributeMap主要是提供了體檢屬性和獲取屬性的能力,便於咱們爲Channel綁定額外的屬性。git

 

AbstractChannel實現了Channel接口,實現了Channel通用的行爲和方法,咱們在Netty學習系列四中已經介紹過了。github

 

AbstractNioChannel抽象類關聯了Channel接口與JDK的NIOChannel,也就是讓底層的通訊交給Nio來實現。promise

簡單介紹下源碼:app

  1 public abstract class AbstractNioChannel extends AbstractChannel {
  2 
  3     private static final InternalLogger logger =
  4             InternalLoggerFactory.getInstance(AbstractNioChannel.class);
  5 
  6     private static final ClosedChannelException DO_CLOSE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
  7             new ClosedChannelException(), AbstractNioChannel.class, "doClose()");
  8 
  9     //和Java NIO的Channel綁定
 10     private final SelectableChannel ch;
 11     //爲SelectableChannel註冊的時間
 12     protected final int readInterestOp;
 13     volatile SelectionKey selectionKey;
 14 
 15     boolean readPending;
 16     
 17     private final Runnable clearReadPendingRunnable = new Runnable() {
 18         @Override
 19         public void run() {
 20             clearReadPending0();
 21         }
 22     };
 23 
 24     /**
 25      * The future of the current connection attempt.  If not null, subsequent
 26      * connection attempts will fail.
 27      */
 28     private ChannelPromise connectPromise;
 29     private ScheduledFuture<?> connectTimeoutFuture;
 30     private SocketAddress requestedRemoteAddress;
 31 
 32     //構造函數,參數分別爲父Channel,要封裝的SelectableChannel和註冊的感興趣的事件
 33     protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
 34         super(parent);
 35         this.ch = ch;
 36         this.readInterestOp = readInterestOp;
 37         try {
 38             //將SelectableChannel設置爲非阻塞
 39             ch.configureBlocking(false);
 40         } catch (IOException e) {
 41             try {
 42                 ch.close();
 43             } catch (IOException e2) {
 44                 if (logger.isWarnEnabled()) {
 45                     logger.warn(
 46                             "Failed to close a partially initialized socket.", e2);
 47                 }
 48             }
 49 
 50             throw new ChannelException("Failed to enter non-blocking mode.", e);
 51         }
 52     }
 53 
 54     //通道是否打開
 55     @Override
 56     public boolean isOpen() {
 57         return ch.isOpen();
 58     }
 59 
 60     //返回更具體的Unsafe子類
 61     @Override
 62     public NioUnsafe unsafe() {
 63         return (NioUnsafe) super.unsafe();
 64     }
 65 
 66     //返回內部封裝的SelectableChannel
 67     protected SelectableChannel javaChannel() {
 68         return ch;
 69     }
 70 
 71     //返回EventLoop更具體的子類
 72     @Override
 73     public NioEventLoop eventLoop() {
 74         return (NioEventLoop) super.eventLoop();
 75     }
 76 
 77     //返回SelectionKey
 78     protected SelectionKey selectionKey() {
 79         assert selectionKey != null;
 80         return selectionKey;
 81     }
 82 
 83     //已廢棄方法
 84     @Deprecated
 85     protected boolean isReadPending() {
 86         return readPending;
 87     }
 88 
 89     //已廢棄方法
 90     @Deprecated
 91     protected void setReadPending(final boolean readPending) {
 92         if (isRegistered()) {
 93             EventLoop eventLoop = eventLoop();
 94             if (eventLoop.inEventLoop()) {
 95                 setReadPending0(readPending);
 96             } else {
 97                 eventLoop.execute(new Runnable() {
 98                     @Override
 99                     public void run() {
100                         setReadPending0(readPending);
101                     }
102                 });
103             }
104         } else {
105             // Best effort if we are not registered yet clear readPending.
106             // NB: We only set the boolean field instead of calling clearReadPending0(), because the SelectionKey is
107             // not set yet so it would produce an assertion failure.
108             this.readPending = readPending;
109         }
110     }
111 
112     /**
113      * Set read pending to {@code false}.
114      */
115     protected final void clearReadPending() {
116         if (isRegistered()) {
117             EventLoop eventLoop = eventLoop();
118             if (eventLoop.inEventLoop()) {
119                 clearReadPending0();
120             } else {
121                 eventLoop.execute(clearReadPendingRunnable);
122             }
123         } else {
124             // Best effort if we are not registered yet clear readPending. This happens during channel initialization.
125             // NB: We only set the boolean field instead of calling clearReadPending0(), because the SelectionKey is
126             // not set yet so it would produce an assertion failure.
127             readPending = false;
128         }
129     }
130 
131     private void setReadPending0(boolean readPending) {
132         this.readPending = readPending;
133         if (!readPending) {
134             ((AbstractNioUnsafe) unsafe()).removeReadOp();
135         }
136     }
137 
138     private void clearReadPending0() {
139         readPending = false;
140         ((AbstractNioUnsafe) unsafe()).removeReadOp();
141     }
142 
143     //Unsafe的具體子類,增長了一些和NioChannel相關的特性
144     public interface NioUnsafe extends Unsafe {
145         //返回內部的SelectableChannel
146         SelectableChannel ch();
147 
148         //鏈接完成
149         void finishConnect();
150 
151         //讀方法
152         void read();
153 
154         //強制刷新
155         void forceFlush();
156     }
157 
158     //NioUnsafe的抽象實現
159     protected abstract class AbstractNioUnsafe extends AbstractUnsafe implements NioUnsafe {
160 
161         protected final void removeReadOp() {
162             SelectionKey key = selectionKey();
163             // Check first if the key is still valid as it may be canceled as part of the deregistration
164             // from the EventLoop
165             // See https://github.com/netty/netty/issues/2104
166             if (!key.isValid()) {
167                 return;
168             }
169             int interestOps = key.interestOps();
170             if ((interestOps & readInterestOp) != 0) {
171                 // only remove readInterestOp if needed
172                 key.interestOps(interestOps & ~readInterestOp);
173             }
174         }
175 
176         //返回內部封裝的Channel
177         @Override
178         public final SelectableChannel ch() {
179             return javaChannel();
180         }
181 
182         //connect方法,實際在使用時NioServerSocket是不支持connect的,可是NioSocket會支持
183         @Override
184         public final void connect(
185                 final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
186             if (!promise.setUncancellable() || !ensureOpen(promise)) {
187                 return;
188             }
189 
190             try {
191                 if (connectPromise != null) {
192                     // Already a connect in process.
193                     throw new ConnectionPendingException();
194                 }
195 
196                 boolean wasActive = isActive();
197                 //調用具體子類的doConnect方法
198                 if (doConnect(remoteAddress, localAddress)) {
199                     //鏈接成功設置fulfillConnectPromise
200                     fulfillConnectPromise(promise, wasActive);
201                 } else {
202                     //鏈接未成功
203                     connectPromise = promise;
204                     requestedRemoteAddress = remoteAddress;
205 
206                     //根據配置的超時時間,設置超時任務,一旦到達超時時間則拋出鏈接失敗的異常
207                     int connectTimeoutMillis = config().getConnectTimeoutMillis();
208                     if (connectTimeoutMillis > 0) {
209                         connectTimeoutFuture = eventLoop().schedule(new Runnable() {
210                             @Override
211                             public void run() {
212                                 ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
213                                 ConnectTimeoutException cause =
214                                         new ConnectTimeoutException("connection timed out: " + remoteAddress);
215                                 if (connectPromise != null && connectPromise.tryFailure(cause)) {
216                                     close(voidPromise());
217                                 }
218                             }
219                         }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
220                     }
221 
222                     //添加監聽器,若是期間操做成功了,則取消掉超超時任務
223                     promise.addListener(new ChannelFutureListener() {
224                         @Override
225                         public void operationComplete(ChannelFuture future) throws Exception {
226                             if (future.isCancelled()) {
227                                 if (connectTimeoutFuture != null) {
228                                     connectTimeoutFuture.cancel(false);
229                                 }
230                                 connectPromise = null;
231                                 close(voidPromise());
232                             }
233                         }
234                     });
235                 }
236             } catch (Throwable t) {
237                 //運行出現異常,則設置Promise爲失敗
238                 promise.tryFailure(annotateConnectException(t, remoteAddress));
239                 closeIfClosed();
240             }
241         }
242 
243         private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
244             if (promise == null) {
245                 // Closed via cancellation and the promise has been notified already.
246                 return;
247             }
248 
249             // Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel.
250             // We still need to ensure we call fireChannelActive() in this case.
251             boolean active = isActive();
252 
253             // trySuccess() will return false if a user cancelled the connection attempt.
254             boolean promiseSet = promise.trySuccess();
255 
256 
257             //active狀態發生改變,如今已經鏈接成功
258             if (!wasActive && active) {
259                 //pipeline產生Active事件在通道中流傳
260                 pipeline().fireChannelActive();
261             }
262 
263             // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
264             if (!promiseSet) {
265                 close(voidPromise());
266             }
267         }
268 
269         private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
270             if (promise == null) {
271                 // Closed via cancellation and the promise has been notified already.
272                 return;
273             }
274 
275             // Use tryFailure() instead of setFailure() to avoid the race against cancel().
276             promise.tryFailure(cause);
277             closeIfClosed();
278         }
279 
280         //鏈接完成,該方法會在鏈接成功後,由EventLoop調用
281         @Override
282         public final void finishConnect() {
283 
284             assert eventLoop().inEventLoop();
285 
286             try {
287                 boolean wasActive = isActive();
288                 doFinishConnect();
289                 fulfillConnectPromise(connectPromise, wasActive);
290             } catch (Throwable t) {
291                 fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
292             } finally {
293                 // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used
294                 // See https://github.com/netty/netty/issues/1770
295                 if (connectTimeoutFuture != null) {
296                     connectTimeoutFuture.cancel(false);
297                 }
298                 connectPromise = null;
299             }
300         }
301 
302         @Override
303         protected final void flush0() {
304             // Flush immediately only when there's no pending flush.
305             // If there's a pending flush operation, event loop will call forceFlush() later,
306             // and thus there's no need to call it now.
307             if (!isFlushPending()) {
308                 super.flush0();
309             }
310         }
311 
312         @Override
313         public final void forceFlush() {
314             // directly call super.flush0() to force a flush now
315             super.flush0();
316         }
317 
318         private boolean isFlushPending() {
319             SelectionKey selectionKey = selectionKey();
320             return selectionKey.isValid() && (selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0;
321         }
322     }
323 
324     //判斷EventLoop和Channel是否匹配
325     @Override
326     protected boolean isCompatible(EventLoop loop) {
327         return loop instanceof NioEventLoop;
328     }
329 
330     //註冊
331     @Override
332     protected void doRegister() throws Exception {
333         boolean selected = false;
334         for (;;) {
335             try {
336                 //讓內部的javaChannel先註冊的interestOps爲0
337                 selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
338                 return;
339             } catch (CancelledKeyException e) {
340                 if (!selected) {
341                     // Force the Selector to select now as the "canceled" SelectionKey may still be
342                     // cached and not removed because no Select.select(..) operation was called yet.
343                     eventLoop().selectNow();
344                     selected = true;
345                 } else {
346                     // We forced a select operation on the selector before but the SelectionKey is still cached
347                     // for whatever reason. JDK bug ?
348                     throw e;
349                 }
350             }
351         }
352     }
353 
354     @Override
355     protected void doDeregister() throws Exception {
356         eventLoop().cancel(selectionKey());
357     }
358 
359     //doBeginRead由read方法調用
360     @Override
361     protected void doBeginRead() throws Exception {
362         final SelectionKey selectionKey = this.selectionKey;
363         if (!selectionKey.isValid()) {
364             return;
365         }
366 
367         readPending = true;
368         //從新註冊感興趣的事件
369         final int interestOps = selectionKey.interestOps();
370         if ((interestOps & readInterestOp) == 0) {
371             selectionKey.interestOps(interestOps | readInterestOp);
372         }
373     }
374 
375     /**
376      * Connect to the remote peer
377      */
378     protected abstract boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception;
379 
380     /**
381      * Finish the connect
382      */
383     protected abstract void doFinishConnect() throws Exception;
384 
385     //分配直接內存
386     protected final ByteBuf newDirectBuffer(ByteBuf buf) {
387         final int readableBytes = buf.readableBytes();
388         if (readableBytes == 0) {
389             ReferenceCountUtil.safeRelease(buf);
390             return Unpooled.EMPTY_BUFFER;
391         }
392 
393         final ByteBufAllocator alloc = alloc();
394         if (alloc.isDirectBufferPooled()) {
395             ByteBuf directBuf = alloc.directBuffer(readableBytes);
396             directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
397             ReferenceCountUtil.safeRelease(buf);
398             return directBuf;
399         }
400 
401         final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
402         if (directBuf != null) {
403             directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
404             ReferenceCountUtil.safeRelease(buf);
405             return directBuf;
406         }
407 
408         // Allocating and deallocating an unpooled direct buffer is very expensive; give up.
409         return buf;
410     }
411 
412     //分配直接內存
413     protected final ByteBuf newDirectBuffer(ReferenceCounted holder, ByteBuf buf) {
414         final int readableBytes = buf.readableBytes();
415         if (readableBytes == 0) {
416             ReferenceCountUtil.safeRelease(holder);
417             return Unpooled.EMPTY_BUFFER;
418         }
419 
420         final ByteBufAllocator alloc = alloc();
421         if (alloc.isDirectBufferPooled()) {
422             ByteBuf directBuf = alloc.directBuffer(readableBytes);
423             directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
424             ReferenceCountUtil.safeRelease(holder);
425             return directBuf;
426         }
427 
428         final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
429         if (directBuf != null) {
430             directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
431             ReferenceCountUtil.safeRelease(holder);
432             return directBuf;
433         }
434 
435         // Allocating and deallocating an unpooled direct buffer is very expensive; give up.
436         if (holder != buf) {
437             // Ensure to call holder.release() to give the holder a chance to release other resources than its content.
438             buf.retain();
439             ReferenceCountUtil.safeRelease(holder);
440         }
441 
442         return buf;
443     }
444 
445     //關閉方法
446     @Override
447     protected void doClose() throws Exception {
448         ChannelPromise promise = connectPromise;
449         if (promise != null) {
450             // Use tryFailure() instead of setFailure() to avoid the race against cancel().
451             promise.tryFailure(DO_CLOSE_CLOSED_CHANNEL_EXCEPTION);
452             connectPromise = null;
453         }
454 
455         ScheduledFuture<?> future = connectTimeoutFuture;
456         if (future != null) {
457             future.cancel(false);
458             connectTimeoutFuture = null;
459         }
460     }
461 }
View Code

 

AbstractNioChannel又有兩個子類,分別是AbstractNioMessageChannel和AbstractNioByteChannel。二者的區別是前者的通道中封裝處理的是Object,然後者的通道中封裝處理的是ByteBuf(或FileRegion)。socket

對於NioServerSocketChannel而言,須要處理的是NioSocketChannel。所以它集成了AbstractNioMessageChannel。ide

AbstractNioMessageChannel源碼:函數

  1 public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
  2     boolean inputShutdown;
  3 
  4     //構造函數
  5     protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
  6         //設置父Channel, 內部封裝的JDKchannel和註冊interestOp
  7         super(parent, ch, readInterestOp);
  8     }
  9 
 10     //返回Unsafe對象
 11     @Override
 12     protected AbstractNioUnsafe newUnsafe() {
 13         return new NioMessageUnsafe();
 14     }
 15 
 16     //
 17     @Override
 18     protected void doBeginRead() throws Exception {
 19         if (inputShutdown) {
 20             return;
 21         }
 22         super.doBeginRead();
 23     }
 24 
 25     //AbstractNioUnsafe對象的
 26     private final class NioMessageUnsafe extends AbstractNioUnsafe {
 27 
 28         private final List<Object> readBuf = new ArrayList<Object>();
 29 
 30         @Override
 31         public void read() {
 32             assert eventLoop().inEventLoop();
 33             final ChannelConfig config = config();
 34             final ChannelPipeline pipeline = pipeline();
 35             final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
 36             allocHandle.reset(config);
 37 
 38             boolean closed = false;
 39             Throwable exception = null;
 40 
 41             
 42             try {
 43                 try {
 44                     //開始讀操做,主要是調用子類的doReadMessages實現,從SelectableChannel中讀取數據,並封裝到readBuf
 45                     do {
 46                         int localRead = doReadMessages(readBuf);
 47                         if (localRead == 0) {
 48                             break;
 49                         }
 50                         if (localRead < 0) {
 51                             closed = true;
 52                             break;
 53                         }
 54 
 55                         allocHandle.incMessagesRead(localRead);
 56                     } while (allocHandle.continueReading());
 57                 } catch (Throwable t) {
 58                     exception = t;
 59                 }
 60 
 61                 //將讀到的readBuf經過pipline,在通道中流通,便於被通道中的Handler處理
 62                 int size = readBuf.size();
 63                 for (int i = 0; i < size; i ++) {
 64                     readPending = false;
 65                     pipeline.fireChannelRead(readBuf.get(i));
 66                 }
 67                 //清空
 68                 readBuf.clear();
 69                 //讀完成,產生readCompleate事件
 70                 allocHandle.readComplete();
 71                 pipeline.fireChannelReadComplete();
 72 
 73                 //若是有異常,則產生異常事件
 74                 if (exception != null) {
 75                     closed = closeOnReadError(exception);
 76 
 77                     pipeline.fireExceptionCaught(exception);
 78                 }
 79 
 80                 //若是被關閉,則調用關閉
 81                 if (closed) {
 82                     inputShutdown = true;
 83                     if (isOpen()) {
 84                         close(voidPromise());
 85                     }
 86                 }
 87             } finally {
 88                 // Check if there is a readPending which was not processed yet.
 89                 // This could be for two reasons:
 90                 // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
 91                 // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
 92                 //
 93                 // See https://github.com/netty/netty/issues/2254
 94                 if (!readPending && !config.isAutoRead()) {
 95                     removeReadOp();
 96                 }
 97             }
 98         }
 99     }
100 
101 
102 
103     //寫操做,NioServerSocketChannel不支持寫
104     @Override
105     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
106         final SelectionKey key = selectionKey();
107         final int interestOps = key.interestOps();
108 
109         for (;;) {
110             Object msg = in.current();
111             if (msg == null) {
112                 //若是註冊了寫事件,則移除寫事件
113                 if ((interestOps & SelectionKey.OP_WRITE) != 0) {
114                     key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
115                 }
116                 break;
117             }
118             try {
119                 boolean done = false;
120                 for (int i = config().getWriteSpinCount() - 1; i >= 0; i--) {
121                     //具體的寫操做交給子類實現(NioServerSocketChannel不支持寫操做)
122                     if (doWriteMessage(msg, in)) {
123                         done = true;
124                         break;
125                     }
126                 }
127 
128                 if (done) {
129                     in.remove();
130                 } else {
131                     // Did not write all messages.
132                     if ((interestOps & SelectionKey.OP_WRITE) == 0) {
133                         key.interestOps(interestOps | SelectionKey.OP_WRITE);
134                     }
135                     break;
136                 }
137             } catch (Exception e) {
138                 if (continueOnWriteError()) {
139                     in.remove(e);
140                 } else {
141                     throw e;
142                 }
143             }
144         }
145     }
146 
147     /**
148      * Returns {@code true} if we should continue the write loop on a write error.
149      */
150     protected boolean continueOnWriteError() {
151         return false;
152     }
153 
154 
155     protected boolean closeOnReadError(Throwable cause) {
156         if (!isActive()) {
157             // If the channel is not active anymore for whatever reason we should not try to continue reading.
158             return true;
159         }
160         if (cause instanceof PortUnreachableException) {
161             return false;
162         }
163         if (cause instanceof IOException) {
164             // ServerChannel should not be closed even on IOException because it can often continue
165             // accepting incoming connections. (e.g. too many open files)
166             return !(this instanceof ServerChannel);
167         }
168         return true;
169     }
170 
171     //讀和寫的具體操做交給子類去實現
172     protected abstract int doReadMessages(List<Object> buf) throws Exception;
173 
174     protected abstract boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception;
175 }
View Code

 

最後來看NioServerSocketChannel源碼:oop

  1 public class NioServerSocketChannel extends AbstractNioMessageChannel
  2                              implements io.netty.channel.socket.ServerSocketChannel {
  3 
  4     private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
  5     private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
  6 
  7     private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioServerSocketChannel.class);
  8 
  9     //產生NIOServerSocketChannel的方法
 10     private static ServerSocketChannel newSocket(SelectorProvider provider) {
 11         try {
 12             /**
 13              *  Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
 14              *  {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise.
 15              *
 16              *  See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.
 17              */
 18             return provider.openServerSocketChannel();
 19         } catch (IOException e) {
 20             throw new ChannelException(
 21                     "Failed to open a server socket.", e);
 22         }
 23     }
 24 
 25     private final ServerSocketChannelConfig config;
 26 
 27     //默認構造函數, ReflectivaChannelFactory利用反射建立Channel時,便是調用了這個方法
 28     public NioServerSocketChannel() {
 29         this(newSocket(DEFAULT_SELECTOR_PROVIDER));
 30     }
 31 
 32     /**
 33      * Create a new instance using the given {@link SelectorProvider}.
 34      */
 35     public NioServerSocketChannel(SelectorProvider provider) {
 36         this(newSocket(provider));
 37     }
 38 
 39     //將NIO中的ServerSocketChannel封裝成Netty的NioServerSocketChannel
 40     public NioServerSocketChannel(ServerSocketChannel channel) {
 41         //調用父類的構造函數,注意設置了interestOps爲OP_ACCEPT
 42         super(null, channel, SelectionKey.OP_ACCEPT);
 43         //建立配置
 44         config = new NioServerSocketChannelConfig(this, javaChannel().socket());
 45     }
 46 
 47     //返回以太網地址
 48     @Override
 49     public InetSocketAddress localAddress() {
 50         return (InetSocketAddress) super.localAddress();
 51     }
 52 
 53     //返回元數據信息
 54     @Override
 55     public ChannelMetadata metadata() {
 56         return METADATA;
 57     }
 58 
 59     //返回配置
 60     @Override
 61     public ServerSocketChannelConfig config() {
 62         return config;
 63     }
 64 
 65     //Channel是否活躍
 66     @Override
 67     public boolean isActive() {
 68         //經過socket的bound狀態來肯定是否爲active
 69         return javaChannel().socket().isBound();
 70     }
 71 
 72     //返回遠端地址,ServerSocketChannel沒有對應的遠端地址
 73     @Override
 74     public InetSocketAddress remoteAddress() {
 75         return null;
 76     }
 77 
 78     //內部封裝的JDK自帶的Channel
 79     @Override
 80     protected ServerSocketChannel javaChannel() {
 81         return (ServerSocketChannel) super.javaChannel();
 82     }
 83 
 84     @Override
 85     protected SocketAddress localAddress0() {
 86         return SocketUtils.localSocketAddress(javaChannel().socket());
 87     }
 88 
 89     //經過調用內部封裝的JDK中的NIO channel來綁定地址
 90     @Override
 91     protected void doBind(SocketAddress localAddress) throws Exception {
 92         if (PlatformDependent.javaVersion() >= 7) {
 93             javaChannel().bind(localAddress, config.getBacklog());
 94         } else {
 95             javaChannel().socket().bind(localAddress, config.getBacklog());
 96         }
 97     }
 98 
 99     //關閉通道
100     @Override
101     protected void doClose() throws Exception {
102         javaChannel().close();
103     }
104 
105     //讀消息
106     @Override
107     protected int doReadMessages(List<Object> buf) throws Exception {
108         //其實就是調用ServerSocketChannel的accept方法監聽accept事件,返回SocketChannel
109         SocketChannel ch = SocketUtils.accept(javaChannel());
110 
111         try {
112             //將JDK NIO中的channel封裝成Netty的NioSocketChannel對象,添加進buf中,使其在Pipeline中傳遞
113             if (ch != null) {
114                 buf.add(new NioSocketChannel(this, ch));
115                 return 1;//返回數量
116             }
117         } catch (Throwable t) {
118             logger.warn("Failed to create a new channel from an accepted socket.", t);
119 
120             try {
121                 ch.close();
122             } catch (Throwable t2) {
123                 logger.warn("Failed to close a socket.", t2);
124             }
125         }
126 
127         return 0;
128     }
129 
130     //NIOServerSocketChannel不支持的部分操做 返回null 或者 UnsuppotedOperationException異常
131     @Override
132     protected boolean doConnect(
133             SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
134         throw new UnsupportedOperationException();
135     }
136 
137     @Override
138     protected void doFinishConnect() throws Exception {
139         throw new UnsupportedOperationException();
140     }
141 
142     @Override
143     protected SocketAddress remoteAddress0() {
144         return null;
145     }
146 
147     @Override
148     protected void doDisconnect() throws Exception {
149         throw new UnsupportedOperationException();
150     }
151 
152     @Override
153     protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception {
154         throw new UnsupportedOperationException();
155     }
156 
157     @Override
158     protected final Object filterOutboundMessage(Object msg) throws Exception {
159         throw new UnsupportedOperationException();
160     }
161 
162     /********************************************************************/
163 
164 
165     private final class NioServerSocketChannelConfig extends DefaultServerSocketChannelConfig {
166         private NioServerSocketChannelConfig(NioServerSocketChannel channel, ServerSocket javaSocket) {
167             super(channel, javaSocket);
168         }
169 
170         @Override
171         protected void autoReadCleared() {
172             clearReadPending();
173         }
174     }
175 
176     // Override just to to be able to call directly via unit tests.
177     @Override
178     protected boolean closeOnReadError(Throwable cause) {
179         return super.closeOnReadError(cause);
180     }
181 }
View Code

 

分析完源碼,再來看看上一篇文章中提出的問題:

爲何一開始register中註冊的interestOps值爲0,而非OP_ACCEPT?又是什麼時候會註冊OP_ACCEPT呢?

首先咱們經過分析NioServerSocketChannel的源碼能夠看到:

channelFactory會經過發射建立NioServerSocketChannel對象。而發射調用的構造函數中設置了readInterestOps的值爲OP_ACCEPT。而在AbstractNioChannel的doBeginRead方法中又會將readInterestOps註冊到channel。

根據方法名咱們能夠猜想在開始讀以前,selectableChannel的interestOps會從0被改成OP_ACCEPT。

爲了證明這點,咱們須要弄清楚開始時register interestOps爲0的時機和調用doBeginRead的時機。

首先註冊interestOps爲0是在AbstractNioChannel的doRegister方法中。咱們知道這個方法發生在channel的註冊階段。

再看doBeginRead的函數調用:

以前已經介紹過了註冊或者綁定成功後,會調用pipeline.fireChannelActive事件。此時的DefaultChannelPipeline除了傳遞channelActive事件以外,還會調用readIfAutoRead()。

這個方法會根據Config配置的AutoRead屬性來決定是否調用read方法。

而這個屬性默認是自動讀的。因而就能夠調用read方法,並最終爲channel註冊OP_ACCEPT事件。

相關文章
相關標籤/搜索