Stream使用這麼久,它是如何提升遍歷集合效率?

對於List 集合類,我想你們確定很瞭解了,那我想必定也知道集合的頂端接口 Collection。在 Java8 中,Collection 新增了兩個流方法,分別是 Stream() 和 parallelStream()數據庫


經過英文名不難猜想,這兩個方法確定和 Stream 有關,那進一步猜想,是否是和咱們熟悉的 InputStream 和 OutputStream 也有關係呢?集合類中新增的兩個 Stream 方法到底有什麼做用?框架


什麼是 Stream?less

在 Java8 以前,咱們一般是經過 for 循環或者 Iterator 迭代來從新排序合併數據,又或者經過從新定義 Collections.sorts 的 Comparator 方法來實現,這兩種方式對於大數據量系統來講,效率並非很理想。ide


Java8 中添加了一個新的接口類 Stream,他和咱們以前接觸的字節流概念不太同樣,Java8 集合中的 Stream 至關於高級版的 Iterator,他能夠經過 Lambda 表達式對集合進行各類很是便利、高效的聚合操做(Aggregate Operation),或者大批量數據操做 (Bulk Data Operation)。學習


Stream 的聚合操做與數據庫 SQL 的聚合操做 sorted、filter、map 等相似。咱們在應用層就能夠高效地實現相似數據庫 SQL 的聚合操做了,而在數據操做方面,Stream 不只能夠經過串行的方式實現數據操做,還能夠經過並行的方式處理大批量數據,提升數據的處理效率。大數據


接下來咱們就用一個簡單的例子來體驗下 Stream 的簡潔與強大。優化

這個 Demo 的需求是過濾分組一所中學裏身高在 160cm 以上的男女同窗,咱們先用傳統的迭代方式來實現,代碼以下:ui













