java線程池源碼分析

咱們在關閉線程池的時候會使用shutdown()和shutdownNow(),那麼問題來了:java

  1. 這兩個方法又什麼區別呢?
  2. 他們背後的原理是什麼呢?
  3. 線程池中線程超過了coresize後會怎麼操做呢?

爲了解決這些疑問咱們須要分析java線程池的原理。ide

1 基本使用

1.1 繼承關係

日常咱們在建立線程池常用的方式以下:oop

ExecutorService executorService = Executors.newFixedThreadPool(5);

看下newFixedThreadPool源碼, 其實Executors是個工廠類,內部是new了一個ThreadPoolExecuto:源碼分析

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}

參數的意義就不介紹了,網上有不少內容,看源碼註釋也能夠明白。ui

線程池中類的繼承關係以下:this

「繼承關係」

2 源碼分析

2.1 入口

將一個Runnable放到線程池執行有兩種方式,一個是調用ThreadPoolExecutor#submit,一個是調用ThreadPoolExecutor#execute。其實submit是將Runnable封裝成了一個RunnableFuture,而後再調用execute,最終調用的仍是execute,因此咱們這裏就只從ThreadPoolExecutor#execute開始分析。.net

2.2 ctl和線程池狀態

ThreadPoolExecutor中有個重要的屬性是ctl線程

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 高3位表示狀態,低29位表示線程池中線程的多少
    private static final int COUNT_BITS = Integer.SIZE - 3; // 32-3 = 29
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1; // 左移29爲減1,即最終獲得爲高3位爲0,低29位爲1的數字,做爲掩碼,是二進制運算中經常使用的方法

    private static final int RUNNING    = -1 << COUNT_BITS; // 高三位111
    private static final int SHUTDOWN   =  0 << COUNT_BITS; // 高三位000
    private static final int STOP       =  1 << COUNT_BITS; // 高三位001
    private static final int TIDYING    =  2 << COUNT_BITS; // 高三位010
    private static final int TERMINATED =  3 << COUNT_BITS; // 高三位011

    // Packing and unpacking ctl
    private static int runStateOf(int c)     { return c & ~CAPACITY; } // 保留高3位,即計算線程池狀態
    private static int workerCountOf(int c)  { return c & CAPACITY; } // 保留低29位, 即計算線程數量
    private static int ctlOf(int rs, int wc) { return rs | wc; } // 求ctl

ThreadPoolExecutor中使用32位Integer來表示線程池的狀態和線程的數量,其中高3位表示狀態,低29位表示數量。若是對二進制運行不熟悉能夠參考:二進制運算。從上也能夠看出線程池有五種狀態,咱們關心前3中狀態rest

  1. RUNNING 接收task和處理queue中的task
  2. SHUTDOWN 再也不接收新的task,可是會處理完正在運行的task和queue中的task,不會interrupt正在執行的task,其實調用shutdown後線程池處於該狀態
  3. STOP 再也不接收新的task,也不處理queue中的task,同時正在運行的線程會被interrupt。調用shutdownNow後線程池會處於該狀態。

2.3 execute

明白了ctl和線程池的狀態後咱們來具體看下execute的處理邏輯code

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) { // 線程數量小於coresize,那麼就調用addWorker
            if (addWorker(command, true)) // 這裏知道,返回true就不往下走了
                return;
            c = ctl.get();
        }

        // 不知足上述條件,即線程數量 >= coreSize,或者addWorker返回fasle,那麼走下面的邏輯
        if (isRunning(c) && workQueue.offer(command)) { // 能夠看到是往blockingqueue中放task
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }

        // 若是不知足上述條件,即blockingqueue也放不進去,那麼就走下面的邏輯
        else if (!addWorker(command, false))
            reject(command);
    }

從上面的代碼我們能夠看到線程池處理線程的基本思路是: 若是線程數量小於coresize那麼就執行task,不然就放到queue中,若是queue也放不下就走下面addWorker,若是也失敗了,那麼就調用reject策略。固然還涉及一些細節,須要進一步分析。

2.4 addWorker

execute中反覆調用的是addWorker

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) { 
            int c = ctl.get();
            int rs = runStateOf(c); // 計算線程池狀態

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&  // 先忽略
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c); // 線程數量
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize)) // 可見若是超過了運行的最大線程數量則返回false
                    return false;
                if (compareAndIncrementWorkerCount(c)) // 若是成功,線程數量確定加1
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            final ReentrantLock mainLock = this.mainLock;
            w = new Worker(firstTask); // 將task封裝成了Worker
            final Thread t = w.thread; // 來獲取worker的thread
            if (t != null) {
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int c = ctl.get();
                    int rs = runStateOf(c);

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w); // 將worker添加到hashset中報存,關閉的時候要使用
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) { // 通過一些檢查, 啓動了work的thread
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w); // 若是線程啓動失敗,則將線程數減1
        }
        return workerStarted;
    }

