死磕 java線程系列之本身動手寫一個線程池(續)

mythreadpool

(手機橫屏看源碼更方便)ide

問題

(1)本身動手寫的線程池如何支持帶返回值的任務呢?函數

(2)若是任務執行的過程當中拋出異常了該怎麼處理呢?源碼分析

簡介

上一章咱們本身動手寫了一個線程池,可是它是不支持帶返回值的任務的,那麼,咱們本身可否實現呢?必須能夠,今天咱們就一塊兒來實現帶返回值任務的線程池。測試

前情回顧

首先,讓咱們先回顧一下上一章寫的線程池:this

(1)它包含四個要素:核心線程數、最大線程數、任務隊列、拒絕策略;spa

(2)它具備執行無返回值任務的能力;線程

(3)它沒法處理有返回值的任務;設計

(4)它沒法處理任務執行的異常(線程中的異常不會拋出到線程外);code

那麼,咱們能不能在現有的基礎上實現其下面兩項能力呢?讓咱們一塊兒來試一試吧!cdn

有返回值和無返回值的任務到底有何不一樣?

答案很明顯,就是一個有返回值,一個無返回值,用僞代碼來表示就是下面這樣:

// 無返回值
    threadPool.execute(()->{
        System.out.println(1);
    });
    // 有返回值,分兩步走
    // 1. 提交任務到線程池中
    SomeClass result = threadPool.execute(()->{
        System.out.println(1);
        return 1;
    });
    // 2. 等待任務的結果返回
    Object value = result.get();
    複製代碼

無返回值的任務提交了就完事,主線程並不Care它到底有沒有執行完,並不關心它是否是拋出異常,主線程Just提交線程到線程池中,其他什麼都無論。

有返回值的任務就不同了,主線程首先要提交任務到線程池中,它須要使用到任務執行的結果,因此它必須等待任務執行完畢才能拿到任務執行的結果。

那麼,爲何不直接在execute的時候就等待任務執行完畢呢?這樣的話那不就跟串行沒啥區別了,還不如直接在主線程執行任務呢,還少了線程切換的資源消耗。

之因此要分紅兩步,是由於主線程並不必定須要當即獲取返回值,在須要用到返回值的時候纔去get,這樣就能夠在提交任務和獲取返回值之間幹些其它的事情,提升效率。

因此,提交任務的時候不須要阻塞,get返回值的時候纔可能須要阻塞,若是get的時候任務已經執行完畢了,這時候也不須要阻塞,若是get的時候任務還未執行完畢,那就要阻塞等待任務執行完畢才能獲取到返回值。

實現分析

首先,無返回值的任務咱們直接使用的Runnable函數式接口,有返回值的任務有沒有現成的接口呢?還真有,那就是Callable接口,它有個返回值。

@FunctionalInterface
public interface Callable<V> {
    V call() throws Exception;
}複製代碼

其次,提交任務的時候須要有個返回值,它是在未來用來獲取任務執行結果的,實際上它也是新任務的一種能力,可使用它對任務進行包裝,使其具備返回值的能力。

public interface Future<T> {
    T get();
}複製代碼

再次,咱們須要給現有的線程池增長一種新的能力,根據單一職責原則,咱們定義一個新的接口來承載這種能力。

public interface FutureExecutor extends Executor {
    <T> Future<T> submit(Callable<T> command);
}複製代碼

而後,咱們須要一種新的任務,它既具備舊任務的執行能力(run()方法),又具備新任務的返回值能力(get()方法),因此咱們造一個「未來的任務」對提交的任務進行包裝,使其具備返回值的能力。

public class FutureTask<T> implements Runnable, Future<T> {

    /**
     * 真正的任務
     */
    private Callable<T> task;
    
    public FutureTask(Callable<T> task) {
        this.task = task;
    }

    @Override
    public void run() {
        // 具體實現...
    }

    @Override
    public T get() {
        // 具體實現...
    }
}複製代碼

最後,咱們只要對原有的線程池進行擴展,將提交的任務包裝成「未來獲取返回值的任務」,仍是使用原來的方法去執行,而後返回這個未來的任務便可。

根據開閉原則,【本篇文章由公衆號「彤哥讀源碼」原創】原來的代碼咱們不作任何修改,擴展新的子類來實現新的能力。

