Java 併發編程 --- 線程池(ThreadPoolExecutor)

建立線程的幾種方式

在開始解析線程池以前,先簡單的總結一下建立線程的幾種方式:java

  • 繼承Thread類
  • 實現Runnable接口
  • 使用Future和Callable
  • 藉助線程池

上面是建立一個線程的四種方式,在實際的開發中是推薦使用線程池來實現多線程的併發操做。ThreadPoolExecutor是線程池的核心實現類,用來執行被提交的任務。編程

線程池的主要處理流程

在開始分析以前,先看一下線程池的主要處理流程,以下圖:數組

上圖是線程池的處理流程,先從宏觀上,看一下處理流程,下面結合代碼看看具體是如何實現的,那麼開始分析:安全

數據結構

繼承關係

public class ThreadPoolExecutor extends AbstractExecutorService {}

類的定義很是的簡單,只是繼承了一個抽象類 AbstractExecutorService 。並無很好的反映出它的一個繼承關係, 不要着急,看一下下面的這張圖:數據結構

經過上圖能夠發現,Executor纔是站在金子塔頂端的那個, 由它來掌控全局, 固然Executor這個接口也是很是的高冷, 只定義了一個 void execute(Runnable command) 方法,將任務的提交與任務的執行進行了分離。ExecutorService接口繼承了Executor 接口,添加了一些帶返回的submit()方法和關閉方法shutdown()等。 AbstractExecutorService抽象類實現了ExecutorService()提供了一些方法的默認實現,ThreadPoolExector繼承了抽象類,並實現了Executor接口的execute()方法。多線程

基本屬性

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

ctl是ThreadPoolThread類中很重要的一個字段,它是一個特殊的int類型變量。 而且是一個原子操做,能保證線程的安全性。 ctl 這個變量包含了兩部分的信息 , 簡寫表示 c:併發

  • workerCount:線程池內有效線程的數量 : 簡寫表示 wc
  • runState:線程池的運行狀態 ,running ,shutdown 等 ,簡寫表示爲 rs

它是怎麼作到一個變量包含兩個信息的,詳細分析一下:oop

一個int類型的變量用二進制表示是32 位的,其中高3位表示的是線程池的狀態,低29位表示的線程中有效線程的數量。 這設計厲害了,還能夠這麼玩。ui

runState是一個生命週期的控制狀態,值以下:this

  • RUNNING : 接受新任務並處理隊列任務 , 數值表示 -1 << 29
  • SHUTDOWN :不接受新任務並處理隊列任務 ,數值表示 0 << 29
  • STOP : 不接受新任務,不處理隊列任務,而且會中斷正在處理的線程任務,數值表示 1 << 29
  • TIDYING : 全部任務終止,workCount爲0,清理狀態 數值表示 2 << 29
  • TERMINATED :terminated()方法執行後執行該狀態 ,數值表示 3 << 29

上面的5個常量,按照從小到大的順序依次性進行排列。線程池的運行狀態要小於線程池的其它狀態。

上面說了ctl中包含了兩部分信息,下面的代碼是它們的具體實現:
先看將兩部分合並的方法:

//將runState和workerCount 合併到一個ctl中
//RUNNING的值爲 -1 << 29  , wc 爲 0 
//rs的二進制表示爲:     11100000000000000000000000000000
//wc二進制表示爲:        00000000000000000000000000000000
//                     --------------------------------  | 運算
//因此二者 | 運算後:值爲 11100000000000000000000000000000 
//rs填充ctl的高三位, wc填充ctl的低29 位  ,初始化wc的值爲零
private static int ctlOf(int rs, int wc) { return rs | wc; }

再看將ctl分解爲runState和workCount方法

//該方法用於取出runState的值

//00011111111111111111111111111111  CAPACITY的二進制表示

//11100000000000000000000000000000  ~ 按位取反操做,若是是1111 取反就是 0000
//11100000000000000000000000000000  c的二進制表示
//--------------------------------  運算
//11100000000000000000000000000000  返回結果
private static int runStateOf(int c)     { return c & ~CAPACITY; }

