Netty Source Code (1): Server Startup

Netty series index (http://www.javashuo.com/article/p-hskusway-em.html)

[Figure: ServerBootstrap class diagram]

The sequence in which a ServerBootstrap is created is shown below:

[Figure: ServerBootstrap creation sequence diagram]

1. Parameter configuration

(1) Binding the Reactor thread pools

A server usually builds two thread pools: bossGroup accepts connections, and childGroup handles network I/O and system tasks.

public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
    super.group(parentGroup);
    // omitted...
    this.childGroup = childGroup;
    return this;
}

// bossGroup is stored by the parent class AbstractBootstrap
public B group(EventLoopGroup group) {
    // omitted...
    this.group = group;
    return self();
}

(2) Binding the Channel

public B channel(Class<? extends C> channelClass) {
    if (channelClass == null) {
        throw new NullPointerException("channelClass");
    }
    return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}

// bind() later uses this factory to create the channel via reflection
public B channelFactory(ChannelFactory<? extends C> channelFactory) {
    // omitted...
    this.channelFactory = channelFactory;
    return self();
}
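
To make the reflection step concrete, here is a minimal sketch of a factory in the same spirit. It is not Netty's ReflectiveChannelFactory: the class name ReflectiveFactorySketch and its newInstance method are made up for illustration, and it relies only on the JDK reflection API.

```java
import java.lang.reflect.Constructor;

// Hypothetical stand-in for a reflective factory; not Netty's code.
final class ReflectiveFactorySketch<T> {
    private final Constructor<? extends T> constructor;

    ReflectiveFactorySketch(Class<? extends T> clazz) {
        try {
            // Look up the public no-arg constructor once, at configuration time.
            this.constructor = clazz.getConstructor();
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(
                    "Class " + clazz.getName() + " has no public no-arg constructor", e);
        }
    }

    // Called later (in the article's case, during bind) to create a fresh instance.
    T newInstance() {
        try {
            return constructor.newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(
                    "Unable to create an instance of " + constructor.getDeclaringClass(), e);
        }
    }
}
```

For example, new ReflectiveFactorySketch<CharSequence>(StringBuilder.class).newInstance() returns a fresh StringBuilder on every call, much as channel(NioServerSocketChannel.class) lets each bind create a new channel.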

(3) Binding the ChannelHandlers

// Used by NioServerSocketChannel; handles client connections
.handler(new LoggerHandler())
// Used by NioSocketChannel; handles network I/O
.childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(new ServerHandler());
    }
})

The difference between these two handlers is shown in the figure below:

[Figure: handler model of NioServerSocketChannel]

2. Starting the service

Next, let's look at how ServerBootstrap binds a port.

Core flow of bind:

  • doBind()
    initAndRegister()
  • doBind0()
  • initAndRegister()
    init()
  • beginRead()

private ChannelFuture doBind(final SocketAddress localAddress) {
    // 1. Create the Channel and register it with a NioEventLoop
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }

    if (regFuture.isDone()) {
        // 2. Registration with the NioEventLoop has already succeeded: bind the port and start the service
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        // 3. Registration has not completed yet: add a ChannelFutureListener and bind once it finishes
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    // Registration with the NioEventLoop failed
                    promise.setFailure(cause);
                } else {
                    // Registration with the NioEventLoop succeeded: bind the port
                    promise.registered();
                    doBind0(regFuture, channel, localAddress, promise);
                }
            }
        });
        return promise;
    }
}

doBind does two things: it initializes the Channel and registers it with a NioEventLoop, and then it binds the port to start the service.
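
The two branches of doBind can be sketched with plain JDK futures. This is an analogy only: it assumes CompletableFuture in place of ChannelFuture and ChannelPromise, and BindFlowSketch, bind0 and the String result are hypothetical names, not Netty API. A failed registration is simply propagated to the returned future here, whereas the real method returns regFuture itself in that case.

```java
import java.util.concurrent.CompletableFuture;

// Simplified analogue of the doBind() control flow; not Netty's code.
final class BindFlowSketch {
    // Hypothetical bind step; in the article this role is played by doBind0().
    static void bind0(int port, CompletableFuture<String> promise) {
        promise.complete("bound to " + port);
    }

    static CompletableFuture<String> doBind(CompletableFuture<Void> regFuture, int port) {
        CompletableFuture<String> promise = new CompletableFuture<>();
        if (regFuture.isDone() && !regFuture.isCompletedExceptionally()) {
            // Registration already succeeded: bind right away.
            bind0(port, promise);
        } else {
            // Registration is pending (or failed): decide once it completes.
            regFuture.whenComplete((ignored, cause) -> {
                if (cause != null) {
                    promise.completeExceptionally(cause);
                } else {
                    bind0(port, promise);
                }
            });
        }
        return promise;
    }
}
```

Either way the caller gets a future back immediately and never blocks on registration, which is the point of the listener branch.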

final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        // 1. Create the NioServerSocketChannel
        channel = channelFactory.newChannel();
        // 2. Configure the channel
        init(channel);
    } catch (Throwable t) {
        // omitted...
    }

    // 3. Register the channel with a NioEventLoop
    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    } 
    return regFuture;
}

2.1 NioServerSocketChannel

channelFactory.newChannel() actually invokes the no-arg constructor of NioServerSocketChannel via reflection.

// Create the underlying JDK NIO ServerSocketChannel
public NioServerSocketChannel() {
    this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
// NioServerSocketChannel is interested in the OP_ACCEPT event
public NioServerSocketChannel(ServerSocketChannel channel) {
    super(null, channel, SelectionKey.OP_ACCEPT);
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

// Switch to non-blocking mode and record the op of interest (the try/catch around configureBlocking is omitted)
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    this.readInterestOp = readInterestOp;
    ch.configureBlocking(false);
}

// Creating a channel also creates its pipeline
protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}

Why is the ServerSocketChannel created with provider.openServerSocketChannel()?

private static ServerSocketChannel newSocket(SelectorProvider provider) {
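    // ServerSocketChannel.open() calls SelectorProvider.provider() every time; according to the issue
    // below, this costs about 1% of performance when 5000 connections are created per second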
    // https://github.com/netty/netty/issues/2308
    return provider.openServerSocketChannel();
}
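
The channel setup described in this section can be tried with the JDK alone. The sketch below assumes the default SelectorProvider and a loopback address; ServerChannelSetupSketch and its methods are hypothetical names. To my understanding Netty likewise registers the channel with an interest set of 0 and only adds OP_ACCEPT later in beginRead, which is what the last steps of openAndRegister imitate.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.spi.SelectorProvider;

// JDK-only sketch of opening and registering a server channel; not Netty's code.
final class ServerChannelSetupSketch {
    static SelectionKey openAndRegister(Selector selector) throws IOException {
        // Same call the article's newSocket() makes, here on the default provider.
        ServerSocketChannel ch = SelectorProvider.provider().openServerSocketChannel();
        ch.configureBlocking(false);                    // required before register()
        SelectionKey key = ch.register(selector, 0);    // registered, but no interest yet
        ch.bind(new InetSocketAddress("127.0.0.1", 0)); // port 0: the OS picks a free port
        key.interestOps(SelectionKey.OP_ACCEPT);        // from now on, accepts are selected
        return key;
    }

    // Runs the setup once and reports the resulting channel state.
    static String demo() {
        try (Selector selector = Selector.open()) {
            SelectionKey key = openAndRegister(selector);
            try {
                return "blocking=" + key.channel().isBlocking()
                        + ", acceptInterest=" + (key.interestOps() == SelectionKey.OP_ACCEPT);
            } finally {
                key.channel().close();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Calling ServerChannelSetupSketch.demo() exercises the whole sequence against a temporary loopback port and closes everything afterwards.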

2.2 Initialization

The init method configures the channel's socket options and attributes, and sets up the corresponding handlers.

void init(Channel channel) throws Exception {
    // 1. Configure the socket options and the attributes of the NioServerSocketChannel
    final Map<ChannelOption<?>, Object> options = options0();
    synchronized (options) {
        setChannelOptions(channel, options, logger);
    }

    final Map<AttributeKey<?>, Object> attrs = attrs0();
    synchronized (attrs) {
        for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
            @SuppressWarnings("unchecked")
            AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
            channel.attr(key).set(e.getValue());
        }
    }

    ChannelPipeline p = channel.pipeline();

    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
    synchronized (childOptions) {
        currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
    }
    synchronized (childAttrs) {
        currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
    }

    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) throws Exception {
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = config.handler();
            // 2. Add the handler configured for the NioServerSocketChannel
            if (handler != null) {
                pipeline.addLast(handler);
            }

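            // 3. Add the ServerBootstrapAcceptor, which accepts clients, to the pipeline.
            //    Note: when init() runs the channel is not yet registered with an eventLoop, so this has to
            //    happen inside initChannel; calling ch.eventLoop() before registration throws an exception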
            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}

[Figure: execution of ChannelInitializer]

2.3 Registration

config().group().register(channel)

For details on how NioServerSocketChannel is registered with the NioEventLoopGroup, see: http://www.javashuo.com/article/p-yrhrzohe-dx.html

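3. Client connections

Once NioServerSocketChannel is registered with an eventLoop, the NioEventLoop thread is started and handles the network I/O events of that channel.

[Figure: client connection flow]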

3.1 Starting NioEventLoop

// NioEventLoop main loop
protected void run() {
    // omitted...
    processSelectedKeys();
}

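// Netty optimizes the NIO Selector's selected-key set. The optimization is on by default;
// processSelectedKeysPlain is the fallback when it is unavailable or disabled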
private void processSelectedKeys() {
    if (selectedKeys != null) {
        processSelectedKeysOptimized();
    } else {
        processSelectedKeysPlain(selector.selectedKeys());
    }
}

processSelectedKeysPlain handles the channels registered on the selector:

private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
    if (selectedKeys.isEmpty()) {
        return;
    }

    Iterator<SelectionKey> i = selectedKeys.iterator();
    for (;;) {
        final SelectionKey k = i.next();
        final Object a = k.attachment();
        i.remove();

        if (a instanceof AbstractNioChannel) {
            // Handle network I/O
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            // User-defined task
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }

        if (!i.hasNext()) {
            break;
        }
        // omitted...
    }
}

// Handle the OP_CONNECT, OP_WRITE, OP_READ and OP_ACCEPT events in turn
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    // omitted...
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    int readyOps = k.readyOps();
    // OP_CONNECT
    if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
        int ops = k.interestOps();
        ops &= ~SelectionKey.OP_CONNECT;
        k.interestOps(ops);
        unsafe.finishConnect();
    }

    // OP_WRITE
    if ((readyOps & SelectionKey.OP_WRITE) != 0) {
        ch.unsafe().forceFlush();
    }

    // OP_READ, OP_ACCEPT
    if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
        unsafe.read();
    }
}
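
The bit tests above are easy to check in isolation. The following sketch only reproduces the branching on readyOps; ReadyOpsSketch and the returned action names are hypothetical labels for the unsafe calls in the excerpt, and nothing here touches a real channel.

```java
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;

// Reproduces only the readyOps branching of processSelectedKey; not Netty's code.
final class ReadyOpsSketch {
    // Returns the actions taken for a readyOps bit set, in the order the excerpt checks them.
    static List<String> dispatch(int readyOps) {
        List<String> actions = new ArrayList<>();
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            actions.add("finishConnect");
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            actions.add("forceFlush");
        }
        // OP_READ and OP_ACCEPT share one branch; readyOps == 0 also ends up in read.
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            actions.add("read");
        }
        return actions;
    }
}
```

For a server channel the interesting case is dispatch(SelectionKey.OP_ACCEPT), which yields only the read action: accepting a connection goes through unsafe.read() just like reading bytes does.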

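As you can see, a channel's network I/O is actually handled by its unsafe object. For NioServerSocketChannel, the unsafe is created by newUnsafe(), which its parent class AbstractNioMessageChannel implements to return a NioMessageUnsafe.

3.2 Accepting clients

// AbstractNioMessageChannel.NioMessageUnsafe#read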
public void read() {
    assert eventLoop().inEventLoop();
    final ChannelConfig config = config();
    final ChannelPipeline pipeline = pipeline();
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
    allocHandle.reset(config);

    boolean closed = false;
    Throwable exception = null;
    try {
        do {
            // 1. Accept the client's NioSocketChannel
            int localRead = doReadMessages(readBuf);
            if (localRead == 0) {
                break;
            }
            if (localRead < 0) {
                closed = true;
                break;
            }

            allocHandle.incMessagesRead(localRead);
        } while (allocHandle.continueReading());
    } catch (Throwable t) {
        exception = t;
    }

    // 2. Fire the pipeline's fireChannelRead, fireChannelReadComplete and fireExceptionCaught
    int size = readBuf.size();
    for (int i = 0; i < size; i ++) {
        readPending = false;
        pipeline.fireChannelRead(readBuf.get(i));
    }
    readBuf.clear();
    allocHandle.readComplete();
    pipeline.fireChannelReadComplete();
    if (exception != null) {
        closed = closeOnReadError(exception);
        pipeline.fireExceptionCaught(exception);
    }
}

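The read method of NioMessageUnsafe does two things: it accepts the client NioSocketChannels, and it fires the pipeline's fireChannelRead event so that each accepted channel gets initialized; if an exception occurred, it fires fireExceptionCaught. The actual accepting of client connections is delegated to the doReadMessages method of the subclass NioServerSocketChannel.

// Accept the client connection through the underlying NIO channel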
protected int doReadMessages(List<Object> buf) throws Exception {
    SocketChannel ch = SocketUtils.accept(javaChannel());

    if (ch != null) {
        buf.add(new NioSocketChannel(this, ch));
        return 1;
    }
    return 0;
}
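
The same accept-or-nothing contract can be observed with JDK NIO directly. This is a sketch under assumptions: AcceptSketch, readMessages and demo are hypothetical names, the buffer holds SocketChannel rather than wrapped channel objects, and a loopback client is used only to have something to accept.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;

// JDK-only sketch with the same shape as doReadMessages; not Netty's code.
final class AcceptSketch {
    // Accepts at most one connection per call and reports how many were added.
    static int readMessages(ServerSocketChannel server, List<SocketChannel> buf) throws IOException {
        SocketChannel ch = server.accept(); // null in non-blocking mode if nothing is pending
        if (ch != null) {
            buf.add(ch);
            return 1;
        }
        return 0;
    }

    // Returns {result with no client pending, result after one client has connected}.
    static int[] demo() {
        List<SocketChannel> accepted = new ArrayList<>();
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);
            server.bind(new InetSocketAddress("127.0.0.1", 0));
            server.register(selector, SelectionKey.OP_ACCEPT);

            int before = readMessages(server, accepted);
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                selector.select(5000); // wait until the connection is ready to be accepted
                int after = readMessages(server, accepted);
                return new int[] {before, after};
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            for (SocketChannel ch : accepted) {
                try { ch.close(); } catch (IOException ignored) { }
            }
        }
    }
}
```

Waiting on the selector before accepting mirrors what the event loop does: read() is only invoked once the key reports OP_ACCEPT, so the non-blocking accept normally has a connection to return.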

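At this point NioServerSocketChannel has accepted the incoming NioSocketChannel, but the channel has not been initialized yet: its handlers are not bound and its options are not configured.

3.3 NioSocketChannel initialization

As mentioned above, NioServerSocketChannel adds a ServerBootstrapAcceptor during its own initialization; this handler performs the initialization of the accepted channel. The pipeline of NioServerSocketChannel is shown in the figure below:

[Figure: NioServerSocketChannel pipeline]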

public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel child = (Channel) msg;

    // 1. Add the childHandler to the NioSocketChannel's pipeline
    child.pipeline().addLast(childHandler);

    // 2. Configure the socket's TCP options and attributes
    setChannelOptions(child, childOptions, logger);
    for (Entry<AttributeKey<?>, Object> e: childAttrs) {
        child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
    }
    
    // 3. Register the NioSocketChannel with an eventLoop of the childGroup
    try {
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}

Take careful notes on a little every day. The content may not matter much, but the habit does!
