Netty核心概念(8)之Netty線程模型

1.前言

 第7節初步學習了一下Java本來的線程池是如何工做的,以及Future的爲何可以達到其效果,這些知識對於理解本章有很大的幫助,不瞭解的能夠先看上一節。java

 Netty爲何會高效?回答就是良好的線程模型,和內存管理。在Java的NIO例子中就我將客戶端的操做單獨放在一個線程中處理了,這麼作的緣由在於若是將客戶端鏈接串起來,後來的鏈接就要等前一個處理完,固然這並不意味着多線程比單線程有優點,而是在於每一個客戶端都須要進行讀取準備好的緩存數據,再執行一些業務邏輯。若是業務邏輯耗時好久,那麼順序執行的方式沒有多線程優點大。另外一個方面目前多核CPU很常見了,多線程是個不錯的選擇。這些在第一節就說明過,也提到過NIO並非提高了IO操做的速度,而是減小了CPU的浪費時間,這些概念不能搞混。數組

 本節不涉及內存管理,只介紹相關的線程模型。promise

2.核心類

 上圖就是咱們須要關注的體系內容了,主要從EventExecutorGroup開始往下看,再上層的父接口是JDK提供的併發包內的內容,基礎是線程池中能夠執行週期任務的線程池服務。因此從這咱們能夠知道Netty能夠實現週期任務,好比心跳檢測。接口定義下面將逐一介紹。緩存

2.1 EventExecutorGroup

  isShuttingDown():是否正在關閉,或者是已經關閉。安全

  shutdownGracefully():優雅停機,等待全部執行中的任務執行完成,並再也不接收新的任務。多線程

  terminationFuture():返回一個該線程池管理的全部線程都terminated的時候觸發的future。併發

  shutdown():廢棄了的關閉方法,被shutdownGracefully取代。app

  next():返回一個被該Group管理的EventExecutor。ide

  iterator():全部管理的EventExecutor的迭代器。oop

  submit():提交一個線程任務。

  schedule():週期執行一個任務。

 上述方法基本上是對週期線程池的一個封裝,可是擴展了EventExecuotr概念,即分了若干個小組,處理事件。另一個比較實用的就是優雅停機了。

2.2 EventLoopGroup

 EventLoopGroup中的方法不多,其主要是和channel結合了,就多了一個將channel註冊到線程池中的方法。

2.3 EventExecutor

 EventExecutor繼承自EventExecutorGroup,這個以前也提到過該類,至關於Group中的一個子集。

  next():就是找group中下一個子集

  parent():就是所屬group

  inEventLoop():當前線程是不是在該子集中

  newXXX():這個是下一節內容,此處不介紹。

2.4 EventLoop

 該接口就一個方法,就是parent();EventLoop和EventLoopGroup與EventExecutor和EventExecutorGroup是一組類似的概念。瞭解這些就能夠了。

3 實現細節

 EventLoop和EventLoopGroup的實現十分簡單,簡單看下就能夠了,這裏介紹幾個重要的實現類。

3.1 AbstractEventExecutor

 該類繼承自上節說過的AbstractExecutorService,其最重要的是execute方法未實現。該類是對AbstractExecutorService的一個進一步加工,添加了group的概念,和不一樣的Future建立方法。這裏不要被以前的Java線程池模型所幹擾,其不必定是線程池。回到上一節線程池的介紹,最終的樣子都是Execute方法決定的。

3.2 AbstractScheduledEventExecutor

 該類是對AbstractEventExecutor的一個進一步實現,其實現了週期任務的執行。原理是內部持有一個優先隊列ScheduledFutureTask。全部週期任務都添加到這個隊列中,也實現了取出週期任務的方法,可是該抽象類並無具體執行週期任務的實現。

