Java7中的ForkJoin併發框架初探(下)—— ForkJoin的應用

前兩篇文章已經對Fork Join的設計和JDK中源碼的簡要分析。這篇文章,咱們來簡單地看看咱們在開發中怎麼對JDK提供的工具類進行應用,以提升咱們的需求處理效率。html

Fork Join這東西確實用好了能給咱們的任務處理提升效率,也爲開發帶來方便。但Fork Join不是那麼容易用好的,咱們先來看幾個例子(反例)。java

####1. 反例錯誤分析api

咱們先來看看這篇文章中提供的例子:http://www.iteye.com/topic/643724 (由於是反例,就不提供超連接了,只以普通文本給出URL)數組

這篇文章是我學習和整理Fork Join時搜索到的一篇文章,其實總的來講這篇文章前面分析得仍是比較好的,只是給出的第一個例子(有返回結果的RecursiveTask應用的例子)沒有正確地對Fork Join進行應用。爲了方便分析,仍是貼下這個例子中具體的的代碼吧。併發

public class Calculator extends RecursiveTask {
    private static final int THRESHOLD = 100;
    private int start;
    private int end;
 
    public Calculator(int start, int end) {
        this.start = start;
        this.end = end;
    }
 
    @Override
    protected Integer compute() {
        int sum = 0;
        if((start - end) < THRESHOLD){
            for(int i = start; i< end;i++){
                sum += i;
            }
        }else{
            int middle = (start + end) /2;
            Calculator left = new Calculator(start, middle);
            Calculator right = new Calculator(middle + 1, end);
            // 下三行高亮, 哈`
            left.fork();
            right.fork();
            sum = left.join() + right.join();
        }
        return sum;
    }
}

咱們看到其中一段已經高亮的代碼,顯示對兩個子任務進行fork()調用,即分別提交給當前線程的任務隊列,依次加到末尾。緊接着,又按照調用fork()的順序執行兩個子任務對象的join()方法。oracle

其實,這樣就有一個問題,在每次迭代中,第一個子任務會被放到線程隊列的倒數第二個位置,第二個子任務是最後一個位置。當執行join()調用的時候,因爲第一個子任務不在隊列尾而不能經過執行ForkJoinWorkerThread的unpushTask()方法取出任務並執行,線程最終只能掛起阻塞,等待通知。而Fork Join原本的作法是想經過子任務的合理劃分,避免過多的阻塞狀況出現。這樣,這個例子中的操做就違背了Fork Join的初衷,每次子任務的迭代,線程都會由於第一個子任務的join()而阻塞,加大了代碼運行的成本,提升了資源開銷,不利於提升程序性能。ide

除此以外,這段程序仍是不能進入Fork Join的過程,由於還有一個低級錯誤。看下第1五、16行代碼的條件,就清楚了。按照邏輯,start必然是比end小的。這將致使全部任務都將以循環累加的方式完成,而不會執行fork()和join()。工具

因而可知,Fork Join的使用仍是要注意對其自己的理解和對開發過程當中細節的把握的。咱們看下JDK中RecursiveAction和RecursiveTask這兩個類。性能

####2. RecursiveAction分析及應用實例學習

這兩個類都是繼承了ForkJoinTask,自己給出的實現邏輯並很少不復雜,在JDK的類文件中,它的註釋比源碼還要多。咱們能夠看下它的實現代碼。

public abstract class RecursiveAction extends ForkJoinTask<Void> {
    private static final long serialVersionUID = 5232453952276485070L;
 
    protected abstract void compute();
 
    public final Void getRawResult() { return null; }
 
    protected final void setRawResult(Void mustBeNull) { }
 
    protected final boolean exec() {
        compute();
        return true;
    }
}

咱們看到其中兩個方法是關於處理空返回值的方法。而exec方法則是調用了compute(),這個compute就是咱們使用Fork Join時須要本身實現的邏輯。

咱們能夠看下API中給出的一個最簡單最具體的例子:

