JAVA8 之 Stream 流(五)

上一節咱們介紹了 Stream 相關的 API,map,filter 等方法都是中間操做。這一節咱們介紹終止操做。終止操做(terminal operation)的做用是產生結果。java

  • forEach 和 forEachOrderedsql

    forEach 和 forEachOrdered 都是對流中的每個元素執行對應的操做。須要傳入 Consumer 類型的函數式接口(一個輸入參數,沒有返回值)。編程

    若是咱們使用串行流時這兩個方法並無什麼區別,可是若是使用並行流(後面會有專門的章節去將並行流)的時候 forEach 並不能保證元素的處理順序,而 forEachOrdered 是嚴格按照順序去執行的。數組

    void forEach(Consumer<? super T> action);
    void forEachOrdered(Consumer<? super T> action);

    這兩個方法沒有返回值,因此通常使用這兩個方法進行打印等操做。安全

    Arrays.asList(4,2,6,8,1,9,3).stream().forEach(System.out::println);
  • toArray多線程

    toArray 方法的做用是將 Stream 流轉換爲數組。toArray 方法是一個終止操做。toArray 方法有兩個實現,第一個實現不須要傳入參數,返回 Object 類型的數組。第二個實現咱們能夠返回特定類型的數組,須要傳入 IntFunction 類型的函數式接口。併發

    Object[] toArray();
    <A> A[] toArray(IntFunction<A[]> generator);

    例子:app

    因爲 toArray() 方法返回的是 Object 類型的數組,因此使用起來不若有參數的 toArray 方法靈活。框架

    Object[] a = Arrays.asList(4,2,6,8,1,9,3).stream().filter(i->i>3).toArray();
    for(Object object:a){
      System.out.println(object);
    }

    若是咱們想返回特定類型的數組,咱們就可使用帶參數的 toArray 方法。IntFunction 中須要傳入一個 int 類型的數組做爲數組的長度,返回的 A[] 是咱們須要返回的數組類型。還會將流中的元素放入返回的數組中。ide

    例子:

    //Lambda 表達式
    Integer[]res1 = Arrays.asList(4,2,6,8,1,9,3).stream().filter(i->i>3).toArray(i->new Integer[i]);
    //方法引用
    Integer[]res2 = Arrays.asList(4,2,6,8,1,9,3).stream().filter(i->i>3).toArray(Integer[]::new);

    咱們在來看看 JDK 中給咱們的示例,將過濾後的元素放入到 Person 數組中:

    Person[] men = people.stream()
                      .filter(p -> p.getGender() == MALE)
                        .toArray(Person[]::new);
  • reduce

    聚合操做。就是將流中的元素彙集到一塊兒計算出一個結果,好比求和等操做。

    T reduce(T identity, BinaryOperator<T> accumulator);
    Optional<T> reduce(BinaryOperator<T> accumulator);
    <U> U reduce(U identity,
                 BiFunction<U, ? super T, U> accumulator,
                 BinaryOperator<U> combiner);

    來看第一個方法的例子:

    這個方法須要咱們傳入一個初始值,而後經過函數式接口 BinaryOperator 的實例計算出流中數據的和。

    Stream<Integer> stream = Arrays.stream(new Integer[]{1,2,3,4,5,6,7,8,9});
    Integer result = stream.reduce(0,(i,j)->i+j);
    System.out.println(result);

    把這個方法改爲 for 循環的方式方便你們理解:

    其實直觀的看函數式編程的方式更加易於理解,stream 流至關於 for 循環的操做。

    T res = identity;
    for(int i=0;i<n;i++){
      res = accumulator.apply(res,a[i]);
    }
    return res;

    若是在上面的方法,咱們不給初始值,返回的就是 Optional 類型。也就是 reduce 方法的第二個實現。

    Stream<Integer> str = Arrays.stream(new Integer[]{1,2,3,4,5,6,7,8,9});
    Optional<Integer> re = str.reduce((i, j)->{
      return i < j?i:j;
    });
    re.ifPresent(System.out::println);
    //上面的方法也能夠改爲方法引用的方式來實現
    //reduce(Integer::min)

    第三個函數實現等到咱們講並行流的時候再講。

  • Collect

    收集器, 做用是將輸入元素累積到一個可變的結果容器中(到 StringBuilder 中,到 List中,獲取彙總信息等等)。它會將全部的元素處理完畢後,將積累的結果轉換爲一個最終的表示,它支持串行與並行兩種方式執行。

    咱們再來看第一個方法實現:

    //參數的做用和上面相同
    <R> R collect(Supplier<R> supplier,  //提供一個 Container 容器
                  BiConsumer<R, ? super T> accumulator, //累加器
                  BiConsumer<R, R> combiner);  //並行聚合

    例子:

    咱們須要接收一個字符串流而後返回 List 集合。

    Supplier supplier, 這個函數式接口須要返回一個容器,類型和 collect 方法的返回值同樣,這裏咱們須要返回一個 List 集合,因此咱們須要返回一個 ArrayList。咱們可使用 ()-> new ArrayList(),或者 ArrayList::new 來實現 Supplier 函數式接口。

    BiConsumer<R, ? super T> accumulator,這個函數式接口的做用是將流中的元素放入到前面建立的容器中,可使用 (thelist, item)->thelist.add(item) 或者 ArrayList::add 方法來實現 BiConsumer 函數式接口。

    BiConsumer<R, R> combiner,是將全部並行操做的結果聚集到一塊兒,就是當全部容器的結果聚集到一塊兒就能夠了。咱們可使用 (theList1,theList2)-> theList1.addAll(thelist2),或者 ArrayList::addAll 來實現 BiConsumer 函數式接口。

    //方法引用方式
    List<String> asList = Arrays.asList("hello","world","java8","lambda").stream().collect(ArrayList::new, ArrayList::add, ArrayList::addAll);
    
    
    //Lambda 表達式方式
    List<String> asList1 = Arrays.asList("hello","world","java8","lambda").stream().collect(()->new ArrayList<>(), (thelist, item)->
                                                                                          thelist.add(item), (theList1,theList2)->
                                                                                            theList1.addAll(theList2));

    下面的例子會接收一個字符串的流而後組成一個字符串:

    Supplier supplier,這個函數式接口須要返回一個容器,類型和 collect 方法的返回值同樣,因此這裏咱們直接返回一個 StringBuilder。

    BiConsumer<R, ? super T> accumulator,這個函數式接口的做用是將流中的元素放入到前面建立的容器中,咱們可使用 StringBuilder 的 append 方法來將流中的字符串數據追加到前一步建立的 StringBuilder 中。

    BiConsumer<R, R> combiner,是將全部並行操做的結果聚集到一塊兒。咱們仍是使用 append 方法將全部的 StringBuilder 聚集到一塊兒。

    StringBuilder concat = Arrays.asList("hello","world","java8","lambda").stream().collect(StringBuilder::new, StringBuilder::append,StringBuilder::append);
    StringBuilder concat1 = Arrays.asList("hello","world","java8","lambda").stream().collect(()->new StringBuilder(),(stringBuilder, s) -> stringBuilder.append(s),(stringBuilder1, stringBuilder2) -> stringBuilder1.append(stringBuilder2));

    先看第二個方法實現:

    <R, A> R collect(Collector<? super T, A, R> collector);

    Collector<T,A,R> 泛型接口,可變的匯聚操做,

    Collector 並非一個函數式接口,咱們看看它都定義了哪些抽象方法,以及這些抽象方法的做用:

    //T 是 Stream 裏提供的 input 類型
    //A 是累加器,結果會被放入累加器中
    //R 返回值,不必定全部狀況都返回 Collection
    
    public interface Collector<T,A,R>{
      Supplier<A> supplier();  //提供一個 Container
      BiConsumer<A,T> accumulator(); //累加器
      Function<A,R> finisher(); //返回的結果
      BinaryOperator<A> combiner(); //並行聚合,並行操做時,好比有四個線程同時去執行,那麼就會生成4個部分結果,使用combiner 聚合到一塊兒
      Set<Characteristics> characteristics(); //Collector的特徵
      // Concurrent, unordered,identity_finish
    }

    Collector 的實現類須要實現上面的方法而後將結果聚集到一塊兒。Collectors 自己提供了關於 Collector 的常見匯聚實現,好比匯聚到 List,Collection 等等。Collectors 自己實際是一個工廠

    先來看一些使用的例子:

    toCollection 能夠匯聚成一些常規的集合,好比 ArrayList,LinkedList,TreeSet 等等。

    Stream<String> stream = Stream.of("hello","world","helloworld");
    List<String> list = stream.collect(Collectors.toCollection(ArrayList::new));
    //若是想要換成 LinkedList 或者 TreeSet,只須要換掉 toCollection 的參數就能夠了

    固然也能夠直接使用 toList(將元素匯聚到 ArrayList 中),toSet(將元素匯聚到 HashSet 中)中。

    List<String> list1 = Arrays.asList("hello","world","helloworld","test");
    List<String> r = list1.stream().map(String::toUpperCase).collect(Collectors.toList());
    Set<String> s = list1.stream().map(String::toUpperCase).collect(Collectors.toSet());

    咱們經過 toList 的源碼在瞭解一下 JDK 的實現者們如何實現 Collector 接口的。

    Supplier 接口的實現直接返回一個 new ArrayList,累加器的實現使用 ArrayList 接口的 add 方法,聚合操做則是使用 addAll 方法。

    public static <T>
        Collector<T, ?, List<T>> toList() {
            return new CollectorImpl<>((Supplier<List<T>>) ArrayList::new, List::add,
                                       (left, right) -> { left.addAll(right); return left; },
                                       CH_ID);
        }

    咱們再來看看 toSet 方法的源碼

    Supplier 接口的實現直接返回一個 new HashSet,累加器的實現使用 HashSet 的 add 方法,聚合操做一樣使用 addAll 方法。

    public static <T>
        Collector<T, ?, Set<T>> toSet() {
            return new CollectorImpl<>((Supplier<Set<T>>) HashSet::new, Set::add,
                                       (left, right) -> { left.addAll(right); return left; },
                                       CH_UNORDERED_ID);
        }

    咱們已經基本瞭解了 Collectors 中聚合的方法,相信咱們在實際工做中也能根據本身的須要實現一個本身的聚合方法。

    咱們還能夠在 collect 方法執行以後在進行一些操做,好比將結果的集合變爲一個不可變的集合。

    List<String> t = list1.stream().collect(Collectors.collectingAndThen(Collectors.toList(), Collections::unmodifiableList));

    collectors 還爲提供了許多求值或者計算的方法,好比,最大值(maxBy),最小值(minBy),個數(counting),求和(summingInt,summingLong,summingDouble)等等,由於比較簡單就不一一介紹了。

    Collectors 的分組和分區

    分組,至關於 sql 的 groupBy。好比咱們將 List 根據姓名分組,返回的結果 Map<String, List >。

    之前的編程思想:

    • 循環列表
    • 取出學生的名字
    • 檢查 map中是否存在該名字,不存在則直接添加到 map 中,存在則將 map 中的 List 對象取出來,而後將 Student 對象加入到 List 中。
    • 返回 map 對象。

    流的方式:

    Map<String,List<Student>> map = students.stream().collect(Collectors.groupingBy(Student::getName));

    若是咱們想返回姓名和姓名相同的 Student 的個數該怎麼辦哪?能夠在 groupingBy 的第二個參數放入咱們須要放入每一個組內的數據。

    //返回 name,count(*)
    Map<String,Long> map =students.stream().collect(Collectors.groupingBy(Student::getName,Collectors.counting()));

    若是咱們想按照姓名分組,獲取相同姓名的同窗的平均分數改怎麼辦那?

    Map<String,Double> map = students.stream().collect(Collectors.groupingBy(Student::getName,Collectors.avaeragingDouble(Student::getSocre));

    分區,partition by,能夠認爲分組的特殊狀況。只會有兩組。至關於 True 和 False 兩個組。

    咱們把學生分區,大於 90 分的是一個分區,小於 90 分的是一個分區。

    //大於 90 的一個分區,小於 90 的一個分區
    Map<Boolean,List<Student>> s = students.stream().collect(Collectors.partitioningBy(student->student.getScore()>=90));
  • allMatch/noneMatch/anyMatch

    這三個方法都須要傳入 Predicate 類型的函數式接口,allMatch 只有全部元素都符合咱們在 Predicate 實例中設置的條件則返回 true,不然返回 false。anyMatch 則是任意一個符合就返回 true,都不符合則返回 false。noneMatch 是全部都不符合則返回 true,不然返回 false。

  • findAny/findFirst

    findAny 返回流中的某一個元素,findFirst 返回流中第一個元素。兩個函數的返回值都是 Optional 類型的,若是找不到元素,則返回一個包含空值的 Optioanl 對象。

咱們常用 Collection 接口下的 stream() 方法來將咱們須要操做的集合轉化爲流。其實 Collection 接口還有另外一個方法來生成流,這個方法用於生成並行流,parallelStream()。所謂並行流就是用多線程的方式對流中的數據進行操做。

並行流帶來的好處固然是加快處理數據的速度,可是使用並行流並不老是很快,有時反而很是慢。並且使用 parallelStream 還要注意線程安全問題。這兩個問題咱們稍後給你們講解,咱們先來看看 parallelStream 的底層原理。

底層原理

咱們經過 parallelStream 的執行來看看後臺到底有哪些線程。咱們經過 sleep 方法讓它執行的時間長一點,方便咱們查看。

public static void main(String[] args) {
  List<String> lists = Arrays.asList("hello","world","java8","fork","join","string","main");
  lists.parallelStream().forEach(s->{
    try {
      Thread.sleep(100000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    System.out.println(s);
  });
}

使用 jconsole 命令,選擇咱們正在執行的程序。

注意後臺中有三個 ForkJoinPool 名字的線程,這說明 parallelStream 使用了 fork/join 框架來執行多線程任務。

最後來看上面那段代碼的執行結果,咱們發現使用 parallelStream 以後 forEach 方法中的元素執行順序出現了問題。

main
hello
join
world
string
java8
fork

Fork/Join

爲了不有的同窗對於這個框架不熟悉,咱們來簡單介紹一下這個框架的思想和使用方法。

Fork/Join 是 JAVA7 中引入的並行操做框架,注意這裏並非併發而是並行。併發指的是在系統中有多個線程運行去執行任務,它們同時等待一個 CPU 分配時間片去執行,某一時刻只有一個線程執行任務。而並行在這裏指的是利用多核 CPU,將子任務劃分到不一樣 CPU 中,達到同時執行的效果,某一時刻可能有互相不影響多個線程同時執行子任務。

Fork/Join 從名字上來看就是採用分而治之的思想,將任務 fork 成多個子任務,執行完畢後在將結果 join 到一塊兒。

Fork/Join 框架有兩個重要的類,ForkJoinPool,繼承了 ExecutorService 和線程池同一個父類,至關於一個特殊的線程池,裏面線程的數量通常爲 CPU 的核數。線程池裏面的 Task 類爲 ForkJoinTask,咱們通常使用它的子類來建立 task,RecursiveAction 適用於沒有返回值的 task,RecursiveTask 適用於有返回值的 task。

線程安全

經過上面兩個章節咱們已經瞭解了 parallelStream 的建立,底層原理了。parallelStream 的使用也和 stream 方法同樣,只不過 parallelStream 使用並行的方式處理流中的數據,並行帶來的好處是可以加快數據處理的速度,可是此時咱們必定要注意線程安全的問題,就如同上面的 forEach 方法同樣,使用 parallelStream 後就出現了數據處理順序的問題。

在使用並行流時,有些方法是很難保持穩定性的,而且維持穩定性的成本很高,若是須要嚴格的順序或者高性能時就不要使用 parallelStream,使用這些方法時咱們仍是推薦你們使用串行流。好比 distinct,limit,skip。

有些方法須要咱們本身提供同步的機制,好比 forEach。有些不須要額外同步的方法,reduce,collect,咱們也推薦你們使用 reduce 和 collect。

咱們就 forEach 的線程不安全問題來給你們一個演示:

咱們經過並行流的 forEach 方法將流中元素加入到新的容器中。

List<String> lists = Arrays.asList("hello","world","java8","fork","join","string","main");
List<String> res = new ArrayList<>();
lists.parallelStream().forEach(res::add);
res.stream().forEach(System.out::println);

結果,新容器中的數據順序不但變了,並且還出現了 null 這樣嚴重的線程安全問題。

null
main
hello
java8
string
fork
join

解決辦法,經過 synchronizedList 方法將咱們要經過並行流操做的容器變爲線程安全的。

List<String> lists = Arrays.asList("hello","world","java8","fork","join","string","main");
List<String> res = new ArrayList<>();
List<String> newRes = Collections.synchronizedList(res);
lists.parallelStream().forEach(newRes::add);
res.stream().forEach(System.out::println);

結果,雖然順序仍是變了,可是元素一個也很多。

join
main
world
java8
string
hello
fork

性能

併發流 parallelStream 的做用是使用多線程的方式來加快流處理的速度。那麼並行流的性能到底怎麼樣哪?

咱們經過加法來計算一下並行流和串行流和 for 循環的速度,這個結果徹底是基於個人機器,在不一樣的機器上時間可能會出現偏差,可是不會影響結果。

第一個方法,for 循環。

private static long forMethod(long limit){
  long result= 0;
  for(long i=1l;i< limit;i++){
    result = result+i;
  }
  return result;
}

第二個方法,使用 iteate 方法來生成一個無限流,經過 limit 限制流的個數,而後經過 reduce 來計算。

private static long iterateStream(long limit){
  return Stream.iterate(1L,i->i+1).
    limit(limit).reduce(0L,Long::sum);
}

第三個方法,使用 iteate 方法來生成一個無限流,經過 limit 限制流的個數,使用並行的方式來計算。

private static long iterateStreamparallel(long limit){
    return Stream.iterate(1L,i->i+1).
            limit(limit).parallel().reduce(0L,Long::sum);
}

第四個方法,爲了不自動拆裝箱帶來的性能損耗,咱們使用 mapToLong 將其轉化爲 LongStream。

private static long iterateStream2(long limit){
  return Stream.iterate(1L,i->i+1).mapToLong(Long::longValue).
    limit(limit).reduce(0L,Long::sum);
}

第五個方法,爲了不自動拆裝箱帶來的性能損耗,咱們使用 mapToLong 將其轉化爲 LongStream,使用並行的方式來計算。

private static long iterateStream2parallel(long limit){
  return Stream.iterate(1L, i->i+1).mapToLong(Long::longValue).parallel().
    limit(limit).reduce(0L,Long::sum);
}

第六個方法,直接使用 LongStream 的 rangeClosed 來生成一個有限流。

private static long iterateStream3(long limit){
  return LongStream.rangeClosed(1,limit).parallel().reduce(0L,Long::sum);
}

第七個方法直接使用 LongStream 的 rangeClosed 來生成一個有限流,經過並行來計算。

private static long iterateStream3(long limit){
  return LongStream.rangeClosed(1,limit).parallel.reduce(0L,Long::sum);
}

測試方法:

private static long functionTest(Function<Long,Long> add, long limit){
  long start = System.currentTimeMillis();
  long result = add.apply(limit);
  long end  = System.currentTimeMillis();
  long cost = end - start;
  return cost;
}

咱們將 limit 設置成 10000000,來看看各個方法的執行時間到底有什麼差別。

forMethod cost time: 7
iterateStream cost time: 241
iterateStreamparallel cost time: 1790
iterateStream2 cost time: 107
iterateStream2parallel cost time: 367
iterateStream3 cost time: 37
iterateStream3parallel cost time: 28

咱們的結論是基於 for 循環,iterate 和 rangeClosed 的方法來統計的。

  • 無限流(iterate)的處理速度偏慢,與 for 循環和 rangeClosed 不在一個數量級。
  • 正確的使用 mapToLong 等方法來避免自動拆裝箱的損耗能提升很大的性能。
  • 某些方法,好比 iterate 並不適合使用並行流,使用並行流的處理速度比串行慢了很多。
  • 某些方法,好比 rangeClosed 適合使用並行流,在數據量比較大的狀況下與 for 循環在一個數量級而且時間比 for 循環還要少(能夠單獨使用 for 循環和 rangeClosed 方法進行測試,iterate 在 limit 比較大的時候會直接拋出 OOM 異常)。

最後給出前人總結的並行流處理速度的總結:

方法 處理速度
ArrayList 優秀
LinkedList
*Stream.range 優秀
Stream.iterate
HashSet
TreeSet
相關文章
相關標籤/搜索