Netty5源碼分析--2.客戶端啓動過程

實例

樣例代碼來自於io.netty.example.telnet.TelnetClient,完整樣例請參考NettyExample工程。java

客戶端和服務端比較類似,因此本篇會在必定程度上略去重複的部分,以減小篇幅。git

public void run() throws Exception {
    EventLoopGroup group = new NioEventLoopGroup();
    try {
        Bootstrap b = new Bootstrap();
        b.group(group)
         .channel(NioSocketChannel.class)
         .handler(new TelnetClientInitializer());

        // Start the connection attempt.
        Channel ch = b.connect(host, port).sync().channel();

        // Read commands from the stdin.
        ChannelFuture lastWriteFuture = null;
        BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
        for (;;) {
            String line = in.readLine();
            if (line == null) {
                break;
            }

            // Sends the received line to the server.
            lastWriteFuture = ch.writeAndFlush(line + "\r\n");

            // If user typed the 'bye' command, wait until the server closes
            // the connection.
            if ("bye".equals(line.toLowerCase())) {
                ch.closeFuture().sync();
                break;
            }
        }

        // Wait until all messages are flushed before closing the channel.
        if (lastWriteFuture != null) {
            lastWriteFuture.sync();
        }
    } finally {
        group.shutdownGracefully();
    }
}

客戶端啓動

Bootstrap b = new Bootstrap();  //tag0
b.group(group) //tag1
.channel(NioSocketChannel.class) //tag2
.handler(new TelnetClientInitializer());//tag3

tag0代碼主要初始化了父類的 options和attrs 屬性;代碼略。github

tag1設置了group屬性promise

@SuppressWarnings("unchecked")
public B group(EventLoopGroup group) {
    if (group == null) {
        throw new NullPointerException("group");
    }
    if (this.group != null) {
        throw new IllegalStateException("group set already");
    }
    this.group = group;
    return (B) this;
}

tag2設置了channelFactory屬性服務器

public Bootstrap channel(Class<? extends Channel> channelClass) {
    if (channelClass == null) {
        throw new NullPointerException("channelClass");
    }
    return channelFactory(new BootstrapChannelFactory<Channel>(channelClass));
}

tag3設置了handler屬性app

public B handler(ChannelHandler handler) { if (handler == null) { throw new NullPointerException("handler"); } this.handler = handler; return (B) this; }less

下面開始第二段代碼分析,依次執行下面的方法。socket

Channel ch = b.connect(host, port) //tag4
.sync().channel(); //tag5

 public ChannelFuture connect(String inetHost, int inetPort) {
    return connect(new InetSocketAddress(inetHost, inetPort));
}

public ChannelFuture connect(SocketAddress remoteAddress) {
    if (remoteAddress == null) {
        throw new NullPointerException("remoteAddress");
    }

    validate();
    return doConnect(remoteAddress, localAddress());
}

 private ChannelFuture doConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();//tag4.1
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }

    final ChannelPromise promise = channel.newPromise();
    if (regFuture.isDone()) {
 doConnect0(regFuture, channel, remoteAddress, localAddress, promise);//tag4.2
    } else {
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                doConnect0(regFuture, channel, remoteAddress, localAddress, promise);
            }
        });
    }

    return promise;
}

分析tag4.1代碼,細心的讀者注意到,這些和服務端的代碼執行過程是同樣的。運用模板模式,子類定義獨特的實現。

final ChannelFuture AbstractBootstrap.initAndRegister() {
    Channel channel;
    try {
        channel = createChannel();//tag4.1.1

    } catch (Throwable t) {
        return VoidChannel.INSTANCE.newFailedFuture(t);
    }

    try {
        init(channel);//tag4.1.2
    } catch (Throwable t) {
        channel.unsafe().closeForcibly();
        return channel.newFailedFuture(t);
    }

    ChannelPromise regFuture = channel.newPromise();
    channel.unsafe().register(regFuture);//tag4.1.3
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }

分析 tag4.1.1,裏面經過反射來實例化NioSocketChannelide

@Override
Channel createChannel() {
    EventLoop eventLoop = group().next();
    return channelFactory().newChannel(eventLoop);//tag4.1.1.1

}

public NioSocketChannel(EventLoop eventLoop) {
    this(eventLoop, newSocket());//調用下面的newSocket()方法
}

private static SocketChannel newSocket() {
    try {
        return SocketChannel.open();
    } catch (IOException e) {
        throw new ChannelException("Failed to open a socket.", e);
    }
}

 public NioSocketChannel(EventLoop eventLoop, SocketChannel socket) {
    this(null, eventLoop, socket);
}

