線程池之ThreadPoolExecutor線程池源碼分析筆記

1.線程池的做用

一方面當執行大量異步任務時候線程池可以提供較好的性能,在不使用線程池的時候,每當須要執行異步任務時候是直接 new 一線程進行運行,而線程的建立和銷燬是須要開銷的。使用線程池時候,線程池裏面的線程是可複用的,不會每次執行異步任務時候都從新建立和銷燬線程。java

另外一方面線程池提供了一種資源限制和管理的手段,好比能夠限制線程的個數,動態新增線程等,每一個 ThreadPoolExecutor 也保留了一些基本的統計數據,好比當前線程池完成的任務數目等。數組

 

 2.ThreadPoolExecutor 原理探究

類圖以下:

 

 

如上類圖,Executors 實際上是個工具類,裏面提供了好多靜態方法,根據用戶選擇返回不一樣的線程池實例。安全

ThreadPoolExecutor 繼承了 AbstractExecutorService,成員變量 ctl 是個 Integer 的原子變量用來記錄線程池狀態 和 線程池中線程個數,相似於 ReentrantReadWriteLock 使用一個變量存放兩種信息。多線程

這裏假設 Integer 類型是 32 位二進制標示,則其中高 3 位用來表示線程池狀態,後面 29 位用來記錄線程池線程個數。併發

//用來標記線程池狀態(高3位),線程個數(低29位)
//默認是RUNNING狀態,線程個數爲0
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

//線程個數掩碼位數,並非全部平臺int類型是32位,因此準確說是具體平臺下Integer的二進制位數-3後的剩餘位數纔是線程的個數,
private static final int COUNT_BITS = Integer.SIZE - 3;

//線程最大個數(低29位)00011111111111111111111111111111
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

線程池狀態:

//(高3位):11100000000000000000000000000000
private static final int RUNNING    = -1 << COUNT_BITS;

//(高3位):00000000000000000000000000000000
private static final int SHUTDOWN   =  0 << COUNT_BITS;

//(高3位):00100000000000000000000000000000
private static final int STOP       =  1 << COUNT_BITS;

//(高3位):01000000000000000000000000000000
private static final int TIDYING    =  2 << COUNT_BITS;

//(高3位):01100000000000000000000000000000
private static final int TERMINATED =  3 << COUNT_BITS;
// 獲取高三位 運行狀態
private static int runStateOf(int c)     { return c & ~CAPACITY; }

//獲取低29位 線程個數
private static int workerCountOf(int c)  { return c & CAPACITY; }

//計算ctl新值,線程狀態 與 線程個數
private static int ctlOf(int rs, int wc) { return rs | wc; }

線程池狀態含義:

  • RUNNING:接受新任務而且處理阻塞隊列裏的任務;異步

  • SHUTDOWN:拒絕新任務可是處理阻塞隊列裏的任務;ide

  • STOP:拒絕新任務而且拋棄阻塞隊列裏的任務,同時會中斷正在處理的任務;函數

  • TIDYING:全部任務都執行完(包含阻塞隊列裏面任務)當前線程池活動線程爲 0,將要調用 terminated 方法;工具

  • TERMINATED:終止狀態,terminated方法調用完成之後的狀態。源碼分析

線程池狀態轉換:

       1.RUNNING -> SHUTDOWN:顯式調用 shutdown() 方法,或者隱式調用了 finalize(),它裏面調用了 shutdown() 方法。

       2.RUNNING or SHUTDOWN -> STOP:顯式調用 shutdownNow() 方法時候。

       3.SHUTDOWN -> TIDYING:當線程池和任務隊列都爲空的時候。

       4.STOP -> TIDYING:當線程池爲空的時候。

       5.TIDYING -> TERMINATED:當 terminated() hook 方法執行完成時候。

 

