深刻理解Java Stream流水線

前面咱們已經學會如何使用Stream API,用起來真的很爽,但簡潔的方法下面彷佛隱藏着無盡的祕密,如此強大的API是如何實現的呢?Pipeline是怎麼執行的,每次方法調用都會致使一次迭代嗎?自動並行又是怎麼作到的,線程個數是多少?本節咱們學習Stream流水線的原理,這是Stream實現的關鍵所在。html

首先回顧一下容器執行Lambda表達式的方式,以ArrayList.forEach()方法爲例,具體代碼以下:java

// ArrayList.forEach()
public void forEach(Consumer<? super E> action) {
    ...
    for (int i=0; modCount == expectedModCount && i < size; i++) {
        action.accept(elementData[i]);// 回調方法
    }
    ...
}

咱們看到ArrayList.forEach()方法的主要邏輯就是一個for循環,在該for循環裏不斷調用action.accept()回調方法完成對元素的遍歷。這徹底沒有什麼新奇之處,回調方法在Java GUI的監聽器中普遍使用。Lambda表達式的做用就是至關於一個回調方法,這很好理解。git

Stream API中大量使用Lambda表達式做爲回調方法,但這並非關鍵。理解Stream咱們更關心的是另外兩個問題:流水線和自動並行。使用Stream或許很容易寫入以下形式的代碼:github

int longestStringLengthStartingWithA
        = strings.stream()
              .filter(s -> s.startsWith("A"))
              .mapToInt(String::length)
              .max();

上述代碼求出以字母A開頭的字符串的最大長度,一種直白的方式是爲每一次函數調用都執一次迭代,這樣作可以實現功能,但效率上確定是沒法接受的。類庫的實現着使用流水線(Pipeline)的方式巧妙的避免了屢次迭代,其基本思想是在一次迭代中儘量多的執行用戶指定的操做。爲講解方便咱們彙總了Stream的全部操做。數組

Stream操做分類
中間操做(Intermediate operations) 無狀態(Stateless) unordered() filter() map() mapToInt() mapToLong() mapToDouble() flatMap() flatMapToInt() flatMapToLong() flatMapToDouble() peek()
有狀態(Stateful) distinct() sorted() sorted() limit() skip()
結束操做(Terminal operations) 非短路操做 forEach() forEachOrdered() toArray() reduce() collect() max() min() count()
短路操做(short-circuiting) anyMatch() allMatch() noneMatch() findFirst() findAny()

Stream上的全部操做分爲兩類:中間操做和結束操做,中間操做只是一種標記,只有結束操做纔會觸發實際計算。中間操做又能夠分爲無狀態的(Stateless)和有狀態的(Stateful),無狀態中間操做是指元素的處理不受前面元素的影響,而有狀態的中間操做必須等到全部元素處理以後才知道最終結果,好比排序是有狀態操做,在讀取全部元素以前並不能肯定排序結果;結束操做又能夠分爲短路操做和非短路操做,短路操做是指不用處理所有元素就能夠返回結果,好比找到第一個知足條件的元素。之因此要進行如此精細的劃分,是由於底層對每一種狀況的處理方式不一樣。安全

一種直白的實現方式

Stream_pipeline_naive

仍然考慮上述求最長字符串的程序,一種直白的流水線實現方式是爲每一次函數調用都執一次迭代,並將處理中間結果放到某種數據結構中(好比數組,容器等)。具體說來,就是調用filter()方法後當即執行,選出全部以A開頭的字符串並放到一個列表list1中,以後讓list1傳遞給mapToInt()方法並當即執行,生成的結果放到list2中,最後遍歷list2找出最大的數字做爲最終結果。程序的執行流程如如所示:數據結構

這樣作實現起來很是簡單直觀,但有兩個明顯的弊端:app

  1. 迭代次數多。迭代次數跟函數調用的次數相等。
  2. 頻繁產生中間結果。每次函數調用都產生一次中間結果,存儲開銷沒法接受。

這些弊端使得效率底下,根本沒法接受。若是不使用Stream API咱們都知道上述代碼該如何在一次迭代中完成,大體是以下形式:less

int longest = 0;
for(String str : strings){
    if(str.startsWith("A")){// 1. filter(), 保留以A開頭的字符串
        int len = str.length();// 2. mapToInt(), 轉換成長度
        longest = Math.max(len, longest);// 3. max(), 保留最長的長度
    }
}

採用這種方式咱們不但減小了迭代次數,也避免了存儲中間結果,顯然這就是流水線,由於咱們把三個操做放在了一次迭代當中。只要咱們事先知道用戶意圖,老是可以採用上述方式實現跟Stream API等價的功能,但問題是Stream類庫的設計者並不知道用戶的意圖是什麼。如何在沒法假設用戶行爲的前提下實現流水線,是類庫的設計者要考慮的問題。ide

