Netty源碼分析之服務端啓動流程

上節對Netty的作了簡單介紹,這節分析下Netty啓動流程,後面的源碼分析都以Netty4.0.32版本爲例,如下面啓動代碼爲例子java

public class TimeServer {

    public void bind(int port) throws Exception {
	// 1.配置服務端的NIO線程組
	EventLoopGroup bossGroup = new NioEventLoopGroup();
	EventLoopGroup workerGroup = new NioEventLoopGroup();
	try {
	    ServerBootstrap b = new ServerBootstrap();
		//2.配置參數
	    b.group(bossGroup, workerGroup)
		    .channel(NioServerSocketChannel.class)
		    .option(ChannelOption.SO_BACKLOG, 1024)
		    .handler(new LoggingServerHandler())
		    .childHandler(new ChildChannelHandler());
	    // 3.綁定端口,同步等待成功
	    ChannelFuture f = b.bind(port).sync();

	    //4. 等待服務端關閉
	    f.channel().closeFuture().sync();
	} finally {
	    //5.優雅退出,釋放線程池資源
	    bossGroup.shutdownGracefully();
	    workerGroup.shutdownGracefully();
	}
    }
}

組件介紹

EventLoopGroup

EventLoopGroup是Netty的線程池,爲Acceptor Selector和IO Selector事件提供線程支持,EventLoopGroup在初始化的時候會建立相應配置數量的EventLoop來提供單個線程,EventLoop功能比較複雜,既是Select的輪詢事件線程也是其餘IO事件處理的線程。使用時調用EventLoopGroup#next()方法來獲取EventLoop。Netty的線程比較複雜後面章節會詳細說明。promise

NioServerSocketChannel

NioServerSocketChannel提供了JDK的Channel和selector的綁定功能,再綁定端口前會調用這個方法綁定操做。同時NioServerSocketChannel有個Unsafe內部類在初始化NioServerSocketChannel時會建立Unsafe做爲他的成員變量。Unsafe提供了JDK層的io操做包括讀、寫、綁定端口等根據繼承類不一樣的不一樣操做。異步

ChannelHandler

ChannelHandler是Netty的核心,它的繼承包含了2個重要的處理器ChannelOutboundHandler和ChannelInboundHandler。ChannelOutboundHandler是寫出IO事件的處理器。ChannelInboundHandler是寫入IO事件的處理器同時也包含了多個觸發器,包括鏈接斷開觸發器,異常觸發器,註冊完成觸發器,取消註冊觸發器等,因爲ChannelHandler比較複雜後面章節會詳細說明。socket

ChannelFuture

ChannelFuture是Netty的異步事件等待器,Netty利用其來實現全部IO事件的異步化,ChannelFuture能夠註冊多個事件完成監聽器,異步事件會在完成後回調監聽器。ide

ServerBootstrap

ServerBootstrap是Netty配置及啓動的入口,提供線程池、鏈接參數、線程池以及處理器的配置,同時提供了綁定端口的方法。oop

啓動時序圖

咱們以b.bind(port)這行代碼做爲開始流程,時序圖以下: 源碼分析

ServerBootstrap:this

  • 1.調用initAndRegister()建立並初始化channel
  • 1.1.調用init(channel)設置配置並註冊ChannelHandler
  • 1.2.調用EventLoopGroup裏的register(channel)將建立selector並註冊到此channel上
  • 2.調用doBind0()綁定端口並啓動監聽
  • 2.1.調用Channel的bind(localAddress, promise)方法綁定端口

EventLoopGroup:線程

  • 1.2.1.調用next()獲取下個EventLoop
  • 1.2.2.調用register(channel)委託給EventLoop去註冊

EventLoop:3d

  • 1.2.2.1.調用unsafe()獲取channel中的unsafe對象
  • 1.2.2.2.調用register(this, promise)委託unsafe註冊