線程池參數:

  • corePoolSize:線程池核心線程個數;

  • workQueue:用於保存等待執行的任務的阻塞隊列;好比基於數組的有界 ArrayBlockingQueue,基於鏈表的無界 LinkedBlockingQueue,最多隻有一個元素的同步隊列 SynchronousQueue,優先級隊列 PriorityBlockingQueue 等。

  • maximunPoolSize:線程池最大線程數量。

  • ThreadFactory:建立線程的工廠。

  • RejectedExecutionHandler:飽和策略,當隊列滿了而且線程個數達到 maximunPoolSize 後採起的策略,好比 AbortPolicy (拋出異常),CallerRunsPolicy(使用調用者所在線程來運行任務),DiscardOldestPolicy(調用 poll 丟棄一個任務,執行當前任務),DiscardPolicy(默默丟棄,不拋出異常)。

  • keeyAliveTime:存活時間。若是當前線程池中的線程數量比核心線程數量要多,而且是閒置狀態的話,這些閒置的線程能存活的最大時間。

  • TimeUnit,存活時間的時間單位。

線程池類型:

      1.newFixedThreadPool:建立一個核心線程個數和最大線程個數都爲 nThreads 的線程池,而且阻塞隊列長度爲 Integer.MAX_VALUEkeeyAliveTime=0 說明只要線程個數比核心線程個數多而且當前空閒則回收。代碼以下:

 public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
 }
 //使用自定義線程建立工廠
 public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
 }

 

2.newSingleThreadExecutor:建立一個核心線程個數和最大線程個數都爲1的線程池,而且阻塞隊列長度爲 Integer.MAX_VALUEkeeyAliveTime=0 說明只要線程個數比核心線程個數多而且當前空閒則回收。代碼以下:

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

    //使用本身的線程工廠
    public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
    }

 

3.newCachedThreadPool:建立一個按需建立線程的線程池,初始線程個數爲 0,最多線程個數爲 Integer.MAX_VALUE,而且阻塞隊列爲同步隊列,keeyAliveTime=60 說明只要當前線程 60s 內空閒則回收。這個特殊在於加入到同步隊列的任務會被立刻被執行,同步隊列裏面最多隻有一個任務。代碼以下:

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

    //使用自定義的線程工廠
    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }

 

如類圖,其中 mainLock 是獨佔鎖,用來控制新增 Worker 線程時候的原子性,termination 是該鎖對應的條件隊列,在線程調用 awaitTermination 時候用來存放阻塞的線程。

Worker 繼承 AQS 和 Runnable 接口,是具體承載任務的對象,Worker 繼承了 AQS,本身實現了簡單不可重入獨佔鎖,其中 status=0 標示鎖未被獲取狀態,state=1 標示鎖已經被獲取的狀態,state=-1 是建立 Worker 時候默認的狀態,建立時候狀態設置爲 -1 是爲了不在該線程在運行 runWorker() 方法前被中斷,下面會具體講解到。其中變量 firstTask 記錄該工做線程執行的第一個任務,thread 是具體執行任務的線程。

DefaultThreadFactory 是線程工廠,newThread 方法是對線程的一個修飾,其中 poolNumber 是個靜態的原子變量,用來統計線程工廠的個數,threadNumber 用來記錄每一個線程工廠建立了多少線程,這兩個值也做爲線程池和線程的名稱的一部分。

 

3.源碼分析

1 public void execute(Runnable command):execute 方法是提交任務 command 到線程池進行執行,用戶線程提交任務到線程池的模型圖以下所示:

如上圖可知 ThreadPoolExecutor 的實現實際是一個生產消費模型,其中當用戶添加任務到線程池時候至關於生產者生產元素,workers 線程工做集中的線程直接執行任務或者從任務隊列裏面獲取任務至關於消費者消費元素。用戶線程提交任務的 execute 方法具體代碼以下:

public void execute(Runnable command) {

    //(1) 若是任務爲null,則拋出NPE異常
    if (command == null)
        throw new NullPointerException();

    //(2)獲取當前線程池的狀態+線程個數變量的組合值
    int c = ctl.get();

    //(3)當前線程池線程個數是否小於corePoolSize,小於則開啓新線程運行
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }

    //(4)若是線程池處於RUNNING狀態,則添加任務到阻塞隊列
    if (isRunning(c) && workQueue.offer(command)) {

        //(4.1)二次檢查
        int recheck = ctl.get();
        //(4.2)若是當前線程池狀態不是RUNNING則從隊列刪除任務,並執行拒絕策略
        if (! isRunning(recheck) && remove(command))
            reject(command);

        //(4.3)否者若是當前線程池線程空,則添加一個線程
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    //(5)若是隊列滿了,則新增線程,新增失敗則執行拒絕策略
    else if (!addWorker(command, false))
        reject(command);
}

 

