並行流就是一個把內容分紅多個數據塊,並用不一樣的線程分別處理每個數據塊的流。在java7以前,並行處理數據很麻煩,第一,須要明確的把包含數據的數據結構分紅若干子部分。第二,給每個子部分分配一個獨立的線程。第三,適當的時候進行同步,避免出現數據競爭帶來的問題,最後將每個子部分的結果合併。在java7中引入了forkjoin框架來完成這些步驟,而java8中的stream接口可讓你不費吹灰之力就對數據執行並行處理,而stream接口幕後正是使用的forkjoin框架。不過,對順序流調用parallel()並不意味着流自己有任何的變化。它在內部實際上就是設了一個boolean標誌,表示你想讓parallel()以後的操做都並行執行。相似的你能夠用sequential()將並行流變爲順序流。這兩個方法可讓咱們更細化的控制流。java
eg.java8中stream的使用:算法
//順序求和 public static long sum(long n){ return Stream.iterate(1l,i -> i + 1) .limit(n) .reduce(0l,Long::sum); } //並行求和 public static long parallelSum(long n){ return Stream.iterate(1l,i -> i + 1) .limit(n) //將流轉爲並行流 .parallel() .reduce(0l,Long::sum); }
並行流內部使用了默認的forkjoinPool,默認的線程數量就是處理器的數量(包括虛擬內核),
經過:Runtime.getRuntime().availableProcessors() 獲得。
經過:System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism","12")來改變線程池大小。緩存
咱們不該該理所固然的任認爲多線程比順序執行的效率更高,來看下面的例子:服務器
public class Exercise { public static void main(String[] args) { long num = 1000_000_0; long st = System.currentTimeMillis(); System.out.println("iterate順序" + sum(num) + ":" +(System.currentTimeMillis() - st)); st = System.currentTimeMillis(); System.out.println("iterate並行" + parallelSum(num) + ":" +(System.currentTimeMillis() - st)); st = System.currentTimeMillis(); System.out.println("迭代" + forSum(num) + ":" +(System.currentTimeMillis() - st)); st = System.currentTimeMillis(); System.out.println("LongStream並行" + longStreamParallelSum(num) + ":" +(System.currentTimeMillis() - st)); st = System.currentTimeMillis(); System.out.println("LongStream順序" + longStreamSum(num) + ":" +(System.currentTimeMillis() - st)); } //順序求和 public static long sum(long n){ return Stream.iterate(1l,i -> i + 1) .limit(n) .reduce(0l,Long::sum); } //並行求和 public static long parallelSum(long n){ return Stream.iterate(1l,i -> i + 1) .limit(n) //將流轉爲並行流 .parallel() .reduce(0l,Long::sum); } //迭代求和 public static long forSum(long n){ long result = 0; for(long i = 0 ;i <= n ; i++){ result += i; } return result; } //longStream並行 public static long longStreamParallelSum(long n){ return LongStream.rangeClosed(1,n) .parallel() .reduce(0l,Long::sum); } //longStream順序執行 public static long longStreamSum(long n){ return LongStream.rangeClosed(1,n) .reduce(0l,Long::sum); } }
並行流執行的時間比順序流和迭代執行的要長不少,兩個緣由:數據結構
LongStream.rangeClosed():多線程
1. 直接產生long類型數據,沒有開箱操做 2. 生成數字範圍,容易拆分紅獨立的小塊
因而可知,選擇適當的數據結構每每比並行化算法更重要。並行是有代價的。並行過程須要對流作遞歸劃分,把每一個子流的操做分配到不一樣的線程,而後把這些操做的結果合併成一個值。可是多核之間移動數據的代價比咱們想象的要大,因此很重要的一點是保證再內核中並行執行的工做時間比內核之間傳輸數據的時間要長。架構
錯誤使用並行流的首要緣由就是使用的算法改變了共享變量的狀態,由於修改共享變量意味着同步,而使用同步方法就會使的並行毫無心義。如下是一些建議:併發
1. 測試,並行仍是順序執行最重要的基準就是不停的測試性能。 2. 留意裝箱,自動裝箱,拆箱會大大下降性能,java8提供了LongStream,IntStream,DoubleStream來避免這兩個操做。 3. 有些操做自己就是順序執行要率高,例如:limit,findFirst等依賴元素順序的操做。 4. 當執行單個任務的成本高時使用並行,若是單個操做的成本很低,並行執行反而會由於開啓線程,標記狀態等操做使得效率降低。 5. 小量數據不適用並行。 6. 考慮流中背後的數據結構是否易於分解。ArrayList的拆分效率比LinkedList高得多,由於前者用不着便利就能夠平均拆分。另外,range工廠方法的原始類型數據流也能夠快速分解。如下時流數據源的可分解性: - ArrayList:極佳 - LinkedList:差 - IntStream等:極佳 - Stream.iterate:差 - HashSet:好 - TreeSet:好 7. 中間操做改變流的方法,涉及到排序就不適用並行。 8. 終端操做合併流的代價,涉及到排序就不適用並行。
併發不高、任務執行時間長的業務要區分開看:框架