Java線程池ThreadPoolExecutor深度探索及源碼解析

咱們的程序裏,時常要使用多線程。所以多線程的管理變的尤其重要。ThreadPoolExecutor很好的解決了這一點。本篇文章主要從源碼入手,分析ThreadPoolExecutor的原理。算法

###1.標記和構造方法####編程

和不少狀態對象同樣,ThreadPoolExecutor也經過一個int的頭3位來記錄線程池的狀態,後面20多位來標記工做線程數量。而且提供通用的位運算接口來得到你所須要的數據。多線程

private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

咱們先來看下ThreadPoolExecutor的構造方法,這裏彷佛咱們又要老生常談了,網上已經有不少關於線程池各個參數的介紹了,這裏,非墨仍是會再說一遍,這樣加深一下你們的印象。oop

public ThreadPoolExecutor(int corePoolSize, //核心線程數量
                              int maximumPoolSize,//最大線程數量
                              long keepAliveTime,//存活時間
                              TimeUnit unit,//存活時間單位
                              BlockingQueue<Runnable> workQueue,//工做隊列
                              ThreadFactory threadFactory,//線程構造工廠
                              RejectedExecutionHandler handler//拒絕請求回調
) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

###2.執行流程####post

按照咱們熟知的線程池機制, 1.當請求被post到咱們的線程池中,咱們的線程池會先生成一個核心線程來執行它 2.當核心線程滿了的時候,將會把這個請求放入到咱們的工做請求隊列workQueue中。 3.若是你提供的隊列是一個有界隊列的時候,線程池將會判斷你的最大線程數是否超過你的核心線程。若是超過核心線程的話,線程池會生成新的線程去執行它。 4.若是這個時候,已經達到了最大線程數,那麼線程池將走到拒絕回調 5.若是線程池的最大線程數不大於核心線程數,而且工做隊列已滿,那麼將直接走拒絕回調ui

實際上這個流程已經在ThreadPoolExecutor.execute方法註釋中有詳細的說明。即使沒有說明,咱們也能夠從它的代碼流程簡單看出一些端倪:this

//code ThreadPoolExecutor.execute(Runnable)
        int c = ctl.get();//獲取當前運行線程數
        if (workerCountOf(c) < corePoolSize) {//若是當前線程數小於核心線程數
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {//將請求命令加入到工做隊列
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))//加入到非核心線程中
            reject(command);//若是非核心線程沒有執行,那麼將走拒絕請求回調

###3.深刻源碼###線程

咱們以execute爲入口,深刻分析一下這個線程池的源碼。int c = ctl.get()方法咱們暫時不說,後面咱們將會補充,咱們暫且把它理解爲得到一個數量,而這個數量c將會傳入到workerCountOf方法中。這個方法名稱咱們就能知道其用意,就是爲了得到當前工做線程數量。code

private static int workerCountOf(int c)  { return c & CAPACITY; }

上文咱們說到,線程池會經過一個int的後幾位來記錄線程數量,而workerCountOf就是經過位運算來得到當前工做線程數。在得到當前線程數了之後,若是當前線程數小於 corePoolSize的話,將會經過addWorker方法把command加入到工做線程中。addWorker須要提供兩個參數,一個是你的command,另一個boolean量是爲了標識是不是往core線程中加。對象

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();//得到一個含有狀態和數量的值
            int rs = runStateOf(c);//得到當前線程池狀態
            ...
           for (;;) {//第二個for
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
   }

這裏,經過上面的代碼咱們能夠清晰的瞭解ctl變量的存在的目的: 1.首先,當從類型上看clt是一個原子類型,說明它是要支持多線程調用的 2.ctl裏面的值須要存儲兩個信息,一個是線程數量,一個是當前線程池的工做狀態。

這時候是否有讀者還在納悶,爲何個人線程數小於個人核心線程數,我往個人線程池裏加,仍是可能出現加不進去的狀況。事實上,「第二個for」循環已經很好的說明了這一點。由於線程池不能保證是同一個線程調用addWorker方法。線程池須要同步事後,才能保證是不是否往核心線程裏面加。這就是爲何在ThreadPoolExecutor.execute方法裏,在判斷完核心線程數量以後,若是失敗了,還要再取一次當前線程數的緣由。

if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();//再取一次
        }

好的,咱們繼續回到"第二個for"。咱們能夠看出,線程池在同步方面不只細化了粒度,並且用的是CAS算法。這種算法能夠勁量的避免因爲sync引發的線程阻塞。

for (;;) {//第二個for
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
               //當線程池數量超過核心線程的時候退出,返回false
                if (compareAndIncrementWorkerCount(c))
                    break retry;//當增長線程成功的時候,跳出循環
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;//狀態不一致繼續循環
                // else CAS failed due to workerCount change; retry inner loop
            }

