併發容器學習—ConcurrentSkipListMap與ConcurrentSkipListSet

1、ConcurrentSkipListMap併發容器
1.ConcurrentSkipListMap的底層數據結構
    要學習ConcurrentSkipListMap,首先要知道什麼是跳錶或跳躍表(SkipList),跳錶鏈表的升級版,且是有序的,另外跳錶仍是一種不論查找、添加或是刪除效率能夠和平衡二叉樹媲美的層次結構(其時間複雜度是O(n)=log2n),更重要的是,跳錶的實現要簡單明瞭的多。
    上圖就是一個跳錶的層次結構圖,跳錶的特徵有:
    1.跳錶分爲若干層,層級越高跳躍性越大。
    2.跳錶的最底層的鏈表包含全部數據。
    3.跳錶是有序的。
    4.跳錶的頭結點永遠指向最高層的第一個元素。
    下面在來看看跳錶的索引結點的結構:通常分爲三個區域:兩個個指針域和一個數據存儲區域;兩個指針域分別是指向同層級下個索引的的next指針,和指向下個層級擁有相同數據的的down指針,以下圖所示。
    跳錶的查找相對簡單明瞭的多,從最高層的頭指針開始查詢,例以下圖查找14,從最高層3開始找,14大於3,找下個22,14小於22,說明最高層沒有14這個數。進入下一層,14比8大,比22小說明這一層也沒有該元素,在進入下一層。比8大日後找,找到14。
    下圖展現了14的查找路線,紅色線條表示對比後比14大,不進入索引的線路。
 
2.底層跳錶的實現
    最底層鏈表結點Node的定義:
 
static final class Node<K,V> {
    final K key;    //存儲鍵
    volatile Object value;    //存儲值
    volatile Node<K,V> next;    //下個結點

    Node(K key, Object value, Node<K,V> next) {
        this.key = key;
        this.value = value;
        this.next = next;
    }

    Node(Node<K,V> next) {
        this.key = null;
        this.value = this;
        this.next = next;
    }

