Java多線程進階(二五)—— J.U.C之collections框架:ConcurrentSkipListMap

WechatIMG26.jpeg

本文首發於一世流雲專欄: https://segmentfault.com/blog...

1、ConcurrentSkipListMap簡介

類繼承結構

在正式講ConcurrentSkipListMap以前,咱們先來看下ConcurrentSkipListMap的類繼承圖:java

clipboard.png

咱們知道,通常的Map都是無序的,也就是隻能經過鍵的hash值進行定位。JDK爲了實現有序的Map,提供了一個SortedMap接口,SortedMap提供了一些根據鍵範圍進行查找的功能,好比返回整個Map中 key最小/大的鍵、返回某個範圍內的子Map視圖等等。node

爲了進一步對有序Map進行加強,JDK又引入了NavigableMap接口,該接口進一步擴展了SortedMap的功能,提供了根據指定Key返回最接近項、按升序/降序返回全部鍵的視圖等功能。算法

同時,也提供了一個基於NavigableMap的實現類——TreeMap,TreeMap底層基於紅黑樹設計,是一種有序的Map。關於TreeMap和NavigableMap,本文不做贅述,讀者能夠查看Oracle的官方文檔:https://docs.oracle.com/javas...segmentfault

ConcurrentSkipListMap的由來

JDK1.6時,爲了對高併發環境下的有序Map提供更好的支持,J.U.C新增了一個ConcurrentNavigableMap接口,ConcurrentNavigableMap很簡單,它同時實現了NavigableMap和ConcurrentMap接口:
clipboard.pngapi

ConcurrentNavigableMap接口提供的功能也和NavigableMap幾乎徹底一致,不少方法僅僅是返回的類型不一樣:
clipboard.png數組

J.U.C提供了基於ConcurrentNavigableMap接口的一個實現——ConcurrentSkipListMap。ConcurrentSkipListMap能夠當作是併發版本的TreeMap,可是和TreeMap不一樣是,ConcurrentSkipListMap並非基於紅黑樹實現的,其底層是一種相似跳錶(Skip List)的結構。安全

2、Skip List簡介

什麼是Skip List

Skip List(如下簡稱跳錶),是一種相似鏈表的數據結構,其查詢/插入/刪除的時間複雜度都是O(logn)數據結構

咱們知道,一般意義上的鏈表是不能支持隨機訪問的(經過索引快速定位),其查找的時間複雜度是O(n),而數組這一可支持隨機訪問的數據結構,雖然查找很快,可是插入/刪除元素卻須要移動插入點後的全部元素,時間複雜度爲O(n)併發

爲了解決這一問題,引入了樹結構,樹的增刪改查效率比較平均,一棵平衡二叉樹(AVL)的增刪改查效率通常爲O(logn),好比工業上經常使用紅黑樹做爲AVL的一種實現。oracle

可是,AVL的實現通常都比較複雜,插入/刪除元素可能涉及對整個樹結構的修改,特別是併發環境下,一般須要全局鎖來保證AVL的線程安全,因而又出現了一種相似鏈表的數據結構——跳錶

Skip List示例

在講Skip List以前,咱們先來看下傳統的單鏈表:
clipboard.png

上圖的單鏈表中(省去告終點之間的連接),當想查找七、1五、46這三個元素時,必須從頭指針head開始,遍歷整個單鏈表,其查找複雜度很低,爲O(n)

來看下Skip List的數據結構是什麼樣的:
clipboard.png

上圖是Skip List一種可能的結構,它分了2層,假設咱們要查找「15」這個元素,那麼整個步驟以下:

  1. 從頭指針head開始,找到第一個結點的最上層,發現其指向的下個結點值爲8,小於15,則直接從1結點跳到8結點。
  2. 8結點最上層指向的下一結點值爲18,大於15,則從8結點的下一層開始查找。
  3. 從8結點的最下層一直向後查找,依次通過十、13,最後找到15結點。

上述整個查找路徑以下圖標黃部分所示:
clipboard.png

同理,若是要查找「46」這個元素,則整個查找路徑以下圖標黃部分所示:
clipboard.png


上面就是跳躍表的基本思想了,每一個結點不只僅只包含指向下一個結點的指針,可能還包含不少個其它指向後續結點的指針。而且,一個結點自己能夠當作是一個鏈表(自上向下連接)。這樣就能夠跳過一些沒必要要的結點,從而加快查找、刪除等操做,這實際上是一種「空間換時間」的算法設計思想。

