Java 8 Stream 教程

Java 8 Stream Tutorial


本文采用實例驅動的方式,對JAVA8的stream API進行一個深刻的介紹。雖然JAVA8中的stream API與JAVA I/O中的InputStream和OutputStream在名字上比較相似,可是實際上是另一個東西,Stream API是JAVA函數式編程中的一個重要組成部分。html

本文描述如何使用JAVA8的Stream API。經過本文,你能夠了解Stream API的執行順序,不一樣的執行順序會對stream api的執行效率有較大的影響。本文會詳細描述Stream API中的reducecollectflatMap等操做,結尾部分會深刻講解parallel streamsjava

若是你對JAVA8中新增的概念:lambda表達式,函數式接口,方法引用不熟悉。能夠從:Java 8 Tutorial一文中獲取相關的知識。git


Streams如何工做?

stream是一個能夠對個序列中的元素執行各類計算操做的一個元素序列。github

List<String> myList =
    Arrays.asList("a1", "a2", "b1", "c2", "c1");

myList
    .stream()
    .filter(s -> s.startsWith("c"))
    .map(String::toUpperCase)
    .sorted()
    .forEach(System.out::println);

// C1
// C2

stream包含中間(intermediate operations)和最終(terminal operation)兩種形式的操做。中間操做(intermediate operations)的返回值仍是一個stream,所以能夠經過鏈式調用將中間操做(intermediate operations)串聯起來。最終操做(terminal operation)只能返回void或者一個非stream的結果。在上述例子中:filter, map ,sorted是中間操做,而forEach是一個最終操做。更多關於stream的中可用的操做能夠查看java doc。上面例子中的鏈式調用也被稱爲操做管道流。shell

大多stream操做接受某種形式的lambda表達式做爲參數,經過方法接口的形式指定操做的具體行爲,這些方法接口的行爲基本上都是無干擾(non-interfering)和無狀態(stateless)。無干擾(non-interfering)的方法的定義是:該方法不修改stream的底層數據源,好比上述例子中:沒有lambda表達式添加或者刪除myList中的元素。無狀態(stateless)方法的定義:操做的執行是獨立的,好比上述例子中,沒有lambda表達式在執行中依賴可能發生變化的外部變量或狀態。編程


streams分類

能夠從不一樣的數據源建立stream。java collection包中的Collections,Lists,Sets這些類中新增stream()和parallelStream()方法,經過這些方法能夠建立一個順序stream(sequential streams)或者一個併發的stream(Parallel streams)。併發stream(Parallel streams)更適合在多線程中使用,本文先介紹順序流(sequential streams)在結尾會描述併發stream(Parallel streams),api

Arrays.asList("a1", "a2", "a3")
    .stream()
    .findFirst()
    .ifPresent(System.out::println);  // a1

List對象上調用stream()方法能夠返回一個常規的對象流。在下面的例子中咱們不須要建立一個collection對象也可使用stream:數組

Stream.of("a1", "a2", "a3")
    .findFirst()
    .ifPresent(System.out::println);  // a1

直接使用Stream.of()方法就能從一組對象建立一個stream對象,多線程

除了常規的對象流,JAVA 8中的IntStream,LongStream,DoubleStream這些流可以處理基本數據類型如:int,long,double。好比:IntStream可使用range()方法可以替換掉傳統的for循環併發

IntStream.range(1, 4)
    .forEach(System.out::println);

// 1
// 2
// 3

基本類型流(primitive streams)使用方式與常規對象流類型(regular object streams)大部分相同,可是基本類型流(primitive streams)能使用一些特殊的lambda表達式,好比:用IntFunction代替Function,用IntPredicate代替Predicate,同時基本類型流(primitive streams)中能夠支持一些聚合方法,如:sum(),average()等。

Arrays.stream(new int[] {1, 2, 3})
    .map(n -> 2 * n + 1)
    .average()
    .ifPresent(System.out::println);  // 5.0

能夠經過常規對象流(regular object stream)的mapToInt(), mapToLong(),mapToDouble(),基本類型對象流(primitive streams)中的mapToObj()等方法完成常規對象流和基本類型流之間的相互轉換

IntStream.range(1, 4)
    .mapToObj(i -> "a" + i)
    .forEach(System.out::println);

下面這個例子中doubles stream先被映射成int stream,而後又被映射成String類型的對象流:

Stream.of(1.0, 2.0, 3.0)
    .mapToInt(Double::intValue)
    .mapToObj(i -> "a" + i)
    .forEach(System.out::println);

// a1
// a2
// a3

處理順序

前面描述瞭如何建立和使用各類stream,如今開始深刻了解stream執行引擎的工做原理。

Laziness(延遲加載)是中間操做(intermediate operations)的一個重要特性。以下面這個例子:中間操做(terminal operation)缺失,當執行這個代碼片斷的時候,並不會在控制檯打印相應的內容,這是由於只有最終操做(terminal operation)存在的時候,中間操做(intermediate operations)纔會執行。

Stream.of("d2", "a2", "b1", "b3", "c")
    .filter(s -> {
        System.out.println("filter: " + s);
        return true;
    });

給上面的例子添加最終操做(terminal operation)forEach:

Stream.of("d2", "a2", "b1", "b3", "c")
    .filter(s -> {
        System.out.println("filter: " + s);
        return true;
    })
    .forEach(s -> System.out.println("forEach: " + s));

執行結果以下:

filter:  d2
forEach: d2
filter:  a2
forEach: a2
filter:  b1
forEach: b1
filter:  b3
forEach: b3
filter:  c
forEach: c

執行結果比較讓人驚奇,想固然的作法是水平執行此流上的全部元素。可是其實是每個元素沿着鏈垂直移動,第一個字符串"d2"執行完filterforEach後第二個元素"a2"纔開始執行。

這種沿着鏈垂直移動的行爲能夠下降每個元素上進行操做的數量,如咱們在下面的例子中所示:

Stream.of("d2", "a2", "b1", "b3", "c")
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase();
    })
    .anyMatch(s -> {最終操做
        System.out.println("anyMatch: " + s);
        return s.startsWith("A");
    });

// map:      d2
// anyMatch: D2
// map:      a2
// anyMatch: A2

當對給定元素執行判斷爲真時anyMatch操做會馬上返回true,在上面例子中執行到元素「A2」的時候,元素判斷爲真anyMatch馬上返回true,因爲流是沿着鏈垂直移動的,所以上面的map操做只會執行兩次。

注:stream的執行流程相似shell中管道:ps xxx | grep "sss" | grep "ccc",是按照輸入行的形式進行處理。


執行效率與steream執行鏈順序的關係

下面的例子由兩個中間操做(intermediate operations)map和filter以及一個最終操做(terminal operation)forEach構成,咱們觀察這些動做是如何執行的。

Stream.of("d2", "a2", "b1", "b3", "c")
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase();
    })
    .filter(s -> {
        System.out.println("filter: " + s);
        return s.startsWith("A");
    })
    .forEach(s -> System.out.println("forEach: " + s));

// map:     d2
// filter:  D2
// map:     a2
// filter:  A2
// forEach: A2
// map:     b1
// filter:  B1
// map:     b3
// filter:  B3
// map:     c
// filter:  C

你可能已經猜測到:mapfilter操做被執行了5次,可是forEach操做只被執行了1次。咱們能夠經過修改操做的執行順序(如:將filter操做移到操做鏈的頭部),大幅度下降執行次數

Stream.of("d2", "a2", "b1", "b3", "c")
    .filter(s -> {
        System.out.println("filter: " + s);
        return s.startsWith("a");
    })
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase();
    })
    .forEach(s -> System.out.println("forEach: " + s));

// filter:  d2
// filter:  a2
// map:     a2
// forEach: A2
// filter:  b1中間操做
// filter:  b3
// filter:  c

修改後map只被執行了1次,若是此時數據量比較大則操做管道的執行效率會有較大的提高,在處理複雜方法鏈的時候須要注意執行順序對執行效率的影響。

給上面的例子添加sort操做。

Stream.of("d2", "a2", "b1", "b3", "c")
    .sorted((s1, s2) -> {
        System.out.printf("sort: %s; %s\n", s1, s2);
        return s1.compareTo(s2);
    })
    .filter(s -> {
        System.out.println("filter: " + s);
        return s.startsWith("a");
    })
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase();
    })
    .forEach(s -> System.out.println("forEach: " + s));

執行結果以下:

sort:    a2; d2
sort:    b1; a2
sort:    b1; d2
sort:    b1; a2
sort:    b3; b1
sort:    b3; d2
sort:    c; b3
sort:    c; d2
filter:  a2
map:     a2
forEach: A2
filter:  b1
filter:  b3
filter:  c
filter:  d2

Sorting 是一種特殊的中間操做(intermediate operation),在對集合中元素進行排序過程當中須要保存元素的狀態,所以Sorting 是一種有狀態的操做(stateful operation)。

首先,在整個輸入集上執行排序操做(即先對集合進行水平操做),因爲輸入集合中的元素間存在多種組合,所以上面的例子中sorted操做被執行了8次。

能夠經過對執行鏈重排序的方式,提高stream的執行效率。修改執行鏈順序以後因爲filter操做的過濾,致使sorted操做的輸入集只有一個元素,在大數據量的狀況下可以大幅度提升執行效率。

Stream.of("d2", "a2", "b1", "b3", "c")
    .filter(s -> {
        System.out.println("filter: " + s);
        return s.startsWith("a");
    })
    .sorted((s1, s2) -> {
        System.out.printf("sort: %s; %s\n", s1, s2);
        return s1.compareTo(s2);
    })
    .map(s -> {
        System.out.println("map: " + s);
        return s.toUpperCase();
    })
    .forEach(s -> System.out.println("forEach: " + s));

// filter:  d2
// filter:  a2
// filter:  b1
// filter:  b3
// filter:  c
// map:     a2
// forEach: A2

流複用

Java 8 streams不能被複用,當你執行完任何一個最終操做(terminal operation)的時候流就被關閉了。

Stream<String> stream =
    Stream.of("d2", "a2", "b1", "b3", "c")
        .filter(s -> s.startsWith("a"));

stream.anyMatch(s -> true);    // ok
stream.noneMatch(s -> true);   // exception

在同一個stream中執行完anyMatch後再執行noneMatch就會拋出以下異常:

java.lang.IllegalStateException: stream has already been operated upon or closed
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:229)
    at java.util.stream.ReferencePipeline.noneMatch(ReferencePipeline.java:459)
    at com.winterbe.java8.Streams5.test7(Streams5.java:38)
    at com.winterbe.java8.Streams5.main(Streams5.java:28)

能夠經過爲每一個最終操做(terminal operation)建立一個新的stream鏈的方式來解決上面的重用問題,Stream api中已經提供了一個stream supplier類來在已經存在的中間操做(intermediate operations )的stream基礎上構建一個新的stream。

Supplier<Stream<String>> streamSupplier =
    () -> Stream.of("d2", "a2", "b1", "b3", "c")
            .filter(s -> s.startsWith("a"));

streamSupplier.get().anyMatch(s -> true);   // ok
streamSupplier.get().noneMatch(s -> true);  // ok

streamSupplier的每一個get()方法會構造一個新的stream,咱們能夠在這個stream上執行指望的最終操做(terminal operation)。


高級操做

Streams支持多種不一樣的操做(operations),咱們已經瞭解過filter,map等比較重要的操做。你能夠經過Stream Javadoc進一步瞭解更多的操做。如今咱們開始深刻探討更復雜的操做:collect flatMap reduce

