Java Executors 源碼分析

0 引言

前段時間須要把一個C++的項目port到Java中,所以時隔三年後從新熟悉了下Java。因爲須要一個通用的線程池,天然而然就想到了Executors。java

用了後,感受很爽... 因而忍不住摳了下源碼。所以就有了這篇學習筆記。編程

言歸正傳,Java Executor是一個功能豐富,接口設計很好的,基於生產者-消費者模式的通用線程池。這種線程池的設計思想也在不少地方被應用。數組

在這篇文章中,我並不打算介紹java線程池的使用,生產者-消費者模式,併發編程基本概念等。安全

一般來講,一個線程池的實現包括四個部分:數據結構

  1. 執行任務的線程
  2. 用於封裝任務的task對象
  3. 存儲任務的數據結構
  4. 線程池自己

1 Thread

Thread 並非concurrent包的一部分。Thread包含着name, priority等成員和對應的操做方法。併發

它是繼承自runable的,也就是說線程的入口函數是run。它的繼承體系和重要操做函數以下圖:異步

thread 類結構

它實現了一系列包括sleep, yield等靜態方法。以及獲取當前線程的靜態方法currentThread()。這些都是native方法。ide

值得注意的是它的中斷機制(雖然它也實現了suspend和resume方法,可是這兩個方法已被棄用):函數

  1. 經過調用interrupt來觸發一箇中斷
  2. isInterrupted() 用來查詢線程的中斷狀態
  3. interrupted() 用來查詢並清除線程的中斷狀態
public void interrupt() {
    if (this != Thread.currentThread())
        checkAccess();
    synchronized (blockerLock) {
        Interruptible b = blocker;
        if (b != null) {
            interrupt0();           // Just to set the interrupt flag
            b.interrupt(this);
            return;
        }
    }
    interrupt0();
}

在默認的狀況下,blocker (Interruptible 成員變量)的值爲null, 這時調用interrupt,僅僅是調用interrupt0設置一個標誌位。學習

而若是blocker的值不爲null,則會調用其interrupt方法實現真正的中斷。

(關於blocker值什麼時候被設置,在後面會看到一個使用場景。)

當線程處於可中斷的阻塞狀態時,好比說阻塞在sleep, wait, join,select等操做時,調用interrupt方法會讓線程從阻塞狀態退出,並拋出InterruptedException。

值得注意的一點是:interrupt讓咱們從阻塞的方法中退出,但線程的中斷狀態卻並不會被設置

try {
    Thread.sleep(10);
}
catch (InterruptedException e) {
    System.out.println("IsInterrupted: " + Thread.currentThread().isInterrupted());
}

如上述示例代碼,此時你獲得的輸出是: IsInterrupted : false 。這是一個有點使人意外的地方。
上述代碼並不是一個好的示例,由於interrupt被咱們「吃」掉了!除非你明確的知道這是你想要的。不然的話請考慮在異常捕獲中(catch段中)加上:

Thread.currentThread.interrupt();

2. Task

Java可執行的接口類有兩種,Runnable和Callable,它們的區別是Callable能夠帶返回值,一個須要實現Run()方法,另外一個須要實現帶返回值的Call() 方法。

在java.util.concurret中還有另一個接口類Future。

Future表示一個異步任務的結果,就是user code向線程池提交一個任務後,它會返回對應的 Future對象。用以觀察任務執行的狀態(isCancelled, isDone),取消任務(Cancel)或者等待任務執行(get, timeout get)。

FutureTask 類結構圖

如上圖,RunnableFuture是一箇中間類,它將Runnable和Future的功能糅合到一塊兒。FutureTask 則是真正的實現。

FutureTask

FutureTask能夠從一個Runnable和Callable構造,當經過Runnable構造時,它會調用Excutors.callable接口將其轉爲Callable對象保存起來。

從上面的類圖中能夠看出,FutureTask除了簡單的狀態查詢等接口外,還具備兩個重要的接口:get()get(long timeout, TimeUnit unit)), cancel(bool mayInterruptIfRunning)

它們分別提供兩個重要的功能:阻塞(當前線程)等待(一段時間)直到task完成或者異常終止;取消任務。

