1.一般咱們在獲取到一個list列表後須要一個挨着一個的進行遍歷處理數據,若是每次處理都須要長時間操做,那整個流程下來時間就是每一次處理時間的總和。java
2.Java8的stream接口極大地減小了for循環寫法的複雜性,stream提供了map/reduce/collect等一系列聚合接口,還支持併發操做:parallelStream。web
定義一個位置類和服務,其中建立10個地址位置算法
package cn.chinotan.dto; /** * @program: test * @description: 位置 * @author: xingcheng * @create: 2018-11-17 19:36 **/ public class Location { String name; public Location() { } public Location(String name) { this.name = name; } public String getName() { return name; } public void setName(String name) { this.name = name; } }
package cn.chinotan.controller; import cn.chinotan.dto.Location; import org.apache.commons.lang3.StringUtils; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import java.util.ArrayList; import java.util.List; /** * @program: test * @description: 位置服務 * @author: xingcheng * @create: 2018-11-17 19:42 **/ @RestController public class LocationController { @GetMapping("/location") public List<Location> getLocations() { List<Location> locations = new ArrayList<>(); for (int i = 0; i < 10; i++) { locations.add(new Location(StringUtils.join("London", i))); } return locations; } }
定義一個溫度類和服務,其中溫度值介於30到50之間。 在實現中添加500 ms的延遲以模擬耗時操做spring
package cn.chinotan.dto; /** * @program: test * @description: 溫度 * @author: xingcheng * @create: 2018-11-17 19:35 **/ public class Temperature { private Double temperature; private String scale; public Double getTemperature() { return temperature; } public void setTemperature(Double temperature) { this.temperature = temperature; } public String getScale() { return scale; } public void setScale(String scale) { this.scale = scale; } }
package cn.chinotan.controller; import cn.chinotan.dto.Temperature; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RestController; import java.util.Random; /** * @program: test * @description: 溫度服務 * @author: xingcheng * @create: 2018-11-17 19:45 **/ @RestController public class TemperatureController { @GetMapping("/temperature/{city}") public Temperature getAverageTemperature(@PathVariable("city") String city) { Temperature temperature = new Temperature(); temperature.setTemperature((double) (new Random().nextInt(20) + 30)); temperature.setScale("Celsius"); try { Thread.sleep(500); } catch (InterruptedException ignored) { } return temperature; } }
定義一個天氣預報類和響應apache
package cn.chinotan.dto; /** * @program: test * @description: 天氣預報 * @author: xingcheng * @create: 2018-11-17 19:37 **/ public class Forecast implements Comparable<Forecast> { private Location location; private Temperature temperature; public Forecast(Location location) { this.location = location; } public Forecast setTemperature(final Temperature temperature) { this.temperature = temperature; return this; } public Location getLocation() { return location; } public void setLocation(Location location) { this.location = location; } public Temperature getTemperature() { return temperature; } @Override public int compareTo(Forecast o) { if (o.getTemperature().getTemperature() > temperature.getTemperature()) { return 1; } else { return -1; } } }
package cn.chinotan.dto.response; import cn.chinotan.dto.Forecast; import java.util.ArrayList; import java.util.List; /** * @program: test * @description: 服務響應 * @author: xingcheng * @create: 2018-11-17 19:39 **/ public class ServiceResponse { private long processingTime; private List<Forecast> forecasts = new ArrayList<>(); public void setProcessingTime(long processingTime) { this.processingTime = processingTime; } public ServiceResponse forecasts(List<Forecast> forecasts) { this.forecasts = forecasts; return this; } public long getProcessingTime() { return processingTime; } public List<Forecast> getForecasts() { return forecasts; } public void setForecasts(List<Forecast> forecasts) { this.forecasts = forecasts; } }
package cn.chinotan.controller; import cn.chinotan.dto.Forecast; import cn.chinotan.dto.Location; import cn.chinotan.dto.Temperature; import cn.chinotan.dto.response.ServiceResponse; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.core.ParameterizedTypeReference; import org.springframework.http.HttpEntity; import org.springframework.http.HttpMethod; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.client.RestTemplate; import rx.Observable; import java.util.Comparator; import java.util.List; import java.util.stream.Collectors; /** * @program: test * @description: 天氣預報 * @author: xingcheng * @create: 2018-11-17 19:47 **/ @RestController public class ForecastController { @Autowired RestTemplate restTemplate; private final static String SERVER_ADDRESS = "http://localhost:9000"; @GetMapping("/v1/forecast") public ServiceResponse getLocationsWithTemperatureV1() { long startTime = System.currentTimeMillis(); ServiceResponse response = new ServiceResponse(); List<Location> locations = restTemplate.exchange(StringUtils.join(SERVER_ADDRESS, "/location"), HttpMethod.GET, HttpEntity.EMPTY, new ParameterizedTypeReference<List<Location>>() { }).getBody(); locations.forEach(location -> { Temperature temperature = restTemplate.exchange(StringUtils.join(SERVER_ADDRESS, "/temperature/", location.getName()), HttpMethod.GET, HttpEntity.EMPTY, Temperature.class).getBody(); response.getForecasts().add(new Forecast(location).setTemperature(temperature)); }); long endTime = System.currentTimeMillis(); response.setProcessingTime(endTime - startTime); return response; } @GetMapping("/v2/forecast") public ServiceResponse getLocationsWithTemperatureV2() { long startTime = System.currentTimeMillis(); ServiceResponse response = new ServiceResponse(); List<Location> locations = restTemplate.exchange(StringUtils.join(SERVER_ADDRESS, "/location"), HttpMethod.GET, HttpEntity.EMPTY, new ParameterizedTypeReference<List<Location>>() { }).getBody(); List<Forecast> collect = locations.parallelStream().map(location -> { Temperature temperature = restTemplate.exchange(StringUtils.join(SERVER_ADDRESS, "/temperature/", location.getName()), HttpMethod.GET, HttpEntity.EMPTY, Temperature.class).getBody(); return new Forecast(location).setTemperature(temperature); }).collect(Collectors.toList()); long sortStart = System.currentTimeMillis(); List<Forecast> collectResponse = collect.parallelStream().sorted().collect(Collectors.toList()); long sortEnd = System.currentTimeMillis(); System.out.println("排序花費時間:" + (sortEnd - sortStart)); response.setForecasts(collectResponse); long endTime = System.currentTimeMillis(); response.setProcessingTime(endTime - startTime); return response; } @GetMapping("/v3/forecast") public ServiceResponse getLocationsWithTemperatureV3() { long startTime = System.currentTimeMillis(); ServiceResponse response = new ServiceResponse(); List<Location> locations = restTemplate.exchange(StringUtils.join(SERVER_ADDRESS, "/location"), HttpMethod.GET, HttpEntity.EMPTY, new ParameterizedTypeReference<List<Location>>() { }).getBody(); locations.parallelStream().forEachOrdered(location -> { Temperature temperature = restTemplate.exchange(StringUtils.join(SERVER_ADDRESS, "/temperature/", location.getName()), HttpMethod.GET, HttpEntity.EMPTY, Temperature.class).getBody(); response.getForecasts().add(new Forecast(location).setTemperature(temperature)); }); long endTime = System.currentTimeMillis(); response.setProcessingTime(endTime - startTime); return response; } }
其中,v1接口是普通的list遍歷實現,接口響應:api
能夠看到接口響應時間是每次http調用的時間(500毫秒)總和多一些數組
接下來調用v2接口:安全
能夠看到時間縮短了5倍服務器
先了解什麼是流?數據結構
Stream是java8中新增長的一個特性, 統稱爲流.
Stream 不是集合元素,它不是數據結構並不保存數據,它是有關算法和計算的,它更像一個高級版本的 Iterator。原始版本的 Iterator,用戶只能顯式地一個一個遍歷元素並對其執行某些操做;高級版本的 Stream,用戶只要給出須要對其包含的元素執行什麼操做,好比 「過濾掉長度大於 10 的字符串」、「獲取每一個字符串的首字母」等,Stream 會隱式地在內部進行遍歷,作出相應的數據轉換。
Stream 就如同一個迭代器(Iterator),單向,不可往復,數據只能遍歷一次,遍歷過一次後即用盡了,就比如流水從面前流過,一去不復返。
而和迭代器又不一樣的是,Stream 能夠並行化操做,迭代器只能命令式地、串行化操做。顧名思義,當使用串行方式去遍歷時,每一個 item 讀完後再讀下一個 item。而使用並行去遍歷時,數據會被分紅多個段,其中每個都在不一樣的線程中處理,而後將結果一塊兒輸出。Stream 的並行操做依賴於 Java7 中引入的 Fork/Join 框架(JSR166y)來拆分任務和加速處理過程。Java 的並行 API 演變歷程基本以下:
.0-1.4 中的 java.lang.Thread
5.0 中的 java.util.concurrent
6.0 中的 Phasers 等
7.0 中的 Fork/Join 框架
8.0 中的 Lambda
parallelStream其實就是一個並行執行的流.它經過默認的ForkJoinPool,可能提升你的多線程任務的速度.
Stream具備平行處理能力,處理的過程會分而治之,也就是將一個大任務切分紅多個小任務,這表示每一個任務都是一個操做,所以像如下的程式片斷:
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9); numbers.parallelStream() .forEach(out::println);
你獲得的展現順序不必定會是一、二、三、四、五、六、七、八、9,而多是任意的順序,就forEach()這個操做來講,若是平行處理時,但願最後順序是按照原來Stream的數據順序,那能夠調用forEachOrdered()。例如:
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9); numbers.parallelStream() .forEachOrdered(out::println);
注意:若是forEachOrdered()中間有其餘如filter()的中介操做,會試着平行化處理,而後最終forEachOrdered()會以原數據順序處理,所以,使用forEachOrdered()這類的有序處理,可能會(或徹底失去)失去平行化的一些優點,實際上中介操做亦有可能如此,例如sorted()方法。
例如:執行v3接口:
能夠看到執行時間和普通的流同樣。。。
要想深刻的研究parallelStream以前,那麼咱們必須先了解ForkJoin框架和ForkJoinPool.由於兩種關係甚密,故在此簡單介紹一下ForkJoinPool,若有興趣能夠更深刻的去了解下ForkJoin***(固然,若是你想真正的搞透parallelStream,那麼你依然須要先搞透ForkJoinPool).*
ForkJoin框架是從jdk7中新特性,它同ThreadPoolExecutor同樣,也實現了Executor和ExecutorService接口。它使用了一個無限隊列來保存須要執行的任務,而線程的數量則是經過構造函數傳入,若是沒有向構造函數中傳入但願的線程數量,那麼當前計算機可用的CPU數量會被設置爲線程數量做爲默認值。
ForkJoinPool主要用來使用分治法(Divide-and-Conquer Algorithm)來解決問題。典型的應用好比快速排序算法。這裏的要點在於,ForkJoinPool須要使用相對少的線程來處理大量的任務。好比要對1000萬個數據進行排序,那麼會將這個任務分割成兩個500萬的排序任務和一個針對這兩組500萬數據的合併任務。以此類推,對於500萬的數據也會作出一樣的分割處理,到最後會設置一個閾值來規定當數據規模到多少時,中止這樣的分割處理。好比,當元素的數量小於10時,會中止分割,轉而使用插入排序對它們進行排序。那麼到最後,全部的任務加起來會有大概2000000+個。問題的關鍵在於,對於一個任務而言,只有當它全部的子任務完成以後,它纔可以被執行。
因此當使用ThreadPoolExecutor時,使用分治法會存在問題,由於ThreadPoolExecutor中的線程沒法像任務隊列中再添加一個任務而且在等待該任務完成以後再繼續執行。而使用ForkJoinPool時,就可以讓其中的線程建立新的任務,並掛起當前的任務,此時線程就可以從隊列中選擇子任務執行。
那麼使用ThreadPoolExecutor或者ForkJoinPool,會有什麼性能的差別呢?
首先,使用ForkJoinPool可以使用數量有限的線程來完成很是多的具備父子關係的任務,好比使用4個線程來完成超過200萬個任務。可是,使用ThreadPoolExecutor時,是不可能完成的,由於ThreadPoolExecutor中的Thread沒法選擇優先執行子任務,須要完成200萬個具備父子關係的任務時,也須要200萬個線程,顯然這是不可行的。
forkjoin最核心的地方就是利用了現代硬件設備多核,在一個操做時候會有空閒的cpu,那麼如何利用好這個空閒的cpu就成了提升性能的關鍵,而這裏咱們要提到的工做竊取(work-stealing)算法就是整個forkjion框架的核心理念,工做竊取(work-stealing)算法是指某個線程從其餘隊列裏竊取任務來執行。
那麼爲何須要使用工做竊取算法呢?
假如咱們須要作一個比較大的任務,咱們能夠把這個任務分割爲若干互不依賴的子任務,爲了減小線程間的競爭,因而把這些子任務分別放到不一樣的隊列裏,併爲每一個隊列建立一個單獨的線程來執行隊列裏的任務,線程和隊列一一對應,好比A線程負責處理A隊列裏的任務。可是有的線程會先把本身隊列裏的任務幹完,而其餘線程對應的隊列裏還有任務等待處理。幹完活的線程與其等着,不如去幫其餘線程幹活,因而它就去其餘線程的隊列裏竊取一個任務來執行。而在這時它們會訪問同一個隊列,因此爲了減小竊取任務線程和被竊取任務線程之間的競爭,一般會使用雙端隊列,被竊取任務線程永遠從雙端隊列的頭部拿任務執行,而竊取任務的線程永遠從雙端隊列的尾部拿任務執行。
工做竊取算法的優勢是充分利用線程進行並行計算,並減小了線程間的競爭,其缺點是在某些狀況下仍是存在競爭,好比雙端隊列裏只有一個任務時。而且消耗了更多的系統資源,好比建立多個線程和多個雙端隊列。
上文中已經提到了在Java 8引入了自動並行化的概念。它可以讓一部分Java代碼自動地以並行的方式執行,也就是咱們使用了ForkJoinPool的ParallelStream。
Java 8爲ForkJoinPool添加了一個通用線程池,這個線程池用來處理那些沒有被顯式提交到任何線程池的任務。它是ForkJoinPool類型上的一個靜態元素,它擁有的默認線程數量等於運行計算機上的處理器數量。當調用Arrays類上添加的新方法時,自動並行化就會發生。好比用來排序一個數組的並行快速排序,用來對一個數組中的元素進行並行遍歷。自動並行化也被運用在Java 8新添加的Stream API中。
對於列表中的元素的操做都會以並行的方式執行。forEach方法會爲每一個元素的計算操做建立一個任務,該任務會被前文中提到的ForkJoinPool中的通用線程池處理。以上的並行計算邏輯固然也可使用ThreadPoolExecutor完成,可是就代碼的可讀性和代碼量而言,使用ForkJoinPool明顯更勝一籌。
對於ForkJoinPool通用線程池的線程數量,一般使用默認值就能夠了,即運行時計算機的處理器數量。我這裏提供了一個示例的代碼讓你瞭解jvm所使用的ForkJoinPool的線程數量, 你能夠能夠經過設置系統屬性:-Djava.util.concurrent.ForkJoinPool.common.parallelism=N (N爲線程數量),來調整ForkJoinPool的線程數量,能夠嘗試調整成不一樣的參數來觀察每次的輸出結果。
可是它會將執行forEach自己的線程也做爲線程池中的一個工做線程。所以,即便將ForkJoinPool的通用線程池的線程數量設置爲1,實際上也會有2個工做線程。所以在使用forEach的時候,線程數爲1的ForkJoinPool通用線程池和線程數爲2的ThreadPoolExecutor是等價的。
因此當ForkJoinPool通用線程池實際須要4個工做線程時,能夠將它設置成3,那麼在運行時可用的工做線程就是4了。
1. 當須要處理遞歸分治算法時,考慮使用ForkJoinPool。
2. 仔細設置再也不進行任務劃分的閾值,這個閾值對性能有影響。
3. Java 8中的一些特性會使用到ForkJoinPool中的通用線程池。在某些場合下,須要調整該線程池的默認的線程數量。
上文中咱們已經看到了ParallelStream他強大無比的特性,但這裏咱們就講告訴你ParallelStreams不是萬金油,而是一把雙刃劍,若是錯誤的使用反倒可能傷人傷己.
可能有不少朋友在jdk7用future配合countDownLatch本身實現的這個功能,可是jdk8的朋友基本都會用上面的實現方式,那麼自信深究一下究竟本身用future實現的這個功能和利用jdk8的parallelStream來實現這個功能有什麼不一樣點呢?坑又在哪裏呢?
讓咱們細思思考一下整個功能到底是如何運轉的。首先咱們的集合元素engines 由ParallelStreams並行的去進行map操做(ParallelStreams使用JVM默認的forkJoin框架的線程池由當前線程去執行並行操做).
然而,這裏須要注意的一地方是咱們在調用第三方的api請求是一個響應略慢並且會阻塞操做的一個過程。因此在某時刻全部線程都會調用 get() 方法而且在那裏等待結果返回.
再回過頭仔細思考一下這個功能的實現過程是咱們一開始想要的嗎?咱們是在同一時間等待全部的結果,而不是遍歷這個列表按順序等待每一個回答.然而,因爲ForkJoinPool workders的存在,這樣平行的等待相對於使用主線程的等待會產生的一種反作用.
如今ForkJoin pool (關於forkjion的更多實現你能夠去搜索引擎中去看一下他的具體實現方式) 的實現是: 它並不會由於產生了新的workers而抵消掉阻塞的workers。那麼在某個時間全部 ForkJoinPool.common() 的線程都會被用光.也就是說,下一次你調用這個查詢方法,就可能會在一個時間與其餘的parallel stream同時運行,而致使第二個任務的性能大大受損。或者說,例如你在這個功能裏是用來快速返回調用的第三方api的,而在其餘的功能裏是用於一些簡單的數據並行計算的,可是假如你先調用了這個功能,同一時間以後調用計算的函數,那麼這裏forkjionPool的實現會讓你計算的函數大打折扣.
不過也不要急着去吐槽ForkJoinPool的實現,在不一樣的狀況下你能夠給它一個ManagedBlocker實例而且確保它知道在一個阻塞調用中應該何時去抵消掉卡住的workers.如今有意思的一點是,在一個parallel stream處理中並不必定是阻塞調用會拖延程序的性能。任何被用於映射在一個集合上的長時間運行的函數都會產生一樣的問題.
正如咱們上面那個列子的狀況分析得知,lambda的執行並非瞬間完成的,全部使用parallel streams的程序都有可能成爲阻塞程序的源頭,而且在執行過程當中程序中的其餘部分將沒法訪問這些workers,這意味着任何依賴parallel streams的程序在什麼別的東西佔用着common ForkJoinPool時將會變得不可預知而且暗藏危機.
此外,parallelStream是並行操做,不是線程安全的,那麼是否是在其中的進行的非原子操做都要加鎖呢?
答案是:paralleStream的forEach接口確實不能保證同步,同時也提出瞭解決方案:使用collect和reduce接口,Collections框架提供了同步的包裝,使得其中的操做線程安全。
即代碼中的:
若是你正在寫一個其餘地方都是單線程的程序而且準確地知道何時你應該要使用parallel streams,這樣的話你可能會以爲這個問題有一點膚淺。然而,咱們不少人是在處理web應用、各類不一樣的框架以及重量級應用服務。一個服務器是怎樣被設計成一個能夠支持多種獨立應用的主機的?誰知道呢,給你一個能夠並行的卻不能控制輸入的parallel stream.
很抱歉,請原諒我用的標註[怎麼正確使用parallelStream],由於目前爲止我也沒有發現一個好的方式來讓我真正的正確使用parallelStream.下面的網上寫的兩種方式:
一種方式是限制ForkJoinPool提供的並行數。能夠經過使用-Djava.util.concurrent.ForkJoinPool.common.parallelism=1 來限制線程池的大小爲1。再也不從並行化中獲得好處能夠杜絕錯誤的使用它(其實這個方式仍是有點搞笑的,既然這樣搞那我還不如不去使用並行流)。
另外一種方式就是,一個被稱爲工做區的可讓ForkJoinPool平行放置的 parallelStream() 實現。不幸的是如今的JDK尚未實現。
Parallel streams 是沒法預測的,並且想要正確地使用它有些棘手。幾乎任何parallel streams的使用都會影響程序中無關部分的性能,並且是一種沒法預測的方式。。可是在調用stream.parallel() 或者parallelStream()時候在個人代碼裏以前我仍然會從新審視一遍他給個人程序究竟會帶來什麼問題,他能有多大的提高,是否有使用他的意義.
上面咱們也看到了parallelStream所帶來的隱患和好處,那麼,在從stream和parallelStream方法中進行選擇時,咱們能夠考慮如下幾個問題:
1. 是否須要並行?
2. 任務之間是不是獨立的?是否會引發任何競態條件?
3. 結果是否取決於任務的調用順序?
對於問題1,在回答這個問題以前,你須要弄清楚你要解決的問題是什麼,數據量有多大,計算的特色是什麼?並非全部的問題都適合使用併發程序來求解,好比當數據量不大時,順序執行每每比並行執行更快。畢竟,準備線程池和其它相關資源也是須要時間的。可是,當任務涉及到I/O操做而且任務之間不互相依賴時,那麼並行化就是一個不錯的選擇。一般而言,將這類程序並行化以後,執行速度會提高好幾個等級。
對於問題2,若是任務之間是獨立的,而且代碼中不涉及到對同一個對象的某個狀態或者某個變量的更新操做,那麼就代表代碼是能夠被並行化的。
對於問題3,因爲在並行環境中任務的執行順序是不肯定的,所以對於依賴於順序的任務而言,並行化也許不能給出正確的結果。
參考文章:https://blog.csdn.net/u011001723/article/details/52794455