四. 用流收集數據java
收集器 -- Stream API的功能安全
reduce方法和collect的區別:reduce方法旨在把兩個值結合起來生成一個新的值,他是一個不可變的歸約。而collect方法的設計就是要改變容器,從而累積要輸出的結果。數據結構
1.分組:groupingBy框架
-- 多級分組ide
Collectors.groupingBy工廠方法建立收集器,它除了普通的分類函數外,還能夠接受collector類型的第二個參數。函數
進行二級分組時能夠把一個內層groupingBy傳遞給外層groupingBy,並定義一個爲流中項目分類的二級標準。性能
eg:測試
//多級分組:先根據種類分組,再在種類中根據卡路里進行分組 public void testDoubleGroup() { Map<DishType, Map<CaloricLevel, List<Dish>>> dishesByTypeCaloricLevel = getMenu().stream(). collect( Collectors.groupingBy(Dish:: getType, //一級分類函數 Collectors.groupingBy(dish -> { //二級分類函數 if(dish.getCalories() <= 400) return CaloricLevel.DIET; else if (dish.getCalories() <= 700) return CaloricLevel.NORMAL; else return CaloricLevel.FAT; }) ) ); }
-- 按子組收集數據編碼
把收集器的結果轉換爲另外一種類型,可使用Collectors.collectingAndThen工廠方法返回的收集器。spa
與groupingBy聯合使用的其餘收集器:經過groupingBy工廠方法的第二個參數傳遞的收集器將會對分到同一組中的全部流元素執行進一步的規約操做。
eg:
//按子組收集數據:與groupingby聯合使用的其餘收集器 public void testGroupMap() { Map<DishType, Integer> totalCaloriesByType = getMenu().stream().collect(Collectors.groupingBy(Dish::getType, Collectors.summingInt(Dish::getCalories))); for (Map.Entry<DishType, Integer> entry : totalCaloriesByType.entrySet()) { System.out.println("key = " + entry.getKey() + " and value = " + entry.getValue()); } }
//按子組收集數據 public void testGroupByMaxType() { Map<DishType, Dish> mostValoricByType = getMenu().stream() .collect( Collectors.groupingBy( //最外層 Dish::getType, //根據菜餚的類型把菜單流分組,獲得三個子流 Collectors.collectingAndThen( //分組操做的每一個子流都用這第二個收集器作進一步規約 Collectors.maxBy(Comparator.comparingInt(Dish::getCalories)), //第三個收集器maxBy Optional::get //由規約收集器進行子流的規約操做並進行Optional get轉換函數獲得各種型中熱量最高的Dish ) ) ); for (Map.Entry<DishType, Dish> entry : mostValoricByType.entrySet()) { System.out.println("key = " + entry.getKey() + " and value = " + entry.getValue().getName()); } }
2.分區:partitioningBy
分區是分組的特殊狀況。
分區:由一個謂詞(返回一個布爾值的函數)做爲分類函數,它成分區函數。
分區函數返回一個布爾值,故獲得的分組Map的鍵類型是Boolean,最多能夠分爲兩組(true,false)。
由於分區是分組的特殊狀況,分區函數也能進行二級分組和將收集器結果轉換爲另外一種類型。
Collectors類的靜態工廠方法:
工廠方法 |
返回類型 |
用於 |
例子 |
toList |
List<T> |
把流中全部項目收集到一個List |
List<Dish> dishes = menu.Stream.collect(toList()); |
toSet |
Set<T> |
把流中全部項目收集到一個Set,刪除重複項 | Set<Dish> dishes = menu.Stream.collect(toSet()); |
toCollection |
Collection<T> |
把流中全部項目收集到給定的供應源建立的集合 |
Collection<Dish> dishes = menuStream.collect(toCollection(), ArrayList::new); |
counting |
Long |
計算流中元素的個數 |
long howManyDishes = menuStream.collection(counting()); |
summingInt |
Integer |
對流中項目的一個整數屬性求和 |
int totalCalories = menuStream.collect(summingInt(Dish::getCalories)); |
averagingInt |
Double |
計算流中項目Integer屬性的平均值 |
double avgCalories = menuStream.collect(averagingInt(Dish::getCalories)); |
summarizingInt |
IntSummaryStatistics |
收集關於流中項目Integer屬性的統計值,例如最大/最小/總和與平均值 |
IntSummaryStatistics = menuStream.collect(summarizingInt(Dish::getCalories)); |
joining |
String |
鏈接對流中每一個項目調用toString方法所生成的字符串 |
String shortMenu = menuStream.map(Dish::getName).collect(joining(", ")); |
maxBy |
Optional<T> |
一個包裹了流中按照給定比較器選出最大元素的Optional,或若是流爲空Optional.empty() |
Optional<Dish> fattest = menuStream.collect(maxBy(comparingInt(Dish::getCaleries))); |
minBy |
Optional<T> |
一個包裹了流中按照給定比較器選出最小元素的Optional,或若是流爲空Optional.empty() |
Optional<Dish> lightest = menuStream.collect(minBy(comparingInt(Dish::getCaleries))); |
reducing |
規約操做產生的類型 |
從一個做爲累加器的初始值開始,利用BinaryOperator與流中元素逐個結合,從而將流規約爲單個值 |
int totalCalories = menuStream.collect(reducing(0, Dish::getCalories, Integer::sum)); |
collectingAndThen |
轉換函數返回的類型 |
包裹另外一個收集器,對其結果應用轉換函數 |
int howManyDishes = menuStream.collect(collectingAndThen(toList(), List::size)); |
groupingBy |
Map<K, List<T>> |
根據項目的一個屬性的值對流中的項目做問組,並將屬性值做爲結果Map的鍵 |
Map<Dish.Type, List<Dish>> dishesByType = menuStream.collect(groupingBy(Dish::getType)); |
partitioningBy |
Map<Boolean, List<T>> |
根據對流中每一個項目應用謂詞的結果來對項目進行分區 | Map<Boolean, List<Dish>> vegetarianDishes = menuStream.collect(partitioningBy(Dish::isVegetarian)); |
3.Collector接口
public interface Collector<T, A, R> { Supplier<A> supplier(); //創建新的結果容器 BiConsumer<A, T> accumulator(); //將元素添加到結果容器 Function<A, R> finisher(); //對結果容器應用最終轉換(返回累積過程的最後一個要調用的函數) BinaryOperator<A> combiner(); //合併兩個結果容器 Set<Characteristics> characteristics(); //返回Characteristic集合,定義了收集器的行爲(尤爲是關於流是否能夠並行規約) }
-- T是流中要收集的項目的泛型;
-- A是累加器的類型,累加器是在收集過程當中用於累積部分結果的對象;
-- R是收集操做獲得的對象(一般單並不必定是集合)的類型。
--Characteristic是一個包含三個項目的枚舉:
UNORDERED:歸約結果不受流中項目的遍歷和累積順序的影響。
CONCURRENT:accumulator函數能夠從多個線程同時調用,且該收集器能夠並行歸約流。若是收集器沒有標爲UNORDERED,那它僅在用於無序數據源時才能夠並行歸約。
INDENTITY_FINISH:代表完成器方法返回的函數是一個恆等函數,能夠跳過。這種狀況下,累加器對象會直接用做歸約過程的最終結果。這也意味着,累加器A不加檢查地轉換爲結果R是安全的。
eg:
/** * ToListCollector是IDENTITY_FINISH的,由於用來累積流元素中的List已是咱們要的最終結果,用不着進一步轉換, * 但它並非UNORDERED,由於用在有序流上的時候,咱們仍是但願順序可以保留在獲得的List中。 * 最後,他是CONCURRENT的,但它僅僅在背後的數據源無序時纔會並行處理 * @param <T> */ public class ToListCollector<T> implements Collector<T, List<T>, List<T>> { /** * 建立集合操做點的起始點 * @return */ @Override public Supplier<List<T>> supplier() { return ArrayList::new; } /** * 累積遍歷過的項目,原位修改累加器 * @return */ @Override public BiConsumer<List<T>, T> accumulator() { return List::add; } /** * 恆等函數 * @return */ @Override public Function<List<T>, List<T>> finisher() { return Function.identity(); } /** *修改第一個累加器, * 將其與第二個累加器的內容合併 * 返回修改後的第一個累加器 * @return */ @Override public BinaryOperator<List<T>> combiner() { return (list1, list2) -> { list1.addAll(list2); return list1; }; } /** *爲收集器添加IDENTITY_FINISH, CONCURRENT標記 * @return */ @Override public Set<Characteristics> characteristics() { return Collections.unmodifiableSet(EnumSet.of(IDENTITY_FINISH, CONCURRENT)); } }
五. 並行數據處理與性能
正確高效地使用並行流
1.並行流
並行流:是一個把內容分紅多個數據塊,並用不一樣的線程分別處理每一個數據塊的流。
能夠經過對收集源調用parallelStream方法來把集合轉換爲並行流。
並行流從哪兒來,有多少個,如何自定義:
並行流內部使用了默認的ForkJoinPool,它默認的線程數量就是你處理器的數量,這個值是由Runtime.getRuntime().availableProcessors()獲得的。
能夠經過系統屬性java.util.concurrent.ForkJoinPool.common.parallelism來改變線程池大小。這是一個所有設置。
並行化的代價:
並行化過程自己須要對流作遞歸劃分,把每一個子流的概括操做分配到不一樣的線程,而後把這些操做的結果合併成一個值。
在多個內核之間移動數據的代價也可能比你想象的要大。
因此要保證在內核中並行執行工做的時間比在內核之間傳輸數據時間長。必須用對並行Stream,不要採用不易並行haunted的操做如iterate。
使用並行流時要避免共享可變狀態。
高效使用並行流:
-- 測試;
-- 裝箱。自動裝箱和拆箱操做會大大下降性能。java8中有原始流類型(IntStream,LongStream,DoubleStream)來避免這種操做,單反有可能都應該使用這些流。
-- 有些操做自己在並行流上的性能就比順序流差。limit,findFirst等依賴元素順序的操做,能夠用findAny代替findFirst。能夠調用unordered方法來把有序流變成無序流。
-- 流的操做流水線的總計算成本。一個元素經過流水線的大體處理成本較高就意味着使用並行流時性能好的可能性較大;
-- 對於比較小的數據量,選擇並行流不是一個好的額決定;
-- 要考慮背後的數據結構是否易於分解。使用range工廠方法建立的原始類型流也能夠快速分解;
-- 流自身的特色,以及流水線中的中間操做修改流的方式,均可能會改變分解過程的性能;
-- 要考慮終端操做中合併步驟的代價是大是小。
流的數據源和可分解性:
源 |
可分解性 |
ArrayList |
極佳 |
LinkedList |
差 |
IntStream.range |
極佳 |
Stream.iterate |
差 |
HashSet |
好 |
TreeSet |
好 |
2.分支/合併框架
分支/合併框架的目的是以遞歸的方式將能夠並行的任務拆分紅更小的任務,而後將每一個子任務的結果合併起來生成總體結果。是ExecutorService接口的一個實現,他把子任務分配給線程池ForkJoinPool中的工做線程。
要把任務交到這個池,必須建立RecursiveTask<R>的一個子類,其中R是並行化任務(一級全部子任務)產生的結果類型,或者入股任務不返回結果,則是RecursiveActive類型。定義RecursiveTask只需實現compute抽象方法。
該方法的實現相似於:
if (任務足夠小或不可分) { 順序計算該任務 } else { 將任務分紅兩個子任務 遞歸調用本方法,拆分每一個子任務,等待全部子任務完成 合併每一個子任務的結果 }
3.Spliterator
可分迭代器。
和Iterator同樣,Spliterator也用於遍歷數據源中的元素。但他是爲了並行執行而設計的。
public interface Spliterator<T> { boolean tryAdvance(Consumer<? super T> var1); //相似於普通的Iterator,它會按順序一個一個使用Spliterator中的元素,若是還有其餘元素要遍歷就返回true Spliterator<T> trySplit(); //專門爲Spliterator接口設計的,它能夠把一些元素劃分出去給第二個Spliterator(由該方法返回),讓它們兩個並行處理 long estimateSize(); //估計還剩多少元素要遍歷 int characteristics(); //返回一個int,表明Spliterator自己特性集的編碼 }
Spliterator的特性:
特性 | 含義 |
ORDERED |
元素既定的順序(例如List),所以Spliterator在遍歷和劃分時也會遵循這一順序 |
DISTINCT |
對於任意一對遍歷過的元素x和y,x.equals(y)返回false |
SORTED |
遍歷的元素按照一個預約義的順序排序 |
SIZED |
該Spliterator由一個已知大小的源創建(例如Set),所以estimateSize()返回的是準確值 |
NONNULL |
保證遍歷的元素不會爲null |
IMMUTABLE |
Spliterator的數據源不能修改。這意味着在遍歷時不能添加、刪除或修改任何元素 |
CONCURRENT |
該Spliterator的數據源能夠被其餘線程同時修改而無需同步 |
SUBSIZED |
該Spliterator和全部從它拆分出來的Spliterator都是SIZED |