Executor線程池原理詳解

線程池

    線程池的目的就是減小多線程建立的開銷,減小資源的消耗,讓系統更加的穩定。在web開發中,服務器會爲了一個請求分配一個線程來處理,若是每次請求都建立一個線程,請求結束就銷燬這個線程。那麼在高併發的狀況下,就會有大量線程建立和銷燬,這就會下降系統的效率。線程池的誕生就是爲了讓線程獲得重複使用,減小了線程建立和銷燬的開銷,減小了線程的建立和銷燬天然的就提升了系統的響應速度,與此同時還提升了線程的管理性,使線程能夠獲得統一的分配,監控和調優。java

   線程建立和銷燬爲何會有開銷呢,由於咱們java運行的線程是依賴於計算機內核的核心線程的。java建立的線程是用戶層的線程,要依賴於線程調度去是用內核層的線程來執行,在執行銷燬的時候會經過TSS在用戶層和核心層的切換,這個切換就是很大的一筆開銷。具體結構以下圖:web

 

 

 

線程實現方式

線程主要經過實現Runnable或者Callable接口來實現.Runnable與Callable的區別在於後者有返回值,可是前者沒有返回值。數組

public interface Runnable {
    public abstract void run(); } publuic interface Callable<V>{ V call() throws Exception; }

下面咱們來看一下測試代碼:服務器

package com.test.excutor;

import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.FutureTask; import java.util.concurrent.TimeUnit; public class ExcutorTest { public static void main(String[] args) { Thread t =new Thread( new RunTask()); t.start(); FutureTask<Object> ft=new FutureTask<Object>(new CallTask()); Thread f=new Thread(ft); f.start(); try { System.out.println("callTask output:"+(String)ft.get()); } catch (InterruptedException e) { // TODO Auto-generated catch block  e.printStackTrace(); } catch (ExecutionException e) { // TODO Auto-generated catch block  e.printStackTrace(); } } } class RunTask implements Runnable{ @Override public void run() { System.out.println(" RunTask Thread Name is "+Thread.currentThread().getName()); } } class CallTask implements Callable<Object>{ @Override public Object call() throws Exception { TimeUnit.SECONDS.sleep(1); //System.out.println("This is callTask"); return "callTask answer"; } }
//運行結果:

RunTask Thread Name is Thread-0
callTask output:callTask answer多線程

 

何時使用線程池:

一、單個任務處理時間比較短併發

二、須要處理的任務數量很大框架

Executor框架

Executor接口是Executor框架的一個最基本的接口,Executor框架的大部分類都直接或間接地實現了此接口。ide

它只有一個方法函數

void execute(Runnable command): 在將來某個時間執行給定的命令。該命令可能在新的線程、已入池的線程或者正調用的線程中執行,這由 Executor 實現決定。高併發

如下是框架圖:

 

 從以上圖中能夠看出ExecutorService就是繼承了Executor接口的一個重要接口類。在這個接口類中定義了線程池的具體行爲:

一、execute(Runnable command):履行Ruannable類型的任務,
二、submit(task):可用來提交Callable或Runnable任務,並返回表明此任務的Future對象
三、shutdown():在完成已提交的任務後封閉辦事,再也不接管新任務,
四、shutdownNow():中止全部正在履行的任務並封閉辦事。
五、isTerminated():測試是否全部任務都履行完畢了。
六、isShutdown():測試是否該ExecutorService已被關閉。 
七、awaitTermination(long timeout, TimeUnit unit):阻塞,直到關閉後全部任務都已完成執行。請求,或發生超時,或當前線程中斷,以先發生者爲準。
八、submit(Callable<T> task):提交一個返回值的任務用於執行,返回一個表示任務的未決結果的 Future。 該 Future 的 get 方法在成功完成時將會返回該任務的結果。若是想當即阻塞任務的等待,則可使用 result = exec.submit(aCallable).get(); 形式的構造。
九、submit(Runnable task, T result):提交一個 Runnable 任務用於執行,並返回一個表示該任務的 Future。該 Future 的 get 方法在成功完成時將會返回給定的結果。
十、submit(Runnable task):提交一個 Runnable 任務用於執行,並返回一個表示該任務的 Future。該 Future 的 get 方法在成功 完成時將會返回 null。

十一、invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException:執行給定的任務,當全部任務完成時,返回保持任務狀態和結果的 Future 列表。返回列表的全部元素的 Future.isDone() 爲 true。注意,能夠正常地或經過拋出異常來終止已完成 任務。若是正在進行此操做時修改了給定的 collection,則此方法的結果是不肯定的。

十二、invokeAll(Collection<? extends Callable<T>> tasks, long timeout,TimeUnit unit) throws InterruptedException:超時等待,同上。

1三、invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException,ExecutionException:與 invokeAll的區別是,任務列表裏只要有一個任務完成了,就當即返回。並且一旦正常或異常返回後,則取消還沒有完成的任務。

1四、invokeAll(Collection<? extends Callable<T>> tasks,long timeout,TimeUnit unit) throws InterruptedException: 超時等待,同上

線程池的建立

 線程池的建立是經過Executors來建立的。好比說你須要建立一個固定大小的線程池咱們可使用Executors.newFixedThreadPool(n)來實現。固然還有不少其餘的方法

咱們來看一下:

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory); }
//只容許一個線程執行 public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory)); } //能夠無限的建立線程,會把建立的線程放在一個特殊的隊列中去排隊 public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), threadFactory); } public static ScheduledExecutorService newSingleThreadScheduledExecutor() { return new DelegatedScheduledExecutorService (new ScheduledThreadPoolExecutor(1)); }
//定時線程池 
public static ScheduledExecutorService newScheduledThreadPool(
            int corePoolSize, ThreadFactory threadFactory) { return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory); }
 
 

 

 
 
