在阿里巴巴Java開發手冊中有這麼兩段話,以下圖所示java
能夠看到提到的兩點,第一要求不能顯示的建立線程,也就是new Thread的這種形式,須要使用線程池對線程進行管理,第二不容許使用官方提供的四種線程池,而是須要經過自行建立的方式去建立線程池,更加理解線程池的容許規則安全
本文就基於JDK1.8的代碼,對線程池源碼進行解析,帶你們可以更好的理解線程池的概念以及其運行規則,若有錯誤,請你們指出多線程
先從構造函數看起:函數
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) 複製代碼
有的朋友可能還不是很清晰,舉個例子,一個公司,核心線程就是表明公司的內部核心員工,最大線程數量就是員工的最大數量,可能包含非內部員工,由於有一些試點或者簡單的項目,須要一些外協人員來作,也就是非核心線程,那麼當這些項目作完了或者失敗了,公司爲了節約用人成本,就遣散非核心員工,也就是閒置線程的存活時間。假如核心員工每一個人都很忙,可是需求又一波接一波,那就職務排期,也就是任務隊列,當任務隊列都滿了時候,還要來需求?對不起,不接受,直接拒絕,這也就是handler對應的拒絕策略了,能夠例子不是很合適,可是主要幫助你們理解下大概的意思。oop
打開源碼類,能夠看到以下幾個變量學習
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
複製代碼
AtomicInteger是一個原子操做類,保證線程安全,採用低29位表示線程的最大數量,高3位表示5種線程池狀態,維護兩個參數,workCount和runState。workCount表示有效的線程數量,runState表示線程池的運行狀態。ui
terminated()
方法已經執行完成引用一張圖片幫助你們理解5個狀態this
execute()atom
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */
int c = ctl.get();
//若是當前線程數量小於核心線程數量,執行addWorker建立新線程執行command任務
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//若是當前是運行狀態,將任務放入阻塞隊列,double-check線程池狀態
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//若是再次check,發現線程池狀態不是運行狀態了,移除剛纔添加進來的任務,而且拒絕改任務
if (! isRunning(recheck) && remove(command))
reject(command);
//處於運行狀態,可是沒有線程,建立線程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//往線程池中建立新的線程失敗,則reject任務
else if (!addWorker(command, false))
reject(command);
}
複製代碼
這裏大概總結下execute方法的執行流程,其實你們看源碼方法註釋是同樣很好的學習方法spa
這裏注意一點,當核心線程滿的時候,並不會去直接建立非核心線程去執行任務,而是先放進任務隊列,能夠理解爲需求任務首先是須要讓內部核心員工去完成的,任務隊列的優先級是高於非核心員工的,addWorker(),這裏的傳進去的boolean值,就表明着建立核心線程或者非核心線程
reject()
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
複製代碼
拒絕任務很簡單,reject方法會調用handler的rejectedExecution(command,this)方法,handler是RejectedExecutionHandler接口,默認實現是AbortPolicy,下面是AbortPolicy的實現:
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
複製代碼
能夠看到默認策略是直接拋出異常的,這只是默認使用的策略,能夠經過實現接口實現本身的邏輯。
addWorker()
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 這裏return false的狀況有如下幾種
//1.當前狀態是stop及以上 2.當前是SHUTDOWN狀態,可是firstTask不爲空
//3.當前是SHUTDOWN狀態,可是隊列中爲空
//從第一節咱們知道,SHUTDOWN狀態是不執行進來的任務的,可是會繼續執行隊列中的任務
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
複製代碼
這裏就主要流程分析下
Worker()
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
......
}
複製代碼
能夠看到,Worker內部維護,一個線程變量以及任務變量,啓動一個 Worker對象中包含的線程 thread, 就至關於要執行 runWorker()方法, 並將該 Worker對象做爲該方法的參數.
runWorker()
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//task不爲空,執行當前任務,任務執行完後將task置位空,getTask方法接着不斷從隊列中取任務
while (task != null || (task = getTask()) != null) {
w.lock();
//再次check線程池狀態,若是是stop狀態,直接interrupt()中斷任務
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
//執行任務
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
複製代碼
經過while循環不斷的調用getTask方法,獲取任務task並進行執行,若是任務都執行完,跳出循環,線程結束並減小當前線程數量。
getTask()
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
複製代碼
這裏主要有兩個判斷須要說明下:
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize
allowCoreThreadTimeOut :這個第一節有說過,若是核心線程設置了該屬性,也是須要進行回收的,wc > corePoolSize:當前線程是非核心線程也是須要回收的,知足任何一個條件,timed 置位true
timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take()
若是上述的timed標誌位爲true,調用poll方法獲取任務,同時設置超時時間,若是沒有任務,則超時返回null,跳出runWorker的循環,線程結束被回收,若是爲false,調用take方法,此時若是沒有任務,不會返回null,而是會進入阻塞狀態,等待任務,不會被回收
線程池中的細節比較多,大體作一下總結概括
大概分析就是這麼多,但願有可以幫助到一些朋友更好的理解線程池的工做原理以及在使用中可以更好的使用,若有疑問或者錯誤,歡迎一塊兒討論