Channel:

  • 1.2.2.2.1.調用doRegister()將建立selector並註冊到此channel上
  • 1.2.2.2.2.調用pipeline的fireChannelRegistered()觸發註冊成功事件
  • 1.2.2.2.3.調用pipeline的fireChannelActive()觸發激活成功事件
  • 1.2.2.2.3.1.1.1.修改interestOps添加讀操做位
  • 2.1.1.委託ChannelPipeline調用bind(localAddress, promise)
  • 2.1.1.1.1.1.調用doBind(localAddress)綁定端口
  • 2.1.1.1.1.2.若是還未激活調用pipeline的fireChannelActive()觸發激活成功事件
  • 2.1.1.1.1.2.1.1.1修改interestOps添加讀操做位

ChannelPipeline:

  • 1.2.2.2.2.1.調用ChannelHandler鏈觸發註冊成功事件
  • 1.2.2.2.3.1.調用ChannelHandler鏈觸發激活成功事件,若是配置自動讀則激活讀事件
  • 2.1.1.1.調用ChannelHandler鏈觸發綁定事件
  • 2.1.1.1.1.2.1.調用ChannelHandler鏈觸發激活成功事件,若是配置自動讀則激活讀事件

HeadContext:

  • 1.2.2.2.3.1.1.委託unsafe調用beginRead()開始讀
  • 2.1.1.1.1.委託Channel裏的unsafe調用bind(localAddress, promise)
  • 2.1.1.1.1.2.1.1.委託unsafe調用beginRead()開始讀

源碼分析

ServerBootstrap類的線程池、鏈接參數、線程池以及處理器的配置其實就是成員變量的複製這裏就是很少講,咱們直接從b.bind()開始看。

b.bind()的方法直接將端口包裝成類SocketAddress交給doBind()處理。doBind()源碼以下

private ChannelFuture doBind(final SocketAddress localAddress) {
    	//初始化Channel並註冊Selector
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }
        //綁定監聽端口並開始監聽
        if (regFuture.isDone()) {
            ChannelPromise promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        promise.setFailure(cause);
                    } else {
                        promise.executor = channel.eventLoop();
                    }
                    doBind0(regFuture, channel, localAddress, promise);
                }
            });
            return promise;
        }
    }

這個方法主要作了2件事

  • 調用initAndRegister()初始化Channel並註冊Selector
  • 調用doBind0()綁定監聽端口並開始監聽

註冊Selector是默認是異步進行的這裏Netty作了一個處理,若是註冊Selector已完成則同步調用doBind0()不然註冊監聽器等待Selector註冊完成後調用doBind0(),這樣保證整個doBind()方法的異步性。

接下來咱們看下initAndRegister()的實現,源碼以下:

final ChannelFuture initAndRegister() {
    	//初始化channel
        final Channel channel = channelFactory().newChannel();
        try {
            init(channel);
        } catch (Throwable t) {
            channel.unsafe().closeForcibly();
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }
        //註冊Selector
        ChannelFuture regFuture = group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }

        return regFuture;
    }

initAndRegister()方法也是作了2件事

  • 調用init()初始化Channel
  • 調用group().register()註冊Selector

channelFactory().newChannel()是建立你配置的Channel這裏源碼配置了NioServerSocketChannel,而init()的具體實現則是在NioServerSocketChannel中,init()源碼以下:

void init(Channel channel) throws Exception {
		...

        ChannelPipeline p = channel.pipeline();

        ...
        
        //將從線程組,從處理器,從配置,從屬性封裝成ServerBootstrapAcceptor處理器,共後續IOSelector使用。
        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(Channel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }
                pipeline.addLast(new ServerBootstrapAcceptor(
                        currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
            }
        });
    }

在上節中咱們講了Neety服務端線程模式能夠是主從模型,因此它的配置也分主從配置,init()主要作的將主處理器(ChannelHandler)添加到主ChannelPipeline裏,將 從線程組,從處理器(ChannelHandler),從配置,從屬性封裝成ServerBootstrapAcceptor處理器添加到主ChannelPipeline裏,供後續觸發客戶端鏈接事件使用。

group().register()先是取了主線程組EventLoopGroup委託EventLoopGroup註冊Selector,主線程組中也是調用next()委託給單線程EventLoop去處理

next().register(channel);

