Java基礎提升之Spliterator

        本篇系Java基礎提升第二篇,在上一篇中咱們介紹課java集合的快速失敗機制,其實上一篇與這一篇都是爲了咱們之後的集合類複習打基礎。         在Java8的集合類中實現了Collection接口的都有spliterator()這個方法,而這個方法又是Collection繼承了Iterable接口所得到的方法。因此爲了之後的集合類源碼的複習,咱們此次看一下Spliterator。java

        這篇文章咱們主要講解Spliterator這個接口以及使用方法。算法

總述

上面是源碼中對這個類的說明,這個類的對象用於遍歷很對數據源的切割。這個數據源能夠是數組、實現了Collection的類、IO通道或者生成器函數。         對於源碼的註釋咱們不一一的貼在這裏了,這裏咱們直接說說源碼中對這個接口的解釋。數組

  1. 一個Spliterator既可使用tryAdvance()對元素進行獨立的遍歷也能夠根據forEachRemaining()對元素進行批量的遍歷安全

  2. 一個Spliterator能夠經過trySplit將他的一些元素分割出去做爲另外一個Spliterator,去作可能的並行的操做。可是使用一個不可分割、高度不平衡或者效率低下的Spliterator不太可能從並行操做中受益。數據結構

  3. Spliterator中包含一個特徵集,這個特徵集的元素有如下幾種:ORDERED、DISTINCT、SORTED、SIZED、NONNULL、IMMUTABLE、CONCURRENT、SUBSIZED。Spliterator可使用他們來控制、指定或簡化計算。這些單詞表明的意思從其字面上也能夠看出來。例如一個從Collection中生成的spliterator會被標記爲SIZED表明是有界的,一個從Set中生成的Spliterator會被標記爲DISTINCT的,表明元素不重複。若是是從SortedSet中生成的Spliterator會被標記爲SORTED的表明元素根據必定規則被分類。 一些特徵量會額外的限制方法的行爲。好比若是Spliterator被標記爲ORDERED,既是有序的,那麼遍歷方法必須符合其記錄的順序。將來可能會定義新的特徵量,因此不該該將新的特徵量分配給未遍歷完的元素。框架

  4. 一個Spliterator 並無被標記爲IMMUTABLE(不可變)或CONCURRENT(同步的)應該有如下幾個內容的策略:當Spliterator綁定數據源的時候,而且對數據源結構干擾的檢測發生在綁定以後,一個在第一次遍歷、第一次分裂或者第一次估算數據源元素的大小才綁定數據源的後期綁定(late-bingding)的Spliterator比一建立就綁定數據源的Spliterator好。若是一個在構造函數或者調用任何方法以後就綁定數據源的非後期綁定的Spliterator,當Spliterator在遍歷的時候會反應綁定以前對數據源的修改。在綁定以後,若是結構干擾被檢測到一個Spliterator應該盡最大努力拋出ConcurrentModificationException異常,這種方式也叫快速失敗(fail-fast 咱們上篇文章介紹過)。批遍歷方法forEachRemaing()能夠優化遍歷在全部的元素遍歷完後檢查結構干擾,而不是對每個元素檢查結構干擾並當即失敗。ide

  5. Spliterator提供了一個estimateSize()方法來估算生於元素的數量。理想狀況下如特徵量中SIZED提到的,這個方法返回的值與成功便利所遇到的全部元素的數量一致。不過,即便這個值不是正確的,這個估算的值對於源上執行的操做也是有用的。好比可以幫助決定是否進一步分割或者順序遍歷其他元素。函數

  6. 儘管他們在並行算法中具備明顯的實用性,可是Spliterator不是線程安全的,相反,使用Spliterator的並行算法應確保Spliterator一次僅由一個線程使用。這一般很容易經過串行線程限制來實現。一個線程調用trySplit()可能會致使返回的Spliterator被移交到另外一個線程,在那個線程裏這個Spliterator可能會被遍歷也可能會被分割。若是兩個或者多個線程操做同一個Spliterator是遍歷仍是分割的表現是不肯定的。若是一個源線程把一個Spliterator交給另外一個線程去處理,那麼在移交以前最好用tryAdvance()把元素消費完,做爲某些保證(好比estimateSize()對被SIZED標記的spliterator的準確性)僅在遍歷開始以前有效。性能

  7. Spliterator的原始子類被OfInt、OfLong、OfDouble去提供,子類默認實現tyrAdvance(java.util.function.Consumer)以及forEachRemaining(java.util.funcion.Consumer)封裝原始值到其對應的包裝類型。這樣的裝箱過程可能會破壞使用原始數據類型所帶來的任何性能優點,爲了不裝箱,應使用相應的基於原始類型的方法。好比,  Spliterator.OfInt#tryAdvance(java.util.function.IntConsumer) 和    Spliterator.OfInt#forEachRemaining(java.util.function.IntConsumer)應該優先於    Spliterator.OfInt#tryAdvance(java.util.function.Consumer) 和   Spliterator.OfInt#forEachRemaining(java.util.function.Consumer).使用優化

  8. 遍歷原始類型值使用基於封箱的方法tryAdvance tryAdvance()} 和 forEachRemaining(java.util.function.Consumer) forEachRemaining() 並不影響所遇到的原始值轉到包裝值的順序。

  9. Spliterators 像Iterator 都是爲了遍歷數據源的元素。Spliterator接口被設計爲除順序遍歷外支持高效的並行遍歷 經過支持分解以及單元素的迭代。此外,經過Spliterator訪問元素的協議旨在實現比Iterator更小的單元素開銷,並避免涉及hasNext()與next()方法的固有競爭。

  10. 對於可變數據源,若是數據源在Spliterator綁定到其數據源與遍歷結束之間的時間中收到結構干擾(元素添加、替換或刪除)則可能發生任意和非肯定的行爲。例如,這樣的干擾會致使任意的、非肯定的結果當使用java.util.stream框架的時候。

  11. 一個數據源結構方面的干擾能夠經過如下幾個方式進行管理:

    1. 源不能在結構上收到干擾: 好比一個java.util.current.CopyOnWriteArrayList是一個不可變的源。從這個源上建立的Sqliterator被特徵量IMMUTABLE標表明不可變
    2. 源能夠同步修改: 例如java.util.concurrent.ConcurrentHashMap的key集是一個同步源。從這個源中建立的Spliterator被特徵量CONCURRENT標記表明能夠同步修改。
    3. 可變的源提供後期綁定和快速失敗機制的Spliterator: 後期綁定可以減小結構干擾影響計算的時間,快速失敗機制檢測檢測到遍歷開始之後結構干擾發生盡最大努力拋出ConcurrentModificationException。 例如 ArrayList 和許多JDK中非同步的類(Collection)提供有後期綁定和快速失敗機制的Spliterator
    4. 可變的源提供一個非後期綁定可是有快速失敗機制的Spliterator: 可是因爲不是後期綁定源,源結構受到干擾從而可能影響Spliterator的時間跨度變大了。
    5. 可變的源提供一個後期綁定可是沒有快速失敗機制的Spliterator: 在遍歷開始以後,由於干擾沒有被檢測,這個源將會表現出任意的,非肯定行的行爲。
    6. 可變的源提供一個既不是後期綁定也沒有快速失敗機制的Spliterator: 源增長了任意,非肯定性行爲的風險,由於在構造以後可能會發生未檢測到的干擾。

