Java 8新特性(二):Stream API

本文首發於一書生VOID的博客。 原文連接:Java 8新特性(二):Stream APIhtml


本篇文章繼續介紹Java 8的另外一個新特性——Stream API。新增的Stream API與InputStreamOutputStream是徹底不一樣的概念,Stream API是對Java中集合操做的加強,能夠利用它進行各類過濾、排序、分組、聚合等操做。Stream API配合Lambda表達式能夠加大的提升代碼可讀性和編碼效率,Stream API也支持並行操做,咱們不用再花費不少精力來編寫容易出錯的多線程代碼了,Stream API已經替咱們作好了,而且充分利用多核CPU的優點。藉助Stream API和Lambda,開發人員能夠很容易的編寫出高性能的併發處理程序。java

Stream API簡介

Stream API是Java 8中加入的一套新的API,主要用於處理集合操做,不過它的處理方式與傳統的方式不一樣,稱爲「數據流處理」。流(Stream)相似於關係數據庫的查詢操做,是一種聲明式操做。好比要從數據庫中獲取全部年齡大於20歲的用戶的名稱,並按照用戶的建立時間進行排序,用一條SQL語句就能夠搞定,不過使用Java程序實現就會顯得有些繁瑣,這時候可使用流:git

List<String> userNames =
        users.stream()
        .filter(user -> user.getAge() > 20)
        .sorted(comparing(User::getCreationDate))
        .map(User::getUserName)
        .collect(toList());
複製代碼

能夠把流跟集合作一個比較。在Java中,集合是一種數據結構,或者說是一種容器,用於存放數據,流不是容器,它不關心數據的存放,只關注如何處理。能夠把流當作是Java中的Iterator,不過它比Iterator強大多了。github

流與集合另外一個區別在於他們的遍歷方式,遍歷集合一般使用for-each方式,這種方式稱爲外部迭代,而流使用內部迭代方式,也就是說它幫你把迭代的工做作了,你只須要給出一個函數來告訴它接下來要幹什麼:數據庫

// 外部迭代
List<String> list = Arrays.asList("A", "B", "C", "D");
for (String str : list) {
    System.out.println(str);
}

// 內部迭代
list.stream().forEach(System.out::println);
複製代碼

在這個大數據的時代,數據變得愈來愈多樣化,不少時候咱們會面對海量數據,並對其作一些複雜的操做(好比統計,分組),依照傳統的遍歷方式(for-each),每次只能處理集合中的一個元素,而且是按順序處理,這種方法是極其低效的。你也許會想到並行處理,可是編寫多線程代碼並不是易事,很容易出錯而且維護困難。不過在Java 8以後,你可使用Stream API來解決這一問題。編程

Stream API將迭代操做封裝到了內部,它會自動的選擇最優的迭代方式,而且使用並行方式處理時,將集合分紅多段,每一段分別使用不一樣的線程處理,最後將處理結果合併輸出。api

須要注意的是,流只能遍歷一次,遍歷結束後,這個流就被關閉掉了。若是要從新遍歷,能夠從數據源(集合)中從新獲取一個流。若是你對一個流遍歷兩次,就會拋出java.lang.IllegalStateException異常:數組

List<String> list = Arrays.asList("A", "B", "C", "D");
Stream<String> stream = list.stream();
stream.forEach(System.out::println);
stream.forEach(System.out::println); // 這裏會拋出java.lang.IllegalStateException異常,由於流已經被關閉
複製代碼

流一般由三部分構成:bash

  1. 數據源:數據源通常用於流的獲取,好比本文開頭那個過濾用戶的例子中users.stream()方法。
  2. 中間處理:中間處理包括對流中元素的一系列處理,如:過濾(filter()),映射(map()),排序(sorted())。
  3. 終端處理:終端處理會生成結果,結果能夠是任何不是流值,如List<String>;也能夠不返回結果,如stream.forEach(System.out::println)就是將結果打印到控制檯中,並無返回。

建立流

建立流的方式有不少,具體能夠劃分爲如下幾種:數據結構

由值建立流

使用靜態方法Stream.of()建立流,該方法接收一個變長參數:

Stream<Stream> stream = Stream.of("A", "B", "C", "D");
複製代碼

也可使用靜態方法Stream.empty()建立一個空的流:

Stream<Stream> stream = Stream.empty();
複製代碼

由數組建立流

使用靜態方法Arrays.stream()從數組建立一個流,該方法接收一個數組參數:

String[] strs = {"A", "B", "C", "D"};
Stream<Stream> stream = Arrays.stream(strs);
複製代碼

經過文件生成流