1.代碼(3)判斷若是當前線程池線程個數小於 corePoolSize,如上圖會在 workers 裏面新增一個核心線程(core 線程)執行該任務。

2.若是當前線程池線程個數大於等於 corePoolSize 執行代碼(4),若是當前線程池處於 RUNNING 狀態則添加當前任務到任務隊列,這裏須要判斷線程池狀態是由於有可能線程池已經處於非 RUNNING 狀態,而非 RUNNING 狀態下是拋棄新任務的。

3.若是任務添加任務隊列成功,則代碼(4.2)對線程池狀態進行二次校驗,這是由於添加任務到任務隊列後,執行代碼(4.2)前有可能線程池的狀態已經變化了,這裏進行二次校驗,若是當前線程池狀態不是 RUNNING 了則把任務從任務隊列移除,移除後執行拒絕策略;若是二次校驗經過,則執行代碼(4.3)從新判斷當前線程池裏面是否還有線程,若是沒有則新增一個線程。

4.若是代碼(4)添加任務失敗,則說明任務隊列滿了,則執行代碼(5)嘗試新開啓線程(如上圖 thread 3 和 thread 4)來執行該任務,若是當前線程池線程個數 > maximumPoolSize 則執行拒絕策略。

 

接下來看新增線程的 addWorkder 方法的源碼,以下:

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        //(6) 檢查隊列是否只在必要時爲空
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        //(7)循環cas增長線程個數
        for (;;) {
            int wc = workerCountOf(c);

            //(7.1)若是線程個數超限則返回false
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            //(7.2)cas增長線程個數,同時只有一個線程成功
            if (compareAndIncrementWorkerCount(c))
                break retry;
            //(7.3)cas失敗了,則看線程池狀態是否變化了,變化則跳到外層循環重試從新獲取線程池狀態,否者內層循環從新cas。
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
        }
    }

    //(8)到這裏說明cas成功了
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        //(8.1)建立worker
        final ReentrantLock mainLock = this.mainLock;
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {

            //(8.2)加獨佔鎖,爲了workers同步,由於可能多個線程調用了線程池的execute方法。
            mainLock.lock();
            try {

                //(8.3)從新檢查線程池狀態,爲了不在獲取鎖前調用了shutdown接口
                int c = ctl.get();
                int rs = runStateOf(c);

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    //(8.4)添加任務
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            //(8.5)添加成功則啓動任務
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

 

如上代碼主要分兩部分,第一部分的雙重循環目的是經過 cas 操做增長線程池線程數,第二部分主要是併發安全的把任務添加到 workers 裏面,而且啓動任務執行。

先看第一部分的代碼(6),以下所示:

rs >= SHUTDOWN &&
               ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty())

這樣看很差理解,咱們展開!運算符後,至關於:

s >= SHUTDOWN &&
                (rs != SHUTDOWN ||//(1)
              firstTask != null ||//(2)
              workQueue.isEmpty())//(3)

 

如上代碼,也就是說代碼(6)在下面幾種狀況下會返回 false:

    1.當前線程池狀態爲 STOP,TIDYING,TERMINATED;

    2.當前線程池狀態爲 SHUTDOWN 而且已經有了第一個任務;

    3.當前線程池狀態爲 SHUTDOWN 而且任務隊列爲空。

 

回到上面看新增線程的 addWorkder 方法,發現內層循環做用是使用 cas 增長線程,代碼(7.1)若是線程個數超限則返回 false,否者執行代碼(7.2)執行 CAS 操做設置線程個數,cas 成功則退出雙循環,CAS 失敗則執行代碼(7.3)看當前線程池的狀態是否變化了,若是變了,則從新進入外層循環從新獲取線程池狀態,否者進入內層循環繼續進行 cas 嘗試。

執行到第二部分的代碼(8)說明使用 CAS 成功的增長了線程個數,可是如今任務還沒開始執行,這裏使用全局的獨佔鎖來控制把新增的 Worker 添加到工做集 workers。代碼(8.1)建立了一個工做線程 Worker。

代碼(8.2)獲取了獨佔鎖,代碼(8.3)從新檢查線程池狀態,這是爲了不在獲取鎖前其餘線程調用了 shutdown 關閉了線程池,若是線程池已經被關閉,則釋放鎖,新增線程失敗,否者執行代碼(8.4)添加工做線程到線程工做集,而後釋放鎖,代碼(8.5)若是判斷若是工做線程新增成功,則啓動工做線程。

 

3.2 工做線程 Worker 的執行

當用戶線程提交任務到線程池後,具體是使用 worker 來執行的,先看下 Worker 的構造函數:

Worker(Runnable firstTask) {
    setState(-1); // 在調用runWorker前禁止中斷
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);//建立一個線程
}

 

