java線程池源碼白話分析

1.計算機的基礎知識

位邏輯運算符:
&:
位與運算符,只有兩個操做數都是true,結果纔是true。
|:
位或運算符,只有兩個操做數都是false,結果纔是false。
~:
位非運算符:若是位爲0,結果是1,若是位爲1,結果是0.
^:
位異或運算:兩個數轉爲二進制,而後從高位開始比較,若是相同則爲0,不相同則爲1。
位移運算:
<<:
無符號左移
>>:
無符號右移
>>>:帶符號右移(沒有帶符號左移這種操做)
二進制:
二進制都是以補碼的形式表示的
正數的原碼,反碼,補碼都同樣;
要獲得負數的補碼,必須先求負數的反碼,負數的反碼;負數的反碼按位1改爲0,0改爲1;負數的補碼等於反碼+1

2.ThreadPoolExecutor簡單示例

public class ThreadPoolExecutorTest {

    public static void main(String[] args) {
        BlockingQueue b = new ArrayBlockingQueue(100);

        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 10, 500000, TimeUnit.SECONDS,
                b, new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {

            }
        });
        threadPoolExecutor.execute(() -> {
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(111111);
        });

        threadPoolExecutor.execute(() -> {
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(2222222);
        });

    }
}

3.ThreadPoolExecutor屬性分析

public class ThreadPoolExecutor extends AbstractExecutorService {

    //用於保存線程運行狀態和當前線程池線程運行的數量
    //高3位用於表明線程的運行狀態,低29位表明線程池的線程最大數量
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    //32-3=29
    private static final int COUNT_BITS = Integer.SIZE - 3;
    //00011111111111111111111111111111,高3位爲0,低29位爲1,表明線程池的線程最大數量
    //參與運算用於獲得線程的運行狀態和線程池線程的數量
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    
    //線程池運行的狀態,後面單獨分析
    //線程池處於運行狀態,11100000000000000000000000000000
    private static final int RUNNING    = -1 << COUNT_BITS;
    //線程池處於shutdown狀態,00000000000000000000000000000000
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    //線程池處於結束狀態,00100000000000000000000000000000
    private static final int STOP       =  1 << COUNT_BITS;
    //線程池運行任務爲空的時候的狀態,01000000000000000000000000000000
    private static final int TIDYING    =  2 << COUNT_BITS;
    //線程處於完全終止的狀態,01100000000000000000000000000000
    private static final int TERMINATED =  3 << COUNT_BITS;
 
    //獲取線程池運行狀態,c表明ctl值
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    //獲取線程池工做線程數量,c表明ctl值
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    //獲取ctl值,rs表明線程的運行狀態,wc表明線程池工做線程數量,造成一個32位二進制數
    //高三位表明線程池運行狀態,低29位表明線程池工做線程數量
    private static int ctlOf(int rs, int wc) { return rs | wc; }

    //傳入ctl值和線程某個運行狀態,比較ctl值是否小於傳入的線程的某個運行狀態
    private static boolean runStateLessThan(int c, int s) {
        return c < s;
    }

    //傳入ctl值和線程運行狀態,比較ctl值是否大於傳入的線程的某個運行狀態
    private static boolean runStateAtLeast(int c, int s) {
        return c >= s;
    }