上面的代碼看起來比較複雜,可是若是咱們忽略具體的細節,從大體思路上看,其實也比較簡單。上面代碼的主要思路就是:除了一些狀態檢查外,首先將線程數量加1,而後將runnable分裝成一個worker,去啓動worker線程,若是啓動失敗則再將線程數量減1。返回false的緣由多是線程數量大於容許的數量。因此addWorker調用成功,則會啓動一個work線程,且線程池中線程數量加1

2.5 worker

woker是線程池中真正的線程實體。線程池中的線程不是自定義的Runnable實現的線程,而是woker線程,worker在run方法裏調用了自定義的Runnable的run方法

Worker繼承了AQS,並實現了runnable接口:

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
    Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this); // 這個時候回頭看看addWorker中t.start(), 就明白了啓動的實際是一個Woker線程,而不是用戶定義的Runnable
    }

    public void run() {
        runWorker(this);
    }
}

Worker中firstTask存儲了用戶定義的Runnable,thread是以他自身爲參數的Thread對象。getThreadFactory()默認返回是Executors#DefaultThreadFactory,用來新建線程,並定義了線程名稱的前綴等:

static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-"; //
        }

        public Thread newThread(Runnable r) { //  調用後新建一個線程
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }

2.6 runWoker

Worker的run方法調用了runWorker,並將自身做爲參數傳了進去,下面看看問題的關鍵:runWorker:

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) { // 注意這裏的while循環,這裏很關鍵。這裏注意,若是兩個條件都知足了,那麼線程就結束了
                w.lock(); // 注意worker繼承了AQS,至關於本身實現了鎖,這個在關閉線程的時候有用
            
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run(); // 僅僅是回調了Runnable的run方法
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null; // 重點,task執行完後就被置位null
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly); // 注意while循環結束後worker線程就結束了
        }
    }

runWorker中有個while循環,while中判斷條件爲(task != null || (task = getTask()) != null)。假設咱們按照正常的邏輯,即task != null,則會調用task.run方法,執行完run方法後而後在finally中task被置爲null;接着又進入while循環判斷,此次task == null,因此不符合第一個判斷條件,則會繼續判斷 task == getTask()) != null。咱們來看下getTask作了什麼。

2.7 getTask

private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { // 當調用shutdown()方法的時候,線程狀態就爲shutdown了; 當調用shutdownow()的時候,線程狀態就爲stop了
                decrementWorkerCount();
                return null;
            }

            boolean timed;      // Are workers subject to culling?

            for (;;) { // 經過死循環設置狀態
                int wc = workerCountOf(c);

                // 設置容許core線程timeout或者線程數量大於coresize,則容許線程超時
                timed = allowCoreThreadTimeOut || wc > corePoolSize;

                // 若是線程數量 <= 最大線程數 且 沒有超時和容許超時 則跳出死循環
                if (wc <= maximumPoolSize && ! (timedOut && timed))
                    break;
                if (compareAndDecrementWorkerCount(c))
                    return null;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }

            try {

                // 這裏是關鍵,若是容許超時則調用poll從queue中取出task,不然就調用take可阻塞的獲取task
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null) // 獲取到task則返回,而後runWorker的while循環就繼續執行,並調用task的run方法
                    return r;
                timedOut = true; // 不然設置爲timeOut,繼續循環,可是下次循環會走到if (compareAndDecrementWorkerCount(c)) 處,並返回null。
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

忽略掉具體細節,getTask的總體思路是: 從blockqueue中拿去task,若是queue中沒有task則分兩種狀況:

  1. 若是容許超時則調用poll(keepAliveTime, TimeUnit.NANOSECONDS),在規定時間沒有返回了則getTask返回null,runWorker結束while循環,work線程結束。當線程數量大於coresize且blockqueue滿的時候且小於maxsize的時候,新建立的線程即是走這個邏輯;或者容許core線程超時的時候也是走這個邏輯
  2. 若是不容許超時,則會一直阻塞直到blockqueue中有了新的task。take方法阻塞則表示worker線程也阻塞,也就是在沒有task執行的狀況下,worker線程便會阻塞等待。core線程走的就是這個邏輯。

這個時候回頭再看下runWorker,若是task != null,那麼就會執行task的run方法,執行完後task就會爲被置爲null,再次進入while循環執行getTask阻塞在這裏了。經過這種方式保留住了線程。若是while循環結束了,那麼worker線程也就結束了。

2.8 再看addWorker


分析到這裏咱們再來看下addWoker。addWorker能夠將第一個參數設置爲null。例如ThreadPoolExecutor#prestartAllCoreThreads:

public int prestartAllCoreThreads() {
        int n = 0;
        while (addWorker(null, true)) // addWorker第一個參數是null
            ++n;
        return n;
    }