使用java.nio.file.Files類中的不少靜態方法均可以獲取流,好比Files.lines()方法,該方法接收一個java.nio.file.Path對象,返回一個由文件行構成的字符串流:

Stream<String> stream = Files.lines(Paths.get("text.txt"), Charset.defaultCharset());
複製代碼

經過函數建立流

java.util.stream.Stream中有兩個靜態方法用於從函數生成流,他們分別是Stream.generate()Stream.iterate()

// iteartor
Stream.iterate(0, n -> n + 2).limit(51).forEach(System.out::println);

// generate
Stream.generate(() -> "Hello Man!").limit(10).forEach(System.out::println);
複製代碼

第一個方法會打印100之內的全部偶數,第二個方法打印10個Hello Man!。值得注意的是,這兩個方法生成的流都是無限流,沒有固定大小,能夠無窮的計算下去,在上面的代碼中咱們使用了limit()來避免打印無窮個值。

通常來講,iterate()用於生成一系列值,好比生成以當前時間開始以後的10天的日期:

Stream.iterate(LocalDate.now(), date -> date.plusDays(1)).limit(10).forEach(System.out::println);
複製代碼

generate()方法用於生成一些隨機數,好比生成10個UUID:

Stream.generate(() -> UUID.randomUUID().toString()).limit(10).forEach(System.out::println);
複製代碼

使用流

Stream接口中包含許多對流操做的方法,這些方法分別爲:

  • filter():對流的元素過濾
  • map():將流的元素映射成另外一個類型
  • distinct():去除流中重複的元素
  • sorted():對流的元素排序
  • forEach():對流中的每一個元素執行某個操做
  • peek():與forEach()方法效果相似,不一樣的是,該方法會返回一個新的流,而forEach()無返回
  • limit():截取流中前面幾個元素
  • skip():跳過流中前面幾個元素
  • toArray():將流轉換爲數組
  • reduce():對流中的元素歸約操做,將每一個元素合起來造成一個新的值
  • collect():對流的彙總操做,好比輸出成List集合
  • anyMatch():匹配流中的元素,相似的操做還有allMatch()noneMatch()方法
  • findFirst():查找第一個元素,相似的還有findAny()方法
  • max():求最大值
  • min():求最小值
  • count():求總數

下面逐一介紹這些方法的用法。

過濾和排序

Stream.of(1, 8, 5, 2, 1, 0, 9, 2, 0, 4, 8)
    .filter(n -> n > 2)     // 對元素過濾,保留大於2的元素
    .distinct()             // 去重,相似於SQL語句中的DISTINCT
    .skip(1)                // 跳過前面1個元素
    .limit(2)               // 返回開頭2個元素,相似於SQL語句中的SELECT TOP
    .sorted()               // 對結果排序
    .forEach(System.out::println);
複製代碼

查找和匹配

Stream中提供的查找方法有anyMatch()allMatch()noneMatch()findFirst()findAny(),這些方法被用來查找或匹配某些元素是否符合給定的條件:

// 檢查流中的任意元素是否包含字符串"Java"
boolean hasMatch = Stream.of("Java", "C#", "PHP", "C++", "Python")
        .anyMatch(s -> s.equals("Java"));

// 檢查流中的全部元素是否都包含字符串"#"
boolean hasAllMatch = Stream.of("Java", "C#", "PHP", "C++", "Python")
        .allMatch(s -> s.contains("#"));

// 檢查流中的任意元素是否沒有以"C"開頭的字符串
boolean hasNoneMatch = Stream.of("Java", "C#", "PHP", "C++", "Python")
        .noneMatch(s -> s.startsWith("C"));

// 查找元素
Optional<String> element = Stream.of("Java", "C#", "PHP", "C++", "Python")
        .filter(s -> s.contains("C"))
        // .findFirst() // 查找第一個元素
        .findAny();         // 查找任意元素
複製代碼

注意最後一行代碼的返回類型,是一個Optional<T>類(java.util.Optional),它一個容器類,表明一個值存在或不存在。上面的代碼中,findAny()可能什麼元素都沒找到。Java 8的庫設計人員引入了Optional<T>,這樣就不用返回衆所周知容易出問題的null了。有關Optional<T>類的詳細用法,將在下一篇文章中介紹。

實際上測試結果發現,findFirst()findAny()返回的都是第一個元素,那麼二者之間到底有什麼區別?經過查看javadoc描述,大體意思是findAny()是爲了提升並行操做時的性能,若是沒有特別須要,仍是建議使用findAny()方法。

歸約

歸約操做就是將流中的元素進行合併,造成一個新的值,常見的歸約操做包括求和,求最大值或最小值。歸約操做通常使用reduce()方法,與map()方法搭配使用,能夠處理一些很複雜的歸約操做。

