Java8 Stream代碼詳解+BenchMark測試

一、基本介紹

一、建立方式

一、Array的Stream建立
一、直接建立
// main
Stream stream = Stream.of("a", "b", "c");
String [] strArray = new String[] {"a", "b", "c"};
stream = Stream.of(strArray);

​ 

// Stream.of()
@SafeVarargs
@SuppressWarnings("varargs") // Creating a stream from an array is safe
public static<T> Stream<T> of(T... values) {
    return Arrays.stream(values);
}
二、直接使用Arrays.stream工具建立
// main
String [] strArray = new String[] {"a", "b", "c"};
stream = Arrays.stream(strArray);

​ 下面是Arrays.stream的具體實現java

// Arrays.stream()
public static <T> Stream<T> stream(T[] array) {
    return stream(array, 0, array.length);
}
/**
 * Arrays.stream()
 * @param startInclusive 起始座標
 * @param endExclusive 最終座標
  */
public static <T> Stream<T> stream(T[] array, int startInclusive, int endExclusive) {
    return StreamSupport.stream(spliterator(array, startInclusive, endExclusive), false);
}

​ StreamSupport.stream的實現使用的是ReferencePipeline.Head<>這個方法,注意這個方法,這個方法是Stream流水線解決方案的核心之一數組

// StreamSupport.stream()
public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
        Objects.requireNonNull(spliterator);
        return new ReferencePipeline.Head<>(spliterator,
                                            StreamOpFlag.fromCharacteristics(spliterator),
                                            parallel);
    }

​ 注意這裏的Spliterator,這個類是Stream實現並行的核心類。這裏Array生成的spliterator的特徵值是ordered和immutable。(目前沒看到關於特徵值的相關操做,具體解釋能夠看源碼的註釋)數據結構

/** 
 * ReferencePipeline.Head<>()
 * 默認生成一個ordered、immutable的Spliterator
 */
public static <T> Spliterator<T> spliterator(T[] array, int startInclusive, int endExclusive) {
    return Spliterators.spliterator(array, startInclusive, endExclusive,
                                    Spliterator.ORDERED | Spliterator.IMMUTABLE);
}
二、Collection的Stream建立
// main
List<Integer> integers = new ArrayList<>();
integers.stream();

​ Collection的Stream的建立使用的是Collection.stream方法app

// Collection.stream()
default Stream<E> stream() {
        return StreamSupport.stream(spliterator(), false);
    }

​ 這個spliterator(),建立的是Spliterator下的特增值爲sized、subSized的Spliterator框架

// Collection.spliterator()
@Override
    default Spliterator<E> spliterator() {
        return Spliterators.spliterator(this, 0);
    }
// Spliterators.spliterator()
public static <T> Spliterator<T> spliterator(Collection<? extends T> c,
                                                 int characteristics) {
        return new IteratorSpliterator<>(Objects.requireNonNull(c),
                                         characteristics);
    }
/** 
 * IteratorSpliterator<>()
 * 默認生成一個sized、subSized的Spliterator
 */
public IteratorSpliterator(Collection<? extends T> collection, int characteristics) {
            this.collection = collection;
            this.it = null;
            this.characteristics = (characteristics & Spliterator.CONCURRENT) == 0
                                   ? characteristics | Spliterator.SIZED | Spliterator.SUBSIZED
                                   : characteristics;
        }

​ 最後仍是將Spliterator放入ReferencePipeline.Head<>方法建立了Streamless

// ReferencePipeline.Head<>
public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
        Objects.requireNonNull(spliterator);
        return new ReferencePipeline.Head<>(spliterator,
                                            StreamOpFlag.fromCharacteristics(spliterator),
                                            parallel);
    }
三、其餘建立方式
一、Stream.iterate()
Stream.iterate(1,i->i++)

​ 該方法放入一個seed值做爲種子值,使用第二個參數方法生成一個無限大小的Stream,特徵值與Array的Stream特徵值相同。dom

