[譯] 一文帶你玩轉 Java8 Stream 流,今後操做集合 So Easy

本文翻譯自 winterbe.com/posts/2014/…html

做者: @Winterbejava

歡迎關注我的微信公衆號: 小哈學Java,便可免費無套路領取10G面試學習資料哦,文末資料截圖。面試

我的網站: www.exception.site/java8/java8…shell

Stream 流能夠說是 Java8 新特性中用起來最爽的一個功能了,有了它,今後操做集合告別繁瑣的 for 循環。可是還有不少小夥伴對 Stream 流不是很瞭解。今天就經過這篇 @Winterbe 的譯文,一塊兒深刻了解下如何使用它吧。編程

目錄

1、Stream 流是如何工做的?api

2、不一樣類型的 Stream 流bash

3、Stream 流的處理順序微信

4、中間操做順序這麼重要?多線程

5、數據流複用問題oracle

6、高級操做

  • 6.1 Collect
  • 6.2 FlatMap
  • 6.3 Reduce

7、並行流

8、結語


當我第一次閱讀 Java8 中的 Stream API 時,說實話,我很是困惑,由於它的名字聽起來與 Java I0 框架中的 InputStreamOutputStream 很是相似。可是實際上,它們徹底是不一樣的東西。

Java8 Stream 使用的是函數式編程模式,如同它的名字同樣,它能夠被用來對集合進行鏈狀流式的操做。

本文就將帶着你如何使用 Java 8 不一樣類型的 Stream 操做。同時您還將瞭解流的處理順序,以及不一樣順序的流操做是如何影響運行時性能的。

咱們還將學習終端操做 API reducecollect 以及flatMap的詳細介紹,最後咱們再來深刻的探討一下 Java8 並行流。

注意:若是您還不熟悉 Java 8 lambda 表達式,函數式接口以及方法引用,您能夠先閱讀一下小哈的另外一篇譯文 《Java8 新特性教程》

接下來,就讓咱們進入正題吧!

1、Stream 流是如何工做的?

流表示包含着一系列元素的集合,咱們能夠對其作不一樣類型的操做,用來對這些元素執行計算。聽上去可能有點拗口,讓咱們用代碼說話:

List<String> myList =
    Arrays.asList("a1", "a2", "b1", "c2", "c1");

myList
    .stream() // 建立流
    .filter(s -> s.startsWith("c")) // 執行過濾,過濾出以 c 爲前綴的字符串
    .map(String::toUpperCase) // 轉換成大寫
    .sorted() // 排序
    .forEach(System.out::println); // for 循環打印

// C1
// C2
複製代碼

咱們能夠對流進行中間操做或者終端操做。小夥伴們可能會疑問?什麼是中間操做?什麼又是終端操做?

Stream中間操做,終端操做
Stream中間操做,終端操做

  • :中間操做會再次返回一個流,因此,咱們能夠連接多箇中間操做,注意這裏是不用加分號的。上圖中的filter 過濾,map 對象轉換,sorted 排序,就屬於中間操做。
  • :終端操做是對流操做的一個結束動做,通常返回 void 或者一個非流的結果。上圖中的 forEach循環 就是一個終止操做。

看完上面的操做,感受是否是很像一個流水線式操做呢。

實際上,大部分流操做都支持 lambda 表達式做爲參數,正確理解,應該說是接受一個函數式接口的實現做爲參數。

2、不一樣類型的 Stream 流

咱們能夠從各類數據源中建立 Stream 流,其中以 Collection 集合最爲常見。如 ListSet 均支持 stream() 方法來建立順序流或者是並行流。

並行流是經過多線程的方式來執行的,它可以充分發揮多核 CPU 的優點來提高性能。本文在最後再來介紹並行流,咱們先討論順序流:

Arrays.asList("a1", "a2", "a3")
    .stream() // 建立流
    .findFirst() // 找到第一個元素
    .ifPresent(System.out::println);  // 若是存在,即輸出

// a1
複製代碼

在集合上調用stream()方法會返回一個普通的 Stream 流。可是, 您大可沒必要刻意地建立一個集合,再經過集合來獲取 Stream 流,您還能夠經過以下這種方式:

Stream.of("a1", "a2", "a3")
    .findFirst()
    .ifPresent(System.out::println);  // a1
複製代碼

例如上面這樣,咱們能夠經過 Stream.of() 從一堆對象中建立 Stream 流。

