前言
「Java8系列」神奇的函數式接口 繼上兩篇以後,本文已經java8系列的第三篇了。本篇文章比較長,但我但願你們都能認真讀完。讀不完能夠先收藏,在找時間讀。沒看過前兩篇的能夠點上邊的連接看看,前兩篇文章算是對是用Stream鋪墊的一點基礎吧,不過不看也能夠學會使用Stream,但看了會有助於更好的理解和使用。在沒有深刻了解以前,我覺得Stream也是數據的載體,但後來發現並非。那麼它究竟是什麼?聽我慢慢道來。git
什麼是Stream?
Stream它並非一個容器,它只是對容器的功能進行了加強,添加了不少便利的操做,例如查找、過濾、分組、排序等一系列的操做。而且有串行、並行兩種執行模式,並行模式充分的利用了多核處理器的優點,使用fork/join框架進行了任務拆分,同時提升了執行速度。簡而言之,Stream就是提供了一種高效且易於使用的處理數據的方式。github
- 特色:
- Stream本身不會存儲元素。
- Stream的操做不會改變源對象。相反,他們會返回一個持有結果的新Stream。
- Stream 操做是延遲執行的。它會等到須要結果的時候才執行。也就是執行終端操做的時候。
- 圖解: 一個Stream的操做就如上圖,在一個管道內,分爲三個步驟,第一步是建立Stream,從集合、數組中獲取一個流,第二步是中間操做鏈,對數據進行處理。第三步是終端操做,用來執行中間操做鏈,返回結果。
怎麼建立Stream?
- 由集合建立: Java8 中的 Collection 接口被擴展,提供了兩個獲取流的方法,這兩個方法是default方法,也就是說全部實現Collection接口的接口都不須要實現就能夠直接使用:
- default Stream<e> stream() : 返回一個順序流。
- default Stream<e> parallelStream() : 返回一個並行流。<br>
例如: List<integer> integerList = new ArrayList<>(); integerList.add(1); integerList.add(2); Stream<integer> stream = integerList.stream(); Stream<integer> stream1 = integerList.parallelStream();
- 由數組建立: Java8 中的 Arrays 的靜態方法 stream() 能夠獲取數組流:
- static <t> Stream<t> stream(T[] array): 返回一個流
- 重載形式,可以處理對應基本類型的數組: public static IntStream stream(int[] array) public static LongStream stream(long[] array) public static DoubleStream stream(double[] array)<br>
例如: int[] intArray = {1,2,3}; IntStream stream = Arrays.stream(intArray);
- 由值建立: 可使用靜態方法 Stream.of(), 經過顯示值 建立一個流。它能夠接收任意數量的參數。
- public static<t> Stream<t> of(T... values) : 返回一個流。<br>
例如: Stream<integer> integerStream = Stream.of(1, 2, 3, 4, 5, 6, 7, 8);
- 由函數建立:建立無限流 可使用靜態方法 Stream.iterate() 和 Stream.generate()建立無限流。
- 迭代 public static<t> Stream<t> iterate(final T seed, final UnaryOperator<t> f)
- 生成 public static<t> Stream<t> generate(Supplier<t> s)
例如: Stream.generate(Math::random).limit(5).forEach(System.out::print); List<integer> collect = Stream.iterate(0,i -> i + 1).limit(5).collect(Collectors.toList());
注意:使用無限流必定要配合limit截斷,否則會無限制建立下去。
Stream的中間操做
若是Stream只有中間操做是不會執行的,當執行終端操做的時候纔會執行中間操做,這種方式稱爲延遲加載或惰性求值。多箇中間操做組成一箇中間操做鏈,只有當執行終端操做的時候纔會執行一遍中間操做鏈,具體是由於什麼咱們在後面再說明。下面看下Stream有哪些中間操做。算法
- Stream
<t>
distinct(): 去重,經過流所生成元素的 hashCode() 和 equals() 去除重複元素。 - Stream
<t>
filter(Predicate<!--? super T--> predicate): Predicate函數在上一篇當中咱們已經講過,它是斷言型接口,因此filter方法中是接收一個和Predicate函數對應Lambda表達式,返回一個布爾值,從流中過濾某些元素。 - Stream
<t>
sorted(Comparator<!--? super T--> comparator): 指定比較規則進行排序。 - Stream
<t>
limit(long maxSize): 截斷流,使其元素不超過給定數量。若是元素的個數小於maxSize,那就獲取全部元素。 - Stream
<t>
skip(long n): 跳過元素,返回一個扔掉了前 n 個元素的流。若流中元素不足 n 個,則返回一個空流。與 limit(n) 互補。 - Stream
<r>
map(Function<!--? super T, ? extends R--> mapper): 接收一個Function函數做爲參數,該函數會被應用到每一個元素上,並將其映射成一個新的元素。也就是轉換操做,map還有三個應用於具體類型方法,分別是:mapToInt,mapToLong和mapToDouble。這三個方法也比較好理解,好比mapToInt就是把原始Stream轉換成一個新的Stream,這個新生成的Stream中的元素都是int類型。這三個方法能夠免除自動裝箱/拆箱的額外消耗。 - Stream
<r>
flatMap(Function<!--? super T, ? extends Stream<? extends R-->> mapper): 接收一個Function函數做爲參數,將流中的每一個值都轉換成另外一個流,而後把全部流鏈接成一個流。flatMap也有三個應用於具體類型的方法,分別是:flatMapToInt、flatMapToLong、flatMapToDouble,其做用於map的三個衍生方法相同。
Stream的終端操做
終端操做執行中間操做鏈,並返回結果。終端操做咱們就不一一介紹了,只介紹一下經常使用的操做。詳細可看java.util.stream.Stream接口中的方法。segmentfault
- void forEach(Consumer<!--? super T--> action): 內部迭代(須要用戶去作迭代,稱爲外部迭代。相反,Stream API使用內部迭代幫你把迭代作了)
users.stream().forEach(user -> System.out.println(user.getName()));
- <r, a> R collect(Collector<!--? super T, A, R--> collector): 收集、將流轉換爲其餘形式,好比轉換成List、Set、Map。collect方法是用Collector做爲參數,Collector接口中方法的實現決定了如何對流執行收集操做(如收集到 List、Set、Map)。可是 Collectors 實用類提供了不少靜態方法,能夠方便地建立常見收集器實例。例舉一些經常使用的:
List<user> users = Lists.newArrayList(); users.add(new User(15, "A", ImmutableList.of("1元", "5元"))); users.add(new User(25, "B", ImmutableList.of("10元", "50元"))); users.add(new User(21, "C", ImmutableList.of("100元"))); //收集名稱到List List<string> nameList = users.stream().map(User::getName).collect(Collectors.toList()); //收集名稱到List Set<string> nameSet = users.stream().map(User::getName).collect(Collectors.toSet()); //收集到map,名字做爲key,user對象做爲value Map<string, user> userMap = users.stream() .collect(Collectors.toMap(User::getName, Function.identity(), (k1, k2) -> k2));
- 其餘終端操做:
- boolean allMatch(Predicate<!--? super T--> predicate); 檢查是否匹配全部元素。
- boolean anyMatch(Predicate<!--? super T--> predicate); 檢查是否至少匹配一個元素。
- boolean noneMatch(Predicate<!--? super T--> predicate); 檢查是否沒有匹配全部元素。
- Optional
<t>
findFirst(); 返回當前流中的第一個元素。 - Optional
<t>
findAny(); 返回當前流中的任意元素。 - long count(); 返回流中元素總數。
- Optional
<t>
max(Comparator<!--? super T--> comparator); 返回流中最大值。 - Optional
<t>
min(Comparator<!--? super T--> comparator); 返回流中最小值。 - T reduce(T identity, BinaryOperator
<t>
accumulator); 能夠將流中元素反覆結合起來,獲得一個值。 返回 T。這是一個歸約操做。
Fork/Join框架
上面咱們提到過,說Stream的並行模式使用了Fork/Join框架,這裏簡單說下Fork/Join框架是什麼?Fork/Join框架是java7中加入的一個並行任務框架,能夠將任務拆分爲多個小任務,每一個小任務執行完的結果在合併成爲一個結果。在任務的執行過程當中使用工做竊取(work-stealing)算法,減小線程之間的競爭。api
- Fork/Join圖解
- 工做竊取圖解
Stream是怎麼實現的
先看下總體類圖:藍色箭頭表明繼承,綠色箭頭表明實現,紅色箭頭表明內部類。 實際上Stream只有兩種操做,中間操做、終端操做,中間操做只是一種標記,只有終端操做纔會實際觸發執行。因此Stream流水線式的操做大體應該是用某種方式記錄中間操做,只有調用終端操做纔會將全部的中間操做疊加在一塊兒在一次迭代中所有執行。這裏只作簡單的介紹,想詳細瞭解的能夠參考下面的參考資料中的連接。數組
- 操做怎麼記錄?<br> Stream的操做記錄是經過ReferencePipeline記錄的,ReferencePipeline有三個內部類Head、StatelessOp、StatefulOp,Stream中使用Stage的概念來描述一個完整的操做,並用某種實例化後的ReferencePipeline來表明Stage,Head用於表示第一個Stage,即調用諸如Collection.stream()方法產生的Stage,很顯然這個Stage裏不包含任何操做,StatelessOp和StatefulOp分別表示無狀態和有狀態的Stage,對應於無狀態和有狀態的中間操做。
- 操做怎麼疊加?<br> 操做是記錄完了,可是前面的Stage並不知道後面Stage到底執行了哪一種操做,以及回調函數是哪一種形式。這就須要有某種協議來協調相鄰Stage之間的調用關係。 這種協議由Sink接口完成,Sink接口包含的方法以下表所示:
- void begin(long size),開始遍歷元素以前調用該方法,通知Sink作好準備。
- void end(),全部元素遍歷完成以後調用,通知Sink沒有更多的元素了。
- boolean cancellationRequested(),是否能夠結束操做,可讓短路操做盡早結束。
- void accept(T t),遍歷元素時調用,接受一個待處理元素,並對元素進行處理。Stage把本身包含的操做和回調方法封裝到該方法裏,前一個Stage只須要調用當前Stage.accept(T t)方法就好了。<br>
每一個Stage都會將本身的操做封裝到一個Sink裏,前一個Stage只需調用後一個Stage的accept()方法便可,並不須要知道其內部是如何處理的。有了Sink對操做的包裝,Stage之間的調用問題就解決了,執行時只須要從流水線的head開始對數據源依次調用每一個Stage對應的Sink.{begin(), accept(), cancellationRequested(), end()}方法就能夠了。app
- 操做怎麼執行? Sink完美封裝了Stream每一步操做,並給出了[處理->轉發]的模式來疊加操做。這一連串的齒輪已經咬合,就差最後一步撥動齒輪啓動執行。是什麼啓動這一連串的操做呢?也許你已經想到了啓動的原始動力就是結束操做(Terminal Operation),一旦調用某個結束操做,就會觸發整個流水線的執行。
參考資料
https://www.ibm.com/developerworks/cn/java/j-lo-java8streamapi/less
http://www.javashuo.com/article/p-qetteurv-kt.html
https://github.com/CarpenterLee/JavaLambdaInternals/blob/master/6-Stream%20Pipelines.md