    //判斷線程運行狀態是不是運行狀態,由於RUNNING=-1是最小的狀態值
    private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }

    //經過CAS操做將工做線程數+1
    private boolean compareAndIncrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect + 1);
    }

    //經過CAS操做將工做線程數-1
    private boolean compareAndDecrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect - 1);
    }

    //do-while循環能夠強制讓工做線程數-1
    private void decrementWorkerCount() {
        do {} while (! compareAndDecrementWorkerCount(ctl.get()));
    }

    //線程池的工做隊列,在構造方法中初始化
    private final BlockingQueue<Runnable> workQueue;
    
    private final ReentrantLock mainLock = new ReentrantLock();

    //保存worker的池子
    private final HashSet<Worker> workers = new HashSet<Worker>();

    private final Condition termination = mainLock.newCondition();

    //記錄線程池生命週期中,線程池運行的線程的最大數量
    private int largestPoolSize;

    //線程池完成任務數量
    private long completedTaskCount;

    //建立線程工廠
    private volatile ThreadFactory threadFactory;

    //線程中斷策列
    private volatile RejectedExecutionHandler handler;

    //在指定時間單位下,線程存活時間
    private volatile long keepAliveTime;

    //核心線程數
    private volatile int corePoolSize;

    //最大線程數
    private volatile int maximumPoolSize;

    //線程池滿了後的中斷策列
    private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();

    private static final RuntimePermission shutdownPerm =
        new RuntimePermission("modifyThread");

    private final AccessControlContext acc;
    
    //當從工做隊列中取不到任務時的時候,是否須要回收核心線程
    private volatile boolean allowCoreThreadTimeOut;
    
}    

4.ThreadPoolExecutor構造方法分析

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    //會檢查參數,不符合條件拋出異常
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    //工做隊列,線程工廠,拒絕策列不能爲空
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

5.線程池建立線程順序分析

當咱們執行execute方法提交一個線程的時候,首先會判斷當前線程池線程數是否超過核心線程數corePoolSize,如果沒有超過,則建立新線程,如果超過,則嘗試將Runnable提交到工做隊列workQueue中。若是工做隊列workQueue沒有超過容量,則Runnable提交到工做隊列中,若是超過了workQueue的容量,則嘗試建立線程。若是此時建立的線程小於最大線程數maximumPoolSize,則建立線程,若是超過了maximumPoolSize,則執行拒絕策列。面試

6.ThreadPoolExecutor.execute方法分析

public void execute(Runnable command) {
    //若是runnable爲空,拋出異常
    if (command == null)
        throw new NullPointerException();
 //獲取ctl值,該值高3位表明線程池運行狀態,低29位表明線程池當前運行線程數量
    int c = ctl.get();
    //CASE1:獲取線程池運行線程數量,若是小於核心線程數,則建立線程,addWorker傳入參數爲core
    //也就是說,線程池不是一上來就把核心線程建立了,必須在提交runnable任務到線程池的時候才一個一個建立
    if (workerCountOf(c) < corePoolSize) {
        //addWorker是建立線程的核心方法,關鍵點在Worker類的構造方法和runWorker方法的while循環
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    //CASE2:條件1不成立說明核心線程數已滿,將任務添加到阻塞隊列中。
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    //CASE3:條件1和2都不成立,說明核心線程已建立徹底而且任務隊列已滿
    //調用addWorker建立非核心線程,若是返回false,說明線程數達到最大線程數,執行拒絕策列
    else if (!addWorker(command, false))
        reject(command);
}

6.1.Worker類分析

每一個線程的建立都離不開Worker類,該類徹底包裝了線程運行的所須要的屬性,並提供了初始化線程和從阻塞隊列中獲取被阻塞的線程並執行的一系列方法。spring

觀察該類代碼發現,該類繼承了AbstractQueuedSynchronizer並實現了Runnable接口,在建立線程的時候,實際Thread類的構造方法包裝的就是Worker類本身(咱們知道通常Runnable須要被傳入到Thread裏面的,如:Thread t = new Thread(runnable), t.start()啓動線程)。而從execute方法傳過來的Runnable實現只是被封裝到了firstTask中,建立出來的Thread在執行的時候,調用的start方法也只是啓動了該類的runWorker方法,而真正封裝咱們執行邏輯的firstTask這個Runnable類在後續調用中也只是執行本身的run方法而已,並再也不被Thread封裝。安全

worker爲何要繼承AbstractQueuedSynchronizer呢?ide

由於在runWork的方法內,在調用firstTask處理業務邏輯前,會給代碼加上獨佔鎖,加這個獨佔鎖的目的是什麼呢?由於咱們在調用shutDown方法的時候,並不會終止處於運行中的線程。shutDown方法會根據獨佔鎖來判斷當前worker是否正在工做。學習

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    private static final long serialVersionUID = 6138294804551838833L;

 //須要建立的線程,該線程封裝的Runnable是本身
    final Thread thread;

    //execute傳入的Runnable,該runnable並不被Thread包裝,後續調用本身的run方法。
    Runnable firstTask;

    volatile long completedTasks;

    Worker(Runnable firstTask) {
        //設置線程池處於運行狀態
        setState(-1); 
        //封裝execute傳入進來的包含實際邏輯的Runnable
        this.firstTask = firstTask;
        //建立一個線程,這裏注意,線程封裝的Runnable是本身
        //示例使用Executors.defaultThreadFactory()
        this.thread = getThreadFactory().newThread(this);
    }
    //被thread屬性封裝後調用start方法後,會自動啓動該run方法,執行後續邏輯
    //後續邏輯會調用firstTask.run()方法啓動實際業務邏輯
    public void run() {
        runWorker(this);
    }

    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }

    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}