newFixedThreadPool(int nThreads)方法中的幾個參數以下:
nThreads:核心線程數量
nThreads:最大線程池的數量 0L: 若是線程已經工做完畢,沒有任務調用,該線程最大的存活時間
TimeUnit.MILLISECONDS:計時時間,單位爲ms new LinkedBlockingQueue<Runnable>():當核心線程全都在工做,沒有空閒,此時會將多餘的線程放到阻塞隊列中排隊,當核心線程執行完成之後再從阻塞隊列當中拿出來繼續執行。
下面咱們來跑一段代碼:
public class ExecutorTest {    
    public static void main(String[] args) { Thread t =new Thread( new RunTask()); t.start(); FutureTask<Object> ft=new FutureTask<Object>(new CallTask()); Thread f=new Thread(ft); f.start(); try { System.out.println("callTask output:"+(String)ft.get()); } catch (InterruptedException e) { // TODO Auto-generated catch block  e.printStackTrace(); } catch (ExecutionException e) { // TODO Auto-generated catch block  e.printStackTrace(); } System.out.println("Executor Test Start"); ExecutorService exe=Executors.newFixedThreadPool(5); for(int i=0;i<20;i++){ exe.execute(new RunTask()); exe.submit(new RunTask()); } /*//驗證線程建立慢 if(exe.isShutdown()){ System.out.println("Thread 已經stop,等待線程建立"); exe.execute(new RunTask()); System.out.println("Executor Test end"); }else{ exe.shutdownNow(); } */ } } //結果以下: RunTask Thread Name is Thread-0 callTask output:callTask answer Executor Test Start RunTask Thread Name is pool-1-thread-1 RunTask Thread Name is pool-1-thread-2 RunTask Thread Name is pool-1-thread-2 RunTask Thread Name is pool-1-thread-2 RunTask Thread Name is pool-1-thread-2 RunTask Thread Name is pool-1-thread-2 RunTask Thread Name is pool-1-thread-2 RunTask Thread Name is pool-1-thread-2 RunTask Thread Name is pool-1-thread-2 RunTask Thread Name is pool-1-thread-1 RunTask Thread Name is pool-1-thread-1 RunTask Thread Name is pool-1-thread-1 RunTask Thread Name is pool-1-thread-1 RunTask Thread Name is pool-1-thread-1 RunTask Thread Name is pool-1-thread-1 RunTask Thread Name is pool-1-thread-1 RunTask Thread Name is pool-1-thread-1 RunTask Thread Name is pool-1-thread-1 RunTask Thread Name is pool-1-thread-1 RunTask Thread Name is pool-1-thread-1 RunTask Thread Name is pool-1-thread-1 RunTask Thread Name is pool-1-thread-1 RunTask Thread Name is pool-1-thread-1 RunTask Thread Name is pool-1-thread-1 RunTask Thread Name is pool-1-thread-1 RunTask Thread Name is pool-1-thread-2 RunTask Thread Name is pool-1-thread-5 RunTask Thread Name is pool-1-thread-5 RunTask Thread Name is pool-1-thread-5 RunTask Thread Name is pool-1-thread-5 RunTask Thread Name is pool-1-thread-5 RunTask Thread Name is pool-1-thread-5 RunTask Thread Name is pool-1-thread-5 RunTask Thread Name is pool-1-thread-5 RunTask Thread Name is pool-1-thread-5 RunTask Thread Name is pool-1-thread-5 RunTask Thread Name is pool-1-thread-1 RunTask Thread Name is pool-1-thread-4 RunTask Thread Name is pool-1-thread-3 RunTask Thread Name is pool-1-thread-2

根據代碼的測試結果咱們能夠看出線程1,線程2已經跑了很久了線程3,4,5才建立好,這個從側面驗證了線程的建立是很耗時的。

具體咱們來看一下線程池類的工做流程和工做原理:

線程池線程的大小=核心線程+非核心線程

 

 線程池的工做原理如上圖所示已經很是清晰了,文字描述一下具體步驟:

一、啓動線程池執行任務的時候先建立核心線程來執行任務;

二、核心線程數量建立達到規定值之後,還有任務沒有線程執行的話就將任務放到阻塞隊列中取排隊等待;

三、等到隊列排滿了還有線程須要執行的話就建立非核心線程;

四、非核心線程還不夠執行任務的話就直接執行拒絕策略。

線程池的重要屬性

  在ThreadPoolExecutor中有如下幾個比較重要的屬性:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

  ctl是一個控制狀態,對線程池的運行狀態和線程池中的有效線程數量進行控制的一個字段。它包含兩部分的信息:

  一、線程池的運行狀態(runState),高3位保存運行狀態.相關方法是private static int runStateOf(int c){ return c& ~CAPACITY;}

  二、線程池內的有效線程數量(workerCount),ctl 是個Integer類型的數據,低29位保存;相關方法是 private static int workerCountOf(int c){ return c& ~CAPACITY;}

  三、控制狀態的方法是:private static int ctlOf(int rs,int wc){return rs|wc;}

private static final int COUNT_BITS = Integer.SIZE - 3;( Integer.SIZE=31

  count_bits=29,1<<29,也就是說workerCount最大值(2^29)-1(約5億)
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

 容量=2^29-1。

線程池的5種狀態:

一、RUNNING:private static final int RUNNING = -1 << COUNT_BITS;高3位111

狀態說明:線程池處於Running狀態的時候,可以接收新的任務,而且對已添加的任務進行處理,任務不能大於規定的最大任務數;

狀態切換:線程池的初始狀態是RUNNING,也就是說線程池一旦被建立就處於RUNNING狀態,而且線程池中的任務數量爲0;

二、SHUTDOWN:private static final int SHUTDOWN = 0 << COUNT_BITS;高三位爲000

狀態說明:線程池處於SHUTDOWN狀態的時候,不接收新的任務,可是能夠處理已經添加的任務;

狀態切換:調用線程池的shutDown()方法的時候,線程狀態由RUNNING ---->>>>SHUTDOWN
三、 STOP :private static final int STOP = 1 << COUNT_BITS;高三位爲001

狀態說明:線程池處於該狀態的時候,不接收新的任務,不處理已接收的任務,而且還會中斷正在處理的任務,中斷並不表明線程被殺死了,而且清空阻塞隊列。

狀態切換:線程池調用shutDownNow()接口的時候,線程池由RUNNING(SHUTDOWN)-------->>>STOP
四、TIDYING :private static final int TIDYING = 2 << COUNT_BITS;高三位爲010

狀態說明:當全部的任務已經終止,ctl的值爲0,線程池會變成TIDYING狀態,當線程池處於該狀態的時候會執行鉤子函數terminated().terminated()方法在ThreadPoolExecutor中是空的,用戶想在線程池變爲TIDYING狀態的時候處理東西,能夠經過重載terminated()方法實現。

狀態切換:當線程池處於SHUTDOWN狀態,而且阻塞隊列中的任務爲0,就會SHUTDOWN----->>>TIDYING,當線程池處於STOP狀態下,線程池中執行任務數量爲0,那麼線程池狀態STOP----->>>TIDYING.
五、TERMINATED:private static final int TERMINATED = 3 << COUNT_BITS;高三位爲011

狀態說明:線程池已經完全終止,就會變成TERMINATED狀態

狀態切換:線程池處於TIDYING狀態,執行完terminated()方法之後,就會實現TIDYING----->>>TERMINATED

進入該狀態的條件以下:

1)線程池不是RUNNING狀態;

2)線程池不是TIDYING狀態或者TERMINATED狀態;

3)若是線程池狀態是SHUTDOWN,而且阻塞隊列中的任務數量爲0;

4)workerCount=0;

