Java基礎系列-Java8 Stream 簡明教程

Stream 是 Java8 中一個重大的更新。Stream 爲Java 真正帶來了函數式編程的特性。對函數式編程不瞭解的人每每不知道如何動手,經過Benjamin 的教程來完整的學習一下 Java 的這個特性,學會這些技能會讓你的代碼看起來更酷。html


這是一個經過代碼示例來深度講解 Java8 Stream 的教程。當我第一次看到 Stream 的 API 時,我感到很迷惑,由於這個名稱聽起來和 Java I/O 包中的 InputStreamOutputStream 有關係。可是實際上它們是徹底不一樣的東西。 Stream 是 Monad(函數式編程),它爲 Java 帶來了函數式編程的特性,下面是維基百科對 Monad 的解釋:java

In functional programming, a monad is a structure that represents computations defined as sequences of steps. A type with a monad structure defines what it means to chain operations, or nest functions of that type together.shell

這份教程會講解 Java8 Stream 的原理以及不一樣操做之間的區別。你將會學習到 Stream 操做的處理順序以及不一樣的順序對性能的影響。還會對經常使用的操做如 ReducecollectflatMap 進行詳細講解。在教程的最後會說明並行 Stream 的優勢。編程

注:Stream 中的 API 稱之爲操做api

若是你還不熟悉 Java8 的 lambda 表達式、函數式接口以及方法引用,能夠先去讀一下這份Java8 教程數組

Stream 原理

一個 Stream 表明着一組元素以及支持對這些元素進行計算的不一樣操做微信

List<String> myList =
    Arrays.asList("a1", "a2", "b1", "c2", "c1");

myList
    .stream()
    .filter(s -> s.startsWith("c"))
    .map(String::toUpperCase)
    .sorted()
    .forEach(System.out::println);

// C1
// C2
複製代碼

Stream 操做分爲中間操做終端操做。中間操做會返回一個 Stream 對象,因此能夠對中間操做進行鏈式操做。終端操做會返回一個 void 或者非 Stream 的對象。在上面的例子中,filtermapsorted 都是中間操做,而 forEach 則是一個終端操做。Stream 完整的操做 API 能夠查看文檔。Stream 鏈式操做能夠查看上面的例子,鏈式操做也稱之爲管道操做數據結構

許多 Stream 操做接受 Lambda 或者函數式接口來限定操做範圍。這些操做中絕大多數都必須是non-interfering無狀態的,這是什麼意思呢?oracle

注:在函數式編程中,函數自己是能夠做爲參數的app

non-interfering 表示方法在執行的過程當中不會改動流中原數據,好比在前面的例子中沒有 lambda 表達式修改了 myList 中的元素。

無狀態表示方法屢次執行的結果是肯定的,好比前面的例子中沒有 lambda 表達式會依賴在執行過程當中會被修改的外部做用域中的變量。

不一樣種類的 Stream

Stream 能夠經過多種方式建立,尤爲是各類容器對象。List 和 Set 都支持 stream()parallelStream() 方法來建立串行或者並行的 Stream。並行 Stream 能夠同時運行在多個線程上,在下文會詳細講解,當前先經過串行 Stream 來演示:

Arrays.asList("a1", "a2", "a3")
        .stream()
        .findFirst()
        .ifPresent(System.out::println); //a1
複製代碼

調用 List 的 stream() 方法會返回一個 Stream 對象。可是獲得 Stream 對象不必定要建立 Collection 對象,看下面的代碼:

Stream.of("a1", "a2", "a3")
         .findFirst()
         .ifPresent(System.out.println);
複製代碼

只須要經過 Stream.of() 就能夠把一堆對象建立爲 Stream。

另外在 Java8 還能夠經過 IntStreamLongStreamDoubleStream 等來操做原生數據類型 intlongdouble

IntStream 經過 range() 方法能夠替代 for 循環:

IntStream.range(1,4)
            .forEach(System.out::println);
 // 1
 // 2
 // 3
複製代碼

全部的原生類型均可以和其餘對象同樣使用 Stream,可是全部的原生類型 Stream 都使用專門的 lambda 表達式,好比 int 使用 IntFunction 而不是 Function,使用 IntPredicate 而不是 Predicate

而且原生類型 Stream 還另外支持終端聚合操做 sum() 以及 average():

Arrays.stream(new int[] {1, 2, 3})
    .map(n -> 2 * n + 1)
    .average()
    .ifPresent(System.out::println);  // 5.0
複製代碼

這些操做在將對象轉化爲原生類型的時候很是有用,反之亦然。出於這個目的,普通 Stream 支持特別的 map 操做,好比 mapToInt()mapToLong()mapToDouble()

Stream.of("a1", "a2", "a3")
    .map(s -> s.substring(1))
    .mapToInt(Integer::parseInt)
    .max()
    .ifPresent(System.out::println);  // 3
複製代碼

原生數據類型能夠經過 mapToObj() 轉化爲對象:

IntStream.range(1, 4)
    .mapToObj(i -> "a" + i)
    .forEach(System.out::println);

// a1
// a2
// a3
複製代碼

下面這個例子是一個組合操做:double Stream 的元素首先被轉成 int 最後被轉化成 String:

Stream.of(1.0, 2.0, 3.0)
    .mapToInt(Double::intValue)
    .mapToObj(i -> "a" + i)
    .forEach(System.out::println);

// a1
// a2
// a3
複製代碼

處理次序

上文中已經詳細描述瞭如何建立和使用不一樣類型的 Stream,下面會深刻研究 Stream 的操做是如何進行的。

中間操做的一個重要特徵是延遲,看下面這個沒有終端操做的例子:

Stream.of("d2", "a2", "b1", "b3", "c")
    .filter(s -> {
        System.out.println("filter: " + s);
        return true;
    });
複製代碼

當上面的代碼片斷執行完成的時候,控制檯並無輸出任何東西。這是由於中間操做在有終端操做的時候纔會執行。

給上面的例子加上終端操做 forEach:

Stream.of("d2", "a2", "b1", "b3", "c")
    .filter(s -> {
        System.out.println("filter: " + s);
        return true;
    })
    .forEach(s -> System.out.println("forEach: " + s));
複製代碼

執行這段代碼會有以下的輸出:

filter:  d2
forEach: d2
filter:  a2
forEach: a2
filter:  b1
forEach: b1
filter:  b3
forEach: b3
filter:  c
forEach: c
複製代碼

輸出結果的順序可能會讓人驚訝。以前你可能會認爲 Stream 中的元素會在一個操做中所有處理完以後纔會進入到下一個操做。但實際的狀況是一個元素在全部的操做執行完成以後纔會輪到下一個元素。"d2" 首先被 filterforEach 的處理,而後 "a2" 纔會被處理。

這樣能夠減小每一個操做實際處理元素的個數,看下面這個例子:

Stream.of("d2", "a2", "b1", "b3", "c")
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase();
    })
    .anyMatch(s -> {
        System.out.println("anyMatch: " + s);
        return s.startsWith("A");
    });

// map: d2
// anyMatch: D2
// map: a2
// anyMatch: A2
複製代碼

這個 anyMatch 操做只在輸入元素知足條件的狀況下才會返回 true。在上面的例子中,運行到第二個元素 "a2" 時就會返回 true,而後就會中止處理其餘元素,因此 map 操做也只是執行了兩次,這正是得益於 Stream 的鏈式處理次序。

爲何次序很關鍵

下面的這個例子由兩個中間操做 mapfilter 以及一個終端操做 forEach 組成。再看一下這些操做是如何執行的:

Stream.of("d2", "a2", "b1", "b3", "c")
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase();
    })
    .filter(s -> {
        System.out.println("filter: " + s);
        return s.startsWith("A");
    })
    .forEach(s -> System.out.println("forEach: " + s));