public class MyThreadPoolFutureExecutor extends MyThreadPoolExecutor implements FutureExecutor, Executor {

    public MyThreadPoolFutureExecutor(String name, int coreSize, int maxSize, BlockingQueue<Runnable> taskQueue, RejectPolicy rejectPolicy) {
        super(name, coreSize, maxSize, taskQueue, rejectPolicy);
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        // 包裝成未來獲取返回值的任務
        FutureTask<T> futureTask = new FutureTask<>(task);
        // 仍是使用原來的執行能力
        execute(futureTask);
        // 返回未來的任務,只須要返回其get返回值的能力便可
        // 因此這裏返回的是Future而不是FutureTask類型
        return futureTask;
    }
}複製代碼

好了,到這裏總體的邏輯咱們就已經比較清晰地實現完了,還剩下最關鍵的部分,這個「未來的任務」的兩個能力要如何實現。

未來的任務

未來的任務,具備兩個能力:一是執行真正任務的能力,二是未來獲取返回值的能力。

public class FutureTask<T> implements Runnable, Future<T> {
    @Override
    public void run() {
        // 具體實現...
    }

    @Override
    public T get() {
        // 具體實現...
    }
}複製代碼

首先,咱們要明確一件事,任務的執行是線程池中,獲取返回值是在主線程中,它們是在兩個線程中執行的,並且誰先誰後咱們沒法肯定。

其次,若是run()在get()以前執行,咱們須要告訴get()任務已經執行完畢了,因此須要一個狀態來通知這個事,還須要一個變量來承載任務執行的返回值。

/**
     * 任務執行的狀態,0未開始,1正常完成,2異常完成
     * 也可使用volatile+Unsafe實現CAS操做
     */
    private AtomicInteger state = new AtomicInteger(NEW);
    private static final int NEW = 0;
    private static final int FINISHED = 1;
    private static final int EXCEPTION = 2;
    /**
     * 任務執行的結果【本篇文章由公衆號「彤哥讀源碼」原創】
     * 若是執行正常,返回結果爲T
     * 若是執行異常,返回結果爲Exception
     */
    private Object result;複製代碼

再次,若是get()在run()以前執行,那就須要阻塞等待run()執行完畢才能拿到返回值,因此須要保存調用者(主線程),get()的時候park阻塞住,run()完成了unpark喚醒它來拿返回值。

/**
     * 調用者線程
     * 也可使用volatile+Unsafe實現CAS操做
     */
    private AtomicReference<Thread> caller = new AtomicReference<>();複製代碼

而後,咱們先來看看run()方法的邏輯,它其實就是先執行真正的任務,而後修改狀態爲完成,並保存任務的返回值,若是保存了主線程,還要喚醒它。

@Override
    public void run() {
        // 若是狀態不是NEW,說明執行過了,直接返回
        if (state.get() != NEW) {
            return;
        }
        try {
            // 執行任務【本篇文章由公衆號「彤哥讀源碼」原創】
            T r = task.call();
            // CAS更新state的值爲FINISHED
            // 若是更新成功,就把r賦值給result
            // 若是更新失敗,說明state的值不爲NEW了,也就是任務已經執行過了
            if (state.compareAndSet(NEW, FINISHED)) {
                this.result = r;
                // finish()必須放在修改state裏面,見下面的分析
                finish();
            }
        } catch (Exception e) {
            // 若是CAS更新state的值爲EXCEPTION成功,就把e賦值給result
            // 若是CAS更新失敗,說明state的值不爲NEW了,也就是任務已經執行過了
            if (state.compareAndSet(NEW, EXCEPTION)) {
                this.result = e;
                // finish()必須放在修改state裏面,見下面的分析
                finish();
            }
        }
    }

    private void finish() {
        // 檢查調用者是否爲空,若是不爲空,喚醒它
        // 調用者在調用get()方法的進入阻塞狀態
        for (Thread c; (c = caller.get()) != null;) {
            if (caller.compareAndSet(c, null)) {
                LockSupport.unpark(c);
            }
        }
    }複製代碼

最後,咱們再看看get()方法,若是任務還未執行,就阻塞等待任務的執行;若是任務已經執行完畢了,直接拿返回值便可;可是,還有一種狀況,get()方法執行的過程當中run()方法也在執行,因此get()方法中的每一步都要檢查狀態的值有沒有變化。

