《java.util.concurrent 包源碼閱讀》22 Fork/Join框架的初體驗

JDK7引入了Fork/Join框架,所謂Fork/Join框架,我的解釋:Fork分解任務成獨立的子任務,用多線程去執行這些子任務,Join合併子任務的結果。這樣就能使用多線程的方式來執行一個任務。數組

 

JDK7引入的Fork/Join有三個核心類:多線程

ForkJoinPool,執行任務的線程池併發

ForkJoinWorkerThread,執行任務的工做線程框架

ForkJoinTask,一個用於ForkJoinPool的任務抽象類。this

 

由於ForkJoinTask比較複雜,抽象方法比較多,平常使用時通常不會繼承ForkJoinTask來實現自定義的任務,而是繼承ForkJoinTask的兩個子類:spa

RecursiveTask:子任務帶返回結果時使用線程

RecursiveAction:子任務不帶返回結果時使用code

 

對於Fork/Join框架的原理,Doug Lea的文章:A Java Fork/Join Framework對象

在看了網上的不少例子以後,發如今自定義任務類實現compute方法的邏輯通常是這樣的:blog

if 任務足夠小
    直接返回結果
else
    分割成N個子任務
    依次調用每一個子任務的fork方法執行子任務
    依次調用每一個子任務的join方法合併執行結果

 

而執行該自定義任務的調用的則是ForkJoinPool的execute方法,所以首先來看的就是ForkJoinPool的execute方法,看看和普通線程池執行任務有什麼不一樣:

    public void execute(ForkJoinTask<?> task) {
        if (task == null)
            throw new NullPointerException();
        forkOrSubmit(task);
    }

所以forkOrSubmit是真正執行ForkJoinTask的方法:

    private <T> void forkOrSubmit(ForkJoinTask<T> task) {
        ForkJoinWorkerThread w;
        Thread t = Thread.currentThread();
        if (shutdown)
            throw new RejectedExecutionException();
        if ((t instanceof ForkJoinWorkerThread) &&
            (w = (ForkJoinWorkerThread)t).pool == this)
            w.pushTask(task);
        else
            // 正常執行的時候是主線程調用的,所以關注addSubmission
            addSubmission(task);
    }

 

那麼咱們首先要關注的是addSubmission方法,發覺所作的事情和普通線程池很相似,就是把任務加入到隊列中,不一樣的是直接使用Unsafe操做內存來添加任務對象

    private void addSubmission(ForkJoinTask<?> t) {
        final ReentrantLock lock = this.submissionLock;
        lock.lock();
        try {
            // 隊列只是普通的數組而不是普通線程池的BlockingQueue,
            // 喚醒worker線程的工做由下面的signalWork來完成
            // 使用Unsafe進行內存操做,把任務放置在數組中
            ForkJoinTask<?>[] q; int s, m;
            if ((q = submissionQueue) != null) {
                long u = (((s = queueTop) & (m = q.length-1)) << ASHIFT)+ABASE;
                UNSAFE.putOrderedObject(q, u, t);
                queueTop = s + 1;
                if (s - queueBase == m)
                    // 數組已滿,爲數組擴容
                    growSubmissionQueue();
            }
        } finally {
            lock.unlock();
        }
        // 通知有新任務來了:兩種操做,有空閒線程則喚醒該線程
        // 不然若是能夠新建worker線程則爲這個任務新建worker線程
        // 若是不能夠就返回了,等到有空閒線程來執行這個任務
        signalWork();
    }

 

接下來要弄清楚就是在compute中fork時,按道理來講這個動做是和主任務在同一個線程中執行,fork是若是把子任務變成多線程執行的:

    public final ForkJoinTask<V> fork() {
        ((ForkJoinWorkerThread) Thread.currentThread())
            .pushTask(this);
        return this;
    }

 

在上面分析forkOrSubmit的時候一樣見到了ForkJoinWorkerThreadpushTask方法調用,那麼來看這個方法:

    final void pushTask(ForkJoinTask<?> t) {
        // 代碼的基本邏輯和ForkJoinPool的addSubmission方法基本一致
        // 都是把任務加入了任務隊列中,這裏是加入到ForkJoinWorkerThread
        // 內置的任務隊列中
        ForkJoinTask<?>[] q; int s, m;
        if ((q = queue) != null) {    // ignore if queue removed
            long u = (((s = queueTop) & (m = q.length - 1)) << ASHIFT) + ABASE;
            UNSAFE.putOrderedObject(q, u, t);
            queueTop = s + 1;         // or use putOrderedInt
            // 這裏不太明白
            if ((s -= queueBase) <= 2)
                pool.signalWork();
            else if (s == m)
                growQueue();
        }
    }

 

看到這裏一會兒陷入了僵局,爲何ForkJoinWorkerThread要內建一個隊列呢,並且若是子任務仍舊在同一個線程內的話,何以實現併發執行子任務呢?下一篇文章繼續。

相關文章
相關標籤/搜索