擁抱並行流,提升程序執行速度

前言

在 Java7 以前,若是想要並行處理一個集合,咱們須要如下幾步 1. 手動分紅幾部分 2. 爲每部分建立線程 3. 在適當的時候合併。而且還須要關注多個線程之間共享變量的修改問題。而 Java8 爲咱們提供了並行流,能夠一鍵開啓並行模式。是否是很酷呢?讓咱們來看看吧前端

聲明:本文首發於博客園,做者:後青春期的Keats;地址:https://www.cnblogs.com/keatsCoder/ 轉載請註明,謝謝!java

並行流

認識和開啓並行流

什麼是並行流:並行流就是將一個流的內容分紅多個數據塊,並用不一樣的線程分別處理每一個不一樣數據塊的流。例若有這麼一個需求:算法

有一個 List 集合,而 list 中每一個 apple 對象只有重量,咱們也知道 apple 的單價是 5元/kg,如今須要計算出每一個 apple 的單價,傳統的方式是這樣: 數據庫

List<Apple> appleList = new ArrayList<>(); // 僞裝數據是從庫裏查出來的

for (Apple apple : appleList) {
    apple.setPrice(5.0 * apple.getWeight() / 1000);
}

咱們經過迭代器遍歷 list 中的 apple 對象,完成了每一個 apple 價格的計算。而這個算法的時間複雜度是 O(list.size()) 隨着 list 大小的增長,耗時也會跟着線性增長。並行流網絡

能夠大大縮短這個時間。並行流處理該集合的方法以下:多線程

appleList.parallelStream().forEach(apple -> apple.setPrice(5.0 * apple.getWeight() / 1000));

和普通流的區別是這裏調用的 parallelStream() 方法。固然也能夠經過 stream.parallel() 將普通流轉換成並行流。並行流也能經過 sequential() 方法轉換爲順序流,但要注意:流的並行和順序轉換不會對流自己作任何實際的變化,僅僅是打了個標記而已。而且在一條流水線上對流進行屢次並行 / 順序的轉換,生效的是最後一次的方法調用app

並行流如此方便,它的線程從那裏來呢?有多少個?怎麼配置呢?框架

並行流內部使用了默認的 ForkJoinPool 線程池。默認的線程數量就是處理器的核心數,而配置系統核心屬性: java.util.concurrent.ForkJoinPool.common.parallelism 能夠改變線程池大小。不過該值是全局變量。改變他會影響全部並行流。目前還沒法爲每一個流配置專屬的線程數。通常來講採用處理器核心數是不錯的選擇ide

測試並行流的性能

爲了更容易的測試性能,咱們在每次計算完蘋果價格後,讓線程睡 1s,表示在這期間執行了其餘 IO 相關的操做,並輸出程序執行耗時,順序執行的耗時:函數

public static void main(String[] args) throws InterruptedException {
    List<Apple> appleList = initAppleList();

    Date begin = new Date();
    for (Apple apple : appleList) {
        apple.setPrice(5.0 * apple.getWeight() / 1000);
        Thread.sleep(1000);
    }
    Date end = new Date();
    log.info("蘋果數量:{}個, 耗時:{}s", appleList.size(), (end.getTime() - begin.getTime()) /1000);
}

Snipaste_2020-05-21_21-49-44

並行版本

List<Apple> appleList = initAppleList();

Date begin = new Date();
appleList.parallelStream().forEach(apple ->
                                   {
                                       apple.setPrice(5.0 * apple.getWeight() / 1000);
                                       try {
                                           Thread.sleep(1000);
                                       } catch (InterruptedException e) {
                                           e.printStackTrace();
                                       }
                                   }
                                  );
Date end = new Date();
log.info("蘋果數量:{}個, 耗時:{}s", appleList.size(), (end.getTime() - begin.getTime()) /1000);

耗時狀況

Snipaste_2020-05-21_22-16-08

跟咱們的預測一致,個人電腦是 四核I5 處理器,開啓並行後四個處理器每人執行一個線程,最後 1s 完成了任務!

並行流能夠隨便用嗎?

可拆分性影響流的速度

經過上面的測試,有的人會輕易獲得一個結論:並行流很快,咱們能夠徹底放棄 foreach/fori/iter 外部迭代,使用 Stream 提供的內部迭代來實現了。事實真的是這樣嗎?並行流真的如此完美嗎?答案固然是否認的。你們能夠複製下面的代碼,在本身的電腦上測試。測試完後能夠發現,並行流並不老是最快的處理方式。

  1. 對於 iterate 方法來處理的前 n 個數字來講,無論並行與否,它老是慢於循環的,非並行版本能夠理解爲流化操做沒有循環更偏向底層致使的慢。可並行版本是爲何慢呢?這裏有兩個須要注意的點:

    1. iterate 生成的是裝箱的對象,必須拆箱成數字才能求和

    2. 咱們很難把 iterate 分紅多個獨立的塊來並行執行

      這個問題頗有意思,咱們必須意識到某些流操做比其餘操做更容易並行化。對於 iterate 來講,每次應用這個函數都要依賴於前一次應用的結果。所以在這種狀況下,咱們不只不能有效的將流劃分紅小塊處理。反而還由於並行化再次增長了開支。

  2. 而對於 LongStream.rangeClosed() 方法來講,就不存在 iterate 的第兩個痛點了。它生成的是基本類型的值,不用拆裝箱操做,另外它能夠直接將要生成的數字 1 - n 拆分紅 1 - n/4, 1n/4 - 2n/4, ... 3n/4 - n 這樣四部分。所以並行狀態下的 rangeClosed() 是快於 for 循環外部迭代的

package lambdasinaction.chap7;

import java.util.stream.*;

public class ParallelStreams {

    public static long iterativeSum(long n) {
        long result = 0;
        for (long i = 0; i <= n; i++) {
            result += i;
        }
        return result;
    }

    public static long sequentialSum(long n) {
        return Stream.iterate(1L, i -> i + 1).limit(n).reduce(Long::sum).get();
    }

    public static long parallelSum(long n) {
        return Stream.iterate(1L, i -> i + 1).limit(n).parallel().reduce(Long::sum).get();
    }

    public static long rangedSum(long n) {
        return LongStream.rangeClosed(1, n).reduce(Long::sum).getAsLong();
    }

    public static long parallelRangedSum(long n) {
        return LongStream.rangeClosed(1, n).parallel().reduce(Long::sum).getAsLong();
    }

}
package lambdasinaction.chap7;

import java.util.concurrent.*;
import java.util.function.*;

public class ParallelStreamsHarness {

    public static final ForkJoinPool FORK_JOIN_POOL = new ForkJoinPool();

    public static void main(String[] args) {
        System.out.println("Iterative Sum done in: " + measurePerf(ParallelStreams::iterativeSum, 10_000_000L) + " msecs");
        System.out.println("Sequential Sum done in: " + measurePerf(ParallelStreams::sequentialSum, 10_000_000L) + " msecs");
        System.out.println("Parallel forkJoinSum done in: " + measurePerf(ParallelStreams::parallelSum, 10_000_000L) + " msecs" );
        System.out.println("Range forkJoinSum done in: " + measurePerf(ParallelStreams::rangedSum, 10_000_000L) + " msecs");
        System.out.println("Parallel range forkJoinSum done in: " + measurePerf(ParallelStreams::parallelRangedSum, 10_000_000L) + " msecs" );
    }

    public static <T, R> long measurePerf(Function<T, R> f, T input) {
        long fastest = Long.MAX_VALUE;
        for (int i = 0; i < 10; i++) {
            long start = System.nanoTime();
            R result = f.apply(input);
            long duration = (System.nanoTime() - start) / 1_000_000;
            System.out.println("Result: " + result);
            if (duration < fastest) fastest = duration;
        }
        return fastest;
    }
}

共享變量修改的問題

並行流雖然輕易的實現了多線程,可是仍未解決多線程中共享變量的修改問題。下面代碼中存在共享變量 total,分別使用順序流和並行流計算前n個天然數的和

public static long sideEffectSum(long n) {
    Accumulator accumulator = new Accumulator();
    LongStream.rangeClosed(1, n).forEach(accumulator::add);
    return accumulator.total;
}

public static long sideEffectParallelSum(long n) {
    Accumulator accumulator = new Accumulator();
    LongStream.rangeClosed(1, n).parallel().forEach(accumulator::add);
    return accumulator.total;
}

public static class Accumulator {
    private long total = 0;

    public void add(long value) {
        total += value;
    }
}

順序執行每次輸出的結果都是:50000005000000,而並行執行的結果卻五花八門了。這是由於每次訪問 totle 都會存在數據競爭,關於數據競爭的緣由,你們能夠看看關於 volatile 的博客。所以當代碼中存在修改共享變量的操做時,是不建議使用並行流的。

並行流的使用注意

在並行流的使用上有下面幾點須要注意:

  • 儘可能使用 LongStream / IntStream / DoubleStream 等原始數據流代替 Stream 來處理數字,以免頻繁拆裝箱帶來的額外開銷

  • 要考慮流的操做流水線的總計算成本,假設 N 是要操做的任務總數,Q 是每次操做的時間。N * Q 就是操做的總時間,Q 值越大就意味着使用並行流帶來收益的可能性越大

    例如:前端傳來幾種類型的資源,須要存儲到數據庫。每種資源對應不一樣的表。咱們能夠視做類型數爲 N,存儲數據庫的網絡耗時 + 插入操做耗時爲 Q。通常狀況下網絡耗時都是比較大的。所以該操做就比較適合並行處理。固然當類型數目大於核心數時,該操做的性能提高就會打必定的折扣了。更好的優化方法在往後的博客會爲你們奉上

  • 對於較少的數據量,不建議使用並行流

  • 容易拆分紅塊的流數據,建議使用並行流

如下是一些常見的集合框架對應流的可拆分性能表

可拆分性
ArrayList 極佳
LinkedList
IntStream.range 極佳
Stream.iterate
HashSet
TreeSet

碼字不易,若是你以爲讀完之後有收穫,不妨點個推薦讓更多的人看到吧!

相關文章
相關標籤/搜索