因爲咱們如今只有一個線程在工做,不存在多線程競爭的狀況,所以咱們選擇跳出循環的邏輯。跳出循環之後,程序將真正意義上的生成一個Worker線程來執行指令。

//code private boolean addWorker(Runnable firstTask, boolean core)
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);//生成一個worker對象
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();//鎖定全局鎖
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());//檢查線程池狀態

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);//將Worker線程歸入workers集合對象管理
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;//從新賦值largestPoolSize變量
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;

上面的代碼很是簡單,線程池將生成一個Worker的線程包裝類。不管是是不是核心線程,全部的線程都被歸入到workers集合對象進行管理。若是一切流程都正常workerAdded將爲true,Worker裏的線程將被啓動。啓動後Worker將執行線程的run方法,而在run方法中,又調用到Worker的runWorker(Worker)方法:

public void run() {
            runWorker(this);
        }

runworker是真正的線程執行流程的代碼段:

// code runworker
 try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();//判斷當前線程池狀態,
                try {
                    beforeExecute(wt, task);//執行前切面
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);//執行後切面
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }

runworker方法中引伸出來兩個方法beforeExecute和afterExecute。能夠經過繼承的方式來監控command的執行。至關於在command.run以前和以後切了兩個面,是一種面向方面的編程模式。當Task執行完成以後,因爲while循環,將再次執行while的判斷條件task = getTask()) != null; getTask方法是可能阻塞的,阻塞的時間是根據你在構造線程池的時候設置的超時時間來決定的。

private Runnable getTask() {
        boolean timedOut = false; //是否判斷超時

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            //allowCoreThreadTimeOut變量用於控制是否讓核心線程也進行超時判斷
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                Runnable r = timed ?//經過timed變量來選擇使用poll方法仍是take方法
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;//若是poll獲取的r爲空,標記爲超時
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }


>getTask仍是一個循環操做,第一次執行的時候,會經過timed變量來判斷是否有超時檢查,若是有超時檢查的話將調用poll方法。若是poll在規定的時間內並無得到任何的執行對象,返回的r爲null,timedOut將被標記爲true。這時候,又再次進入循環。這時候,若是你是非核心線程,是擴展線程的話,那麼,if ((wc > maximumPoolSize || (timed && timedOut))這個判斷爲true,程序將返回一個null。
在runWorker方法中,若是getTask返回的對象爲null,runWorker將跳出while循環,執行finally語句:
finally {

            processWorkerExit(w, completedAbruptly);         }

>processWorkerExit方法須要傳遞兩個變量,第一個變量是Worker對象,第二個變量是completedAbruptly變量,這個變量是幹什麼用的呢?由於你的程序跳出可能存在兩種狀況,一種是正常跳出,一種是異常跳出,若是是異常跳出的話,這個時候你的workercount未必正常的執行decrement操做,所以經過這個變量來標記程序的執行狀態。

//code processWorkerExit final ReentrantLock mainLock = this.mainLock;         mainLock.lock();         try {             completedTaskCount += w.completedTasks;             workers.remove(w);         } finally {             mainLock.unlock();         }

mainLock是一個全局鎖,主要是爲了同步全局的workers變量。上面的代碼中,線程池將記錄一下task執行數據,而且將worker從workers隊列中刪除。
這個時候,基本上整個線程池的流程都已經概述完了,固然,咱們還確實一個變量,那就是RejectedExecutionHandler類型變量。這個得回到咱們的execute方法:

if (isRunning(c) && workQueue.offer(command)) {             int recheck = ctl.get();             if (! isRunning(recheck) && remove(command))                 reject(command);             else if (workerCountOf(recheck) == 0)                 addWorker(null, false);         }         else if (!addWorker(command, false))             reject(command);//拒絕請求

>當線程池拒絕請求的時候,將調用reject方法,而reject方法將會回調RejectedExecutionHandler的rejectedExecution方法:

final void reject(Runnable command) {         handler.rejectedExecution(command, this);     }

線程池提供一個默認的拒絕請求回調:

//code ThreadPoolExecutor private static final RejectedExecutionHandler defaultHandler =         new AbortPolicy(); //code AbortPolicy public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {             throw new RejectedExecutionException("Task " + r.toString() +                                                  " rejected from " +                                                  e.toString());         }

也就是採用異常的方式來拒絕請求。


這樣,ThreadPoolExecutor的主要源碼和結構已經分析完了,固然還有其餘的特性和功能須要看官們本身去探索。
                                                                                                        -----非墨
相關文章
相關標籤/搜索