那麼一個結點能夠包含多少層呢? 好比,Skip List也多是下面這種包含3層的結構(在一個3層Skip List中查找元素「46」):
clipboard.png

層數是根據一種隨機算法獲得的,爲了避免讓層數過大,還會有一個最大層數MAX_LEVEL限制,隨機算法生成的層數不得大於該值。後面講ConcurrentSkipListMap時,咱們會具體分析。

以上就是Skip List的基本思想了,總結起來,有如下幾點:

  1. 跳錶由不少層組成;
  2. 每一層都是一個有序鏈表;
  3. 對於每一層的任意結點,不只有指向下一個結點的指針,也有指向其下一層的指針。

3、ConcurrentSkipListMap的內部結構

介紹完了跳錶,再來看ConcurrentSkipListMap的內部結構就容易得多了:

clipboard.png

ConcurrentSkipListMap內部一共定義了3種不一樣類型的結點,元素的增刪改查都從最上層的head指針指向的結點開始:

public class ConcurrentSkipListMap2<K, V> extends AbstractMap<K, V>
    implements ConcurrentNavigableMap<K, V>, Cloneable, Serializable {
    /**
     * 最底層鏈表的頭指針BASE_HEADER
     */
    private static final Object BASE_HEADER = new Object();

    /**
     * 最上層鏈表的頭指針head
     */
    private transient volatile HeadIndex<K, V> head;

    /* ---------------- 普通結點Node定義 -------------- */
    static final class Node<K, V> {
        final K key;
        volatile Object value;
        volatile Node<K, V> next;

        // ...
    }

    /* ---------------- 索引結點Index定義 -------------- */
    static class Index<K, V> {
        final Node<K, V> node;      // node指向最底層鏈表的Node結點
        final Index<K, V> down;     // down指向下層Index結點
        volatile Index<K, V> right; // right指向右邊的Index結點

        // ...
    }

    /* ---------------- 頭索引結點HeadIndex -------------- */
    static final class HeadIndex<K, V> extends Index<K, V> {
        final int level;    // 層級

        // ...
    }
}

咱們來看下這3類結點的具體定義。

結點定義

普通結點:Node

普通結點——Node,也就是ConcurrentSkipListMap最底層鏈表中的結點,保存着實際的鍵值對,若是單獨看底層鏈,其實就是一個按照Key有序排列的單鏈表:

static final class Node<K, V> {
    final K key;
    volatile Object value;
    volatile Node<K, V> next;

    /**
     * 正常結點.
     */
    Node(K key, Object value, Node<K, V> next) {
        this.key = key;
        this.value = value;
        this.next = next;
    }

    /**
     * 標記結點.
     */
    Node(Node<K, V> next) {
        this.key = null;
        this.value = this;
        this.next = next;
    }

    /**
     * CAS更新結點的value
     */
    boolean casValue(Object cmp, Object val) {
        return UNSAFE.compareAndSwapObject(this, valueOffset, cmp, val);
    }

    /**
     * CAS更新結點的next
     */
    boolean casNext(Node<K, V> cmp, Node<K, V> val) {
        return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
    }

    /**
     * 判斷當前結點是否爲[標記結點]
     */
    boolean isMarker() {
        return value == this;
    }

    /**
     * 判斷當前結點是不是最底層鏈表的頭結點
     */
    boolean isBaseHeader() {
        return value == BASE_HEADER;
    }

    /**
     * 在當前結點後面插入一個標記結點.
     *
     * @param f 當前結點的後繼結點
     * @return true 插入成功
     */
    boolean appendMarker(Node<K, V> f) {
        return casNext(f, new Node<K, V>(f));
    }

    /**
     * 輔助刪除結點方法.
     *
     * @param b 當前結點的前驅結點
     * @param f 當前結點的後繼結點
     */
    void helpDelete(Node<K, V> b, Node<K, V> f) {
        /*
         * 從新檢查一遍結點位置
         * 確保b和f分別爲當前結點的前驅/後繼
         */
        if (f == next && this == b.next) {
            if (f == null || f.value != f)  // f爲null或非標記結點
                casNext(f, new Node<K, V>(f));
            else                            // 刪除當前結點
                b.casNext(this, f.next);
        }
    }