方法詳解

Spliterator包含了一下幾個主要方法:

  1. boolean tryAdvance(Consumer<? super T> action); 若是下一個元素存在,對這個存在的元素執行給定的action而且返回true,不然返回false.若是這個Spliterator被標記爲ORDERED,那麼則會按順序用給定的action執行下一個元素。 這個Consumer是java8中的概念,也就是函數接口,用法能夠自行百度,其實不用想太多,這就是至關於傳進來一個方法。後面的例子咱們會用到。也能夠從下面的例子中去體會。

  2. void forEachRemaining(Consumer<? super T> action); 這個就是Spliterator中的批操做了。在源碼中他是長這樣的:

這個方法會對剩下的每一個元素執行給定的操做,在當前線程中順序的執行,直到全部元素被處理完成或者這個action中拋出一個錯誤。若是Spliterator被標記爲ORDERED,action按照遭遇順序執行。action中拋出的錯誤會中繼給調用者。 這個方法的默認實現是調用tryAdvance()方法直到這個方法返回false。應該儘量的覆蓋它。

  1. Spliterator trySplit();

    若是源Spliterator能被分割,那麼返回一個Spliterator,這個Spliterator中的元素應該是從源Spliterator中分割出去的,兩個Spliterator中的元素不該該有交集。 若是Spliterator被標記爲ORDERED,那麼返回的Splitertor中的元素應該是總元素中的前面的部分。 除非Spliterator裏面有無數的元素,那麼重複調用trySplit()最後必定會返回null。若是不是null的話則:

    1. 分割前estimateSize()返回的值必定大於或等於分割後這個Spliterator調用estimateSize()返回的值,同時也會大於等於返回的那個Spliterator調用estimateSize()返回的值。

    2. 若是這個Spliterator被標記爲SUBSIZED,那麼這個Splitertor在分割前調用estimateSize()返回的值必定等於分割後這個Splitertor和返回的那個Splitertor調用estimateSize()值的和。

    3. 這個方法可能返回null在某些狀況下,包括元素爲空,在遍歷開始以後不能再分割,數據結構約束和效率考慮因素。

    4. 理想狀況下,這個方法將元素準確的分割成兩半從而實現平衡並行計算,可是不少偏離這種理想的狀況下也是很是有效的。例如大體分割近似平衡的樹,或者葉子節點可能包含一個或兩個元素,沒法進一步分割的樹。雖然在上述那種相似狀況下,效率也是不錯的,可是太低的平衡性和/或太低的效率一般會使trySplit()出現較低的並行性能。

  2. long estimateSize();

    1. 返回調用forEachRemaining()時,可能會遇到的元素數量的大約數。或者返回LONG.MAX_VALUE若是計算成本無限,未知或者太昂貴。

    2. 若是Spliterator被標記爲SIZED 並且並無被遍歷或者分割,又或者Spliterator被標記爲SUBSIZED而且還沒有部分遍歷,那麼這個返回值應該是一個準確的數字描述了一個完成的遍歷中會遇到的元素的數量。不然這個數多是任意的不許確的,可是必定會減小當調用了trySplit()方法後。

    3. 即便是不許確的估計一般也頗有用而且計算成本低廉。例如,近似平衡的二叉樹的子分裂器能夠返回一個值,該值估計元素的數量是其父元素的一半; 若是根Spliterator沒有保持準確的計數,它能夠估計大小爲對應於其最大深度的2的冪。

  3. int characteristics();

    返回這個Spliterator和他的原色的一個特徵集。具體的能夠查看源碼來搭配特徵。

