Java 8 (6) Stream 流 - 並行數據處理與性能

在Java 7以前,並行處理集合很是麻煩。首先你要明確的把包含數據的數據結構分紅若干子部分,而後你要把每一個子部分分配一個獨立的線程。而後,你須要在恰當的時候對他們進行同步來避免競爭,等待全部線程完成。最後,把這些部分結果合併起來。Java 7中引入了一個叫作 分支/合併的框架,讓這些操做更穩定,更不容易出錯。java

並行流算法

  使用Stream接口能夠方便的處理它的元素,能夠經過對收集源調用parallelStream方法來把集合轉換爲並行流。並行流就是一個把內容分紅多個數據塊,並用不一樣的線程分別處理每一個數據塊的流。這樣就能夠把給定操做的工做負荷分配給多核處理器的全部內核,讓它們都忙起來。編程

例如求和:1到10000之間的和。數組

    //求和
    public static long getSum(long n){
        return Stream.iterate(1L,i->i+1).limit(n).reduce(0L,Long::sum);
    }

這段代碼等價於傳統Java:數據結構

    //求和
    public static long getSum(long n){
        long sum = 0;
        for(long i = 1L;i<=10000;i++){
            sum += i;
        }
        return sum;
    }

將順序流轉換爲並行流app

  只須要對順序流調用parallel方法便可轉換爲並行流:框架

    public static long getSum(long n){
        return Stream.iterate(1L,i->i+1).limit(n).parallel().reduce(0L,Long::sum);
    }

這段代碼在內部將Stream分紅了幾塊,所以能夠對不一樣的塊獨立進行概括操做。最後,同一個概括操做會將各個子流的部分概括結果合併起來,獲得整個原始流結果。異步

對順序流執行parallel方法並不意味着流自己有任何實際的變化,它內部就是一個布爾值,表示parallel以後進行的操做都並行執行,只須要對並行流調用sequential方法就能夠變回順序流。這兩個方法能夠結合起來,在須要並行的時候並行,須要串行的時候串行。ide

    Stream.parallel()
          .filter(...)
          .sequential()
          .map(...)
          .parallel()
          .reduce();

可是 最後一次parallel或sequential調用會影響整個流水線,上面的例子流水線會並行執行,由於最後調用的是它。函數

 

並行流內部使用了默認的ForkJoinPool,它默認的線程數量就是你的處理器數量,這個值是由Runtime.getRuntime().availableProcessors()獲得的,能夠經過系統屬性java.util.concurrent.ForkJoinPool.common.parallelism來改變線程池的大小,例如:System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism","12")。這是一個全局屬性,意味着全部的並行操做都會受影響。通常不建議修改它。

 

對這三個方法進行測量:

編寫一個測量方法,這個方法接受一個函數和一個long參數,他會對傳給方法的long應用函數10次,記錄每次執行的時間(毫秒),並返回最短的一次執行時間:

    public static long measureSumPerf(Function<Long, Long> adder, long n) {
        long fastest = Long.MAX_VALUE;
        for (int i = 0; i < 10; i++) {
            long start = System.nanoTime();
            long sum = adder.apply(n);
            long duration = (System.nanoTime() - start) / 1_000_000;
            System.out.println("Result: " + sum);
            if (duration < fastest) fastest = duration;
        }
        return fastest;
    }

 

    //iterate
    public static long getSum(long n){
        return Stream.iterate(1L,i->i+1).limit(n).reduce(0L,Long::sum);
    }
    //iterate Parallel
    public static long getSumParallel(long n){
        return Stream.iterate(1L,i->i+1).limit(n).parallel().reduce(0L,Long::sum);
    }
    //Java Old
    public static long getSumOldJava(long n){
        long sum = 0;
        for(int i = 0;i<=n;i++){
            sum += i;
        }
        return sum;
    }
System.out.println(measureSumPerf(Main::getSum,10000000)); //105
System.out.println(measureSumPerf(Main::getSumParallel,10000000)); //147
System.out.println(measureSumPerf(Main::getSumOldJava,10000000)); //5

用傳統for循環的方式是最快的,由於它更爲底層,更重要的是不須要對原始類型進行任何裝箱或拆箱操做。他才5毫秒便可完成。

順序化執行結果爲105毫秒,

用並行化進行測試,結果竟然是最慢的 147毫秒,由於iterate生成的是裝箱的對象,必須拆箱成數字才能求和,而且咱們很難把iterate分紅多個獨立塊來進行並行執行。

這意味着 並行化編程可能很複雜,若是用的不對,它甚至會讓程序的總體性能更差。

LongStream.rangeClosed方法與iterate相比有兩個優勢:

1.LongStream.rangeClosed直接產生原始類型的long數字,沒有裝箱和拆箱。

