Stream 做爲 Java 8 的一大亮點,它與 java.io 包裏的 InputStream 和 OutputStream 是徹底不一樣的概念。它也不一樣於 StAX 對 XML 解析的 Stream,也不是 Amazon Kinesis 對大數據實時處理的 Stream。Java 8 中的 Stream 是對集合(Collection)對象功能的加強,它專一於對集合對象進行各類很是便利、高效的聚合操做(aggregate operation),或者大批量數據操做 (bulk data operation)。Stream API 藉助於一樣新出現的 Lambda 表達式,極大的提升編程效率和程序可讀性。同時它提供串行和並行兩種模式進行匯聚操做,併發模式可以充分利用多核處理器的優點,使用 fork/join 並行方式來拆分任務和加速處理過程。一般編寫並行代碼很難並且容易出錯, 但使用 Stream API 無需編寫一行多線程的代碼,就能夠很方便地寫出高性能的併發程序。因此說,Java 8 中首次出現的 java.util.stream 是一個函數式語言+多核時代綜合影響的產物。java
在傳統的 J2EE 應用中,Java 代碼常常不得不依賴於關係型數據庫的聚合操做來完成諸如:程序員
這類的操做。算法
但在當今這個數據大爆炸的時代,在數據來源多樣化、數據海量化的今天,不少時候不得不脫離 RDBMS,或者以底層返回的數據爲基礎進行更上層的數據統計。而 Java 的集合 API 中,僅僅有極少許的輔助型方法,更多的時候是程序員須要用 Iterator 來遍歷集合,完成相關的聚合應用邏輯。這是一種遠不夠高效、笨拙的方法。在 Java 7 中,若是要發現 type 爲 grocery 的全部交易,而後返回以交易值降序排序好的交易 ID 集合,咱們須要這樣寫:數據庫
清單 1. Java 7 的排序、取值實現編程
List<Transaction> groceryTransactions = new Arraylist<>(); for(Transaction t: transactions){ if(t.getType() == Transaction.GROCERY){ groceryTransactions.add(t); } } Collections.sort(groceryTransactions, new Comparator(){ public int compare(Transaction t1, Transaction t2){ return t2.getValue().compareTo(t1.getValue()); } }); List<Integer> transactionIds = new ArrayList<>(); for(Transaction t: groceryTransactions){ transactionsIds.add(t.getId()); }
而在 Java 8 使用 Stream,代碼更加簡潔易讀;並且使用併發模式,程序執行速度更快。api
清單 2. Java 8 的排序、取值實現數組
List<Integer> transactionsIds = transactions.parallelStream(). filter(t -> t.getType() == Transaction.GROCERY). sorted(comparing(Transaction::getValue).reversed()). map(Transaction::getId). collect(toList());
回頁首數據結構
Stream 不是集合元素,它不是數據結構並不保存數據,它是有關算法和計算的,它更像一個高級版本的 Iterator。原始版本的 Iterator,用戶只能顯式地一個一個遍歷元素並對其執行某些操做;高級版本的 Stream,用戶只要給出須要對其包含的元素執行什麼操做,好比 「過濾掉長度大於 10 的字符串」、「獲取每一個字符串的首字母」等,Stream 會隱式地在內部進行遍歷,作出相應的數據轉換。多線程
Stream 就如同一個迭代器(Iterator),單向,不可往復,數據只能遍歷一次,遍歷過一次後即用盡了,就比如流水從面前流過,一去不復返。併發
而和迭代器又不一樣的是,Stream 能夠並行化操做,迭代器只能命令式地、串行化操做。顧名思義,當使用串行方式去遍歷時,每一個 item 讀完後再讀下一個 item。而使用並行去遍歷時,數據會被分紅多個段,其中每個都在不一樣的線程中處理,而後將結果一塊兒輸出。Stream 的並行操做依賴於 Java7 中引入的 Fork/Join 框架(JSR166y)來拆分任務和加速處理過程。Java 的並行 API 演變歷程基本以下:
Stream 的另一大特色是,數據源自己能夠是無限的。
當咱們使用一個流的時候,一般包括三個基本步驟:
獲取一個數據源(source)→ 數據轉換→執行操做獲取想要的結果,每次轉換原有 Stream 對象不改變,返回一個新的 Stream 對象(能夠有屢次轉換),這就容許對其操做能夠像鏈條同樣排列,變成一個管道,以下圖所示。
圖 1. 流管道 (Stream Pipeline) 的構成
有多種方式生成 Stream Source:
流的操做類型分爲兩種:
在對於一個 Stream 進行屢次轉換操做 (Intermediate 操做),每次都對 Stream 的每一個元素進行轉換,並且是執行屢次,這樣時間複雜度就是 N(轉換次數)個 for 循環裏把全部操做都作掉的總和嗎?其實不是這樣的,轉換操做都是 lazy 的,多個轉換操做只會在 Terminal 操做的時候融合起來,一次循環完成。咱們能夠這樣簡單的理解,Stream 裏有個操做函數的集合,每次轉換操做就是把轉換函數放入這個集合中,在 Terminal 操做的時候循環 Stream 對應的集合,而後對每一個元素執行全部的函數。
還有一種操做被稱爲 short-circuiting。用以指:
當操做一個無限大的 Stream,而又但願在有限時間內完成操做,則在管道內擁有一個 short-circuiting 操做是必要非充分條件。
清單 3. 一個流操做的示例
int sum = widgets.stream() .filter(w -> w.getColor() == RED) .mapToInt(w -> w.getWeight()) .sum();
stream() 獲取當前小物件的 source,filter 和 mapToInt 爲 intermediate 操做,進行數據篩選和轉換,最後一個 sum() 爲 terminal 操做,對符合條件的所有小物件做重量求和。
簡單說,對 Stream 的使用就是實現一個 filter-map-reduce 過程,產生一個最終結果,或者致使一個反作用(side effect)。
下面提供最多見的幾種構造 Stream 的樣例。
清單 4. 構造流的幾種常見方法
// 1. Individual values Stream stream = Stream.of("a", "b", "c"); // 2. Arrays String [] strArray = new String[] {"a", "b", "c"}; stream = Stream.of(strArray); stream = Arrays.stream(strArray); // 3. Collections List<String> list = Arrays.asList(strArray); stream = list.stream();
須要注意的是,對於基本數值型,目前有三種對應的包裝類型 Stream:
IntStream、LongStream、DoubleStream。固然咱們也能夠用 Stream<Integer>、Stream<Long> >、Stream<Double>,可是 boxing 和 unboxing 會很耗時,因此特別爲這三種基本數值型提供了對應的 Stream。
Java 8 中尚未提供其它數值型 Stream,由於這將致使擴增的內容較多。而常規的數值型聚合運算能夠經過上面三種 Stream 進行。
清單 5. 數值流的構造
IntStream.of(new int[]{1, 2, 3}).forEach(System.out::println); IntStream.range(1, 3).forEach(System.out::println); IntStream.rangeClosed(1, 3).forEach(System.out::println);
清單 6. 流轉換爲其它數據結構
// 1. Array String[] strArray1 = stream.toArray(String[]::new); // 2. Collection List<String> list1 = stream.collect(Collectors.toList()); List<String> list2 = stream.collect(Collectors.toCollection(ArrayList::new)); Set set1 = stream.collect(Collectors.toSet()); Stack stack1 = stream.collect(Collectors.toCollection(Stack::new)); // 3. String String str = stream.collect(Collectors.joining()).toString();
一個 Stream 只可使用一次,上面的代碼爲了簡潔而重複使用了數次。
接下來,當把一個數據結構包裝成 Stream 後,就要開始對裏面的元素進行各種操做了。常見的操做能夠歸類以下。
map (mapToInt, flatMap 等)、 filter、 distinct、 sorted、 peek、 limit、 skip、 parallel、 sequential、 unordered
forEach、 forEachOrdered、 toArray、 reduce、 collect、 min、 max、 count、 anyMatch、 allMatch、 noneMatch、 findFirst、 findAny、 iterator
anyMatch、 allMatch、 noneMatch、 findFirst、 findAny、 limit
咱們下面看一下 Stream 的比較典型用法。
map/flatMap
咱們先來看 map。若是你熟悉 scala 這類函數式語言,對這個方法應該很瞭解,它的做用就是把 input Stream 的每個元素,映射成 output Stream 的另一個元素。
清單 7. 轉換大寫
List<String> output = wordList.stream(). map(String::toUpperCase). collect(Collectors.toList());
這段代碼把全部的單詞轉換爲大寫。
清單 8. 平方數
List<Integer> nums = Arrays.asList(1, 2, 3, 4); List<Integer> squareNums = nums.stream(). map(n -> n * n). collect(Collectors.toList());
這段代碼生成一個整數 list 的平方數 {1, 4, 9, 16}。
從上面例子能夠看出,map 生成的是個 1:1 映射,每一個輸入元素,都按照規則轉換成爲另一個元素。還有一些場景,是一對多映射關係的,這時須要 flatMap。
清單 9. 一對多
Stream<List<Integer>> inputStream = Stream.of( Arrays.asList(1), Arrays.asList(2, 3), Arrays.asList(4, 5, 6) ); Stream<Integer> outputStream = inputStream. flatMap((childList) -> childList.stream());
flatMap 把 input Stream 中的層級結構扁平化,就是將最底層元素抽出來放到一塊兒,最終 output 的新 Stream 裏面已經沒有 List 了,都是直接的數字。
filter
filter 對原始 Stream 進行某項測試,經過測試的元素被留下來生成一個新 Stream。
清單 10. 留下偶數
Integer[] sixNums = {1, 2, 3, 4, 5, 6}; Integer[] evens = Stream.of(sixNums).filter(n -> n%2 == 0).toArray(Integer[]::new);
通過條件「被 2 整除」的 filter,剩下的數字爲 {2, 4, 6}。
清單 11. 把單詞挑出來
List<String> output = reader.lines(). flatMap(line -> Stream.of(line.split(REGEXP))). filter(word -> word.length() > 0). collect(Collectors.toList());
這段代碼首先把每行的單詞用 flatMap 整理到新的 Stream,而後保留長度不爲 0 的,就是整篇文章中的所有單詞了。
forEach
forEach 方法接收一個 Lambda 表達式,而後在 Stream 的每個元素上執行該表達式。
清單 12. 打印姓名(forEach 和 pre-java8 的對比)
// Java 8 roster.stream() .filter(p -> p.getGender() == Person.Sex.MALE) .forEach(p -> System.out.println(p.getName())); // Pre-Java 8 for (Person p : roster) { if (p.getGender() == Person.Sex.MALE) { System.out.println(p.getName()); } }
對一我的員集合遍歷,找出男性並打印姓名。能夠看出來,forEach 是爲 Lambda 而設計的,保持了最緊湊的風格。並且 Lambda 表達式自己是能夠重用的,很是方便。當須要爲多核系統優化時,能夠 parallelStream().forEach(),只是此時原有元素的次序無法保證,並行的狀況下將改變串行時操做的行爲,此時 forEach 自己的實現不須要調整,而 Java8 之前的 for 循環 code 可能須要加入額外的多線程邏輯。
但通常認爲,forEach 和常規 for 循環的差別不涉及到性能,它們僅僅是函數式風格與傳統 Java 風格的差異。
另一點須要注意,forEach 是 terminal 操做,所以它執行後,Stream 的元素就被「消費」掉了,你沒法對一個 Stream 進行兩次 terminal 運算。下面的代碼是錯誤的:
stream.forEach(element -> doOneThing(element)); stream.forEach(element -> doAnotherThing(element));
相反,具備類似功能的 intermediate 操做 peek 能夠達到上述目的。以下是出如今該 api javadoc 上的一個示例。
清單 13. peek 對每一個元素執行操做並返回一個新的 Stream
Stream.of("one", "two", "three", "four") .filter(e -> e.length() > 3) .peek(e -> System.out.println("Filtered value: " + e)) .map(String::toUpperCase) .peek(e -> System.out.println("Mapped value: " + e)) .collect(Collectors.toList());
forEach 不能修改本身包含的本地變量值,也不能用 break/return 之類的關鍵字提早結束循環。
findFirst
這是一個 termimal 兼 short-circuiting 操做,它老是返回 Stream 的第一個元素,或者空。
這裏比較重點的是它的返回值類型:Optional。這也是一個模仿 Scala 語言中的概念,做爲一個容器,它可能含有某值,或者不包含。使用它的目的是儘量避免 NullPointerException。
清單 14. Optional 的兩個用例
String strA = " abcd ", strB = null; print(strA); print(""); print(strB); getLength(strA); getLength(""); getLength(strB); public static void print(String text) { // Java 8 Optional.ofNullable(text).ifPresent(System.out::println); // Pre-Java 8 if (text != null) { System.out.println(text); } } public static int getLength(String text) { // Java 8 return Optional.ofNullable(text).map(String::length).orElse(-1); // Pre-Java 8 // return if (text != null) ? text.length() : -1; };
在更復雜的 if (xx != null) 的狀況中,使用 Optional 代碼的可讀性更好,並且它提供的是編譯時檢查,能極大的下降 NPE 這種 Runtime Exception 對程序的影響,或者迫使程序員更早的在編碼階段處理空值問題,而不是留到運行時再發現和調試。
Stream 中的 findAny、max/min、reduce 等方法等返回 Optional 值。還有例如 IntStream.average() 返回 OptionalDouble 等等。
reduce
這個方法的主要做用是把 Stream 元素組合起來。它提供一個起始值(種子),而後依照運算規則(BinaryOperator),和前面 Stream 的第一個、第二個、第 n 個元素組合。從這個意義上說,字符串拼接、數值的 sum、min、max、average 都是特殊的 reduce。例如 Stream 的 sum 就至關於
Integer sum = integers.reduce(0, (a, b) -> a+b); 或
Integer sum = integers.reduce(0, Integer::sum);
也有沒有起始值的狀況,這時會把 Stream 的前面兩個元素組合起來,返回的是 Optional。
清單 15. reduce 的用例
// 字符串鏈接,concat = "ABCD" String concat = Stream.of("A", "B", "C", "D").reduce("", String::concat); // 求最小值,minValue = -3.0 double minValue = Stream.of(-1.5, 1.0, -3.0, -2.0).reduce(Double.MAX_VALUE, Double::min); // 求和,sumValue = 10, 有起始值 int sumValue = Stream.of(1, 2, 3, 4).reduce(0, Integer::sum); // 求和,sumValue = 10, 無起始值 sumValue = Stream.of(1, 2, 3, 4).reduce(Integer::sum).get(); // 過濾,字符串鏈接,concat = "ace" concat = Stream.of("a", "B", "c", "D", "e", "F"). filter(x -> x.compareTo("Z") > 0). reduce("", String::concat);
上面代碼例如第一個示例的 reduce(),第一個參數(空白字符)即爲起始值,第二個參數(String::concat)爲 BinaryOperator。這類有起始值的 reduce() 都返回具體的對象。而對於第四個示例沒有起始值的 reduce(),因爲可能沒有足夠的元素,返回的是 Optional,請留意這個區別。
limit/skip
limit 返回 Stream 的前面 n 個元素;skip 則是扔掉前 n 個元素(它是由一個叫 subStream 的方法更名而來)。
清單 16. limit 和 skip 對運行次數的影響
public void testLimitAndSkip() { List<Person> persons = new ArrayList(); for (int i = 1; i <= 10000; i++) { Person person = new Person(i, "name" + i); persons.add(person); } List<String> personList2 = persons.stream(). map(Person::getName).limit(10).skip(3).collect(Collectors.toList()); System.out.println(personList2); } private class Person { public int no; private String name; public Person (int no, String name) { this.no = no; this.name = name; } public String getName() { System.out.println(name); return name; } }
輸出結果爲:
name1 name2 name3 name4 name5 name6 name7 name8 name9 name10 [name4, name5, name6, name7, name8, name9, name10]
這是一個有 10,000 個元素的 Stream,但在 short-circuiting 操做 limit 和 skip 的做用下,管道中 map 操做指定的 getName() 方法的執行次數爲 limit 所限定的 10 次,而最終返回結果在跳過前 3 個元素後只有後面 7 個返回。
有一種狀況是 limit/skip 沒法達到 short-circuiting 目的的,就是把它們放在 Stream 的排序操做後,緣由跟 sorted 這個 intermediate 操做有關:此時系統並不知道 Stream 排序後的次序如何,因此 sorted 中的操做看上去就像徹底沒有被 limit 或者 skip 同樣。
清單 17. limit 和 skip 對 sorted 後的運行次數無影響
List<Person> persons = new ArrayList(); for (int i = 1; i <= 5; i++) { Person person = new Person(i, "name" + i); persons.add(person); } List<Person> personList2 = persons.stream().sorted((p1, p2) -> p1.getName().compareTo(p2.getName())).limit(2).collect(Collectors.toList()); System.out.println(personList2);
上面的示例對清單 13 作了微調,首先對 5 個元素的 Stream 排序,而後進行 limit 操做。輸出結果爲:
name2 name1 name3 name2 name4 name3 name5 name4 [stream.StreamDW$Person@816f27d, stream.StreamDW$Person@87aac27]
即雖然最後的返回元素數量是 2,但整個管道中的 sorted 表達式執行次數沒有像前面例子相應減小。
最後有一點須要注意的是,對一個 parallel 的 Steam 管道來講,若是其元素是有序的,那麼 limit 操做的成本會比較大,由於它的返回對象必須是前 n 個也有同樣次序的元素。取而代之的策略是取消元素間的次序,或者不要用 parallel Stream。
sorted
對 Stream 的排序經過 sorted 進行,它比數組的排序更強之處在於你能夠首先對 Stream 進行各種 map、filter、limit、skip 甚至 distinct 來減小元素數量後,再排序,這能幫助程序明顯縮短執行時間。咱們對清單 14 進行優化:
清單 18. 優化:排序前進行 limit 和 skip
結果會簡單不少:
name2 name1 [stream.StreamDW$Person@6ce253f1, stream.StreamDW$Person@53d8d10a]
固然,這種優化是有 business logic 上的侷限性的:即不要求排序後再取值。
min/max/distinct
min 和 max 的功能也能夠經過對 Stream 元素先排序,再 findFirst 來實現,但前者的性能會更好,爲 O(n),而 sorted 的成本是 O(n log n)。同時它們做爲特殊的 reduce 方法被獨立出來也是由於求最大最小值是很常見的操做。
清單 19. 找出最長一行的長度
BufferedReader br = new BufferedReader(new FileReader("c:\\SUService.log")); int longest = br.lines(). mapToInt(String::length). max(). getAsInt(); br.close(); System.out.println(longest);
下面的例子則使用 distinct 來找出不重複的單詞。
清單 20. 找出全文的單詞,轉小寫,並排序
List<String> words = br.lines(). flatMap(line -> Stream.of(line.split(" "))). filter(word -> word.length() > 0). map(String::toLowerCase). distinct(). sorted(). collect(Collectors.toList()); br.close(); System.out.println(words);
Match
Stream 有三個 match 方法,從語義上說:
它們都不是要遍歷所有元素才能返回結果。例如 allMatch 只要一個元素不知足條件,就 skip 剩下的全部元素,返回 false。對清單 13 中的 Person 類稍作修改,加入一個 age 屬性和 getAge 方法。
清單 21. 使用 Match
List<Person> persons = new ArrayList(); persons.add(new Person(1, "name" + 1, 10)); persons.add(new Person(2, "name" + 2, 21)); persons.add(new Person(3, "name" + 3, 34)); persons.add(new Person(4, "name" + 4, 6)); persons.add(new Person(5, "name" + 5, 55)); boolean isAllAdult = persons.stream(). allMatch(p -> p.getAge() > 18); System.out.println("All are adult? " + isAllAdult); boolean isThereAnyChild = persons.stream(). anyMatch(p -> p.getAge() < 12); System.out.println("Any child? " + isThereAnyChild);
輸出結果:
All are adult? false Any child? true
Stream.generate
經過實現 Supplier 接口,你能夠本身來控制流的生成。這種情形一般用於隨機數、常量的 Stream,或者須要先後元素間維持着某種狀態信息的 Stream。把 Supplier 實例傳遞給 Stream.generate() 生成的 Stream,默認是串行(相對 parallel 而言)但無序的(相對 ordered 而言)。因爲它是無限的,在管道中,必須利用 limit 之類的操做限制 Stream 大小。
清單 22. 生成 10 個隨機整數
Random seed = new Random(); Supplier<Integer> random = seed::nextInt; Stream.generate(random).limit(10).forEach(System.out::println); //Another way IntStream.generate(() -> (int) (System.nanoTime() % 100)). limit(10).forEach(System.out::println);
Stream.generate() 還接受本身實現的 Supplier。例如在構造海量測試數據的時候,用某種自動的規則給每個變量賦值;或者依據公式計算 Stream 的每一個元素值。這些都是維持狀態信息的情形。
清單 23. 自實現 Supplier
Stream.generate(new PersonSupplier()). limit(10). forEach(p -> System.out.println(p.getName() + ", " + p.getAge())); private class PersonSupplier implements Supplier<Person> { private int index = 0; private Random random = new Random(); @Override public Person get() { return new Person(index++, "StormTestUser" + index, random.nextInt(100)); } }
輸出結果:
StormTestUser1, 9 StormTestUser2, 12 StormTestUser3, 88 StormTestUser4, 51 StormTestUser5, 22 StormTestUser6, 28 StormTestUser7, 81 StormTestUser8, 51 StormTestUser9, 4 StormTestUser10, 76
Stream.iterate
iterate 跟 reduce 操做很像,接受一個種子值,和一個 UnaryOperator(例如 f)。而後種子值成爲 Stream 的第一個元素,f(seed) 爲第二個,f(f(seed)) 第三個,以此類推。
清單 24. 生成一個等差數列
Stream.iterate(0, n -> n + 3).limit(10). forEach(x -> System.out.print(x + " "));.
輸出結果:
0 3 6 9 12 15 18 21 24 27
與 Stream.generate 相仿,在 iterate 時候管道必須有 limit 這樣的操做來限制 Stream 大小。
java.util.stream.Collectors 類的主要做用就是輔助進行各種有用的 reduction 操做,例如轉變輸出爲 Collection,把 Stream 元素進行歸組。
groupingBy/partitioningBy
清單 25. 按照年齡歸組
Map<Integer, List<Person>> personGroups = Stream.generate(new PersonSupplier()). limit(100). collect(Collectors.groupingBy(Person::getAge)); Iterator it = personGroups.entrySet().iterator(); while (it.hasNext()) { Map.Entry<Integer, List<Person>> persons = (Map.Entry) it.next(); System.out.println("Age " + persons.getKey() + " = " + persons.getValue().size()); }
上面的 code,首先生成 100 人的信息,而後按照年齡歸組,相同年齡的人放到同一個 list 中,能夠看到以下的輸出:
Age 0 = 2 Age 1 = 2 Age 5 = 2 Age 8 = 1 Age 9 = 1 Age 11 = 2 ……
清單 26. 按照未成年人和成年人歸組
Map<Boolean, List<Person>> children = Stream.generate(new PersonSupplier()). limit(100). collect(Collectors.partitioningBy(p -> p.getAge() < 18)); System.out.println("Children number: " + children.get(true).size()); System.out.println("Adult number: " + children.get(false).size());
輸出結果:
Children number: 23 Adult number: 77
在使用條件「年齡小於 18」進行分組後能夠看到,不到 18 歲的未成年人是一組,成年人是另一組。partitioningBy 實際上是一種特殊的 groupingBy,它依照條件測試的是否兩種結果來構造返回的數據結構,get(true) 和 get(false) 能即爲所有的元素對象。
總之,Stream 的特性能夠概括爲: