高頻面試考點:ThreadPoolExector源碼分析

線程池源碼也是面試常常被提問到的點,我會將全局源碼作一分析,而後告訴你面試考啥,怎麼答。java

爲何要用線程池?

簡潔的答兩點就行。面試

  1. 下降系統資源消耗。
  2. 提升線程可控性。

如何建立使用線程池?

JDK8提供了五種建立線程池的方法:緩存

  1. 建立一個定長線程池,可控制線程最大併發數,超出的線程會在隊列中等待。
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
複製代碼
  1. (JDK8新增)會根據所需的併發數來動態建立和關閉線程。可以合理的使用CPU進行對任務進行併發操做,因此適合使用在很耗時的任務。

注意返回的是ForkJoinPool對象。安全

public static ExecutorService newWorkStealingPool(int parallelism) {
    return new ForkJoinPool
        (parallelism,
         ForkJoinPool.defaultForkJoinWorkerThreadFactory,
         null, true);
}

什麼是ForkJoinPool:

public ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, UncaughtExceptionHandler handler, boolean asyncMode) {
        this(checkParallelism(parallelism),
             checkFactory(factory),
             handler,
             asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
             "ForkJoinPool-" + nextPoolId() + "-worker-");
        checkPermission();
    }
使用一個無限隊列來保存須要執行的任務,能夠傳入線程的數量,
不傳入,則默認使用當前計算機中可用的cpu數量,
使用分治法來解決問題,使用fork()和join()來進行調用
複製代碼
  1. 建立一個可緩存的線程池,可靈活回收空閒線程,若無可回收,則新建線程。
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
複製代碼
  1. 建立一個單線程的線程池。
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
複製代碼
  1. 建立一個定長線程池,支持定時及週期性任務執行。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}
複製代碼

上層源碼結構分析

Executor結構:
bash



Executor

一個運行新任務的簡單接口併發

public interface Executor {

    void execute(Runnable command);
}
複製代碼

ExecutorService

擴展了Executor接口。添加了一些用來管理執行器生命週期和任務生命週期的方法框架


AbstractExecutorService

對ExecutorService接口的抽象類實現。不是咱們分析的重點。async

ThreadPoolExecutor

Java線程池的核心實現。oop

ThreadPoolExecutor源碼分析

屬性解釋

// AtomicInteger是原子類 ctlOf()返回值爲RUNNING;
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 高3位表示線程狀態
private static final int COUNT_BITS = Integer.SIZE - 3;
// 低29位表示workerCount容量
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
// 能接收任務且能處理阻塞隊列中的任務
private static final int RUNNING    = -1 << COUNT_BITS;
// 不能接收新任務,但能夠處理隊列中的任務。
private static final int SHUTDOWN   =  0 << COUNT_BITS;
// 不接收新任務,不處理隊列任務。
private static final int STOP       =  1 << COUNT_BITS;
// 全部任務都終止
private static final int TIDYING    =  2 << COUNT_BITS;
// 什麼都不作
private static final int TERMINATED =  3 << COUNT_BITS;

// 存聽任務的阻塞隊列
private final BlockingQueue<Runnable> workQueue;
複製代碼

值的注意的是狀態值越大線程越不活躍。源碼分析

線程池狀態的轉換模型:

構造器

public ThreadPoolExecutor(int corePoolSize,//線程池初始啓動時線程的數量 int maximumPoolSize,//最大線程數量 long keepAliveTime,//空閒線程多久關閉? TimeUnit unit,// 計時單位 BlockingQueue<Runnable> workQueue,//聽任務的阻塞隊列 ThreadFactory threadFactory,//線程工廠 RejectedExecutionHandler handler// 拒絕策略) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
複製代碼

在向線程池提交任務時,會經過兩個方法:execute和submit。

本文着重講解execute方法。submit方法放在下次和Future、Callable一塊兒分析。

execute方法:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    // clt記錄着runState和workerCount
    int c = ctl.get();
    //workerCountOf方法取出低29位的值,表示當前活動的線程數
    //而後拿線程數和 核心線程數作比較
    if (workerCountOf(c) < corePoolSize) {
        // 若是活動線程數<核心線程數
        // 添加到
        //addWorker中的第二個參數表示限制添加線程的數量是根據corePoolSize來判斷仍是maximumPoolSize來判斷
        if (addWorker(command, true))
            // 若是成功則返回
            return;
        // 若是失敗則從新獲取 runState和 workerCount
        c = ctl.get();
    }
    // 若是當前線程池是運行狀態而且任務添加到隊列成功
    if (isRunning(c) && workQueue.offer(command)) {
        // 從新獲取 runState和 workerCount
        int recheck = ctl.get();
        // 若是不是運行狀態而且 
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            //第一個參數爲null,表示在線程池中建立一個線程,但不去啓動
            // 第二個參數爲false,將線程池的有限線程數量的上限設置爲maximumPoolSize
            addWorker(null, false);
    }
    //再次調用addWorker方法,但第二個參數傳入爲false,將線程池的有限線程數量的上限設置爲maximumPoolSize
    else if (!addWorker(command, false))
        //若是失敗則拒絕該任務
        reject(command);
}
複製代碼

總結一下它的工做流程:

  1. workerCount < corePoolSize,建立線程執行任務。

  2. workerCount >= corePoolSize&&阻塞隊列workQueue未滿,把新的任務放入阻塞隊列。

  3. workQueue已滿,而且workerCount >= corePoolSize,而且workerCount < maximumPoolSize,建立線程執行任務。

  4. 當workQueue已滿,workerCount >= maximumPoolSize,採起拒絕策略,默認拒絕策略是直接拋異常。

經過上面的execute方法能夠看到,最主要的邏輯仍是在addWorker方法中實現的,那咱們就看下這個方法:

