JDK源碼學習2-ThreadPoolExecutor學習,源碼一篇就夠了

寫在開篇

剛開始新學一門框架,直接看源碼是比較痛苦的,也諮詢過一些前輩,怎樣看源碼,他們說從「入口」看,那麼什麼是入口呢?我摸索了好久,我認爲入口是如下兩個java

  1. 初始化的過程。能夠是一個類的構造器,也能夠是Bean的initMethod,通常這裏會作一些變量的賦值,或者起一些服務。
  2. 調用流程。好比本篇咱們講線程池,2000行代碼,從頭看會至關吃力,若是是我,我會從execute(task),也就是從提交一個任務的時候開始看。

    這是我總結的兩個「入口」,也是我看源碼的主線,有更好的經驗能夠一塊兒交流。下面我將從這兩條主線來開始源碼的分析。編程

1. 初始化

看一下構造器的內容併發

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,
      TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory                       
      threadFactory,RejectedExecutionHandler handler) {
      
    if (corePoolSize < 0 ||maximumPoolSize <= 0 || maximumPoolSize < corePoolSize ||
    keepAliveTime < 0) 
        throw new IllegalArgumentException();
        
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
        
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

線程池的初始化須要7個參數,看過上一篇文章的朋友應該對這些參數很熟悉了,在此就不贅述了。框架

2. 提交任務後,線程池在幹什麼

2.1 提交任務

在上一篇文章(2.5 入隊)中,咱們知道了當提交一個新任務,什麼狀況下會新建線程,何時塞入工做隊列,什麼狀況下會拒絕任務。請務必提早了解基礎概念,再看下文的源碼說明:ide

首先,咱們要了解下ctl變量表示什麼,代碼中會大量用到。ctl變量是一個AtomicInteger,就意味着,ctl是原子性,一個int有32位,前3位表示當前線程池的狀態(RUNNING,STOP…),後29位表示當前的線程數(具體實現能夠百度,這裏不是重點),爲何要這樣作呢,由於狀態和線程數是兩個變量,而且這兩個變量的關係是息息相關的,若是分開賦值,那麼將沒法保證原子性。若是要保證原子性,就得上鎖,這樣會大大下降性能。AtomicInteger類型作到了既保證了原子性,也無需上鎖,是否是很方便(以前我也不理解爲何,直到看到一篇博文,比較贊同這種說法,若是有不一樣意見,能夠一塊兒討論哈)性能

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException()
        //先獲取ctl
        int c = ctl.get();
        //1.若是池的工做線程數小於core size,就新建線程來處理這個任務
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                 return;
                //若是新建線程失敗,就更新ctl
             c = ctl.get();
        }
        //2.若是池的工做線程數大於等於core size,而且線程池的狀態是RUNNING,
             //而且能夠塞進工做隊列,就再從新拉一下ctl,作recheck
        //2.1若是線程池的狀態不是RUNNING,就從隊列中刪除這個任務,再拒絕這個任務
        //2.2若是線程池的狀態是RUNNING,線程池又沒有線程在運行,就新建一個工做線程來處理隊列裏的任務
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
             if (! isRunning(recheck) && remove(command))
                 reject(command);
             else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
       //3.若是池的工做線程數大於等於core size,但沒法塞入工做隊列,
            //那麼就新建工做線程來處理這個任務,若是無法新建線程,
            //意味着這個線程數SHUT DOWN,或者線程池飽和了,咱們就拒絕這個任務
        else if (!addWorker(command, false))
            reject(command);
}

2.2 新建線程

新建線程主要在addWorker這個方法中完成,若是傳入task,表示新建的線程直接來處理這個新task,若是傳入的是null,那麼這個新建線程就是來處理工做隊列裏的任務。新建線程主要分爲3步:this

1)不停地check線程狀態和線程數,而後將線程數+1(修改ctl)。由於以前的全部操做都沒有上鎖,直到修改線程數以前,任何線程都有機會修改ctl。