2.LongStream.rangeClosed會生成數字範圍,很容易拆分爲獨立的小塊。

    //5
    public static long GetRangeClosedSum(long n){
        return LongStream.rangeClosed(1,n).reduce(0L,Long::sum);
    }

順序化的LongStream.rangeClosed 只花費了5毫秒,他比iterate順序化要快得多,由於他沒有裝箱和拆箱。再來看看並行化:

    //1
    public static long GetRangeClosedSumParallel(long n){
        return LongStream.rangeClosed(1,n).parallel().reduce(0L,Long::sum);
    }

LongStream.rangeClosed 調用parallel方法後,執行只使用了1毫秒,終於能夠像上面圖中同樣並行了,並行化過程自己須要對流作遞歸劃分,把每一個子流的概括操做分配到不一樣的線程,而後把這些操做的結果合併成一個值。

 

正確使用並行流
  錯用並行流而產生錯誤的首要緣由,就是使用的算法改變了某些共享狀態,例如 另外一種實現對n個天然數求和的方法,但這會改變一個共享累加器:

public class Accumulator {
    public long total = 0;
    public void add(long value){
        total += value;
    }
}
    public static long sideEffectSum(long n){
        Accumulator accumulator = new Accumulator();
        LongStream.rangeClosed(1,n).forEach(Accumulator::add);
        return accumulator.total;
    }

這種代碼本質就是順序的,每次訪問total都會出現數據競爭。若是你嘗試用同步來修復,那就徹底失去並行的意義了。咱們試着在forEach前加入parallel方法使其並行化:

public static long sideEffectSum(long n){
        Accumulator accumulator = new Accumulator();
        LongStream.rangeClosed(1,n).parallel().forEach(Accumulator::add);
        return accumulator.total;
    }

調用上面的測試方法:

System.out.println(measureSumPerf(Main::sideEffectSum,10000000));
Result: 10140890625203
Result: 9544849565325
Result: 6438093946815
Result: 11805543046590
Result: 6658954367405
Result: 4642751863823
Result: 5948081550315
Result: 7219270279482
Result: 7258008360508
Result: 4898539133022
1

性能可有可無了,由於結果都是錯誤的,每次執行都會返回不一樣的結果,都離正確值差很遠。這是因爲多個線程在同時訪問累加器,執行total+=value;foreach中調用的方法會改變多個線程共享對象的可變狀態。 共享可變狀態會影響並行流以及並行計算。

 

如何使用並行流

  1.測量,把順序流轉換成並行流很容易,但不必定性能會提高。並行流不必定老是比順序流快,因此使用並行流時對其和順序流進行測量。

  2.留意裝箱。自動裝箱和拆箱操做會大大下降性能,Java 8中又原始類型流(IntStream、LongStream、DoubleStream)來避免這些操做。

  3.有些操做自己在並行流上的性能就比順序流差。特別是limit何findFirst等依賴於元素順序的操做,他們在並行流上執行的代價很是大。例如,findAny會比findFrist性能好,由於它不必定要按照順序來執行。

  4.對於小數據量,不建議使用並行流。

  5.要考慮流背後的數據結構是否易於分解。例如,ArrayList的拆分效率比LinkedList高的多,由於ArrayList用不着遍歷就能夠拆分,而LinkedList必須遍歷。另外,用range方法建立的原始類型流也能夠快速分解。

 

分支/合併框架

  分支/合併框架的目的是以遞歸的方式將能夠並行的任務拆分紅更小的任務,而後將每一個子任務結果合併起來成爲總體結果。它是ExecutorService接口的一個實現,它把子任務分配給線程池(ForkJoinPool)中的工做線程。

1.使用RecursiveTask

  要把任務提交到這個池,必須建立RecursiveTask<R>的一個子類,其中R是並行化任務(以及全部子任務)產生的結果類型,或者若是任務不返回結果,則是RecursiveAction類型(它可能會更新其餘非局部機構)。要定義RecursiveTask,只需實現它惟一的抽象方法compute,這個方法同時定義了將任務拆分紅子任務的邏輯,以及沒法再拆分或不方便再拆分時,生成單個子任務結果的邏輯。相似如下僞代碼:

        if(任務足夠小或不可分) {
            順序計算該任務
        }else{
            將任務拆分爲兩個子任務 
            遞歸調用本方法,拆分每一個子任務,等待全部子任務完成 
            合併每一個子任務的結果
        }

通常來講沒有確切的標準來絕對一個任務是否能夠被拆分,可是有幾種試探方法能夠查看是否能夠拆分。

 之前面的求和例子爲基礎,咱們試着用這個框架爲一個數字範圍long[]數組求和,首選須要爲RecursiveTask類作一個實現,ForkJoinSumCalculator

