【修煉內功】[Java8] Stream是怎麼工做的

本文已收錄 【修煉內功】躍遷之路

clipboard.png

Java8中新增的Stream,相信使用過的同窗都已經感覺到了它的便利,容許你以聲明性的方式處理集合,而不用去作繁瑣的for-loop/while-loop,而且能夠以極低的成本並行地處理集合數據java

若是須要從菜單中篩選出卡路里在400如下的菜品,並按卡路里排序後,輸出菜品名稱segmentfault

在java8以前,須要進行兩次顯示迭代,而且還須要藉助中間結果存儲併發

List<Menu> lowCaloricDishes = new LinkedList<>();

// 按照熱量值進行篩選
for(Dish dish : dishes) {
  if (dish.getCalories() < 400) {
    lowCaloricDishes.add(dish);
  }
}

// 按照熱量進行排序
lowCaloricDishes.sort(new Comparator<Dish>() {
  @Override
  public int compare(Dish d1, Dish d2) {
    return d1.getCalories().compareTo(d2.getCalories);
  }
})

// 提取名稱
List<String> lowCaloricDishesName = new LinkedList<>();
for(Dish dish : lowCaloricDishes) {
  lowCaloricDishesName.add(dish.getName());
}

若是使用Stream API,只須要app

List<String> lowCaloricDishesName = 
    dishes.parallelStream() // 開啓並行處理
          .filter(d -> d.getCalories() < 400) // 按照熱量值進行篩選
          .sorted(Comparator.comparing(Dish::getCalories)) // 按照熱量進行排序
          .map(Dish::getName) // 提取名稱
          .collect(Collectors.toList()); // 將結果存入List

甚至,能夠寫出更復雜的功能框架

Map<Integer, List<String>> lowCaloricDishesNameGroup = 
    dishes.parallelStream() // 開啓並行處理
          .filter(d -> d.getCalories() < 400) // 按照熱量值進行篩選
          .sorted(comparing(Dish::getCalories)) // 按照熱量進行排序
          .collect(Collectors.groupingBy( // 將菜品名按照熱量進行分組
              Dish::getCalories, 
              Collectors.mapping(Dish::getName, Collectors.toList())
          ));

是否是很是簡潔,而且愈加形似SQL
如此簡潔的API是如何實現的?中間過程是如何銜接起來的?每一步都會進行一次迭代麼,須要中間結果存儲麼?並行處理是怎麼作到的?less

什麼是Stream?

Stream使用一種相似SQL語句的方式,提供對集合運算的高階抽象,能夠將其處理的元素集合看作一種數據流,流在管道中傳輸,數據在管道節點上進行處理,好比篩選、排序、聚合等ide

stream-2

數據流在管道中通過中間操做(intermediate operation)處理,由終止操做(terminal operation)獲得前面處理的結果
和以往的集合操做不一樣,Stream操做有兩個基礎特徵:函數

  • pipelining:中間操做會返回流對象,多個操做最終串聯成一個管道,管道並不直接操做數據,最終由終止操做觸發數據在管道中的流動及處理,並收集最終的結果oop

    Stream的實現使用流水線( pipelining)的方式巧妙的避免了屢次迭代,其基本思想是在 一次迭代中儘量多的執行用戶指定的操做
  • 內部迭代:區別於以往使用iterator或者for-each等顯示地在集合外部進行迭代計算的方式,內部迭代隱式的在集合內部進行迭代計算

Stream操做分爲兩類:中間操做終止操做ui

  • 中間操做:將流一層層的進行處理,並向下一層進行傳遞,如 filter map sorted

    中間操做又分爲有狀態(stateful)及無狀態(stateless)

    • 有狀態:必須等上一步操做完拿到所有元素後纔可操做,如sorted
    • 無狀態:該操做的數據不收上一步操做的影響,如filter map
  • 終止操做:觸發數據的流動,並收集結果,如collect findFirst forEach

    終止操做又分爲短路操做(short-circuiting)及非短路操做(non-short-circuiting)

    • 短路操做:會在適當的時刻終止遍歷,相似於break,如anyMatch findFirst
    • 非短路操做:會遍歷全部元素,如collect max