如上代碼構造函數內首先設置 Worker 的狀態爲 -1,是爲了不當前 worker 在調用 runWorker 方法前被中斷(當其它線程調用了線程池的 shutdownNow 時候,若是 worker 狀態 >= 0 則會中斷該線程)。這裏設置了線程的狀態爲 -1,因此該線程就不會被中斷了。以下代碼運行 runWorker 的代碼(9)時候會調用 unlock 方法,該方法把 status 變爲了 0,因此這時候調用 shutdownNow 會中斷 worker 線程了。

 

接着咱們再看看runWorker方法,代碼以下:

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); //(9)status設置爲0,容許中斷
        boolean completedAbruptly = true;
        try {
           //(10)
            while (task != null || (task = getTask()) != null) {

                 //(10.1)
                w.lock();
               ...
                try {
                    //(10.2)任務執行前幹一些事情
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();//(10.3)執行任務
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        //(10.4)任務執行完畢後幹一些事情
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    //(10.5)統計當前worker完成了多少個任務
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {

            //(11)執行清工做
            processWorkerExit(w, completedAbruptly);
        }
    }

 

如上代碼(10)若是當前 task==null 或者調用 getTask 從任務隊列獲取的任務返回 null,則跳轉到代碼(11)執行。若是 task 不爲 null 則執行代碼(10.1)獲取工做線程內部持有的獨佔鎖,而後執行擴展接口代碼(10.2)在具體任務執行前作一些事情,代碼(10.3)具體執行任務,代碼(10.4)在任務執行完畢後作一些事情,代碼(10.5)統計當前 worker 完成了多少個任務,並釋放鎖。

這裏在執行具體任務期間加鎖,是爲了不任務運行期間,其餘線程調用了 shutdown 或者 shutdownNow 命令關閉了線程池。

 

其中代碼(11)執行清理任務,其代碼以下:

private void processWorkerExit(Worker w, boolean completedAbruptly) {
     ...代碼太長,這裏就不展現了

    //(11.1)統計整個線程池完成的任務個數,並從工做集裏面刪除當前woker
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    //(11.2)嘗試設置線程池狀態爲TERMINATED,若是當前是shutdonw狀態而且工做隊列爲空
    //或者當前是stop狀態當前線程池裏面沒有活動線程
    tryTerminate();

    //(11.3)若是當前線程個數小於核心個數,則增長
    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

 

如上代碼(11.1)統計線程池完成任務個數,可知在統計前加了全局鎖,把當前工做線程中完成的任務累加到全局計數器,而後從工做集中刪除當前 worker。

代碼(11.2)判斷若是當前線程池狀態是 shutdonw 狀態而且工做隊列爲空或者當前是 stop 狀態當前線程池裏面沒有活動線程則設置線程池狀態爲 TERMINATED,若是設置爲了 TERMINATED 狀態還須要調用條件變量 termination 的 signalAll() 方法激活全部由於調用線程池的 awaitTermination 方法而被阻塞的線程

代碼(11.3)則判斷當前線程裏面線程個數是否小於核心線程個數,若是是則新增一個線程。

 

3.3 shutdown 操做:調用 shutdown 後,線程池就不會在接受新的任務了,可是工做隊列裏面的任務仍是要執行的,該方法馬上返回的,並不等待隊列任務完成在返回。代碼以下:

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //(12)權限檢查
        checkShutdownAccess();

        //(13)設置當前線程池狀態爲SHUTDOWN,若是已是SHUTDOWN則直接返回
        advanceRunState(SHUTDOWN);

        //(14)設置中斷標誌
        interruptIdleWorkers();
        onShutdown(); 
    } finally {
        mainLock.unlock();
    }
    //(15)嘗試狀態變爲TERMINATED
    tryTerminate();
}

 

如上代碼(12)檢查若是設置了安全管理器,則看當前調用 shutdown 命令的線程是否有關閉線程的權限,若是有權限則還要看調用線程是否有中斷工做線程的權限,若是沒有權限則拋出 SecurityException 或者 NullPointerException 異常。

其中代碼(13)內容以下,若是當前狀態 >= SHUTDOWN 則直接返回,否者設置當前狀態爲 SHUTDOWN:

private void advanceRunState(int targetState) {
    for (;;) {
        int c = ctl.get();
        if (runStateAtLeast(c, targetState) ||
            ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
            break;
    }
}

 

代碼(14)內容以下,設置全部空閒線程的中斷標誌,這裏首先加了全局鎖,同時只有一個線程能夠調用 shutdown 設置中斷標誌,而後嘗試獲取 worker 本身的鎖,獲取成功則設置中斷標識,因爲正在執行的任務已經獲取了鎖,因此正在執行的任務沒有被中斷。這裏中斷的是阻塞到 getTask() 方法,企圖從隊列裏面獲取任務的線程,也就是空閒線程。

private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            //若是工做線程沒有被中斷,而且沒有正在運行則設置設置中斷
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

 

代碼(15)判斷若是當前線程池狀態是 shutdonw 狀態而且工做隊列爲空或者當前是 stop 狀態當前線程池裏面沒有活動線程則設置線程池狀態爲 TERMINATED,若是設置爲了 TERMINATED 狀態還須要調用條件變量 termination 的 signalAll()方法激活全部由於調用線程池的 awaitTermination 方法而被阻塞的線程

 

3.4 shutdownNow 操做

調用 shutdownNow 後,線程池就不會在接受新的任務了,而且丟棄工做隊列裏面裏面的任務,正在執行的任務會被中斷,該方法是馬上返回的,並不等待激活的任務執行完成在返回。返回值爲這時候隊列裏面被丟棄的任務列表。代碼以下:

public List<Runnable> shutdownNow() {


    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();//(16)權限檢查
        advanceRunState(STOP);//(17) 設置線程池狀態爲stop
        interruptWorkers();//(18)中斷全部線程
        tasks = drainQueue();//(19)移動隊列任務到tasks
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}

 

如上代碼首先調用代碼(16)檢查權限,而後調用代碼(17)設置當前線程池狀態爲 stop,而後執行代碼(18)中斷全部的工做線程,這裏須要注意的是中斷全部的線程,包含空閒線程和正在執行任務的線程,代碼以下:

   private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers)
                w.interruptIfStarted();
        } finally {
            mainLock.unlock();
        }
    }

