ConcurrentSkipListMap - 秒懂


JUC 高併發工具類(3文章)與高併發容器類(N文章) :

說明:閱讀本文以前,請先掌握本文前置知識: 跳錶 核心原理 圖解編程

ConcurrentSkipListMap 的結構

ConcurrentSkipListMap 的節點主要由 Node, Index, HeadIndex 構成;下面是一個典型的ConcurrentSkipListMap 的實例的結構圖:
在這裏插入圖片描述安全

1 ConcurrentSkipListMap2結構

ConcurrentSkipListMap 的節點主要由 Node, Index, HeadIndex 構成,
在這裏插入圖片描述數據結構

下面是Node, Index, HeadIndex 的介紹併發

11 普通結點Node

/**
     * 最上層鏈表的頭指針head
     */
    private transient volatile HeadIndex<K, V> head;
    /* ---------------- 普通結點Node定義 -------------- */
    static final class Node<K, V> {
        final K key;
        volatile Object value;
        volatile Node<K, V> next;


        // ...

    }

1.2 索引結點Index

/* ---------------- 索引結點Index定義 -------------- */
  static class Index<K, V> {
     final Node<K, V> node;      // node指向最底層鏈表的Node結點
    final Index<K, V> down;     // down指向下層Index結點
    volatile Index<K, V> right; // right指向右邊的Index結點
     // ...

    }

1.3 頭索引結點HeadIndex

/* ---------------- 頭索引結點HeadIndex -------------- */
    static final class HeadIndex<K, V> extends Index<K, V> {
       final int level;    // 層級
       // ...
    }
}

1.1.4 ConcurrentSkipListMap2內部類與成員彙總

public class ConcurrentSkipListMap2<K, V> extends AbstractMap<K, V>
    implements ConcurrentNavigableMap<K, V>, Cloneable, Serializable {
    /**
   * 最底層鏈表的頭指針BASE_HEADER
     */
    private static final Object BASE_HEADER = new Object();

    /**
   * 最上層鏈表的頭指針head
     */
    private transient volatile HeadIndex<K, V> head;
    /* ---------------- 普通結點Node定義 -------------- */
    static final class Node<K, V> {
        final K key;
        volatile Object value;
        volatile Node<K, V> next;
       // ...
    }

    /* ---------------- 索引結點Index定義 -------------- */
    static class Index<K, V> {
        final Node<K, V> node;      // node指向最底層鏈表的Node結點
       final Index<K, V> down;     // down指向下層Index結點
        volatile Index<K, V> right; // right指向右邊的Index結點
        // ...
    }

   /**
   *Nodes heading each level keep track of their level.
  */
    /* ---------------- 頭索引結點HeadIndex -------------- */
    static final class HeadIndex<K, V> extends Index<K, V> {
        final int level;    // 層級

static final class HeadIndex<K,V> extends Index<K,V> {
    final int level;
    HeadIndex(Node<K,V> node, Index<K,V> down, Index<K,V> right, int level) {
        super(node, down, right);
        this.level = level;
    }
      // ...
  }

3 ConcurrentSkipListMap 的幾個特色:

  • ConcurrentSkipListMap 的節點主要由 Node, Index, HeadIndex 構成;app

  • ConcurrentSkipListMap 的數據結構橫向縱向都是鏈表less

  • 最下面那層是Node層(數據節點)層, 上面幾層都是Index(索引)層dom

  • 從縱向鏈表來看, 最左邊的是 HeadIndex 層, 右邊的都是Index 層, 且每層的最底端都是對應Node, 縱向上的索引都是指向最底端的Node

4 ConcurrentSkipListMap在新建時的初始狀態

ConcurrentSkipListMap在初始時, 只存在 HeadIndex 和 Base_Header 節點,初始狀態以下:

在這裏插入圖片描述

下面來看看 ConcurrentSkipListMap 的主要方法 doPut, doGet, doRemove方法的原理。

5 doPut 原理

put方法主要經歷了2個步驟:

第一大步:在底層查找合適的位置,插入該節點的Node實例。

第二大步:插入該節點的一個或者多個IndexNode節點(數量和層數有關)。

5.1 第一大步:查找合適的位置,插入該Node節點。具體以下:

step1)查找前驅跳躍點b,而且獲取b.next節點爲 n。

step2)遍歷查找合適的插入點,n 爲null就建立節點,添加在前驅b的next節點,添加成功跳出第一步,失敗從新進行step1