代碼示例

此代碼爲Spliterator源碼中的例子:

package com.lichaobao.collectionsown.spliterators;

import java.util.Objects;
import java.util.Spliterator;
import java.util.concurrent.CountedCompleter;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Consumer;

/** * @author lichaobao * @date 2019/5/20 * @QQ 1527563274 */
public class SourceExmple {
    public static void main(String[] args){
        Object[] a = new  Object[] {1,2,3,4,5,6,7,8,9};
        Object[] tag = new Object[]{"a","b","c","d","e","f","g","h","i"};
        TaggedArray taggedArray = new TaggedArray(a,tag);
        parEach(taggedArray,value ->{
            System.out.print(value+",");
        });
    }
    static <T> void parEach(TaggedArray<T> a,Consumer<T> action){
        Spliterator<T> s = a.spliterator();
        long targetBatchSize = s.estimateSize() / (ForkJoinPool.getCommonPoolParallelism() * 8);
        new ParEach(null, s, action, targetBatchSize).invoke();
    }
    static class ParEach<T> extends CountedCompleter<Void>{
       final Spliterator<T> spliterator;
       final Consumer<T> action;
       final long targetBatchSize;
       ParEach(ParEach<T> parent,Spliterator<T> spliterator,Consumer<T> action,long targetBatchSize){
             super(parent);
             this.spliterator = spliterator; this.action = action;
             this.targetBatchSize = targetBatchSize;
       }
        @Override
        public void compute() {
             Spliterator<T> sub;
             while (spliterator.estimateSize() > targetBatchSize &&
                    (sub = spliterator.trySplit()) != null) {
               addToPendingCount(1);
               new ParEach<>(this, sub, action, targetBatchSize).fork();
             }
             spliterator.forEachRemaining(action);
             propagateCompletion();
        }
    }
}
class TaggedArray<T>{
    private final Object[] elements;
    TaggedArray(T[] data,Object[] tags){
        int size = data.length;
        if(tags.length!=size) throw new IllegalArgumentException();
        this.elements = new Object[2*size];
        for(int i =0,j=0;i<size;++i){
            elements[j++] = data[i];
            elements[j++] = tags[i];
        }
    }
    public Spliterator<T> spliterator(){
        return new TaggedArraySpliterator<>(elements,0,elements.length);
    }
    static class TaggedArraySpliterator<T> implements Spliterator<T>{
        private final Object[] array;
        private int origin;
        private final int fence;

        public TaggedArraySpliterator(Object[] array, int origin, int fence) {
            this.array = array;
            this.origin = origin;
            this.fence = fence;
        }

        @Override
        public void forEachRemaining(Consumer<? super T> action) {
            for (; origin < fence; origin += 2)
                action.accept((T) array[origin]);
        }

        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            if(origin <fence){
                action.accept((T)array[origin]);
                origin+=2;
                return true;
            }else
                return false;
        }

        @Override
        public Spliterator<T> trySplit() {
             int lo = origin; // divide range in half
             int mid = ((lo + fence) >>> 1) & ~1; // force midpoint to be even
             if (lo < mid) { // split out left half
                origin = mid; // reset this Spliterator's origin
                return new TaggedArraySpliterator<>(array, lo, mid);
             }else       // too small to split
                return null;
        }
        @Override
        public long estimateSize() {
            return (long)((fence-origin)/2);
        }

        @Override
        public int characteristics() {
            return ORDERED|SIZED|IMMUTABLE|SUBSIZED;
        }
    }
}


複製代碼

運行結果以下:

相關文章
相關標籤/搜索