6.2.addWorker方法分析

addWorker是建立線程的核心方法,一個worker表明一個線程,而workers這個全局變量能夠表明線程池,只有向workers裏面添加worker成功的時候,才能表明建立線程成功了。addWorker在執行過程當中,會根據線程池狀態和線程池數量判斷是否能建立線程,建立線程成功會將記錄線程池狀態和數量的ctl值+1,並將worker加入到workers裏面,更新線程池生命週期內線程池線程的最大數量,而後啓動線程執行任務。ui

addWorker的core參數表明是不是在建立核心線程,core爲true表明建立核心線程,false表明阻塞隊列已滿,建立非核心線程。this

返回值: true表明建立線程並啓動成功,false表明建立線程失敗。spa

何時返回false呢?
CASE1.線程池狀態rs>SHUTDOWN;
CASE2.線程池狀態爲SHUTDOWN的時候,阻塞隊列沒有任務了,爲空;
CASE3.線程池狀態爲SHUTDOWN的時候,execute提交的Runnable(被封裝到firstTask裏面)不爲空;
CASE4.若是是建立核心線程,此時已經超過核心線程數;若是是建立非核心線程,此時已經超過最大線程數;
CASE5.ThreadFactory建立線程爲空,這裏通常是咱們自定義線程工廠的時候出的問題;
private boolean addWorker(Runnable firstTask, boolean core) {
    //retry代碼除了檢查是否能建立線程外,還負責將ctl值+1,若是不能建立線程,則返回false;
    retry:
    for (;;) {
        //獲取當前ctl值
        int c = ctl.get();
        //獲取當前線程池運行狀態
        int rs = runStateOf(c);
  //CASE1.線程池狀態rs>SHUTDOWN;返回false;
        //CASE2.線程池狀態爲SHUTDOWN的時候,阻塞隊列沒有任務了,爲空;
  //CASE3.線程池狀態爲SHUTDOWN的時候,execute提交的Runnable(被封裝到firstTask裏面)不爲空;
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        //循環判斷是否能將ctl+1設置成功
        for (;;) {
            //獲取當前運行中的線程數量
            int wc = workerCountOf(c);
            //條件1:wc >= CAPACITY基本不可能,CAPACITY爲理論上的最大線程數,一個5億級的數字
            //CASE4.根據core參數,若是是建立核心線程,此時已經超過核心線程數,則返回false
            //若是是建立非核心線程,此時已經超過最大線程數,則返回false
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            //條件成立,說明給ctl修改+1成功了,表明給線程數+1設置成功了,能夠退出循環建立線程了
            //若是在執行這段代碼的時候,線程池狀態正巧被改了,這裏也會失敗,由於ctl的高3位表明的是線程狀態
            if (compareAndIncrementWorkerCount(c))
                break retry;
            //若是上面設置線程數+1失敗,則實時獲取線程狀態並和當前的比較
            c = ctl.get();  
            //狀態被改變了跳到retry再次判斷是否容許建立線程
            if (runStateOf(c) != rs)
                continue retry;
        }
    }
 //代碼走到這裏表明已經容許建立線程了
    
    //表示建立的worker是否已經啓動,啓動也表明線程建立成功了
    boolean workerStarted = false;
    //添加worker到worker隊列是否成功的狀態
    boolean workerAdded = false;
    //局部變量
    Worker w = null;
    try {
        //構建work對象
        w = new Worker(firstTask);
        //獲取worker的構造方法建立的線程
        final Thread t = w.thread;
        //這裏加了這段判斷是爲了防止本身實現的TreadFactory有bug致使建立線程失敗
        if (t != null) {
            //向works這個hashset裏面添加works的時候,須要全局加鎖,如下代碼線程並不安全
            //該段代碼個人理解就是爲了維護largestPoolSize這個值,記錄線程池生命週期中,
            //線程池運行的線程的最大數量
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                //獲取當前線程池狀態
                int rs = runStateOf(ctl.get());
    //檢查線程池狀態必須是RUNNING或者處於SHUTDOWN的時候,並無提交具體的任務
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    //防止人爲定義線程工廠建立線程並啓動了start方法的狀況
                    if (t.isAlive()) 
                        throw new IllegalThreadStateException();
                    //向線程池添加worker
                    workers.add(w);
                    //獲取線程池線程數量
                    int s = workers.size();
                    //若是線程池線程數量>記錄的值,更新線程池生命週期內最大線程記錄數
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    //表示向線程池中添加線程成功了
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                //啓動線程,該方法實際會啓動worker類的run方法,而後執行runWorker方法
                t.start();
                //設置線程啓動狀態爲成功
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            //後面再分析
            addWorkerFailed(w);
    }
    return workerStarted;
}

6.3.runWorker方法分析

addWorker方法建立線程成功,Worker類的Thread會調用start方法啓動本身的run方法,由於Worker類實現了Runnable接口,run方法裏面調用了runWorker方法。實際咱們execute方法傳入的Runnable被封裝到了Worker類的firstTask屬性裏面。而後在runWorker裏面調用run方法啓動具體的邏輯,注意這裏並沒用再用Thrad封裝Runnable了。線程啓動後,會一直運行While循環,循環第一次運行本身傳入的Runnable,第二次及以後則經過getTask方法從任務隊列種獲取具體的Runnable任務了。一旦While循環內發生異常或者getTask返回空,則會調用processWorkerExit執行線程銷燬邏輯。getTask方法獲取不到具體任務的線程均可被認爲是空閒線程。線程

final void runWorker(Worker w) {
    //wt=w.thread
    Thread wt = Thread.currentThread();
    //execute實際傳入的包含業務邏輯的Runnable,該Runnable再也不被Thread包裝,調用本身的run方法
    Runnable task = w.firstTask;
    //引用設置爲null,幫助gc
    w.firstTask = null;
    //先調用unlock方法設置當前獨佔線程爲空,線程運行狀態爲0
    w.unlock();
    //線程退出狀態,true表明線程由於異常退出了
    boolean completedAbruptly = true;
    try {
        //線程被建立並啓動後就一直執行while循環,直到發生異常或者退出
        //條件1:task != null,線程初創task不爲空
        //條件2:條件1不成立說明線程非初創而且核心線程數已滿,說明已經建立好線程,從隊列中取task任務
        while (task != null || (task = getTask()) != null) {
            //設置獨佔鎖,由於shutDown方法調用的時候不會馬上終止運行中的線程,
            //會根據是否持有獨佔鎖來判斷當前worker是否處於運行狀態
            w.lock();
            //線程池處於STOP/TIDYING/TERMINATION且當前線程沒有設置中斷狀態
            //給當前線程設一箇中斷狀態
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                //鉤子方法,留給子類實現,在執行實際業務代碼以前
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    //調用實際業務方法的邏輯
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    //鉤子方法,留給子類實現,在執行實際業務代碼以後
                    afterExecute(task, thrown);
                }
            } finally {
                //將task設置爲空
                task = null;
                //每一個worker完成任務數量+1
                w.completedTasks++;
                //釋放掉鎖,正常狀況下會回到while循環繼續執行getTask,發生異常會執行下面的finally
                //getTask爲空也會退出while循環
                w.unlock();
            }
        }
        //若是getTask()返回空的時候,執行退出邏輯
        completedAbruptly = false;
    } finally {
        //線程退出邏輯
        //completedAbruptly=true由於異常退出
        //completedAbruptly=false正常退出
        processWorkerExit(w, completedAbruptly);
    }
}

