並行計算涉及將問題劃分爲子問題,同時解決這些問題(並行地,每一個子問題運行在一個單獨的線程中),而後將子問題的解決結果組合起來。Java SE提供了fork/join框架,它使你可以更容易地在應用程序中實現並行計算,可是,使用這個框架,你必須指定問題如何被細分(分區),使用聚合操做,Java運行時將爲你執行這種分區和組合解決方案。html
在使用集合的應用程序中實現並行性的一個困難是集合不是線程安全的,這意味着多個線程不能在不引入線程干擾或內存一致性錯誤的狀況下操做集合,Collections
框架提供同步包裝器,能夠將自動同步添加到任意集合,使其線程安全。可是,同步會引入線程爭用,你但願避免線程爭用,由於這會阻止線程並行運行。聚合操做和並行流使你可以使用非線程安全的集合實現並行性,前提是在操做集合時不修改集合。java
請注意,並行性並不會自動比串行執行操做快,儘管若是你有足夠的數據和處理器內核,並行性能夠更快。雖然聚合操做使你可以更容易地實現並行性,可是肯定應用程序是否適合並行性仍然是你的職責。git
你能夠在示例ParallelismExamples中找到本節中描述的代碼摘錄。github
你能夠串行或並行執行流,當流並行執行時,Java運行時將流劃分爲多個子流,聚合操做迭代並並行處理這些子流,而後組合結果。算法
當你建立一個流時,它老是一個串行流,除非另有指定,要建立並行流,請調用操做Collection.parallelStream,或者,調用操做BaseStream.parallel,例如,下面的語句並行計算全部男性成員的平均年齡:express
double average = roster .parallelStream() .filter(p -> p.getGender() == Person.Sex.MALE) .mapToInt(Person::getAge) .average() .getAsDouble();
再次考慮如下按性別對成員進行分組的示例(在小節概括部分中進行了描述),這個例子調用了collect
操做,它將集合roster
概括爲Map
:segmentfault
Map<Person.Sex, List<Person>> byGender = roster .stream() .collect( Collectors.groupingBy(Person::getGender));
下面是等價的並行操做:api
ConcurrentMap<Person.Sex, List<Person>> byGender = roster .parallelStream() .collect( Collectors.groupingByConcurrent(Person::getGender));
這稱爲併發概括,若是如下全部條件對包含collect
操做的特定管道都成立,Java運行時將執行併發概括:安全
collect
操做的參數collector
具備Collector.Characteristics.CONCURRENT特徵,要肯定collector
的特徵,請調用Collector.characteristics方法。collector
具備Collector.Characteristics.UNORDERED特性,要確保流是無序的,請調用BaseStream.unordered操做。注意:這個示例返回ConcurrentMap而不是Map
的實例,並調用groupingByConcurrent操做而不是groupingBy
,與groupingByConcurrent
操做不一樣,並行流的groupingBy
操做執行得不好(這是由於它經過按鍵合併兩個映射來操做,這在計算上很是昂貴),相似地,Collectors.toConcurrentMap操做在並行流中比Collectors.toMap操做執行得更好。多線程
管道處理流元素的順序取決於流是串行執行仍是並行執行、流的源和中間操做,例如,考慮下面的例子,它使用forEach
操做屢次打印ArrayList
實例的元素:
Integer[] intArray = {1, 2, 3, 4, 5, 6, 7, 8 }; List<Integer> listOfIntegers = new ArrayList<>(Arrays.asList(intArray)); System.out.println("listOfIntegers:"); listOfIntegers .stream() .forEach(e -> System.out.print(e + " ")); System.out.println(""); System.out.println("listOfIntegers sorted in reverse order:"); Comparator<Integer> normal = Integer::compare; Comparator<Integer> reversed = normal.reversed(); Collections.sort(listOfIntegers, reversed); listOfIntegers .stream() .forEach(e -> System.out.print(e + " ")); System.out.println(""); System.out.println("Parallel stream"); listOfIntegers .parallelStream() .forEach(e -> System.out.print(e + " ")); System.out.println(""); System.out.println("Another parallel stream:"); listOfIntegers .parallelStream() .forEach(e -> System.out.print(e + " ")); System.out.println(""); System.out.println("With forEachOrdered:"); listOfIntegers .parallelStream() .forEachOrdered(e -> System.out.print(e + " ")); System.out.println("");
這個例子由五個管道組成,打印輸出以下:
listOfIntegers: 1 2 3 4 5 6 7 8 listOfIntegers sorted in reverse order: 8 7 6 5 4 3 2 1 Parallel stream: 3 4 1 6 2 5 7 8 Another parallel stream: 6 3 1 5 7 8 4 2 With forEachOrdered: 8 7 6 5 4 3 2 1
這個示例執行如下操做:
listOfIntegers
的元素。listOfIntegers
的元素。forEachOrdered
的並行流操做,可能會失去並行性的好處。方法或表達式除了返回或生成值外,若是還修改計算機的狀態,則會產生反作用。例如可變概括,以及調用System.out.println
方法用於調試。JDK能夠很好地處理管道中的某些反作用,特別地,collect
方法被設計用於以並行安全的方式執行最多見的流操做,這些操做具備反作用,像forEach
和peek
這樣的操做是爲反作用而設計的,返回void
的lambda表達式,例如調用System.out.println
的表達式,除了反作用什麼都作不了。即使如此,你也應該當心使用forEach
和peek
操做,若是你將這些操做中的一個操做與並行流一塊兒使用,那麼Java運行時可能會從多個線程併發地調用你指定爲其參數的lambda表達式。此外,永遠不要做爲參數傳遞lambda表達式,這些表達式在filter
和map
等操做中有反作用,下面幾節討論干擾和有狀態lambda表達式,它們均可能是反作用的來源,而且可能返回不一致或不可預測的結果,特別是在並行流中。然而,惰性的概念首先被討論,由於它對干擾有直接的影響。
全部中間操做都是惰性的,若是表達式、方法或算法的值只在須要時計算,那麼它就是惰性的(若是一個算法被當即計算或處理,那麼它就是當即的),中間操做是惰性的,由於它們直到終端操做開始時纔開始處理流的內容。延遲處理流使Java編譯器和運行時可以優化它們處理流的方式,例如,在管道中,如聚合操做一節中描述的filter-mapToInt-average
示例,average
操做能夠從mapToInt
操做建立的流中獲取幾個整數,mapToInt
操做從filter
操做中獲取元素。average
操做將重複這個過程,直到從流中得到全部須要的元素,而後計算平均值。
流操做中的Lambda表達式不該該干涉,當管道處理流時修改流的源時發生干擾,例如,下面的代碼嘗試鏈接列表listofstring
中包含的字符串,可是,它拋出一個ConcurrentModificationException
:
try { List<String> listOfStrings = new ArrayList<>(Arrays.asList("one", "two")); // This will fail as the peek operation will attempt to add the // string "three" to the source after the terminal operation has // commenced. String concatenatedString = listOfStrings .stream() // Don't do this! Interference occurs here. .peek(s -> listOfStrings.add("three")) .reduce((a, b) -> a + " " + b) .get(); System.out.println("Concatenated string: " + concatenatedString); } catch (Exception e) { System.out.println("Exception caught: " + e.toString()); }
這個示例使用reduce
操做將listofstring
中包含的字符串鏈接到一個可選的String
值中,reduce
操做是一個終端操做,可是,這裏的管道調用中間操做peek
,該操做試圖向listofstring
添加一個新元素。記住,全部中間操做都是惰性的,這意味着本例中的管道在調用操做get
時開始執行,在執行get
操做完成時結束執行,peek
操做的參數試圖在管道執行期間修改流源,這將致使Java運行時拋出ConcurrentModificationException
。
避免在流操做中使用有狀態lambda表達式做爲參數,有狀態lambda表達式的結果取決於在管道執行期間可能發生變化的任何狀態,下面的示例使用map
中間操做將列表listOfIntegers
中的元素添加到一個新的List
實例中,它這樣作了兩次,第一次是串行流,而後是並行流:
List<Integer> serialStorage = new ArrayList<>(); System.out.println("Serial stream:"); listOfIntegers .stream() // Don't do this! It uses a stateful lambda expression. .map(e -> { serialStorage.add(e); return e; }) .forEachOrdered(e -> System.out.print(e + " ")); System.out.println(""); serialStorage .stream() .forEachOrdered(e -> System.out.print(e + " ")); System.out.println(""); System.out.println("Parallel stream:"); List<Integer> parallelStorage = Collections.synchronizedList( new ArrayList<>()); listOfIntegers .parallelStream() // Don't do this! It uses a stateful lambda expression. .map(e -> { parallelStorage.add(e); return e; }) .forEachOrdered(e -> System.out.print(e + " ")); System.out.println(""); parallelStorage .stream() .forEachOrdered(e -> System.out.print(e + " ")); System.out.println("");
lambda表達式e -> { parallelStorage.add(e); return e; }
是一個有狀態的lambda表達式,每次運行代碼時,它的結果均可能不一樣,這個例子打印瞭如下內容:
Serial stream: 8 7 6 5 4 3 2 1 8 7 6 5 4 3 2 1 Parallel stream: 8 7 6 5 4 3 2 1 1 3 6 2 4 5 8 7
forEachOrdered
操做按照流指定的順序處理元素,不管流是串行執行仍是並行執行,可是,當並行執行流時,map
操做處理Java運行時和編譯器指定的流的元素,所以,lambda表達式e -> { parallelStorage.add(e); return e; }
每次運行代碼時,向列表中添加元素的parallelStorage
都會發生變化,對於肯定性和可預測的結果,確保流操做中的lambda表達式參數不是有狀態的。
注意:這個例子調用了synchronizedList方法,所以列表parallelStorage
是線程安全的,記住集合不是線程安全的,這意味着多個線程不該該同時訪問特定的集合,假設你在建立parallelStorage
時沒有調用synchronizedList
方法:
List<Integer> parallelStorage = new ArrayList<>();
這個例子的行爲是不規律的,由於多線程訪問和修改parallelStorage
時,沒有同步之類的機制來調度特定線程什麼時候能夠訪問List
實例,所以,該示例能夠打印以下輸出:
Parallel stream: 8 7 6 5 4 3 2 1 null 3 5 4 7 8 1 2