netty : NioEventLoopGroup 源碼分析

NioEventLoopGroup 源碼分析

1. 在閱讀源碼時作了必定的註釋,而且作了一些測試分析源碼內的執行流程,因爲博客篇幅有限。爲了方便 IDE 查看、跟蹤、調試 代碼,因此在 github 上提供 netty 的源碼、詳細的註釋及測試用例。歡迎你們 star、fork !

2. 因爲我的水平有限,對源碼的分析理解可能存在誤差或不透徹的地方還請你們在評論區指出,謝謝!

   從今天開始,就準備進軍 ne tty 了,主要的想法是看看 netty4 中一些比較重要的實現,也就是能常常出如今咱們面前的東西。主要是: 線程池、通道、管道、編解碼器、以及經常使用的工具類。java

   而後如今看源碼應該不會像以前的 jdk 那麼細緻了,主要是看了一個類之後就發現 netty 對代碼封裝太強了,基本一個功能可能封裝了七八個類去實現,不少的抽象類可是這些抽象類中的功能還很是的多。因此說主要看這個流程,以及裏面寫的比較好的代碼或者比較新的思想會仔細的去看看。具體的子字段,每一個方法不可能作到那麼細緻。git

   好,正式開始 netty 源碼征戰 !github

1. 基本思路

   這裏首先講一下結論,也就是先說我看這個類的源碼整理出來的思路,主要就是由於這些類太雜,一個功能在好幾個類中才徹底實現。數組

   咱們在 new 一個 worker/boss 線程的時候通常是採用的直接使用的無參的構造方法,可是無參的構造方法他建立的線程池的大小是咱們 CPU 核心的 2 倍。緊接着就須要 new 這麼多個線程放到線程池裏面,這裏的線程池採用的數據結構是一個數組存放的,每個線程須要設置一個任務隊列,顯然任務隊列使用的是一個阻塞隊列,這裏實際採用的是 LinkedBlockQueue ,而後回想一下在 jdk 中的線程池是否是還有一個比較重要的參數就是線程工廠,對的!這裏也有這個東西,他是須要咱們手動傳入的,可是若是不傳則會使用一個默認的線程工廠,裏面有一個 newThread 方法,這個方法實現基本和 jdk 中的實現如出一轍,就是建立一個級別爲 5 的非 Daemon 線程。對這就是咱們在建立一個線程池時候完成的所有工做!安全

   好如今來具體說一下,咱們每次建立的是 NioEventLoopGroup 可是他又繼承了 n 個類才實現了線程池,也就是線程池的祖先是 ScheduledExecutorService 是 jdk 中的線程池的一個接口,其中裏面最重要的數據結構就是一個 children 數組,用來裝線程的。數據結構

   而後具體的線程他也是進行了封裝的,也就是咱們常看到的 NioEventLoop 。這個類裏面有兩個比較重要的結構:taskQueue 和 thread 。很明顯這個很是相似 jdk 中的線程池。併發

2. NioEventLoopGroup 線程池分析

   首先要建立線程池,傳入的線程數爲 0,他是一直在調用 this() 最後追溯到 super(nThreads,threadFactory,selectorProvider) 也就是使用了 MultithreadEventLoopGroup 的構造方法,在這一步肯定了當傳入的線程數爲 0 時應該設置的線程數爲 CPU 核心的兩倍。而後再次上調,調用了 MultithreadEventExecutorGroup 的構造方法,在這裏纔是真正的開始了線程池的初始化。less

   首先設置了線程池工廠,而後初始化 chooser ,接着建立 n 個線程放到 children 數組中,最後設置線程中斷的監聽事件。ide

