Java8特性之Stream篇

Stream介紹

Stream是Java8的新特性之一,從名字看出來,它的含義是流,所謂的流就是一個數據渠道,能夠用於操做集合所生成的元素序列。 實際上不光是集合,包括數組、文件等,只要是能夠轉換成流,咱們均可以藉助流式處理,相似於咱們寫SQL語句同樣對其進行操做。java

1)Stream本身不會存儲元素。
2)Stream不會改變源對象。
3)Stream操做是延時執行的,這意味着它們會等到須要結果時才執行。數組

處理過程

Stream的流式處理能夠分爲三個部分:轉換成流、中間操做、終端操做,以下圖:數據結構

具體語句以下:app

List userIds = userLists.stream().map(UserDO::getId).collect(Collectors.toList()); stream語句將userLists集合轉換成了一個流,map爲中間操做,經過函數式表達式獲取user的Id,最後經過終端操做將其轉換成一個list集合返回。less

能夠發現,Stream極大地簡化了咱們對集合的操做。dom

API

建立流

1) 經過Collection得Stream()方法(串行流)或者 parallelStream()方法(並行流)建立Stream。ide

@Test
public void test1 () {

    //1. 經過Collection得Stream()方法(串行流)
    //或者 parallelStream()方法(並行流)建立Stream
    List<String> list = new ArrayList<String>();
    Stream<String> stream1 = list.stream();
    
    Stream<String> stream2 = list.parallelStream();
    
}
複製代碼

2) 經過Arrays中得靜態方法stream()獲取數組流 。函數

@Test
public void test2 () {
    
    //2. 經過Arrays中得靜態方法stream()獲取數組流
    IntStream stream = Arrays.stream(new int[]{3,5});
    
}
複製代碼

3) 經過Stream類中得 of()靜態方法獲取流 。學習

@Test
public void test3 () {
    
    //3. 經過Stream類中得 of()靜態方法獲取流
    Stream<String> stream = Stream.of("4645", "huinnj");
    
}
複製代碼

4) 建立無限流(迭代、生成) 。ui

@Test
public void test4 () {
    
    //4. 建立無限流
    //迭代(須要傳入一個種子,也就是起始值,而後傳入一個一元操做)
    Stream<Integer> stream1 = Stream.iterate(2, (x) -> x * 2);
    
    //生成(無限產生對象)
    Stream<Double> stream2 = Stream.generate(() -> Math.random());
    
}
複製代碼

中間操做API

1)distinct

distinct: 對於Stream中包含的元素進行去重操做(去重邏輯依賴元素的equals方法),新生成的Stream中沒有重複的元素。

2)filter 過濾

filter方法是根據設置的條件來過濾、篩選出所須要的元素。

3)map 根據指定的函數進行轉化

map方法是對於Stream中包含的元素使用給定的轉換函數進行轉換操做,新生成的Stream只包含轉換生成的元素。

4)limit 獲取指定數量的元素

limit方法是根據設定的數量從流中獲取對應數量的元素。

5)skip 獲取去除指定數量以後的元素

skip方法與limt有點區別,此方法是獲取到根據設定的數量n去除掉集合中前n個元素以後的全部數據。

6)sorted 排序

sorted方法是對流中的元素進行排序,降序和升序均可以使用天然排序的方法,也能夠調用Comparator中的方法進行排序

7)peek

peek 方法是生成一個包含原Stream的全部元素的新Stream,同時會提供一個消費函數(Consumer實例),新Stream每一個元素被消費的時候都會執行給定的消費函數 注:剛開始覺得peek與map的用法同樣,後面經過學習及練習,peek方法實際上是沒有返回值的,多半用於一些輸出,數據的校驗等。

8)flatMap

接收一個函數做爲參數,將流中的每一個值都換成一個流,而後把全部流鏈接成一個流。

終端操做API

1) allMatch 查找匹配

檢查是否匹配全部元素。

2)anyMatch

檢查是否至少匹配全部元素 。

3)noneMatch

檢查是否沒有匹配全部元素 。

4)findFirst

返回第一個元素。

@Test
public void test16 () {
    Optional<Person> person = persons.stream().findFirst();
    System.out.println(person);
​
    person.orElse(new Person("王五", "男", 35, Status.BUSY));
}
複製代碼

5)findAny

返回當前流中任意元素。

6)count

返回流中元素總個數。

7)max

返回流中最大值。

8)min

返回流中最小值。

9)reduce(能夠將流中元素反覆結合在一塊兒,獲得一個值)

有reduce(T identitty,BinaryOperator) 與reduce(BinaryOperator),前者須要一個起始值,而後傳入二元運算,後者沒有起始值。

10)collect (將流轉換爲其餘形式。接收一個Collector接口得實現,用於給其餘Stream中元素作彙總的方法)

Collector接口中方法得實現決定了如何對流執行收集操做(如收集到List,Set,Map)。可是Collectors實用類提供了不少靜態方法,能夠方便地建立常見得收集器實例。

總結

Stream流水線原理

問題引出

先來看下列語句:

List<String> list = Arrays.asList("Hello","World","Word");
// 求出以W開頭,長度最長的單詞,並把長度賦值給num
Integer num = list.stream().filter(x -> x.startsWith("W")).mapToInt(String::length).max().getAsInt();
複製代碼

邏輯很簡單,使用一行stream()語句便可完成需求,可是這裏有一個問題,stream()到底是怎麼作到用一行來完成這麼多篩選條件的語句呢?它是每調用一次中間操做就遍歷一遍集合元素嗎?答案是否是的,咱們來看看stream()具體是怎麼操做的。

首先咱們來看看通常的for循環是怎麼在一次迭代的狀況下完成的:

int maxLength = 0;
for (String str : list) {
    if (str.startsWith("W")) {
        maxLength = Math.max(str.length(), maxLength);
    }
}
複製代碼

這種方式不但只用了一次迭代,同時避免了存儲中間結果,可是它有個前提,必須得知道當前的程序意圖。而Stream設計者並不能知道用戶想要實現的功能是什麼。所以,在沒法假設用戶行爲的前提下實現流水線,是設計者首要考慮的問題。

解決

在這裏,你們應該都能想到,若是可以像隊列同樣記錄用戶的每一次操做,在用戶結束操做的時候,再將全部的記錄疊加到一次迭代中所有執行掉,那不就解決問題了嗎。那麼接下來又會有一系列的問題:

1)用戶的操做該怎麼記錄?

2)操做如何疊加?

3)疊加以後的操做如何?

4)執行後的結果在哪裏?

操做如何記錄

注意,這裏的操做是指中間操做,Stream中會使用Stage概念來描述一個完整的操做,並用某種實例化後的PipelineHelper來表明Stage,將具備前後順序的各個Stage連到一塊兒,就構成了整個流水線,其中Stream相關類和接口的繼承關係如上圖所示。

Stream流水線組織結構示意圖以下:

經過Collection.stream()方法獲得Head,也就是stage0,緊接着調用一系列中間操做,不斷產生新的Stream,這些Stream對象以雙向鏈表的形式組織在一塊兒,構成整個流水線,因爲每一個Stage都記錄了前一個Stage和本次的操做以及回調函數,依靠這種結構就能創建起對數據源的全部操做。

Head記錄Stream起始操做。
StatelessOp記錄無狀態的中間操做。
StatefulOp記錄有狀態的中間操做 。

Reference構造方法

AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
    if (previousStage.linkedOrConsumed)
        throw new IllegalStateException(MSG_STREAM_LINKED);
    previousStage.linkedOrConsumed = true;
    previousStage.nextStage = this;
    this.previousStage = previousStage;
    this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
    this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
    this.sourceStage = previousStage.sourceStage;
    if (opIsStateful())
        sourceStage.sourceAnyStateful = true;
    this.depth = previousStage.depth + 1;
}
複製代碼

Reference構造方法調用了AbstractPipeline的構造方法,能夠發現,它的構造函數更像是鏈表的操做,指定了前驅stage的後繼stage爲當前stage。

Head構造方法

AbstractPipeline(Supplier<? extends Spliterator<?>> source,
                 int sourceFlags, boolean parallel) {
    this.previousStage = null;
    this.sourceSupplier = source;
    this.sourceStage = this;
    this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
    // The following is an optimization of:
    // StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);
    this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
    this.depth = 0;
    this.parallel = parallel;
}
複製代碼

能夠發現,Head的構造方法指定了sourceStage爲當前stage。

操做如何疊加

如今咱們知道Stream()是如何記錄每一步的操做了,要想讓流水線將全部的操做疊加在一塊兒,還須要有一種協議來協調相鄰的Stage之間的關係。這個協議就是Sink接口,Sink接口方法以下:

有了此協議,相鄰Stage之間調用就很方便了,每一個Stage都會將本身的操做封裝到一個Sink裏,前一個Stage只需調用後一個Stage的accept()方法便可,並不須要知道內部是如何處理的。對於短路操做,也要實現cancellationRequested()。實際上Stream API內部實現的本質,就是如何重載Sink的這四個接口方法。

有了Sink包裝,流水線調用時只須要從流水線的head開始,對數據源依次調用每一個Stage對應的Sink{begin(),accept(),cancellationRequested(),end()}就能夠了。就相似於[處理->轉發]這種模型。

讓咱們以**Stream.map()**爲例,探索Sink是怎麼實現的:

// 產生一個新的stream
public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
    Objects.requireNonNull(mapper);
    return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
        // opWrapSink返回回調函數包裝成的Sink
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
            return new Sink.ChainedReference<P_OUT, R>(sink) {
                @Override
                public void accept(P_OUT u) {
                    // 將處理結果傳遞給流水線下游
                    downstream.accept(mapper.apply(u));
                }
            };
        }
    };
}
複製代碼

上述邏輯總結以下:首先,將回調函數mapper包裝到一個Sink中,因爲Stream.map()是一個無狀態的中間操做,因此返回一個StatelessOp對象,調用這個新Stream的opWripSink()獲得一個包裝了當前回調函數的Sink。