//該方法用於取出workCount的值
//11100000000000000000000000000000
//00011111111111111111111111111111
//--------------------------------
//00000000000000000000000000000000   返回結果
private static int workerCountOf(int c)  { return c & CAPACITY; }

上面演示了在初始化線程池的時候,c , rs , wc 是如何運算的。若是沒有明白,請去看一下位移運算。

構造方法

ThreadPoolExecutor提供了四個構造方法,會涉及到幾個很是重要的參數,構造方法看下面的這一個就能夠了,由於其餘的三個構造方法,都是使用的this調用的下面這個方法。ThreadPoolExecutor的構造方法除了進行參數的合法性和賦值賦值操做外,並無其餘多餘的動做。

//corePoolSize : 核心線程池的實現大小
//maximumPoolSize : 最大線程池的實現大小
//keepAliveTime :線程活動保持時間,線程空閒超過這個時間就會終止
//unit : 線程活動保持時間的單位
//workQueue :用來暫時保存任務的工做隊列
//threadFactory:用於設置建立線程的工廠,能夠經過線程工廠給每一個線程設置有意義的名字
//handler : 當ThreadPoolExecutor已經關閉或已經飽和時,execute()方法將調用Handler
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        //判斷參數合法性,不合法拋出IllegalArgumentException異常
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        //隊列,線程工廠,handler爲空拋出空指針異常
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        //賦值操做
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
}

上面只是簡單的介紹了一下參數的含義,下面具體說明一下:

  • corePoolSize:當提交一個任務時,線程池會建立一個線程來執行任務,即便有空閒線程也會建立一個新的,等到須要執行的任務數大於線程池基本大小時就再也不建立。若是調用了prestartAllCoreThreads()方法,線程池會提早建立並啓動全部基本的線程。

  • workQueue:用於保存等待執行任務的阻塞隊列

    • ArrayBlockingQueue:基於數組結構的有界阻塞隊列
    • LinkedBlockingQueue:基於鏈表結構的阻塞隊列,按照FIFO排序元素
    • SynchronousQueue:一個不存儲元素的阻塞隊列
    • PriorityBlockingQueue:一個具備優先級的無限阻塞隊列
  • maximumPoolSize:線程池容許建立的最大線程數,若是隊列滿了,而且已經建立的線程數小於最大線程數,則線程池會建立新的線程去執行任務。

  • ThreadFactory:建立線程的工廠

  • RejectedExecutionHandler:當隊列和線程池都滿了,說明線程池已經處於飽和狀態,那麼必須採起一種策略來處理新提交的任務

    • AbortPolicy:直接拋出異常
    • CallerRunsPolicy:只用調用者所在的線程來運行任務
    • DiscardOldestPloicy:丟棄隊列裏最近的一個任務,並執行當前任務
    • DiscardPolicy:不處理,丟棄掉

向線程池提交任務

提交任務有兩個方法,一個是execute()一個是submit()方法,前一個不會返回值,後一個會返回值。submit()方法再也不本次解析的範圍內,感興趣的能夠自行研究簡單提一下,submit()方法在AbstractExecutorService抽象類中實現 ,藉助 FutureTask進行封裝,在execute() 中執行。

execute()方法
對於線程池,做爲使用方的咱們只能向線程池提交任務,而對於任務是否執行和何時執行,並不能控制。

public void execute(Runnable command) {
     //驗證參數合法性
     if (command == null)
         throw new NullPointerException();
   
     //獲取ctl的值
     int c = ctl.get();

     //若是當前線程池中線程的數量小於核心線程的數量
     if (workerCountOf(c) < corePoolSize) {
            //執行addWorker方法,
            //addWorker 將任務添加到隊列,並啓動,成功返回true,失敗返回false
            if (addWorker(command, true))
                //addWorker返回true, 添加成功,結束execute()方法運行
                return;
            c = ctl.get();
     }
     
     //線程池的運行狀態已經不是RUNNING
     //線程池的狀態是RUNNING,wc > corePoolSize,隊列未滿
     if (isRunning(c) && workQueue.offer(command)) {
         int recheck = ctl.get();
         // 非RUNNING狀態 則從workQueue中移除任務並拒絕
         if (! isRunning(recheck) && remove(command))
              reject(command);
         // 線程池處於RUNNING狀態 || 線程池處於非RUNNING狀態可是任務移除失敗
         else if (workerCountOf(recheck) == 0)
             // 這行代碼是爲了SHUTDOWN狀態下沒有活動線程了,可是隊列裏還有任務沒執行這種特殊狀況。
            // 添加一個null任務是由於SHUTDOWN狀態下,線程池再也不接受新任務
            //但要運行任務隊列中的任務
              addWorker(null, false);
      }
       
      //線程池非RUNNING狀態
      //隊列滿了,啓動新的線程也失敗了,採用拒絕策略
      else if (!addWorker(command, false))
         reject(command);
}