6.4.getTask方法分析

當getTask返回空的時候,線程能夠執行銷燬邏輯了。netty

getTask何時返回空?

1.線程池處於SHUTDOWN狀態,工做隊列爲空的時候;
2.線程池處於STOP狀態以上的時候,將線程池線程數-1並返回空;
3.當工做隊列爲空的時候;

注意,線程池的allowCoreThreadTimeOut屬性會影響getTask方法,致使getTask方法一直阻塞在workQueue.take()這裏的,這樣就不會銷燬線程。

1.allowCoreThreadTimeOut=true,使用非阻塞方法從隊列獲取任務
2.allowCoreThreadTimeOut=false,線程池線程數還未達到核心線程數上限,使用阻塞方法獲取任務,這樣就可使得核心線程不會被銷燬,getTask方法一直阻塞等待獲取隊列種的任務。
3.allowCoreThreadTimeOut=false,線程池線程數達到核心線程數,使用非阻塞方法獲取任務
private Runnable getTask() {
    boolean timedOut = false; 

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        //成立1:線程池處於SHUTDOWN狀態且工做隊列爲空的時候
        //成立2:線程池處於STOP狀態以上的時候,將線程池線程數-1並返回空
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        //獲取線程池線程數
        int wc = workerCountOf(c);

        //1.allowCoreThreadTimeOut=true,使用非阻塞方法從隊列獲取任務
  //2.allowCoreThreadTimeOut=false,線程池線程數還未達到核心線程數,使用阻塞方法獲取任務,
        //3.allowCoreThreadTimeOut=false,線程池線程數達到核心線程數,使用非阻塞方法獲取任務
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        //條件1-1:wc > maximumPoolSize,可能最大線程數設置的比核心線程數小,此時沒有空閒線程能夠接手
        //條件1-2:timed && timedOut的timedOut在poll(keepAliveTime, TimeUnit.NANOSECONDS)
        //方法獲取超時的時候,循環第二次執行的時候纔可能致使條件爲true
        //條件2:線程池還有其餘線程,工做隊列爲空返回true
        //以上1和2條件成立了,任務返回爲空。
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            //嘗試將線程數-1,並不必定會成功,可能被其餘線程改過,失敗則繼續循環嘗試-1
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }


        try {
            //當timed=true的時候,使用poll獲取超時候致使r=null的時候,timedOut=true,
            //再次執行循環
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();//阻塞方式從隊列獲取,防止線程線程執行銷燬邏輯
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

6.5.processWorkerExit方法分析

該方法用於執行線程銷燬的邏輯。

1.加鎖,從workers隊列中移除一個worker,將線程池完成任務總數量+1, 釋放鎖;
2.判斷是否須要關閉線程池;
3.若是線程正常完成並退出,根據allowCoreThreadTimeOut判斷是否須要回收核心線程,
若allowCoreThreadTimeOut=true,只須要保證線程池最少有一個線程便可,也就是說超過1的空閒線程必定會被銷燬。
若allowCoreThreadTimeOut=false,在線程數未達到核心線程數上限的狀況下,因爲getTask方法的阻塞,不會執行線程銷燬的邏輯;當線程數達到核心線程數上限的狀況,且隊列也達到上限數,這以後建立的任何線程在getTask方法獲取不到具體任務的狀況下都會銷燬
private void processWorkerExit(Worker w, boolean completedAbruptly) {
    //由於異常退出,線程運行數-1
    if (completedAbruptly) 
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //取出每一個worker執行的任務數量,並彙總到全局的任務執行數量中
        completedTaskCount += w.completedTasks;
        //將worker從池中移除
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    //嘗試關閉線程池並處理空閒線程
    tryTerminate();

    //獲取線程ctl值
    int c = ctl.get();
    //若是線程池當前狀態小於STOP狀態,說明線程池處於RUNNING,SHUTDOWN狀態
    if (runStateLessThan(c, STOP)) {
        //若是線程池是正常退出返回false走下面的流程
        if (!completedAbruptly) {
            //allowCoreThreadTimeOut表示是否容許核心線程銷燬
            //min表示線程池容許的最小線程數,最少爲1
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            //條件1:min==0說明容許核心線程銷燬
            //條件2:工做隊列不爲空
            if (min == 0 && ! workQueue.isEmpty())
                //設置線程池最小線程數
                min = 1;
            //若是當前線程池線程數大於min的值,返回,這裏無論min是核心線程數仍是1
            //也就是說,超過核心線程的線程數在getTask方法從隊列取不到的時候必定會回收
            //而核心線程是否回收會根據allowCoreThreadTimeOut屬性來判斷
            if (workerCountOf(c) >= min)
                return; 
        }
        //上面從workers池中刪除了一個worker,這裏添加進去一個空任務的worker
        //核心線程數=0的狀況會執行到這裏,會維持核心線程數最少爲1
        addWorker(null, false);
    }
}

6.6.tryTerminate方法分析

嘗試關閉線程池方法並處理空閒線程,interruptIdleWorkers方法處理空閒線程,設置中斷狀態。每一個線程退出都會單獨調用該方法。

final void tryTerminate() {
    //自旋
    for (;;) {
        //獲取線程ctl值
        int c = ctl.get();
        //條件1:線程池處於RUNNING狀態說明線程池當前正常,直接返回
        //條件2:runStateAtLeast(c, TIDYING)說明已經有線程使得線程池由TIDYING -> TERMINATED狀態
        //轉換了,當前線程直接返回
        //條件3:(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())
        //說明線程池雖然處於SHUTDOWN狀態,但工做隊列不爲空,得等隊列處理完再嘗試關閉線程池的邏輯。
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        //能走到這裏兩種狀況
        //1.線程池狀態>=STOP了
        //2.線程池狀態於SHUTDOWN狀態,但隊列爲空了
        if (workerCountOf(c) != 0) {
            //回收空閒的線程,由於執行runworer方法的時候worker會加鎖,因此沒加鎖的都是空閒的
            interruptIdleWorkers(ONLY_ONE);
            return;
        }

        //workerCountOf(c) == 0 時,會來到這裏,說明線程都已經銷燬了
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //設置線程池狀態爲TIDYING狀態。
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    //鉤子方法,等用戶實現
                    terminated();
                } finally {
                    //鉤子方法設置線程池狀態爲TERMINATED狀態
                    ctl.set(ctlOf(TERMINATED, 0));
                    //喚醒調用 awaitTermination() 方法的線程。
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
    }
}

6.7.interruptIdleWorkers方法分析

處理一個空閒線程方法。全部處於執行中的線程都會加鎖(w.lock())。上面咱們提過,核心線程被take方法阻塞的時候,咱們這裏設置線程t.interrupt(), 會解除take的阻塞。

private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            //線程沒有中斷且嘗試加鎖成功,由於全部處於執行中的線程都會加鎖(w.lock())
            //未加鎖的說明處於空閒中了。
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    //設置線程中斷
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

6.8.awaitTermination方法分析

該方法是判斷線程池狀態狀態是不是TERMINATED,若是是則直接返回true,不然會await掛起當前線程指定的時間

public boolean awaitTermination(long timeout, TimeUnit unit)
    throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (;;) {
            if (runStateAtLeast(ctl.get(), TERMINATED))
                return true;
            if (nanos <= 0)
                return false;
            nanos = termination.awaitNanos(nanos);
        }
    } finally {
        mainLock.unlock();
    }
}