接下來咱們看一看**Stream.sorted()**方法:

// Stream.sort()方法用到的Sink實現
class RefSortingSink<T> extends AbstractRefSortingSink<T> {
    private ArrayList<T> list;// 存放用於排序的元素
    RefSortingSink(Sink<? super T> downstream, Comparator<? super T> comparator) {
        super(downstream, comparator);
    }
    @Override
    public void begin(long size) {
        ...
        // 建立一個存放排序元素的列表
        list = (size >= 0) ? new ArrayList<T>((int) size) : new ArrayList<T>();
    }
    @Override
    public void end() {
        list.sort(comparator);// 只有元素所有接收以後才能開始排序
        downstream.begin(list.size());
        if (!cancellationWasRequested) {// 下游Sink不包含短路操做
            list.forEach(downstream::accept);// 2. 將處理結果傳遞給流水線下游的Sink
        }
        else {// 下游Sink包含短路操做
            for (T t : list) {// 每次都調用cancellationRequested()詢問是否能夠結束處理。
                if (downstream.cancellationRequested()) break;
                downstream.accept(t);// 2. 將處理結果傳遞給流水線下游的Sink
            }
        }
        downstream.end();
        list = null;
    }
    @Override
    public void accept(T t) {
        list.add(t);// 1. 使用當前Sink包裝動做處理t,只是簡單的將元素添加到中間列表當中
    }
}
複製代碼

1)首先beging()方法告訴Sink參與排序的元素個數,方便肯定中間結果容器的的大小;

2)以後經過accept()方法將元素添加到中間結果當中,最終執行時調用者會不斷調用該方法,直到遍歷全部元素;

3)最後end()方法告訴Sink全部元素遍歷完畢,啓動排序步驟,排序完成後將結果傳遞給下游的Sink;

4)若是下游的Sink是短路操做,將結果傳遞給下游時不斷詢問下游cancellationRequested()是否能夠結束處理。

疊加以後的操做如何執行

既然Sink已經將Stream的每一步操做都進行了封裝,就差最後一步執行了,那麼到底是誰來執行呢?沒錯,就是咱們一開始提到的結束操做。

結束操做以後不能再有別的操做,因此結束操做會建立一個包裝了本身操做的Sink,這也是最後一個Sink,它只需處理數據,而不須要轉發,對於Sink的[處理,轉發]模型,結束操做的Sink就是調用鏈的出口。

咱們再來考察一下上游的Sink是如何找到下游Sink的。一種可選的方案是在PipelineHelper中設置一個Sink字段,在流水線中找到下游Stage並訪問Sink字段便可。

但Stream類庫的設計者沒有這麼作,而是設置了一個Sink AbstractPipeline.opWrapSink(int flags, Sink downstream)方法來獲得Sink,該方法的做用是返回一個新的包含了當前Stage表明的操做以及可以將結果傳遞給downstream的Sink對象。

爲何要產生一個新對象而不是返回一個Sink字段?這是由於使用opWrapSink()能夠將當前操做與下游Sink(上文中的downstream參數)結合成新Sink。

試想只要從流水線的最後一個Stage開始,不斷調用上一個Stage的opWrapSink()方法直到最開始(不包括stage0,由於stage0表明數據源,不包含操做),就能夠獲得一個表明了流水線上全部操做的Sink,用代碼表示就是這樣:

// AbstractPipeline.wrapSink()
// 從下游向上遊不斷包裝Sink。若是最初傳入的sink表明結束操做,
// 函數返回時就能夠獲得一個表明了流水線上全部操做的Sink。
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
    ...
    for (AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
        sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
    }
    return (Sink<P_IN>) sink;
}
複製代碼

如今流水線上從開始到結束的全部的操做都被包裝到了一個Sink裏,執行這個Sink就至關於執行整個流水線,執行Sink的代碼以下:

// AbstractPipeline.copyInto(), 對spliterator表明的數據執行wrappedSink表明的操做。
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
    ...
    if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
        wrappedSink.begin(spliterator.getExactSizeIfKnown());// 通知開始遍歷
        spliterator.forEachRemaining(wrappedSink);// 迭代
        wrappedSink.end();// 通知遍歷結束
    }
    ...
}
複製代碼

執行結果存放

1)對於表中返回boolean或者Optional的操做(Optional是存放 一個 值的容器)的操做,因爲值返回一個值,只須要在對應的Sink中記錄這個值,等到執行結束時返回就能夠了。

2)對於歸約操做,最終結果放在用戶調用時指定的容器中(容器類型經過收集器指定)。collect(), reduce(), max(), min()都是歸約操做,雖然max()和min()也是返回一個Optional,但事實上底層是經過調用reduce()方法實現的。

3)於返回是數組的狀況,毫無疑問的結果會放在數組當中。這麼說固然是對的,但在最終返回數組以前,結果實際上是存儲在一種叫作Node的數據結構中的。Node是一種多叉樹結構,元素存儲在樹的葉子當中,而且一個葉子節點能夠存放多個元素。這樣作是爲了並行執行方便。

相關文章
相關標籤/搜索