Java8 parallelStream淺析

JAVA8中引入了lamda表達式和Stream接口。其豐富的API及強大的表達能力極大的簡化代碼,提高了效率,同時還經過parallelStream提供併發操做的支持,本文探討parallelStream方法的使用。html

首先看下java doc中對parallelStream的定義。java

A sequence of elements supporting sequential and parallel aggregate operations.
 ...
Stream pipelines may execute either sequentially or in parallel. This execution mode is a property of the stream. 
Streams are created with an initial choice of sequential or parallel execution. (For example, Collection.stream()
creates a sequential stream, and Collection.parallelStream() creates a parallel one.)
This choice of execution mode may be modified by the BaseStream.sequential() or BaseStream.parallel() methods,
and may be queried with the BaseStream.isParallel() method.

既然能夠並行的執行,廢話很少說,先看一個例子。api

class Person {
        int    id;
        String name;
        String sex;
        float  height;

        public Person(int id, String name, String sex, float height) {
            this.id = id;
            this.name = name;
            this.sex = sex;
            this.height = height;
        }
}

    /**
     * 構造數據
     * 
     * @return
     */
    public List<Person> constructPersons() {

        List<Person> persons = new ArrayList<Person>();
        for (int i = 0; i < 5; i++) {
            Person p = new Person(i, "name" + i, "sex" + i, i);
            persons.add(p);
        }
        return persons;
    }

    /**
     * for
     * 
     * @param persons
     */
    public void doFor(List<Person> persons) {
        long start = System.currentTimeMillis();

        for (Person p : persons) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
            }
            System.out.println(p.name);
        }

        long end = System.currentTimeMillis();
        System.out.println("doFor cost:" + (end - start));
    }

    /**
     * 順序流
     * 
     * @param persons
     */
    public void doStream(List<Person> persons) {
        long start = System.currentTimeMillis();

        persons.stream().forEach(x -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
            }
            System.out.println(x.name);
        });

        long end = System.currentTimeMillis();
        System.out.println("doStream cost:" + (end - start));
    }

    /**
     * 並行流
     * 
     * @param persons
     */
    public void doParallelStream(List<Person> persons) {

        long start = System.currentTimeMillis();

        persons.parallelStream().forEach(x -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
            }
            System.out.println(x.name);
        });

        long end = System.currentTimeMillis();

        System.out.println("doParallelStream cost:" + (end - start));
    }

執行結果:數組

name0
name1
name2
name3
name4
doFor cost:5021
name0
name1
name2
name3
name4
doStream cost:5076
name4
name0
name2
name3
name1
doParallelStream cost:1010

代碼上 stream 和 parallelStream 語法差別較小,從執行結果來看,stream順序輸出,而parallelStream 無序輸出;parallelStream 執行耗時是 stream 的五分之一。
能夠看到在當前測試場景下,parallelStream 得到的相對較好的執行性能,那parallelStream背後究竟是什麼呢?
要深刻了解parallelStream,首先要弄明白ForkJoin框架和ForkJoinPool。ForkJoin框架是java7中提供的並行執行框架,他的策略是分而治之。說白了,就是把一個大的任務切分紅不少小的子任務,子任務執行完畢後,再把結果合併起來。安全

順便說下ForkJoin框架和ThreadPoolExecutor的區別,ForkJoin框架可使用數量有限的線程數,執行大量任務,而且這些任務之間是有父子依賴的,必須是子任務執行完成後,父任務才能執行。ThreadPoolExecutor 顯然是沒法支持這種場景的。而ForkJoin框架,可讓其中的線程建立新的任務,並掛起當前的任務,任務以及子任務會保留在一個內部隊列中,此時線程就可以從隊列中選擇任務順序執行。

Java 8爲ForkJoinPool添加了一個通用線程池,這個線程池用來處理那些沒有被顯式提交到任何線程池的任務。它是ForkJoinPool類型上的一個靜態元素,它擁有的默認線程數量等於運行計算機上的處理器數量。當調用Arrays類上添加的新方法時,自動並行化就會發生。好比用來排序一個數組的並行快速排序,用來對一個數組中的元素進行並行遍歷。自動並行化也被運用在Java 8新添加的Stream API中。

上面的代碼中,forEach方法會爲每一個元素的操做建立一個任務,該任務會被前文中提到的ForkJoinPool中的通用線程池處理。以上的並行計算邏輯固然也可使用ThreadPoolExecutor完成,可是就代碼的可讀性和代碼量而言,使用ForkJoinPool明顯更勝一籌。

默認線程池的數量就是處理器的數量,特殊場景下可使用系統屬性:-Djava.util.concurrent.ForkJoinPool.common.parallelism={N} 調整。

對上面例子作下調整,sleep時間變爲2ms,併發

Thread.sleep(2);

執行結果以下:oracle

doFor cost:12
=======================
doParallelStream cost:62
=======================
doStream cost:13

doParallelStream耗時最多,可見並非並行執行就是性能最好的,要根據具體的應用場景測試分析。這個例子中,每一個子任務執行時間較短,而線程切換消耗了大量時間。
說到了併發,不得不提線程安全。先看一個例子:框架

public void doThreadUnSafe() {
        List<Integer> listFor = new ArrayList<>();
        List<Integer> listParallel = new ArrayList<>();

        IntStream.range(0, 1000).forEach(listFor::add);
        IntStream.range(0, 1000).parallel().forEach(listParallel::add);

        System.out.println("listFor size :" + listFor.size());
        System.out.println("listParallel size :" + listParallel.size());
    }

輸出結果:性能

listFor size :1000
listParallel size :949

顯而易見,stream.parallel.forEach()中執行的操做並不是線程安全。若是須要線程安全,能夠把集合轉換爲同步集合,即:Collections.synchronizedList(new ArrayList<>())。

總結下來以下:測試

  1. 使用parallelStream能夠簡潔高效的寫出併發代碼。
  2. parallelStream並行執行是無序的。
  3. parallelStream提供了更簡單的併發執行的實現,但並不意味着更高的性能,它是使用要根據具體的應用場景。若是cpu資源緊張parallelStream不會帶來性能提高;若是存在頻繁的線程切換反而會下降性能。
  4. 任務之間最好是狀態無關的,由於parallelStream默認是非線程安全的,可能帶來結果的不肯定性。


參考:


 

摘自:https://zhuanlan.zhihu.com/p/43039062

相關文章
相關標籤/搜索