netty服務端啓動--ServerBootstrap源碼解析

netty服務端啓動--ServerBootstrap源碼解析

前面的第一篇文章中,我以spark中的netty客戶端的建立爲切入點,分析了netty的客戶端引導類Bootstrap的參數設置以及啓動過程。顯然,咱們還有另外一個重要的部分--服務端的初始化和啓動過程沒有探究,因此這一節,咱們就來從源碼層面詳細分析一下netty的服務端引導類ServerBootstrap的啓動過程。java

spark中netty服務端的建立

咱們仍然以spark中對netty的使用爲例,以此爲源碼分析的切入點,首先咱們看一下spark的NettyRpc模塊中建立netty服務端引導類的代碼:ios

TransportServer.init

TransportServer的構造方法中會調用init方法,ServerBootstrap類就是在init方法中被建立並初始化以及啓動的。
這個方法主要分爲三塊:git

  • 建立ServerBootstrap對象,並設置各類參數。咱們看到,這裏的bossGroup和workerGroup是同一個線程組,此外還設置了socket的一些參數如排隊的鏈接數,接收緩衝區,發送緩衝區大小等。
  • 設置childHandler參數,之因此把這個參數的設置單獨拿出來就是爲了凸顯這個參數的重要性,childHandler參數是用戶實現時間處理邏輯的地方
  • 最後將服務端綁定到某個端口,同時在綁定的過程當中也會啓動服務端,開始監聽io事件。

很顯然,ServerBootstrap的啓動入口就是bind方法。github

// 初始化netty服務端
      private void init(String hostToBind, int portToBind) {
      
        // io模式,有兩種選項NIO, EPOLL
        IOMode ioMode = IOMode.valueOf(conf.ioMode());
        // 建立bossGroup和workerGroup,即主線程組合子線程組
        EventLoopGroup bossGroup =
          NettyUtils.createEventLoop(ioMode, conf.serverThreads(), conf.getModuleName() + "-server");
        EventLoopGroup workerGroup = bossGroup;
      
        // 緩衝分配器,分爲堆內存和直接內存
        PooledByteBufAllocator allocator = NettyUtils.createPooledByteBufAllocator(
          conf.preferDirectBufs(), true /* allowCache */, conf.serverThreads());
      
        // 建立一個netty服務端引導對象,並設置相關參數
        bootstrap = new ServerBootstrap()
          .group(bossGroup, workerGroup)
          .channel(NettyUtils.getServerChannelClass(ioMode))
          .option(ChannelOption.ALLOCATOR, allocator)
          .childOption(ChannelOption.ALLOCATOR, allocator);
      
        // 內存使用的度量對象
        this.metrics = new NettyMemoryMetrics(
          allocator, conf.getModuleName() + "-server", conf);
      
        // 排隊的鏈接數
        if (conf.backLog() > 0) {
          bootstrap.option(ChannelOption.SO_BACKLOG, conf.backLog());
        }
      
        // socket接收緩衝區大小
        if (conf.receiveBuf() > 0) {
          bootstrap.childOption(ChannelOption.SO_RCVBUF, conf.receiveBuf());
        }
      
        // socket發送緩衝區大小
        if (conf.sendBuf() > 0) {
          bootstrap.childOption(ChannelOption.SO_SNDBUF, conf.sendBuf());
        }
      
        // 子channel處理器
        bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
          @Override
          protected void initChannel(SocketChannel ch) {
            RpcHandler rpcHandler = appRpcHandler;
            for (TransportServerBootstrap bootstrap : bootstraps) {
              rpcHandler = bootstrap.doBootstrap(ch, rpcHandler);
            }
            context.initializePipeline(ch, rpcHandler);
          }
        });
      
        InetSocketAddress address = hostToBind == null ?
            new InetSocketAddress(portToBind): new InetSocketAddress(hostToBind, portToBind);
        // 綁定到ip地址和端口
        channelFuture = bootstrap.bind(address);
        // 同步等待綁定成功
        channelFuture.syncUninterruptibly();
      
        port = ((InetSocketAddress) channelFuture.channel().localAddress()).getPort();
        logger.debug("Shuffle server started on port: {}", port);
      }

