《java 8 實戰》讀書筆記 -第六章 用流收集數據

1、收集器簡介

把列表中的交易按貨幣分組:java

Map<Currency, List<Transaction>> transactionsByCurrencies = 
 transactions.stream().collect(groupingBy(Transaction::getCurrency));

圖片描述

從Collectors
類提供的工廠方法(例如groupingBy)建立的收集器。它們主要提供了三大功能:算法

  • 將流元素歸約和彙總爲一個值
  • 元素分組
  • 元素分區

2、歸約和彙總

數一數菜單裏有多少種菜:安全

long howManyDishes = menu.stream().collect(Collectors.counting());

這還能夠寫得更爲直接:app

long howManyDishes = menu.stream().count();

1.查找流中的最大值和最小值

可使用兩個收集器,Collectors.maxBy和Collectors.minBy,來計算流中的最大或最小值。這兩個收集器接收一個Comparator參數來比較流中的元素.
找出菜單中熱量最高的菜:框架

Comparator<Dish> dishCaloriesComparator = 
 Comparator.comparingInt(Dish::getCalories); 
Optional<Dish> mostCalorieDish = 
 menu.stream() 
 .collect(maxBy(dishCaloriesComparator));

2.彙總

  • Collectors.summingInt
    它可接受一個把對象映射爲求和所需int的函數,並返回一個收集器;該收集器在傳遞給普通的collect方法後即執行咱們須要的彙總操做。
    eg:異步

    int totalCalories = menu.stream().collect(summingInt(Dish::getCalories));

    另外,Collectors.summingLong和Collectors.summingDouble方法的做用徹底同樣,能夠用於求和字段爲long或double的狀況。還有Collectors.averagingInt,連同對應的averagingLong和averagingDouble能夠計算數值的平均數。ide

  • summarizing操做
    經過一次summarizing操做你能夠就數出菜單中元素的個數,並獲得菜餚熱量總和、平均值、最大值和最小值函數

    IntSummaryStatistics menuStatistics = 
    menu.stream().collect(summarizingInt(Dish::getCalories));

    這個收集器會把全部這些信息收集到一個叫做IntSummaryStatistics的類裏,它提供了方便的取值(getter)方法來訪問結果。打印menuStatisticobject會獲得如下輸出:優化

    IntSummaryStatistics{count=9, sum=4300, min=120, 
    average=477.777778, max=800}

    一樣,相應的summarizingLong和summarizingDouble工廠方法有相關的LongSummaryStatistics和DoubleSummaryStatistics類型。ui

3.鏈接字符串

joining工廠方法返回的收集器會把對流中每個對象應用toString方法獲得的全部字符串鏈接成一個字符串。

String shortMenu = menu.stream().map(Dish::getName).collect(joining());

joining工廠方法有一個重載版本能夠接受元素之間的分界符

String shortMenu = menu.stream().map(Dish::getName).collect(joining(", "));

4.廣義的歸約彙總

能夠用reducing方法建立的收集器來計算你菜單的總熱量,以下所示:

int totalCalories = menu.stream().collect(reducing( 
 0, Dish::getCalories, (i, j) -> i + j));
  • 第一個參數是歸約操做的起始值。
  • 第二個參數將菜餚轉換成一個表示其所含熱量的int。
  • 第三個參數是一個BinaryOperator,將兩個項目累積成一個同類型的值。這裏它就是對兩個int求和。

單參數形式的reducing來找到熱量最高的菜,以下所示:

Optional<Dish> mostCalorieDish = 
 menu.stream().collect(reducing( 
 (d1, d2) -> d1.getCalories() > d2.getCalories() ? d1 : d2));
相比stream的reduce方法collect方法特別適合表達可變容器上的歸約,更關鍵的是它適合並行操做

計算菜單裏全部菜餚的卡路里總和,以不一樣的方法執行一樣的操做:

第一種:

int totalCalories = menu.stream().collect(reducing(0, 
 Dish::getCalories,
 Integer::sum));