protected AbstractNioByteChannel(Channel parent, EventLoop eventLoop, SelectableChannel ch) {
    super(parent, eventLoop, ch, SelectionKey.OP_READ);//調用父類方法
}

protected AbstractNioChannel(Channel parent, EventLoop eventLoop, SelectableChannel ch, int readInterestOp) {
    super(parent, eventLoop);//調用父類方法,tag4.1.1.1
    this.ch = ch;
    this.readInterestOp = readInterestOp;
    try {
        ch.configureBlocking(false);//tag4.1.1.2
    } catch (IOException e) {
        try {
            ch.close();
        } catch (IOException e2) {
            if (logger.isWarnEnabled()) {
                logger.warn(
                        "Failed to close a partially initialized socket.", e2);
            }
        }

        throw new ChannelException("Failed to enter non-blocking mode.", e);
    }
}

protected AbstractChannel(Channel parent, EventLoop eventLoop) {
    this.parent = parent;
    this.eventLoop = validate(eventLoop);
    unsafe = newUnsafe();
    pipeline = new DefaultChannelPipeline(this);//tag4.1.1.1.1
}

分析tag4.1.1.1.1,裏面調用DefaultChannelPipeline構造器,和服務端的邏輯同樣,故不做分析。oop

此時系統返回到tag4.1.1.2 繼續執行ch.configureBlocking(false);,此時完成tag4.1.1 方法執行,開始執行tag4.1.2方法

@Override
@SuppressWarnings("unchecked")
void init(Channel channel) throws Exception {
    ChannelPipeline p = channel.pipeline();
    p.addLast(handler());//tag4.1.2.1

    final Map<ChannelOption<?>, Object> options = options();
    synchronized (options) {
        for (Entry<ChannelOption<?>, Object> e: options.entrySet()) {
            try {
                if (!channel.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue()))  	{
                    logger.warn("Unknown channel option: " + e);
                }
            } catch (Throwable t) {
                logger.warn("Failed to set a channel option: " + channel, t);
            }
        }
    }

    final Map<AttributeKey<?>, Object> attrs = attrs();
    synchronized (attrs) {
        for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
            channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
        }
    }
}

分析tag4.1.2.1,裏面將這個TelnetClientInitializer Handler加入到pipeline中,此時handler鏈是HeadHandler,TelnetClientInitializer,TailHandler,共計3個。

此時程序返回到tag4.1.3繼續執行,

@Override
    public final void AbstractChannel.register(final ChannelPromise promise) {
        if (eventLoop.inEventLoop()) {
            register0(promise);
        } else {
            try {
                eventLoop.execute(new Runnable() {
                    @Override
                    public void run() {
                        register0(promise);//tag4.1.3.1
                    }
                });
            } catch (Throwable t) {
                logger.warn(
                        "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                        AbstractChannel.this, t);
                closeForcibly();
                closeFuture.setClosed();
                promise.setFailure(t);
            }
        }
    }




     private void AbstractChannel.AbstractUnsafe.register0(ChannelPromise promise) {
        try {
            // check if the channel is still open as it could be closed in the mean time when the register
            // call was outside of the eventLoop
            if (!ensureOpen(promise)) {
                return;
            }
            doRegister();//tag4.1.3.1.1   
            registered = true;
            promise.setSuccess();
            pipeline.fireChannelRegistered();//tag4.1.3.1.2
            if (isActive()) {
                pipeline.fireChannelActive();
            }
        } catch (Throwable t) {
            // Close the channel directly to avoid FD leak.
            closeForcibly();
            closeFuture.setClosed();
            if (!promise.tryFailure(t)) {
                logger.warn(
                        "Tried to fail the registration promise, but it is complete already. " +
                                "Swallowing the cause of the registration failure:", t);
            }
        }
    }

tag4.1.3.1.1 代碼以下

@Override
protected void AbstractNioChannel.doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            selectionKey = javaChannel().register(eventLoop().selector, 0, this);//tag4.1.3.1.2.1
            return;
        } catch (CancelledKeyException e) {
            if (!selected) {
                // Force the Selector to select now as the "canceled" SelectionKey may still be
                // cached and not removed because no Select.select(..) operation was called yet.
                eventLoop().selectNow();
                selected = true;
            } else {
                // We forced a select operation on the selector before but the SelectionKey is still cached
                // for whatever reason. JDK bug ?
                throw e;
            }
        }
    }
}

tag4.1.3.1.2.1 把selector註冊到javaChannel上;而後程序繼續執行tag4.1.3.1.2代碼。

@Override
public ChannelPipeline fireChannelRegistered() {
    head.fireChannelRegistered();
    return this;
}