// 獲取流
List<Book> books = Arrays.asList(
       new Book("Java編程思想", "Bruce Eckel", "機械工業出版社", 108.00D),
       new Book("Java 8實戰", "Mario Fusco", "人民郵電出版社", 79.00D),
       new Book("MongoDB權威指南(第2版)", "Kristina Chodorow", "人民郵電出版社", 69.00D)
);

// 計算全部圖書的總價
Optional<Double> totalPrice = books.stream()
       .map(Book::getPrice)
       .reduce((n, m) -> n + m);

// 價格最高的圖書
Optional<Book> expensive = books.stream().max(Comparator.comparing(Book::getPrice));
// 價格最低的圖書
Optional<Book> cheapest = books.stream().min(Comparator.comparing(Book::getPrice));
// 計算總數
long count = books.stream().count()
複製代碼

在計算圖書總價的時候首先使用map()方法獲得全部圖書價格的流,而後再使用reduce()方法進行歸約計算。與map()方法相似的還有一個flatMap()flatMap()方法讓你把一個流中的每一個值都換成另外一個流,而後把全部的流鏈接起來成爲一個新的流。看看下面的代碼:

List<String[]> result = Stream.of("Hello Man")
        .map(s -> s.split(""))
        .collect(Collectors.toList());
複製代碼

上面代碼返回的結果是一個List<String[]>類型,也就是[["H", "e", "l", "l", "o"], ["M", "a", "n"]]這種結構,而咱們想要的到["H", "e", "l", "l", "o", "M", "a", "n"]這種結構,這時候就須要使用flatMap()方法了:

List<String> result = Stream.of("Hello Man")
        .map(s -> s.split(""))
        .flatMap(Arrays::stream)
        .collect(Collectors.toList());
複製代碼

使用flatMap()方法的效果是,各個數組並非分別映射成一個流,而是映射成流的內容。全部使用map(Arrays::stream)時生成的單個流都被合併起來,也就是對流扁平化操做。

數據收集

前面兩部份內容分別爲流式數據處理的前兩個步驟:從數據源建立流、使用流進行中間處理。下面咱們介紹流式數據處理的最後一個步驟——數據收集。

數據收集是流式數據處理的終端處理,與中間處理不一樣的是,終端處理會消耗流,也就是說,終端處理以後,這個流就會被關閉,若是再進行中間處理,就會拋出異常。數據收集主要使用collect方法,該方法也屬於歸約操做,像reduce()方法那樣能夠接收各類作法做爲參數,將流中的元素累積成一個彙總結果,具體的作法是經過定義新的Collector接口來定義的。

在前面部分的例子中使用收集器(Collector)是由java.util.stream.Collectors工具類中的toList()方法提供,Collectors類提供了許多經常使用的方法用於處理數據收集,常見的有歸約、彙總、分組等。

歸約和彙總

咱們使用前面歸約操做中計算圖書總價,最大值,最小值,輸入總數那個例子來看看收集器如何進行上述歸約操做:

// 求和
long count = books.stream().collect(counting());

// 價格最高的圖書
Optional<Book> expensive = books.stream().collect(maxBy(comparing(Book::getPrice)));

// 價格最低的圖書
Optional<Book> cheapest = books.stream().collect(minBy(comparing(Book::getPrice)));
複製代碼

上面的代碼假設你已經使用靜態導入了CollectorsComparator兩個類,這樣你就不用再去寫Collectors.counting()Comparator.comparing()這樣的代碼了:

import static java.util.stream.Collectors.*;
import static java.util.Comparator.*;
複製代碼

Collectors工具類爲咱們提供了用於彙總的方法,包括summarizingInt()summarizingLong()summarizingDouble(),因爲圖書的價格爲Double類型,因此咱們使用summarizingDouble()方法進行彙總。該方法會返回一個DoubleSummaryStatistics對象,包含一系列歸約操做的方法,如:彙總、計算平均數、最大值、最小值、計算總數:

DoubleSummaryStatistics dss = books.stream().collect(summarizingDouble(Book::getPrice));
double sum = dss.getSum();          // 彙總
double average = dss.getAverage();  // 求平均數
long count = dss.getCount();        // 計算總數
double max = dss.getMax();          // 最大值
double min = dss.getMin();          // 最小值
複製代碼

Collectors類還包含一個joining()方法,該方法用於鏈接字符串:

String str = Stream.of("A", "B", "C", "D").collect(joining(","));
複製代碼

上面的代碼用於將流中的字符串經過逗號鏈接成一個新的字符串。

分組

