提及 Java 8,咱們知道 Java 8 大改動之一就是增長函數式編程,而 Stream API 即是函數編程的主角,Stream API 是一種流式的處理數據風格,也就是將要處理的數據看成流,在管道中進行傳輸,並在管道中的每一個節點對數據進行處理,如過濾、排序、轉換等。編程
首先咱們先看一個使用Stream API的示例,具體代碼以下:數組
code1 Stream exampleapp
這是個很簡單的一個Stream使用例子,咱們過濾掉空字符串後,轉成int類型並計算出最大值,這其中包括了三個操做:filter、mapToInt、sum。相信大多數人再剛使用Stream API的時候都會有個疑問,Stream是指怎麼實現的,是每一次函數調用就執行一次迭代嗎?答案確定是否,由於若是真的是每一次函數調用就執行一次迭代,這個效率是很難接受的,Stream也不會那麼受歡迎。less
其實Stream內部是經過流水線(Pipeline)的方式來實現的,基本思想是在迭代的時候順着流水線儘量的執行更多的操做,從而避免屢次迭代。爲了對Stream的操做有更清晰的認識,咱們彙總了Stream的全部操做。函數式編程
從上表能夠看出Stream將全部操做分爲兩類:中間操做和終止操做。其中中間操做分爲無狀態和有狀態,終止操做分爲非短路操做和短路操做,下面是針對這幾個操做的含義說明:函數
一、中間操做:中間操做只是一種標記,只有結束操做纔會觸發實際計算spa
二、終止操做:顧名思義,就是得出最後計算結果的操做設計
經過上面的介紹,咱們瞭解到Stream在執行中間操做時僅僅是記錄,當用戶調用終止操做時,會在一個迭代裏將已經記錄的操做順着流水線所有執行掉。沿着這個思路,有幾個問題須要解決:code
圖1-1對象
關於操做如何記錄,在JDK源碼註釋中屢次用(操做)stage來標識用戶的每一次操做,而一般狀況下Stream的操做又須要一個回調函數,因此一個完整的操做是由數據來源、操做、回調函數組成的三元組來表示。而在具體實現中,使用實例化的ReferencePipeline來表示,即圖1-1中的Head、StatelessOp、StatefulOp的實例。接下來咱們來看下Stream幾個經常使用方法的源碼。
code2 Collection.Stream()
code3 StreamSupport.stream()
code4 ReferencePipeline.map()
從上面源碼中能夠看出來,咱們調用stream()方法時最終會建立一個Head實例來表示流操做的頭,當調用map()方法時則會建立無狀態的中間操做實例StatelessOp,一樣調用其餘操做對應的方法也會生成一個ReferencePipeline實例,在這裏就不一一列舉。在用戶調用一系列操做後,最終會造成一個雙向鏈表,以下圖所示:
圖1-2
上面咱們說明了Stream是經過stage記錄操做,但stage只保存當前操做,它並不知道下個stage如何操做,須要什麼操做。因此要執行的話還須要某種協議將各個stage關聯起來。jdk中就是使用Slink接口來實現的,Slink接口定義begin()、end()、cancellationRequested()、accept()四個方法,以下表所示。
往回看code3 ReferencePipeline.map()的方法,咱們會發現咱們在建立一個ReferencePipeline實例的時候,須要重寫opWrapSink方法來生成對應Sink實例。並且經過閱讀源碼會發現經常使用的操做都會建立一個ChainedReference實例。咱們能夠看下code5 ChainedReference抽象類的源碼實現,由於ChainedReference只是個抽象實現,不攜帶具體操做的特性,因此是更能體現做者的設計理念。
經過查看源碼能夠發現ChainedReference會持有下一個操做的Slink,並在調用begin、end、cancellationRequested方法會調用下一個操做的Slink的相應方法,以此來達到疊加的效果。
code5 ChainedReference
Sink完美封裝了Stream每一步操做,並給出了[處理->轉發]的模式來疊加操做。這一連串的齒輪已經咬合,就差最後一步撥動齒輪啓動執行。是什麼啓動這一連串的操做呢?也許你已經想到了啓動的原始動力就是結束操做(Terminal Operation),一旦調用某個結束操做,就會觸發整個流水線的執行。
結束操做以後不能再有別的操做,因此結束操做不會建立新的流水線階段(Stage),直觀的說就是流水線的鏈表不會在日後延伸了。結束操做會建立一個包裝了本身操做的Sink,這也是流水線中最後一個Sink,這個Sink只須要處理數據而不須要將結果傳遞給下游的Sink(由於沒有下游)。對於Sink的[處理->轉發]模型,結束操做的Sink就是調用鏈的出口。
咱們再來考察一下上游的Sink是如何找到下游Sink的。一種可選的方案是在PipelineHelper中設置一個Sink字段,在流水線中找到下游Stage並訪問Sink字段便可。但Stream類庫的設計者沒有這麼作,而是設置了一個Sink AbstractPipeline.opWrapSink(int flags, Sink downstream)方法來獲得Sink,該方法的做用是返回一個新的包含了當前Stage表明的操做以及可以將結果傳遞給downstream的Sink對象。爲何要產生一個新對象而不是返回一個Sink字段?這是由於使用opWrapSink()能夠將當前操做與下游Sink(上文中的downstream參數)結合成新Sink。試想只要從流水線的最後一個Stage開始,不斷調用上一個Stage的opWrapSink()方法直到最開始(不包括stage0,由於stage0表明數據源,不包含操做),就能夠獲得一個表明了流水線上全部操做的Sink,用代碼表示就是這樣:
code6 AbstractPipeline.wrapSink
如今流水線上從開始到結束的全部的操做都被包裝到了一個Sink裏,執行這個Sink就至關於執行整個流水線,執行Sink的代碼以下:
code7 AbstractPipeline.copyInto
上述代碼首先調用wrappedSink.begin()方法告訴Sink數據即將到來,而後調用spliterator.forEachRemaining()方法對數據進行迭代,最後調用wrappedSink.end()方法通知Sink數據處理結束。邏輯如此清晰。
做者:Huang Rongpeng