JAVA-ConcurrentSkipListMap筆記

什麼是跳錶

跳錶(skip list)是在1990年由William Pugh提出的一種數據結構.JAVA裏也有對應的數據結構--ConcurrentSkipListMap.引用wiki中的一個gif來展現跳錶的插入過程.node

ConcurrentSkipListMap的數據結構

在ConcurrentSkipListMap中有三種類型的節點:Node<K,V>、Index<K,V>和HeadIndex<K,V>.數組

  • Node<K,V>:用於構建最底層的數據鏈表.裏面存儲真實的數據,K、V表示鍵值對.
static final class Node<K,V> {
    final K key;
    volatile Object value;
    volatile Node<K,V> next;
    
    // 建立一個普通節點
    Node(K key, Object value, Node<K,V> next) {
        this.key = key;
        this.value = value;
        this.next = next;
    }
    
    // 建立一個新的marker節點,它的value字段指向它本身,而且key是null(base-level-header的key也是null)
    Node(Node<K,V> next) {
        this.key = null;
        this.value = this;
        this.next = next;
    }
    
    // 經過cas的方式設置value字段
    boolean casValue(Object cmp, Object val) {
        return UNSAFE.compareAndSwapObject(this, valueOffset, cmp, val);
    }
    
    // 經過cas的方式設置next字段.
    boolean casNext(Node<K,V> cmp, Node<K,V> val) {
        return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
    }
   
    boolean isMarker() {
        return value == this;
    }
    
    boolean isBaseHeader() {
        return value == BASE_HEADER;
    }
    
    // 該操做分兩步.
    // 1.建立一個marker節點,並將該marker節點的next指向f.
    // 2.將當前節點的next字段由f替換爲建立的marker節點.
    boolean appendMarker(Node<K,V> f) {
        return casNext(f, new Node<K,V>(f));
    }
    
    // 刪除當前節點
    // 若是未進行標記,則在當前節點以後添加標記節點.
    // 若是已作過標記,那麼直接調用cas操做,將b的next字段指向f.next(此時f是marker節點,f.next纔是真正的後繼節點)
    void helpDelete(Node<K,V> b, Node<K,V> f) {
         // 這裏分兩步操做,主要是減小協助刪除的線程之間的CAS干擾.
        if (f == next && this == b.next) {
            if (f == null || f.value != f) // not already marked
                casNext(f, new Node<K,V>(f));
            else
                b.casNext(this, f.next);
        }
    }
    
    V getValidValue() {
        Object v = value;
        if (v == this || v == BASE_HEADER)
            return null;
        @SuppressWarnings("unchecked") V vv = (V)v;
        return vv;
    }
    
    AbstractMap.SimpleImmutableEntry<K,V> createSnapshot() {
        Object v = value;
        if (v == null || v == this || v == BASE_HEADER)
            return null;
        @SuppressWarnings("unchecked") V vv = (V)v;
        return new AbstractMap.SimpleImmutableEntry<K,V>(key, vv);
    }
    // UNSAFE mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long valueOffset;
    private static final long nextOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = Node.class;
            // 獲取value字段的偏移量
            valueOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("value"));
            // 獲取next字段的偏移量
            nextOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("next"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}
複製代碼
  • Index<K,V>:用於構建上層的鏈表層.
static class Index<K,V> {
    final Node<K,V> node; // 指向base-level Node節點
    final Index<K,V> down; // 指向下一層Index節點.
    volatile Index<K,V> right; // 指向右側Index節點.
    /**
     * Creates index node with given values.
     */
    Index(Node<K,V> node, Index<K,V> down, Index<K,V> right) {
        this.node = node;
        this.down = down;
        this.right = right;
    }
    
    // cas right字段
    final boolean casRight(Index<K,V> cmp, Index<K,V> val) {
        return UNSAFE.compareAndSwapObject(this, rightOffset, cmp, val);
    }
    
    final boolean indexesDeletedNode() {
        return node.value == null;
    }
    
    final boolean link(Index<K,V> succ, Index<K,V> newSucc) {
        Node<K,V> n = node;
        newSucc.right = succ;
        // 當前Index節點的node節點未被刪除,才經過cas將right字段指向newSucc節點.
        return n.value != null && casRight(succ, newSucc);
    }
    
