ThreadPoolExecutor算是JUC中最經常使用的類之一了。ThreadPoolExecutor,顧名思義,thread-pool-executor,硬翻譯就是「線程-池-執行者」;java中,經過ThreadPoolExecutor能夠很容易的建立一個線程池。可是咱們爲何要使用線程池?呢?它可以帶來什麼樣的優點呢?它又是怎麼實現的呢?OK,帶着這幾個問題,咱們來學習一下JAVA中的線程池技術。java
關於這個問題其實有點雞肋,我以爲再問這個問題以前更應該問爲何要有線程池。那爲何呢?數組
this is a 例子:bash
快遞行業最近兩年發展的灰常火熱,據說工資也很是的高,搞得我一每天的都沒有心思去好好寫代碼了...
微信
以前的小快遞公司都是沒有固定的快遞員的,就是說,每次去送一件快遞,站點負責人就須要去找一我的來幫忙送,送完以後就沒有而後了(固然,錢仍是要給的)。
框架
可是後來隨着貨愈來愈多,找人給錢成本太大,並且農忙時還須要花很長時間去找人,因此就僱用了5我的,簽了合同,長期爲站點配送。
ide
之前都是隨時用隨時找,如今不是,如今是成立了一個物流公司,開了一個配送部,配送部門規定正式配送員最多隻能有五我的。函數
以前配送的缺點是什麼:oop
成立配送部以後解決的問題post
OK,咱們以上述例子來對應理解線程池的基本原理學習
先來看下,JAVA對ThreadPoolExecutor的類申明:
public class ThreadPoolExecutor extends AbstractExecutorService 複製代碼
在【初識】-JUC·Executor框架中給出了Executor的繼承體系。ThreadPoolExecutor就是具有線程池功能的集成者。
//構造方法一
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
//構造方法二
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
//構造方法三
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
//構造方法四
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
複製代碼
從上面的代碼能夠看出,構造方法(1、2、三)都是經過調用(四)來作具體屬性初始化的。那麼咱們直接來看構造方法四;在構造方法四中總共須要7個參數,先來看下每一個參數的具體含義:
corePoolSize
核心線程數大小。那麼什麼是核心線程數呢,咱們能夠類比於上面例子中的配送部中籤定勞動合同的人的個數。
maximumPoolSize
最大線程數。加入說如今是雙十一期間,快遞異常的多,配送部的5我的徹底忙不過來,並且倉庫也滿了,怎麼辦呢?這個時候就須要再招聘一些臨時配送員,假設maximumPoolSize爲10,那麼也就是說,臨時招聘能夠招5我的,配送部簽定正式勞動合同的人和簽定臨時合同的人加一塊不能超過配送部規定的最大人數(10人)。因此說,maximumPoolSize就是線程池可以容許的存在的最大線程的數量。
keepAliveTime
存活時間。爲何要有這個呢?想一下,雙十一過去了,貨物已經配送的差很少了。臨時合同寫的是若是臨時配送員2天沒有配送了,那配送部就有權利終止臨時合同,如今已經達到2天這個點了,須要開除這些臨時配送專員了。對於線程池來講,keepAliveTime就是用來表示,當除核心線程池以外的線程超過keepAliveTime時間以後,就須要被系統回收了。
unit
keepAliveTime的時間單位。
workQueue
工做隊列。這個就至關於一個倉庫,如今配送部5我的都在配送,可是還不斷的有新的快遞達到,這個時候就須要一個倉庫來存放這些快遞。對於線程池來講,當核心線程都有本身的任務處理,而且還有任務進來的時候,就會將任務添加到工做隊列中去。
threadFactory
線程工廠。就是用來建立線程的。能夠類比成招聘組,會給每一個線程分配名字或者編號這樣。
handler
RejectedExecutionHandler 用來描述拒絕策略的。假設如今個人倉庫也知足,而且配送部已經達到10我的了。怎麼辦呢,那麼只能採用一些策略來拒絕任務了。
// runState is stored in the high-order bits
//RUNNING;該狀態的線程池接收新任務,而且處理阻塞隊列中的任務
private static final int RUNNING = -1 << COUNT_BITS;
//SHUTDOWN;該狀態的線程池不接收新任務,但會處理阻塞隊列中的任務;
private static final int SHUTDOWN = 0 << COUNT_BITS;
//STOP;不接收新任務,也不處理阻塞隊列中的任務,而且會中斷正在運行的任務;
private static final int STOP = 1 << COUNT_BITS;
//全部的任務已終止,ctl記錄的」任務數量」爲0,線程池會變爲TIDYING狀態
private static final int TIDYING = 2 << COUNT_BITS;
//線程池完全終止,就變成TERMINATED狀態。
private static final int TERMINATED = 3 << COUNT_BITS;
複製代碼
下面是在網上發現的一位大牛的圖;感受能夠較爲直觀的描述狀態的變動
有幾個點須要注意。
public void execute(Runnable command) {
//任務爲null,直接拋出空指針異常
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//若是線程數大於等於基本線程數,將任務加入隊列
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
複製代碼
經過實現這個接口去建立一個新的線程
public interface ThreadFactory {
Thread newThread(Runnable r);
}
複製代碼
經過addWorker方法來添加,其實在excute中只是做爲一個提交任務的入口,實際的處理邏輯都是在addWorker這個方法裏來完成的。addWorker有兩個參數:
先來看下這個方法的前半部分:
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
//自旋方式
for (;;) {
//獲取當前線程池的狀態
int c = ctl.get();
int rs = runStateOf(c);
//若是狀態是STOP,TIDYING,TERMINATED狀態的話,則會返回false
//若是狀態是SHUTDOWN,可是firstTask不爲空或者workQueue爲空的話,那麼直接返回false。
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
//經過自旋的方式,判斷要添加的worker是否爲corePool範疇以內的
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
複製代碼
//若是超過CAPACITY限制了則直接返回false
wc >= CAPACITY
複製代碼
//判斷當前的workerCount是否大於corePoolsize,不然則判斷是否大於maximumPoolSize //具體的比較取決於入參core是true仍是false。
wc >= (core ? corePoolSize : maximumPoolSize)
複製代碼
若是上面兩個有一個知足了,則直接返回false。
下面是判斷WorkerCount經過CAS操做增長1是否成功,成功的話就到此結束
if (compareAndIncrementWorkerCount(c))
break retry;
複製代碼
若是不成功,則再次判斷當前線程池的狀態,若是如今獲取到的狀態與進入自旋的狀態不一致的話,那麼則經過continue retry從新進行狀態的判斷。
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
複製代碼
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
final ReentrantLock mainLock = this.mainLock;
//建立一個新的Worker對象
w = new Worker(firstTask);
final Thread t = w.thread;
//
if (t != null) {
//加鎖
mainLock.lock();
try {
// 在鎖定的狀況下從新檢查。
// 在一下狀況退出:ThreadFactory 建立失敗或者在獲取鎖以前shut down了
int c = ctl.get();
int rs = runStateOf(c);
//狀態校驗
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // 預先檢查t是能夠啓動的
throw new IllegalThreadStateException();
//添加至workers中
workers.add(w);
int s = workers.size();
//若是超過了歷史最大線程數,則將當前池數量設置爲歷史最大線程記錄數
if (s > largestPoolSize)
largestPoolSize = s;
//標識添加工做線程成功
workerAdded = true;
}
} finally {
//解鎖
mainLock.unlock();
}
//若是添加成功則啓動當前工做線程
if (workerAdded) {
t.start();
//並將當前線程狀態設置爲已啓動
workerStarted = true;
}
}
} finally {
//添加失敗
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
複製代碼
固然咱們也能夠自定義拒絕策略。
一、ArrayBlockingQueue
基於數組的阻塞隊列,長度有限
二、LinkedBlockingQuene
基於鏈表的阻塞隊列,長度無限,使用這個可能會致使咱們的拒絕策略失效。由於能夠無限的建立新的工做線程。
三、PriorityBlockingQueue
具備優先級的無界阻塞隊列;
三、SynchronousQuene
SynchronousQuene是一個是一個不存儲元素的BlockingQueue;每個put操做必需要等待一個take操做,不然不能繼續添加元素。因此這個比較特殊,它不存咱們的任務,也就說說它的每一個put操做必須等到另外一個線程調用take操做,不然put操做一直處於阻塞狀態。
這個是ThreadPoolExecutor的一個內部類,表示一個工做線程。重要的是這個內部類實現了AbstractQueuedSynchronizer(AQS:抽象隊列同步器)抽象類。
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
/** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */
private static final long serialVersionUID = 6138294804551838833L;
/** 當前work持有的線程 */
final Thread thread;
/** 運行的初始任務。 可能爲空。*/
Runnable firstTask;
/** 每一個線程完成任務的計數器 */
volatile long completedTasks;
/** * 構造函數 */
Worker(Runnable firstTask) {
// 禁止中斷,直到runWorker
setState(-1);
//想提交的任務交給當前工做線程
this.firstTask = firstTask;
//經過線程工廠建立一個新的線程
this.thread = getThreadFactory().newThread(this);
}
/** 將run方法的執行委託給外部runWorker */
public void run() {
runWorker(this);
}
// 是否鎖定
//
// 0表明解鎖狀態。
// 1表明鎖定狀態。
protected boolean isHeldExclusively() {
return getState() != 0;
}
//嘗試獲取鎖(重寫AQS的方法)
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
//嘗試釋放鎖(重寫AQS的方法)
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
//加鎖
public void lock() { acquire(1); }
//嘗試加鎖
public boolean tryLock() { return tryAcquire(1); }
//解鎖
public void unlock() { release(1); }
//是否鎖定
public boolean isLocked() { return isHeldExclusively(); }
//若是啓動則中斷
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
複製代碼
最後來看下runWorker這個方法(ThreadPoolExecutor中的方法):
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
複製代碼
下面是對註釋的蹩腳翻譯,歡迎吐槽,但注意尺度,O(∩_∩)O哈哈~
主要工做循環運行。重複地從隊列中獲取任務並執行它們,同時處理一些問題:
異常機制的最終效果是afterExecute和線程的UncaughtExceptionHandler擁有關於用戶代碼遇到的任何問題的準確信息。
本文是JUC的第二篇,意在經過查看源碼來了解線程池的具體工做原理。文中若是存在不當的描述,但願小夥伴們可以及時提出。灰常感謝!
歡迎關注微信公衆號,乾貨滿滿哦~