Java中線程池ThreadPoolExecutor原理探究

1、 前言

線程池主要解決兩個問題:一方面當執行大量異步任務時候線程池可以提供較好的性能,這是由於使用線程池可使每一個任務的調用開銷減小(由於線程池線程是能夠複用的)。另外一方面線程池提供了一種資源限制和管理的手段,好比當執行一系列任務時候對線程的管理,每一個ThreadPoolExecutor也保留了一些基本的統計數據,好比當前線程池完成的任務數目。數組

2、 類圖結構

圖片描述

Executors實際上是個工具類,裏面提供了好多靜態方法,根據用戶選擇返回不一樣的線程池實例。
ThreadPoolExecutor繼承了AbstractExecutorService,成員變量ctl是個Integer的原子變量用來記錄線程池狀態 和 線程池線程個數,相似於ReentrantReadWriteLock使用一個變量存放兩種信息。
Integer類型是32位二進制標示,其中高3位用來表示線程池狀態,後面 29位用來記錄線程池線程個數。安全

clipboard.png

clipboard.png

線程池狀態含義:多線程

RUNNING:接受新任務而且處理阻塞隊列裏的任務
SHUTDOWN:拒絕新任務可是處理阻塞隊列裏的任務
STOP:拒絕新任務而且拋棄阻塞隊列裏的任務同時會中斷正在處理的任務
TIDYING:全部任務都執行完(包含阻塞隊列裏面任務)當前線程池活動線程爲0,將要調用terminated方法
TERMINATED:終止狀態。terminated方法調用完成之後的狀態
線程池狀態轉換:併發

RUNNING -> SHUTDOWN
顯式調用shutdown()方法,或者隱式調用了finalize(),它裏面調用了shutdown()方法。
RUNNING or SHUTDOWN)-> STOP
顯式 shutdownNow()方法
SHUTDOWN -> TIDYING
當線程池和任務隊列都爲空的時候
STOP -> TIDYING
當線程池爲空的時候
TIDYING -> TERMINATED
當 terminated() hook 方法執行完成時候
線程池參數:異步

corePoolSize:線程池核心線程個數
workQueue:用於保存等待執行的任務的阻塞隊列。
好比基於數組的有界ArrayBlockingQueue、,基於鏈表的無界LinkedBlockingQueue,最多隻有一個元素的同步隊列SynchronousQueue,優先級隊列PriorityBlockingQueue,具體可參考 https://www.atatech.org/artic...
maximunPoolSize:線程池最大線程數量。
ThreadFactory:建立線程的工廠
RejectedExecutionHandler:飽和策略,當隊列滿了而且線程個數達到maximunPoolSize後採起的策略,好比AbortPolicy(拋出異常),CallerRunsPolicy(使用調用者所在線程來運行任務),DiscardOldestPolicy(調用poll丟棄一個任務,執行當前任務),DiscardPolicy(默默丟棄,不拋出異常)
keeyAliveTime:存活時間。若是當前線程池中的線程數量比基本數量要多,而且是閒置狀態的話,這些閒置的線程能存活的最大時間
TimeUnit,存活時間的時間單位
線程池類型:函數

newFixedThreadPool工具

建立一個核心線程個數和最大線程個數都爲nThreads的線程池,而且阻塞隊列長度爲Integer.MAX_VALUE,keeyAliveTime=0說明只要線程個數比核心線程個數多而且當前空閒則回收。oop

clipboard.png

newSingleThreadExecutor
建立一個核心線程個數和最大線程個數都爲1的線程池,而且阻塞隊列長度爲Integer.MAX_VALUE,keeyAliveTime=0說明只要線程個數比核心線程個數多而且當前空閒則回收。源碼分析

clipboard.png

newCachedThreadPool
建立一個按需建立線程的線程池,初始線程個數爲0,最多線程個數爲Integer.MAX_VALUE,而且阻塞隊列爲同步隊列,keeyAliveTime=60說明只要當前線程60s內空閒則回收。這個特殊在於加入到同步隊列的任務會被立刻被執行,同步隊列裏面最多隻有一個任務,而且存在後立刻會拿出執行。性能