// map: d2
// filter: D2
// map: a2
// filter: A2
// forEach: A2
// map: b1
// filter: B1
// map: b3
// filter: B3
// map: c
// filter: C
複製代碼

正如上面的例子所分析,map 和 filter 對每一個字符串各執行了 5 次,而 forEach 僅僅執行了一次。

能夠簡單的調整操做的順序來減小操做執行的總次數,下面的例子中把 filter 操做放到了 map 前面:

Stream.of("d2", "a2", "b1", "b3", "c")
    .filter(s -> {
        System.out.println("filter: " + s);
        return s.startsWith("a");
    })
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase();
    })
    .forEach(s -> System.out.println("forEach: " + s));

// filter: d2
// filter: a2
// map: a2
// forEach: A2
// filter: b1
// filter: b3
// filter: c
複製代碼

調整後,map 只執行了一次,整個操做管道在輸入大量元素時的執行速度會快不少。若是 Stream 有不少的操做,時序考慮一下能不能經過調整持續來優化。

在上面的例子中另外加上 sorted 操做:

Stream.of("d2", "a2", "b1", "b3", "c")
    .sorted((s1, s2) -> {
        System.out.printf("sort: %s; %s\n", s1, s2);
        return s1.compareTo(s2);
    })
    .filter(s -> {
        System.out.println("filter: " + s);
        return s.startsWith("a");
    })
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase();
    })
    .forEach(s -> System.out.println("forEach: " + s));
複製代碼

sotred 是一個另類的中間操做,它是有狀態的。由於在排序的過程當中必需要維護數據的狀態。

執行上面的例子會產生以下輸出:

sort:    a2; d2
sort:    b1; a2
sort:    b1; d2
sort:    b1; a2
sort:    b3; b1
sort:    b3; d2
sort:    c; b3
sort:    c; d2
filter:  a2
map:     a2
forEach: A2
filter:  b1
filter:  b3
filter:  c
filter:  d2
複製代碼

首先,sorted 會把輸入的全部元素排好序以後纔會進入下一個操做,和其餘操做不一樣,sorted 是水平執行的。因此在上面的例子中 sorted 纔會被執行 8 次。

經過調整操做的次序能夠再一次提高執行的性能:

Stream.of("d2", "a2", "b1", "b3", "c")
    .filter(s -> {
        System.out.println("filter: " + s);
        return s.startsWith("a");
    })
    .sorted((s1, s2) -> {
        System.out.printf("sort: %s; %s\n", s1, s2);
        return s1.compareTo(s2);
    })
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase();
    })
    .forEach(s -> System.out.println("forEach: " + s));

// filter: d2
// filter: a2
// filter: b1
// filter: b3
// filter: c
// map: a2
// forEach: A2
複製代碼

在這個例子中 sorted 永遠也不會被執行,在 filter 執行完了以後就剩下一個元素,也就沒有排序的必要。在輸入大量元素的狀況下,性能也會獲得極大的提高。

重用 Stream

Java8 中的 Stream 是不能被重用的。一旦執行了終端操做,那麼 Stream 就會被關閉:

Stream<String> stream =
    Stream.of("d2", "a2", "b1", "b3", "c")
        .filter(s -> s.startsWith("a"));

stream.anyMatch(s -> true);    // ok
stream.noneMatch(s -> true);   // exception
複製代碼

在 anyMatch 以後調用 noneMatch 會產生以下的異常:

java.lang.IllegalStateException: stream has already been operated upon or closed
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:229)
    at java.util.stream.ReferencePipeline.noneMatch(ReferencePipeline.java:459)
    at com.winterbe.java8.Streams5.test7(Streams5.java:38)
    at com.winterbe.java8.Streams5.main(Streams5.java:28)
複製代碼

若是須要解決這一點,能夠爲每個終端操做建立一個新的 Stream,好比可使用 Supplier 來建立全部中間操做已經執行完成的 Stream:

Supplier<Stream<String>> streamSupplier =
    () -> Stream.of("d2", "a2", "b1", "b3", "c")
            .filter(s -> s.startsWith("a"));

streamSupplier.get().anyMatch(s -> true);   // ok
streamSupplier.get().noneMatch(s -> true);  // ok
複製代碼

每調用一次 get() 方法都會建立一個新的 Stream,而後就能夠執行須要執行的終端操做了。

進階操做

Stream 支持大量不一樣的操做,在上面的例子中已經介紹了最重要的操做如 filtermap。完整的操做能夠在官方文檔中查看。下面會重點介紹更加複雜的操做 collectflatMapreduce

這節絕大部分的代碼例子都會使用下面 Person list 做爲演示數據:

class Person {
    String name;
    int age;

    Person(String name, int age) {
        this.name = name;
        this.age = age;
    }

    @Override
    public String toString() {
        return name;
    }
}

List<Person> persons =
    Arrays.asList(
        new Person("Max", 18),
        new Person("Peter", 23),
        new Person("Pamela", 23),
        new Person("David", 12));
複製代碼

Collect

若是須要將 Stream 中運行的結果轉成一個不一樣的類型,好比 List、Set 或者 Map,collect 就很是有用。collect 操做接受由 suppileraccumulatorcombinerfinisher 等四個部分組成的 Collector 對象。聽起來很複雜,但 java8 中 Collectors 類中的大量方法開箱即用,對不少通用的操做並不須要本身去實現:

注:suppiler, accumulator, combiner, finisher 都是函數式接口

List<Person> filtered =
    persons
        .stream()
        .filter(p -> p.name.startsWith("P"))
        .collect(Collectors.toList());

System.out.println(filtered);    // [Peter, Pamela]
複製代碼

很簡單就能夠從 Stream 中獲取一個 List,若是須要一個 Set,調用 Collectors.toSet() 就好了。

下面的這個例子是經過年齡來給 person 分組:

Map<Integer, List<Person>> personsByAge = persons
    .stream()
    .collect(Collectors.groupingBy(p -> p.age));

personsByAge
    .forEach((age, p) -> System.out.format("age %s: %s\n", age, p));

// age 18: [Max]
// age 23: [Peter, Pamela]
// age 12: [David]
複製代碼

Collectors 功能不少,還能夠用來對 Stream 中的元素作聚合操做,好比計算全部 person 的平均年齡:

Double averageAge = persons
    .stream()
    .collect(Collectors.averagingInt(p -> p.age));

System.out.println(averageAge);     // 19.0
複製代碼

還能夠用來作統計,summarizing 會返回一個內建的統計對象,經過這個對象能夠很方便的獲得最大年齡、最小年齡、平均年齡等統計結果:

IntSummaryStatistics ageSummary =
    persons
        .stream()
        .collect(Collectors.summarizingInt(p -> p.age));

System.out.println(ageSummary);
// IntSummaryStatistics{count=4, sum=76, min=12, average=19.000000, max=23}
複製代碼

下面的例子中把全部 person 的名字拼成了一個字符串:

String phrase = persons
    .stream()
    .filter(p -> p.age >= 18)
    .map(p -> p.name)
    .collect(Collectors.joining(" and ", "In Germany ", " are of legal age."));

System.out.println(phrase);
// In Germany Max and Peter and Pamela are of legal age.
複製代碼

joining 接收一個間隔符和可選的前綴、後綴字符串。

爲了輸出 map 結果。必須指定 map 的 key 和 value。須要注意 key 必須是惟一的,不然會報 IllegalStateException 異常。能夠經過傳入另一個合併方法做爲參數來避免這個異常:

Map<Integer, String> map = persons
    .stream()
    .collect(Collectors.toMap(
        p -> p.age,
        p -> p.name,
        (name1, name2) -> name1 + ";" + name2));