public static<T> Stream<T> iterate(final T seed, final UnaryOperator<T> f) {
        Objects.requireNonNull(f);
        final Iterator<T> iterator = new Iterator<T>() {
            @SuppressWarnings("unchecked")
            T t = (T) Streams.NONE;

            @Override
            public boolean hasNext() {
                return true;
            }

            @Override
            public T next() {
                return t = (t == Streams.NONE) ? seed : f.apply(t);
            }
        };
        return StreamSupport.stream(Spliterators.spliteratorUnknownSize(
                iterator,
                Spliterator.ORDERED | Spliterator.IMMUTABLE), false);
    }
二、Stream.generate()
Stream.generate(Math::random)

​ 該方法沒有放入種子值,放入的是一個Supplier,該類就是java1.8之後加入的函數式接口,該接口只有一個方法就是get()方法,用於提供生成Stream須要的每個的數據,最後生成長度最大爲9223372036854775807L(2的63次方-1)的Stream。ide

public static<T> Stream<T> generate(Supplier<T> s) {
        Objects.requireNonNull(s);
        return StreamSupport.stream(
                new StreamSpliterators.InfiniteSupplyingSpliterator.OfRef<>(Long.MAX_VALUE, s), false);
    }
@FunctionalInterface
public interface Supplier<T> {

    /**
     * Gets a result.
     *
     * @return a result
     */
    T get();
}

二、中間操做(intermediate operation)

​ 每個流能有多箇中間操做,中間操做的做用就是將原始的流轉化爲須要的流,而且爲惰性操做,關於惰性操做後面會有具體介紹。而且中間操做可分爲有狀態和無狀態兩種,兩種不一樣的操做在構成Stream流水線時會使用不一樣的建立方式和操做。函數

一、有狀態操做(statefulOp)

​ 一、Stream<T> distinct();// 除去流種重複的元素

​ 二、Stream<T> limit(long maxSize);// 只取前幾個元素

​ 三、Stream<T> skip(long n);// 跳過前幾個元素

​ 四、Stream<T> sorted();// 根據天然排序對流排序

​ 五、Stream<T> sorted(Comparator<? super T> comparator);// 根據本身實現的排序對流排序

二、無狀態操做(statelessOp)

​ 一、Stream<T> filter(Predicate<? super T> predicate);// 根據過濾規則過濾流種的元素

​ 二、<R> Stream<R> map(Function<? super T, ? extends R> mapper);// 將每個元素映射成另外一個元素

​ 三、<R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper);// 和上一個map映射不一樣的是,扁平映射會將流中的最基礎的元素映射出來

​ 四、Stream<T> peek(Consumer<? super T> action);// 每個元素都要作一下這個action

​ 五、IntStream mapToInt(ToIntFunction<? super T> mapper);// 映射爲IntStream

​ 六、LongStream mapToLong(ToLongFunction<? super T> mapper);// 映射爲LongStream

​ 七、DoubleStream mapToDouble(ToDoubleFunction<? super T> mapper);// 映射爲DoubleStream

二、結束操做(terminal operation)

​ 每一個流只能有一個結束操做,結束操做會將和以前的中間操做一同起做用,在使用告終束操做後該流便被消費掉了,不能再次使用。因爲流能夠是無限大的,因此也會有短路操做,當無限大的流使用了短路操做而且知足了短路條件時便會直接結束。

一、非短路操做

​ 一、void forEach(Consumer<? super T> action);// 每個元素都要作一下這個aciton

​ 二、void forEachOrdered(Consumer<? super T> action);// 確保並行時保持順序執行這個action

​ 三、Object[] toArray();// 轉化成Object數組

​ 四、<A> A[] toArray(IntFunction<A[]> generator);// 轉化成本身定義的數組

​ 五、T reduce(T identity, BinaryOperator<T> accumulator);// 匯聚,有起始值,操做

​ 六、Optional<T> reduce(BinaryOperator<T> accumulator);// 匯聚,無起始值,返回的是Optional對象

