完全弄懂 Java 線程池原理

概述

這篇文章是我在閱讀源碼時整理的一些筆記,對源碼的關鍵點進行了比較詳細的註釋,而後加上一些本身對線程池機制的理解。最終目的是要弄清楚下面這些問題:安全

  • 線程池有 execute() 和 submit() 方法,執行機制分別是什麼?
  • 如何新建線程?
  • 任務如何執行?
  • 線程如何銷燬?超時機制如何實現?

首先須要介紹一下線程池的兩個重要成員:bash

ctl

AtomicInteger 類型。高3位存儲線程池狀態,低29位存儲當前線程數量。workerCountOf(c) 返回當前線程數量。runStateOf(c) 返回當前線程池狀態。 線程池有以下狀態:測試

  • RUNNING:接收新任務,處理隊列任務。
  • SHUTDOWN:不接收新任務,但處理隊列任務。
  • STOP:不接收新任務,也不處理隊列任務,而且中斷全部處理中的任務。
  • TIDYING:全部任務都被終結,有效線程爲0。會觸發terminated()方法。
  • TERMINATED:當terminated()方法執行結束。

Worker

這個線程在線程池中的包裝類。一個 Worker 表明一個線程。線程池用一個 HashSet 管理這些線程。ui

須要注意的是,Worker 自己並不區分核心線程和非核心線程,核心線程只是概念模型上的叫法,特性是依靠對線程數量的判斷來實現的 Worker 特性以下:this

  • 繼承自 AQS,自己實現了一個最簡單的不公平的不可重入鎖。
  • 構造方法傳入 Runnable,表明第一個執行的任務,能夠爲空。構造方法中新建一個線程。
  • 實現了 Runnable 接口,在新建線程時傳入 this。所以線程啓動時,會執行 Worker 自己的 run 方法。
  • run 方法調用了 ThreadPoolExecutor 的 runWorker 方法,負責實際執行任務。

submit() 方法的執行機制

submit 返回一個 Future 對象,咱們能夠調用其 get 方法獲取任務執行的結果。代碼很簡單,就是將 Runnable 包裝成 FutureTask 而已。能夠看到,最終仍是調用 Execute 方法:spa

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}
複製代碼

FutureTask 的代碼就不貼了,簡述一下原理:線程

  • FutureTask 實現了 RunnableFuture 接口,RunnableFuture 繼承自Runnable。執行任務時會調用 FutureTask 的 run 方法,run 方法中執行真正的任務代碼,執行完後調用 set 方法設置結果。
  • 若是任務執行完畢,get 方法會直接返回結果,若是沒有,get 方法會阻塞並等待結果。
  • set 方法中設置結果後會取消阻塞,使 get 方法返回結果。

execute() 方法的執行機制

這個機制你們應該都很熟了,再簡述一遍:code

  1. 工做線程數小於核心線程數時,直接新建核心線程執行任務;
  2. 大於核心線程數時,將任務添加進等待隊列;
  3. 隊列滿時,建立非核心線程執行任務;
  4. 工做線程數大於最大線程數時,拒絕任務

具體的代碼分析以下:對象

int c = ctl.get();
if (workerCountOf(c) < corePoolSize) { //小於核心線程數
    if (addWorker(command, true)) //啓動核心線程並執行任務
        return;
    c = ctl.get(); //執行失敗時從新獲取值
}
if (isRunning(c) && workQueue.offer(command)) { //檢查運行狀態並將任務添加到隊列
    int recheck = ctl.get();
    if (! isRunning(recheck) && remove(command)) //從新檢查,防止狀態有變化。若是有,移出隊列並拒絕任務
        reject(command);
    else if (workerCountOf(recheck) == 0) //若是線程數爲0,建立非核心線程,第一個參數爲空時會從隊列中取任務執行
        addWorker(null, false);
}
else if (!addWorker(command, false)) //添加到隊列失敗,說明隊列已滿,建立非核心線程執行任務
    reject(command); //執行失敗說明達到最大線程數,拒絕任務
複製代碼

新任務如何添加進隊列?

線程池使用 addWorker 方法新建線程,第一個參數表明要執行的任務,線程會將這個任務執行完畢後再從隊列取任務執行。第二參數是核心線程的標誌,它並非 Worker 自己的屬性,在這裏只用來判斷工做線程數量是否超標。繼承

這個方法能夠分紅兩部分,第一部分進行一些前置判斷,並使用循環 CAS 結構將線程數量加1。代碼以下:

private boolean addWorker(Runnable firstTask, boolean core) {
    retry: //這個語法不經常使用,用於給外層 for 循環命名。方便嵌套 for 循環中,breakcontinue 指定是外層仍是內層循環
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        
        // firstTask 不爲空表明這個方法用於添加任務,爲空表明新建線程。SHUTDOWN 狀態下不接受新任務,但處理隊列中的任務。這就是第二個判斷的邏輯。
        if (rs >= SHUTDOWN &&
        ! (rs == SHUTDOWN &&
           firstTask == null &&
           ! workQueue.isEmpty()))
        return false;
        
        // 使用循環 CAS 自旋,增長線程數量直到成功爲止
        for (;;) {
        int wc = workerCountOf(c);
        //判斷是否超過線程容量
        if (wc >= CAPACITY ||
            wc >= (core ? corePoolSize : maximumPoolSize))
            return false;
        //使用 CAS 將線程數量加1
        if (compareAndIncrementWorkerCount(c))
            break retry;
        //修改不成功說明線程數量有變化
        //從新判斷線程池狀態,有變化時跳到外層循環從新獲取線程池狀態
        c = ctl.get();  // Re-read ctl
        if (runStateOf(c) != rs)
            continue retry;
        //到這裏說明狀態沒有變化,從新嘗試增長線程數量
        }
    }
    ... ...
}
複製代碼

