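Streams can be executed in parallel to improve runtime performance on large numbers of input elements. Parallel streams use a common ForkJoinPool, available via the static ForkJoinPool.commonPool() method. The size of the underlying thread pool depends on the number of available CPU cores: by default its parallelism is one less than the number of processors the JVM reports.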
    ForkJoinPool commonPool = ForkJoinPool.commonPool();
    System.out.println(commonPool.getParallelism());    // 3
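On my machine the common pool is initialized with a parallelism of 3 by default. This value can be decreased or increased by setting the following JVM parameter: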
-Djava.util.concurrent.ForkJoinPool.common.parallelism=5
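To check what your own JVM does, a minimal sketch such as the following prints the processor count next to the common pool's parallelism. The class name `CommonPoolInfo` and the helper `expectedDefault` are made up for illustration, and the floor of 1 in the helper is an assumption of this sketch rather than something stated in the text.

```java
import java.util.concurrent.ForkJoinPool;

public class CommonPoolInfo {

    // Hypothetical helper: the default parallelism this sketch expects for a
    // given processor count (processors - 1, with an assumed floor of 1).
    static int expectedDefault(int availableProcessors) {
        return Math.max(1, availableProcessors - 1);
    }

    public static void main(String[] args) {
        int processors = Runtime.getRuntime().availableProcessors();
        int actual = ForkJoinPool.commonPool().getParallelism();

        System.out.println("available processors: " + processors);
        System.out.println("expected default:     " + expectedDefault(processors));
        // Differs from the expected default when the JVM parameter above is set.
        System.out.println("actual parallelism:   " + actual);
    }
}
```

Running it once without and once with the JVM parameter shows whether the override has taken effect.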
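Collections support the method parallelStream() to create a parallel stream of elements. Alternatively, you can call the intermediate method parallel() on a given stream to convert a sequential stream into a parallel one.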
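The two ways of obtaining a parallel stream can be compared with `isParallel()`. This is only a small sketch; the class and method names are invented for the example. Note that the Javadoc describes `parallelStream()` as returning a "possibly parallel" stream, so the result for other collection types is not guaranteed.

```java
import java.util.Arrays;
import java.util.List;

public class ParallelStreamCreation {

    // Parallel stream obtained directly from the collection.
    static boolean viaParallelStream(List<String> items) {
        return items.parallelStream().isParallel();
    }

    // Sequential stream converted with the intermediate parallel() call.
    static boolean viaParallel(List<String> items) {
        return items.stream().parallel().isParallel();
    }

    // Plain stream(), for comparison: sequential by default.
    static boolean plainStream(List<String> items) {
        return items.stream().isParallel();
    }

    public static void main(String[] args) {
        List<String> items = Arrays.asList("a1", "a2", "b1", "c2", "c1");
        System.out.println("parallelStream():    " + viaParallelStream(items));
        System.out.println("stream().parallel(): " + viaParallel(items));
        System.out.println("stream():            " + plainStream(items));
    }
}
```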
To understand how a parallel stream actually executes, the next example prints information about the current thread at each step:
    Arrays.asList("a1", "a2", "b1", "c2", "c1")
        .parallelStream()
        .filter(s -> {
            System.out.format("filter: %s [%s]\n",
                s, Thread.currentThread().getName());
            return true;
        })
        .map(s -> {
            System.out.format("map: %s [%s]\n",
                s, Thread.currentThread().getName());
            return s.toUpperCase();
        })
        .forEach(s -> System.out.format("forEach: %s [%s]\n",
            s, Thread.currentThread().getName()));
The debug output gives a better picture of which threads actually execute the stream operations:
    filter:  b1 [main]
    filter:  a2 [ForkJoinPool.commonPool-worker-1]
    map:     a2 [ForkJoinPool.commonPool-worker-1]
    filter:  c2 [ForkJoinPool.commonPool-worker-3]
    map:     c2 [ForkJoinPool.commonPool-worker-3]
    filter:  c1 [ForkJoinPool.commonPool-worker-2]
    map:     c1 [ForkJoinPool.commonPool-worker-2]
    forEach: C2 [ForkJoinPool.commonPool-worker-3]
    forEach: A2 [ForkJoinPool.commonPool-worker-1]
    map:     b1 [main]
    forEach: B1 [main]
    filter:  a1 [ForkJoinPool.commonPool-worker-3]
    map:     a1 [ForkJoinPool.commonPool-worker-3]
    forEach: A1 [ForkJoinPool.commonPool-worker-3]
    forEach: C1 [ForkJoinPool.commonPool-worker-2]
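As you can see, the parallel stream uses all available threads of the common ForkJoinPool to execute the stream operations, along with the calling main thread. The output may differ between runs because which thread handles which element is non-deterministic.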
Let's extend the example with an additional stream operation, sorted:
    Arrays.asList("a1", "a2", "b1", "c2", "c1")
        .parallelStream()
        .filter(s -> {
            System.out.format("filter: %s [%s]\n",
                s, Thread.currentThread().getName());
            return true;
        })
        .map(s -> {
            System.out.format("map: %s [%s]\n",
                s, Thread.currentThread().getName());
            return s.toUpperCase();
        })
        .sorted((s1, s2) -> {
            System.out.format("sort: %s <> %s [%s]\n",
                s1, s2, Thread.currentThread().getName());
            return s1.compareTo(s2);
        })
        .forEach(s -> System.out.format("forEach: %s [%s]\n",
            s, Thread.currentThread().getName()));
The result may look strange at first:
    filter:  c2 [ForkJoinPool.commonPool-worker-3]
    filter:  c1 [ForkJoinPool.commonPool-worker-2]
    map:     c1 [ForkJoinPool.commonPool-worker-2]
    filter:  a2 [ForkJoinPool.commonPool-worker-1]
    map:     a2 [ForkJoinPool.commonPool-worker-1]
    filter:  b1 [main]
    map:     b1 [main]
    filter:  a1 [ForkJoinPool.commonPool-worker-2]
    map:     a1 [ForkJoinPool.commonPool-worker-2]
    map:     c2 [ForkJoinPool.commonPool-worker-3]
    sort:    A2 <> A1 [main]
    sort:    B1 <> A2 [main]
    sort:    C2 <> B1 [main]
    sort:    C1 <> C2 [main]
    sort:    C1 <> B1 [main]
    sort:    C1 <> C2 [main]
    forEach: A1 [ForkJoinPool.commonPool-worker-1]
    forEach: C2 [ForkJoinPool.commonPool-worker-3]
    forEach: B1 [main]
    forEach: A2 [ForkJoinPool.commonPool-worker-2]
    forEach: C1 [ForkJoinPool.commonPool-worker-1]
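It seems that sort is executed sequentially on the main thread only. In fact, sort on a parallel stream uses the new Java 8 method Arrays.parallelSort() under the hood. As stated in the Javadoc, this method decides based on the length of the array whether sorting is performed sequentially or in parallel: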
"If the length of the specified array is less than the minimum granularity, then it is sorted using the appropriate Arrays.sort method."
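Whichever path is taken, the sorted result is the same. The sketch below compares Arrays.sort and Arrays.parallelSort on a tiny and a larger array; the class name, the sizes and the fixed random seed are arbitrary choices for the illustration. In the OpenJDK 8 sources the minimum granularity appears to be 8192 elements, but treat that figure as an implementation detail rather than a guarantee.

```java
import java.util.Arrays;
import java.util.Random;

public class ParallelSortSketch {

    // Sorts two copies of the same random data, one with Arrays.sort and one
    // with Arrays.parallelSort, and reports whether the results are identical.
    static boolean sameResult(int size) {
        Integer[] data = new Random(42)
            .ints(size, 0, 1000)
            .boxed()
            .toArray(Integer[]::new);

        Integer[] sequential = data.clone();
        Integer[] parallel = data.clone();

        Arrays.sort(sequential);
        Arrays.parallelSort(parallel);

        return Arrays.equals(sequential, parallel);
    }

    public static void main(String[] args) {
        // Five elements, as in the example above: far too few to sort in parallel.
        System.out.println("size 5:       " + sameResult(5));
        // Large enough that a parallel sort may actually be used.
        System.out.println("size 100000:  " + sameResult(100_000));
    }
}
```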
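Let's come back to the reduce example from the previous section. We already found out that the combiner function is only called for parallel streams, not for sequential ones. Let's see which threads are actually involved: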
    List<Person> persons = Arrays.asList(
        new Person("Max", 18),
        new Person("Peter", 23),
        new Person("Pamela", 23),
        new Person("David", 12));

    persons
        .parallelStream()
        .reduce(0,
            (sum, p) -> {
                System.out.format("accumulator: sum=%s; person=%s [%s]\n",
                    sum, p, Thread.currentThread().getName());
                // Return a new value instead of reassigning the lambda parameter.
                return sum + p.age;
            },
            (sum1, sum2) -> {
                System.out.format("combiner: sum1=%s; sum2=%s [%s]\n",
                    sum1, sum2, Thread.currentThread().getName());
                return sum1 + sum2;
            });
The console output shows that both the accumulator and the combiner functions are executed in parallel on all available threads:
    accumulator: sum=0; person=Pamela; [main]
    accumulator: sum=0; person=Max;    [ForkJoinPool.commonPool-worker-3]
    accumulator: sum=0; person=David;  [ForkJoinPool.commonPool-worker-2]
    accumulator: sum=0; person=Peter;  [ForkJoinPool.commonPool-worker-1]
    combiner:    sum1=18; sum2=23;     [ForkJoinPool.commonPool-worker-1]
    combiner:    sum1=23; sum2=12;     [ForkJoinPool.commonPool-worker-2]
    combiner:    sum1=41; sum2=35;     [ForkJoinPool.commonPool-worker-2]
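The difference between sequential and parallel reduction can also be checked without reading log output, by counting how often the combiner runs. The following is a sketch under a few assumptions: the nested `Person` class is a minimal stand-in for the one used in this article, and `sumAges` is a hypothetical helper introduced only for this comparison.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

public class ReduceCombinerSketch {

    // Minimal stand-in for the Person class used in the example above.
    static class Person {
        final String name;
        final int age;

        Person(String name, int age) {
            this.name = name;
            this.age = age;
        }
    }

    static final List<Person> PERSONS = Arrays.asList(
        new Person("Max", 18),
        new Person("Peter", 23),
        new Person("Pamela", 23),
        new Person("David", 12));

    // Sums all ages and records every combiner invocation in the counter.
    static int sumAges(boolean parallel, AtomicInteger combinerCalls) {
        Stream<Person> stream = parallel ? PERSONS.parallelStream() : PERSONS.stream();
        return stream.reduce(0,
            (sum, p) -> sum + p.age,
            (sum1, sum2) -> {
                combinerCalls.incrementAndGet();
                return sum1 + sum2;
            });
    }

    public static void main(String[] args) {
        AtomicInteger sequentialCalls = new AtomicInteger();
        AtomicInteger parallelCalls = new AtomicInteger();

        System.out.println("sequential sum: " + sumAges(false, sequentialCalls));
        System.out.println("parallel sum:   " + sumAges(true, parallelCalls));
        System.out.println("combiner calls (sequential): " + sequentialCalls.get());
        System.out.println("combiner calls (parallel):   " + parallelCalls.get());
    }
}
```

Both variants produce the same sum; only the parallel run is expected to invoke the combiner, and how often it does so depends on how the stream is split.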
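In summary, parallel streams can bring a nice performance boost to streams with a large number of input elements. But keep in mind that some parallel stream operations, such as reduce and collect, need additional computations (combine operations) that are not needed when the stream is executed sequentially.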
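Furthermore, we have learned that all parallel stream operations share the same JVM-wide common ForkJoinPool. So you probably want to avoid slow, blocking stream operations, since they could slow down other parts of your application that rely heavily on parallel streams.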
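One workaround that is sometimes used is to start the parallel stream from inside a dedicated ForkJoinPool, so that its tasks run on that pool's workers instead of the common pool. This relies on behaviour the Stream API does not document, so take the sketch below as an experiment under that assumption, not as a recommended pattern. The class and method names are invented for the example.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

public class DedicatedPoolSketch {

    // Runs a parallel stream from inside a dedicated pool and returns the
    // names of all threads that processed at least one element.
    static Set<String> threadNamesUsed(int parallelism) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            Set<String> names = ConcurrentHashMap.newKeySet();
            pool.submit(() ->
                IntStream.range(0, 1_000)
                    .parallel()
                    .forEach(i -> names.add(Thread.currentThread().getName()))
            ).get(); // wait until the whole stream has been processed
            return names;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Thread names show which pool did the work.
        threadNamesUsed(2).forEach(System.out::println);
    }
}
```

If the printed names do not mention the common pool, the work stayed on the dedicated pool on your JVM; slow operations are then isolated from other users of parallel streams.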