2)新建包裝類Worker。在此先認爲Worker包裝了一個Thread,一個處理任務的勞動力,詳細的後面再說。而後將worker加入workers。spa

3)啓動worker。線程

private boolean addWorker(Runnable firstTask, boolean core) {
    //第一步:        
    retry:
    for (;;) {
        //再次拉取最新的線程池狀態
        int c = ctl.get();
        int rs = runStateOf(c);
        //若是線程池的狀態是RUNNING,或者雖然是SHUTDOWN的狀態,但firstTask是空,工做隊列非空
        //(說明這個方法的調用是爲了增長工做線程來處理隊列任務),那就繼續,不然返回false,也就是addWorker失敗
        if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))
            return false;
        for (;;) {
            //獲取線程數,根據傳入的core來和core size/max size作對比,好比在上一步提交任務的時候,
           //當前線程數 < core size,因而以新建線程來處理新任務的模式傳參給addWorker(),
           //如今發現線程數變成 >= core size了,固然是不能繼續下去了,以此類推。
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            //上一步recheck沒問題,那麼就將線程數 +1,跳出全部循環   
            if (compareAndIncrementWorkerCount(c))
                break retry;
            //線程數+1失敗,說明有其餘線程改變了這個線程數,須要再次判斷當前線程池的狀態和線程數    
            c = ctl.get();  // Re-read ctl
            
           //線程狀態若是沒變,咱們就只重複當前循環check線程數,若是狀態也變了,
            //咱們就要跳到retry標誌的位置,也就是最外層的循環裏,從新check 線程池的狀態和線程數。
            if (runStateOf(c) != rs) 
                continue retry; 
        }
    }
    //第二步: 
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null; 
    try {
        //在此先認爲Worker包裝了一個Thread,一個處理任務的勞動力,詳細的後面再說,
        //這裏須要注意的是,firstTask和thread沒有關係。
        w = new Worker(firstTask);
        final Thread t = w.thread;
        //若是勞動力建立成功
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                //如今咱們拿到了mainLock的鎖了,由於接下來,咱們要處理workers,在此以前,
                //須要再次確認線程數與狀態,以前都沒上鎖,萬一線程池被停了呢?
                int rs = runStateOf(ctl.get());
                //若是狀態是RUNNING或者雖然狀態是SHUTDOWN,可是沒傳入新任務.
                if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {
                    //若是線程已經開始跑了(注意,咱們尚未手動啓動線程,執行start方法),就拋出異常
                    if (t.isAlive()) // precheck that t is startable  
                        throw new IllegalThreadStateException();
                    //加入到workers中,workers是一個hashset,裏面儲存了這些幹活勞動力
                    //而後更新largestPoolSize,就釋放鎖啦
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            //第三步:
            //若是一切順利,線程數改了,勞動力也添加成功了,就來啓動線程
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        //若是線程啓動失敗,那就回滾以前所作的一些修改操做,並檢查,是否是須要中止這個線程池
        if (! workerStarted)
            addWorkerFailed(w);
    }
    //返回新線程的啓動結果
    return workerStarted;
}

2.3 處理任務

在上一步,咱們知道了處理任務的勞動力Worker,這裏簡單介紹一下Worker,詳細地不說,這裏咱們只瞭解在整個流程中用到的部分。調試

首先Worker實現了Runnable接口,繼承自AbstractQueuedSynchronizer,其中有兩個變量Thread threadRunnable firstTask,首先,咱們來看下初始化:

Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}

firstTask是咱們在一開始就提交的任務,若是咱們addWorker()傳入的是null,那麼這個firstTask也是null。thread就是線程自己,ThreadFactory可由線程池初始化的時候傳入自定義的工廠,若是沒有,線程池內部會有一個默認的工廠,默認狀況下,會new Thread()

這裏引伸一下,自定義的工廠類能夠作什麼呢?雖然咱們能夠藉此修改線程優先級或者守護狀態,可是不建議你這麼作。經常使用的作法,能夠經過自定義工廠傳入自定義的Thread,這裏咱們能夠定製線程的行爲,好比,修改線程的名字,設置自定義 UncaughtExceptionHandlerLogger寫入信息,維護一些統計信息(包括有多少線程被建立和銷燬),以及在線程被建立或終止時把調試消息寫入日誌。