/** * 這個方法流程: * 一、設置了默認的線程工廠 * 二、初始化 chooser * 三、建立nTreads個NioEventLoop對象保存在children數組中 * 四、添加中斷的監聽事件 * @param nThreads * @param threadFactory * @param args */
    protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
        if (nThreads <= 0) {
            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
        }

        // 默認使用線程工廠是 DefaultThreadFactory
        if (threadFactory == null) {
            threadFactory = newDefaultThreadFactory();
        }

        children = new SingleThreadEventExecutor[nThreads];
        // 二的平方的實現是看 n&-n==n
        //根據線程個數是否爲2的冪次方,採用不一樣策略初始化chooser
        if (isPowerOfTwo(children.length)) {
            chooser = new PowerOfTwoEventExecutorChooser();
        } else {
            chooser = new GenericEventExecutorChooser();
        }
        //產生nTreads個NioEventLoop對象保存在children數組中
        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
                children[i] = newChild(threadFactory, args);
                success = true;
            } catch (Exception e) {
                // TODO: Think about if this is a good exception type
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                // 沒成功,把已有的線程優雅關閉
                if (!success) {
                    for (int j = 0; j < i; j ++) {
                        children[j].shutdownGracefully();
                    }
                    // 沒有徹底關閉的線程讓它一直等待
                    for (int j = 0; j < i; j ++) {
                        EventExecutor e = children[j];
                        try {
                            while (!e.isTerminated()) {
                                e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                            }
                        } catch (InterruptedException interrupted) {
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                }
            }
        }

        // 對每個 children 添加中斷線程時候的監聽事件,就是將 terminatedChildren 自增
        // 判斷是否到達線程總數,是則更新 terminationFuture
        final FutureListener<Object> terminationListener = new FutureListener<Object>() {
            @Override
            public void operationComplete(Future<Object> future) throws Exception {
                if (terminatedChildren.incrementAndGet() == children.length) {
                    terminationFuture.setSuccess(null);
                }
            }
        };

        for (EventExecutor e: children) {
            e.terminationFuture().addListener(terminationListener);
        }
    }
複製代碼

   其中有一個 if 分支用來初始化 chooser ,這個 chooser 就是用來選擇使用哪一個線程來執行哪些操做的。這裏用到了判斷一個數是否爲 2 的次冪的一個方法 isPowerOfTwo() 實現比較有意思,貼出來。工具

private static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }
複製代碼

   接下來目光要轉向 newChild(threadFactory, args) ,由於在這個類裏面這個方法是抽象的,在 NioEventLoopGroup 獲得了實現。其實看到了也很是的簡單粗暴,直接 new 了一個 NioEventLoop ,接下來就應該分析這個線程的包裝類了。

@Override
    protected EventExecutor newChild( ThreadFactory threadFactory, Object... args) throws Exception {
        // 這裏纔是重點 也就是真正的線程 被放在本身的 children 數組中
        return new NioEventLoop(this, threadFactory, (SelectorProvider) args[0]);
    }
複製代碼

3. NioEventLoop 線程分析

   上面已經看到了,newChild 方法就是 new 了一個 NioEventLoop 。因此有必要好好看看這個線程包裝類。

   這個類的構造方法是調用了父類 SingleThreadEventLoop 的構造,接着繼續上調 SingleThreadEventExecutor 構造,在這個類中才真正的實現了線程的構造。裏面就作了兩件事 :

  1. new 了一個新的線程,新的線程還分配了一個任務,任務的內容就是調用本類中的一個 run 方法,在 NioEventLoop 中實現。

  2. 設置任務隊列爲 LinkedBlockQueue

