netty源碼解解析(4.0)-12 Channel NIO實現:channel初始化

  建立一個channel實例,並把它register到eventLoopGroup中以後,這個channel而後處於inactive狀態,仍然是不可用的。只有在bind或connect方法調用成功以後才能正常。所以bind或connect算是channel初始化的最後一步,本章這就重點分析這兩個功能的實現。java

  接下來的代碼分析若是沒有特別說明,都是以NioSocketChannel爲例。git

 

  bind實現github

  bind方法的調用棧以下:promise

io.netty.channel.AbstractChannel#bind(java.net.SocketAddress)
io.netty.channel.DefaultChannelPipeline#bind(java.net.SocketAddress)
io.netty.channel.AbstractChannelHandlerContext#bind(java.net.SocketAddress)  
io.netty.channel.AbstractChannelHandlerContext#bind(java.net.SocketAddress, io.netty.channel.ChannelPromise) io.netty.channel.AbstractChannelHandlerContext#invokeBind io.netty.channel.DefaultChannelPipeline.HeadContext#bind io.netty.channel.AbstractChannel.AbstractUnsafe#bind io.netty.channel.socket.nio.NioSocketChannel#doBind io.netty.channel.socket.nio.NioSocketChannel#doBind0

  爲了能簡單明瞭地展現調用關係,這個調用棧忽略了一些調用。可能有多個AbstractChannelHandlerContext的方法在不一樣的線程中被調用。之後在描述調用棧時也會忽略這一點,再也不贅述。異步

  io.netty.channel.AbstractChannel.AbstractUnsafe#bind執行了主要的bind邏輯,它會調用doBind, 而後在channel的狀態從inactive變成active,就調用pipline的fireChannelActive方法觸發channelActives事件。doBind是io.netty.channel.AbstractChannel定義的抽象方法。NioSocketChannel只須要實現這個方法,整個bind功能就完整了。socket

 1     @Override
 2     protected void doBind(SocketAddress localAddress) throws Exception { 3  doBind0(localAddress); 4  } 5 private void doBind0(SocketAddress localAddress) throws Exception { 6 if (PlatformDependent.javaVersion() >= 7) { 7  SocketUtils.bind(javaChannel(), localAddress); 8 } else { 9  SocketUtils.bind(javaChannel().socket(), localAddress); 10  } 11 }

   SocketUtils封裝了經過AccessController調用JDK的socket API接口,事實上仍是調用Socket或SocketChannel的bind方法。Nio的三個Channel類實現doBind的代碼幾乎同樣。ide

 

  connect實現oop

  connect的調用棧以下:this