step3)n不爲null,則n爲其應該插入的節點。明確了位置以後,先要判斷n是否仍是b的next節點,防止被搶先在中間插入了,再判斷n節點是不是有效節點,如n被邏輯刪除了就回到step1後再重來。最後判斷b節點是否被刪除了。接下來,判斷node的key的是否大小n節點的key,若是等於就替換掉該節點的value值(表示更新value),跳出第一步。若是大於意味着還要往n後找,最後找到了合適的插入點就嘗試插入,若是失敗重來step1,成功結束第一步。

完成第一步大部,僅僅是將節點插入了鏈表中,還須要完成跳錶的IndexNode構成。

5.1 第二大步:構建跳躍表的結點,調整跳錶。

step1) 隨機級別,偶數且大於0。

說明:node級別就意味着跳錶的間隔,node級別越大,層次越高,高級別層次的結點越少,key間隔越大。級別越大,在查找的時候能夠提高查找速度,從最大的級別開始,逐級定位結點。一個新加結點,首先要肯定其屬於幾級,1級就不須要構建IndexNode,一系列判斷出其所屬級別後,就先構建down方向的一系列結點,再經過各層的頭結點,將整個層的IndexNode的right方向結點聯通。

step2)若是該級別的 level 是0(要知道得到0的機率是很大的),不須要插入Index索引結點。插入的工做結束。

step2) 若是該級別的 level<= max(head的級別,當前的最大level),生成一系列的Index索引節點,而且經過down成員進行串接,全部級別Index索引結點(node爲插入節點)構成down鏈,生成的Index索引節點從級別1開始。

step3)若是該級別的 level> max(head的級別,當前的最大level) (這個函數返回的最大值也就31, 也就是說, 最多有31層的索引),則加大一個跳錶級別,生成從1開始的全部級別Index索引結點(node爲插入節點)構成down鏈。

step4)再次判斷頭結點級別,若是head級別比該級別高,證實head被其餘線程搶先調整了,重來。沒有搶先,從新構建head頭結點的索引headIndex,node是頭結點的node,補充缺失的級別就能夠了。替換頭結點HeadIndex成功跳出循環,失敗重來。

上面都是構建down方向的結點,確保head的down方向包含了全部索引級別。後面的方法就是構建right方法的鏈接了。這裏要注意,h變成了新的頭結點,level倒是舊的級別。

step5)h結點或h的right結點r爲null,不必進行,結束該環節

step6)r不爲null,比較key和r的結點n的key,n結點被邏輯刪除,就幫助其移除,移除後找下一個r結點。當前r結點要小於key,則key還在右邊,繼續找r。直到找到key應該在的位置,即r結點>=key,key的right就是r。

step7)不斷降級,直到找到當前的插入級別,直到到指定級別,構建鏈接,鏈接失敗重來,成功若是構建的結點被邏輯刪除了,經過findNode方法,刪除它。

6 圖解: put的完成過程

6.1 添加第1個節點

添加 key=1, value = A 節點, 結果如圖:
在這裏插入圖片描述

步驟以下:

  • 1 doPut()尋找前驅節點, 這時返回的 b = BaseHeader, n = null

  • 2 doPut直接 CAS操做設置b 的next節點

  • 3 這裏假設獲取的 level 是0(要知道得到0的機率是很大的, 這個函數返回的最大值也就31, 也就是說, 最多有31層的索引)

  • 4 因此這時 index索引節點= null, 操做結束

6.2 添加第2個節點

再次添加 key=2, value = B 節點, 最終效果圖以下:

在這裏插入圖片描述