除了常規對象流以外,Java 8還附帶了一些特殊類型的流,用於處理原始數據類型intlong以及double。說道這裏,你可能已經猜到了它們就是IntStreamLongStream還有DoubleStream

其中,IntStreams.range()方法還能夠被用來取代常規的 for 循環, 以下所示:

IntStream.range(1, 4)
    .forEach(System.out::println); // 至關於 for (int i = 1; i < 4; i++) {}

// 1
// 2
// 3
複製代碼

上面這些原始類型流的工做方式與常規對象流基本是同樣的,但仍是略微存在一些區別:

  • 原始類型流使用其獨有的函數式接口,例如IntFunction代替FunctionIntPredicate代替Predicate

  • 原始類型流支持額外的終端聚合操做,sum()以及average(),以下所示:

Arrays.stream(new int[] {1, 2, 3})
    .map(n -> 2 * n + 1) // 對數值中的每一個對象執行 2*n + 1 操做
    .average() // 求平均值
    .ifPresent(System.out::println);  // 若是值不爲空,則輸出
// 5.0
複製代碼

可是,偶爾咱們也有這種需求,須要將常規對象流轉換爲原始類型流,這個時候,中間操做 mapToInt()mapToLong() 以及mapToDouble就派上用場了:

Stream.of("a1", "a2", "a3")
    .map(s -> s.substring(1)) // 對每一個字符串元素從下標1位置開始截取
    .mapToInt(Integer::parseInt) // 轉成 int 基礎類型類型流
    .max() // 取最大值
    .ifPresent(System.out::println);  // 不爲空則輸出

// 3
複製代碼

若是說,您須要將原始類型流裝換成對象流,您可使用 mapToObj()來達到目的:

IntStream.range(1, 4)
    .mapToObj(i -> "a" + i) // for 循環 1->4, 拼接前綴 a
    .forEach(System.out::println); // for 循環打印

// a1
// a2
// a3
複製代碼

下面是一個組合示例,咱們將雙精度流首先轉換成 int 類型流,而後再將其裝換成對象流:

Stream.of(1.0, 2.0, 3.0)
    .mapToInt(Double::intValue) // double 類型轉 int
    .mapToObj(i -> "a" + i) // 對值拼接前綴 a
    .forEach(System.out::println); // for 循環打印

// a1
// a2
// a3
複製代碼

3、Stream 流的處理順序

上小節中,咱們已經學會了如何建立不一樣類型的 Stream 流,接下來咱們再深刻了解下數據流的執行順序。

在討論處理順序以前,您須要明確一點,那就是中間操做的有個重要特性 —— 延遲性。觀察下面這個沒有終端操做的示例代碼:

Stream.of("d2", "a2", "b1", "b3", "c")
    .filter(s -> {
        System.out.println("filter: " + s);
        return true;
    });
複製代碼

執行此代碼段時,您可能會認爲,將依次打印 "d2", "a2", "b1", "b3", "c" 元素。然而當你實際去執行的時候,它不會打印任何內容。

爲何呢?

緣由是:當且僅當存在終端操做時,中間操做操做纔會被執行。

是否是不信?接下來,對上面的代碼添加 forEach終端操做:

Stream.of("d2", "a2", "b1", "b3", "c")
    .filter(s -> {
        System.out.println("filter: " + s);
        return true;
    })
    .forEach(s -> System.out.println("forEach: " + s));
複製代碼

再次執行,咱們會看到輸出以下:

filter:  d2
forEach: d2
filter:  a2
forEach: a2
filter:  b1
forEach: b1
filter:  b3
forEach: b3
filter:  c
forEach: c
複製代碼

輸出的順序可能會讓你很驚訝!你腦海裏確定會想,應該是先將全部 filter 前綴的字符串打印出來,接着纔會打印 forEach 前綴的字符串。

事實上,輸出的結果倒是隨着鏈條垂直移動的。好比說,當 Stream 開始處理 d2 元素時,它實際上會在執行完 filter 操做後,再執行 forEach 操做,接着纔會處理第二個元素。

是否是很神奇?爲何要設計成這樣呢?

緣由是出於性能的考慮。這樣設計能夠減小對每一個元素的實際操做數,看完下面代碼你就明白了:

Stream.of("d2", "a2", "b1", "b3", "c")
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase(); // 轉大寫
    })
    .anyMatch(s -> {
        System.out.println("anyMatch: " + s);
        return s.startsWith("A"); // 過濾出以 A 爲前綴的元素
    });