clipboard.png

newSingleThreadScheduledExecutor

建立一個最小線程個數corePoolSize爲1,最大爲Integer.MAX_VALUE,阻塞隊列爲DelayedWorkQueue的線程池。

clipboard.png

其中Worker繼承AQS和Runnable是具體承載任務的對象,Worker繼承了AQS本身實現了簡單的不可重入獨佔鎖,其中status=0標示鎖未被獲取狀態也就是未被鎖住的狀態,state=1標示鎖已經被獲取的狀態也就是鎖住的狀態。

DefaultThreadFactory是線程工廠,newThread方法是對線程的一個分組包裹,其中poolNumber是個靜態的原子變量,用來統計線程工廠的個數,threadNumber用來記錄每一個線程工廠建立了多少線程。

3、 源碼分析

3.1 添加任務到線程池exectue方法

clipboard.png

clipboard.png

若是當前線程池線程個數小於corePoolSize則開啓新線程
不然添加任務到任務隊列
若是任務隊列滿了,則嘗試新開啓線程執行任務,若是線程個數>maximumPoolSize則執行拒絕策略。

重點看addWorkder方法:

private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
    int c = ctl.get();
    int rs = runStateOf(c);

    // 檢查隊列是否只在必要時爲空.(1)
    if (rs >= SHUTDOWN &&
        ! (rs == SHUTDOWN &&
           firstTask == null &&
           ! workQueue.isEmpty()))
        return false;

    //循環cas增長線程個數
    for (;;) {
        int wc = workerCountOf(c);

        //若是線程個數超限則返回false
        if (wc >= CAPACITY ||
            wc >= (core ? corePoolSize : maximumPoolSize))
            return false;
        //cas增長線程個數,同時只有一個線程成功
        if (compareAndIncrementWorkerCount(c))
            break retry;
        //cas失敗了,則看線程池狀態是否變化了,變化則跳到外層循環重試從新獲取線程池狀態,否者內層循環從新cas。
        c = ctl.get();  // Re-read ctl
        if (runStateOf(c) != rs)
            continue retry;
    }
}

//到這裏說明cas成功了,(2)
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
    //建立worker
    final ReentrantLock mainLock = this.mainLock;
    w = new Worker(firstTask);
    final Thread t = w.thread;
    if (t != null) {

        //加獨佔鎖,爲了workers同步,由於可能多個線程調用了線程池的execute方法。
        mainLock.lock();
        try {

            //從新檢查線程池狀態,爲了不在獲取鎖前調用了shutdown接口(3)
            int c = ctl.get();
            int rs = runStateOf(c);

            if (rs < SHUTDOWN ||
                (rs == SHUTDOWN && firstTask == null)) {
                if (t.isAlive()) // precheck that t is startable
                    throw new IllegalThreadStateException();
                //添加任務
                workers.add(w);
                int s = workers.size();
                if (s > largestPoolSize)
                    largestPoolSize = s;
                workerAdded = true;
            }
        } finally {
            mainLock.unlock();
        }
        //添加成功則啓動任務
        if (workerAdded) {
            t.start();
            workerStarted = true;
        }
    }
} finally {
    if (! workerStarted)
        addWorkerFailed(w);
}
return workerStarted;}

代碼比較長,主要分兩部分,第一部分雙重循環目的是經過cas增長線程池線程個數,第二部分主要是併發安全的把任務添加到workers裏面,而且啓動任務執行。

先看第一部分的(1)

clipboard.png

展開!運算後等價於

clipboard.png

也就是說下面幾種狀況下會返回false:

當前線程池狀態爲STOP,TIDYING,TERMINATED
當前線程池狀態爲SHUTDOWN而且已經有了第一個任務
當前線程池狀態爲SHUTDOWN而且任務隊列爲空
內層循環做用是使用cas增長線程個數,若是線程個數超限則返回false,否者進行cas,cas成功則退出雙循環,否者cas失敗了,要看當前線程池的狀態是否變化了,若是變了,則從新進入外層循環從新獲取線程池狀態,否者進入內層循環繼續進行cas嘗試。

