在Java 7以前,並行處理數據集合很是麻煩。第一,你得明確地把包含數據的數據結構分紅若干子部分。第二,你要給每一個子部分分配一個獨立的線程。第三,你須要在恰當的時候對它們進行同步來避免不但願出現的競爭條件,等待全部線程完成,最後把這些部分結果合併起來。Java 7引入了一個叫做分支/合併的框架,讓這些操做更穩定、更不易出錯。
Stream接口讓你不用太費力氣就能對數據集執行並行操做。它容許你聲明性地將順序流變爲並行流。此外,你將看到Java是如何變戲法的,或者更實際地來講, 流是如何在幕後應用Java 7引入的分支/合併框架的。java
並行流就是一個把內容分紅多個數據塊,並用不一樣的線程分別處理每一個數據塊的流。算法
public static long sequentialSum(long n) {
return Stream.iterate(1L, i -> i + 1)
.limit(n)
.reduce(0L, Long::sum);
}
傳統寫法:
public static long iterativeSum(long n) {
long result = 0;
for (long i = 1L; i <= n; i++) {
result += i;
}
return result;
}複製代碼
能夠把流轉換成並行流,從而讓前面的函數歸約過程(也就是求和)並行運行——對順序流調用parallel方法:編程
public static long parallelSum(long n) {
return Stream.iterate(1L, i -> i + 1)
.limit(n)
.parallel()
.reduce(0L, Long::sum);
}複製代碼
在現實中,對順序流調用parallel方法並不意味着流自己有任何實際的變化。它在內部實際上就是設了一個boolean標誌,表示你想讓調用parallel以後進行的全部操做都並行執行。相似地,你只須要對並行流調用sequential方法就能夠把它變成順序流。請注意,你可能覺得把這兩個方法結合起來,就能夠更細化地控制在遍歷流時哪些操做要並行執行,哪些要順序執行。數組
配置並行流使用的線程池數據結構
看看流的parallel方法,你可能會想,並行流用的線程是從哪來的?有多少個?怎麼自定義這個過程呢?架構
並行流內部使用了默認的ForkJoinPool,它默認的線程數量就是你的處理器數量,這個值是由Runtime.getRuntime().available- Processors()獲得的。框架
可是你能夠經過系統屬性 java.util.concurrent.ForkJoinPool.common.parallelism來改變線程池大小,以下所示:ide
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism","12");函數
這是一個全局設置,所以它將影響代碼中全部的並行流。反過來講,目前還沒法專爲某個並行流指定這個值。通常而言,讓ForkJoinPool的大小等於處理器數量是個不錯的默認值,性能
除非你有很好的理由,不然咱們強烈建議你不要修改它。
並行編程可能很複雜,有時候甚至有點違反直覺。若是用得不對(好比採用了一 個不易並行化的操做,如iterate),它甚至可能讓程序的總體性能更差,因此在調用那個看似神奇的parallel操做時,瞭解背後到底發生了什麼是頗有必要的。並行化並非沒有代價的。並行化過程自己須要對流作遞歸劃分,把每一個子流的概括操做分配到不一樣的線程,而後把這些操做的結果合併成一個值。但在多個內核之間移動數據的代價也可能比你想的要大,因此很重要的一點是要保證在內核中並行執行工做的時間比在內核之間傳輸數據的時間長。總而言之,不少狀況下不可能或不方便並行化。然而,在使用 並行Stream加速代碼以前,你必須確保用得對;若是結果錯了,算得快就毫無心義了。
錯用並行流而產生錯誤的首要緣由,就是使用的算法改變了某些共享狀態。下面是另外一種實現對前n個天然數求和的方法,但這會改變一個共享累加器:
public static long sideEffectSum(long n) {
Accumulator accumulator = new Accumulator();
LongStream.rangeClosed(1, n).forEach(accumulator::add)
return accumulator.total;
}
public class Accumulator {
public long total = 0;
public void add(long value) { total += value; }
}複製代碼
這段代碼自己上就是順序的,由於每次訪問total都會出現數據競爭。接下來將這段代碼改成並行:
public static long sideEffectParallelSum(long n) {
Accumulator accumulator = new Accumulator();
LongStream.rangeClosed(1, n).parallel().forEach(accumulator::add);
return accumulator.total;}
System.out.println("SideEffect parallel sum done in: " + measurePerf(ParallelStreams::sideEffectParallelSum, 10_000_000L) +" msecs" );
Result: 5959989000692
Result: 7425264100768
Result: 6827235020033
Result: 7192970417739
Result: 6714157975331
Result: 7715125932481
SideEffect parallel sum done in: 49 msecs複製代碼
這回方法的性能可有可無了,惟一要緊的是每次執行都會返回不一樣的結果,都離正確值50000005000000差很遠。這是因爲多個線程在同時訪問累加器,執行total += value,而這一句然看似簡單,卻不是一個原子操做。問題的根源在於,forEach中調用的方法有反作用,它會改變多個線程共享的對象的可變狀態。要是你想用並行Stream又不想引起相似的意外,就必須避免這種狀況。如今你知道了,共享可變狀態會影響並行流以及並行計算。
並行流背後使用的基礎架構是Java 7中引入的分支/合併框架。
分支/合併框架的目的是以遞歸方式將能夠並行的任務拆分紅更小的任務,而後將每一個子任務的結果合併起來生成總體結果。它是ExecutorService接口的一個實現,它把子任務分配給線程池(稱爲ForkJoinPool)中的工做線程。
要把任務提交到這個池,必須建立RecursiveTask
protected abstract R compute();複製代碼
這個方法同時定義了將任務拆分紅子任務的邏輯,以及沒法再拆分或不方便再拆分時,生成單個子任務結果的邏輯。下圖表示了遞歸任務的拆分過程:
讓咱們試着用這個框架爲一個數字範圍(這裏用一個 long[]數組表示)求和。如前所述,你須要先爲RecursiveTask類作一個實現,就是下面代碼清單中的ForkJoinSumCalculator。
public class ForkJoinSumCalculator extends RecursiveTask<Long> {
private final long[] numbers;
private final int start;
private final int end;
public static final long THRESHOLD = 10_000;
public ForkJoinSumCalculator(long[] numbers) {
this(numbers, 0, numbers.length);
}
public ForkJoinSumCalculator(long[] numbers, int start, int end) {
this.numbers = numbers;
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
int length = end - start;
if (length <= THRESHOLD) {
return computeSequentially();
}
ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numbers, start, start + length / 2);
leftTask.fork();
ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numbers, start + length / 2, end);
Long rightResult = rightTask.compute();
Long leftResult = leftTask.join();
return leftResult + rightResult;
}
private long computeSequentially() {
long sum = 0;
for (int i = start; i < end; i++) {
sum += numbers[i];
}
return sum;
}
}複製代碼
這裏用了一個LongStream來生成包含前n個天然數的數組,而後建立一個ForkJoinTask (RecursiveTask的父類),並把數組傳遞給代碼清單7-2所示ForkJoinSumCalculator的公共構造函數。最後,你建立了一個新的ForkJoinPool,並把任務傳給它的調用方法 。在ForkJoinPool中執行時,最後一個方法返回的值就是ForkJoinSumCalculator類定義的任務結果。請注意在實際應用時,使用多個ForkJoinPool是沒有什麼意義的。正是出於這個緣由,通常來講把它實例化一次,而後把實例保存在靜態字段中,使之成爲單例,這樣就能夠在軟件中任何部分方便地重用了。這裏建立時用了其默認的無參數構造函數,這意味着想讓線程池使用JVM可以使用的全部處理器。更確切地說,該構造函數將使用Runtime.availableProcessors的返回值來決定線程使用的線程數。請注意availableProcessors方法雖然看起來是處理器, 但它實際上返回的是可用內核的數量,包括超線程生成的虛擬內核。當把ForkJoinSumCalculator任務傳給ForkJoinPool時,這個任務就由中的一個線程 執行,這個線程會調用任務的compute方法。該方法會檢查任務是否小到足以順序執行,若是不夠小則會把要求和的數組分紅兩半,分給兩個新的ForkJoinSumCalculator,而它們也由ForkJoinPool安排執行。所以,這一過程能夠遞歸重複,把原任務分爲更小的任務,直到知足不方便或不可能再進一步拆分的條件(本例中是求和的項目數小於等於10000)。這時會順序計算每一個任務的結果,而後由分支過程建立的(隱含的)任務二叉樹遍歷回到它的根。接下來會合並每一個子任務的部分結果,從而獲得總任務的結果。這一過程以下圖所示。
實際中,每一個子任務所花的時間可能天差地別,要麼是由於劃分策略效率低,要麼是有不可預知的緣由,好比磁盤訪問慢,或是須要和外部任務協調執行。分支/合併框架工程用一種稱爲工做竊取(work stealing)的技術來解決這個問題。在實際應用中,這意味着這些任務差很少被平均分配到ForkJoinPool中的全部線程上。每一個線程都爲分配給它的任務保存一個雙向鏈式隊列,每完成一個任務,就會從隊列頭上取出下一個任務開始執行。基於前面所述的緣由,某個線程可能早早完成了分配給它的全部任務,也就是它的隊列已經空了,而其餘的線程還很忙。這時,這個線程並無閒下來,而是隨機選了一個別的線程,從隊列的尾巴上「偷走」一個任務。這個過程一直繼續下去,直到全部的任務都執行完畢,全部的隊列都清空。這就是爲何要劃成許多小任務而不是少數幾個大任務,這有助於更好地在工做線程之間平衡負載。通常來講,這種工做竊取算法用於在池中的工做線程之間從新分配和平衡任務。
Spliterator是Java 8中加入的另外一個新接口;這個名字表明「可分迭代器」(splitable iterator)。和Iterator同樣,Spliterator也用於遍歷數據源中的元素,但它是爲了並行執行而設計的。
public interface Spliterator<T> {
boolean tryAdvance(Consumer<? super T> action);
Spliterator<T> trySplit();
long estimateSize();
int characteristics();
}複製代碼
與往常同樣,T是Spliterator遍歷的元素的類型。tryAdvance方法的行爲相似於普通的 Iterator,由於它會按順序一個一個使用Spliterator中的元素,而且若是還有其餘元素要遍歷就返回true。但trySplit是專爲Spliterator接口設計的,由於它能夠把一些元素劃出去分給第二個Spliterator(由該方法返回),讓它們兩個並行處理。Spliterator還可經過 estimateSize方法估計還剩下多少元素要遍歷,由於即便不那麼確切,能快速算出來是一個值也有助於讓拆分均勻一點。
將Stream拆分紅多個部分的算法是一個遞過程,如圖7-6所示。第一步是對第一個 Spliterator調用trySplit,生成第二個Spliterator。第二步對這兩個Spliterator調用 trysplit,這樣總共就有了四個Spliterator。這個框架不斷對Spliterator調用trySplit直到它返回null,代表它處理的數據結構不能再分割,如第三步所示。最後,這個遞歸拆分過程到第四步就終止了,這時全部的Spliterator在調用trySplit時都返回了null。
Spliterator的特性
Spliterator接口聲明的最後一個抽象方法是characteristics,它將返回一個int,代 表Spliterator自己特性集的編碼。
使用Spliterator的客戶能夠用這些特性來更好地控制和優化它的使用。
表7-2總結了這些特性。(不幸的是,雖然它們在概念上與收集器的特性有重疊,編碼卻不同。)
![](https://user-gold-cdn.xitu.io/2019/8/28/16cd56026a7081e8?w=1260&h=456&f=png&s=67011)複製代碼
略
在本章中,你瞭解瞭如下內容。