前段時間須要把一個C++的項目port到Java中,所以時隔三年後從新熟悉了下Java。因爲須要一個通用的線程池,天然而然就想到了Executors。java
用了後,感受很爽... 因而忍不住摳了下源碼。所以就有了這篇學習筆記。編程
言歸正傳,Java Executor是一個功能豐富,接口設計很好的,基於生產者-消費者模式的通用線程池。這種線程池的設計思想也在不少地方被應用。數組
在這篇文章中,我並不打算介紹java線程池的使用,生產者-消費者模式,併發編程基本概念等。安全
一般來講,一個線程池的實現包括四個部分:數據結構
Thread 並非concurrent包的一部分。Thread包含着name, priority等成員和對應的操做方法。併發
它是繼承自runable的,也就是說線程的入口函數是run。它的繼承體系和重要操做函數以下圖:異步
它實現了一系列包括sleep, yield等靜態方法。以及獲取當前線程的靜態方法currentThread()。這些都是native方法。ide
值得注意的是它的中斷機制(雖然它也實現了suspend和resume方法,可是這兩個方法已被棄用):函數
public void interrupt() { if (this != Thread.currentThread()) checkAccess(); synchronized (blockerLock) { Interruptible b = blocker; if (b != null) { interrupt0(); // Just to set the interrupt flag b.interrupt(this); return; } } interrupt0(); }
在默認的狀況下,blocker (Interruptible 成員變量)的值爲null, 這時調用interrupt,僅僅是調用interrupt0設置一個標誌位。學習
而若是blocker的值不爲null,則會調用其interrupt方法實現真正的中斷。
(關於blocker值什麼時候被設置,在後面會看到一個使用場景。)
當線程處於可中斷的阻塞狀態時,好比說阻塞在sleep, wait, join,select等操做時,調用interrupt方法會讓線程從阻塞狀態退出,並拋出InterruptedException。
值得注意的一點是:interrupt讓咱們從阻塞的方法中退出,但線程的中斷狀態卻並不會被設置!
try { Thread.sleep(10); } catch (InterruptedException e) { System.out.println("IsInterrupted: " + Thread.currentThread().isInterrupted()); }
如上述示例代碼,此時你獲得的輸出是: IsInterrupted : false 。這是一個有點使人意外的地方。
上述代碼並不是一個好的示例,由於interrupt被咱們「吃」掉了!除非你明確的知道這是你想要的。不然的話請考慮在異常捕獲中(catch段中)加上:
Thread.currentThread.interrupt();
Java可執行的接口類有兩種,Runnable和Callable,它們的區別是Callable能夠帶返回值,一個須要實現Run()方法,另外一個須要實現帶返回值的Call() 方法。
在java.util.concurret中還有另一個接口類Future。
Future表示一個異步任務的結果,就是user code向線程池提交一個任務後,它會返回對應的 Future對象。用以觀察任務執行的狀態(isCancelled, isDone),取消任務(Cancel)或者等待任務執行(get, timeout get)。
如上圖,RunnableFuture是一箇中間類,它將Runnable和Future的功能糅合到一塊兒。FutureTask 則是真正的實現。
FutureTask能夠從一個Runnable和Callable構造,當經過Runnable構造時,它會調用Excutors.callable接口將其轉爲Callable對象保存起來。
從上面的類圖中能夠看出,FutureTask除了簡單的狀態查詢等接口外,還具備兩個重要的接口:get()
和 get(long timeout, TimeUnit unit)), cancel(bool mayInterruptIfRunning)
。
它們分別提供兩個重要的功能:阻塞(當前線程)等待(一段時間)直到task完成或者異常終止;取消任務。
一個任務具備三種狀態:還沒有運行,正在運行,已經執行完畢。
在調用cancel後,若是任務處於已經執行完畢了,則不須要作任何事情直接返回;
若是任務還沒有運行,將其狀態設爲cancelled;
若是任務正在執行,並且user以cancel(true)的方式取消這個任務。那麼FutureTask會經過調用Thread.interrupt來終止當前任務。
public boolean cancel(boolean mayInterruptIfRunning) { // 任務已經完成或者被中斷等其餘狀態 if (state != NEW) return false; if (mayInterruptIfRunning) { // 正在運行,或者還沒有運行 if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, INTERRUPTING)) return false; Thread t = runner; if (t != null) t.interrupt(); UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // final state } // 設置cancel標誌位 else if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED)) return false; finishCompletion(); return true; }
注意到: FutureTask並無一個RUNNING的狀態來標識該任務正在執行。正常的狀況下,任務從開始建立直到運行完畢,這段過程的狀態都是NEW。
user code能夠調用get() 接口等待任務完成或者調用get(long, TimeUnit)等待一段時間。但get()接口被調用,當前的線程將被掛起,直到條件知足(任務完成或者異常退出)。
在前文中咱們瞭解到,Thread並無提供掛起和阻塞的方法。在這裏,Java利用LockSupport類來實現目的。(我猜想其中用了相似條件變量的方法來實現)。
LockSupport也屬於concurrent。FutureTask利用它的park (parkNanos)和unpark方法來實現線程的掛起和恢復:
public static void park(Object blocker) { Thread t = Thread.currentThread(); setBlocker(t, blocker); unsafe.park(false, 0L); setBlocker(t, null); } public static void unpark(Thread thread) { if (thread != null) unsafe.unpark(thread); }
其中parkNanos跟park方法並沒有本質區別,只是多了一個timeout參數。FutureTask分別用它們來實現get和timeout的get。
注意到上面的setBlocker方法了嗎?沒錯,它就是給在上文Thread.interrupt方法中出現過的Thread成員變量blocker賦值。從這咱們能夠看出,它是可中斷的。
而它真正實現掛起的則是依賴unsafe類。unsafe類在concurrent中頻繁出現,但sun去並不建議使用它。
它除了提供park,unpark方法外,還提供了一些內存和同步原語。好比CAS等。
調用get()的線程能夠是一個,也能夠是多個。爲了可以在恰當的時機將它們一一恢復,FutureTask內部須要維護一個鏈表來記錄全部的等待線程:waiters.
static final class WaitNode { volatile Thread thread; volatile WaitNode next; WaitNode() { thread = Thread.currentThread(); } }
至此,咱們終於瞭解get的全貌了。get會調用awaitDone方法來實現阻塞。固然,只有兩個狀態須要處理:NEW, COMPLETING。
NEW的狀態在前文已經有介紹過。COMPLETING狀態一般持續較短,在FutureTask 內部的callable 的call方法調用完畢後,會須要將call的返回值設置到outcome這個成員變量。隨後將狀態設爲NORMAL。這期間的狀態就是COMPLETING。
顯而易見,對於這種狀態,咱們只須要調用yield讓出線程資源,使得FutureTask完成這一過程便可。
private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; if (s > COMPLETING) { // 1 if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // cannot time out yet 2 Thread.yield(); else if (q == null) // 3 q = new WaitNode(); else if (!queued) // 4 queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { // 5 nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else // 6 LockSupport.park(this); } }
當任務處於NEW狀態正在被執行時,其餘線程調用get而進入awaitdone函數。
此時的流程是 3 -> 4 -> 5 或者 3 -> 4 -> 6。
它會首先分配一個WaitNode對象 --> 把它插入到waiters鏈表的表頭 --> 而後開始等待。那麼park函數什麼時候返回呢?
當任務執行完畢(或者被cancel)時,FutureTask會調用最終調用finishcompletion,改函數會改變FutureTask狀態,並調用LockSupport.unpark方法。
此時,awaitDone線程從park中返回,而後檢查當前的狀態已經被改變,隨後退出for循環。
FutureTask是會被多個線程訪問的,涉及到臨界區的保護,可是其內部卻並無任何的鎖操做。而在該類定義的末尾,有這樣的代碼。
private static final sun.misc.Unsafe UNSAFE; private static final long stateOffset; private static final long runnerOffset; private static final long waitersOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = FutureTask.class; stateOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("state")); runnerOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("runner")); waitersOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("waiters")); } catch (Exception e) { throw new Error(e); } }
這段代碼會在類被加載時執行一次。注意到它利用getDeclaredField反射機制來保存了三個offset:
stateOffset,runnerOffset,waitersOffset分別對應着state,runner,waiters這三個成員的偏移量。
FutureTask真是對這三個成員變量進行CAS操做來保證原子性和無鎖化的。實現CAS的類正是上文出現過的sun.misc.Unsafe類。
UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())
第一個參數是對象指針,第二個是偏移量,第三個是舊值,最後一個是新值。詳細可參考Unsafe文檔。
java實現了生產者-消費者模式的隊列。因爲隊列的容量有限,所以涉及到在隊列爲空的時候取task和在隊列已滿的時候存task的策略,連同一系列的查詢函數一塊兒,BlockingQueue包含着11個靜態方法。
BlockingQueue只是一個interface,它的實現類包括鏈表方式的LinkedBlockingQueue 、數組方式的ArrayBlockingQueue以及PriorityBlockingQueue等。
下面以LinkedBlockingQueue爲例來了解一下它的實現。
LinkedBlockingQueue是一個FIFO的隊列,它真正用來存儲元素的節點類型是Node :
static class Node<E> { E item; Node<E> next; Node(E x) { item = x; } }
對應的,在LinkedBlockingQueue中保存了頭節點和尾節點 :
/** * Head of linked list. * Invariant: head.item == null */ private transient Node<E> head; /** * Tail of linked list. * Invariant: last.next == null */ private transient Node<E> last;
在LinkedBlockingQueue中,Java使用了雙鎖機制,分別對頭節點和尾節點加鎖。這樣取和存的操做就能夠同時進行了。
/** Lock held by take, poll, etc */ private final ReentrantLock takeLock = new ReentrantLock(); /** Wait queue for waiting takes */ private final Condition notEmpty = takeLock.newCondition(); /** Lock held by put, offer, etc */ private final ReentrantLock putLock = new ReentrantLock(); /** Wait queue for waiting puts */ private final Condition notFull = putLock.newCondition();
以take爲例,獲取並移除此隊列的頭部,在元素變得可用以前一直等待(可被打斷)。
public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0) { notEmpty.await(); } x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
它將會一直阻塞在notEmpty.await()上,直到信號到達或者被中斷。注意到它只須要對takeLock加鎖,而無需對putLock加鎖。
相應的,put操做也只須要鎖上putLock就能夠了。
有的操做則須要兩個鎖都鎖上,好比說remove,由於咱們不肯定要刪除的元素的位置。
public boolean remove(Object o) { if (o == null) return false; fullyLock(); try { for (Node<E> trail = head, p = trail.next; p != null; trail = p, p = p.next) { if (o.equals(p.item)) { unlink(p, trail); return true; } } return false; } finally { fullyUnlock(); } }
能夠看到LinkedBlockingQueue 並無直接調用lock,而是經過fullyLock和fullyUnLock來加解鎖以保證一致性,避免死鎖:
/** * Lock to prevent both puts and takes. */ void fullyLock() { putLock.lock(); takeLock.lock(); } /** * Unlock to allow both puts and takes. */ void fullyUnlock() { takeLock.unlock(); putLock.unlock(); }
固然,雙鎖隊列在插入第一個元素和最後一個元素出隊的時候會有衝突。這裏的解決辦法是加了一個哨兵,開始的時候,頭尾節點都指向這個哨兵,在隨後的操做中,頭結點始終指向哨兵,而尾節點指向真正有效的值。
有了前面這些零件,咱們就能夠開始組裝線程池對象了。java裏面Executors的真正實現類主要包括兩個ThreadPollExecutors和ScheduledThreadPoolExecutor。其中ScheduledThreadPoolExecutor經過實現其基類ScheduledExecutorService擴展了ThreadPoolExecutor類。
SheduledExecutorsService主要用於執行週期性的或者定時的任務。其餘狀況下咱們更多使用ThreadPoolExecutor。
ThreadPoolExecutor總共有七個構造參數:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
從其註釋和參數名不難猜想各個參數的用途。惟一有點麻煩的是corePoolSize, maximumPoolSize這兩個參數的區別。你能夠參考這裏或者這裏。
但大多數狀況咱們並不須要直接調用它的構造函數,在Executors裏面定義了一系列的靜態方法供咱們使用。包括newFixedThreadPool、newSingleThreadExecutor等。
因爲ThreadPoolExecutor是一個通用的線程池,所以它須要爲各類各樣的狀況預留足夠的接口。ThreadPoolExecutor除了提供豐富的接口外,還提供了一些「什麼都不作」的函數,爲user預留接口。
好比每一個任務在執行以前會調用beforeExecute,執行完畢後又會調用afterExecute。又好比terminate用來通知用戶代碼該線程將要結束。
這些接口java都提供了及其豐富的文檔。
Executor接口設計的目的或許也在於此,爲簡單的狀況提供儘可能簡單的使用方法,同時爲複雜的狀況或者說高級用戶提供足夠多的接口。
在最初使用ThreadPoolExecutor 時候,用到FutrueTask的cancel接口,我老是擔憂一個問題:
因爲cancel是依賴線程的interrupt方法來實現的,也就是說cancel的狀態保持在線程中而不是task中。那麼當這個線程執行下一個task會不會被影響?爲了驗證這一點,我作了個小小的實驗:
public class InterruptTest { public static class MyTask implements Runnable { @Override public void run() { System.out.println(Thread.currentThread()); System.out.println("before interrupt " + Thread.currentThread().isInterrupted()); Thread.currentThread().interrupt(); System.out.println("after interrupt " + Thread.currentThread().isInterrupted()); } } public static void main(String[] str) { ExecutorService service = Executors.newFixedThreadPool(1); // MyTask task1 = new MyTask(); Future<?> future1 = service.submit(new InterruptTest.MyTask()); Future<?> future2 = service.submit(new InterruptTest.MyTask()); } }
輸出結果說明,個人擔憂是多餘的:
Thread[pool-1-thread-1,5,main] before interrupt false after interrupt true Thread[pool-1-thread-1,5,main] before interrupt false after interrupt true
其關鍵代碼就在ThreadPoolExecutor.runWorker 方法中,線程的中斷狀態會被清除(shutDown例外)。
final void runWorker(Worker w) { ... // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); ... }
其中Executors還有不少的東西,可是看看文章的長度,我決定把那些關於Executors的筆記先「藏」起來。
若是感興趣的能夠翻看源碼: ThreadFactory, RejectHandler, worker, task, shutDown策略,鎖機制... 看看ThreadPoolExecutor 把這些積木堆成一個房子的吧。