而後代碼(19)移動當前任務隊列裏面任務到 tasks 列表。

3.4 awaitTermination 操做

當線程調用 awaitTermination 方法後,當前線程會被阻塞,知道線程池狀態變爲了 TERMINATED 才返回,或者等待時間超時才返回,整個過程獨佔鎖,代碼以下:

public boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (;;) {
                if (runStateAtLeast(ctl.get(), TERMINATED))
                    return true;
                if (nanos <= 0)
                    return false;
                nanos = termination.awaitNanos(nanos);
            }
        } finally {
            mainLock.unlock();
        }
    }

 

如上代碼首先獲取了獨佔鎖,而後無限循環內部首先判斷當前線程池狀態是否至少是 TERMINATED 狀態,若是是則直接返回。否者說明當前線程池裏面還有線程在執行,則看設置的超時時間 nanos 是否小於 0,小於 0 則說明不須要等待,則直接返回;若是大於0則調用條件變量 termination 的 awaitNanos 方法等待 nanos 時間,指望在這段時間內線程池狀態內變爲 TERMINATED 狀態。

在講解 shutdown 方法時候提到當線程池狀態變爲 TERMINATED 後,會調用 termination.signalAll() 用來激活調用條件變量 termination 的 await 系列方法被阻塞的全部線程,因此若是在調用了 awaitTermination 以後調用了 shutdown 方法,而且 shutdown 內部設置線程池狀態爲 TERMINATED 了,則 termination.awaitNanos 方法會返回。

