JAVA8之數據流Stream

  在JAVA8以前的傳統編程方式,若是咱們須要操做一個集合數據,須要使用集合提供的API,經過一個循環去獲取集合的元素,這種訪問數據的方式會使代碼顯得臃腫,JAVA8新引入的Stream類,用於從新封裝集合數據,經過使用流式Stream代替經常使用集合數組、list和map的遍歷操做能夠極大的提升效率。java

 

1、Stream的組成編程

數據源(Source) + 0個或多箇中間操做(intermediate operation)和終止操做(terminal operation)api

數據源:數據源頭,可爲數組、Collection、I/O資源和生成函數數組

 

2、構造流(Stream)app

Arrays構造流less

public static <T> Stream<T> stream(T[] array, int startInclusive, int endExclusive) {
        return StreamSupport.stream(spliterator(array, startInclusive, endExclusive), false);
}

由源碼可知Arrays類由傳入的數組生成一個Stream是委託StreamSupport來構建的,下面看下StreamSupport的源碼ide

 

public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
        Objects.requireNonNull(spliterator);
        return new ReferencePipeline.Head<>(spliterator,
                                            StreamOpFlag.fromCharacteristics(spliterator),
                                            parallel);
}

 

StreamSupport首先對spliterator參數進行判空,而後把它做爲參數經過調用ReferencePipeline的內部類Head的構造函數生成一個ReferencePipeline.Head實例返回,經過對Head類源碼的閱讀可知他是ReferencePipeline的一個內部子類函數

 

static class Head<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> {
        
        Head(Spliterator<?> source,
             int sourceFlags, boolean parallel) {
            super(source, sourceFlags, parallel);
        }

}

ReferencePipeline(Spliterator<?> source,
                      int sourceFlags, boolean parallel) {
        super(source, sourceFlags, parallel);
}
AbstractPipeline(Spliterator<?> source,
                     int sourceFlags, boolean parallel) {
        this.previousStage = null;
        this.sourceSpliterator = source;
        this.sourceStage = this;
        this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
        // The following is an optimization of:
        // StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);
        this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
        this.depth = 0;
        this.parallel = parallel;
}

Head構造函數內部調用父類ReferencePipeline的構造函數,而ReferencePipeline的構造函數則調用父類AbstractPipeline構造函數最終完成Stream的構建,閱讀源碼咱們可知構造流的核心是Spliterator,而最初持有他的是AbstractPipeline,如今咱們來看下Spliterator接口源碼分析

 

下面是源碼對Spliterator的註釋ui

/*
* An object for traversing and partitioning elements of a source.  The source
* of elements covered by a Spliterator could be, for example, an array, a
* {@link Collection}, an IO channel, or a generator function.
**/

翻譯過來的意思是該接口是用於對數據源進行遍歷和分區,即Spliterator對象封裝了對數據源和分區的能力,接口聲明的方法以下:

tryAdvance - 單元素遍歷

trySplit - 分區抽象

forEachRemaining - 批量遍歷

Stream - 實現原理

estimateSize - 默認實現,返回估計的大小

getExactSizeIfKnown - 默認實現,返回元素集合的確切大小

characteristics - 默認實現,返回當前spliterator源數據的一組特徵值

hasCharacteristics - 默認實現,是否源數據是否包含該特徵值

getComparator - 默認實現,若是該Spliterator操做的數據源是有序的,那麼返回他的Comparator

 

好了,如今咱們基本清楚Spliterator的做用和他在流中的定位,如今讓咱們回過頭看下Arrays根據數組構造Stream流的JDK源碼,深刻到AbstractPipeline咱們能夠看到,Stream構造流是經過委託StreamSupport實現的,而核心是構建一個ArraySpliterator對象,可見,構造流(Stream)的核心就是構造一個Spliterator

 

Collection構造流

由上述對Arrays構造流分析可知構建流的核心是Spliterator,咱們直接查看JAVA8中Collection構造流源碼

 default Spliterator<E> spliterator() {
    return Spliterators.spliterator(this, 0);
 }
 default Stream<E> stream() {
    return StreamSupport.stream(spliterator(), false);
 }

 