和關係數據庫同樣,流也提供了相似於數據庫中GROUP BY分組的特性,由Collectors.groupingBy()方法提供:

Map<String, List<Book>> booksGroup = books.stream().collect(groupingBy(Book::getPublisher));
複製代碼

上面的代碼按照出版社對圖書進行分組,分組的結果是一個Map對象,Mapkey值是出版社的名稱,value值是每一個出版社分組對應的集合。分組方法groupingBy()接收一個Function接口做爲參數,上面的例子中咱們使用了方法引用傳遞了出版社做爲分組的依據,但實際狀況可能比這複雜,好比將價格在0-50之間的書籍分紅一組,50-100之間的分紅一組,超過100的分紅一組,這時候,咱們能夠直接使用Lambda表達式來表示這個分組邏輯:

Map<String, List<Book>> booksGroup = books
    .stream()
    .collect(groupingBy(book -> {
        if (book.getPrice() > 0 && book.getPrice() <= 50) {
            return "A";
        } else if (book.getPrice() > 50 && book.getPrice() <=100) {
            return "B";
        } else {
            return "C";
        }
    }));
複製代碼

groupingBy()方法還支持多級分組,他有一個重載方法,除了接收一個Function類型的參數外,還接收一個Collector類型的參數:

Map<String, Map<String, List<Book>>> booksGroup = books.stream().collect(
        groupingBy(Book::getPublisher, groupingBy(book -> {
            if (book.getPrice() > 0 && book.getPrice() <= 50) {
                return "A";
            } else if (book.getPrice() > 50 && book.getPrice() <=100) {
                return "B";
            } else {
                return "C";
            }
        }))
);
複製代碼

上面的代碼將以前兩個分組合併成一個,實現了多級分組,首先按照出版社進行分組,而後按照價格進行分組,返回類型是一個Map<String, Map<String, List<Book>>>groupingBy()的第二個參數能夠是任意類型,只要是Collector接口的實例就能夠,好比先分組,再統計數量:

Map<String, Long> countGroup = books.stream()
        .collect(groupingBy(Book::getPublisher, counting()));
複製代碼

還能夠在進行分組後獲取每組中價格最高的圖書:

Map<String, Book> expensiveGroup = books.stream()
        .collect(groupingBy(Book::getPublisher, collectingAndThen(
            maxBy(comparingDouble(Book::getPrice)),
                Optional::get
        )));
複製代碼

並行數據處理

在Java 7以前,處理並行數據集合很是麻煩,首先須要將一個龐大數據集合分紅幾個子集合;而後須要爲每個子集合編寫多線程處理程序,還須要對他們作線程同步來避免訪問共享變量致使處理結果不許確;最後,等待全部線程處理完畢後將處理結果合併。在Java 7以後新添加了一個fork/join的框架,讓這一切變得更加簡單。

並行流

並行流使用集合的parallelStream()方法能夠獲取一個並行流。Java內部會將流的內容分割成若干個子部分,而後將它們交給多個線程並行處理,這樣就將工做的負擔交給多核CPU的其餘內核處理。

咱們經過一個簡單粗暴的例子演示並行流的處理性能。假設有一個方法,接受一個數字n做爲參數,返回從1到n的全部天然數之和:

public static long sequentialSum(long n) {
	return Stream.iterate(1L, i -> i + 1)
			.limit(n)
			.reduce(0L, Long::sum);
}
複製代碼

上面的方法也能夠經過傳統的for循環方式實現:

public static long iterativeSum(long n) {
	long result = 0;
	for (long i = 1L; i <= n; i++) {
		result += i;
	}
	return result;
}
複製代碼

編寫測試代碼:

public static void main(String[] args) {
	long number = 10000000L;
    System.out.println("Sequential Sum: " + sumPerformanceTest(StreamTest::sequentialSum, number) + " 毫秒");
    System.out.println("Iterative Sum: " + sumPerformanceTest(StreamTest::iterativeSum, number) + " 毫秒");
}

public static long sumPerformanceTest(Function<Long, Long> function, long number) {
	long maxValue = Long.MAX_VALUE;

	for (int i=0; i<10; i++) {
		long start = System.nanoTime();
		long sum = function.apply(n);
		long end = System.nanoTime();
		System.out.println("Result: " + sum);
		long time = ( end - start ) / 1000000;

		if (time < maxValue) {
			maxValue = time;
		}
	}

	return maxValue;
}
複製代碼

爲了方便測試,咱們編寫一個sumPerformanceTest()方法,參數number表示給定的一個數,用於計算從1到這個數的全部天然數之和。該方法內部執行10次運算,返回時間最短的一次運算結果。

