網絡編程 - Netty(Channel)

初始化

網絡編程 - 初識Netty中,咱們看到了客戶端Bootstrap調用channel(NioSocketChannel.class)來傳遞所要初始化的Channel對象。
AbstractBootstrap類,能夠看到傳遞的是ReflectiveChannelFactory,一個Channel的工廠類,賦值給Bootstrap的channelFactory。java

public B channel(Class<? extends C> channelClass) {
    if (channelClass == null) {
        throw new NullPointerException("channelClass");
    }
    return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}

ReflectiveChannelFactory實現了ChannelFactory接口和newChannel方法,這個newChannel方法,會經過傳過來的clazz建立一個Channel。這個方法的調用,在Bootstrap的connect()方法中,後面會調用initAndRegister()方法,Bootstrap先暫且略過,咱們看看Channel是怎麼初始化的,初始化又作了哪些事情。編程

public T newChannel() {
    try {
        // 這邊調用NioSocketChannel的構造方法
        return clazz.getConstructor().newInstance();
    } catch (Throwable t) {
        throw new ChannelException("Unable to create Channel from class " + clazz, t);
    }
}

NioSocketChannelsegmentfault

public NioSocketChannel() {
    // DEFAULT_SELECTOR_PROVIDER是SelectorProvider.provider()
    this(DEFAULT_SELECTOR_PROVIDER);
}

public NioSocketChannel(SelectorProvider provider) {
    // 經過SelectorProvider獲取NIO的SocketChannel
    this(newSocket(provider));
}

private static SocketChannel newSocket(SelectorProvider provider) {
    try {
        return provider.openSocketChannel();
    } catch (IOException e) {
        throw new ChannelException("Failed to open a socket.", e);
    }
}

public NioSocketChannel(SocketChannel socket) {
    this(null, socket);
}

public NioSocketChannel(Channel parent, SocketChannel socket) {
    super(parent, socket);
    // NioSocketChannel的屬性設置
    config = new NioSocketChannelConfig(this, socket.socket());
}

AbstractNioByteChannelpromise

protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
    // 關注OP_READ事件
    super(parent, ch, SelectionKey.OP_READ);
}

AbstractNioChannel網絡

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    this.readInterestOp = readInterestOp;
    try {
        // 設置非阻塞
        ch.configureBlocking(false);
    } catch (IOException e) {
        // 異常處理,關閉channel並拋異常,代碼略
    }
}

AbstractChannelapp

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    // 建立一個NioSocketChannel.NioSocketChannelUnsafe的內部類對象
    unsafe = newUnsafe();
    // 建立一個Pipeline,並把當前的Channel賦值給Pipeline
    pipeline = newChannelPipeline();
}

至此,Channel已經建立好了,在其中,他也建立了一個unsafe對象、Pipeline對象以及NioSocketChannelConfig對象。
unsafe類圖以下,能夠看出,Channel中與socket交互的,其實就是這個unsafe類。
image.png
ChannelPipeline這個類,後面再詳細的講。socket

註冊

Channel初始化完成後,開始註冊到EventLoop
在AbstractBootstrap的initAndRegister方法中,初始化Channel後,開始執行ChannelFuture regFuture = config().group().register(channel)
MultithreadEventLoopGroup,這個next,就是經過chooserFactory.newChooser獲得的chooser,根據不一樣的策略來獲取EventLoop。也就是說,這個register,其實就是註冊到EventLoop中。ide

public ChannelFuture register(Channel channel) {
    return next().register(channel);
}

SingleThreadEventLoop,先是建立DefaultChannelPromise,這邊傳遞兩個參數,一個是channel,一個是this,這個this,就是當前的EventLoop,因此Promise保持了他們兩個的關係。再調用register方法時,調用unsafe的register方法。oop

public ChannelFuture register(Channel channel) {
    return register(new DefaultChannelPromise(channel, this));
}

public ChannelFuture register(final ChannelPromise promise) {
    ObjectUtil.checkNotNull(promise, "promise");
    promise.channel().unsafe().register(this, promise);
    return promise;
}

AbstractChannel.AbstractUnsafe,判斷當前調用線程是不是支撐 EventLoop 的線程,若是是直接調用register0,不然,加入到隊列。這樣的好處,就是減小了線程的上下文切換致使的性能消耗。eventLoop的execute方法參見以前的文章性能

public final void register(EventLoop eventLoop, final ChannelPromise promise) {
        if (eventLoop == null) {
            throw new NullPointerException("eventLoop");
        }
        if (isRegistered()) {
            promise.setFailure(new IllegalStateException("registered to an event loop already"));
            return;
        }
        if (!isCompatible(eventLoop)) {
            promise.setFailure(
                    new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
            return;
        }
        // 賦值eventLoop,register0方法會用到
        AbstractChannel.this.eventLoop = eventLoop;
        // 若是當前調用線程正是支撐 EventLoop 的線程,那麼直接調用register0,不然,加入到隊列
        if (eventLoop.inEventLoop()) {
            register0(promise);
        } else {
            try {
                eventLoop.execute(new Runnable() {
                    @Override
                    public void run() {
                        register0(promise);
                    }
                });
            } catch (Throwable t) {
                logger.warn(
                        "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                        AbstractChannel.this, t);
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }
    }

最終的註冊方法。javaChannel()返回的是初始化的provider.openSocketChannel(),eventLoop()返回的是上面register方法賦值的eventLoop,經過SelectableChannel的register方法,把Channel的SocketChannel註冊到eventLoop的Selector中。

private void register0(ChannelPromise promise) {
        // 部分代碼略
        doRegister();
        // 部分代碼略
}

protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            return;
        } catch (CancelledKeyException e) {
            // 部分代碼略
        }
    }
}
相關文章
相關標籤/搜索