Stream流水線解決方案

咱們大體可以想到,應該採用某種方式記錄用戶每一步的操做,當用戶調用結束操做時將以前記錄的操做疊加到一塊兒在一次迭代中所有執行掉。沿着這個思路,有幾個問題須要解決:

  1. 用戶的操做如何記錄?
  2. 操做如何疊加?
  3. 疊加以後的操做如何執行?
  4. 執行後的結果(若是有)在哪裏?

>> 操做如何記錄

Java_stream_pipeline_classes

注意這裏使用的是「操做(operation)」一詞,指的是「Stream中間操做」的操做,不少Stream操做會須要一個回調函數(Lambda表達式),所以一個完整的操做是<數據來源,操做,回調函數>構成的三元組。Stream中使用Stage的概念來描述一個完整的操做,並用某種實例化後的PipelineHelper來表明Stage,將具備前後順序的各個Stage連到一塊兒,就構成了整個流水線。跟Stream相關類和接口的繼承關係圖示。

還有IntPipeline, LongPipeline, DoublePipeline沒在圖中畫出,這三個類專門爲三種基本類型(不是包裝類型)而定製的,跟ReferencePipeline是並列關係。圖中Head用於表示第一個Stage,即調用調用諸如Collection.stream()方法產生的Stage,很顯然這個Stage裏不包含任何操做;StatelessOpStatefulOp分別表示無狀態和有狀態的Stage,對應於無狀態和有狀態的中間操做。

Stream流水線組織結構示意圖以下:

Stream_pipeline_example

圖中經過Collection.stream()方法獲得Head也就是stage0,緊接着調用一系列的中間操做,不斷產生新的Stream。這些Stream對象以雙向鏈表的形式組織在一塊兒,構成整個流水線,因爲每一個Stage都記錄了前一個Stage和本次的操做以及回調函數,依靠這種結構就能創建起對數據源的全部操做。這就是Stream記錄操做的方式。

>> 操做如何疊加

以上只是解決了操做記錄的問題,要想讓流水線起到應有的做用咱們須要一種將全部操做疊加到一塊兒的方案。你可能會以爲這很簡單,只須要從流水線的head開始依次執行每一步的操做(包括回調函數)就好了。這聽起來彷佛是可行的,可是你忽略了前面的Stage並不知道後面Stage到底執行了哪一種操做,以及回調函數是哪一種形式。換句話說,只有當前Stage自己才知道該如何執行本身包含的動做。這就須要有某種協議來協調相鄰Stage之間的調用關係。

這種協議由Sink接口完成,Sink接口包含的方法以下表所示:

方法名 做用
void begin(long size) 開始遍歷元素以前調用該方法,通知Sink作好準備。
void end() 全部元素遍歷完成以後調用,通知Sink沒有更多的元素了。
boolean cancellationRequested() 是否能夠結束操做,可讓短路操做盡早結束。
void accept(T t) 遍歷元素時調用,接受一個待處理元素,並對元素進行處理。Stage把本身包含的操做和回調方法封裝到該方法裏,前一個Stage只須要調用當前Stage.accept(T t)方法就好了。

有了上面的協議,相鄰Stage之間調用就很方便了,每一個Stage都會將本身的操做封裝到一個Sink裏,前一個Stage只需調用後一個Stage的accept()方法便可,並不須要知道其內部是如何處理的。固然對於有狀態的操做,Sink的begin()end()方法也是必須實現的。好比Stream.sorted()是一個有狀態的中間操做,其對應的Sink.begin()方法可能建立一個乘放結果的容器,而accept()方法負責將元素添加到該容器,最後end()負責對容器進行排序。對於短路操做,Sink.cancellationRequested()也是必須實現的,好比Stream.findFirst()是短路操做,只要找到一個元素,cancellationRequested()就應該返回true,以便調用者儘快結束查找。Sink的四個接口方法經常相互協做,共同完成計算任務。實際上Stream API內部實現的的本質,就是如何重載Sink的這四個接口方法

有了Sink對操做的包裝,Stage之間的調用問題就解決了,執行時只須要從流水線的head開始對數據源依次調用每一個Stage對應的Sink.{begin(), accept(), cancellationRequested(), end()}方法就能夠了。一種可能的Sink.accept()方法流程是這樣的:

void accept(U u){
    1. 使用當前Sink包裝的回調函數處理u
    2. 將處理結果傳遞給流水線下游的Sink
}