任務取消

一個任務具備三種狀態:還沒有運行,正在運行,已經執行完畢。

在調用cancel後,若是任務處於已經執行完畢了,則不須要作任何事情直接返回;

若是任務還沒有運行,將其狀態設爲cancelled;
若是任務正在執行,並且user以cancel(true)的方式取消這個任務。那麼FutureTask會經過調用Thread.interrupt來終止當前任務。

public boolean cancel(boolean mayInterruptIfRunning) {
    // 任務已經完成或者被中斷等其餘狀態
    if (state != NEW)
        return false;
    if (mayInterruptIfRunning) {
        // 正在運行,或者還沒有運行
        if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, INTERRUPTING))
            return false;
        Thread t = runner;
        if (t != null)
            t.interrupt();
        UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // final state
    }
    // 設置cancel標誌位
    else if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED))
        return false;
    finishCompletion();
    return true;
}

注意到: FutureTask並無一個RUNNING的狀態來標識該任務正在執行。正常的狀況下,任務從開始建立直到運行完畢,這段過程的狀態都是NEW。

阻塞等待

user code能夠調用get() 接口等待任務完成或者調用get(long, TimeUnit)等待一段時間。但get()接口被調用,當前的線程將被掛起,直到條件知足(任務完成或者異常退出)。

在前文中咱們瞭解到,Thread並無提供掛起和阻塞的方法。在這裏,Java利用LockSupport類來實現目的。(我猜想其中用了相似條件變量的方法來實現)。

park

LockSupport也屬於concurrent。FutureTask利用它的park (parkNanos)和unpark方法來實現線程的掛起和恢復:


public static void park(Object blocker) { Thread t = Thread.currentThread(); setBlocker(t, blocker); unsafe.park(false, 0L); setBlocker(t, null); } public static void unpark(Thread thread) { if (thread != null) unsafe.unpark(thread); }

其中parkNanos跟park方法並沒有本質區別,只是多了一個timeout參數。FutureTask分別用它們來實現get和timeout的get。

注意到上面的setBlocker方法了嗎?沒錯,它就是給在上文Thread.interrupt方法中出現過的Thread成員變量blocker賦值。從這咱們能夠看出,它是可中斷的。

而它真正實現掛起的則是依賴unsafe類。unsafe類在concurrent中頻繁出現,但sun去並不建議使用它。

它除了提供park,unpark方法外,還提供了一些內存和同步原語。好比CAS等。

多個等待者

調用get()的線程能夠是一個,也能夠是多個。爲了可以在恰當的時機將它們一一恢復,FutureTask內部須要維護一個鏈表來記錄全部的等待線程:waiters.

static final class WaitNode {
    volatile Thread thread;
    volatile WaitNode next;
    WaitNode() { thread = Thread.currentThread(); }
}

get 全貌

至此,咱們終於瞭解get的全貌了。get會調用awaitDone方法來實現阻塞。固然,只有兩個狀態須要處理:NEW, COMPLETING。

NEW的狀態在前文已經有介紹過。COMPLETING狀態一般持續較短,在FutureTask 內部的callable 的call方法調用完畢後,會須要將call的返回值設置到outcome這個成員變量。隨後將狀態設爲NORMAL。這期間的狀態就是COMPLETING。

顯而易見,對於這種狀態,咱們只須要調用yield讓出線程資源,使得FutureTask完成這一過程便可。


