Java 8 的 Stream 使得代碼更加簡潔易懂,本篇文章深刻分析 Java Stream 的工做原理,並探討 Steam 的性能問題。java
Java 8 集合中的 Stream 至關於高級版的 Iterator,它能夠經過 Lambda 表達式對集合進行各類很是便利、高效的聚合操做(Aggregate Operation),或者大批量數據操做 (Bulk Data Operation)。git
Stream的聚合操做與數據庫SQL的聚合操做sorted、filter、map等相似。咱們在應用層就能夠高效地實現相似數據庫SQL的聚合操做了,而在數據操做方面,Stream不只能夠經過串行的方式實現數據操做,還能夠經過並行的方式處理大批量數據,提升數據的處理效率。github
官方將 Stream 中的操做分爲兩大類:數據庫
中間操做(Intermediate operations)
,只對操做進行了記錄,即只會返回一個流,不會進行計算操做。終結操做(Terminal operations)
,實現了計算操做。中間操做又能夠分爲:數組
無狀態(Stateless)操做
,元素的處理不受以前元素的影響。有狀態(Stateful)操做
,指該操做只有拿到全部元素以後才能繼續下去。終結操做又能夠分爲:安全
短路(Short-circuiting)
操做,指遇到某些符合條件的元素就能夠獲得最終結果非短路(Unshort-circuiting)
操做,指必須處理完全部元素才能獲得最終結果。操做分類詳情以下圖所示:服務器
Stream 相關類和接口的繼承關係以下圖所示:app
最頂端的接口類,定義了流的基本接口方法,最主要的方法爲 spliterator、isParallel。框架
最頂端的接口類。定義了流的經常使用方法,例如 map、filter、sorted、limit、skip、collect 等。less
ReferencePipeline 是一個結構類,定義內部類組裝了各類操做流,定義了Head
、StatelessOp
、StatefulOp
三個內部類,實現了 BaseStream 與 Stream 的接口方法。
Sink 接口定義了 Stream 之間的操做行爲,包含 begin()
、end()
、cancellationRequested()
、accpt()
四個方法。ReferencePipeline最終會將整個 Stream 流操做組裝成一個調用鏈,而這條調用鏈上的各個 Stream 操做的上下關係就是經過 Sink 接口協議來定義實現的。
Stream 的基礎用法就再也不敘述了,這裏從一段代碼開始,分析 Stream 的工做原理。
@Test public void testStream() { List<String> names = Arrays.asList("kotlin", "java", "go"); int maxLength = names.stream().filter(name -> name.length() <= 4).map(String::length) .max(Comparator.naturalOrder()).orElse(-1); System.out.println(maxLength); }
當使用 Stream 時,主要有 3 部分組成,下面一一講解。
調用 names.stream()
方法,會初次加載 ReferencePipeline 的 Head 對象,此時爲加載數據源操做。
java.util.Collection#stream
default Stream<E> stream() { return StreamSupport.stream(spliterator(), false); }
StreamSupport 類中的 stream 方法,初始化了一個 ReferencePipeline的 Head 內部類對象。
java.util.stream.StreamSupport#stream(java.util.Spliterator
public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) { Objects.requireNonNull(spliterator); return new ReferencePipeline.Head<>(spliterator, StreamOpFlag.fromCharacteristics(spliterator), parallel); }
接着爲 filter(name -> name.length() <= 4).mapToInt(String::length)
,是中間操做,分爲無狀態中間操做 StatelessOp 對象和有狀態操做 StatefulOp 對象,此時的 Stage 並無執行,而是經過AbstractPipeline 生成了一箇中間操做 Stage 鏈表。
java.util.stream.ReferencePipeline#filter
@Override public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) { Objects.requireNonNull(predicate); return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE, StreamOpFlag.NOT_SIZED) { @Override Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) { return new Sink.ChainedReference<P_OUT, P_OUT>(sink) { @Override public void begin(long size) { downstream.begin(-1); } @Override public void accept(P_OUT u) { if (predicate.test(u)) downstream.accept(u); } }; } }; }
java.util.stream.ReferencePipeline#map
@Override @SuppressWarnings("unchecked") public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) { Objects.requireNonNull(mapper); return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE, StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { @Override Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) { return new Sink.ChainedReference<P_OUT, R>(sink) { @Override public void accept(P_OUT u) { downstream.accept(mapper.apply(u)); } }; } }; }
能夠看到 filter 和 map 方法都返回了一個新的 StatelessOp
對象。new StatelessOp 將會調用父類 AbstractPipeline 的構造函數,這個構造函數將先後的 Stage 聯繫起來,生成一個 Stage 鏈表:
AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) { if (previousStage.linkedOrConsumed) throw new IllegalStateException(MSG_STREAM_LINKED); previousStage.linkedOrConsumed = true; previousStage.nextStage = this; this.previousStage = previousStage; this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK; this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags); this.sourceStage = previousStage.sourceStage; if (opIsStateful()) sourceStage.sourceAnyStateful = true; this.depth = previousStage.depth + 1; }
最後爲 max(Comparator.naturalOrder())
,是終結操做,會生成一個最終的 Stage,經過這個 Stage 觸發以前的中間操做,從最後一個Stage開始,遞歸產生一個Sink鏈。
java.util.stream.ReferencePipeline#max
@Override public final Optional<P_OUT> max(Comparator<? super P_OUT> comparator) { return reduce(BinaryOperator.maxBy(comparator)); }
最終調用到 java.util.stream.AbstractPipeline#wrapSink,這個方法會調用 opWrapSink 生成一個 Sink 鏈表,對應到本文的例子,就是 filter 和 map 操做。
@Override @SuppressWarnings("unchecked") final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) { Objects.requireNonNull(sink); for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) { sink = p.opWrapSink(p.previousStage.combinedFlags, sink); } return (Sink<P_IN>) sink; }
在上面 opWrapSink 上斷點調試,發現最終會調用到本例中的 filter 和 map 操做。
wrapAndCopyInto 生成 Sink 鏈表後,會經過 copyInfo 方法執行 Sink 鏈表的具體操做。
@Override final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) { Objects.requireNonNull(wrappedSink); if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) { wrappedSink.begin(spliterator.getExactSizeIfKnown()); spliterator.forEachRemaining(wrappedSink); wrappedSink.end(); } else { copyIntoWithCancel(wrappedSink, spliterator); } }
上面的核心代碼是:
spliterator.forEachRemaining(wrappedSink);
java.util.Spliterators.ArraySpliterator#forEachRemaining
@Override public void forEachRemaining(Consumer<? super T> action) { Object[] a; int i, hi; // hoist accesses and checks from loop if (action == null) throw new NullPointerException(); if ((a = array).length >= (hi = fence) && (i = index) >= 0 && i < (index = hi)) { do { action.accept((T)a[i]); } while (++i < hi); } }
斷點調試,能夠發現首先進入了 filter 的 Sink,其中 accept 方法的入參是 list 中的第一個元素「kotlin」(代碼中的 3 個元素是:"kotlin", "java", "go")。filter 的傳入是一個 Lambda 表達式:
filter(name -> name.length() <= 4)
顯然這個第一個元素「kotlin」的 predicate 是不會進入的。
對於第二個元素「java」,predicate.test 會返回 true(字符串「java」的長度<=4),則會進入 map 的 accept 方法。
本次調用 accept 方法時,empty 爲 false,會將 map 後的結果(int 類型的 4)賦值給 t。
public static <T> TerminalOp<T, Optional<T>> makeRef(BinaryOperator<T> operator) { Objects.requireNonNull(operator); class ReducingSink implements AccumulatingSink<T, Optional<T>, ReducingSink> { private boolean empty; private T state; public void begin(long size) { empty = true; state = null; } @Override public void accept(T t) { if (empty) { empty = false; state = t; } else { state = operator.apply(state, t); } } …… } }
對於第三個元素「go」,也會進入 accept 方法,此時 empty 爲 true, map 後的結果(int 類型的 2)會與上次的結果 4 經過自定義的比較器相比較,存入符合結果的值。
public static <T> BinaryOperator<T> maxBy(Comparator<? super T> comparator) { Objects.requireNonNull(comparator); return (a, b) -> comparator.compare(a, b) >= 0 ? a : b; }
本文代碼中的 max 傳入的比較器爲:
max(Comparator.naturalOrder())
至此會返回 int 類型的 4。
上面的例子是串行處理的,若是要改爲並行也很簡單,只須要在 stream() 方法後加上 parallel()
就能夠了,並行代碼能夠寫成:
@Test public void testStream() { List<String> names = Arrays.asList("kotlin", "java", "go"); int maxLength = names.stream().parallel().filter(name -> name.length() <= 4) .map(String::length).max(Comparator.naturalOrder()).orElse(-1); System.out.println(maxLength); }
Stream 的並行處理在執行終結操做以前,跟串行處理的實現是同樣的。而在調用終結方法以後,實現的方式就有點不太同樣,會調用 TerminalOp 的 evaluateParallel 方法進行並行處理。
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) { assert getOutputShape() == terminalOp.inputShape(); if (linkedOrConsumed) throw new IllegalStateException(MSG_STREAM_LINKED); linkedOrConsumed = true; return isParallel() ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags())) : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags())); }
核心是使用了 ForkJoin 框架,對 Stream 處理進行分片,最終會調用下面的代碼,這裏就不展開分析了。
java.util.stream.AbstractTask#compute
@Override public void compute() { Spliterator<P_IN> rs = spliterator, ls; // right, left spliterators long sizeEstimate = rs.estimateSize(); long sizeThreshold = getTargetSize(sizeEstimate); boolean forkRight = false; @SuppressWarnings("unchecked") K task = (K) this; while (sizeEstimate > sizeThreshold && (ls = rs.trySplit()) != null) { K leftChild, rightChild, taskToFork; task.leftChild = leftChild = task.makeChild(ls); task.rightChild = rightChild = task.makeChild(rs); task.setPendingCount(1); if (forkRight) { forkRight = false; rs = ls; task = leftChild; taskToFork = rightChild; } else { forkRight = true; task = rightChild; taskToFork = leftChild; } taskToFork.fork(); sizeEstimate = rs.estimateSize(); } task.setLocalResult(task.doLeaf()); task.tryComplete(); }
@Test public void testParallelWrong() { List<Integer> parallelList = new ArrayList<>(); IntStream.range(0, 1000).boxed().parallel().filter(i -> i % 2 == 1) .forEach(parallelList::add); System.out.println(parallelList.size()); }
上面的輸出結果會常常小於500,這是由於 parallelList 的類型是 ArrayList,並非線程安全的,在執行 add 操做時,可能正好遇上擴容或者線程被佔用,會覆蓋其餘線程的賦好的值。
@Test public void testParallelRight() { List<Integer> parallelList = IntStream.range(0, 1000).boxed().parallel() .filter(i -> i % 2 == 1).collect(Collectors.toList()); System.out.println(parallelList.size()); }
下面的文章參考自:JavaLambdaInternals/8-Stream Performance.md,侵刪。
爲保證測試結果然實可信,咱們將JVM運行在-server
模式下,測試數據在GB量級,測試機器採用常見的商用服務器,配置以下:
OS | CentOS 6.7 x86_64 |
CPU | Intel Xeon X5675, 12M Cache 3.06 GHz, 6 Cores 12 Threads |
內存 | 96GB |
JDK | java version 1.8.0_91, Java HotSpot(TM) 64-Bit Server VM |
性能測試並非容易的事,Java性能測試更費勁,由於虛擬機對性能的影響很大,JVM對性能的影響有兩方面:
-XX:+UseConcMarkSweepGC -Xms10G -Xmx10G
-XX:CompileThreshold=10000
。Stream並行執行時用到ForkJoinPool.commonPool()
獲得的線程池,爲控制並行度咱們使用Linux的taskset
命令指定JVM可用的核數。
測試數據由程序隨機生成。爲防止一次測試帶來的抖動,測試4次求出平均時間做爲運行時間。
測試內容:找出整型數組中的最小值。對比for循環外部迭代和Stream API內部迭代性能。
測試程序IntTest,測試結果以下圖:
圖中展現的是for循環外部迭代耗時爲基準的時間比值。分析以下:
並行迭代性能跟可利用的核數有關,上圖中的並行迭代使用了所有12個核,爲考察使用核數對性能的影響,咱們專門測試了不一樣核數下的Stream並行迭代效果:
分析,對於基本類型:
以上兩個測試說明,對於基本類型的簡單迭代,Stream串行迭代性能更差,但多核狀況下Stream迭代時性能較好。
再來看對象的迭代效果。
測試內容:找出字符串列表中最小的元素(天然順序),對比for循環外部迭代和Stream API內部迭代性能。
測試程序StringTest,測試結果以下圖:
結果分析以下:
再來單獨考察Stream並行迭代效果:
分析,對於對象類型:
以上兩個測試說明,對於對象類型的簡單迭代,Stream串行迭代性能更差,但多核狀況下Stream迭代時性能較好。
從實驗1、二的結果來看,Stream串行執行的效果都比外部迭代差(不少),是否是說明Stream真的不行了?先別下結論,咱們再來考察一下更復雜的操做。
測試內容:給定訂單列表,統計每一個用戶的總交易額。對比使用外部迭代手動實現和Stream API之間的性能。
咱們將訂單簡化爲<userName, price, timeStamp>
構成的元組,並用Order
對象來表示。測試程序ReductionTest,測試結果以下圖:
分析,對於複雜的歸約操做:
再來考察並行度對並行效果的影響,測試結果以下:
分析,對於複雜的歸約操做:
以上兩個實驗說明,對於複雜的歸約操做,Stream串行歸約效果好於手動歸約,在多核狀況下,並行歸約效果更佳。咱們有理由相信,對於其餘複雜的操做,Stream API也能表現出類似的性能表現。
上述三個實驗的結果能夠總結以下:
因此,若是出於性能考慮,1. 對於簡單操做推薦使用外部迭代手動實現,2. 對於複雜操做,推薦使用Stream API, 3. 在多核狀況下,推薦使用並行Stream API來發揮多核優點,4.單核狀況下不建議使用並行Stream API。
coding 筆記、點滴記錄,之後的文章也會同步到公衆號(Coding Insight)中,但願你們關注_
代碼和思惟導圖在 GitHub 項目中,歡迎你們 star!