        //不安全的操做變量,用於實現CAS操做
    private static final sun.misc.Unsafe UNSAFE;
    //value的內存地址偏移量
    private static final long valueOffset;
    //下個結點的內存地址偏移量
    private static final long nextOffset;


    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = Node.class;
            valueOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("value"));
            nextOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("next"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

 

    Index的定義:
static class Index<K,V> {
    final Node<K,V> node;    //鏈表中結點的引用
    final Index<K,V> down;    //指向下一層的Index索引
    volatile Index<K,V> right;    //右邊的索引


    Index(Node<K,V> node, Index<K,V> down, Index<K,V> right) {
        this.node = node;
        this.down = down;
        this.right = right;
    }


    //不安全的操做變量
    private static final sun.misc.Unsafe UNSAFE;
    private static final long rightOffset;    //right的內存偏移量
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = Index.class;
            rightOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("right"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}
 
 
    HeadIndex是專門用在每一個層級的頭結點上的定義,它是繼承了Index的基礎上增長一個表明層級的level屬性:
 
static final class HeadIndex<K,V> extends Index<K,V> {
    final int level;    //層級索引
    HeadIndex(Node<K,V> node, Index<K,V> down, Index<K,V> right, int level) {
        super(node, down, right);
        this.level = level;
    }
}

 

 
    HeadIndex、Index和Node之間的聯繫,以下圖所示:
 
3.ConcurrentSkipListMap的繼承關係
    ConcurrentSkipListMap的繼承關係以下圖所示,它繼承了AbstractMap類,而且實現了 ConcurrentNavigableMap和 ConcurrentMap接口。由此可知,ConcurrentSkipListMap必然是個有序的、併發操做安全的、具備伸縮視圖功能的Map集合。
    以前數據結構及源碼的學習過程當中,上圖的大部分接口及抽象類都已經學習過,僅有一個 ConcurrentNavigableMap接口沒有了解過:
 
public interface ConcurrentNavigableMap<K,V>
    extends ConcurrentMap<K,V>, NavigableMap<K,V>
{
    //返回當前map的子map,即將fromKey到toKey的子集合截取出來,fromInclusive和toInclusive
    //表示子集合中是否包含fromKey和toKey這兩個臨界鍵值對
    ConcurrentNavigableMap<K,V> subMap(K fromKey, boolean fromInclusive,
                                       K toKey,   boolean toInclusive);

    //返回不大於toKey的子集合,是否包含toKey由inclusive決定
    ConcurrentNavigableMap<K,V> headMap(K toKey, boolean inclusive);

    //返回不小於fromKey的子集合,是否包含fromKey由inclusive決定
    ConcurrentNavigableMap<K,V> tailMap(K fromKey, boolean inclusive);

    //返回大約fromKey,且小於等於toKey的子集合
    ConcurrentNavigableMap<K,V> subMap(K fromKey, K toKey);

    //返回小於toKey的子集合
    ConcurrentNavigableMap<K,V> headMap(K toKey);

    //返回大於等於fromKey的子集合
    ConcurrentNavigableMap<K,V> tailMap(K fromKey);

        //返回一個排序是反序的map(即將本來正序的Map進行反序排列)
    ConcurrentNavigableMap<K,V> descendingMap();

    //返回全部key組成的set集合
    public NavigableSet<K> navigableKeySet();

    //返回全部key組成的set集合
    NavigableSet<K> keySet();

    //返回一個key的set視圖,而且這個set中key是反序的
    public NavigableSet<K> descendingKeySet();
}

 

    瞭解了接口,再來看ConcurrentSkipListMap的構造方法及一些重要的屬性:
 
public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V>
    implements ConcurrentNavigableMap<K,V>, Cloneable, Serializable {
    //特殊值,用於
    private static final Object BASE_HEADER = new Object();

    //跳錶的頭索引
    private transient volatile HeadIndex<K,V> head;

    //比較器,用於key的比較,若爲null,則使用天然排序(key自帶的Comparable實現)
    final Comparator<? super K> comparator;

    //鍵的視圖
    private transient KeySet<K> keySet;
    
    //鍵值對的視圖
    private transient EntrySet<K,V> entrySet;
    
    //值的視圖
    private transient Values<V> values;

    //反序排列的map
    private transient ConcurrentNavigableMap<K,V> descendingMap;

    private static final int EQ = 1;
    private static final int LT = 2;
    private static final int GT = 0;


    //空構造
    public ConcurrentSkipListMap() {
        this.comparator = null;
        initialize();
    }


    //帶比較器的構造方法
    public ConcurrentSkipListMap(Comparator<? super K> comparator) {
        this.comparator = comparator;
        initialize();
    }

    //帶初始元素集合的構造方法
    public ConcurrentSkipListMap(Map<? extends K, ? extends V> m) {
        this.comparator = null;
        initialize();
        putAll(m);
    }

    //使用m集合的比較器構造方法
    public ConcurrentSkipListMap(SortedMap<K, ? extends V> m) {
        this.comparator = m.comparator();
        initialize();
        buildFromSorted(m);
    }

    //初始化變量
    private void initialize() {
        keySet = null;    
        entrySet = null;
        values = null;
        descendingMap = null;

        //新建HeadIndex結點,head指向該頭結點
        head = new HeadIndex<K,V>(new Node<K,V>(null, BASE_HEADER, null),
                              null, null, 1);
    }
}

 

4.put的過程
    
public V put(K key, V value) {
    if (value == null)    //這裏能夠知道Value要求不能爲null
        throw new NullPointerException();
    return doPut(key, value, false);    //實際執行put操做的方法
}

private V doPut(K key, V value, boolean onlyIfAbsent) {
    Node<K,V> z;             // added node
    if (key == null)    //有序的map,key不能爲null,不然沒法比較
        throw new NullPointerException();
    Comparator<? super K> cmp = comparator;    //獲取比較器,若無則爲null,使用天然排序
    outer: for (;;) {    //外層循環

        //findPredecessor查找key的前驅索引對應的結點b
        for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
            //判斷b結點是不是鏈表的尾結點,即b結點是否還有後繼結點
            //如有後繼結點(n不爲null表示有後繼結點),那麼要將key對應的結點插入b和n之間
            //若b爲尾結點,則key對應結點將爲新的尾結點
            if (n != null) {
                Object v; int c;
                Node<K,V> f = n.next;    //獲取n的後繼結點

                //判斷n仍是不是b的後繼結點
                //若n不在是b的後繼結點,說明已經有線程搶先在b結點以後插入一個新結點
                //退出內層循環,從新定位插入
                if (n != b.next)              
                    break;

                //判斷n結點是否要被刪除(value爲null表示邏輯刪除),輔助刪除後從新定位key
                if ((v = n.value) == null) {   // n is deleted
                    n.helpDelete(b, f);    //讓當前線程去輔助刪除(物理刪除)
                    break;
                }

                //判斷b是否要被刪除,若要被刪除,則退出當前循環,從新定位key
                if (b.value == null || v == n) // b is deleted
                    break;
                //判斷n結點的鍵與key的大小
                //這裏開始精準定位key在鏈表中的位置,找出key實際在鏈表中的前驅結點
                //key比n結點的key大,則繼續比較後續結點的key
                if ((c = cpr(cmp, key, n.key)) > 0) {
                    b = n;
                    n = f;
                    continue;
                }
                //若key與n.key相等(c==0),說明key對應的結點已經存在
                //那麼根據onlyIfAbsent來決定是否覆蓋舊value值
                if (c == 0) {
                    if (onlyIfAbsent || n.casValue(v, value)) {
                        @SuppressWarnings("unchecked") V vv = (V)v;
                        return vv;    //更新成功,直接返回舊值
                    }
                    break;     //更新失敗繼續循環嘗試
                }
                // else c < 0; fall through
            }

            //到這說明b就是key的前驅結點
            //新建key的結點z,並嘗試更新b的next爲z,成功就退出外層循環
            //更新失敗繼續嘗試
            z = new Node<K,V>(key, value, n);
            if (!b.casNext(n, z))
                break;         // restart if lost race to append to b
            break outer;
        }
    }

    /**
    * 上面的步驟是將新結點插入到跳錶最底層的鏈表中
    * 接下來則是要向上生成各層的索引,是否生成某一層次的索引經過拋硬幣的方式
    * 決定,即從最底層開始隨機決定是否在上一層建議索引,若爲true則在上一層生成索引
    * 而且繼續拋硬幣決定是否再往上層創建索引;若失敗則布創建索引,直接結束
    */
    //生成一個隨機數
    int rnd = ThreadLocalRandom.nextSecondarySeed();

    //判斷是否要更新層級(隨機數必須是正偶數才更新層級)
    //這裏決定的是當前新增的結點是否要向上創建索引
    if ((rnd & 0x80000001) == 0) { // test highest and lowest bits
        int level = 1, max;
        
        //決定level的等級,也就是結點要向上創建基層索引
        //level的值有rnd從低二位開始有幾個連續的1決定
        //例如:rnd==0010 0100 0001 1110 那麼level就爲5
        //由此能夠看出跳錶的最高層次不能超過31層(level<=31)
        while (((rnd >>>= 1) & 1) != 0)
            ++level;
        Index<K,V> idx = null;
        HeadIndex<K,V> h = head;
        
        //判斷level是否超過當前跳錶的最高等級(便是否大於頭索引的level)
        //若未超過,那麼直接創建一個從1到level的縱向索引列
        if (level <= (max = h.level)) {
            //新建一個索引列,後建的索引的down指針指向前一個索引
            for (int i = 1; i <= level; ++i)
                idx = new Index<K,V>(z, idx, null);
        }

        //若新建結點的level大於頭結點的level,則要新建一個新的level層
        else { // try to grow by one level
            level = max + 1; // 只能新增一層,爲一個結點就新增2層以上沒有意義
            @SuppressWarnings("unchecked")
            Index<K,V>[] idxs = (Index<K,V>[])new Index<?,?>[level+1];

            //新建一個結點z的索引列,並將這個索引列的全部索引都放到數組中
            for (int i = 1; i <= level; ++i)    
                idxs[i] = idx = new Index<K,V>(z, idx, null);
            for (;;) {
                h = head;    //獲取頭索引
                int oldLevel = h.level;      //獲取舊層次level  
                //判斷舊層次level是否發生改變(便是否有其餘線程搶先更新了跳錶的level,出現競爭)
                if (level <= oldLevel) // lost race to add level
                    break;    //競爭失敗,直接結束當前循環
                HeadIndex<K,V> newh = h;
                Node<K,V> oldbase = h.node;
                //生成新的HeadIndex節點
                //正常狀況下,循環只會執行一次,若是因爲其餘線程的併發操做致使 oldLevel 的值不穩定,那麼會執行屢次循環體
                for (int j = oldLevel+1; j <= level; ++j)
                    newh = new HeadIndex<K,V>(oldbase, newh, idxs[j], j);

                //嘗試更新頭索引的值
                if (casHead(h, newh)) {
                    h = newh;
                    idx = idxs[level = oldLevel];
                    break;
                }
            }
        }
        //將新增的idx插入跳錶之中
        splice: for (int insertionLevel = level;;) {
            int j = h.level;    //獲取最高level

            //查找須要清理的索引
            for (Index<K,V> q = h, r = q.right, t = idx;;) {

                //其餘線程併發操做致使頭結點被刪除,直接退出外層循環
                //這種狀況發生的機率很小,除非併發量實在太大
                if (q == null || t == null)
                    break splice;
                //判斷q是否爲尾結點
                if (r != null) {
                    Node<K,V> n = r.node;    //獲取索引r對應的結點n
                    // compare before deletion check avoids needing recheck
                    int c = cpr(cmp, key, n.key);    //比較新增結點的key與n.key的大小

                    //判斷n是否要被刪除
                    if (n.value == null) {
                        if (!q.unlink(r))    //刪除r索引
                            break;
                        r = q.right;
                        continue;
                    }

                    //c大於0,說明當前結點n.key小於新增結點的key
                    // 繼續向右查找一個結點的key大於新增的結點key的索引
                    //新結點對應的索引要插入q,r之間
                    if (c > 0) {
                        q = r;
                        r = r.right;
                        continue;
                    }
                }

                //判斷是否
                if (j == insertionLevel) {
                    //嘗試着將 t 插在 q 和 r 之間,若是失敗了,退出內循環重試
                    if (!q.link(r, t))
                        break; // restart
                    //若是插入完成後,t 結點被刪除了,那麼結束插入操做
                    if (t.node.value == null) {
                        // 查找節點,查找過程當中會刪除須要刪除的節點
                        findNode(key);
                        break splice;
                    }
                    //到達最底層,沒有要插入新索引的索引層了
                    if (--insertionLevel == 0)
                        break splice;
                }

                //向下繼續連接其它index 層
                if (--j >= insertionLevel && j < level)
                    t = t.down;
                q = q.down;
                r = q.right;
            }
        }
    }
    return null;
}

//查找key在map中的位置的前驅結點
private Node<K,V> findPredecessor(Object key, Comparator<? super K> cmp) {
    if (key == null)
        throw new NullPointerException(); // don't postpone errors
    for (;;) {
        //獲取head頭索引,遍歷跳錶
        for (Index<K,V> q = head, r = q.right, d;;) {
            //判斷是否有後繼索引,如有
            if (r != null) {    
                Node<K,V> n = r.node;    //獲取索引對應的結點
                K k = n.key;    //獲取結點對應的key

                //判斷結點的value是夠爲null,爲null表示結點已經被刪除了
                //一個結點從鏈表中被刪除時,會將value賦爲null,由於此時跳錶其餘層級的索引還可能
                //會引用着該結點,value賦null,標識該結點已經被廢棄,對應的索引(若是存在)也應該從跳錶中刪除
                //那麼調用unlink方法刪除索引
                if (n.value == null) {
                    //嘗試將r索引移除出跳表,成功就繼續查找
                    //失敗就繼續嘗試
                    if (!q.unlink(r))
                        break;           // restart
                    r = q.right;         // reread r
                    continue;
                }

                //比較當前索引中結點的鍵k與key的大小
                //若key大於k,則繼續查找比較下個索引
                if (cpr(cmp, key, k) > 0) {
                    q = r;
                    r = r.right;    //下個右邊索引
                    continue;
                }
            }

            //如果key小於k,則判斷是夠到達跳錶的最底層的上一層,
            //如果到達最底層的上一層,則說明當前索引就是key的前驅索引,node就是對應的結點
            //若不是最底層的上一層,則繼續比較查找
            if ((d = q.down) == null)
                return q.node;
            q = d;
            r = d.right;
        }
    }
}

//Index中的方法,斷開兩個索引之間的聯繫
final boolean unlink(Index<K,V> succ) {
    return node.value != null && casRight(succ, succ.right);
}

//CAS方式更新當前索引的右邊索引
final boolean casRight(Index<K,V> cmp, Index<K,V> val) {
    return UNSAFE.compareAndSwapObject(this, rightOffset, cmp, val);
}

//幫助將b結點從鏈表中刪除
void helpDelete(Node<K,V> b, Node<K,V> f) {
    //判斷是否有其餘線程已經將n刪除(有競爭),如果已經刪除直接返回
    if (f == next && this == b.next) {
        //判斷f是否爲null,即當前要刪除的this結點是否爲尾結點
        //若this爲尾結點,則嘗試更新this的後繼爲new Node<K,V>(f),這裏沒看懂。。
        if (f == null || f.value != f) // not already marked
            casNext(f, new Node<K,V>(f));
        else
            b.casNext(this, f.next);    //嘗試將當前結點this的前驅結點b的next更新爲this的後繼結點f
    }
}

//CAS方式更新當前結點的後繼爲val
boolean casNext(Node<K,V> cmp, Node<K,V> val) {
    return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}

 

    上面doPut的過程代碼比較多,總的來講幹了這麼幾件事:1.查找確認key在跳錶中的位置。2.清理跳錶,並檢查跳錶是否被其餘線程改變,如果改變從新定位key。3.判斷key是否已經在跳錶中存在,存在就根據onlyIfAbsent決定是否覆蓋。4。將key生成的結點node插入底層鏈表中。5.決定是否建立key對應的索引列,索引列要建幾層。6.最後將新增的索引列連接到跳錶中。
 
5.get的過程
 
public V get(Object key) {
    return doGet(key);
}

private V doGet(Object key) {
    if (key == null)
        throw new NullPointerException();
    Comparator<? super K> cmp = comparator;
    outer: for (;;) {    //外層循環
        //查找key的前驅索引對應的結點
        for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
            Object v; int c;
            //n爲null,說明b是爲結點,則key不存在
            if (n == null)
                break outer;
            Node<K,V> f = n.next;

            //b的後繼不爲n,說明b的後繼別其餘線程改變,從新定位key
            if (n != b.next)                // inconsistent read
                break;
            //判斷n是否已經被刪除,如果則輔助刪除,並從新定位key
            if ((v = n.value) == null) {    // n is deleted
                n.helpDelete(b, f);
                break;
            }
        
            //判斷b是否已經被刪除,如果則輔助刪除,並從新定位key
            if (b.value == null || v == n)  // b is deleted
                break;
            
            //判斷n是不是要查找的結點
            //c==0,表示key==n.key,找到要查找的額結點
、          //c如果小於0,那麼key就不存在,結束查找
            //c大於0,說明還沒找到,則繼續遍歷查找
            if ((c = cpr(cmp, key, n.key)) == 0) {
                @SuppressWarnings("unchecked") V vv = (V)v;
                return vv;
            }
            
            if (c < 0)
                break outer;
            b = n;
            n = f;
        }
    }
    return null;
}

 