Map<String, List<Student>> stuMap = new HashMap<String, List<Student>>();    for (Student stu: studentsList) {        if (stu.getHeight() > 160) { //若是身高大於160             if (stuMap.get(stu.getSex()) == null) { //該性別還沒分類                  List<Student> list = new ArrayList<Student>(); //新建該性別學生的列表                  list.add(stu);//將學生放進去列表                  stuMap.put(stu.getSex(), list);//將列表放到map中              } else { //該性別分類已存在                  stuMap.get(stu.getSex()).add(stu);//該性別分類已存在,則直接放進去便可              }          }     }



咱們再使用 Java8 中的 Stream API 進行實現:this

1. 串行實現lua


Map<String, List<Student>> stuMap = stuList.stream().filter((Student s) -> s.getHeight() > 160) .collect(Collectors.groupingBy(Student ::getSex));

2. 並行實現


Map<String, List<Student>> stuMap = stuList.parallelStream().filter((Student s) -> s.getHeight() > 160) .collect(Collectors.groupingBy(Student ::getSex));


經過上面兩個簡單的例子,咱們能夠發現,Stream 結合 Lambda 表達式實現遍歷篩選功能很是得簡潔和便捷。


Stream 如何優化遍歷?

上面咱們初步瞭解了 Java8 中的 Stream API,那 Stream 是如何作到優化迭代的呢?並行又是如何實現的?下面咱們就透過 Stream 源碼剖析 Stream 的實現原理。


1.Stream 操做分類


官方將 Stream 中的操做分爲兩大類:中間操做(Intermediate operations)和終結操做(Terminal operations)。中間操做只對操做進行了記錄,即只會返回一個流,不會進行計算操做,而終結操做是實現了計算操做。


終結操做又能夠分爲短路(Short-circuiting)與非短路(Unshort-circuiting)操做,前者是指遇到某些符合條件的元素就能夠獲得最終結果,後者是指必須處理完全部元素才能獲得最終結果。操做分類詳情以下圖所示:


咱們一般還會將中間操做稱爲懶操做,也正是由這種懶操做結合終結操做、數據源構成的處理管道(Pipeline),實現了 Stream 的高效。


2.Stream 源碼實現


在瞭解 Stream 如何工做以前,咱們先來了解下 Stream 包是由哪些主要結構類組合而成的,各個類的職責是什麼。參照下圖:

圖片


BaseStream 和 Stream 爲最頂端的接口類。BaseStream 主要定義了流的基本接口方法,例如,spliterator、isParallel 等;Stream 則定義了一些流的經常使用操做方法,例如,map、filter 等。


ReferencePipeline 是一個結構類,他經過定義內部類組裝了各類操做流。他定義了 Head、StatelessOp、StatefulOp 三個內部類,實現了 BaseStream 與 Stream 的接口方法。


Sink 接口是定義每一個 Stream 操做之間關係的協議,他包含 begin()、end()、cancellationRequested()、accpt() 四個方法。ReferencePipeline 最終會將整個 Stream 流操做組裝成一個調用鏈,而這條調用鏈上的各個 Stream 操做的上下關係就是經過 Sink 接口協議來定義實現的。


3.Stream 操做疊加


咱們知道,一個 Stream 的各個操做是由處理管道組裝,並統一完成數據處理的。在 JDK 中每次的中斷操做會以使用階段(Stage)命名。


管道結構一般是由 ReferencePipeline 類實現的,前面講解 Stream 包結構時,我提到過 ReferencePipeline 包含了 Head、StatelessOp、StatefulOp 三種內部類。


Head 類主要用來定義數據源操做,在咱們初次調用 names.stream() 方法時,會初次加載 Head 對象,此時爲加載數據源操做;接着加載的是中間操做,分別爲無狀態中間操做 StatelessOp 對象和有狀態操做 StatefulOp 對象,此時的 Stage 並無執行,而是經過 AbstractPipeline 生成了一箇中間操做 Stage 鏈表;當咱們調用終結操做時,會生成一個最終的 Stage,經過這個 Stage 觸發以前的中間操做,從最後一個 Stage 開始,遞歸產生一個 Sink 鏈。以下圖所示:









List<String> names = Arrays.asList("張三", "李四", "王老五", "李三", "劉老四", "王小二", "張四", "張五六七");
String maxLenStartWithZ = names.stream()                  .filter(name -> name.startsWith("張"))                  .mapToInt(String::length)                  .max()                  .toString();



這個例子的需求是查找出一個長度最長,而且以張爲姓氏的名字。從代碼角度來看,你可能會認爲是這樣的操做流程:首先遍歷一次集合,獲得以「張」開頭的全部名字;而後遍歷一次 filter 獲得的集合,將名字轉換成數字長度;最後再從長度集合中找到最長的那個名字而且返回。



4.Stream 並行處理


Stream 處理數據的方式有兩種,串行處理和並行處理。要實現並行處理,咱們只須要在例子的代碼中新增一個 Parallel() 方法,代碼以下所示:









List<String> names = Arrays.asList("張三", "李四", "王老五", "李三", "劉老四", "王小二", "張四", "張五六七");
String maxLenStartWithZ = names.stream()                    .parallel()                  .filter(name -> name.startsWith("張"))                  .mapToInt(String::length)                  .max()                  .toString();


Stream 的並行處理在執行終結操做以前,跟串行處理的實現是同樣的。而在調用終結方法以後,實現的方式就有點不太同樣,會調用 TerminalOp 的 evaluateParallel 方法進行並行處理。











final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
       assert getOutputShape() == terminalOp.inputShape();        if (linkedOrConsumed)            throw new IllegalStateException(MSG_STREAM_LINKED);        linkedOrConsumed = true;        return isParallel()               ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))               : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));    }


這裏的並行處理指的是,Stream 結合了 ForkJoin 框架,對 Stream 處理進行了分片,Splititerator 中的 estimateSize 方法會估算出分片的數據量。


總結


縱觀 Stream 的設計實現,很是值得咱們學習。從大的設計方向上來講,Stream 將整個操做分解爲了鏈式結構,不只簡化了遍歷操做,還爲實現了並行計算打下了基礎


從小的分類方向上來講,Stream 將遍歷元素的操做和對元素的計算分爲中間操做和終結操做,而中間操做又根據元素之間狀態有無干擾分爲有狀態和無狀態操做,實現了鏈結構中的不一樣階段。


在串行處理操做中,Stream 在執行每一步中間操做時,並不會作實際的數據操做處理,而是將這些中間操做串聯起來,最終由終結操做觸發,生成一個數據處理鏈表,經過 Java8 中的 Spliterator 迭代器進行數據處理;此時,每執行一次迭代,就對全部的無狀態的中間操做進行數據處理,而對有狀態的中間操做,就須要迭代處理完全部的數據,再進行處理操做;最後就是進行終結操做的數據處理。


在並行處理操做中,Stream 對中間操做基本跟串行處理方式是同樣的,但在終結操做中,Stream 將結合 ForkJoin 框架對集合進行切片處理,ForkJoin 框架將每一個切片的處理結果 Join 合併起來。最後就是要注意 Stream 的使用場景。

相關文章
相關標籤/搜索