java8 Stream Pipelines 淺析

相信如今不少人都已經使用過java8提供的java.util.stream編程接口,用起來是如此的。有如這夏天裏一縷清爽涼風,讓你眼前一亮的女神。所以我就想試着去了解女神的心裏,她爲什麼如此的美麗高雅。。。下面咱們就試着去了解Stream API。java

Stream的使用


首先咱們看一下stream的基本使用方法:編程


ArrayList<String> list = Lists.newArrayList("America", "ABC", "CNN", "OK", "ASYNC");
        List<String> strings = list.stream().filter(e -> e.startsWith("A")).map(e -> e + " nice").collect(Collectors.toList());

最終咱們會獲得ArrayList中以A開頭的字母加上「nice」的字符串List,若是放在jdk7裏咱們會這樣寫:數據結構


ArrayList<String> strings = Lists.newArrayList();
        for (String s : list) {
            if(s.startsWith("A")){
                String newStr = s + "nice";
                strings.add(newStr);
            }
        }

我試着去看源代碼,發現Stream實質上就是這樣執行咱們的需求的。下面就說說我看到了什麼。app


Stream相關類的介紹


打開java.util.stream包,能夠看到核心接口Stream類,顧名思義就是流水的意思,官方文檔原話說的是less

A sequence of elements supporting sequential and parallel aggregate operations.

圖片描述

Stream就是一個支持串行和並行的彙集操做的一系列元素。
定義了一些中間操做(Intermediate operations)結束操做(Terminal operations)
中間操做包括無狀態(Stateless)操做好比:filter, map, flatMap等,有狀態(Stateful)操做好比:distinct, sorted, limit等;
結束操做(Terminal operations)包括非短路操做(short-circuiting)好比:forEach, reduce, collect等和短路操做如:findFirst, findAny;ide

中間操做不是真正的操做而是一種操做的描述,只有執行到結束操做纔會觸發實際計算,在結束操做執行以前只是把中間操做記錄了下來。無狀態中間操做指元素的操做不受其餘元素的影響,好比以某一Predicate去filter元素,元素和元素以前不互相影響。而有狀態中間操做指的是元素和元素之間是有關聯的,好比sorted,只有讀取全部元素以後才能肯定排序結果。函數

短路結束操做指的是不用處理全部元素才能返回結果,好比findFirst,只要找到第一個符合條件的元素便可返回結果。非短路結束操做則必須處理完全部元素才能返回結果。學習

Stream繼承了BaseStream,定義了一些Stream的基本操做。ui


Pipeline記錄操做


以上所說的操做須要被按順序記錄下來,這裏就須要管道流水線Pipeline的概念來實現。this

管道有一個基類PipelineHelper,他是執行Stream管道的一個helper,將Stream的全部信息收集到一個地方。

上面所說的操做其實都定義在PipelineHelper的一個子類ReferencePipeline中,包括Head(Source stage of a ReferencePipeline)StatelessOp(Base class for a stateless intermediate stage of a Stream.)StatefulOp(Base class for a stateful intermediate stage of a Stream.)靜態內部類。

ReferencePipeline是描述中間操做管道流和源管道流的一個類,同時也實現了Stream接口
圖片描述


在Stream中使用stage(階段)來描述一個完整的操做,而HeadStatelessOpStatefulOp這三個操做都是實例化的PipelineHelper,也就是stage。能夠把stage理解爲帶管道的流(Stream with Pipeline)


圖片描述


在本文一開始的例子中,咱們分析一下有幾個stage,下圖:


圖片描述


每一步Stream的方法調用都產生一個新的stage,在隨後的分析中會發現,這些stage會以雙向鏈表的方式連接,而每一個stage都記錄了每個階段的操做,這樣咱們就能夠依賴這種數據結構來保存對數據源的全部操做了。


連接stage


stage的連接靠Sink來實現,咱們先看一下Sink的接口,咱們這裏只看ChainedReference


圖片描述


ChainedReference包括:

  • begin:在遍歷元素前調用,作好遍歷準備
  • accept:遍歷每一個元素的時候調用,包含每一個stage的操做和回掉函數
  • end:遍歷結束後調用
  • cancellationRequested:是否可以儘早結束遍歷,用於短路操做

每一個stage都把操做實如今Sink裏,上游stage調用下游stageaccept方法,達到按順序執行每一個操做的目的。


stage的自動執行


直接上代碼

public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
        Objects.requireNonNull(predicate);
        return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
                                     StreamOpFlag.NOT_SIZED) {
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
                return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                    @Override
                    public void begin(long size) {
                        downstream.begin(-1);
                    }

                    @Override
                    public void accept(P_OUT u) {
                        if (predicate.test(u))
                            downstream.accept(u);
                    }
                };
            }
        };
    }