private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; if (s > COMPLETING) { // 1 if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // cannot time out yet 2 Thread.yield(); else if (q == null) // 3 q = new WaitNode(); else if (!queued) // 4 queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { // 5 nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else // 6 LockSupport.park(this); } }

當任務處於NEW狀態正在被執行時,其餘線程調用get而進入awaitdone函數。

此時的流程是 3 -> 4 -> 5 或者 3 -> 4 -> 6。

它會首先分配一個WaitNode對象 --> 把它插入到waiters鏈表的表頭 --> 而後開始等待。那麼park函數什麼時候返回呢?

  1. 對應的unpark被調用(或者在這以前已經被調用)
  2. 若是設置了timeout的,會在時間到達後退出。
  3. 被中斷。
  4. 其餘異常。

等待線程恢復

當任務執行完畢(或者被cancel)時,FutureTask會調用最終調用finishcompletion,改函數會改變FutureTask狀態,並調用LockSupport.unpark方法。

此時,awaitDone線程從park中返回,而後檢查當前的狀態已經被改變,隨後退出for循環。

線程安全

FutureTask是會被多個線程訪問的,涉及到臨界區的保護,可是其內部卻並無任何的鎖操做。而在該類定義的末尾,有這樣的代碼。

private static final sun.misc.Unsafe UNSAFE;
private static final long stateOffset;
private static final long runnerOffset;
private static final long waitersOffset;
static {
    try {
        UNSAFE = sun.misc.Unsafe.getUnsafe();
        Class<?> k = FutureTask.class;
        stateOffset = UNSAFE.objectFieldOffset
            (k.getDeclaredField("state"));
        runnerOffset = UNSAFE.objectFieldOffset
            (k.getDeclaredField("runner"));
        waitersOffset = UNSAFE.objectFieldOffset
            (k.getDeclaredField("waiters"));
    } catch (Exception e) {
        throw new Error(e);
    }
}

這段代碼會在類被加載時執行一次。注意到它利用getDeclaredField反射機制來保存了三個offset:
stateOffset,runnerOffset,waitersOffset分別對應着state,runner,waiters這三個成員的偏移量。

FutureTask真是對這三個成員變量進行CAS操做來保證原子性和無鎖化的。實現CAS的類正是上文出現過的sun.misc.Unsafe類。

UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())

第一個參數是對象指針,第二個是偏移量,第三個是舊值,最後一個是新值。詳細可參考Unsafe文檔。

3. BlockingQueue

java實現了生產者-消費者模式的隊列。因爲隊列的容量有限,所以涉及到在隊列爲空的時候取task和在隊列已滿的時候存task的策略,連同一系列的查詢函數一塊兒,BlockingQueue包含着11個靜態方法。

BlockingQueue只是一個interface,它的實現類包括鏈表方式的LinkedBlockingQueue 、數組方式的ArrayBlockingQueue以及PriorityBlockingQueue等。

LinkedBlockingQueue

下面以LinkedBlockingQueue爲例來了解一下它的實現。

LinkedBlockingQueue是一個FIFO的隊列,它真正用來存儲元素的節點類型是Node :

static class Node<E> {
    E item;
    Node<E> next;
    Node(E x) { item = x; }
}

對應的,在LinkedBlockingQueue中保存了頭節點和尾節點 :

/**
 * Head of linked list.
 * Invariant: head.item == null
 */
private transient Node<E> head;
/**
 * Tail of linked list.
 * Invariant: last.next == null
 */
private transient Node<E> last;

在LinkedBlockingQueue中,Java使用了雙鎖機制,分別對頭節點和尾節點加鎖。這樣取和存的操做就能夠同時進行了。


/** Lock held by take, poll, etc */ private final ReentrantLock takeLock = new ReentrantLock(); /** Wait queue for waiting takes */ private final Condition notEmpty = takeLock.newCondition(); /** Lock held by put, offer, etc */ private final ReentrantLock putLock = new ReentrantLock(); /** Wait queue for waiting puts */ private final Condition notFull = putLock.newCondition();

以take爲例,獲取並移除此隊列的頭部,在元素變得可用以前一直等待(可被打斷)。


public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0) { notEmpty.await(); } x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }

它將會一直阻塞在notEmpty.await()上,直到信號到達或者被中斷。注意到它只須要對takeLock加鎖,而無需對putLock加鎖。

相應的,put操做也只須要鎖上putLock就能夠了。

有的操做則須要兩個鎖都鎖上,好比說remove,由於咱們不肯定要刪除的元素的位置。

public boolean remove(Object o) {
    if (o == null) return false;
    fullyLock();
    try {
        for (Node<E> trail = head, p = trail.next;
             p != null;
             trail = p, p = p.next) {
            if (o.equals(p.item)) {
                unlink(p, trail);
                return true;
            }
        }
        return false;
    } finally {
        fullyUnlock();
    }
}