6.9.shutDown和shutDownNow方法分析

shutDown方法會優雅的關閉線程池,設置線程池狀態爲SHUTDOWN,已經處於隊列中的任務會繼續等待執行完。

shutDownNow方法會當即關閉線程池,設置線程池狀態爲STOP。

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    //獲取線程池全局鎖
    mainLock.lock();
    try {
        checkShutdownAccess();
        //設置線程池狀態爲SHUTDOWN
        advanceRunState(SHUTDOWN);
        //中斷空閒線程
        interruptIdleWorkers();
        //空方法,子類能夠擴展
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        //釋放線程池全局鎖
        mainLock.unlock();
    }
    tryTerminate();
}
public List<Runnable> shutdownNow() {
    //返回值引用
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    //獲取線程池全局鎖
    mainLock.lock();
    try {
        checkShutdownAccess();
        //設置線程池狀態爲STOP
        advanceRunState(STOP);
        //中斷線程池中全部線程
        interruptWorkers();
        //導出未處理的task
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }

    tryTerminate();
    //返回當前任務隊列中 未處理的任務。
    return tasks;
}

7.ThreadPoolExecutor拒絕策列

默認有如下4中拒絕策列,用戶也能夠實現RejectedExecutionHandler接口自定義。

CallerRunsPolicy將任務交給調用者執行
AbortPolicy拋出異常
DiscardPolicy什麼都不作,直接丟棄
DiscardOldestPolicy丟棄老的,執行新的
public static class CallerRunsPolicy implements RejectedExecutionHandler {

