Netty源碼分析(四):EventLoopGroup

不管服務端或客戶端啓動時都用到了NioEventLoopGroup,從名字就能夠看出來它是NioEventLoop的組合,是Netty多線程的基石。git

類結構

NioEventLoopGroup繼承自 MultithreadEventLoopGroup,多提供了兩個方法 setIoRatiorebuildSelectors,一個用於設置 NioEventLoop用於IO處理的時間佔比,另外一個是從新構建Selectors,來處理epoll空輪詢致使CPU100%的bug。這個兩個的用處在介紹 NioEventLoop的時候在詳細介紹。其它的方法都在接口中有定義,先看下 EventExecutorGroup

EventExecutorGroup

EventExecutorGroup繼承自ScheduledExecutorServiceIterable。這意味着EventExecutorGroup擁有定時處理任務的能力,同時自己能夠迭代。它提供的方法有:github

/**
     * 是否全部事件執行器都處在關閉途中或關閉完成
     */
    boolean isShuttingDown();

    /**
     * 優雅關閉
     */
    Future<?> shutdownGracefully();

    Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit);

    /**
     * 返回線程池終止時的異步結果
     */
    Future<?> terminationFuture();

    void shutdown();

    List<Runnable> shutdownNow();

    /**
     * 返回一個事件執行器
     */
    EventExecutor next();
複製代碼

其中shutdownshutdownNow被標記爲過期,不建議使用。EventExecutorGroup還重寫ScheduledExecutorService接口的方法,用於返回自定義的Future數組

EventLoopGroup

EventLoopGroup繼承自EventExecutorGroup,它和EventExecutorGroup想比多了註冊ChannelChannelPromise,同時從新next方法返回EventLoopbash

MultithreadEventExecutorGroup

建立NioEventLoopGroup時,最終都會調用MultithreadEventExecutorGroup的構造方法。多線程

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        // 線程數必須大於0
        if (nThreads <= 0) {
            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
        }
        // 沒指定Executor就建立新的Executor
        if (executor == null) {
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }
        // 建立EventExecutor數組
        children = new EventExecutor[nThreads];
        for (int i = 0; i < nThreads; i++) {
            // 建立結果標識
            boolean success = false;
            try {
                // 建立EventExecutor對象
                children[i] = newChild(executor, args);
                // 設置建立成功
                success = true;
            } catch (Exception e) {
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                // 建立失敗,關閉全部已建立的EventExecutor
                if (!success) {
                    // 關閉全部已建立的EventExecutor
                    for (int j = 0; j < i; j++) {
                        children[j].shutdownGracefully();
                    }
                    // 確保全部已建立的EventExecutor已關閉
                    for (int j = 0; j < i; j++) {
                        EventExecutor e = children[j];
                        try {
                            while (!e.isTerminated()) {
                                e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                            }
                        } catch (InterruptedException interrupted) {
                            // Let the caller handle the interruption.
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                }
            }
        }
        // 建立EventExecutor選擇器
        chooser = chooserFactory.newChooser(children);
        // 建立監聽器,用於EventExecutor終止時的監聽
        final FutureListener<Object> terminationListener = new FutureListener<Object>() {
            @Override
            public void operationComplete(Future<Object> future) throws Exception {
                // 當EventExecutor所有關閉時
                if (terminatedChildren.incrementAndGet() == children.length) {
                    // 設置結果,並通知監聽器們。
                    terminationFuture.setSuccess(null);
                }
            }
        };
        // 給每一個EventExecutor添加上監聽器
        for (EventExecutor e : children) {
            e.terminationFuture().addListener(terminationListener);
        }
        // 建立只讀的EventExecutor集合
        Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
        Collections.addAll(childrenSet, children);
        readonlyChildren = Collections.unmodifiableSet(childrenSet);
    }
複製代碼

整個構造方法作的就是EventExecutor的建立,包括建立的異常處理,成功通知等。 AbstractEventExecutorGroupMultithreadEventLoopGroupNioEventLoopGroup內部沒有特殊之處,就不拓展了。異步

文中帖的代碼註釋全在:KAMIJYOUDOUMA, 有興趣的童鞋能夠關注一下。ide


本篇到此結束,若是讀完以爲有收穫的話,歡迎點贊、關注、加公衆號【貳級天災】,查閱更多精彩歷史!!! oop

相關文章
相關標籤/搜索