簡單高效的跳躍表 ConcurrentSkipListMap

一、簡介

看了TreeMap、HashMap源碼的,或者手寫實現紅黑樹、平衡二叉樹的,感受插入和刪除手寫太難了;我邏輯理解也不是很順暢,插入基本搞清楚了,可是刪除,我就呵呵了node

那麼,有沒有一種性能和紅黑樹或者平衡二叉樹不相上下,且又很好實現的數據結構,答案是確定,我知道的至少有一種 跳錶;不過也有缺點,看實現方式(我找的資料都是插入時,對剛插入的數據隨機處理索引),不必定穩定安全

什麼叫跳錶呢? 跳錶是一個隨機化的有序鏈表。跳錶在原有的有序鏈表上面增長了多級索引,經過索引來實現快速查找。跳錶不只能提升搜索性能,同時也能夠提升插入和刪除操做的性能。bash

它採用隨機技術決定鏈表中哪些節點應增長向前指針以及在該節點中應增長多少個指針。 採用這種隨機技術,跳錶中的搜索、插入、刪除操做的時間均爲O(logn),然而,最壞狀況下時間複雜性卻變成O(n)。 跳錶的結構大體以下圖:(來源百度百科數據結構

二、ConcurrentSkipListMap源碼分析

2.1 數據節點

final K key;
        volatile Object value;
        volatile Node<K,V> next;
複製代碼

節點數據包含,key,value和下個數據;也就是採用單鏈表的結構存儲實際數據app

具體的代碼操做就不貼出了,大概有如下內容:dom

  1. 使用sun.misc.Unsafe類來實現原子操做;配合volatile關鍵+自旋能夠實現線程安全操做
  2. 實現了值改變原子操做,後驅節點變化原子操做,當前next節點刪除的線程安全操做方法

2.2 普通索引節點

final Node<K,V> node;
        final Index<K,V> down;
        volatile Index<K,V> right;
複製代碼

包含對應數據,下方索引節點,右邊索引節點;源碼分析

  1. 一個特殊的單鏈表,和下方索引節點的數據節點相同;
  2. 實現右邊索引節點的原子操做

2.3 頭索引節點 HeadIndex<K,V>

static final class HeadIndex<K,V> extends Index<K,V> {
        final int level;
        HeadIndex(Node<K,V> node, Index<K,V> down, Index<K,V> right, int level) {
            super(node, down, right);
            this.level = level;
        }
    }
複製代碼

繼承了普通索引節點,增長了索引級別,從1開始;post

2.4 主要成員變量

static final Object BASE_HEADER = new Object();

    private transient volatile HeadIndex<K,V> head;

    final Comparator<? super K> comparator;
複製代碼

head 索引節點,comparator比較器;特色性能

  1. 頭索引節點,存儲的值對爲 null-BASE_HEADER;數據節點第一個值爲BASE_HEADER
  2. 頭索引節點的變化實現了原子操做
  3. head值爲最高索引級別(level最大)的第一個索引節點
  4. 存儲key值實現了Comparator接口,或者提供了比較器comparator,否則比較時拋出異常

2.5 查找前驅節點

private Node<K,V> findPredecessor(Object key, Comparator<? super K> cmp) {
        if (key == null)
            throw new NullPointerException(); // don't postpone errors for (;;) { for (Index<K,V> q = head, r = q.right, d;;) { if (r != null) { Node<K,V> n = r.node; K k = n.key; if (n.value == null) { if (!q.unlink(r)) break; r = q.right; continue; } if (cpr(cmp, key, k) > 0) { q = r; r = r.right; continue; } } if ((d = q.down) == null) return q.node; q = d; r = d.right; } } } 複製代碼
  1. 首先判斷key值,爲空,拋出異常
  2. 採用雙層for循環;在原子性操做去除無效值失敗時,內循環從新開始,至關於自旋操做,保證線程同步
  3. 內層循環採用right索引的值域比較大小:
  • right索引不爲空,首先若是rigth索引的值域無效(值節點中value爲空),則原子操做(unlink方法)連接q節點和right索引的rigt索引,失敗從新循環,成功,索引向右移動
  • right索引不爲空,若是right索引的值-key 比當前key小(cpr(cmp, key, k), key大,返回 > 0),則繼續往右移動,進行下次循環
  • right索引爲空,或者小,則索引向下移動
  • 若是down指針域爲空,則說明,這就是在當前全部索引節點中,小於key的最大索引節點

向下移動,則是把down指針域當成next,進行鏈表移動ui

向右移動,則是把right指針域當成next,進行鏈表移動

2.6 查找節點 Node<K,V> findNode(Object key)

private Node<K,V> findNode(Object key) {
        if (key == null)
            throw new NullPointerException();
        Comparator<? super K> cmp = comparator;
        outer: for (;;) {
            for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
                Object v; int c;
                if (n == null)
                    break outer;
                Node<K,V> f = n.next;
                if (n != b.next) 
                    break;
                if ((v = n.value) == null) { 
                    n.helpDelete(b, f);
                    break;
                }
                if (b.value == null || v == n) 
                    break;
                if ((c = cpr(cmp, key, n.key)) == 0)
                    return n;
                if (c < 0)
                    break outer;
                b = n;
                n = f;
            }
        }
        return null;
    }
複製代碼
  1. key值檢驗有效性
  2. 雙層循環;內循環處理流程
  • 若是next節點爲空,則說明未找到匹配的數據,則跳出雙層循環,結束流程,返回null
  • 檢查數據是否發生了變化,若是發生變化,從新內循環
  • 檢查數據有效性,無效時,去除節點(cas操做,有可能失敗),從新循環
  • 檢查b、n數據有效性,無效從新內循環
  • 比較索引值域-key 和key的大小,至關代表找到數據,返回;小於0,則不存在,跳出雙層循環,返回null;大於0,繼續日後找

因而可知,數據節點按照key的從小到大排列的

2.7 獲取數據 get

public V get(Object key) {
        return doGet(key);
    }
    
    private V doGet(Object key) {
        if (key == null)
            throw new NullPointerException();
        Comparator<? super K> cmp = comparator;
        outer: for (;;) {
            for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
                Object v; int c;
                if (n == null)
                    break outer;
                Node<K,V> f = n.next;
                if (n != b.next)   
                    break;
                if ((v = n.value) == null) {    
                    n.helpDelete(b, f);
                    break;
                }
                if (b.value == null || v == n)  
                    break;
                if ((c = cpr(cmp, key, n.key)) == 0) {
                    @SuppressWarnings("unchecked") V vv = (V)v;
                    return vv;
                }
                if (c < 0)
                    break outer;
                b = n;
                n = f;
            }
        }
        return null;
    }
