並行化流被分紅多個塊,每一個塊獨立處理,結果在最後彙總。html
CPU密集型代碼以下:java
private long countPrimes(int max) { return range(1, max).parallel().filter(this::isPrime).count(); } private boolean isPrime(long n) { return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor -> n % divisor == 0); } |
countPrimes 計算1到最大值之間的素數的數量。數字流由range方法建立,切換到並行模式,過濾掉非素數,剩餘的計算總數。因爲isPrime 方法極其無效且佔用大量CPU,咱們能夠利用並行化並利用全部可用的CPU內核。api
咱們來看另外一個例子:網絡
private List<StockInfo> getStockInfo(Stream<String> symbols) { return symbols.parallel() .map(this::getStockInfo) //slow network operation .collect(toList()); } |
輸入是一個股票代碼列表,咱們必須調用慢速網絡操做來獲取有關股票的一些細節。在這裏,咱們不處理CPU密集型操做,但咱們也能夠利用並行化。並行執行多個網絡請求是個好主意。一樣,並行流的一個很好的任務,你贊成嗎?this
若是您這樣作,請再次查看上一個示例。有一個很大的錯誤。你看到了嗎?問題是全部並行流都使用公共fork-join線程池。若是提交長時間運行的任務,則會有效地阻塞池中的全部線程。所以,您將阻塞使用並行流的全部其餘任務。spa
想象一下servlet環境,當一個請求調用時getStockInfo() ,另外一個請求調用 countPrimes()。即便每一個都須要不一樣的資源,也會阻止另外一個。更糟糕的是,你不能爲並行流指定線程池; 整個類加載器必須使用相同的。.net
讓咱們在下面的例子中說明它:線程
private void run() throws InterruptedException { ExecutorService es = Executors.newCachedThreadPool(); // Simulating multiple threads in the system // if one of them is executing a long-running task. // Some of the other threads/tasks are waiting // for it to finish es.execute(() -> countPrimes(MAX, 1000)); //incorrect task es.execute(() -> countPrimes(MAX, 0)); es.execute(() -> countPrimes(MAX, 0)); es.execute(() -> countPrimes(MAX, 0)); es.execute(() -> countPrimes(MAX, 0)); es.execute(() -> countPrimes(MAX, 0)); es.shutdown(); es.awaitTermination(60, TimeUnit.SECONDS); } private void countPrimes(int max, int delay) { System.out.println( range(1, max).parallel() .filter(this::isPrime).peek(i -> sleep(delay)).count() ); } |
在這裏,咱們模擬系統中的六個線程。全部這些都在執行CPU密集型任務,第一個被「暫停」,在它找到素數後就睡了一秒鐘。這只是一我的爲的例子; 你能夠想象一個被卡住或執行阻塞操做的線程。code
問題是:執行此代碼時會發生什麼?咱們有六個任務; 其中一個將須要一成天才能完成,其他的應該更快完成。絕不奇怪,每次執行代碼時,都會獲得不一樣的結果。你想在生產系統中有這樣的行爲嗎?一個杜塞的任務取消了應用程序的其他部分?我猜不會。htm
關於如何確保永遠不會發生這樣的事情,只有兩種選擇。第一個是確保提交到公共fork-join池的全部任務都不會卡,必須在合理的時間內完成。但這提及來容易作起來難,尤爲是在複雜的應用程序中。
另外一種選擇是不使用並行流,並等到Oracle容許咱們指定用於並行流的線程池。