    /**
     * 返回結點的value值.
     *
     * @return 標記結點或最底層頭結點,直接返回null
     */
    V getValidValue() {
        Object v = value;
        if (v == this || v == BASE_HEADER)  // 標記結點或最底層頭結點,直接返回null
            return null;
        V vv = (V) v;
        return vv;
    }

    /**
     * 返回當前結點的一個Immutable快照.
     */
    AbstractMap.SimpleImmutableEntry<K, V> createSnapshot() {
        Object v = value;
        if (v == null || v == this || v == BASE_HEADER)
            return null;
        V vv = (V) v;
        return new AbstractMap.SimpleImmutableEntry<K, V>(key, vv);
    }

    // UNSAFE mechanics

    private static final sun.misc.Unsafe UNSAFE;
    private static final long valueOffset;
    private static final long nextOffset;

    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = Node.class;
            valueOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("value"));
            nextOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("next"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

索引結點:Index

Index結點是除底層鏈外,其他各層鏈表中的非頭結點(見示意圖中的藍色結點)。每一個Index結點包含3個指針:downrightnode
down和right指針分別指向下層結點和後繼結點,node指針指向其最底部的node結點。

static class Index<K, V> {
    final Node<K, V> node;      // node指向最底層鏈表的Node結點
    final Index<K, V> down;     // down指向下層Index結點
    volatile Index<K, V> right; // right指向右邊的Index結點

    Index(Node<K, V> node, Index<K, V> down, Index<K, V> right) {
        this.node = node;
        this.down = down;
        this.right = right;
    }

    /**
     * CAS更新右邊的Index結點
     *
     * @param cmp 當前結點的右結點
     * @param val 但願更新的結點
     */
    final boolean casRight(Index<K, V> cmp, Index<K, V> val) {
        return UNSAFE.compareAndSwapObject(this, rightOffset, cmp, val);
    }

    /**
     * 判斷Node結點是否已經刪除.
     */
    final boolean indexesDeletedNode() {
        return node.value == null;
    }

    /**
     * CAS插入一個右邊結點newSucc.
     *
     * @param succ    當前的後繼結點
     * @param newSucc 新的後繼結點
     */
    final boolean link(Index<K, V> succ, Index<K, V> newSucc) {
        Node<K, V> n = node;
        newSucc.right = succ;
        return n.value != null && casRight(succ, newSucc);
    }

    /**
     * 跳過當前結點的後繼結點.
     *
     * @param succ 當前的後繼結點
     */
    final boolean unlink(Index<K, V> succ) {
        return node.value != null && casRight(succ, succ.right);
    }

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long rightOffset;

    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = Index.class;
            rightOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("right"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

頭索引結點:HeadIndex

HeadIndex結點是各層鏈表的頭結點,它是Index類的子類,惟一的區別是增長了一個level字段,用於表示當前鏈表的級別,越往上層,level值越大。

static final class HeadIndex<K, V> extends Index<K, V> {
    final int level;    // 層級

    HeadIndex(Node<K, V> node, Index<K, V> down, Index<K, V> right, int level) {
        super(node, down, right);
        this.level = level;
    }
}

構造器定義和初始化

ConcurrentSkipListMap一共定義了4種構造器:

空構造器

/**
 * 構造一個新的空Map.
 */
public ConcurrentSkipListMap() {
    this.comparator = null;
    initialize();
}

指定比較器的構造器

/**
 * 構造一個新的空Map.
 * 並指定比較器.
 */
public ConcurrentSkipListMap(Comparator<? super K> comparator) {
    this.comparator = comparator;
    initialize();
}

從給定Map構建的構造器

/**
 * 從已給定的Map構造一個新Map.
 */
public ConcurrentSkipListMap(Map<? extends K, ? extends V> m) {
    this.comparator = null;
    initialize();
    putAll(m);
}

從給定SortedMap構建的構造器

/**
 * 從已給定的SortedMap構造一個新Map.
 * 而且Key的順序與原來保持一致.
 */
public ConcurrentSkipListMap(SortedMap<K, ? extends V> m) {
    this.comparator = m.comparator();
    initialize();
    buildFromSorted(m);
}
注:ConcurrentSkipListMap會基於比較器——Comparator ,來進行鍵Key的比較,若是構造時未指定Comparator ,那麼就會按照Key的天然順序進行比較,所謂Key的天然順序是指key實現Comparable接口。

上述全部構造器都調用了initialize方法:

private void initialize() {
    keySet = null;
    entrySet = null;
    values = null;
    descendingMap = null;
    head = new HeadIndex<K, V>(new Node<K, V>(null, BASE_HEADER, null),null, null, 1);
}

initialize方法將一些字段置初始化null,而後將head指針指向新建立的HeadIndex結點。初始化完成後,ConcurrentSkipListMap的結構以下:

clipboard.png

其中,headBASE_HEADER都是ConcurrentSkipListMap的字段:

/**
 * 最底層鏈表的頭指針BASE_HEADER
 */
private static final Object BASE_HEADER = new Object();

/**
 * 最上層鏈表的頭指針head
 */
private transient volatile HeadIndex<K, V> head;

4、ConcurrentSkipListMap的核心操做

put操做

put操做自己很簡單,須要注意的是ConcurrentSkipListMap在插入鍵值對時,Key和Value都不能爲null:

/**
 * 插入鍵值對.
 *
 * @param key   鍵
 * @param value 值
 * @return 若是key存在,返回舊value值;不然返回null
 */
public V put(K key, V value) {
    if (value == null)          // ConcurrentSkipListMap的Value不能爲null
        throw new NullPointerException();
    return doPut(key, value, false);
}

上述方法內部調用了doPut來作實際的插入操做:

/**
 * 插入鍵值對.
 *
 * @param onlyIfAbsent true: 僅當Key不存在時才進行插入
 */
private V doPut(K key, V value, boolean onlyIfAbsent) {
    Node<K, V> z;             // z指向待添加的Node結點
    if (key == null)          // ConcurrentSkipListMap的Key不能爲null
        throw new NullPointerException();

    Comparator<? super K> cmp = comparator;
    outer:
    for (; ; ) {
        // b是「是小於且最接近給定key」的Node結點(或底層鏈表頭結點)
        for (Node<K, V> b = findPredecessor(key, cmp), n = b.next; ; ) {
            if (n != null) {                    // b存在後驅結點:  b -> n -> f
                Object v;
                int c;
                Node<K, V> f = n.next;          // f指向b的後驅的後驅
                if (n != b.next)                // 存在併發修改,放棄並重試
                    break;
                if ((v = n.value) == null) {    // n爲標記刪除結點
                    n.helpDelete(b, f);
                    break;
                }
                if (b.value == null || v == n)  // b爲標記刪除結點
                    break;
                if ((c = cpr(cmp, key, n.key)) > 0) {   // 向後遍歷,找到第一個大於key的結點
                    b = n;
                    n = f;
                    continue;
                }
                if (c == 0) {                           // 存在Key相同的結點
                    if (onlyIfAbsent || n.casValue(v, value)) {
                        V vv = (V) v;
                        return vv;
                    }
                    break; // CAS更新失敗,則重試
                }
            }

            z = new Node<K, V>(key, value, n);
            if (!b.casNext(n, z))  // 嘗試插入z結點: b -> z -> n
                break;         // CAS插入失敗,則重試
            break outer;           // 跳出最外層循環
        }
    }

    int rnd = ThreadLocalRandom.nextSecondarySeed();    // 生成一個隨機數種子
    if ((rnd & 0x80000001) == 0) {                      // 爲true表示須要增長層級

        /**
         * 如下方法用於建立新層級
         */
        int level = 1, max;
        while (((rnd >>>= 1) & 1) != 0) // level表示新的層級,經過下面這個while循環能夠確認新的層級數
            ++level;

        Index<K, V> idx = null;
        HeadIndex<K, V> h = head;

        if (level <= (max = h.level)) {         // CASE1: 新層級level沒有超過最大層級head.level(head指針指向最高層)
            // 以「頭插法」建立level個Index結點,idx最終指向最高層的Index結點
            for (int i = 1; i <= level; ++i)
                idx = new Index<K, V>(z, idx, null);
        }
        else {                                  // CASE2: 新層級level超過了最大層級head.level
            level = max + 1;    // 重置level爲最大層級+1

            // 生成一個Index結點數組,idxs[0]不會使用
            Index<K, V>[] idxs = (Index<K, V>[]) new Index<?, ?>[level + 1];
            for (int i = 1; i <= level; ++i)
                idxs[i] = idx = new Index<K, V>(z, idx, null);

            // 生成新的HeadIndex結點
            for (; ; ) {
                h = head;
                int oldLevel = h.level;         // 原最大層級
                if (level <= oldLevel)
                    break;
                HeadIndex<K, V> newh = h;
                Node<K, V> oldbase = h.node;    // oldbase指向最底層鏈表的頭結點
                for (int j = oldLevel + 1; j <= level; ++j)
                    newh = new HeadIndex<K, V>(oldbase, newh, idxs[j], j);
                if (casHead(h, newh)) {
                    h = newh;
                    idx = idxs[level = oldLevel];
                    break;
                }
            }
        }

        /**
         * 如下方法用於連接新層級的各個HeadIndex和Index結點
         */
        splice:
        for (int insertionLevel = level; ; ) {  // 此時level爲oldLevel,即原最大層級
            int j = h.level;
            for (Index<K, V> q = h, r = q.right, t = idx; ; ) {
                if (q == null || t == null)
                    break splice;
                if (r != null) {
                    Node<K, V> n = r.node;

                    int c = cpr(cmp, key, n.key);
                    if (n.value == null) {
                        if (!q.unlink(r))
                            break;
                        r = q.right;
                        continue;
                    }
                    if (c > 0) {
                        q = r;
                        r = r.right;
                        continue;
                    }
                }

                if (j == insertionLevel) {
                    if (!q.link(r, t))      // 在q和r之間插入t,即從 q -> r 變成 q -> t -> r
                        break;
                    if (t.node.value == null) {
                        findNode(key);
                        break splice;
                    }
                    if (--insertionLevel == 0)
                        break splice;
                }

                if (--j >= insertionLevel && j < level)
                    t = t.down;
                q = q.down;
                r = q.right;
            }
        }
    }
    return null;
}

咱們先不急着看doPut方法,而是看下其內部的findPredecessor方法,findPredecessor用於查找「小於且最接近給定key」的Node結點,而且這個Node結點必須有上層結點

/**
 * 返回「小於且最接近給定key」的數據結點.
 * 若是不存在這樣的數據結點,則返回底層鏈表的頭結點.
 *
 * @param key 待查找的鍵
 */
private Node<K, V> findPredecessor(Object key, Comparator<? super K> cmp) {
    if (key == null)
        throw new NullPointerException();

    /**
     * 從最上層開始,往右下方向查找
   */
    for (; ; ) {
        for (Index<K, V> q = head, r = q.right, d; ; ) {    // 從最頂層的head結點開始查找
            if (r != null) {             // 存在右結點
                Node<K, V> n = r.node;
                K k = n.key;
                if (n.value == null) {   // 處理結點」懶刪除「的狀況
                    if (!q.unlink(r))
                        break;
                    r = q.right;
                    continue;
                }
                if (cpr(cmp, key, k) > 0) { // key大於k,繼續向右查找
                    q = r;
                    r = r.right;
                    continue;
                }
            }

            //已經到了level1的層
            if ((d = q.down) == null)   // 不存在下結點,說明q已是level1鏈表中的結點了
                return q.node;          // 直接返回對應的Node結點

       // 轉到下一層,繼續查找(level-1層)
            q = d;
            r = d.right;
        }
    }
}

看代碼不太直觀,咱們仍是看下面這個圖:
clipboard.png

上圖中,假設要查找的Key爲72,則步驟以下:

  1. 從最上方head指向的結點開始,比較①號標紅的Index結點的key值,發現3小於72,則繼續向右;
  2. 比較②號標紅的Index結點的key值,發現62小於72,則繼續向右
  3. 因爲此時右邊是null,則轉而向下,一直到⑥號標紅結點;
  4. 因爲⑥號標紅結點的down字段爲空(不能再往下了,已是level1最低層了),則直接返回它的node字段指向的結點,即⑧號結點。
注意:若是咱們要查找key爲59的Node結點,返回的不是Key爲45的結點,而是key爲23的結點。讀者能夠本身在紙上比劃下。

回到doPut方法,假設如今待插入的Key爲3,則當執行完下面這段代碼後,ConcurrentSkipListMap的結構以下:
clipboard.png

/**
 * 插入鍵值對.
 *
 * @param onlyIfAbsent true: 僅當Key不存在時才進行插入
 */
private V doPut(K key, V value, boolean onlyIfAbsent) {
    Node<K, V> z;             // z指向待添加的Node結點
    if (key == null)          // ConcurrentSkipListMap的Key不能爲null
        throw new NullPointerException();

    Comparator<? super K> cmp = comparator;
    outer:
    for (; ; ) {
        // b是「是小於且最接近給定key」的Node結點(或底層鏈表頭結點)
        for (Node<K, V> b = findPredecessor(key, cmp), n = b.next; ; ) {
            if (n != null) {    // b存在後驅結點:  b -> n -> f
                Object v;
                int c;
                Node<K, V> f = n.next;          // f指向b的後驅的後驅
                if (n != b.next)                // 存在併發修改,放棄並重試
                    break;
                if ((v = n.value) == null) {    // n爲標記刪除結點
                    n.helpDelete(b, f);
                    break;
                }
                if (b.value == null || v == n)  // b爲標記刪除結點
                    break;
                if ((c = cpr(cmp, key, n.key)) > 0) {   // 向後遍歷,找到第一個大於key的結點
                    b = n;
                    n = f;
                    continue;
                }
                if (c == 0) {                           // 存在Key相同的結點
                    if (onlyIfAbsent || n.casValue(v, value)) {
                        V vv = (V) v;
                        return vv;
                    }
                    break; // CAS更新失敗,則重試
                }
            }

            z = new Node<K, V>(key, value, n);
            if (!b.casNext(n, z))  // 嘗試插入z結點: b -> z -> n
                    break;         // CAS插入失敗,則重試
            break outer;           // 跳出最外層循環
        }

          // ...
    }
}

上面是doPut中的第一個循環,做用就是找到底層鏈表的插入點,而後插入結點(在查找過程當中可能會刪除一些已標記的刪除結點)。

插入完成後,doPut方法並沒結束,咱們以前說過ConcurrentSkipListMap的分層數是經過一個隨機數生成算法來肯定,doPut的後半段,就是這個做用:判斷是否須要增長層級,若是須要就在各層級中插入對應的Index結點

/**
 * 插入鍵值對.
 *
 * @param onlyIfAbsent true: 僅當Key不存在時才進行插入
 */
private V doPut(K key, V value, boolean onlyIfAbsent) {
    // ...
    
    int rnd = ThreadLocalRandom.nextSecondarySeed();    // 生成一個隨機數種子
    if ((rnd & 0x80000001) == 0) {                      // 爲true表示須要增長層級

        /**
         * 如下方法用於建立新層級
         */
        int level = 1, max;
        while (((rnd >>>= 1) & 1) != 0) // level表示新的層級,經過下面這個while循環能夠確認新的層級數
            ++level;

        Index<K, V> idx = null;
        HeadIndex<K, V> h = head;

        if (level <= (max = h.level)) {         // CASE1: 新層級level沒有超過最大層級head.level(head指針指向最高層)
            // 以「頭插法」建立level個Index結點,idx最終指向最高層的Index結點
            for (int i = 1; i <= level; ++i)
                idx = new Index<K, V>(z, idx, null);
        }
        else {                                  // CASE2: 新層級level超過了最大層級head.level
            level = max + 1;    // 重置level爲最大層級+1

            // 生成一個Index結點數組,idxs[0]不會使用
            Index<K, V>[] idxs = (Index<K, V>[]) new Index<?, ?>[level + 1];
            for (int i = 1; i <= level; ++i)
                idxs[i] = idx = new Index<K, V>(z, idx, null);

            // 生成新的HeadIndex結點
            for (; ; ) {
                h = head;
                int oldLevel = h.level;         // 原最大層級
                if (level <= oldLevel)
                    break;
                HeadIndex<K, V> newh = h;
                Node<K, V> oldbase = h.node;    // oldbase指向最底層鏈表的頭結點
                for (int j = oldLevel + 1; j <= level; ++j)
                    newh = new HeadIndex<K, V>(oldbase, newh, idxs[j], j);
                if (casHead(h, newh)) {
                    h = newh;
                    idx = idxs[level = oldLevel];
                    break;
                }
            }
        }

        /**
         * 如下方法用於連接新層級的各個HeadIndex和Index結點
         */
        splice:
        for (int insertionLevel = level; ; ) {  // 此時level爲oldLevel,即原最大層級
            int j = h.level;
            for (Index<K, V> q = h, r = q.right, t = idx; ; ) {
                if (q == null || t == null)
                    break splice;
                if (r != null) {
                    Node<K, V> n = r.node;

                    int c = cpr(cmp, key, n.key);
                    if (n.value == null) {
                        if (!q.unlink(r))
                            break;
                        r = q.right;
                        continue;
                    }
                    if (c > 0) {
                        q = r;
                        r = r.right;
                        continue;
                    }
                }

                if (j == insertionLevel) {
                    if (!q.link(r, t))      // 在q和r之間插入t,即從 q -> r 變成 q -> t -> r
                        break;
                    if (t.node.value == null) {
                        findNode(key);
                        break splice;
                    }
                    if (--insertionLevel == 0)
                        break splice;
                }

                if (--j >= insertionLevel && j < level)
                    t = t.down;
                q = q.down;
                r = q.right;
            }
        }
    }
    return null;
}

最終ConcurrentSkipListMap的結構以下所示:

clipboard.png


remove操做

ConcurrentSkipListMap在刪除鍵值對時,不會當即執行刪除,而是經過引入「標記結點」,以「懶刪除」的方式進行,以提升併發效率。

public V remove(Object key) {
    return doRemove(key, null);
}

remove方法很簡單,內部調用了doRemove方法:

final V doRemove(Object key, Object value) {
    if (key == null)
        throw new NullPointerException();
    Comparator<? super K> cmp = comparator;
    outer:
    for (; ; ) {
        // b指向「小於且最接近給定key」的Node結點(或底層鏈表頭結點)
        for (Node<K, V> b = findPredecessor(key, cmp), n = b.next; ; ) {    // b -> n
            Object v;
            int c;
            if (n == null)
                break outer;
            Node<K, V> f = n.next;              // b -> n -> f
            if (n != b.next)                    // 一致性判斷
                break;
            if ((v = n.value) == null) {        // n is deleted
                n.helpDelete(b, f);
                break;
            }
            if (b.value == null || v == n)      // b is deleted
                break;
            if ((c = cpr(cmp, key, n.key)) < 0)
                break outer;
            if (c > 0) {
                b = n;
                n = f;
                continue;
            }

            // 此時n指向查到的結點

            if (value != null && !value.equals(v))
                break outer;
            if (!n.casValue(v, null))       // 更新查找到的結點的value爲null
                break;

            // 在n和f之間添加標記結點,並將b直接指向f
            if (!n.appendMarker(f) || !b.casNext(n, f))  // n -> marker -> f
                findNode(key);                  // retry via findNode
            else {
                findPredecessor(key, cmp);      // 刪除Index結點
                if (head.right == null)         // 減小層級
                    tryReduceLevel();
            }
            V vv = (V) v;
            return vv;
        }
    }
    return null;
}

仍是經過示例來理解上述代碼,假設如今要刪除Key==23的結點,刪除前ConcurrentSkipListMap的結構以下:
clipboard.png

doRemove方法首先會找到待刪除的結點,在它和後繼結點之間插入一個value爲null的標記結點(以下圖中的綠色結點),而後改變其前驅結點的指向:
clipboard.png

最後,doRemove會從新調用一遍findPredecessor方法,解除被刪除結點上的Index結點之間的引用:
clipboard.png

這樣Key==23的結點其實就被孤立,再後續查找或插入過程當中,會被徹底清除或被GC回收。


get操做

最後,咱們來看下ConcurrentSkipListMap的查找操做——get方法。

public V get(Object key) {
    return doGet(key);
}

內部調用了doGet方法:

private V doGet(Object key) {
    if (key == null)
        throw new NullPointerException();
    Comparator<? super K> cmp = comparator;
    outer:
    for (; ; ) {

        // b指向「小於且最接近給定key」的Node結點(或底層鏈表頭結點)
        for (Node<K, V> b = findPredecessor(key, cmp), n = b.next; ; ) {
            Object v;
            int c;
            if (n == null)
                break outer;
            Node<K, V> f = n.next;          // b -> n -> f
            if (n != b.next)
                break;
            if ((v = n.value) == null) {    // n is deleted
                n.helpDelete(b, f);
                break;
            }
            if (b.value == null || v == n)  // b is deleted
                break;
            if ((c = cpr(cmp, key, n.key)) == 0) {
                V vv = (V) v;
                return vv;
            }
            if (c < 0)
                break outer;
            b = n;
            n = f;
        }
    }
    return null;
}

doGet方法很是簡單:

首先找到「小於且最接近給定key」的Node結點,而後用了三個指針:b -> n -> f,
n用於定位最終查找的Key,而後順着鏈表一步步向下查,好比查找KEY==45,則最終三個指針的位置以下:
clipboard.png

相關文章
相關標籤/搜索