複製代碼

和findNode方法基本一致,就是在下面代碼中,返回值不一樣;一個是返回節點,一個是返回節點value值

if ((c = cpr(cmp, key, n.key)) == 0) {
                    @SuppressWarnings("unchecked") V vv = (V)v;
                    return vv;
                }
複製代碼

2.8 增長、修改數據

public V put(K key, V value) {
        if (value == null)
            throw new NullPointerException();
        return doPut(key, value, false);
    }
    
    private V doPut(K key, V value, boolean onlyIfAbsent) {
        Node<K,V> z;
        if (key == null)
            throw new NullPointerException();
        Comparator<? super K> cmp = comparator;
        outer: for (;;) {
            for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
                if (n != null) {
                    Object v; int c;
                    Node<K,V> f = n.next;
                    if (n != b.next)     
                        break;
                    if ((v = n.value) == null) {  
                        n.helpDelete(b, f);
                        break;
                    }
                    if (b.value == null || v == n) 
                        break;
                    if ((c = cpr(cmp, key, n.key)) > 0) {
                        b = n;
                        n = f;
                        continue;
                    }
                    if (c == 0) {
                        if (onlyIfAbsent || n.casValue(v, value)) {
                            @SuppressWarnings("unchecked") V vv = (V)v;
                            return vv;
                        }
                        break; // restart if lost race to replace value
                    }
                }
                z = new Node<K,V>(key, value, n);
                if (!b.casNext(n, z))
                    break; 
                break outer;
            }
        }

        int rnd = ThreadLocalRandom.nextSecondarySeed();
        if ((rnd & 0x80000001) == 0) {
            int level = 1, max;
            while (((rnd >>>= 1) & 1) != 0)
                ++level;
            Index<K,V> idx = null;
            HeadIndex<K,V> h = head;
            if (level <= (max = h.level)) {
                for (int i = 1; i <= level; ++i)
                    idx = new Index<K,V>(z, idx, null);
            }
            else { 
                level = max + 1; 
                @SuppressWarnings("unchecked")Index<K,V>[] idxs =
                    (Index<K,V>[])new Index<?,?>[level+1];
                for (int i = 1; i <= level; ++i)
                    idxs[i] = idx = new Index<K,V>(z, idx, null);
                for (;;) {
                    h = head;
                    int oldLevel = h.level;
                    if (level <= oldLevel) // lost race to add level
                        break;
                    HeadIndex<K,V> newh = h;
                    Node<K,V> oldbase = h.node;
                    for (int j = oldLevel+1; j <= level; ++j)
                        newh = new HeadIndex<K,V>(oldbase, newh, idxs[j], j);
                    if (casHead(h, newh)) {
                        h = newh;
                        idx = idxs[level = oldLevel];
                        break;
                    }
                }
            }

            splice: for (int insertionLevel = level;;) {
                int j = h.level;
                for (Index<K,V> q = h, r = q.right, t = idx;;) {
                    if (q == null || t == null)
                        break splice;
                    if (r != null) {
                        Node<K,V> n = r.node;
                        int c = cpr(cmp, key, n.key);
                        if (n.value == null) {
                            if (!q.unlink(r))
                                break;
                            r = q.right;
                            continue;
                        }
                        if (c > 0) {
                            q = r;
                            r = r.right;
                            continue;
                        }
                    }

                    if (j == insertionLevel) {
                        if (!q.link(r, t))
                            break;
                        if (t.node.value == null) {
                            findNode(key);
                            break splice;
                        }
                        if (--insertionLevel == 0)
                            break splice;
                    }

                    if (--j >= insertionLevel && j < level)
                        t = t.down;
                    q = q.down;
                    r = q.right;
                }
            }
        }
        return null;
    }