---------------摘自《Java併發編程實戰》

如今咱們來看一下Workerrun()方法,爲何會調用run()方法呢?上一步啓動線程不是使用thread.start()方法嗎?咱們來看一下這段註釋(來自thread.run()):

/**
* If this thread was constructed using a separate
* <code>Runnable</code> run object, then that
* <code>Runnable</code> object's <code>run</code> method is called;
* otherwise, this method does nothing and returns.
* <p>
* Subclasses of <code>Thread</code> should override this method.
*
* @see     #start()
* @see     #stop()
* @see     #Thread(ThreadGroup, Runnable, String)
*/
@Override
public void run() {
    if (target != null) {
        target.run();
    }
}

也就是,若是Thread對象使用了Runnable作初始化,那麼在調用Thread.run()的時候,會調用這個Runnable對象的run方法,還記得在Worker類的構造器中的 this.thread = getThreadFactory().newThread(this);這句嗎?所以,當啓動工做線程的時候,咱們實際是運行的Worker.run()

Worker.run()方法內部只有一句「runWorker(this)」,因此咱們直接來看runWorker()方法:

這個方法的主要任務,就是不斷從隊列中領取任務,運行任務。

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        //若是這個線程在構造的時候沒有傳入task,而且也沒辦法經過getTask()拿到新任務,那麼這個線程就被回收了
        //getTask()在這個能夠認爲是從隊列中領取任務,具體實現,下文再說
        while (task != null || (task = getTask()) != null) {
            //在運行任務前,咱們須要上鎖,除非線程池STOP了,不然不容許打斷這個線程
            w.lock();
            // 若是線程池的狀態是STOP了,確認當前線程是否被打斷,若是沒有被打斷,就打斷這個線程
            if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() &&          
                                                      runStateAtLeast(ctl.get(), STOP))) && 
                !wt.isInterrupted())
                wt.interrupt();
            try {
                //beforeExecute方法拋出的異常會致使線程回收
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    //run
                    task.run();
                    } catch (RuntimeException x) {
                        thrown = x; 
                        throw x;
                    } catch (Error x) {
                        thrown = x; 
                        throw x;
                    } catch (Throwable x) {
                        thrown = x; 
                        throw new Error(x);
                    } finally {
                        //運行中的任務若是拋出異常,雖然咱們會將異常傳入Thrown,可是也會致使線程的回收
                        afterExecute(task, thrown);
                    }
            } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

2.4 線程回收

由上面的代碼能夠看出,一個工做線程的大體工做流程是,首先看這個線程初始化的時候有沒有傳入task,沒有的話,就經過getTask()方法嘗試獲取task。若是拿到這個task,就會先上鎖,而後執行task.run(),若是沒拿到,那麼這個線程就會經過processWorkerExit()回收掉。

首先,咱們看下getTask()方法,有的人可能以爲無非是從隊列裏面取任務,的確這個方法的主要任務是這個,可是要記住線程池大部分的狀況下是不上鎖的,因此咱們要常常檢查線程池的狀態和線程數,還記得在初始化線程池的時候傳入的keep-alive time嗎?這裏就用到了這個參數。

咱們先看一下注釋:

getTask()方法是用於經過阻塞或者等待時間來獲取任務,至於選擇哪一種模式根據配置而定。getTask()方法在如下4種狀況下會返回null(也就是這個工做線程會被回收):

1)若是當前線程數大於max size(可能由於setMaximumPoolSize方法改變了線程池的max size)。

2)線程池停下了(STOP)。

3)線程池SHUTDOWN而且隊列空了。

