Java函數式編程之Stream流編程

Stream流編程-概念

概念:java

這個Stream並不是是I/O流裏的Stream,也不是集合元素,更不是數據結構,它是JDK1.8帶來的新特性,是一種用函數式編程在集合類上進行復雜操做的工具。Stream就像工廠裏的流水線同樣,有輸入和輸出。Stream不能夠重複遍歷集合裏面的數據,數據在Stream裏面就像水在渠道里面同樣,流過了就一去不復返。編程

簡而言之,Stream是之內部迭代的方式處理集合數據的操做,內部迭代能夠將更多的控制權交給集合類。Stream 和 Iterator 的功能相似,只不過 Iterator 是之外部迭代的形式處理集合的數據。數組

在JDK1.8之前,對集合的操做須要寫出處理的過程,如在集合中篩選出知足條件的數據,須要一 一遍歷集合中的每一個元素,再把每一個元素逐一判斷是否知足條件,最後將知足條件的元素保存返回。而Stream 對集合篩選的操做提供了一種更爲便捷的操做,只需將實現函數接口的篩選條件做爲參數傳遞進來,Stream會自行操做並將合適的元素一樣以 stream 的方式返回,最後進行接收便可。bash

內部迭代與外部迭代:數據結構

使用for循環等利用Iterator進行迭代操做的,咱們都叫作外部迭代,而使用stream流進行迭代操做的叫作內部迭代。內部迭代最明顯的好處就是當數量很大的狀況下,咱們不須要對數據進行拆分,而且能夠經過調用指定函數實現並行遍歷。多線程

外部迭代示例代碼:dom

int[] nums = {1, 2, 3};
// 循環屬於外部迭代
int sum = 0;
for (int num : nums) {
    sum += num;
}
System.out.println("計算結果爲:" + sum);  // 計算結果爲:6

使用stream內部迭代示例代碼:ide

int[] nums = {1, 2, 3};
// 使用stream進行內部迭代
int sum = IntStream.of(nums).sum();
System.out.println("計算結果爲:" + sum);  // 計算結果爲:6

使用stream流操做時的一些概念:函數式編程

  • 中間操做:中間操做的結果是刻畫、描述了一個Stream,並返回了這個Stream,但此操做並無產生一個新集合,這種操做也叫作惰性求值方法
  • 終止操做:從Stream中獲得最終的結果
  • 惰性求值:終止沒有調用的狀況下,中間操做不會執行

如何區分中間操做和終止操做呢?能夠根據操做的返回值類型判斷,若是返回值是Stream,則該操做爲中間操做。若是返回值不是Stream或者爲空,則該操做是終止操做。函數

以下圖所示,前兩個操做是中間操做,只有最後一個操做是終止操做:
Java函數式編程之Stream流編程

能夠形象地理解Stream的操做是對一組粗糙的工藝品原型(即對應的 Stream 數據源)進行加工成顏色統一的工藝品(即最終獲得的結果),第一步篩選出合適的原型(即對應Stream的 filter 的方法),第二步將這些篩選出來的原型工藝品上色(對應Stream的map方法),第三步取下這些上好色的工藝品(即對應Stream的 collect(toList())方法)。在取下工藝品以前進行的操做都是中間操做,能夠有多個或者0箇中間操做,但每一個Stream數據源只能有一次終止操做,不然程序會報錯。

接下來,咱們經過一個簡單的示例來演示以上所提到的幾個概念,代碼以下:

package org.zero01.example.demo;

import java.util.stream.IntStream;

public class StreamDemo {

    public static void main(String[] args) {
        int[] nums = {1, 2, 3};
        // IntStream建立數字流,of則是輸入一個數組,這裏的map就是中間操做(返回stream流的操做),sum則是終止操做
        int sum = IntStream.of(nums).map(i -> i * 2).sum();
        System.out.println("計算結果爲:" + sum);

        System.out.println("惰性求值就是終止操做沒有調用的狀況下,中間操做不會執行");
        IntStream.of(nums).map(StreamDemo::doubleNum);
    }

    public static int doubleNum(int i) {
        System.out.println("doubleNum 方法執行了");
        return i * 2;
    }
}

運行以上代碼,控制檯輸出結果以下,能夠看到因爲惰性求值的緣由,doubleNum方法沒有被調用:

計算結果爲:12
惰性求值就是終止操做沒有調用的狀況下,中間操做不會執行

流的建立

對一個流完整的操做過程:流的建立 -> 中間操做 -> 終止操做。流的建立是第一步,而流的常見建立方式以下表:
Java函數式編程之Stream流編程

代碼示例以下:

public static void main(String[] args) {
    List<String> list = new ArrayList<>();

    // 從集合建立流
    list.stream();
    // 從集合建立並行流
    list.parallelStream();

    // 從數組建立流
    Arrays.stream(new int[]{1, 2, 3, 4, 5});

    // 建立數字流
    IntStream.of(1, 2, 3, 4, 5);
    // 建立1-10的數字流
    IntStream.rangeClosed(1, 10);

    // 使用random建立一個無限流,須要調用limit來限制大小,不然會報錯
    new Random().ints().limit(10);

    // 本身建立流
    Random random = new Random();
    Stream.generate(() -> random.nextInt()).limit(20);
}

流的中間操做

而後咱們來看看流的中間操做,中間操做分類兩類,一類是無狀態操做,一類則是有狀態操做。以下表:
Java函數式編程之Stream流編程

無狀態操做:

  • 當前操做與其餘操做沒有依賴關係。

有狀態操做:

  • 當前操做與其餘操做有依賴關係 (通常而言有狀態操做都是有2個參數傳入) 。例如排序操做,就須要等待其餘操做計算完成後才能進行一個最終的排序。

共同點:

  • 不管是有狀態操做仍是無狀態操做,最終都會返回一個Stream流,能夠繼續使用鏈式的操做調用下去

代碼示例以下:

public static void main(String[] args) {
    String str = "my name is zero";

    // 把每一個單詞的長度打印出來
    System.out.println("---------------map---------------");
    Stream.of(str.split(" ")).map(String::length).forEach(System.out::println);

    // 只打印長度大於2的單詞
    System.out.println("---------------filter---------------");
    Stream.of(str.split(" ")).filter(s -> s.length() > 2)
            .map(String::length).forEach(System.out::println);

    // flatMap 適合用於A元素下有B屬性,而且這個B屬性是個集合,最終獲得全部的A元素裏面的全部B屬性集合
    // 這裏調用了 boxed() 方法的緣由是intStream\LongStream等數字流並不是是Stream的子類,因此須要裝箱
    System.out.println("---------------flatMap---------------");
    Stream.of(str.split(" ")).flatMap(s -> s.chars().boxed())
            .forEach(integer -> System.out.println((char) integer.intValue()));

    // peek 通常用於debug,相似於forEach,不一樣的是peek是中間操做而forEach是終止操做
    System.out.println("---------------peek---------------");
    Stream.of(str.split(" ")).peek(System.out::println).forEach(System.out::println);

    // limit 主要用於限制無限流,咱們能夠結合filter來產生特定區間的隨機數
    System.out.println("---------------limit---------------");
    new Random().ints().filter(i -> i > 100 && i < 1000).limit(10).forEach(System.out::println);
}

流的終止操做

接下來咱們看看流的終止操做,一樣的,終止操做也分類兩類,分別是短路操做和非短路操做。以下表:
Java函數式編程之Stream流編程

短路操做:

  • 短路操做是一種無需等待全部的結果計算完,就能夠結束流的操做。例如從一組數據中,獲得指定的某個數據就結束流,這種就是短路操做

非短路操做:

  • 非短路操做則反之,需等待全部的結果計算完成後才結束流。例如遍歷一個集合中的全部元素,或將一組數據轉換成集合、數組等操做就是非短路操做。

具體代碼及註釋,請參考以下示例:

public static void main(String[] args) {
    String str = "my name is zero";

    // 一般會在使用並行流的時候使用forEachOrdered,forEachOrdered能夠在並行的狀況下保證元素順序的一致
    str.chars().parallel().forEachOrdered(i -> System.out.print((char) i));
    System.out.println();
    // 而forEach則沒法在並行的狀況下保證元素順序的一致
    str.chars().parallel().forEach(i -> System.out.print((char) i));
    System.out.println();

    // collect屬於收集器,使用能夠將放入流裏面的數據收集成集合類型
    List<String> list = Stream.of(str.split(" ")).collect(Collectors.toList());
    System.out.println(list);

    // reduce用於縮減、歸約數據集,咱們可使用reduce來將數組拼接成一個字符串
    Optional<String> letters = Stream.of(str.split(" ")).reduce((s1, s2) -> s1 + "|" + s2);
    System.out.println(letters.orElse(""));

    // 帶初始化值的reduce,這樣咱們就無需經過Optional去判斷空值了
    String reduce = Stream.of(str.split(" ")).reduce("", (s1, s2) -> s1 + "|" + s2);
    System.out.println(reduce);

    // 使用reduce計算字符串長度的總和
    Integer lengthCount = Stream.of(str.split(" ")).map(String::length).reduce(0, (s1, s2) -> s1 + s2);
    System.out.println(lengthCount);

    // 使用max能夠經過傳入比較器在一組數據中取出最大值
    Optional<String> max = Stream.of(str.split(" ")).max(Comparator.comparingInt(String::length));
    System.out.println(max.orElse(""));

    // 使用findFirst拿出第一個元素
    OptionalInt first = new Random().ints().findFirst();
    System.out.println(first.orElse(0));

    // 使用findAny隨機拿出一個元素
    OptionalInt any = new Random().ints().findAny();
    System.out.println(any.orElse(0));

    // 使用allMatch匹配全部的元素是否都爲zero
    boolean is = Stream.of(str.split(" ")).allMatch("zero"::equals);
    System.out.println(is);
}

並行流

以上的例子中大多數建立的都是單線程流,其實咱們能夠建立多線程並行的Stream流,即並行流。使用並行流時,咱們並不需關心多線程執行以及任務拆分等問題,由於Stream都已經幫咱們管理好了,因此用起來也是很方便的。

咱們先來看一個不使用並行流的示例,如下代碼會每隔3秒打印一行信息:

public static void main(String[] args) {
    // 不使用並行流
    IntStream.range(1, 100).peek(ParallelStreamDemo::debug).sum();
}

public static void debug(int i) {
    System.out.println("debug" + i);
    try {
        TimeUnit.SECONDS.sleep(3);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

運行結果以下:
Java函數式編程之Stream流編程

而使用並行流後,會發現同時打印了多行信息。代碼以下:

public static void main(String[] args) {
    // 不使用並行流
    IntStream.range(1, 100).parallel().peek(ParallelStreamDemo::debug).sum();
}

public static void debug(int i) {
    System.out.println("debug" + i);
    try {
        TimeUnit.SECONDS.sleep(3);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

至於同時會打印多少行,默認取決於cpu的核心數量,例如我電腦cpu有4個核心,因此會同時打印四行,也就是說開啓了四個線程。運行結果以下:
Java函數式編程之Stream流編程

經過以上的例子,咱們得知能夠調用parallel方法來建立並行流。那麼一樣的,也能夠調用相似的方法建立串行流,這個方法就是sequential。若是如今有一個需求:當進行第一步操做時需使用並行流,而第二步操做則需使用串行流。那麼咱們能夠經過結合這兩個方法來實現這個需求嗎?咱們來看一個簡單的例子就知道了,代碼以下:

public static void main(String[] args) {
    IntStream.range(1, 100)
            // 1.調用parallel產生並行流
            .parallel().peek(ParallelStreamDemo::debug)
            // 2.調用sequential產生串行流
            .sequential().peek(ParallelStreamDemo::debug2).sum();
}

public static void debug(int i) {
    System.out.println("debug" + i);
    try {
        TimeUnit.SECONDS.sleep(3);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

public static void debug2(int i) {
    System.err.println("debug2" + i);
    try {
        TimeUnit.SECONDS.sleep(3);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

運行結果以下:
Java函數式編程之Stream流編程

從運行結果能夠看到,運行過程始終是串行的,是一行行打印的。因此能夠得出一個結論:屢次調用 parallel/sequential方法,會以最後一次調用的爲準,天然就沒法實現以上所提到的需求了。


接下來咱們看看並行流裏線程相關的東西,在上文中,咱們提到了默認狀況下,並行流開啓的線程數量取決於cpu的核心數量。那麼並行流使用的是哪一個線程池?如何設置開啓的線程數量?

先來回答第一個問題,並行流裏使用的線程池是java.util.concurrent.ForkJoinPool,這一點能夠直接在方法裏打印線程名稱得知,因此這裏就不演示了。對ForkJoinPool感興趣的話,能夠查閱fork/join相關的概念。

關於第二個設置線程數量的問題,則是須要在建立並行流以前,設置ForkJoinPool裏parallelism屬性的值,例如我要開啓20個線程,具體代碼以下:

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");

還有一點須要注意的就是,全部的並行流默認都將會使用同一個ForkJoinPool線程池,若咱們的並行任務比較多的話,可能會出現任務阻塞的狀況。若是想要防止一些比較關鍵的任務出現阻塞的狀況,則須要自行建立線程池去處理這些任務。以下示例:

public static void main(String[] args) {
    // 使用本身建立的線程池,不使用默認線程池,防止任務阻塞
    ForkJoinPool forkJoinPool = new ForkJoinPool(20);
    forkJoinPool.submit(() -> IntStream.range(1, 100).parallel().peek(ParallelStreamDemo::debug).sum());
    forkJoinPool.shutdown();  // 關閉線程池

    // 防止主線程提早結束
    synchronized (forkJoinPool) {
        try {
            forkJoinPool.wait();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

收集器

本小節咱們來看一下收集器相關的東西,收集器就是將流處理完後的數據收集起來,例如將數據收集到一個集合裏,或者對數據求和、拼接成字符串等行爲都屬於收集器。

如下使用一組例子來演示一下收集器的常見使用方式。首先定義一個Student類以及相關的枚舉類,代碼以下:

// ...省略getter/setter以及全參/無參構造函數...
public class Student {
    private String name;
    private int age;
    private Gender gender;
    private Grade grade;
}

/**
 * 性別
 */
enum Gender {
    MALE, FEMALE
}

/**
 * 班級
 */
enum Grade {
    ONE, TWO, THREE, FOUR;
}

使用收集器的示例代碼以下:

public static void main(String[] args) {
    // 測試數據
    List<Student> students = Arrays.asList(
            new Student("小明", 10, Gender.MALE, Grade.ONE),
            new Student("大明", 9, Gender.MALE, Grade.THREE),
            new Student("小白", 8, Gender.FEMALE, Grade.TWO),
            new Student("小黑", 13, Gender.FEMALE, Grade.FOUR),
            new Student("小紅", 7, Gender.FEMALE, Grade.THREE),
            new Student("小黃", 13, Gender.MALE, Grade.ONE),
            new Student("小青", 13, Gender.FEMALE, Grade.THREE),
            new Student("小紫", 9, Gender.FEMALE, Grade.TWO),
            new Student("小王", 6, Gender.MALE, Grade.ONE),
            new Student("小李", 6, Gender.MALE, Grade.ONE),
            new Student("小馬", 14, Gender.FEMALE, Grade.FOUR),
            new Student("小劉", 13, Gender.MALE, Grade.FOUR));

    // 獲得全部學生的年齡列表
    List<Integer> ages = students.stream().map(Student::getAge).collect(Collectors.toList());
    // 能夠經過toCollection指定集合實現類型
    // List<Integer> ages = students.stream().map(Student::getAge).collect(Collectors.toCollection(LinkedList::new));
    System.out.println("全部學生的年齡列表: " + ages);

    // 統計彙總信息
    IntSummaryStatistics ageSummaryStatistics = students.stream().collect(Collectors.summarizingInt(Student::getAge));
    System.out.println("年齡彙總信息: " + ageSummaryStatistics);

    // 分塊-按照規則把數據分紅兩塊,這裏按性別將學生數據分爲兩塊
    Map<Boolean, List<Student>> genders = students.stream().collect(Collectors.partitioningBy(s -> s.getGender() == Gender.MALE));
    // 這裏使用了Apache集合工具類進行打印
    MapUtils.verbosePrint(System.out, "男女學生列表: ", genders);

    // 分組-按照規則把數據分爲多組數據,這裏按班級將學生數據進行分組
    Map<Grade, List<Student>> grades = students.stream().collect(Collectors.groupingBy(Student::getGrade));
    MapUtils.verbosePrint(System.out, "學生班級列表: ", grades);

    // 統計每一個分組裏的數據-統計全部班級裏學生的數量
    Map<Grade, Long> gradesCount = students.stream().collect(Collectors.groupingBy(Student::getGrade, Collectors.counting()));
    MapUtils.verbosePrint(System.out, "全部班級學生人數列表: ", gradesCount);
}

運行結果以下:

全部學生的年齡列表: [10, 9, 8, 13, 7, 13, 13, 9, 6, 6, 14, 13]
年齡彙總信息: IntSummaryStatistics{count=12, sum=121, min=6, average=10.083333, max=14}
男女學生列表:  = 
{
    false = [Student(name=小白, age=8, gender=FEMALE, grade=TWO), Student(name=小黑, age=13, gender=FEMALE, grade=FOUR), Student(name=小紅, age=7, gender=FEMALE, grade=THREE), Student(name=小青, age=13, gender=FEMALE, grade=THREE), Student(name=小紫, age=9, gender=FEMALE, grade=TWO), Student(name=小馬, age=14, gender=FEMALE, grade=FOUR)]
    true = [Student(name=小明, age=10, gender=MALE, grade=ONE), Student(name=大明, age=9, gender=MALE, grade=THREE), Student(name=小黃, age=13, gender=MALE, grade=ONE), Student(name=小王, age=6, gender=MALE, grade=ONE), Student(name=小李, age=6, gender=MALE, grade=ONE), Student(name=小劉, age=13, gender=MALE, grade=FOUR)]
}
學生班級列表:  = 
{
    TWO = [Student(name=小白, age=8, gender=FEMALE, grade=TWO), Student(name=小紫, age=9, gender=FEMALE, grade=TWO)]
    FOUR = [Student(name=小黑, age=13, gender=FEMALE, grade=FOUR), Student(name=小馬, age=14, gender=FEMALE, grade=FOUR), Student(name=小劉, age=13, gender=MALE, grade=FOUR)]
    ONE = [Student(name=小明, age=10, gender=MALE, grade=ONE), Student(name=小黃, age=13, gender=MALE, grade=ONE), Student(name=小王, age=6, gender=MALE, grade=ONE), Student(name=小李, age=6, gender=MALE, grade=ONE)]
    THREE = [Student(name=大明, age=9, gender=MALE, grade=THREE), Student(name=小紅, age=7, gender=FEMALE, grade=THREE), Student(name=小青, age=13, gender=FEMALE, grade=THREE)]
}
全部班級學生人數列表:  = 
{
    TWO = 2
    FOUR = 3
    ONE = 4
    THREE = 3
}

Stream運行機制

經過以上幾個小節的內容,咱們已經掌握了流的基本操做。可是咱們對流的運行機制還不太清楚,因此本小節咱們將簡單認識一下Stream的運行機制。

一樣的,咱們首先來編寫一段簡單的Stream操做代碼,以下:

public static void main(String[] args) {
    Random random = new Random();
    // 隨機產生數據
    Stream<Integer> stream = Stream.generate(random::nextInt)
            // 產生500個 ( 無限流須要短路操做. )
            .limit(500)
            // 第1個無狀態操做
            .peek(s -> print("peek: " + s))
            // 第2個無狀態操做
            .filter(s -> {
                print("filter: " + s);
                return s > 1000000;
            });

    // 終止操做
    stream.count();
}

public static void print(String s) {
    // 5毫秒打印一第二天志
    System.out.println(Thread.currentThread().getName() + " > " + s);
    try {
        TimeUnit.MILLISECONDS.sleep(5);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

運行以上代碼,控制檯輸出以下:
Java函數式編程之Stream流編程

1.從運行結果中能夠看到,peek和filter是交替執行的,也就是說全部操做都是鏈式調用,一個元素只迭代一次

2.既然是一個鏈式的調用,那麼這條鏈是怎麼樣的呢?是如何維護的呢?咱們在終止操做上打上斷點,經過debug運行。以下圖,能夠看到每個中間操做返回一個新的流,而流裏面有一個屬性sourceStage,它都指向同一個地方,就是鏈表的頭Head:
Java函數式編程之Stream流編程

3.而Head裏指向了nextStage,nextStage裏又指向了nextStage,一直指向到鏈尾的null值。就如:Head -&gt; nextStage -&gt; nextStage -&gt; ... -&gt; null
Java函數式編程之Stream流編程

這就是Stream裏實現鏈式調用所需的一個鏈表結構,是一條單鏈


以上的例子只有無狀態操做,若是加入有狀態操做,會發生什麼變化呢?示例代碼以下:

public static void main(String[] args) {
    Random random = new Random();
    // 隨機產生數據
    Stream<Integer> stream = Stream.generate(random::nextInt)
            // 產生500個 ( 無限流須要短路操做. )
            .limit(500)
            // 第1個無狀態操做
            .peek(s -> print("peek: " + s))
            // 第2個無狀態操做
            .filter(s -> {
                print("filter: " + s);
                return s > 1000000;
            })
            // 有狀態操做
            .sorted((i1, i2) -> {
                print("排序: " + i1 + ", " + i2);
                return i1.compareTo(i2);
            })
            // 又一個無狀態操做
            .peek(s -> print("peek2: " + s)).parallel();

    // 終止操做
    stream.count();
}

運行以上代碼,控制檯輸出以下:

main > peek: -1564323985
main > filter: -1564323985
main > peek: -779802182
main > filter: -779802182
main > peek: -498652682
main > filter: -498652682
main > 排序: 78555310, 50589406
main > 排序: 74439402, 50589406
main > 排序: 56492454, 50589406
main > 排序: 39808935, 50589406
main > 排序: 39808935, 39002482
main > peek2: 25284397
main > peek2: 29672249
main > peek2: 29800626
main > peek2: 32299397

從輸出的日誌信息能夠發現,用於排序的中間操做截斷了流,並無像無狀態操做那樣交替執行。因此咱們就能夠得知有狀態操做會把無狀態操做截斷,會單獨進行處理而不會交替執行。

而後咱們再加上並行流,看看並行狀況下又是怎樣的,輸出的日誌信息以下:

ForkJoinPool.commonPool-worker-3 > peek: -332079048
ForkJoinPool.commonPool-worker-3 > filter: -332079048
ForkJoinPool.commonPool-worker-1 > filter: 1974510987
ForkJoinPool.commonPool-worker-4 > peek: -1727742841
ForkJoinPool.commonPool-worker-4 > filter: -1727742841
main > 排序: 58979900, 74247464
main > 排序: 58979900, 57671811
main > 排序: 53543451, 57671811
main > 排序: 53543451, 42862261
main > 排序: 43624983, 42862261
ForkJoinPool.commonPool-worker-0 > peek2: 1152454167
ForkJoinPool.commonPool-worker-2 > peek2: 1468420859
ForkJoinPool.commonPool-worker-5 > peek2: 736525554
ForkJoinPool.commonPool-worker-6 > peek2: 1×××50615

從日誌打印能夠看到,排序操做依舊是main線程執行的,而其餘的操做則是線程池裏的線程執行的。因此咱們經過這個例子能夠得知即使在並行的環境下,有狀態的中間操做不必定能並行操做。

順帶說明一下 parallel/ sequetial 這2個操做也是中間操做 (也是返回stream) ,可是區別在於它們不會建立流,,它們只修改 Head 的並行標誌,由於這兩個方法修改的是同一個地方,因此纔會以最後一次調用的爲準:
Java函數式編程之Stream流編程

相關文章
相關標籤/搜索