線程池續:你必需要知道的線程池submit()實現原理之FutureTask!

FutureTask思惟導圖.png

前言

上一篇內容寫了Java中線程池的實現原理及源碼分析,說好的是實實在在的大知足,想經過一篇文章讓你們對線程池有個透徹的瞭解,可是文章寫完總以爲還缺點什麼?java

上篇文章只提到線程提交的execute()方法,並無講解線程提交的submit()方法,submit()有一個返回值,能夠獲取線程執行的結果Future<T>,這一講就那深刻學習下submit()FutureTask實現原理。數據結構

使用場景&示例

使用場景

我能想到的使用場景就是在並行計算的時候,例如一個方法中調用methodA()、methodB(),咱們能夠經過線程池異步去提交方法A、B,而後在主線程中獲取組裝方法A、B計算後的結果,可以大大提高方法的吞吐量。多線程

FutureTask使用場景.png

使用示例

/**
 * @author wangmeng
 * @date 2020/5/28 15:30
 */
public class FutureTaskTest {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService threadPool = Executors.newCachedThreadPool();

        System.out.println("====執行FutureTask線程任務====");
        Future<String> futureTask = threadPool.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                System.out.println("FutureTask執行業務邏輯");
                Thread.sleep(2000);
                System.out.println("FutureTask業務邏輯執行完畢!");
                return "歡迎關注: 一枝花算不算浪漫!";
            }
        });

        System.out.println("====執行主線程任務====");
        Thread.sleep(1000);
        boolean flag = true;
        while(flag){
            if(futureTask.isDone() && !futureTask.isCancelled()){
                System.out.println("FutureTask異步任務執行結果:" + futureTask.get());
                flag = false;
            }
        }

        threadPool.shutdown();
    }
}

上面的使用很簡單,submit()內部傳遞的其實是個Callable接口,咱們本身實現其中的call()方法,咱們經過futureTask既能夠獲取到具體的返回值。異步

submit()實現原理

submit() 是也是提交任務到線程池,只是它能夠獲取任務返回結果,返回結果是經過FutureTask來實現的,先看下ThreadPoolExecutor中代碼實現:ide

public class ThreadPoolExecutor extends AbstractExecutorService {
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }
}

public abstract class AbstractExecutorService implements ExecutorService {
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }
}

提交任務仍是執行execute()方法,只是task被包裝成了FutureTask ,也就是在excute()中啓動線程後會執行FutureTask.run()方法。源碼分析

再來具體看下它執行的完整鏈路圖:學習

submit()全鏈路圖.png

上圖能夠看到,執行任務並返回執行結果的核心邏輯實在FutureTask中,咱們以FutureTask.run/get 兩個方法爲突破口,一點點剖析FutureTask的實現原理。this

FutureTask源碼初探

先看下FutureTask中部分屬性:線程

FutureTask屬性.png

public class FutureTask<V> implements RunnableFuture<V> {
    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

    private Callable<V> callable;
    private Object outcome;
    private volatile Thread runner;
    private volatile WaitNode waiters;
}
  1. state

當前task狀態,共有7中類型。
NEW: 當前任務還沒有執行
COMPLETING: 當前任務正在結束,還沒有徹底結束,一種臨界狀態
NORMAL:當前任務正常結束
EXCEPTIONAL: 當前任務執行過程當中發生了異常。
CANCELLED: 當前任務被取消
INTERRUPTING: 當前任務中斷中..
INTERRUPTED: 當前任務已中斷3d

  1. callble

用戶提交任務傳遞的Callable,自定義call方法,實現業務邏輯

  1. outcome

任務結束時,outcome保存執行結果或者異常信息。

  1. runner

當前任務被線程執行期間,保存當前任務的線程對象引用

  1. waiters

由於會有不少線程去get當前任務的結果,因此這裏使用了一種stack數據結構來保存

FutureTask.run()實現原理

咱們已經知道在線程池runWorker()中最終會調用到FutureTask.run()方法中,咱們就來看下它的執行原理吧:

run()執行邏輯.png

具體代碼以下:

public class FutureTask<V> implements RunnableFuture<V> {
    public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            runner = null;
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }
}

首先是判斷FutureTaskstate狀態,必須是NEW才能夠繼續執行。

而後經過CAS修改runner引用爲當前線程。

接着執行用戶自定義的call()方法,將返回結果設置到result中,result可能爲正常返回也可能爲異常信息。這裏主要是調用set()/setException()

FutureTask.set()實現原理

set()方法的實現很簡單,直接看下代碼:

public class FutureTask<V> implements RunnableFuture<V> {
    protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL);
            finishCompletion();
        }
    }
}

call()返回的數據賦值給全局變量outcome上,而後修改state狀態爲NORMAL,最後調用finishCompletion()來作掛起線程的喚醒操做,這個方法等到get()後面再來說解。

FutureTask.get()實現原理

get().png

接着看下代碼:

public class FutureTask<V> implements RunnableFuture<V> {
    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }
}

若是FutureTaskstateNORMAL或者COMPLETING,說明當前任務並無執行完成,調用get()方法會被阻塞,具體的阻塞邏輯在awaitDone()方法:

private int awaitDone(boolean timed, long nanos) throws InterruptedException {

        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING)
                Thread.yield();
            else if (q == null)
                q = new WaitNode();
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            else
                LockSupport.park(this);
        }
    }

這個方法能夠說是FutureTask中最核心的方法了,一步步來分析:

若是timed不爲空,這說明指定nanos時間還未返回結果,線程就會退出。

q是一個WaitNode對象,是將當前引用線程封裝在一個stack數據結構中,WaitNode對象屬性以下:

static final class WaitNode {
    volatile Thread thread;
    volatile WaitNode next;
    WaitNode() { thread = Thread.currentThread(); }
}

接着判斷當前線程是否中斷,若是中斷則拋出中斷異常。

下面就進入一輪輪的if... else if...判斷邏輯,咱們仍是採用分支的方式去分析。

分支一:if (s > COMPLETING) {

此時get()方法已經有結果了,不管是正常返回的結果,仍是異常、中斷、取消等,此時直接返回state狀態,而後執行report()方法。

分支二:else if (s == COMPLETING)

條件成立,說明當前任務接近完成狀態,這裏讓當前線程再釋放cpu,進行下一輪搶佔cpu

分支三:else if (q == null)

第一次自旋執行,WaitNode尚未初始化,初始化q=new WaitNode();

分支四:else if (!queued){

queued表明當前線程是否入棧,若是沒有入棧則進行入棧操做,順便將全局變量waiters指向棧頂元素。

分支五/六:LockSupport.park

若是設置了超時時間,則使用parkNanos來掛起當前線程,不然使用park()

通過這麼一輪自旋循環後,若是執行call()尚未返回結果,那麼調用get()方法的線程都會被掛起。

被掛起的線程會等待run()返回結果後依次喚醒,具體的執行邏輯在finishCompletion()中。

最終stack結構中數據以下:

WaitNode.png

FutureTask.finishCompletion()實現原理

finishCompletion().png

具體實現代碼以下:

private void finishCompletion() {
    for (WaitNode q; (q = waiters) != null;) {
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null;
                q = next;
            }
            break;
        }
    }

    done();

    callable = null;
}

代碼實現很簡單,看過get()方法後,咱們知道全部調用get()方法的線程,在run()尚未返回結果前,都會保存到一個有WaitNode構成的statck數據結構中,並且每一個線程都會被掛起。

這裏是遍歷waiters棧頂元素,而後依次查詢起next節點進行喚醒,喚醒後的節點接着會日後調用report()方法。

FutureTask.report()實現原理

report().png

具體代碼以下:

private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        return (V)x;
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);
}

這個方法很簡單,由於執行到了這裏,說明當前state狀態確定大於COMPLETING,判斷若是是正常返回,那麼返回outcome數據。

若是state是取消狀態,拋出CancellationException異常。

若是狀態都不知足,則說明執行中出現了差錯,直接拋出ExecutionException

FutureTask.cancel()實現原理

cancel().png

public boolean cancel(boolean mayInterruptIfRunning) {
    if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {
        if (mayInterruptIfRunning) {
            try {
                Thread t = runner;
                if (t != null)
                    t.interrupt();
            } finally {
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
        finishCompletion();
    }
    return true;
}

cancel()方法的邏輯很簡單,就是修改state狀態爲CANCELLED,而後調用finishCompletion()來喚醒等待的線程。

這裏若是mayInterruptIfRunning,就會先中斷當前線程,而後再去喚醒等待的線程。

總結

FutureTask的實現原理其實很簡單,每一個方法基本上都畫了一個簡單的流程圖來方便當即。

後面還打算分享一個BlockingQueue相關的源碼解讀,這樣線程池也能夠算是完結了。

在這以前可能會先分享一個SpringCloud常見配置代碼分析、最佳實踐等手冊,方便工做中使用,也是對以前看過的源碼一種總結。敬請期待!
歡迎關注:
原創乾貨分享.png

相關文章
相關標籤/搜索