Java8 Stream詳解 & 原理深度解析

Java8中提供了Stream對集合操做做出了極大的簡化,學習了Stream以後,咱們之後不用使用for循環就能對集合做出很好的操做。

1、流的初始化與轉換:
  Java中的Stream的全部操做都是針對流的,因此,使用Stream必需要獲得Stream對象:
  一、初始化一個流:
    Stream stream = Stream.of("a", "b", "c");
  二、數組轉換爲一個流:
    String [] strArray = new String[] {"a", "b", "c"};
    stream = Stream.of(strArray);
    或者
    stream = Arrays.stream(strArray);
  三、集合對象轉換爲一個流(Collections):
    List<String> list = Arrays.asList(strArray);
    stream = list.stream();

2、流的操做:html

流的操做能夠歸結爲幾種:java

 

一、遍歷操做(map):編程

使用map操做能夠遍歷集合中的每一個對象,並對其進行操做,map以後,用.collect(Collectors.toList())會獲得操做後的集合。數組

 

1.一、遍歷轉換爲大寫:
List<String> output = wordList.stream().
     map(String::toUpperCase).數據結構

           collect(Collectors.toList());app

 

1.二、平方數:
List<Integer> nums = Arrays.asList(1, 2, 3, 4);
   List<Integer> squareNums = nums.stream().
         map(n -> n * n).
       collect(Collectors.toList());框架

 

二、過濾操做(filter):less

使用filter能夠對象Stream中進行過濾,經過測試的元素將會留下來生成一個新的Stream。ide

 

2.一、獲得其中不爲空的String
List<String> filterLists = new ArrayList<>();
filterLists.add("");
filterLists.add("a");
filterLists.add("b");
List afterFilterLists = filterLists.stream()
       .filter(s -> !s.isEmpty())函數式編程

        .collect(Collectors.toList());

 

三、循環操做(forEach):
若是隻是想對流中的每一個對象進行一些自定義的操做,可使用forEach:
List<String> forEachLists = new ArrayList<>();
forEachLists.add("a");
forEachLists.add("b");
forEachLists.add("c");

forEachLists.stream().forEach(s-> System.out.println(s));

 

四、返回特定的結果集合(limit/skip):
limit 返回 Stream 的前面 n 個元素;skip 則是扔掉前 n 個元素:
List<String> forEachLists = new ArrayList<>();
forEachLists.add("a");
forEachLists.add("b");
forEachLists.add("c");
forEachLists.add("d");
forEachLists.add("e");
forEachLists.add("f");
List<String> limitLists = forEachLists.stream().skip(2).limit(3).collect(Collectors.toList());

注意skip與limit是有順序關係的,好比使用skip(2)會跳過集合的前兩個,返回的爲c、d、e、f,而後調用limit(3)會返回前3個,因此最後返回的c,d,e

 

五、排序(sort/min/max/distinct):
sort能夠對集合中的全部元素進行排序。max,min能夠尋找出流中最大或者最小的元素,而distinct能夠尋找出不重複的元素:

5.一、對一個集合進行排序:
List<Integer> sortLists = new ArrayList<>();
sortLists.add(1);
sortLists.add(4);
sortLists.add(6);
sortLists.add(3);
sortLists.add(2);
List<Integer> afterSortLists = sortLists.stream().sorted((In1,In2)->

       In1-In2).collect(Collectors.toList());

 

5.二、獲得其中長度最大的元素:
List<String> maxLists = new ArrayList<>();
maxLists.add("a");
maxLists.add("b");
maxLists.add("c");
maxLists.add("d");
maxLists.add("e");
maxLists.add("f");
maxLists.add("hahaha");
int maxLength = maxLists.stream().mapToInt(s->s.length()).max().getAsInt();

System.out.println("字符串長度最長的長度爲"+maxLength);

 

5.三、對一個集合進行查重:
List<String> distinctList = new ArrayList<>();
distinctList.add("a");
distinctList.add("a");
distinctList.add("c");
distinctList.add("d");
List<String> afterDistinctList = distinctList.stream().distinct().collect(Collectors.toList());
其中的distinct()方法能找出stream中元素equal(),即相同的元素,並將相同的去除,上述返回即爲a,c,d。

六、匹配(Match方法):
有的時候,咱們只須要判斷集合中是否所有知足條件,或者判斷集合中是否有知足條件的元素,這時候就可使用match方法:
allMatch:Stream 中所有元素符合傳入的 predicate,返回 true
anyMatch:Stream 中只要有一個元素符合傳入的 predicate,返回 true

