Netty系列文章之服務端啓動分析

原文博客地址: pjmike的博客html

前言

本文主要分析 Netty服務端的啓動,以便對Netty框架有一個基本的認識,我用的Netty版本是 netty-4.1.29,以前的文章Netty 系列文章之基本組件概覽 對Netty的基本組件作了一個簡單的介紹,算是對本文分析Netty服務端的啓動作一個基礎鋪墊java

服務端代碼

該源碼出自 netty官方提供的 服務端demo,詳細地址: github.com/netty/netty…git

我作了一點小改動,代碼以下:github

public final class EchoServer {

    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));

    public static void main(String[] args) throws Exception {
        // Configure the server.
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        final EchoServerHandler serverHandler = new EchoServerHandler();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .option(ChannelOption.SO_BACKLOG, 100)
             .handler(new LoggingHandler(LogLevel.INFO))
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     //p.addLast(new LoggingHandler(LogLevel.INFO));
                     p.addLast(serverHandler);
                 }
             });

            // Start the server.
            ChannelFuture f = b.bind(PORT).sync();

            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down all event loops to terminate all threads.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
複製代碼

服務端啓動流程概覽:編程

  • 建立 ServerBootStrap啓動類
  • 設置並綁定 NioEventLoopGroup線程池
  • 建立服務端 Channel
  • 添加並設置 ChannelHandler
  • 綁定並啓動監聽端口

在以前的文章我就提到過,ServerBootstrap是Netty服務端的啓動輔助類,幫助Netty服務端完成初始化,下面我將深刻代碼,仔細分析Netty服務端啓動過程當中各組件的建立與初始化bootstrap

Channel的建立和初始化過程

Channel是Netty的網絡操做抽象類,對應於JDK底層的 Socket,Netty服務端的Channel類型是 NioServerSocketChannel。下面來分析NioServerSocketChannel的建立和初始化segmentfault

NioServerSocketChannel的建立

NioServerSocketChannel的建立其實是從 ServerBootStrapbind()方法開始的,進入bind()源碼分析(AbstractBootstrap的bind()方法):api

......
public ChannelFuture bind(int inetPort) {
    return bind(new InetSocketAddress(inetPort));
}
......
/** * Create a new {@link Channel} and bind it. */
public ChannelFuture bind(SocketAddress localAddress) {
    validate();
    if (localAddress == null) {
        throw new NullPointerException("localAddress");
    }
    return doBind(localAddress);
}

private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }

    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    }
    ......
}
複製代碼

在源碼裏注意到一個 initAndRegister()方法,這個方法就負責 NioServerSocketChannel的初始化和註冊操做,走進initAndRegister()方法,以下圖所示:數組

channel_init_and_create

從上圖能夠看出,源碼裏是調用 channelFactory.newChannel()來建立 channel , 走進ChannelFactory發現該接口被 @Deprecated註解標註了,說明是一個過期的接口:promise

@Deprecated
public interface ChannelFactory<T extends Channel> {
    /**
     * Creates a new channel.
     */
    T newChannel();
}
複製代碼

我用的Netty版本是 Netty-4.1.29,其Netty API 文檔 中介紹 io.netty.bootstrap.ChannelFactory提到用 io.netty.channel.ChannelFactory代替。

這裏 ChannelFactory只是一個工廠接口,真正建立 Channel的是ReflectiveChannelFactory類,它是ChannelFactory的一個重要實現類,該類經過反射方式建立 Channel,源代碼以下:

public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {

    private final Class<? extends T> clazz;

    public ReflectiveChannelFactory(Class<? extends T> clazz) {
        if (clazz == null) {
            throw new NullPointerException("clazz");
        }
        this.clazz = clazz;
    }

    @Override
    public T newChannel() {
        try {
            return clazz.getConstructor().newInstance();
        } catch (Throwable t) {
            throw new ChannelException("Unable to create Channel from class " + clazz, t);
        }
    }

    @Override
    public String toString() {
        return StringUtil.simpleClassName(clazz) + ".class";
    }
}
複製代碼

其中 newChannel()方法經過 clazz.getConstructor().newInstance()來建立 Channel,即經過反射方式來建立 Channel,而這個 clazz就是 經過ServerBootStrapchannel方法傳入的,最開始的服務端代碼傳入的NioServerSocketChannel,因此對應經過反射建立了NioServerSocketChannel,而且 ChannelFactory的初始化也是在該方法中進行的,代碼以下:

public B channel(Class<? extends C> channelClass) {
    if (channelClass == null) {
        throw new NullPointerException("channelClass");
    }
    return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}
