1.下降系統資源消耗 java
2.提升線程可控性。緩存
1.建立一個定長線程池,可控制線程最大併發數,超出的線程會在隊列中等待。安全
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(
nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()); } 複製代碼
2.jdk8新增的,會根據所需的併發數來動態建立和關閉線程。能合理的使用CPU進行對任務進行併發操做,因此適合使用在很耗時的任務。bash
注意返回的是ForkJoinPool 併發
public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool (
parallelism, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true);
} 複製代碼
什麼是ForkJoinPool?
public ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler, boolean asyncMode) {
this(checkParallelism(parallelism), checkFactory(factory), handler,
asyncMode ? FIFO_QUEUE : LIFO_QUEUE, "ForkJoinPool-" + nextPoolId() + "-worker-"); checkPermission(); } 複製代碼
使用一個無限隊列來保存所須要執行的任務,能夠傳入線程的數量;不傳入,則默認使用當前計算機中可用的CPU數量;使用分治法來解決問題,使用 fork()和join()來進行調用。框架
3.建立一個可緩存的線程池,可靈活回收空閒線程,若無可回收,則新建線程。 async
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(
0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } 複製代碼
4.建立一個單線程的線程池。 oop
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (
new ThreadPoolExecutor(
1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }複製代碼
5.建立一個定長線程池,支持定時及週期性任務執行。源碼分析
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize); } 複製代碼
Executor結構:
ui
Executor【接口】 一個運行新任務的簡單接口
public interface Executor{
void execute(Runnable command);
} 複製代碼
ExecutorService【接口】 擴展了Executor接口。
添加了一些用來管理執行生命週期和任務生命週期的方法。
AbstractExecutorService【抽象類】
對ExecutorService接口的抽象類實現。不是咱們分析的重點。 複製代碼
ThreadPoolExecutor【類】
Java線程池的核心實現 複製代碼
屬性解釋:
// AtomicInteger是原子類 ctlOf()返回值爲RUNNING;private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));// 高3位表示線程狀態private static final int COUNT_BITS = Integer.SIZE - 3;// 低29位表示workerCount容量private static final int CAPACITY = (1 << COUNT_BITS) - 1;// runState is stored in the high-order bits// 能接收任務且能處理阻塞隊列中的任務private static final int RUNNING = -1 << COUNT_BITS;// 不能接收新任務,但能夠處理隊列中的任務。private static final int SHUTDOWN = 0 << COUNT_BITS;// 不接收新任務,不處理隊列任務。private static final int STOP = 1 << COUNT_BITS;// 全部任務都終止private static final int TIDYING = 2 << COUNT_BITS;// 什麼都不作private static final int TERMINATED = 3 << COUNT_BITS;// 存聽任務的阻塞隊列private final BlockingQueue<Runnable> workQueue;複製代碼
值得注意的是狀態值越大線程越不活躍。
線程池狀態的轉換模型:
構造器:
public ThreadPoolExecutor(int corePoolSize,//線程池初始啓動時線程的數量 int maximumPoolSize,//最大線程數量 long keepAliveTime,//空閒線程多久關閉? TimeUnit unit,// 計時單位 BlockingQueue<Runnable> workQueue,//聽任務的阻塞隊列 ThreadFactory threadFactory,//線程工廠 RejectedExecutionHandler handler// 拒絕策略) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler;}複製代碼
在向線程池提交任務時,會經過兩個方法:execute和submit。
【本次只講execute方法,submit、Future、Callable一塊兒講】
execute方法:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); // clt記錄着runState和workerCount int c = ctl.get(); //workerCountOf方法取出低29位的值,表示當前活動的線程數 //而後拿線程數和 核心線程數作比較 if (workerCountOf(c) < corePoolSize) { // 若是活動線程數<核心線程數 // 添加到 //addWorker中的第二個參數表示限制添加線程的數量是根據corePoolSize來判斷仍是maximumPoolSize來判斷 if (addWorker(command, true)) // 若是成功則返回 return; // 若是失敗則從新獲取 runState和 workerCount c = ctl.get(); } // 若是當前線程池是運行狀態而且任務添加到隊列成功 if (isRunning(c) && workQueue.offer(command)) { // 從新獲取 runState和 workerCount int recheck = ctl.get(); // 若是不是運行狀態而且 if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) //第一個參數爲null,表示在線程池中建立一個線程,但不去啓動 // 第二個參數爲false,將線程池的有限線程數量的上限設置爲maximumPoolSize addWorker(null, false); } //再次調用addWorker方法,但第二個參數傳入爲false,將線程池的有限線程數量的上限設置爲maximumPoolSize else if (!addWorker(command, false)) //若是失敗則拒絕該任務 reject(command);}複製代碼
總結一下它的工做流程:
經過上面的execute方法能夠看出,最主要的邏輯仍是在addWorker方法中實現的。
addWorker方法:
主要工做就是在線程池中建立一個新的線程並執行。複製代碼
參數定義:
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); // 獲取運行狀態 int rs = runStateOf(c); // Check if queue empty only if necessary. // 若是狀態值 >= SHUTDOWN (不接新任務&不處理隊列任務) // 而且 若是 !(rs爲SHUTDOWN 且 firsTask爲空 且 阻塞隊列不爲空) if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) // 返回false return false; for (;;) { //獲取線程數wc int wc = workerCountOf(c); // 若是wc大與容量 || core若是爲true表示根據corePoolSize來比較,不然爲maximumPoolSize if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 增長workerCount(原子操做) if (compareAndIncrementWorkerCount(c)) // 若是增長成功,則跳出 break retry; // wc增長失敗,則再次獲取runState c = ctl.get(); // Re-read ctl // 若是當前的運行狀態不等於rs,說明狀態已被改變,返回從新執行 if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 根據firstTask來建立Worker對象 w = new Worker(firstTask); // 根據worker建立一個線程 final Thread t = w.thread; if (t != null) { // new一個鎖 final ReentrantLock mainLock = this.mainLock; // 加鎖 mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. // 獲取runState int rs = runStateOf(ctl.get()); // 若是rs小於SHUTDOWN(處於運行)或者(rs=SHUTDOWN && firstTask == null) // firstTask == null證實只新建線程而不執行任務 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { // 若是t活着就拋異常 if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); // 不然加入worker(HashSet) //workers包含池中的全部工做線程。僅在持有mainLock時訪問。 workers.add(w); // 獲取工做線程數量 int s = workers.size(); //largestPoolSize記錄着線程池中出現過的最大線程數量 if (s > largestPoolSize) // 若是 s比它還要大,則將s賦值給它 largestPoolSize = s; // worker的添加工做狀態改成true workerAdded = true; } } finally { mainLock.unlock(); } // 若是worker的添加工做完成 if (workerAdded) { // 啓動線程 t.start(); // 修改線程啓動狀態 workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } // 返回線啓動狀態 return workerStarted;複製代碼
爲何須要持有mainLock?
由於workers是HashSet類型的,不能保證線程安全。
那w = new Worker(firstTask);如何理解呢
Work.java
private final class Worker extends AbstractQueuedSynchronizer implements Runnable複製代碼
能夠看到它集成AQS併發框架還發現了Runnable。證實它仍是一個線程任務類。那咱們調用t.start()事實上就是調用了該類重寫的run方法。
Worker爲何使用ReentrantLock來實現呢?
tryAcquire方法它是不容許重入的,而ReentrantLock是容許重入的。對於線程來講,若是線程正在執行是不容許其餘鎖重入進來的。
線程只須要兩個狀態,一個是獨佔鎖,代表是正在執行任務;一個是不加鎖,代表是空閒狀態。
public void run(){
runnWorker(this);
}複製代碼
run方法有調用了runnWorker方法:
final void runWorker(Worker w) { // 拿到當前線程 Thread wt = Thread.currentThread(); // 拿到當前任務 Runnable task = w.firstTask; // 將Worker.firstTask置空 而且釋放鎖 w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { // 若是task或者getTask不爲空,則一直循環 while (task != null || (task = getTask()) != null) { // 加鎖 w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt // return ctl.get() >= stop // 若是線程池狀態>=STOP 或者 (線程中斷且線程池狀態>=STOP)且當前線程沒有中斷 // 其實就是保證兩點: // 1. 線程池沒有中止 // 2. 保證線程沒有中斷 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) // 中斷當前線程 wt.interrupt(); try { // 空方法 beforeExecute(wt, task); Throwable thrown = null; try { // 執行run方法(Runable對象) task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { // 執行完後, 將task置空, 完成任務++, 釋放鎖 task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { // 退出工做 processWorkerExit(w, completedAbruptly); }複製代碼
總結一下runWorker方法的執行流程:
這個流程圖很是經典:
除此以外,ThreadPoolExecutor還提供了tryAcquire、tryRelease、shutdown、shutdownNow、tryTerminate、等涉及的一系列線程狀態更改的方法。
在runWorker方法中,爲何要在執行任務的時候對每一個工做線程都加鎖呢?
shutdown方法與getTask方法存在競爭條件。