noneMatch:Stream 中沒有一個元素符合傳入的 predicate,返回 true

 

6.一、判斷集合中沒有有爲‘c’的元素:
List<String> matchList = new ArrayList<>();
matchList.add("a");
matchList.add("a");
matchList.add("c");
matchList.add("d");

boolean isExits = matchList.stream().anyMatch(s -> s.equals("c"));

 

6.二、判斷集合中是否全不爲空:
List<String> matchList = new ArrayList<>();
matchList.add("a");
matchList.add("");
matchList.add("a");
matchList.add("c");
matchList.add("d");
boolean isNotEmpty = matchList.stream().noneMatch(s -> s.isEmpty());

則返回的爲false

轉自:https://blog.csdn.net/happyheng/article/details/52832313

 

 

在這篇文章中,將會對流的實現原理進行深度,解析,具體關於如何使用,請參考《Java8函數式編程》。

經常使用的流操做

在深刻原理以前,咱們有必要知道關於Stream的一些基礎知識,關於Stream的操做分類,如表1-1所示。

表1-1 Stream的經常使用操做分類(表格引自這裏)

表1-1

如表1-1中所示,Stream中的操做能夠分爲兩大類:中間操做與結束操做,中間操做只是對操做進行了記錄,只有結束操做纔會觸發實際的計算(即惰性求值),這也是Stream在迭代大集合時高效的緣由之一。中間操做又能夠分爲無狀態(Stateless)操做與有狀態(Stateful)操做,前者是指元素的處理不受以前元素的影響;後者是指該操做只有拿到全部元素以後才能繼續下去。結束操做又能夠分爲短路與非短路操做,這個應該很好理解,前者是指遇到某些符合條件的元素就能夠獲得最終結果;然後者是指必須處理全部元素才能獲得最終結果。

原理探祕

在探究Stream的執行原理以前,咱們先看以下兩段代碼(本文將以code_1爲例進行說明):

code_1

public static void main(String[] args) { List<String> list = Lists.newArrayList( "bcd", "cde", "def", "abc"); List<String> result = list.stream() //.parallel() .filter(e -> e.length() >= 3) .map(e -> e.charAt(0)) //.peek(System.out :: println) //.sorted() //.peek(e -> System.out.println("++++" + e)) .map(e -> String.valueOf(e)) .collect(Collectors.toList()); System.out.println("----------------------------"); System.out.println(result); }

code_2

public void targetMethod() { List<String> list = Lists.newArrayList( "bcd", "cde", "def", "abc"); List<String> result = Lists.newArrayListWithCapacity(list.size()); for (String str : list) { if (str.length() >= 3) { char e = str.charAt(0); String tempStr = String.valueOf(e); result.add(tempStr); } } System.out.println("----------------------------"); System.out.println(result); }

很明顯,在最終結果上而言,code_1與code_2是等價的。那麼,Stream是怎麼作的呢?顯然不是每次操做都進行迭代,由於這對於執行時間與存儲中間變量來講都將是噩夢。

要解決的問題

顯然,若是code_2只對集合迭代了一次,也就是說至關高效。那麼這麼作有沒有弊端?有!模板代碼、中間變量、不利於並行都是其存在的問題。可是按着code_2的思路能夠知道有如下幾個問題須要解決:

  • 如何記錄每次操做?
  • 操做如何疊加?
  • 疊加後的操做如何執行?
  • 最後的結果如何存儲?

包結構分析

那麼Stream是如何解決的呢?所謂源碼之下,無所遁形。那麼,首先來看一下Stream包的結構(如圖1-1所示)。

圖1-1

圖1-1 Stream包的結構示意圖

其中各個部分的主要功能爲:

  1. 主要是各類操做的工廠類、數據的存儲結構以及收集器的工廠類等;
  2. 主要用於Stream的惰性求值實現;
  3. Stream的並行計算框架;
  4. 存儲並行流的中間結果;
  5. 終結操做的定義;

咱們單獨把第二部分拎出來用於說明Stream的惰性求值實現,如圖1-2所示,Java8針對Int、long、double進行了優化,主要用於頻繁的拆裝箱。咱們以引用類型進行介紹,在圖中已經標爲綠色。

  • BaseStream規定了流的基本接口,好比iterator、spliterator、isParallel等;
  • Stream中定義了map、filter、flatmap等用戶關注的經常使用操做;
  • PipelineHelper主要用於Stream執行過程當中相關結構的構建;
  • Head、StatelessOp、StatefulOp爲ReferencePipeline中的內部類。

圖1-2

圖1-2

操做如何記錄

關於操做如何記錄,在JDK源碼註釋中屢次用(操做)stage來標識用戶的每一次操做,而一般狀況下Stream的操做又須要一個回調函數,因此一個完整的操做是由數據來源、操做、回調函數組成的三元組來表示。而在具體實現中,使用實例化的ReferencePipeline來表示,即圖1-2中的Head、StatelessOp、StatefulOp的實例。

如code_三、code_4所示爲調用stream.map()的關鍵的兩個方法,在用戶
調用一系列操做後會造成如圖1-3所示的雙鏈表結構。

圖1-3

圖1-3

code_3(ReferencePipeline.map())

@Override @SuppressWarnings("unchecked") public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) { Objects.requireNonNull(mapper); return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE, StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { @Override Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) { return new Sink.ChainedReference<P_OUT, R>(sink) { @Override public void accept(P_OUT u) { downstream.accept(mapper.apply(u)); } }; } }; }