addWorker方法

主要工做是在線程池中建立一個新的線程並執行

參數定義:

  • firstTask the task the new thread should run first (or null if none). (指定新增線程執行的第一個任務或者不執行任務)
  • core if true use corePoolSize as bound, else maximumPoolSize.(core若是爲true則使用corePoolSize綁定,不然爲maximumPoolSize。 (此處使用布爾指示符而不是值,以確保在檢查其餘狀態後讀取新值)。)
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {

        int c = ctl.get();
        // 獲取運行狀態
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        // 若是狀態值 >= SHUTDOWN (不接新任務&不處理隊列任務)
        // 而且 若是 !(rs爲SHUTDOWN 且 firsTask爲空 且 阻塞隊列不爲空)
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            // 返回false
            return false;

        for (;;) {
            //獲取線程數wc
            int wc = workerCountOf(c);
            // 若是wc大與容量 || core若是爲true表示根據corePoolSize來比較,不然爲maximumPoolSize
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 增長workerCount(原子操做)
            if (compareAndIncrementWorkerCount(c))
                // 若是增長成功,則跳出
                break retry;
            // wc增長失敗,則再次獲取runState
            c = ctl.get();  // Re-read ctl
            // 若是當前的運行狀態不等於rs,說明狀態已被改變,返回從新執行
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 根據firstTask來建立Worker對象
        w = new Worker(firstTask);
        // 根據worker建立一個線程
        final Thread t = w.thread;
        if (t != null) {
            // new一個鎖
            final ReentrantLock mainLock = this.mainLock;
            // 加鎖
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                // 獲取runState
                int rs = runStateOf(ctl.get());
                // 若是rs小於SHUTDOWN(處於運行)或者(rs=SHUTDOWN && firstTask == null)
                // firstTask == null證實只新建線程而不執行任務
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    // 若是t活着就拋異常
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // 不然加入worker(HashSet)
                    //workers包含池中的全部工做線程。僅在持有mainLock時訪問。
                    workers.add(w);
                    // 獲取工做線程數量
                    int s = workers.size();
                    //largestPoolSize記錄着線程池中出現過的最大線程數量
                    if (s > largestPoolSize)
                        // 若是 s比它還要大,則將s賦值給它
                        largestPoolSize = s;
                    // worker的添加工做狀態改成true 
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            // 若是worker的添加工做完成
            if (workerAdded) {
                // 啓動線程
                t.start();
                // 修改線程啓動狀態
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    // 返回線啓動狀態
    return workerStarted;
複製代碼
爲何須要持有mainLock?

由於workers是HashSet類型的,不能保證線程安全。

w = new Worker(firstTask);如何理解呢

Worker.java

private final class Worker extends AbstractQueuedSynchronizer implements Runnable 複製代碼

能夠看到它繼承了AQS併發框架還實現了Runnable。證實它仍是一個線程任務類。那咱們調用t.start()事實上就是調用了該類重寫的run方法.

Worker爲何不使用ReentrantLock來實現呢?

tryAcquire方法它是不容許重入的,而ReentrantLock是容許重入的。對於線程來講,若是線程正在執行是不容許其它鎖重入進來的。

線程只須要兩個狀態,一個是獨佔鎖,代表正在執行任務;一個是不加鎖,代表是空閒狀態。

public void run() {
    runWorker(this);
}
複製代碼

run方法又調用了runWorker方法:

final void runWorker(Worker w) {
    // 拿到當前線程
    Thread wt = Thread.currentThread();
    // 拿到當前任務
    Runnable task = w.firstTask;
    // 將Worker.firstTask置空 而且釋放鎖
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // 若是task或者getTask不爲空,則一直循環
        while (task != null || (task = getTask()) != null) {
            // 加鎖
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted. This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            // return ctl.get() >= stop 
            // 若是線程池狀態>=STOP 或者 (線程中斷且線程池狀態>=STOP)且當前線程沒有中斷
            // 其實就是保證兩點:
            // 1. 線程池沒有中止
            // 2. 保證線程沒有中斷
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                // 中斷當前線程
                wt.interrupt();
            try {
                // 空方法
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 執行run方法(Runable對象)
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                // 執行完後, 將task置空, 完成任務++, 釋放鎖
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // 退出工做
        processWorkerExit(w, completedAbruptly);
    }
複製代碼

總結一下runWorker方法的執行過程:

  1. while循環中,不斷地經過getTask()方法從workerQueue中獲取任務
  2. 若是線程池正在中止,則中斷線程。不然調用3.
  3. 調用task.run()執行任務;
  4. 若是task爲null則跳出循環,執行processWorkerExit()方法,銷燬線程workers.remove(w);

這個流程圖很是經典:

除此以外,ThreadPoolExector還提供了tryAcquiretryReleaseshutdownshutdownNowtryTerminate、等涉及的一系列線程狀態更改的方法有興趣能夠本身研究。大致思路是同樣的,這裏不作介紹。

Worker爲何不使用ReentrantLock來實現呢?

tryAcquire方法它是不容許重入的,而ReentrantLock是容許重入的。對於線程來講,若是線程正在執行是不容許其它鎖重入進來的。

線程只須要兩個狀態,一個是獨佔鎖,代表正在執行任務;一個是不加鎖,代表是空閒狀態。

在runWorker方法中,爲何要在執行任務的時候對每一個工做線程都加鎖呢?

shutdown方法與getTask方法存在競態條件.(這裏不作深刻,建議本身深刻研究,對它比較熟悉的面試官通常會問)

高頻考點

  1. 建立線程池的五個方法。
  2. 線程池的五個狀態
  3. execute執行過程。
  4. runWorker執行過程。(把兩個流程圖記下,理解後說個大該就行。)
相關文章
相關標籤/搜索