Netty4.x 源碼實戰系列(五):深刻淺出學NioEventLoopGroup

咱們都知道Netty的線程模型是基於React的線程模型,而且咱們都知道Netty是一個高性能的NIO框架,那麼其線程模型一定是它的重要貢獻之一。數組

在使用netty的服務端引導類ServerBootstrap或客戶端引導類Bootstrap進行開發時,都須要經過group屬性指定EventLoopGroup, 由於是開發NIO程序,因此咱們選擇NioEventLoopGroup。框架

接下來的兩篇文章,我將從源碼角度爲你們深刻淺出的剖析Netty的React線程模型工做機制。ide

本篇側重點是NioEventLoopGroup。oop

首先咱們先回顧一下,服務端初始化程序代碼(省略非相關代碼):性能

EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
    ServerBootstrap b = new ServerBootstrap(); 
    b.group(bossGroup, workerGroup);
    
    ... // 已省略非相關代碼
    
     // 偵聽8000端口
     ChannelFuture f = b.bind(8000).sync(); 
    
     f.channel().closeFuture().sync();
} finally {
    workerGroup.shutdownGracefully();
    bossGroup.shutdownGracefully();
}

在分析源碼以前,咱們先看看NioEventLoopGroup的類繼承結構圖:this

圖片描述

初始化bossGroup及workerGroup時,使用了NioEventLoopGroup的無參構造方法,本篇將今後無參構造入手,詳細分析NioEventLoopGroup的初始化過程。spa

首先咱們看看NioEventLoopGroup的無參構造方法:.net

public NioEventLoopGroup() {
    this(0);
}

其內部繼續調用器構造方法,並指定線程數爲0:線程

public NioEventLoopGroup(int nThreads) {
    this(nThreads, (Executor) null);
}

繼續調用另外一個構造方法,指定線程爲0,且Executor爲null:debug

public NioEventLoopGroup(int nThreads, Executor executor) {
    this(nThreads, executor, SelectorProvider.provider());
}

在此構造中,它會指定selector的輔助類 "SelectorProvider.provider()",咱們繼續查看它的調用:

public NioEventLoopGroup(
            int nThreads, Executor executor, final SelectorProvider selectorProvider) {
    this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}

此構造方法中,初始化了一個默認的選擇策略工廠,用於生成select策略:

public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
                             final SelectStrategyFactory selectStrategyFactory) {
    super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}

通過上面一系列的構造方法調用,此時個參數值對應以下:
nThreads:0
executor: null
selectorProvider: SelectorProvider.provider()
selectStrategyFactory: DefaultSelectStrategyFactory.INSTANCE
以及指定了拒絕策略RejectedExecutionHandlers.reject()

接着其會調用父類MultithreadEventLoopGroup的MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args)構造方法

protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}

此構造方法主要作了一件事,就是當指定的線程數爲0時,使用默認的線程數DEFAULT_EVENT_LOOP_THREADS,此只是在MultithreadEventLoopGroup類被加載時完成初始化

private static final int DEFAULT_EVENT_LOOP_THREADS;

static {
    DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
            "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

    if (logger.isDebugEnabled()) {
        logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
    }
}

因此根據代碼,咱們得出,若是初始化NioEventLoopGroup未指定線程數時,默認是CPU核心數*2

接着繼續調用父類MultithreadEventExecutorGroup的MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args)構造方法

protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
    this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}

在此構造方法中,咱們指定了一個EventExecutor的選擇工廠DefaultEventExecutorChooserFactory,此工廠主要是用於選擇下一個可用的EventExecutor, 其內部有兩種選擇器, 一個是基於位運算,一個是基於普通的輪詢,它們的代碼分別以下:

基於位運算的選擇器PowerOfTwoEventExecutorChooser

private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
    private final AtomicInteger idx = new AtomicInteger();
    private final EventExecutor[] executors;

    PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
        this.executors = executors;
    }

    @Override
    public EventExecutor next() {
        return executors[idx.getAndIncrement() & executors.length - 1];
    }
}

基於普通輪詢的選擇器GenericEventExecutorChooser

private static final class GenericEventExecutorChooser implements EventExecutorChooser {
    private final AtomicInteger idx = new AtomicInteger();
    private final EventExecutor[] executors;