通過前面的分析,咱們知道addWoker用來啓動一個worker線程,worker線程調用runWorker來執行,而runWorker中有個while循環,判斷條件是(task != null || (task = getTask()) != null)。由於咱們傳入的task爲null,因此就會判斷task = getTask()) != null,而getTask就是去blockqueue中拿去數據,若是沒有任務就會阻塞住。這個時候就是一個阻塞的線程在等待task的到來了。因此傳入參數爲null表示建立一個空的線程,什麼都不執行。

2.9 再看execute

已經知道了線程池內部的大概工做狀況,咱們再來看下若是全部core線程都建立好了且處於空置狀態,這個時候新放入一個線程的執行流程。

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) { // core線程都建立好了,因此判斷條件不知足
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }

        // 會走到這裏,會經過offer往blockingqueue裏放置一個task。這個時候阻塞的core線程會經過blockingqueue的take拿到task執行,相似一個生產者消費者的狀況
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }

        // 若是blockingqueue添加失敗,則建立線程直到maxsize
        else if (!addWorker(command, false))
            reject(command);
    }

可見,線程和execute經過blockingqueue來通訊,而不是其餘方式,execute往blockingqueue中放置task,線程經過take來獲取。總體線程池的邏輯以下圖

線程池

2.10 shutdown

這個時候咱們終於能夠來看看shutdown和shutdownNow了

看下shutdown

public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(SHUTDOWN); // 重點,將線程狀態置爲shutdown,這樣getTask等workqueue爲空後就返回null了
            interruptIdleWorkers(); // 重點
            onShutdown(); // 什麼都沒作
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
}

private void interruptIdleWorkers() {
        interruptIdleWorkers(false);
}

private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;

                // 線程沒有中斷 且 獲取到worker的鎖
                if (!t.isInterrupted() && w.tryLock()) { 
                    try {
                        t.interrupt(); // 調用interrup,中斷線程
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }
  1. shutdown的核心方法在interruptIdleWorkers裏,這裏能夠看到在t.interrupt的時候有個判斷添加,一個是線程沒有設置中斷標記,第二個是獲取到worker的鎖,咱們注意下第二個條件。回頭看下runWorker,while中執行task的run方法的時候,會先獲取到worker線程的鎖,因此若是線程正在執行task的run方法,則shutdown的時候會獲取鎖失敗,也就不會中斷線程了。這裏能夠得出結論:shutdown不會中斷正在執行的線程
  2. 若是blockingqueu中有task還沒執行完呢? 這個時候while中的take並不會阻塞,也不會被中斷,shutdown中也沒有清空blockingqueue的操做。因此能夠得出結論:shutdown會等blockingqueue中的task執行完成再關閉。能夠說shutdown是一種比較溫柔的關閉方式了。
  3. 若是core線程都阻塞在take方法上了,即沒有正在執行的task了,那麼這個時候 t.interrupt則會中斷take方法,worker線程的while循環結束,worker線程結束。當全部的worker線程都結束後線程池就關閉了

總結下就是: shutdown會把它被調用前放到線程池中的task所有執行完。

2.11 shutdownNow

再來看下shutdownNow

public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(STOP); // 重點,將線程狀態置爲stop
            interruptWorkers(); // 重點
            tasks = drainQueue(); // 重點
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
}

private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers)
                w.interruptIfStarted();
        } finally {
            mainLock.unlock();
        }
}

void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { // 沒有去獲取woker的鎖
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
}

private List<Runnable> drainQueue() {
        BlockingQueue<Runnable> q = workQueue;
        List<Runnable> taskList = new ArrayList<Runnable>();
        q.drainTo(taskList); // 將blockingqueue中的task清空
        if (!q.isEmpty()) {
            for (Runnable r : q.toArray(new Runnable[0])) {
                if (q.remove(r))
                    taskList.add(r);
            }
        }
        return taskList;
    }

從上面的代碼能夠看出:

  1. shutdownNow不會去獲取worker的鎖,因此shutdownNow會致使正在運行的task也被中斷
  2. shutdownNow會將blockingqueue中的task清空,因此在blockingqueue中的task也不會被執行

總結就是shutdownNow比較粗暴,調用他後,他會將全部以前提交的任務都interrupt,且將blockingqueue中的task清空

另外就是不管是shutdown仍是shutdownNow都是調用Thread的interrupt()方法。若是task不響應中斷或者忽略中斷標記,那麼這個線程就不會被終止。例如在run中執行如下邏輯

poolExecutor.execute(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    System.out.println("b");
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        System.out.printf("不處理"); // 忽略中斷
                    }
                }

            }
        });

運行結果是,即便調用了shutdownNow也終止不了線程運行

b
0
不處理b
b
b
b
b
....

3 總結

  1. 線程經過while循環不停的從blockingqueue中獲取task來保留線程,避免重複重建線程

4 參考

  1. https://blog.csdn.net/cleverGump/article/details/50688008
  2. https://zhuanlan.zhihu.com/p/30108890
相關文章
相關標籤/搜索