5)設置TIDYING狀態成功;

 線程池狀態的切換以下圖所示:

 

線程池的默認實現-ThreadPoolExecutor

建立方法:

   public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

 

任務的提交:

  public void execute():提交任務沒有返回值

public Future<?> submit():提交任務有返回值

參數解釋:

corePoolSize:核心線程數,當提交一個任務的時候,建立一個核心線程,直到達到核心線程的最大數量。

maximumPoolSize:線程池最大容量,即該線程池最大容許的線程數量,噹噹前的阻塞隊列滿的時候,還有任務繼續提交,則建立新的非核心線程繼續執行,可是線程總數不能大於最大容量maximumPoolSize。

keepAliveTime:空餘線程存活時間,即當線程池的核心線程數達到最大的時候,沒有新的任務提交,那麼非核心線程不會當即銷燬,而是等待,等待時間大於keepAliveTime纔會進行銷燬。
unit:keepAliveTime的時間單位

BlockingQueue<Runnable> workQueue:阻塞隊列,當核心線程數達到最大值的時候,還有新任務提交則將這些線程放到該隊列中進行排隊等待,提交的任務必須實現Runnable接口。除了這個阻塞隊列之外,還有如下的阻塞隊列:

一、ArrayBlockingQueue:基於數組結構的有界隊列,遵循FIFO原則。
二、LinkedBlockingQueue:基於鏈表的有界隊列,遵循FIFO原則,吞吐量高於ArrayBlockingQueue。