複製代碼

從上面代碼能夠看出,分兩個大步驟:一是插入數據域節點(修改值只有這一步),二是索引層處理

插入數據:和get思路大同小異,存在一下區別

  1. 找到相同key的數據節點時,替換value值,結束整個流程
  2. 若是向後查找,當前節點next大於要插入key值,說明插入節點在當前節點和後面節點之間,原子操做連接,成功進行下一步

索引層處理

  1. 隨機一個正數,若是不知足(rnd & 0x80000001) == 0,則結束
  2. 經過隨機數rnd,來肯定作多有多少層level
  3. 若是level層數小於現有層,則直接生成當前數據的從1到level的索引節點
  4. 若是level層大於現有層,則增長一層:生成當前數據從1到level的索引節點,並生成原有層到新的level層的頭節點,頭節點右值域爲當前數據同層索引節點;h爲新的head指針,idxs爲當前插入索引節點在舊的索引層最高層
  5. 雙層循環:外層循環還是檢驗數據發生變化時從新處理,保證數據的線程安全
  • 若是當前插入數據表明的索引節點爲空,或者是,上次循環時索引層爲1,結束
  • 去除無效索引節點;成功,向右移動,繼續循環,失敗則從外層循環從新開始;
  • 當前插入數據key值大,繼續右移,繼續循環
  • 這時已經找到位置;找到位置的level和插入數據的level同層,則索引節點插入;插入爲原子操做,失敗,則從內層循環從新開始
  • 找到位置的level和插入數據的level同層,存在當前插入索引爲無效數據,則跳出外層循環,結束
  • 找到位置的level和插入數據的level同層;插入數據,level 減1,若是此時值爲0,說明索引節點所有連接成功,結束
  • 若是當前內循環時索引節點level,比插入數據level大,則內循環索引節點向下移動,繼續循環
  • 若是內循環索引節點不大於插入數據level節點,則一塊兒向下移動(在我看來,這就應該是同級移動),繼續循環

2.9 移除數據 remove