第二種:

int totalCalories = 
  menu.stream().map(Dish::getCalories).reduce(Integer::sum).get();//reduce返回的是Optional

第三種:

int totalCalories = menu.stream().mapToInt(Dish::getCalories).sum();

最後一種最佳。

3、分組

假設你要把菜單中的菜按照類型進行分類,有肉的放一組,有魚的放一組,其餘的都放另外一組。用Collectors.groupingBy工廠方法返回的收集器就能夠輕鬆地完成這項任務,以下所示:

Map<Dish.Type, List<Dish>> dishesByType = 
 menu.stream().collect(groupingBy(Dish::getType));

其結果是下面的Map:

{FISH=[prawns, salmon], OTHER=[french fries, rice, season fruit, pizza], 
MEAT=[pork, beef, chicken]}
給groupingBy方法傳遞了一個Function(以方法引用的形式),它提取了流中每一道Dish的Dish.Type。咱們把這個Function叫做 分類函數

若是Dish中沒有定義類型獲取方法,可使用lambda表達式:

public enum CaloricLevel { DIET, NORMAL, FAT } 

Map<CaloricLevel, List<Dish>> dishesByCaloricLevel = menu.stream().collect( 
 groupingBy(dish -> { 
 if (dish.getCalories() <= 400) return CaloricLevel.DIET; 
 else if (dish.getCalories() <= 700) return 
 CaloricLevel.NORMAL; 
 else return CaloricLevel.FAT; 
 } ));

1.多級分組

使用一個由雙參數版本的Collectors.groupingBy工廠方法建立的收集器,它除了普通的分類函數以外,還能夠接受collector類型的第二個參數:

Map<Dish.Type, Map<CaloricLevel, List<Dish>>> dishesByTypeCaloricLevel = 
menu.stream().collect( 
 groupingBy(Dish::getType, 
 groupingBy(dish -> { 
 if (dish.getCalories() <= 400) return CaloricLevel.DIET; 
 else if (dish.getCalories() <= 700) return CaloricLevel.NORMAL; 
 else return CaloricLevel.FAT; 
 } ) 
 ) 
);
這種多級分組操做能夠擴展至任意層級,n級分組就會獲得一個表明n級樹形結構的n級Map

2.按子組收集數據

傳遞給第一個groupingBy的第二個收集器能夠是任何類型,而不必定是另外一groupingBy

Map<Dish.Type, Long> typesCount = menu.stream().collect( 
 groupingBy(Dish::getType, counting()));

其結果是下面的Map:

{MEAT=3, FISH=2, OTHER=4}
普通的單參數groupingBy(f)(其中f是分類函數)其實是 groupingBy(f, toList())的簡便寫法。
  • 把收集器的結果轉換爲另外一種類型
    查找每一個子組中熱量最高的Dish

    Map<Dish.Type, Dish> mostCaloricByType = 
    menu.stream() 
    .collect(groupingBy(Dish::getType,
    collectingAndThen( 
    maxBy(comparingInt(Dish::getCalories)), //maxBy工廠方法生成的收集器的類型是Optional
    Optional::get)));