另外在工做線程 Worker 的 runWorker 方法內當工做線程運行結束後,會調用 processWorkerExit 方法,processWorkerExit 方法內部也會調用 tryTerminate 方法測試當前是否應該把線程池設置爲 TERMINATED 狀態,若是是,則也會調用 termination.signalAll() 用來激活調用線程池的 awaitTermination 方法而被阻塞的線程

另外當等待時間超時後,termination.awaitNanos 也會返回,這時候會從新檢查當前線程池狀態是否爲 TERMINATED,若是是則直接返回,否者繼續阻塞掛起本身。

四、使用線程池須要注意的地方

4.1 建立線程池時候要指定與業務相關的名字,以便於追溯問題

平常開發中當一個應用中須要建立多個線程池時候最好給線程池根據業務類型設置具體的名字,以便在出現問題時候方便進行定位,下面就經過實例來講明不設置時候爲什麼難以定位問題,以及如何進行設置。

下面經過簡單的代碼來講明不指定線程池名稱爲什麼難定位問題,代碼以下:

package com.hjc;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Created by cong on 2019/5/26.
 */
public class ThreadPoolExecutorTest {
    static ThreadPoolExecutor executorOne = new ThreadPoolExecutor(5, 5, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>());
    static ThreadPoolExecutor executorTwo = new ThreadPoolExecutor(5, 5, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>());

    public static void main(String[] args) {

        //接受用戶連接模塊
        executorOne.execute(new  Runnable() {
            public void run() {
                System.out.println("接受用戶連接線程");
                throw new NullPointerException();
            }
        });
        //具體處理用戶請求模塊
        executorTwo.execute(new  Runnable() {
            public void run() {
                System.out.println("具體處理業務請求線程");
            }
        });

        executorOne.shutdown();
        executorTwo.shutdown();
    }
}

運行代碼輸出以下結果:

 

同理咱們並不知道是那個模塊的線程池拋出了這個異常,那麼咱們看下這個 pool-1-thread-1 是如何來的。實際上是使用了線程池默認的 ThreadFactory,翻看線程池建立的源碼以下:

 public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

   public static ThreadFactory defaultThreadFactory() {
        return new DefaultThreadFactory();
   }

static class DefaultThreadFactory implements ThreadFactory {
        //(1)
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        //(2)
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        //(3)
        private final String namePrefix;

        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }

        public Thread newThread(Runnable r) {
           //(4)
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }

 

如上代碼 DefaultThreadFactory 的實現可知:

     1.代碼(1)poolNumber 是 static 的原子變量用來記錄當前線程池的編號,它是應用級別的,全部線程池公用一個,好比建立第一個線程池時候線程池編號爲1,建立第二個線程池時候線程池的編號爲2,這裏 pool-1-thread-1 裏面的 pool-1 中的 1 就是這個值。

     2.代碼(2)threadNumber 是線程池級別的,每一個線程池有一個該變量用來記錄該線程池中線程的編號,這裏 pool-1-thread-1 裏面的 thread - 1 中的 1 就是這個值。

     3.代碼(3)namePrefix是線程池中線程的前綴,默認固定爲pool。

     4.代碼(4)具體建立線程,可知線程的名稱使用 namePrefix + threadNumber.getAndIncrement() 拼接的。

 

從上知道咱們只需對 DefaultThreadFactory 的代碼中 namePrefix 的初始化作手腳,當須要建立線程池是傳入與業務相關的 namePrefix 名稱就能夠了,代碼以下:

package com.hjc;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Created by cong on 2019/5/26.
 */
// 命名線程工廠
public class HjcThreadFactory implements ThreadFactory {
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    HjcThreadFactory(String name) {

        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
        if (null == name || name.isEmpty()) {
            name = "pool";
        }

        namePrefix = name + "-" + poolNumber.getAndIncrement() + "-thread-";
    }

    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
        if (t.isDaemon())
            t.setDaemon(false);
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
}

 

而後建立線程池時候以下:

package com.hjc;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Created by cong on 2019/5/26.
 */
public class ThreadPoolExecutorTest {
    static ThreadPoolExecutor executorOne = new ThreadPoolExecutor(5, 5, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>(),new HjcThreadFactory("ASYN-ACCEPT-POOL"));
    static ThreadPoolExecutor executorTwo = new ThreadPoolExecutor(5, 5, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>(),new HjcThreadFactory("ASYN-PROCESS-POOL"));

    public static void main(String[] args) {

        //接受用戶連接模塊
        executorOne.execute(new  Runnable() {
            public void run() {
                System.out.println("接受用戶連接線程");
                throw new NullPointerException();
            }
        });
        //具體處理用戶請求模塊
        executorTwo.execute(new  Runnable() {
            public void run() {
                System.out.println("具體處理業務請求線程");
            }
        });

        executorOne.shutdown();
        executorTwo.shutdown();
    }
}

 

 而後運行執行結果以下:

 

 從運行結果拋出的異常,能夠看到從 ASYN-ACCEPT-POOL-1-thread-1 就能夠知道是接受連接線程池拋出的異常。

 

4.4 線程池中使用 ThreadLocal 致使的內存泄露

下面先看線程池中使用 ThreadLocal 的例子:

package com.hjc;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * Created by cong on 2019/5/26.
 */
public class ThreadPoolTest {

    static class LocalVariable {
        private Long[] a = new Long[1024 * 1024];
    }

    // (1)
    final static ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(5, 5, 1, TimeUnit.MINUTES,
            new LinkedBlockingQueue<>());
    // (2)
    final static ThreadLocal<LocalVariable> localVariable = new ThreadLocal<LocalVariable>();

    public static void main(String[] args) throws InterruptedException {
        // (3)
        for (int i = 0; i < 50; ++i) {
            poolExecutor.execute(new Runnable() {
                public void run() {
                    // (4)
                    localVariable.set(new LocalVariable());
                    // (5)
                    System.out.println("use local varaible");
                    //localVariable.remove();

                }
            });

            Thread.sleep(1000);
        }
        // (6)
        System.out.println("pool execute over");
    }
}

 

代碼(1)建立了一個核心線程數和最大線程數爲 5 的線程池,這個保證了線程池裏面隨時都有 5 個線程在運行。

代碼(2)建立了一個 ThreadLocal 的變量,泛型參數爲 LocalVariable,LocalVariable 內部是一個 Long 數組。

代碼(3)向線程池裏面放入 50 個任務

代碼(4)設置當前線程的 localVariable 變量,也就是把 new 的 LocalVariable 變量放入當前線程的 threadLocals 變量。

因爲沒有調用線程池的 shutdown 或者 shutdownNow 方法因此線程池裏面的用戶線程不會退出,進而 JVM 進程也不會退出。

 

運行當前代碼,使用 jconsole 監控堆內存變化以下圖:

 

而後解開 localVariable.remove() 註釋,而後在運行,觀察堆內存變化以下:

 

從運行結果一可知,當主線程處於休眠時候進程佔用了大概 77M 內存,運行結果二則佔用了大概 25M 內存,可知運行代碼一時候內存發生了泄露,下面分析下泄露的緣由。

運行結果一的代碼,在設置線程的 localVariable 變量後沒有調用 localVariable.remove()方法,致使線程池裏面的 5 個線程的 threadLocals 變量裏面的 new LocalVariable() 實例沒有被釋放,雖然線程池裏面的任務執行完畢了,可是線程池裏面的 5 個線程會一直存在直到 JVM 進程被殺死。

這裏須要注意的是因爲 localVariable 被聲明瞭 static,雖然線程的 ThreadLocalMap 裏面是對localVariable的弱引用,localVariable也不會被回收。

運行結果二的代碼因爲線程在設置 localVariable 變量後及時調用了 localVariable.remove() 方法進行了清理,因此不會存在內存泄露。

總結:線程池裏面設置了 ThreadLocal 變量必定要記得及時清理,由於線程池裏面的核心線程是一直存在的,若是不清理,那麼線程池的核心線程的 threadLocals 變量一直會持有 ThreadLocal 變量。

相關文章
相關標籤/搜索