/** * 構造方法主要完成了: * 一、new 一個新的線程執行一個 run 方法 * 二、用 LinkedBlockQueue 初始化 taskQueue * @param parent * @param threadFactory * @param addTaskWakesUp */
    protected SingleThreadEventExecutor(EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {

        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }

        this.parent = parent;
        this.addTaskWakesUp = addTaskWakesUp;
        // new 了一個新的線程
        thread = threadFactory.newThread(new Runnable() {
            @Override
            public void run() {
                boolean success = false;
                updateLastExecutionTime();
                try {
                    // 調用一個 run 方法
                    SingleThreadEventExecutor.this.run();
                    success = true;
                } catch (Throwable t) {
                    logger.warn("Unexpected exception from an event executor: ", t);
                } finally {
                    // 讓線程關閉
                    for (;;) {
                        int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this);
                        if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                                SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                            break;
                        }
                    }
                    // Check if confirmShutdown() was called at the end of the loop.
                    if (success && gracefulShutdownStartTime == 0) {
                        logger.error(
                                "Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                                SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
                                "before run() implementation terminates.");
                    }

                    try {
                        // Run all remaining tasks and shutdown hooks.
                        for (;;) {
                            if (confirmShutdown()) {
                                break;
                            }
                        }
                    } finally {
                        try {
                            cleanup();
                        } finally {
                            STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                            threadLock.release();
                            if (!taskQueue.isEmpty()) {
                                logger.warn("An event executor terminated with non-empty task queue (" + taskQueue.size() + ')');
                            }
                            terminationFuture.setSuccess(null);
                        }
                    }
                }
            }
        });

        // 使用 LinkedBlockQueue 初始化 taskQueue
        taskQueue = newTaskQueue();
    }
複製代碼

   而後看一下他要執行的 run 方法在 NioEventLoop 中獲得了實現。

/** *'wakenUp.compareAndSet(false, true)' 通常都會在 select.wakeUp() 以前執行 * 由於這樣能夠減小 select.wakeUp() 調用的次數,select.wakeUp() 調用是一個代價 * 很高的操做 * 注意:若是說咱們過早的把 wakenUp 設置爲 true,可能致使線程的競爭問題,過早設置的情形以下: 1) Selector is waken up between 'wakenUp.set(false)' and 'selector.select(...)'. (BAD) 2) Selector is waken up between 'selector.select(...)' and 'if (wakenUp.get()) { ... }'. (OK) 在第一種狀況中 wakenUp 被設置爲 true 則 select 會馬上被喚醒直到 wakenUp 再次被設置爲 false 可是wakenUp.compareAndSet(false, true)會失敗,而且致使全部但願喚醒他的線程都會失敗致使 select 進行沒必要要的休眠 爲了解決這個問題咱們是在 wakenUp 爲 true 的時候再次對 select 進行喚醒。 */

    @Override
    protected void run() {
        for (;;) {
            // 獲取以前的線程狀態,並讓 select 阻塞
            boolean oldWakenUp = wakenUp.getAndSet(false);
            try {
                // 有任務在線程建立以後直接開始 select
                if (hasTasks()) {
                    selectNow(); //直接調用了 select 的 selectNow 而後再次喚醒同下面的代碼
                // 沒有任務
                } else {
                    // 自旋進行等待可進行 select 操做
                    select(oldWakenUp);
                    // 再次喚醒,解決併發問題
                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                }

                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                // 都是處理 selected 的通道的數據,並執行全部的任務,只是在 runAllTasks 傳的參數不一樣
                if (ioRatio == 100) {
                    processSelectedKeys();
                    runAllTasks();
                } else {
                    final long ioStartTime = System.nanoTime();
                    processSelectedKeys();
                    final long ioTime = System.nanoTime() - ioStartTime;
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }

                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        break;
                    }
                }
            } catch (Throwable t) {
                logger.warn("Unexpected exception in the selector loop.", t);
                // Prevent possible consecutive immediate failures that lead to
                // excessive CPU consumption.
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // Ignore.
                }
            }
        }
    }
複製代碼

   緊接着就是分析這個 run 方法,也就是線程在被建立以後進行的一系列操做。裏面主要作了三件事:

  1. 進行 select
  2. 處理 selectedKeys
  3. 喚醒隊列中全部的任務

   上面的操做都是在一個循環裏面一直執行的,因此說 NioEventLoop 這個線程的做用就只有一個那就是:進行任務處理。在這個線程被 new 出來時咱們就給他分配了線程的任務就是永不停歇的進行上面的操做。

   上面的過程說的是有線程安全問題,也就是若是咱們過早的把 wakenUp 設置爲 true,咱們的 select 就會甦醒過來,而其餘的線程不清楚這種狀態想要設置爲 wakenUp 的時候都會失敗,致使 select 休眠。主要感受有點是由於這個東西不是線程間可見的,要是採用 volatile 可能就會解決這個問題,可是 wakenUp 是 final 的不能使用 volatile 關鍵字修飾。因此做者採用的解決方案就是再次手動喚醒,防止因爲其餘線程併發設置 wakenUp 的值致使的沒必要要的休眠。

   而後要說一下 select 方法,這個方法的調用主要由於在隊列中沒有任務,因此就暫時不用 select ,這個方法裏面作的就是自旋的去 select ,沒有任務就 等待一段時間再去 select。

