java8 Stream之原理

Stream

java8的Stream很重要,spring-reactor裏面用到reactor-core,而java8的stream與之很類似,搞懂了再看reactor-core一定事半功倍。
先看一下它的強大,這裏只是冰山一角:
從List<Student> 列表中取出name,將name組成一個List。
老代碼java

List<String> nameList = new ArrayList();
if(null != list){
	for(Student stu : list){
		nameList.add(stu.getName());
	}
}

JAVA8react

List<String> nameList = Optional.ofNullable(list).orElse(Collections.emptyList()).stream()
                 .map(Stu::getName).collect(Collectors.toList());

Stream.of 建立Stream

這裏給你們演示一下經過Stream.of建立Stream。
常見的集合經過stream()方法均可以建立Stream。 其實他們最終都是調用如下方法建立的。算法

public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
        Objects.requireNonNull(spliterator);
        return new ReferencePipeline.Head<>(spliterator,
                                            StreamOpFlag.fromCharacteristics(spliterator),
                                            parallel);
    }

Stream.of有兩種建立Stream的方法。
第一種spring

Stream.of("a1")

第二種數組

Stream.of("a1","a2"); //這種經過Arrays.stream 構建

這裏介紹兩個相關的類:app


若是是單個元素,直接使用Spliterator進行構建。 若是是多個元素,會有一個優化,使用SpineBuffer構建。
若是是大數組,使用SpineBuffer,小數組是使用ArrayList。 如何使用SpineBuffer構建?less

Stream.builder().add("a1").add("a2").build();

Stream 相關概念

stream的操做分爲兩種:
一種是中間操做,就是不須要結果,只須要記錄這個過程,通常返回Stream對象都是屬於這種
一種是終極操做,就是當即須要返回結果,通常返回非Stream對象,都是屬於這種。
stream的狀態分爲三種:
第一種:Head,第一次建立的時候就是這種
第二種:Stateless,無狀態,每一個對象的操做是獨立的。
第三種:Stateful,有狀態,須要聯合多個象才能得出結果。
stream操做特性:
操做特性是指:該stream有固定大小,大小不固定,操做有序,數據有序等。ide

Stream.filter

顧名思義:對 Stream進行filter,而後返回新的Stream。 由前一節咱們知道,stream的具體數據存儲在Spliterator中。而它自己能夠理解爲只是一個算法。
filter只是一箇中間操做,咱們只須要記錄這一個過程就OK了。而後返回新的Stream。若是再次調用fileter,會再次返回一個新Stream。

上面是一個流程圖,Sink是包裝算子的一個類,好比調用filter,從Head裏面拿到對象,通過第一個Sink,再通過第二個Sink的運算,最終獲得結果。
下面是Strea.filter的源碼實現:優化

public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
        Objects.requireNonNull(predicate);
        return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
                                     StreamOpFlag.NOT_SIZED) {
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
                return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                    @Override
                    public void begin(long size) {
                        downstream.begin(-1);
                    }

                    @Override
                    public void accept(P_OUT u) {
                        //若是經過當前filter,就進入下一個算子
                        if (predicate.test(u))
                            downstream.accept(u);
                    }
                };
            }
        };
    }

Stream.peek

這個方法能夠理解爲調試方法,它不對結果產生任何影響,將數據原封不動的傳給下一個算子ui

public final Stream<P_OUT> peek(Consumer<? super P_OUT> action) {
        Objects.requireNonNull(action);
        return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
                                     0) {
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
                return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                    @Override
                    public void accept(P_OUT u) {
                        action.accept(u);
                        downstream.accept(u);
                    }
                };
            }
        };
    }

Stream.flatMap

算子應該是經過一個對象映身成一個Stream,而後調用foreach,將每一個元素傳遞到下一個算子。

public final <R> Stream<R> flatMap(Function<? super P_OUT, ? extends Stream<? extends R>> mapper) {
        Objects.requireNonNull(mapper);
        // We can do better than this, by polling cancellationRequested when stream is infinite
        return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
                return new Sink.ChainedReference<P_OUT, R>(sink) {
                    @Override
                    public void begin(long size) {
                        downstream.begin(-1);
                    }

                    @Override
                    public void accept(P_OUT u) {
                        try (Stream<? extends R> result = mapper.apply(u)) {
                            // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
                            if (result != null)
                                result.sequential().forEach(downstream);
                        }
                    }
                };
            }
        };
    }