6.remove的過程
 
public V remove(Object key) {
    return doRemove(key, null);
}

final V doRemove(Object key, Object value) {
    if (key == null)
        throw new NullPointerException();
    Comparator<? super K> cmp = comparator;
    outer: for (;;) {
        //findPredecessor查找key的前驅索引對應的結點
        for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
            Object v; int c;
            //n爲null,說明b是爲結點,則key不存在
            if (n == null)
                break outer;
            Node<K,V> f = n.next;


            //b的後繼不爲n,說明b的後繼別其餘線程改變,從新定位key
            if (n != b.next)                // inconsistent read
                break;
            //判斷n是否已經被刪除,如果則輔助刪除,並從新定位key
            if ((v = n.value) == null) {    // n is deleted
                n.helpDelete(b, f);
                break;
            }
            //判斷b是否已經被刪除,如果則輔助刪除,並從新定位key
            if (b.value == null || v == n)  // b is deleted
                break;

            //key不存在,直接結束刪除
            if ((c = cpr(cmp, key, n.key)) < 0)
                break outer;

            //當前結點不是要刪除的結點,繼續遍歷查找
            if (c > 0) {
                b = n;
                n = f;
                continue;
            }

            //找到要刪除的結點,但已被別的線程搶先刪除,則直接結束刪除
            if (value != null && !value.equals(v))
                break outer;
            //嘗試邏輯刪除,失敗則繼續嘗試
            if (!n.casValue(v, null))
                break;
        
            //給節點添加刪除標識(next節點改成一個指向自身的節點)
            //而後把前繼節點的next節點CAS修改成next.next節點(完全解除n節點的連接)
            if (!n.appendMarker(f) || !b.casNext(n, f))
                findNode(key);                  // 清除已刪除的結點
            else {
                //刪除n節點對應的index,findPredecessor中有清除索引的步驟
                findPredecessor(key, cmp);      // clean index

                //判斷是否要減小跳錶的層級
                if (head.right == null)
                    tryReduceLevel();
            }
            @SuppressWarnings("unchecked") V vv = (V)v;
            return vv;    //返回刪除結點的value
        }
    }
    return null;
}

//自環起來標誌該結點要被刪除
boolean appendMarker(Node<K,V> f) {
    return casNext(f, new Node<K,V>(f));
}

 

7.size的統計
    ConcurrentSkipListMap的size也是個瞬時值,並能過度依靠:
//遍歷底層鏈表來統計結點的個數
public int size() {
    long count = 0;
    for (Node<K,V> n = findFirst(); n != null; n = n.next) {
        if (n.getValidValue() != null)
            ++count;
    }
    return (count >= Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int) count;
}

//獲取底層鏈表的頭結點
final Node<K,V> findFirst() {
    for (Node<K,V> b, n;;) {
        if ((n = (b = head.node).next) == null)
            return null;
        if (n.value != null)
            return n;
        n.helpDelete(b, n.next);    //協助刪除已被標記刪除的結點
    }
}
 
2、ConcurrentSkipListSet併發容器
1.ConcurrentSkipListSet
    ConcurrentSkipListSet的底層實現就是利用的ConcurrentSkipListMap,以加入的數據做爲ConcurrentSkipListMap的key,而value則恆爲一個布爾對象 Boolean. TRUE。所以其源碼就不在過多分析。
相關文章
相關標籤/搜索