// map: d2
// anyMatch: D2
// map: a2
// anyMatch: A2
複製代碼

終端操做 anyMatch()表示任何一個元素以 A 爲前綴,返回爲 true,就中止循環。因此它會從 d2 開始匹配,接着循環到 a2 的時候,返回爲 true ,因而中止循環。

因爲數據流的鏈式調用是垂直執行的,map這裏只須要執行兩次。相對於水平執行來講,map會執行儘量少的次數,而不是把全部元素都 map 轉換一遍。

4、中間操做順序這麼重要?

下面的例子由兩個中間操做mapfilter,以及一個終端操做forEach組成。讓咱們再來看看這些操做是如何執行的:

Stream.of("d2", "a2", "b1", "b3", "c")
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase(); // 轉大寫
    })
    .filter(s -> {
        System.out.println("filter: " + s);
        return s.startsWith("A"); // 過濾出以 A 爲前綴的元素
    })
    .forEach(s -> System.out.println("forEach: " + s)); // for 循環輸出

// map: d2
// filter: D2
// map: a2
// filter: A2
// forEach: A2
// map: b1
// filter: B1
// map: b3
// filter: B3
// map: c
// filter: C
複製代碼

學習了上面一小節,您應該已經知道了,mapfilter會對集合中的每一個字符串調用五次,而forEach卻只會調用一次,由於只有 "a2" 知足過濾條件。

若是咱們改變中間操做的順序,將filter移動到鏈頭的最開始,就能夠大大減小實際的執行次數:

Stream.of("d2", "a2", "b1", "b3", "c")
    .filter(s -> {
        System.out.println("filter: " + s)
        return s.startsWith("a"); // 過濾出以 a 爲前綴的元素
    })
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase(); // 轉大寫
    })
    .forEach(s -> System.out.println("forEach: " + s)); // for 循環輸出

// filter: d2
// filter: a2
// map: a2
// forEach: A2
// filter: b1
// filter: b3
// filter: c
複製代碼

如今,map僅僅只需調用一次,性能獲得了提高,這種小技巧對於流中存在大量元素來講,是很是頗有用的。

接下來,讓咱們對上面的代碼再添加一箇中間操做sorted

Stream.of("d2", "a2", "b1", "b3", "c")
    .sorted((s1, s2) -> {
        System.out.printf("sort: %s; %s\n", s1, s2);
        return s1.compareTo(s2); // 排序
    })
    .filter(s -> {
        System.out.println("filter: " + s);
        return s.startsWith("a"); // 過濾出以 a 爲前綴的元素
    })
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase(); // 轉大寫
    })
    .forEach(s -> System.out.println("forEach: " + s)); // for 循環輸出
複製代碼

sorted 是一個有狀態的操做,由於它須要在處理的過程當中,保存狀態以對集合中的元素進行排序。

執行上面代碼,輸出以下:

sort:    a2; d2
sort:    b1; a2
sort:    b1; d2
sort:    b1; a2
sort:    b3; b1
sort:    b3; d2
sort:    c; b3
sort:    c; d2
filter:  a2
map:     a2
forEach: A2
filter:  b1
filter:  b3
filter:  c
filter:  d2
複製代碼

咦咦咦?此次怎麼又不是垂直執行了。你須要知道的是,sorted是水平執行的。所以,在這種狀況下,sorted會對集合中的元素組合調用八次。這裏,咱們也能夠利用上面說道的優化技巧,將 filter 過濾中間操做移動到開頭部分:

Stream.of("d2", "a2", "b1", "b3", "c")
    .filter(s -> {
        System.out.println("filter: " + s);
        return s.startsWith("a");
    })
    .sorted((s1, s2) -> {
        System.out.printf("sort: %s; %s\n", s1, s2);
        return s1.compareTo(s2);
    })
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase();
    })
    .forEach(s -> System.out.println("forEach: " + s));

// filter: d2
// filter: a2
// filter: b1
// filter: b3
// filter: c
// map: a2
// forEach: A2
複製代碼

從上面的輸出中,咱們看到了 sorted從未被調用過,由於通過filter事後的元素已經減小到只有一個,這種狀況下,是不用執行排序操做的。所以性能被大大提升了。

5、數據流複用問題

Java8 Stream 流是不能被複用的,一旦你調用任何終端操做,流就會關閉:

Stream<String> stream =
    Stream.of("d2", "a2", "b1", "b3", "c")
        .filter(s -> s.startsWith("a"));

stream.anyMatch(s -> true);    // ok
stream.noneMatch(s -> true);   // exception
複製代碼

當咱們對 stream 調用了 anyMatch 終端操做之後,流即關閉了,再調用 noneMatch 就會拋出異常:

java.lang.IllegalStateException: stream has already been operated upon or closed
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:229)
    at java.util.stream.ReferencePipeline.noneMatch(ReferencePipeline.java:459)
    at com.winterbe.java8.Streams5.test7(Streams5.java:38)
    at com.winterbe.java8.Streams5.main(Streams5.java:28)
複製代碼

爲了克服這個限制,咱們必須爲咱們想要執行的每一個終端操做建立一個新的流鏈,例如,咱們能夠經過 Supplier 來包裝一下流,經過 get() 方法來構建一個新的 Stream 流,以下所示:

Supplier<Stream<String>> streamSupplier =
    () -> Stream.of("d2", "a2", "b1", "b3", "c")
            .filter(s -> s.startsWith("a"));

streamSupplier.get().anyMatch(s -> true);   // ok
streamSupplier.get().noneMatch(s -> true);  // ok
複製代碼

經過構造一個新的流,來避開流不能被複用的限制, 這也是取巧的一種方式。

6、高級操做

Streams 支持的操做很豐富,除了上面介紹的這些比較經常使用的中間操做,如filtermap(參見Stream Javadoc)外。還有一些更復雜的操做,如collectflatMap以及reduce。接下來,就讓咱們學習一下:

本小節中的大多數代碼示例均會使用如下 List<Person>進行演示:

class Person {
    String name;
    int age;

    Person(String name, int age) {
        this.name = name;
        this.age = age;
    }

    @Override
    public String toString() {
        return name;
    }
}

// 構建一個 Person 集合
List<Person> persons =
    Arrays.asList(
        new Person("Max", 18),
        new Person("Peter", 23),
        new Person("Pamela", 23),
        new Person("David", 12));
複製代碼

6.1 Collect

collect 是一個很是有用的終端操做,它能夠將流中的元素轉變成另一個不一樣的對象,例如一個ListSetMap。collect 接受入參爲Collector(收集器),它由四個不一樣的操做組成:供應器(supplier)、累加器(accumulator)、組合器(combiner)和終止器(finisher)。

這些都是個啥?別慌,看上去很是複雜的樣子,但好在大多數狀況下,您並不須要本身去實現收集器。由於 Java 8經過Collectors類內置了各類經常使用的收集器,你直接拿來用就好了。

讓咱們先從一個很是常見的用例開始:

List<Person> filtered =
    persons
        .stream() // 構建流
        .filter(p -> p.name.startsWith("P")) // 過濾出名字以 P 開頭的
        .collect(Collectors.toList()); // 生成一個新的 List

System.out.println(filtered);    // [Peter, Pamela]
複製代碼

你也看到了,從流中構造一個 List 異常簡單。若是說你須要構造一個 Set 集合,只須要使用Collectors.toSet()就能夠了。

接下來這個示例,將會按年齡對全部人進行分組:

Map<Integer, List<Person>> personsByAge = persons
    .stream()
    .collect(Collectors.groupingBy(p -> p.age)); // 以年齡爲 key,進行分組

personsByAge
    .forEach((age, p) -> System.out.format("age %s: %s\n", age, p));

// age 18: [Max]
// age 23: [Peter, Pamela]
// age 12: [David]
複製代碼

除了上面這些操做。您還能夠在流上執行聚合操做,例如,計算全部人的平均年齡:

Double averageAge = persons
    .stream()
    .collect(Collectors.averagingInt(p -> p.age)); // 聚合出平均年齡

System.out.println(averageAge);     // 19.0
複製代碼

若是您還想獲得一個更全面的統計信息,摘要收集器能夠返回一個特殊的內置統計對象。經過它,咱們能夠簡單地計算出最小年齡、最大年齡、平均年齡、總和以及總數量。

IntSummaryStatistics ageSummary =
    persons
        .stream()
        .collect(Collectors.summarizingInt(p -> p.age)); // 生成摘要統計

System.out.println(ageSummary);
// IntSummaryStatistics{count=4, sum=76, min=12, average=19.000000, max=23}
複製代碼

下一個這個示例,能夠將全部人名鏈接成一個字符串:

String phrase = persons
    .stream()
    .filter(p -> p.age >= 18) // 過濾出年齡大於等於18的
    .map(p -> p.name) // 提取名字
    .collect(Collectors.joining(" and ", "In Germany ", " are of legal age.")); // 以 In Germany 開頭,and 鏈接各元素,再以 are of legal age. 結束

System.out.println(phrase);
// In Germany Max and Peter and Pamela are of legal age.
複製代碼

鏈接收集器的入參接受分隔符,以及可選的前綴以及後綴。

對於如何將流轉換爲 Map集合,咱們必須指定 Map 的鍵和值。這裏須要注意,Map 的鍵必須是惟一的,不然會拋出IllegalStateException 異常。

你能夠選擇傳遞一個合併函數做爲額外的參數來避免發生這個異常:

Map<Integer, String> map = persons
    .stream()
    .collect(Collectors.toMap(
        p -> p.age,
        p -> p.name,
        (name1, name2) -> name1 + ";" + name2)); // 對於一樣 key 的,將值拼接

System.out.println(map);
// {18=Max, 23=Peter;Pamela, 12=David}
複製代碼

既然咱們已經知道了這些強大的內置收集器,接下來就讓咱們嘗試構建自定義收集器吧。

好比說,咱們但願將流中的全部人轉換成一個字符串,包含全部大寫的名稱,並以|分割。爲了達到這種效果,咱們須要經過Collector.of()建立一個新的收集器。同時,咱們還須要傳入收集器的四個組成部分:供應器、累加器、組合器和終止器。

Collector<Person, StringJoiner, String> personNameCollector =
    Collector.of(
        () -> new StringJoiner(" | "),          // supplier 供應器
        (j, p) -> j.add(p.name.toUpperCase()),  // accumulator 累加器
        (j1, j2) -> j1.merge(j2),               // combiner 組合器
        StringJoiner::toString);                // finisher 終止器

String names = persons
    .stream()
    .collect(personNameCollector); // 傳入自定義的收集器

System.out.println(names);  // MAX | PETER | PAMELA | DAVID
複製代碼

因爲Java 中的字符串是 final 類型的,咱們須要藉助輔助類StringJoiner,來幫咱們構造字符串。

最開始供應器使用分隔符構造了一個StringJointer

累加器用於將每一個人的人名轉大寫,而後加到StringJointer中。

組合器將兩個StringJointer合併爲一個。

最終,終結器從StringJointer構造出預期的字符串。

6.2 FlatMap

上面咱們已經學會了如經過map操做, 將流中的對象轉換爲另外一種類型。可是,Map只能將每一個對象映射到另外一個對象。

若是說,咱們想要將一個對象轉換爲多個其餘對象或者根本不作轉換操做呢?這個時候,flatMap就派上用場了。

FlatMap 可以將流的每一個元素, 轉換爲其餘對象的流。所以,每一個對象能夠被轉換爲零個,一個或多個其餘對象,並以流的方式返回。以後,這些流的內容會被放入flatMap返回的流中。

在學習如何實際操做flatMap以前,咱們先新建兩個類,用來測試:

class Foo {
    String name;
    List<Bar> bars = new ArrayList<>();

    Foo(String name) {
        this.name = name;
    }
}

class Bar {
    String name;

    Bar(String name) {
        this.name = name;
    }
}
複製代碼

接下來,經過咱們上面學習到的流知識,來實例化一些對象:

List<Foo> foos = new ArrayList<>();

// 建立 foos 集合
IntStream
    .range(1, 4)
    .forEach(i -> foos.add(new Foo("Foo" + i)));

// 建立 bars 集合
foos.forEach(f ->
    IntStream
        .range(1, 4)
        .forEach(i -> f.bars.add(new Bar("Bar" + i + " <- " + f.name))));
複製代碼

咱們建立了包含三個foo的集合,每一個foo中又包含三個 bar

flatMap 的入參接受一個返回對象流的函數。爲了處理每一個foo中的bar,咱們須要傳入相應 stream 流:

foos.stream()
    .flatMap(f -> f.bars.stream())
    .forEach(b -> System.out.println(b.name));

// Bar1 <- Foo1
// Bar2 <- Foo1
// Bar3 <- Foo1
// Bar1 <- Foo2
// Bar2 <- Foo2
// Bar3 <- Foo2
// Bar1 <- Foo3
// Bar2 <- Foo3
// Bar3 <- Foo3
複製代碼