stream-3

Stream採用某種方式記錄用戶每一步的操做,當用戶調用終止操做時將以前記錄的操做疊加到一塊兒,儘量地在一次迭代中所有執行掉,那麼

  1. 用戶的操做如何記錄?
  2. 操做如何疊加?
  3. 疊加後的操做如何執行?
  4. 執行後的結果(若是有)在哪裏?

Stream如何實現

操做如何記錄

Stream中使用Stage的概念來描述一個完整的操做,並用某種實例化後的PipelineHelper來表明Stage,將各Pipeline按照前後順序鏈接到一塊兒,就構成了整個流水線

與Stream相關類和接口的繼承關係以下圖

stream-4

Head用於表示第一個Stage,該Stage不包含任何操做
StatelessOp和StatefulOp分別表示無狀態和有狀態的Stage

stream-5

使用Collection.stream Arrays.streamStream.of等接口會生成Head,其內部均採用StreamSupport.stream方法,將原始數據包裝爲Spliterator存放在Stage中

  • Head記錄Stream起始操做,將包裝爲Spliterator的原始數據存放在Stage中
  • StatelessOp記錄無狀態的中間操做
  • StatefulOp記錄有狀態的中間操做
  • TerminalOp用於觸發數據數據在各Stage間的流動及處理,並收集最終數據(若是有)

Head StatelessOp StatefulOp三個操做實例化會指向其父類AbstractPipeline

對於Head

/**
 * Constructor for the head of a stream pipeline.
 *
 * @param source {@code Spliterator} describing the stream source
 * @param sourceFlags the source flags for the stream source, described in
 * {@link StreamOpFlag}
 * @param parallel {@code true} if the pipeline is parallel
 */
AbstractPipeline(Spliterator<?> source, int sourceFlags, boolean parallel) {
    this.previousStage = null;
    this.sourceSpliterator = source;
    this.sourceStage = this;
    this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
    // The following is an optimization of:
    // StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);
    this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
    this.depth = 0;
    this.parallel = parallel;
}

其會將包裝爲Spliterator的原始數據存放在Stage中,並將自身存放在sourceStage中

對於StatelessOp及StatefulOp

/**
 * Constructor for appending an intermediate operation stage onto an
 * existing pipeline.
 *
 * @param previousStage the upstream pipeline stage
 * @param opFlags the operation flags for the new stage, described in
 * {@link StreamOpFlag}
 */
AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
    if (previousStage.linkedOrConsumed)
        throw new IllegalStateException(MSG_STREAM_LINKED);
    previousStage.linkedOrConsumed = true;
    previousStage.nextStage = this;

    this.previousStage = previousStage;
    this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
    this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
    this.sourceStage = previousStage.sourceStage;
    if (opIsStateful())
        sourceStage.sourceAnyStateful = true;
    this.depth = previousStage.depth + 1;
}

每個Stage都會存放原始的sourceStage,即Head

經過previousStage及nextStage,將各Stage串聯爲一個雙向鏈表,使得每一步都知道上一步與下一步的操做

操做如何疊加

以上已經解決了如何記錄操做的問題,想要讓pipeline運行起來,須要一種將全部操做疊加到一塊兒的方案

因爲前面的Stage並不知道後面的Stage致使須要執行何種操做,只有當前Stage自己知道該如何執行本身包含的動做,這就須要某種協議來協調相鄰Stage之間的調用關係

Stream類庫採用了Sink接口來協調各Stage之間的關係

interface Sink<T> extends Consumer<T> {
    /**
     * Resets the sink state to receive a fresh data set.  This must be called
     * before sending any data to the sink.  After calling {@link #end()},
     * you may call this method to reset the sink for another calculation.
     * @param size The exact size of the data to be pushed downstream, if
     * known or {@code -1} if unknown or infinite.
     *
     * <p>Prior to this call, the sink must be in the initial state, and after
     * this call it is in the active state.
     *
     * 開始遍歷前調用,通知Sink作好準備
     */
    default void begin(long size) {}

