JAVA並行框架:Fork/Join

1、背景

雖然目前處理器核心數已經發展到很大數目,可是按任務併發處理並不能徹底充分的利用處理器資源,由於通常的應用程序沒有那麼多的併發處理任務。基於這種現狀,考慮把一個任務拆分紅多個單元,每一個單元分別獲得執行,最後合併每一個單元的結果。java

Fork/Join框架是JAVA7提供的一個用於並行執行任務的框架,是一個把大任務分割成若干個小任務,最終彙總每一個小任務結果後獲得大任務結果的框架。算法

fork/join

2、工做竊取算法

指的是某個線程從其餘隊列裏竊取任務來執行。使用的場景是一個大任務拆分紅多個小任務,爲了減小線程間的競爭,把這些子任務分別放到不一樣的隊列中,而且每一個隊列都有單獨的線程來執行隊列裏的任務,線程和隊列一一對應。可是會出現這樣一種狀況:A線程處理完了本身隊列的任務,B線程的隊列裏還有不少任務要處理。A是一個很熱情的線程,想過去幫忙,可是若是兩個線程訪問同一個隊列,會產生競爭,因此A想了一個辦法,從雙端隊列的尾部拿任務執行。而B線程永遠是從雙端隊列的頭部拿任務執行(任務是一個個獨立的小任務),這樣感受A線程像是小偷在竊取B線程的東西同樣。併發

work-stealing

工做竊取算法的優勢:框架

         利用了線程進行並行計算,減小了線程間的競爭。異步

工做竊取算法的缺點:ide

         一、若是雙端隊列中只有一個任務時,線程間會存在競爭。this

         二、竊取算法消耗了更多的系統資源,如會建立多個線程和多個雙端隊列。spa

3、框架設計

 Fork/Join中兩個重要的類:線程

一、ForkJoinTask:使用該框架,須要建立一個ForkJoin任務,它提供在任務中執行fork和join操做的機制。通常狀況下,咱們並不須要直接繼承ForkJoinTask類,只須要繼承它的子類,它的子類有兩個:設計

a、RecursiveAction:用於沒有返回結果的任務。

b、RecursiveTask:用於有返回結果的任務。

二、ForkJoinPool:任務ForkJoinTask須要經過ForkJoinPool來執行。

 1 package test;
 2 
 3 import java.util.concurrent.ExecutionException;
 4 import java.util.concurrent.ForkJoinPool;
 5 import java.util.concurrent.Future;
 6 import java.util.concurrent.RecursiveTask;
 7 
 8 
 9 public class CountTask extends RecursiveTask<Integer>
10 {
11     private static final long serialVersionUID = 1L;
12     //閾值
13     private static final int THRESHOLD = 2;
14     private int start;
15     private int end;
16     
17     public CountTask(int start, int end)
18     {
19         this.start = start;
20         this.end = end;
21     }
22 
23     @Override
24     protected Integer compute()
25     {
26         int sum = 0;
27         //判斷任務是否足夠小
28         boolean canCompute = (end - start) <= THRESHOLD;
29         if(canCompute)
30         {
31             //若是小於閾值,就進行運算
32             for(int i=start; i<=end; i++)
33             {
34                 sum += i;
35             }
36         }
37         else
38         {
39             //若是大於閾值,就再進行任務拆分
40             int middle = (start + end)/2;
41             CountTask leftTask = new  CountTask(start,middle);
42             CountTask rightTask = new  CountTask(middle+1,end);
43             //執行子任務
44             leftTask.fork();
45             rightTask.fork();
46             //等待子任務執行完,並獲得執行結果
47             int leftResult = leftTask.join();
48             int rightResult = rightTask.join();
49             //合併子任務
50             sum = leftResult + rightResult;
51             
52         }
53         return sum;
54     }
55     
56     public static void main(String[] args)
57     {
58         ForkJoinPool forkJoinPool = new ForkJoinPool();
59         CountTask task = new CountTask(1,6);
60         //執行一個任務
61         Future<Integer> result = forkJoinPool.submit(task);
62         try
63         {
64             System.out.println(result.get());
65         }
66         catch (InterruptedException e)
67         {
68             e.printStackTrace();
69         }
70         catch (ExecutionException e)
71         {
72             e.printStackTrace();
73         }
74         
75     }
76     
77 }

這個程序是將1+2+3+4+5+6拆分紅1+2;3+4;5+6三個部分進行子程序進行計算後合併。

4、源碼解讀

一、leftTask.fork();

1 public final ForkJoinTask<V> fork() {
2         Thread t;
3         if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
4             ((ForkJoinWorkerThread)t).workQueue.push(this);
5         else
6             ForkJoinPool.common.externalPush(this);
7         return this;
8     }

fork方法內部會先判斷當前線程是不是ForkJoinWorkerThread的實例,若是知足條件,則將task任務push到當前線程所維護的雙端隊列中。

 1  final void push(ForkJoinTask<?> task) {
 2             ForkJoinTask<?>[] a; ForkJoinPool p;
 3             int b = base, s = top, n;
 4             if ((a = array) != null) {    // ignore if queue removed
 5                 int m = a.length - 1;     // fenced write for task visibility
 6                 U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
 7                 U.putOrderedInt(this, QTOP, s + 1);
 8                 if ((n = s - b) <= 1) {
 9                     if ((p = pool) != null)
10                         p.signalWork(p.workQueues, this);
11                 }
12                 else if (n >= m)
13                     growArray();
14             }
15         }

在push方法中,會調用ForkJoinPool的signalWork方法喚醒或建立一個工做線程來異步執行該task任務。

二、

 public final V join() {
        int s;
        if ((s = doJoin() & DONE_MASK) != NORMAL)
            reportException(s);
        return getRawResult();
    }

經過doJoin方法返回的任務狀態來判斷,若是不是NORMAL,則拋異常:

 private void reportException(int s) {
        if (s == CANCELLED)
            throw new CancellationException();
        if (s == EXCEPTIONAL)
            rethrow(getThrowableException());
    }

來看下doJoin方法:

private int doJoin() {
        int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
        return (s = status) < 0 ? s :
            ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
            (w = (wt = (ForkJoinWorkerThread)t).workQueue).
            tryUnpush(this) && (s = doExec()) < 0 ? s :
            wt.pool.awaitJoin(w, this, 0L) :
            externalAwaitDone();
    }

先查看任務狀態,若是已經完成,則直接返回任務狀態;若是沒有完成,則從任務隊列中取出任務並執行。

相關文章
相關標籤/搜索