到了第二部分說明CAS成功了,也就是說線程個數加一了,可是如今任務還沒開始執行,這裏使用全局的獨佔鎖來控制workers裏面添加任務,其實也可使用併發安全的set,可是性能沒有獨佔鎖好(這個從註釋中知道的)。這裏須要注意的是要在獲取鎖後從新檢查線程池的狀態,這是由於其餘線程可可能在本方法獲取鎖前改變了線程池的狀態,好比調用了shutdown方法。添加成功則啓動任務執行。

3.2 工做線程Worker的執行

先看下構造函數:

clipboard.png

這裏添加一個新狀態-1是爲了不當前線程worker線程被中斷,好比調用了線程池的shutdownNow,若是當前worker狀態>=0則會設置該線程的中斷標誌。這裏設置了-1因此條件不知足就不會中斷該線程了。運行runWorker時候會調用unlock方法,該方法吧status變爲了0,因此這時候調用shutdownNow會中斷worker線程。

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // status設置爲0,容許中斷
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {

            w.lock();
            // 若是線程池當前狀態至少是stop,則設置中斷標誌;
            // 若是線程池當前狀態是RUNNININ,則重置中斷標誌,重置後須要從新
            //檢查下線程池狀態,由於當重置中斷標誌時候,可能調用了線程池的shutdown方法
            //改變了線程池狀態。
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();


            try {
                //任務執行前幹一些事情
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();//執行任務
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    //任務執行完畢後幹一些事情
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                //統計當前worker完成了多少個任務
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {

        //執行清了工做
        processWorkerExit(w, completedAbruptly);
    }
}

若是當前task爲空,則直接執行,否者調用getTask從任務隊列獲取一個任務執行,若是任務隊列爲空,則worker退出。

private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?

retry:
for (;;) {
    int c = ctl.get();
    int rs = runStateOf(c);

    // 若是當前線程池狀態>=STOP 或者線程池狀態爲shutdown而且工做隊列爲空則,減小工做線程個數
    if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
        decrementWorkerCount();
        return null;
    }

    boolean timed;      // Are workers subject to culling?

    for (;;) {
        int wc = workerCountOf(c);
        timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if (wc <= maximumPoolSize && ! (timedOut && timed))
            break;
        if (compareAndDecrementWorkerCount(c))
            return null;
        c = ctl.get();  // Re-read ctl
        if (runStateOf(c) != rs)
            continue retry;
        // else CAS failed due to workerCount change; retry inner loop
    }

    try {

        //根據timed選擇調用poll仍是阻塞的take
        Runnable r = timed ?
            workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
            workQueue.take();
        if (r != null)
            return r;
        timedOut = true;
    } catch (InterruptedException retry) {
        timedOut = false;
    }
}}

private void processWorkerExit(Worker w, boolean completedAbruptly){
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
    decrementWorkerCount();

//統計整個線程池完成的任務個數
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
    completedTaskCount += w.completedTasks;
    workers.remove(w);
} finally {
    mainLock.unlock();
}

//嘗試設置線程池狀態爲TERMINATED,若是當前是shutdonw狀態而且工做隊列爲空
//或者當前是stop狀態當前線程池裏面沒有活動線程
tryTerminate();

//若是當前線程個數小於核心個數,則增長
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
    if (!completedAbruptly) {
        int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
        if (min == 0 && ! workQueue.isEmpty())
            min = 1;
        if (workerCountOf(c) >= min)
            return; // replacement not needed
    }
    addWorker(null, false);
}}

3.3 shutdown操做
調用shutdown後,線程池就不會在接受新的任務了,可是工做隊列裏面的任務仍是要執行的,可是該方法馬上返回的,並不等待隊列任務完成在返回。