AbstractBootstrap.init(SocketAddress localAddress)

這裏的校驗主要是對group和channelFactory的非空校驗
public ChannelFuture bind(SocketAddress localAddress) {
validate();
return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
}bootstrap

AbstractBootstrap.doBind

這個方法,咱們以前在分析Bootstrap的啓動過程時提到過,它的主要做用以下:api

  • 經過反射根據傳入的channel類型建立一個具體的channel對象
  • 調用init方法對這個channel對象進行初始化
  • 將初始化完成的channel對象註冊到一個EventLoop線程上

以前,咱們分析了NioSocketChannel的構造過程,以及Bootstarp中對channel的初始化過程,
本節咱們要分析NioServerSocketChannel的構造過程,以及ServerBootstrap的init方法的實現。promise

private ChannelFuture doBind(final SocketAddress localAddress) {
    // 建立一個channel,並對這個channel作一些初始化工做
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }

    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        ChannelPromise promise = channel.newPromise();
        // 將這個channel綁定到指定的地址
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {// 對於還沒有註冊成功的狀況,採用異步的方式,即添加一個回調
        // Registration future is almost always fulfilled already, but just in case it's not.
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                    // IllegalStateException once we try to access the EventLoop of the Channel.
                    promise.setFailure(cause);
                } else {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    promise.registered();

                    doBind0(regFuture, channel, localAddress, promise);
                }
            }
        });
        return promise;
    }
}

NioServerSocketChannel的構造方法

這裏經過調用jdk的api建立了一個ServerSocketChannel。
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}安全

與NioSocketChannelConfig相似,NioServerSocketChannelConfig也是一種門面模式,是對NioServerSocketChannel中的參數接口的封裝。
此外,咱們注意到,這裏規定了NioServerSocketChannel的初始的感興趣的事件是ACCEPT事件,即默認會監聽請求創建鏈接的事件。
而在NioSocketChannel中的初始感興趣的事件是read事件。
因此,這裏與NioSocketChannel構造過程最主要的不一樣就是初始的感興趣事件不一樣。app

public NioServerSocketChannel(ServerSocketChannel channel) {
    super(null, channel, SelectionKey.OP_ACCEPT);
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

這裏首先調用了父類的構造方法,最終調用了AbstractNioChannel類的構造方法,這個過程咱們在以前分析NioSocketChannel初始化的時候已經詳細說過,主要就是建立了內部的Unsafe對象和ChannelPipeline對象。異步

ServerBootstrap.init

分析完了channel的構造過程,咱們再來看一下ServerBootstrap是怎麼對channel對象進行初始化的。

  • 設置參數,設置屬性
  • 獲取子channel的參數和屬性,以便在有新的鏈接時給新建立的channel設置參數和屬性
  • 給serverChannel中添加一個重要的handler,這個handler中實現了對新建立的channel的處理邏輯。

因此,很顯然,咱們接下來就要看一下這個特殊的handler,ServerBootstrapAcceptor的read方法。

void init(Channel channel) throws Exception {
    final Map<ChannelOption<?>, Object> options = options0();
    // 設置參數
    synchronized (options) {
        setChannelOptions(channel, options, logger);
    }

    // 設置屬性
    final Map<AttributeKey<?>, Object> attrs = attrs0();
    synchronized (attrs) {
        for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
            @SuppressWarnings("unchecked")
            AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
            channel.attr(key).set(e.getValue());
        }
    }

    ChannelPipeline p = channel.pipeline();

    // 子channel的group和handler參數
    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
    synchronized (childOptions) {
        currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
    }
    synchronized (childAttrs) {
        currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
    }

    // 添加處理器
    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) throws Exception {
            final ChannelPipeline pipeline = ch.pipeline();
            // 通常狀況下,對於ServerBootstrap用戶無需設置handler
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }

            // 這裏添加了一個關鍵的handler,而且順手啓動了對應的EventLoop的線程
            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}