複製代碼

到此,NioServerSocketChannel的建立過程大致結束,再次總結一下:

  • ServerBootstrap中的ChannelFactory的實現是 ReflectiveChannelFactory
  • 生成的 Channel的具體類型是 NioServerSocketChannel
  • Channel的實例化過程其實就是調用 ChannelFactory.newChannel方法,其實是經過反射方式進行建立的

NioServerSocketChanel的實例化過程

在前面的分析中,NioServerSocketChannel是經過反射建立的,它的構造方法以下:

private static ServerSocketChannel newSocket(SelectorProvider provider) {
    try {
        /** * Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in * {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise. * * See <a href="https://github.com/netty/netty/issues/2308">#2308</a>. */
        return provider.openServerSocketChannel();
    } catch (IOException e) {
        throw new ChannelException(
                "Failed to open a server socket.", e);
    }
}
/** * Create a new instance */
public NioServerSocketChannel() {
    this(newSocket(DEFAULT_SELECTOR_PROVIDER));
    }
複製代碼

方法newSocket利用 provider.openServerSocketChannel()生成Nio中的 ServerSocketChannel對象,而後調用重載的構造器:

public NioServerSocketChannel(ServerSocketChannel channel) {
    super(null, channel, SelectionKey.OP_ACCEPT);
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
複製代碼

該構造器中,調用了父類的構造器,傳入的參數是 SelectionKey.OP_ACCEPT,這個參數對於有Java NIO編程經驗的人來講應該很是熟悉,在Java NIO中服務端要監聽客戶端的鏈接請求,就向多路複用器 Selector註冊 SelectionKey.OP_ACCEPT 客戶端鏈接事件,而Netty又是基於 Java NIO開發的,這裏可見一斑。接着進入父類構造器:

protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent, ch, readInterestOp);
}
複製代碼

而後:

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    this.readInterestOp = readInterestOp;
    try {
        ch.configureBlocking(false);
    } catch (IOException e) {
        try {
            ch.close();
        } catch (IOException e2) {
            if (logger.isWarnEnabled()) {
                logger.warn(
                        "Failed to close a partially initialized socket.", e2);
            }
        }

        throw new ChannelException("Failed to enter non-blocking mode.", e);
    }
}
複製代碼

設置當前 ServerSocketChannel爲非阻塞通道,而後再次進入父類構造器 AbstractChannel(Channel parent):

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}
複製代碼
  • parent屬性設置爲null
  • 初始化unsafe,用來負責底層的connect,register,read和write操做
  • 初始化pipeline,在實例化一個Channel的同時,當有事件發生的時候,pipeline負責調用相應的Handler進行處理

關於 unsafe

Netty中的unsafe不是JDK中的sun.misc.Unsafe,該unsafe實際是封裝了 Java 底層 Socket的操做,所以是溝通 Netty上層和Java 底層重要的橋樑。

ChannelPipeline的初始化

每一個Channel都有對應的 ChannelPipeline,當一個Channel被建立時,對應的ChannelPipeline也會自動建立,在上面分析 NioServerSocketChannel實例化過程就看到,在其父類構造器中,有初始化一個 pipeline,對應源代碼以下:

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
    .....
    private final Channel parent;
    private final ChannelId id;
    private final Unsafe unsafe;
    private final DefaultChannelPipeline pipeline;
    ......
    /** * Creates a new instance. * * @param parent * the parent of this channel. {@code null} if there's no parent. */
    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }
    ......
        /** * Returns a new {@link DefaultChannelPipeline} instance. */
    protected DefaultChannelPipeline newChannelPipeline() {
        return new DefaultChannelPipeline(this);
    }
複製代碼

從上面代碼看到,pipeline最終被初始化爲一個 DefaultChannelPipelineDefaultChannelPipelineChannelPipeline的實現類,進入它的構造方法,以下:

protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    succeededFuture = new SucceededChannelFuture(channel, null);
    voidPromise =  new VoidChannelPromise(channel, true);

    tail = new TailContext(this);
    head = new HeadContext(this);

    head.next = tail;
    tail.prev = head;
}
複製代碼

該構造方法有一個參數 channel,這個 channel就是咱們以前傳入的 NioServerSocketChannel。關於該構造器其餘方法以及ChannelPipeline更詳細的介紹將在後續文章分析。

NioEventLoopGroup

在咱們最開始的Netty服務端代碼中初始化了兩個 NioEventLoopGroup,即一個處理客戶端鏈接請求的線程池——bossGroup,一個處理客戶端讀寫操做的線程池——workerGroup

EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
複製代碼

NioEventLoopGroup的類繼承結構圖以下所示:

nioeventLoopGroup

從圖中能夠看到,NioEventLoopGroup實現了 Executor接口,Executor框架能夠用來建立線程池的,也是一個線程執行器。關於 Executor框架更加詳細的介紹請參閱 《Java併發編程的藝術》

NioEventLoopGroup

看了NioEventLoopGroup的類繼承結構,下面來分析一下它的初始化過程,構造器源代碼以下:

......
public NioEventLoopGroup(int nThreads) {
    this(nThreads, (Executor) null);
}
......
public NioEventLoopGroup(int nThreads, Executor executor) {
    this(nThreads, executor, SelectorProvider.provider());
}
......
public NioEventLoopGroup( int nThreads, Executor executor, final SelectorProvider selectorProvider) {
    this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) {
    super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
複製代碼

上面幾個重載構造器其實沒作啥,最終調用父類 MultithreadEventLoopGroup 的構造器,

protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
複製代碼

這裏須要注意的是若是咱們傳入的線程數 nThreads 是 0 的話,那麼Netty將會爲咱們設置默認的線程數 DEFAULT_EVENT_LOOP_THREADS,這個默認值是處理器核心數 * 2,以下:

static {
    DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
            "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

    if (logger.isDebugEnabled()) {
        logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
    }
}
複製代碼

bossGroup這個線程池咱們傳入的 nThread是1,實際上在 bossGroup中只會有一個線程用於處理客戶端鏈接請求,因此這裏設置爲1,而不使用默認的線程數,至於爲何只用一個線程處理鏈接請求還需用線程池,在Stack Overflow有相關問題的討論。

而後回來再次進入父類MultithreadEventExecutorGroup的構造器,

......
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
    this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
......
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) {
    if (nThreads <= 0) {
        throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
    }

    if (executor == null) {
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }

    children = new EventExecutor[nThreads];

    for (int i = 0; i < nThreads; i ++) {
        boolean success = false;
        try {
            children[i] = newChild(executor, args);
            success = true;
        } catch (Exception e) {
            // TODO: Think about if this is a good exception type
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
            if (!success) {
                for (int j = 0; j < i; j ++) {
                    children[j].shutdownGracefully();
                }

                for (int j = 0; j < i; j ++) {
                    EventExecutor e = children[j];
                    try {
                        while (!e.isTerminated()) {
                            e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                        }
                    } catch (InterruptedException interrupted) {
                        // Let the caller handle the interruption.
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
    }

    chooser = chooserFactory.newChooser(children);

    final FutureListener<Object> terminationListener = new FutureListener<Object>() {
        @Override
        public void operationComplete(Future<Object> future) throws Exception {
            if (terminatedChildren.incrementAndGet() == children.length) {
                terminationFuture.setSuccess(null);
            }
        }
    };

    for (EventExecutor e: children) {
        e.terminationFuture().addListener(terminationListener);
    }

    Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
    Collections.addAll(childrenSet, children);
    readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
複製代碼

MultithreadEventExecutorGroup管理着 eventLoop的生命週期,它有幾個變量:

  • children: EventExecutor數組,保存eventloop
  • chooser: 線程選擇器,從children中選取一個 eventloop的策略

MultithreadEventExecutorGroup的構造器主要分爲如下幾個步驟:

  • 建立線程執行器——ThreadPerTaskExecutor
  • 調用 newChild方法初始化 children數組
  • 建立線程選擇器——chooser

建立ThreadPerTaskExecutor

咱們一開始初始化 NioEventLoopGroup,並無傳入 Executor參數:

public NioEventLoopGroup(int nThreads) {
    this(nThreads, (Executor) null);
}
複製代碼

因此到父類MultithreadEventExecutorGroup構造器時,executor 爲null, 而後執行:

if (executor == null) {
    executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
複製代碼

ThreadPerTaskExecutor是一個線程執行器,它實現了 Executor接口,

public final class ThreadPerTaskExecutor implements Executor {
    private final ThreadFactory threadFactory;

    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }
        this.threadFactory = threadFactory;
    }

    @Override
    public void execute(Runnable command) {
        threadFactory.newThread(command).start();
    }
}
複製代碼

ThreadPerTaskExecutor 實現了 execute方法,每次經過調用execute方法執行線程任務

調用 newChild 方法初始化 children 數組

children[i] = newChild(executor, args);
複製代碼

在一個for循環裏,nThread線程數是總的循環次數,經過 newChild方法初始化 EventExecutor數組的每一個元素,而 newChild方法以下:

@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    return new NioEventLoop(this, executor, (SelectorProvider) args[0],
        ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
複製代碼

每次循環經過newChild實例化一個NioEventLoop對象。

建立線程選擇器——chooser

public EventExecutorChooser newChooser(EventExecutor[] executors) {
    if (isPowerOfTwo(executors.length)) {
        return new PowerOfTwoEventExecutorChooser(executors);
    } else {
        return new GenericEventExecutorChooser(executors);
    }
}
......
private static boolean isPowerOfTwo(int val) {
    return (val & -val) == val;
}
複製代碼

根據EventExecutor[]數組的大小,採用不一樣策略初始化一個 Chooser,若是大小爲 2的冪次方則採用 PowerOfTwoEventExecutorChooser,不然使用 GenericEventExecutorChooser。 不管使用哪一個 chooser,它們的功能都是同樣的,即從 EventExecutor[]數組中,這裏也就是 NioEventLoop數組中,選擇一個合適的 NioEventLoop

NioEventLoop的初始化

protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    return new NioEventLoop(this, executor, (SelectorProvider) args[0],
        ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
複製代碼

從前面的分析就能夠看到,經過 newChild方法初始化 NioEventLoopGroup中的 NioEventLoop,下面來看下NioEventLoop的構造方法是怎樣的:

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
             SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
    super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
    if (selectorProvider == null) {
        throw new NullPointerException("selectorProvider");
    }
    if (strategy == null) {
        throw new NullPointerException("selectStrategy");
    }
    provider = selectorProvider;
    final SelectorTuple selectorTuple = openSelector();
    selector = selectorTuple.selector;
    unwrappedSelector = selectorTuple.unwrappedSelector;
    selectStrategy = strategy;
}
複製代碼

以前在 NioEventLoopGroup的構造器中經過 SelectorProvider.provider()建立了一個 SelectorProvider,這裏傳遞給了NioEventLoop中的provider,而NioEventLoop又經過 openSelector()方法獲取一個 selector對象,其實是經過 provideropenSelector方法。這不就是 對應Java NIO中的建立多路複用器 selector。(這裏只是簡單闡述NioEventLoop的構造方法,後續文章會對NioEventLoop作更加詳細的分析)

Channel的註冊過程

前面已經介紹了 Channel的建立和初始化過程,是在 initAndRegister方法中進行的,這個方法裏還會將初始化好的 channel註冊到 EventLoop線程中去

final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        channel = channelFactory.newChannel();
        init(channel);
    } catch (Throwable t) {
       .....
    }

    ChannelFuture regFuture = config().group().register(channel);
    ......
}
複製代碼

調用config().group().register方法將 channel註冊到 EventLoopGroup中去,其目的就是爲了實現NIO中把ServerSocketChannel註冊到 Selector中去,這樣就是能夠實現client請求的監聽,代碼以下:

public ChannelFuture register(Channel channel) {
    return next().register(channel);
}
......
public EventLoop next() {
    return (EventLoop) super.next();
}
複製代碼

父類MultithreadEventExecutorGroup的next()方法,next方法使用 chooser策略從 EventExecutor[]數組中選擇一個 SingleThreadEventLoop

public EventExecutor next() {
    return chooser.next();
}
.....
public EventExecutor next() {
    return executors[idx.getAndIncrement() & executors.length - 1];
}
複製代碼

而後再執行 SingleThreadEventLoopregister()註冊方法:

public ChannelFuture register(Channel channel) {
    return register(new DefaultChannelPromise(channel, this));
}
...
public ChannelFuture register(final ChannelPromise promise) {
    ObjectUtil.checkNotNull(promise, "promise");
    promise.channel().unsafe().register(this, promise);
    return promise;
}
複製代碼

上面代碼調用了 unsafe的register方法,具體是AbstractUnsafe.register,而unsafe主要用於實現底層的 rergister,read,write等操做。該register方法是:

public final void register(EventLoop eventLoop, final ChannelPromise promise) {
        ......
        AbstractChannel.this.eventLoop = eventLoop;

        if (eventLoop.inEventLoop()) {
            register0(promise);
        } else {
            ......
        }
    }
複製代碼

將 eventLoop 賦值給 Channel 的 eventLoop 屬性,而後又調用了 register0()方法:

private void register0(ChannelPromise promise) {
        try {
            ......
            boolean firstRegistration = neverRegistered;
            doRegister();
            neverRegistered = false;
            registered = true;
            pipeline.invokeHandlerAddedIfNeeded();
            safeSetSuccess(promise);
            pipeline.fireChannelRegistered();
            if (isActive()) {
                if (firstRegistration) {
                    pipeline.fireChannelActive();
                } else if (config().isAutoRead()) {
                    beginRead();
                }
            }
        } catch (Throwable t) {
            ......
        }
    }

複製代碼

上面有個關鍵方法就是 doRegister(),doRegister纔是最終Nio的註冊方法:

protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            return;
        } catch (CancelledKeyException e) {
            if (!selected) {
                eventLoop().selectNow();
                selected = true;
            } else {
                throw e;
            }
        }
    }
}
複製代碼

