本文已收錄 【修煉內功】躍遷之路
Java8中新增的Stream,相信使用過的同窗都已經感覺到了它的便利,容許你以聲明性的方式處理集合,而不用去作繁瑣的for-loop/while-loop,而且能夠以極低的成本並行地處理集合數據java
若是須要從菜單中篩選出卡路里在400如下的菜品,並按卡路里排序後,輸出菜品名稱segmentfault
在java8以前,須要進行兩次顯示迭代,而且還須要藉助中間結果存儲併發
List<Menu> lowCaloricDishes = new LinkedList<>(); // 按照熱量值進行篩選 for(Dish dish : dishes) { if (dish.getCalories() < 400) { lowCaloricDishes.add(dish); } } // 按照熱量進行排序 lowCaloricDishes.sort(new Comparator<Dish>() { @Override public int compare(Dish d1, Dish d2) { return d1.getCalories().compareTo(d2.getCalories); } }) // 提取名稱 List<String> lowCaloricDishesName = new LinkedList<>(); for(Dish dish : lowCaloricDishes) { lowCaloricDishesName.add(dish.getName()); }
若是使用Stream API,只須要app
List<String> lowCaloricDishesName = dishes.parallelStream() // 開啓並行處理 .filter(d -> d.getCalories() < 400) // 按照熱量值進行篩選 .sorted(Comparator.comparing(Dish::getCalories)) // 按照熱量進行排序 .map(Dish::getName) // 提取名稱 .collect(Collectors.toList()); // 將結果存入List
甚至,能夠寫出更復雜的功能框架
Map<Integer, List<String>> lowCaloricDishesNameGroup = dishes.parallelStream() // 開啓並行處理 .filter(d -> d.getCalories() < 400) // 按照熱量值進行篩選 .sorted(comparing(Dish::getCalories)) // 按照熱量進行排序 .collect(Collectors.groupingBy( // 將菜品名按照熱量進行分組 Dish::getCalories, Collectors.mapping(Dish::getName, Collectors.toList()) ));
是否是很是簡潔,而且愈加形似SQL
如此簡潔的API是如何實現的?中間過程是如何銜接起來的?每一步都會進行一次迭代麼,須要中間結果存儲麼?並行處理是怎麼作到的?less
Stream使用一種相似SQL語句的方式,提供對集合運算的高階抽象,能夠將其處理的元素集合看作一種數據流,流在管道中傳輸,數據在管道節點上進行處理,好比篩選、排序、聚合等ide
數據流在管道中通過中間操做(intermediate operation)處理,由終止操做(terminal operation)獲得前面處理的結果
和以往的集合操做不一樣,Stream操做有兩個基礎特徵:函數
pipelining:中間操做
會返回流對象,多個操做最終串聯成一個管道,管道並不直接操做數據,最終由終止操做
觸發數據在管道中的流動及處理,並收集最終的結果oop
Stream的實現使用流水線( pipelining)的方式巧妙的避免了屢次迭代,其基本思想是在 一次迭代中儘量多的執行用戶指定的操做
Stream操做分爲兩類:中間操做
及終止操做
ui
中間操做:將流一層層的進行處理,並向下一層進行傳遞,如 filter
map
sorted
等
中間操做又分爲有狀態(stateful)及無狀態(stateless)
sorted
filter
map
終止操做:觸發數據的流動,並收集結果,如collect
findFirst
forEach
等
終止操做又分爲短路操做(short-circuiting)及非短路操做(non-short-circuiting)
anyMatch
findFirst
等collect
max
等
Stream採用某種方式記錄用戶每一步的操做,當用戶調用終止操做時將以前記錄的操做疊加到一塊兒,儘量地在一次迭代中所有執行掉,那麼
Stream中使用Stage的概念來描述一個完整的操做,並用某種實例化後的PipelineHelper來表明Stage,將各Pipeline按照前後順序鏈接到一塊兒,就構成了整個流水線
與Stream相關類和接口的繼承關係以下圖
Head用於表示第一個Stage,該Stage不包含任何操做
StatelessOp和StatefulOp分別表示無狀態和有狀態的Stage
使用Collection.stream
Arrays.stream
或Stream.of
等接口會生成Head
,其內部均採用StreamSupport.stream
方法,將原始數據包裝爲Spliterator
存放在Stage中
Head StatelessOp StatefulOp三個操做實例化會指向其父類AbstractPipeline
對於Head
/** * Constructor for the head of a stream pipeline. * * @param source {@code Spliterator} describing the stream source * @param sourceFlags the source flags for the stream source, described in * {@link StreamOpFlag} * @param parallel {@code true} if the pipeline is parallel */ AbstractPipeline(Spliterator<?> source, int sourceFlags, boolean parallel) { this.previousStage = null; this.sourceSpliterator = source; this.sourceStage = this; this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK; // The following is an optimization of: // StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE); this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE; this.depth = 0; this.parallel = parallel; }
其會將包裝爲Spliterator的原始數據存放在Stage中,並將自身存放在sourceStage中
對於StatelessOp及StatefulOp
/** * Constructor for appending an intermediate operation stage onto an * existing pipeline. * * @param previousStage the upstream pipeline stage * @param opFlags the operation flags for the new stage, described in * {@link StreamOpFlag} */ AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) { if (previousStage.linkedOrConsumed) throw new IllegalStateException(MSG_STREAM_LINKED); previousStage.linkedOrConsumed = true; previousStage.nextStage = this; this.previousStage = previousStage; this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK; this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags); this.sourceStage = previousStage.sourceStage; if (opIsStateful()) sourceStage.sourceAnyStateful = true; this.depth = previousStage.depth + 1; }
每個Stage都會存放原始的sourceStage,即Head
經過previousStage及nextStage,將各Stage串聯爲一個雙向鏈表,使得每一步都知道上一步與下一步的操做
以上已經解決了如何記錄操做的問題,想要讓pipeline運行起來,須要一種將全部操做疊加到一塊兒的方案
因爲前面的Stage並不知道後面的Stage致使須要執行何種操做,只有當前Stage自己知道該如何執行本身包含的動做,這就須要某種協議來協調相鄰Stage之間的調用關係
Stream類庫採用了Sink接口來協調各Stage之間的關係
interface Sink<T> extends Consumer<T> { /** * Resets the sink state to receive a fresh data set. This must be called * before sending any data to the sink. After calling {@link #end()}, * you may call this method to reset the sink for another calculation. * @param size The exact size of the data to be pushed downstream, if * known or {@code -1} if unknown or infinite. * * <p>Prior to this call, the sink must be in the initial state, and after * this call it is in the active state. * * 開始遍歷前調用,通知Sink作好準備 */ default void begin(long size) {} /** * Indicates that all elements have been pushed. If the {@code Sink} is * stateful, it should send any stored state downstream at this time, and * should clear any accumulated state (and associated resources). * * <p>Prior to this call, the sink must be in the active state, and after * this call it is returned to the initial state. * * 全部元素遍歷完成後調用,通知Sink沒有更多元素了 */ default void end() {} /** * Indicates that this {@code Sink} does not wish to receive any more data. * * @implSpec The default implementation always returns false. * * @return true if cancellation is requested * * 是否能夠結束操做,可讓短路操做盡早結束 */ default boolean cancellationRequested() {} /** * Accepts a value. * * @implSpec The default implementation throws IllegalStateException. * * @throws IllegalStateException if this sink does not accept values * * 遍歷時調用,接收的一個待處理元素,並對元素進行處理 * Stage把本身包含的操做和回調方法封裝到該方法裏,前一個Stage只須要調用當前Stage.accept方法便可 */ default void accept(T value) {} }
Sink的四個接口方法經常相互協做,共同完成計算任務
實際上Stream API內部實現的的本質,就是如何重載Sink的這四個接口方法,下面結合具體源碼來理解Stage是如何將自身的操做包裝秤Sink,以及Sink是如何將處理結果轉發給下一個Sink的
無狀態Stage,Stream.map
// Stream.map 將生成一個新Stream public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) { Objects.requireNonNull(mapper); return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE, StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { // 該方法將回調函數(處理邏輯)包裝成Sink @Override Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) { return new Sink.ChainedReference<P_OUT, R>(sink) { @Override public void accept(P_OUT u) { // 接收數據,使用當前包裝的回調函數處理數據,並傳遞給下游Sink downstream.accept(mapper.apply(u)); } }; } }; }
上述代碼邏輯很是簡單,接下來能夠看一下有狀態Stage,Stream.sorted
private static final class RefSortingSink<T> extends AbstractRefSortingSink<T> { // 存放用於排序的元素 private ArrayList<T> list; RefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator) { super(sink, comparator); } @Override public void begin(long size) { if (size >= Nodes.MAX_ARRAY_SIZE) throw new IllegalArgumentException(Nodes.BAD_SIZE); // 建立用於存放排序元素的列表 list = (size >= 0) ? new ArrayList<T>((int) size) : new ArrayList<T>(); } @Override public void end() { // 只有在接收到全部元素後纔開始排序 list.sort(comparator); downstream.begin(list.size()); // 排序完成後,將數據傳遞給下游Sink if (!cancellationWasRequested) { // 下游Sink不包含短路操做,將數據依次傳遞給下游Sink list.forEach(downstream::accept); } else { // 下游Sink包含短路操做 for (T t : list) { // 對於每個元素,都要詢問是否能夠結束處理 if (downstream.cancellationRequested()) break; // 將元素傳遞給下游Sink downstream.accept(t); } } // 告知下游Sink數據傳遞完畢 downstream.end(); list = null; } @Override public void accept(T t) { // 依次將須要排序的元素加入到臨時列表中 list.add(t); } }
Stream.sorted會在接收到全部元素以後再進行排序,在此以後纔開始將數據依次傳遞給下游Sink
Sink就如齒輪,每一步的操做邏輯是封裝在Sink中的,那各Sink是如何串聯咬合在一塊兒的,首個Sink又是如何啓動來觸發整個pipeline執行的?
結束操做(TerminalOp)以後不能再有別的操做,結束操做會建立一個包裝了本身操做的Sink,這個Sink只處理數據而不會將數據傳遞到下游Sink
TerminalOp的類圖很是簡單
FindOp: 用於查找,如findFirst
,findAny
,生成FindSink
ReduceOp: 用於規約,如reduce
collect
,生成ReduceSink
MatchOp: 用於匹配,如allMatch
anyMatch
,生成MatchSink
ForEachOp: 用於遍歷,如forEach
,生成ForEachSink
在調用Stream的終止操做時,會執行AbstractPipeline.evaluate
/** * Evaluate the pipeline with a terminal operation to produce a result. * * @param <R> the type of result * @param terminalOp the terminal operation to be applied to the pipeline. * @return the result */ final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp /* 各類終止操做 */) { assert getOutputShape() == terminalOp.inputShape(); if (linkedOrConsumed) throw new IllegalStateException(MSG_STREAM_LINKED); linkedOrConsumed = true; return isParallel() ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags())) /* 併發執行 */ : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags())); /* 串行執行 */ }
最終會根據是否並行執行TerminalOp中不一樣的的evaluate方法,在TerminalOp的evaluate方法中會調用helper.wrapAndCopyInto(sinkSupplier.get(), spliterator).get()
來串聯各層Sink,觸發pipeline,並獲取最終結果,那TerminalOp究竟是如何串聯各層Sink的?
final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink /* TerminalSink */, Spliterator<P_IN> spliterator) { copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator); return sink; }
其中玄機盡在warpSink
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) { Objects.requireNonNull(sink); // AbstractPipeline.this,最後一層Stage for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) { // 從下游向上遊遍歷,不斷包裝Sink sink = p.opWrapSink(p.previousStage.combinedFlags, sink /* 下一層Stage的Sink */); } return (Sink<P_IN>) sink; }
還記得opWrapSink
麼?它會返回一個新的Sink,實現begin
end
accept
等方法,當前Stage的處理邏輯封裝在其中,並將處理後的結果傳遞給下游的Sink
這樣,便將從開始到結束的全部操做都包裝到了一個Sink裏,執行這個Sink就至關於執行首個Sink,並帶動全部下游的Sink,使整個pipeline運行起來
有了包含全部操做的Sink,如何執行Sink呢?wrapAndCopyInto
中還有一個copyInto
方法
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) { Objects.requireNonNull(wrappedSink); if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) { // 不包含短路操做 // 1. begin wrappedSink.begin(spliterator.getExactSizeIfKnown()); // 2. 遍歷調用 sink.accept spliterator.forEachRemaining(wrappedSink); // 3. end wrappedSink.end(); } else { // 包含短路操做 copyIntoWithCancel(wrappedSink, spliterator); } } final <P_IN> void copyIntoWithCancel(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) { @SuppressWarnings({"rawtypes","unchecked"}) AbstractPipeline p = AbstractPipeline.this; while (p.depth > 0) { p = p.previousStage; } // 1. begin wrappedSink.begin(spliterator.getExactSizeIfKnown()); // 2. 遍歷調用 sink.accept // 每一次遍歷都詢問cancellationRequested結果 // 若是cancellationRequested爲true,則中斷遍歷 p.forEachWithCancel(spliterator, wrappedSink); // 3. end wrappedSink.end(); }
copyInto
會根據不一樣的狀況依次
sink.bigin
sink.accept
若是包含短路操做,則每次遍歷都須要詢問cancellationRequested,適時中斷遍歷
sink.end
各層Stage經過Sink協議將全部的操做串聯到一塊兒,遍歷原始數據並執行,終止操做會建立一個包裝了本身操做的TerminalSink,該Sink中處理最終的數據並作數據收集(若是須要),每一種TerminalSink中均會提供一個獲取最終數據的方法
TerminalOp經過調用TerminalSink中的對應方法,獲取最終的數據並返回,如ReduceOp中
@Override public <P_IN> R evaluateSequential(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) { return helper.wrapAndCopyInto(makeSink(), spliterator)/* 執行各Sink */.get()/* 獲取最終數據 */; }
使用Collection.parallelStream
或Stream.parallel
等方法能夠將當前的流標記
爲併發,從新來看AbstractPipeline.evaluate
,該方法會在終止操做時被執行
/** * Evaluate the pipeline with a terminal operation to produce a result. * * @param <R> the type of result * @param terminalOp the terminal operation to be applied to the pipeline. * @return the result */ final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp /* 各類終止操做 */) { assert getOutputShape() == terminalOp.inputShape(); if (linkedOrConsumed) throw new IllegalStateException(MSG_STREAM_LINKED); linkedOrConsumed = true; return isParallel() ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags())) /* 併發執行 */ : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags())); /* 串行執行 */ }
若是被標記爲sequential
,則會調用TerminalOp.evaluateSequential
,evaluateSequential的調用過程上文已經講述的很清楚
若是被標記爲parallel
,則會調用TerminalOp.evaluateParallel
,對於該方法不一樣的TerminalOp會有不一樣的實現,但都使用了ForkJoin框架,將原始數據不斷拆分爲更小的單元,對每個單元作上述evaluateSequential
相似的動做,最後將每個單元計算的結果依次整合,獲得最終結果
默認狀況下,ForkJoin的線程數即爲機器的CPU核數,若是想自定義Stream並行執行的線程數,能夠參考Custom Thread Pools In Java 8 Parallel Streams
在將原始數據進行拆分的時候,拆分的策略是什麼?拆分的粒度又是什麼(拆分到什麼程度)?
還記得上文所說,原始數據是如何存放的麼?Spliterator
(可分迭代器 splitable iterator),不管使用何種API,均會將原始數據封裝爲Spliterator
後存放在Stage中,在進行parallel計算時,對原始數據的拆分以及拆分粒度都是基於Spliterator
的,和Iterator同樣,Spliterator也用於遍歷數據源中的數據,但它是專門爲並行執行而設計的
public interface Spliterator<T> { /** * 若是還有元素須要遍歷,則遍歷該元素並執行action,返回true,不然返回false */ boolean tryAdvance(Consumer<? super T> action); /** * 若是能夠,則將一部分元素劃分出去,構造另外一個Spliterator,使得兩個Spliterator能夠並行處理 */ Spliterator<T> trySplit(); /** * 估算還有多少元素須要遍歷 */ long estimateSize(); /** * 遍歷全部未遍歷的元素 */ default void forEachRemaining(Consumer<? super T> action) { do { } while (tryAdvance(action)); } }
動圖以下
在使用Stream parallel時,若是默認Spliterator的拆分邏輯不能知足你的需求,即可以自定義Spliterator,具體示例能夠參考《Java 8 in Action》中『7.3.2 實現你本身的Spliterator』
Head
會生成一個不包含任何操做的Stage,並將原始數據Spliterator
存放在sourceStage
中StagelessOp
StagefulOp
將當前操做封裝在Sink中,生成一個新的Stage,並使用雙鏈表結構將先後的Stage連接在一塊兒,Sink用於調用當前指定的操做處理數據,並將處理後的結果傳遞給下游SinkTerminalOp
生成一個TerminalSink
,從下游向上遊遍歷Stage,不斷包裝各Stage中的Sink,最終生成一個串聯了全部操做的TerminalSink,適時調用該Sink的begin
accept
end
等方法,觸發整個pipeline的數據流動及處理,最終調用TerminalSink的get
方法,獲取最終結果(若是有)JavaLambdaInternals - 6-Stream Pipelines)
java8實戰:Stream執行原理