運行上面的代碼,能夠在控制檯看到以下結果:

Result: 200000010000000
Result: 200000010000000
Result: 200000010000000
Result: 200000010000000
Result: 200000010000000
Result: 200000010000000
Result: 200000010000000
Result: 200000010000000
Result: 200000010000000
Result: 200000010000000
Sequential Sum: 159 毫秒
Result: 200000010000000
Result: 200000010000000
Result: 200000010000000
Result: 200000010000000
Result: 200000010000000
Result: 200000010000000
Result: 200000010000000
Result: 200000010000000
Result: 200000010000000
Result: 200000010000000
Iterative Sum: 5 毫秒
複製代碼

能夠看出,採用傳統的for循環更快,由於它不用作任何自動拆箱/裝箱操做,操做的都是基本類型。這個測試結果並不客觀,提高的性能取決於機器的配置,以上是我在公司的臺式機(機器配置爲Intel(R) Core i7-6700 CPU 3.40HZ; 8GB RAM)上運行的結果。

如今咱們使用並行流測試一下:

public static long parallelSum(long n) {
	return Stream.iterate(1L, i -> i + 1)
			.limit(n)
			.parallel()
			.reduce(0L, Long::sum);
}

public static void main(String[] args) {
	System.out.println("Parallel Sum: " + sumPerformanceTest(StreamTest::parallelSum, number) + " 毫秒");
}
複製代碼

並行流執行結果爲:

Result: 200000010000000
Result: 200000010000000
Result: 200000010000000
Result: 200000010000000
Result: 200000010000000
Result: 200000010000000
Result: 200000010000000
Result: 200000010000000
Result: 200000010000000
Result: 200000010000000
Parallel Sum: 570 毫秒
複製代碼

並行的執行效率比順序執行還要慢,這個結果讓人大跌眼鏡。主要有兩個緣由:

  1. iterate()方法生成的對象是基本類型的包裝類(也就是java.lang.Long類型),必須進行拆箱操做才能運算。
  2. iterate()方法不適合用並行流處理。

第一個緣由容易理解,自動拆箱操做確實須要花費必定的時間,這從前一個例子能夠看出來。第二個緣由中iterate()方法不適合用並行流處理,主要緣由是iterate()方法內部機制的問題。iterate()方法每次執行都須要依賴前一次的結果,好比本次執行的輸入值爲10,這個輸入值必須是前一次運算結果的輸出,所以iterate()方法很難使用並行流分割成不一樣小塊處理。實際上,上面的並行流程序還增長了順序處理的額外開銷,由於須要把每次操做執行的結果分別分配到不一樣的線程中。

一個有效的處理方式是使用LongStream.rangeClosed()方法,該方法彌補了上述例子的兩個缺點,它生成的是基本類型而非包裝類,不用拆箱操做就能夠運算,而且,它生成的是由範圍的數字,很容易拆分。如:生成1-20範圍的數字能夠拆分紅1-10, 11-20。

public static long rangedSum(long n) {
	return LongStream.rangeClosed(1, n)
			.reduce(0L, Long::sum);
}

public static void main(String[] args) {
	System.out.println("Ranged Sum: " + sumPerformanceTest(StreamTest::rangedSum, number) + " 毫秒");
}
複製代碼

執行結果爲:

Result: 200000010000000
Result: 200000010000000
Result: 200000010000000
Result: 200000010000000
Result: 200000010000000
Result: 200000010000000
Result: 200000010000000
Result: 200000010000000
Result: 200000010000000
Result: 200000010000000
Ranged Sum: 8 毫秒
複製代碼

這個結果比起sequentialSum()方法執行的結果還要快,因此選擇合適的數據結構有時候比並行化處理更重要。咱們再將rangeClosed()方法生成的流轉化爲並行流:

public static long parallelRangedSum(long n) {
    return LongStream.rangeClosed(1, n)
            .parallel()
            .reduce(0L, Long::sum);
}

public static void main(String[] args) {
	System.out.println("Parallel Ranged Sum: " + sumPerformanceTest(StreamTest::parallelRangedSum, number) + " 毫秒");
}
複製代碼

執行結果爲:

Result: 200000010000000
Result: 200000010000000
Result: 200000010000000
Result: 200000010000000
Result: 200000010000000
Result: 200000010000000
Result: 200000010000000
Result: 200000010000000
Result: 200000010000000
Result: 200000010000000
Parallel Ranged Sum: 2 毫秒
複製代碼

咱們終於獲得了想要的結果,因此並行操做須要選擇合適的數據結構,建議多作測試,找到合適的並行方式再執行,不然很容易跳到坑裏。

相關文章
相關標籤/搜索