三、SynchronousQueue:一個不存儲元素的阻塞隊列,每一個插入操做必須等到另外一個線程調用移除操做,不然插入操做一直處於阻塞狀態,吞吐量一般要高於LinkedBlockingQuene;

四、priorityBlockingQuene:具備優先級的無界阻塞隊列;

threadFactory:用來建立線程,默認使用Executors.defaultThreadFactory() 來建立線程使用默認的ThreadFactory來建立線程時,會使新建立的線程具備相同的NORM_PRIORITY優先級而且是非守護線程,同時也設置了線程的名稱。

RejectedExecutionHandler handler:線程池的拒絕策略,線程池的拒絕策略有4種:

一、AbortPolicy:直接拋出異常,這是默認策略;

二、CallerRunsPolicy:用調用者所在的線程來執行任務:

三、DiscardOldestPolicy:拋棄阻塞隊列中最靠前的任務,並執行當前任務;

四、DiscardPolicy:直接丟棄任務

線程池監控:

 public int getPoolSize():獲取當前線程池的大小

public int getActiveCount():線程池中正在執行任務的線程數量

public int getLargestPoolSize():獲取線程池中出現出現過的最大線程數量

public long getTaskCount():獲取線程池已執行和未執行的線程總數

 public long getCompletedTaskCount():獲取已經完成的任務的數量

 
 線程池的工做原理圖以下:

 

 詳細的工做原理圖請參考線程池的執行過程圖。線程池中建立線程並執行用的方法是addWorker(Runnable firstTask, boolean core)方法。其中的firstTask用於指定新增線程執行的第一個任務,若是沒有任務執行能夠爲null;boolean core 主要是判斷當前線程池中活動的核心線程數是否達到最大,若是達到最大的話就建立非核心線程,該值爲false,若是沒有達到最大核心線程數量,則爲true,建立核心線程。

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
       //獲取線程池的運行狀態