code_4(AbstractPipeline.AbstractPipeline())

AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) { if (previousStage.linkedOrConsumed) throw new IllegalStateException(MSG_STREAM_LINKED); previousStage.linkedOrConsumed = true; previousStage.nextStage = this; this.previousStage = previousStage; this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK; this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags); this.sourceStage = previousStage.sourceStage; if (opIsStateful()) sourceStage.sourceAnyStateful = true; this.depth = previousStage.depth + 1; }

如何疊加

在上一步已經在stage中記錄了每一步操做,此時並無執行。可是stage只是保存了當前的操做,並不能肯定下一個stage須要何種操做,何種數據,其實JDK爲此定義了Sink接口,其中只有begin()、end()、cancellationRequested()、accept()四個接口(如表1-2所示,摘自這裏),其中中間操做的子類中包含一個指向下游sink的指針。

表1-2

表1-2

如今轉向code_3,能夠看出,在satge鏈中,每一步都包含了opWrapSink()。當調用終結操做時,將會觸發code_5從最後一個stage(終結操做產生的satge)開始,遞歸產生圖1-4所示的結構。

code_5(AbstractPipeline.wrapSink())

@Override @SuppressWarnings("unchecked") final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) { Objects.requireNonNull(sink); for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) { sink = p.opWrapSink(p.previousStage.combinedFlags, sink); } return (Sink<P_IN>) sink; }

圖1-4

圖1-4

如何執行

全部的操做已經造成了圖1-4的結構,接下來就會觸發code_6,此時結果就會產生對應的結果啦!

code_6(AbstractPipelie.copyInto())

@Override final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) { Objects.requireNonNull(wrappedSink); if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) { wrappedSink.begin(spliterator.getExactSizeIfKnown()); spliterator.forEachRemaining(wrappedSink); wrappedSink.end(); } else { copyIntoWithCancel(wrappedSink, spliterator); } }

並行原理

那麼,Stream是如何並行執行的呢?其實產生stage鏈的過程和串行並無區別,只是在最終執行時進行了相應的調整,咱們將code_1改變爲code_7

code_7

public static void main(String[] args) { List<String> list = Lists.newArrayList( "bcd", "cde", "def", "abc"); List<String> result = list.stream() .parallel() .filter(e -> e.length() >= 3) //.map(e -> e.charAt(0)) //.peek(System.out :: println) .sorted() //.peek(e -> System.out.println("++++" + e)) .map(e -> String.valueOf(e)) .collect(Collectors.toList()); System.out.println("----------------------------"); System.out.println(result); }

那麼最終產生的stage鏈與sink的結構如圖1-5所示,由於此時stage鏈中有一個有狀態操做(sorted()),也就是說在這裏必須處理完全部元素才能進行下一步操做。那麼此時不管是並行仍是串行,此時都會產生兩個sink鏈,也就是表明了兩次迭代,才產生了最終結果。

圖1-5

圖1-5

那麼,到底是如何並行的呢?其實當調用collect操做時會調用code_8,其中的evaluateParallel()如code_9所示。

code_8(AbstractPipeline.evaluate())

final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) { assert getOutputShape() == terminalOp.inputShape(); if (linkedOrConsumed) throw new IllegalStateException(MSG_STREAM_LINKED); linkedOrConsumed = true; return isParallel() ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags())) : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags())); }

code_9(ReduceOp.evaluateParallel())

