在前面三章中,咱們已經看到了新的 Stream 接口可讓你以聲明性方式處理數據集。咱們還解釋了將外部迭代換爲內部迭代可以讓原生Java庫控制流元素的處理。這種方法讓Java程序員無需顯式實現優化來爲數據集的處理加速。到目前爲止,最重要的好處是能夠對這些集合執行操做流水線,可以自動利用計算機上的多個內核。java
例如,在Java 7以前,並行處理數據集合很是麻煩。第一,你得明確地把包含數據的數據結構分紅若干子部分。第二,你要給每一個子部分分配一個獨立的線程。第三,你須要在恰當的時候對它們進行同步來避免不但願出現的競爭條件,等待全部線程完成,最後把這些部分結果合併起來。Java 7引入了一個叫做分支/合併的框架,讓這些操做更穩定、更不易出錯。git
在本章中,咱們將瞭解 Stream 接口如何讓你不用太費力氣就能對數據集執行並行操做。它容許你聲明性地將順序流變爲並行流。此外,你將看到Java是如何變戲法的,或者更實際地來講,流是如何在幕後應用Java 7引入的分支/合併框架的。你還會發現,瞭解並行流內部是如何工做的很重要,由於若是你忽視這一方面,就可能因誤用而獲得意外的(極可能是錯的)結果。程序員
咱們會特別演示,在並行處理數據塊以前,並行流被劃分爲數據塊的方式在某些狀況下偏偏是這些錯誤且沒法解釋的結果的根源。所以,咱們將會學習如何經過實現和使用你本身的Spliterator 來控制這個劃分過程。github
在第4章的筆記中,咱們簡要地瞭解到了 Stream 接口可讓你很是方便地處理它的元素:能夠經過對收集源調用 parallelStream 方法來把集合轉換爲並行流。並行流就是一個把內容分紅多個數據塊,並用不一樣的線程分別處理每一個數據塊的流。這樣一來,你就能夠自動把給定操做的工做負荷分配給多核處理器的全部內核,讓它們都忙起來。讓咱們用一個簡單的例子來試驗一下這個思想。算法
假設你須要寫一個方法,接受數字n做爲參數,並返回從1到給定參數的全部數字的和。一個直接(也許有點土)的方法是生成一個無窮大的數字流,把它限制到給定的數目,而後用對兩個數字求和的 BinaryOperator 來歸約這個流,以下所示:編程
public static long sequentialSum(long n) { // 生成天然數無限流 return Stream.iterate(1L, i -> i + 1) // 限制到前n個數 .limit(n) // 對全部數字求和來概括流 .reduce(0L, Long::sum); }
用更爲傳統的Java術語來講,這段代碼與下面的迭代等價:數組
public static long iterativeSum(long n) { long result = 0; for (long i = 0; i <= n; i++) { result += i; } return result; }
這彷佛是利用並行處理的好機會,特別是n很大的時候。那怎麼入手呢?你要對結果變量進行同步嗎?用多少個線程呢?誰負責生成數呢?誰來作加法呢?根本用不着擔憂啦。用並行流的話,這問題就簡單多了!數據結構
咱們能夠把流轉換成並行流,從而讓前面的函數歸約過程(也就是求和)並行運行——對順序流調用 parallel 方法:架構
public static long parallelSum(long n) { // 生成天然數無限流 return Stream.iterate(1L, i -> i + 1) // 限制到前n個數 .limit(n) // 將流轉爲並行流 .parallel() // 對全部數字求和來概括流 .reduce(0L, Long::sum); }
並行流的執行過程:app
請注意,在現實中,對順序流調用 parallel 方法並不意味着流自己有任何實際的變化。它在內部實際上就是設了一個 boolean 標誌,表示你想讓調用 parallel 以後進行的全部操做都並行執行。相似地,你只須要對並行流調用 sequential 方法就能夠把它變成順序流。請注意,你可能覺得把這兩個方法結合起來,就能夠更細化地控制在遍歷流時哪些操做要並行執行,哪些要順序執行。例如,你能夠這樣作:
stream.parallel() .filter(...) .sequential() .map(...) .parallel() .reduce();
但最後一次 parallel 或 sequential 調用會影響整個流水線。在本例中,流水線會並行執行,由於最後調用的是它。
回到咱們的數字求和練習,咱們說過,在多核處理器上運行並行版本時,會有顯著的性能提高。如今你有三個方法,用三種不一樣的方式(迭代式、順序概括和並行概括)作徹底相同的操做,讓咱們看看誰最快吧!
咱們聲稱並行求和方法應該比順序和迭代方法性能好。然而在軟件工程上,靠猜絕對不是什麼好辦法!特別是在優化性能時,你應該始終遵循三個黃金規則:測量,測量,再測量。
public static long measurePerf(Function<Long, Long> adder, long n) { long fastest = Long.MAX_VALUE; for (int i = 0; i < 10; i++) { long start = System.nanoTime(); long sum = adder.apply(n); long duration = (System.nanoTime() - start) / 1_000_000; System.out.println("Result: " + sum); if (duration < fastest) { fastest = duration; } } return fastest; }
這個方法接受一個函數和一個 long 做爲參數。它會對傳給方法的 long 應用函數10次,記錄每次執行的時間(以毫秒爲單位),並返回最短的一次執行時間。假設你把先前開發的全部方法都放進了一個名爲 ParallelStreams 的類,你就能夠用這個框架來測試順序加法器函數對前一千萬個天然數求和要用多久:
System.out.println("Sequential sum done in:" + measurePerf(ParallelStreams::sequentialSum, 10_000_000) + " msecs");
請注意,咱們對這個結果應持保留態度。影響執行時間的因素有不少,好比你的電腦支持多少個內核。你能夠在本身的機器上跑一下這些代碼。在一臺i5 6200U 的筆記本上運行它,輸出是這樣的:
Sequential sum done in:110 msecs
用傳統 for 循環的迭代版本執行起來應該會快不少,由於它更爲底層,更重要的是不須要對原始類型作任何裝箱或拆箱操做。若是你試着測量它的性能:
System.out.println("Iterative sum done in:" + measurePerf(ParallelStreams::iterativeSum, 10_000_000) + " msecs");
將獲得:
Iterative sum done in:4 msecs
如今咱們來對函數的並行版本作測試:
System.out.println("Parallel sum done in: " + measurePerf(ParallelStreams::parallelSum, 10_000_000) + " msecs");
看看會出現什麼狀況:
Parallel sum done in: 525 msecs
這至關使人失望,求和方法的並行版本比順序版本要慢不少。你如何解釋這個意外的結果呢?這裏實際上有兩個問題:
第二個問題更有意思一點,由於你必須意識到某些流操做比其餘操做更容易並行化。具體來講, iterate 很難分割成可以獨立執行的小塊,由於每次應用這個函數都要依賴前一次應用的結果。
這意味着,在這個特定狀況下,概括進程不是像上圖那樣進行的;整張數字列表在概括過程開始時沒有準備好,於是沒法有效地把流劃分爲小塊來並行處理。把流標記成並行,你實際上是給順序處理增長了開銷,它還要把每次求和操做分到一個不一樣的線程上。
這就說明了並行編程可能很複雜,有時候甚至有點違反直覺。若是用得不對(好比採用了一個不易並行化的操做,如 iterate ),它甚至可能讓程序的總體性能更差,因此在調用那個看似神奇的 parallel 操做時,瞭解背後到底發生了什麼是頗有必要的。
使用更有針對性的方法
那到底要怎麼利用多核處理器,用流來高效地並行求和呢?咱們在第5章中討論了一個叫LongStream.rangeClosed 的方法。這個方法與 iterate 相比有兩個優勢。
讓咱們先看一下它用於順序流時的性能如何,看看拆箱的開銷到底要沒關係:
public static long rangedSum(long n) { return LongStream.rangeClosed(1, n) .reduce(0L, Long::sum); }
這一次的輸出是:
Ranged sum done in: 5 msecs
這個數值流比前面那個用 iterate 工廠方法生成數字的順序執行版本要快得多,由於數值流避免了非針對性流那些不必的自動裝箱和拆箱操做。因而可知,選擇適當的數據結構每每比並行化算法更重要。但要是對這個新版本應用並行流呢?
public static long parallelRangedSum(long n) { return LongStream.rangeClosed(1, n) .parallel() .reduce(0L, Long::sum); }
如今把這個函數傳給的測試方法:
System.out.println("Parallel range sum done in:" + measurePerf(ParallelStreams::parallelRangedSum, 10_000_000) + " msecs");
你會獲得:
Parallel range sum done in:2 msecs
amazing!終於,咱們獲得了一個比順序執行更快的並行概括,由於這一次概括操做能夠像並行流執行圖那樣執行了。這也代表,使用正確的數據結構而後使其並行工做可以保證最佳的性能。
儘管如此,請記住,並行化並非沒有代價的。並行化過程自己須要對流作遞歸劃分,把每一個子流的概括操做分配到不一樣的線程,而後把這些操做的結果合併成一個值。但在多個內核之間移動數據的代價也可能比你想的要大,因此很重要的一點是要保證在內核中並行執行工做的時間比在內核之間傳輸數據的時間長。總而言之,不少狀況下不可能或不方便並行化。然而,在使用並行 Stream 加速代碼以前,你必須確保用得對;若是結果錯了,算得快就毫無心義了。讓咱們來看一個常見的陷阱。
錯用並行流而產生錯誤的首要緣由,就是使用的算法改變了某些共享狀態。下面是另外一種實現對前n個天然數求和的方法,但這會改變一個共享累加器:
public static long sideEffectSum(long n) { Accumulator accumulator = new Accumulator(); LongStream.rangeClosed(1, n) .forEach(accumulator::add); return accumulator.total; } public static class Accumulator { private long total = 0; public void add(long value) { total += value; } }
這種代碼很是廣泛,特別是對那些熟悉指令式編程範式的程序員來講。這段代碼和你習慣的那種指令式迭代數字列表的方式很像:初始化一個累加器,一個個遍歷列表中的元素,把它們和累加器相加。
那這種代碼又有什麼問題呢?不幸的是,它真的無可救藥,由於它在本質上就是順序的。每次訪問 total 都會出現數據競爭。若是你嘗試用同步來修復,那就徹底失去並行的意義了。爲了說明這一點,讓咱們試着把 Stream 變成並行的:
public static long sideEffectParallelSum(long n) { Accumulator accumulator = new Accumulator(); LongStream.rangeClosed(1, n) .parallel() .forEach(accumulator::add); return accumulator.total; }
執行測試方法,並打印每次執行的結果:
System.out.println("SideEffect parallel sum done in: " + measurePerf(ParallelStreams::sideEffectParallelSum, 10_000_000L) + " msecs");
你可能會獲得相似於下面這種輸出:
Result: 9869563545574 Result: 12405006536090 Result: 8268141260766 Result: 11208597038187 Result: 12358062322272 Result: 19218969315182 Result: 11255083226412 Result: 25746147125980 Result: 13327069088874 SideEffect parallel sum done in: 4 msecs
這回方法的性能可有可無了,惟一要緊的是每次執行都會返回不一樣的結果,都離正確值50000005000000 差很遠。這是因爲多個線程在同時訪問累加器,執行 total += value ,而這一句雖然看似簡單,卻不是一個原子操做。問題的根源在於, forEach 中調用的方法有反作用,它會改變多個線程共享的對象的可變狀態。要是你想用並行 Stream 又不想引起相似的意外,就必須避免這種狀況。
如今你知道了,共享可變狀態會影響並行流以及並行計算。如今,記住要避免共享可變狀態,確保並行 Stream 獲得正確的結果。接下來,咱們會看到一些實用建議,你能夠由此判斷何時能夠利用並行流來提高性能。
通常而言,想給出任何關於何時該用並行流的定量建議都是不可能也毫無心義的,由於任何相似於「僅當至少有一千個(或一百萬個或隨便什麼數字)元素的時候才用並行流)」的建議對於某臺特定機器上的某個特定操做多是對的,但在略有差別的另外一種狀況下可能就是大錯特錯。儘管如此,咱們至少能夠提出一些定性意見,幫你決定某個特定狀況下是否有必要使用並行流。
最後,咱們還要強調並行流背後使用的基礎架構是Java 7中引入的分支/合併框架。並行彙總的示例證實了要想正確使用並行流,瞭解它的內部原理相當重要,因此咱們會在下一節仔細研究分支/合併框架。
分支/合併框架的目的是以遞歸方式將能夠並行的任務拆分紅更小的任務,而後將每一個子任務的結果合併起來生成總體結果。它是 ExecutorService 接口的一個實現,它把子任務分配給線程池(稱爲 ForkJoinPool )中的工做線程。首先來看看如何定義任務和子任務。
要把任務提交到這個池,必須建立 RecursiveTask<R> 的一個子類,其中 R 是並行化任務(以及全部子任務)產生的結果類型,或者若是任務不返回結果,則是 RecursiveAction 類型(固然它可能會更新其餘非局部機構)。要定義 RecursiveTask, 只需實現它惟一的抽象方法compute :
protected abstract R compute();
這個方法同時定義了將任務拆分紅子任務的邏輯,以及沒法再拆分或不方便再拆分時,生成單個子任務結果的邏輯。正因爲此,這個方法的實現相似於下面的僞代碼:
if (任務足夠小或不可分) { 順序計算該任務 } else { 將任務分紅兩個子任務 遞歸調用本方法,拆分每一個子任務,等待全部子任務完成 合併每一個子任務的結果 }
通常來講並無確切的標準決定一個任務是否應該再拆分,但有幾種試探方法能夠幫助你作出這一決定。
你可能已經注意到,這只不過是著名的分治算法的並行版本而已。這裏舉一個用分支/合併框架的實際例子,還之前面的例子爲基礎,讓咱們試着用這個框架爲一個數字範圍(這裏用一個long[] 數組表示)求和。如前所述,你須要先爲RecursiveTask類作一個實現,就是下面代碼清單中的ForkJoinSumCalculator 。
public class ForkJoinSumCalculator extends RecursiveTask<Long> { /** * 再也不將任務分解爲子任務的數組大小 */ public static final long THRESHOLD = 10_000; /** * 要求和的數組 */ private final long[] numbers; /** * 子任務處理的數組的起始和終止位置 */ private final int start; private final int end; public ForkJoinSumCalculator(long[] numbers) { this(numbers, 0, numbers.length); } private ForkJoinSumCalculator(long[] numbers, int start, int end) { this.numbers = numbers; this.start = start; this.end = end; } @Override protected Long compute() { // 該任務負責求和的部分的大小 int length = end - start; // 若是大小小於或等於閾值,順序計算結果 if (length <= THRESHOLD) { return computeSequentially(); } // 建立一個子任務來爲數組的前一半求和 ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numbers, start, start + length / 2); leftTask.fork(); // 利用另外一個ForkJoinPool線程異步執行新建立的子任務 ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numbers, start + length / 2, end); // 同步執行第二個子任務,有可能容許進一步遞歸劃分 Long rightResult = rightTask.compute(); // 讀取第一個子任務的結果,若是還沒有完成就等待 Long leftResult = leftTask.join(); // 該任務的結果是兩個子任務結果的組合 return leftResult + rightResult; } private Long computeSequentially() { long sum = 0; for (int i = start; i < end; i++) { sum += numbers[i]; } return sum; } }
如今編寫一個方法來並行對前n個天然數求和就很簡單了。你只需把想要的數字數組傳給ForkJoinSumCalculator 的構造函數:
public static long forkJoinSum(long n) { long[] numbers = LongStream.rangeClosed(1, n).toArray(); ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers); return new ForkJoinPool().invoke(task); }
這裏用了一個 LongStream 來生成包含前n個天然數的數組,而後建立一個 ForkJoinTask( RecursiveTask 的父類),並把數組傳遞 ForkJoinSumCalculator 的公共構造函數。最後,你建立了一個新的 ForkJoinPool ,並把任務傳給它的調用方法 。在ForkJoinPool 中執行時,最後一個方法返回的值就是 ForkJoinSumCalculator 類定義的任務結果。
請注意在實際應用時,使用多個 ForkJoinPool 是沒有什麼意義的。正是出於這個緣由,通常來講把它實例化一次,而後把實例保存在靜態字段中,使之成爲單例,這樣就能夠在軟件中任何部分方便地重用了。這裏建立時用了其默認的無參數構造函數,這意味着想讓線程池使用JVM可以使用的全部處理器。更確切地說,該構造函數將使用 Runtime.availableProcessors 的返回值來決定線程池使用的線程數。請注意 availableProcessors 方法雖然看起來是處理器,但它實際上返回的是可用內核的數量,包括超線程生成的虛擬內核。
當把 ForkJoinSumCalculator 任務傳給 ForkJoinPool 時,這個任務就由池中的一個線程執行,這個線程會調用任務的 compute 方法。該方法會檢查任務是否小到足以順序執行,若是不夠小則會把要求和的數組分紅兩半,分給兩個新的 ForkJoinSumCalculator ,而它們也由ForkJoinPool 安排執行。所以,這一過程能夠遞歸重複,把原任務分爲更小的任務,直到知足不方便或不可能再進一步拆分的條件(本例中是求和的項目數小於等於10 000)。這時會順序計算每一個任務的結果,而後由分支過程建立的(隱含的)任務二叉樹遍歷回到它的根。接下來會合並每一個子任務的部分結果,從而獲得總任務的結果。
你能夠再用一次本章開始時寫的測試框架,來看看顯式使用分支/合併框架的求和方法的性能:
System.out.println("ForkJoin sum done in: " + measurePerf( ForkJoinSumCalculator::forkJoinSum, 10_000_000) + " msecs");
它生成如下輸出:
ForkJoin sum done in: 41 msecs
這個性能看起來比用並行流的版本要差,但這只是由於必須先要把整個數字流都放進一個long[] ,以後才能在 ForkJoinSumCalculator 任務中使用它。
雖然分支/合併框架還算簡單易用,不幸的是它也很容易被誤用。如下是幾個有效使用它的最佳作法。
對於分支/合併拆分策略還有最後一點補充:你必須選擇一個標準,來決定任務是要進一步拆分仍是已小到能夠順序求值。
在 ForkJoinSumCalculator 的例子中,咱們決定在要求和的數組中最多包含10 000個項目時就再也不建立子任務了。這個選擇是很隨意的,但大多數狀況下也很難找到一個好的啓發式方法來肯定它,只能試幾個不一樣的值來嘗試優化它。在咱們的測試案例中,咱們先用了一個有1000萬項目的數組,意味着 ForkJoinSumCalculator 至少會分出1000個子任務來。這彷佛有點浪費資源,由於咱們用來運行它的機器上只有四個內核。在這個特定例子中可能確實是這樣,由於全部的任務都受CPU約束,預計所花的時間也差很少。
但分出大量的小任務通常來講都是一個好的選擇。這是由於,理想狀況下,劃分並行任務時,應該讓每一個任務都用徹底相同的時間完成,讓全部的CPU內核都一樣繁忙。不幸的是,實際中,每一個子任務所花的時間可能天差地別,要麼是由於劃分策略效率低,要麼是有不可預知的緣由,好比磁盤訪問慢,或是須要和外部服務協調執行。
分支/合併框架工程用一種稱爲工做竊取(work stealing)的技術來解決這個問題。在實際應用中,這意味着這些任務差很少被平均分配到 ForkJoinPool 中的全部線程上。每一個線程都爲分配給它的任務保存一個雙向鏈式隊列,每完成一個任務,就會從隊列頭上取出下一個任務開始執行。基於前面所述的緣由,某個線程可能早早完成了分配給它的全部任務,也就是它的隊列已經空了,而其餘的線程還很忙。這時,這個線程並無閒下來,而是隨機選了一個別的線程,從隊列的尾巴上「偷走」一個任務。這個過程一直繼續下去,直到全部的任務都執行完畢,全部的隊列都清空。這就是爲何要劃成許多小任務而不是少數幾個大任務,這有助於更好地在工做線程之間平衡負載。
通常來講,這種工做竊取算法用於在池中的工做線程之間從新分配和平衡任務。當工做線程隊列中有一個任務被分紅兩個子任務時,一個子任務就被閒置的工做線程「偷走」了。如前所述,這個過程能夠不斷遞歸,直到規定子任務應順序執行的條件爲真。
如今你應該清楚流如何使用分支/合併框架來並行處理它的項目了,不過還有一點沒有講。本節中咱們分析了一個例子,你明確地指定了將數字數組拆分紅多個任務的邏輯。可是,使用本章前面講的並行流時就用不着這麼作了,這就意味着,確定有一種自動機制來爲你拆分流。這種新的自動機制稱爲 Spliterator ,咱們會在下一節中討論。
Spliterator 是Java 8中加入的另外一個新接口;這個名字表明「可分迭代器」(splitableiterator)。和 Iterator 同樣, Spliterator 也用於遍歷數據源中的元素,但它是爲了並行執行而設計的。雖然在實踐中可能用不着本身開發 Spliterator ,但瞭解一下它的實現方式會讓你對並行流的工做原理有更深刻的瞭解。Java 8已經爲集合框架中包含的全部數據結構提供了一個默認的 Spliterator 實現。集合實現了 Spliterator 接口,接口提供了一個 spliterator 方法。這個接口定義了若干方法,以下面的代碼清單所示。
public interface Spliterator<T> { boolean tryAdvance(Consumer<? super T> action); Spliterator<T> trySplit(); long estimateSize(); int characteristics(); }
與往常同樣, T 是 Spliterator 遍歷的元素的類型。 tryAdvance 方法的行爲相似於普通的Iterator ,由於它會按順序一個一個使用 Spliterator 中的元素,而且若是還有其餘元素要遍歷就返回 true 。但 trySplit 是專爲 Spliterator 接口設計的,由於它能夠把一些元素劃出去分給第二個 Spliterator (由該方法返回),讓它們兩個並行處理。 Spliterator 還可經過estimateSize 方法估計還剩下多少元素要遍歷,由於即便不那麼確切,能快速算出來是一個值也有助於讓拆分均勻一點。
重要的是,要了解這個拆分過程在內部是如何執行的,以便在須要時可以掌控它。所以,咱們會在下一節中詳細地分析它。
將 Stream 拆分紅多個部分的算法是一個遞歸過程。第一步是對第一個Spliterator 調用 trySplit ,生成第二個 Spliterator 。第二步對這兩個 Spliterator 調用trysplit ,這樣總共就有了四個 Spliterator 。這個框架不斷對 Spliterator 調用 trySplit直到它返回 null ,代表它處理的數據結構不能再分割,如第三步所示。最後,這個遞歸拆分過程到第四步就終止了,這時全部的 Spliterator 在調用 trySplit 時都返回了 null 。
這個拆分過程也受 Spliterator 自己的特性影響,而特性是經過 characteristics 方法聲明的。
讓咱們來看一個可能須要你本身實現 Spliterator 的實際例子。咱們要開發一個簡單的方法來數數一個 String 中的單詞數。這個方法的一個迭代版本能夠寫成下面的樣子。
public static int countWordsIteratively(String s) { int counter = 0; boolean lastSpace = true; for (char c : s.toCharArray()) { if (Character.isWhitespace(c)) { lastSpace = true; } else { if (lastSpace) { counter++; } lastSpace = Character.isWhitespace(c); } } return counter; }
讓咱們把這個方法用在但丁的《神曲》的《地獄篇》的第一句話上:
public static final String SENTENCE = " Nel mezzo del cammin di nostra vita " + "mi ritrovai in una selva oscura" + " che la dritta via era smarrita "; System.out.println("Found " + countWordsIteratively(SENTENCE) + " words");
請注意,咱們在句子裏添加了一些額外的隨機空格,以演示這個迭代實現即便在兩個詞之間存在多個空格時也能正常工做。正如咱們所料,這段代碼將打印如下內容:
Found 19 words
理想狀況下,你會想要用更爲函數式的風格來實現它,由於就像咱們前面說過的,這樣你就能夠用並行 Stream 來並行化這個過程,而無需顯式地處理線程和同步問題。
首先你須要把 String 轉換成一個流。不幸的是,原始類型的流僅限於 int 、 long 和 double ,
因此你只能用 Stream<Character> :
Stream<Character> stream = IntStream.range(0, SENTENCE.length()) .mapToObj(SENTENCE::charAt);
你能夠對這個流作歸約來計算字數。在歸約流時,你得保留由兩個變量組成的狀態:一個 int用來計算到目前爲止數過的字數,還有一個 boolean 用來記得上一個遇到的 Character 是否是空格。由於Java沒有元組(tuple,用來表示由異類元素組成的有序列表的結構,不須要包裝對象),因此你必須建立一個新類 WordCounter 來把這個狀態封裝起來,以下所示。
private static class WordCounter { private final int counter; private final boolean lastSpace; public WordCounter(int counter, boolean lastSpace) { this.counter = counter; this.lastSpace = lastSpace; } public WordCounter accumulate(Character c) { if (Character.isWhitespace(c)) { return lastSpace ? this : new WordCounter(counter, true); } else { return lastSpace ? new WordCounter(counter + 1, false) : this; } } public WordCounter combine(WordCounter wordCounter) { return new WordCounter(counter + wordCounter.counter, wordCounter.lastSpace); } public int getCounter() { return counter; } }
在這個列表中, accumulate 方法定義瞭如何更改 WordCounter 的狀態,或更確切地說是用哪一個狀態來創建新的 WordCounter ,由於這個類是不可變的。每次遍歷到 Stream 中的一個新的Character 時,就會調用 accumulate 方法。具體來講,就像 countWordsIteratively 方法同樣,當上一個字符是空格,新字符不是空格時,計數器就加一。
調用第二個方法 combine 時,會對做用於 Character 流的兩個不一樣子部分的兩個WordCounter 的部分結果進行彙總,也就是把兩個 WordCounter 內部的計數器加起來。
private static int countWords(Stream<Character> stream) { WordCounter wordCounter = stream.reduce(new WordCounter(0, true), WordCounter::accumulate, WordCounter::combine); return wordCounter.getCounter(); }
如今你就能夠試一試這個方法,給它由包含但丁的《神曲》中《地獄篇》第一句的 String建立的流:
Stream<Character> stream = IntStream.range(0, SENTENCE.length()) .mapToObj(SENTENCE::charAt); System.out.println("Found " + countWords(stream) + " words");
你能夠和迭代版本比較一下輸出:
Found 19 words
到如今爲止都很好,但咱們以函數式實現 WordCounter 的主要緣由之一就是能輕鬆地並行處理,讓咱們來看看具體是如何實現的。
你能夠嘗試用並行流來加快字數統計,以下所示:
System.out.println("Found " + countWords(stream.parallel()) + " words");
不幸的是,此次的輸出是:
Found 25 words
顯然有什麼不對,可究竟是哪裏不對呢?問題的根源並不難找。由於原始的 String 在任意位置拆分,因此有時一個詞會被分爲兩個詞,而後數了兩次。這就說明,拆分流會影響結果,而把順序流換成並行流就可能使結果出錯。
如何解決這個問題呢?解決方案就是要確保 String 不是在隨機位置拆開的,而只能在詞尾拆開。要作到這一點,你必須爲 Character 實現一個 Spliterator ,它只能在兩個詞之間拆開String (以下所示),而後由此建立並行流。
private static class WordCounterSpliterator implements Spliterator<Character> { private final String string; private int currentChar = 0; public WordCounterSpliterator(String string) { this.string = string; } @Override public boolean tryAdvance(Consumer<? super Character> action) { action.accept(string.charAt(currentChar++)); return currentChar < string.length(); } @Override public Spliterator<Character> trySplit() { int currentSize = string.length() - currentChar; if (currentSize < 10) { return null; } for (int splitPos = currentSize / 2 + currentChar; splitPos < string.length(); splitPos++) { if (Character.isWhitespace(string.charAt(splitPos))) { Spliterator<Character> spliterator = new WordCounterSpliterator(string.substring(currentChar, splitPos)); currentChar = splitPos; return spliterator; } } return null; } @Override public long estimateSize() { return string.length() - currentChar; } @Override public int characteristics() { return ORDERED + SIZED + SUBSIZED + NONNULL + IMMUTABLE; } }
這個 Spliterator 由要解析的 String 建立,並遍歷了其中的 Character ,同時保存了當前正在遍歷的字符位置。讓咱們快速回顧一下實現了Spliterator接口的WordCounterSpliterator 中的各個函數。
如今就能夠用這個新的 WordCounterSpliterator 來處理並行流了,以下所示:
Spliterator<Character> spliterator = new WordCounterSpliterator(SENTENCE); Stream<Character> stream = StreamSupport.stream(spliterator, true);
傳給 StreamSupport.stream 工廠方法的第二個布爾參數意味着你想建立一個並行流。把這個並行流傳給 countWords 方法:
System.out.println("Found " + countWords(stream.parallel()) + " words");
能夠獲得意料之中的正確輸出:
Found 19 words
你已經看到了 Spliterator 如何讓你控制拆分數據結構的策略。 Spliterator 還有最後一個值得注意的功能,就是能夠在第一次遍歷、第一次拆分或第一次查詢估計大小時綁定元素的數據源,而不是在建立時就綁定。這種狀況下,它稱爲延遲綁定(late-binding)的 Spliterator 。