NioEventLoop.processSelectedKey(SelectionKey k, AbstractNioChannel ch)

在分析ServerBootstrapAcceptor以前,咱們首先來回顧一下NioEventLoop的循環中,對於accept事件的處理邏輯,這裏截取其中的一小段代碼:

// 處理read和accept事件
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
        }

可見,對於accept事件和read事件同樣,調用NioUnsafe的read方法

AbstractNioMessageChannel.NioMessageUnsafe.read

由於NioServerSocketChannel繼承了AbstractNioMessageChannel,而且read方法的實現也是在AbstractNioMessageChannel中,

  • doReadMessages是一個抽象方法,在NioServerSocketChannel的實現中,這個方法調用jdk的api接收一個鏈接,幷包裝成NioSocketChannel對象
  • 以讀取到的channel對象做爲消息,在channelPipeline中觸發一個讀事件

根據前面對channelPipeline的分析,咱們知道,讀事件對從頭結點開始,向尾節點傳播。上面咱們也提到了,對於初始的那個NioServerSocketChannel,會在ServerBootstarp的init方法中向這個channel的處理鏈中加入一個ServerBootstrapAcceptor處理器,因此,很顯然,接下來咱們應該分析ServerBootstrapAcceptor中對讀事件的處理。

public void read() {
        // 確認當前代碼的執行是在EventLoop的線程中
        assert eventLoop().inEventLoop();
        final ChannelConfig config = config();
        final ChannelPipeline pipeline = pipeline();
        final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
        allocHandle.reset(config);

        boolean closed = false;
        Throwable exception = null;
        try {
            try {
                do {
                    // 這裏讀取到的是創建的鏈接對應的channel,
                    // jdk的socketChannel被包裝成了netty的NioSocketChannel
                    int localRead = doReadMessages(readBuf);
                    if (localRead == 0) {
                        break;
                    }
                    if (localRead < 0) {
                        closed = true;
                        break;
                    }

                    allocHandle.incMessagesRead(localRead);
                } while (allocHandle.continueReading());
            } catch (Throwable t) {
                exception = t;
            }

            int size = readBuf.size();
            for (int i = 0; i < size; i ++) {
                readPending = false;
                // 把接收到的每個channel做爲消息,在channelPipeline中觸發一個讀事件
                pipeline.fireChannelRead(readBuf.get(i));
            }
            readBuf.clear();
            allocHandle.readComplete();
            // 最後觸發一個讀完成的事件
            pipeline.fireChannelReadComplete();

            if (exception != null) {
                closed = closeOnReadError(exception);

                pipeline.fireExceptionCaught(exception);
            }

            if (closed) {
                inputShutdown = true;
                if (isOpen()) {
                    close(voidPromise());
                }
            }
        } finally {
            // Check if there is a readPending which was not processed yet.
            // This could be for two reasons:
            // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
            // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
            //
            // See https://github.com/netty/netty/issues/2254
            if (!readPending && !config.isAutoRead()) {
                removeReadOp();
            }
        }
    }
}

ServerBootstrapAcceptor.channelRead

代碼邏輯仍是比較簡單的,由於有了前面的鋪墊,即在ServerBootstrap的init方法對創始的那個serverChannel進行初始化時,將用戶設置的子channel的參數,屬性,子channel的handler和子group等參數做爲構造參數所有傳給了ServerBootstrapAcceptor,因此在這裏直接用就好了。
其實這裏的子channel的初始化和註冊過程和Bootstrap中對一個新建立的channel的初始化過程基本同樣,區別在於Bootstrap中channel是用戶代碼經過調用connect方法最終在initAndregistry中經過反射構造的一個對象;而在服務端,經過監聽ServerSocketChannel的accept事件,當有新的鏈接創建請求時,會自動建立一個SocketChannel(jdk的代碼實現),而後NioServerSocketChannel將其包裝成一個NioSocketChannel,並做爲消息在傳遞給處理器,因此在ServerSocketChannel中的子channel的建立是由底層的jdk的庫實現的。

