簡單聊一聊ThreadPoolExecutor

本文首發於www.eumji025.comjava

簡介

線程池的誕生於JDK1.5,主要的目的是解決咱們在使用線程的時候一般都是重複的建立和銷燬,爲了讓線程可以獲得複用,避免咱們重複的建立和銷燬,提升咱們的效率,下降內存的開銷。沒錯又是Doug Lea大神又搞出了線程池這一強力工具。數據庫

咱們最熟悉的線程池使用案例應該就是數據庫鏈接池,以及咱們任務調度都是會使用線程池的。設計模式

Executors用來建立和管理咱們具體的ThreadPoolExecutor,這裏使用了典型的設計模式 - 工廠模式。ThreadPoolExecutor是真正線程池,繼承了AbstractExecutorService類,Java集合類和併發類都大量的使用了抽象類實現部分通用的功能。此處的AbstractExecutorService就實現了ExecutorService部分接口功能。最關鍵的execute方法交給子類去實現。和集合類的套路基本上是如出一轍。併發

看一下Executors的具體實現。ide

public static ExecutorService newFixedThreadPool(int var0) {
    return new ThreadPoolExecutor(var0, var0, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
}
public static ExecutorService newSingleThreadExecutor(ThreadFactory var0) {
    return new Executors.FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), var0));
}
public static ScheduledExecutorService newScheduledThreadPool(int var0) {
    return new ScheduledThreadPoolExecutor(var0);
}
複製代碼