​ 七、<U> U reduce(U identity,BiFunction<U, ? super T, U> accumulator,BinaryOperator<U> comb iner);// 匯聚,有起始值,操做,合併

​ 八、<R> R collect(Supplier<R> supplier,BiConsumer<R, ? super T> accumulator,BiConsumer<R, R> combiner);// 可變匯聚,本身實現匯聚,容器、操做、合併操做

​ 九、<R, A> R collect(Collector<? super T, A, R> collector);// 可變匯聚,Collectors有封裝工具

​ 十、Optional<T> min(Comparator<? super T> comparator);// 封裝了reduce,使用本身的比較器找到最小的

​ 十一、Optional<T> max(Comparator<? super T> comparator);// 封裝了reduce,使用本身的比較器找到最大的

​ 十二、long count();// 封裝了reduce,把每一個數變成1再求和

二、短路操做(short-circuiting)

​ 一、boolean anyMatch(Predicate<? super T> predicate);// 有一個符合判斷

​ 二、boolean noneMatch(Predicate<? super T> predicate);// 沒有一個符合判斷

​ 三、Optional<T> findFirst();// 有序的,找到第一個

​ 四、Optional<T> findAny();// 不要求有序的,找到一個

​ 五、boolean (Predicate<? super T> predicate);// 所有符合判斷

二、Stream流水線解決方案

一、ReferencePipeLine

​ Stream中使用Stage的概念來描述一個完整的操做,並用某種實例化後的PipelineHelper來表明Stage,將具備前後順序的各個Stage連到一塊兒,就構成了整個流水線。跟Stream相關類和接口的繼承關係圖示。

​ 下面是源碼,Head表示Source stage,例如Collection.stream(),這裏面沒有對數據的操做,StatelessOp和StatefuleOp分別對應無狀態和有狀態的中間操做

/**
     * Source stage of a ReferencePipeline.
     *
     * @param <E_IN> type of elements in the upstream source
     * @param <E_OUT> type of elements in produced by this stage
     * @since 1.8
     */
    static class Head<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT>
    
    @Override
        final Sink<E_IN> opWrapSink(int flags, Sink<E_OUT> sink) {
            throw new UnsupportedOperationException();
        }
/**
     * Base class for a stateless intermediate stage of a Stream.
     *
     * @param <E_IN> type of elements in the upstream source
     * @param <E_OUT> type of elements in produced by this stage
     * @since 1.8
     */
    abstract static class StatelessOp<E_IN, E_OUT>
            extends ReferencePipeline<E_IN, E_OUT>
/**
     * Base class for a stateful intermediate stage of a Stream.
     *
     * @param <E_IN> type of elements in the upstream source
     * @param <E_OUT> type of elements in produced by this stage
     * @since 1.8
     */
    abstract static class StatefulOp<E_IN, E_OUT>
            extends ReferencePipeline<E_IN, E_OUT>

​ 經過以前就能夠看到的ReferencePipeline.Head<>生成第一個Source stage,緊接着調用一系列的中間操做,不斷產生新的Stream。這些Stream對象以雙向鏈表的形式組織在一塊兒,構成整個流水線,因爲每一個Stage都記錄了前一個Stage和本次的操做以及回調函數,依靠這種結構就能創建起對數據源的全部操做。這就是Stream記錄操做的方式。

二、Sink

​ 上面講到的是Stream流水線如何流水操做計算,可是如何組合就要看Sink這個接口了,該接口包含如下方法

方法名 做用
void begin(long size) 開始遍歷元素以前調用該方法,通知Sink作好準備。
void end() 全部元素遍歷完成以後調用,通知Sink沒有更多的元素了。
boolean cancellationRequested() 是否能夠結束操做,可讓短路操做盡早結束。
void accept(T t) 遍歷元素時調用,接受一個待處理元素,並對元素進行處理。Stage把本身包含的操做和回調方法封裝到該方法裏,前一個Stage只須要調用當前Stage.accept(T t)方法就好了。

