ThreadPoolExecutor參數解析

ThreadPoolExecutor是一個很是重要的類,用來構建帶有線程池的任務執行器,經過配置不一樣的參數來構造具備不一樣規格線程池的任務執行器html

寫在前面的是: java

線程池任務執行器,線程池的定義比較直接,能夠看作多個線程的集合。而任務執行器的概念比較的具備針對性,它用來執行任務,經過對線程池的管理實現多任務的併發,是線程池的載體。 api

線程和任務的區別,線程不是任務,線程是用來執行任務的。 併發

隊列是用來存聽任務的,不是用來存放線程的。 oracle

(37JUW4~P@]KZ]FYI@8@8`B

主要的幾個參數解析: oop

  • 核心線程數(core pool sizes)和最大線程數(maxmum  pool sizes)

一開始二者的存在很讓人摸不着頭腦,簡單的想法是用一個線程數(pool size)表示線程池的大小不就完了嗎,不到規定的線程數就建立新的線程來執行新的任務,到了規定的線程數就等待其餘線程處理完成,怎麼還出現兩個控制線程數的參數? ui

那這兩個參數是什麼意思幹什麼用的? this

核心線程數這個數與上面那個簡單想法中的數有一個共同點,就是若是當前線程數達不到核心線程數時,不會使用已有的空閒的線程(若是有的話),來了新任務就會建立新的線程spa

若是當前線程數達到核心線程數,並且沒有空閒線程,那麼來了新任務是否要建立新的線程呢?這取決於兩點: .net

  1. 當前的任務隊列是否已滿。
  2. 線程池的最大線程數。

經過這個問題能夠引出最大線程數的概念

最大線程數 : 最大線程數是和任務隊列匹配使用的,確切的說是和有長度限制的任務隊列(即有界任務隊列)匹配使用的。

補充回答上面的問題,ThreadPoolExecutor的線程池擁有一個任務隊列,這個任務隊列只有在當前線程數>核心線程數的時候纔開始使用,若是該線程池使用的任務隊列是有界隊列,好比10,那麼當該隊列被新任務填滿時也就是說隊列中有10個新任務時ThreadPoolExecutor纔會建立一個新的線程來執行隊列中的一個任務,若是再發生隊列被填滿,並且依舊沒有空閒線程時ThreadPoolExecutor再次建立新的線程,一旦線程的數量等於最大線程數就再也不建立新的線程了,若是此時隊列中還有10個任務,那麼新來的任務就會被拒絕(reject)。

上述是針對有界隊列,若是這個任務執行器的隊列是無界隊列呢?

因爲無界隊列不會被填滿,因此永遠不能達到建立新線程所須要的條件,因此也就不會有新線程被建立,因此最大線程數在這種狀況下也就失去了其存在的意義。

  • 線程空閒存活時間(keepAliveTime)

在介紹上面的核心線程數和最大線程數時有提到空閒的線程,所謂空閒的線程就是執行完任務以後閒着的線程。

超過這個時間會使得那麼核心線程以外的空閒線程被殺死,若是想把這個時間也做用在覈心線程上須要設置allowCoreThreadTimeOut(boolean)爲true

這裏有必要說一下的是,任務執行器如何實現線程的重複利用,當任務執行器執行execute(task)的時候會建立一個worker,它是一個Runnable類,能夠看作task的載體,worker包含一個thread對象,這個thread啓動的時候執行worker自己的run方法,這樣worker和線程就融爲一體。當worker的thread start的時候,就會執行worker的run方法,而worker的run會調用任務執行器的runWorker(worker),並將自身傳遞過去,意思是任務執行器啓動了一個worker,而線程重複利用關鍵就在runWorker中,在啓動了一個worker後,worker會從任務執行器中尋找能夠運行的任務,而一開始建立worker使用的task就是它的第一個任務。

 

下面是jdk1.7的源碼


//執行一個任務 task

public void execute(Runnable command) { 
        if (command == null) 
            throw new NullPointerException(); 
        int c = ctl.get(); 
        if (workerCountOf(c) < corePoolSize) { 
            if (addWorker(command, true)) // 將task裝配到一個worker中 
                return; 
            c = ctl.get(); 
        } 
        if (isRunning(c) && workQueue.offer(command)) { 
            int recheck = ctl.get(); 
            if (! isRunning(recheck) && remove(command)) 
                reject(command); 
            else if (workerCountOf(recheck) == 0) 
                addWorker(null, false); 
        } 
        else if (!addWorker(command, false)) 
            reject(command); 
    }




//添加一個worker

private boolean addWorker(Runnable firstTask, boolean core) { 
        retry: 
        for (;;) { 
            int c = ctl.get(); 
            int rs = runStateOf(c);

            // Check if queue empty only if necessary. 
            if (rs >= SHUTDOWN && 
                ! (rs == SHUTDOWN && 
                   firstTask == null && 
                   ! workQueue.isEmpty())) 
                return false;

            for (;;) { 
                int wc = workerCountOf(c); 
                if (wc >= CAPACITY || 
                    wc >= (core ? corePoolSize : maximumPoolSize)) 
                    return false; 
                if (compareAndIncrementWorkerCount(c)) 
                    break retry; 
                c = ctl.get();  // Re-read ctl 
                if (runStateOf(c) != rs) 
                    continue retry; 
                // else CAS failed due to workerCount change; retry inner loop 
            } 
        }

        boolean workerStarted = false; 
        boolean workerAdded = false; 
        Worker w = null; 
        try { 
            final ReentrantLock mainLock = this.mainLock; 
            w = new Worker(firstTask); 
            final Thread t = w.thread; 
            if (t != null) { 
                mainLock.lock(); 
                try { 
                    // Recheck while holding lock. 
                    // Back out on ThreadFactory failure or if 
                    // shut down before lock acquired. 
                    int c = ctl.get(); 
                    int rs = runStateOf(c);

                    if (rs < SHUTDOWN || 
                        (rs == SHUTDOWN && firstTask == null)) { 
                        if (t.isAlive()) // precheck that t is startable 
                            throw new IllegalThreadStateException(); 
                        workers.add(w); 
                        int s = workers.size(); 
                        if (s > largestPoolSize) 
                            largestPoolSize = s; 
                        workerAdded = true; 
                    } 
                } finally { 
                    mainLock.unlock(); 
                } 
                if (workerAdded) {//若是worker建立成功,就啓動它的對應的thread 
                    t.start(); //worker中的tread啓動 
                    workerStarted = true; 
                } 
            } 
        } finally { 
            if (! workerStarted) 
                addWorkerFailed(w); 
        } 
        return workerStarted; 
    }



//啓動這個worker

final void runWorker(Worker w) { 
        Thread wt = Thread.currentThread(); 
        Runnable task = w.firstTask; 
        w.firstTask = null; 
        w.unlock(); // allow interrupts 
        boolean completedAbruptly = true; 
        try { 
            while (task != null || (task = getTask()) != null) {//這裏是關鍵,使用一個while來尋找任務執行器中(主要仍是從任務隊列中獲取)還未執行的task。 
                w.lock(); 
                // If pool is stopping, ensure thread is interrupted; 
                // if not, ensure thread is not interrupted.  This 
                // requires a recheck in second case to deal with 
                // shutdownNow race while clearing interrupt 
                if ((runStateAtLeast(ctl.get(), STOP) || 
                     (Thread.interrupted() && 
                      runStateAtLeast(ctl.get(), STOP))) && 
                    !wt.isInterrupted()) 
                    wt.interrupt(); 
                try { 
                    beforeExecute(wt, task); 
                    Throwable thrown = null; 
                    try { 
                        task.run(); 
                    } catch (RuntimeException x) { 
                        thrown = x; throw x; 
                    } catch (Error x) { 
                        thrown = x; throw x; 
                    } catch (Throwable x) { 
                        thrown = x; throw new Error(x); 
                    } finally { 
                        afterExecute(task, thrown); 
                    } 
                } finally { 
                    task = null; 
                    w.completedTasks++; 
                    w.unlock(); 
                } 
            } 
            completedAbruptly = false; 
        } finally { 
            processWorkerExit(w, completedAbruptly); 
        } 
    }
相關文章
相關標籤/搜索