    public CallerRunsPolicy() { }

    //交給主線程執行
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}

public static class AbortPolicy implements RejectedExecutionHandler {

    public AbortPolicy() { }

    //中斷拒絕
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from " +
                                             e.toString());
    }
}

public static class DiscardPolicy implements RejectedExecutionHandler {

    public DiscardPolicy() { }

    //直接拋棄什麼都不作
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
}

public static class DiscardOldestPolicy implements RejectedExecutionHandler {

    public DiscardOldestPolicy() { }

 //丟棄老的 ,執行新的
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            e.getQueue().poll();
            e.execute(r);
        }
    }
}

8.擴展:改變線程池的初始化過程

若是咱們想讓線程按核心線程,最大線程,最後再進隊列的方式初始化,應該怎麼作?

public void execute(Runnable command) {
    //若是runnable爲空,拋出異常
    if (command == null)
        throw new NullPointerException();
 //獲取ctl值,該值高3位表明線程池運行狀態,低29位表明線程池當前運行線程數量
    int c = ctl.get();
    //CASE1:獲取線程池運行線程數量,若是小於核心線程數,則建立線程,addWorker傳入參數爲core
    //也就是說,線程池不是一上來就把核心線程建立了,必須在提交runnable任務到線程池的時候才一個一個建立
    if (workerCountOf(c) < corePoolSize) {
        //addWorker是建立線程的核心方法,關鍵點在Worker類的構造方法和runWorker方法的while循環
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    //CASE2:條件1不成立說明核心線程數已滿,將任務添加到阻塞隊列中。
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    //CASE3:條件1和2都不成立,說明核心線程已建立徹底而且任務隊列已滿
    //調用addWorker建立非核心線程,若是返回false,說明線程數達到最大線程數,執行拒絕策列
    else if (!addWorker(command, false))
        reject(command);
}

咱們在說execute方法初始化線程池過程當中。CASE2:workQueue.offer(command)會將任務加入到隊列。因此,咱們這裏只須要自定義BlockingQueue,改造offer方法,在裏面判斷,當線程池線程數還未達到最大線程數的時候返回false便可。

DubboEagerThreadPool自定義了一個BlockingQueue,在offer()方法中,若是當前線程池數量小於最大線程池時,直接返回false,這裏就達到了調節線程池執行順序的目的。

9.推薦

分享一個朋友的公衆號,有不少乾貨,包含netty,spring,線程,spring cloud等詳細講解,也有詳細的學習規劃圖,面試題整理等,我感受在講課這塊比我講的清楚:

相關文章
相關標籤/搜索