​ 經過以上方法,各個Stage之間的調用就實現了,每一個Stage將本身的操做封裝到Sink中,而後只須要訪問下一個Stage的accept方法便可。

​ 下面分別舉無狀態中間操做和有狀態中間操做對這幾個方法的實現。

​ 首先是map的實現

// Stream.map()
    @Override
    @SuppressWarnings("unchecked")
    public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
        Objects.requireNonNull(mapper);
        return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
            @Override/*opWripSink()方法返回由回調函數包裝而成Sink*/
            Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
                return new Sink.ChainedReference<P_OUT, R>(sink) {
                    @Override
                    public void accept(P_OUT u) {
                        downstream.accept(mapper.apply(u));// 將數據處理而且交給下游
                    }
                };
            }
        };
    }

​ sorted的實現

/**
     * Stream.sort()
     * {@link Sink} for implementing sort on reference streams.
     */
    private static final class RefSortingSink<T> extends AbstractRefSortingSink<T> {
        private ArrayList<T> list;

        RefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator) {
            super(sink, comparator);
        }

        @Override
        public void begin(long size) {
            if (size >= Nodes.MAX_ARRAY_SIZE)
                throw new IllegalArgumentException(Nodes.BAD_SIZE);
            // 建立一個存放排序元素的列表
            list = (size >= 0) ? new ArrayList<T>((int) size) : new ArrayList<T>();
        }

        @Override
        public void end() {
            list.sort(comparator);// 只有元素所有接收以後才能開始排序
            downstream.begin(list.size());
            if (!cancellationWasRequested) {// 下游Sink不包含短路操做
                list.forEach(downstream::accept);// 將處理結果傳遞給流水線下游的Sink
            }
            else {// 下游Sink包含短路操做
                for (T t : list) {// 每次都調用cancellationRequested()詢問是否能夠結束處理。
                    if (downstream.cancellationRequested()) break;
                    downstream.accept(t);// 將處理結果傳遞給流水線下游的Sink
                }
            }
            downstream.end();
            list = null;
        }

        @Override
        public void accept(T t) {
            list.add(t);// 使用當前Sink包裝動做處理t,只是簡單的將元素添加到中間列表當中
        }
    }

​ 上述代碼完美的展示了Sink的四個接口方法是如何協同工做的:

​ 一、首先begin()方法告訴Sink參與排序的元素個數,方便肯定中間結果容器的的大小;

​ 二、以後經過accept()方法將元素添加到中間結果當中,最終執行時調用者會不斷調用該方法,直到遍歷全部元素;

​ 三、最後end()方法告訴Sink全部元素遍歷完畢,啓動排序步驟,排序完成後將結果傳遞給下游的Sink;

​ 四、若是下游的Sink是短路操做,將結果傳遞給下游時不斷詢問下游cancellationRequested()是否能夠結束處理。

三、整合與執行

​ Sink完美封裝了Stream每一步操做,並給出了[處理->轉發]的模式來疊加操做。那麼整個操做的啓動動力就是結束操做(Terminal Operation),一旦調用某個結束操做,就會觸發整個流水線的執行。

​ 結束操做會建立一個包裝了本身操做的Sink,這也是流水線中最後一個Sink,這個Sink只須要處理數據而不須要將結果傳遞給下游的Sink(由於沒有下游)。對於Sink的[處理->轉發]模型,結束操做的Sink就是調用鏈的出口。

​ Stream設置了一個Sink AbstractPipeline.opWrapSink(int flags, Sink downstream)方法來獲得Sink,該方法的做用是返回一個新的包含了當前Stage表明的操做以及可以將結果傳遞給downstream的Sink對象。使用opWrapSink()將當前操做與下游Sink(上文中的downstream參數)結合成新Sink。

​ 從流水線的最後一個Stage開始,不斷調用上一個Stage的opWrapSink()方法直到最開始(不包括source stage,由於source stage表明數據源,不包含操做),就能夠獲得一個表明了流水線上全部操做的Sink,用代碼表示就是這樣:

// AbstractPipeline.wrapSink()
// 從下游向上遊不斷包裝Sink。若是最初傳入的sink表明結束操做,
// 函數返回時就能夠獲得一個表明了流水線上全部操做的Sink。
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
    ...
    for (AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
        sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
    }
    return (Sink<P_IN>) sink;
}

​ 流水線上從開始到結束的全部的操做都被包裝到了一個Sink裏,執行這個Sink就至關於執行整個流水線,執行Sink的代碼以下:

// AbstractPipeline.copyInto(), 對spliterator表明的數據執行wrappedSink表明的操做。
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
    ...
    if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
        wrappedSink.begin(spliterator.getExactSizeIfKnown());// 通知開始遍歷
        spliterator.forEachRemaining(wrappedSink);// 迭代
        wrappedSink.end();// 通知遍歷結束
    }
    ...
}

​ 上述代碼首先調用wrappedSink.begin()方法告訴Sink數據即將到來,而後調用spliterator.forEachRemaining()方法對數據進行迭代,最後調用wrappedSink.end()方法通知Sink數據處理結束。

三、Stream並行實現原理

一、執行結束操做的默認實現,以findFirst爲例

@Override
    public final Optional<P_OUT> findFirst() {
        return evaluate(FindOps.makeRef(true));
    }

​ 並行的實如今AbstractPipeline的evaluate中的evaluateParallel中

final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
        assert getOutputShape() == terminalOp.inputShape();
        if (linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);
        linkedOrConsumed = true;

        return isParallel()
               ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
               : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
    }
