Java 8 並行流:必備技巧

Java 8 並行流(parallel stream)採用共享線程池,對性能形成了嚴重影響。能夠包裝流來調用本身的線程池解決性能問題。html

問題java

Java 8 的並行流可讓咱們相對輕鬆地執行並行任務。git

myList.parallelStream.map(obj -> longRunningOperation()) 
複製代碼

可是這樣存在一個嚴重的問題:在 JVM 的後臺,使用通用的 fork/join
池來完成上述功能,該池是全部並行流共享的。默認狀況,fork/join
池會爲每一個處理器分配一個線程。假設你有一臺16核的機器,這樣你就只能建立16個線程。對 CPU
密集型的任務來講,這樣是有意義的,由於你的機器確實只能執行16個線程。可是真實狀況下,不是全部的任務都是 CPU 密集型的。例如:github

myList.parallelStream  
   .map(this::retrieveFromA)
   .map(this::processUsingB)
   .forEach(this::saveToC)
 
myList.parallelStream  
   .map(this::retrieveFromD)
   .map(this::processUsingE)
   .forEach(this::saveToD)
複製代碼

這兩個流很大程度上是受限於IO操做,因此會等待其餘系統。但這兩個流使用相同的(小)線程池,所以會相互等待而被阻塞。這個很是很差,能夠改進。咱們以一個流爲例:bash

final List<Integer> firstRange = buildIntRange();  
   firstRange.parallelStream().forEach((number) -> {
      try {
         // do something slow
         Thread.sleep(5);
      } catch (InterruptedException e) { }
});
複製代碼

完整的代碼能夠在gist上查看。異步

在執行期間,我獲取了一份線程dump的文件。這是相關的線程(在個人Macbook上):性能

ForkJoinPool.commonPool-worker-1 
ForkJoinPool.commonPool-worker-2 
ForkJoinPool.commonPool-worker-3 
ForkJoinPool.commonPool-worker-4
複製代碼

如今,我要並行的執行這兩個並行流ui

Runnable firstTask = () -> {  
   firstRange.parallelStream().forEach((number) -> {
      try {
         // do something slow
         Thread.sleep(5);
      } catch (InterruptedException e) { }
   });
};
 
Runnable secondTask = () -> {  
   secondRange.parallelStream().forEach((number) -> {
      try {
         // do something slow
         Thread.sleep(5);
      } catch (InterruptedException e) { }
   });
};
// run threads
複製代碼

完整的代碼能夠在gist上查看。this

此次咱們再看一下線程dump文件:spa

ForkJoinPool.commonPool-worker-1 
ForkJoinPool.commonPool-worker-2 
ForkJoinPool.commonPool-worker-3 
ForkJoinPool.commonPool-worker-4
複製代碼

正如你所見,結果是同樣的。咱們只使用了4個線程。

一種變通方案

正如我所提到的,JVM 後臺使用 fork/join 池,在 ForkJoinTask 的文檔中,咱們能夠看到:

若是合適,安排一個異步執行的任務到當前正在運行的池中。若是任務不在inForkJoinPool()中,也能夠調用ForkJoinPool.commonPool()獲取新的池來執行。

讓我試一試……

ForkJoinPool forkJoinPool = new ForkJoinPool(3);  
forkJoinPool.submit(() -> {  
    firstRange.parallelStream().forEach((number) -> {
        try {
            Thread.sleep(5);
        } catch (InterruptedException e) { }
    });
});
 
ForkJoinPool forkJoinPool2 = new ForkJoinPool(3);  
forkJoinPool2.submit(() -> {  
    secondRange.parallelStream().forEach((number) -> {
        try {
            Thread.sleep(5);
        } catch (InterruptedException e) {
        }
    });
});
複製代碼

完整的代碼能夠在gist上查看。

如今,咱們再次查看線程池:

ForkJoinPool-1-worker-1 
ForkJoinPool-1-worker-2 
ForkJoinPool-1-worker-3 
ForkJoinPool-1-worker-4 
ForkJoinPool-2-worker-1 
ForkJoinPool-2-worker-2 
ForkJoinPool-2-worker-3 
ForkJoinPool-1-worker-4
複製代碼

由於咱們建立本身的線程池,因此能夠避免共享線程池,若是有須要,甚至能夠分配比處理機數量更多的線程。

ForkJoinPool forkJoinPool = new ForkJoinPool(<numThreads>); 
複製代碼


歡迎關注知乎專欄《跟上Java8》,分享優秀的Java8中文指南、教程,同時歡迎投稿高質量的文章。


原文連接: tobyhobson 翻譯: ImportNew.com - paddx
譯文連接: www.importnew.com/16801.html
相關文章
相關標籤/搜索