class IncrementTask extends RecursiveAction {
   final long[] array; final int lo; final int hi;
   IncrementTask(long[] array, int lo, int hi) {
     this.array = array; this.lo = lo; this.hi = hi;
   }
   protected void compute() {
     if (hi - lo < THRESHOLD) {
       for (int i = lo; i < hi; ++i)
         array[i]++;
     }
     else {
       int mid = (lo + hi) >>> 1;
       invokeAll(new IncrementTask(array, lo, mid),
                 new IncrementTask(array, mid, hi));
     }
   }
 }

大體的邏輯就是,對給定一個特定數組的某段,進行逐個加1的操做。咱們看到else中的代碼塊,顯示取一個lo和hi的中間值,此後分割成兩個子任務,並進行invokeAll()調用。咱們來看下繼承自FutureTask的invokeAll()方法實現。很簡單:

public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) {
    t2.fork();
    t1.invoke();
    t2.join();
}

對於參數中的兩個子任務,對第二個子任務進行fork(),即放入線程對應隊列的結尾,而後執行第一個子任務,再調用第二個子任務的join(),實際上就是跳轉到第二個子任務,進行執行(固然若是不能執行,就須要阻塞等待了)。

其實invokeAll()是個重載方法,同名的還有另外兩個,基本邏輯都是同樣的,咱們拿出一個通用一點的來看一下:

public static void invokeAll(ForkJoinTask<?>... tasks) {
    Throwable ex = null;
    int last = tasks.length - 1;
    for (int i = last; i >= 0; --i) {
        ForkJoinTask<?> t = tasks[i];
        if (t == null) {
            if (ex == null)
                ex = new NullPointerException();
        }
        else if (i != 0)
            t.fork();
        else if (t.doInvoke() < NORMAL && ex == null)
            ex = t.getException();
    }
    for (int i = 1; i <= last; ++i) {
        ForkJoinTask<?> t = tasks[i];
        if (t != null) {
            if (ex != null)
                t.cancel(false);
            else if (t.doJoin() < NORMAL && ex == null)
                ex = t.getException();
        }
    }
    if (ex != null)
        UNSAFE.throwException(ex);
}

咱們發現第一個子任務(i==0的狀況)沒有進行fork,而是直接執行,其他的通通先調用fork()放入任務隊列,以後再逐一join()。其實咱們注意到一個要點就是第一個任務不要fork()再join(),也就是上面中例子的錯誤所在,這樣會形成阻塞,而不能充分利用Fork Join的特色,也就不能保證任務執行的性能。

Oracle的JavaSE7 API中在RecursiveAction裏還有一個更復雜的例子,是計算double數組平方和的,因爲代碼較長,就不列在這裏了。整體思路和上面是同樣的,額外增長了動態閾值的判斷,感興趣的想深刻理解的能夠到這裏去參考一下。 http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/RecursiveAction.html

####3. RecursiveTask簡要說明

其實說完了RecursiveAction,RecursiveTask能夠用「同理」來解釋。實現代碼也很簡單:

public abstract class RecursiveTask<V> extends ForkJoinTask<V> {
    private static final long serialVersionUID = 5232453952276485270L;
 
    V result;
 
    protected abstract V compute();
 
    public final V getRawResult() {
        return result;
    }
 
    protected final void setRawResult(V value) {
        result = value;
    }
 
    protected final boolean exec() {
        result = compute();
        return true;
    }
}

咱們看到惟一不一樣的是返回結果的處理,其他均可以和RecursiveAction同樣使用。

####4. Fork Join應用小結

Fork Join是爲咱們提供了一個很是好的「分而治之」思想的實現平臺,而且在必定程度上實現了「變串行併發爲並行」。但Fork Join不是萬能的頁不徹底是通用的,對於可很好分解成子任務的場景,咱們能夠對其進行應用,更多時候要考慮需求和應用場景,而且注意其使用要點才行。

相關文章
相關標籤/搜索