int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary.當rs的值>=SHUTDOWN說明線程池再也不接收新的任務了,而後判斷如下三個條件:
      一、rs==SHUTDOWN 表示此時是關閉狀態,再也不接收新的任務,可是能夠繼續處理阻塞隊列中的任務
      二、firstTask==null:firstTask爲空
      三、!workQueue.isEmpty():阻塞隊列不爲空
      以上三個條件有一個不知足,則返回false,不建立新線程。
if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c);//獲取工做線程數 if (wc >= CAPACITY || //判斷線程數是否大於初始容量,即ctl的低29位都爲1 wc >= (core ? corePoolSize : maximumPoolSize))//根據core的值,爲true,則將wc與核心線程數來比較,若是爲false,就跟線程池最大容量相比 return false; //若是以上兩個條件知足則說明線程池已經滿了,不能建立新線程,返回false if (compareAndIncrementWorkerCount(c))//嘗試增長workerCount,增長成功跳出此層for循環 break retry; c = ctl.get(); // Re-read ctl 增長workerCount失敗,從新獲取ctl值 if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask);//根據firstTask建立worker final Thread t = w.thread;//每一個worker建立一個線程 if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get());//獲取線程池的運行狀態 if (rs < SHUTDOWN || //狀態是RUNNING狀態 (rs == SHUTDOWN && firstTask == null)) {//或者狀態處於SHUTDOWN,且firstTask爲空(SHUTDOWN不接收新任務可是任然執行隊列中的任務) if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w);//向線程池中添加線程,workers是一個HashSet int s = workers.size();//獲取線程池數量 if (s > largestPoolSize)//largestPoolSize記錄着線程池中出現過的最大線程數量,若是此時的線程數量大於以前的largestPoolSize,則從新賦值 largestPoolSize = s; workerAdded = true;//線程添加成功 } } finally { mainLock.unlock(); } if (workerAdded) { t.start();//運行線程 workerStarted = true;//線程運行成功 } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }

 Worker類

   worker類封裝了線程池中的每個線程,線程池ThreadPool本質上維護的就是worker類,該類繼承了AQS,實現了Runnable接口。worker類包含屬性thread,firstTask,completedTasks三個屬性。

firstTask:保存傳入的任務

thread:執行任務建立的線程,在構造方法中經過 getThreadFactory().newThread(this)建立線程,傳入的是this,其實worker自己就實現了Runnable接口,因此worker自己就是一個線程,在線程啓動的時候就會調用worker類的run方法。Worker類繼承AQS,而不是ReetrantLo是由於AQS使用的是獨佔鎖,是不能夠重入的。lock一旦獲取到了獨佔鎖,就代表線程在運行中,就不該該中斷。若是線程不是在獨佔狀態,那麼就說明該線程是空閒線程,能夠進行中斷。線程池在執行shutdown方法或tryTerminate方法時會調用interruptIdleWorkers方法來中斷空閒的線interruptIdleWorker方法會使用tryLock方法來判斷線程池中的線程是不是空閒狀態;之因此設置爲不可重入,是由於咱們不但願任務在調用像setCorePoolSize這樣的線程池控制方法時從新獲取鎖。若是使用ReentrantLock,它是可重入的,這樣若是在任務中調用瞭如setCorePoolSize這類線程池控制的方法,會中斷正在運行的線程。
         final Thread thread;
        Runnable firstTask;
        volatile long completedTasks;

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker是由於AQS中默認的state是0,若是剛建立了一個Worker對象,尚未執行任務時,這時就不該該被中斷
        
      this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
        }

