`Comparator` has been part of Java since 1.2, but Java 8 added a large number of default and static methods to it.

    compare()
    equals()
    reversed()                                    // reverses the order
    thenComparing(Comparator<? super T> other)    // afterwards, compare again with another comparator
    thenComparing(Function<? super T, ? extends U> keyExtractor,
                  Comparator<? super U> keyComparator)
                                                  // apply the first comparator, then the second: chaining
    thenComparing()
    thenComparingInt()
    thenComparingLong()
    thenComparingDouble()
    reverseOrder()
    naturalOrder()
    nullsFirst()
    nullsLast()
    comparing()                                   // static methods
    comparing()
    comparingInt()
    comparingLong()
    comparingDouble()

### Comparator through demo code

    package com.dawa.jdk8.stream2;

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.Comparator;
    import java.util.List;

    // A walkthrough of Comparator by example.
    public class MyComparator {
        public static void main(String[] args) {
            List<String> list = Arrays.asList("hello", "world", "welcome", "nihao");

            // Sort alphabetically.
            Collections.sort(list);
            System.out.println(list);

            // Sort by string length, ascending.
            Collections.sort(list, (item1, item2) -> item1.length() - item2.length());
            System.out.println(list);

            // Sort by string length, descending.
            Collections.sort(list, (item1, item2) -> item2.length() - item1.length());

            // The same sorts using method references.
            // By length, ascending:
            Collections.sort(list, Comparator.comparingInt(String::length));
            System.out.println(list);

            // By length, descending:
            Collections.sort(list, Comparator.comparingInt(String::length).reversed());
            System.out.println(list);

            // The same sorts using lambda expressions.
            // This does not compile: with .reversed() chained on, the lambda has
            // no target type and item is inferred as Object.
            // Collections.sort(list, Comparator.comparingInt(item -> item.length()).reversed());

            // Declaring the parameter type explicitly fixes the inference.
            Collections.sort(list, Comparator.comparingInt((String item) -> item.length()).reversed());

            // Q: Why did inference succeed in the earlier stream examples?
            // A: It started from the Stream<T> type, so the element type was already known.
            // Q: Why does it fail here?
            // A: sort() takes Comparator<? super T>, so any supertype of String is acceptable.
            //    Nothing pins the lambda parameter to String, and the compiler cannot pick it for you.
            //    public static <T> void sort(List<T> list, Comparator<? super T> c) {
            //        list.sort(c);
            //    }

            // Does not compile, because Boolean is not a supertype of String:
            // Collections.sort(list, Comparator.comparingInt((Boolean item) -> 1).reversed());

            // Compiles, because Object is a supertype of String:
            // Collections.sort(list, Comparator.comparingInt((Object item) -> 1).reversed());

            // Also compiles (no .reversed(), so the target type drives inference):
            // Collections.sort(list, Comparator.comparingInt(item -> item.length()));
        }
    }


    @SuppressWarnings({"unchecked", "rawtypes"})
    public static <T> void sort(List<T> list, Comparator<? super T> c) {
        list.sort(c);
    }

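Pay attention to the `<? super T>` wildcard here.
Its semantics are broader than plain `T`: a comparator written for `T` or for any supertype of `T` is accepted. The elements actually being compared are still always of type `T`. This is worth thinking through carefully.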
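The point about `<? super T>` can be illustrated with a small sketch. The class `SuperWildcardDemo` and the comparator `BY_TEXT_LENGTH` are hypothetical names chosen for this illustration; the comparator is declared on `Object`, yet `Collections.sort` accepts it for a `List<String>`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

class SuperWildcardDemo {
    // Hypothetical comparator declared for Object: it only needs toString().
    static final Comparator<Object> BY_TEXT_LENGTH =
            Comparator.comparingInt(o -> o.toString().length());

    static List<String> sortByLength(List<String> input) {
        List<String> copy = new ArrayList<>(input);
        // sort() expects Comparator<? super String>; Comparator<Object> qualifies.
        Collections.sort(copy, BY_TEXT_LENGTH);
        return copy;
    }

    public static void main(String[] args) {
        // prints [hi, hello, welcome]
        System.out.println(sortByLength(Arrays.asList("welcome", "hi", "hello")));
    }
}
```

At run time the comparator only ever receives `String` instances, which is the sense in which the effective type is still `T`.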

    // Two-level comparison: 1) by length, ascending; 2) alphabetically, ignoring case. Uses thenComparing().
    Collections.sort(list, Comparator.comparingInt(String::length)
            .thenComparing(String.CASE_INSENSITIVE_ORDER));

The source of thenComparing() is as follows:

    /**
     * Returns a lexicographic-order comparator with another comparator.
     * If this {@code Comparator} considers two elements equal, i.e.
     * {@code compare(a, b) == 0}, {@code other} is used to determine the order.
     *
     * <p>The returned comparator is serializable if the specified comparator
     * is also serializable.
     *
     * @apiNote
     * For example, to sort a collection of {@code String} based on the length
     * and then case-insensitive natural ordering, the comparator can be
     * composed using following code,
     * (Author's note: this is the case-insensitive variant used in the example above.)
     * <pre>{@code
     *     Comparator<String> cmp = Comparator.comparingInt(String::length)
     *             .thenComparing(String.CASE_INSENSITIVE_ORDER);
     * }</pre>
     *
     * @param  other the other comparator to be used when this comparator
     *         compares two objects that are equal.
     * @return a lexicographic-order comparator composed of this and then the
     *         other comparator
     * @throws NullPointerException if the argument is null.
     * @since 1.8
     */
    default Comparator<T> thenComparing(Comparator<? super T> other) {
        Objects.requireNonNull(other);
        return (Comparator<T> & Serializable) (c1, c2) -> {
            int res = compare(c1, c2);
            return (res != 0) ? res : other.compare(c1, c2);
        };
    }

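The comparator passed to thenComparing() is consulted only when the preceding comparator returns 0. In the example, only the three strings of equal length receive a second-level ordering.
In other words: if the first comparator can order two elements, its result is used; only when it cannot is the second comparator applied.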
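A minimal sketch of this behaviour, assuming a hypothetical counting comparator (`COUNTING_IGNORE_CASE`, not part of the original demo) that records how often the tie-breaker is actually called:

```java
import java.util.Comparator;
import java.util.concurrent.atomic.AtomicInteger;

class TieBreakerDemo {
    private static final AtomicInteger CALLS = new AtomicInteger();

    // Hypothetical tie-breaker: case-insensitive order that counts its invocations.
    private static final Comparator<String> COUNTING_IGNORE_CASE = (a, b) -> {
        CALLS.incrementAndGet();
        return String.CASE_INSENSITIVE_ORDER.compare(a, b);
    };

    private static final Comparator<String> BY_LENGTH_THEN_NAME =
            Comparator.comparingInt(String::length).thenComparing(COUNTING_IGNORE_CASE);

    // Returns how many times the tie-breaker ran for one comparison.
    static int tieBreakerCalls(String a, String b) {
        CALLS.set(0);
        BY_LENGTH_THEN_NAME.compare(a, b);
        return CALLS.get();
    }

    public static void main(String[] args) {
        System.out.println(tieBreakerCalls("hello", "welcome")); // lengths differ: prints 0
        System.out.println(tieBreakerCalls("world", "hello"));   // same length: prints 1
    }
}
```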
Another implementation:

    Collections.sort(list, Comparator.comparingInt(String::length)
            .thenComparing((item1, item2) -> item1.toLowerCase().compareTo(item2.toLowerCase())));

Another implementation:

    Collections.sort(list, Comparator.comparingInt(String::length)
            .thenComparing(Comparator.comparing(String::toUpperCase)));

Another implementation:

    Collections.sort(list, Comparator.comparingInt(String::length)
            .thenComparing(Comparator.comparing(String::toLowerCase, Comparator.reverseOrder())));

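The examples above are simply different ways of using thenComparing().
So what does the following call output?

    Collections.sort(list, Comparator.comparingInt(String::length)
            .thenComparing(Comparator.comparing(String::toLowerCase, Comparator.reverseOrder())));

To repeat: the comparator passed to thenComparing() is consulted only when the preceding comparator returns 0, so only the three strings of equal length get a second-level ordering. If the first comparator can order two elements, its result is used; only when it cannot is the second one applied.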
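The question can be checked with a small self-contained sketch that reuses the list from the demo. The class name `ThenComparingQuizDemo` is made up for this illustration.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

class ThenComparingQuizDemo {
    static List<String> sorted() {
        List<String> list = Arrays.asList("hello", "world", "welcome", "nihao");
        // First by length (ascending); ties are broken by reverse alphabetical order.
        Collections.sort(list, Comparator.comparingInt(String::length)
                .thenComparing(Comparator.comparing(String::toLowerCase, Comparator.reverseOrder())));
        return list;
    }

    public static void main(String[] args) {
        // prints [world, nihao, hello, welcome]
        System.out.println(sorted());
    }
}
```

The three five-letter words tie on length, so they are ordered by the reversed second comparator, while "welcome" is placed by length alone.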
Multi-level sorting:

    Collections.sort(list, Comparator.comparingInt(String::length).reversed()
            .thenComparing(Comparator.comparing(String::toLowerCase, Comparator.reverseOrder()))
            .thenComparing(Comparator.reverseOrder()));

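Before JDK 1.8, Comparator offered very few methods. JDK 1.8 added many new methods, including primitive specializations such as comparingInt(), comparingLong() and comparingDouble().
The specializations avoid boxing and unboxing, which can otherwise affect performance.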
Implementing the Collector interface

    public interface Collector<T, A, R> {
        Supplier<A> supplier();

        BiConsumer<A, T> accumulator();

        BinaryOperator<A> combiner();

        Function<A, R> finisher();

        Set<Characteristics> characteristics();

        public static <T, R> Collector<T, R, R> of(Supplier<R> supplier,
                                                  BiConsumer<R, T> accumulator,
                                                  BinaryOperator<R> combiner,
                                                  Characteristics... characteristics) {
            Objects.requireNonNull(supplier);
            Objects.requireNonNull(accumulator);
            Objects.requireNonNull(combiner);
            Objects.requireNonNull(characteristics);
            Set<Characteristics> cs = (characteristics.length == 0)
                    ? Collectors.CH_ID
                    : Collections.unmodifiableSet(EnumSet.of(Collector.Characteristics.IDENTITY_FINISH,
                                                             characteristics));
            return new Collectors.CollectorImpl<>(supplier, accumulator, combiner, cs);
        }

        public static <T, A, R> Collector<T, A, R> of(Supplier<A> supplier,
                                                     BiConsumer<A, T> accumulator,
                                                     BinaryOperator<A> combiner,
                                                     Function<A, R> finisher,
                                                     Characteristics... characteristics) {
            Objects.requireNonNull(supplier);
            Objects.requireNonNull(accumulator);
            Objects.requireNonNull(combiner);
            Objects.requireNonNull(finisher);
            Objects.requireNonNull(characteristics);
            Set<Characteristics> cs = Collectors.CH_NOID;
            if (characteristics.length > 0) {
                cs = EnumSet.noneOf(Characteristics.class);
                Collections.addAll(cs, characteristics);
                cs = Collections.unmodifiableSet(cs);
            }
            return new Collectors.CollectorImpl<>(supplier, accumulator, combiner, finisher, cs);
        }

        enum Characteristics {
            CONCURRENT,
            UNORDERED,
            IDENTITY_FINISH
        }
    }

A custom collector:

    package com.dawa.jdk8.stream2;

    import java.util.*;
    import java.util.function.BiConsumer;
    import java.util.function.BinaryOperator;
    import java.util.function.Function;
    import java.util.function.Supplier;
    import java.util.stream.Collector;

    import static java.util.stream.Collector.Characteristics.IDENTITY_FINISH;

    public class MySetCollector<T> implements Collector<T, Set<T>, Set<T>> {

        @Override
        public Supplier<Set<T>> supplier() {
            System.out.println("supplier invoked");
            return HashSet<T>::new; // returns a HashSet container
        }

        @Override
        public BiConsumer<Set<T>, T> accumulator() {
            System.out.println("accumulator invoked"); // the accumulator
            return Set<T>::add;
            // return HashSet<T>::add;
            // Does not compile: the accumulator is typed BiConsumer<Set<T>, T>, so the method
            // reference must be on Set, the declared container type, not on a concrete class.
        }

        @Override
        public BinaryOperator<Set<T>> combiner() {
            System.out.println("combiner invoked"); // merges partial results of a parallel stream
            return (set1, set2) -> {
                set1.addAll(set2);
                return set1;
            };
        }

        @Override
        public Function<Set<T>, Set<T>> finisher() { // converts the container into the final result
            System.out.println("finisher invoked");
            // return ts -> ts;
            return Function.identity(); // equivalent to the lambda above: the identity function
        }

        @Override
        public Set<Characteristics> characteristics() {
            System.out.println("characteristics invoked");
            return Collections.unmodifiableSet(EnumSet.of(IDENTITY_FINISH));
        }

        public static void main(String[] args) {
            List<String> list = Arrays.asList("hello", "world", "welcome");
            Set<String> collect = list.stream().collect(new MySetCollector<>());
            System.out.println(collect);
        }
    }


    public static void main(String[] args) {
        List<String> list = Arrays.asList("hello", "world", "welcome");
        Set<String> collect = list.stream().collect(new MySetCollector<>());
        System.out.println(collect);
    }


    @Override
    @SuppressWarnings("unchecked")
    public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {
        A container;
        if (isParallel()
                && (collector.characteristics().contains(Collector.Characteristics.CONCURRENT))
                && (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) {
            container = collector.supplier().get();
            BiConsumer<A, ? super P_OUT> accumulator = collector.accumulator();
            forEach(u -> accumulator.accept(container, u));
        }
        else {
            container = evaluate(ReduceOps.makeRef(collector));
        }
        return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
               ? (R) container
               : collector.finisher().apply(container);
    }

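The IDENTITY_FINISH characteristic is especially important: this is where it is checked, in the final return statement of collect().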
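A sketch of the effect, assuming the OpenJDK `collect()` implementation quoted in this article. The hypothetical `listCollector` below has a finisher that always throws, so whether the collection succeeds shows whether the finisher ran.

```java
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.Stream;

class IdentityFinishDemo {
    // Hypothetical collector whose finisher always throws.
    static Collector<String, List<String>, List<String>> listCollector(boolean identityFinish) {
        Set<Collector.Characteristics> cs = identityFinish
                ? EnumSet.of(Collector.Characteristics.IDENTITY_FINISH)
                : EnumSet.noneOf(Collector.Characteristics.class);
        return new Collector<String, List<String>, List<String>>() {
            @Override public Supplier<List<String>> supplier() { return ArrayList::new; }
            @Override public BiConsumer<List<String>, String> accumulator() { return List::add; }
            @Override public BinaryOperator<List<String>> combiner() {
                return (a, b) -> { a.addAll(b); return a; };
            }
            @Override public Function<List<String>, List<String>> finisher() {
                return list -> { throw new IllegalStateException("finisher was called"); };
            }
            @Override public Set<Collector.Characteristics> characteristics() { return cs; }
        };
    }

    // True if the stream pipeline invoked the finisher.
    static boolean finisherRuns(boolean identityFinish) {
        try {
            Stream.of("a", "b").collect(listCollector(identityFinish));
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(finisherRuns(true));  // prints false: the container is cast and returned
        System.out.println(finisherRuns(false)); // prints true: finisher().apply(container) is called
    }
}
```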

    public static <T, I> TerminalOp<T, I>
    makeRef(Collector<? super T, I, ?> collector) {
        Supplier<I> supplier = Objects.requireNonNull(collector).supplier();
        BiConsumer<I, ? super T> accumulator = collector.accumulator();
        BinaryOperator<I> combiner = collector.combiner();
        class ReducingSink extends Box<I>
                implements AccumulatingSink<T, I, ReducingSink> {
            @Override
            public void begin(long size) {
                state = supplier.get();
            }

            @Override
            public void accept(T t) {
                accumulator.accept(state, t);
            }

            @Override
            public void combine(ReducingSink other) {
                state = combiner.apply(state, other.state);
            }
        }
        return new ReduceOp<T, I, ReducingSink>(StreamShape.REFERENCE) {
            @Override
            public ReducingSink makeSink() {
                return new ReducingSink();
            }

            @Override
            public int getOpFlags() {
                return collector.characteristics().contains(Collector.Characteristics.UNORDERED)
                       ? StreamOpFlag.NOT_ORDERED
                       : 0;
            }
        };
    }

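Use this example to understand how the process works.
Next, collect a Set but enhance the result: instead of putting the elements straight into a Set as before, put them into a Map.
Declare a Collector class that meets these requirements:
Sample input: ["hello", "world", "hello world"]
Sample output: [{hello, hello}, {world, world}, {hello world, hello world}]
Generic parameters: <T, Set<T>, Map<T, T>>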

    package com.dawa.jdk8.stream2;

    import java.util.*;
    import java.util.function.BiConsumer;
    import java.util.function.BinaryOperator;
    import java.util.function.Function;
    import java.util.function.Supplier;
    import java.util.stream.Collector;

    public class MySetCollector2<T> implements Collector<T, Set<T>, Map<T, T>> {

        @Override
        public Supplier<Set<T>> supplier() {
            System.out.println("supplier invoked");
            return HashSet<T>::new;
        }

        @Override
        public BiConsumer<Set<T>, T> accumulator() {
            System.out.println("accumulator invoked");
            return Set::add;
        }

        @Override
        public BinaryOperator<Set<T>> combiner() {
            System.out.println("combiner invoked");
            return (set1, set2) -> {
                set1.addAll(set2);
                return set1;
            };
        }

        @Override
        public Function<Set<T>, Map<T, T>> finisher() {
            // Always invoked here, because the container type differs from the result type.
            // Sample input:  ["hello", "world", "hello world"]
            // Sample output: [{hello, hello}, {world, world}, {hello world, hello world}]
            System.out.println("finisher invoked");
            return set -> {
                Map<T, T> map = new HashMap<>();
                set.stream().forEach(item -> map.put(item, item));
                return map;
            };
        }

        @Override
        public Set<Characteristics> characteristics() {
            System.out.println("characteristics invoked");
            return Collections.unmodifiableSet(EnumSet.of(Characteristics.UNORDERED));
        }

        public static void main(String[] args) {
            List<String> list = Arrays.asList("hello", "world", "hello", "welcome", "a", "b", "c", "d", "e");
            HashSet<String> set = new HashSet<>(list);
            System.out.println("set:" + set);

            Map<String, String> collect = set.stream().collect(new MySetCollector2<>());
            System.out.println(collect);
        }
    }

If one more characteristic is added:

    return Collections.unmodifiableSet(EnumSet.of(Characteristics.UNORDERED, Characteristics.IDENTITY_FINISH));

a ClassCastException is thrown.

    /**
     * Indicates that the finisher function is the identity function and
     * can be elided.  If set, it must be the case that an unchecked cast
     * from A to R will succeed.
     */
    IDENTITY_FINISH

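Declaring this characteristic asserts that the intermediate container type A and the result type R are the same, so the framework skips the finisher and casts the container directly to R. Here the container is a Set and the result is a Map, which is why the exception above occurs.
Which characteristics a collector has is entirely up to you, declared through the Characteristics enum.
You therefore have to understand the types in your own program before you can use this enum correctly.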
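The failure can be reproduced in a few lines. This is a sketch assuming the OpenJDK behaviour quoted in this article; `toSelfMap` is a hypothetical collector built with `Collector.of` rather than the `MySetCollector2` class.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collector;
import java.util.stream.Stream;

class WrongIdentityFinishDemo {
    // Hypothetical collector: accumulates into a Set but promises a Map as its result.
    static Collector<String, Set<String>, Map<String, String>> toSelfMap(boolean claimIdentity) {
        Collector.Characteristics[] cs = claimIdentity
                ? new Collector.Characteristics[] {Collector.Characteristics.IDENTITY_FINISH}
                : new Collector.Characteristics[0];
        return Collector.of(
                HashSet::new,
                Set::add,
                (a, b) -> { a.addAll(b); return a; },
                set -> {
                    Map<String, String> map = new HashMap<>();
                    for (String s : set) {
                        map.put(s, s);
                    }
                    return map;
                },
                cs);
    }

    static String outcome(boolean claimIdentity) {
        try {
            // With IDENTITY_FINISH the HashSet itself is returned, and the cast to Map fails here.
            Map<String, String> result = Stream.of("hello", "world").collect(toSelfMap(claimIdentity));
            return new TreeMap<>(result).toString();
        } catch (ClassCastException e) {
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        System.out.println(outcome(false)); // prints {hello=hello, world=world}
        System.out.println(outcome(true));  // prints ClassCastException
    }
}
```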
The fork/join framework: ForkJoinPool (parallel streams)
Modify the program slightly so that it prints the name of the thread doing the work:

    @Override
    public BiConsumer<Set<T>, T> accumulator() {
        System.out.println("accumulator invoked");
        return (set, item) -> {
            System.out.println("accumulator:" + Thread.currentThread().getName());
            set.add(item);
        };
    }

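
    Map<String, String> collect = set.stream().collect(new MySetCollector2<>());

With a sequential stream, every accumulator call runs on the calling thread (`main`).

    Map<String, String> collect = set.parallelStream().collect(new MySetCollector2<>());

With a parallel stream, the accumulator calls may be spread across the calling thread and the `ForkJoinPool.commonPool` worker threads.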
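The same observation can be made without reading console output. The sketch below records thread names in a set instead of printing them. The class `ThreadNameDemo` and its data are illustrative assumptions, and the parallel result varies by machine and run.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collector;
import java.util.stream.Stream;

class ThreadNameDemo {
    // Returns the names of all threads that executed the accumulator.
    static Set<String> threadsUsed(boolean parallel) {
        Set<String> names = ConcurrentHashMap.newKeySet(); // thread-safe recorder
        List<String> data = Arrays.asList("hello", "world", "welcome", "a", "b", "c", "d", "e");
        Stream<String> stream = parallel ? data.parallelStream() : data.stream();
        stream.collect(Collector.<String, Set<String>>of(
                HashSet::new,
                (set, item) -> {
                    names.add(Thread.currentThread().getName());
                    set.add(item);
                },
                (left, right) -> { left.addAll(right); return left; }));
        return names;
    }

    public static void main(String[] args) {
        System.out.println("sequential: " + threadsUsed(false)); // only the calling thread
        System.out.println("parallel:   " + threadsUsed(true));  // varies from run to run
    }
}
```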
Now add Characteristics.CONCURRENT:

    @Override
    public Set<Characteristics> characteristics() {
        System.out.println("characteristics invoked");
        return Collections.unmodifiableSet(EnumSet.of(Characteristics.UNORDERED, Characteristics.CONCURRENT));
    }

An exception may then be thrown:
Caused by: java.util.ConcurrentModificationException
Without CONCURRENT, the exception does not occur.
Run it several times and you will see the behaviour vary.
Look at the source of this characteristic:

    /**
     * Indicates that this collector is <em>concurrent</em>, meaning that
     * the result container can support the accumulator function being
     * called concurrently with the same result container from multiple
     * threads.
     *
     * <p>If a {@code CONCURRENT} collector is not also {@code UNORDERED},
     * then it should only be evaluated concurrently if applied to an
     * unordered data source.
     */
    CONCURRENT,

The cause of the problem: the set was being printed, and printing iterates over it.
/** * This exception may be thrown by methods that have detected concurrent * modification of an object when such modification is not permissible. * <p> * For example, it is not generally permissible for one thread to modify a Collection * while another thread is iterating over it. In general, the results of the * iteration are undefined under these circumstances. Some Iterator * implementations (including those of all the general purpose collection implementations * provided by the JRE) may choose to throw this exception if this behavior is * detected. Iterators that do this are known as <i>fail-fast</i> iterators, * as they fail quickly and cleanly, rather that risking arbitrary, * non-deterministic behavior at an undetermined time in the future. * <p> * Note that this exception does not always indicate that an object has * been concurrently modified by a <i>different</i> thread. If a single * thread issues a sequence of method invocations that violates the * contract of an object, the object may throw this exception. For * example, if a thread modifies a collection directly while it is * iterating over the collection with a fail-fast iterator, the iterator * will throw this exception. * * <p>Note that fail-fast behavior cannot be guaranteed as it is, generally * speaking, impossible to make any hard guarantees in the presence of * unsynchronized concurrent modification. Fail-fast operations * throw {@code ConcurrentModificationException} on a best-effort basis. * Therefore, it would be wrong to write a program that depended on this * exception for its correctness: <i>{@code ConcurrentModificationException} * should be used only to detect bugs.</i> * * @author Josh Bloch * @see Collection * @see Iterator * @see Spliterator * @see ListIterator * @see Vector * @see LinkedList * @see HashSet * @see Hashtable * @see TreeMap * @see AbstractList * @since 1.2 */ public class ConcurrentModificationException extends RuntimeException { }
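That is, a concurrent modification exception.
With the CONCURRENT characteristic set, there is only one result container, shared by all threads.
In parallel mode, several threads add to the set while another is iterating over it to print it. When the two coincide, this exception is thrown.
That is the root cause of the exception.
Note: in parallel code, never iterate over (or print) the object you are still modifying.
Without the characteristic, each task accumulates into its own intermediate container and combiner() merges them afterwards, so no container is shared and there is no contention.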
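Since a CONCURRENT collector shares one container across threads, that container has to be thread-safe, which a `HashSet` is not. A sketch under that assumption, using `ConcurrentHashMap.newKeySet()` as the container; the names are hypothetical, and the single-container count relies on the OpenJDK `collect()` code quoted earlier:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collector;

class ConcurrentCollectorDemo {
    private static final AtomicInteger CONTAINERS = new AtomicInteger();

    // Hypothetical CONCURRENT collector backed by a thread-safe set.
    static Collector<String, Set<String>, Set<String>> toConcurrentSet() {
        return Collector.of(
                () -> {
                    CONTAINERS.incrementAndGet();
                    Set<String> shared = ConcurrentHashMap.newKeySet();
                    return shared;
                },
                Set::add,
                (left, right) -> { left.addAll(right); return left; },
                Collector.Characteristics.CONCURRENT,
                Collector.Characteristics.UNORDERED);
    }

    static Set<String> collectInParallel(List<String> words) {
        CONTAINERS.set(0);
        // Copy into a TreeSet only to get a stable order for printing.
        return new TreeSet<>(words.parallelStream().collect(toConcurrentSet()));
    }

    static int containersCreated() {
        return CONTAINERS.get();
    }

    public static void main(String[] args) {
        System.out.println(collectInParallel(Arrays.asList("hello", "world", "welcome", "a", "b", "c")));
        System.out.println(containersCreated()); // one shared container in the concurrent branch
    }
}
```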

    Set<String> collect = list.stream().parallel().sequential().sequential().parallel().collect(new MySetCollector<>());

Only the last of these calls takes effect.
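This can be checked with `isParallel()`, which reports the mode the pipeline would run in. A minimal sketch with an illustrative class name:

```java
import java.util.stream.Stream;

class LastCallWinsDemo {
    static boolean endsWithParallel() {
        return Stream.of("a", "b", "c")
                .parallel().sequential().sequential().parallel()
                .isParallel();
    }

    static boolean endsWithSequential() {
        return Stream.of("a", "b", "c")
                .sequential().parallel().sequential()
                .isParallel();
    }

    public static void main(String[] args) {
        System.out.println(endsWithParallel());   // prints true
        System.out.println(endsWithSequential()); // prints false
    }
}
```

The mode is a single flag on the whole pipeline, not a property of individual stages, so the last call overwrites the earlier ones.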

    /**
     * Returns an equivalent stream that is sequential.  May return
     * itself, either because the stream was already sequential, or because
     * the underlying stream state was modified to be sequential.
     *
     * <p>This is an <a href="package-summary.html#StreamOps">intermediate
     * operation</a>.
     *
     * @return a sequential stream
     */
    S sequential();

    /**
     * Returns an equivalent stream that is parallel.  May return
     * itself, either because the stream was already parallel, or because
     * the underlying stream state was modified to be parallel.
     *
     * <p>This is an <a href="package-summary.html#StreamOps">intermediate
     * operation</a>.
     *
     * @return a parallel stream
     */
    S parallel();

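Modify the code to see the difference between sequential and parallel execution:

    @Override
    public Supplier<Set<T>> supplier() {
        System.out.println("supplier invoked");
        // return HashSet<T>::new; // returns a HashSet container
        // Print inside the returned Supplier, so one line appears per container created.
        return () -> {
            System.out.println("-----");
            return new HashSet<T>();
        };
    }

Conclusion: a sequential stream creates a single initial container, whereas a parallel stream creates several.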
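A sketch that counts container creations instead of printing them. The class name and the 1000-element data set are assumptions for illustration, and the parallel count depends on the machine and on how the source is split.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

class ContainerCountDemo {
    // Returns how many containers the supplier created during one collect() call.
    static int containersCreated(boolean parallel) {
        AtomicInteger created = new AtomicInteger();
        List<Integer> data = IntStream.rangeClosed(1, 1000).boxed().collect(Collectors.toList());
        Stream<Integer> stream = parallel ? data.parallelStream() : data.stream();
        stream.collect(Collector.<Integer, Set<Integer>>of(
                () -> { created.incrementAndGet(); return new HashSet<>(); },
                Set::add,
                (left, right) -> { left.addAll(right); return left; }));
        return created.get();
    }

    public static void main(String[] args) {
        System.out.println("sequential: " + containersCreated(false)); // prints sequential: 1
        System.out.println("parallel:   " + containersCreated(true));  // machine dependent
    }
}
```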
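This does not mean that sequential execution is always slower than parallel execution; it depends on the actual workload.
The number of threads working at once is bounded by the parallelism of the common ForkJoinPool, which by default is derived from the number of available CPU cores. With hyper-threading these are logical cores. The number of containers depends on how the source is split into tasks.
An aside: once you have the underlying fundamentals, much of this will seem obvious.
Without them, it is hard to follow and everything stays foggy.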
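Now look at how the methods provided by the JDK are implemented. For the Collectors static factory class, the implementations fall into two kinds: a method either constructs a CollectorImpl directly, or delegates to reducing(), which in turn constructs a CollectorImpl.
So, ultimately, all of the methods are implemented through CollectorImpl.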

    static final Set<Collector.Characteristics> CH_CONCURRENT_ID
            = Collections.unmodifiableSet(EnumSet.of(Collector.Characteristics.CONCURRENT,
                                                     Collector.Characteristics.UNORDERED,
                                                     Collector.Characteristics.IDENTITY_FINISH));
    static final Set<Collector.Characteristics> CH_CONCURRENT_NOID
            = Collections.unmodifiableSet(EnumSet.of(Collector.Characteristics.CONCURRENT,
                                                     Collector.Characteristics.UNORDERED));
    static final Set<Collector.Characteristics> CH_ID
            = Collections.unmodifiableSet(EnumSet.of(Collector.Characteristics.IDENTITY_FINISH));
    static final Set<Collector.Characteristics> CH_UNORDERED_ID
            = Collections.unmodifiableSet(EnumSet.of(Collector.Characteristics.UNORDERED,
                                                     Collector.Characteristics.IDENTITY_FINISH));
    static final Set<Collector.Characteristics> CH_NOID = Collections.emptySet();

    public static <T, C extends Collection<T>>
    Collector<T, ?, C> toCollection(Supplier<C> collectionFactory) {
        return new CollectorImpl<>(collectionFactory, Collection<T>::add,
                                   (r1, r2) -> { r1.addAll(r2); return r1; },
                                   CH_ID);
    }

    public static <T>
    Collector<T, ?, List<T>> toList() {
        return new CollectorImpl<>((Supplier<List<T>>) ArrayList::new, List::add,
                                   (left, right) -> { left.addAll(right); return left; },
                                   CH_ID);
    }

    public static <T>
    Collector<T, ?, Set<T>> toSet() {
        return new CollectorImpl<>((Supplier<Set<T>>) HashSet::new, Set::add,
                                   (left, right) -> { left.addAll(right); return left; },
                                   CH_UNORDERED_ID);
    }

    public static Collector<CharSequence, ?, String> joining() {
        return new CollectorImpl<CharSequence, StringBuilder, String>(
                StringBuilder::new, StringBuilder::append,
                (r1, r2) -> { r1.append(r2); return r1; },
                StringBuilder::toString, CH_NOID);
    }

    public static <T, U, A, R>
    Collector<T, ?, R> mapping(Function<? super T, ? extends U> mapper,
                               Collector<? super U, A, R> downstream) {
        BiConsumer<A, ? super U> downstreamAccumulator = downstream.accumulator();
        return new CollectorImpl<>(downstream.supplier(),
                                   (r, t) -> downstreamAccumulator.accept(r, mapper.apply(t)),
                                   downstream.combiner(), downstream.finisher(),
                                   downstream.characteristics());
    }

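collectingAndThen(): collect, then post-process the result
How it works: it removes the IDENTITY_FINISH characteristic.
Why remove it: if it stayed, the finisher() would be skipped and the additional finishing function would never run.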

    public static <T, A, R, RR> Collector<T, A, RR> collectingAndThen(Collector<T, A, R> downstream,
                                                                   Function<R, RR> finisher) {
        Set<Collector.Characteristics> characteristics = downstream.characteristics();
        if (characteristics.contains(Collector.Characteristics.IDENTITY_FINISH)) {
            if (characteristics.size() == 1)
                characteristics = Collectors.CH_NOID;
            else {
                characteristics = EnumSet.copyOf(characteristics);
                characteristics.remove(Collector.Characteristics.IDENTITY_FINISH);
                characteristics = Collections.unmodifiableSet(characteristics);
            }
        }
        return new CollectorImpl<>(downstream.supplier(),
                                   downstream.accumulator(),
                                   downstream.combiner(),
                                   downstream.finisher().andThen(finisher),
                                   characteristics);
    }

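A short usage sketch of `collectingAndThen()`, with illustrative helper names. It wraps `toList()` so the result becomes unmodifiable, and checks that IDENTITY_FINISH is present on `toList()` (as in the OpenJDK source quoted here) but absent on the wrapped collector.

```java
import java.util.Collections;
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class CollectingAndThenDemo {
    // Collects into a list, then wraps it so callers cannot modify it.
    static Collector<String, ?, List<String>> toFrozenList() {
        return Collectors.collectingAndThen(Collectors.toList(), Collections::unmodifiableList);
    }

    // True if the collector declares IDENTITY_FINISH, i.e. its finisher may be skipped.
    static boolean skipsFinisher(Collector<?, ?, ?> collector) {
        return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH);
    }

    public static void main(String[] args) {
        List<String> frozen = Stream.of("hello", "world").collect(toFrozenList());
        System.out.println(frozen);                              // prints [hello, world]
        System.out.println(skipsFinisher(Collectors.toList())); // prints true
        System.out.println(skipsFinisher(toFrozenList()));      // prints false
    }
}
```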

    public static <T> Collector<T, ?, Long> counting() {
        return reducing(0L, e -> 1L, Long::sum);
    }

    public static <T> Collector<T, ?, Optional<T>> minBy(Comparator<? super T> comparator) {
        return reducing(BinaryOperator.minBy(comparator));
    }

    public static <T> Collector<T, ?, Optional<T>> maxBy(Comparator<? super T> comparator) {
        return reducing(BinaryOperator.maxBy(comparator));
    }

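summingInt(), summingLong(), summingDouble()
Why use an int[1], only to return the single element of that array at the end? Would a plain number not do?
No. A plain number is passed by value, so the accumulator could not update it in place. An array is a reference to mutable storage, so the array itself serves as the container.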
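The same trick works in user code. Below is a minimal sketch of a hypothetical `totalLength()` collector built with `Collector.of`. The accumulator is a `BiConsumer` and returns nothing, so the only way to keep a running total is to mutate the container it is handed.

```java
import java.util.stream.Collector;
import java.util.stream.Stream;

class SummingDemo {
    // Hypothetical collector: sums string lengths in a one-element int array.
    static Collector<String, int[], Integer> totalLength() {
        return Collector.of(
                () -> new int[1],                        // mutable container holding the sum
                (acc, s) -> acc[0] += s.length(),        // update in place, nothing is returned
                (a, b) -> { a[0] += b[0]; return a; },   // merge partial sums
                acc -> acc[0]);                          // unwrap the final value
    }

    static int total(String... words) {
        return Stream.of(words).collect(totalLength());
    }

    public static void main(String[] args) {
        System.out.println(total("hello", "hi")); // prints 7
    }
}
```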

    public static <T> Collector<T, ?, Integer> summingInt(ToIntFunction<? super T> mapper) {
        return new CollectorImpl<>(
                () -> new int[1],
                (a, t) -> { a[0] += mapper.applyAsInt(t); },
                (a, b) -> { a[0] += b[0]; return a; },
                a -> a[0], CH_NOID);
    }

    public static <T> Collector<T, ?, Double> averagingInt(ToIntFunction<? super T> mapper) {
        return new CollectorImpl<>(
                () -> new long[2],
                (a, t) -> { a[0] += mapper.applyAsInt(t); a[1]++; },
                (a, b) -> { a[0] += b[0]; a[1] += b[1]; return a; },
                a -> (a[1] == 0) ? 0.0d : (double) a[0] / a[1], CH_NOID);
    }

    public static <T> Collector<T, ?, T> reducing(T identity, BinaryOperator<T> op) {
        return new CollectorImpl<>(
                boxSupplier(identity),
                (a, t) -> { a[0] = op.apply(a[0], t); },
                (a, b) -> { a[0] = op.apply(a[0], b[0]); return a; },
                a -> a[0],
                CH_NOID);
    }

    public static <T, K> Collector<T, ?, Map<K, List<T>>> groupingBy(Function<? super T, ? extends K> classifier) {
        // Delegates to the two-argument overload below, with toList() as the downstream collector.
        return groupingBy(classifier, toList());
    }

    public static <T, K, A, D> Collector<T, ?, Map<K, D>> groupingBy(Function<? super T, ? extends K> classifier,
                                                                    Collector<? super T, A, D> downstream) {
        // Delegates to the three-argument overload below.
        return groupingBy(classifier, HashMap::new, downstream);
    }

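downstream: the downstream collector. groupingBy hands each group's elements to it and uses whatever it returns as that group's value.
T: the input type of the classifier function.
K: the result type of the classifier function, which becomes the key type of the Map.
D: the result type of the downstream collector, which becomes the value type of the Map.
HashMap::new: the factory for the Map that is returned to the caller.
Advantage: convenient for the caller, who gets a HashMap without specifying anything.
Disadvantage: the result is restricted to HashMap; the three-argument overload lets the caller choose another Map type.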
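A brief usage sketch of the three-argument overload, with an illustrative class name. It supplies `TreeMap::new` as the map factory so the keys come out sorted, and `counting()` as the downstream collector.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

class GroupingDemo {
    // Groups words by length (K = Integer) and counts each group (D = Long).
    static Map<Integer, Long> countByLength(List<String> words) {
        return words.stream().collect(
                Collectors.groupingBy(String::length, TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        // prints {2=1, 5=2, 7=1}
        System.out.println(countByLength(Arrays.asList("hello", "world", "hi", "welcome")));
    }
}
```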

    // The lowest-level implementation of groupingBy.
    /**
     * Returns a {@code Collector} implementing a cascaded "group by" operation
     * on input elements of type {@code T}, grouping elements according to a
     * classification function, and then performing a reduction operation on
     * the values associated with a given key using the specified downstream
     * {@code Collector}.  The {@code Map} produced by the Collector is created
     * with the supplied factory function.
     *
     * <p>The classification function maps elements to some key type {@code K}.
     * The downstream collector operates on elements of type {@code T} and
     * produces a result of type {@code D}. The resulting collector produces a
     * {@code Map<K, D>}.
     *
     * <p>For example, to compute the set of last names of people in each city,
     * where the city names are sorted:
     * <pre>{@code
     *     Map<City, Set<String>> namesByCity
     *         = people.stream().collect(groupingBy(Person::getCity, TreeMap::new,
     *                                              mapping(Person::getLastName, toSet())));
     * }</pre>
     *
     * @implNote
     * The returned {@code Collector} is not concurrent.  For parallel stream
     * pipelines, the {@code combiner} function operates by merging the keys
     * from one map into another, which can be an expensive operation.  If
     * preservation of the order in which elements are presented to the downstream
     * collector is not required, using {@link #groupingByConcurrent(Function, Supplier, Collector)}
     * may offer better parallel performance.
     *
     * @param <T> the type of the input elements
     * @param <K> the type of the keys
     * @param <A> the intermediate accumulation type of the downstream collector
     * @param <D> the result type of the downstream reduction
     * @param <M> the type of the resulting {@code Map}
     * @param classifier a classifier function mapping input elements to keys
     * @param downstream a {@code Collector} implementing the downstream reduction
     * @param mapFactory a function which, when called, produces a new empty
     *                   {@code Map} of the desired type
     * @return a {@code Collector} implementing the cascaded group-by operation
     *
     * @see #groupingBy(Function, Collector)
     * @see #groupingBy(Function)
     * @see #groupingByConcurrent(Function, Supplier, Collector)
     */
    public static <T, K, D, A, M extends Map<K, D>>
    Collector<T, ?, M> groupingBy(Function<? super T, ? extends K> classifier,
                                  Supplier<M> mapFactory,
                                  Collector<? super T, A, D> downstream) {
        Supplier<A> downstreamSupplier = downstream.supplier();
        BiConsumer<A, ? super T> downstreamAccumulator = downstream.accumulator();
        BiConsumer<Map<K, A>, T> accumulator = (m, t) -> {
            K key = Objects.requireNonNull(classifier.apply(t), "element cannot be mapped to a null key");
            A container = m.computeIfAbsent(key, k -> downstreamSupplier.get());
            downstreamAccumulator.accept(container, t);
        };
        BinaryOperator<Map<K, A>> merger = Collectors.<K, A, Map<K, A>>mapMerger(downstream.combiner());
        @SuppressWarnings("unchecked")
        Supplier<Map<K, A>> mangledFactory = (Supplier<Map<K, A>>) mapFactory;

        if (downstream.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)) {
            return new CollectorImpl<>(mangledFactory, accumulator, merger, CH_ID);
        }
        else {
            @SuppressWarnings("unchecked")
            Function<A, A> downstreamFinisher = (Function<A, A>) downstream.finisher();
            Function<Map<K, A>, M> finisher = intermediate -> {
                intermediate.replaceAll((k, v) -> downstreamFinisher.apply(v));
                @SuppressWarnings("unchecked")
                M castResult = (M) intermediate;
                return castResult;
            };
            return new CollectorImpl<>(mangledFactory, accumulator, merger, finisher, CH_NOID);
        }
    }

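Parameter analysis:
1. Classifier: takes a T and returns a K. The keys of the returned Map are of type K.
2. Container: HashMap
3. Downstream collector: D is the result type of the downstream collector.
Analysis of the method's logic: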

    public static <T, K>
    Collector<T, ?, ConcurrentMap<K, List<T>>> groupingByConcurrent(Function<? super T, ? extends K> classifier) {
        return groupingByConcurrent(classifier, ConcurrentHashMap::new, toList());
    }
    // ConcurrentHashMap is what makes this variant safe for concurrent use.

    public static <T, K, A, D, M extends ConcurrentMap<K, D>>
    Collector<T, ?, M> groupingByConcurrent(Function<? super T, ? extends K> classifier,
                                            Supplier<M> mapFactory,
                                            Collector<? super T, A, D> downstream) {
        Supplier<A> downstreamSupplier = downstream.supplier();
        BiConsumer<A, ? super T> downstreamAccumulator = downstream.accumulator();
        BinaryOperator<ConcurrentMap<K, A>> merger = Collectors.<K, A, ConcurrentMap<K, A>>mapMerger(downstream.combiner());
        @SuppressWarnings("unchecked")
        Supplier<ConcurrentMap<K, A>> mangledFactory = (Supplier<ConcurrentMap<K, A>>) mapFactory;
        BiConsumer<ConcurrentMap<K, A>, T> accumulator;
        // This is where concurrent access to the downstream container is handled:
        if (downstream.characteristics().contains(Collector.Characteristics.CONCURRENT)) {
            accumulator = (m, t) -> {
                K key = Objects.requireNonNull(classifier.apply(t), "element cannot be mapped to a null key");
                A resultContainer = m.computeIfAbsent(key, k -> downstreamSupplier.get());
                downstreamAccumulator.accept(resultContainer, t);
            };
        }
        else {
            accumulator = (m, t) -> {
                K key = Objects.requireNonNull(classifier.apply(t), "element cannot be mapped to a null key");
                A resultContainer = m.computeIfAbsent(key, k -> downstreamSupplier.get());
                synchronized (resultContainer) { // lock on the container
                    downstreamAccumulator.accept(resultContainer, t);
                }
            };
        }

        if (downstream.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)) {
            return new CollectorImpl<>(mangledFactory, accumulator, merger, CH_CONCURRENT_ID);
        }
        else {
            @SuppressWarnings("unchecked")
            Function<A, A> downstreamFinisher = (Function<A, A>) downstream.finisher();
            Function<ConcurrentMap<K, A>, M> finisher = intermediate -> {
                intermediate.replaceAll((k, v) -> downstreamFinisher.apply(v));
                @SuppressWarnings("unchecked")
                M castResult = (M) intermediate;
                return castResult;
            };
            return new CollectorImpl<>(mangledFactory, accumulator, merger, finisher, CH_CONCURRENT_NOID);
        }
    }

    public static <T>
    Collector<T, ?, Map<Boolean, List<T>>> partitioningBy(Predicate<? super T> predicate) {
        return partitioningBy(predicate, toList()); // delegates to the full overload below
    }

    public static <T, D, A>
    Collector<T, ?, Map<Boolean, D>> partitioningBy(Predicate<? super T> predicate,
                                                    Collector<? super T, A, D> downstream) {
        BiConsumer<A, ? super T> downstreamAccumulator = downstream.accumulator();
        BiConsumer<Partition<A>, T> accumulator = (result, t) ->
                downstreamAccumulator.accept(predicate.test(t) ? result.forTrue : result.forFalse, t);
        BinaryOperator<A> op = downstream.combiner();
        BinaryOperator<Partition<A>> merger = (left, right) ->
                new Partition<>(op.apply(left.forTrue, right.forTrue),
                                op.apply(left.forFalse, right.forFalse));
        Supplier<Partition<A>> supplier = () ->
                new Partition<>(downstream.supplier().get(),
                                downstream.supplier().get());
        if (downstream.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)) {
            return new CollectorImpl<>(supplier, accumulator, merger, CH_ID);
        }
        else {
            Function<Partition<A>, Map<Boolean, D>> finisher = par ->
                    new Partition<>(downstream.finisher().apply(par.forTrue),
                                    downstream.finisher().apply(par.forFalse));
            return new CollectorImpl<>(supplier, accumulator, merger, finisher, CH_NOID);
        }
    }

partitioningBy relies on its own private static nested class:

    /**
     * Implementation class used by partitioningBy.
     */
    private static final class Partition<T>
            extends AbstractMap<Boolean, T>
            implements Map<Boolean, T> {
        final T forTrue;
        final T forFalse;

        Partition(T forTrue, T forFalse) {
            this.forTrue = forTrue;
            this.forFalse = forFalse;
        }

        @Override
        public Set<Map.Entry<Boolean, T>> entrySet() {
            return new AbstractSet<Map.Entry<Boolean, T>>() {
                @Override
                public Iterator<Map.Entry<Boolean, T>> iterator() {
                    Map.Entry<Boolean, T> falseEntry = new SimpleImmutableEntry<>(false, forFalse);
                    Map.Entry<Boolean, T> trueEntry = new SimpleImmutableEntry<>(true, forTrue);
                    return Arrays.asList(falseEntry, trueEntry).iterator();
                }

                @Override
                public int size() {
                    return 2;
                }
            };
        }
    }

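A brief usage sketch of `partitioningBy()`, with an illustrative class and method name. Because the result always carries both a `true` and a `false` entry, looking up either key returns a list (possibly empty) rather than `null`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class PartitionDemo {
    // Splits words into those with at least minLength characters (true) and the rest (false).
    static Map<Boolean, List<String>> splitByLength(List<String> words, int minLength) {
        return words.stream().collect(Collectors.partitioningBy(w -> w.length() >= minLength));
    }

    public static void main(String[] args) {
        Map<Boolean, List<String>> parts = splitByLength(Arrays.asList("hello", "hi", "welcome"), 5);
        System.out.println(parts.get(true));  // prints [hello, welcome]
        System.out.println(parts.get(false)); // prints [hi]

        // No word is this long, yet the true key still maps to an empty list.
        System.out.println(splitByLength(Arrays.asList("hello", "hi"), 10).get(true)); // prints []
    }
}
```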
...
public interface Stream<T> extends BaseStream<T, Stream<T>> {}
    package java.util.stream;

    import java.nio.charset.Charset;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.Collection;
    import java.util.Iterator;
    import java.util.Spliterator;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.function.IntConsumer;
    import java.util.function.Predicate;

    /**
     * Base interface for streams, which are sequences of elements supporting
     * sequential and parallel aggregate operations. The following example
     * illustrates an aggregate operation using the stream types {@link Stream}
     * and {@link IntStream}, computing the sum of the weights of the red widgets:
     *
     * <pre>{@code
     *     int sum = widgets.stream()
     *                      .filter(w -> w.getColor() == RED)
     *                      .mapToInt(w -> w.getWeight())
     *                      .sum();
     * }</pre>
     *
     * See the class documentation for {@link Stream} and the package documentation
     * for <a href="package-summary.html">java.util.stream</a> for additional
     * specification of streams, stream operations, stream pipelines, and
     * parallelism, which governs the behavior of all stream types.
     *
     * @param <T> the type of the stream elements
     * @param <S> the type of the stream implementing {@code BaseStream}
     * @since 1.8
     * @see Stream
     * @see IntStream
     * @see LongStream
     * @see DoubleStream
     * @see <a href="package-summary.html">java.util.stream</a>
     */
    public interface BaseStream<T, S extends BaseStream<T, S>>
            extends AutoCloseable {

        /**
         * Returns an iterator for the elements of this stream.
         *
         * <p>This is a <a href="package-summary.html#StreamOps">terminal
         * operation</a>.
         *
         * @return the element iterator for this stream
         */
        Iterator<T> iterator();

        /**
         * Returns a spliterator for the elements of this stream.
         *
         * <p>This is a <a href="package-summary.html#StreamOps">terminal
         * operation</a>.
         *
         * @return the element spliterator for this stream
         */
        Spliterator<T> spliterator();

        /**
         * Returns whether this stream, if a terminal operation were to be executed,
         * would execute in parallel. Calling this method after invoking an
         * terminal stream operation method may yield unpredictable results.
         *
         * @return {@code true} if this stream would execute in parallel if executed
         */
        boolean isParallel();

        /**
         * Returns an equivalent stream that is sequential. May return
         * itself, either because the stream was already sequential, or because
         * the underlying stream state was modified to be sequential.
         *
         * <p>This is an <a href="package-summary.html#StreamOps">intermediate
         * operation</a>.
         *
         * @return a sequential stream
         */
        S sequential();

        /**
         * Returns an equivalent stream that is parallel. May return
         * itself, either because the stream was already parallel, or because
         * the underlying stream state was modified to be parallel.
         *
         * <p>This is an <a href="package-summary.html#StreamOps">intermediate
         * operation</a>.
         *
         * @return a parallel stream
         */
        S parallel();

        /**
         * Returns an equivalent stream that is
         * <a href="package-summary.html#Ordering">unordered</a>. May return
         * itself, either because the stream was already unordered, or because
         * the underlying stream state was modified to be unordered.
         *
         * <p>This is an <a href="package-summary.html#StreamOps">intermediate
         * operation</a>.
         *
         * @return an unordered stream
         */
        S unordered();

        /**
         * Returns an equivalent stream with an additional close handler. Close
         * handlers are run when the {@link #close()} method
         * is called on the stream, and are executed in the order they were
         * added. All close handlers are run, even if earlier close handlers throw
         * exceptions. If any close handler throws an exception, the first
         * exception thrown will be relayed to the caller of {@code close()}, with
         * any remaining exceptions added to that exception as suppressed exceptions
         * (unless one of the remaining exceptions is the same exception as the
         * first exception, since an exception cannot suppress itself.) May
         * return itself.
         *
         * <p>This is an <a href="package-summary.html#StreamOps">intermediate
         * operation</a>.
         *
         * @param closeHandler A task to execute when the stream is closed
         * @return a stream with a handler that is run if the stream is closed
         */
        S onClose(Runnable closeHandler);

        /**
         * Closes this stream, causing all close handlers for this stream pipeline
         * to be called.
         *
         * @see AutoCloseable#close()
         */
        @Override
        void close();
    }
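The Javadoc above says that close handlers run in the order they were added once `close()` is called, and that `parallel()` and `sequential()` only switch the execution mode of the pipeline. The following is a minimal sketch of both points, not code from the JDK or from the original article; the class name `BaseStreamDemo` and its helper methods are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

// Hypothetical demo class, used only to illustrate the BaseStream methods.
class BaseStreamDemo {

    // Registers two close handlers and returns everything that was logged.
    static List<String> runCloseHandlers() {
        List<String> log = new ArrayList<>();
        // try-with-resources is allowed because BaseStream extends AutoCloseable.
        // BaseStream.close() declares no checked exception, so no catch is needed.
        try (Stream<String> stream = Stream.of("hello", "world")
                .onClose(() -> log.add("first handler"))
                .onClose(() -> log.add("second handler"))) {
            stream.forEachOrdered(item -> log.add("element " + item));
        }
        return log;
    }

    // Returns isParallel() before parallel(), after parallel(), and after sequential().
    static boolean[] parallelFlags() {
        Stream<Integer> stream = Stream.of(1, 2, 3);
        boolean initially = stream.isParallel();
        Stream<Integer> parallel = stream.parallel();
        boolean afterParallel = parallel.isParallel();
        boolean afterSequential = parallel.sequential().isParallel();
        return new boolean[] {initially, afterParallel, afterSequential};
    }

    public static void main(String[] args) {
        // Prints: [element hello, element world, first handler, second handler]
        System.out.println(runCloseHandlers());
        // Prints: [false, true, false]
        System.out.println(Arrays.toString(parallelFlags()));
    }
}
```

Note that the handlers run only because the stream is closed; a terminal operation by itself does not call `close()`.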
    package java.lang;

    /**
     * An object that may hold resources (such as file or socket handles)
     * until it is closed. The {@link #close()} method of an {@code AutoCloseable}
     * object is called automatically when exiting a {@code
     * try}-with-resources block for which the object has been declared in
     * the resource specification header. This construction ensures prompt
     * release, avoiding resource exhaustion exceptions and errors that
     * may otherwise occur.
     *
     * (Note added to the JDK source: until it is closed, an object may hold
     * resources such as handles. close() is called automatically when the
     * block exits, which avoids exceptions such as resource exhaustion.)
     *
     * @apiNote
     * <p>It is possible, and in fact common, for a base class to
     * implement AutoCloseable even though not all of its subclasses or
     * instances will hold releasable resources. For code that must operate
     * in complete generality, or when it is known that the {@code AutoCloseable}
     * instance requires resource release, it is recommended to use {@code
     * try}-with-resources constructions. However, when using facilities such as
     * {@link java.util.stream.Stream} that support both I/O-based and
     * non-I/O-based forms, {@code try}-with-resources blocks are in
     * general unnecessary when using non-I/O-based forms.
     *
     * @author Josh Bloch
     * @since 1.7
     */
    public interface AutoCloseable {

        /**
         * Closes this resource, relinquishing any underlying resources.
         * This method is invoked automatically on objects managed by the
         * {@code try}-with-resources statement.
         *
         * <p>While this interface method is declared to throw {@code
         * Exception}, implementers are <em>strongly</em> encouraged to
         * declare concrete implementations of the {@code close} method to
         * throw more specific exceptions, or to throw no exception at all
         * if the close operation cannot fail.
         *
         * <p> Cases where the close operation may fail require careful
         * attention by implementers. It is strongly advised to relinquish
         * the underlying resources and to internally <em>mark</em> the
         * resource as closed, prior to throwing the exception. The {@code
         * close} method is unlikely to be invoked more than once and so
         * this ensures that the resources are released in a timely manner.
         * Furthermore it reduces problems that could arise when the resource
         * wraps, or is wrapped, by another resource.
         *
         * <p><em>Implementers of this interface are also strongly advised
         * to not have the {@code close} method throw {@link
         * InterruptedException}.</em>
         *
         * This exception interacts with a thread's interrupted status,
         * and runtime misbehavior is likely to occur if an {@code
         * InterruptedException} is {@linkplain Throwable#addSuppressed
         * suppressed}.
         *
         * More generally, if it would cause problems for an
         * exception to be suppressed, the {@code AutoCloseable.close}
         * method should not throw it.
         *
         * <p>Note that unlike the {@link java.io.Closeable#close close}
         * method of {@link java.io.Closeable}, this {@code close} method
         * is <em>not</em> required to be idempotent. In other words,
         * calling this {@code close} method more than once may have some
         * visible side effect, unlike {@code Closeable.close} which is
         * required to have no effect if called more than once.
         *
         * However, implementers of this interface are strongly encouraged
         * to make their {@code close} methods idempotent.
         *
         * @throws Exception if this resource cannot be closed
         */
        void close() throws Exception;
    }
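The Javadoc above encourages implementers to make `close()` idempotent and to declare a narrower exception than `Exception` where possible. One possible way to follow that advice is sketched below; `IdempotentResource` and its `releaseCount` field are hypothetical names used only for illustration, and the counter merely stands in for releasing a real handle.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical resource class; it is not part of the JDK or of the original article.
class IdempotentResource implements AutoCloseable {

    private final AtomicBoolean closed = new AtomicBoolean(false);
    private int releaseCount = 0;

    // Declares no checked exception, which is narrower than "throws Exception".
    @Override
    public void close() {
        // compareAndSet succeeds only for the first caller, so the release
        // logic runs at most once no matter how often close() is called.
        if (closed.compareAndSet(false, true)) {
            releaseCount++; // stands in for releasing a real handle
        }
    }

    int getReleaseCount() {
        return releaseCount;
    }

    public static void main(String[] args) {
        IdempotentResource resource = new IdempotentResource();
        try (IdempotentResource managed = resource) {
            // work with the resource here
        }
        resource.close(); // second call has no further effect
        System.out.println(resource.getReleaseCount()); // prints 1
    }
}
```

Because this `close()` throws no checked exception, the try-with-resources statement needs no `catch` clause.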
Use an example to understand this interface:
    public class AutoCloseableTest implements AutoCloseable {

        public static void main(String[] args) {
            // This form is the try-with-resources statement:
            // close() is called automatically when the block exits.
            try (AutoCloseableTest autoCloseableTest = new AutoCloseableTest()) {
                autoCloseableTest.doSomething();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        @Override
        public void close() throws Exception {
            System.out.println("close invoked");
        }

        public void doSomething() {
            System.out.println("doSomething invoked");
        }
    }
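Output: "doSomething invoked" is printed first, followed by "close invoked". (When an instance of a class that implements this interface is declared as a resource in a try-with-resources statement, close() is called automatically as the block exits.)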
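The example above manages a single resource. As a further sketch (the class `CloseOrderDemo` and its nested `Named` type are hypothetical and not from the original article), the code below declares two resources and throws inside the block, to show that `close()` still runs, that resources are closed in the reverse order of their declaration, and that this happens before the `catch` clause executes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class, used only to illustrate try-with-resources ordering.
class CloseOrderDemo {

    // A trivial resource that records its own close() call in a shared log.
    static final class Named implements AutoCloseable {
        private final String name;
        private final List<String> log;

        Named(String name, List<String> log) {
            this.name = name;
            this.log = log;
        }

        @Override
        public void close() {
            log.add("close " + name);
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        try (Named first = new Named("first", log);
             Named second = new Named("second", log)) {
            log.add("body");
            throw new IllegalStateException("failure in body");
        } catch (IllegalStateException e) {
            // Both resources are already closed by the time this runs.
            log.add("caught " + e.getMessage());
        }
        return log;
    }

    public static void main(String[] args) {
        // Prints: [body, close second, close first, caught failure in body]
        System.out.println(run());
    }
}
```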
This is not because you are expected, during development, to go and
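Once you have read the source code, you can use the API with much more confidence.
When you run into a problem, you can fix it quickly.
1. Read excellent code.
2. Learn from what others have built.
3. Use it often enough and it becomes your own.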