對順序流調用parallel方法:java
public static long parallelSum(long n) { return Stream.iterate(1L, i -> i + 1) .limit(n) .parallel() .reduce(0L, Long::sum); }
它在內部實際上就是設了一個boolean標誌,表示你想讓調用parallel以後進行的全部操做都並行執行。相似地,你只須要對並行流調用sequential方法就能夠把它變成順序流。但最後一次parallel或sequential調用會影響整個流水線。算法
iterate很難分割成可以獨立執行的小塊,由於每次應用這個函數都要依賴前一次應用的結果,整張數字列表在概括過程開始時沒有準備好,於是沒法有效地把流劃分爲小塊來並行處理。把流標記成並行,你實際上是給順序處理增長了開銷,它還要把每次求和操做分到一個不一樣的線程上。segmentfault
錯用並行流而產生錯誤的首要緣由,就是使用的算法改變了某些共享狀態。數據結構
public class Accumulator { public long total = 0; public void add(long value) { total += value; } } public static long sideEffectParallelSum(long n) { Accumulator accumulator = new Accumulator(); LongStream.rangeClosed(1, n).parallel().forEach(accumulator::add); return accumulator.total; }
上面的示例在本質上就是順序的,每次訪問total都會出現數據競爭.因爲多個線程在同時訪問累加器,執行total += value,而這一句雖然看似簡單,卻不是一個原子操做。所得的結果也是不可控的(錯誤的)。
詳見第六章相關內容
注意:不該該在RecursiveTask內部使用ForkJoinPool的invoke方法。相反,你應該始終直接調用compute或fork方法,只有順序代碼才應該用invoke來啓動並行計算。框架
Spliterator是Java 8中加入的另外一個新接口;這個名字表明「可分迭代器」(splitable iterator)。和Iterator同樣,Spliterator也用於遍歷數據源中的元素,但它是爲了並行執行而設計的。
Spliterator接口ide
public interface Spliterator<T> { boolean tryAdvance(Consumer<? super T> action); Spliterator<T> trySplit(); long estimateSize(); int characteristics(); }
與往常同樣,T是Spliterator遍歷的元素的類型。tryAdvance方法的行爲相似於普通的Iterator,由於它會按順序一個一個使用Spliterator中的元素,而且若是還有其餘元素要遍歷就返回true。但trySplit是專爲Spliterator接口設計的,由於它能夠把一些元素劃出去分給第二個Spliterator(由該方法返回),讓它們兩個並行處理。Spliterator還可經過estimateSize方法估計還剩下多少元素要遍歷,由於即便不那麼確切,能快速算出來是一個值也有助於讓拆分均勻一點.函數
將Stream拆分紅多個部分的算法是一個遞歸過程,如圖所示。第一步是對第一個Spliterator調用trySplit,生成第二個Spliterator。第二步對這兩個Spliterator調用trysplit,這樣總共就有了四個Spliterator。這個框架不斷對Spliterator調用trySplit直到它返回null,代表它處理的數據結構不能再分割,如第三步所示。最後,這個遞歸拆分過程到第四步就終止了,這時全部的Spliterator在調用trySplit時都返回了null。
性能
文中提到了reduce的三參數重載方法this
<U> U reduce(U identity,BiFunction<U, ? super T, U> accumulator,BinaryOperator<U> combiner)它的三個參數:spa
- identity: 一個初始化的值;這個初始化的值其類型是泛型U,與Reduce方法返回的類型一致;注意此時Stream中元素的類型是T,與U能夠不同也能夠同樣,這樣的話操做空間就大了;無論Stream中存儲的元素是什麼類型,U均可以是任何類型,如U能夠是一些基本數據類型的包裝類型Integer、Long等;或者是String,又或者是一些集合類型ArrayList等;後面會說到這些用法。
- accumulator: 其類型是BiFunction,輸入是U與T兩個類型的數據,而返回的是U類型;也就是說返回的類型與輸入的第一個參數類型是同樣的,而輸入的第二個參數類型與Stream中元素類型是同樣的。
- combiner: 其類型是BinaryOperator,支持的是對U類型的對象進行操做;
第三個參數combiner主要是使用在並行計算的場景下;若是Stream是非並行時,第三個參數其實是不生效的。
代碼實現:
class WordCounter { private final int counter; private final boolean lastSpace; public WordCounter(int counter, boolean lastSpace) { this.counter = counter; this.lastSpace = lastSpace; } public WordCounter accumulate(Character c) { if (Character.isWhitespace(c)) { return lastSpace ? this : new WordCounter(counter, true); } else { return lastSpace ? new WordCounter(counter + 1, false) : this; } } public WordCounter combine(WordCounter wordCounter) { return new WordCounter(counter + wordCounter.counter, wordCounter.lastSpace); } public int getCounter() { return counter; } }
class WordCounterSpliterator implements Spliterator<Character> { private final String string; private int currentChar = 0; public WordCounterSpliterator(String string) { this.string = string; } @Override public boolean tryAdvance(Consumer<?super Character> action) { action.accept(string.charAt(currentChar++)); return currentChar < string.length(); } @Override public Spliterator<Character> trySplit() { int currentSize = string.length() - currentChar; if (currentSize < 10) { return null; } for (int splitPos = (currentSize / 2) + currentChar; splitPos < string.length(); splitPos++) { if (Character.isWhitespace(string.charAt(splitPos))) { Spliterator<Character> spliterator = new WordCounterSpliterator(string.substring( currentChar, splitPos)); currentChar = splitPos; return spliterator; } } return null; } @Override public long estimateSize() { return string.length() - currentChar; } @Override public int characteristics() { return ORDERED + SIZED + SUBSIZED + NONNULL + IMMUTABLE; } }
final String SENTENCE = " Nel mezzo del cammin di nostra vita " + "mi ritrovai in una selva oscura" + " ché la dritta via era smarrita "; private int countWords(Stream<Character> stream) { WordCounter wordCounter = stream.reduce(new WordCounter(0, true), WordCounter::accumulate, WordCounter::combine); return wordCounter.getCounter(); } Spliterator<Character> spliterator = new WordCounterSpliterator(SENTENCE); Stream<Character> stream = StreamSupport.stream(spliterator, true); System.out.println("Found " + countWords(stream) + " words");
最後打印顯示
Found 19 words