在Java7以前想要並行處理大量數據是很困難的,首先把數據拆分紅不少個部分,而後把這這些子部分放入到每一個線程中去執行計算邏輯,最後在把每一個線程返回的計算結果進行合併操做;在Java7中提供了一個處理大數據的fork/join框架,屏蔽掉了線程之間交互的處理,更加專一於數據的處理。java
Fork/Join框架採用的是思想就是分而治之,把大的任務拆分紅小的任務,而後放入到獨立的線程中去計算,同時爲了最大限度的利用多核CPU,採用了一個種工做竊取
的算法來運行任務,也就是說當某個線程處理完本身工做隊列中的任務後,嘗試當其餘線程的工做隊列中竊取一個任務來執行,直到全部任務處理完畢。因此爲了減小線程之間的競爭,一般會使用雙端隊列,被竊取任務線程永遠從雙端隊列的頭部拿任務執行,而竊取任務的線程永遠從雙端隊列的尾部拿任務執行;在百度找了一張圖算法
RecursiveTask
使用Fork/Join框架首先須要建立本身的任務,須要繼承RecursiveTask
,實現抽象方法segmentfault
protected abstract V compute();
實現類須要在該方法中實現任務的拆分,計算,合併;僞代碼能夠表示成這樣:安全
if(任務已經不可拆分){ return 順序計算結果; } else { 1.任務拆分紅兩個子任務 2.遞歸調用本方法,拆分子任務 3.等待子任務執行完成 4.合併子任務的結果 }
任務:完成對一億個天然數求和網絡
咱們先使用串行的方式實現,代碼以下:框架
long result = LongStream.rangeClosed(1, 100000000) .reduce(0, Long::sum); System.out.println("result:" + result);
使用Fork/Join框架實現,代碼以下:ide
public class SumRecursiveTask extends RecursiveTask<Long> { private long[] numbers; private int start; private int end; public SumRecursiveTask(long[] numbers) { this.numbers = numbers; this.start = 0; this.end = numbers.length; } public SumRecursiveTask(long[] numbers, int start, int end) { this.numbers = numbers; this.start = start; this.end = end; } @Override protected Long compute() { int length = end - start; if (length < 20000) { //小於20000個就不在進行拆分 return sum(); } SumRecursiveTask leftTask = new SumRecursiveTask(numbers, start, start + length / 2); //進行任務拆分 SumRecursiveTask rightTask = new SumRecursiveTask(numbers, start + (length / 2), end); //進行任務拆分 leftTask.fork(); //把該子任務交友ForkJoinPoll線程池去執行 rightTask.fork(); //把該子任務交友ForkJoinPoll線程池去執行 return leftTask.join() + rightTask.join(); //把子任務的結果相加 } private long sum() { int sum = 0; for (int i = start; i < end; i++) { sum += numbers[i]; } return sum; } public static void main(String[] args) { long[] numbers = LongStream.rangeClosed(1, 100000000).toArray(); Long result = new ForkJoinPool().invoke(new SumRecursiveTask(numbers)); System.out.println("result:" +result); } }
Fork/Join默認的線程數量就是你的處理器數量,這個值是由Runtime.getRuntime().available- Processors()
獲得的。 可是你能夠經過系統屬性java.util.concurrent.ForkJoinPool.common. parallelism
來改變線程池大小,以下所示:System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism","12");
這是一個全局設置,所以它將影響代碼中全部的並行流。目前還沒法專爲某個 並行流指定這個值。由於會影響到全部的並行流,因此在任務中經歷避免網絡/IO操做,不然可能會拖慢其餘並行流的運行速度
以上咱們說到的都是在Java7中使用並行流的操做,Java8並無止步於此,爲咱們提供更加便利的方式,那就是parallelStream
;parallelStream
底層仍是經過Fork/Join框架來實現的。測試
1.串行流轉化成並行流大數據
LongStream.rangeClosed(1,1000) .parallel() .forEach(System.out::println);
2.直接生成並行流this
List<Integer> values = new ArrayList<>(); for (int i = 0; i < 10000; i++) { values.add(i); } values.parallelStream() .forEach(System.out::println);
咱們使用parallelStream
來實現上面的累加例子看看效果,代碼以下:
public static void main(String[] args) { Summer summer = new Summer(); LongStream.rangeClosed(1, 100000000) .parallel() .forEach(summer::add); System.out.println("result:" + summer.sum); } static class Summer { public long sum = 0; public void add(long value) { sum += value; } }
運行結果以下:
運行以後,咱們發現運行的結果不正確,而且每次運行的結果都不同,這是爲何呢?
這裏其實就是錯用parallelStream
常見的狀況,parallelStream
是非線程安全的,在這個裏面中使用多個線程去修改了共享變量sum, 執行了sum += value
操做,這個操做自己是非原子性的,因此在使用並行流時應該避免去修改共享變量。
修改上面的例子,正確使用parallelStream
來實現,代碼以下:
long result = LongStream.rangeClosed(1, 100000000) .parallel() .reduce(0, Long::sum); System.out.println("result:" + result);
在前面咱們已經說過了fork/join的操做流程是:拆子部分,計算,合併結果;由於parallelStream
底層使用的也是fork/join框架,因此這些步驟也是須要作的,可是從上面的代碼,咱們看到Long::sum
作了計算,reduce
作了合併結果,咱們並無去作任務的拆分,因此這個過程確定是parallelStream
已經幫咱們實現了,這個時候就必須的說說Spliterator
Spliterator
是Java8加入的新接口,是爲了並行執行任務而設計的。
public interface Spliterator<T> { boolean tryAdvance(Consumer<? super T> action); Spliterator<T> trySplit(); long estimateSize(); int characteristics(); }
tryAdvance: 遍歷全部的元素,若是還有能夠遍歷的就返回ture,不然返回false
trySplit: 對全部的元素進行拆分紅小的子部分,若是已經不能拆分就返回null
estimateSize: 當前拆分裏面還剩餘多少個元素
characteristics: 返回當前Spliterator特性集的編碼
感謝你們能夠耐心地讀到這裏。 固然,文中或許會存在或多或少的不足、錯誤之處,有建議或者意見也很是歡迎你們在評論交流。 最後,但願朋友們能夠點贊評論關注三連,由於這些就是我分享的所有動力來源🙏