前言
本文爲java.util.stream 包文檔的譯文
極其個別部分可能爲了更好理解,陳述略有改動,與原文幾乎一致
原文可參考在線API文檔
Package java.util.stream Description
一些用於支持流上函數式操做的類 ,例如在集合上的map-reduce轉換。例如
int sum = widgets.stream()
.filter(b -> b.getColor() == RED)
.mapToInt(b -> b.getWeight())
.sum();
此處,咱們使用widgets, 他是一個 Collection<Widget>, 做爲一個流的源,
而後在流上執行一個filter-map-reduce 來得到紅色widgets重量的總和。(總和是一個歸約(reduce)操做的例子)
這個包中引入的關鍵抽象是流。
類 Stream、IntStream、LongStream和DoubleStream分別是在對象Object和基本類型int、long和double類型上的流。
流與集合的不一樣有如下幾點:
- 不存儲數據 流不是存儲元素的數據結構;相反,它經過一個哥哥計算操做組合而成的管道,從一個數據源,如數據結構、數組、生成器函數或i/o通道 來傳遞元素
- 函數特性 一個流上的操做產生一個結果,可是不會修改它的源。例如,過濾集合 得到的流會產生一個沒有被過濾元素的新流,而不是從源集合中刪除元素
- 延遲搜索 許多流操做,如過濾、映射或重複刪除,均可以延遲實現,從而提供出優化的機會。
- 例如,「找到帶有三個連續元音的第一個字符串」不須要檢查全部的輸入字符串。
- 流操做分爲中間(流生成)操做和終端(值或反作用生成)操做。許多的中間操做, 如filter,map等,都是延遲執行。
- 中間操做老是惰性的的。
- Stream多是無限的 雖然集合的大小是有限的,但流不須要。諸如limit(n)或findFirst()這樣的短路操做能夠容許在有限時間內完成無限流的計算。
- 消耗的 流的元素只在流的生命週期中訪問一次。就像迭代器同樣,必須生成一個新的流來從新訪問源的相同元素
流能夠經過多種方式進行得到,好比
- Collection 提供的stream parallelStream
- 從數組 Arrays.stream(Object[]) 靜態方法
- Stream類的靜態工廠方法 好比 Stream.of(Object[]), IntStream.range(int, int), Stream.iterate(Object, UnaryOperator) Stream.generate
- BufferedReader.lines(); 文件行
- 獲取文件路徑的流: Files類的find(), lines(), list(), walk();
- Random.ints() 隨機數流
- JDK中的許多其餘流載方法,包括BitSet.stream(), Pattern.splitAsStream(java.lang.CharSequence), and JarFile.stream().
還能夠從第三方類庫的提供中建立其餘一些流 ,詳見 Low-level stream construction
Stream operations and pipelines流操做以及管道
流操做被劃分爲中間和終端操做,經過流管道組合起來。
- 一條流管道由一個源(如一個集合、一個數組、一個生成器函數或一個i/o通道)組成;
- 而後是零個或更多的中間操做,例如stream.filter 或者stream.map;
- 還有一個終端操做,如stream.forEach或Stream.reduce
中間操做返回一條新流,他們老是惰性的;
執行諸如filter()之類的中間操做實際上並不會當即執行任何過濾操做,而是建立了一個新流,當遍歷時,它包含與給定謂詞相匹配的初始流的元素。直到管道的終端操做被執行,管道源的遍歷纔會開始
終端操做,例如Stream.forEach 和 IntStream.sum,能夠遍歷流以產生結果或反作用。
在執行終端操做以後,流管道被認爲是被消耗掉的,而且不能再被使用;
若是您須要再次遍歷相同的數據源,您必須從新從數據源得到一條新流
在幾乎全部狀況下,終端操做都很迫切,在返回以前完成了數據源的遍歷和管道的處理。只有終端操做iterator() 和 spliterator() 不是;
這些都是做爲一個「逃生艙口」提供的,以便在現有操做不足以完成任務的狀況下,啓用任意客戶控制的管道遍歷
延遲處理流能夠顯著提升效率;
在像上面的filer-map-sum例子這樣的管道中,過濾、映射和求和能夠被融合到數據的單個傳遞中,而且具備最小的中間狀態。
惰性還容許在沒有必要的狀況下避免檢查全部數據;對於諸如「查找第一個超過1000個字符的字符串」這樣的操做,只須要檢查足夠的字符串,就能夠找到具備所需特徵的字符串,而不須要檢查源的全部字符串。(當輸入流是無限的而不只僅是大的時候,這種行爲就變得更加劇要了。)
中間操做被進一步劃分爲無狀態和有狀態操做。
無狀態操做,如filter和map,在處理新元素時不保留之前處理的元素的狀態——每一個元素均可以獨立於其餘元素的操做處理。
有狀態的操做,例如distinct和sorted,則須要考慮從先前看處處理的元素中合併狀態。
有狀態操做可能須要在產生結果以前處理整個輸入。
例如,直到一我的看到了流的全部元素以前 他沒辦法完成對流的排序
所以,在並行計算下,一些包含有狀態中間操做的管道可能須要對數據進行屢次傳遞,或者可能須要緩衝重要數據。
包含徹底無狀態的中間操做的管道能夠在單次傳遞過程當中進行處理,不管是順序的仍是並行的,只有最少的數據緩衝
此外,一些操做被認爲是短路操做。一箇中間操做,若是在提供無限流輸入時,它可能會產生一個有限的流,那麼他就是短路的。
若是在無限流做爲輸入時,它可能在有限的時間內終止,這個終端操做是短路的。
在管道中進行短路操做是處理無限流在有限時間內正常終止的必要條件,但不是充分條件
Parallelism並行
經過顯式的for循環處理元素本質上是串行的
流經過將計算從新定義爲聚合操做的管道,而不是在每一個單獨元素上當即執行操做,從而促進並行執行。
全部的流操做均可以串行或並行執行
JDK中流的實現建立的都是串行流, 除非顯式的設置爲並行
例如,Collection有方法Collection.stream()和Collection.parallelstream(),它們分別產生串行和並行流;
其餘的流方法好比 IntStream.range(int, int) 產生串行的流,可是能夠經過調用BaseStream.parallel()方法設置爲 並行化
想要計算全部 widgets的重量之和 只須要
這個例子的串行和並行版本的惟一區別是初始時建立流,使用parallelStream()而不是stream()
當啓動終端操做時,流管道是按順序或並行執行的,這取決於它被調用的流的策略模式。
一個流是否能夠串行或並行執行,能夠用isParallel()方法來得到,
能夠用BaseStream.sequential() 和 BaseStream.parallel() 操做修改。
當啓動終端操做時,流管道是按順序或並行執行的,這取決於它被調用的流的模式。
除了被肯定爲顯式非肯定性的操做以外,如findAny(),不管是順序執行仍是並行執行,都不該該改變計算的結果。
大多數流操做接受描述用戶指定行爲的參數,這些參數一般是lambda表達式。
爲了保持正確的行爲,這些行爲參數必須是不干涉non-interfering的,而且在大多數狀況下必須是無狀態的。
這些參數始終是函數式接口的實例,例如Function,一般是lambda表達式或方法引用
Non-interference 無干擾的 非干涉的
Streams容許您在各類數據源上執行可能並行的聚合操做,甚至包括ArrayList之類的非線程安全集合。
只有當咱們可以在流管道的執行過程當中防止對數據源的干擾時這纔是可能的。
除了逃脫艙口iterator()和spliterator()以外,都是在調用終端操做時開始執行,並在終端操做完成時結束。
對於大多數數據源來講,防止干擾意味着確保在流管道的執行過程當中根本沒有修改數據源。
這方面的一個顯著的例外是源是併發集合的流,它們是專門設計用來處理併發修改的。併發流源是那些Spliterator 設置了併發特性(CONCURRENT characteristic)
所以,在流管道中,源不是併發的行爲參數,永遠不該該修改流的數據源。
一個行爲參數將被稱之爲干擾的(interfere) 若是對於一個非併發數據源來講若是它修改或致使被修改數據源被修改.
不只僅是並行的管道須要,全部的管道都須要是非干擾的(non-interference)
除非流數據源是併發的,不然在執行流管道時修改stream的數據源可能會致使異常、錯誤的答案或不一致的行爲。
對於表現良好的stream,數據源是能夠修改的,只要是在終端操做開始以前,而且全部的修改都會包含在內
好比
首先建立一個列表,由兩個字符串組成:「one」;和「two」。
而後,從該列表中建立一條stream。接下來,經過添加第三個字符串:「three」來修改列表。
最後,流的元素被collect 以及joining在一塊兒。因爲該列表在終端收集操做開始以前被修改,結果將是一串「one two three」。
從JDK集合返回的全部流,以及大多數其餘JDK類,都像這樣表現良好;
對於其餘庫生成的流,請參閱 Low-level stream construction,以知足構建行爲良好的流的需求。
Stateless behaviors無狀態行爲
若是流操做的行爲參數是有狀態的,那麼流管道的結果多是不肯定的或不正確的。
有狀態的lambda(或實現適當的功函數接口的其餘對象)是一個其結果依賴於任何可能在流水線執行過程當中發生變化的狀態。
有狀態lambda的一個例子是map()的參數:
在這裏,若是映射操做是並行執行的,那麼相同輸入的結果可能因線程調度差別而變化,而對於無狀態lambda表達式,結果老是相同的
還要注意的是,試圖從行爲參數訪問可變狀態時,在安全性和性能方面是您一個錯誤的選擇;
若是你不一樣步訪問那個狀態,你就有了數據競爭,所以你的代碼可能出現問題,
可是若是你對那個狀態進行同步訪問,你就有可能會破壞你想要從並行性中獲得的受益。
最好的方法是在流操做中徹底地避免有狀態的行爲參數; 一般總會有種方法能夠重構流以免狀態性
Side-effects反作用
通常來講,對流操做的行爲參數的反作用是不鼓勵的,由於它們一般會致使不知情的違反無狀態要求的行爲,以及其餘線程安全隱患
若是行爲參數確實有反作用,除非顯式地聲明,不然就沒法保證這些反作用對其餘線程的可見性,也不能保證在同一條管道內的「相同」元素上的不一樣操做在相同的線程中執行。此外,這些影響的排序可能出乎意料。即便管道被限制生成一個與stream源的處理順序一致的結果(例如,IntStream.range(0,5).parallel().map(x -> x*2).toArray() 必須生成0、二、四、六、8),對於將mapper函數應用於個別元素的順序,或者對於給定元素執行任何行爲參數的順序,都沒有保證
對許多可能會被嘗試使用於反作用的計算中,能夠替換爲無反作用的,更安全更有效的表達,好比使用歸約而不是可變的累積器。
然而,使用println()來進行調試的反作用一般是無害的。少部分的流操做,如forEach()和peek(),用的就是他們的反作用;這些應該當心使用。
下面的例子演示,如何從一個使用反作用的計算轉變爲不適用反作用
下面的代碼搜索一個字符串流,以匹配給定的正則表達式,並將匹配放在列表中
這段代碼沒必要要地使用了反作用。若是並行執行,ArrayList的非線程安全將致使不正確的結果,而且添加所需的同步將致使競爭,從而破壞並行性的好處。
此外,在這裏使用反作用是徹底沒有必要的;forEach()能夠簡單地被替換爲更安全、更高效、更適合並行化的reduce操做。
Ordering 排序
流可能有也可能沒有定義好的順序。流是否有順序取決於源和中間操做。(所謂定義好的順序,就是說原始數據源是否有序)
某些流源(如列表或數組)本質上是有序的,而其餘的(如HashSet)則不是。
一些中間操做,好比sorted(),能夠在無序的流中強加一個順序,而其餘的操做可能會使一個有序的流變成無序,例如BaseStream.unordered().
此外,一些終端操做可能會忽略順序,好比forEach()。
若是一個流有序,大多數操做都被限制在順序的元素上操做;
若是流的源是包含一、二、3的列表,那麼執行map(x-x 2)的結果必須是二、四、6。
然而,若是源沒有定義的順序,那麼值二、四、6的任何排列都將是一個有效的結果。
對於串行流,順序的存在與否並不影響性能,隻影響肯定性。
若是一個流是有序的,在相同的源上重複執行相同的流管道將產生相同的結果;
若是沒有排序,重複執行可能會產生不一樣的結果
對於並行流,放鬆排序的限制有時能夠實現更高效的執行。
若是元素的排序不是很重要,那麼能夠更有效地實現某些聚合操做,如過濾重複元素(distinct() )或分組歸約(Collectors.groupingBy())。
相似地,與順序相關的操做,如limit(),可能須要緩衝以確保正確的排序,從而破壞並行性的好處。
在流有順序的狀況下,可是用戶並不特別關心這個順序,顯式地經過unordered()方法調用取消排序, 可能會改善一些有狀態或終端操做的並行性能。
然而,大多數的流管道,例如上面的「blocks的重量總和」,即便在排序約束下仍然有效地並行化。
Reduction operations歸約操做
一個歸約操做(也稱爲摺疊)接受一系列的輸入元素,並經過重複應用組合操做將它們組合成一個簡單的結果,例如查找一組數字的總和或最大值,或者將元素累積到一個列表中。streams類有多種形式的通用歸約reduce操做,稱爲reduce()和collect(),以及多個專門化的簡化形式,如sum()、max()或count()
固然,這樣的操做能夠很容易用簡單的順序循環來實現,以下所示
然而,咱們有充分的理由傾向於reduce操做,而不是像上面這樣的迭代累計運算。
它不只是一個「更抽象的」——它在流上把流做爲一個總體運行而不是做用於單獨的元素——可是一個適當構造的reduce操做本質上是可並行的,只要用於處理元素的函數(s)是結合的associative和無狀態stateless的。舉個例子,給定一個數字流,咱們想要找到和,咱們能夠寫:
幾乎不須要怎麼修改,就能夠以並行的方式運行
之因此歸約操做能夠很好地並行,是由於實現能夠並行地操做數據的子集,而後將中間結果組合在一塊兒,獲得最終的正確答案。(即便該語言有一個「"parallel for-each"」構造,迭代累計運算方法仍然須要開發人員提供對共享累積變量sum的線程安全更新以及所需的同步,這可能會消除並行性帶來的任何性能收益。)
使用reduce()代替了歸約操做的並行化的全部負擔,而且庫能夠提供一個高效的並行實現,不須要額外的同步
前面展現的「widgets」示例展現瞭如何與其餘操做相結合,以替換for循環。
若是widgets 是Widget 對象的集合,它有一個getWeight方法,咱們能夠找到最重的widget:
在更通用的形式中 對類型爲T的元素,而且返回結果類型爲U的reduce操做 須要三個參數:
在這裏,identity不只僅是歸約的初始化結果值或者若是沒有任何元素時的一個默認的返回值
迭代累計運算器接受部分結果和下一個元素,併產生一個新的中間結果。
組合函數結合了兩個部分結果,產生了一個新的中間結果。
(在並行減小的狀況下,組合是必要的,在這個過程當中,輸入被分區,每一個分區都計算出部分的累積,而後將部分結果組合起來產生最終的結果。)
更準確地說,identity必須是組合函數的恆等式。這意味着對全部的u,combiner.apply(identity, u)等於u,
另外,組合函數必須是結合的,必須與累加器函數兼容:
對全部u和t,
combiner.apply(identity, u) 必須等於accumulator.apply(u, t).
三參數形式是雙參數形式的泛化,將映射步驟合併到累加步驟中。
咱們能夠用更通常的形式從新改寫這個簡單的widgets重量的例子
儘管顯式的map-reduce的形式更易於閱讀,所以一般應該優先考慮。
通用的形式是爲了 經過將映射和減小到單個函數,以重要的工做進行優化 這種場景
Mutable reduction 可變的歸約
一個可變的歸約操做在處理流中的元素時,將輸入元素積累到一個可變的結果容器中,例如一個Collection或StringBuilder,
若是咱們想要獲取一串字符串的流並將它們鏈接成一個長字符串,咱們能夠經過普通的reduce來實現這個目標:
咱們會獲得想要的結果,它甚至能夠並行工做,然而,可是咱們可能對性能不滿意
這樣的實現將會進行大量的字符串複製 時間複雜度O(n^2)
一種更有效的方法是將結果累積到StringBuilder中,這是一個用於累積字符串的可變容器
就如同咱們對普通的歸約操做處理同樣,咱們可使用相同的技術來處理可變的歸約
可變歸約操做稱爲collect()當它將指望的結果收集到一個結果容器中,例如一個集合
收集操做須要三個功能:
一個supplier 功能來構造結果容器的新實例,
一個累計運算器函數將一個輸入元素合併到一個結果容器中,
一個組合函數將一個結果容器的內容合併到另外一個結果容器中。
它的形式與普通歸約的通常形式很是類似
與reduce()相比,以這種抽象的方式表示收集的好處是它直接適合並行化:
咱們能夠並行地累計運算部分結果,而後將它們組合起來,只要積累和組合功能知足適當的需求。
例如,爲了收集流中的元素的字符串表示到ArrayList,咱們能夠編寫顯式的for循環
或者咱們可使用一個可並行的collect形式
或者,從累加器函數中提取出來map操做,咱們能夠更簡潔地表達它:
在這裏,咱們的supplier只是ArrayList的構造器,累加器將string element元素添加到ArrayList中,組合器簡單地使用addAll將字符串從一個容器複製到另外一個容器中
collect的三個部分——supplier, accumulator, 和combiner ——是緊密耦合的。
咱們可使用Collector來抽象的表達描述這三部分。
上面的例子能夠將字符串collect到列表中,可使用一個標準收集器來重寫:
將可變的歸約打包成收集器有另外一個優勢:可組合性。
類Collectors包含許多用於收集器的預約義工廠,包括將一個收集器轉換爲另外一個收集器的組合器。
例如,假設咱們有一個Collector,它計算員工流的薪水之和,以下所列
(對於第二個類型的參數 ? ,僅僅代表咱們不關心收集器所使用的中間類型。 )若是咱們想要建立一個收集器來按部門計算工資的總和,咱們可使用groupingBy來重用summingSalaries 薪水:
就像常規的reduce操做同樣,只有知足適當的條件collect() 操做纔可以並行化
對於任何部分累計運算的結果,將其與空結果容器相結合combiner 必須產生一個等效的結果
也就是說,對於任意一個部分累計運算的結果p,累計運算或者組合調用的結果,p必須等於 combiner.apply(p, supplier.get()).
並且,不管計算是否分割,它必須產生一個等價的結果。對於任何輸入元素t1和t2,下面計算的結果r1和r2必須是等價的
在這裏,等價一般指的是Object.equals(Object).。但在某些狀況下,等價性的要求可能會下降
Reduction, concurrency, and ordering 歸約 併發與排序
經過一些複雜的reduce操做,例如生成map的collect(),例如
並行執行操做可能實際上會產生反效果。這是由於組合步驟(經過鍵將一個Map合併到另外一個Map)對於某些Map實現來講可能代價很大
然而,假設在這個reduce中使用的結果容器是一個可修改的集合——例如ConcurrentHashMap。在這種狀況下,對迭代累計運算器的並行調用實際上能夠將它們的結果併發地放到相同的共享結果容器中,從而將再也不須要組合器合併不一樣的結果容器。這可能會促進並行執行性能的提高。咱們稱之爲並行reduce
支持併發reduce的收集器以Collector.Characteristics.CONCURRENT characteristic特性爲標誌。併發特性。然而,併發集合也有缺點。
若是多個線程將結果併發地存入一個共享容器,那麼產生結果的順序是不肯定的。
所以,只有在排序對正在處理的流不重要的狀況下,纔可能執行併發的reduce
下面這些條件下 Stream.collect(Collector) 的實現會併發reduce(歸約)
- 流是並行的;
- 收集器有Collector.Characteristics.CONCURRENT 特性
- 要麼是無序的流,要麼收集器擁有Collector.Characteristics.UNORDERED 特性
您能夠經過使用BaseStream.unordered()方法來確保流是無序的。例如:
(Collectors.groupingByConcurrent(java.util.function.Function<? super T, ? extends K>) 等同於 groupingBy).
Associativity 結合性
若是一個操做或者函數方法知足下面的形式,那麼他就是結合的
若是咱們把這個問題擴大到四項,就能夠看到這種結合性對於並行的重要性
這樣咱們就能夠把(a op b) 和 (c op d) 進行並行計算 最後在對他們進行 op 運算
結合性操做的例子包括數字加法、min、max和字符串串聯
Low-level stream construction 低級流構造器
到目前爲止,全部的流示例都使用了Collection.stream()或Arrays.stream(Object)等方法來得到一個stream。這些處理流的方法是如何實現的?
類StreamSupport提供了許多用於建立流的低級方法,全部這些方法都使用某種形式的Spliterator。
一個Spliterator是迭代器的一個並行版本;
它描述了一個(多是無限的)元素集合,支持順序前進、批量遍歷,並將一部分輸入分割成另外一個可並行處理的Spliterator。
在最低層,全部的流都由一個Spliterator驅動構造
在實現Spliterator時,有許多實現選擇,幾乎全部的實現都是在簡單的實現和運行時性能之間進行權衡。
建立Spliterator的最簡單、但最不高性能的方法是,使用 Spliterators.spliteratorUnknownSize(java.util.Iterator, int)從一個iterator中建立spliterator 。
雖然這樣的spliterator 能夠工做,但它可能會提供糟糕的並行性能,由於咱們已經丟失了容量信息(底層數據集有多大),以及被限制爲一個簡單的分割算法。
一個高質量的spliterator 將提供平衡的和已知大小的分割,精確的容量信息,以及一些可用於實現優化執行的spliterator 或數據的其餘特徵 (特徵見spliterator characteristics)
可變數據源的Spliterators 有一個額外的挑戰;綁定到數據的時間,由於數據可能在建立Spliterators 後和開始執行流管道的期間,發生變化。
理想狀況下,一個流的spliterator將報告一個IMMUTABLE or CONCURRENT;若是不是,應該是後期綁定(late-binding)。
若是一個源不能直接提供一個推薦的spliterator,它可能會經過Supplier 間接地提供一個spliterator,並經過接收Supplier做爲參數的stream()版本構造一個stream。只有在流管道的終端操做以後,才從Supplier處得到spliterator
這些要求極大地減小了流源的變化和流管道的執行之間的潛在的干擾。
基於具備所需特性的spliterators ,或者使用 Supplier-based 的工廠的形式的流,在終端操做開始以前對數據源的修改是不受影響的(若是流操做的行爲參數知足不干涉和無狀態的要求標準)。參見不干涉 Non-Interference的細節。