【萬字圖文-原創】 | 學會Java中的線程池,這一篇也許就夠了!

碎碎念

關於JDK源碼相關的文章這已是第四篇了,原創不易,粉絲從幾十人到昨天的666人,真的很感謝以前幫我轉發文章的一些朋友們。java

關注人數.png

從16年開始寫技術文章,到如今博客園已經發表了222篇文章,大多數都是原創,共有800多粉絲,基本上每月都會有文章的產出。面試

博客園信息.png

博客園文章月份記錄.png

回顧這幾年以來寫做的心路歷程,一直都是偷偷的寫,偷偷的發,懼怕被人知道,怕被人罵文章寫的太水(以前心理太脆弱了,哈哈)。後面和cxuan聊事後,他建議我給他投稿試試,因而就有了那一篇的萬字的AQS文章。數據庫

最近也有好多讀者加到個人微信,問一些文章中的問題,我也都會認真解答,看到有人閱讀個人文章並有所收穫,我真的挺欣慰,這就是寫做的動力吧。數組

幫助別人的同時也是在幫助本身,本身學的技術和理解的內容都是有侷限性的。經過寫文章結識到了不少朋友,聽聽別人的分析和看法,我也能學到不少。安全

答疑.png

每次看到博客中有人留言都很激動,也會第一時間去回覆。感謝下面的公衆號大佬們以前無私的幫助,你們也能夠關注一下他們,都是很nice的大佬:微信

Java建設者、Java團長、程序猿石頭、碼象全棧、Java3y、JAVA小咖秀、Bella的技術輪子、石杉的架構筆記、武培軒、程序通事架構

前言

Java中的線程池已經不是什麼神祕的技術了,相信在看的讀者在項目中也都有使用過。關於線程池的文章也是數不勝數,咱們站在巨人的肩膀上來再次梳理一下。併發

本文仍是保持原有的風格,圖文解析,儘可能作到多畫圖!全文共20000+字,建議收藏後細細品讀,閱讀期間搭配源碼食用效果更佳!ide

讀完此文你將學到:函數

  1. ThreadPoolExecutor中經常使用參數有哪些?
  2. ThreadPoolExecutor中線程池狀態和線程數量如何存儲的?
  3. ThreadPoolExecutor有哪些狀態,狀態之間流轉是什麼樣子的?
  4. ThreadPoolExecutor任務處理策略?
  5. ThreadPoolExecutor經常使用的拒絕策略有哪些?
  6. Executors工具類提供的線程池有哪些?有哪些缺陷?
  7. ThreadPoolExecutor核心線程池中線程預熱功能?
  8. ThreadPoolExecutor中建立的線程如何被複用的?
  9. ThreadPoolExecutor中關閉線程池的方法shutdownshutdownNow的區別?
  10. ThreadPoolExecutor中存在的一些擴展點?
  11. ThreadPoolExecutor支持動態調整核心線程數、最大線程數、隊列長度等一些列參數嗎?怎麼操做?

本文源碼基於JDK1.8

線程池基本概念

線程池是一種池化思想的產物,如同咱們數據庫有鏈接池、Java中的常量池。線程池能夠幫助咱們管理線程、複用線程,減小線程頻繁新建、銷燬等帶來的開銷。

在Java中是經過ThreadPoolExecutor類來建立一個線程池的,通常咱們建議項目中本身去定義線程池,不推薦使用JDK提供的工具類Executors去構建線程池。

查看阿里巴巴開發手冊中也有對線程池的一些建議:

【強制】建立線程或線程池時請指定有意義的線程名稱,方便出錯時回溯。 正例:自定義線程工廠,而且根據外部特徵進行分組,好比,來自同一機房的調用,把機房編號賦值給whatFeaturOfGroup

public class UserThreadFactory implements ThreadFactory {

	private final String namePrefix;
	private final AtomicInteger nextId = new AtomicInteger(1);

	UserThreadFactory(String whatFeaturOfGroup) {
		namePrefix = "From UserThreadFactory's " + whatFeaturOfGroup + "-Worker-";
	}

	@Override
	public Thread newThread(Runnable task) {
		String name = namePrefix + nextId.getAndIncrement();
		Thread thread = new Thread(null, task, name, 0, false);
		System.out.println(thread.getName());
		return thread;
	}
}

【強制】線程資源必須經過線程池提供,不容許在應用中自行顯式建立線程。

說明:線程池的好處是減小在建立和銷燬線程上所消耗的時間以及系統資源的開銷,解決資源不足的問題。 若是不使用線程池,有可能形成系統建立大量同類線程而致使消耗完內存或者「過分切換」的問題。

【強制】線程池不容許使用 Executors 去建立,而是經過 ThreadPoolExecutor 的方式,這 樣的處理方式讓寫的同窗更加明確線程池的運行規則,規避資源耗盡的風險。

說明:Executors 返回的線程池對象的弊端以下: 1) FixedThreadPool 和 SingleThreadPool: 容許的請求隊列長度爲 Integer.MAX_VALUE,可能會堆積大量的請求,從而致使 OOM。 2) CachedThreadPool: 容許的建立線程數量爲 Integer.MAX_VALUE,可能會建立大量的線程,從而致使 OOM。

線程池使用示例

下面是一個自定義的線程池,這是以前公司在用的一個線程池,修改其中部分屬性和備註作脫敏處理:

public class MyThreadPool {
	static final Logger LOGGER = LoggerFactory.getLogger(MyThreadPool.class);

	private static final int DEFAULT_MAX_CONCURRENT = Runtime.getRuntime().availableProcessors() * 2;

	private static final String THREAD_POOL_NAME = "MyThreadPool-%d";

	private static final ThreadFactory FACTORY = new BasicThreadFactory.Builder().namingPattern(THREAD_POOL_NAME)
			.daemon(true).build();

	private static final int DEFAULT_SIZE = 500;

	private static final long DEFAULT_KEEP_ALIVE = 60L;

	private static ExecutorService executor;

	private static BlockingQueue<Runnable> executeQueue = new ArrayBlockingQueue<>(DEFAULT_SIZE);

	static {
		try {
			executor = new ThreadPoolExecutor(DEFAULT_MAX_CONCURRENT, DEFAULT_MAX_CONCURRENT + 2, DEFAULT_KEEP_ALIVE,
					TimeUnit.SECONDS, executeQueue, FACTORY);

			Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
				@Override
				public void run() {
					LOGGER.info("MyThreadPool shutting down.");
					executor.shutdown();

					try {
						if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
							LOGGER.error("MyThreadPool shutdown immediately due to wait timeout.");
							executor.shutdownNow();
						}
					} catch (InterruptedException e) {
						LOGGER.error("MyThreadPool shutdown interrupted.");
						executor.shutdownNow();
					}

					LOGGER.info("MyThreadPool shutdown complete.");
				}
			}));
		} catch (Exception e) {
			LOGGER.error("MyThreadPool init error.", e);
			throw new ExceptionInInitializerError(e);
		}
	}

	private MyThreadPool() {
	}

	public static boolean execute(Runnable task) {

		try {
			executor.execute(task);
		} catch (RejectedExecutionException e) {
			LOGGER.error("Task executing was rejected.", e);
			return false;
		}

		return true;
	}

	public static <T> Future<T> submitTask(Callable<T> task) {

		try {
			return executor.submit(task);
		} catch (RejectedExecutionException e) {
			LOGGER.error("Task executing was rejected.", e);
			throw new UnsupportedOperationException("Unable to submit the task, rejected.", e);
		}
	}
}

這裏主要就是使用調用ThreadPoolExecutor構造函數來構造一個線程池,指定自定義的ThreadFactory,裏面包含咱們本身線程池的poolName等信息。重寫裏面的execute()submitTask()方法。 添加了系統關閉時的鉤子函數shutDownHook(),在裏面調用線程池的shutdown()方法,使得系統在退出(使用ctrl c或者kill -15 pid)時可以優雅的關閉線程池。

若是有看不懂的小夥伴也沒有關係,後面會詳細分析ThreadPoolExecutor中的源碼,相信看完後面的代碼再回頭來看這個用例 就徹底是小菜一碟了。

線程池實現原理

經過上面的示例代碼,咱們須要知道建立線程池時幾個重要的屬性:

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler);

corePoolSize: 線程池核心線程數量 maximumPoolSize: 線程池最大線程數量 workQueue: 線程池中阻塞隊列,通常指定隊列大小

線程池中數據模型能夠簡化成下圖所示,其中Thread應該是添加的一個個Worker,這裏標註的Thread是爲了方便理解:

線程池中數據模型.png

線程池中提交一個任務具體執行流程以下圖:

執行流程.png

提交任務時,比較當前線程池中線程數量和核心線程數的大小,根據比較結果走不一樣的任務處理策略,這個下面會有詳細說明。

線程池中核心方法調用鏈路:

方法調用鏈路.png

TheadPoolExecutor源碼初探

TheadPoolExecutor中經常使用屬性和方法較多,咱們能夠先分析下這些,而後一步步往下深刻,經常使用屬性和方法以下:

線程池常見屬性和方法.png

具體代碼以下:

public class ThreadPoolExecutor extends AbstractExecutorService {

	private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
	private static final int COUNT_BITS = Integer.SIZE - 3;
	private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

	private static final int RUNNING    = -1 << COUNT_BITS;
	private static final int SHUTDOWN   =  0 << COUNT_BITS;
	private static final int STOP       =  1 << COUNT_BITS;
	private static final int TIDYING    =  2 << COUNT_BITS;
	private static final int TERMINATED =  3 << COUNT_BITS;

	private static int runStateOf(int c)     { return c & ~CAPACITY; }
	private static int workerCountOf(int c)  { return c & CAPACITY; }
	private static int ctlOf(int rs, int wc) { return rs | wc; }

	private static boolean runStateLessThan(int c, int s) {
        return c < s;
    }

    private static boolean runStateAtLeast(int c, int s) {
        return c >= s;
    }

    private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }

    private boolean compareAndIncrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect + 1);
    }

    private boolean compareAndDecrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect - 1);
    }

     private void decrementWorkerCount() {
        do {} while (! compareAndDecrementWorkerCount(ctl.get()));
    }

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                Executors.defaultThreadFactory(), defaultHandler);
    }
}
  1. ctl

ctl表明當前線程池狀態和線程池線程數量的結合體,高3位標識當前線程池運行狀態,後29位標識線程數量。ctlOf方法就是rs(線程池運行狀態)和wc(線程數量)按位或操做

  1. COUNT_BITS

COUNT_BITS = Integer.SIZE - 3 = 29,在ctl中,低29位用於存放當前線程池中線程的數量

  1. CAPACITY

CAPACITY = (1 << COUNT_BITS) - 1 咱們來計算一下: 1 << 29 = 0010 0000 0000 0000 0000 0000 0000 0000 (1 << 29) - 1 = 0001 1111 1111 1111 1111 1111 1111 1111 這個屬性是用來線程池能裝載線程的最大數量,也能夠用來作一些位運算操做。

  1. 線程池幾種狀態

RUNNING:

(1) 狀態說明:線程池處在RUNNING狀態時,可以接收新任務,以及對已添加的任務進行處理。 (2) 狀態切換:線程池的初始化狀態是RUNNING。換句話說,線程池被一旦被建立,就處於RUNNING狀態,而且線程池中的任務數爲0

SHUTDOWN:

(1) 狀態說明:線程池處在SHUTDOWN狀態時,不接收新任務,但能處理已添加的任務。 (2) 狀態切換:調用線程池的shutdown()接口時,線程池由RUNNING -> SHUTDOWN

STOP:

(1) 狀態說明:線程池處在STOP狀態時,不接收新任務,不處理已添加的任務,而且會中斷正在處理的任務。 (2) 狀態切換:調用線程池的shutdownNow()接口時,線程池由(RUNNING or SHUTDOWN ) -> STOP

TIDYING:

(1) 狀態說明:當全部的任務已終止,ctl記錄的"任務數量"爲0,線程池會變爲TIDYING狀態。當線程池變爲TIDYING狀態時,會執行鉤子函數terminated()。terminated()在ThreadPoolExecutor類中是空的,若用戶想在線程池變爲TIDYING時,進行相應的處理;能夠經過重載terminated()函數來實現。 (2) 狀態切換:當線程池在SHUTDOWN狀態下,阻塞隊列爲空而且線程池中執行的任務也爲空時,就會由 SHUTDOWN -> TIDYING。 當線程池在STOP狀態下,線程池中執行的任務爲空時,就會由STOP -> TIDYING

TERMINATED:

(1) 狀態說明:線程池完全終止,就變成TERMINATED狀態。 (2) 狀態切換:線程池處在TIDYING狀態時,執行完terminated()以後,就會由 TIDYING -> TERMINATED

狀態的變化流轉:

線程池的狀態流轉.png

  1. runStateOf()