    GenericEventExecutorChooser(EventExecutor[] executors) {
        this.executors = executors;
    }

    @Override
    public EventExecutor next() {
        return executors[Math.abs(idx.getAndIncrement() % executors.length)];
    }
}

咱們接着回到剛剛的構造器,其內部會繼續調用MultithreadEventExecutorGroup的另外一個構造方法,此構造方法是NioEventLoopGroup的核心代碼

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
    
    // 爲了便於代碼剖析,以省略非相關代碼
    
    // 初始化executor
    if (executor == null) {
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }

    // 初始化EventExecutor
    children = new EventExecutor[nThreads];

    for (int i = 0; i < nThreads; i ++) {
       
            children[i] = newChild(executor, args);
         
    }

    // 生成選擇器對象
    chooser = chooserFactory.newChooser(children);
}

此構造方法主要作了三件事:
一、初始化executor爲ThreadPerTaskExecutor的實例:
經過前面的構造方法調用,咱們知道executor爲null,因此在此構造方法中,executor會被初始化爲ThreadPerTaskExecutor實例。咱們看一下ThreadPerTaskExecutor的源碼:

public final class ThreadPerTaskExecutor implements Executor {
    private final ThreadFactory threadFactory;

    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }
        this.threadFactory = threadFactory;
    }

    @Override
    public void execute(Runnable command) {
        threadFactory.newThread(command).start();
    }
}

經過ThreadPerTaskExecutor 的代碼發現,ThreadPerTaskExecutor 實現了Executor接口,其內部會經過newDefaultThreadFactory()指定的默認線程工廠來建立線程,並執行相應的任務。

二、初始化EventExecutor數組children
在MultithreadEventExecutorGroup的構造方法中咱們看到,EventExecutor數組children初始化時是經過newChild(executor, args)實現的,而newChild的在MultithreadEventExecutorGroup中是個抽象方法

protected abstract EventExecutor newChild(Executor executor, Object... args) throws Exception;

根據最開始的類繼承結構圖,咱們在NioEventLoopGroup中找到了newChild的實現

@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    return new NioEventLoop(this, executor, (SelectorProvider) args[0],
        ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}

因此今後newChild的實現中,咱們能夠看出MultithreadEventExecutorGroup的children,其實就是對應的一組NioEventLoop對象。 關於NioEventLoop下一篇會做詳細介紹。

三、根據咱們指定的選擇器工廠,綁定NioEventLoop數組對象

chooser = chooserFactory.newChooser(children);

在前面的構造方法中,咱們指定了chooserFactory爲DefaultEventExecutorChooserFactory,在此工廠內部,會根據children數組的長度來動態選擇選擇器對象,用於選擇下一個可執行的EventExecutor,也就是NioEventLoop。

@Override
public EventExecutorChooser newChooser(EventExecutor[] executors) {
    if (isPowerOfTwo(executors.length)) {
        return new PowerOfTwoEventExecutorChooser(executors);
    } else {
        return new GenericEventExecutorChooser(executors);
    }
}

至此,NioEventLoopGroup初始化完成了。

經過上面的代碼分析,在NioEventLoopGroup初始化的過程當中,其實就是初始化了一堆可執行的Executor數組,而後根據某種chooser策略,來選擇下一個可用的executor。

咱們再回顧總結一下:
一、NioEventLoopGroup初始化時未指定線程數,那麼會使用默認線程數,
即 線程數 = CPU核心數 * 2;
二、每一個NioEventLoopGroup對象內部都有一組可執行的NioEventLoop(NioEventLoop對象內部包含的excutor對象爲ThreadPerTaskExecutor類型)
三、每一個NioEventLoopGroup對象都有一個NioEventLoop選擇器與之對應,其會根據NioEventLoop的個數,動態選擇chooser(若是是2的冪次方,則按位運算,不然使用普通的輪詢)

因此經過上面的分析,咱們得出NioEventLoopGroup主要功能就是爲了選擇NioEventLoop,而真正的重點就在NioEventLoop中,它是整個netty線程執行的關鍵。

下一篇咱們將詳細介紹NioEventLoop,敬請期待。

相關文章
相關標籤/搜索