雖然目前處理器核心數已經發展到很大數目,可是按任務併發處理並不能徹底充分的利用處理器資源,由於通常的應用程序沒有那麼多的併發處理任務。基於這種現狀,考慮把一個任務拆分紅多個單元,每一個單元分別獲得執行,最後合併每一個單元的結果。javascript
Fork/Join框架是JAVA7提供的一個用於並行執行任務的框架,是一個把大任務分割成若干個小任務,最終彙總每一個小任務結果後獲得大任務結果的框架java
指的是某個線程從其餘隊列裏竊取任務來執行。使用的場景是一個大任務拆分紅多個小任務,爲了減小線程間的競爭,把這些子任務分別放到不一樣的隊列中,而且每一個隊列都有單獨的線程來執行隊列裏的任務,線程和隊列一一對應。可是會出現這樣一種狀況:A線程處理完了本身隊列的任務,B線程的隊列裏還有不少任務要處理。A是一個很熱情的線程,想過去幫忙,可是若是兩個線程訪問同一個隊列,會產生競爭,因此A想了一個辦法,從雙端隊列的尾部拿任務執行。而B線程永遠是從雙端隊列的頭部拿任務執行(任務是一個個獨立的小任務),這樣感受A線程像是小偷在竊取B線程的東西同樣。算法
工做竊取算法的優勢:併發
利用了線程進行並行計算,減小了線程間的競爭。框架
工做竊取算法的缺點:異步
一、若是雙端隊列中只有一個任務時,線程間會存在競爭。ide
二、竊取算法消耗了更多的系統資源,如會建立多個線程和多個雙端隊列。this
Fork/Join中兩個重要的類:線程
一、ForkJoinTask:使用該框架,須要建立一個ForkJoin任務,它提供在任務中執行fork和join操做的機制。通常狀況下,咱們並不須要直接繼承ForkJoinTask類,只須要繼承它的子類,它的子類有兩個:設計
a、RecursiveAction:用於沒有返回結果的任務。
b、RecursiveTask:用於有返回結果的任務。
二、ForkJoinPool:任務ForkJoinTask須要經過ForkJoinPool來執行。
package test; import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Future; import java.util.concurrent.RecursiveTask; public class CountTask extends RecursiveTask<Integer> { private static final long serialVersionUID = 1L; //閾值 private static final int THRESHOLD = 2; private int start; private int end; public CountTask(int start, int end) { this.start = start; this.end = end; } @Override protected Integer compute() { int sum = 0; //判斷任務是否足夠小 boolean canCompute = (end - start) <= THRESHOLD; if(canCompute) { //若是小於閾值,就進行運算 for(int i=start; i<=end; i++) { sum += i; } } else { //若是大於閾值,就再進行任務拆分 int middle = (start + end)/2; CountTask leftTask = new CountTask(start,middle); CountTask rightTask = new CountTask(middle+1,end); //執行子任務 leftTask.fork(); rightTask.fork(); //等待子任務執行完,並獲得執行結果 int leftResult = leftTask.join(); int rightResult = rightTask.join(); //合併子任務 sum = leftResult + rightResult; } return sum; } public static void main(String[] args) { ForkJoinPool forkJoinPool = new ForkJoinPool(); CountTask task = new CountTask(1,6); //執行一個任務 Future<Integer> result = forkJoinPool.submit(task); try { System.out.println(result.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } }
這個程序是將1+2+3+4+5+6拆分紅1+2;3+4;5+6三個部分進行子程序進行計算後合併。
一、leftTask.fork();
public final ForkJoinTask<V> fork() { Thread t; if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ((ForkJoinWorkerThread)t).workQueue.push(this); else ForkJoinPool.common.externalPush(this); return this; }
fork方法內部會先判斷當前線程是不是ForkJoinWorkerThread的實例,若是知足條件,則將task任務push到當前線程所維護的雙端隊列中。
final void push(ForkJoinTask<?> task) { ForkJoinTask<?>[] a; ForkJoinPool p; int b = base, s = top, n; if ((a = array) != null) { // ignore if queue removed int m = a.length - 1; // fenced write for task visibility U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task); U.putOrderedInt(this, QTOP, s + 1); if ((n = s - b) <= 1) { if ((p = pool) != null) p.signalWork(p.workQueues, this); } else if (n >= m) growArray(); } }
在push方法中,會調用ForkJoinPool的signalWork方法喚醒或建立一個工做線程來異步執行該task任務。
public final V join() { int s; if ((s = doJoin() & DONE_MASK) != NORMAL) reportException(s); return getRawResult(); }
經過doJoin方法返回的任務狀態來判斷,若是不是NORMAL,則拋異常:
private void reportException(int s) { if (s == CANCELLED) throw new CancellationException(); if (s == EXCEPTIONAL) rethrow(getThrowableException()); }
來看下doJoin方法:
private int doJoin() { int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w; return (s = status) < 0 ? s : ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? (w = (wt = (ForkJoinWorkerThread)t).workQueue). tryUnpush(this) && (s = doExec()) < 0 ? s : wt.pool.awaitJoin(w, this, 0L) : externalAwaitDone(); }
先查看任務狀態,若是已經完成,則直接返回任務狀態;若是沒有完成,則從任務隊列中取出任務並執行。
map
final List<Integer> numbers = Arrays.asList(1, 2, 3, 4); final List<Integer> doubleNumbers = numbers.stream() .map(number -> number * 2) .collect(Collectors.toList());
結果:[2, 4, 6, 8]
也能夠搞成其餘的類型,初始List是Integer,也能夠變成String
final List<Integer> numbers = Arrays.asList(1, 2, 3, 4); final List<String> numberIndex = numbers.stream() .map(number -> "#" + number) .collect(Collectors.toList());
結果:[#1, #2, #3, #4]
reduce
1.不提供初始值的reduce,返回值是Optional,表示可能爲空,使用orElseGet能夠返回一個null時的默認值
final List<Integer> numbers = Arrays.asList(1, 2, 3, 4); final Optional<Integer> sum = numbers.stream() .reduce((a, b) -> a + b); sum.orElseGet(() -> 0);
結果:10
這裏的(a, b) -> a + b的類型實際上是BinaryOperator,它接受兩個類型相同的參數
當把numbers改成Arrays.asList()時,結果爲0。
2.使用初始值的reduce,由於提供了初始值,因此返回值再也不是Optional
final List<Integer> numbers = Arrays.asList(1, 2, 3, 4); final Integer sum = numbers.stream() .reduce(0, (a, b) -> a + b);
結果:10