Netty源碼解析 -- Server啓動原理解析

文本分享Netty中事件循環機制,Channel的設計,以及Server啓動過程。
源碼分析基於Netty 4.1java

EventLoop

Netty經過事件循環(Event Loop)機制處理IO事件,即經過一個死循環不斷獲取並處理Selector中發生的事件。
下面看一下Netty中事件循環機制相關的類。promise

EventExecutorGroup表明了一組EventExecutor集合,它繼承了ScheduledExecutorService,execute方法經過next方法選擇一個EventExecutor,並調用EventLoop#execute處理事件。
而EventExecutor也繼承了EventExecutorGroup,它能夠看作一個特殊的EventExecutorGroup,它的實現類負責提供真正的execute方法實現。微信

EventLoop繼承了EventExecutor,負責處理註冊於其上的Channel的IO事件(表明事件循環機制)。而EventLoopGroup接口則繼承了EventExecutorGroup,負責調度EventLoop。網絡

而NioEventLoop是EventLoop的實現類,其execute方法的主要邏輯在run方法中,
NioEventLoop#run方法經過一個死循環不斷處理Selector中發生的事件(事件循環機制具體實現),正是Netty中的核心。多線程

MultithreadEventExecutorGroup實現EventExecutorGroup,實現多線程處理任務(其實是維護了多個EventExecutor,一個EventExecutor能夠理解爲一個線程),newChild方法構造具體的EventExecutor。
子類MultithreadEventLoopGroup實現EventLoopGroup,其newChild方法構造具體的EventLoop。
子類NioEventLoopGroup#newChild構建了NioEventLoop,處理NIO操做。app

EventLoop各實現類關係以下
框架

NioEventLoop繼承了SingleThreadEventExecutor。
SingleThreadEventExecutor中維護了一個Executor,doStartThread方法調用Executor#execute啓動一個新的線程,並在新線程中調用子類的run方法(run方法是EventLoop邏輯實現方法,新開線程纔是EventLoop真正執行線程)。
因此,一個NioEventLoop對象能夠看作是一個線程。
NioEventLoopGroup維護了一組NioEventLoop,它能夠理解爲一個線程池,而具體任務交給其中一個線程(NioEventLoop)處理。異步

Netty的啓動引導類ServerBootstrap中維護了兩個EventLoopGroup,EventLoopGroup#childGroup和AbstractBootstrap#group,
其中AbstractBootstrap#group負責管理ServerChannel,處理accept事件,並將生成的SocketChannel交給childGroup,由EventLoopGroup#childGroup處理read,write事件(爲了方便,下文我將ServerBootstrap#childGroup稱爲AcceptGroup,AbstractBootstrap#group稱爲ReadGroup。)。
另外,每一個NioEventLoop對象中都維護了一個(jvm)Selector,該NioEventLoop只需處理該Selector的事件便可。jvm

這些設計來自Reactor模式,詳細能夠見java.util.concurrent包的做者Doug Lea的《Scalable IO in Java》socket

Channel

Channel,可以完成IO操做的通道,提供read, write, connect, and bind等方法。
Channel中維護了一個Unsafe對象,用於完成數據傳輸操做(這類操做一般由IO事件觸發,而不是用戶觸發)。

子接口SocketChannel表明Socket鏈接的網絡通道,面向流,支持讀寫操做。
而另外一個子接口ServerChannel表示能夠監聽新鏈接的通道,ServerSocketChannel表明實現TCP/IP協議的ServerChannel。

AbstractChannel提供最基本的實現,它維護了Unsafe和DefaultChannelPipeline對象,並委託這兩個對象處理實際的邏輯。同時,它也提供newUnsafe,newChannelPipeline方法給子類構造他們須要的對象。
AbstractChannel中還有一個內部類AbstractUnsafe,實現了register,bind,disconnect等方法的基礎邏輯。