Collection容器構造流與Arrays同樣也是委託StreamSupport,代碼設計的好的一點是他的流構建是在接口層面實現的,經過把他自身做爲參數傳入返回一個返回了一個IteratorSpliterator對象,不一樣集合的返回的IteratorSpliterator對象的tryAdvance等遍歷方法不一樣,具體到IteratorSpliterator源碼以下

        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            if (action == null) throw new NullPointerException();
            if (it == null) {
                it = collection.iterator();
                est = (long) collection.size();
            }
            if (it.hasNext()) {
                action.accept(it.next());
                return true;
            }
            return false;
        }    

 

可知IteratorSpliterator對源碼的遍歷處理是經過傳入一個Consumer消費函數消費不一樣Collection實現類Iterator迭代器獲取的元素實現,感謝JAVA8接口的default設計。

 

3、Stream經常使用方法列表

Stream中間操做方法列表

 

方法 參數 用途
concat Stream< ? extends T> a, Stream< ? extends T> b 將兩個流合起來造成新流
distinct 將流裏的元素按照Ojbect.equal方法進行聚合去重,返回一個去重結果的新流
empty 返回一個空的流
filter Predicate< ? super T> predicate 按照謂詞參數predicate過濾,返回過濾後的流Stream
flatMap Function< ? super T, ? extends Stream< ? extends R>> mapper 將流裏的元素T,按照參數Function進行處理,處理結果是一個子流Stream< ? extends R>,後續將子流flat打平,造成元素R的新流。相似的有flatToDouble、flatToInt和flatToLong
limit long maxSize 返回一個新流,只包含maxSize個元素,其餘被truncate掉
map Function< ? super T, ? extends R> mapper 經典的map操做,對流裏的每一個元素,經過參數mapper映射爲一個新的元素,返回新元素的流。相似map有mapToDouble、mapToInt和mapToLong
peek Consumer< ? super T> action 這個動做很是有趣,它並不改變流,而是對流裏的每一個元素執行一個Consumer,對其進行一次處理。原始流不變繼續返回
skip long n 跳過n個元素,從第n+1個元素開始返回一個新的流
sorted Comparator< ? super T> comparator 根據參數排序器對流進行排序,返回新的流。若是參數爲空,則按照天然序排

 

Stream終止操做方法列表

 

方法 參數 用途
allMatch Predicate< ? super T> predicate 根據謂詞函數判斷流裏的元素是否都知足,返回對應的boolean值
anyMatch Predicate< ? super T> predicate 根據謂詞函數判斷流裏的元素是否存在一個或多個知足,返回對應的boolean值
noneMatch Predicate< ? super T> predicate 根據謂詞函數判斷流裏的元素是否不存在任何一個元素知足,返回對應的boolean值
count 返回這個流裏元素的個數
findAny 返回一個Optional對象,這個等價於對於一個流執行一個select操做,返回一條記錄
findFirst 返回這個流裏的第一個元素的Optional,若是這個流不是有序的,則返回任意元素
forEach Consumer< ? super T> action 對這個流的每一個元素,執行參數Consumer
forEachOrdered Consumer<? super T> action 針對forEach在並行流裏對有序元素的輸出不足,這個方法確保並行流中按照原來順序處理
max Comparator<? super T> comparator 返回一個Optional值,包含了流裏元素的max,條件是按照參數排序器排序
min Comparator<? super T> comparator 返回一個Optional值,包含了流裏元素的min,條件是按照參數排序器排序
reduce BinaryOperator< T> accumulator 經典的reduce,就是根據一個二元操做算子,將流中的元素逐個累計操做一遍,初始元素以foundAny結果爲主
reduce T identity, BinaryOperator< T> accumulator 與上面的方法一致,只不過多了一個初始值,不須要findAny了
reduce U identity,BiFunction< U, ? super T, U> accumulator,BinaryOperator< U> combiner 最複雜的reduce,看到combiner會不會有聯想?它作的也是對於一個流裏的元素T,使用二元函數accumulator計算,計算的值累計到U上,由於以前的reduce要求流元素和結果元素類型一致,因此有限制。而該reduce函數,支持T和U類型不一樣,經過二元函數轉換,可是要求combiner會執行這個事情,要求「 combiner.apply(u, accumulator.apply(identity, t)) == accumulator.apply(u, t)」
collect Supplier< R> supplier,BiConsumer< R, ? super T> accumulator,BiConsumer< R, R> combiner 超級強大的方法。常規的reduce是返回不可變的值。而collect能夠將reduce後的值升級爲一個可變容器。具體這個方法就是對流裏每一個元素T,將Supplier提供的值R做爲初始值,用BiConsumer的accumulator進行累加計算。combiner的做用和要求和reduce是同樣的
collect Collector< ? super T, A, R> collector 和上面的collect一致,只不過Collector封裝了一組上面的參數,T是流裏的元素,A是累計中間結果,R是返回值的類型(collect的話就是容器了)

 

 