隨便列舉了其中的幾個例子,這裏具體描述一下構造函數的幾個參數做用。函數

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {
複製代碼

corePoolSize => 指代默認建立的線程數。工具

maximumPoolSize => 建立線程的最大數量。測試

keepAliveTime => 線程存活時間this

unit => 存活的時間,應該都很熟悉,包含日,時,分,秒等spa

workQueue => 存放線程的阻塞隊列

threadFactory => 建立線程的工廠,默認爲DefaultThreadFactory,主要是重寫ThreadFactory接口的newThread的方法。

handler => 拒絕策略,主要是指工做任務超過了workQueue的大小後,該執行哪一種策略進行處理。主要有一下幾種:

1.AbortPolicy => 默認的策略,直接拋出異常

2.DiscardPolicy => 放棄被拒絕的任務,其實就是啥也不幹

3.DiscardOldestPolicy => 放棄最老的任務,也就是立刻要執行的任務

4.CallerRunsPolicy => 直接執行被放棄的任務,我的不喜歡,赤裸裸的插隊(並且根本就沒有拒絕)

上面簡單的介紹了線程池的各個參數,如今就看一下到底能夠生成哪些線程池。

fixedThreadPool

fixedThreadPool => 固定大小線程池,一旦建立,數量就不會再改變,若是任務超過線程的數量,就會進入等待的隊列,使用的LinkedBlockingQueue就能夠認爲是無界的隊列了由於capacity等於Integer.MAX_VALUE

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
複製代碼

咱們簡單的測試一下就能夠發現其中的功能

static class MyThread extends Thread{
    @Override
    public void run() {
      try {
        Thread.sleep(500L);
        System.out.println(Thread.currentThread().getName() +" running !!!");
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
}
static void fixedThreadPoolTest(){
    ExecutorService executorService = Executors.newFixedThreadPool(5);
    for (int i = 0; i < 20; i++) {
        executorService.submit(new MyThread());
    }
}
複製代碼

執行這個test方法的時候,會發現只會有5種線程名稱被打印。說明沒有沒有得到線程的任務就等待,並且是複用的。後續的例子都將使用MyThread作測試。

cachedThreadPool

newCachedThreadPool => 大小不固定,爲達到最大值時能夠動態生成線程,默認使用的是SynchronousQueue隊列,是一種同步隊列,指只能存放一個元素,添加了必須被消費了才能再添加。

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
複製代碼

下面簡單的使用一個例子進行說明。

static void cachedThreadPoolTest() throws InterruptedException {
    ExecutorService executorService = Executors.newCachedThreadPool();

    for (int i = 0; i < 5; i++) {
      executorService.submit(new MyThread());
    }
    Thread.sleep(1000L);
    for (int i = 0; i < 20; i++) {
      executorService.submit(new MyThread());
    }
}
複製代碼

上面測試的例子將複用前五個線程,並再新建15個線程,結果就不展現了。

singleThreadExecutor

singleThreadExecutor => 大小固定的且只有一個線程的線程池,能夠理解爲一個元素的fixedThreadPool。

public static ExecutorService newSingleThreadExecutor() {
     return new FinalizableDelegatedExecutorService
       (new ThreadPoolExecutor(1, 1,
                               0L, TimeUnit.MILLISECONDS,
                               new LinkedBlockingQueue<Runnable>()));
 }
複製代碼

就不進行測試代碼的展現了,由於和fixedThreadPool的道理相同,只不過只有一個線程。

scheduledThreadPool

scheduledThreadPool => 是一種大小不固定的定時任務線程池。使用的DelayedWorkQueue延時隊列進行任務記錄。DelayedWorkQueue是ScheduledThreadPoolExecutor的內部類

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}
複製代碼

下面演示一個簡單的例子演示如何。須要注意的是延時任務調用的方法會有點不一樣。

static void singleThreadScheduledTest(){
    ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
    for (int i = 0; i < 2; i++) {
      //延遲5秒執行,只執行一次
      executorService.schedule(new MyThread(),1,TimeUnit.SECONDS);
      //延遲5秒 執行5個週期執行
      //executorService.scheduleAtFixedRate(new MyThread(),5,3, TimeUnit.SECONDS);
    }
}
複製代碼

上面測試裏的1表明延時1秒執行,且只執行一次,若是想週期執行,可調用下面註釋的方法scheduleAtFixedRate方法,表示第一次延時5秒執行,後面的是以3秒爲一個週期的執行。

須要注意的是:

1.不會自動出現中止,除非發生異常或者手動的取消掉。

2.假如執行的週期比線程的執行時間短,則會以延時的任務的執行時間長度爲準。

singleThreadScheduledExecutor

public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
    return new DelegatedScheduledExecutorService
      (new ScheduledThreadPoolExecutor(1));
}
複製代碼

能夠看出來singleThreadScheduledExecutor和ScheduledThreadPoolExecutor仍是有必定的區別的,singleThreadScheduledExecutor是單獨的一個實現類,不過本文不作具體分析。

方法解讀

上面大概的介紹了線程池中的幾種線程池,接下來咱們將介紹一下如何其中究竟是如何實現的。咱們只說一下常規的線程池的執行邏輯。

從上面的代碼咱們能夠看到,各類線程池的不一樣主要體如今線程的數量範圍和使用的workQueue不一樣。最終都會調用submit方法,首先看一下線程池中幾種不一樣參數的submit方法。

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
}
public <T> Future<T> submit(Runnable task, T result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task, result);
}
public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}
複製代碼

submit方法支持多種類型的任務,最終都會包裝成RunnableFuture的task。這裏體現了一個重要的設計模式 - 適配器模式,下面看一下詳細代碼

protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return new FutureTask<T>(runnable, value);
}
public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}
public static <T> Callable<T> callable(Runnable task, T result) {
    if (task == null)
        throw new NullPointerException();
    return new RunnableAdapter<T>(task, result);
}
複製代碼

其實目的很簡單就是把Runnable最終包裝成Callable。

在介紹具體的方法以前,首先咱們看一下線程池中幾種狀態,由於後續會使用到。

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));	//最小的負數
    private static final int COUNT_BITS = Integer.SIZE - 3;
    public static final int SIZE = 32;
	private static final int CAPACITY   = (1 << COUNT_BITS) - 1;//29個1

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS; //11100000000000000000000000000000
    private static final int SHUTDOWN   =  0 << COUNT_BITS;//0
    private static final int STOP       =  1 << COUNT_BITS;//100000000000000000000000000000
    private static final int TIDYING    =  2 << COUNT_BITS;//1000000000000000000000000000000
    private static final int TERMINATED =  3 << COUNT_BITS;//1100000000000000000000000000000

    // c & 29個0 其實就是獲取高三位
    private static int runStateOf(int c) { return c & ~CAPACITY; }
    private static int workerCountOf(int c) { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }

複製代碼