包裝的Optional沒什麼用,把收集器返回的結果轉換爲另外一種類型,你可使用 Collectors.collectingAndThen工廠方法;返回的收集器groupingBy收集器只有在應用分組條件後,第一次在流中找到某個鍵對應的元素時纔會把鍵加入分組Map中,因此Optional::get這個操做放在這裏是安全的,由於reducing收集器永遠都不會返回Optional.empty()
  • 與groupingBy聯合使用的其餘收集器的例子

    Map<Dish.Type, Integer> totalCaloriesByType = 
    menu.stream().collect(groupingBy(Dish::getType, 
    summingInt(Dish::getCalories)));

    對於每種類型的Dish,菜單中都有哪些CaloricLevel。咱們能夠把groupingBy和mapping收集器結合起來,以下所示:

    Map<Dish.Type, Set<CaloricLevel>> caloricLevelsByType = 
    menu.stream().collect( 
    groupingBy(Dish::getType, mapping( 
    dish -> { 
    if (dish.getCalories() <= 400) return CaloricLevel.DIET; 
    else if (dish.getCalories() <= 700) return CaloricLevel.NORMAL; 
    else return CaloricLevel.FAT;
    }, 
    toSet() )));//生成的CaloricLevel流傳遞給一個toSet收集器,
    //它和toList相似,不過是把流中的元素累積到一個Set而不是List中,以便僅保留各不相同的值。

    但經過使用toCollection,你就能夠有更多的控制。例如,你能夠給它傳遞一個構造函數引用來要求HashSet:

    Map<Dish.Type, Set<CaloricLevel>> caloricLevelsByType = 
    menu.stream().collect( 
    groupingBy(Dish::getType, mapping( 
    dish -> { if (dish.getCalories() <= 400) return CaloricLevel.DIET; 
    else if (dish.getCalories() <= 700) return CaloricLevel.NORMAL; 
    else return CaloricLevel.FAT; }, 
    toCollection(HashSet::new) )));

4、分區

1.分區的優點

分區是分組的特殊狀況:由一個謂詞(返回一個布爾值的函數)做爲分類函數,它稱分區函數。分區函數返回一個布爾值,這意味着獲得的分組Map的鍵類型是Boolean,因而它最多能夠分爲兩組——true是一組,false是一組。例如,若是你是素食者或是請了一位素食的朋友來共進晚餐,可能會想要把菜單按照素食和非素食分開:

Map<Boolean, List<Dish>> partitionedMenu = 
 menu.stream().collect(partitioningBy(Dish::isVegetarian));

計算素食和非素食的數量:

menu.stream().collect(partitioningBy(Dish::isVegetarian,
 counting()));

2.將數字按質數和非質數分區

public boolean isPrime(int candidate) { 
 int candidateRoot = (int) Math.sqrt((double) candidate); 
 return IntStream.rangeClosed(2, candidateRoot) 
 .noneMatch(i -> candidate % i == 0); 
}

public Map<Boolean, List<Integer>> partitionPrimes(int n) { 
 return IntStream.rangeClosed(2, n).boxed() 
 .collect( 
 partitioningBy(candidate -> isPrime(candidate))); 
}

Collectors類的靜態工廠方法:
圖片描述

圖片描述

5、收集器接口

public interface Collector<T, A, R> { 
 Supplier<A> supplier(); 
 BiConsumer<A, T> accumulator(); 
 Function<A, R> finisher(); 
 BinaryOperator<A> combiner(); 
 Set<Characteristics> characteristics(); 
}
  • T是流中要收集的項目的泛型。
  • A是累加器的類型,累加器是在收集過程當中用於累積部分結果的對象。
  • R是收集操做獲得的對象(一般但並不必定是集合)的類型。

例如,你能夠實現一個ToListCollector<T>類,將Stream<T>中的全部元素收集List<T>裏,它的簽名以下:

public class ToListCollector<T> implements Collector<T, List<T>, List<T>>

1.理解 Collector 接口聲明的方法

(1)創建新的結果容器:supplier方法

在調用時它會建立一個空的累加器實例,供數據收集過程使用

public Supplier<List<T>> supplier() { 
 return () -> new ArrayList<T>(); 
}

或者使用構造函數引用;

public Supplier<List<T>> supplier() { 
 return ArrayList::new; 
}

(2)將元素添加到結果容器:accumulator方法

accumulator方法會返回執行歸約操做的函數。當遍歷到流中第n個元素時,這個函數執行時會有兩個參數:保存歸約結果的累加器(已收集了流中的前 n1 個項目),還有第n個元素自己。該函數將返回void,由於累加器是原位更新,即函數的執行改變了它的內部狀態以體現遍歷的元素的效果。對於ToListCollector,這個函數僅僅會把當前項目添加至已經遍歷過的項目的列表:

public BiConsumer<List<T>, T> accumulator() { 
 return (list, item) -> list.add(item); 
}

你也可使用方法引用,這會更爲簡潔:

public BiConsumer<List<T>, T> accumulator() { 
 return List::add; 
}

(3)對結果容器應用最終轉換:finisher方法

在遍歷完流後,finisher方法必須返回在累積過程的最後要調用的一個函數,以便將累加器對象轉換爲整個集合操做的最終結果。

public Function<List<T>, List<T>> finisher() { 
 return Function.identity(); //累加器對象剛好符合預期的最終結果,
//所以無需進行轉換。因此finisher方法只需返回identity函數
}

(4) 合併兩個結果容器:combiner方法

combiner方法會返回一個供歸約操做使用的函數,它定義了對流的各個子部分進行並行處理時,各個子部分歸約所得的累加器要如何合併。

public BinaryOperator<List<T>> combiner() { 
 return (list1, list2) -> { 
 list1.addAll(list2); 
 return list1; } 
}

有了這第四個方法,就能夠對流進行並行歸約了,會用到Java 7中引入的Fork/Join框架和Spliterator抽象
圖片描述

Fork/Join是什麼?
Fork/Join框架是Java7提供的並行執行任務框架,思想是將大任務分解成小任務,而後小任務又能夠繼續分解,而後每一個小任務分別計算出結果再合併起來,最後將彙總的結果做爲大任務結果。其思想和MapReduce的思想很是相似。對於任務的分割,要求各個子任務之間相互獨立,可以並行獨立地執行任務,互相之間不影響。

Fork/Join的運行流程圖以下:

21.png

咱們能夠經過Fork/Join單詞字面上的意思去理解這個框架。Fork是叉子分叉的意思,即將大任務分解成並行的小任務,Join是鏈接結合的意思,即將全部並行的小任務的執行結果彙總起來。

040501.gif

工做竊取算法
ForkJoin採用了工做竊取(work-stealing)算法,若一個工做線程的任務隊列爲空沒有任務執行時,便從其餘工做線程中獲取任務主動執行。爲了實現工做竊取,在工做線程中維護了雙端隊列,竊取任務線程從隊尾獲取任務,被竊取任務線程從隊頭獲取任務。這種機制充分利用線程進行並行計算,減小了線程競爭。可是當隊列中只存在一個任務了時,兩個線程去取反而會形成資源浪費。

工做竊取的運行流程圖以下:

image3.png

Fork/Join核心類
1.ForkJoinPool
ForkJoinPool是ForkJoin框架中的任務調度器,和ThreadPoolExecutor同樣實現了本身的線程池,提供了三種調度子任務的方法:
execute:異步執行指定任務,無返回結果;
invoke、invokeAll:同步執行指定任務,等待完成才返回結果;
submit:異步執行指定任務,並當即返回一個Future對象;
2.ForkJoinTask
Fork/Join框架中的實際的執行任務類,有如下兩種實現,通常繼承這兩種實現類便可。
RecursiveAction:用於無結果返回的子任務;
RecursiveTask:用於有結果返回的子任務;
Fork/Join框架實戰
下面實現一個Fork/Join小例子,從1+2+...10億,每一個任務只能處理1000個數相加,超過1000個的自動分解成小任務並行處理;並展現了經過不使用Fork/Join和使用時的時間損耗對比。

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class ForkJoinTask extends RecursiveTask<Long> {
   private static final long MAX = 1000000000L;
   private static final long THRESHOLD = 1000L;
   private long start;
   private long end;

   public ForkJoinTask(long start, long end) {
       this.start = start;
       this.end = end;
   }

   public static void main(String[] args) {
       test();
       System.out.println("--------------------");
       testForkJoin();
   }

   private static void test() {
       System.out.println("test");
       long start = System.currentTimeMillis();
       Long sum = 0L;
       for (long i = 0L; i <= MAX; i++) {
           sum += i;
       }
       System.out.println(sum);
       System.out.println(System.currentTimeMillis() - start + "ms");
   }

   private static void testForkJoin() {
       System.out.println("testForkJoin");
       long start = System.currentTimeMillis();
       ForkJoinPool forkJoinPool = new ForkJoinPool();
       Long sum = forkJoinPool.invoke(new ForkJoinTask(1, MAX));
       System.out.println(sum);
       System.out.println(System.currentTimeMillis() - start + "ms");
   }

   @Override
   protected Long compute() {
       long sum = 0;
       if (end - start <= THRESHOLD) {
           for (long i = start; i <= end; i++) {
               sum += i;
           }
           return sum;
       } else {
           long mid = (start + end) / 2;

           ForkJoinTask task1 = new ForkJoinTask(start, mid);
           task1.fork();

           ForkJoinTask task2 = new ForkJoinTask(mid + 1, end);
           task2.fork();

           return task1.join() + task2.join();
       }
   }

}