public class ForkJoinSumCalculator extends java.util.concurrent.RecursiveTask<Long> {

    //要求和的數組
    private final long[] numbers;
    //子任務處理的數組的開始位置。
    private final int start;
    //子任務處理的數組的終止位置
    private final int end;
    //不可將任務分解爲子任務的數組大小
    public static final long THRESHOLD = 10000;

    //共用構造函數用於建立主任務
    public ForkJoinSumCalculator(long[] numbers) {
        this(numbers, 0, numbers.length);
    }

    //私有構造方法用於以遞歸方式爲主任務建立子任務
    private ForkJoinSumCalculator(long[] numbers, int start, int end) {
        this.numbers = numbers;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        //該任務負責求和的部分的大小
        int length = end - start;
        if (length <= THRESHOLD) {
            return computeSequentially(); //若是大小小於或等於閥值,順序計算結果
        }
        //建立一個子任務來爲數組的前一半進行求和
        ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numbers, start, start + length / 2);
        //利用另外一個ForkJoinPool線程異步執行新建立的子任務
        leftTask.fork();
        //建立一個任務爲數組的後一半進行求和
        ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numbers, start + length / 2, end);

        //同步執行第二個子任務,有可能容許進一步遞歸劃分
        Long rightResult = rightTask.compute();
        //讀取第一個子任務的結果,若是還沒有完成就等待
        Long leftResult = leftTask.join();
        //合併兩個任務的結果
        return leftResult + rightResult;
    }

    //在子任務不可再分時計算結果
    private long computeSequentially() {
        long sum = 0;
        for (int i = start; i < end; i++) {
            sum += numbers[i];
        }
        return sum;
    }
}

如今就能夠經過調用構造函數來求和了:

        long[] numbers = LongStream.rangeClosed(1,10000000).toArray();
        ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);
        System.out.println(new ForkJoinPool().invoke(task));

這裏使用LongStream.rangeClosed生成了一個long 數組,而後建立了一個ForkJoinTask的父類,並把數組傳遞給ForkJoinSumCalculator的公共構造函數,最後建立愛你了一個新的ForkJoinPool,並把任務傳給它調用方法。在ForkJoinPool中執行時,最後一個方法返回的值就是ForkJoinSumCalculator類定義的任務結果。

在實際應用時,使用多個ForkJoinPool是沒有意義的,通常來講把它實例化一次,而後把實力保存在靜態字段中,使之成爲單例。這樣就能夠在任何地方方便的重用了。

運行ForkJoinSumCalculator

  當把ForkJoinSumCalculator任務傳給ForkJoinPool時,這個任務就由池中的一個線程執行,這個線程會調用任務的compute方法。該方法會檢查任務是否小到足以順序執行,若是不夠小則會把要求和的數組分紅兩半,分給兩個新的ForkJoinSumCalculator,而它們也由ForkJoinPool安排執行。所以,這一過程能夠遞歸重複,把原任務分爲更小的任務,直到知足不方便或不可能再進一步拆分的條件。這時會順序計算每一個任務的結果,而後由分支過程建立的任務二叉樹遍歷回到它的根。接下來回合併每一個子任務的部分結果,從而獲得總任務的結果。

System.out.println(measureSumPerf(Main::forkJoinSum,10000000)); // 79

執行速度爲79毫秒,是由於必須先把整個數字流都放進一個long數組,以後才能在ForkJoinSumCalculator任務中使用它。

 

使用分支/合併框架的最佳作法

1.對一個任務調用join方法會阻塞調用方,直到該任務作出結果。所以,有必要在兩個字任務的計算都開始後再調用它。不然,你獲得的版本會比原始的順序算法更慢更復雜,由於每一個子任務都必須等待另外一個子任務完成才能啓動。

2.不該該在RecursiveTask內部使用ForkJoinPool的invoke方法。相反,你應該始終直接調用compute或fork方法,只有順序代碼才應該用invoke來啓動並行計算。

3.對於子任務調用fork方法能夠把它排進ForkJoinPool。同時對左邊和右邊的子任務調用它彷佛很天然,但這樣的效率要比直接對其中一個調用compute低。這樣作你能夠爲其中一個子任務重用同一個線程,從而避免在線程池中多分配一個任務形成的開銷。

4.對於分支/合併拆分策略你必須選擇一個標準,來決定任務是要進一步拆分仍是已到能夠順序求值。

 