AbstractNioChannel實現了NIO基礎邏輯,如維護(jvm)SelectableChannel,(jvm)SelectionKey等對象,還有一個很關鍵的selectionKey,表明關注的事件Key。
他也有一個內部類,AbstractNioUnsafe,繼承了AbstractUnsafe,實現了Unsafe子接口NioUnsafe,添加一些NIO的處理邏輯。

AbstractNioChannel的子類能夠分紅ServerChannel實現類和SocketChannel實現類。

ServerChannel實現類是NioServerSocketChannel,實現TCP/IP協議下的NIO網絡通訊。
其父類AbstractNioMessageChannel,newUnsafe方法返回的NioMessageUnsafe。

SocketChannel實現類是NioSocketChannel,一樣實現NIO網絡通訊。
其父類AbstractNioByteChannel,newUnsafe方法返回的NioByteUnsafe。

Channel各實現類關係以下

Netty中將接口按功能劃分得很細,最好你們能夠按功能層次理解各接口表明含義以及實現類的​邏輯。​
以避免後續看源碼時混淆了。

Server啓動

AbstractBootstrap#bind -> AbstractBootstrap#doBind

private ChannelFuture doBind(final SocketAddress localAddress) {
    // #1
    final ChannelFuture regFuture = initAndRegister();    
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }

    if (regFuture.isDone()) {
        ChannelPromise promise = channel.newPromise();
        // #2
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        ...
    }
}

#1 初始化及註冊ServerChannel。
initAndRegister方法返回ChannelFuture,ChannelFuture繼承了(jvm)Future,表明IO異步處理結果,能夠綁定監聽事件。
咱們要有這個意識,Netty是一個異步框架,全部的IO操做都是異步的(充分利用cpu),IO方法不會等待實際IO操做完成,而是返回ChannelFuture,表明異步處理結果。
實際IO完成時,Netty會觸發ChannelFuture的回調函數。
ChannelPromise是一種特殊的ChannelFuture,提供操做結果的方法(setSuccess,setFailure方法),通常會做爲IO方法的參數(Unsafe中不少方法都有該參數)。
#2 註冊完成後,將ServerChannel綁定監聽端口。

AbstractBootstrap#initAndRegister

final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        // #1
        channel = channelFactory.newChannel();
        // #2
        init(channel);
    } catch (Throwable t) {
        ...
    }
    // #3
    ChannelFuture regFuture = config().group().register(channel);
    
    if (regFuture.cause() != null) {
        // #4
    }
    return regFuture;
}

#1 構造ServerChannel
AbstractBootstrap#channelFactory是一個ReflectiveChannelFactory對象,他經過反射生成Channel。
ServerBootstrap#channel方法會構造該對象,並指定具體的ServerChannel類。
(因此咱們要指定NioServerSocketChannel.class,new ServerBootstrap().channel(NioServerSocketChannel.class)
#2 初始化ServerChannel,該方法由子類實現
#3 註冊Channel,注意,config().group()返回AcceptGroup。這裏將ServerChannel註冊AcceptGroup中,具體說,註冊到AcceptGroup維護的(jvm)Selector中。
#4 若是IO操做發生了異常,須要關閉Channel。

NioServerSocketChannel#構造方法 -> NioServerSocketChannel#newSocket方法

private static ServerSocketChannel newSocket(SelectorProvider provider) {
    try {
        return provider.openServerSocketChannel();
    } catch (IOException e) {
        throw new ChannelException("Failed to open a server socket.", e);
    }
}

使用(jvm)SelectorProvider,打開一個(jvm)ServerSocketChannel。
這裏咱們完成NIO網絡通訊第一步,構造一個(jvm)ServerSocketChannel。

ServerBootstrap#init

void init(Channel channel) throws Exception {
    // #1
    ...

    ChannelPipeline p = channel.pipeline();

    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
    synchronized (childOptions) {
        currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
    }
    synchronized (childAttrs) {
        currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
    }

    // #2
    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) throws Exception {
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }

            ch.eventLoop().execute(new Runnable() {
                public void run() {
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));    // #3
                }
            });
        }
    });
}

#1 設置ServerChannel的Option和Attribute
#2 給ServerChannel的ChannelPipeline添加一個ChannelInitializer

ChannelPipeline能夠理解爲攔截鏈,維護了一條ChannelHandler鏈表,ChannelHandler即具體攔截器,能夠在IO讀寫中,對數據進行處理。
ChannelHandler也能夠分爲兩類
ChannelOutboundHandler ,在write操做前調用
ChannelInboundHandler,在read操做後調用

AbstractChannel中維護了pipeline變量,子類能夠經過newChannelPipeline方法構造具體的ChannelPipeline。
NioServerSocketChannel使用的是DefaultChannelPipeline。
能夠這樣理解,Unsafe負責數據傳輸,而ChannelPipeline負責邏輯處理。

#3 給ServerChannel的ChannelPipeline添加一個ServerBootstrapAcceptor,並將ServerBootstrap的childHandler,currentChildHandler,currentChildOptions,currentChildAttrs變量都交給它,ServerBootstrapAcceptor用於處理Accept事件,後面再寫文章解析。

AbstractBootstrap#initAndRegister方法#3步驟 -> SingleThreadEventLoop#register ->(經過Channel調用Unsafe)AbstractUnsafe#register

public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    ...

    AbstractChannel.this.eventLoop = eventLoop;

    if (eventLoop.inEventLoop()) {    // #1
        register0(promise);    // #2
    } else {
        try {
            eventLoop.execute(new Runnable() {    // #3
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            ...
        }
    }
}

#1 判斷當前線程是否爲EventLoop執行線程
#2 當前線程就是EventLoop執行線程,直接執行任務
#3 當前線程不是EventLoop執行線程,給EventLoop添加一個真正處理操做的任務,後面EventLoop執行線程會處理該任務。
這裏是異步的關鍵,以任務的形式將操做交給EventLoop,當前線程不須要阻塞等待真正IO操做完成。如下代碼也是Netty中處理IO的通用格式

if (eventLoop.inEventLoop()) {
        ...
    } else {
        eventLoop.execute(new Runnable() {
            ...
        });
    }

AbstractUnsafe#register0

private void register0(ChannelPromise promise) {
    try {
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
        boolean firstRegistration = neverRegistered;
        // #1
        doRegister();
        neverRegistered = false;
        registered = true;

        // #2
        pipeline.invokeHandlerAddedIfNeeded();
        // #3
        safeSetSuccess(promise);
        // #4
        pipeline.fireChannelRegistered();
        // #5
        if (isActive()) {
            if (firstRegistration) {
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                beginRead();
            }
        }
    } catch (Throwable t) {
        // #6
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}

#1 由子類實現具體註冊操做
#2 執行DefaultChannelPipeline中的延遲任務
#3 設置promise狀態爲Success
#4 觸發ChannelPipeline#fireChannelRegistered
#5 若是是首次註冊,觸發ChannelPipeline#fireChannelActive
#6 異常處理,關閉Channel,設置promise狀態爲Failure。

AbstractUnsafe#doRegister -> AbstractNioChannel#doRegister

protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            // #1
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            return;
        } catch (CancelledKeyException e) {
            ...
        }
    }
}

#1 javaChannel()獲取SelectableChannel(jvm),
eventLoop().unwrappedSelector()獲取AcceptGroup維護的Selector(jvm)
這裏完成了NIO網絡通訊第二步,將(jvm)ServerSocketChannel註冊到(jvm)Selector,但尚未註冊關注事件Key。
注意,這裏將當前NioServerSocketChannel做爲channle#attachment,後面使用它來判斷是否爲IO事件。

AbstractUnsafe#register0方法#5步驟 -> DefaultChannelPipeline#fireChannelActive -> HeadContext#channelActive -> HeadContext#readIfIsAutoRead -> DefaultChannelPipeline#read -> HeadContext#read -> AbstractUnsafe#beginRead -> AbstractNioChannel#doBeginRead

protected void doBeginRead() throws Exception {
    // #1
    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
        return;
    }

    readPending = true;

    final int interestOps = selectionKey.interestOps();
    // #2
    if ((interestOps & readInterestOp) == 0) {
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}