最後在EventLoop的register()中委託給Channel中的Unsafe處理(Unsafe是在Channel建立的時候被建立的)

channel.unsafe().register(this, promise);

咱們來看下Unsafe中的register()的源碼:

public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            ...
            
            AbstractChannel.this.eventLoop = eventLoop;
            //若是eventLoop是當前線程直接執行,不然交給傳eventLoop線程處理
            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    eventLoop.execute(new OneTimeTask() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    closeForcibly();
                    closeFuture.setClosed();
                    safeSetFailure(promise, t);
                }
            }
        }

Unsafe中的register()作的就是判斷next()獲取的線程是不是當前線程,若是是同步執行註冊,不然異步執行註冊,按例子中默認配置會異步調用register0()註冊。

咱們來看下register0()的源碼

private void register0(ChannelPromise promise) {
            try {
                if (!promise.setUncancellable() || !ensureOpen(promise)) {
                    return;
                }
                boolean firstRegistration = neverRegistered;
                //調用JDK去註冊Selector
                doRegister();
                neverRegistered = false;
                registered = true;
                //設置註冊成功通知監聽器
                safeSetSuccess(promise);
                //觸發註冊成功事件
                pipeline.fireChannelRegistered();
                //若是是第一次則觸發激活成功事件
                if (firstRegistration && isActive()) {
                    pipeline.fireChannelActive();
                }
            } catch (Throwable t) {
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }

register0()作了4件事

  • 調用JDK去註冊Selector,源碼以下
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
  • 設置註冊成功通知監聽器
  • 觸發註冊成功事件
  • 若是綁定成功則觸發激活成功事件

觸發註冊成功事件和若是綁定成功觸發激活成功事件實際上是在主pipeline中鏈式調用用戶的配置ChannelHandler,調用過程以下

圖1

這裏會有個疑問綁定是在註冊selector以後進行的爲何這裏能夠觸發激活成功事件,其實在safeSetSuccess(promise)代碼執行以後已經通知監聽器並開始執行綁定相關的代碼了。

觸發激活成功事件後若是配置了會觸發開始讀事件

public ChannelPipeline fireChannelActive() {
        head.fireChannelActive();
        //若是配置了會觸發開始讀事件
        if (channel.config().isAutoRead()) {
            channel.read();
        }

        return this;
    }

讀事件也是鏈式調用調用過程以下:

圖2

能夠看到最後調用HeadContext裏的read()方法

public void read(ChannelHandlerContext ctx) {
    unsafe.beginRead();
}

委託給unsafe去處理,處理的方式就是修改interestOps添加讀操做位,由IOSelector去觸發讀事件

selectionKey.interestOps(interestOps | readInterestOp);

在NioServerSocketChannel中readInterestOp其實對應的是JDK中的SelectionKey.OP_ACCEPT,在ServerSocketChannel構造方法中定義:

public NioServerSocketChannel(ServerSocketChannel channel) {
        super(null, channel, SelectionKey.OP_ACCEPT);
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }

服務端啓動後最早關心是客戶端鏈接的,若是沒有客戶端的鏈接,後續沒法IO操做,因此這裏註冊了SelectionKey.OP_ACCEPT來監聽客戶端的鏈接。

以上就是初始化Channel並註冊Selector的流程

下面說下調用doBind0()綁定監聽端口並開始監聽流程,doBind0()代碼以下:

private static void doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) {
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }

綁定端口是異步進行的,這裏調用channel.eventLoop()獲取的線程默認就是以前註冊Selector的流程中next()獲取的線程。綁定的流程其實也是在主pipeline中鏈式調用用戶的配置ChannelHandler,如圖2,能夠看到最後調用HeadContext裏的bind()方法

public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
                throws Exception {
            unsafe.bind(localAddress, promise);
        }

也是委託給unsafe去處理,處理的方式就是調用JDK裏的綁定接口去綁定

javaChannel().socket().bind(localAddress);

若是綁定成功且以前沒觸發激活成功事件則觸發之,這裏的流程跟以前講的同樣就不累述。

以上就是綁定監聽端口並開始監聽的流程。

相關文章
相關標籤/搜索