在網絡編程 - 初識Netty中,咱們看到了客戶端Bootstrap調用channel(NioSocketChannel.class)
來傳遞所要初始化的Channel對象。
AbstractBootstrap類,能夠看到傳遞的是ReflectiveChannelFactory,一個Channel的工廠類,賦值給Bootstrap的channelFactory。java
public B channel(Class<? extends C> channelClass) { if (channelClass == null) { throw new NullPointerException("channelClass"); } return channelFactory(new ReflectiveChannelFactory<C>(channelClass)); }
ReflectiveChannelFactory實現了ChannelFactory接口和newChannel方法,這個newChannel方法,會經過傳過來的clazz建立一個Channel。這個方法的調用,在Bootstrap的connect()
方法中,後面會調用initAndRegister()
方法,Bootstrap先暫且略過,咱們看看Channel是怎麼初始化的,初始化又作了哪些事情。編程
public T newChannel() { try { // 這邊調用NioSocketChannel的構造方法 return clazz.getConstructor().newInstance(); } catch (Throwable t) { throw new ChannelException("Unable to create Channel from class " + clazz, t); } }
NioSocketChannelsegmentfault
public NioSocketChannel() { // DEFAULT_SELECTOR_PROVIDER是SelectorProvider.provider() this(DEFAULT_SELECTOR_PROVIDER); } public NioSocketChannel(SelectorProvider provider) { // 經過SelectorProvider獲取NIO的SocketChannel this(newSocket(provider)); } private static SocketChannel newSocket(SelectorProvider provider) { try { return provider.openSocketChannel(); } catch (IOException e) { throw new ChannelException("Failed to open a socket.", e); } } public NioSocketChannel(SocketChannel socket) { this(null, socket); } public NioSocketChannel(Channel parent, SocketChannel socket) { super(parent, socket); // NioSocketChannel的屬性設置 config = new NioSocketChannelConfig(this, socket.socket()); }
AbstractNioByteChannelpromise
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) { // 關注OP_READ事件 super(parent, ch, SelectionKey.OP_READ); }
AbstractNioChannel網絡
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent); this.ch = ch; this.readInterestOp = readInterestOp; try { // 設置非阻塞 ch.configureBlocking(false); } catch (IOException e) { // 異常處理,關閉channel並拋異常,代碼略 } }
AbstractChannelapp
protected AbstractChannel(Channel parent) { this.parent = parent; id = newId(); // 建立一個NioSocketChannel.NioSocketChannelUnsafe的內部類對象 unsafe = newUnsafe(); // 建立一個Pipeline,並把當前的Channel賦值給Pipeline pipeline = newChannelPipeline(); }
至此,Channel已經建立好了,在其中,他也建立了一個unsafe對象、Pipeline對象以及NioSocketChannelConfig對象。
unsafe類圖以下,能夠看出,Channel中與socket交互的,其實就是這個unsafe類。
ChannelPipeline這個類,後面再詳細的講。socket
Channel初始化完成後,開始註冊到EventLoop。
在AbstractBootstrap的initAndRegister方法中,初始化Channel後,開始執行ChannelFuture regFuture = config().group().register(channel)
。
MultithreadEventLoopGroup,這個next,就是經過chooserFactory.newChooser
獲得的chooser,根據不一樣的策略來獲取EventLoop。也就是說,這個register,其實就是註冊到EventLoop中。ide
public ChannelFuture register(Channel channel) { return next().register(channel); }
SingleThreadEventLoop,先是建立DefaultChannelPromise,這邊傳遞兩個參數,一個是channel,一個是this,這個this,就是當前的EventLoop,因此Promise保持了他們兩個的關係。再調用register方法時,調用unsafe的register方法。oop
public ChannelFuture register(Channel channel) { return register(new DefaultChannelPromise(channel, this)); } public ChannelFuture register(final ChannelPromise promise) { ObjectUtil.checkNotNull(promise, "promise"); promise.channel().unsafe().register(this, promise); return promise; }
AbstractChannel.AbstractUnsafe,判斷當前調用線程是不是支撐 EventLoop 的線程,若是是直接調用register0,不然,加入到隊列。這樣的好處,就是減小了線程的上下文切換致使的性能消耗。eventLoop的execute方法參見以前的文章。性能
public final void register(EventLoop eventLoop, final ChannelPromise promise) { if (eventLoop == null) { throw new NullPointerException("eventLoop"); } if (isRegistered()) { promise.setFailure(new IllegalStateException("registered to an event loop already")); return; } if (!isCompatible(eventLoop)) { promise.setFailure( new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName())); return; } // 賦值eventLoop,register0方法會用到 AbstractChannel.this.eventLoop = eventLoop; // 若是當前調用線程正是支撐 EventLoop 的線程,那麼直接調用register0,不然,加入到隊列 if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } }
最終的註冊方法。javaChannel()返回的是初始化的provider.openSocketChannel()
,eventLoop()返回的是上面register方法賦值的eventLoop,經過SelectableChannel的register方法,把Channel的SocketChannel註冊到eventLoop的Selector中。
private void register0(ChannelPromise promise) { // 部分代碼略 doRegister(); // 部分代碼略 } protected void doRegister() throws Exception { boolean selected = false; for (;;) { try { selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); return; } catch (CancelledKeyException e) { // 部分代碼略 } } }