#1 selectionKey是Selector中關注事件Key集合(AbstractNioChannel#doRegister方法中生成)
#2 將readInterestOp註冊到selectionKey。
那麼readInterestOp的值是什麼呢? 它在AbstractNioChannel#構造方法中賦值,真正的值來自NioServerSocketChannel構造方法,能夠看到,它在ServerChannel中固定爲SelectionKey.OP_ACCEPT。
到這裏,(jvm)ServerChannel已經註冊到AcceptGroup維護中(jvm)Selector,關注的事件Key爲accept。
這裏完成NIO網絡通訊第三步,註冊關注事件Key。

AbstractBootstrap.doBind0 -> AbstractChannel#bind -> DefaultChannelPipeline#bind -> HeadContext#bind -> AbstractUnsafe#bind -> NioServerSocketChannel#doBind

protected void doBind(SocketAddress localAddress) throws Exception {
    if (PlatformDependent.javaVersion() >= 7) {
        javaChannel().bind(localAddress, config.getBacklog());
    } else {
        javaChannel().socket().bind(localAddress, config.getBacklog());
    }
}

根據不一樣JDK版本,調用不一樣的bind方法。
這裏完成了NIO網絡通訊第四步,綁定端口,開始socket監聽。

關於HeadContext,ChannelPipeline調用鏈路,後面會寫文章分享。

延遲任務
DefaultChannelPipeline#addFirst

public final ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        checkMultiplicity(handler);
        name = filterName(name, handler);
        // #1
        newCtx = newContext(group, name, handler);
        // #2
        addFirst0(newCtx);
        // #3
        if (!registered) {
            newCtx.setAddPending();
            callHandlerCallbackLater(newCtx, true);
            return this;
        }
        // #4
        EventExecutor executor = newCtx.executor();
        if (!executor.inEventLoop()) {
            newCtx.setAddPending();
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    callHandlerAdded0(newCtx);
                }
            });
            return this;
        }
    }
    callHandlerAdded0(newCtx);
    return this;
}

#1 使用當前ChannelHandler構造一個ChannelHandlerContext
#2 添加ChannelHandlerContext到鏈表頭
#3 當前Channel未註冊,調用DefaultChannelPipeline#callHandlerCallbackLater,添加一個延遲任務
#4 當前Channel已註冊,調用DefaultChannelPipeline#callHandlerAdded0,完成ChannelHandler添加擴展操做。

DefaultChannelPipeline#callHandlerCallbackLater方法,將當前ChannelHandlerContext轉化爲一個延遲任務PendingHandlerAddedTask或者PendingHandlerRemovedTask,加到DefaultChannelPipeline#pendingHandlerCallbackHead列表中。

AbstractUnsafe#register0方法#2步驟 -> DefaultChannelPipeline#callHandlerAddedForAllHandlers,該方法會執行pendingHandlerCallbackHead列表全部任務,調用execute方法。
PendingHandlerAddedTask#execute會調用ChannelHandler#handlerAdded,完成ChannelHandler添加擴展操做。
PendingHandlerRemovedTask#execute則調用ChannelHandler#handlerRemoved,完成完成ChannelHandler移除善後工做。

回到ServerBootstrap#init方法#2步驟,這裏給ServerChannel的ChannelPipeline添加一個ChannelInitializer,它是Netty提供的工具類,實現了ChannelHandler#handlerAdded方法,邏輯是若是當前Channel已註冊,則調用initChannel方法。(因此咱們能夠利用該方法在註冊完成後添加新的ChannelHandler給ChannelHandler),不然不處理。
注意:該步驟在未註冊時執行,因此也會生成延遲任務,由AbstractUnsafe#register0方法#2步驟觸發完成操做,將ServerBootstrapAcceptor添加到ServerChannel的ChannelPipeline中。

若是您以爲本文不錯,歡迎關注個人微信公衆號,您的關注是我堅持的動力!

相關文章
相關標籤/搜索