假設存在以下的用戶列表:

class Person {
    String name;
    int age;

    Person(String name, int age) {
        this.name = name;
        this.age = age;
    }

    @Override
    public String toString() {
        return name;
    }
}

List<Person> persons =
    Arrays.asList(
        new Person("Max", 18),
        new Person("Peter", 23),
        new Person("Pamela", 23),
        new Person("David", 12));

Collect(收集)

Collect(收集)是一種是十分有用的最終操做,它能夠把stream中的元素轉換成另一種形式,好比;list,set,map。Collect使用Collector做爲參數,Collector包含四種不一樣的操做:supplier(初始構造器), accumulator(累加器), combiner(組合器), finisher(終結者)。這聽起來很複雜,可是一個好消息是java 8經過Collectors類內置了各類複雜的收集操做,所以對於大部分經常使用的操做來講,你不須要本身去實現collector類。

從一個十分常見的用類開始:

List<Person> filtered =
    persons
        .stream()
        .filter(p -> p.name.startsWith("P"))
        .collect(Collectors.toList());

System.out.println(filtered);    // [Peter, Pamela]

經過上面的demo能夠看出,將stream轉換爲List十分簡單,若是想轉換爲Set的話,只需使用Collectors.toSet()就能夠了。

下面的例子暫時將用戶按年齡分組:

Map<Integer, List<Person>> personsByAge = persons
    .stream()
    .collect(Collectors.groupingBy(p -> p.age));

personsByAge
    .forEach((age, p) -> System.out.format("age %s: %s\n", age, p));

// age 18: [Max]
// age 23: [Peter, Pamela]
// age 12: [David]

Collectors類功能繁多,你能夠經過Collectors對stream中的元素進行匯聚,好比:計算全部用戶的年紀。

Double averageAge = persons
    .stream()
    .collect(Collectors.averagingInt(p -> p.age));

System.out.println(averageAge);     // 19.0

能夠經過summarizing collectors能返回一個內置的統計對象,經過這個對象可以獲取更加全面的統計信息,好比用戶年紀中的最大值,最小值,平均年紀等結果。

IntSummaryStatistics ageSummary =
    persons
        .stream()
        .collect(Collectors.summarizingInt(p -> p.age));

System.out.println(ageSummary);
// IntSummaryStatistics{count=4, sum=76, min=12, average=19.000000, max=23}

下面的例子展現如何將全部用戶鏈接成一個字符串:

String phrase = persons
    .stream()
    .filter(p -> p.age >= 18)
    .map(p -> p.name)
    .collect(Collectors.joining(" and ", "In Germany ", " are of legal age."));

System.out.println(phrase);
// In Germany Max and Peter and Pamela are of legal age.

join collector的三個參數分別表示:鏈接符,字符串前綴,字符串後綴(可選)。

將一個stream轉換爲map,咱們必須指定map的key和value如何映射。要注意的是key的值必須是惟一性的,不然會拋出IllegalStateException,可是能夠經過使用合併函數(可選)繞過這個IllegalStateException異常:

Map<Integer, String> map = persons
    .stream()
    .collect(Collectors.toMap(
        p -> p.age,
        p -> p.name,
        (name1, name2) -> name1 + ";" + name2));

System.out.println(map);
// {18=Max, 23=Peter;Pamela, 12=David}

前文已經介紹了jdk內置的一些頗有用的collectors,接下來開始介紹如何構造咱們本身所需的collector,咱們的目標是將stream中全部用戶的用戶名變成大寫並用"|"符號鏈接成一個字符串。爲了達成這個目標咱們經過Collector.of()方法建立了一個新的collector,咱們必須給這個collector提供四種功能:supplier, accumulator, combiner,finisher.

Collector<Person, StringJoiner, String> personNameCollector =
    Collector.of(
        () -> new StringJoiner(" | "),          // supplier
        (j, p) -> j.add(p.name.toUpperCase()),  // accumulator
        (j1, j2) -> j1.merge(j2),               // combiner
        StringJoiner::toString);                // finisher

String names = persons
    .stream()
    .collect(personNameCollector);

System.out.println(names);  // MAX | PETER | PAMELA | DAVID

因爲JAVA中String是一個不可變對象,所以咱們須要一個輔助類(好比StringJoiner)來幫助collect構造咱們的字符串。supplier建立了一個包含適當分隔符的StringJoiner對象,accumulator用來將每一個用戶名轉爲大寫並添加到supplier建立的StringJoiner中,combiner將兩個StringJoiners對象鏈接成一個,最後一步的finisher從StringJoiner中構建出所但願的獲得的string對象。


FlatMap

咱們已經瞭解:經過map方法能夠將stream中的一種對象轉換成另一種對象。可是map方法仍是有使用場景限制,只能將一種對象映射爲另一種特定的已經存在的對象。是否可以將一個對象映射爲多種對象,或者映射成一個根本不存在的對象呢。這就是flatMap方法出現的目的。

FlatMap方法能夠將一個stream中的每個元素對象轉換爲另外一個stream中的另外一種元素對象,所以能夠將stream中的每一個對象改形成零,一個或多個。flatMap操做的返回流包含這些改造後的對象。

爲了演示flatMap,定義一個繼承關係以下:

class Foo {
    String name;
    List<Bar> bars = new ArrayList<>();

    Foo(String name) {
        this.name = name;
    }
}

class Bar {
    String name;

    Bar(String name) {
        this.name = name;
    }
}

經過流實例化一隊對象:

List<Foo> foos = new ArrayList<>();

// create foos
IntStream
    .range(1, 4)
    .forEach(i -> foos.add(new Foo("Foo" + i)));

// create bars
foos.forEach(f ->
    IntStream
        .range(1, 4)
        .forEach(i -> f.bars.add(new Bar("Bar" + i + " <- " + f.name))));

完成上述操做以後咱們獲得三個foos,每一個foos包含三個bars。

FlatMap接收一個返回值爲stream的函數作參數,經過傳遞合適的函數,就能夠解析每個foo下對應的bar對象

foos.stream()
    .flatMap(f -> f.bars.stream())
    .forEach(b -> System.out.println(b.name));

// Bar1 <- Foo1
// Bar2 <- Foo1
// Bar3 <- Foo1
// Bar1 <- Foo2
// Bar2 <- Foo2
// Bar3 <- Foo2
// Bar1 <- Foo3
// Bar2 <- Foo3
// Bar3 <- Foo3

正如所見,咱們成功地將三個對象的stream轉換成一個包含九個對象的stream

最後,上面的示例代碼能夠簡化爲一個單一管道流:

IntStream.range(1, 4)
    .mapToObj(i -> new Foo("Foo" + i))
    .peek(f -> IntStream.range(1, 4)
        .mapToObj(i -> new Bar("Bar" + i + " <- " f.name))
        .forEach(f.bars::add))
    .flatMap(f -> f.bars.stream())
    .forEach(b -> System.out.println(b.name));

FlatMap也支持JAVA8中新引入的Optional類,Optionals flatMap能返回一個另外的類的optional包裝類,能夠用來減小對null的檢查。

假設有以下這種多層級結構:

class Outer {
    Nested nested;
}

class Nested {
    Inner inner;
}

class Inner {
    String foo;
}

爲了獲取內部outer實例的內部foo對象,須要添加一系列空指針判斷

Outer outer = new Outer();
if (outer != null && outer.nested != null && outer.nested.inner != null) {
    System.out.println(outer.nested.inner.foo);
}

能夠採用optionals flatMap 操做得到相同的結果:

Optional.of(new Outer())
    .flatMap(o -> Optional.ofNullable(o.nested))
    .flatMap(n -> Optional.ofNullable(n.inner))
    .flatMap(i -> Optional.ofNullable(i.foo))
    .ifPresent(System.out::println);

上面的例子中flatMap的每次調用都會返回一個用Optional對象,若是有返回值則這個Optional對象是這個返回值的包裝類,若是返回值不存在則返回null。


Reduce(減小)

reduce操做能夠將stream中全部元素組合起來獲得一個元素,JAVA8支持三中不一樣的reduce方法。

第一種能從stream元素序列中提取一個特定的元素。好比下面的從用戶列表中選擇年紀最大的用戶操做:

persons
    .stream()
    .reduce((p1, p2) -> p1.age > p2.age ? p1 : p2)
    .ifPresent(System.out::println);    // Pamela

上面的實例中reduce方法接收一個二元累加計算函數(BinaryOperator accumulator function)做爲參數,二元操做(BinaryOperator)實際就是上在兩個操做數共享同一類型。示例中函數比較兩人年齡,返回的最大年齡的人。

第二種reduce操做接收一個標識值和一個二元操做累加器做爲參數,這個reduce方法能夠把stream中全部用戶的名字和年齡彙總獲得一個新用戶。

Person result =
    persons
        .stream()
        .reduce(new Person("", 0), (p1, p2) -> {
            p1.age += p2.age;
            p1.name += p2.name;
            return p1;
        });

System.out.format("name=%s; age=%s", result.name, result.age);
// name=MaxPeterPamelaDavid; age=76

第三種reduce方法,接收三個參數:一個標示值(identity value),一個二元操做累加器(BiFunction accumulator),一個二元組合方法。因爲標識符參數未被嚴格限制爲person類型,所以咱們能夠用這個reduce方法來獲取用戶的總年齡。

Integer ageSum = persons
    .stream()
    .reduce(0, (sum, p) -> sum += p.age, (sum1, sum2) -> sum1 + sum2);

System.out.println(ageSum);  // 76

計算的結果是76,經過添加調試輸出,咱們能夠詳細地瞭解執行引擎中發生了什麼。

Integer ageSum = persons
    .stream()
    .reduce(0,
        (sum, p) -> {
            System.out.format("accumulator: sum=%s; person=%s\n", sum, p);
            return sum += p.age;
        },
        (sum1, sum2) -> {
            System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2);
            return sum1 + sum2;
        });

// accumulator: sum=0; person=Max
// accumulator: sum=18; person=Peter
// accumulator: sum=41; person=Pamela
// accumulator: sum=64; person=David

從調試輸出中能夠看到,累加器作了全部的工做,它首先獲取值爲0的標示值和第一個用戶Max,接下來的三步中持續sum值因爲累加不斷變大,在最後一步彙總的年紀增加到76。

注意,上面的調試輸出中combiner沒有執行,經過parallel執行上面相同stream。

Integer ageSum = persons
    .parallelStream()
    .reduce(0,
        (sum, p) -> {
            System.out.format("accumulator: sum=%s; person=%s\n", sum, p);
            return sum += p.age;
        },
        (sum1, sum2) -> {
            System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2);
            return sum1 + sum2;
        });

// accumulator: sum=0; person=Pamela
// accumulator: sum=0; person=David
// accumulator: sum=0; person=Max
// accumulator: sum=0; person=Peter
// combiner: sum1=18; sum2=23
// combiner: sum1=23; sum2=12
// combiner: sum1=41; sum2=35

Executing this stream in parallel results in an entirely different execution behavior. Now the combiner is actually called. Since the accumulator is called in parallel, the combiner is needed to sum up the separate accumulated values.

經過並行的方式執行上面的stream操做,獲得的是另一種徹底不相同的執行動做。在並行stream中combiner方法會被調用。這是因爲累加器是被並行調用的,所以組合器須要對分開的累加操做進行求和。

下一章會詳細描述並行stream。


Parallel Streams(並行流)

爲了提升大量輸入時的執行效率,stream能夠採用並行的放行執行。並行流(Parallel Streams)經過ForkJoinPool.commonPool() 方法獲取一個可用的ForkJoinPool。這個ForkJoinPool使用5個線程(其實是由底層可用的物理cpu核數決定的)。

ForkJoinPool commonPool = ForkJoinPool.commonPool();
System.out.println(commonPool.getParallelism());    // 3

On my machine the common pool is initialized with a parallelism of 3 per default. This value can be decreased or increased by setting the following JVM parameter:
在個人機器上公共池初始化爲每一個默認3並行,這個值能夠經過調整jvm參數來修改:

-Djava.util.concurrent.ForkJoinPool.common.parallelism=5

Collections中包含parallelStream()方法,經過這個方法可以爲Collections中的元素建立並行流。另外也能夠調用stream的parallel()方法將一個順序流轉變爲一個並行流的拷貝。

爲了瞭解並行流的執行動做,下面的例子會打印當前線程的執行信息。

Arrays.asList("a1", "a2", "b1", "c2", "c1")
    .parallelStream()
    .filter(s -> {
        System.out.format("filter: %s [%s]\n",
            s, Thread.currentThread().getName());
        return true;
    })
    .map(s -> {
        System.out.format("map: %s [%s]\n",
            s, Thread.currentThread().getName());
        return s.toUpperCase();
    })
    .forEach(s -> System.out.format("forEach: %s [%s]\n",
        s, Thread.currentThread().getName()));

執行的結果以下:

filter:  b1 [main]
filter:  a2 [ForkJoinPool.commonPool-worker-1]
map:     a2 [ForkJoinPool.commonPool-worker-1]
filter:  c2 [ForkJoinPool.commonPool-worker-3]
map:     c2 [ForkJoinPool.commonPool-worker-3]
filter:  c1 [ForkJoinPool.commonPool-worker-2]
map:     c1 [ForkJoinPool.commonPool-worker-2]
forEach: C2 [ForkJoinPool.commonPool-worker-3]
forEach: A2 [ForkJoinPool.commonPool-worker-1]
map:     b1 [main]
forEach: B1 [main]
filter:  a1 [ForkJoinPool.commonPool-worker-3]
map:     a1 [ForkJoinPool.commonPool-worker-3]
forEach: A1 [ForkJoinPool.commonPool-worker-3]
forEach: C1 [ForkJoinPool.commonPool-worker-2]

經過分析調試輸出,咱們能夠更好地瞭解哪個線程執行了哪些stream操做。從上面的輸出中咱們能夠看到parallel stream使用了ForkJoinPool提供的全部可用的線程來執行流的各類操做。因爲不能肯定哪一個線程會執行並行流的哪一個操做,所以反覆執行上面的代碼,打印的結果會不一樣。

擴充上面的例子,添加sort操做

Arrays.asList("a1", "a2", "b1", "c2", "c1")
    .parallelStream()
    .filter(s -> {
        System.out.format("filter: %s [%s]\n",
            s, Thread.currentThread().getName());
        return true;
    })
    .map(s -> {
        System.out.format("map: %s [%s]\n",
            s, Thread.currentThread().getName());
        return s.toUpperCase();
    })
    .sorted((s1, s2) -> {
        System.out.format("sort: %s <> %s [%s]\n",
            s1, s2, Thread.currentThread().getName());
        return s1.compareTo(s2);
    })
    .forEach(s -> System.out.format("forEach: %s [%s]\n",
        s, Thread.currentThread().getName()));

執行結果以下:

filter:  c2 [ForkJoinPool.commonPool-worker-3]
filter:  c1 [ForkJoinPool.commonPool-worker-2]
map:     c1 [ForkJoinPool.commonPool-worker-2]
filter:  a2 [ForkJoinPool.commonPool-worker-1]
map:     a2 [ForkJoinPool.commonPool-worker-1]
filter:  b1 [main]
map:     b1 [main]
filter:  a1 [ForkJoinPool.commonPool-worker-2]
map:     a1 [ForkJoinPool.commonPool-worker-2]
map:     c2 [ForkJoinPool.commonPool-worker-3]
sort:    A2 <> A1 [main]
sort:    B1 <> A2 [main]
sort:    C2 <> B1 [main]
sort:    C1 <> C2 [main]
sort:    C1 <> B1 [main]
sort:    C1 <> C2 [main]
forEach: A1 [ForkJoinPool.commonPool-worker-1]
forEach: C2 [ForkJoinPool.commonPool-worker-3]
forEach: B1 [main]
forEach: A2 [ForkJoinPool.commonPool-worker-2]
forEach: C1 [ForkJoinPool.commonPool-worker-1]

這個執行結果看起來比較奇怪,看起來sort操做只是在main線程中順序執行的。實際上,parallel stream中的sort操做使用了JAVA 8的一個新方法:Arrays.parallelSort()。JAVA doc中是這樣描述Arrays.parallelSort()的:待排序數組的長度決定了排序操做是順序執行仍是並行執行。java doc 描述以下:

If the length of the specified array is less than the minimum granularity, then it is sorted using the appropriate Arrays.sort method.

回到上一章的例子,咱們已經瞭解combiner方法只能在parallel streams中調用,讓咱們來看下那些線程被實際調用:

List<Person> persons = Arrays.asList(
    new Person("Max", 18),
    new Person("Peter", 23),
    new Person("Pamela", 23),
    new Person("David", 12));

persons
    .parallelStream()
    .reduce(0,
        (sum, p) -> {
            System.out.format("accumulator: sum=%s; person=%s [%s]\n",
                sum, p, Thread.currentThread().getName());
            return sum += p.age;
        },
        (sum1, sum2) -> {
            System.out.format("combiner: sum1=%s; sum2=%s [%s]\n",
                sum1, sum2, Thread.currentThread().getName());
            return sum1 + sum2;
        });

執行結果以下:

accumulator: sum=0; person=Pamela; [main]
accumulator: sum=0; person=Max;    [ForkJoinPool.commonPool-worker-3]
accumulator: sum=0; person=David;  [ForkJoinPool.commonPool-worker-2]
accumulator: sum=0; person=Peter;  [ForkJoinPool.commonPool-worker-1]
combiner:    sum1=18; sum2=23;     [ForkJoinPool.commonPool-worker-1]
combiner:    sum1=23; sum2=12;     [ForkJoinPool.commonPool-worker-2]
combiner:    sum1=41; sum2=35;     [ForkJoinPool.commonPool-worker-2]

從控制檯輸出能夠看到accumulator和combiner操做都被可用的線程並行執行了。

總結起來:在大數據量輸入的時候,parallel streams能夠帶來比較大的性能提高。可是應該記住,一些並行操做,好比:reduce,collect須要額外的計算(組合操做),可是在順序流中,這些組合操做是不須要的。

另外,咱們知道全部的parallel stream操做共享一個jvm範圍內的ForkJoinPool,因此你應該注意避免在parallel stream上執行慢阻塞流操做,由於這些操做可能致使你應用中依賴parallel streams操做的其餘部分也會響應變慢。


結尾

若是你想更多瞭解JAVA 8 的stream,你能夠閱讀stream的JAVA doc,若是你想更深刻了解stream的底層機制,你能夠閱讀Martin Fowlers的文章Collection Pipelines

若是你對js也感興趣,你能夠查看Stream.js(一個用js實現的java 8 stream api),你也能夠查看我寫的java8教程。

但願這個教程對你有幫助,你也喜歡閱讀這個教程。這個教程的源碼和例子在github上,你能夠免費fork或者在twitter上給我反饋。


相關文章
相關標籤/搜索