4)線程等待超時(設置了allowCoreThreadTimeOut或者線程數 > core size)

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        //檢查是否符合第2,3種狀況,是就刪除線程數,返回null
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        int wc = workerCountOf(c);
        //檢查是否符合第4種狀況
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        //檢查是否符合第1種狀況  或者這個線程以前嘗試過獲取任務而且符合第四種狀況。
        //若是當前線程數> 1或者隊列已經沒有任務了,就刪除線程數,返回null
        if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        try {
            //若是設置了allowCoreThreadTimeOut或者當前線程數 > core size,就使用poll方法拿任務
            //不然就用take方法阻塞拿任務
            Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
            workQueue.take();
            if (r != null)
                return r;
            //若是poll方法超時,沒拿到任務,就將timedOut設置爲true
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

接下來,咱們看下,若是一個Worker被回收以前會接受怎樣的處理:

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    //若是是因異常而被回收的線程,那就沒有經歷過線程數的-1的處理,這裏補充完成
    if (completedAbruptly)
        decrementWorkerCount();
        //接着從workers中刪除這個勞動力,和添加同樣,須要上鎖。而且記錄總的完成任務數
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
    
    //嘗試終止這個線程池,上文提到幾種終止線程池的狀況,
    //這個方法就是判斷此刻線程池的狀態是否須要終止,
    //線程被回收有可能意味着要終止線程,因此此處須要判斷,詳細的後文再說
    tryTerminate();
    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        //若是是線程是正常退出
        if (!completedAbruptly) {
            //獲取線程池容許的最小線程數
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
                //若是當前線程數不小於規定的最小值,就讓這個線程正常退出
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        //若是這個勞動力不是正常退出,或者即便是正常退出,可是當前線程數小於規定的最小值,
        //就補充一個勞動力去處理隊列的任務
        addWorker(null, false);
    }
}

總結一下:

  1. 判斷線程池狀態,若是達到終止條件就終止這個線程池
  2. 無論這個線程是正常退出仍是非正常退出,若是當前線程數小於設定的最小值,就新建一個Worker,不然就正常回收。

2.5 拒絕任務

上文提到,咱們有四種拒絕任務的策略,如今咱們具體來看一看:

final void reject(Runnable command) {
    handler.rejectedExecution(command, this);
}

reject()方法內部調用的handler.rejectedExecution()方法根據handler的不一樣而具體實現也不相同。

​ 1)AbortPolicy:默認策略,直接拋出異常

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
     throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
 }

​ 2)CallerRunsPolicy:調用execute(),嘗試將任務丟給線程池處理的線程自己處理任務

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    if (!e.isShutdown()) {
        r.run();
    }
}

​ 3)DiscardOldestPolicy:將隊列頭部的舊任務丟掉,而後再次嘗試將新任務丟入線程池:

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    if (!e.isShutdown()) {
        e.getQueue().poll();
        e.execute(r);
    }
}

​ 4)DiscardPolicy:忽略這個任務,空方法。

2.6 終止線程池

前文提到tryTerminate()方法是用於檢測線程池的當前狀態是否是達到終止條件,若是符合條件,就終止這個線程池。咱們先看下哪些方法調用了tryTerminate()

圖片描述

這些方法都有可能致使線程池的終止,如今,咱們來看下tryTerminate方法的具體實現:

final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        //檢查線程池的狀態,若是線程池RUNING,或者是TIDYING,TERMINATED狀態
        //或者是SHUTDOWN狀態,可是隊列非空,就退出
        if (isRunning(c) ||runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        //若是符合終止的條件,可是線程池還有線程,因而打斷其中一個線程,返回
        if (workerCountOf(c) != 0) { // Eligible to terminate
            interruptIdleWorkers(ONLY_ONE);
            return;
        }
        //若是符合終止條件,而且線程池已經沒有線程池了
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //先將狀態改成TIDYING,執行terminated方法(可被改寫,默認爲空),再將狀態改成TERMINATED
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    terminated();
                } finally {
                    ctl.set(ctlOf(TERMINATED, 0));
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}

不管是shutdown()仍是shutdownNow()方法內部都是使用tryTerminate()來終止線程池,因此在此就再也不贅述。

相關文章
相關標籤/搜索