能夠看到LinkedBlockingQueue 並無直接調用lock,而是經過fullyLock和fullyUnLock來加解鎖以保證一致性,避免死鎖:


/** * Lock to prevent both puts and takes. */ void fullyLock() { putLock.lock(); takeLock.lock(); } /** * Unlock to allow both puts and takes. */ void fullyUnlock() { takeLock.unlock(); putLock.unlock(); }

固然,雙鎖隊列在插入第一個元素和最後一個元素出隊的時候會有衝突。這裏的解決辦法是加了一個哨兵,開始的時候,頭尾節點都指向這個哨兵,在隨後的操做中,頭結點始終指向哨兵,而尾節點指向真正有效的值。

4. Executors

類結構

有了前面這些零件,咱們就能夠開始組裝線程池對象了。java裏面Executors的真正實現類主要包括兩個ThreadPollExecutors和ScheduledThreadPoolExecutor。其中ScheduledThreadPoolExecutor經過實現其基類ScheduledExecutorService擴展了ThreadPoolExecutor類。

Executor 類結構

SheduledExecutorsService主要用於執行週期性的或者定時的任務。其餘狀況下咱們更多使用ThreadPoolExecutor。

ThreadPoolExecutor

ThreadPoolExecutor總共有七個構造參數:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {

從其註釋和參數名不難猜想各個參數的用途。惟一有點麻煩的是corePoolSize, maximumPoolSize這兩個參數的區別。你能夠參考這裏或者這裏。

但大多數狀況咱們並不須要直接調用它的構造函數,在Executors裏面定義了一系列的靜態方法供咱們使用。包括newFixedThreadPool、newSingleThreadExecutor等。

因爲ThreadPoolExecutor是一個通用的線程池,所以它須要爲各類各樣的狀況預留足夠的接口。ThreadPoolExecutor除了提供豐富的接口外,還提供了一些「什麼都不作」的函數,爲user預留接口。
好比每一個任務在執行以前會調用beforeExecute,執行完畢後又會調用afterExecute。又好比terminate用來通知用戶代碼該線程將要結束。

這些接口java都提供了及其豐富的文檔。

Executor接口設計的目的或許也在於此,爲簡單的狀況提供儘可能簡單的使用方法,同時爲複雜的狀況或者說高級用戶提供足夠多的接口。

一個不用擔憂的問題

在最初使用ThreadPoolExecutor 時候,用到FutrueTask的cancel接口,我老是擔憂一個問題:

因爲cancel是依賴線程的interrupt方法來實現的,也就是說cancel的狀態保持在線程中而不是task中。那麼當這個線程執行下一個task會不會被影響?爲了驗證這一點,我作了個小小的實驗:

public class InterruptTest
{
    public static class MyTask implements Runnable {
        @Override
        public void run() {
            System.out.println(Thread.currentThread());
            System.out.println("before interrupt " + Thread.currentThread().isInterrupted());
            Thread.currentThread().interrupt();

            System.out.println("after interrupt " + Thread.currentThread().isInterrupted());
        }
    }
    public static void main(String[] str)
    {
        ExecutorService service = Executors.newFixedThreadPool(1);
        // MyTask task1 = new MyTask();
        Future<?> future1 = service.submit(new InterruptTest.MyTask());
        Future<?> future2 = service.submit(new InterruptTest.MyTask());
    }
}

輸出結果說明,個人擔憂是多餘的:

Thread[pool-1-thread-1,5,main]
before interrupt false
after interrupt true
Thread[pool-1-thread-1,5,main]
before interrupt false
after interrupt true

其關鍵代碼就在ThreadPoolExecutor.runWorker 方法中,線程的中斷狀態會被清除(shutDown例外)。

final void runWorker(Worker w) {
     ...
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
    ...
}

參見 SO 的提問

其中Executors還有不少的東西,可是看看文章的長度,我決定把那些關於Executors的筆記先「藏」起來。

若是感興趣的能夠翻看源碼: ThreadFactory, RejectHandler, worker, task, shutDown策略,鎖機制... 看看ThreadPoolExecutor 把這些積木堆成一個房子的吧。

相關文章
相關標籤/搜索