對於從事Java開發的童鞋來講,相信對於Java8的並行流並不陌生,沒錯,咱們經常用它來執行並行任務,可是因爲並行流(parallel stream)採用的是享線程池,可能會對咱們的性能形成嚴重影響,那怎麼處理呢?java
問題異步
首先咱們來看看具體的問題。在開發中,咱們經常經過如下方法,實現並行流執行並行任務:性能
myList.parallelStream.map(obj -> longRunningOperation())網站
可是這存在一個嚴重的問題:在 JVM 的後臺,使用通用的 fork/join 池來完成上述功能,該池是全部並行流共享的。默認狀況,fork/join 池會爲每一個處理器分配一個線程。假設你有一臺16核的機器,這樣你就只能建立16個線程。對 CPU 密集型的任務來講,這樣是有意義的,由於你的機器確實只能執行16個線程。可是真實狀況下,不是全部的任務都是 CPU 密集型的。例如:ui
myList.parallelStream this
.map(this::retrieveFromA)spa
.map(this::processUsingB)線程
.forEach(this::saveToC)blog
myList.parallelStream 開發
.map(this::retrieveFromD)
.map(this::processUsingE)
.forEach(this::saveToD)
這兩個流很大程度上是受限於IO操做,因此會等待其餘系統。但這兩個流使用相同的(小)線程池,所以會相互等待而被阻塞,很是不友好。好比:
final List<Integer> firstRange = buildIntRange();
firstRange.parallelStream().forEach((number) -> {
try {
// do something slow
Thread.sleep(5);
} catch (InterruptedException e) { }
});
在執行期間,我獲取了一份線程dump的文件。這是相關的線程:
ForkJoinPool.commonPool-worker-1
ForkJoinPool.commonPool-worker-2
ForkJoinPool.commonPool-worker-3
ForkJoinPool.commonPool-worker-4
如今,我要並行的執行這兩個並行流:
Runnable firstTask = () -> {
firstRange.parallelStream().forEach((number) -> {
try {
// do something slow
Thread.sleep(5);
} catch (InterruptedException e) { }
});
};
Runnable secondTask = () -> {
secondRange.parallelStream().forEach((number) -> {
try {
// do something slow
Thread.sleep(5);
} catch (InterruptedException e) { }
});
};
// run threads
此次咱們再看一下線程dump文件:
ForkJoinPool.commonPool-worker-1
ForkJoinPool.commonPool-worker-2
ForkJoinPool.commonPool-worker-3
ForkJoinPool.commonPool-worker-4
正如你所見,結果是同樣的。咱們只使用了4個線程。
解決辦法
對於上面的問題,咱們能夠在JVM 後臺使用 fork/join 池,在 ForkJoinTask 的文檔中,咱們能夠看到:
若是合適,安排一個異步執行的任務到當前正在運行的池中。若是任務不在inForkJoinPool()中,也能夠調用ForkJoinPool.commonPool()獲取新的池來執行,好比:
ForkJoinPool forkJoinPool = new ForkJoinPool(3);
forkJoinPool.submit(() -> {
firstRange.parallelStream().forEach((number) -> {
try {
Thread.sleep(5);
} catch (InterruptedException e) { }
});
});
ForkJoinPool forkJoinPool2 = new ForkJoinPool(3);
forkJoinPool2.submit(() -> {
secondRange.parallelStream().forEach((number) -> {
try {
Thread.sleep(5);
} catch (InterruptedException e) {
}
});
});
如今,咱們再次查看線程池:
ForkJoinPool-1-worker-1
ForkJoinPool-1-worker-2
ForkJoinPool-1-worker-3
ForkJoinPool-1-worker-4
ForkJoinPool-2-worker-1
ForkJoinPool-2-worker-2
ForkJoinPool-2-worker-3
ForkJoinPool-1-worker-4
上面這種方法爲何又正確顯示了呢?由於咱們建立本身的線程池,因此能夠避免共享線程池,若是有須要,甚至能夠分配比處理機數量更多的線程。
ForkJoinPool forkJoinPool = new ForkJoinPool(<numThreads>);
以上就是Java8並行流在使用中所存在的一些問題及解決辦法,部份內容參考自一個Java教學網站,但願對Java初學者有所幫助。