public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
    //權限檢查
    checkShutdownAccess();

    //設置當前線程池狀態爲SHUTDOWN,若是已是SHUTDOWN則直接返回
    advanceRunState(SHUTDOWN);

    //設置中斷標誌
    interruptIdleWorkers();
    onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
    mainLock.unlock();
}
//嘗試狀態變爲TERMINATED
tryTerminate();
}

若是當前狀態>=targetState則直接返回,否者設置當前狀態爲targetState
private void advanceRunState(int targetState) {

for (;;) {
    int c = ctl.get();
    if (runStateAtLeast(c, targetState) ||
        ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
        break;
}
}
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}

設置全部線程的中斷標誌,主要這裏首先加了全局鎖,同時只有一個線程能夠調用shutdown時候設置中斷標誌,而後嘗試獲取worker本身的鎖,獲取成功則設置中斷標示

private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
    for (Worker w : workers) {
        Thread t = w.thread;
        if (!t.isInterrupted() && w.tryLock()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            } finally {
                w.unlock();
            }
        }
        if (onlyOne)
            break;
    }
} finally {
    mainLock.unlock();
}}

3.4 shutdownNow操做
調用shutdown後,線程池就不會在接受新的任務了,而且丟棄工做隊列裏面裏面的任務,正在執行的任務會被中斷,可是該方法馬上返回的,並不等待激活的任務執行完成在返回。返回隊列裏面的任務列表。

調用隊列的drainTo一次當前隊列的元素到taskList,
可能失敗,若是調用drainTo後隊列海不爲空,則循環刪除,並添加到taskList
public List<Runnable> shutdownNow() {

List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
    checkShutdownAccess();//權限檢查
    advanceRunState(STOP);// 設置線程池狀態爲stop
    interruptWorkers();//中斷線程
    tasks = drainQueue();//移動隊列任務到tasks
} finally {
    mainLock.unlock();
}
tryTerminate();
return tasks;

}

調用隊列的drainTo一次當前隊列的元素到taskList,
可能失敗,若是調用drainTo後隊列海不爲空,則循環刪除,並添加到taskList
private List<Runnable> drainQueue() {

BlockingQueue<Runnable> q = workQueue;
List<Runnable> taskList = new ArrayList<Runnable>();
q.drainTo(taskList);
if (!q.isEmpty()) {
    for (Runnable r : q.toArray(new Runnable[0])) {
        if (q.remove(r))
            taskList.add(r);
    }
}
return taskList;

}

3.5 awaitTermination操做

等待線程池狀態變爲TERMINATED則返回,或者時間超時。因爲整個過程獨佔鎖,因此通常調用shutdown或者shutdownNow後使用。

public boolean awaitTermination(long timeout, TimeUnit unit)

throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (;;) {
            if (runStateAtLeast(ctl.get(), TERMINATED))
                return true;
            if (nanos <= 0)
                return false;
            nanos = termination.awaitNanos(nanos);
        }
    } finally {
        mainLock.unlock();
    }
}

4、總結

線程池巧妙的使用一個Integer類型原子變量來記錄線程池狀態和線程池線程個數,設計時候考慮到將來(2^29)-1個線程可能不夠用,到時只須要把原子變量變爲Long類型,而後掩碼位數變下就能夠了,可是爲啥如今不一勞永逸的定義爲Long那,主要是考慮到使用int類型操做時候速度上比Long類型快些。

經過線程池狀態來控制任務的執行,每一個worker線程能夠處理多個任務,線程池經過線程的複用減小了線程建立和銷燬的開銷,經過使用任務隊列避免了線程的阻塞從而避免了線程調度和線程上下文切換的開銷。

另外須要注意的是調用shutdown方法做用僅僅是修改線程池狀態讓如今任務失敗並中斷當前線程,這個中斷並非讓正在運行的線程終止,而是僅僅設置下線程的中斷標誌,若是線程內沒有使用中斷標誌作一些事情,那麼這個對線程沒有影響。

相關文章
相關標籤/搜索