《Java8實戰》-第六章讀書筆記(用流收集數據-02)

使用流收集數據

分區

分區是分組的特殊狀況:由一個謂詞(返回一個布爾值的函數)做爲分類函數,它稱分區函數。分區函數返回一個布爾值,這意味着獲得的分組 Map 的鍵類型是 Boolean ,因而它最多能夠分爲兩組—— true 是一組, false 是一組。例如,若是你是素食者或是請了一位素食的朋友來共進晚餐,可能會想要把菜單按照素食和非素食分開:java

Map<Boolean, List<Dish>> partitionedMenu =
                // 分區函數
                menu.stream().collect(partitioningBy(Dish::isVegetarian));

這會返回下面的 Map :git

{false=[Dish{name='pork'}, Dish{name='beef'}, Dish{name='chicken'}, Dish{name='prawns'}, Dish{name='salmon'}], 
true=[Dish{name='french fries'}, Dish{name='rice'}, Dish{name='season fruit'}, Dish{name='pizza'}]}

那麼經過 Map 中鍵爲 true 的值,就能夠找出全部的素食菜餚了:github

List<Dish> vegetarianDishes = partitionedMenu.get(true);

請注意,用一樣的分區謂詞,對菜單 List 建立的流做篩選,而後把結果收集到另一個 List中也能夠得到相同的結果:算法

List<Dish> vegetarianDishes =
                        menu.stream().filter(Dish::isVegetarian).collect(toList());

分區的優點

分區的好處在於保留了分區函數返回 true 或 false 的兩套流元素列表。在上一個例子中,要獲得非素食 Dish 的 List ,你可使用兩個篩選操做來訪問 partitionedMenu 這個 Map 中 false鍵的值:一個利用謂詞,一個利用該謂詞的非。並且就像你在分組中看到的, partitioningBy工廠方法有一個重載版本,能夠像下面這樣傳遞第二個收集器:小程序

Map<Boolean, Map<Dish.Type, List<Dish>>> vegetarianDishesByType =
                menu.stream().collect(
                        // 分區函數
                        partitioningBy(Dish::isVegetarian,
                                // 第二個收集器
                                groupingBy(Dish::getType)));

這將產生一個二級 Map :安全

{false={MEAT=[Dish{name='pork'}, Dish{name='beef'}, Dish{name='chicken'}], FISH=[Dish{name='prawns'}, Dish{name='salmon'}]}, 
true={OTHER=[Dish{name='french fries'}, Dish{name='rice'}, Dish{name='season fruit'}, Dish{name='pizza'}]}}

這裏,對於分區產生的素食和非素食子流,分別按類型對菜餚分組,獲得了一個二級 Map,和上面的相似。再舉一個例子,你能夠重用前面的代碼來找到素食和非素食中熱量最高的菜:框架

Map<Boolean, Dish> mostCaloricPartitionedByVegetarian = menu.stream().collect(
                partitioningBy(Dish::isVegetarian, collectingAndThen(
                        maxBy(comparingInt(Dish::getCalories)),
                        Optional::get
                )));

這將產生如下結果:ide

{false=Dish{name='pork'}, true=Dish{name='pizza'}}

你能夠把分區看做分組一種特殊狀況。 groupingBy 和partitioningBy 收集器之間的類似之處並不止於此。函數

將數字按質數和非質數分區

假設你要寫一個方法,它接受參數 int n,並將前n個天然數分爲質數和非質數。但首先,找出可以測試某一個待測數字是不是質數的謂詞會頗有幫助:性能

private static boolean isPrime(int candidate) {
    // 產生一個天然數範圍,從2開始,直至但不包括待測數
    return IntStream.range(2, candidate)
            // 若是待測數字不能被流中任何數字整除則返回 true
            .noneMatch(i -> candidate % i == 0);
}

一個簡單的優化是僅測試小於等於待測數平方根的因子:

private static boolean isPrime(int candidate) {
    int candidateRoot = (int) Math.sqrt((double) candidate);
    return IntStream.rangeClosed(2, candidateRoot)
            .noneMatch(i -> candidate % i == 0);
}