經過javaChannel().register(eventLoop().unwrappedSelector(), 0, this)將 Channel對應的Java NIO ServerSocketChannel註冊到 EventLoop中的Selector上,最終完成了channel向eventLoop的註冊過程。

這裏總結下 Channel註冊過程當中函數調用鏈: AbstractBootstrap.initAndRegister -> MultithreadEventLoopGroup.register -> SingleThreadEventLoop.register -> AbstractUnsafe.register -> AbstractUnsafe.register0 -> AbstractNioChannel.doRegister()

添加 ChannelHandler

在以前的 initAndRegister()方法裏,裏面有個 init()方法,以下:

void init(Channel channel) throws Exception {
        final Map<ChannelOption<?>, Object> options = options0();
        synchronized (options) {
            setChannelOptions(channel, options, logger);
        }

        final Map<AttributeKey<?>, Object> attrs = attrs0();
        synchronized (attrs) {
            for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                @SuppressWarnings("unchecked")
                AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                channel.attr(key).set(e.getValue());
            }
        }

        ChannelPipeline p = channel.pipeline();

        final EventLoopGroup currentChildGroup = childGroup;
        final ChannelHandler currentChildHandler = childHandler;
        final Entry<ChannelOption<?>, Object>[] currentChildOptions;
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
        synchronized (childOptions) {
            currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
        }
        synchronized (childAttrs) {
            currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
        }

        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }

複製代碼

init()方法中設置服務端自定義的 ChannelOptions,ChannelAttrs屬性和爲服務端Channel建立出來的新鏈接的Channel設置的自定義屬性ChildOptions,ChildAttrs,這裏就很少敘述設置參數的問題了,重點關注 pipelineaddLast方法,該方法就是添加用於處理出站和入站數據流的 ChannelHandler,而pipeline是從 channel中獲取的,以前分析過當建立channel時會自動建立一個對應的 channelPipeline

至於ChannelInitializer又是什麼,來看下它的類繼承結構圖就知道了:

channelHandler

ChannelInitializer是一個抽象類,實現了ChannelHandler接口,它有一個抽象方法initChannel,上面代碼實現了該方法而且添加了bootstrap的handler,邏輯以下:

.....
//1
ChannelHandler handler = config.handler();
if (handler != null) {
    pipeline.addLast(handler);
}
......
//2
public final ChannelHandler handler() {
    return bootstrap.handler();
}
......
//3
final ChannelHandler handler() {
    return handler;
}

複製代碼

initChannel添加的Handler就是咱們服務端代碼中 serverbootstrap設置的handler,以下:

b.group(bossGroup, workerGroup)
         .channel(NioServerSocketChannel.class)
         .option(ChannelOption.SO_BACKLOG, 100)
         .handler(new LoggingHandler(LogLevel.INFO))
         .childHandler(new ChannelInitializer<SocketChannel>() {
             @Override
             public void initChannel(SocketChannel ch) {
                 ChannelPipeline p = ch.pipeline();
                 //p.addLast(new LoggingHandler(LogLevel.INFO));
                 p.addLast(serverHandler);
             }
         });
複製代碼

示例代碼設置的handler爲LoggingHandler,用於處理日誌,這裏不細說。上面的initChannel方法能夠添加 Handler,這裏的serverbootstrap啓動類還增長了 childHandler方法,也是用來添加 handler,只不過是向已經鏈接的 channel客戶端的 channnelpipeline添加handler

serverbootstrap.handler()設置的handler在初始化就會執行,而 serverbootstrap.childHandler()設置的childHandler在客戶端鏈接成功纔會執行

小結

因爲自身知識與經驗有限,對Netty的服務端啓動源碼分析得不是很全面,在此過程當中也參考了一些大佬的Netty源碼分析文章,本文若有錯誤之處,歡迎指出。

參考資料 & 鳴謝

相關文章
相關標籤/搜索