3.3 SingleThreadEventExecutor

 該類是對AbstractScheduledEventExecutor的一個實現,其基本上是咱們最終的一個EventLoop的雛形了,不少不一樣協議的EventLoop都是基於它實現的。

 雖然名字叫作單線程執行器,可是其不必定是單個線程。Executor默認使用的是ThreadPerTaskExecutor,其executor會爲每個任務建立一個線程並執行,固然你也能夠傳入本身的executor。Queue使用的是LinkedBlockingQueue,無容量限制的任務隊列。其提供了添加任務到任務隊列,從任務隊列中獲取任務的方法。

    protected boolean runAllTasks() {
        assert inEventLoop();
        boolean fetchedAll;
        boolean ranAtLeastOne = false;

        do {
            fetchedAll = fetchFromScheduledTaskQueue();
            if (runAllTasksFrom(taskQueue)) {
                ranAtLeastOne = true;
            }
        } while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.

        if (ranAtLeastOne) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
        }
        afterRunningAllTasks();
        return ranAtLeastOne;
    }

    protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
        Runnable task = pollTaskFrom(taskQueue);
        if (task == null) {
            return false;
        }
        for (;;) {
            safeExecute(task);
            task = pollTaskFrom(taskQueue);
            if (task == null) {
                return true;
            }
        }
    }

 執行過程如上:1.先獲取全部的週期任務,放入taskQueue;2.不斷的執行taskQueue中的任務;3.afterRunningAllTasks就是一個自由發揮的方法。safeExecute就是直接執行run方法。

    private void doStartThread() {
        assert thread == null;
        executor.execute(new Runnable() {
            @Override
            public void run() {
                thread = Thread.currentThread();
                if (interrupted) {
                    thread.interrupt();
                }

                boolean success = false;
                updateLastExecutionTime();
                try {
                    SingleThreadEventExecutor.this.run();
                    success = true;
                } catch (Throwable t) {
                    logger.warn("Unexpected exception from an event executor: ", t);
                } finally {
                    for (;;) {
                        int oldState = state;
                        if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                                SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                            break;
                        }
                    }

                    // Check if confirmShutdown() was called at the end of the loop.
                    if (success && gracefulShutdownStartTime == 0) {
                        logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                                SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
                                "before run() implementation terminates.");
                    }

                    try {
                        // Run all remaining tasks and shutdown hooks.
                        for (;;) {
                            if (confirmShutdown()) {
                                break;
                            }
                        }
                    } finally {
                        try {
                            cleanup();
                        } finally {
                            STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                            threadLock.release();
                            if (!taskQueue.isEmpty()) {
                                logger.warn(
                                        "An event executor terminated with " +
                                                "non-empty task queue (" + taskQueue.size() + ')');
                            }

                            terminationFuture.setSuccess(null);
                        }
                    }
                }
            }
        });
    }

 上面是該Executor初始化過程,run方法又是交給子類進行初始化了。

    public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
        if (quietPeriod < 0) {
            throw new IllegalArgumentException("quietPeriod: " + quietPeriod + " (expected >= 0)");
        }
        if (timeout < quietPeriod) {
            throw new IllegalArgumentException(
                    "timeout: " + timeout + " (expected >= quietPeriod (" + quietPeriod + "))");
        }
        if (unit == null) {
            throw new NullPointerException("unit");
        }

        if (isShuttingDown()) {
            return terminationFuture();
        }

        boolean inEventLoop = inEventLoop();
        boolean wakeup;
        int oldState;
        for (;;) {
            if (isShuttingDown()) {
                return terminationFuture();
            }
            int newState;
            wakeup = true;
            oldState = state;
            if (inEventLoop) {
                newState = ST_SHUTTING_DOWN;
            } else {
                switch (oldState) {
                    case ST_NOT_STARTED:
                    case ST_STARTED:
                        newState = ST_SHUTTING_DOWN;
                        break;
                    default:
                        newState = oldState;
                        wakeup = false;
                }
            }
            if (STATE_UPDATER.compareAndSet(this, oldState, newState)) {
                break;
            }
        }
        gracefulShutdownQuietPeriod = unit.toNanos(quietPeriod);
        gracefulShutdownTimeout = unit.toNanos(timeout);

        if (oldState == ST_NOT_STARTED) {
            try {
                doStartThread();
            } catch (Throwable cause) {
                STATE_UPDATER.set(this, ST_TERMINATED);
                terminationFuture.tryFailure(cause);

                if (!(cause instanceof Exception)) {
                    // Also rethrow as it may be an OOME for example
                    PlatformDependent.throwException(cause);
                }
                return terminationFuture;
            }
        }

        if (wakeup) {
            wakeup(inEventLoop);
        }

        return terminationFuture();
    }

 上面是一個優雅停機的過程,改變該Executor的狀態成ST_SHUTTING_DOWN,這裏要注意addTask的時候只有shutdown狀態纔會拒絕,因此此時這裏的邏輯還不會拒絕新任務添加,而後返回了一個terminationFuture,這裏不作介紹。