    /**
     * Indicates that all elements have been pushed.  If the {@code Sink} is
     * stateful, it should send any stored state downstream at this time, and
     * should clear any accumulated state (and associated resources).
     *
     * <p>Prior to this call, the sink must be in the active state, and after
     * this call it is returned to the initial state.
     *
     * 全部元素遍歷完成後調用,通知Sink沒有更多元素了
     */
    default void end() {}
    
    /**
     * Indicates that this {@code Sink} does not wish to receive any more data.
     *
     * @implSpec The default implementation always returns false.
     *
     * @return true if cancellation is requested
     *
     * 是否能夠結束操做,可讓短路操做盡早結束
     */
    default boolean cancellationRequested() {}

    /**
     * Accepts a value.
     *
     * @implSpec The default implementation throws IllegalStateException.
     *
     * @throws IllegalStateException if this sink does not accept values
     *
     * 遍歷時調用,接收的一個待處理元素,並對元素進行處理
     * Stage把本身包含的操做和回調方法封裝到該方法裏,前一個Stage只須要調用當前Stage.accept方法便可
     */
    default void accept(T value) {}
}

Sink的四個接口方法經常相互協做,共同完成計算任務

實際上Stream API內部實現的的本質,就是如何重載Sink的這四個接口方法,下面結合具體源碼來理解Stage是如何將自身的操做包裝秤Sink,以及Sink是如何將處理結果轉發給下一個Sink的

無狀態Stage,Stream.map

// Stream.map 將生成一個新Stream
public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
    Objects.requireNonNull(mapper);
    return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
        // 該方法將回調函數(處理邏輯)包裝成Sink
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
            return new Sink.ChainedReference<P_OUT, R>(sink) {
                @Override
                public void accept(P_OUT u) {
                    // 接收數據,使用當前包裝的回調函數處理數據,並傳遞給下游Sink
                    downstream.accept(mapper.apply(u));
                }
            };
        }
    };
}

上述代碼邏輯很是簡單,接下來能夠看一下有狀態Stage,Stream.sorted

private static final class RefSortingSink<T> extends AbstractRefSortingSink<T> {
    // 存放用於排序的元素
    private ArrayList<T> list;

    RefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator) {
        super(sink, comparator);
    }

    @Override
    public void begin(long size) {
        if (size >= Nodes.MAX_ARRAY_SIZE)
            throw new IllegalArgumentException(Nodes.BAD_SIZE);
        // 建立用於存放排序元素的列表
        list = (size >= 0) ? new ArrayList<T>((int) size) : new ArrayList<T>();
    }

    @Override
    public void end() {
        // 只有在接收到全部元素後纔開始排序
        list.sort(comparator);
        downstream.begin(list.size());
        // 排序完成後,將數據傳遞給下游Sink
        if (!cancellationWasRequested) {
            // 下游Sink不包含短路操做,將數據依次傳遞給下游Sink
            list.forEach(downstream::accept);
        }
        else {
            // 下游Sink包含短路操做
            for (T t : list) {
                // 對於每個元素,都要詢問是否能夠結束處理
                if (downstream.cancellationRequested()) break;
                // 將元素傳遞給下游Sink
                downstream.accept(t);
            }
        }
        // 告知下游Sink數據傳遞完畢
        downstream.end();
        list = null;
    }

    @Override
    public void accept(T t) {
        // 依次將須要排序的元素加入到臨時列表中
        list.add(t);
    }
}

Stream.sorted會在接收到全部元素以後再進行排序,在此以後纔開始將數據依次傳遞給下游Sink

stream-6

疊加後的操做如何執行

Sink就如齒輪,每一步的操做邏輯是封裝在Sink中的,那各Sink是如何串聯咬合在一塊兒的,首個Sink又是如何啓動來觸發整個pipeline執行的?
結束操做(TerminalOp)以後不能再有別的操做,結束操做會建立一個包裝了本身操做的Sink,這個Sink只處理數據而不會將數據傳遞到下游Sink

TerminalOp的類圖很是簡單

stream-7