@Override
    public T get() {
        int s = state.get();
        // 若是任務還未執行完成,判斷當前線程是否要進入阻塞狀態
        if (s == NEW) {
            // 標識調用者線程是否被標記過
            boolean marked = false;
            for (;;) {
                // 從新獲取state的值
                s = state.get();
                // 若是state大於NEW說明完成了,跳出循環
                if (s > NEW) {
                    break;
                    // 此處必須把caller的CAS更新和park()方法分紅兩步處理,不能把park()放在CAS裏面
                } else if (!marked) {
                    // 嘗試更新調用者線程
                    // 試想斷點停在此處【本篇文章由公衆號「彤哥讀源碼」原創】
                    // 此時state爲NEW,讓run()方法執行到底,它不會執行finish()中的unpark()方法
                    // 這時打開斷點,這裏會更新caller成功,可是循環從頭再執行一遍發現state已經變了,
                    // 直接在上面的if(s>NEW)處跳出循環了,由於finish()在修改state內部
                    marked = caller.compareAndSet(null, Thread.currentThread());
                } else {
                    // 調用者線程更新以後park當前線程
                    // 試想斷點停在此處
                    // 此時state爲NEW,讓run()方法執行到底,由於上面的caller已經設置值了,
                    // 因此會執行finish()方法中的unpark()方法,
                    // 這時再打開斷點,這裏不會park信
                    // 見unpark()方法的註釋,上面寫得很清楚:
                    // 若是線程執行了park()方法,那麼執行unpark()方法會喚醒那個線程
                    // 若是先執行了unpark()方法,那麼線程下一次執行park()方法將不會阻塞
                    LockSupport.park();
                }
            }
        }

        if (s == FINISHED) {
            return (T) result;
        }
        throw new RuntimeException((Throwable) result);
    }複製代碼

在咱們的實現中,若是任務執行的過程拋出異常了,也是經過result返回給主線程,這樣主線程就拿到了這個異常,它就能夠作相應的處理了。

好了,完整的實現到此結束,不知道你領悟了沒有。

測試用例

最後奉上測試代碼:

public class MyThreadPoolFutureExecutorTest {
    public static void main(String[] args) {
        FutureExecutor threadPool = new MyThreadPoolFutureExecutor("test", 2, 4, new ArrayBlockingQueue<>(6), new DiscardRejectPolicy());
        List<Future<Integer>> list = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            int num = i;
            Future<Integer> future = threadPool.submit(() -> {
                Thread.sleep(1000);
                System.out.println("running: " + num);
                return num;
            });
            list.add(future);
        }

        for (Future<Integer> future : list) {
            System.out.println("runned: " + future.get());
        }
    }
}複製代碼

運行結果:

thread name: core_test2
thread name: test4
thread name: test3
discard one task
thread name: core_test1
discard one task
...省略被拒絕的任務
【本篇文章由公衆號「彤哥讀源碼」原創】
discard one task
running: 0
running: 1
running: 8
running: 9
runned: 0
runned: 1
running: 4
running: 2
running: 3
running: 5
runned: 2
runned: 3
runned: 4
runned: 5
running: 6
running: 7
runned: 6
runned: 7
runned: 8
runned: 9
複製代碼

總結

(1)有返回值的任務是經過包裝成未來的任務來實現的,這個任務既具備基本的執行能力,又具備未來獲取返回值的能力;

(2)任務執行的異常跟任務正常的返回值是經過同一個返回值返回到主線程的,主線程根據狀態判斷是異常仍是正常值;

(3)咱們的實現中運用了單一職責原則、開閉原則等設計原則,對原有代碼沒有形成任何的入侵;

彩蛋

手寫線程池目前只打算寫這兩章,後面開始進入jdk原生線程池的源碼分析,敬請期待。

另外,須要手寫線程池完整源碼的同窗請關注個人公衆號「彤哥讀源碼」,在後臺回覆「MyThreadPool」(不帶引號)便可領取手寫線程池完整源碼,注意大小寫不要弄錯哦,不然彤哥是不會給你的哈。

歡迎關注個人公衆號「彤哥讀源碼」,查看更多源碼系列文章, 與彤哥一塊兒暢遊源碼的海洋。

qrcode

相關文章
相關標籤/搜索