Sink接口的其餘幾個方法也是按照這種[處理->轉發]的模型實現。下面咱們結合具體例子看看Stream的中間操做是如何將自身的操做包裝成Sink以及Sink是如何將處理結果轉發給下一個Sink的。先看Stream.map()方法:

// Stream.map(),調用該方法將產生一個新的Stream
public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
    ...
    return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
        @Override /*opWripSink()方法返回由回調函數包裝而成Sink*/
        Sink<P_OUT> opWrapSink(int flags, Sink<R> downstream) {
            return new Sink.ChainedReference<P_OUT, R>(downstream) {
                @Override
                public void accept(P_OUT u) {
                    R r = mapper.apply(u);// 1. 使用當前Sink包裝的回調函數mapper處理u
                    downstream.accept(r);// 2. 將處理結果傳遞給流水線下游的Sink
                }
            };
        }
    };
}

上述代碼看似複雜,其實邏輯很簡單,就是將回調函數mapper包裝到一個Sink當中。因爲Stream.map()是一個無狀態的中間操做,因此map()方法返回了一個StatelessOp內部類對象(一個新的Stream),調用這個新Stream的opWripSink()方法將獲得一個包裝了當前回調函數的Sink。

再來看一個複雜一點的例子。Stream.sorted()方法將對Stream中的元素進行排序,顯然這是一個有狀態的中間操做,由於讀取全部元素以前是無法獲得最終順序的。拋開模板代碼直接進入問題本質,sorted()方法是如何將操做封裝成Sink的呢?sorted()一種可能封裝的Sink代碼以下:

// Stream.sort()方法用到的Sink實現
class RefSortingSink<T> extends AbstractRefSortingSink<T> {
    private ArrayList<T> list;// 存放用於排序的元素
    RefSortingSink(Sink<? super T> downstream, Comparator<? super T> comparator) {
        super(downstream, comparator);
    }
    @Override
    public void begin(long size) {
        ...
        // 建立一個存放排序元素的列表
        list = (size >= 0) ? new ArrayList<T>((int) size) : new ArrayList<T>();
    }
    @Override
    public void end() {
        list.sort(comparator);// 只有元素所有接收以後才能開始排序
        downstream.begin(list.size());
        if (!cancellationWasRequested) {// 下游Sink不包含短路操做
            list.forEach(downstream::accept);// 2. 將處理結果傳遞給流水線下游的Sink
        }
        else {// 下游Sink包含短路操做
            for (T t : list) {// 每次都調用cancellationRequested()詢問是否能夠結束處理。
                if (downstream.cancellationRequested()) break;
                downstream.accept(t);// 2. 將處理結果傳遞給流水線下游的Sink
            }
        }
        downstream.end();
        list = null;
    }
    @Override
    public void accept(T t) {
        list.add(t);// 1. 使用當前Sink包裝動做處理t,只是簡單的將元素添加到中間列表當中
    }
}

上述代碼完美的展示了Sink的四個接口方法是如何協同工做的:

  1. 首先beging()方法告訴Sink參與排序的元素個數,方便肯定中間結果容器的的大小;
  2. 以後經過accept()方法將元素添加到中間結果當中,最終執行時調用者會不斷調用該方法,直到遍歷全部元素;
  3. 最後end()方法告訴Sink全部元素遍歷完畢,啓動排序步驟,排序完成後將結果傳遞給下游的Sink;
  4. 若是下游的Sink是短路操做,將結果傳遞給下游時不斷詢問下游cancellationRequested()是否能夠結束處理。

>> 疊加以後的操做如何執行

Stream_pipeline_Sink

Sink完美封裝了Stream每一步操做,並給出了[處理->轉發]的模式來疊加操做。這一連串的齒輪已經咬合,就差最後一步撥動齒輪啓動執行。是什麼啓動這一連串的操做呢?也許你已經想到了啓動的原始動力就是結束操做(Terminal Operation),一旦調用某個結束操做,就會觸發整個流水線的執行。

結束操做以後不能再有別的操做,因此結束操做不會建立新的流水線階段(Stage),直觀的說就是流水線的鏈表不會在日後延伸了。結束操做會建立一個包裝了本身操做的Sink,這也是流水線中最後一個Sink,這個Sink只須要處理數據而不須要將結果傳遞給下游的Sink(由於沒有下游)。對於Sink的[處理->轉發]模型,結束操做的Sink就是調用鏈的出口。