@Override
public ChannelHandlerContext fireChannelRegistered() {
    DefaultChannelHandlerContext next = findContextInbound(MASK_CHANNEL_REGISTERED);
    next.invoker.invokeChannelRegistered(next);
    return this;
}

@Override
@SuppressWarnings("unchecked")
public final void ChannelInitializer.channelRegistered(ChannelHandlerContext ctx) throws Exception {
    ChannelPipeline pipeline = ctx.pipeline();
    boolean success = false;
    try {
        initChannel((C) ctx.channel());//tag4.1.3.1.2.1
        pipeline.remove(this);//tag4.1.3.1.2.2
        ctx.fireChannelRegistered();//tag4.1.3.1.2.3
        success = true;
    } catch (Throwable t) {
        logger.warn("Failed to initialize a channel. Closing: " + ctx.channel(), t);
    } finally {
        if (pipeline.context(this) != null) {
            pipeline.remove(this);
        }
        if (!success) {
            ctx.close();
        }
    }
}

後面的邏輯和服務端相似,此時執行的handler是TelnetClientInitializer,並執行ChannelInitializer的channelRegistered方法,channelRegistered方法裏面接着調用了initChannel。

標記 tag4.1.3.1.2.1 代碼以下

@Override
public void TelnetClientInitializer.initChannel(SocketChannel ch) throws Exception {
    ChannelPipeline pipeline = ch.pipeline();

    // Add the text line codec combination first,
    pipeline.addLast("framer", new DelimiterBasedFrameDecoder(
            8192, Delimiters.lineDelimiter()));
    pipeline.addLast("decoder", DECODER);
    pipeline.addLast("encoder", ENCODER);

    // and then business logic.
    pipeline.addLast("handler", CLIENTHANDLER);
}

在完成tag4.1.3.1.2.2的 pipeline.remove(this);後,此時handler鏈以下:HeadHandler,DelimiterBasedFrameDecoder,StringDecoder,StringEncoder,TelnetClientHandler, TailHandler。

接着程序又開始執行下一個handler,最終找到TailHandler的channelRegistered方法。TailHandler的channelRegistered方法是空方法。

此時 tag4.1 的代碼執行結束,開始執行 tag4.2的代碼

private static void doConnect0( final ChannelFuture regFuture, final Channel channel, final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {

// This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
    // the pipeline in its channelRegistered() implementation.
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                if (localAddress == null) {
                    channel.connect(remoteAddress, promise);//tag4.2.1
                } else {
                    channel.connect(remoteAddress, localAddress, promise);
                }
                promise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}

@Override
public ChannelFuture AbstractChannel.connect(SocketAddress remoteAddress, ChannelPromise promise) {
    return pipeline.connect(remoteAddress, promise);
}

通過一番計算,找到HeadHandler,執行unsafe的方法。

@Override
    public void HeadHandler.connect(
            ChannelHandlerContext ctx,
            SocketAddress remoteAddress, SocketAddress localAddress,
            ChannelPromise promise) throws Exception {
        unsafe.connect(remoteAddress, localAddress, promise);
    }

AbstractNioChannel.AbstractNioUnsafe的 connect方法以下:

@Override
    public void connect(
            final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
        if (!ensureOpen(promise)) {
            return;
        }

        try {
            if (connectPromise != null) {
                throw new IllegalStateException("connection attempt already made");
            }

            boolean wasActive = isActive();
            if (doConnect(remoteAddress, localAddress)) {//tag4.2.1.1
                fulfillConnectPromise(promise, wasActive);//tag4.2.1.2
            } else {
                connectPromise = promise;
                requestedRemoteAddress = remoteAddress;

                // Schedule connect timeout.
                int connectTimeoutMillis = config().getConnectTimeoutMillis();
                if (connectTimeoutMillis > 0) {
                    connectTimeoutFuture = eventLoop().schedule(new Runnable() {
                        @Override
                        public void run() {//tag4.2.1.3
                            ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
                            ConnectTimeoutException cause =
                                    new ConnectTimeoutException("connection timed out: " + remoteAddress);
                            if (connectPromise != null && connectPromise.tryFailure(cause)) {
                                close(voidPromise());
                            }
                        }
                    }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
                }

                promise.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (future.isCancelled()) {
                            if (connectTimeoutFuture != null) {
                                connectTimeoutFuture.cancel(false);
                            }
                            connectPromise = null;
                            close(voidPromise());
                        }
                    }
                });
            }
        } catch (Throwable t) {
            if (t instanceof ConnectException) {
                Throwable newT = new ConnectException(t.getMessage() + ": " + remoteAddress);
                newT.setStackTrace(t.getStackTrace());
                t = newT;
            }
            promise.tryFailure(t);
            closeIfClosed();
        }
    }

tag4.2.1.1 代碼以下,進行了bind本地端口和connect遠程服務器的操做。

@Override
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
    if (localAddress != null) {
        javaChannel().socket().bind(localAddress);
    }

    boolean success = false;
    try {
        boolean connected = javaChannel().connect(remoteAddress);//tag4.2.1.1.1
        if (!connected) {
            selectionKey().interestOps(SelectionKey.OP_CONNECT);//tag4.2.1.1.2
        }
        success = true;
        return connected;
    } finally {
        if (!success) {
            doClose();
        }
    }
}

tag4.2.1.1.2 裏面執行了connect遠程服務器的操做,,個人機器上該方法返回false(返回值詳見connect方法說明)。而後會觸發執行selectionKey().interestOps(SelectionKey.OP_CONNECT);

須要額外說明的是,此時服務器會觸發channelActivi事件。在本例的服務端代碼裏,會在客戶端鏈接時,發送消息給客戶端。不過先暫時忽略服務端和客戶端的數據交互,下文分析。

而後tag4.2.1.1 執行結束,因爲此時的返回值是false,因此不會執行tag4.2.1.2的 fulfillConnectPromise(promise, wasActive);

而後程序繼續執行tag4.2.1.3 代碼,進行鏈接超時處理:若是設置了超時時間,那麼等待指定的超時時間後,再看看是否已經鏈接上。若是連不上,則設置失敗狀態。

接着開始下一個事件循環,因爲在tag4.2.1.1.2執行了selectionKey().interestOps(SelectionKey.OP_CONNECT)操做,會進入到下面的代碼。這裏咱們重點關注tag4.3的代碼。

private static void NioEventLoop.processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
 	//略XXXX
              if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
            // See https://github.com/netty/netty/issues/924
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);

            unsafe.finishConnect();//tag4.3
    
    //略XXXX

}

@Override
    public void finishConnect() {
        // Note this method is invoked by the event loop only if the connection attempt was
        // neither cancelled nor timed out.

        assert eventLoop().inEventLoop();
        assert connectPromise != null;

        try {
            boolean wasActive = isActive();
            doFinishConnect();//tag4.3.1
            fulfillConnectPromise(connectPromise, wasActive);//tag4.3.2
        } catch (Throwable t) {
            if (t instanceof ConnectException) {
                Throwable newT = new ConnectException(t.getMessage() + ": " + requestedRemoteAddress);
                newT.setStackTrace(t.getStackTrace());
                t = newT;
            }

            // Use tryFailure() instead of setFailure() to avoid the race against cancel().
            connectPromise.tryFailure(t);
            closeIfClosed();
        } finally {
            // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used
            // See https://github.com/netty/netty/issues/1770
            if (connectTimeoutFuture != null) {
                connectTimeoutFuture.cancel(false);
            }
            connectPromise = null;
        }
    }

    @Override
protected void NioSocketChannel.doFinishConnect() throws Exception {
    if (!javaChannel().finishConnect()) {
        throw new Error();
    }
}

在執行完下面的boolean promiseSet = promise.trySuccess(); 方法後,實例代碼中的 Channel ch = b.connect(host, port).sync().channel();就執行完畢了,而後主線程就阻塞在實例代碼中的 String line = in.readLine();這句代碼裏了。

private void AbstractNioChannel.AbstractNioUnsafe.fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
        // trySuccess() will return false if a user cancelled the connection attempt.
        boolean promiseSet = promise.trySuccess();

        // Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
        // because what happened is what happened.
        if (!wasActive && isActive()) {
            pipeline().fireChannelActive();//tag4.3.2.1
        }

        // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
        if (!promiseSet) {
            close(voidPromise());
        }
    }
    
 
@Override
public ChannelPipeline fireChannelActive() {
    head.fireChannelActive();//tag4.3.2.1.1

    if (channel.config().isAutoRead()) {
        channel.read();//tag4.3.2.1.2
    }

    return this;
}

此時,繼續執行tag4.3.2.1的代碼,進而執行tag4.3.2.1.1的代碼,最終執行TailHandler.channelActive方法。因爲TailHandler類內部的方法基本都是空實現,因此再也不貼代碼了。而後再執行tag4.3.2.1.2的channel.read();代碼,最終執行了AbstractNioChannel.doBeginRead()方法的selectionKey.interestOps(interestOps | readInterestOp);,等同於執行了selectionKey.interestOps(SelectionKey.OP_READ);

此時方法返回,在NioEventLoop.run()通過了一些簡單的數據清理後,而後有機會對服務端的channelActive時發送的數據進行處理了(在tag4.2.1.1.2曾經提過)。客戶端和服務端交互過程詳見下篇。

相關文章
相關標籤/搜索