![](http://images2015.cnblogs.com/blog/1116549/201707/1116549-20170724151158934-1122314196.png)
@Override
        public <P_IN> O evaluateParallel(PipelineHelper<T> helper,
                                         Spliterator<P_IN> spliterator) {
            return new FindTask<>(this, helper, spliterator).invoke();
        }
private static final class FindTask<P_IN, P_OUT, O>
            extends AbstractShortCircuitTask<P_IN, P_OUT, O, FindTask<P_IN, P_OUT, O>> {
        private final FindOp<P_OUT, O> op;

        FindTask(FindOp<P_OUT, O> op,
                 PipelineHelper<P_OUT> helper,
                 Spliterator<P_IN> spliterator) {
            super(helper, spliterator);
            this.op = op;
        }

​ 這裏一FindOps的實現爲例,這四個操做都是建立一個Task的示例,而後執行invoke方法。這些Task的繼承關係如圖:

​ 這裏能夠看出Stream的並行實現都繼承了jdk7中的ForkJoin並行框架的ForkJoinTask。

​ 下面是AbstractShortCircuitTask的並行計算方法

@Override
    public void compute() {
        Spliterator<P_IN> rs = spliterator, ls;
        long sizeEstimate = rs.estimateSize();
        long sizeThreshold = getTargetSize(sizeEstimate);
        boolean forkRight = false;
        @SuppressWarnings("unchecked") K task = (K) this;
        AtomicReference<R> sr = sharedResult;
        R result;
        while ((result = sr.get()) == null) {
            if (task.taskCanceled()) {
                result = task.getEmptyResult();
                break;
            }
            if (sizeEstimate <= sizeThreshold || (ls = rs.trySplit()) == null) {
                result = task.doLeaf();
                break;
            }
            K leftChild, rightChild, taskToFork;
            task.leftChild  = leftChild = task.makeChild(ls);
            task.rightChild = rightChild = task.makeChild(rs);
            task.setPendingCount(1);
            if (forkRight) {
                forkRight = false;
                rs = ls;
                task = leftChild;
                taskToFork = rightChild;
            }
            else {
                forkRight = true;
                task = rightChild;
                taskToFork = leftChild;
            }
            taskToFork.fork();
            sizeEstimate = rs.estimateSize();
        }
        task.setLocalResult(result);
        task.tryComplete();
    }

​ 這裏面的主要邏輯是

​ 一、經過estimateSize方法預估工做量總數

​ 二、經過getTargetSize(sizeEstimate)得到最後能把工做量分紅多少分,具體以下圖,得到當前可用核數減1,而後乘以4,再用預估工做量總數除以該數獲得目標工做量

protected final long getTargetSize(long sizeEstimate) {
        long s;
        return ((s = targetSize) != 0 ? s :
                (targetSize = suggestTargetSize(sizeEstimate)));
    }
public static long suggestTargetSize(long sizeEstimate) {
        long est = sizeEstimate / LEAF_TARGET;
        return est > 0L ? est : 1L;
    }

static final int LEAF_TARGET = ForkJoinPool.getCommonPoolParallelism() << 2;

if (parallelism < 0 && // default 1 less than #cores
            (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
            parallelism = 1;
this.config = (parallelism & SMASK) | mode;
int par = common.config & SMASK; // report 1 even if threads disabled
commonParallelism = par > 0 ? par : 1;

​ 三、在將工做所有fork開以前不斷循環將當前任務分爲左右子任務

​ 四、退出循環後嘗試結束,調用子類實現的doLeaf方法,完成最小計算單元的計算任務,而後設置到當前任務的localResult中

​ 五、再使用tryComplete方法進行最終任務的掃尾工做,若是該任務pending 值不等於0,則原子的減1,若是已經等於0,說明任務都已經完成,則調用onCompletion 回調,若是該任務是葉子任務,則直接銷燬中間數據結束;若是是中間節點會將左右子節點的結果進行合併

​ 六、檢查若是這個任務已經沒有父級任務了,則將該任務置爲正常結束,若是還有則嘗試遞歸的去調用父級節點的onCompletion回調,逐級進行任務的合併

/**
     * If the pending count is nonzero, decrements the count;
     * otherwise invokes {@link #onCompletion(CountedCompleter)}
     * and then similarly tries to complete this task's completer,
     * if one exists, else marks this task as complete.
     */
    public final void tryComplete() {
        CountedCompleter<?> a = this, s = a;
        for (int c;;) {
            if ((c = a.pending) == 0) {
                a.onCompletion(s);
                if ((a = (s = a).completer) == null) {
                    s.quietlyComplete();
                    return;
                }
            }
            else if (U.compareAndSwapInt(a, PENDING, c, c - 1))
                return;
        }
    }

總結:最後能夠看出Stream並行的實現本質上就是在ForkJoin上進行了一層封裝,將Stream不斷分解成更小的Spliterator進行計算。

二、本身控制線程數

ForkJoinPool pool = new ForkJoinPool(2);
        Long result = pool.submit(() -> LongStream.range(1, 10)
                                  .parallel()
                                  .map(x -> x + 1)
                                  .filter(x -> x < 5)
                                  .reduce((x,y) -> x+y)
                                  .getAsLong()
                                 ).get();
System.out.println(result);

四、測試,Benchmark

一、JAVA版本

​ java version "1.8.0_131"

二、機器的配置

​ 處理器:1.4GHz Intel Core i5

​ 內存:8GB 1600 MHz DDR3

三、Benchmark配置

​ 2 個JVM、5 次預熱迭代和 5 次測量迭代來測試平均時間

四、檢驗的方法:

普通for循環

public int forMaxInteger() {
        int max = Integer.MIN_VALUE;
        for (int i = 0; i < size; i++) {
            max = Math.max(max, integers.get(i));
        }
        return max;
    }

簡寫for循環(forEach)

public int forMaxSimpleInteger() {
        int max = Integer.MIN_VALUE;
        for (Integer n : integers) {
            max = Math.max(max, n);
        }
        return max;
    }

iterator循環

public int iteratorMaxInteger() {
        int max = Integer.MIN_VALUE;
        for (Iterator<Integer> it = integers.iterator(); it.hasNext(); ) {
            max = Math.max(max, it.next());
        }
        return max;
    }

forEach+Lambda

public int forEachLambdaMaxInteger() {
    final Wrapper wrapper = new Wrapper();
    wrapper.inner = Integer.MIN_VALUE;
    integers.forEach(i -> helper(i, wrapper));
    return wrapper.inner.intValue();
}

public static class Wrapper {
     public Integer inner;
}

private int helper(int i, Wrapper wrapper) {
    wrapper.inner = Math.max(i, wrapper.inner);
    return wrapper.inner;
}

stream

public int streamMaxInteger() {
    OptionalInt max = integers.stream().mapToInt(i->i.intValue()).max();
    return max.getAsInt();
}

parallelStream

public int parallelStreamMaxInteger() {
    OptionalInt max = integers.parallelStream().mapToInt(i->i.intValue()).max();
    return max.getAsInt();
}

五、測試結果

一、找到最大的數

一、10萬數據
Benchmark Mode Cnt Score Error Units
StreamBenchmark.forEachLambdaMaxInteger avgt 10 0.703 ± 0.024 ms/op
StreamBenchmark.forMaxSimpleInteger avgt 10 0.148 ± 0.003 ms/op
StreamBenchmark.forMaxInteger avgt 10 0.135 ± 0.005 ms/op
StreamBenchmark.iteratorMaxInteger avgt 10 0.150 ± 0.003 ms/op
StreamBenchmark.parallelStreamMaxInteger avgt 10 0.186 ± 0.002 ms/op
StreamBenchmark.streamMaxInteger avgt 10 0.353 ± 0.006 ms/op
二、100萬數據
Benchmark Mode Cnt Score Error Units
StreamBenchmark.forEachLambdaMaxInteger avgt 10 8.870 ± 0.094 ms/op
StreamBenchmark.forMaxSimpleInteger avgt 10 1.870 ± 0.036 ms/op
StreamBenchmark.forMaxInteger avgt 10 1.979 ± 0.034 ms/op
StreamBenchmark.iteratorMaxInteger avgt 10 1.890 ± 0.065 ms/op
StreamBenchmark.parallelStreamMaxInteger avgt 10 1.895 ± 0.297 ms/op
StreamBenchmark.streamMaxInteger avgt 10 3.738 ± 0.087 ms/op
三、1000萬數據
Benchmark Mode Cnt Score Error Units
StreamBenchmark.forEachLambdaMaxInteger avgt 10 84.733 ± 1.989 ms/op
StreamBenchmark.forMaxSimpleInteger avgt 10 20.063 ± 0.559 ms/op
StreamBenchmark.forMaxInteger avgt 10 21.301 ± 0.712 ms/op
StreamBenchmark.iteratorMaxInteger avgt 10 19.838 ± 0.585 ms/op
StreamBenchmark.parallelStreamMaxInteger avgt 10 15.481 ± 0.677 ms/op
StreamBenchmark.streamMaxInteger avgt 10 37.521 ± 0.689 ms/op

二、累加

一、10萬數據
Benchmark Mode Cnt Score Error Units
StreamBenchmark.forEachLambdaMaxInteger avgt 10 0.840 ± 0.275 ms/op
StreamBenchmark.forMaxInteger avgt 10 0.139 ± 0.012 ms/op
StreamBenchmark.forMaxSimpleInteger avgt 10 0.177 ± 0.017 ms/op
StreamBenchmark.iteratorMaxInteger avgt 10 0.172 ± 0.012 ms/op
StreamBenchmark.parallelStreamMaxInteger avgt 10 0.126 ± 0.080 ms/op
StreamBenchmark.streamMaxInteger avgt 10 0.715 ± 0.008 ms/op
二、100萬數據
Benchmark Mode Cnt Score Error Units
StreamBenchmark.forEachLambdaMaxInteger avgt 10 11.053 ± 1.540 ms/op
StreamBenchmark.forMaxInteger avgt 10 1.882 ± 0.194 ms/op
StreamBenchmark.forMaxSimpleInteger avgt 10 1.860 ± 0.038 ms/op
StreamBenchmark.iteratorMaxInteger avgt 10 1.892 ± 0.039 ms/op
StreamBenchmark.parallelStreamMaxInteger avgt 10 2.407 ± 1.767 ms/op
StreamBenchmark.streamMaxInteger avgt 10 2.868 ± 3.522 ms/op
三、1000萬數據
Benchmark Mode Cnt Score Error Units
StreamBenchmark.forEachLambdaMaxInteger avgt 10 91.385 ± 14.993 ms/op
StreamBenchmark.forMaxInteger avgt 10 13.175 ± 0.188 ms/op
StreamBenchmark.forMaxSimpleInteger avgt 10 16.285 ± 0.466 ms/op
StreamBenchmark.iteratorMaxInteger avgt 10 15.999 ± 0.244 ms/op
StreamBenchmark.parallelStreamMaxInteger avgt 10 29.746 ± 0.251 ms/op
StreamBenchmark.streamMaxInteger avgt 10 10.196 ± 0.194 ms/op

三、模擬數據處理

一、10萬數據
Benchmark Mode Cnt Score Error Units
StreamBenchmark3.forEachLambdaTrans avgt 10 3.100 ± 0.169 ms/op
StreamBenchmark3.forTrans avgt 10 2.863 ± 0.204 ms/op
StreamBenchmark3.forTransSimple avgt 10 2.667 ± 0.126 ms/op
StreamBenchmark3.iteratorTrans avgt 10 2.648 ± 0.140 ms/op
StreamBenchmark3.parallelStreamTrans avgt 10 2.118 ± 0.058 ms/op
StreamBenchmark3.streamTrans avgt 10 2.896 ± 0.049 ms/op
二、100萬數據
Benchmark Mode Cnt Score Error Units
StreamBenchmark3.forEachLambdaTrans avgt 10 37.726 ± 7.249 ms/op
StreamBenchmark3.forTrans avgt 10 31.871 ± 1.802 ms/op
StreamBenchmark3.forTransSimple avgt 10 30.901 ± 1.921 ms/op
StreamBenchmark3.iteratorTrans avgt 10 31.023 ± 0.876 ms/op
StreamBenchmark3.parallelStreamTrans avgt 10 25.768 ± 0.985 ms/op
StreamBenchmark3.streamTrans avgt 10 31.347 ± 1.519 ms/op
三、1000萬數據
Benchmark Mode Cnt Score Error Units
StreamBenchmark3.forEachLambdaTrans avgt 10 3171.935 ± 195.461 ms/op
StreamBenchmark3.forTrans avgt 10 3088.757 ± 373.870 ms/op
StreamBenchmark3.forTransSimple avgt 10 3073.300 ± 224.154 ms/op
StreamBenchmark3.iteratorTrans avgt 10 3083.537 ± 223.975 ms/op
StreamBenchmark3.parallelStreamTrans avgt 10 3347.938 ± 485.051 ms/op
StreamBenchmark3.streamTrans avgt 10 3203.518 ± 357.881 ms/op

四、影響並行流的主要五個因素

一、數據大小

​ 輸入數據的大小會影響並行化處理,當只有足夠大、每一個數據處理管道花費的時間足夠多時,並行化纔有意義

二、源數據結構

​ 通常都是基於集合進行並行化

三、裝箱

​ 處理基本類型比處理裝箱類型要快

四、核的數量

​ 只有在多核的機器上使用纔有意義,而且是運行時可以使用的多核

五、單元處理開銷

​ 原先處理耗時較長,使用並行化纔有意義

五、資料:

一、http://ifeve.com/stream

二、http://lvheyang.com/?p=87

三、http://www.cnblogs.com/CarpenterLee/p/6637118.html


轉載請標明出處:http://www.cnblogs.com/MoEee/p/6490573.html

相關文章
相關標籤/搜索