FindOp: 用於查找,如findFirstfindAny,生成FindSink
ReduceOp: 用於規約,如reduce collect,生成ReduceSink
MatchOp: 用於匹配,如allMatch anyMatch,生成MatchSink
ForEachOp: 用於遍歷,如forEach,生成ForEachSink

stream-8

在調用Stream的終止操做時,會執行AbstractPipeline.evaluate

/**
 * Evaluate the pipeline with a terminal operation to produce a result.
 *
 * @param <R> the type of result
 * @param terminalOp the terminal operation to be applied to the pipeline.
 * @return the result
 */
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp /* 各類終止操做 */) {
    assert getOutputShape() == terminalOp.inputShape();
    if (linkedOrConsumed)
        throw new IllegalStateException(MSG_STREAM_LINKED);
    linkedOrConsumed = true;

    return isParallel()
           ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags())) /* 併發執行 */
           : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags())); /* 串行執行 */
}

最終會根據是否並行執行TerminalOp中不一樣的的evaluate方法,在TerminalOp的evaluate方法中會調用helper.wrapAndCopyInto(sinkSupplier.get(), spliterator).get()來串聯各層Sink,觸發pipeline,並獲取最終結果,那TerminalOp究竟是如何串聯各層Sink的?

final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink /* TerminalSink */, Spliterator<P_IN> spliterator) {
    copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
    return sink;
}

其中玄機盡在warpSink

final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
    Objects.requireNonNull(sink);

    // AbstractPipeline.this,最後一層Stage
    for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
        // 從下游向上遊遍歷,不斷包裝Sink
        sink = p.opWrapSink(p.previousStage.combinedFlags, sink /* 下一層Stage的Sink */);
    }
    return (Sink<P_IN>) sink;
}

還記得opWrapSink麼?它會返回一個新的Sink,實現begin end accept等方法,當前Stage的處理邏輯封裝在其中,並將處理後的結果傳遞給下游的Sink
這樣,便將從開始到結束的全部操做都包裝到了一個Sink裏,執行這個Sink就至關於執行首個Sink,並帶動全部下游的Sink,使整個pipeline運行起來

有了包含全部操做的Sink,如何執行Sink呢?wrapAndCopyInto中還有一個copyInto方法

final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
    Objects.requireNonNull(wrappedSink);

    if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
        // 不包含短路操做
        
        // 1. begin
        wrappedSink.begin(spliterator.getExactSizeIfKnown());
        // 2. 遍歷調用 sink.accept
        spliterator.forEachRemaining(wrappedSink);
        // 3. end
        wrappedSink.end();
    }
    else {
        // 包含短路操做
        copyIntoWithCancel(wrappedSink, spliterator);
    }
}

final <P_IN> void copyIntoWithCancel(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
    @SuppressWarnings({"rawtypes","unchecked"})
    AbstractPipeline p = AbstractPipeline.this;
    while (p.depth > 0) {
        p = p.previousStage;
    }
    // 1. begin
    wrappedSink.begin(spliterator.getExactSizeIfKnown());
    // 2. 遍歷調用 sink.accept
    //    每一次遍歷都詢問cancellationRequested結果
    //    若是cancellationRequested爲true,則中斷遍歷
    p.forEachWithCancel(spliterator, wrappedSink);
    // 3. end
    wrappedSink.end();
}

copyInto會根據不一樣的狀況依次

  1. 調用sink.bigin
  2. 遍歷調用sink.accept

    若是包含短路操做,則每次遍歷都須要詢問cancellationRequested,適時中斷遍歷

  3. 調用sink.end

執行後的結果在哪裏

各層Stage經過Sink協議將全部的操做串聯到一塊兒,遍歷原始數據並執行,終止操做會建立一個包裝了本身操做的TerminalSink,該Sink中處理最終的數據並作數據收集(若是須要),每一種TerminalSink中均會提供一個獲取最終數據的方法

stream-10

TerminalOp經過調用TerminalSink中的對應方法,獲取最終的數據並返回,如ReduceOp中

@Override
public <P_IN> R evaluateSequential(PipelineHelper<T> helper,
                                   Spliterator<P_IN> spliterator) {
    return helper.wrapAndCopyInto(makeSink(), spliterator)/* 執行各Sink */.get()/* 獲取最終數據 */;
}