4、Stream工做原理

下面咱們分析下Stream的工做原理

 

        Integer[] array = new Integer[]{1,2,3,4};
        Optional<Integer> result =  Stream.of(array).filter(v -> v > 2).sorted((v1, v2) -> v2.compareTo(v1)).limit(2).reduce((v1, v2) -> v1 + v2);

        System.out.println(result.get());

 

  首先調用Stream.of方法根據一個Integer對象數組構建了流,函數內部是經過調用Arrays.stream方法返回流,對應的Spliterator實現是ArraySpliterator,而後調用filter方法過濾,咱們分析下這個階段對應的源碼

 

    @Override
    public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
        Objects.requireNonNull(predicate);
        return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
                                     StreamOpFlag.NOT_SIZED) {
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
                return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                    @Override
                    public void begin(long size) {
                        downstream.begin(-1);
                    }

                    @Override
                    public void accept(P_OUT u) {
                        if (predicate.test(u))
                            downstream.accept(u);
                    }
                };
            }
        };
    }

由源碼可知filter把前面構造的流(Stream)自己做爲參數返回了一個StatelessOp實現對象,深刻StatelessOp類咱們知道它是ReferencePipeline的一個內部類,繼承ReferencePipeline,而上面咱們分析過ReferencePipeline繼承自AbstractPipeline,回到filter源碼,咱們看到在filter方法裏面實現了超類AbstractPipeline的OpWrapSink方法

    /**
     * Accepts a {@code Sink} which will receive the results of this operation,
     * and return a {@code Sink} which accepts elements of the input type of
     * this operation and which performs the operation, passing the results to
     * the provided {@code Sink}.
     *
     * @apiNote
     * The implementation may use the {@code flags} parameter to optimize the
     * sink wrapping.  For example, if the input is already {@code DISTINCT},
     * the implementation for the {@code Stream#distinct()} method could just
     * return the sink it was passed.
     *
     * @param flags The combined stream and operation flags up to, but not
     *        including, this operation
     * @param sink sink to which elements should be sent after processing
     * @return a sink which accepts elements, perform the operation upon
     *         each element, and passes the results (if any) to the provided
     *         {@code Sink}.
     */
    abstract Sink<E_IN> opWrapSink(int flags, Sink<E_OUT> sink);

 

ChainedReference.java源碼

/**
     * Abstract {@code Sink} implementation for creating chains of
     * sinks.  The {@code begin}, {@code end}, and
     * {@code cancellationRequested} methods are wired to chain to the
     * downstream {@code Sink}.  This implementation takes a downstream
     * {@code Sink} of unknown input shape and produces a {@code Sink<T>}.  The
     * implementation of the {@code accept()} method must call the correct
     * {@code accept()} method on the downstream {@code Sink}.
     */
    static abstract class ChainedReference<T, E_OUT> implements Sink<T> {
        protected final Sink<? super E_OUT> downstream;

        public ChainedReference(Sink<? super E_OUT> downstream) {
            this.downstream = Objects.requireNonNull(downstream);
        }

        @Override
        public void begin(long size) {
            downstream.begin(size);
        }

        @Override
        public void end() {
            downstream.end();
        }

        @Override
        public boolean cancellationRequested() {
            return downstream.cancellationRequested();
        }
    }

