Java8函數式編程(二):類比Spark RDD算子的Stream流操做

1 Stream流

對集合進行迭代時,可調用其iterator方法,返回一個iterator對象,以後即可以經過該iterator對象遍歷集合中的元素,這被稱爲外部迭代(for循環自己正是封裝了其的語法糖),其示意圖以下:java

Java8函數式編程(二):類比Spark RDD算子的Stream流操做

除此以外,還有內部迭代方法,這正是這裏要說明的集合的stream()方法返回的Stream對象的一系列操做,好比,要統計一個數字列表的偶數元素個數,當使用Stream對象的操做時,以下:編程

List<Integer> list = new ArrayList<Integer>(){{
    add(1);
    add(2);
    add(3);
}};

long count = list.stream().filter(num -> num % 2 == 0).count();
System.out.println(count);  // 1

其示意圖以下:分佈式

Java8函數式編程(二):類比Spark RDD算子的Stream流操做

上面提供的例子,好比filter,其參數爲一個lambda表達式,因此Stream實際上是用函數式編程方式在集合類上進行復雜操做的工具。ide

2 Stream流操做與Spark RDD算子

其實有Spark經驗的人開始使用Stream流操做時,會有似曾相識的感受,好像一切都那麼熟悉。函數式編程

參考Spark RDD算子介紹的文章:《Spark RDD算子實戰》https://blog.51cto.com/xpleaf/2108481函數

下面從操做對象(名詞)和對象操做(動詞)兩個角度來簡單對比一下。工具

2.1 操做對象

Spark RDD算子的操做對象是RDD,中文意思是彈性分佈式數據集,對用戶而言,它就是相似集合同樣的對象,裏面存的是數據,只是底層它的數據可能分佈於各個節點的各個partition,但無論怎樣,其本質仍是數據集。性能

Stream流操做的操做對象是集合,集合本質也是一種數據集,只是相比RDD,它是單機的。翻譯

2.2 對象操做

Spark RDD算子有兩種類型,分別是Transformation算子和Action算子,前者是延遲計算的,它僅僅記住了數據的邏輯操做,並無真正執行,後者是真正觸發Transformation算子的計算。code

Stream流操做也有兩種類型,分別是惰性求值和及早求值(我的以爲這翻譯很差),前者也只是記錄了惰性求值的邏輯操做,後者纔是真正觸發操做。

能夠看到其二者是很是類似的,一個是對分佈式數據進行的各類操做,一個是單機數據進行的各類操做,把計算分爲延遲計算和觸發計算兩種,好處是顯而易見的:當對數據集進行屢次邏輯操做時,有可能迭代只須要一次就可能完成,這樣真正觸發計算時,一次迭代帶來的性能提高是顯著的,好比對於過濾和計算這兩個操做(前面計算偶數的操做),在一次迭代中就可以完成。

固然,不只類型類似,其自己提供的操做的名稱而言,都是類似的,有些東西真的是通用的。

3 經常使用Stream流操做

每一個操做都用一個通俗易懂的例子來進行說明。

3.1 及早求值操做

3.1.1 collect(toList())

其做用是將Stream流中的元素收集起來,造成List、Set或Map等。

List<Integer> list = Stream.of(1, 2, 3).collect(Collectors.toList());

System.out.println(list);   // [1, 2, 3]

1.Stream.of()方法用於方便地生成Stream流;

2.Collectors還有toSet()、toMap()等方法,詳見其API。

3.1.2 forEach(Consumer)

對集合中的每一個元素進行操做,其參數是Consumer<T>函數接口。

Consumer<Integer> printNum = System.out::print;
Stream.of(1, 2, 3).forEach(printNum);   // 123

System.out::print表示使用System.out類中的print方法,至關於lambda表達式:element -> System.out.print(element);

上面的例子也能夠一步到位:

Stream.of(1, 2, 3).forEach(System.out::print);  // 123

3.1.3 max和min

其參數爲Comparator對象,返回一個Optional對象,Optional說明其結果可能有,也可能沒有(好比對空值的Stream流操做時)。

// 計算數值流中的最大值
Optional<Integer> maxOptional = Stream.of(1, 2, 3).max(Comparator.comparing(num -> num));
System.out.println(maxOptional.get());  // 3

// 找出字符串流中長度最小的字符串
Optional<String> minOptional = Stream.of("a", "ab", "abc").min(Comparator.comparing(String::length));
System.out.println(minOptional.get());  // a

另外,其確實是及早求值操做,能夠驗證一下:

Stream.of(1, 2, 3).max(Comparator.comparing(num -> {
    System.out.println(num);
    return num;
}));

輸出:

1
2
2
3

3.2 惰性求值操做

3.2.1 map

其參數爲Function&lt;T,R&gt;,用於將Stream流中的值轉換爲另一種流。

// 將字母轉換爲大寫
Stream.of("a", "b", "hello")
    .map(String::toUpperCase)
    .forEach(element -> System.out.print(element + " "));  // A B HELLO

3.2.2 filter

其參數爲Predicate&lt;T&gt;,過濾Stream流中的元素。

// 找出偶數
List<Integer> list = Stream.of(1, 2, 3).filter(num -> num % 2 == 0).collect(Collectors.toList());

System.out.println(list);   // [2]

3.2.3 flatMap

其參數爲Function&lt;T,R&gt;,只是此時R限定爲Stream,將Stream流中的值轉換爲更多的流。

// 找出字符串中的單詞
List<String> list = Stream.of("hello you", "hello me")
    .flatMap(line -> Arrays.stream(line.split(" "))).collect(Collectors.toList());

System.out.println(list);   // [hello, you, hello, me]

是否是感受跟Spark的wordcount例子有點像。

3.2.4 reduce

其參數爲BinaryOperator&lt;T&gt;,返回一個Optional對象,Optional說明其結果可能有,也可能沒有(好比對空值的Stream流操做時,而且沒有指定初始值),用於歸約操做。

// 求和
Integer res = Stream.of(1, 2, 3).reduce((acc, element) -> acc + element).get();

// 指定初始值6後,Stream的reduce操做結果確定有值的,所以其返回的不是Optional,而直接是6所屬的類型,即Integer
Integer res2 = Stream.of(1, 2, 3).reduce(6, (acc, element) -> acc + element);

System.out.println(String.format("res: %s, res2: %s", res, res2));  // res: 6, res2: 12

4 參考

Java 8 Lambdas,Richard Warburton著(O’Reilly,2014)》

相關文章
相關標籤/搜索