由於已經看到了ThreadPoolExecutor的源碼,因此很容易就看到了ThreadPoolExecutor線程池的數據結構。圖1描述了這種數據結構。
圖1 ThreadPoolExecutor 數據結構
其實,即使沒有上述圖形描述ThreadPoolExecutor的數據結構,我們根據線程池的要求也很能夠猜測出其數據結構出來。
對於ThreadPoolExecutor而言,一個線程就是一個Worker對象,它與一個線程綁定,當Worker執行完畢就是線程執行完畢,這個在後面詳細討論線程池中線程的運行方式。
既然是線程池,那麼就首先研究下線程的構造方法。
-
public
interface ThreadFactory {
-
Thread newThread(Runnable r);
-
}
ThreadPoolExecutor使用一個線程工廠來構造線程。線程池都是提交一個任務Runnable,然後在某一個線程Thread中執行,ThreadFactory 負責如何創建一個新線程。
在J.U.C中有一個通用的線程工廠java.util.concurrent.Executors.DefaultThreadFactory,它的構造方式如下:
-
static
class DefaultThreadFactory implements ThreadFactory {
-
static
final AtomicInteger poolNumber =
new AtomicInteger(
1);
-
final ThreadGroup group;
-
final AtomicInteger threadNumber =
new AtomicInteger(
1);
-
final String namePrefix;
-
DefaultThreadFactory() {
-
SecurityManager s = System.getSecurityManager();
-
group = (s !=
null)? s.getThreadGroup() :
-
Thread.currentThread().getThreadGroup();
-
namePrefix =
"pool-" +
-
poolNumber.getAndIncrement() +
-
"-thread-";
-
}
-
public Thread newThread(Runnable r) {
-
Thread t =
new Thread(group, r,
-
namePrefix + threadNumber.getAndIncrement(),
-
0);
-
if (t.isDaemon())
-
t.setDaemon(
false);
-
if (t.getPriority() != Thread.NORM_PRIORITY)
-
t.setPriority(Thread.NORM_PRIORITY);
-
return t;
-
}
-
}
在這個線程工廠中,同一個線程池的所有線程屬於同一個線程組,也就是創建線程池的那個線程組,同時線程池的名稱都是「pool-<poolNum>-thread-<threadNum>」,其中poolNum是線程池的數量序號,threadNum是此線程池中的線程數量序號。這樣如果使用jstack的話很容易就看到了系統中線程池的數量和線程池中線程的數量。另外對於線程池中的所有線程默認都轉換爲非後臺線程,這樣主線程退出時不會直接退出JVM,而是等待線程池結束。還有一點就是默認將線程池中的所有線程都調爲同一個級別,這樣在操作系統角度來看所有系統都是公平的,不會導致競爭堆積。
一個線程Worker被構造出來以後就開始處於運行狀態。以下是一個線程執行的簡版邏輯。
-
private
final
class Worker implements Runnable {
-
private
final ReentrantLock runLock =
new ReentrantLock();
-
private Runnable firstTask;
-
Thread thread;
-
Worker(Runnable firstTask) {
-
this.firstTask = firstTask;
-
}
-
private void runTask(Runnable task) {
-
final ReentrantLock runLock =
this.runLock;
-
runLock.lock();
-
try {
-
task.run();
-
}
finally {
-
runLock.unlock();
-
}
-
}
-
public void run() {
-
try {
-
Runnable task = firstTask;
-
firstTask =
null;
-
while (task !=
null || (task = getTask()) !=
null) {
-
runTask(task);
-
task =
null;
-
}
-
}
finally {
-
workerDone(
this);
-
}
-
}
-
}
當提交一個任務時,如果需要創建一個線程(何時需要在下一節中探討)時,就調用線程工廠創建一個線程,同時將線程綁定到Worker工作隊列中。需要說明的是,Worker隊列構造的時候帶着一個任務Runnable,因此Worker創建時總是綁定着一個待執行任務。換句話說,創建線程的前提是有必要創建線程(任務數已經超出了線程或者強制創建新的線程,至於爲何強制創建新的線程後面章節會具體分析),不會無緣無故創建一堆空閒線程等着任務。這是節省資源的一種方式。
一旦線程池啓動線程後(調用線程run())方法,那麼線程工作隊列Worker就從第1個任務開始執行(這時候發現構造Worker時傳遞一個任務的好處了),一旦第1個任務執行完畢,就從線程池的任務隊列中取出下一個任務進行執行。循環如此,直到線程池被關閉或者任務拋出了一個RuntimeException。
由此可見,線程池的基本原理其實也很簡單,無非預先啓動一些線程,線程進入死循環狀態,每次從任務隊列中獲取一個任務進行執行,直到線程池被關閉。如果某個線程因爲執行某個任務發生異常而終止,那麼重新創建一個新的線程而已。如此反覆。
其實,線程池原理看起來簡單,但是複雜的是各種策略,例如何時該啓動一個線程,何時該終止、掛起、喚醒一個線程,任務隊列的阻塞與超時,線程池的生命週期以及任務拒絕策略等等。
我們從一個API開始接觸Executor是如何處理任務隊列的。
java.util.concurrent.Executor.execute(Runnable)
Executes the given task sometime in the future. The task may execute in a new thread or in an existing pooled thread. If the task cannot be submitted for execution, either because this executor has been shutdown or because its capacity has been reached, the task is handled by the current RejectedExecutionHandler.
線程池中所有任務執行都依賴於此接口。這段話有以下幾個意思:
回答上面兩個「可能「。任務可能被執行,那不可能的情況就是上面說的情況3;可能不是立即執行,是因爲任務可能還在隊列中排隊,因此還在等待分配線程執行。瞭解完了字面上的問題,我們再來看具體的實現。
-
public void execute(Runnable command) {
-
if (command ==
null)
-
throw
new NullPointerException();
-
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
-
if (runState == RUNNING && workQueue.offer(command)) {
-
if (runState != RUNNING || poolSize ==
0)
-
ensureQueuedTaskHandled(command);
-
}
-
else
if (!addIfUnderMaximumPoolSize(command))
-
reject(command);
// is shutdown or saturated
-
}
-
}
這一段代碼看起來挺簡單的,其實這就是線程池最重要的一部分,如果能夠完全理解這一塊,線程池還是挺容易的。整個執行流程是這樣的:
文字描述步驟不夠簡單?下面圖形詳細表述了此過程。
老實說這個圖比上面步驟更難以理解,那麼從何入手呢。
流程的入口很簡單,我們就是要執行一個任務(Runnable command),那麼它的結束點在哪或者有哪幾個?
根據左邊這個圖我們知道可能有以下幾種出口:
(1)圖中的P1、P7,我們根據這條路徑可以看到,僅僅是將任務加入任務隊列(offer(command))了;
(2)圖中的P3,這條路徑不將任務加入任務隊列,但是啓動了一個新工作線程(Worker)進行掃尾操作,用戶處理爲空的任務隊列;
(3)圖中的P4,這條路徑沒有將任務加入任務隊列,但是啓動了一個新工作線程(Worker),並且工作現場的第一個任務就是當前任務;
(4)圖中的P5、P6,這條路徑沒有將任務加入任務隊列,也沒有啓動工作線程,僅僅是拋給了任務拒絕策略。P2是任務加入了任務隊列卻因爲線程池已經關閉於是又從任務隊列中刪除,並且拋給了拒絕策略。
如果上面的解釋還不清楚,可以去研究下面兩段代碼:
java.util.concurrent.ThreadPoolExecutor.addIfUnderCorePoolSize(Runnable) java.util.concurrent.ThreadPoolExecutor.addIfUnderMaximumPoolSize(Runnable) java.util.concurrent.ThreadPoolExecutor.ensureQueuedTaskHandled(Runnable)
那麼什麼時候一個任務被立即執行呢?
在線程池運行狀態下,如果線程池大小 小於 核心線程池大小或者線程池已滿(任務隊列已滿)並且線程池大小 小於 最大線程池大小(此時線程池大小 大於 核心線程池大小的),用程序描述爲:
runState == RUNNING && ( poolSize < corePoolSize || poolSize < maxnumPoolSize && workQueue.isFull())
上面的條件就是一個任務能夠被立即執行的條件。
有了execute的基礎,我們看看ExecutorService中的幾個submit方法的實現。
-
public Future<?> submit(Runnable task) {
-
if (task ==
null)
throw
new NullPointerException();
-
RunnableFuture<Object> ftask = newTaskFor(task,
null);
-
execute(ftask);
-
return ftask;
-
}
-
-
public <T>
Future<T> submit(Runnable task, T result) {
-
if (task ==
null)
throw
new NullPointerException();
-
RunnableFuture<T> ftask = newTaskFor(task, result);
-
execute(ftask);
-
return ftask;
-
}
-
-
public <T>
Future<T> submit(Callable<T> task) {
-
if (task ==
null)
throw
new NullPointerException();
-
RunnableFuture<T> ftask = newTaskFor(task);
-
execute(ftask);
-
return ftask;
-
}
很簡單,不是麼?對於一個線程池來說複雜的地方也就在execute方法的執行流程。在下一節中我們來討論下如何獲取任務的執行結果,也就是Future類的使用和原理。
這一節來探討下線程池中任務執行的結果以及如何阻塞線程、取消任務等等。
-
package info.imxylz.study.concurrency.future;
-
-
public
class SleepForResultDemo implements Runnable {
-
-
static
boolean result =
false;
-
-
static void sleepWhile(long ms) {
-
try {
-
Thread.sleep(ms);
-
}
catch (Exception e) {}
-
}
-
-
@Override
-
public void run() {
-
//do work
-
System.out.println(
"Hello, sleep a while.");
-
sleepWhile(
2000L);
-
result =
true;
-
}
-
-
public static void main(String[] args) {
-
SleepForResultDemo demo =
new SleepForResultDemo();
-
Thread t =
new Thread(demo);
-
t.start();
-
sleepWhile(
3000L);
-
System.out.println(result);
-
}
-
-
}
在沒有線程池的時代裏面,使用Thread.sleep(long)去獲取線程執行完畢的場景很多。顯然這種方式很笨拙,他需要你事先知道任務可能的執行時間,並且還會阻塞主線程,不管任務有沒有執行完畢。
-
package info.imxylz.study.concurrency.future;
-
-
public
class SleepLoopForResultDemo implements Runnable {
-
-
boolean result =
false;
-
-
volatile
boolean finished =
false;
-
-
static void sleepWhile(long ms) {
-
try {
-
Thread.sleep(ms);
-
}
catch (Exception e) {}
-
}
-
-
@Override
-
public void run() {
-
//do work
-
try {
-
System.out.println(
"Hello, sleep a while.");
-
sleepWhile(
2000L);
-
result =
true;
-
}
finally {
-
finished =
true;
-
}
-
}
-
-
public static void main(String[] args) {
-
SleepLoopForResultDemo demo =
new SleepLoopForResultDemo();
-
Thread t =
new Thread(demo);
-
t.start();
-
while (!demo.finished) {
-
sleepWhile(
10L);
-
}
-
System.out.println(demo.result);
-
}
-
-
}
使用volatile與while死循環的好處就是等待的時間可以稍微小一點,但是依然有CPU負載高並且阻塞主線程的問題。最簡單的降低CPU負載的方式就是使用Thread.join().
-
SleepLoopForResultDemo demo =
new SleepLoopForResultDemo();
-
Thread t =
new Thread(demo);
-
t.start();
-
t.join();
-
System.out.println(demo.result);
顯然這也是一種不錯的方式,另外還有自己寫鎖使用wait/notify的方式。其實join()從本質上講就是利用while和wait來實現的。
上面的方式中都存在一個問題,那就是會阻塞主線程並且任務不能被取消。爲了解決這個問題,線程池中提供了一個Future接口。
在Future接口中提供了5個方法。
API看起來容易,來研究下異常吧。get()請求獲取一個結果會阻塞當前進程,並且可能拋出以下三種異常:
對於get(long timeout, TimeUnit unit)而言,除了get()方法的異常外,由於有超時機制,因此還可能得到一個TimeoutException。
boolean cancel(boolean mayInterruptIfRunning)方法比較複雜,各種情況比較多:
來看看Future接口的實現類java.util.concurrent.FutureTask<V>具體是如何操作的。
在FutureTask中使用了一個AQS數據結構來完成各種狀態以及加鎖、阻塞的實現。
在此AQS類java.util.concurrent.FutureTask.Sync中一個任務用4中狀態:
初始情況下任務狀態state=0,任務執行(innerRun)後狀態變爲運行狀態RUNNING(state=1),執行完畢後變成運行結束狀態RAN(state=2)。任務在初始狀態或者執行狀態被取消後就變爲狀態CANCELLED(state=4)。AQS最擅長無鎖情況下處理幾種簡單的狀態變更的。
-
void innerRun() {
-
if (!compareAndSetState(
0, RUNNING))
-
return;
-
try {
-
runner = Thread.currentThread();
-
if (getState() == RUNNING)
// recheck after setting thread
-
innerSet(callable.call());
-
else
-
releaseShared(
0);
// cancel
-
}
catch (Throwable ex) {
-
innerSetException(ex);
-
}
-
}
執行一個任務有四步:設置運行狀態、設置當前線程(AQS需要)、執行任務(Runnable#run或者Callable#call)、設置執行結果。這裏也可以看到,一個任務只能執行一次,因爲執行完畢後它的狀態不在爲初始值0,要麼爲CANCELLED,要麼爲RAN。
取消一個任務(cancel)又是怎樣進行的呢?對比下前面取消任務的描述是不是很簡單,這裏無非利用AQS的狀態來改變任務的執行狀態,最終達到放棄未啓動或者正在執行的任務的目的。
-
boolean innerCancel(boolean mayInterruptIfRunning) {
-
for (;;) {
-
int s = getState();
-
if (ranOrCancelled(s))
-
return
false;
-
if (compareAndSetState(s, CANCELLED))
-
break;
-
}
-
if (mayInterruptIfRunning) {
-
Thread r = runner;
-
if (r !=
null)
-
r.interrupt();
-
}
-
releaseShared(
0);
-
done();
-
return
true;
-
}
到目前爲止我們依然沒有說明到底是如何阻塞獲取一個結果的。下面四段代碼描述了這個過程。
-
V innerGet() throws InterruptedException, ExecutionException {
-
acquireSharedInterruptibly(
0);
-
if (getState() == CANCELLED)
-
throw
new CancellationException();
-
if (exception !=
null)
-
throw
new ExecutionException(exception);
-
return result;
-
}
-
//AQS#acquireSharedInterruptibly
-
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
-
if (Thread.interrupted())
-
throw
new InterruptedException();
-
if (tryAcquireShared(arg) <
0)
-
doAcquireSharedInterruptibly(arg);
//park current Thread for result
-
}
-
protected int tryAcquireShared(int ignore) {
-
return innerIsDone()?
1 : -
1;
-
}
-
-
boolean innerIsDone() {
-
return ranOrCancelled(getState()) && runner ==
null;
-
}
當調用Future#get()的時候嘗試去獲取一個共享變量。這就涉及到AQS的使用方式了。這裏獲取一個共享變量的狀態是任務是否結束(innerIsDone()),也就是任務是否執行完畢或者被取消。如果不滿足條件,那麼在AQS中就會doAcquireSharedInterruptibly(arg)掛起當前線程,直到滿足條件。AQS前面講過,掛起線程使用的是LockSupport的park方式,因此性能消耗是很低的。
至於將Runnable接口轉換成Callable接口,java.util.concurrent.Executors.callable(Runnable, T)也提供了一個簡單實現。
-
static
final
class RunnableAdapter<T> implements Callable<T> {
-
final Runnable task;
-
final T result;
-
RunnableAdapter(Runnable task, T result) {
-
this.task = task;
-
this.result = result;
-
}
-
public T call() {
-
task.run();
-
return result;
-
}
-
}
java.util.concurrent.ScheduledThreadPoolExecutor是默認的延遲、週期性任務調度的實現。
有了整個線程池的實現,再回頭來看延遲、週期性任務調度的實現應該就很簡單了,因爲所謂的延遲、週期性任務調度,無非添加一系列有序的任務隊列,然後按照執行順序的先後來處理整個任務隊列。如果是週期性任務,那麼在執行完畢的時候加入下一個時間點的任務即可。
由此可見,ScheduledThreadPoolExecutor和ThreadPoolExecutor的唯一區別在於任務是有序(按照執行時間順序)的,並且需要到達時間點(臨界點)才能執行,並不是任務隊列中有任務就需要執行的。也就是說唯一不同的就是任務隊列BlockingQueue<Runnable> workQueue不一樣。ScheduledThreadPoolExecutor的任務隊列是java.util.concurrent.ScheduledThreadPoolExecutor.DelayedWorkQueue,它是基於java.util.concurrent.DelayQueue<RunnableScheduledFuture>隊列的實現。
DelayQueue是基於有序隊列PriorityQueue實現的。PriorityQueue 也叫優先級隊列,按照自然順序對元素進行排序,類似於TreeMap/Collections.sort一樣。
同樣是有序隊列,DelayQueue和PriorityQueue區別在什麼地方?
由於DelayQueue在獲取元素時需要檢測元素是否「可用」,也就是任務是否達到「臨界點」(指定時間點),因此加入元素和移除元素會有一些額外的操作。
典型的,移除元素需要檢測元素是否達到「臨界點」,增加元素的時候如果有一個元素比「頭元素」更早達到臨界點,那麼就需要通知任務隊列。因此這需要一個條件變量final Condition available 。
移除元素(出隊列)的過程是這樣的:
-
public E take() throws InterruptedException {
-
final ReentrantLock lock =
this.lock;
-
lock.lockInterruptibly();
-
try {
-
for (;;) {
-
E first = q.peek();
-
if (first ==
null) {
-
available.await();
-
}
else {
-
long delay = first.getDelay(TimeUnit.NANOSECONDS);
-
if (delay >
0) {
-
long tl = available.awaitNanos(delay);
-
}
else {
-
E x = q.poll();
-
assert x !=
null;
-
if (q.size() !=
0)
-
available.signalAll();
// wake up other takers
-
return x;
-
-
}
-
}
-
}
-
}
finally {
-
lock.unlock();
-
}
-
}
同樣加入元素也會有相應的條件變量操作。當前僅當隊列爲空或者要加入的元素比隊列中的頭元素還小的時候才需要喚醒「等待線程」去檢測元素。因爲頭元素都沒有喚醒那麼比頭元素更延遲的元素就更加不會喚醒。
同樣加入元素也會有相應的條件變量操作。當前僅當隊列爲空或者要加入的元素比隊列中的頭元素還小的時候才需要喚醒「等待線程」去檢測元素。因爲頭元素都沒有喚醒那麼比頭元素更延遲的元素就更加不會喚醒。
同樣加入元素也會有相應的條件變量操作。當前僅當隊列爲空或者要加入的元素比隊列中的頭元素還小的時候才需要喚醒「等待線程」去檢測元素。因爲頭元素都沒有喚醒那麼比頭元素更延遲的元素就更加不會喚醒。
-
public E take() throws InterruptedException {
-
final ReentrantLock lock =
this.lock;
-
lock.lockInterruptibly();
-
try {
-
for (;;) {
-
E first = q.peek();
-
if (first ==
null) {
-
available.await();
-
- final ReentrantLock lock = this.lock;
- lock.lockInterruptibly();
- try {
- for (;;) {
- E first&