System.out.println(map);
// {18=Max, 23=Peter;Pamela, 12=David}
複製代碼

上面介紹了一些很強大 Collectors 的內建方法。下面來實現一個自定義的 collector。將全部 Person 的名稱轉成大寫並輸入到一個字符串中,每一個名字使用 | 來隔開。自定義的 collecotr 使用 Collecotr.of() 來實現,須要實現其中的四個部分:supplieraccumulatorcombinerfinisher

Collector<Person, StringJoiner, String> personNameCollector =
    Collector.of(
        () -> new StringJoiner(" | "),          // supplier
        (j, p) -> j.add(p.name.toUpperCase()),  // accumulator
        (j1, j2) -> j1.merge(j2),               // combiner
        StringJoiner::toString);                // finisher

String names = persons
    .stream()
    .collect(personNameCollector);

System.out.println(names);  // MAX | PETER | PAMELA | DAVID
複製代碼

在 Java 中,String 對象是不可變的。因此須要一個 StringJoiner 來組合字符串,suppiler 實例化一個帶 | 分隔符的 StringJoiner 對象。accumulator 把字符串轉成大寫而且放進 StringJoiner 對象,combiner 將兩個 StringJoiner 對象合成一個,最後 finisher 把 StringJoiner 對象輸出爲 String 對象。

flatMap

在上面已經介紹瞭如何使用 map 將 Stream 中的對象轉成另一種類型的對象。map 只能把一種類型轉成另一種特定的類型,在把一種類型轉成任意種類型的狀況下,map 就有點受限制了。而 flatMap 正是來解決這個問題的。

flatMap 會把 Stream 中的每一個元素轉成另外一個 Stream 中的其餘對象。因此每一個元素依賴 STream 會被轉成 0 個,1 個或者多個其餘對象。這些生成的新的 stream 會在 flatMap 操做結束的時候返回。

在使用 flatMap 以前,須要定義如下的數據結構:

class Foo {
    String name;
    List<Bar> bars = new ArrayList<>();

    Foo(String name) {
        this.name = name;
    }
}

class Bar {
    String name;

    Bar(String name) {
        this.name = name;
    }
}
複製代碼

接下來,利用 Stream 初始化一些對象:

List<Foo> foos = new ArrayList<>();

// create foos
IntStream
    .range(1, 4)
    .forEach(i -> foos.add(new Foo("Foo" + i))); 

// create bars
foos.forEach(f ->
    IntStream
        .range(1, 4)
        .forEach(i -> f.bars.add(new Bar("Bar" + i + " <- " + f.name))));
複製代碼

如今,生成了包含三個 foo 對象的 list,每一個 foo 對象中又包含三個 bar 對象。

flatMap 接收一個返回 Stream 對象的方法做爲參數,爲了分解 foo 中的每一個 bar 對象,傳入一個合適的方法:

foos.stream()
    .flatMap(f -> f.bars.stream())
    .forEach(b -> System.out.println(b.name));

// Bar1 <- Foo1
// Bar2 <- Foo1
// Bar3 <- Foo1
// Bar1 <- Foo2
// Bar2 <- Foo2
// Bar3 <- Foo2
// Bar1 <- Foo3
// Bar2 <- Foo3
// Bar3 <- Foo3
複製代碼

上面那個例子成功的把一個包含三個 foo 對象的 Stream 轉成了包含 9 個 bar 對象的 Stream。

並且,上面的那些代碼能夠被簡化成一個 Stream 管道操做:

IntStream.range(1, 4)
    .mapToObj(i -> new Foo("Foo" + i))
    .peek(f -> IntStream.range(1, 4)
        .mapToObj(i -> new Bar("Bar" + i + " <- " f.name))
        .forEach(f.bars::add))
    .flatMap(f -> f.bars.stream())
    .forEach(b -> System.out.println(b.name));
