java併發編程學習6--並行流

【概念

並行流就是一個把內容分紅多個數據塊,並用不一樣的線程分別處理每個數據塊的流。在java7以前,並行處理數據很麻煩,第一,須要明確的把包含數據的數據結構分紅若干子部分。第二,給每個子部分分配一個獨立的線程。第三,適當的時候進行同步,避免出現數據競爭帶來的問題,最後將每個子部分的結果合併。在java7中引入了forkjoin框架來完成這些步驟,而java8中的stream接口可讓你不費吹灰之力就對數據執行並行處理,而stream接口幕後正是使用的forkjoin框架。不過,對順序流調用parallel()並不意味着流自己有任何的變化。它在內部實際上就是設了一個boolean標誌,表示你想讓parallel()以後的操做都並行執行。相似的你能夠用sequential()將並行流變爲順序流。這兩個方法可讓咱們更細化的控制流。java

eg.java8中stream的使用:算法

//順序求和
public static long sum(long n){
    return Stream.iterate(1l,i -> i + 1)
            .limit(n)
            .reduce(0l,Long::sum);
}

//並行求和
public static long parallelSum(long n){
    return Stream.iterate(1l,i -> i + 1)
            .limit(n)
            //將流轉爲並行流
            .parallel()
            .reduce(0l,Long::sum);
}

【配置並行流線程池

並行流內部使用了默認的forkjoinPool,默認的線程數量就是處理器的數量(包括虛擬內核),
經過:Runtime.getRuntime().availableProcessors() 獲得。
經過:System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism","12")來改變線程池大小。緩存

【性能測試

咱們不該該理所固然的任認爲多線程比順序執行的效率更高,來看下面的例子:服務器

public class Exercise {

    public static void main(String[] args) {
        long num = 1000_000_0;

        long st = System.currentTimeMillis();
        System.out.println("iterate順序" + sum(num) + ":" +(System.currentTimeMillis() - st));

        st = System.currentTimeMillis();
        System.out.println("iterate並行" + parallelSum(num) + ":" +(System.currentTimeMillis() - st));

        st = System.currentTimeMillis();
        System.out.println("迭代" + forSum(num) + ":" +(System.currentTimeMillis() - st));

        st = System.currentTimeMillis();
        System.out.println("LongStream並行" + longStreamParallelSum(num) + ":" +(System.currentTimeMillis() - st));

        st = System.currentTimeMillis();
        System.out.println("LongStream順序" + longStreamSum(num) + ":" +(System.currentTimeMillis() - st));
    }

    //順序求和
    public static long sum(long n){
        return Stream.iterate(1l,i -> i + 1)
                .limit(n)
                .reduce(0l,Long::sum);
    }

    //並行求和
    public static long parallelSum(long n){
        return Stream.iterate(1l,i -> i + 1)
                .limit(n)
                //將流轉爲並行流
                .parallel()
                .reduce(0l,Long::sum);
    }

    //迭代求和
    public static long forSum(long n){
        long result = 0;
        for(long i = 0 ;i <= n ; i++){
            result += i;
        }
        return result;
    }

    //longStream並行
    public static long longStreamParallelSum(long n){
        return LongStream.rangeClosed(1,n)
                .parallel()
                .reduce(0l,Long::sum);
    }

    //longStream順序執行
    public static long longStreamSum(long n){
        return LongStream.rangeClosed(1,n)
                .reduce(0l,Long::sum);
    }
}

並行流執行的時間比順序流和迭代執行的要長不少,兩個緣由:數據結構

  1. iterate()生成的是裝箱對象,必需要拆箱才能求和;
  2. iterate()很難分紅多個獨立的塊並行運行,由於每次應用這個函數都要依賴前一次的應用的結果。數字列表在概括的過程開始時沒有準備好,於是沒法有效的把流劃分紅小塊來並行處理。可是咱們又標記流爲並行執行,這就給順序執行增長了開銷,每一次的求和操做都新開啓了一個線程。

【使用更有針對性的的方法

LongStream.rangeClosed():多線程

1. 直接產生long類型數據,沒有開箱操做
    2. 生成數字範圍,容易拆分紅獨立的小塊

因而可知,選擇適當的數據結構每每比並行化算法更重要。並行是有代價的。並行過程須要對流作遞歸劃分,把每一個子流的操做分配到不一樣的線程,而後把這些操做的結果合併成一個值。可是多核之間移動數據的代價比咱們想象的要大,因此很重要的一點是保證再內核中並行執行的工做時間比內核之間傳輸數據的時間要長架構

【正確的使用並行流

錯誤使用並行流的首要緣由就是使用的算法改變了共享變量的狀態,由於修改共享變量意味着同步,而使用同步方法就會使的並行毫無心義。如下是一些建議:併發

1. 測試,並行仍是順序執行最重要的基準就是不停的測試性能。
2. 留意裝箱,自動裝箱,拆箱會大大下降性能,java8提供了LongStream,IntStream,DoubleStream來避免這兩個操做。
3. 有些操做自己就是順序執行要率高,例如:limit,findFirst等依賴元素順序的操做。
4. 當執行單個任務的成本高時使用並行,若是單個操做的成本很低,並行執行反而會由於開啓線程,標記狀態等操做使得效率降低。
5. 小量數據不適用並行。
6. 考慮流中背後的數據結構是否易於分解。ArrayList的拆分效率比LinkedList高得多,由於前者用不着便利就能夠平均拆分。另外,range工廠方法的原始類型數據流也能夠快速分解。如下時流數據源的可分解性:
   - ArrayList:極佳
   - LinkedList:差
   - IntStream等:極佳
   - Stream.iterate:差
   - HashSet:好
   - TreeSet:好
7. 中間操做改變流的方法,涉及到排序就不適用並行。
8. 終端操做合併流的代價,涉及到排序就不適用並行。

【正確的使用並行

  1. 高併發、任務執行時間短的業務,線程池線程數能夠設置爲CPU核數+1,減小線程上下文的切換
  2. 併發不高、任務執行時間長的業務要區分開看:框架

    • 假如是業務時間長集中在IO操做上,也就是IO密集型的任務,由於IO操做並不佔用CPU,因此不要讓全部的CPU閒下來,能夠加大線程池中的線程數目,讓CPU處理更多的業務
    • 假如是業務時間長集中在計算操做上,也就是計算密集型任務,這個就沒辦法了,和(1)同樣吧,線程池中的線程數設置得少一些,減小線程上下文的切換
  3. 併發高、業務執行時間長,解決這種類型任務的關鍵不在於線程池而在於總體架構的設計,看看這些業務裏面某些數據是否能作緩存是第一步,增長服務器是第二步,至於線程池的設置,設置參考(2)。最後,業務執行時間長的問題,也可能須要分析一下,看看能不能使用中間件對任務進行拆分和解耦。
相關文章
相關標籤/搜索