Netty源碼分析之NioEventLoop(一)—NioEventLoop的建立

1、NioEventLoop的概述數組

NioEventLoop作爲Netty線程模型的核心部分,從本質上講是一個事件循環執行器,每一個NioEventLoop都會綁定一個對應的線程經過一個for(;;)循環來處理與 Channel 相關的 IO 操做, 包括 調用 select 等待就緒的 IO 事件、讀寫數據與數據的處理等;其次做爲任務隊列, 執行 taskQueue 中的任務, 例如eventLoop.schedule 提交的定時任務也是這個線程執行的。而NioEventLoopGroup顧名思義,它是維護了一組這樣的事件循環器,這也是Netty基於Reactor模式的具體設計體現。app

接下來咱們就結合具體的代碼,對NioEventLoop的整個建立流程進行一個說明與總結ide

2、NioEventLoop的建立函數

咱們基於Netty構建服務端仍是客戶端時,都首先須要建立NioEventLoopGroup 實例oop

        // Configure the server.
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

 NioEventLoopGroup 作爲基於NIO的處理channle相關IO操做的事件循環器組,它的類層次結構以下優化

經過NioEventLoopGroup構造函數傳入線程數量this

    /**
     * Create a new instance using the specified number of threads, {@link ThreadFactory} and the
     * {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}.
     */
    public NioEventLoopGroup(int nThreads) {
        this(nThreads, (Executor) null);
    }

 NioEventLoopGroup最終的構造函數中會包含如下幾個函數spa

一、nThreads:傳入的線程數量.net

二、executor :線程執行器Executor接口,默認爲空線程

三、selectorProvider:用於建立Selector的SelectorProvider 

四、selectStrategyFactory:傳入DefaultSelectStrategyFactory.INSTANCE,  一個使用默認選擇策略的工廠。

五、RejectedExecutionHandlers.reject():Netty自定義線程拒絕策略

public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
                             final SelectStrategyFactory selectStrategyFactory) {
        super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
    }

 在父類MultithreadEventLoopGroup中,會根據你傳入nThreads大小,肯定初始化的線程數量,爲0且沒有設置io.netty.eventLoopThreads參數項,則會以當前系統的核心線程數*2作爲默認的線程數量

    static {
        //若是沒有設置io.netty.eventLoopThreads參數項,則會以當前運行系統的核心線程數*2做爲線程數
        DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

        if (logger.isDebugEnabled()) {
            logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
        }
    }

    /**
     * @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor, Object...)
     */
    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    }

接下來在MultithreadEventExecutorGroup的構造函數中咱們會根據傳入的線程數,去初始化和建立一組NioEventLoop

首先咱們看下NioEventLoop的類層次結構

 

下面在MultithreadEventExecutorGroup構造函數中主要完成如下幾個功能:

一、初始化ThreadPerTaskExecutor線程執行器,並傳入一個線程建立工廠,用於NioEventLoop對應線程的建立

二、根據傳入的線程數,初始化一個EventExecutor數組,用於放置建立的NioEventLoop對象

三、循環數組,經過newChild方法建立NioEventLoop對象。

    /**  
     * Create a new instance.
     *
     * @param nThreads          the number of threads that will be used by this instance.
     * @param executor          the Executor to use, or {@code null} if the default should be used.
     * @param chooserFactory    the {@link EventExecutorChooserFactory} to use.
     * @param args              arguments which will passed to each {@link #newChild(Executor, Object...)} call
     */
    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        if (nThreads <= 0) {
            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
        }

        if (executor == null) {
            // 建立線程工廠,netty根據須要指定了線程的命名方式、優先級、是不是守護線程等屬性
            // 該線程池沒有任何隊列,提交任務後,建立任何線程類型都是 FastThreadLocalRunnable, 而且當即start。
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }
        //初始化一組事件循環執行器
        children = new EventExecutor[nThreads];

        //根據傳入的線程數,初始化一個線程數組
        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
                // 建立 new NioEventLoop
                children[i] = newChild(executor, args);
                success = true;
            } catch (Exception e) {
                // TODO: Think about if this is a good exception type
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                if (!success) {
                    for (int j = 0; j < i; j ++) {
                        children[j].shutdownGracefully();
                    }

                    for (int j = 0; j < i; j ++) {
                        EventExecutor e = children[j];
                        try {
                            while (!e.isTerminated()) {
                                e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                            }
                        } catch (InterruptedException interrupted) {
                            // Let the caller handle the interruption.
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                }
            }
        }

 繼續跟蹤進入newChild(executor, args)內部,看到它會返回一個NioEventLoop對象

    @Override
    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
    }

繼續查看NioEventLoop構造函數和他的父類構造函數

    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
        super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
        if (selectorProvider == null) {
            throw new NullPointerException("selectorProvider");
        }
        if (strategy == null) {
            throw new NullPointerException("selectStrategy");
        }
        provider = selectorProvider;
        final SelectorTuple selectorTuple = openSelector();
        selector = selectorTuple.selector;
        unwrappedSelector = selectorTuple.unwrappedSelector;
        selectStrategy = strategy;
    }

父類構造函數

    /**
     * Create a new instance
     *
     * @param parent            the {@link EventExecutorGroup} which is the parent of this instance and belongs to it
     * @param executor          the {@link Executor} which will be used for executing
     * @param addTaskWakesUp    {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the
     *                          executor thread
     * @param maxPendingTasks   the maximum number of pending tasks before new tasks will be rejected.
     * @param rejectedHandler   the {@link RejectedExecutionHandler} to use.
     */
    protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                        boolean addTaskWakesUp, int maxPendingTasks,
                                        RejectedExecutionHandler rejectedHandler) {
        super(parent);
        this.addTaskWakesUp = addTaskWakesUp;
        this.maxPendingTasks = Math.max(16, maxPendingTasks);
        this.executor = ObjectUtil.checkNotNull(executor, "executor");
        taskQueue = newTaskQueue(this.maxPendingTasks);
        rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
    }

經過上面的代碼咱們能夠看到,初始化NioEventLoop主要完成了如下的功能

一、保存線程執行器ThreadPerTaskExecutor

二、建立一個selector 

三、基於LinkedBlockingQueue建立一個taskQueue任務隊列,用於保存要執行的任務

這些都是爲了後續的循環執行Channel 相關事件所作準備。

到這裏其實咱們建立了一組NioEventLoop,也就是一組事件循環執行器,每一個NioEventLoop中都有對應的一個線程和一個selector ;建立完畢以後,天然就是要爲每個鏈接分配對應的NioEventLoop。Netty中經過

實現EventLoopGroup接口中的next()方法來返回一個可使用的的NioEventLoop

public interface EventLoopGroup extends EventExecutorGroup {
    /**
     * Return the next {@link EventLoop} to use
     */
    @Override
    EventLoop next();
}

在MultithreadEventExecutorGroup中咱們能夠查看它的具體實現方式

   chooser = chooserFactory.newChooser(children);
    
   @Override
    public EventExecutor next() {
        return chooser.next();
    }

進入代碼內部咱們能夠看到Netty針對數組大小,對數組下標的計算方式進行了優化

/**
 * Default implementation which uses simple round-robin to choose next {@link EventExecutor}.
 */
@UnstableApi
public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {

    public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();

    private DefaultEventExecutorChooserFactory() { }

    @SuppressWarnings("unchecked")
    @Override
    public EventExecutorChooser newChooser(EventExecutor[] executors) {
        //判斷是不是二的次冪,若是爲true返回PowerOfTwoEventExecutorChooser,反之GenericEventExecutorChooser
        if (isPowerOfTwo(executors.length)) {
            return new PowerOfTwoEventExecutorChooser(executors);
        } else {
            return new GenericEventExecutorChooser(executors);
        }
    }

    private static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }

    private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        //經過&運算的方式循環獲取數組下標
        @Override
        public EventExecutor next() {
            return executors[idx.getAndIncrement() & executors.length - 1];
        }
    }

    private static final class GenericEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        GenericEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        //經過取模的方式循環獲取數組下標
        @Override
        public EventExecutor next() {
            return executors[Math.abs(idx.getAndIncrement() % executors.length)];
        }
    }
}

到此咱們基本把Netty中NioEventLoop及NioEventLoopGroup的建立流程及核心代碼梳理了一遍。NioEventLoop作爲Netty線程模型的核心部分包含的內容比較多,上面只是初始化及建立的一部份內容,後續的部分我會陸續的補齊,其中有錯誤和不足之處還請指正與海涵。

相關文章
相關標籤/搜索