3.4 SingleThreadEventLoop

 此類繼承自上面講解的SingleThreadEventExecutor,這裏多了一個tailTask隊列,用於每次事件循環後置任務處理,暫且無論。重要的在於很早提到了register方法,將channel註冊到線程中。

    public ChannelFuture register(Channel channel) {
        return register(new DefaultChannelPromise(channel, this));
    }

    public ChannelFuture register(final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        promise.channel().unsafe().register(this, promise);
        return promise;
    }

 實際上就是生成了一個DefaultChannelPromise,將channel和線程綁定,最後都放入了unsafe對象中。

3.5 NioEventLoop

 上面講了一些雜亂無章的內容,這裏藉助NioEventLoop好好梳理一下整個設計流程。NioEventLoop繼承自SingleThreadEventLoop,先對以前的相關研究進行總結:

  1.EventExecutorGroup接口是繼承自Java的週期任務接口,是一個事件處理器組的概念,其相關方法有:

    是否正在關閉;優雅停機;獲取一個事件處理器;提交一個任務;提交一個週期任務

  2.EventExecutor接口是事件處理器,其繼承自EventExecutorGroup,目的並非說每個事件處理器都是一個事件處理器組,而是爲了複用接口定義的方法。每一個處理器都應該具有優雅停機,提交任務,判斷是否關閉的方法,其它方法有:

    獲取處理器組;獲取下一個處理器(覆蓋了父接口的next方法);判斷是否在事件循環內;建立Promise、ProgressivePromise、SucceededFuture、FailedFuture

  3.EventLoopGroup繼承自EventExecutorGroup,更新了父接口的含義,EventExecutor的定位是處理單個事件,group就是處理事件組了。EventLoop的定位是處理一個鏈接的生命週期過程當中的週期事件,group是多個EventLoop的集合了。這裏又有一個尷尬的地方,group按照定義本不須要定義其它方法,可是因爲Server端的設計(以前說過服務端的channel也是一個線程),使用的是group,因此group必須承擔單個EventLoop的職責。最終添加了額外的方法:

    獲取下一個EventLoop;註冊channel;

  4.EventLoop,事件循環,其也是一個處理器,最終繼承自EventExecutor和EventLoopGroup,方法只有一個:

    獲取父事件循環組EventLoopGroup。

 上述接口光看名稱很容易陷入誤解,實際上定義是想將單個loop和group分離,可是實現上因爲Server端包含一個服務端監聽鏈接線程,一個客戶端鏈接線程,其Group承擔了單個的職責,因此定義了一些本該由單個執行器處理的方法,又爲了複用方法,致使loop繼承了group,這樣看起來怪怪的,接口理解起來就混亂了。結合上面的描述,再看一遍繼承圖就更清楚了:

 理解了上面的設定,咱們再來看看客戶端的事件處理是如何設計的,即總結上訴抽象類作了哪些事情。

  1. AbstractEventExecutor:入參就一個parent,該類完成了一個基本處理:

    a.將next設置成本身(上面說過繼承的group,這個操做就和group區分開了)。

    b.優雅停機調用的是帶有超時的停機方案,超時爲15秒

    c.覆蓋了Java提供的newTask包裝成FutureTask的方法,使用了本身的PromiseTask

    d.提供安全執行方法:safeExecute,直接調用的run方法

   該類是最基礎的一個抽象類,基本做用就是與group在定義混亂上作了一個區分。提供了執行器與Future關聯方法和一個基本的執行任務的方法。

  2.AbstractScheduledEventExecutor:入參也是一個parent,該類對AbstractEventExecutor未處理的週期任務提供了具體的完成方法:

    a.提供計算當前距服務啓動的時間

    b.提供存儲ScheduledFutureTask的優先隊列

    c.提供了取消全部週期任務的方法

    d.提供了獲取一個符合的週期任務的方法,要知足時間,並獲取後移除

    e.提供了獲取距最近一個週期任務的時間多久

    f.提供了移除一個週期任務的方法

    g.提供添加週期任務的方法

   該類提供了週期任務執行的一些基本方法,涉及添加週期任務,移除,獲取等方法。

  3.SingleThreadEventExecutor:入參包括parent,addTaskWakesUp標誌,maxPendingTasks最大任務隊列數(16和io.netty.eventexecutor.maxPendingTasks(default Integer.MAX_VALUE)參數更大的那個值),executor執行器(默認是ThreadPerTaskExecutor,每一個任務建立一個線程執行),taskQueue任務隊列(默認是LinkedBlockingQueue),rejectedExecutionHandler拒絕任務的處理類(默認直接拋出RejectedExecutionException)。該類主要完成了一個單線程的EventExecutor的基本操做:

    a.建立一個taskQueue

    b.中斷線程

    c.從任務隊列中獲取一個任務,takeTask連同週期任務也會獲取

    d.添加任務到任務隊列

    e.移除任務隊列中指定任務

    f.運行全部任務,會先將週期任務存入taskQueue,再使用safeExecute方法執行任務

    g.實現了execute方法,會添加任務到任務隊列,若是當前線程不是事件循環線程,開啓一個線程。經過的就是持有的executor來開啓的線程任務。execute方法調用了run方法,該類沒有實現run方法。任務的添加都不是經過execute直接執行了,而是走的添加任務到taskQueue,由未實現的run線程來處理這些事件。

    h.優雅停機

   這樣就有了一個基礎的單線程模型了,開啓線程,保存,取出任務的方法都有了,只有在開啓線程中執行任務的run()方法還未實現。

  4.SingleThreadEventLoop:入參和SingleThreadEventExecutor一致,不一樣的是多了一個tailTasks。該類主要是針對netty自身的事件循環的定義來實現方法了:

    a.註冊channel,其實是生成了一個DefaultChannelPromise對象,持有了channel,和運行該channel的EventExecutor,而後將該對象交給最底層的unsafe處理。

    b.添加一個事件週期結束後執行的尾任務tailTasks

    c.執行尾任務

    d.刪除指定尾任務

   該類就很簡單,沒有過多的內容,只是增長了一個每一個事件週期後執行的任務而已。

 回顧完了,上面4個父類構建了一個基本的帶定時任務,普通任務,事件循環後置任務的EventLoop,每一個channel綁定了一個線程執行器,經過DefaultChannelPromise持有二者,最終交給Unsafe操做。子類只須要實現run方法,處理任務隊列中的任務。下面就是重頭戲NioEventLoop這個客戶端的線程是如何設計的了:

    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
        super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
        if (selectorProvider == null) {
            throw new NullPointerException("selectorProvider");
        }
        if (strategy == null) {
            throw new NullPointerException("selectStrategy");
        }
        provider = selectorProvider;
        final SelectorTuple selectorTuple = openSelector();
        selector = selectorTuple.selector;
        unwrappedSelector = selectorTuple.unwrappedSelector;
        selectStrategy = strategy;
    }

 調用的就是父類方法,不過在建立EventLoop的時候建立了selector,這個是NIO中也提到過的。該EventLoop是在Group中newChild建立的。

    protected void run() {
        for (;;) {
            try {
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;
                    case SelectStrategy.SELECT:
                        select(wakenUp.getAndSet(false));
                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                    default:
                }
                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                if (ioRatio == 100) {
                    try {
                        processSelectedKeys();
                    } finally {
                        runAllTasks();
                    }
                } else {
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        final long ioTime = System.nanoTime() - ioStartTime;
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
            try {
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }

 上面是咱們須要關注的run方法,該方法被單獨的線程執行。經過一個策略來判斷執行哪一個,默認的策略是任務隊列中有任務,select執行一下,返回當前的事件數,沒任務返回SelectStrategy.SELECT。簡單的說就是有任務就補充新到來的事件執行全部的任務,沒任務就執行新到的事件。先處理IO讀寫任務,再處理其餘任務,ioRation設置的耗時比例是IO任務佔一個執行週期的百分比,默認50,意思是IO執行了50秒,其餘任務也會獲得50秒的執行時間。後續操做就是獲取全部select的key,執行全部的任務了。這裏就有一個判斷若是是停機狀態,就會closeAll(),以前說優雅停機的時候就是設置了一個這個標誌,最後是在執行任務以後判斷。processSelectedKey環節都交給unsafe類完成了,這裏就掛上了handler相關觸發,handler的執行也就說明都在該線程內了。

 上面的描述雖然把整個過程都關聯上了,可是最主要的問題仍是混亂的:如何作到一個channel建立一個線程的?上面只是說明了channel和EventExecutor是綁定在DefaultChannelPromise並交給了Unsafe類,並無看到是如何建立線程的。並且另外一個問題在於,processSelectedKey是選擇了全部的key,這不是全部的channel共享了一個線程嗎?

 要解決該問題要回到Bootstrap的channel創建過程:initAndRegister()方法中,經過channelFactory建立了一個channel對象,然後ChannelFuture regFuture = config().group().register(channel);主要就是觀察register方法,該類是設置的線程池NioEventLoopGroup提供的方法,其繼承的是MultithreadEventLoopGroup,是調用了next方法獲取的EventLoop,最後接上上面channel和eventloop綁定的內容。next中獲取的EventLoop早在類初始化的時候就生成了,在構造方法中MultithreadEventExecutorGroup,children就是Eventloop,next不過是挑選了一個線程池而已,默認數量是CPU核數的2倍。這個也就是前面說的,線程數量不是越多越好。

 這樣咱們明白了,客戶端註冊的時候是分配了一個線程給它。客戶端並不須要多線程,可是仍是繼續看後面的內容:AbstractUnsafe的register方法給出了相關解答。channel持有了該EventLoop,此時線程仍是未運行狀態,只是有了這麼一個對象而已。

            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    logger.warn(
                            "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                            AbstractChannel.this, t);
                    closeForcibly();
                    closeFuture.setClosed();
                    safeSetFailure(promise, t);
                }
            }

 這裏execute方法,這個是以前咱們講過的一個方法,其會判斷當前線程是不是該channel的線程,目前都沒有初始化線程確定不是,將任務放入任務隊列,開啓一個線程(線程處於未啓動狀態纔會開啓,不然不執行),這個設計使得execute的時候只會開啓一次線程,而全部的任務都會被放入任務隊列,由這個線程執行。再回到run方法,這個是channel線程執行的方法,目前每一個NioEventLoop都執行了Select等方法啊,這不是處理了全部的channel的工做嗎?並無達到一個channel生命週期控制在一個線程中啊。

 這裏其實是JAVA NIO的例子帶來的誤解,認爲必須一個線程來使用select,而後遍歷事件分配線程給channel執行讀寫操做。實際上在Netty中不同,Netty全部線程都在執行select並獲取相關事件,可是實際上其並無執行全部的事件。

    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        if (!k.isValid()) {
            final EventLoop eventLoop;
            try {
                eventLoop = ch.eventLoop();
            } catch (Throwable ignored) {
                return;
            }
           
            if (eventLoop != this || eventLoop == null) {
                return;
            }
            
            unsafe.close(unsafe.voidPromise());
            return;
        }

        try {
            int readyOps = k.readyOps();
            
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);

                unsafe.finishConnect();
            }

            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                ch.unsafe().forceFlush();
            }

            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }

 看這裏,if (eventLoop != this || eventLoop == null) ,若是channel的eventLoop不是當前的eventLoop就不執行,這個就一小段代碼,可是就直接決定了EventLoop只對本身所綁定的channel感興趣,最終達到了只處理本身相關的任務的目的。