上面的註釋已經詳細的標註了if中判斷的條件,這裏在簡單的總結一下,execute()方法的添加策略,分爲幾種情形:

  • wc < corePoolSize , 建立一個新的線程,並提交任務
  • wc = corePoolSize , 任務隊列未滿,則添加到阻塞隊列
  • corePoolSize < wc < maximumPoolSize ,阻塞隊列已滿,嘗試建立一個新的進程來執行任務
  • wc >= maximunPoolSize ,阻塞隊列已滿,則採用拒絕隊列。

上面的四種情形也和文章開始的流程圖相契合,能夠在回頭看一下。

addworker() 也在線程池的調度邏輯中扮演了很重要的角色,下面來看一下,它具體都進行了哪些操做。

添加任務的方法邏輯可能有一點繞, 多看幾遍就能夠了。

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            //獲取ctl的值
            int c = ctl.get();
            //計算rs的值,用於判斷線程池當前的狀態
            int rs = runStateOf(c);

            //下面的if判斷兩種狀況
            //1.rs >= SHUTDOWN  ==> 線程池處於非運行狀態
            //2.線程池狀態爲SHUTDOWN,此時再也不接收新的任務,但還要處理隊列中的任務
            //  firstTask !=null 再也不接收新任務, 隊列爲空,任務已經處理完成。都直接返回。 
            if (rs >= SHUTDOWN && ! (rs == SHUTDOWN &&firstTask == null && ! workQueue.isEmpty()))
                return false;
            //執行到此處,說明線程處在RUNNING狀態
            //或者 SHUTDOWN狀態,隊列中還有任務要執行
            for (;;) {
                //計算線程池中有效線程的數量
                int wc = workerCountOf(c);
                //判斷wc的數量是否符合規則
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                //cas ,+1 操做 ,成功返回true,失敗返回false
                if (compareAndIncrementWorkerCount(c))
                    break retry;  //退出循環
                c = ctl.get();  // Re-read ctl
                //+1 操做失敗, 重新檢測線程池的狀態,繼續循環
                if (runStateOf(c) != rs)
                    continue retry; 
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
        //workCount +1 操做成功,執行下面的步驟

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            //Worker: 實現了Runnable的私有內部類,將任務進行封裝
            w = new Worker(firstTask);
            final Thread t = w.thread;  //w.thread 由Work構造方法初始化
            if (t != null) {
                //獲取全局鎖,併發訪問workers ,加鎖處理
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
                    //RUNNING狀態,SHUTDOWN狀態清理隊列中的任務
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        //將work添加到HashSet集合中。
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    //啓動線程執行任務
                    t.start(); //執行的是ThreadPoolExecuteor的runWorker方法
                    workerStarted = true;
                }
            }
        } finally {
            //線程啓動失敗
            if (! workerStarted)
                //刪除添加的任務
                addWorkerFailed(w);
        }
        return workerStarted;
    }

上面的註釋已經很清楚,可能還有人以爲有些混亂,那麼在串一下這個處理流程:

線程池的狀態在不斷的發生變化,添加新的任務時,先判斷一下當前的線程狀態。
在調用addWorker()以前,判斷了此時的wc數量是小於< corePoolSize的。那麼也就說只要線程池狀態是RUNNING, 就能夠直接建立新的線程來執行提交的任務。可是仍是要排除,線程池是非RUNNING的這種狀況。if (rs >= SHUTDOWN && ! (rs == SHUTDOWN &&firstTask == null && !workQueue.isEmpty()))便有了這個判斷條件。 因此若是addWorker()方法沒有建立新線程也沒有提交新任務,直接返回false的話,只能是下列情形中的一種:

  • 線程池是STOP|TIDYING|TERMINATED 中的一種狀態
  • 線程池是SHUNDOWN狀態,但 firstTask != null
  • 線程池是SHUNDOWN狀態,但 workQueue 是空
  • 線程的有效線程數量大於CAPACITY|| 大於(corePoolSize或者maximumPoolSize)

上面的 大於(corePoolSize或者maximumPoolSize)取決於隊列是否已滿

t.start()是調用的runWorker方法,由於Worker的run()方法,調用的是ThreadPoolExecutor類中的RunWorker()方法,看一下RunWorker的具體實現。
runWorker()

final void runWorker(Worker w) {
    //獲取當前運行的線程
    Thread wt = Thread.currentThread();
    //獲取Worker中包裝的Runnable
    Runnable task = w.firstTask;
    w.firstTask = null;
    //容許中斷,由於在work的構造方法中抑制了中斷
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
         //task 就是 w.firstTask ,也就是包裝後的任務
         //task 不是null 則進入while循環中
         //若是 task == null , 則從隊列中取出一個任務執行
         //若是隊列爲空,則task仍是null ,不進入while循環。
         while (task != null || (task = getTask()) != null) {
            //加鎖
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())
                    //知足上面的條件,則執行中斷操做
                    wt.interrupt();
                try {
                    //任務提交以前是否須要一些操做,交由子類實現
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();  //執行任務,執行Runnable中的run()方法
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        //交給子類的實現方法
                        afterExecute(task, thrown);
                    }
                } finally {
                    //修改task值,獲取新的任務執行
                    task = null;
                    //完成任務數+1
                    w.completedTasks++;
                    //釋放鎖
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            //清理線程的後續
            processWorkerExit(w, completedAbruptly);
    }
}

看一下上面方法的核心代碼,while循環。若是傳入的對象不爲null將直接運行傳入的任務,傳入任務完成以後,task變爲null, 則調用getTask()方法來獲取隊列中的任務,若是隊列中一直有任務存在這又是一個死循環,while會一直循環下去,固然這只是一種極端的狀況,在getTask()方法有返回null的條件,當getTask()方法返回null,則退出runWorker()方法。

getTask()

private Runnable getTask() {
        //線程等待超時變量,默認爲false
        boolean timedOut = false; // Did the last poll() time out?
        //死循環
        for (;;) {
            //獲取ctl的值
            int c = ctl.get();
            //獲取線程池當前的狀態值
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            //判斷線程池當前所處的狀態
            //線程的狀態不是RUNNING,隊列爲空。
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                //工做線程的數量-1 
                decrementWorkerCount();
                return null;
            }
            //工做線程的數量
            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                //沒有超時,獲取一個任務
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

上面的getTask()方法內部是一個死循環,結束死循環的話能夠經過break或者return關鍵字。上面方法使用了return null 來結束任務獲取,分爲如下情形。

  • 線程池狀態 >= SHUTDOWN && (rs >= STOP 任務隊列爲空)
  • 線程獲取任務等待超時或者任務隊列爲空了
  • 成功取到了不爲null的任務

關閉線程池

public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            //線程池狀態設置爲SHUTDOWN
            advanceRunState(SHUTDOWN);
            //中斷全部空閒線程,等待隊列任務執行完成
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }

能夠經過調用線程池的shutdown或者shutdownNow方法來關閉線程池,它們的原理是遍歷池中的工做線程,而後逐個調用線程的interrupt方法來中斷線程,因此沒法響應中斷的任務可能永遠沒法終止。shutdownNow首先將線程池的狀態設置爲STOP,而後嘗試中止全部正在執行的或者暫停的任務並返回等待執行的任務列表。而shutdown只是將線程池的狀態設置成SHUTDOWN,而後中斷全部沒有在執行的任務線程。

參考:
書籍:《JAVA併發編程藝術》
博客: http://blog.csdn.net/clevergump/article/details/50688008

相關文章
相關標籤/搜索