第二部分負責新建並啓動線程,並將 Worker 添加至 Hashset 中。代碼很簡單,沒什麼好註釋的,用了 ReentrantLock 確保線程安全。

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
    w = new Worker(firstTask);
    final Thread t = w.thread;
    if (t != null) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            int rs = runStateOf(ctl.get());

            if (rs < SHUTDOWN ||
                (rs == SHUTDOWN && firstTask == null)) {
                if (t.isAlive()) // precheck that t is startable
                    throw new IllegalThreadStateException();
                workers.add(w);
                int s = workers.size();
                if (s > largestPoolSize)
                    largestPoolSize = s; //這個參數是測試用的,不用管它
                workerAdded = true;
            }
        } finally {
            mainLock.unlock();
        }
        if (workerAdded) {
            t.start();
            workerStarted = true;
        }
    }
} finally {
    if (! workerStarted)
        addWorkerFailed(w); //添加失敗時移除 Worker 並將線程數量減 1
}
return workerStarted;
}
複製代碼

任務如何執行?

在 addWorker 方法中,線程會被啓動。新建線程時,Worker 將自身傳入,因此線程啓動後會執行 Worker 的 run 方法,這個方法調用了 ThreadPoolExecutor 的 runWorker 方法執行任務,runWorker 中會循環取任務執行,執行邏輯以下:

  • 若是 firstTask 不爲空,先執行 firstTask,執行完畢後置空;
  • firstTask 爲空後調用 getTask() 從隊列中取任務執行;
  • 一直執行到沒有任務後,退出 while 循環
  • 調用 processWorkerExit() 方法,將 Worker 移除出 HashSet,此時線程執行完畢,也再也不被引用,會自動銷燬。

具體代碼分析以下:

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    //task 爲咱們傳給 execute 的任務。task 爲空時從隊列中取任務執行
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            //這段邏輯很是繞。實際上它實現瞭如下邏輯:
            //1.若是線程池已中止且線程未中斷,條件成立,中斷線程
            //2.若是線程池未中止,線程爲中斷狀態,將線程狀態重置,並從新進行1的判斷
            //3.若是線程池未中止,線程不爲中斷狀態,條件不成立
            //Thread.interrupted() 會重置中斷狀態,保證
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            //beforeExecute 和 afterExecute 爲空方法,交給子類實現
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run(); //執行任務
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        //執行到這裏時說明線程執行完畢,此方法將線程從 HashSet 中移出。線程終止且沒有引用,會被自動回收。
        processWorkerExit(w, completedAbruptly);
    }
}
複製代碼

線程如何銷燬?超時機制如何實現?

在 runWorker 方法中 getTask 方法返回 null 以後會致使線程執行完畢,被移除出 HashSet,從而被系統銷燬。 線程的超時機制也是在這個方法實現的,藉助於 BlockingQueue 的 poll 和 take 方法。

  • poll 方法能夠設置一個超時時間,當隊列爲空時,在此時間內阻塞等待,超時後返回 null
  • take 方法在隊列爲空時直接拋出異常

超時機制實現原理以下:

  • 當 allowCoreThreadTimeOut 爲 true,全部線程都會超時,所有調用 poll 方法,傳入 keepAliveTime 參數。
  • 當 allowCoreThreadTimeOut 爲 false 時,若是工做線程數量大於核心線程數,將此線程看成非核心線程處理,調用 poll 方法
  • 當 allowCoreThreadTimeOut 爲 false 且工做線程數量小於等於核心線程數時,將此線程看成核心線程處理,調用 take 方法,隊列爲空時拋出異常,進入下一次循環。若是隊列一直爲空,核心線程會一直在此循環等待任務進行處理。

具體代碼以下:

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
    
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
    
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
    
        int wc = workerCountOf(c);
    
        // 容許核心線程超時或者線程數大於核心線程
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    
        // timed && timedOut 這兩個參數結合起來控制超時機制
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
    
        try {
            // 隊列爲空時,poll 方法會阻塞等待,超過 keepAliveTime 時返回空值。take 方法會直接返回異常。
            // 當 allowCoreThreadTimeOut 爲 true 時,核心線程和非核心線程沒有區別,一概調用poll方法
            // 當 allowCoreThreadTimeOut 爲 false 時,線程數量超過核心線程數纔會進入超時機制,若是不超過,則將當前線程看成核心線程處理,調用 take,拋出異常後進入下一次循環。若是隊列爲空,此處會一直循環。
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
複製代碼
相關文章
相關標籤/搜索