線程是一種資源,並非只存在程序的世界裏。程序,只是對生活的一種抽象表述。html
好比車站的售票窗口、退票窗口、檢票窗口,每一個窗口都在作不一樣的事情,就是同時在運行的不一樣線程。java
線程多了,須要管理,不一樣的線程也要能保證不會互相干擾,各作各的。segmentfault
線程像人的能量,線程池比如人能量的總場,這個比喻可能不太恰當,由於人同時作好幾件事情時,哪怕看似「同時」的,也是分心作的,是本身的CPU在不斷切換時間片,那只是「併發」,而不是「並行」。bash
public class ThreadPoolExecutor extends AbstractExecutorService {
/** * The main pool control state, ctl, is an atomic integer packing * two conceptual fields * workerCount, indicating the effective number of threads * runState, indicating whether running, shutting down etc * * The runState provides the main lifecycle control, taking on values: * * RUNNING: Accept new tasks and process queued tasks * SHUTDOWN: Don't accept new tasks, but process queued tasks * STOP: Don't accept new tasks, don't process queued tasks, * and interrupt in-progress tasks * TIDYING: All tasks have terminated, workerCount is zero, * the thread transitioning to state TIDYING * will run the terminated() hook method * TERMINATED: terminated() has completed 複製代碼
// Android-added: @ReachabilitySensitive
@ReachabilitySensitive
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
複製代碼
線程池用一個32位的int來同時保存runState和workerCount,其中高3位(第31到29位)是runState,其他29位是workerCount(大約500 million)。併發
來個圖看看存儲結構dom
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
複製代碼
corePoolSize 核心線程數,比如班幹部的人數。ide
maximumPoolSize 最大線程數,比如教室裏的座位數。 當提交任務數超過了這個最大值,線程還有拒絕策略——RejectExecutionHandler,作不動了嘛。函數
keepAliveTime 除核心線程外的空閒線程保持存活時間。 當線程池裏線程數超過corePoolSize數量了,keepAliveTime時間到,就把空閒線程關了,否則也閒置了呀,節省能量嘛。工具
workQueue 經過workQueue,線程池實現了阻塞功能。 當線程池中的線程數超過它的corePoolSize的時候,線程會進入阻塞隊列進行阻塞等待。oop
threadFactory 建立線程的工廠。 全部的線程都經過這個Factory建立(經過addWorker方法)。
handler 線程池的飽和策略。
更多被調用的構造方法:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
複製代碼
SynchronousQueue 直接提交。直接提交任務給線程,而不是保存它們。
LinkedBlockingQueue 無界隊列。 當核心線程都在忙的時候,用一個無界隊列存放提交的任務。 線程數不會超過核心線程數,maximumPoolSize設置也無效。
ArrayBlockingQueue 有界隊列。 防止資源被消耗完,隊列也是有上限的。
Executors.defaultThreadFactory()是Executors靜態工廠裏默認的threadFactory。 後面再詳細說。
線程池建立線程時,會將線程封裝成工做線程Worker,就是在線程池裏幹活的人。
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
複製代碼
既實現了Runnable,又繼承了AbstractQueuedSynchronizer(AQS),因此其既是一個可執行的任務,又能夠達到鎖的效果。
Worker和Task的區別: Worker是線程池中的線程,而Task雖然是runnable,可是並無真正執行,只是被Worker調用了run方法。
/** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
複製代碼
先來看看整體流程圖~~
注意點:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 若是當前正在運行的線程數 < corePoolSize,嘗試用給到的command來啓動一個新線程做爲第一個任務。 * 調用addWorker方法,檢查runState和workerCount, * 而且若是增長線程的話,能防止產生錯誤警報,若是不能增長線程,則返回false。 * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 若是一個任務被成功地加到隊列裏,仍然須要雙重檢驗來確認是否須要新建一個線程 *(由於可能在上一次檢查後,已經存在的線程已經died)或者進入這個方法後,線程池已經被關閉了。 * 因此咱們須要再次檢查state,若是線程池中止了須要回滾入隊列,若是池中沒有線程了,新建一個線程。 * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. * 若是不能把任務加入隊列(可能線程池已經關閉或者滿了),那麼須要新開一個線程(往maxPoolSize發展)。 * 若是失敗的話,說明線程池shutdown了或者滿了,就要拒絕這個任務了。 */
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
/** * 二、若是線程池RUNNING狀態,且入隊列成功 */
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//若是再次校驗過程當中,線程池不是RUNNING狀態,
// 而且remove(command)--workQueue.remove()成功,拒絕當前command
if (! isRunning(recheck) && remove(command))
reject(command);
//爲何只檢查運行的worker數量是否是0呢?? 爲何不和corePoolSize比較呢??
// 只保證有一個worker線程能夠從queue中獲取任務執行就好了??
// 由於只要還有活動的worker線程,就能夠消費workerQueue中的任務
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
/** * 三、若是線程池不是running狀態 或者 沒法入隊列 * 嘗試開啓新線程,擴容至maxPoolSize, * 若是addWork(command, false) 失敗了,拒絕當前command */
else if (!addWorker(command, false))
reject(command);
}
複製代碼
看註解有點費勁,把execute方法裏的註釋單獨列出來,一步步說得很清楚:
若是當前正在運行的線程數 < corePoolSize,嘗試用給到的command來啓動一個新線程做爲第一個任務。 調用addWorker方法,檢查runState和workerCount,而且若是增長線程的話,能防止產生錯誤警報,若是不能增長線程,則返回false。
若是一個任務被成功地加到隊列裏,仍然須要雙重檢驗來確認是否須要新建一個線程。 (由於可能在上一次檢查後,已經存在的線程已經died)或者進入這個方法後,線程池已經被關閉了。因此咱們須要再次檢查state,若是線程池中止了須要回滾入隊列,若是池中沒有線程了,新建一個線程。
若是不能把任務加入隊列(可能線程池已經關閉或者滿了),那麼須要新開一個線程(往maxPoolSize發展)。若是失敗的話,說明線程池shutdown了或者滿了,就要拒絕這個任務了。
本身畫一個:
好像仍是很複雜,再簡單點:
總的來講,就是:
它是一個Java中的工具類。提供工廠方法來建立不一樣類型的線程池。 用它能夠很方便地建立出下面幾種線程池來。
經常使用線程池 | 特色 | 適應場景 |
---|---|---|
newSingleThreadExecutor | 單線程的線程池 | 用於須要保證順序執行的場景,而且只有一個線程在執行 |
newFixedThreadPool | 固定大小的線程池 | 用於已知併發壓力的狀況下,對線程數作限制。 |
newCachedThreadPool | 能夠無限擴大的線程池 | 比較適合處理執行時間比較小的任務。 |
newScheduledThreadPool | 能夠延時啓動,定時啓動的線程池 | 適用於須要多個後臺線程執行週期任務的場景。 |
newWorkStealingPool | 擁有多個任務隊列的線程池 | 能夠減小鏈接數,建立當前可用cpu數量的線程來並行執行。 |
好比這樣建立:
ExecutorService singleService = Executors.newSingleThreadExecutor();
ExecutorService fixedService = Executors.newFixedThreadPool(9);
ExecutorService cacheService = Executors.newCacheThreadPool();
複製代碼
或者經過ThreadPoolExecutor的構造函數自定義須要的線程池。
先寫到這裏~