計算線程池運行狀態的,就是計算ctl前三位的數值。`unStateOf() = c & ~CAPACITY,CAPACITY = 0001 1111 1111 1111 1111 1111 1111 1111,那麼~CAPACITY = 1110 0000 0000 0000 0000 0000 0000 0000,它與任何數按位與的話都是隻看這個數前三位

  1. workerCountOf()

計算線程池的線程數量,就是看ctl的後29位,workerCountOf() = c & CAPACITY, CAPACITY = 0001 1111 1111 1111 1111 1111 1111 1111與任何數按位與,就是看這個數的後29位

  1. ctlOf(int rs, int wt)

在獲取當前線程池ctl的時候會用到,在後面源碼中會有不少地方調用, 傳遞的參數rs表明線程池狀態,wt表明當前線程次線程(worker)的數量

  1. runStateLessThan(int c, int s)

return c < s,c通常傳遞的是當前線程池的ctl值。比較當前線程池ctl所表示的狀態是否小於某個狀態s

  1. runStateAtLeast(int c, int s)

return c >= s,c通常傳遞的是當前線程池的ctl值。比較當前線程池ctl所表示的狀態,是否大於等於某個狀態s

  1. isRunning(int c)

c < SHUTDOWN, 判斷當前線程池是不是RUNNING狀態,由於只有RUNNING的值小於SHUTDOWN

  1. compareAndIncrementWorkerCount()/compareAndDecrementWorkerCount()

使用CAS方式 讓ctl值分別加一減一 ,成功返回true, 失敗返回false

  1. decrementWorkerCount()

將ctl值減一,這個方法用了do...while循環,直到成功爲止

  1. completedTaskCount

記錄線程池所完成任務總數 ,當worker退出時會將 worker完成的任務累積到completedTaskCount

  1. Worker

線程池內部類,繼承自AQS且實現Runnable接口。Worker內部有一個Thread thread是worker內部封裝的工做線程。Runnable firstTask用來接收用戶提交的任務數據。在初始化Worker時候會設置state爲-1(初始化狀態),經過threadFactory建立一個線程。

  1. ThreadPoolExecutor初始化參數

corePoolSize: 核心線程數限制 maximumPoolSize: 最大線程限制 keepAliveTime: 非核心的空閒線程等待新任務的時間 unit: 時間單位。配合allowCoreThreadTimeOut也會清理核心線程池中的線程。 workQueue: 任務隊列,最好選用有界隊列,指定隊列長度 threadFactory: 線程工廠,最好自定義線程工廠,能夠自定義每一個線程的名稱 handler: 拒絕策略,默認是AbortPolicy

execute()源碼分析

當有任務提交到線程池時,就會直接調用ThreadPoolExecutor.execute()方法,執行流程以下:

執行流程.png

從流程圖可看,添加任務會有三個分支判斷,源碼以下:

java.util.concurrent.ThreadPoolExecutor.execute()

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

c在這裏表明線程池ctl的值,包含工做任務數量以及線程池的狀態,上面有解釋過。

接着看下面幾個分支代碼:

分支一: if (workerCountOf(c) < corePoolSize) ,條件成立表示當前線程數量小於核心線程數,這次提交任務,直接建立一個新的worker

if (workerCountOf(c) < corePoolSize) {
    if (addWorker(command, true))
        return;
    c = ctl.get();
}

若是線程數小於核心線程數,執行addWorker操做,這個後面會講這個方法的細節,若是添加成功則直接返回,失敗後會從新計算ctl的值,而後執行分支二。

針對addWorker()執行失敗的狀況,有如下幾種可能:

  1. 存在併發狀況,execute()方法是可能有多個線程同時調用的,當多個線程同時workerCountOf(c) < corePoolSize成立後,就會向線程池中建立worker,這個時候線程池的核心線程數可能已經達到,在addWorker中還會再次判斷,因此會有任務添加失敗。

addWorker()失敗場景一.png

  1. 當前線程池狀態發生改變,例如線程A執行addWorker()方法時,線程B修改線程池狀態,致使線程池不是RUNNING狀態,此時線程A執行addWorker()就有可能失敗。

addWorker()失敗場景二.png

分支二: if (isRunning(c) && workQueue.offer(command)) {}

經過分支一流程的分析,咱們能夠知道執行到這個分支說明**當前線程數量已經達到corePoolSize或者addWorker()執行失敗,咱們先看看分支二執行流程:

超過核心線程數往隊列中添加任務流程圖.png

首先判斷當前線程池是否處於RUNNING狀態,若是是則嘗試將task放入到workQueue中,workQueue是咱們在初始化ThreadPoolExecutor時傳入進來的阻塞隊列。

若是當前任務成功添加到阻塞隊列中,再次獲取ctl賦值給recheck變量,而後執行:

if (!isRunning(recheck) && remove(command))
    reject(command);

再次判斷當前線程池是否爲RUNNINT狀態,若是不是則說明提交任務到隊列以後,線程池狀態被其餘線程給修改了,好比調用shutdown()/shutdownNow()等。這種狀況就須要把剛剛提交到隊列中的的任務刪除掉。

再看下remove()方法:

public boolean remove(Runnable task) {
    boolean removed = workQueue.remove(task);
    tryTerminate();
    return removed;
}

若是任務提交到隊列以後,線程池中的線程還未將這個任務消費,那麼就能夠remove成功,調用reject()方法來執行拒絕策略。 若是在改變線程池狀態以前,隊列中的數據已經被消費了,此時remove()就會失敗。

移除隊列中Task任務.png

接着走else if中的邏輯:

else if (workerCountOf(recheck) == 0)
    addWorker(null, false);

走這個else if邏輯有兩種可能,線程池是RUNNING狀態或者線程池狀態被改變且workQueue中添加的任務已經被消費致使remove()失敗。 若是是RUNNING狀態,線程池中的線程數量是0,此時workQueue中還有待執行的任務,就須要新增一個worker(addWorker裏面會有建立線程的操做),繼續消費workqueue中的任務。

添加新任務.png

這裏要注意一下addWorker(null, false),也就是建立一個線程,但並無傳入任務,由於任務已經被添加到workQueue中了,因此worker在執行的時候,會直接從workQueue中獲取任務。在workerCountOf(recheck) == 0時執行addWorker(null, false)也是爲了保證線程池在RUNNING狀態下必需要有一個線程來執行任務,能夠理解爲一種擔保兜底機制。

至於線程池中線程爲什麼能夠爲0?這個若是咱們設置了allowCoreThreadTimeOut=true,那麼核心線程也是容許被回收的,後面getTask()中代碼有說起。

分支三: else if (!addWorker(command, false)) {}

經過分支一和分之二的分析,進入這個分支的前置條件:線程數超過核心線程數且workQueue中數據已滿。

else if (!addWorker(command, false)),執行添加worker操做,若是執行失敗就直接走reject()拒絕策略。這裏添加失敗多是線程數已經超過了maximumPoolSize

分支三執行流程.png

addWorker()源碼分析

上面分析提交任務的方法execute()時屢次用到addWorker方法,接收任務後將任務添加到Worker中。

WorkerThreadPoolExecutor中的內部類,繼承自AQS且實現了Runnable接口。 類中包含Thread thread,它是worker內部封裝的工做線程,還有firstTask屬性,它是一個可執行的Runnable對象。在Worker的構造函數中,使用線程工廠建立了一個線程,當thread啓動的時候,會以worker.run()爲入口啓動線程,這裏會直接調用到runWorker()中。

private final class Worker extends AbstractQueuedSynchronizer implements Runnable{

    private static final long serialVersionUID = 6138294804551838833L;

    final Thread thread;
    Runnable firstTask;
    volatile long completedTasks;

    Worker(Runnable firstTask) {
        setState(-1);
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    public void run() {
        runWorker(this);
    }
}

流程以下圖:

添加Worker.png

這裏再回頭看下addWorker(Runnable firstTask, boolean core) 方法,這個方法主要是添加一個Worker到線程池中並執行,firstTask參數用於指定新增的線程執行的第一個任務,core參數爲true表示在新增線程時會判斷當前活動線程數是否少於corePoolSizefalse表示在新增線程時會判斷當前活動線程數是否少於maximumPoolSize

addWorker方法總體執行流程圖以下: addWorker流程圖.png

接着看下源碼:

java.util.concurrent.ThreadPoolExecutor.addWorker()

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();
            if (runStateOf(c) != rs)
                continue retry;
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive())
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (!workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

這裏是有兩層for循環,外層循環主要是判斷線程池的狀態,若是狀態不合法就直接返回false.

只有兩種狀況屬於合法狀態:

  1. RUNNING狀態
  2. SHUTDOWN狀態時,隊列中還有未處理的任務,且提交的任務爲空。SHUTDOWN含義就是再也不接收新任務,能夠繼續處理阻塞隊列的任務。

第二層循環是經過CAS操做更新workCount數量,若是更新成功則往線程池中中添加線程,這個所謂的線程池就是一個HashSet數組。添加失敗時判斷失敗緣由,CAS失敗有兩種緣由:線程池狀態被改變或者併發狀況修改線程池中workCount數量,這兩種狀況都會致使ctl值被修改。若是是第二種緣由致使的失敗,繼續自旋更新workCount數量。

接着繼續分析循環內部的實現,先看看第一層循環:c表明線程池ctl值,rs表明線程池運行狀態。

if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
    return false;

條件一:rs >= SHUTDOWN 成立, 說明當前線程池狀態不是RUNNING狀態

條件二: !(rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())

咱們以前提到過,建立任務有兩種狀況: 1)RUNNING狀態能夠提交任務, 2)SHUTDOWN狀態下若是傳遞的任務是空且阻塞隊列中還有任務未處理的狀況纔是容許建立任務繼續處理的,由於阻塞隊列中的任務仍然須要繼續處理。

上面的條件一和條件二就是處理SHUTDOWN狀態下任務建立操做的判斷。

接着分析第二層循環,先是判斷線程池workCount數量是否大於可建立的最大值,或者是否超過了核心線程數/最大線程數,若是是則直接返回,addWorker()操做失敗。

接着使用compareAndIncrementWorkerCount(c)將線程池中workCount+1,這裏使用的是CAS操做,若是成功則直接跳出最外層循環。

for (;;) {
    int wc = workerCountOf(c);
    if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
        return false;
    if (compareAndIncrementWorkerCount(c))
        break retry;
    c = ctl.get();
    if (runStateOf(c) != rs)
        continue retry;
}

若是CAS失敗,說明此時有競爭,會從新獲取ctl的值,判斷競爭失敗的緣由是添加workCount數量仍是修改線程池狀態致使的,若是線程池狀態未發生改變,就繼續循環嘗試CAS增長workCount數量,接着看循環結束後邏輯:

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
    w = new Worker(firstTask);
    final Thread t = w.thread;
    if (t != null) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            int rs = runStateOf(ctl.get());

            if (rs < SHUTDOWN ||
                (rs == SHUTDOWN && firstTask == null)) {
                if (t.isAlive())
                    throw new IllegalThreadStateException();
                workers.add(w);
                int s = workers.size();
                if (s > largestPoolSize)
                    largestPoolSize = s;
                workerAdded = true;
            }
        } finally {
            mainLock.unlock();
        }
        if (workerAdded) {
            t.start();
            workerStarted = true;
        }
    }
} finally {
    if (!workerStarted)
        addWorkerFailed(w);
}

這裏workerStarted表明worker是否已經啓動,workerAdded表明建立的worker是否添加到池子中,這裏所謂的池子就是全局定義的一個HashSet結構的workers變量。

接着根據傳遞的firstTask來構建一個Worker,在Worker的構造方法中也會經過ThreadFactory建立一個線程,這裏判斷t != null是由於用戶能夠自定義ThreadFactory,若是這裏用戶不是建立線程而是直接返回null則會出現一些問題,因此須要判斷一下。

w = new Worker(firstTask);
final Thread t = w.thread;

if (t != null) {

}

在往池子中添加Worker的時候,是須要先加鎖的,由於針對全局的workers操做並非線程安全的。

final ReentrantLock mainLock = this.mainLock;
mainLock.lock();

繼續看下面代碼,rs表明當前線程池的狀態,這裏仍是判斷線程池的狀態,若是rs < SHUTDOWN表明線程池狀態是RUNNING狀態,此時能夠直接操做。 若是是SHUTDOWN狀態,須要知足firstTask == null才能夠繼續操做。由於在SHUTDOWN狀態時不會再添加新的任務,但仍是能夠繼續處理workQueue中的任務。

t.isAlive() 當線程start後,線程isAlive會返回true,這裏仍是防止自定義的ThreadFactory建立線程返回給外部以前,將線程start了,因而可知Doug lea考慮問題真的很全面。

int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
    (rs == SHUTDOWN && firstTask == null)) {
    if (t.isAlive())
        throw new IllegalThreadStateException();
    workers.add(w);
}

接着將建立的Worker添加到workers集合中,設置largestPoolSize,這個屬性是線程池生命週期內線程數最大值,通常是作統計數據用的。 最後修改workerAdded = true,表明當前提交的任務所建立的Worker已經添加到池子中了。

添加worker成功後,調用線程的start()方法啓動線程,由於Worker中重寫了run()方法,最後會執行Worker.run()。最後設置workerStarted = true後釋放全局鎖。

int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
	
	workers.add(w);
	int s = workers.size();
	if (s > largestPoolSize)
        largestPoolSize = s;
        
    orkerAdded = true;
}

這裏再回頭看看workerAdded = false的情形,若是線程池在lock以前,狀態發生了變化,致使添加失敗。此時workerAdded也會爲false,最後執行addWorkerFailed(work)操做,這個方法是將Workworkers中移除掉,而後將workCount數量減一,最後執行tryTerminate()來嘗試關閉線程池,這個方法後面會細說。

runWorker()源碼分析

Worker類中的run方法調用了runWorker來執行任務。上面addWorker()方法正常的執行邏輯會建立一個Worker,而後啓動Worker中的線程,這裏其實就會執行到runWorker方法。

方法調用關係.png

runWorker的執行邏輯很簡單,啓動一個線程,執行當前傳遞的task任務,執行完後又不斷的從workQueue中獲取任務繼續執行,若是當前workCount數量小於核心線程數且隊列中沒有了任務,當前線程會被阻塞,這個就是getTask()的邏輯,一會會講到。

若是當前線程數大於核心線程數且隊列中沒有任務,就會返回null,在runWorker這邊退出循環,回收多餘的worker數據。

runWorker流程.png

源碼以下:

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock();
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

這裏w.unlock()是爲了初始化當前Workstate==0,而後設置獨佔線程爲null,由於在shutDown()方法中會嘗試獲取Worker中的鎖,若是獲取成功表明當前線程沒有被加鎖處於空閒狀態,給當前線程一箇中斷信號。因此這裏在執行線程任務的時候須要加鎖,防止調用shutDown()的時候給當前worker線程一箇中斷信號。

判斷task是否爲空,若是是一個空任務,那麼就去workQueue中獲取任務,若是二者都爲空就會退出循環。

while (task != null || (task = getTask()) != null) {}

最核心的就是調用task.run()啓動當前任務,這裏面還有兩個可擴展的方法,分別是beforeExecute()/afterExecute(),咱們能夠在任務執行前和執行後分別自定義一些操做,其中afterExecute()能夠接收到任務拋出的異常信息,方便咱們作後續處理。

while (task != null || (task = getTask()) != null) {
    try {
        beforeExecute(wt, task);
        Throwable thrown = null;
        try {
            task.run();
        } catch (RuntimeException x) {
            thrown = x; throw x;
        } catch (Error x) {
            thrown = x; throw x;
        } catch (Throwable x) {
            thrown = x; throw new Error(x);
        } finally {
            afterExecute(task, thrown);
        }
    } finally {
        task = null;
        w.completedTasks++;
        w.unlock();
    }
}

若是退出循環,說明getTask()方法返回null。會執行到finally中的processWorkerExit(w, completedAbruptly)方法,此方法是用來清理線程池中添加的work數據,completedAbruptly=true表明是異常狀況下退出。

try {
    while (task != null || (task = getTask()) != null) {
        
    }
    completedAbruptly = false;
} finally {
    processWorkerExit(w, completedAbruptly);
}

runWorker()中只是啓動了當前線程工做,還須要源源不斷經過getTask()方法從workQueue來獲取任務執行。在workQueue沒有任務的時候,根據線程池workCount和核心線程數的對比結果來使用processWorkerExit()執行清理工做。

getTask()源碼分析

getTask方法用於從阻塞隊列中獲取任務,若是當前線程小於核心線程,那麼當阻塞隊列中沒有任務時就會阻塞,反之會等待keepAliveTime後返回。

這個就是keepAliveTime的使用含義:非核心的空閒線程等待新任務的時間,固然若是這裏設置了allowCoreThreadTimeOut=true也會回收核心線程。

具體代碼以下:

private Runnable getTask() {
    boolean timedOut = false;

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

這裏核心代碼就是從workQueue中取任務,採用poll仍是take取決於allowCoreThreadTimeOut和線程數量,allowCoreThreadTimeOut在構造ThreadLocalExecutor後設置的,默認爲false。若是設置爲true則表明核心線程數下的線程也是能夠被回收的。若是使用take則代表workQueue中沒有任務當前線程就會被阻塞掛起,直到有了新的任務纔會被喚醒。

workQueue數據取出流程.png

在這裏擴展下阻塞隊列的部分方法的含義,這裏主要是看poll()take()的使用區別: 阻塞隊列插入方法:

boolean add(E e):隊列沒有滿,則插入數據並返回true;隊列滿時,拋出異常 java.lang.IllegalStateException: Queue full。 boolean offer(E e):隊列沒有滿,則插入數據並返回true;隊列滿時,返回false。 void put(E e):隊列沒有滿,則插入數據;隊列滿時,阻塞調用此方法線程,直到隊列有空閒空間時此線程進入就緒狀態。 boolean offer(E e, long timeout, TimeUnit unit):隊列沒有滿,插入數據並返回true;隊列滿時,阻塞調用此方法線程,若指定等待的時間內還不能往隊列中插入數據,返回false。

阻塞隊列移除(獲取)方法:

E remove():隊列非空,則以FIFO原則移除數據,並返回該數據的值;隊列爲空,拋出異常 java.util.NoSuchElementException。

E poll(): 隊列非空,移除數據,並返回該數據的值;隊列爲空,返回null。

E take(): 隊列非空,移除數據,並返回該數據的值;隊列爲空,阻塞調用此方法線程,直到隊列爲非空時此線程進入就緒狀態。

E poll(long timeout, TimeUnit unit):隊列非空,移除數據,並返回該數據的值;隊列爲空,阻塞調用此方法線程,若指定等待的時間內隊列都沒有數據可取,返回null。

阻塞隊列檢查方法:

E element(): 隊列非空,則返回隊首元素;隊列爲空,拋出異常 java.util.NoSuchElementException。 E peek(): 隊列非空,則返回隊首元素;隊列爲空,返回null。

processWorkerExit()源碼分析

此方法的含義是清理當前線程,從線程池中移除掉剛剛添加的worker對象。

processWorkerExit執行前置條件.png

執行processWorkerExit()表明在runWorker()線程跳出了當前循環,通常有兩種狀況:

  1. task.run()內部拋出異常,直接結束循環,而後執行processWorkerExit()
  2. getTask()返回爲空,表明線程數量大於核心數量且workQueue中沒有任務,此時須要執行processWorkerExit()來清理多餘的Worker對象
private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly)、
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    tryTerminate();

    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return;
        }
        addWorker(null, false);
    }
}

針對於線程池workers的操做都會進行加鎖處理,而後將當前Worker從池子中移除,累加當前線程池完成的任務總數completedTaskCount

接着調用tryTerminate()嘗試關閉線程池,這個方法後面有詳細說明。

接着判斷if (runStateLessThan(c, STOP)) {},含義是當前線程池狀態小於STOP,即當前線程池狀態當前線程池狀態爲 RUNNINGSHUTDOWN,判斷當前線程是不是正常退出。若是當前線程是正常退出,那麼completedAbruptly=false,接着判斷線程池中是否還擁有足夠多的的線程,由於異常退出可能致使線程池中線程數量不足,此時就要執行addWorker()爲線程池添加新的worker數據,看下面的詳細分析:

執行最後的addWorke()有三種可能: 1)當前線程在執行task時 發生異常,這裏必定要建立一個新worker頂上去。 2)若是!workQueue.isEmpty()說明任務隊列中還有任務,這種狀況下最起碼要留一個線程,由於當前狀態爲 RUNNING || SHUTDOWN這是前提條件。 3)當前線程數量 < corePoolSize值,此時會建立線程,維護線程池數量在corePoolSize個水平。

tryTerminate()源碼分析

上面移除Worker的方法中有一個tryTerminate()方法的調用,這個方法是根據線程池狀態嘗試關閉線程池。

執行流程以下:

tryTerminate()流程.png

實現源碼以下:

final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        if (workerCountOf(c) != 0) {
            interruptIdleWorkers(ONLY_ONE);
            return;
        }

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    terminated();
                } finally {
                    ctl.set(ctlOf(TERMINATED, 0));
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
    }
}

首先是判斷線程池狀態: 條件一:isRunning(c) 成立,直接返回就行,線程池很正常! 條件二:runStateAtLeast(c, TIDYING) 說明 已經有其它線程 在執行 TIDYING -> TERMINATED狀態了,當前線程直接回去。 條件三:(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()) SHUTDOWN特殊狀況,若是是這種狀況,直接回去。得等隊列中的任務處理完畢後,再轉化狀態。

接着執行:

if (workerCountOf(c) != 0) {
    interruptIdleWorkers(ONLY_ONE);
    return;
}

走到這個邏輯,說明線程池狀態 >= STOP或者線程池狀態爲SHUTDOWN且隊列已經空了

當前線程池中的線程數量 > 0,調用interruptIdleWorkers()中斷一個空閒線程,而後返回。咱們來分析下,在getTask()返回爲空時會執行退出邏輯processWorkerExit(),這裏就會調用tryTerminate()方法嘗試關閉線程池。

若是此時線程池狀態知足線程池狀態 >= STOP或者線程池狀態爲SHUTDOWN且隊列已經空了,若是此時線程池中線程數不爲0,就會中斷一個空閒線程。 爲何這裏只中斷一個線程呢?這裏的設計思想是,若是線程數量特別多的話,只有一個線程去作喚醒空閒worker的任務可能會比較吃力,因此,就給了每一個 被喚醒的worker線程 ,在真正退出以前協助 喚醒一個空閒線程的任務,提供吞吐量的一種經常使用手段。

咱們順便看下interruptIdleWorkers()源碼:

private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

遍歷workers,若是線程是空閒狀態**(空閒狀態:queue.take()和queue.poll()返回空)**,則給其一箇中斷信號,若是是處於workQueue阻塞的線程,會被喚醒,喚醒後,進入下一次自旋時,可能會return null執行退出相關的邏輯,接着又會調用processWorkerExit()->tryTerminate(),回到上面場景,當前線程退出的時候仍是會繼續喚醒下一個空現線程。

接着往下看tryTerminate的剩餘邏輯:

final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
    if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
        try {
            terminated();
        } finally {
            ctl.set(ctlOf(TERMINATED, 0));
            termination.signalAll();
        }
        return;
    }
} finally {
    mainLock.unlock();
}

執行到這裏的線程是誰? workerCountOf(c) == 0 時,會來到這裏。 最後一個退出的線程。 在 (線程池狀態 >= STOP || 線程池狀態爲 SHUTDOWN 且 隊列已經空了) 線程喚醒後,都會執行退出邏輯,退出過程當中 會 先將 workerCount計數 -1 => ctl -1。 調用tryTerminate 方法以前,已經減過了,因此0時,表示這是最後一個退出的線程了。

獲取全局鎖,進行加鎖操做,經過CAS設置線程池狀態爲TIDYING狀態,設置成功則執行terminated()方法,這也是一個自定義擴展的方法,當線程池停止的時候會調用此方法。

最後設置線程池狀態爲TERMINATED狀態,喚醒調用awaitTermination()方法的線程。

awaitTermination()源碼分析

該方法是判斷線程池狀態是否達到TERMINATED,若是達到了則直接返回true,沒有達到則會await掛起當前線程指定的時間。

public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (;;) {
            if (runStateAtLeast(ctl.get(), TERMINATED))
                return true;
            if (nanos <= 0)
                return false;
            nanos = termination.awaitNanos(nanos);
        }
    } finally {
        mainLock.unlock();
    }
}

在每次執行tryTerminate()後會喚醒全部被await的線程,繼續判斷線程池狀態。

shutDown()/shutDownNow()源碼分析

shutDownshutDown()方法都是直接改變線程池狀態的方法,通常咱們在系統關閉以前會調用此方法優雅的關閉線程池。

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(SHUTDOWN);
        interruptIdleWorkers();
        onShutdown();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(STOP);
        interruptWorkers();
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}

shutdownshutdownNow方法調用差很少,只是shutdown是將線程池狀態設置爲SHUTDOWNshutdownNow是將線程池狀態設置爲STOPshutdownNow會返回全部未處理的task集合。

來看看它們共同調用的一些方法:

private void advanceRunState(int targetState) {
    for (;;) {
        int c = ctl.get();
        if (runStateAtLeast(c, targetState) || ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
            break;
    }
}

這個方法是設置線程池狀態爲指定狀態,runStateAtLeast(c, targetState),判斷當前線程池ctl值,若是小於targetState則會日後執行。 ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))),經過CAS指令,修改ctl中線程池狀態爲傳入的targetState

private void interruptIdleWorkers() {
    interruptIdleWorkers(false);
}

private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

interruptIdleWorkers含義是爲空閒的線程設置中斷標識,這裏要清楚worker何時空閒?咱們在上面講解runWorker()方法時,執行task.run()以前,要針對Worker對象加鎖,設置Worker中的state值爲1,防止運行的worker被添加中斷標識。接着執行getTask()方法,獲取阻塞隊列中的任務,若是是queue.take()則會阻塞掛起當前線程,釋放鎖,此時線程處於空閒狀態。若是是queue.pool()返回爲空,runWorker()會釋放鎖,此時線程也是空閒狀態。

執行interrupt()後處於queue阻塞的線程,會被喚醒,喚醒後,進入下一次自旋判斷線程池狀態是否改變,若是改變可能直接返回空,這裏具體參看runWorker()getTask()方法。

onShutdown()也是一個擴展方法,須要子類去重寫,這裏表明當線程池關閉後須要作的事情。drainQueue()方法是獲取workQueue中現有的的任務列表。

問題回顧

  1. ThreadPoolExecutor中經常使用參數有哪些? 上面介紹過了,參見的參數是指ThreadPoolExecutor的構造參數,通常面試的時候都會先問這個,要解釋每一個參數的含義及做用。

  2. ThreadPoolExecutor中線程池狀態和線程數量如何存儲的? 經過AtomicInteger類型的變量ctl來存儲,前3位表明線程池狀態,後29位表明線程池中線程數量。

  3. ThreadPoolExecutor有哪些狀態,狀態之間流轉是什麼樣子的? RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED 線程池的狀態流轉.png

  4. ThreadPoolExecutor任務處理策略? 這個問題就是考察execute()的執行過程,只要看過源碼就不會有問題。 執行流程.png

  5. ThreadPoolExecutor經常使用的拒絕策略有哪些? 策略處理該任務,線程池提供了4種策略: 1)AbortPolicy:直接拋出異常,默認策略 2)CallerRunsPolicy:用調用者所在的線程來執行任務 3)DiscardOldestPolicy:丟棄阻塞隊列中靠最前的任務,並執行當前任務 4)DiscardPolicy:直接丟棄任務 固然線程池是支持自定義拒絕策略的,須要實現RejectedExecutionHandler接口中rejectedExecution()方法便可。

  6. Executors工具類提供的線程池有哪些?有哪些缺陷? 1) FixedThreadPool 和 SingleThreadPool:容許的請求隊列長度爲 Integer.MAX_VALUE,可能會堆積大量的請求,從而致使 OOM。 2) CachedThreadPool:容許的建立線程數量爲 Integer.MAX_VALUE,可能會建立大量的線程,從而致使 OOM。 因此阿里巴巴也建議咱們要自定義線程池核心線程數以及阻塞隊列的長度。

  7. ThreadPoolExecutor核心線程池中線程預熱功能? 在建立線程池後,可使用prestartAllCoreThreads()來預熱核心線程池。

    public int prestartAllCoreThreads() {
        int n = 0;
        while (addWorker(null, true))
            ++n;
        return n;
    }
  8. ThreadPoolExecutor中建立的線程如何被複用的? 這個主要是看runWorker()和getTask()兩個方法的執行流程,當執行任務時調用runWorker()方法,執行完成後會繼續從workQueue中獲取任務繼續執行,已達到線程複用的效果,固然這裏還有一些細節,能夠回頭看看上面的源碼解析。

  9. ThreadPoolExecutor中關閉線程池的方法shutdownshutdownNow的區別? 最大的區別就是shutdown()會將線程池狀態變爲SHUTDOWN,此時新任務不能被提交,workQueue中還存有的任務能夠繼續執行,同時會像線程池中空閒的狀態發出中斷信號。 shutdownNow()方法是將線程池的狀態設置爲STOP,此時新任務不能被提交,線程池中全部線程都會收到中斷的信號。若是線程處於wait狀態,那麼中斷狀態會被清除,同時拋出InterruptedException。

  10. ThreadPoolExecutor中存在的一些擴展點? 鉤子方法: 1)beforeExecute()/afterExecute():runWorker()中線程執行前和執行後會調用的鉤子方法 2)terminated:線程池的狀態從TIDYING狀態流轉爲TERMINATED狀態時terminated方法會被調用的鉤子方法。 3)onShutdown:當咱們執行shutdown()方法時預留的鉤子方法。

  11. ThreadPoolExecutor支持動態調整核心線程數、最大線程數、隊列長度等一些列參數嗎?怎麼操做? 運行期間可動態調整參數的方法: 1)setCorePoolSize():動態調整線程池核心線程數 2)setMaximumPoolSize():動態調整線程池最大線程數 3)setKeepAliveTime(): 空閒線程存活時間,若是設置了allowsCoreThreadTimeOut=true,核心線程也會被回收,默認只回收非核心線程 4)allowsCoreThreadTimeOut():是否容許回收核心線程,若是是true,在getTask()方法中,獲取workQueue就採用workQueue.poll(keepAliveTime),若是超過等待時間就會被回收。

總結

這篇線程池源碼覆蓋到了ThreadPoolExecutor中大部分代碼,我相信認真閱讀完後確定會對線程池有更深入的理解。若有疑問或者建議可關注公衆號給我私信,我都會一一爲你們解答。

另外推薦一個個人up主朋友,他本身錄製了好多學習視頻並分享在B站上了,你們有時間能夠看一下(PS:非恰飯非利益相關,良心推薦):小劉講源碼-B站UP主

歡迎關注: 原創乾貨分享.png

相關文章
相關標籤/搜索