併發基礎(二) ThreadPoolExecutor

概述:html

ThreadPoolExecutor -> AbstractExecutorService =>ExecutorService =>Executor.java

ThreadPoolExecutor 解決了兩個不一樣的問題:1,經過減小任務調用開銷來改善執行大量異步任務時的性能問題;2,提供資源約束與管理,包括線程。另外還有一些統計功能,好比已完成任務個數等。android


爲了使用於不一樣的上下文環境,ThreadPoolExecutor提供了不少可調整的參數和鉤子方法, 可是官方仍然推薦編程者使用更爲便捷的 Executors 的工廠方法,這些方法根據不一樣場景進行了配置,能夠知足大部分需求:web

 newCachedThreadPool() (無大小限制線程池,線程自動回收), 編程

newFixedThreadPool(int) (線程池大小固定) 緩存

以及 newSingleThreadExecutor() (單個後臺線程). 服務器


 當你須要直接使用ThreadPoolExecutor 時, 可參考一下指南:併發


  • Core and maximum pool sizes異步

     ThreadPoolExecutor 能夠根據corePoolSize和maximumPoolSize自動調整線程池的大小(分別經過 getPoolSize() 和 getCorePoolSize()進行查看) . 當有新的任務提交時會調用execute(Runnable), 若是當前正在運行的線程數小於corePoolSize,新的線程會被建立來執行新提交的任務,即便當前有空閒線程。若是當前運行的線程數大於corePoolSize可是小於maximumPoolSize, 同時任務隊列(queue)已經滿了的話,一個新的線程會被建立來執行任務. 設置corePoolSize 和 maximumPoolSize 相等時, 至關於建立了一個固定大小的線程池. 若是設置 maximumPoolSize 爲一個無限值好比Integer.MAX_VALUE, 那麼該池中將容許有任意數量的併發任務. 通常狀況下 corePoolSize和  maximumPoolSize 在構造時傳入, 不過也能夠經過 setCorePoolSize(int) 和 setMaximumPoolSize(int)來動態設定.oop

  • On-demand construction

    默認狀況下核心線程(core threads)只在有新任務到達時纔會初始化和啓動, 不過能夠經過覆寫 prestartCoreThread() 或者 prestartAllCoreThreads()方法來改變這種行爲.

  • Creating new threads

    默認經過 ThreadFactory建立新的線程. 它們將有相同的 ThreadGroup ,具備 NORM_PRIORITY優先級而且是非守護線程. 你能夠提供不一樣的ThreadFactory 來改變線程的名字、組別、優先級、守護狀態等. 若是ThreadFactory 建立線程失敗,該executor 將繼續運行,可是不能再執行任何任務.

  • Keep-alive times

    若是當前線程池中線程的數量已經多於corePoolSize, 那麼額外的線程若是處於idle狀態超過期間 keepAlive(查看 getKeepAliveTime(TimeUnit))將會被強行終止. 這樣當線程池的使用比較緊張時能夠有效地減小資源消耗.當線程池再次緊張時,新的(額外)線程還會被建立. keepAlive時間能夠經過 setKeepAliveTime(long, TimeUnit)方法來設置 .使用Long.MAX_VALUE NANOSECONDS  可使處於idle狀態的線程不被終止,直到ThreadPoolExecutor被關閉 (shut down). 默認狀況下, keep-alive 策略只在當前線程數量多於 corePoolSize 時起做用.可是能夠經過方法 allowCoreThreadTimeOut(boolean) 設置是否將這樣的超時策略同時應用於core Threads, 前提是 keepAliveTime 是非零的.

  • Queuing

    任何 BlockingQueue 均可以用來調度和存儲提交的任務. 任務隊列的使用和線程池大小的改變有依賴關係:

    • 若是當前線程數量小於coreThreadSize, Executor 會優先選擇建立新線程來執行任務,而不是將任務存入隊列.

    • 若是有多於或等於corePoolSize個線程在運行, Executor 會將新的任務請求入隊,而不是建立新的線程.

    • 若是任務請求不能入隊,新線程會被建立,可是要求線程數量不超過 maximumPoolSize, 不然任務請求會被拒絕(rejected).


    隊列有三種普遍使用的策略:

    1. 直接傳遞(Direct handoffs). 一個不錯的默認選擇是使用同步隊列(SynchronousQueue), 將任務直接交給線程處理而不是存儲起來. 若是沒有線程及時處理,任務也不能被成功存儲,因此新的線程會被建立. 這種方法在處理一組有互相依賴的任務時避免了繁複的查找. 直接傳遞策略通常要求一個無界的線程池(unbounded maximumPoolSizes ),這樣才能避免提交的任務被拒絕. 這種作法存在一個缺點,當任務到達速度超過線程對任務的處理速度是,線程數量會無限制的增加.

    2. 無界隊列(Unbounded queues). 若是當前的corePoolSize個線程都處於忙狀態時,使用無界隊列 (好比 LinkedBlockingQueue ,不預先設置容量) 會把新的任務緩存在隊列裏. 這樣executor裏不會有多於corePoolSize 個線程. ( 這樣maximumPoolSize 就再也不起做用了.) 無界隊列策略適用於任務相互獨立的狀況,各個任務不會影響彼此的執行, 好比 web page 服務器處理頁面請求. 雖然這種方法可使爆發式請求獲得平滑的處理,可是當任務處理速度較慢時,可能形成任務隊列的無限制增加.

    3. 有界對類(Bounded queues). 有界隊列(好比 ArrayBlockingQueue) 是直接傳遞和無界隊列的一個平衡,經過設置一個有限的maximumPoolSizes值,能夠防止資源的過分消耗, 可是這種方式更難調整和控制. 任務隊列大小和最大線程池大小須要互相折中: 使用較大的隊列和較小的線程池能夠減小CPU、系統資源 以及上下文切換開銷, 可是會影響吞吐量. 當任務頻繁阻塞時 (好比等待I/O),這種作法會對系統的調度形成負面影響 . 若是使用較小的任務隊列就要求有較大的線程池, 這樣可使CPU充分利用,可是會引起過分的調度開銷,從而也會映像吞吐量.

  • Rejected tasks

    若是executor已經關閉,或者executor使用了有界的線程池和有界的任務隊列而且都已飽和,那麼經過方法 execute(Runnable) 提交的新任務會被拒絕。 不管哪一種狀況, execute 方法都會調用 RejectedExecutionHandler rejectedExecution(Runnable, ThreadPoolExecutor) 方法. RejectedExecutionHandler有四中預設的處理:

    1. 默認使用 ThreadPoolExecutor.AbortPolicy, 當有任務被拒絕時會拋出運行時異常: RejectedExecutionException .

    2. ThreadPoolExecutor.CallerRunsPolicy, 這種策略在調用 execute 方法的線程(即提交任務的線程)中運行任務 . 這種策略提供了一個簡單的返回機制來使下降新任務的提交頻率。

    3. ThreadPoolExecutor.DiscardPolicy, 這種策略只是簡單的丟棄新任務,不作任何反饋處理.

    4. ThreadPoolExecutor.DiscardOldestPolicy, 若是 executor 還沒有關閉, 任務隊列首部的任務會被丟棄, 而後重試提交任務 (若是再次失敗,還要重複這一過程.)

    定義並使用其餘的處理策略也是能夠的. 可是要格外注意特定的工做場景下的隊列容量和隊列管理策略。

  • Hook methods

    ThreadPoolExecutor提供了可覆寫的beforeExecute(Thread, Runnable) 和 afterExecute(Runnable, Throwable)方法, 這些方法在每次執行任務前、後分別調用,可用來操做執行環境,好比從新初始化線程本地變量(ThreadLocals),收集統計信息或者添加日誌項 . 另外,還能夠覆寫 terminated() 方法在Executor徹底終止前作一些特殊的處理工做.

    若是鉤子方法或回調方法拋出異常,內部的工做線程也可能會失敗並異常終止。

  • Queue maintenance

    方法 getQueue() 提供了訪問queue的途徑,從而對queue進行監視和調試,不推薦用於其餘目的. 當有大量入隊的任務被取消時,另外兩個方法, remove(Runnable) and purge() 可用於存儲資源的回收.

  • Finalization

    線程池再也不被程序引用 而且 沒有任何剩餘的線程在運行時,線程池會被自動關閉 (shutdown ). 若是你想確保未被引用的線程池被回收, 即便用戶忘記調用 shutdown(), 那麼你必須使未被使用的線程最可以終止, 能夠經過設置適當的 keep-alive 時間, 使用下限爲0的核心線程數而且/或者  設置allowCoreThreadTimeOut(boolean)來實現.




