源碼學習第一篇(線程池)

你們面試過程當中確定被問道過線程池。爲何要使用線程池呢?由於在系統中頻繁建立線程會形成很大的CPU消耗。並且用完的線程要等待GC回收也會形成消耗。面試

下面咱們就來學習下最經常使用的線程池 ThreadPoolExecutor,緩存

首先先來看看它的構造方法:安全

    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

參數說明:併發

corePoolSize:核心線程數,線程池維持的正常活躍(保障不會線程超時)的最小線程數,不容許爲0,除非設置容許線程超時等待(若是提交的任務數量大於這個參數時,提交的任務將被放入緩存隊列)。
maximumPoolSize:最大線程數,即爲線程池能夠容納的最大線程數。
keepAliveTime: 空閒的線程存活時間。當存在超過corePoolSize或設置了容許核心線程空閒超時的時候,線程將在等待這麼長時間後自動銷燬(默認以納秒爲單位)。
unit: 空閒的線程存活時間的單位。
workQueue: 等待隊列,注意這個隊列是阻塞的,當隊列中沒有任務時線程獲取任務將被阻塞,直到有任務時候被喚醒。
threadFactory: 線程工廠,工廠模式你們都懂的,若是有須要自定義線程,實現ThreadFactory接口從newThread方法返回你自定義的線程類就行了。
handler: 拒絕策略處理器,在線程池沒法接收新的工做時候會調用該處理器的拒絕方法拒絕掉一些任務。oop

線程池拒絕策略處理接口:RejectedExecutionHandler
實現類:
直接拒絕策略(不會報錯):DiscardPolicy
直接拒絕策略(將拋出一個RejectedExecutionException錯誤):AbortPolicy
讓放入的線程本身運行任務:CallerRunsPolicy
拋棄等待時間最長的任務:DiscardOldestPolicy學習

看完了這些參數後是否是仍是一頭霧水?哈哈,其實我也是這樣。要了解線程池,先要了解它的狀態。ui

    private static final int COUNT_BITS = Integer.SIZE - 3;
    //CAPACITY 值爲 1 << 29 = 0010 0000 0000 0000 0000 0000 0000 0000
    // (1 << 29) -1 = 0001 1111 1111 1111 1111 1111 1111 1111
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    //線程池狀態
    // -1 << 29 = 1110 0000 0000 0000 0000 0000 0000 0000
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

線程池狀態:
RUNNING:取30位到32位111表明運行中,能夠接收新的任務而且會處理等待隊列中的任務。
SHUTDOWN:取30位到32位000表明關閉中,不中斷正在執行的任務,可是會中斷等待中的線程。而後等待隊列中剩餘的任務處理完就關閉。
STOP: 取30位到32位001表明中止,不接受新任務也不處理等待隊列中的任務。而且會中斷正在運行的任務。
TIDYING: 清取30位到32位010表明理中,最後的掃尾工做,關閉中到結束的中間狀態,全部任務已經完成,工做線程數workerCount爲0,線程池將執行結束方法。
TERMINATED: 取30位到32位011表明結束,結束方法完成,線程池結束。
線程池狀態轉換:
RUNNING -> SHUTDOWN: 調用了shutdown()方法以後,線程池會開始拒絕接收新的任務,從運行中轉變爲關閉中的狀態。通常是在線程池完成工做須要銷燬時調用。
RUNNING -> STOP: 調用了shutdownNow()方法以後,線程池會拒絕接受任務並中斷因此正在執行的任務,從運行狀態轉入中止狀態。
SHUTDOWN -> STOP: 調用了shutdownNow()方法以後,線程池會拒絕接受任務並中斷因此正在執行的任務,從關閉中轉爲中止狀態。
SHUTDOWN -> TIDYING: 線程池自己處於中止中的狀態,隊列和線程池都爲空的時候。線程進入清理狀態。
STOP -> TIDYING: 當線程池爲空時,線程池從中止中狀態轉入清理狀態。
TIDYING -> TERMINATED: 當線程清理完成會回調 terminated() 方法,完成後線程正式結束。this

