java8 Collector 接口

java8的Stream中的collect方法,用於對流中的數據進行歸集操做,collect方法接受的參數是一個Collector,忽略掉靜態方法後,Collector接口內容以下:java

public interface Collector<T, A, R>
    // 用於生成空的累加器實例,這個累加器的類型是A
    Supplier<A> supplier();
    // 生成一個用於執行歸約操做的BiConsumer<A,T>,A是supplier生成的累加器,T是數據流中的每一個元素的數據類型,能夠看做,把T累加到A
    BiConsumer<A, T> accumulator();
    // 並行歸集時,須要對多個累加器進行合併操做
    BinaryOperator<A> combiner();
    // 做用是把A轉換爲R作爲最終的返回值。
    Function<A, R> finisher();
    // 特性列表,
    Set<Characteristics> characteristics();

    /** 特性:如下注釋基本有道翻譯自原始文檔註釋[手動笑哭臉] */
    enum Characteristics {
        /** 支持並行歸集*/
	/** 若是收集器不一樣時是無序的(UNORDERED),那麼只在應用於無序數據源時,才應該並行歸集 */
        CONCURRENT,
        /** 無序的,收集器並不按照Stream中的元素輸入順序執行 */
        UNORDERED,
	/** 表示完成器finisher方法返回的函數是一個恆等函數,這時,累加器對象就是歸約操做的最終結果*/
	/** 若是設置,則必須是這樣一種狀況:從A到R的未檢查強制轉換將會成功。 */
        IDENTITY_FINISH
    }
}

這個接口仍是有一點點複雜的,3個泛型,5個方法,其中一個characteristics方法用於提供特性列表,其中最重要的就是CONCURRENT,是否容許並行歸集; 而另外4個方法,能夠返回4個函數,這4個函數各有用途;app

咱們梳理一下歸集操做的主要流程,看看在這個過程當中如何使用Collector函數

  1. 對一個數據集進行歸集,首先要進行遍歷,遍歷的過程當中,對數據集的每個元素,進行某種操做,這個操做動做,由accumulator方法提供,該方法返回一個BiConsumer<A, T>函數,此函數消費兩個參數,不返回值;該函數中<A, T>兩個泛型,其中 T 表示數據集中的元素,而 A 表示對元素操做過程當中產生的中間值進行存儲的臨時變量,即上面說的累加器;
  2. 上一步中提到了對數據操做的過程當中須要對計算結果進行臨時存儲,那就須要一個存儲的容器,該容器由supplier方法提供,該方法返回一個Supplier<A>函數,可產生一個類型爲A的對象;
  3. 若是是並行歸集的話,須要一個方法把各個子任務的歸集結果進行合併,combiner方法派上用場;
  4. 最後,計算完成後,還要返回一個歸集結果,finisher方法可獲得一個Function函數,把累加器A轉換成最終的響應結果R並返回;

看一下Stream.collect的源碼:工具