上面代碼是Streamfilter方法,fiter是一個無狀態操做,返回一個新的stage,還實現了AbstractPipeline.opWrapSink來返回stage實現的sink。這裏filter的參數是一個predicate,在predicate.test返回true時調用下游的stage的sink的accept方法,這樣整個操做流就連續執行下去了。


stage的雙向連接


在說Stream自動執行以前,有必要說一說每一個stage是怎麼連接起來的。Stream在操做時產生的Operation類是如何用雙向鏈表的結構來先後連接的?
在上面Stream.filter的源代碼能夠看到,filter返回了一個StatelessOp對象,構造函數接受了當前對象this爲第一個參數,而後來看StatelessOp的代碼:

abstract static class StatelessOp<E_IN, E_OUT>
            extends ReferencePipeline<E_IN, E_OUT> {
        /**
         * Construct a new Stream by appending a stateless intermediate
         * operation to an existing stream.
         *
         * @param upstream The upstream pipeline stage
         * @param inputShape The stream shape for the upstream pipeline stage
         * @param opFlags Operation flags for the new stage
         */
        StatelessOp(AbstractPipeline<?, E_IN, ?> upstream,
                    StreamShape inputShape,
                    int opFlags) {
            super(upstream, opFlags);
            assert upstream.getOutputShape() == inputShape;
        }

        @Override
        final boolean opIsStateful() {
            return false;
        }
    }

能夠看到StatelessOp實現了ReferencePipeline接口,在構造函數裏調用了super(upstream, opFlags),而這個upstream(上游流)參數就是上面傳入的this,下游流StatelessOpupstream就指向this了,這樣就經過下游流的upstream連接上游流。目前每一個操做之間還只是單鏈表。

那有人就會想了,下游流保存了上游流的引用,那上游流是怎麼保存下游流的引用呢?這就要看最後的結束操做了,咱們來看Stream.collect代碼:

public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {
        A container;
        if (isParallel()
                && (collector.characteristics().contains(Collector.Characteristics.CONCURRENT))
                && (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) {
            container = collector.supplier().get();
            BiConsumer<A, ? super P_OUT> accumulator = collector.accumulator();
            forEach(u -> accumulator.accept(container, u));
        }
        else {
            container = evaluate(ReduceOps.makeRef(collector));
        }
        return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
               ? (R) container
               : collector.finisher().apply(container);
    }

這裏咱們只看串行操做的分支。filter返回了一個結束操做的計算結果。咱們來看evaluate方法:

final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
        assert getOutputShape() == terminalOp.inputShape();
        if (linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);
        linkedOrConsumed = true;

        return isParallel()
               ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
               : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
    }

AbstractPipeline.evaluate方法接收了一個結束操做對象,咱們只看串行操做:

public <P_IN> R evaluateSequential(PipelineHelper<T> helper,
                                           Spliterator<P_IN> spliterator) {
            return helper.wrapAndCopyInto(makeSink(), spliterator).get();
        }

繼續看AbstractPipeline.wrapAndCopyInto

@Override
    final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
        copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
        return sink;
    }

    @Override
    final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
        Objects.requireNonNull(wrappedSink);

        if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
            wrappedSink.begin(spliterator.getExactSizeIfKnown());
            spliterator.forEachRemaining(wrappedSink);
            wrappedSink.end();
        }
        else {
            copyIntoWithCancel(wrappedSink, spliterator);
        }
    }

AbstractPipeline.wrapAndCopyInto接收告終束操做的sink,繼續看AbstractPipeline.wrapSink:

@Override
    @SuppressWarnings("unchecked")
    final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
        Objects.requireNonNull(sink);

        for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
            sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
        }
        return (Sink<P_IN>) sink;
    }

從結束操做的sink開始,一層一層包裝sink,最後第一個中間操做的sink在最外層,在每一個操做的opWrapSink方法裏返回的sink都維護了一個downstream指向後一個操做,這樣,雙向鏈表的結構就完成了。這樣,咱們在copyInto方法裏調用beginacceptend的時候就會經過downstream一層一層的調用下去,最終在結束操做執行實際計算。


結束Stream的基本原理就分析到這裏,比較淺,並且思路也有點混亂,有不少問題在裏面,但願你們和我一塊兒討論學習。但願看不明白的童鞋能夠向我提問,看過源碼的童鞋歡迎指出錯誤!你們一塊兒學習!

相關文章
相關標籤/搜索