這是一個很是經典的設計,咱們能夠看出,咱們的低29位都是用來記錄任務的。高3位表示狀態

RUNNING => 高3位值是111。 此狀態表示能夠接受新任務

SHUTDOWN => 高3位值是000。 此狀態不能接受新任務,但會繼續已有的任務

STOP => 高3位值是001。 此狀態的線程不接受也不處理任務,而且會中斷處理中的任務

TIDYING => 高3位值是010。 線程池全部的任務都已經終止,而且worker數量爲0,即將運行terminated方法

TERMINATED => 高3位值是011。在TIDYING狀態上,已經運行了terminated方法,線程池徹底的中止。

上面的這些數字很重要,必定要記住。

接着上面的講,最終不論是哪一種submit的方法,都會交給execute方法去執行真正的邏輯。

public void execute(Runnable command) {
    if (command == null) throw new NullPointerException();
    int c = ctl.get();
   //若是尚未超過線程池的線程容量,直接分配線程執行
    if (workerCountOf(c) < corePoolSize) {
      if (addWorker(command, true)) return;
      c = ctl.get();
    }
   //不然判斷線程池的狀態,若是是正在運行狀態,加入到workQueue等待執行
    if (isRunning(c) && workQueue.offer(command)) {
      int recheck = ctl.get();
      //再次校驗狀態,若是線程池不在運行狀態,移除任務,並執行拒絕策略。
      if (! isRunning(recheck) && remove(command))
        reject(command);
      //若是沒有正在運行的線程,添加一個線程
      else if (workerCountOf(recheck) == 0)
        addWorker(null, false); //直接開啓一個線程,由於任務已經在workQueue上了
    }
  //若是workQueue添加失敗,嘗試直接起一個worker,用於coreSize和MaxSize不等的狀況
    else if (!addWorker(command, false))
      reject(command);
}
複製代碼

簡單的概述一下上述代碼所作的事情:

1.若是當前的活動的線程小於設置的線程數,則直接啓動新線程執行任務,不然

2.若是線程池是處於運行狀態,且線程數爲corePoolSize,且workQueue沒滿,把任務加入到等待隊列中,若是執行成功,再次檢查線程的運行狀態,到第三步,不然到第四步

3.再次校驗狀態,若是沒有處於運行的狀態,把添加的任務剔除。

4.線程池若是不處於運行狀態,或者workQueue已經滿了,workQueue滿了,還能夠再次嘗試執行分配一個線程(用於corePoolSize不等於maximumPoolSize的狀況下),若是仍是失敗,說明線程池已經到極限了和或者是已經關閉了線程池。

接下來須要看一下第一步中的addWorker方法

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // 線程的狀態,running狀態的沒法進入
           //須要注意其中不能添加一個woker的條件。SHUTDOWN狀態下且不一樣時知足firstTask爲null,workQueue爲空的條件
            if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))
                return false;
            //思考可以到達這裏的條件
            for (;;) {
               //線程的數量
                int wc = workerCountOf(c);
               //超出了最大容量,或者線程超過規定的數量,失敗
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
               //數量+1,跳出循環
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get(); 
                if (runStateOf(c) != rs)
                    continue retry;
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
           //添加一個worker 裏面有玄機,後面介紹
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    //再次檢查線程池的狀態
                    int rs = runStateOf(ctl.get());
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // 若是線程已經在運行中,
                            throw new IllegalThreadStateException();
                      //添加一個worker記錄
                        workers.add(w);
                        int s = workers.size();
                       //增長最大數量 ,默認爲0
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
              // 添加成功則啓動任務
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
複製代碼

addWorker方法看起來比較長,其實作的事情很是簡單。

1.判斷線程池的狀態,若是不是正常狀態(rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))則添加失敗,不然進行第二步

2.根據條件判斷是否還能在添加線程,能夠則workCount加1成功跳出循環,執行worker邏輯,不然重試或者結束。

3.第二步成功,配置Worker,並在此檢查線程池的狀態,若是沒有問題,則設置worker相關信息,並啓動線程。

而後咱們在來看一下execute方法中的remove方法,我相信remove方法不只僅是從workQueue移除元素,否則也不會單獨寫個方法。

public boolean remove(Runnable task) {
    boolean removed = workQueue.remove(task);
    tryTerminate(); // In case SHUTDOWN and now empty
    return removed;
}
複製代碼

首先從workQueue移除元素,而後嘗試關閉線程池。具體邏輯仍是在tryTerminate方法中。

final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            //若是是運行狀態,或者已經中止,或者是存於shutdown狀態,可是任務沒有處理完,都直接結束,也就證實嘗試中止失敗
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
           //若是還有工做的線程,把worker的中斷狀態設爲true,ONLY_ONE表示只中斷一個
            if (workerCountOf(c) != 0) { // Eligible to terminate
                interruptIdleWorkers(ONLY_ONE);
                return;
            }
			//沒有工做的線程了真的藥中止了
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
              //設置爲TIDYING狀態
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                      //留給子類擴展的
                        terminated();
                    } finally {
                       //最終設置爲TERMINATED狀態
                        ctl.set(ctlOf(TERMINATED, 0));
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
        }
    }
複製代碼

tryTerminate是任何從workerQueue移除任務都會調用的方法,用來判斷當前線程池是否已經沒有活着的線程了,若是沒有了就關閉線程池。

再次回到execute方法中,reject方法就是調用拒絕策略中的rejectedExecution方法,默認的AbortPolicy就是拋個異常僅此而已。

後面也只是一些相同的方法就再也不多介紹了,最重要的仍是條件判斷。

shutdown方法

在看一下線程池的shutdown相關的方法。

主要包含三個方法:

1.shutdown方法 => 將線程池的狀態設置爲shutdown狀態

2.shutdownNow方法 => 直接中止線程池。

3.isShutdown方法 => 判斷當前線程池的狀態是否不是running狀態

下面跟隨着源碼分別看看這幾個方法的詳情。

public boolean isShutdown() {
  	return ! isRunning(ctl.get());
}
private static boolean isRunning(int c) {
  	return c < SHUTDOWN;
}
複製代碼

isShutdown方法仍是老套路,直接判斷是否小於SHUTDOWN狀態就能夠判斷是否爲Running狀態。

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
      checkShutdownAccess(); //檢查權限,
      advanceRunState(SHUTDOWN); //設置爲SHUTDOWN狀態
      interruptIdleWorkers(); //中斷等待任務的線程
      onShutdown(); // 空方法,爲ScheduledThreadPoolExecutor留的方法
    } finally {
      mainLock.unlock();
    }
    tryTerminate(); //嘗試關閉線程池
}
複製代碼

shutdown方法是將線程池的狀態設置爲SHUTDOWN,而且設置線程的中斷狀態,注意這裏的中斷只會中斷在等待中的線程(沒有上鎖的線程)。比較簡單裏面的詳情就不展現出來了。

public List<Runnable> shutdownNow() {
     List<Runnable> tasks;
     final ReentrantLock mainLock = this.mainLock;
     mainLock.lock();
     try {
       checkShutdownAccess();
       advanceRunState(STOP); //設置爲STOP狀態
       interruptWorkers(); //中斷全部運行中的且沒有被設置中斷標誌的線程
       tasks = drainQueue(); //獲取等待中的任務列表
     } finally {
       mainLock.unlock();
     }
     tryTerminate();
     return tasks;
 }
複製代碼

shutdownNow方法和上面的shutdown方法很類似,只是不一樣的是,shutdownNow更完全,直接將線程池的狀態設置爲STOP,而且會移除有全部的等待中的task,並且這裏設置的是全部運行中線程的中斷狀態。下面看一下drainQueue方法

private List<Runnable> drainQueue() {
     BlockingQueue<Runnable> q = workQueue;
     ArrayList<Runnable> taskList = new ArrayList<Runnable>();
     q.drainTo(taskList);//將workQueue的對象轉移到taskList(會清空q裏的元素)
     if (!q.isEmpty()) {
       //若是q還有新offer的元素
       for (Runnable r : q.toArray(new Runnable[0])) {
         if (q.remove(r))
           taskList.add(r); //添加到taskList中
       }
     }
     return taskList;
 }
複製代碼

主要作的事情就是從workQueue中獲取全部的任務放到taskList中,並從workQueue中刪除。

