並行流就是把一個內容分紅多個數據塊,並用不一樣的線程分別處理每一個數據塊的流。
Java 8 中將並行進行了優化,咱們能夠很容易的對數據進行並行操做。Stream API 能夠聲明性地經過parallel() 與sequential() 在並行流與順序流之間進行切換。java
Fork/Join 框架:就是在必要的狀況下,將一個大任務,進行拆分(fork)成若干個小任務(拆到不可再拆時),再將一個個的小任務運算的結果進行join 彙總.框架
採用「工做竊取」模式(work-stealing):當執行新的任務時它能夠將其拆分分紅更小的任務執行,並將小任務加到線程隊列中,而後再從一個隨機線程的隊列中偷一個並把它放在本身的隊列中。ide
相對於通常的線程池實現,fork/join框架的優點體如今對其中包含的任務的處理方式上.在通常的線程池中,若是一個線程正在執行的任務因爲某些緣由沒法繼續運行,那麼該線程會處於等待狀態.而在fork/join框架實現中,若是某個子問題因爲等待另一個子問題的完成而沒法繼續運行.那麼處理該子問題的線程會主動尋找其餘還沒有運行的子問題來執行.這種方式減小了線程的等待時間,提升了性能.性能
ForkJoinCalculate.java優化
package www.muzi.com; import java.util.concurrent.RecursiveTask; /** * Date:2017/3/10 15:18 */ public class ForkJoinCalculate extends RecursiveTask<Long>{ private long start; private long end; private static final long THRESHOLD = 10000L; public ForkJoinCalculate(long start, long end) { this.start = start; this.end = end; } @Override protected Long compute() { long length = end - start; if (length <= THRESHOLD){ long sum = 0; for (long i = start; i <= end; i++) { sum += i; } return sum; }else{ long middle = (start + end) / 2; ForkJoinCalculate left = new ForkJoinCalculate(start, middle); left.fork();//拆分子任務,同時壓入線程隊列 ForkJoinCalculate right = new ForkJoinCalculate(middle + 1, end); right.fork();//拆分子任務,同時壓入線程隊列 return left.join() + right.join(); } } }
TestForkJoin.javathis
package www.muzi.com; import org.junit.Test; import java.time.Duration; import java.time.Instant; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinTask; import java.util.stream.LongStream; /** * Date:2017/3/10 17:12 */ public class TestForkJoin { Long num = 10000000000L; /** * ForkJoin */ @Test public void test1(){ Instant start = Instant.now(); ForkJoinPool pool = new ForkJoinPool(); ForkJoinTask<Long> task = new ForkJoinCalculate(0,num); Long sum = pool.invoke(task); System.out.println(sum); Instant end = Instant.now(); System.out.println("耗費時間:" + Duration.between(start,end).toMillis());//3904 } /** * 普通for循環 */ @Test public void test2(){ Instant start = Instant.now(); Long sum = 0L; for (int i = 0; i < num; i++) { sum += i; } System.out.println(sum); Instant end = Instant.now(); System.out.println("耗費時間:" + Duration.between(start,end).toMillis());//由於時間太長。。。。不等了 } /** * 並行流 */ @Test public void test3(){ Instant start = Instant.now(); LongStream.rangeClosed(0, num) .parallel() .reduce(0, Long:: sum); Instant end = Instant.now(); System.out.println("耗費時間:" + Duration.between(start,end).toMillis());//3887 } /** * 串行流 */ @Test public void test4(){ Instant start = Instant.now(); LongStream.rangeClosed(0, num) .sequential() .reduce(0, Long:: sum); Instant end = Instant.now(); System.out.println("耗費時間:" + Duration.between(start,end).toMillis());//7398 } }