工做竊取

  在ForkJoinSumCalculator的例子中,設置的閥值是10000,就是在數組爲10000時就不會再建立子任務了。在測試案例中,咱們先有了一個1000萬的數組,意味着ForkJoinSumCalculator至少會分出1000個子任務來。分出大量的小任務通常來講是一個好的選擇,理想的狀況下,劃分並行任務時,應該讓每一個任務都用徹底相同時間完成,讓全部的CPU都一樣繁忙,但在實際中,每一個子任務所花費的時間可能天差地別,要麼是由於劃分策略效率低,要麼是有不可預知的緣由,好比磁盤訪問慢,或是須要和外部服務協調執行。

  分支/合併框架工程用一種稱爲工做竊取(work stealing)的技術來解決這個問題。在實際應用中,這意味着這些任務差很少被平均分配到ForkJoinPool中的全部線程上。每一個線程都爲分配給它的任務保存一個雙向鏈式隊列,沒完成一個任務,他就會從隊列頭上取出下一個任務開始執行。基於前面所述的緣由,某個線程可能早早完成了分配給它的任務,也就是它的隊列已經空了,而其餘的線程還很忙。這時,這個線程沒有閒下來,而是隨機選了一個別的線程,從隊列的尾巴上偷走一個任務。這個過程一直繼續下去,直到全部的任務都執行完畢,全部隊列都清空。這就是爲何劃成許多小任務而不是少數幾個大任務,這樣有助於更好地在工做線程之間平衡負載。

如今應該清楚 流 是如何使用 分支/合併 框架來並行處理它的項目了。本例中咱們明確指定了將數組拆分紅多少個任務的邏輯。可是,使用並行流時就不用這麼作了,有一種機制來爲你拆分流。

 

Spliterator

  Spliterator是Java 8 中加入的另外一個新接口,這個名字表明「可分迭代器」,和Iterator同樣,Spliterator也用於遍歷數據源中的元素,但它是爲了並行執行而設計的。Java 8已經爲集合框架中包含了全部數據結構提供了一個默認的Spliterator實現。 集合實現了Spliterator接口,接口提供了一個spliterator方法。這個接口定義了若干方法:

public interface Spliterator<T> {
    
    boolean tryAdvance(Consumer<? super T> action);

    Spliterator<T> trySplit();

    long estimateSize();

    int characteristics();

}

T是Spliterator遍歷的元素類型,tryAdvance方法的行爲相似於普通的Iterator,由於它會按順序一個個地使用Spliterator中的元素,而且若是還有其餘元素要遍歷就返回true。但trySplit是專爲Spliterator接口設計的,由於它能夠把一些元素劃出去分給第二個Spliterator,讓它們兩個並行處理。Spliterator還可經過estimateSize,讓它們兩個並行處理。Spliterator還可經過estimateSize方法估計還剩下多少元素要遍歷,由於即便不那麼確切,能快速算出來是一個值也有助於讓拆分均勻一點。

拆分過程

  將Stream拆分紅多個部分的算法是一個遞歸過程,如圖所示,第一步是對第一個Spliterator調用trySplit,生成第二個Spliterator。第二步對這兩個Spliterator調用trySplit直到它返回null,代表它處理的數據結構不能再分割,第三部,這個遞歸拆分過程到第四步就終止了,這時全部的Spliterator在調用trySplit時都返回了null。

這個拆分過程也受Spliterator自己特性影響,而特性是經過characteristics方法聲明的。

Spliterator的特性

Spliterator接口聲明的最後一個抽象方法是characteristics,它返回一個int,表明Spliterator自己特性集的編碼。

  ORDERED : 元素有既定的順序,所以Spliterator在遍歷和劃分時也遵循這一點

  DISTINCT : 對於任意一對遍歷過的元素x和y,x.equals(y) 返回false

  SORTED : 遍歷的元素按照一個預約義的順序排序

  SIZED : 該Spliterator由一個已知大小的源創建,所以estimatedSize()返回的是 準確值

  NONNULL : 保證遍歷的元素不會爲null

  IMMUTABLE: Spliterator的數據源不能修改,這意味着在遍歷時不能添加、刪除、修改任何元素

  CONCURRENT : 該Spliterator的數據源能夠被其餘線程同時修改而無需同步

  SUBSIZED : 該Spliterator和全部從它拆分出來的Spliterator都是SIZED

 

小結:

  1.內部迭代讓你能夠並行處理一個流,而無需在代碼中顯示使用和協調不一樣的線程。

  2.雖然並行處理一個流很容易,卻不能保證程序在全部狀況下都運行的更快。所以必定要測量,確保你並無把程序拖的更慢。

  3.像並行流那樣昂對一個數據集並行執行操做能夠提高性能,特別是要處理的元素數量龐大,或處理單個元素特別耗時時。

  4.儘可能使用原始特化流,來避免裝箱和拆箱操做。

  5.分支/合併框架讓你得以用遞歸方式將能夠並行的任務拆分紅功效的任務,在不一樣的線程上執行,而後將各個子任務的結果合併起來生成總體結果。

  6.Spliterator定義了並行流如何拆分它要遍歷的數據。

相關文章
相關標籤/搜索