3.6 NioEventLoopGroup

 該類沒有太多須要說明的內容,前面已經講解了不少。

  1.AbstractEventExecutorGroup,實現了基本的方法,全部方法都是調用next()即挑選出一個線程執行器完成的。

  2.MultithreadEventExecutorGroup,實現了一個基本的線程池,持有子線程,主要工做是初始化了子線程數組,提供了next方法。

  3.MultithreadEventLoopGroup,實現了Netty的channel線程池,提供了register方法,雖然也是調用next的register方法。

  4.NioEventLoopGroup,實現了建立子線程數組時newChild方法,全部的EventLoop都是這個方法建立的。

 group就是兩個重點,一個next()挑選事件執行器,一個newChild()建立線程執行器對象。

4.總結

 本節耗費了大量的篇幅講解了Netty的線程模型的設計思路,主要看點以下:

  1.解釋了EventLoop、EventLoopGroup、EventExecutor、EventExecutorGroup這四者之間的關係,和這麼複雜混亂的繼承關係的緣由。

  2.解釋了group是如何初始化線程,並綁定channel的,next().register()

  3.解釋了eventloop爲何和channel綁定了,execute()開啓線程,以及每一個eventloop都在獲取IO事件,可是經過channel的eventloop是否等於當前過濾掉其它的事件,只處理本身綁定的channel事件。

 因爲每一個線程都在獲取IO事件,因此這段邏輯變的很是複雜,這也就是我以前說的寫好很IO很困難。

 最後附上一張對前面全部內容總結的一個圖,清醒一下頭腦,從複雜的代碼中脫身:

 這個圖就是一個基本的執行過程圖了,可能有遺漏的地方,可是大致狀況如圖所示。

相關文章
相關標籤/搜索