併發編程之 源碼剖析 線程池 實現原理

前言

在上一篇文章中咱們介紹了線程池的使用,那麼如今咱們有個疑問:線程池究竟是怎麼實現的?畢竟好奇是人類的天性。那咱們今天就來看看吧,扒開 他的源碼,一探究竟。java

1. 從 Demo 入手

上圖是個最簡單的demo,咱們從這個 demo 開始看源碼,首先一步一步來看。spring

首先咱們手動建立了線程池,使用了有數量限制的阻塞隊列,使用了線程池工廠提供的默認線程工廠,和一個默認的拒絕策略,咱們看看默認的線程工廠是如何建立的?併發

默認的線程工廠從當前線程中獲取線程組,設置了默認的線程名字前綴 pool-xxx-thread-xxx,強制設置爲非守護線程,強制設置爲默認優先級。ui

而後咱們看看ThreadPoolExecutor 的構造方法:this

沒有什麼特殊的東西,主要是一些判斷。spa

好了,那麼咱們看看 execute 方法是如何實現的。線程

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    // c = -536870911
    int c = ctl.get();
    // 工做線程數量小於核心線程池設定數,則建立線程。
    if (workerCountOf(c) < corePoolSize) {
        // 若是添加成功則直接返回
        if (addWorker(command, true))
            return;
        // 不然再次獲取活動線程數量
        c = ctl.get();
    }
    // 若是線程池正在運行,而且添加進隊列成功
    if (isRunning(c) && workQueue.offer(command)) {
        // 再次對線程池狀態檢查, 由於上面 addWorker 過了而且失敗了,因此須要檢查
        int recheck = ctl.get();
        // 若是狀態不是運行狀態,且從隊列刪除該任務成功並嘗試中止線程池
        if (! isRunning(recheck) && remove(command))
            // 拒絕任務
            reject(command);
        // 若是當前工做線程數量爲0(線程池已關閉),則添加一個 null 到隊列中
        else if (workerCountOf(recheck) == 0)
            // 添加個空的任務
            addWorker(null, false);
    }
    // 若是添加隊列失敗,則建立一個任務線程,若是失敗,則拒絕
    else if (!addWorker(command, false))
        // 拒絕
        reject(command);
    }
}
複製代碼

首先,空判斷。設計

而後判斷,若是正在工做的線程小於設置的核心線程,則建立線程並返回,若是正在工做的線程數量大於等於核心線程數量,則試圖將任務放入隊列,若是失敗,則嘗試建立一個 maximumPoolSize 的任務。注意,在remove 方法中,該方法已經試圖中止線程池的運行。3d

從這段代碼中,能夠看到,最重要的方法就是 addWorker 和 workQueue.offer(command) 這段代碼,一個是建立線程,一個是放入隊列。後者就是將任務添加到阻塞隊列中。代理

那麼咱們就看看 addWorker 方法。

2. addWorker 方法-----建立線程池

private boolean addWorker(Runnable firstTask, boolean core)

該方法很長,樓主說一下這個方法的兩個參數,第一個參數爲 Runnable 類型,表示線程池中某個線程的第一個任務,第二個參數是若是是 true,則建立 core 核心線程,若是是 false ,則建立 maximumPoolSize 線程。這兩個線程的生命週期是不一樣的。

樓主截取該方法中最終的代碼:

其中,在該方法中,建立一個 Worker 對象,該對象代理了任務對象,咱們看看該類的構造方法:

經過線程工廠建立線程,注意,傳遞的是 this ,所以,在上面的代碼中國,調用了 worker 對象的 thread 屬性的 start 方法,實際上就是調用了該類的 run 方法。那麼改類的 run 方法是怎麼實現的呢?

調用了自身的 runWorker 方法。這個方法很是的重要。

3. Worker.runWorker(Worker w) 方法-------線程池的最核心方法

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted. This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
複製代碼

首先說該方法的主要邏輯:

  1. 首先執行 firstTask 的 run 方法。
  2. 而後循環獲取阻塞隊列中的任務,並調用他們的 run 方法。
  3. 若是線程池中的任務異常,就拋出異常並中止運行線程池。

這個方法能夠說就是線程池的核心,在最開始的設定的核心任務數都是直接調用 start 方法啓動線程的,啓動以後,這個線程並不關閉,而是一直在阻塞隊列上等待,若是有任務就執行任務的run 方法,而不是 start 方法,這點很重要。

而該方法中有幾個注意的地方就是線程池留給咱們擴展的,在執行任務以前,會執行 beforeExecute 方法,該方法默認爲空,咱們能夠實現該方法,在任務執行結束後,在 finally 塊中有 afterExecute 方法,一樣也是空的,咱們能夠擴展。

樓主看到這裏的代碼後,大爲讚歎,Doug Lea 能夠說神通常的人物。

那麼,線程池還有一個方法, submit 是如何實現的呢?其實核心邏輯也是 runWorker 方法,否則樓主也不會說這個方法是線程池的核心。

那咱們看看 submit 方法是如何實現的。

4. submit 方法實現原理。

該方法最終也是走 execute 方法的,所以邏輯基本相同,不一樣的是什麼呢?咱們看看。咱們看到,第二行代碼建立了 一個 RunnableFuture 對象,RunnableFuture 是一個接口,具體的實現是什麼呢?咱們看看:

FutureTask

FutureTask 對象,該對象也是一個線程對象:

那咱們就看看該方法的 run 方法。

該方法核心邏輯樓主已經框起來了,其中調用了 call 方法,返回一個返回值,並在set 方法中,將返回值設置在一個變量中,若是是異常,則將異常設置在變量中。咱們看看set方法:

該方法經過CAS將任務狀態狀態從new變成 COMPLETING,而後,設置 outcome 變量,也就是返回值。最後,調用 finishCompletion 方法,完成一些變量的清理工做。

那麼,若是從submit 中得到返回值呢?這要看get方法:

該方法會判斷狀態,若是狀態尚未完成,那麼就調用 awaitDone 方法等待,若是完成了,調用 report 返回值結果。

看見了剛剛設置的 outcome 變量,若是狀態正常,則直接返回,若是狀態爲取消,則拋出異常,其他狀況也拋出異常。

咱們回到 awaitDone 方法,看看該方法如何等待的。

該方法有一個死循環,直到有一個肯定的狀態返回,若是狀態大於 COMPLETING ,也就是 成功了,就返回該狀態,若是正在進行中,則讓出CPU時間片進行等待。若是都不是,則讓該線程阻塞等待。在哪裏喚醒呢?在 finishCompletion 方法中會喚醒該線程。

該方法循環了等待線程鏈表的鏈表,並喚醒鏈表中的每一個線程。

還有一個須要的注意的地方就是,在任務執行完畢會執行 done 方法,JDK 默認是空的,咱們能夠擴展該方法。好比 Spring 的併發包 org.springframework.util.concurrent 就有2個類重寫了該方法。

5. 總結

好了,到這裏,線程池的基本實現原理咱們知道了,也解開了樓主一直以來的疑惑,能夠說,線程池的核心方法就是 runWorker 方法 配合 阻塞隊列,當線程啓動後,就從隊列中取出隊列中的任務,執行任務的 run 方法。能夠說設計的很是巧妙。而回調線程 callback 也是經過該方法,JDK 封裝了 FutureTask 類來執行他們的 call 方法。

good luck!!!!

相關文章
相關標籤/搜索