咱們已經學會如何使用Stream API,用起來真的很爽,但簡潔的方法下面彷佛隱藏着無盡的祕密,如此強大的API是如何實現的呢?java
好比Pipeline是怎麼執行的,每次方法調用都會致使一次迭代嗎?自動並行又是怎麼作到的,線程個數是多少?本節咱們學習Stream流水線的原理,這是Stream實現的關鍵所在。數組
首先回顧一下容器執行Lambda表達式的方式,以ArrayList.forEach()
方法爲例,具體代碼以下:安全
// ArrayList.forEach() public void forEach(Consumer<? super E> action) { ... for (int i=0; modCount == expectedModCount && i < size; i++) { action.accept(elementData[i]);// 回調方法 } ... }
咱們看到ArrayList.forEach()
方法的主要邏輯就是一個for循環,在該for循環裏不斷調用action.accept()
回調方法完成對元素的遍歷。數據結構
這徹底沒有什麼新奇之處,回調方法在Java GUI的監聽器中普遍使用。Lambda表達式的做用就是至關於一個回調方法,這很好理解。app
Stream API中大量使用Lambda表達式做爲回調方法,但這並非關鍵。理解Stream咱們更關心的是另外兩個問題:流水線和自動並行。使用Stream或許很容易寫入以下形式的代碼:less
int longestStringLengthStartingWithA = strings.stream() .filter(s -> s.startsWith("A")) .mapToInt(String::length) .max();
上述代碼求出以字母A開頭的字符串的最大長度,一種直白的方式是爲每一次函數調用都執一次迭代,這樣作可以實現功能,但效率上確定是沒法接受的。ide
類庫的實現着使用流水線(Pipeline)的方式巧妙的避免了屢次迭代,其基本思想是在一次迭代中儘量多的執行用戶指定的操做。爲講解方便咱們彙總了Stream的全部操做。函數
Stream上的全部操做分爲兩類:中間操做和結束操做,中間操做只是一種標記,只有結束操做纔會觸發實際計算。中間操做又能夠分爲無狀態的(Stateless)和有狀態的(Stateful),無狀態中間操做是指元素的處理不受前面元素的影響,而有狀態的中間操做必須等到全部元素處理以後才知道最終結果。學習
好比排序是有狀態操做,在讀取全部元素以前並不能肯定排序結果;結束操做又能夠分爲短路操做和非短路操做,短路操做是指不用處理所有元素就能夠返回結果,好比找到第一個知足條件的元素。之因此要進行如此精細的劃分,是由於底層對每一種狀況的處理方式不一樣。ui
爲了更好的理解流的中間操做和終端操做,能夠經過下面的兩段代碼來看他們的執行過程。
IntStream.range(1, 10) .peek(x -> System.out.print("\nA" + x)) .limit(3) .peek(x -> System.out.print("B" + x)) .forEach(x -> System.out.print("C" + x));
輸出爲:
A1B1C1 A2B2C2 A3B3C3
中間操做是懶惰的,也就是中間操做不會對數據作任何操做,直到遇到了最終操做。而最終操做,都是比較熱情的。他們會往前回溯全部的中間操做。也就是當執行到最後的forEach操做的時候,它會回溯到它的上一步中間操做,上一步中間操做,又會回溯到上上一步的中間操做,...,直到最初的第一步。
第一次forEach執行的時候,會回溯peek 操做,而後peek會回溯更上一步的limit操做,而後limit會回溯更上一步的peek操做,頂層沒有操做了,開始自上向下開始執行,輸出:A1B1C1 第二次forEach執行的時候,而後會回溯peek 操做,而後peek會回溯更上一步的limit操做,而後limit會回溯更上一步的peek操做,頂層沒有操做了,開始自上向下開始執行,輸出:A2B2C2
... 當第四次forEach執行的時候,而後會回溯peek 操做,而後peek會回溯更上一步的limit操做,到limit的時候,發現limit(3)這個job已經完成,這裏就至關於循環裏面的break操做,跳出來終止循環。
再來看第二段代碼:
IntStream.range(1, 10) .peek(x -> System.out.print("\nA" + x)) .skip(6) .peek(x -> System.out.print("B" + x)) .forEach(x -> System.out.print("C" + x));
輸出爲:
A1 A2 A3 A4 A5 A6 A7B7C7 A8B8C8 A9B9C9
第一次forEach執行的時候,會回溯peek操做,而後peek會回溯更上一步的skip操做,skip回溯到上一步的peek操做,頂層沒有操做了,開始自上向下開始執行,執行到skip的時候,由於執行到skip,這個操做的意思就是跳過,下面的都不要執行了,也就是就至關於循環裏面的continue,結束本次循環。輸出:A1
第二次forEach執行的時候,會回溯peek操做,而後peek會回溯更上一步的skip操做,skip回溯到上一步的peek操做,頂層沒有操做了,開始自上向下開始執行,執行到skip的時候,發現這是第二次skip,結束本次循環。輸出:A2
...
第七次forEach執行的時候,會回溯peek操做,而後peek會回溯更上一步的skip操做,skip回溯到上一步的peek操做,頂層沒有操做了,開始自上向下開始執行,執行到skip的時候,發現這是第七次skip,已經大於6了,它已經執行完了skip(6)的job了。此次skip就直接跳過,繼續執行下面的操做。輸出:A7B7C7
...直到循環結束。
仍然考慮上述求最長字符串的程序,一種直白的流水線實現方式是爲每一次函數調用都執一次迭代,並將處理中間結果放到某種數據結構中(好比數組,容器等)。
具體說來,就是調用filter()
方法後當即執行,選出全部以A開頭的字符串並放到一個列表list1中,以後讓list1傳遞給mapToInt()
方法並當即執行,生成的結果放到list2中,最後遍歷list2找出最大的數字做爲最終結果。程序的執行流程如如所示:
這樣作實現起來很是簡單直觀,但有兩個明顯的弊端:
這些弊端使得效率底下,根本沒法接受。若是不使用Stream API咱們都知道上述代碼該如何在一次迭代中完成,大體是以下形式:
int longest = 0; for(String str : strings){ if(str.startsWith("A")){// 1. filter(), 保留以A開頭的字符串 int len = str.length();// 2. mapToInt(), 轉換成長度 longest = Math.max(len, longest);// 3. max(), 保留最長的長度 } }
採用這種方式咱們不但減小了迭代次數,也避免了存儲中間結果,顯然這就是流水線,由於咱們把三個操做放在了一次迭代當中。只要咱們事先知道用戶意圖,老是可以採用上述方式實現跟Stream API等價的功能,但問題是Stream類庫的設計者並不知道用戶的意圖是什麼。
如何在沒法假設用戶行爲的前提下實現流水線,是類庫的設計者要考慮的問題。
咱們大體可以想到,應該採用某種方式記錄用戶每一步的操做,當用戶調用結束操做時將以前記錄的操做疊加到一塊兒在一次迭代中所有執行掉。沿着這個思路,有幾個問題須要解決:
注意這裏使用的是「操做(operation)」一詞,指的是「Stream中間操做」的操做,不少Stream操做會須要一個回調函數(Lambda表達式),所以一個完整的操做是<數據來源,操做,回調函數>構成的三元組。
Stream中使用Stage的概念來描述一個完整的操做,並用某種實例化後的PipelineHelper來表明Stage,將具備前後順序的各個Stage連到一塊兒,就構成了整個流水線。跟Stream相關類和接口的繼承關係圖示。
還有IntPipeline, LongPipeline, DoublePipeline沒在圖中畫出,這三個類專門爲三種基本類型(不是包裝類型)而定製的,跟ReferencePipeline是並列關係。
圖中Head用於表示第一個Stage,即調用調用諸如Collection.stream()方法產生的Stage,很顯然這個Stage裏不包含任何操做;StatelessOp和StatefulOp分別表示無狀態和有狀態的Stage,對應於無狀態和有狀態的中間操做。
Stream流水線組織結構示意圖以下:
圖中經過Collection.stream()
方法獲得Head也就是stage0,緊接着調用一系列的中間操做,不斷產生新的Stream。這些Stream對象以雙向鏈表的形式組織在一塊兒,構成整個流水線,因爲每一個Stage都記錄了前一個Stage和本次的操做以及回調函數,依靠這種結構就能創建起對數據源的全部操做。這就是Stream記錄操做的方式。
以上只是解決了操做記錄的問題,要想讓流水線起到應有的做用咱們須要一種將全部操做疊加到一塊兒的方案。你可能會以爲這很簡單,只須要從流水線的head開始依次執行每一步的操做(包括回調函數)就好了。
這聽起來彷佛是可行的,可是你忽略了前面的Stage並不知道後面Stage到底執行了哪一種操做,以及回調函數是哪一種形式。換句話說,只有當前Stage自己才知道該如何執行本身包含的動做。這就須要有某種協議來協調相鄰Stage之間的調用關係。
這種協議由Sink接口完成,Sink接口包含的方法以下表所示:
有了上面的協議,相鄰Stage之間調用就很方便了,每一個Stage都會將本身的操做封裝到一個Sink裏,前一個Stage只需調用後一個Stage的accept()
方法便可,並不須要知道其內部是如何處理的。
固然對於有狀態的操做,Sink的begin()
和end()
方法也是必須實現的。好比Stream.sorted()是一個有狀態的中間操做,其對應的Sink.begin()方法可能建立一個盛放結果的容器,而accept()方法負責將元素添加到該容器,最後end()負責對容器進行排序。
對於短路操做,Sink.cancellationRequested()
也是必須實現的,好比Stream.findFirst()是短路操做,只要找到一個元素,cancellationRequested()就應該返回true,以便調用者儘快結束查找。Sink的四個接口方法經常相互協做,共同完成計算任務。
實際上Stream API內部實現的的本質,就是如何重寫Sink的這四個接口方法。
有了Sink對操做的包裝,Stage之間的調用問題就解決了,執行時只須要從流水線的head開始對數據源依次調用每一個Stage對應的Sink.{begin(), accept(), cancellationRequested(), end()}方法就能夠了。一種可能的Sink.accept()方法流程是這樣的:
void accept(U u){ 1. 使用當前Sink包裝的回調函數處理u 2. 將處理結果傳遞給流水線下游的Sink }
Sink接口的其餘幾個方法也是按照這種[處理->轉發]的模型實現。
下面咱們結合具體例子看看Stream的中間操做是如何將自身的操做包裝成Sink以及Sink是如何將處理結果轉發給下一個Sink的。先看Stream.map()方法:
// Stream.map(),調用該方法將產生一個新的Stream public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) { ... return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE, StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { @Override /*opWripSink()方法返回由回調函數包裝而成Sink*/ Sink<P_OUT> opWrapSink(int flags, Sink<R> downstream) { return new Sink.ChainedReference<P_OUT, R>(downstream) { @Override public void accept(P_OUT u) { R r = mapper.apply(u);// 1. 使用當前Sink包裝的回調函數mapper處理u downstream.accept(r);// 2. 將處理結果傳遞給流水線下游的Sink } }; } }; }
上述代碼看似複雜,其實邏輯很簡單,就是將回調函數mapper包裝到一個Sink當中。因爲Stream.map()是一個無狀態的中間操做,因此map()方法返回了一個StatelessOp內部類對象(一個新的Stream),調用這個新Stream的opWripSink()方法將獲得一個包裝了當前回調函數的Sink。
再來看一個複雜一點的例子。Stream.sorted()方法將對Stream中的元素進行排序,顯然這是一個有狀態的中間操做,由於讀取全部元素以前是無法獲得最終順序的。拋開模板代碼直接進入問題本質,sorted()方法是如何將操做封裝成Sink的呢?sorted()一種可能封裝的Sink代碼以下:
// Stream.sort()方法用到的Sink實現 class RefSortingSink<T> extends AbstractRefSortingSink<T> { private ArrayList<T> list;// 存放用於排序的元素 RefSortingSink(Sink<? super T> downstream, Comparator<? super T> comparator) { super(downstream, comparator); } @Override public void begin(long size) { ... // 建立一個存放排序元素的列表 list = (size >= 0) ? new ArrayList<T>((int) size) : new ArrayList<T>(); } @Override public void end() { list.sort(comparator);// 只有元素所有接收以後才能開始排序 downstream.begin(list.size()); if (!cancellationWasRequested) {// 下游Sink不包含短路操做 list.forEach(downstream::accept);// 2. 將處理結果傳遞給流水線下游的Sink } else {// 下游Sink包含短路操做 for (T t : list) {// 每次都調用cancellationRequested()詢問是否能夠結束處理。 if (downstream.cancellationRequested()) break; downstream.accept(t);// 2. 將處理結果傳遞給流水線下游的Sink } } downstream.end(); list = null; } @Override public void accept(T t) { list.add(t);// 1. 使用當前Sink包裝動做處理t,只是簡單的將元素添加到中間列表當中 } }
上述代碼完美的展示了Sink的四個接口方法是如何協同工做的:
Sink完美封裝了Stream每一步操做,並給出了[處理->轉發]的模式來疊加操做。這一連串的齒輪已經咬合,就差最後一步撥動齒輪啓動執行。
是什麼啓動這一連串的操做呢?也許你已經想到了啓動的原始動力就是結束操做(Terminal Operation),一旦調用某個結束操做,就會觸發整個流水線的執行。
結束操做以後不能再有別的操做,因此結束操做不會建立新的流水線階段(Stage),直觀的說就是流水線的鏈表不會在日後延伸了。
結束操做會建立一個包裝了本身操做的Sink,這也是流水線中最後一個Sink,這個Sink只須要處理數據而不須要將結果傳遞給下游的Sink(由於沒有下游)。對於Sink的[處理->轉發]模型,結束操做的Sink就是調用鏈的出口。
咱們再來考察一下上游的Sink是如何找到下游Sink的。一種可選的方案是在PipelineHelper中設置一個Sink字段,在流水線中找到下游Stage並訪問Sink字段便可。
但Stream類庫的設計者沒有這麼作,而是設置了一個Sink AbstractPipeline.opWrapSink(int flags, Sink downstream)
方法來獲得Sink,該方法的做用是返回一個新的包含了當前Stage表明的操做以及可以將結果傳遞給downstream的Sink對象。爲何要產生一個新對象而不是返回一個Sink字段?
這是由於使用opWrapSink()能夠將當前操做與下游Sink(上文中的downstream參數)結合成新Sink。試想只要從流水線的最後一個Stage開始,不斷調用上一個Stage的opWrapSink()方法直到最開始(不包括stage0,由於stage0表明數據源,不包含操做),就能夠獲得一個表明了流水線上全部操做的Sink,用代碼表示就是這樣:
// AbstractPipeline.wrapSink() // 從下游向上遊不斷包裝Sink。若是最初傳入的sink表明結束操做, // 函數返回時就能夠獲得一個表明了流水線上全部操做的Sink。 final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) { ... for (AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) { sink = p.opWrapSink(p.previousStage.combinedFlags, sink); } return (Sink<P_IN>) sink; }
如今流水線上從開始到結束的全部的操做都被包裝到了一個Sink裏,執行這個Sink就至關於執行整個流水線,執行Sink的代碼以下:
// AbstractPipeline.copyInto(), 對spliterator表明的數據執行wrappedSink表明的操做。 final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) { ... if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) { wrappedSink.begin(spliterator.getExactSizeIfKnown());// 通知開始遍歷 spliterator.forEachRemaining(wrappedSink);// 迭代 wrappedSink.end();// 通知遍歷結束 } ... }
上述代碼首先調用wrappedSink.begin()方法告訴Sink數據即將到來,而後調用spliterator.forEachRemaining()方法對數據進行迭代,最後調用wrappedSink.end()方法通知Sink數據處理結束。邏輯如此清晰。
最後一個問題是流水線上全部操做都執行後,用戶所須要的結果(若是有)在哪裏?首先要說明的是否是全部的Stream結束操做都須要返回結果,有些操做只是爲了使用其反作用(Side-effects),好比使用Stream.forEach()
方法將結果打印出來就是常見的使用反作用的場景(事實上,除了打印以外其餘場景都應避免使用反作用),對於真正須要返回結果的結束操做結果存在哪裏呢?
特別說明:反作用不該該被濫用,也許你會以爲在Stream.forEach()裏進行元素收集是個不錯的選擇,就像下面代碼中那樣,但遺憾的是這樣使用的正確性和效率都沒法保證,由於Stream可能會並行執行。大多數使用反作用的地方均可以使用歸約操做更安全和有效的完成。
// 錯誤的收集方式 ArrayList<String> results = new ArrayList<>(); stream.filter(s -> pattern.matcher(s).matches()) .forEach(s -> results.add(s)); // Unnecessary use of side-effects! // 正確的收集方式 List<String>results = stream.filter(s -> pattern.matcher(s).matches()) .collect(Collectors.toList()); // No side-effects!
回到流水線執行結果的問題上來,須要返回結果的流水線結果存在哪裏呢?這要分不一樣的狀況討論,下表給出了各類有返回結果的Stream結束操做。
本文詳細介紹了Stream流水線的組織方式和執行過程,學習本文將有助於理解原理並寫出正確的Stream代碼,同時打消你對Stream API效率方面的顧慮。如你所見,Stream API實現如此巧妙,即便咱們使用外部迭代手動編寫等價代碼,也未必更加高效。
注:留下本文所用的JDK版本,以便有考究癖的人考證:
$ java -version java version "1.8.0_101" Java(TM) SE Runtime Environment (build 1.8.0_101-b13) Java HotSpot(TM) Server VM (build 25.101-b13, mixed mode)
歡迎你們關注個人公衆號【風平浪靜如碼】,海量Java相關文章,學習資料都會在裏面更新,整理的資料也會放在裏面。
以爲寫的還不錯的就點個贊,加個關注唄!點關注,不迷路,持續更新!!!