如何高效的使用並行流

image

在Java7以前想要並行處理大量數據是很困難的,首先把數據拆分紅不少個部分,而後把這這些子部分放入到每一個線程中去執行計算邏輯,最後在把每一個線程返回的計算結果進行合併操做;在Java7中提供了一個處理大數據的fork/join框架,屏蔽掉了線程之間交互的處理,更加專一於數據的處理。java


Fork/Join框架

Fork/Join框架採用的是思想就是分而治之,把大的任務拆分紅小的任務,而後放入到獨立的線程中去計算,同時爲了最大限度的利用多核CPU,採用了一個種工做竊取的算法來運行任務,也就是說當某個線程處理完本身工做隊列中的任務後,嘗試當其餘線程的工做隊列中竊取一個任務來執行,直到全部任務處理完畢。因此爲了減小線程之間的競爭,一般會使用雙端隊列,被竊取任務線程永遠從雙端隊列的頭部拿任務執行,而竊取任務的線程永遠從雙端隊列的尾部拿任務執行;在百度找了一張圖算法

image

  • 使用RecursiveTask

使用Fork/Join框架首先須要建立本身的任務,須要繼承RecursiveTask,實現抽象方法segmentfault

protected abstract V compute();

實現類須要在該方法中實現任務的拆分,計算,合併;僞代碼能夠表示成這樣:安全

if(任務已經不可拆分){
    return 順序計算結果;
} else {
    1.任務拆分紅兩個子任務
    2.遞歸調用本方法,拆分子任務
    3.等待子任務執行完成
    4.合併子任務的結果
}
  • Fork/Join實戰

任務:完成對一億個天然數求和網絡

咱們先使用串行的方式實現,代碼以下:框架

long result = LongStream.rangeClosed(1, 100000000)
                .reduce(0, Long::sum);
System.out.println("result:" + result);

使用Fork/Join框架實現,代碼以下:ide

public class SumRecursiveTask extends RecursiveTask<Long> {
    private long[] numbers;
    private int start;
    private int end;

    public SumRecursiveTask(long[] numbers) {
        this.numbers = numbers;
        this.start = 0;
        this.end = numbers.length;
    }

    public SumRecursiveTask(long[] numbers, int start, int end) {
        this.numbers = numbers;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        int length = end - start;
        if (length < 20000) {  //小於20000個就不在進行拆分
            return sum();
        }
        SumRecursiveTask leftTask = new SumRecursiveTask(numbers, start, start + length / 2); //進行任務拆分
        SumRecursiveTask rightTask = new SumRecursiveTask(numbers, start + (length / 2), end); //進行任務拆分
        leftTask.fork(); //把該子任務交友ForkJoinPoll線程池去執行
        rightTask.fork(); //把該子任務交友ForkJoinPoll線程池去執行
        return leftTask.join() + rightTask.join(); //把子任務的結果相加
    }


    private long sum() {
        int sum = 0;
        for (int i = start; i < end; i++) {
            sum += numbers[i];
        }
        return sum;
    }


    public static void main(String[] args) {
        long[] numbers = LongStream.rangeClosed(1, 100000000).toArray();

        Long result = new ForkJoinPool().invoke(new SumRecursiveTask(numbers));
        System.out.println("result:" +result);
    }
}
Fork/Join默認的線程數量就是你的處理器數量,這個值是由 Runtime.getRuntime().available- Processors()獲得的。 可是你能夠經過系統屬性 java.util.concurrent.ForkJoinPool.common. parallelism來改變線程池大小,以下所示: System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism","12"); 這是一個全局設置,所以它將影響代碼中全部的並行流。目前還沒法專爲某個 並行流指定這個值。由於會影響到全部的並行流,因此在任務中經歷避免網絡/IO操做,不然可能會拖慢其餘並行流的運行速度

parallelStream

以上咱們說到的都是在Java7中使用並行流的操做,Java8並無止步於此,爲咱們提供更加便利的方式,那就是parallelStreamparallelStream底層仍是經過Fork/Join框架來實現的。測試

  • 常見的使用方式

1.串行流轉化成並行流大數據

LongStream.rangeClosed(1,1000)
                .parallel()
                .forEach(System.out::println);

2.直接生成並行流this

List<Integer> values = new ArrayList<>();
        for (int i = 0; i < 10000; i++) {
            values.add(i);
        }
        values.parallelStream()
                .forEach(System.out::println);
  • 正確的使用parallelStream

咱們使用parallelStream來實現上面的累加例子看看效果,代碼以下:

public static void main(String[] args) {
    Summer summer = new Summer();
    LongStream.rangeClosed(1, 100000000)
            .parallel()
            .forEach(summer::add);
    System.out.println("result:" + summer.sum);

}

static class Summer {
    public long sum = 0;

    public void add(long value) {
        sum += value;
    }
}

運行結果以下:

result

運行以後,咱們發現運行的結果不正確,而且每次運行的結果都不同,這是爲何呢?
這裏其實就是錯用parallelStream常見的狀況,parallelStream是非線程安全的,在這個裏面中使用多個線程去修改了共享變量sum, 執行了sum += value操做,這個操做自己是非原子性的,因此在使用並行流時應該避免去修改共享變量。

修改上面的例子,正確使用parallelStream來實現,代碼以下:

long result = LongStream.rangeClosed(1, 100000000)
        .parallel()
        .reduce(0, Long::sum);
System.out.println("result:" + result);

在前面咱們已經說過了fork/join的操做流程是:拆子部分,計算,合併結果;由於parallelStream底層使用的也是fork/join框架,因此這些步驟也是須要作的,可是從上面的代碼,咱們看到Long::sum作了計算,reduce作了合併結果,咱們並無去作任務的拆分,因此這個過程確定是parallelStream已經幫咱們實現了,這個時候就必須的說說Spliterator

Spliterator是Java8加入的新接口,是爲了並行執行任務而設計的。

public interface Spliterator<T> {
    boolean tryAdvance(Consumer<? super T> action);

    Spliterator<T> trySplit();

    long estimateSize();

    int characteristics();
}

tryAdvance: 遍歷全部的元素,若是還有能夠遍歷的就返回ture,不然返回false

trySplit: 對全部的元素進行拆分紅小的子部分,若是已經不能拆分就返回null

estimateSize: 當前拆分裏面還剩餘多少個元素

characteristics: 返回當前Spliterator特性集的編碼


總結

  1. 要證實並行處理比順序處理效率高,只能經過測試,不能靠猜想(本文累加的例子在多臺電腦上運行了屢次,也並不能證實採用並行來處理累加就必定比串行的快多少,因此只能經過多測試,環境不一樣可能結果就會不一樣)
  2. 數據量較少,而且計算邏輯簡單,一般不建議使用並行流
  3. 須要考慮流的操做時間消耗
  4. 在有些狀況下須要本身去實現拆分的邏輯,並行流才能高效

感謝你們能夠耐心地讀到這裏。 固然,文中或許會存在或多或少的不足、錯誤之處,有建議或者意見也很是歡迎你們在評論交流。 最後,但願朋友們能夠點贊評論關注三連,由於這些就是我分享的所有動力來源🙏
相關文章
相關標籤/搜索