如今最主要的一部分工做已經作好了。爲了把前n個數字分爲質數和非質數,只要建立一個包含這n個數的流,用剛剛寫的 isPrime 方法做爲謂詞,再給 partitioningBy 收集器歸約就行了:

private static Map<Boolean, List<Integer>> partitionPrimes(int n) {
    return IntStream.rangeClosed(2, n).boxed()
            .collect(
                    partitioningBy(candidate -> isPrime(candidate)));
}

如今咱們已經討論過了 Collectors 類的靜態工廠方法可以建立的全部收集器,並介紹了使用它們的實際例子。

收集器接口

Collector 接口包含了一系列方法,爲實現具體的歸約操做(即收集器)提供了範本。咱們已經看過了 Collector 接口中實現的許多收集器,例如 toList 或 groupingBy 。這也意味着,你能夠爲 Collector 接口提供本身的實現,從而自由地建立自定義歸約操做。

要開始使用 Collector 接口,咱們先看看本章開始時講到的一個收集器—— toList 工廠方法,它會把流中的全部元素收集成一個 List 。咱們當時說在平常工做中常常會用到這個收集器,並且它也是寫起來比較直觀的一個,至少理論上如此。經過仔細研究這個收集器是怎麼實現的,咱們能夠很好地瞭解 Collector 接口是怎麼定義的,以及它的方法所返回的函數在內部是如何爲collect 方法所用的。

首先讓咱們在下面的列表中看看 Collector 接口的定義,它列出了接口的簽名以及聲明的五個方法。

public interface Collector<T, A, R> {
        Supplier<A> supplier();
        BiConsumer<A, T> accumulator();
        Function<A, R> finisher();
        BinaryOperator<A> combiner();
        Set<Characteristics> characteristics();
}

本列表適用如下定義。

  1. T 是流中要收集的項目的泛型。
  2. A 是累加器的類型,累加器是在收集過程當中用於累積部分結果的對象。
  3. R 是收集操做獲得的對象(一般但並不必定是集合)的類型。

例如,你能夠實現一個 ToListCollector<T> 類,將 Stream<T> 中的全部元素收集到一個List<T> 裏,它的簽名以下:

public class ToListCollector<T> implements Collector<T, List<T>, List<T>>

咱們很快就會澄清,這裏用於累積的對象也將是收集過程的最終結果。

理解 Collector 接口聲明的方法

如今咱們能夠一個個來分析 Collector 接口聲明的五個方法了。經過分析,你會注意到,前四個方法都會返回一個會被 collect 方法調用的函數,而第五個方法 characteristics 則提供了一系列特徵,也就是一個提示列表,告訴 collect 方法在執行歸約操做的時候能夠應用哪些優化(好比並行化)。

1. 創建新的結果容器: supplier 方法

supplier 方法必須返回一個結果爲空的 Supplier ,也就是一個無參數函數,在調用時它會建立一個空的累加器實例,供數據收集過程使用。很明顯,對於將累加器自己做爲結果返回的收集器,好比咱們的 ToListCollector ,在對空流執行操做的時候,這個空的累加器也表明了收集過程的結果。在咱們的 ToListCollector 中, supplier 返回一個空的 List ,以下所示:

@Override
public Supplier<List<T>> supplier() {
    return () -> new ArrayList<>();
}

請注意你也能夠只傳遞一個構造函數引用:

@Override
public Supplier<List<T>> supplier() {
    return ArrayList::new;
}

2. 將元素添加到結果容器: accumulator 方法

accumulator 方法會返回執行歸約操做的函數。當遍歷到流中第n個元素時,這個函數執行時會有兩個參數:保存歸約結果的累加器(已收集了流中的前 n-1 個項目),還有第n個元素自己。該函數將返回void ,由於累加器是原位更新,即函數的執行改變了它的內部狀態以體現遍歷的元素的效果。對於ToListCollector ,這個函數僅僅會把當前項目添加至已經遍歷過的項目的列表:

@Override
public BiConsumer<List<T>, T> accumulator() {
    return (list, item) -> list.add(item);
}

你也可使用方法引用,這會更爲簡潔:

@Override
public BiConsumer<List<T>, T> accumulator() {
    return List::add;
}

3. 對結果容器應用最終轉換: finisher 方法

在遍歷完流後, finisher 方法必須返回在累積過程的最後要調用的一個函數,以便將累加器對象轉換爲整個集合操做的最終結果。一般,就像 ToListCollector 的狀況同樣,累加器對象剛好符合預期的最終結果,所以無需進行轉換。因此 finisher 方法只需返回 identity 函數:

@Override
public Function<List<T>, List<T>> finisher() {
    return Function.identity();
}

這三個方法已經足以對流進行循序規約。實踐中的實現細節可能還要複雜一點,一方面是應爲流的延遲性質,可能在collect操做以前還需完成其餘中間操做的流水線,另外一方面則是理論上可能要進行並行規約。

4. 合併兩個結果容器: combiner 方法

四個方法中的最後一個————combiner方法會返回一個供歸約操做的使用函數,它定義了對流的各個子部分進行並行處理時,各個子部分歸約所得的累加器要如何合併。對於toList而言,這個方法的實現很是簡單,只要把從流的第二個部分收集到的項目列表加到遍歷第一部分時獲得的列表後面就好了:

@Override
public BinaryOperator<List<T>> combiner() {
    return (list1, list2) -> {
        list1.addAll(list2);
        return list1;
    };
}

有了這第四個方法,就能夠對流進行並行歸約了。它會用到Java7中引入的分支/合併框架和Spliterator抽象。

5. characteristics 方法

最後一個方法—— characteristics 會返回一個不可變的 Characteristics 集合,它定義了收集器的行爲——尤爲是關於流是否能夠並行歸約,以及可使用哪些優化的提示。Characteristics 是一個包含三個項目的枚舉。

  1. UNORDERED ——歸約結果不受流中項目的遍歷和累積順序的影響。
  2. CONCURRENT —— accumulator 函數能夠從多個線程同時調用,且該收集器能夠並行歸約流。若是收集器沒有標爲 UNORDERED ,那它僅在用於無序數據源時才能夠並行歸約。
  3. IDENTITY_FINISH ——這代表完成器方法返回的函數是一個恆等函數,能夠跳過。這種狀況下,累加器對象將會直接用做歸約過程的最終結果。這也意味着,將累加器 A 不加檢查地轉換爲結果 R 是安全的。

咱們迄今開發的 ToListCollector 是 IDENTITY_FINISH 的,由於用來累積流中元素的List 已是咱們要的最終結果,用不着進一步轉換了,但它並非 UNORDERED ,由於用在有序流上的時候,咱們仍是但願順序可以保留在獲得的 List 中。最後,它是 CONCURRENT 的,但咱們剛纔說過了,僅僅在背後的數據源無序時纔會並行處理。

所有融合到一塊兒

前一小節中談到的五個方法足夠咱們開發本身的 ToListCollector 了。你能夠把它們都融合起來,以下面的代碼清單所示。

public class ToListCollector<T> implements Collector<T, List<T>, List<T>> {
    @Override
    public Supplier<List<T>> supplier() {
        return ArrayList::new;
    }

    @Override
    public BiConsumer<List<T>, T> accumulator() {
        return List::add;
    }

    @Override
    public BinaryOperator<List<T>> combiner() {
        return (list1, list2) -> {
            list1.addAll(list2);
            return list1;
        };
    }

    @Override
    public Function<List<T>, List<T>> finisher() {
        return Function.identity();
    }

    @Override
    public Set<Characteristics> characteristics() {
        return Collections.unmodifiableSet(EnumSet.of(Characteristics.IDENTITY_FINISH, Characteristics.CONCURRENT));
    }
}