經過對上述源碼的分析可知Sink(Consumer的一個派生類)在流中的做用實際上是用於控制流中間階段的數據、大小等狀態信息,在Sink方法中還定義了兩個方法,begin和end,begin在Sink的accept方法以前調用,end在accept方法以後調用,主要是用於對流程數據進行一些額外的控制,如今咱們在結合分析filter源碼發現,爲了維護支持流(Stream)的中間操做狀態信息,JAVA8流在結構上其實被設計成一個鏈表結構,一個Head起始節點,多箇中間節點StatelessMap(繼承自ReferencePipeline),而流程中管理和控制數據狀態信息的實際是其中的Sink。

  接下來是到sorted排序階段,咱們繼續深刻源碼。

ReferencePipeline.java

 

@Override
    public final Stream<P_OUT> sorted(Comparator<? super P_OUT> comparator) {
        return SortedOps.makeRef(this, comparator);
}

SortedOps.java

    static <T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream,
                                Comparator<? super T> comparator) {
        return new OfRef<>(upstream, comparator);
    }

排序是一個有狀態的中間操做,與filter階段相似sorted方法實際返回的是一個OfRef對象,深刻SortedOps的makeRef方法,可知返回了一個OfRef實例分析該類可知該類是StatefulOp的子類,該類持有一個排序器,解讀StatefulOp類源碼可知該類是流中間狀態的基類,對比StatelessOp類,接下來經過源碼分析方法的邏輯

SortedOps.OfRef

 

        @Override
        public Sink<T> opWrapSink(int flags, Sink<T> sink) {
            Objects.requireNonNull(sink);

            // If the input is already naturally sorted and this operation
            // also naturally sorted then this is a no-op
            if (StreamOpFlag.SORTED.isKnown(flags) && isNaturalSort)
                return sink;
            else if (StreamOpFlag.SIZED.isKnown(flags))
                return new SizedRefSortingSink<>(sink, comparator);
            else
                return new RefSortingSink<>(sink, comparator);
        }

 

分析代碼可知實現邏輯與filter階段相似,都是經過Sink控制流的數據和中間狀態信息,流程邏輯是先對入參Sink判空,若是流有序,直接返回sink,不然判斷是否有界,若是有界返回一個SizedRefSortingSink對象,不然返回一個RefSortingSink對象,深刻兩個類,不出意外,前者內部是經過數組保存數據,後者是經過一個ArrayList實例保存,二者均是在end方法裏藉由內部排序器完成元素排序

 

  接下來的limit截取階段相似,讀者可自行分析,大致的實現邏輯與上述兩個階段並沒有二致

  最後到了終止操做階段reduce

ReferencePipeline.java

    @Override
    public final Optional<P_OUT> reduce(BinaryOperator<P_OUT> accumulator) {
        return evaluate(ReduceOps.makeRef(accumulator));
    }

AbstractPipeline.java

    final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
        assert getOutputShape() == terminalOp.inputShape();
        if (linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);
        linkedOrConsumed = true;

        return isParallel()
               ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
               : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
    }

 

ReduceOps.java

        @Override
        public <P_IN> R evaluateSequential(PipelineHelper<T> helper,
                                           Spliterator<P_IN> spliterator) {
            return helper.wrapAndCopyInto(makeSink(), spliterator).get();
        }

 

AbstractPipeline.java

    @Override
    final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
        copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
        return sink;
    }

    @Override
    final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
        Objects.requireNonNull(wrappedSink);

        if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
            wrappedSink.begin(spliterator.getExactSizeIfKnown());
            spliterator.forEachRemaining(wrappedSink);
            wrappedSink.end();
        }
        else {
            copyIntoWithCancel(wrappedSink, spliterator);
        }
    }

經過對源碼的分析可知,在Stream的中間階段若是沒有意外中斷,代碼執行到終止操做時纔開始執行前面定義的各個中間操做,也就是說Stream的中間操做的執行方式都是lazy,讀者可自行在中間流的opWrapSink打斷點,去掉流的終止操做進行校驗。

相關文章
相關標籤/搜索