/** * 這個方法主要乾的事情: * 一、若是不須要等待就直接 select * 二、須要等待則等一個超時時間再去 select * 這個過程是不停進行的也就是死循環直達有任務可進行 select 時 select 完畢退出循環 * @param oldWakenUp * @throws IOException */
    private void select(boolean oldWakenUp) throws IOException {
        Selector selector = this.selector;
        try {
            int selectCnt = 0;
            long currentTimeNanos = System.nanoTime();
            long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
            for (;;) {
                // 不用等待進行一次 select 操做
                long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
                if (timeoutMillis <= 0) {
                    if (selectCnt == 0) {
                        selector.selectNow();
                        selectCnt = 1;
                    }
                    break;
                }
                // 等一個超時再去選擇
                int selectedKeys = selector.select(timeoutMillis);
                selectCnt ++;

                if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                    // - Selected something,
                    // - waken up by user, or
                    // - the task queue has a pending task.
                    // - a scheduled task is ready for processing
                    break;
                }
                if (Thread.interrupted()) {
                    // Thread was interrupted so reset selected keys and break so we not run into a busy loop.
                    // As this is most likely a bug in the handler of the user or it's client library we will
                    // also log it.
                    //
                    // See https://github.com/netty/netty/issues/2426
                    if (logger.isDebugEnabled()) {
                        logger.debug("Selector.select() returned prematurely because " +
                                "Thread.currentThread().interrupt() was called. Use " +
                                "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
                    }
                    selectCnt = 1;
                    break;
                }

                long time = System.nanoTime();
                if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                    // timeoutMillis elapsed without anything selected.
                    selectCnt = 1;
                } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                        selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                    // The selector returned prematurely many times in a row.
                    // Rebuild the selector to work around the problem.
                    logger.warn(
                            "Selector.select() returned prematurely {} times in a row; rebuilding selector.",
                            selectCnt);

                    rebuildSelector();
                    selector = this.selector;

                    // Select again to populate selectedKeys.
                    selector.selectNow();
                    selectCnt = 1;
                    break;
                }

                currentTimeNanos = time;
            }

            if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely {} times in a row.", selectCnt - 1);
                }
            }
        } catch (CancelledKeyException e) {
            if (logger.isDebugEnabled()) {
                logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector - JDK bug?", e);
            }
            // Harmless exception - log anyway
        }
    }
複製代碼

   接着就是 processSelectedKeys();runAllTasks(); 這兩個方法,前一個方法不說就是和咱們寫 Nio 的時候的步驟差很少,遍歷 selectedKeys 處理,而後 runAllTasks() 執行全部的任務的 run 方法。

protected boolean runAllTasks() {
        fetchFromDelayedQueue();
        Runnable task = pollTask();
        if (task == null) {
            return false;
        }

        // 這個循環就是用來循環任務隊列中的全部任務
        for (;;) {
            try {
                task.run();
            } catch (Throwable t) {
                logger.warn("A task raised an exception.", t);
            }

            task = pollTask(); // 循環條件
            if (task == null) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                return true;
            }
        }
    }
複製代碼

4. 總結

   好了其實到這裏線程池其實分析的已經差很少了,對於不少的細節問題並無仔細的去看,單絲咱們清楚流程以及裏面的結構基本就差很少了。

   在 NioEventLoopGroup 中包裝了 NioEventLoop 線程任務。具體包裝在了 children 數組中,而後使用 newThread 工廠建立線程,接着給線程分配任務,任務就是進行 select 操做。

相關文章
相關標籤/搜索