    // 解綁,使當前Index節點的right字段跳過succ節點.
    final boolean unlink(Index<K,V> succ) {
        return node.value != null && casRight(succ, succ.right);
    }
    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long rightOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = Index.class;
            rightOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("right"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}
複製代碼
  • HeadIndex<K,V>:頭結點.其屬性level用於記錄層數.bash

  • 結構數據結構

ConcurrentSkipListMap的結構相似於上圖.

1.head頭結點位於最左側.HeadIndex節點的個數決定了層數.
2.上層的鏈表由Index節點組成.
3.最底層的鏈表結構由Node節點組成.
4.其中淺黃的N表明指向底層Node節點的指針.
5.D表明Down指針,指向下一層的Index節點.
6.淺綠色的N表明next指針,指向下個一個Node節點.
7.L1和L2後面的數字表明HeadIndex節點處於的層數.
複製代碼

put操做

ConcurrentSkipListMap的put操做(doPut方法)能夠分解爲兩個步驟.app

  • 底層Node鏈表的構造
Node<K,V> z;             // added node
if (key == null)
    throw new NullPointerException();
Comparator<? super K> cmp = comparator;
// 0.外層outer循環.
outer: for (;;) {
    // 1.每次循環查詢key的前置Node節點(b),並獲取前置節點的下一個節點(n)
    for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
        //2.判斷b的後繼節點n是否爲null.爲null跳轉到8
        if (n != null) {
            Object v; int c;
            Node<K,V> f = n.next;
            // 3.若是n再也不是b的下一個節點,說明結構已被其餘線程修改.跳到1繼續循環
            if (n != b.next)               // inconsistent read
                break;
            //4. 節點n被刪除的狀況,刪除節點n,跳到1繼續循環
            if ((v = n.value) == null) {   // n is deleted
                n.helpDelete(b, f);
                break;
            }
            //5. 節點b被刪除,跳到1繼續執行
            if (b.value == null || v == n) // b is deleted
                break;
            
            //6. 比較傳入的key和n的key,比較結果大於0.將b和n都後移,而後跳到2繼續執行
            if ((c = cpr(cmp, key, n.key)) > 0) {
                b = n;
                n = f;
                continue;
            }
            //7. key相等
            if (c == 0) {
                // 若是onlyIfAbsent爲true或者value替換成功,則返回老的value值,結束doPut方法.
                if (onlyIfAbsent || n.casValue(v, value)) {
                    @SuppressWarnings("unchecked") V vv = (V)v;
                    return vv;
                }
                // 跳轉到1.
                break; // restart if lost race to replace value
            }
            // else c < 0; fall through
  
        //8. 若是n爲null,直接新建Node節點z
        z = new Node<K,V>(key, value, n);
        //9. 經過cas操做將b的next指針指向z.
        if (!b.casNext(n, z))
            //10. cas操做失敗,跳到1繼續執行.
            break;         // restart if lost race to append to b
        //11. cas成功,跳出outer循環,進行Index節點的構建.
        break outer;
    }
}
複製代碼

流程dom