補充

前面咱們瞭解線程是如何執行任務和關閉線程池的方法,可是咱們須要思考這樣一個場景,就是當咱們有任務被放在workQueue裏的時候,上述的方法並無講述這樣的狀況下是如何執行的,這裏須要介紹一下其中的邏輯。這時候就能夠看一下留在addWorker的玄機了。。。

private final class Worker extends AbstractQueuedSynchronizer implements Runnable Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}
複製代碼

能夠從構造函數看出,thread對象其實指向的就是當前的worker,因此addWorker方法後面的thread.start就會調用worker.run方法。還有一點值得注意的是Worker繼承了AbstractQueuedSynchronizer,下面詳細看一下run方法中的實現

public void run() {
  	runWorker(this);
}
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask; //取出worker中的任務
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
      //獲取任務,
        while (task != null || (task = getTask()) != null) {
            w.lock();
           //若是線程池中止了,當前 線程沒有終端,將當前線程設爲中斷狀態
            if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
                wt.interrupt();
            try {
              //空方法,留給子類的
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                   //執行task的邏輯
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}
複製代碼

從上面的方法能夠看出,worker的run方法主要作了幾件事。

1.循環獲取任務,並執行,發生異常則拋出異常

2.若是沒有問題最終關閉worker。

首先看一下getTask方法是如何操做的。

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

      //檢查隊列是否爲空,或者線程池是否處理關閉狀態
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
           //遞減worker的數量
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // 判斷是否已經設置超時
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
       //線程超過了最大數量或者超時,說明不可用了,幹掉
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
           //獲取任務
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
           //不然獲取任務超時
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
複製代碼

上述獲取任務的方法仍是比較複雜的,特別是狀態的判斷,簡單的總結一下:

1.若是線程池的狀態是STOP或者工做的隊列爲空,循環去一個一個的減小worker的數量,此處只是減小數量。並無結束裏面的worker。

2.若是不知足第一條,開始校驗是否設置了超時關閉線程或者說線程數超過了設置的值。這時候判斷去判斷線程1.是否超過了線程數的最大值或者知足了超時的條件 2.線程數大於1或者已經沒有待處理的工做了。知足這些條件就去掉一個worker

3.若是2也沒有知足,就嘗試獲取task,獲取到了就返回,不然就設置timeOut爲true,說明取task失敗了。

上面介紹瞭如何獲取任務和管理worker的getTask方法,下面咱們在看一下任務執行完後的processWorkerExit方法。

private void processWorkerExit(Worker w, boolean completedAbruptly) {
   //若是completedAbruptly爲true就減小worker的數量,產生於runWorker發生異常。
    if (completedAbruptly) decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
      //刪除這個worker
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
    //完成任務了就嘗試關閉線程池
    tryTerminate();

    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
           //判斷有沒有設置超時
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
          //判斷還有沒有線程能夠工做
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
      //執行addworker
        addWorker(null, false);
    }
}
複製代碼

上述的方法主要是worker進行退出,主要作的幾件事以下

1.判斷是否正常的結束,若是不是就要刪減worker。

2.記錄完成任務的數量並移除worker。

3.嘗試關閉線程池,而後判斷線程池的狀態,若是尚未處於中止的狀態,繼續判斷是否是正常的結束,若是是的話去檢查線程池裏線程的狀態,若是正常就結束,若是不知足最好都添加一個worker。

總結

1.本文首先介紹了線程池的幾種類型的線程池,從代碼均可以看到其實共用用的同一個構造方法,不一樣的只是參數的不用。

2.分析了線程池的幾種狀態,這裏是比較重要的,特別是高三位表示狀態,低29位表示線程數。

3.分析了submit和execute方法,經過corePoolSize,maximumPoolSize,workQueue來判斷新任務是新啓一個線程仍是加入到workQueue中,或是執行拒絕策略。

4.分析了shutdown相關的方法邏輯

5.分析了worker究竟是如何工做的,這裏主要是對內部類worker的run及其涉及的方法進行解讀。

本篇只是我的看源碼的一些觀點,若是存在不清晰或者表述錯誤的觀點歡迎你們反饋。

與君共勉!!!

相關文章
相關標籤/搜索