併發是如何作到的

使用Collection.parallelStreamStream.parallel等方法能夠將當前的流標記爲併發,從新來看AbstractPipeline.evaluate,該方法會在終止操做時被執行

/**
 * Evaluate the pipeline with a terminal operation to produce a result.
 *
 * @param <R> the type of result
 * @param terminalOp the terminal operation to be applied to the pipeline.
 * @return the result
 */
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp /* 各類終止操做 */) {
    assert getOutputShape() == terminalOp.inputShape();
    if (linkedOrConsumed)
        throw new IllegalStateException(MSG_STREAM_LINKED);
    linkedOrConsumed = true;

    return isParallel()
           ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags())) /* 併發執行 */
           : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags())); /* 串行執行 */
}

若是被標記爲sequential,則會調用TerminalOp.evaluateSequential,evaluateSequential的調用過程上文已經講述的很清楚
若是被標記爲parallel,則會調用TerminalOp.evaluateParallel,對於該方法不一樣的TerminalOp會有不一樣的實現,但都使用了ForkJoin框架,將原始數據不斷拆分爲更小的單元,對每個單元作上述evaluateSequential相似的動做,最後將每個單元計算的結果依次整合,獲得最終結果

stream-9

默認狀況下,ForkJoin的線程數即爲機器的CPU核數,若是想自定義Stream並行執行的線程數,能夠參考Custom Thread Pools In Java 8 Parallel Streams

在將原始數據進行拆分的時候,拆分的策略是什麼?拆分的粒度又是什麼(拆分到什麼程度)?

還記得上文所說,原始數據是如何存放的麼?Spliterator(可分迭代器 splitable iterator),不管使用何種API,均會將原始數據封裝爲Spliterator後存放在Stage中,在進行parallel計算時,對原始數據的拆分以及拆分粒度都是基於Spliterator的,和Iterator同樣,Spliterator也用於遍歷數據源中的數據,但它是專門爲並行執行而設計的

public interface Spliterator<T> {
    /**
     * 若是還有元素須要遍歷,則遍歷該元素並執行action,返回true,不然返回false
     */
    boolean tryAdvance(Consumer<? super T> action);
    
    /**
     * 若是能夠,則將一部分元素劃分出去,構造另外一個Spliterator,使得兩個Spliterator能夠並行處理
     */
    Spliterator<T> trySplit();
    
    /**
     * 估算還有多少元素須要遍歷
     */
    long estimateSize();
    
    /**
     * 遍歷全部未遍歷的元素
     */
    default void forEachRemaining(Consumer<? super T> action) {
        do { } while (tryAdvance(action));
    }
}

spliterator.png

動圖以下

spliterator

在使用Stream parallel時,若是默認Spliterator的拆分邏輯不能知足你的需求,即可以自定義Spliterator,具體示例能夠參考《Java 8 in Action》中『7.3.2 實現你本身的Spliterator』

結語

  1. Head會生成一個不包含任何操做的Stage,並將原始數據Spliterator存放在sourceStage
  2. 中間操做StagelessOp StagefulOp將當前操做封裝在Sink中,生成一個新的Stage,並使用雙鏈表結構將先後的Stage連接在一塊兒,Sink用於調用當前指定的操做處理數據,並將處理後的結果傳遞給下游Sink
  3. 終止操做TerminalOp生成一個TerminalSink,從下游向上遊遍歷Stage,不斷包裝各Stage中的Sink,最終生成一個串聯了全部操做的TerminalSink,適時調用該Sink的begin accept end等方法,觸發整個pipeline的數據流動及處理,最終調用TerminalSink的get方法,獲取最終結果(若是有)
  4. 被標記爲parallel的流,會使用ForkJoin框架,將原始流拆分爲更小的單元,對每個單元分別做計算,並將各單元的計算結果進行整合,獲得最終結果

JavaLambdaInternals - 6-Stream Pipelines)
java8實戰:Stream執行原理

訂閱號

相關文章
相關標籤/搜索