Stream.map

與上面的相似,只是映射成另外一個對象

public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
        Objects.requireNonNull(mapper);
        return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
                return new Sink.ChainedReference<P_OUT, R>(sink) {
                    @Override
                    public void accept(P_OUT u) {
                        downstream.accept(mapper.apply(u));
                    }
                };
            }
        };
    }

Stream.limit

這是一個有狀態的操做,由於它返回必定數據的數據組成的Stream。 這裏只貼一段核心算法:

Sink<T> opWrapSink(int flags, Sink<T> sink) {
                return new Sink.ChainedReference<T, T>(sink) {
                    long n = skip;
                    long m = limit >= 0 ? limit : Long.MAX_VALUE;

                    @Override
                    public void begin(long size) {
                        downstream.begin(calcSize(size, skip, m));
                    }

                    @Override
                    public void accept(T t) {
                        if (n == 0) {
                            if (m > 0) {
                                m--;
                                downstream.accept(t);
                            }
                        }
                        else {
                            n--;
                        }
                    }

                    @Override
                    public boolean cancellationRequested() {
                        return m == 0 || downstream.cancellationRequested();
                    }
                };
            }

Stream.skip

這個與Stram.limit相似,兩個聯合起來就能夠分面查詢了。

Stream.sorted

排序,若是沒傳比較器就用默認的。
若是有順序,就不用排序了,若是給定大小了就用一個固定大小的數組來排序,不然用一個列來來排序。

public Sink<T> opWrapSink(int flags, Sink<T> sink) {
            Objects.requireNonNull(sink);

            // If the input is already naturally sorted and this operation
            // also naturally sorted then this is a no-op
            if (StreamOpFlag.SORTED.isKnown(flags) && isNaturalSort)
                return sink;
            else if (StreamOpFlag.SIZED.isKnown(flags))
                return new SizedRefSortingSink<>(sink, comparator);
            else
                return new RefSortingSink<>(sink, comparator);
        }

經過排序,分頁,說明這個算子須要支持開始,結束方法。還須要一個取消方法,爲何了,好比第一個Stream有20個對象,可是後面只須要第一個,因此我第一個算子給到你一個數據時,第一個算子就須要終止了。

Stream.anyMatch

下面看一個anyMatch是怎麼實現的。

@Override
    public final boolean anyMatch(Predicate<? super P_OUT> predicate) {
        return evaluate(MatchOps.makeRef(predicate, MatchOps.MatchKind.ANY));
    }

第二步,主要是用當前stream,和原始的數據容器spliterator

final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
        assert getOutputShape() == terminalOp.inputShape();
        if (linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);
        linkedOrConsumed = true;

        return isParallel()
               ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
               : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
    }

第三步,最後一個算子和原始容器

@Override
        public <S> Boolean evaluateSequential(PipelineHelper<T> helper,
                                              Spliterator<S> spliterator) {
            return helper.wrapAndCopyInto(sinkSupplier.get(), spliterator).getAndClearState();
        }

第四步 包裝算子

final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
        Objects.requireNonNull(sink);

        for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
            sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
        }
        return (Sink<P_IN>) sink;
    }

第五步 數據傳遞

@Override
    final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
        copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
        return sink;
    }
	
    final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
        Objects.requireNonNull(wrappedSink);
     //知足要求後,是否須要中止計算
        if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
            wrappedSink.begin(spliterator.getExactSizeIfKnown());
            spliterator.forEachRemaining(wrappedSink);
            wrappedSink.end();
        }
        else {
		//須要中止計算
            copyIntoWithCancel(wrappedSink, spliterator);
        }
    }

Stream.spliterator

只須要一個Sink,而後調用wrapSink,再copyInto就能夠實現了

final <P_IN> Spliterator<P_OUT> wrap(PipelineHelper<P_OUT> ph,
                                     Supplier<Spliterator<P_IN>> supplier,
                                     boolean isParallel) {
        return new StreamSpliterators.WrappingSpliterator<>(ph, supplier, isParallel);
    }
相關文章
相關標籤/搜索