public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // 類型轉換,這裏的強制轉換是安全的的,
        // 是由各類具體的AbstractNioMessageChannel子類型的實現保證的
        // 各類具體的AbstractNioMessageChannel子類型的讀方法確保它們讀取並最終返回的是一個Channel類型
        final Channel child = (Channel) msg;

        // 給子channel添加handler
        child.pipeline().addLast(childHandler);

        // 給子channel設置參數
        setChannelOptions(child, childOptions, logger);

        // 給子channel設置屬性
        for (Entry<AttributeKey<?>, Object> e: childAttrs) {
            child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
        }

        try {
            // 將子channel註冊到子group中
            childGroup.register(child).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (!future.isSuccess()) {
                        forceClose(child, future.cause());
                    }
                }
            });
        } catch (Throwable t) {
            forceClose(child, t);
        }
    }

AbstractBootstrap.doBind0

回到doBind方法中,在完成了channel的構造,初始化和註冊邏輯後,接下來就要把這個server類型的channel綁定到一個地址上,這樣才能接受客戶端創建鏈接的請求。
從代碼中能夠看出,調用了channel的bind方法實現綁定的邏輯。

private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {

    // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
    // the pipeline in its channelRegistered() implementation.
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                // 調用了channel.bind方法完成綁定的邏輯
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}

AbstractChannel.bind

bind操做的傳遞是從尾節點開始向前傳遞,因此咱們直接看Headcontext對於bind方法的實現
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return pipeline.bind(localAddress, promise);
}

DefaultChannelPipeline.bind

public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return tail.bind(localAddress, promise);
}

HeadContext.bind

調用了unsafe的bind方法。

public void bind(
            ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
        unsafe.bind(localAddress, promise);
    }

由於後面右有幾個事件的觸發,每一個觸發事件都是經過channel的相關方法來觸發,而後又是經過channelpipeline的傳遞事件,這些事件最後基本都是由HeadContext處理了,因此這裏我只簡單地敘述一下後面的 大概邏輯,代碼比較繁瑣,並且不少都是相同的調用過程,因此就不貼代碼了。

  • 經過前面的分析,咱們知道首先經過channel觸發了一個bind操做,這個操做的實現最終由HeadCOntex實現,HeadContex的實現中是調用了unsafe.bind
  • bind的實現邏輯中,首先經過jdk的api完成了ServerSocketChannel的綁定,而後又觸發了一個channelActive的事件,這個事件的處理最終也是有HeadContext實現
  • 在HeadContext對channelActive操做的實現中,觸發了一個read()操做,注意這裏的這個read方法是不帶參數的,是ChannelOutboundInvoker接口中定義的一個方法,也是有HeadContext實現
  • HeadContext對read操做的實現中,調用了Unsafe.beginRead方法,通過幾個子類的具體實現後,最終由AbstractNioChannel.doBeginRead實現具體的開始讀的邏輯,

從代碼中能夠看出來,最終調用了jdk的api,將感興趣的事件添加到selectionKey中。經過前面的 分析,咱們知道對於NioSocketChannel,它的感興趣的讀事件類型是SelectionKey.OP_READ,也就是讀事件;
而對於NioServerSocketChannel,根據前面對其構造方法的分析,它的感興趣的事件是SelectionKey.OP_ACCEPT,也就是創建鏈接的事件。

protected void doBeginRead() throws Exception {
    // Channel.read() or ChannelHandlerContext.read() was called
    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
        return;
    }

    readPending = true;

    // 將讀事件類型加入到selectionKey的感興趣的事件中
    // 這樣jdk底層的selector就會監聽相應類型的事件
    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == 0) {
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}

總結

到這裏,咱們就把ServerBootstrap的主要功能代碼分析完了,這裏面主要包括三個方面:

  • ServerBootstrap中對server類型的channel的初始化,包括最重要的handler----ServerBootstrapAcceptor的添加
  • ServerBootstrapAcceptor中對於新建立的子channel的處理,包括初始化和註冊的邏輯
  • 將serverChannel綁定到具體的地址上,綁定過程當中也啓動了對應的註冊的線程。
相關文章
相關標籤/搜索