public V remove(Object key) {
        return doRemove(key, null);
    }
    final V doRemove(Object key, Object value) {
        if (key == null)
            throw new NullPointerException();
        Comparator<? super K> cmp = comparator;
        outer: for (;;) {
            for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
                Object v; int c;
                if (n == null)
                    break outer;
                Node<K,V> f = n.next;
                if (n != b.next)                    // inconsistent read
                    break;
                if ((v = n.value) == null) {        // n is deleted
                    n.helpDelete(b, f);
                    break;
                }
                if (b.value == null || v == n)      // b is deleted
                    break;
                if ((c = cpr(cmp, key, n.key)) < 0)
                    break outer;
                if (c > 0) {
                    b = n;
                    n = f;
                    continue;
                }
                /////////////////////////////////////////////// 相等
                if (value != null && !value.equals(v))
                    break outer;
                if (!n.casValue(v, null))
                    break;
                if (!n.appendMarker(f) || !b.casNext(n, f))
                    findNode(key);                  // retry via findNode
                else {
                    findPredecessor(key, cmp);      // clean index
                    if (head.right == null)
                        tryReduceLevel();
                }
                @SuppressWarnings("unchecked") V vv = (V)v;
                return vv;
            }
        }
        return null;
    }
複製代碼

又是熟悉的套路;校驗,雙層循環;那咱們從key值比較相等(代碼中 /////////////////////////////////////////////// 相等 處)開始提及

  1. 若是刪除數據的value值不爲空且不相等,代表不存在想刪除數據,跳出雙層循環,結束
  2. 當前數據節點是想刪除的數據節點,數據value原子操做置空,失敗,內循環從新開始
  3. 刪除數據value = null,原子操做去除數據節點並進行後驅處理,失敗,則經過findNode來去除節點(其經過自旋+cas操做必定能操做成功)
  4. 刪除數據節點,並處理後驅成功;則經過 findPredecessor方法進行無效索引節點去除,一樣此操做確定能夠成功;
  5. 若是最高層head索引節點,右指針域爲空,嘗試去除最高層索引層(已經沒有任何數據域索引節點了)
  6. 返回刪除節點的value值

2.10 釋放索引層 tryReduceLevel

private void tryReduceLevel() {
        HeadIndex<K,V> h = head;
        HeadIndex<K,V> d;
        HeadIndex<K,V> e;
        if (h.level > 3 &&
            (d = (HeadIndex<K,V>)h.down) != null &&
            (e = (HeadIndex<K,V>)d.down) != null &&
            e.right == null &&
            d.right == null &&
            h.right == null &&
            casHead(h, d) && 
            h.right != null)
            casHead(d, h); 
    }
複製代碼

去除索引條件

  1. 至少有4層索引節點
  2. head節點的down指針域不爲空
  3. head節點的down指針域的down指針域不爲空
  4. head的right指針域爲空
  5. head的down指針域的右域爲空
  6. head節點的down指針域的down指針域的右域爲空
  7. 原子操做 head 置換爲head的down指針域,失敗,結束
  8. 當前head的右指針域不爲空,則原子操做head,從新回退爲h

爲啥要這麼多判斷啊,爲啥要回退啊,我是徹底不懂,只能說確定是爲了處理線程同步問題的,哪位讀者知道,謝謝留言告訴我

三、總結

  1. 存儲包括數據單鏈表、索引單鏈表結構;索引單鏈表有兩個指針域,右、下
  2. 數據單鏈表,是從小到大排列的
  3. 索引數據域,每一個數據對應的索引層數不是有序分佈的
  4. 增長數據時,隨機數決定是否增長索引層,是否增長插入值索引;這個隨機很重要,影響着效率
  5. 刪除數據後,可能減小索引層
  6. head索引以及其down、down-down指針域的節點值是固定的
  7. 未加入數據時有一層索引,一個固定頭數據
  8. 查找時,經過索引查找最大左區間數據節點,而後再數據鏈表中向後查詢
  9. 插入數據的key類,必須實現Comparator接口或者初始化時傳入比較器

技術變化都很快,但基礎技術、理論知識永遠都是那些;做者但願在餘後的生活中,對經常使用技術點進行基礎知識分享;若是你以爲文章寫的不錯,請給與關注和點贊;若是文章存在錯誤,也請多多指教!

相關文章
相關標籤/搜索