  • 上層Index鏈表的構造(可能有多層)
// 獲取隨機數,用於計算層數
int rnd = ThreadLocalRandom.nextSecondarySeed();
//0. 判斷最高位和最低位是否都爲0,rnd的最高位和最低位都爲0的狀況才構建Index
if ((rnd & 0x80000001) == 0) { // test highest and lowest bits
    int level = 1, max;
    //1. 對rnd的每一位進行判斷,當bit位爲1時將level+1
    while (((rnd >>>= 1) & 1) != 0)
        ++level;
    Index<K,V> idx = null;
    HeadIndex<K,V> h = head;
    //2. 判斷level是否小於等於當前的層數.true,執行3;false,執行4.
    if (level <= (max = h.level)) {
        //3. 建立level個Index節點,每一個Index節點的down指針指向上一個Index節點.第一個Index節點的down指針指向null.
        for (int i = 1; i <= level; ++i)
            // z爲須要插入的Node節點,讓全部建立的index節點都指向z.
            idx = new Index<K,V>(z, idx, null);
    }
    else { // try to grow by one level
        //4. 將level設置爲max + 1.若是當前跳錶層數是4,那麼level就是5.
        level = max + 1; // hold in array and later pick the one to use
        //5. 建立Index數組,這裏的數組大小是level+1,由於下標0不存儲數據.
        @SuppressWarnings("unchecked")Index<K,V>[] idxs =
            (Index<K,V>[])new Index<?,?>[level+1];
        //6. 建立Index節點,並放入idxs數組中.
        for (int i = 1; i <= level; ++i)
            idxs[i] = idx = new Index<K,V>(z, idx, null);
        //7. 循環.
        for (;;) {
            //8. 從新獲取頭節點
            h = head;
            //9. 從新獲取跳錶的層數
            int oldLevel = h.level;
            //10. 比較level和跳錶層數.true,執行14;false,執行11.
            if (level <= oldLevel) // lost race to add level
                break;
            HeadIndex<K,V> newh = h;
            Node<K,V> oldbase = h.node;
            //11. 構造新的Head節點.此時level是大於oldLevel的.這裏舉個例子(E1):好比原來的層數是3,而level是4,那麼就須要新增1個HeadIndex節點,那麼HeadIndex也就變成了4層.最後獲得的newh也就是最上層的HeadIndex節點.
            for (int j = oldLevel+1; j <= level; ++j)
                newh = new HeadIndex<K,V>(oldbase, newh, idxs[j], j);
                
            //12. 經過cas操做將head節點替換爲新的最上層的headIndex節點.操做成功,執行14;操做失敗,跳到7繼續循環.
            if (casHead(h, newh)) {
                h = newh;
                //*13*. 這裏將level設置爲了oldLevel.並將idx指向idxs[level],參照E1,那麼這裏idx就是指向idxs[3].之因此要這樣作,是由於以前跳錶只有3層,如今變成了4層,而且在step11的時候建立了新的HeadIndex,而且直接將右指針指向了idxs[4],所以後面的操做只須要將idxs[1]、idxs[2]、idxs[3]插入到對應層的Index鏈表中便可.
                idx = idxs[level = oldLevel];
                break;
            }
        }
    }
    // find insertion points and splice in
    splice: for (int insertionLevel = level;;) {
        //14.
        int j = h.level;
        //15. 初始化數據.
        for (Index<K,V> q = h, r = q.right, t = idx;;) {
            //16. q(當前層的headIndex節點)爲null或t(要插入的Index節點)爲null.跳出splice循環,結束方法.
            if (q == null || t == null)
                break splice;
            //17. 判斷r是否爲null.true,執行18;false,執行25.
            if (r != null) {
                //18. n爲r所指向的Node節點.
                Node<K,V> n = r.node;
                // compare before deletion check avoids needing recheck
                //19. 獲取key值比較的結果.
                int c = cpr(cmp, key, n.key);
                //20. 判斷n是否被刪除.
                if (n.value == null) {
                    //21.斷開q與r的鏈接,也就是經過cas操做使q的right指針指向r的右繼Index節點.
                    if (!q.unlink(r))
                        //22. 操做失敗,有其餘線程在修改,跳到14繼續執行.
                        break;
                    //23. 操做成功,讓r從新指向q的右繼節點,跳到16繼續執行.
                    r = q.right;
                    continue;
                }
                if (c > 0) {
                    //24. c大於0.將q和r右移,跳到16繼續執行.
                    q = r;
                    r = r.right;
                    continue;
                }
            }

            //25. 判斷j和insertionLevel是否相等.相等才插入Index節點.
            // 這裏j爲當前頭結點h的層數(有可能執行到這裏時,其餘線程又增長了HeadIndex節點的個數,所以此時的h指向的並不必定是最高層的HeadIndex節點,可是不要緊,咱們此次的插入操做只須要關心h所指向的HeadIndex節點及其下層的HeadIndex節點所在的Index鏈表).
            // insertionLevel指的是須要插入t的層數.
            // 仍是拿E1來舉例,若是操做都成功,那麼j就是4,而insertionLevel爲3,因此這時不能插入,由於此時t指向的Index節點是屬於第3層的,須要將q = q.down,直到q也是第3層時才能對t進行插入操做.
            if (j == insertionLevel) {
                //26. 插入t
                if (!q.link(r, t))
                    // 不成功則跳到14繼續執行.
                    break; // restart
                // 若是新插入的節點被刪除
                if (t.node.value == null) {
                    // 這裏的findNode方式實際上是爲了輔助刪除,在遍歷的鏈路上的被標記刪除的節點都會被完全刪除掉.並跳出splice循環,結束put操做.
                    findNode(key);
                    break splice;
                }
                //27. insertionLevel自減,等於0說明操做完成,跳出splice循環,結束方法.
                if (--insertionLevel == 0)
                    break splice;
            }

            //*28*. j自減.
            // 正常狀況,當執行完27以後,--j應該是等於insertionLevel,而且j也是小於level的,這個時候將t下移沒有問題.
            // 可是有一種異常狀況,仍是拿E1舉例.當j爲4,insertionLevel爲3時,因爲j != insertionLevel,所以j會一直自減直到j也爲3的時候,會執行25.
            // 若是j爲3的時候執行插入成功,那麼27執行後insertionLevel會變爲2,j也會變爲2,此時t將下移.
            // 而後繼續循環,好比當insertionLevel = 1的時候,若是走到26的時候插入失敗,那麼就會跳到14繼續執行,此時j和t都會被初始化.也就是說此時j又變成4了,而且t也再也不指向第1層的Index節點,而是指向第3層的Index節點(而此時第3層節點其實已經插入成功了),可是insertionLevel仍是1,因此在這種狀況下,--j > insertionLevel && j < level的狀況下,也須要將t下移,使其指向最後將要插入的Index節點.
            // 經過上面兩種狀況,所以if表達式中須要寫成 --j >= insertionLevel && j < level,而不是--j == insertionLevel && j < level
            if (--j >= insertionLevel && j < level)
                t = t.down;
            
            // q下移,以進行下一層的插入操做.
            q = q.down;
            r = q.right;
        }
    }
}
複製代碼

流程ui

Remove操做

final V doRemove(Object key, Object value) {
    if (key == null)
        throw new NullPointerException();
    Comparator<? super K> cmp = comparator;
    outer: for (;;) {
        // 0. 獲取前置Node節點b以及b的next節點n.
        for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
            // 1.
            Object v; int c;
            if (n == null)
                // n爲null,由於b.key < key <= n.key,所以若是n爲null,那麼key所對應的節點應該也被刪除,所以直接跳到20.
                break outer;
            Node<K,V> f = n.next;
            if (n != b.next)                    // inconsistent read
                // 2. 說明在以前有線程對跳錶的數據結構進行了修改,致使n != b.next,因此跳到0繼續執行.
                break;
            if ((v = n.value) == null) {        // n is deleted
                //3.value爲null,說明節點n被刪除了,因此此時協助刪除.參考數據結構中的Node類的方法解析.
                // 跳轉到0繼續執行.
                n.helpDelete(b, f);
                break;
            }
            
            if (b.value == null || v == n)      // b is deleted
                //4. b的value爲null或者v == n(說明n是一個marker節點)都說明節點b已被刪除.
                // 跳轉到0繼續執行.
                break;
            if ((c = cpr(cmp, key, n.key)) < 0)
                //5. key < n.key說明沒有須要刪除的節點,直接跳轉到20返回.
                break outer;
            if (c > 0) {
                //6. key > n.key,將b、n右移,而後跳轉到1繼續執行.
                b = n;
                n = f;
                continue;
            }
            // 參考remove方法
            if (value != null && !value.equals(v))
                break outer;
            //7. 經過cas操做將n的value設置爲null
            if (!n.casValue(v, null))
                //8. 失敗,跳轉到0繼續執行.
                break;
            
            if (!n.appendMarker(f) || !b.casNext(n, f))
                //9. 添加marker節點失敗或直接刪除節點n失敗,調用findNode方法
                findNode(key);                  // retry via findNode
            else {
                //10. 調用findPredecessor
                findPredecessor(key, cmp);      // clean index
                if (head.right == null)
                    //11. head的右邊節點爲null,則嘗試減少level.
                    tryReduceLevel();
            }
            @SuppressWarnings("unchecked") V vv = (V)v;
            return vv;
        }
    }
    // 20.
    return null;
}
複製代碼
相關文章
相關標籤/搜索