複製代碼

flatMap 操做對 java8 中的 Optional 對象也有用,Optional 對象的操做會返回另外一個類型的 Optional 對象。因此這個特性能夠用來消除空指針檢查。

定義類的抽象層次以下:

class Outer {
    Nested nested;
}

class Nested {
    Inner inner;
}

class Inner {
    String foo;
}
複製代碼

爲了從 Outer 對象中調用 Inner 對象中的 foo 字符串,須要作不少的空指針檢查來避免潛在的空指針異常:

Outer outer = new Outer();
if (outer != null && outer.nested != null && outer.nested.inner != null) {
    System.out.println(outer.nested.inner.foo);
}
複製代碼

這些操做能夠經過 flatMap 來進行優化:

Optional.of(new Outer())
    .flatMap(o -> Optional.ofNullable(o.nested))
    .flatMap(n -> Optional.ofNullable(n.inner))
    .flatMap(i -> Optional.ofNullable(i.foo))
    .ifPresent(System.out::println);
複製代碼

每調用一次都會返回一個 Optional 對象,對象中包裹着目標對象或者 null。

Reduce

Reduce 組合 Stream 中全部的元素,而後產生一個單獨的結果。Java8 支持三種 reduce 方法。第一種 reduce 對於每一個 Stream 只會返回一個元素。下面這個例子計算除了年齡最大的人的名字:

persons
    .stream()
    .reduce((p1, p2) -> p1.age > p2.age ? p1 : p2)
    .ifPresent(System.out::println);    // Pamela
複製代碼

reduce 方法接受一個 BinaryOperator 函數。在 Person 這個例子中,其實是一個 BiFunction,兩個操做數的類型都是一致的。BiFunction 與 Function 很像,可是前者接收兩個參數。這個例子中比較全部 person 的年齡屬性來找出最大年齡的 person。

第二種 reduce 方法接受一個目標對象和一個 BinaryOperator。下面這個方法能夠聚合全部的 person 屬性來建立一個新的 person:

Person result =
    persons
        .stream()
        .reduce(new Person("", 0), (p1, p2) -> {
            p1.age += p2.age;
            p1.name += p2.name;
            return p1;
        });

System.out.format("name=%s; age=%s", result.name, result.age);
// name=MaxPeterPamelaDavid; age=76
複製代碼

第三種 reduce 接受三個參數:一個目標對象、一個 BiFunction、一個 BinaryOperator 類型的 combiner。由於這個傳入的值不必定是 Person 類型,因此咱們能夠利用這個特性來計算全部 Person 年齡的總和:

Integer ageSum = persons
    .stream()
    .reduce(0, (sum, p) -> sum += p.age, (sum1, sum2) -> sum1 + sum2);

System.out.println(ageSum);  // 76
複製代碼

最後的結果是 76,那麼中間的計算過程是什麼樣的的呢?下面 debug 了計算的過程:

Integer ageSum = persons
    .stream()
    .reduce(0,
        (sum, p) -> {
            System.out.format("accumulator: sum=%s; person=%s\n", sum, p);
            return sum += p.age;
        },
        (sum1, sum2) -> {
            System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2);
            return sum1 + sum2;
        });

// accumulator: sum=0; person=Max
// accumulator: sum=18; person=Peter
// accumulator: sum=41; person=Pamela
// accumulator: sum=64; person=David
複製代碼

能夠看到 accumulator 函數完成了全部的計算,調用的第一次獲得的是初始值 0 和 Max person。而後後續的三步完成了全部年齡的的累加。在最後一步獲得了全部年齡的累加結果 76。

可是上面的例子看起來稍微有點問題,由於 combiner 函數根本沒有執行,可是真的是這樣的嗎?看下面的代碼咱們就能發現祕密所在:

Integer ageSum = persons
    .parallelStream()
    .reduce(0,
        (sum, p) -> {
            System.out.format("accumulator: sum=%s; person=%s\n", sum, p);
            return sum += p.age;
        },
        (sum1, sum2) -> {
            System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2);
            return sum1 + sum2;
        });

// accumulator: sum=0; person=Pamela
// accumulator: sum=0; person=David
// accumulator: sum=0; person=Max
// accumulator: sum=0; person=Peter
// combiner: sum1=18; sum2=23
// combiner: sum1=23; sum2=12
// combiner: sum1=41; sum2=35
複製代碼

在並行執行的狀況下有着徹底不一樣的執行行爲。在這裏 combiner 執行了,accumulator 在並行狀況下被執行的時候,combiner 用來累加 accumulator 的執行結果。

在下一節會詳細分析並行 Stream。

並行 Stream

在輸入元素數量不少的狀況下,經過並行執行 Stream 能夠提高執行性能。並行 Stream 使用了 ForkJoinPool,這個對象能夠經過 ForkJoinPool.commonPool() 來獲得。底層的線程池最多能夠有五個線程,取決於物理機器能夠用的 CPU 有幾個。

ForkJoinPool commonPool = ForkJoinPool.commonPool();
System.out.println(commonPool.getParallelism());    // 3
複製代碼

在個人機器上這個線程的數量被設定爲 3。這個值能夠經過 JVM 的參數來進行調整:

-Djava.util.concurrent.ForkJoinPool.common.parallelism=5
複製代碼

Collection 對象能夠經過 parallelStream() 來建立一個並行的 Stream。或者也能夠對一個串行的 Stream 對象調用 parallel() 來轉成並行 Stream。

爲了理解 Stream 是如何並行執行的,下面這個例子把線程的狀況都打印出來了:

Arrays.asList("a1", "a2", "b1", "c2", "c1")
    .parallelStream()
    .filter(s -> {
        System.out.format("filter: %s [%s]\n",
            s, Thread.currentThread().getName());
        return true;
    })
    .map(s -> {
        System.out.format("map: %s [%s]\n",
            s, Thread.currentThread().getName());
        return s.toUpperCase();
    })
    .forEach(s -> System.out.format("forEach: %s [%s]\n",
        s, Thread.currentThread().getName()));
複製代碼

經過研究 debug 輸出,能夠看到 Stream 執行過程當中哪些線程確實用到了:

filter:  b1 [main]
filter:  a2 [ForkJoinPool.commonPool-worker-1]
map:     a2 [ForkJoinPool.commonPool-worker-1]
filter:  c2 [ForkJoinPool.commonPool-worker-3]
map:     c2 [ForkJoinPool.commonPool-worker-3]
filter:  c1 [ForkJoinPool.commonPool-worker-2]
map:     c1 [ForkJoinPool.commonPool-worker-2]
forEach: C2 [ForkJoinPool.commonPool-worker-3]
forEach: A2 [ForkJoinPool.commonPool-worker-1]
map:     b1 [main]
forEach: B1 [main]
filter:  a1 [ForkJoinPool.commonPool-worker-3]
map:     a1 [ForkJoinPool.commonPool-worker-3]
forEach: A1 [ForkJoinPool.commonPool-worker-3]
forEach: C1 [ForkJoinPool.commonPool-worker-2]
複製代碼

並行 Stream 執行操做的過程當中用到了線程池中全部的線程。上面輸出的結果順序可能每次都是不同的,這是由於線程執行的順序自己就是不同的。

給上面的例子加上 sort 操做:

Arrays.asList("a1", "a2", "b1", "c2", "c1")
    .parallelStream()
    .filter(s -> {
        System.out.format("filter: %s [%s]\n",
            s, Thread.currentThread().getName());
        return true;
    })
    .map(s -> {
        System.out.format("map: %s [%s]\n",
            s, Thread.currentThread().getName());
        return s.toUpperCase();
    })
    .sorted((s1, s2) -> {
        System.out.format("sort: %s <> %s [%s]\n",
            s1, s2, Thread.currentThread().getName());
        return s1.compareTo(s2);
    })
    .forEach(s -> System.out.format("forEach: %s [%s]\n",
        s, Thread.currentThread().getName()));
複製代碼

執行的結果看起來有點奇怪:

filter:  c2 [ForkJoinPool.commonPool-worker-3]
filter:  c1 [ForkJoinPool.commonPool-worker-2]
map:     c1 [ForkJoinPool.commonPool-worker-2]
filter:  a2 [ForkJoinPool.commonPool-worker-1]
map:     a2 [ForkJoinPool.commonPool-worker-1]
filter:  b1 [main]
map:     b1 [main]
filter:  a1 [ForkJoinPool.commonPool-worker-2]
map:     a1 [ForkJoinPool.commonPool-worker-2]
map:     c2 [ForkJoinPool.commonPool-worker-3]
sort:    A2 <> A1 [main]
sort:    B1 <> A2 [main]
sort:    C2 <> B1 [main]
sort:    C1 <> C2 [main]
sort:    C1 <> B1 [main]
sort:    C1 <> C2 [main]
forEach: A1 [ForkJoinPool.commonPool-worker-1]
forEach: C2 [ForkJoinPool.commonPool-worker-3]
forEach: B1 [main]
forEach: A2 [ForkJoinPool.commonPool-worker-2]
forEach: C1 [ForkJoinPool.commonPool-worker-1]
複製代碼

能夠看到了 sort 操做只會在主線程中執行。並行 Stream 中的 sort 操做實際用到了 Java8 中的新接口 Arrays.parallelSort()。在 Javadoc 中說明了數組的長度決定了這個排序操做是串行仍是並行執行:

If the length of the specified array is less than the minimum granularity, then it is sorted using the appropriate Arrays.sort method.

回到上面的例子,能夠發現 combiner 函數只會在並行狀況下執行。下面來看一下哪些線程確實執行了:

List<Person> persons = Arrays.asList(
    new Person("Max", 18),
    new Person("Peter", 23),
    new Person("Pamela", 23),
    new Person("David", 12));

persons
    .parallelStream()
    .reduce(0,
        (sum, p) -> {
            System.out.format("accumulator: sum=%s; person=%s [%s]\n",
                sum, p, Thread.currentThread().getName());
            return sum += p.age;
        },
        (sum1, sum2) -> {
            System.out.format("combiner: sum1=%s; sum2=%s [%s]\n",
                sum1, sum2, Thread.currentThread().getName());
            return sum1 + sum2;
        });
複製代碼

上面例子的輸出說明了 accumulator 和 combiner 在並行 Stream 中會在全部的可用線程上執行:

accumulator: sum=0; person=Pamela; [main]
accumulator: sum=0; person=Max;    [ForkJoinPool.commonPool-worker-3]
accumulator: sum=0; person=David;  [ForkJoinPool.commonPool-worker-2]
accumulator: sum=0; person=Peter;  [ForkJoinPool.commonPool-worker-1]
combiner:    sum1=18; sum2=23;     [ForkJoinPool.commonPool-worker-1]
combiner:    sum1=23; sum2=12;     [ForkJoinPool.commonPool-worker-2]
combiner:    sum1=41; sum2=35;     [ForkJoinPool.commonPool-worker-2]
複製代碼

全部在輸入元素的量很大的狀況下,並行 Stream 會帶來很大的性能提高。可是須要注意一些操做好比 reducecollect 須要額外的 combine 操做,可是在串行 Stream 中並不須要。

此外,全部的並行 Stream 都依賴 ForkJoinPool 線程池。因此應當儘可能避免實現一些阻塞 Stream 的操做,由於這樣會下降那些依賴並行 Stream 的程序的性能。

(完)

原文

關注微信公衆號,聊點其餘的

相關文章
相關標籤/搜索