這裏須要計算結果,因此任務繼承的是RecursiveTask類。ForkJoinTask須要實現compute方法,在這個方法裏首先須要判斷任務是否小於等於閾值1000,若是是就直接執行任務。不然分割成兩個子任務,每一個子任務在調用fork方法時,又會進入compute方法,看看當前子任務是否須要繼續分割成孫任務,若是不須要繼續分割,則執行當前子任務並返回結果。使用join方法會阻塞並等待子任務執行完並獲得其結果。

程序輸出:

test
500000000500000000
4992ms
--------------------
testForkJoin
500000000500000000
508ms

須要特別注意的是:

ForkJoinPool 使用submit 或 invoke 提交的區別:invoke是同步執行,調用以後須要等待任務完成,才能執行後面的代碼;submit是異步執行,只有在Future調用get的時候會阻塞。
這裏繼承的是RecursiveTask,還能夠繼承RecursiveAction。前者適用於有返回值的場景,然後者適合於沒有返回值的場景
這一點是最容易忽略的地方,其實這裏執行子任務調用fork方法並非最佳的選擇,最佳的選擇是invokeAll方法。

leftTask.fork();  
rightTask.fork();

替換爲

invokeAll(leftTask, rightTask);

具體說一下原理:對於Fork/Join模式,假如Pool裏面線程數量是固定的,那麼調用子任務的fork方法至關於A先分工給B,而後A當監工不幹活,B去完成A交代的任務。因此上面的模式至關於浪費了一個線程。那麼若是使用invokeAll至關於A分工給B後,A和B都去完成工做。這樣能夠更好的利用線程池,縮短執行的時間。

(5) characteristics方法

返回一個不可變的Characteristics集合,它定義了收集器的行爲——尤爲是關於流是否能夠並行歸約,以及可使用哪些優化的提示。
Characteristics是一個包含三個項目的枚舉。

  • UNORDERED——歸約結果不受流中項目的遍歷和累積順序的影響。
  • CONCURRENT——accumulator函數能夠從多個線程同時調用,且該收集器能夠並行歸約流。若是收集器沒有標爲UNORDERED,那它僅在用於無序數據源時才能夠並行歸約。
  • IDENTITY_FINISH——這代表完成器方法返回的函數是一個恆等函數,能夠跳過。這種狀況下,累加器對象將會直接用做歸約過程的最終結果。這也意味着,將累加器A不加檢查地轉換爲結果R是安全的。
@Override 
 public Set<Characteristics> characteristics() { 
 return Collections.unmodifiableSet(EnumSet.of( 
 IDENTITY_FINISH, CONCURRENT));
 }

2.進行自定義收集而不去實現Collector

Stream有一個重載的collect方法能夠接受另外三個函數——supplier、accumulator和combiner,其語義和Collector接口的相應方法返回的函數徹底相同。

List<Dish> dishes = menuStream.collect( 
 ArrayList::new,
 List::add,
 List::addAll);//它永遠都是一個IDENTITY_FINISH和CONCURRENT但並不是UNORDERED的收集器。
相關文章
相關標籤/搜索