如今不明白這些狀態是幹什麼用的,能夠先放到一邊,接下來咱們要看一下線程池的幾個比較核心的方法。爲了讓代碼更容易理解,我在上面加入了註釋。spa

    // 獲取方法狀態, 取計數器的30-32位做爲線程池的狀態
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    //取計數器的後29位做爲線程數
    private static int workerCountOf(int c)  { return c & CAPACITY; }


    //線程池執行方法
    public void execute(Runnable command) {
        if (command == null){
            throw new NullPointerException();
        }
        //獲取計數器的數值
        int c = ctl.get();

        /*
         * 若是運行的線程數少於corePoolSize核心線程數,就啓動一個新的線程來執行該任務。
         * addWorker方法將以原子方式檢查runState(運行狀態)和workerCount(工做中總線程數),
         * 若是運行狀態爲關閉或者工做中總線程數等於核心線程數返回false。
         */
        //workerCountOf 取計數器的後29位做爲線程數 和核心線程數進行對比
        if (workerCountOf(c) < corePoolSize) {
            //啓動一個新的線程來完成工做
            if (addWorker(command, true)){
                return;
            }
            //獲取計數器的最新數值
            c = ctl.get();
        }
        /*
         * isRunning 方法是經過原子的計數器獲取的數值來判斷運行狀態(注意若是是運行狀態前三位必定是111整個數值是個負數),
         * 若是運行狀態是運行狀態,就嘗試向等待隊列末尾插入一個任務。
         */
        if (isRunning(c) && workQueue.offer(command)) {
            //獲取計數器的最新數值
            int recheck = ctl.get();
            //再次判斷線程池狀態,若是這時候線程池進入其餘狀態,折刪除剛剛添加的任務,而且在remove中調用了中斷空閒線程的方法
            if (!isRunning(recheck) && remove(command)){
                //調用拒絕策略
                reject(command);
            }
            /*
             * 若是線程池還處於運行中 或 者線程處於關閉中,可是等待隊列任務刪除失敗。
             * 判斷工做線程的數量爲0的話,就建立一個空閒線程放入池中,讓他將隊列中的任務執行完後
             * 線程池在轉入清理狀態
             */
            else if (workerCountOf(recheck) == 0){
                addWorker(null, false);
            }
        }
        /*
         * 若是運行狀態爲關閉再次嘗試調用 addWorker 方法運行這個任務,這時addWorker 
         * 會返回false。這時候執行拒絕策略(reject 方法底層其實調用的拒絕策略的拒絕方法)。
         * 若是線程狀態爲運行中,可是由於等待隊列滿了,插入失敗,再次調用addWorker 嘗試運行
         * 任務,而且該方法參數 core爲false若線程數未超過 最大線程數會建立一個新的線程來運行任務
         * addWorker 方法下面有介紹
         */
        else if (!addWorker(command, false)){
            reject(command);
        }
    }

    //將任務加入工做中的方法,線程池的核心方法之一
    //參數 core 表明是否是比較核心線程數的意思。
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            //獲取計數器的數值
            int c = ctl.get();
            //取30位-32位判斷線程狀態
            int rs = runStateOf(c);
            /*
             * 若是線程池處於 中止,清理中,結束 等狀態,直接返回false拒絕該任務
             * 若是線程處於 關閉中 且傳入的任務不爲空 也直接拒絕該任務。
             * 若是傳入的任務爲空,可是等待隊列爲空,證實沒任務了不要建立新的線程,直接返回結束該方法。
             */
            if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())){
                return false;
            }
            for (;;) {
            
                //獲取當前線程數
                int wc = workerCountOf(c);
                //判斷線程數是否超過極限容量, 若是沒超過 且core爲true 在看看是否超過核心線程數
                // 若是 且core爲false 則和最大線程數進行比較。 若是超過了就返回false拒絕該任務。
                if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)){
                    return false;
                }
                //修改原子計數器的線程數 若是修改爲功 就不必在無限循環了 直接斷開循環跳到最外層不在執行循環
                if (compareAndIncrementWorkerCount(c)){
                    break retry;
                }
                //若是上面的修改沒有成功 再次獲取原子計數器數值
                c = ctl.get();  // Re-read ctl
                //判斷當前的狀態十分和以前的同樣若是同樣繼續循環修改 若是不同則結束循環跳到最外層大循環,重新開始
                if (runStateOf(c) != rs){
                    continue retry;
                }
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                //獲取主鎖,爲了保證在併發下添加任務的安全性
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    //獲取到鎖後 重新獲取線程池運行狀態
                    int rs = runStateOf(ctl.get());
                    //若是線程處於運行狀態 或者 者線程處於關閉中,可是等待隊列任務刪除失敗。且線程池中線程已全被中斷
                    if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
                        //判斷線程是不是活躍的
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        //將新建的工做線程加入工做隊列
                        workers.add(w);
                        //獲取工做線程數量
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        //設置工做線程加入成功
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                //若是工做線程加入成功啓動新的線程 並將線程狀態設置爲 true
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

下面順便提一句,爲何阿里規範不讓用 Executors.newSingleThreadExecutor(); 這樣的方法建立線程池。線程

咱們能夠看看他的源碼:

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
LinkedBlockingQueue 這是一個阻塞無限隊列,也就是說這個隊列只要不是OOM了就能夠一直往裏面放。這樣會形成若是線程數到達核心線程數之後仍是處理不過來並不會繼續建立線程,而是會一直往隊列中塞任務,直到內存溢出。
相關文章
相關標籤/搜索