io.netty.channel.AbstractChannel#connect(java.net.SocketAddress)
io.netty.channel.DefaultChannelPipeline#connect(java.net.SocketAddress)
io.netty.channel.AbstractChannelHandlerContext#connect(java.net.SocketAddress)
io.netty.channel.AbstractChannelHandlerContext#connect(java.net.SocketAddress, io.netty.channel.ChannelPromise)
io.netty.channel.AbstractChannelHandlerContext#connect(java.net.SocketAddress, java.net.SocketAddress, io.netty.channel.ChannelPromise)
io.netty.channel.AbstractChannelHandlerContext#invokeConnect
io.netty.channel.DefaultChannelPipeline.HeadContext#connect
io.netty.channel.nio.AbstractNioChannel.AbstractNioUnsafe#connect
io.netty.channel.socket.nio.NioSocketChannel#doConnect

  connect的主要邏輯在io.netty.channel.nio.AbstractNioChannel.AbstractNioUnsafe#connect中實現,它的流程是:spa

  1. 調用doConnect方法,這個方法是AbstractNioChanne定義的抽象方法。

  2. 若是doConnect成功,且channel的狀態從inactive變成active,則調用pipeline的fireChannelActive方法觸發channelActive事件。

  3. 若是doConnection失敗,調用close關閉channel。

  io.netty.channel.socket.nio.NioSocketChannel#doConnect中是socket connect API的調用。下面是connect的關鍵代碼。

 1 @Override
 2 public final void connect( 3 final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { 4 if (!promise.setUncancellable() || !ensureOpen(promise)) { 5 return; 6  } 7 8 try { 9 if (connectPromise != null) { 10 // Already a connect in process. 11 throw new ConnectionPendingException(); 12  } 13 14 boolean wasActive = isActive(); 15 if (doConnect(remoteAddress, localAddress)) { 16  fulfillConnectPromise(promise, wasActive); 17 } else { 18 connectPromise = promise; 19 requestedRemoteAddress = remoteAddress; 20 21 // Schedule connect timeout. 22 int connectTimeoutMillis = config().getConnectTimeoutMillis(); 23 if (connectTimeoutMillis > 0) { 24 connectTimeoutFuture = eventLoop().schedule(new Runnable() { 25  @Override 26 public void run() { 27 ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise; 28 ConnectTimeoutException cause = 29 new ConnectTimeoutException("connection timed out: " + remoteAddress); 30 if (connectPromise != null && connectPromise.tryFailure(cause)) { 31 close(voidPromise()); 32  } 33  } 34  }, connectTimeoutMillis, TimeUnit.MILLISECONDS); 35  } 36 37 promise.addListener(new ChannelFutureListener() { 38  @Override 39 public void operationComplete(ChannelFuture future) throws Exception { 40 if (future.isCancelled()) { 41 if (connectTimeoutFuture != null) { 42 connectTimeoutFuture.cancel(false); 43  } 44 connectPromise = null; 45 close(voidPromise()); 46  } 47  } 48  }); 49  } 50 } catch (Throwable t) { 51  promise.tryFailure(annotateConnectException(t, remoteAddress)); 52 closeIfClosed(); 53  } 54 } 55 56 private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) { 57 if (promise == null) { 58 return; 59  } 60 boolean active = isActive(); 61 boolean promiseSet = promise.trySuccess(); 62 63 if (!wasActive && active) { 64 pipeline().fireChannelActive(); 65  } 66 if (!promiseSet) { 67  close(voidPromise()); 68  } 69 }

  第14,15行和整個fulfillConnectPromise方法處理正常流程。

  第18-52行處理異常流程。代碼雖然多,但總結起來就一句話: 設置promis返回錯誤,確保可以調用close方法

  io.netty.channel.socket.nio.NioSocketChannel#doConnect實現和doBind實現相似:

 1 @Override
 2 protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception { 3 if (localAddress != null) { 4  doBind0(localAddress); 5  } 6 7 boolean success = false; 8 try { 9 boolean connected = SocketUtils.connect(javaChannel(), remoteAddress); 10 if (!connected) { 11  selectionKey().interestOps(SelectionKey.OP_CONNECT); 12  } 13 success = true; 14 return connected; 15 } finally { 16 if (!success) { 17  doClose(); 18  } 19  } 20 }

  在第11行,註冊OP_CONNECT事件。因爲channel在初始化是被設置成非阻塞模式,connect方法可能返回false, 若是返回false表示connect操做沒有完成,須要經過selector關注OP_CONNECT事件,把connect變成一個異步過程。只有異步調用io.netty.channel.nio.AbstractNioChannel.AbstractNioUnsafe#finishConnect以後,connect纔算完成。finishConnect在eventLoop中被調用:

1 //io.netty.channel.nio.NioEventLoop#processSelectedKey(java.nio.channels.SelectionKey, io.netty.channel.nio.AbstractNioChannel)
2 if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
3     int ops = k.interestOps();
4     ops &= ~SelectionKey.OP_CONNECT;
5     k.interestOps(ops);
6     unsafe.finishConnect();
7 }

   finishConnection的實現以下:

 1 //io.netty.channel.nio.AbstractNioChannel.AbstractNioUnsafe#finishConnect
 2 @Override
 3 public final void finishConnect() {
 4     // Note this method is invoked by the event loop only if the connection attempt was
 5     // neither cancelled nor timed out.
 6 
 7     assert eventLoop().inEventLoop();
 8     try {
 9         boolean wasActive = isActive();
10         doFinishConnect();
11         fulfillConnectPromise(connectPromise, wasActive);
12     } catch (Throwable t) {
13         fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
14     } finally {
15         // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used
16         // See https://github.com/netty/netty/issues/1770
17         if (connectTimeoutFuture != null) {
18             connectTimeoutFuture.cancel(false);
19         }
20         connectPromise = null;
21     }
22 }
23 
24 //io.netty.channel.socket.nio.NioSocketChannel#doFinishConnect
25 @Override
26 protected void doFinishConnect() throws Exception {
27     if (!javaChannel().finishConnect()) {
28         throw new Error();
29     }
30 }

  9-11行是finishConnection的關鍵代碼, 先調用doFinishConnect執行完成鏈接以後的操做,NioSocketChannel實現是檢查鏈接是否真的已經完成(27-29行),而後調用fulfillConnectPromise觸發事件,設置promise返回值。在前面分析netty.channel.nio.AbstractNioChannel.AbstractNioUnsafe#connect代碼時,能夠看到在doConnect調用成功之後會當即調用這個方法。這個方法被調用兩次是爲了確保channelActive事件必定會被觸發一次。

 

  localAddress,remoteAddress實現:獲得channel的本地和遠程地址

  這個兩個方法的實現幾乎同樣,這裏只分析localAddress,它的調用棧以下:

1 io.netty.channel.AbstractChannel#localAddress
2 io.netty.channel.AbstractChannel.AbstractUnsafe#localAddress 3 io.netty.channel.socket.nio.NioSocketChannel#localAddress0

  這個方法不會觸發任何事件,所以沒有經過pipline調用unsafe,它直接調用unsafe的方法:

 1 //io.netty.channel.AbstractChannel#localAddress
 2 @Override
 3 public SocketAddress localAddress() { 4 SocketAddress localAddress = this.localAddress; 5 if (localAddress == null) { 6 try { 7 this.localAddress = localAddress = unsafe().localAddress(); 8 } catch (Throwable t) { 9 // Sometimes fails on a closed socket in Windows. 10 return null; 11  } 12  } 13 return localAddress; 14 }

  在第7行直接調用unsafe的locallAddress方法,這個方法在AbstractUnsafe中實現,它調用了localAddress0,這一個protected的抽象方法,在NioSocketChannel中的實現是:

1 @Override
2 protected SocketAddress localAddress0() { 3 return javaChannel().socket().getLocalSocketAddress(); 4 }
相關文章
相關標籤/搜索