今天講一個牛逼而實用的概念,串行線程封閉
。對象池
是串行線程封閉的典型應用場景;線程池
糅合了對象池技術,但核心實現不依賴於對象池,很容易產生誤會。本文從串行線程封閉和對象池入手,最後經過源碼分析線程池的核心原理,釐清對象池與線程池之間的誤會。java
JDK版本:oracle java 1.8.0_102git
線程封閉是一種常見的線程安全設計策略:僅在固定的一個線程內訪問對象,不對其餘線程共享。github
使用線程封閉技術,對象O始終只對一個線程T1可見,「單線程」中天然不存在線程安全的問題。web
ThreadLocal是經常使用的線程安全工具,見源碼|ThreadLocal的實現原理。線程封閉在Servlet及高層的web框架Spring等中應用很多。面試
線程封閉雖然好用,卻限制了對象的共享。串行線程封閉改進了這一點:對象O只能由單個線程T1擁有,但能夠經過安全的發佈對象O來轉移O的全部權;在轉移全部權後,也只有另外一個線程T2能得到這個O的全部權,而且發佈O的T1不會再訪問O。數據庫
所謂「全部權」,指修改對象的權利。安全
相對於線程封閉,串行線程封閉使得任意時刻,最多僅有一個線程擁有對象的全部權。固然,這不是絕對的,只要線程T1事實不會再修改對象O,那麼就至關於僅有T2擁有對象的全部權。串行線層封閉讓對象變得能夠共享(雖然只能串行的擁有全部權),靈活性獲得大大提升;相對的,要共享對象就涉及安全發佈的問題,依靠BlockingQueue等同步工具很容易實現這一點。併發
對象池是串行線程封閉的經典應用場景,如數據庫鏈接池等。oracle
對象池利用了串行封閉:將對象O「借給」一個請求線程T1,T1使用完再交還給對象池,並保證「未擅自發布該對象」且「之後再也不使用」;對象池收回O後,等T2來借的時候再把它借給T2,完成對象全部權的傳遞。框架
猴子擼了一個簡化版的線程池,用戶只須要覆寫newObject()方法:
public abstract class AbstractObjectPool<T> {
protected final int min;
protected final int max;
protected final List<T> usings = new LinkedList<>();
protected final List<T> buffer = new LinkedList<>();
private volatile boolean inited = false;
public AbstractObjectPool(int min, int max) {
this.min = min;
this.max = max;
if (this.min < 0 || this.min > this.max) {
throw new IllegalArgumentException(String.format(
"need 0 <= min <= max <= Integer.MAX_VALUE, given min: %s, max: %s", this.min, this.max));
}
}
public void init() {
for (int i = 0; i < min; i++) {
buffer.add(newObject());
}
inited = true;
}
protected void checkInited() {
if (!inited) {
throw new IllegalStateException("not inited");
}
}
abstract protected T newObject();
public synchronized T getObject() {
checkInited();
if (usings.size() == max) {
return null;
}
if (buffer.size() == 0) {
T newObj = newObject();
usings.add(newObj);
return newObj;
}
T oldObj = buffer.remove(0);
usings.add(oldObj);
return oldObj;
}
public synchronized void freeObject(T obj) {
checkInited();
if (!usings.contains(obj)) {
throw new IllegalArgumentException(String.format("obj not in using queue: %s", obj));
}
usings.remove(usings.indexOf(obj));
buffer.add(obj);
}
}
複製代碼
AbstractObjectPool具備如下特性:
雖然很簡單,但大能夠用於一些時間敏感、資源充裕的場景。若是時間進一步敏感,可將getObject()、freeObject()改寫爲併發程度更高的版本,但記得保證安全發佈安全回收;若是資源不那麼充裕,能夠適當增長對象回收策略。
能夠看到,一個對象池的基本行爲包括:
典型的對象池有各類鏈接池、常量池等,應用很是多,模型也大同小異,不作解析。使人迷惑的是線程池,很容易讓人誤覺得線程池的核心原理也是對象池,下面來追一遍源碼。
首先擺出結論:線程池糅合了對象池模型,但核心原理是生產者-消費者模型。
繼承結構以下:
用戶能夠將Runnable(或Callables)實例提交給線程池,線程池會異步執行該任務,返回響應的結果(完成/返回值)。
猴子最喜歡的是submit(Callable<T> task)
方法。咱們從該方法入手,逐步深刻函數棧,探究線程池的實現原理。
submit()方法在ExecutorService接口中定義,AbstractExecutorService實現,ThreadPoolExecutor直接繼承。
public abstract class AbstractExecutorService implements ExecutorService {
...
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
...
}
複製代碼
AbstractExecutorService#newTaskFor()建立一個RunnableFuture類型的FutureTask。
核心是execute()方法。
execute()方法在Executor接口中定義,ThreadPoolExecutor實現。
public class ThreadPoolExecutor extends AbstractExecutorService {
...
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
...
}
複製代碼
咱們暫且忽略線程池的池化策略。關注一個最簡單的場景,看能不能先回答一個問題:線程池中的任務如何執行?
核心是addWorker()方法。以8行的參數爲例,此時,線程池中的線程數未達到最小線程池大小corePoolSize,一般能夠直接在9行返回。
簡化以下:
public class ThreadPoolExecutor extends AbstractExecutorService {
...
private boolean addWorker(Runnable firstTask, boolean core) {
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN) {
workers.add(w);
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
...
}
複製代碼
我去掉了不少用於管理線程池、維護線程安全的代碼。假設線程池未關閉,worker(即w,下同)添加成功,則必然可以將worker添加至workers中。workers是一個HashSet:
private final HashSet<Worker> workers = new HashSet<Worker>();
複製代碼
若是說與對象池有關,那麼workers即至關於示例代碼中的using,應用了對象池模型;只不過這裏的using是一直增加的,直到達到最大線程池大小maximumPoolSize。
可是很明顯,線程池並無將線程發佈出去,workers也僅僅完成using「保存線程」的功能。那麼,線程池中的任務如何執行呢?跟線程池有沒有關係?
注意九、1七、24行:
核心在與這三行,但線程池並無直接在addWorker()中啓動任務firstTask,代之以啓動一個worker。最終任務必然被啓動,那麼咱們繼續看Worker如何啓動這個任務。
Worker實現了Runnable接口:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
...
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
...
}
複製代碼
爲何要將構造Worker時的參數命名爲firstTask?由於當且僅當須要創建新的Worker以執行任務task時,纔會調用構造函數。所以,任務task對於新Worker而言,是第一個任務firstTask。
Worker的實現很是簡單:將本身做爲Runable實例,構造時在內部建立並持有一個線程thread。Thread和Runable的使用你們很熟悉了,核心是Worker的run方法,它直接調用了runWorker()方法。
敲黑板!!!
重頭戲來了。簡化以下:
public class ThreadPoolExecutor extends AbstractExecutorService {
...
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
...
}
複製代碼
咱們在前面將要執行的任務賦值給firstTask,5-6行首先取出任務task,並將firstTask置爲null。由於接下來要執行task,firstTask字段就沒有用了。
重點是10-31行的while循環。下面分狀況討論。
case1對應前面做出的諸多假設。
第一次進入循環時,task==firstTask,不爲null,使10行布爾短路直接進入循環;從而16行執行的是firstTask的run()方法;異常處理不表;最後,finally代碼塊中,task會被置爲null,致使下一輪循環會進入case2。
case2是更廣泛的狀況,也就是線程池的核心。
case1中,task被置爲了null,使10行布爾表達式執行第二部分(task = getTask()) != null
(getTask()稍後再講,它返回一個用戶已提交的任務)。假設task獲得了一個已提交的任務,從而16行執行的是新得到的任務task的run()方法。後同case1,最後task仍然會被置爲null,之後循環都將進入case2。
任務從哪來呢?簡化以下:
public class ThreadPoolExecutor extends AbstractExecutorService {
...
private Runnable getTask() {
boolean timedOut = false;
for (;;) {
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
...
}
複製代碼
咱們先看最簡單的,19-28行。
首先,workQueue是一個線程安全的BlockingQueue,大部分時候使用的實現類是LinkedBlockingQueue,見源碼|併發一枝花之BlockingQueue:
private final BlockingQueue<Runnable> workQueue;
複製代碼
假設timed爲false,則調用阻塞的take()方法,返回的r必定不是null,從而12行退出,將任務交給了某個worker線程。
一個小細節有點意思:前面每一個worker線程runWorker()方法時,在循環中加鎖粒度在worker級別,直接使用的lock同步;但由於每個woker都會調用getTask(),考慮到性能因素,源碼中getTask()中使用樂觀的CAS+SPIN實現無鎖同步。關於樂觀鎖和CAS,能夠參考個人另外一篇文章源碼|併發一枝花之ConcurrentLinkedQueue【僞】。
workQueue中的元素從哪來呢?這就要回顧execute()方法了。
public class ThreadPoolExecutor extends AbstractExecutorService {
...
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
...
}
複製代碼
前面以8行的參數爲例,此時,線程池中的線程數未達到最小線程池大小corePoolSize,一般能夠直接在9行返回。進入8行的條件是「當前worker數小於最小線程池大小corePoolSize」
。
若是不知足,會繼續執行到12行。isRunning(c)
判斷線程池是否未關閉,咱們關注未關閉的狀況;則會繼續執行布爾表達式的第二部分workQueue.offer(command)
,嘗試將任務command放入隊列workQueue。
workQueue.offer()的行爲取決於線程池持有的BlockingQueue實例。Executors.newFixedThreadPool()、Executors.newSingleThreadExecutor()建立的線程池使用LinkedBlockingQueue,而Executors.newCachedThreadPool()建立的線程池則使用SynchronousQueue。以LinkedBlockingQueue爲例,建立時不配置容量,即建立爲無界隊列,則LinkedBlockingQueue#offer()永遠返回true,從而進入12-18行。
更細節的內容沒必要關心了,當workQueue.offer()返回true時,已經將任務command放入了隊列workQueue。當將來的某個時刻,某worker執行完某一個任務以後,會從workQueue中再取出一個任務繼續執行,直到線程池關閉,直到海枯石爛。
CachedThreadPool是一種無界線程池,使用SynchronousQueue能進一步提高性能,簡化代碼結構。留給讀者分析。
能夠看到,實際上,線程池的核心原理與對象池模型無關,而是生產者-消費者模型:
回到runWorker()方法,在執行任務的過程當中,線程池保留了一些鉤子方法,如beforeExecute()、afterExecute()。用戶能夠在實現本身的線程池時,能夠經過覆寫鉤子方法爲線程池添加功能。
但猴子不認爲鉤子方法是一種好的設計。由於鉤子方法大多依賴於源碼實現,那麼除非瞭解源碼或API聲明絕對的嚴謹正確,不然很難正確使用鉤子方法。等發生錯誤時再去了解實現,可能就太晚了。說到底,仍是不要使用相似extends這種表達「擴展」語義的語法來實現繼承,詳見Java中如何恰當的表達「繼承」與「擴展」的語義?。
固然,鉤子方法也是極其方便的。權衡看待。
相對於線程封閉,串行線程封閉離用戶的距離更近一些,簡單靈活,實用性強,很容易掌握。而線程封閉更多淪爲單純的設計策略,單純使用線程封閉的場景很少。
線程池與串行線程封閉、對象池的關係不大,但常常被混爲一談;沒看過源碼的很難想到其實現方案,面試時也能立分高下。
線程池的實現頗有意思。在追源碼以前,猴子一直覺得線程池就是把線程存起來,用的時候取出來執行任務;看了源碼才知道實現如此之妙,簡潔優雅效率高。源碼纔是最好的老師。
本文連接:源碼|從串行線程封閉到對象池、線程池
做者:猴子007
出處:monkeysayhi.github.io
本文基於 知識共享署名-相同方式共享 4.0 國際許可協議發佈,歡迎轉載,演繹或用於商業目的,可是必須保留本文的署名及連接。