如上所示,咱們已成功將三個 foo對象的流轉換爲九個bar對象的流。

最後,上面的這段代碼能夠簡化爲單一的流式操做:

IntStream.range(1, 4)
    .mapToObj(i -> new Foo("Foo" + i))
    .peek(f -> IntStream.range(1, 4)
        .mapToObj(i -> new Bar("Bar" + i + " <- " f.name))
        .forEach(f.bars::add))
    .flatMap(f -> f.bars.stream())
    .forEach(b -> System.out.println(b.name));
複製代碼

flatMap也可用於Java8引入的Optional類。OptionalflatMap操做返回一個Optional或其餘類型的對象。因此它能夠用於避免繁瑣的null檢查。

接下來,讓咱們建立層次更深的對象:

class Outer {
    Nested nested;
}

class Nested {
    Inner inner;
}

class Inner {
    String foo;
}
複製代碼

爲了處理從 Outer 對象中獲取最底層的 foo 字符串,你須要添加多個null檢查來避免可能發生的NullPointerException,以下所示:

Outer outer = new Outer();
if (outer != null && outer.nested != null && outer.nested.inner != null) {
    System.out.println(outer.nested.inner.foo);
}
複製代碼

咱們還可使用OptionalflatMap操做,來完成上述相同功能的判斷,且更加優雅:

Optional.of(new Outer())
    .flatMap(o -> Optional.ofNullable(o.nested))
    .flatMap(n -> Optional.ofNullable(n.inner))
    .flatMap(i -> Optional.ofNullable(i.foo))
    .ifPresent(System.out::println);
複製代碼

若是不爲空的話,每一個flatMap的調用都會返回預期對象的Optional包裝,不然返回爲nullOptional包裝類。

筆者補充:關於 Optional 可參見我另外一篇譯文《Java8 新特性如何防止空指針異常》

6.3 Reduce

規約操做能夠將流的全部元素組合成一個結果。Java 8 支持三種不一樣的reduce方法。第一種將流中的元素規約成流中的一個元素。

讓咱們看看如何使用這種方法,來篩選出年齡最大的那我的:

persons
    .stream()
    .reduce((p1, p2) -> p1.age > p2.age ? p1 : p2)
    .ifPresent(System.out::println);    // Pamela
複製代碼

reduce方法接受BinaryOperator積累函數。該函數其實是兩個操做數類型相同的BiFunctionBiFunction功能和Function同樣,可是它接受兩個參數。示例代碼中,咱們比較兩我的的年齡,來返回年齡較大的人。

第二種reduce方法接受標識值和BinaryOperator累加器。此方法可用於構造一個新的 Person,其中包含來自流中全部其餘人的聚合名稱和年齡:

Person result =
    persons
        .stream()
        .reduce(new Person("", 0), (p1, p2) -> {
            p1.age += p2.age;
            p1.name += p2.name;
            return p1;
        });

System.out.format("name=%s; age=%s", result.name, result.age);
// name=MaxPeterPamelaDavid; age=76
複製代碼

第三種reduce方法接受三個參數:標識值,BiFunction累加器和類型的組合器函數BinaryOperator。因爲初始值的類型不必定爲Person,咱們可使用這個歸約函數來計算全部人的年齡總和:

Integer ageSum = persons
    .stream()
    .reduce(0, (sum, p) -> sum += p.age, (sum1, sum2) -> sum1 + sum2);

System.out.println(ageSum);  // 76
複製代碼

結果爲76,可是內部究竟發生了什麼呢?讓咱們再打印一些調試日誌:

Integer ageSum = persons
    .stream()
    .reduce(0,
        (sum, p) -> {
            System.out.format("accumulator: sum=%s; person=%s\n", sum, p);
            return sum += p.age;
        },
        (sum1, sum2) -> {
            System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2);
            return sum1 + sum2;
        });

// accumulator: sum=0; person=Max
// accumulator: sum=18; person=Peter
// accumulator: sum=41; person=Pamela
// accumulator: sum=64; person=David
複製代碼

你能夠看到,累加器函數完成了全部工做。它首先使用初始值0和第一我的年齡相加。接下來的三步中sum會持續增長,直到76。

等等?好像哪裏不太對!組合器歷來都沒有調用過啊?

咱們以並行流的方式運行上面的代碼,看看日誌輸出:

Integer ageSum = persons
    .parallelStream()
    .reduce(0,
        (sum, p) -> {
            System.out.format("accumulator: sum=%s; person=%s\n", sum, p);
            return sum += p.age;
        },
        (sum1, sum2) -> {
            System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2);
            return sum1 + sum2;
        });

// accumulator: sum=0; person=Pamela
// accumulator: sum=0; person=David
// accumulator: sum=0; person=Max
// accumulator: sum=0; person=Peter
// combiner: sum1=18; sum2=23
// combiner: sum1=23; sum2=12
// combiner: sum1=41; sum2=35
複製代碼

並行流的執行方式徹底不一樣。這裏組合器被調用了。實際上,因爲累加器被並行調用,組合器須要被用於計算部分累加值的總和。

讓咱們在下一章深刻探討並行流。

7、並行流

流是能夠並行執行的,當流中存在大量元素時,能夠顯著提高性能。並行流底層使用的ForkJoinPool, 它由ForkJoinPool.commonPool()方法提供。底層線程池的大小最多爲五個 - 具體取決於 CPU 可用核心數:

ForkJoinPool commonPool = ForkJoinPool.commonPool();
System.out.println(commonPool.getParallelism());    // 3
複製代碼

在個人機器上,公共池初始化默認值爲 3。你也能夠經過設置如下JVM參數能夠減少或增長此值:

-Djava.util.concurrent.ForkJoinPool.common.parallelism=5
複製代碼

集合支持parallelStream()方法來建立元素的並行流。或者你能夠在已存在的數據流上調用中間方法parallel(),將串行流轉換爲並行流,這也是能夠的。

爲了詳細瞭解並行流的執行行爲,咱們在下面的示例代碼中,打印當前線程的信息:

Arrays.asList("a1", "a2", "b1", "c2", "c1")
    .parallelStream()
    .filter(s -> {
        System.out.format("filter: %s [%s]\n",
            s, Thread.currentThread().getName());
        return true;
    })
    .map(s -> {
        System.out.format("map: %s [%s]\n",
            s, Thread.currentThread().getName());
        return s.toUpperCase();
    })
    .forEach(s -> System.out.format("forEach: %s [%s]\n",
        s, Thread.currentThread().getName()));
複製代碼

經過日誌輸出,咱們能夠對哪一個線程被用於執行流式操做,有個更深刻的理解:

filter:  b1 [main]
filter:  a2 [ForkJoinPool.commonPool-worker-1]
map:     a2 [ForkJoinPool.commonPool-worker-1]
filter:  c2 [ForkJoinPool.commonPool-worker-3]
map:     c2 [ForkJoinPool.commonPool-worker-3]
filter:  c1 [ForkJoinPool.commonPool-worker-2]
map:     c1 [ForkJoinPool.commonPool-worker-2]
forEach: C2 [ForkJoinPool.commonPool-worker-3]
forEach: A2 [ForkJoinPool.commonPool-worker-1]
map:     b1 [main]
forEach: B1 [main]
filter:  a1 [ForkJoinPool.commonPool-worker-3]
map:     a1 [ForkJoinPool.commonPool-worker-3]
forEach: A1 [ForkJoinPool.commonPool-worker-3]
forEach: C1 [ForkJoinPool.commonPool-worker-2]
複製代碼

如您所見,並行流使用了全部的ForkJoinPool中的可用線程來執行流式操做。在持續的運行中,輸出結果可能有所不一樣,由於所使用的特定線程是非特定的。

讓咱們經過添加中間操做sort來擴展上面示例:

Arrays.asList("a1", "a2", "b1", "c2", "c1")
    .parallelStream()
    .filter(s -> {
        System.out.format("filter: %s [%s]\n",
            s, Thread.currentThread().getName());
        return true;
    })
    .map(s -> {
        System.out.format("map: %s [%s]\n",
            s, Thread.currentThread().getName());
        return s.toUpperCase();
    })
    .sorted((s1, s2) -> {
        System.out.format("sort: %s <> %s [%s]\n",
            s1, s2, Thread.currentThread().getName());
        return s1.compareTo(s2);
    })
    .forEach(s -> System.out.format("forEach: %s [%s]\n",
        s, Thread.currentThread().getName()));
複製代碼

運行代碼,輸出結果看上去有些奇怪:

filter:  c2 [ForkJoinPool.commonPool-worker-3]
filter:  c1 [ForkJoinPool.commonPool-worker-2]
map:     c1 [ForkJoinPool.commonPool-worker-2]
filter:  a2 [ForkJoinPool.commonPool-worker-1]
map:     a2 [ForkJoinPool.commonPool-worker-1]
filter:  b1 [main]
map:     b1 [main]
filter:  a1 [ForkJoinPool.commonPool-worker-2]
map:     a1 [ForkJoinPool.commonPool-worker-2]
map:     c2 [ForkJoinPool.commonPool-worker-3]
sort:    A2 <> A1 [main]
sort:    B1 <> A2 [main]
sort:    C2 <> B1 [main]
sort:    C1 <> C2 [main]
sort:    C1 <> B1 [main]
sort:    C1 <> C2 [main]
forEach: A1 [ForkJoinPool.commonPool-worker-1]
forEach: C2 [ForkJoinPool.commonPool-worker-3]
forEach: B1 [main]
forEach: A2 [ForkJoinPool.commonPool-worker-2]
forEach: C1 [ForkJoinPool.commonPool-worker-1]
複製代碼

貌似sort只在主線程上串行執行。可是實際上,並行流中的sort在底層使用了Java8中新的方法Arrays.parallelSort()。如 javadoc官方文檔解釋的,這個方法會按照數據長度來決定以串行方式,或者以並行的方式來執行。

若是指定數據的長度小於最小數值,它則使用相應的Arrays.sort方法來進行排序。

回到上小節 reduce的例子。咱們已經發現了組合器函數只在並行流中調用,而不不會在串行流中被調用。

讓咱們來實際觀察一下涉及到哪一個線程:

List<Person> persons = Arrays.asList(
    new Person("Max", 18),
    new Person("Peter", 23),
    new Person("Pamela", 23),
    new Person("David", 12));

persons
    .parallelStream()
    .reduce(0,
        (sum, p) -> {
            System.out.format("accumulator: sum=%s; person=%s [%s]\n",
                sum, p, Thread.currentThread().getName());
            return sum += p.age;
        },
        (sum1, sum2) -> {
            System.out.format("combiner: sum1=%s; sum2=%s [%s]\n",
                sum1, sum2, Thread.currentThread().getName());
            return sum1 + sum2;
        });
複製代碼

經過控制檯日誌輸出,累加器和組合器均在全部可用的線程上並行執行:

accumulator: sum=0; person=Pamela; [main]
accumulator: sum=0; person=Max;    [ForkJoinPool.commonPool-worker-3]
accumulator: sum=0; person=David;  [ForkJoinPool.commonPool-worker-2]
accumulator: sum=0; person=Peter;  [ForkJoinPool.commonPool-worker-1]
combiner:    sum1=18; sum2=23;     [ForkJoinPool.commonPool-worker-1]
combiner:    sum1=23; sum2=12;     [ForkJoinPool.commonPool-worker-2]
combiner:    sum1=41; sum2=35;     [ForkJoinPool.commonPool-worker-2]
複製代碼

總之,你須要記住的是,並行流對含有大量元素的數據流提高性能極大。可是你也須要記住並行流的一些操做,例如reducecollect操做,須要額外的計算(如組合操做),這在串行執行時是並不須要。

此外,咱們也瞭解了,全部並行流操做都共享相同的 JVM 相關的公共ForkJoinPool。因此你可能須要避免寫出一些又慢又卡的流式操做,這頗有可能會拖慢你應用中,嚴重依賴並行流的其它部分代碼的性能。

8、結語

Java8 Stream 流編程指南到這裏就結束了。若是您有興趣瞭解更多有關 Java 8 Stream 流的相關信息,我建議您使用 Stream Javadoc 閱讀官方文檔。若是您想了解有關底層機制的更多信息,您也能夠閱讀 Martin Fowlers 關於 Collection Pipelines 的文章。

最後,祝您學習愉快!

贈送 10G 面試&學習福利資源

獲取方式: 關注微信公衆號: 小哈學Java, 後臺回覆"666",既可免費無套路獲取資源連接,下面是目錄以及部分截圖:

關注微信公衆號【小哈學Java】,回覆「666」,便可免費無套路領取哦
關注微信公衆號【小哈學Java】,回覆「666」,便可免費無套路領取哦

歡迎關注微信公衆號: 小哈學Java

小哈學Java,關注領取10G面試學習資料哦
小哈學Java,關注領取10G面試學習資料哦
相關文章
相關標籤/搜索