請注意,這個是實現與Collections.toList()方法並不徹底相同,但區別僅僅是一些小的優化。這些優化的一個主要方面是Java API所提供的收集器在須要返回空列表時使用了 Collections.emptyList() 這個單例(singleton)。這意味着它可安全地替代原生Java,來收集菜單流中的全部 Dish 的列表:

List<Dish> dishes = menuStream.collect(new ToListCollector<>());

這個實現和標準的

List<Dish> dishes = menuStream.collect(toList());

構造之間的其餘差別在於 toList 是一個工廠,而 ToListCollector 必須用 new 來實例化。

進行自定義收集而不去實現 Collector

對於 IDENTITY_FINISH 的收集操做,還有一種方法能夠獲得一樣的結果而無需從頭實現新的 Collectors 接口。 Stream 有一個重載的 collect 方法能夠接受另外三個函數—— supplier 、accumulator 和 combiner ,其語義和 Collector 接口的相應方法返回的函數徹底相同。因此好比說,咱們能夠像下面這樣把菜餚流中的項目收集到一個 List 中:

List<Dish> dishes = menuStream.collect(
                ArrayList::new,
                List::add,
                List::addAll);

咱們認爲,這第二種形式雖然比前一個寫法更爲緊湊和簡潔,卻不那麼易讀。此外,以恰當的類來實現本身的自定義收集器有助於重用並可避免代碼重複。另外值得注意的是,這第二個collect 方法不能傳遞任何 Characteristics ,因此它永遠都是一個 IDENTITY_FINISH 和CONCURRENT 但並不是 UNORDERED 的收集器。

在下一節中,咱們一塊兒來實現一個收集器的,讓咱們對收集器的新知識更上一層樓。你將會爲一個更爲複雜,但更爲具體、更有說服力的用例開發本身的自定義收集器。

開發你本身的收集器以得到更好的性能

咱們用 Collectors 類提供的一個方便的工廠方法建立了一個收集器,它將前n個天然數劃分爲質數和非質數,以下所示。

將前n個天然數按質數和非質數分區:

private static Map<Boolean, List<Integer>> partitionPrimes(int n) {
    return IntStream.rangeClosed(2, n).boxed()
            .collect(
                    partitioningBy(candidate -> isPrime(candidate)));
}

當時,經過限制除數不超過被測試數的平方根,咱們對最初的 isPrime 方法作了一些改進:

private static boolean isPrime(int candidate) {
    int candidateRoot = (int) Math.sqrt((double) candidate);
    return IntStream.rangeClosed(2, candidateRoot)
            .noneMatch(i -> candidate % i == 0);
}

還有沒有辦法來得到更好的性能呢?答案是「有」,但爲此你必須開發一個自定義收集器。

僅用質數作除數

一個可能的優化是僅僅看看被測試數是否是可以被質數整除。要是除數自己都不是質數就用不着測了。因此咱們能夠僅僅用被測試數以前的質數來測試。然而咱們目前所見的預約義收集器的問題,也就是必須本身開發一個收集器的緣由在於,在收集過程當中是沒有辦法訪問部分結果的。這意味着,當測試某一個數字是不是質數的時候,你無法訪問目前已經找到的其餘質數的列表。

假設你有這個列表,那就能夠把它傳給 isPrime 方法,將方法重寫以下:

private static boolean isPrime(List<Integer> primes, int candidate) {
    return primes.stream().noneMatch(i -> candidate % i == 0);
}

並且還應該應用先前的優化,僅僅用小於被測數平方根的質數來測試。所以,你須要想辦法在下一個質數大於被測數平方根時當即中止測試。不幸的是,Stream API中沒有這樣一種方法。你可使用 filter(p -> p <= candidateRoot) 來篩選出小於被測數平方根的質數。但 filter要處理整個流才能返回恰當的結果。若是質數和非質數的列表都很是大,這就是個問題了。你用不着這樣作;你只需在質數大於被測數平方根的時候停下來就能夠了。所以,咱們會建立一個名爲 takeWhile 的方法,給定一個排序列表和一個謂詞,它會返回元素知足謂詞的最長前綴:

public static <A> List<A> takeWhile(List<A> list, Predicate<A> p) {
    int i = 0;
    for (A item : list) {
        if (!p.test(item)) {
            return list.subList(0, i);
        }
        i++;
    }
    return list;
}

利用這個方法,你就能夠優化 isPrime 方法,只用不大於被測數平方根的質數去測試了:

private static boolean isPrime(List<Integer> primes, int candidate){
    int candidateRoot = (int) Math.sqrt((double) candidate);
    return takeWhile(primes, i -> i <= candidateRoot)
            .stream()
            .noneMatch(p -> candidate % p == 0);
}

請注意,這個 takeWhile 實現是即時的。理想狀況下,咱們會想要一個延遲求值的takeWhile ,這樣就能夠和 noneMatch 操做合併。不幸的是,這樣的實現超出了本章的範圍,你須要瞭解Stream API的實現才行。

有了這個新的 isPrime 方法在手,你就能夠實現本身的自定義收集器了。首先要聲明一個實現 Collector 接口的新類,而後要開發 Collector 接口所需的五個方法。

1. 第一步:定義 Collector 類的簽名

讓咱們從類簽名開始吧,記得 Collector 接口的定義是:

public interface Collector<T, A, R>

其中 T 、 A 和 R 分別是流中元素的類型、用於累積部分結果的對象類型,以及 collect 操做最終結果的類型。這裏應該收集 Integer 流,而累加器和結果類型則都是 Map<Boolean,List<Integer>>,鍵是 true 和 false ,值則分別是質數和非質數的 List :

public class PrimeNumbersCollector implements Collector<Integer, Map<Boolean, List<Integer>>,
        Map<Boolean, List<Integer>>>

2. 第二步:實現歸約過程

接下來,你須要實現 Collector 接口中聲明的五個方法。 supplier 方法會返回一個在調用時建立累加器的函數:

@Override
public Supplier<Map<Boolean, List<Integer>>> supplier() {
    return () -> new HashMap<Boolean, List<Integer>>(2) {
        {
            put(true, new ArrayList<>());
            put(false, new ArrayList<>());
        }
    };
}

這裏不但建立了累積器的Map,還爲true和false兩個鍵下面出實話了對應的空列表。在收集過程當中會把質數和非指數分別添加到這裏。收集器重要的方法是accumulator,由於它定義瞭如何收集流中元素的邏輯。這裏它也是實現了前面所講的優化的關鍵。如今在任何一次迭代中,均可以訪問收集過程的部分結果,也就是包含迄今找到的質數的累加器:

@Override
public BiConsumer<Map<Boolean, List<Integer>>, Integer> accumulator() {
    return ((Map<Boolean, List<Integer>> acc, Integer candidate) -> acc.get(isPrime(acc.get(true), candidate)).add(candidate));
}

在這個個方法中,你調用了isPrime方法,將待測試是否爲質數的數以及迄今爲止找到的質數列表(也就是累積Map中true鍵對應的值)傳遞給它。此次調用的結果隨後被用做獲取質數或非質數列表的鍵,這樣就能夠把新的被測數添加到恰當的列表中。

3.第三步:讓收集器並行工做(若是可能)

下一個方法要在並行收集時把兩個部分累加器合併起來,這裏,它只須要合併兩個Map,即將第二個Map中質數和非質數列表中的全部數字合併到第一個Map的對應列表中就好了:

@Override
public BinaryOperator<Map<Boolean, List<Integer>>> combiner() {
    return (Map<Boolean, List<Integer>> map1, Map<Boolean, List<Integer>> map2) -> {
        map1.get(true).addAll(map2.get(true));
        map1.get(false).addAll(map2.get(false));
        return map1;
    };
}

請注意,實際上這個收集器是不能並行的,由於該算法自己是順序的。這意味着永遠都不會調用combiner方法,你能夠把它的實現留空。爲了讓這個例子完整,咱們仍是決定實現它。

4.第四步:finisher方法和收集器的characteristics方法

最後兩個方法實現都很簡單。前面說過,accumulator正好就是收集器的結果,也用不着進一步轉換,那麼finisher方法就返回identity函數:

@Override
public Function<Map<Boolean, List<Integer>>, Map<Boolean, List<Integer>>> finisher() {
    return Function.identity();
}

就characteristics方法而言,咱們已經說過,它既不是CONCURRENT也不是UNOREDERED,但倒是IDENTITY_FINISH的:

@Override
public Set<Characteristics> characteristics() {
    return Collections.unmodifiableSet(EnumSet.of(Characteristics.IDENTITY_FINISH));
}

如今,你能夠用這個新的自定義收集器來替代partitioningBy工廠方法建立的那個,並得到徹底相同的結果了:

private static Map<Boolean, List<Integer>> partitionPrimesWithCustomCollector(int n) {
    return IntStream.rangeClosed(2, n).boxed()
            .collect(new PrimeNumbersCollector());
}
Map<Boolean, List<Integer>> primes = partitionPrimesWithCustomCollector(10);
// {false=[4, 6, 8, 9, 10], true=[2, 3, 5, 7]}
System.out.println(primes);

收集器性能比較

用partitioningBy工廠方法穿件的收集器和你剛剛開發的自定義收集器在功能上是同樣的,可是咱們沒有實現用自定義收集器超越partitioningBy收集器性能的目標呢?如今讓咱們寫個小程序測試一下吧:

public class CollectorHarness {
    public static void main(String[] args) {
        long fastest = Long.MAX_VALUE;
        // 運行十次
        for (int i = 0; i < 10; i++) {
            long start = System.nanoTime();
            // 將前100萬個天然數按指數和非質數區分
            partitionPrimes(1_000_000);
            long duration = (System.nanoTime() - start) / 1_000_000;
            // 檢查這個執行是不是最快的一個
            if (duration < fastest) {
                fastest = duration;
            }
            System.out.println("done in " + duration);
        }
        System.out.println("Fastest execution done in " + fastest + " msecs");
    }
}

在因特爾I5 6200U 2.4HGz的筆記上運行獲得如下的結果:

done in 976
done in 1091
done in 866
done in 867
done in 760
done in 759
done in 777
done in 894
done in 765
done in 763
Fastest execution done in 759 msecs

如今把測試框架的 partitionPrimes 換成 partitionPrimesWithCustomCollector ,以便測試咱們開發的自定義收集器的性能。

public class CollectorHarness {
    public static void main(String[] args) {
        excute(PrimeNumbersCollectorExample::partitionPrimesWithCustomCollector);
    }

    private static void excute(Consumer<Integer> primePartitioner) {
        long fastest = Long.MAX_VALUE;
        // 運行十次
        for (int i = 0; i < 10; i++) {
            long start = System.nanoTime();
            // 將前100萬個天然數按指數和非質數區分
            // partitionPrimes(1_000_000);
            primePartitioner.accept(1_000_000);
            long duration = (System.nanoTime() - start) / 1_000_000;
            // 檢查這個執行是不是最快的一個
            if (duration < fastest) {
                fastest = duration;
            }
            System.out.println("done in " + duration);
        }
        System.out.println("Fastest execution done in " + fastest + " msecs");
    }
}

如今,程序打印:

done in 703
done in 649
done in 715
done in 434
done in 386
done in 403
done in 449
done in 416
done in 353
done in 405
Fastest execution done in 353 msecs

還不錯!看來咱們沒有白費功夫開發這個自定義收集器。

總結

  1. collect 是一個終端操做,它接受的參數是將流中元素累積到彙總結果的各類方式(稱爲收集器)。
  2. 預約義收集器包括將流元素歸約和彙總到一個值,例如計算最小值、最大值或平均值。
  3. 預約義收集器能夠用 groupingBy 對流中元素進行分組,或用 partitioningBy 進行分區。
  4. 收集器能夠高效地複合起來,進行多級分組、分區和歸約。
  5. 你能夠實現 Collector 接口中定義的方法來開發你本身的收集器。

代碼

Github:chap6

Gitee:chap6

相關文章
相關標籤/搜索