在上述內容中我提到了,線程池中建立的線程主要是worker,線程的執行也是worker,worker在執行的時候調用的run方法,代碼裏的run方法調用的就是runWorker()方法,下面咱們就來解讀一下runWorker()方法:

 final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts 將state設置爲0
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {//任務不爲空進入循環 getTask()從阻塞隊列中獲取任務
                w.lock();//線程運行上鎖
                /*若是線程正在中止,保證線程處於中斷狀態,若是不是的話保證當前線程不是中斷狀態;
           這裏須要考慮執行If語句期間也執行了SHUTDOWNNOW方法,將狀態直接置爲STOP,同時還會中斷線程池中的全部線程wt.interrupted()來判斷是否中斷,是爲了確保在RUNNING和SHUTDOWN狀態的時候是處於非中斷狀態。
If pool is stopping, ensure thread is interrupted; if not, ensure thread is not interrupted. This requires a recheck in second case to deal with shutdownNow race while clearing interrupt
          */
if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt();//復位中斷 try { beforeExecute(wt, task);//本身去實現,該類中此方法是空的 Throwable thrown = null; try { task.run();//運行任務 } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++;//完成任務數量+1 w.unlock();//釋放鎖 } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }

 根據上述的代碼以及註釋,如今來總結一下,線程運行的runWorker()方法整體的運行流程以下:

一、while循環,加鎖,從阻塞隊列中獲取任務(getTask())

二、若是線程正在中止,保證當前線程處於中斷狀態,若是不是則保證當前線程不是中斷狀態

三、運行任務內容

四、任務運行完成,釋放鎖。

五、當獲取的任務爲空跳出while循環,執行 processWorkerExit(w, completedAbruptly)方法。

 除了runWorker以外,上述還有提到getTask(),從阻塞隊列中獲取任務:

private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?表示上次從阻塞隊列中取值的時候是否有超時

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.線程池是SHUTDOWN以上的狀態的時候,判斷線程池是否中止,或者阻塞隊列是否爲空,若是都是的話那麼workerCout減1
        而且返回空值。由於在線程池在SHUTDOWN以上的狀態的時候應該,不接收新的任務,也不容許往阻塞隊列當中添加新的任務。
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; }        //獲取線程的數量 int wc = workerCountOf(c); // Are workers subject to culling?timed判斷是否須要進行超時控制。allowCoreThreadTimeOut參數是容許核心線程超時標識,默認是false.同時也判斷當前線程
數量是不是大於核心線程的數量的。
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;        //噹噹前工做線程數量大於規定線程池最大容量,或者超時控制爲真的時候,同時判斷當前線程數大於1或者阻塞隊列是空的兩個條件,當兩個條件中有一個爲真,那麼將workerCount-1,
c成功的話就返回null值,失敗的話就重試。該判斷比較重要,主要是當線程池中的線程數量處於大於corePoolSize,可是又小於maximumPoolSize的狀態下,獲取任務超時,說明阻塞隊列爲空,也就說明
如今線程池不須要那麼多線程來執行任務,須要把corePoolSize的線程銷燬,讓線程數量維持在corePoolSize便可。
if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();//timed =true 那麼就經過阻塞隊列的poll方法進行超時控制,不然的話就從隊列中獲取任務。 if (r != null) return r;//獲取任務成功,返回獲取的任務。 timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }

在runWorker()方法中,getTask失敗,是會跳出while 循環,執行processWorkerExit()方法:

private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // 若是執行線程出現異常,那麼workerCount-1
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;//completedTaskCount統計完成任務的線程數,同時移除已經執行完任務的worker
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }

        tryTerminate();

        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {//線程的狀態是RUNNING或者SHUTDOWN進行如下判斷
            if (!completedAbruptly) {//若是線程非異常結束進行如下操做
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;//若是容許核心線程超時,那麼min=0,若是不容許那麼min=核心線程的數量
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;//若是核心線程容許超時,min==0且阻塞隊列不爲空的,min=1,即至少保證線程池中有1個worker
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);//若是worker異常結束那麼就addWorker
        }
    }

綜上所述,線程池中線程的執行過程大體以下:

 

 也就是說,當線程池在執行Executors.newFixedThreadPool(n).execute(Runnable)的方法的時候,就進入到線程池ThreadPoolExecutor中去執行execute(Runnable)方法,該方法主要addWorker(),在執行addWorker的時候,worker類會建立線程getThreadFactory().newThread(this),建立好線程之後,線程會啓動,t.start()實際調用的就是worker類中的run()方法,該方法的實質是運行runWorker()方法,在執行該方法的時候就會從阻塞隊列中獲取任務,獲取任務成功之後執行線程,完成任務便可。

相關文章
相關標籤/搜索