public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {
        // 聲明一個存儲中間值的容器變量(累加器)
	A container;
	// 若是是並行流,而且知足:[1. collector特性中包含CONCURRENT(並行操做);2. 流是無序的,或者collector特性中包含了UNORDERED(無需按順序進行歸集);],執行下面的代碼進行並行歸集,由於這種狀況下,不須要專門提供一個將各個子任務進行合併的方法
        if (isParallel()
                && (collector.characteristics().contains(Collector.Characteristics.CONCURRENT))
                && (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) {
	    // 首先從supplier方法中得到一個容器
            container = collector.supplier().get();
	    // 拿到執行歸集操做的BiConsumer函數
            BiConsumer<A, ? super P_OUT> accumulator = collector.accumulator();
	    // 遍歷集合,調用BiConsumer函數的accept方法,而且傳入存儲臨時值的累加器和當前遍歷到的元素,並行流中,forEach遍歷是並行的
            forEach(u -> accumulator.accept(container, u));
        }
        else {
	    // 不知足上述條件,就調用evaluate方法進行歸集
	    // evaluate的細節先不說了,由於說來話長
            container = evaluate(ReduceOps.makeRef(collector));
        }
	// 返回值,若是collector特性列表中包含了IDENTITY_FINISH,就返回容器自身,不然, collector.finisher()得到完成器,並傳入累加器進行完成操做,最終的結果做爲返回值
        return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
               ? (R) container
               : collector.finisher().apply(container);
    }

另外,Stream中的collect方法還有另外一個重載方法:ui

public final <R> R collect(Supplier<R> supplier,
                                   BiConsumer<R, ? super P_OUT> accumulator,
                                   BiConsumer<R, R> combiner) {
        	return evaluate(ReduceOps.makeRef(supplier, accumulator, combiner));
    }

省去了Collector,入參直接接受三個函數,正是上面講到的三個函數,省去了對結果進行轉換的finisher,而實現方法更是直接調用了evaluate,省去了是否並行流和Collector的特徵判斷;lua


下面看一下Collector的工具類Collectors中提供的幾個經常使用實現;翻譯


Collectors.toList()code

public static <T> Collector<T, ?, List<T>> toList() {
            return new CollectorImpl<>((Supplier<List<T>>) ArrayList::new,
					List::add,
                    			(left, right) -> {left.addAll(right); return left;},
					CH_ID);
    	}
		
	static final Set<Collector.Characteristics> CH_ID
            = Collections.unmodifiableSet(EnumSet.of(Collector.Characteristics.IDENTITY_FINISH));
  1. ArrayList::new,建立一個ArrayList做爲累加器;
  2. List::add,對流中元素的操做就是直接添加到累加器中;
  3. 若是是並行歸集,對子任務歸集結果進行全並的方法是 addAll,後一個子任務的結果直接所有添加到前一個子任務結果中;
  4. CH_ID是一個預約義好的特性列表,只有一個,IDENTITY_FINISH,表示累加器就是最終要返回的結果,不須要轉換;

也能夠看出Collectors.toList()歸集器的歸集結果是一個ArrayList,若是想轉換爲其它List的實現,還須要本身操做,或者定製一個歸集器;對象


Collectors.toMap接口

public static <T, K, U>
    Collector<T, ?, Map<K,U>> toMap(Function<? super T, ? extends K> keyMapper,
                                    Function<? super T, ? extends U> valueMapper) {
	// 調用另外一個toMap方法,最後一個參數表示supplier,生成累加器,一個HashMap
        return toMap(keyMapper, valueMapper, throwingMerger(), HashMap::new);
    }
	
	public static <T, K, U, M extends Map<K, U>>
    Collector<T, ?, M> toMap(Function<? super T, ? extends K> keyMapper,
                                Function<? super T, ? extends U> valueMapper,
                                BinaryOperator<U> mergeFunction,
                                Supplier<M> mapSupplier) {
	// 這個是要對元素進行的操做,調用倆個映射函數分別生成k和v,而後加入到累加器中
	// map.merge:若是給定key沒綁定值或值爲null,則綁定給定值,不然,執行重映射方法替換原來值或者刪除原來的值。
        BiConsumer<M, T> accumulator
                = (map, element) -> map.merge(keyMapper.apply(element),
                                              valueMapper.apply(element), mergeFunction);
	// 最後特性表仍然是CH_ID,直接返回累加器
        return new CollectorImpl<>(mapSupplier, accumulator, mapMerger(mergeFunction), CH_ID);
    }

很少說,看註釋就好


分組,這個比較複雜,是組合歸集器:Collectors.groupingBy

// 傳入一個映射函數,用於分紅key,也就是咱們的分組依據
    public static <T, K> Collector<T, ?, Map<K, List<T>>>
    groupingBy(Function<? super T, ? extends K> classifier) {
	// 這裏除了分組依據,還傳入一個Collector,咱們稱之爲子歸集器,用於對分組後的數據進行歸集
        return groupingBy(classifier, toList());
    }
	public static <T, K, A, D>
    Collector<T, ?, Map<K, D>> groupingBy(Function<? super T, ? extends K> classifier,
                                          Collector<? super T, A, D> downstream) {
	// 這裏HashMap::new就是supplier,用於生成累加器
        return groupingBy(classifier, HashMap::new, downstream);
    }

	public static <T, K, D, A, M extends Map<K, D>>
    Collector<T, ?, M> groupingBy(Function<? super T, ? extends K> classifier,
                                  Supplier<M> mapFactory,
                                  Collector<? super T, A, D> downstream) {
		// 子歸集器的累加器
		Supplier<A> downstreamSupplier = downstream.supplier();
		// 子歸集器要進行的操做
		BiConsumer<A, ? super T> downstreamAccumulator = downstream.accumulator();
		// 父歸集器的操做m表示累加器,t表示要操做的數據
        BiConsumer<Map<K, A>, T> accumulator = (m, t) -> {
			// 把數據映射成Key
            K key = Objects.requireNonNull(classifier.apply(t), "element cannot be mapped to a null key");
			// 從累加器中取出子累加器,若是沒有,用後面的函數生成一個並綁定
            A container = m.computeIfAbsent(key, k -> downstreamSupplier.get());
			// 子歸集器對子累加器和數據進行操做
            downstreamAccumulator.accept(container, t);
        };
	// 並行歸集的合併函數
        BinaryOperator<Map<K, A>> merger = Collectors.<K, A, Map<K, A>>mapMerger(downstream.combiner());
        @SuppressWarnings("unchecked")
	// 累加器
        Supplier<Map<K, A>> mangledFactory = (Supplier<Map<K, A>>) mapFactory;

	// 若是子歸集器特性中包含IDENTITY_FINISH,默認狀況下子歸集器是一個toList()的結果,自己是IDENTITY_FINISH的
        if (downstream.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)) {
	    // 最終仍是建立一個CollectorImpl
            return new CollectorImpl<>(mangledFactory, accumulator, merger, CH_ID);
        }
        else {
	    // 子歸集器的完成函數
            @SuppressWarnings("unchecked")
            Function<A, A> downstreamFinisher = (Function<A, A>) downstream.finisher();
	    // 父歸集器的完成函數
            Function<Map<K, A>, M> finisher = intermediate -> {
                intermediate.replaceAll((k, v) -> downstreamFinisher.apply(v));
                @SuppressWarnings("unchecked")
                M castResult = (M) intermediate;
                return castResult;
            };
            return new CollectorImpl<>(mangledFactory, accumulator, merger, finisher, CH_NOID);
        }
    }

比較複雜,大概的過程,不過結合註釋應該仍是能看的懂;

相關文章
相關標籤/搜索