6.3 添加第3個節點

這裏爲了理解上的便利, 咱們再添加一個節點, 最終效果圖以下:

在這裏插入圖片描述

步驟以下:

  • 1 doPut()尋找前驅節點, 這時返回的 b = node2, n = null

  • 2 doPut直接 CAS操做設置b 的next節點爲新的node3

  • 3 這裏假設獲取的 level 是1, 則 level <= max(max = 1)成立, 初始化一個 index索引節點

  • 4 最終找到要插入index位置, 而後進行down連接操做, 因此這時 index索引節點的down= null, 操做結束

此次增長了索引層 index 1

6.4 添加第4個節點

再put節點 key=4 value = D (情形和 Node1, Node2 同樣), 最終結果:

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-Am5hoyuI-1604491989652)(file:///C:/Users/WUQING~1/AppData/Local/Temp/msohtmlclip1/01/clip_image006.png)]

6.5 添加第5個節點

添加 key=5, value = E 節點, 結果如圖:

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-iIncHfVK-1604491989653)(file:///C:/Users/WUQING~1/AppData/Local/Temp/msohtmlclip1/01/clip_image007.png)]

步驟以下:

  • 1 doPut()尋找前驅節點, 這時返回的 b = node4, n = null

  • 2 doPut直接 CAS操做設置b 的next節點爲新的node5

  • 3 這裏假設獲取的 level 是2, 則 level <= max(max = 1)不成立, 只要 level > max, 只是在原來的 max + 1, 就是指增長一層的索引

  • 4 進行 index 索引鏈表的初始化, 一共兩個index 節點,1層一個,index鏈表是縱向的鏈表

  • 5 增長一個層次,在原來的 HeadIndex 的縱向鏈表上增長一個新節點,新的HeadIndex的 down= 老的HeadIndex,縱向鏈接起來, 而新HeadIndex的index是第二層的 Index,HadeIndex與Index橫向鏈接起來了

此次增長了索引層 index 1

7 put的源碼

/**
 * Main insetion method. Adds element if not present, or
 * replaces value if present and onlyIfAbsent is false.
 *
 * @param key the key
 * @param value the values that must be associated with key
 * @param onlyIfAbstsent if should not insert if already present
 * @return the old value, or null if newly inserted
 */
private V doPut(K key, V value, boolean onlyIfAbstsent){
    Node<K, V> z; // adde node
    if(key == null){
        throw new NullPointerException();
    }
    Comparator<? super K> cmp = comparator;
    outer:
    for(;;){
        // 0.
        for(Node<K, V> b = findPredecessor(key, cmp), n = b.next;;){ // 1. 將 key 對應的前繼節點找到, b 爲前繼節點, n是前繼節點的next, 若沒發生 條件競爭, 最終 key在 b 與 n 之間 (找到的b在 base_level 上)
            if(n != null){ // 2. n = null時 b 是鏈表的最後一個節點, key 直接插到 b 以後 (調用 b.casNext(n, z))
                Object v; int c;
                Node<K, V> f = n.next; // 3 獲取 n 的右節點
                if(n != b.next){ // 4. 條件競爭(另一個線程在b以後插入節點, 或直接刪除節點n), 則 break 到位置 0, 從新
                    break ;
                }
                if((v = n.value) == null){ // 4. 若 節點n已經刪除, 則 調用 helpDelete 進行幫助刪除 (詳情見 helpDelete), 則 break 到位置 0, 從新來
                    n.helpDelete(b, f);
                    break ;
                }

                if(b.value == null || v == n){ // 5. 節點b被刪除中 ,則 break 到位置 0, 調用 findPredecessor 幫助刪除 index 層的數據, 至於 node 層的數據 會經過 helpDelete 方法進行刪除
                    break ;
                }
                if((c = cpr(cmp, key, n.key)) > 0){ // 6. 若 key 真的 > n.key (在調用 findPredecessor 時是成立的), 則進行 向後走
                    b = n;
                    n = f;
                    continue ;
                }
                if(c == 0){ // 7. 直接進行賦值
                    if(onlyIfAbstsent || n.casValue(v, value)){
                        V vv = (V) v;
                        return vv;
                    }
                    break ; // 8. cas 競爭條件失敗 重來
                }
                // else c < 0; fall through
            }
            // 9. 到這邊時 n.key > key > b.key
            z = new Node<K, V> (key, value, n);
            if(!b.casNext(n, z)){
                break ; // 10. cas競爭條件失敗 重來
            }
            break outer; // 11. 注意 這裏 break outer 後, 上面的 for循環不會再執行, 然後執行下面的代碼, 這裏是break 不是 continue outer, 這二者的效果是不同的
        }
    }

    int rnd = KThreadLocalRandom.nextSecondarySeed();
    if((rnd & 0x80000001) == 0){ // 12. 判斷是否須要添加level
        int level = 1, max;
        while(((rnd >>>= 1) & 1) != 0){
            ++level;
        }
        // 13. 上面這段代碼是獲取 level 的, 咱們這裏只須要知道獲取 level 就能夠 (50%的概率返回0,25%的概率返回1,12.5%的概率返回2...最大返回31。)
        Index<K, V> idx = null;
        HeadIndex<K, V> h = head;
        if(level <= (max = h.level)){ // 14. 初始化 max 的值, 若 level 小於 max , 則進入這段代碼 (level 是 1-31 之間的隨機數)
            for(int i = 1; i <= level; ++i){
                idx = new Index<K, V>(z, idx, null); // 15 添加 z 對應的 index 數據, 並將它們組成一個上下的鏈表(index層是上下左右都是鏈表)
            }
        }
        else{ // 16. 若 level > max 則只增長一層 index 索引層
            level = max + 1; // 17. 跳錶新的 level 產生
            Index<K, V>[] idxs = (Index<K, V>[])new Index<?, ?>[level + 1];
            for(int i = 1; i <= level; ++i){
                idxs[i] = idx = new Index<K, V>(z, idx, null);
            }
            for(;;){
                h = head;
                int oldLevel = h.level; // 18. 獲取老的 level 層
                if(level <= oldLevel){ // 19. 另外的線程進行了index 層增長操做, 因此 不須要增長 HeadIndex 層數
                    break;
                }
                HeadIndex<K, V> newh = h;
                Node<K, V> oldbase = h.node; // 20. 這裏的 oldbase 就是BASE_HEADER
                for(int j = oldLevel+1; j <= level; ++j){ // 21. 這裏其實就是增長一層的 HeadIndex (level = max + 1)
                    newh = new HeadIndex<K, V>(oldbase, newh, idxs[j], j); // 22. idxs[j] 就是上面的 idxs中的最高層的索引
                }
                if(casHead(h, newh)){ // 23. 這隻新的 headIndex
                    h = newh;  // 24. 這裏的 h 變成了 new HeadIndex
                    idx = idxs[level = oldLevel];  // 25. 這裏的 idx 上從上往下第二層的 index 節點 level 也變成的 第二
                    break;
                }
            }
        }

        // find insertion points and splice in
        splice:
        for(int insertionLevel = level;;){ // 26. 這時的 level 已是 第二高的 level(若上面 步驟19 條件競爭失敗, 則多出的 index 層實際上是無用的, 由於 那是 調用 Index.right 是找不到它的)
            int j = h.level;
            for(Index<K, V> q = h, r = q.right, t = idx;;){ // 27. 初始化對應的數據
                if(q == null || t == null){ // 28. 節點都被刪除 直接 break出去
                    break splice;
                }
                if(r != null){
                    Node<K, V> n = r.node;
                    // compare before deletion check avoids needing recheck
                    int c = cpr(cmp, key, n.key);
                    if(n.value == null){ // 29. 老步驟, 幫助index 的刪除
                        if(!q.unlink(r)){
                            break ;
                        }
                        r = q.right; // 30. 向右進行遍歷
                        continue ;
                    }

                    if(c > 0){ // 31. 向右進行遍歷
                        q = r;
                        r = r.right;
                        continue ;
                    }
                }

                // 32.
                // 代碼運行到這裏, 說明 key < n.key
                // 第一次運行到這邊時, j 是最新的 HeadIndex 的level j > insertionLevel 很是用可能, 而下面又有 --j, 因此終會到 j == insertionLevel
                if(j == insertionLevel){
                    if(!q.link(r, t)){ // 33. 將 index t 加到 q 與 r 中間, 若條件競爭失敗的話就重試
                        break ; // restrt
                    }
                    if(t.node.value == null){ // 34. 若這時 node 被刪除, 則開始經過 findPredecessor 清理 index 層, findNode 清理 node 層, 以後直接 break 出去, doPut調用結束
                        findNode(key);
                        break splice;
                    }
                    if(--insertionLevel == 0){ // 35. index 層添加OK, --1 爲下層插入 index 作準備
                        break splice;
                    }
                }

                /**
                 * 下面這行代碼實際上是最重要的, 理解這行代碼, 那 doPut 就差很少了
                 * 1). --j 要知道 j 是 newhead 的level, 一開始必定 > insertionLevel的, 經過 --1 來爲下層操做作準備 (j 是 headIndex 的level)
                 * 2). 經過 19. 21, 22 步驟, 我的認爲 --j >= insertionLevel 是橫成立, 而 --j 是必需要作的
                 * 3) j 通過幾回--1, 當出現 j < level 時說明 (j+1) 層的 index已經添加成功, 因此處理下層的 index
                 */
                if(--j >= insertionLevel && j < level){
                    t = t.down;
                }
                /** 到這裏時, 其實有兩種狀況
                 *  1) 尚未一次index 層的數據插入
                 *  2) 已經進行 index 層的數據插入, 如今爲下一層的插入作準備
                 */
                q = q.down; // 從 index 層向下進行查找
                r = q.right;

            }
        }
    }
    return null;
}

8 findPredecessor() 尋找前繼節點

整體思路是: 從矩形鏈表的左上角的 HeadIndex 索引開始, 先向右, 遇到 null, 或 > key 時向下, 重複向右向下找, 一直找到 對應的前繼節點(前繼節點就是小於 key 的最大節點)

/**
 * Returns a base-level node with key strictly less than given key,
 * or the base-level header if there is no such node. Also
 * unlinks indexes to deleted nodes found along the way. Callers
 * rely on this side-effect of clearing indices to deleted nodes
 * @param key the key
 * @return a predecessor of the key
 */
private Node<K, V> findPredecessor(Object key, Comparator<? super K> cmp){
    if(key == null)
        throw new NullPointerException(); // don't postpone errors
    for(;;){
        for(Index<K, V> q = head, r = q.right, d;;){ // 1. 初始化數據 q 是head, r 是 最頂層 h 的右Index節點
            if(r != null){ // 2. 對應的 r =  null, 則進行向下查找
                Node<K, V> n = r.node;
                K k = n.key;
                if(n.value == null){ // 3. n.value = null 說明 節點n 正在刪除的過程當中
                    if(!q.unlink(r)){ // 4. 在 index 層直接刪除 r 節點, 若條件競爭發生直接進行break 到步驟1 , 從新從 head 節點開始查找
                        break; // restart
                    }
                    r = q.right; //reread r // 5. 刪除 節點r 成功, 獲取新的 r 節點, 回到步驟 2 (仍是從這層索引開始向右遍歷, 直到 r == null)
                    continue;
                }

                if(cpr(cmp, key, k) > 0){ // 6. 若 r.node.key < 參數key, 則繼續向右遍歷, continue 到 步驟 2處, 若 r.node.key >  參數key 直接跳到 步驟 7
                    q = r;
                    r = r.right;
                    continue;
                }
            }

            if((d = q.down) == null){ // 7. 到這邊時, 已經到跳錶的數據層, q.node < key < r的 或q.node < key 且 r == null; 因此直接返回 q.node
                return q.node;
            }

            q = d; // 8 未到數據層, 進行從新賦值向下走 (爲何向下走呢? 回過頭看看 跳錶, 原來 上層的index 通常都是比下層的 index 個數少的)
            r = d.right;
        }
    }
}

9. doGet() 獲取節點對應的值

整個過程:

  1. 尋找 key 的前繼節點 b (這時b.next = null || b.next > key, 則說明不存key對應的 Node)
  2. 接着就判斷 b, b.next 與 key之間的關係(其中有些 helpDelete操做)
/**
 * Gets value for key. Almost the same as findNode, but returns
 * the found value (to avoid retires during ret-reads)
 *
 *  這個 doGet 方法比較簡單
 * @param key the key
 * @return the value, or null if absent
 */
private V doGet(Object key){
    if(key == null){
        throw new NullPointerException();
    }
    Comparator<? super K> cmp = comparator;
    outer:
    for(;;){
        for(Node<K, V> b = findPredecessor(key, cmp), n = b.next;;){ // 1. 獲取 key 的前繼節點 b, 其實這時 n.key >= key
            Object v; int c;
            if(n == null){ // 2. n == null 說明 key 對應的 node 不存在 因此直接 return null
                break outer;
            }
            Node<K, V> f = n.next;
            if(n != b.next){ // 3. 有另外的線程修改數據, 從新來
                break ;
            }
            if((v = n.value) == null){ // 4. n 是被刪除了的節點, 進行helpDelete 後從新再來
                n.helpDelete(b, f);
                break ;
            }
            if(b.value == null || v == n){ // 5. b已是刪除了的節點, 則 break 後再來
                break ;
            }
            if((c = cpr(cmp, key, n.key)) == 0){ // 6. 若 n.key = key 直接返回結果, 這裏返回的結果有多是 null
                V vv = (V) v;
                return vv;
            }
            if(c < 0){ // 7. c < 0說明不存在 key 的node 節點
                break outer;
            }
            // 8. 運行到這一步時, 實際上是 在調用 findPredecessor 後又有節點添加到 節點b的後面所致
            b = n;
            n = f;
        }
    }

    return null;
}

10. doRemove() 刪除節點

整個刪除個 ConcurrentSkipListMap 裏面 nonBlockingLinkedList 實現的一大亮點, 爲何呢? 由於這個 nonBlockingLinkedList 同時支持併發安全的從鏈表中間添加/刪除操做, 而 ConcurrentLinkedQueue 只支持併發安全的從鏈表中間刪除;
刪除操做:

  1. 尋找對應的節點
  2. 給節點的 value 至 null, node.value = null
  3. 將 node 有增長一個標記節點 (this.value = this 還記得哇, 不記得的直接看 node 類)
  4. 經過 CAS 直接將 K對應的Node和標記節點一同刪除
/**
 * Main deletion method. Locates node, nulls value, appends a
 * deletion marker, unlinks predecessor, removes associated index
 * nodes, and possibly reduces head index level
 *
 * Index nodes are cleared out simply by calling findPredecessor.
 * which unlinks indexes to deleted nodes found along path to key,
 * which will include the indexes to this node. This is node
 * unconditionally. We can't check beforehand whether there are
 * indexes hadn't been inserted yet for this node during initial
 * search for it, and we'd like to ensure lack of garbage
 * retention, so must call to be sure
 *
 * @param key the key
 * @param value if non-null, the value that must be
 *              associated with key
 * @return the node, or null if not found
 */
final V doRemove(Object key, Object value){
    if(key == null){
        throw new NullPointerException();
    }
    Comparator<? super K> cmp = comparator;
    outer:
    for(;;){
        for(Node<K, V> b = findPredecessor(key, cmp), n = b.next;;){ // 1. 獲取對應的前繼節點 b
            Object v; int c;
            if(n == null){ // 2. 節點 n 被刪除 直接 return null 返回 , 由於理論上 b.key < key < n.key
                break outer;
            }
            Node<K, V> f = n.next;
            if(n != b.next){ // 3. 有其餘線程在 節點b 後增長數據, 重來
                break ;
            }
            if((v = n.value) == null){ // 4. 節點 n 被刪除, 調用 helpDelete 後重來
                n.helpDelete(b, f);
                break ;
            }

            if(b.value == null || v == n){ // 5. 節點 b 刪除, 重來 調用findPredecessor時會對 b節點對應的index進行清除, 而b借點吧自己會經過 helpDelete 來刪除
                break ;
            }
            if((c = cpr(cmp, key, n.key)) < 0){ // 6. 若n.key < key 則說明 key 對應的節點就不存在, 因此直接 return
                break outer;
            }

            if(c > 0){ // 7. c>0 出如今 有其餘線程在本方法調用findPredecessor後又在b 後增長節點, 因此向後遍歷
                b = n;
                n = f;
                continue ;
            }

            if(value != null && !value.equals(v)){ // 8. 若 前面的條件爲真, 則不進行刪除 (調用 doRemove 時指定必定要知足 key value 都相同, 具體看 remove 方法)
                break outer;
            }
            if(!n.casValue(v, null)){ // 9. 進行數據的刪除
                break ;
            }
            if(!n.appendMarker(f) || !b.casNext(n, f)){ // 10. 進行 marker 節點的追加, 這裏的第二個 cas 不必定會成功, 但不要緊的 (第二個 cas 是刪除 n節點, 不成功會有  helpDelete 進行刪除)
                findNode(key);  // 11. 對 key 對應的index 進行刪除
            }
            else{
                findPredecessor(key, cmp); //12. 對 key 對應的index 進行刪除 10進行操做失敗後經過 findPredecessor 進行index 的刪除
                if(head.right == null){
                    tryReduceLevel(); // 13. 進行headIndex 對應的index 層的刪除
                }
            }

            V vv = (V) v;
            return vv;

        }
    }

    return null;
}

11 無鎖編程(lock free)

常見的無鎖編程(lock free)通常是基於CAS(Compare And Swap)+volatile 結合實現:(1)CAS保障操做的原子性,volatile 保障內存的可見性。

  • 優勢:

一、開銷較小:不須要進入內核,不須要切換線程;

二、沒有死鎖:總線鎖最長持續爲一次read+write的時間;

三、只有寫操做須要使用CAS,讀操做與串行代碼徹底相同,可實現讀寫不互斥。

  • 缺點:

一、編程很是複雜,兩行代碼之間可能發生任何事,不少常識性的假設都不成立。

二、CAS模型覆蓋的狀況很是少,沒法用CAS實現原子的複數操做。

12 無鎖編程Key-Value結構的對比

目前經常使用的key-value數據結構有三種:Hash表、紅黑樹、SkipList,它們各自有着不一樣的優缺點(不考慮刪除操做):

  • Hash表:插入、查找最快,爲O(1);如使用鏈表實現則可實現無鎖;數據有序化須要顯式的排序操做。

  • 紅黑樹:插入、查找爲O(logn),但常數項較小;無鎖實現的複雜性很高,通常須要加鎖;數據自然有序。

  • SkipList:插入、查找爲O(logn),但常數項比紅黑樹要大;底層結構爲鏈表,可無鎖實現;數據自然有序。

若是要實現一個key-value結構,需求的功能有插入、查找、迭代、修改,那麼首先Hash表就不是很適合了,由於迭代的時間複雜度比較高;而紅黑樹的插入極可能會涉及多個結點的旋轉、變色操做,所以須要在外層加鎖,這無形中下降了它可能的併發度。而SkipList底層是用鏈表實現的,能夠實現爲lock free,同時它還有着不錯的性能(單線程下只比紅黑樹略慢),很是適合用來實現咱們需求的那種key-value結構。

因此,LevelDB、Redis的底層存儲結構就是用的SkipList。


回到◀瘋狂創客圈

瘋狂創客圈 - Java高併發研習社羣,爲你們開啓大廠之門

相關文章
相關標籤/搜索