@Override public <P_IN> R evaluateParallel(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) { return new ReduceTask<>(this, helper, spliterator).invoke().get(); }

其實Stream的並行處理是基於ForkJoin框架的,相關類與接口的結構如圖1-6所示。其中AbstractShortCircuitTask用於處理短路操做,其餘相關操做相似,會產生對應的Task。

圖1-6

圖1-6

關於code_8中獲取源Spliterator,如code_10所示,

code_10(AbstractPipeline.sourceSpliterator())

@SuppressWarnings("unchecked") private Spliterator<?> sourceSpliterator(int terminalFlags) { Spliterator<?> spliterator = null; if (sourceStage.sourceSpliterator != null) { spliterator = sourceStage.sourceSpliterator; sourceStage.sourceSpliterator = null; } else if (sourceStage.sourceSupplier != null) { spliterator = (Spliterator<?>) sourceStage.sourceSupplier.get(); sourceStage.sourceSupplier = null; } else { throw new IllegalStateException(MSG_CONSUMED); } if (isParallel() && sourceStage.sourceAnyStateful) { //若是是並行流而且有stage包含stateful操做 //那麼就會依次遍歷stage,直到遇到stateful stage時 int depth = 1; for (@SuppressWarnings("rawtypes") AbstractPipeline u = sourceStage, p = sourceStage.nextStage, e = this; u != e; u = p, p = p.nextStage) { int thisOpFlags = p.sourceOrOpFlags; if (p.opIsStateful()) { depth = 0; if (StreamOpFlag.SHORT_CIRCUIT.isKnown(thisOpFlags)) { //若是有短路操做,則去除相應標記 thisOpFlags = thisOpFlags & ~StreamOpFlag.IS_SHORT_CIRCUIT; } //儘可能以惰性求值的方式進行操做 spliterator = p.opEvaluateParallelLazy(u, spliterator); thisOpFlags = spliterator.hasCharacteristics(Spliterator.SIZED) ? (thisOpFlags & ~StreamOpFlag.NOT_SIZED) | StreamOpFlag.IS_SIZED : (thisOpFlags & ~StreamOpFlag.IS_SIZED) | StreamOpFlag.NOT_SIZED; } p.depth = depth++; p.combinedFlags = StreamOpFlag.combineOpFlags(thisOpFlags, u.combinedFlags); } } if (terminalFlags != 0) { // Apply flags from the terminal operation to last pipeline stage combinedFlags = StreamOpFlag.combineOpFlags(terminalFlags, combinedFlags); } return spliterator; }

如何並行執行

關於各個task就行是如何並行執行,其實最終調用的是code_11所示,對應的流程如圖1-7所示,其中交替fork子節點是爲了緩和數據分片不均形成的性能退化。

code_11(AbstractTask.compute())

@Override public void compute() { Spliterator<P_IN> rs = spliterator, ls; // right, left spliterators long sizeEstimate = rs.estimateSize(); long sizeThreshold = getTargetSize(sizeEstimate); boolean forkRight = false; @SuppressWarnings("unchecked") K task = (K) this; while (sizeEstimate > sizeThreshold && (ls = rs.trySplit()) != null) { K leftChild, rightChild, taskToFork; task.leftChild = leftChild = task.makeChild(ls); task.rightChild = rightChild = task.makeChild(rs); task.setPendingCount(1); if (forkRight) { forkRight = false; rs = ls; task = leftChild; taskToFork = rightChild; } else { forkRight = true; task = rightChild; taskToFork = leftChild; } taskToFork.fork(); sizeEstimate = rs.estimateSize(); } task.setLocalResult(task.doLeaf()); task.tryComplete(); }

圖1-7

圖1-7

影響並行流的因素

數據大小;源數據結構(分割越容易越好),arraylist、數組比較好,hashSet、treeSet次之,linked最差;裝箱;核的數量(可以使用);單元處理開銷(越大越好)

建議:

終結操做之外的操做,儘可能避免反作用,避免突變基於堆棧的引用,或者在執行過程當中進行任何I/O;傳遞給流操做的數據源應該是互不干擾(避免修改數據源)。

小結

本文主要探究了Stream的實現原理,並無涉及到具體的流操做的用法(讀者能夠參考《java8函數式編程》),而且給出了使用Stream的部分建議。

參考文章

深刻理解Java Stream流水線
Java 8 Stream探祕
java.util.stream 庫簡介

Dorae 轉載註明出處 http://www.cnblogs.com/Dorae/

相關文章
相關標籤/搜索