下面看一下提交一個任務到執行的具體過程:


/*

          * Proceed in 3 steps:
          *
          * 1. If fewer than corePoolSize threads are running, try to
          * start a new thread with the given command as its first
          * task.  The call to addWorker atomically checks runState and
          * workerCount, and so prevents false alarms that would add
          * threads when it shouldn't, by returning false.
          *
          * 2. If a task can be successfully queued, then we still need
          * to double-check whether we should have added a thread
          * (because existing ones died since last checking) or that
          * the pool shut down since entry into this method. So we
          * recheck state and if necessary roll back the enqueuing if
          * stopped, or start a new thread if there are none.
          *
          * 3. If we cannot queue task, then we try to add a new
          * thread.  If it fails, we know we are shut down or saturated
          * and so reject the task.
          */
public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        
        int c = ctl.get();
        //scene 1
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // scene 2
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // scene 3
        else if (!addWorker(command, false))
            reject(command);
    }

從註釋裏能夠看到任務被添加時有三種狀況,scene 1 和 scene 3 都容易理解, 咱們看第二種, addWorker(null, false):


addworker, 這裏的firstTask 是null, 可是worker裏的thread仍是啓動了, 爲何?

 private boolean addWorker(Runnable firstTask, boolean core) {
         
       ...
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    ...
                        workers.add(w);
                        ...
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

Worker也是一個runnable, 這裏的線程啓動運行的是Worker的run方法,裏面又調用了runWorker 方法:

咱們看下 worker的構造:

 private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        ...
        
        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }

        ...
    }


下面的runWorker方法中注意while loop, 當前的worker若是有task就執行,不然調用getTask 從workQueue中取一個來執行,執行完會繼續去取, 這裏就體現了「線程池」的概念, 並非一個任務一個線程,而是在線程數量達到限定的數量,同時任務數比較多被放入緩存隊列的時候, 一個線程有可能執行多個任務。

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                ...
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        
                        ...
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }


看一下getTask方法:

這裏有一個無限的for loop, 直到從workQueue取到一個任務,或者等待超時,或者當前的executor被關閉

 private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }


從上面的過程能夠看出, 經過addWorker(null, false /true)提交的任務雖然是null, 可是仍然會啓動一個線程去workQueue中等待或取得一個任務。

相關文章
相關標籤/搜索