咱們再來考察一下上游的Sink是如何找到下游Sink的。一種可選的方案是在PipelineHelper中設置一個Sink字段,在流水線中找到下游Stage並訪問Sink字段便可。但Stream類庫的設計者沒有這麼作,而是設置了一個Sink AbstractPipeline.opWrapSink(int flags, Sink downstream)方法來獲得Sink,該方法的做用是返回一個新的包含了當前Stage表明的操做以及可以將結果傳遞給downstream的Sink對象。爲何要產生一個新對象而不是返回一個Sink字段?這是由於使用opWrapSink()能夠將當前操做與下游Sink(上文中的downstream參數)結合成新Sink。試想只要從流水線的最後一個Stage開始,不斷調用上一個Stage的opWrapSink()方法直到最開始(不包括stage0,由於stage0表明數據源,不包含操做),就能夠獲得一個表明了流水線上全部操做的Sink,用代碼表示就是這樣:

// AbstractPipeline.wrapSink()
// 從下游向上遊不斷包裝Sink。若是最初傳入的sink表明結束操做,
// 函數返回時就能夠獲得一個表明了流水線上全部操做的Sink。
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
    ...
    for (AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
        sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
    }
    return (Sink<P_IN>) sink;
}

如今流水線上從開始到結束的全部的操做都被包裝到了一個Sink裏,執行這個Sink就至關於執行整個流水線,執行Sink的代碼以下:

// AbstractPipeline.copyInto(), 對spliterator表明的數據執行wrappedSink表明的操做。
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
    ...
    if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
        wrappedSink.begin(spliterator.getExactSizeIfKnown());// 通知開始遍歷
        spliterator.forEachRemaining(wrappedSink);// 迭代
        wrappedSink.end();// 通知遍歷結束
    }
    ...
}

上述代碼首先調用wrappedSink.begin()方法告訴Sink數據即將到來,而後調用spliterator.forEachRemaining()方法對數據進行迭代(Spliterator是容器的一種迭代器,參閱),最後調用wrappedSink.end()方法通知Sink數據處理結束。邏輯如此清晰。

>> 執行後的結果在哪裏

最後一個問題是流水線上全部操做都執行後,用戶所須要的結果(若是有)在哪裏?首先要說明的是否是全部的Stream結束操做都須要返回結果,有些操做只是爲了使用其反作用(Side-effects),好比使用Stream.forEach()方法將結果打印出來就是常見的使用反作用的場景(事實上,除了打印以外其餘場景都應避免使用反作用),對於真正須要返回結果的結束操做結果存在哪裏呢?

特別說明:反作用不該該被濫用,也許你會以爲在Stream.forEach()裏進行元素收集是個不錯的選擇,就像下面代碼中那樣,但遺憾的是這樣使用的正確性和效率都沒法保證,由於Stream可能會並行執行。大多數使用反作用的地方均可以使用歸約操做更安全和有效的完成。

// 錯誤的收集方式
ArrayList<String> results = new ArrayList<>();
stream.filter(s -> pattern.matcher(s).matches())
      .forEach(s -> results.add(s));  // Unnecessary use of side-effects!
// 正確的收集方式
List<String>results =
     stream.filter(s -> pattern.matcher(s).matches())
             .collect(Collectors.toList());  // No side-effects!

回到流水線執行結果的問題上來,須要返回結果的流水線結果存在哪裏呢?這要分不一樣的狀況討論,下表給出了各類有返回結果的Stream結束操做。

返回類型 對應的結束操做
boolean anyMatch() allMatch() noneMatch()
Optional findFirst() findAny()
歸約結果 reduce() collect()
數組 toArray()
  1. 對於表中返回boolean或者Optional的操做(Optional是存放 一個 值的容器)的操做,因爲值返回一個值,只須要在對應的Sink中記錄這個值,等到執行結束時返回就能夠了。
  2. 對於歸約操做,最終結果放在用戶調用時指定的容器中(容器類型經過收集器指定)。collect(), reduce(), max(), min()都是歸約操做,雖然max()和min()也是返回一個Optional,但事實上底層是經過調用reduce()方法實現的。
  3. 對於返回是數組的狀況,毫無疑問的結果會放在數組當中。這麼說固然是對的,但在最終返回數組以前,結果實際上是存儲在一種叫作Node的數據結構中的。Node是一種多叉樹結構,元素存儲在樹的葉子當中,而且一個葉子節點能夠存放多個元素。這樣作是爲了並行執行方便。關於Node的具體結構,咱們會在下一節探究Stream如何並行執行時給出詳細說明。

結語

本文詳細介紹了Stream流水線的組織方式和執行過程,學習本文將有助於理解原理並寫出正確的Stream代碼,同時打消你對Stream API效率方面的顧慮。如你所見,Stream API實現如此巧妙,即便咱們使用外部迭代手動編寫等價代碼,也未必更加高效。

本文github地址

相關文章
相關標籤/搜索