將一個順序執行的流轉變成一個併發的流只要調用 parallel()方法
public static long parallelSum(
long n){
return Stream.iterate(1L, i -> i +1).limit(n).parallel().reduce(0L,Long::sum);
}
並行流就是一個把內容分紅多個數據塊,並用不不一樣的線程分別處理每一個數據塊的流。最後合併每一個數據塊的計算結果。
將一個併發流轉成順序的流只要調用sequential()方法
stream.parallel() .filter(...) .sequential() .map(...) .parallel() .reduce();
這兩個方法能夠屢次調用, 只有最後一個調用決定這個流是順序的仍是併發的。
併發流使用的默認線程數等於你機器的處理器核心數。
經過這個方法能夠修改這個值,這是全局屬性。
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "12");
並不是使用多線程並行流處理數據的性能必定高於單線程順序流的性能,由於性能受到多種因素的影響。
如何高效使用併發流的一些建議:
1. 若是不肯定, 就本身測試。
2. 儘可能使用基本類型的流 IntStream, LongStream, and DoubleStream
3. 有些操做使用併發流的性能會比順序流的性能更差,好比limit,findFirst , 依賴元素順序的操做在併發流中是極其消耗性能的 。findAny的性能就會好不少,應爲不依賴順序。
4. 考慮流中計算的性能(Q)和操做的性能(N)的對比, Q表示單個處理所需的時間, N表示須要處理的數量,若是Q的值越大, 使用併發流的性能就會越高。
5. 數據量不大時使用併發流,性能得不到提高。
6.考慮數據結構:併發流須要對數據進行分解,不一樣的數據結構被分解的性能時不同的。
流的數據源和可分解性
源 |
可分解性 |
ArrayList |
很是好 |
LinkedList |
差 |
IntStream.range |
很是好 |
Stream.iterate |
差 |
HashSet |
好 |
TreeSet |
好 |
7. 流的特性以及中間操做對流的修改都會對數據對分解性能形成影響。 好比固定大小的流在任務分解的時候就能夠平均分配,可是若是有filter操做,那麼流就不能預先知道在這個操做後還會剩餘多少元素。
8. 考慮最終操做的性能:若是最終操做在合併併發流的計算結果時的性能消耗太大,那麼使用併發流提高的性能就會得不償失。
9.須要理解併發流實現機制:
fork/join 框架
fork/join框架是jdk1.7引入的,java8的stream多線程並不是流的正是以這個框架爲基礎的,因此想要深刻理解併發流就要學習fork/join框架。
fork/join框架的目的是以遞歸方式將能夠並行的任務拆分紅更小的任務,而後將每一個子任務的結果合併起來生成總體結果。它是ExecutorService接口的一個實現,它把子任務分配線程池(ForkJoinPool)中的工做線程。要把任務提交到這個線程池,必須建立RecursiveTask<R>的一個子類,若是任務不返回結果則是RecursiveAction的子類。
fork/join框架流程示意圖:
廢話很少說,上代碼:
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
import java.util.stream.LongStream;
/**
* Created by sunjin on 2016/7/5.
* 繼承RecursiveTask來建立能夠用於分支/合併的框架任務
*/
public class ForkJoinSumCalculator
extends RecursiveTask<Long> {
//要求和的數組
private final long[]
numbers;
//子任務處理的數組開始和終止的位置
private final int
start;
private final int
end;
//不在將任務分解成子任務的閥值大小
public static final int
THRESHOLD = 10000;
//用於建立組任務的構造函數
public ForkJoinSumCalculator(
long[] numbers){
this(numbers, 0, numbers.
length);
}
//用於遞歸建立子任務的構造函數
public ForkJoinSumCalculator(
long[] numbers,
int start,
int end){
this.
numbers = numbers;
this.
start = start;
this.
end = end;
}
//重寫接口的方法
@Override
protected Long compute() {
//當前任務負責求和的部分的大小
int length =
end -
start;
//若是小於等於閥值就順序執行計算結果
if(length <=
THRESHOLD){
return computeSequentially();
}
//建立子任務來爲數組的前一半求和
ForkJoinSumCalculator leftTask =
new ForkJoinSumCalculator(
numbers,
start,
start + length/2);
//將子任務拆分出去,丟到ForkJoinPool線程池異步執行。
leftTask.fork();
//建立子任務來爲數組的後一半求和
ForkJoinSumCalculator rightTask =
new ForkJoinSumCalculator(
numbers,
start + length/2,
end);
//第二個任務直接使用當前線程計算而再也不開啓新的線程。
long rightResult = rightTask.compute();
//讀取第一個子任務的結果,若是沒有完成則等待。
long leftResult = leftTask.join();
//合併兩個子任務的計算結果
return rightResult + leftResult;
}
//順序執行計算的簡單算法
private long computeSequentially(){
long sum = 0;
for(
int i =
start; i<
end; i++){
sum +=
numbers[i];
}
return sum;
}
//提供給外部使用的入口方法
public static long forkJoinSum(
long n) {
long[] numbers = LongStream.rangeClosed(1, n).toArray();
ForkJoinTask<Long> task =
new ForkJoinSumCalculator(numbers);
return new ForkJoinPool().invoke(task);
}
}
注意事項:
1. 調用join 方法要等到調用這個方法的線程的本身的任務完成以後。
2. 不要直接去調用ForkJoinPool的invoke方法 ,只須要調用RecursiveTask的fork或者compute。
3. 拆解任務時只須要調用一次fork執行其中一個子任務, 另外一個子任務直接利用當前線程計算。應爲fork方法只是在ForkJoinPool中計劃一個任務。
4.任務拆分的粒度不宜太細,不否得不償失。
工做盜取
因爲各類因素,即使任務拆分是平均的,也不能保證全部子任務能同時執行結束, 大部分狀況是某些子任務已經結束, 其餘子任務還有不少, 在這個時候就會有不少資源空閒, 因此fork/join框架經過工做盜取機制來保證資源利用最大化, 讓空閒的線程去偷取正在忙碌的線程的任務。
在沒有任務線程中的任務存在一個隊列當中, 線程每次會從頭部獲取一個任務執行,執行完了再從queue的頭部獲取一個任務,直到隊列中的全部任務執行完,這個線程偷取別的線程隊列中的任務時會從隊列到尾部獲取任務,而且執行,直到全部任務執行結束。
從這個角度分析,任務的粒度越小, 資源利用越充分。
工做盜取示意圖
可拆分迭代器Spliterator
它和Iterator同樣也是用於遍歷數據源中的元素,可是他是爲並行執行而設計的。 java8 全部數據結構都實現了 這個接口, 通常狀況不須要本身寫實現代碼。可是瞭解它的實現方式會讓你對並行流的工做原理有更深的瞭解。(未完待續)