ConcurrentSkipListMap跳錶原理解析

咱們首先來看一下ConcurrentSkipListMap的繼承結構圖。node

內部結構以下(圖片來源於網絡),這裏面Node其實就是HeadIndex中的level1,level2,level3中的一個個綠點。算法

ConcurrentSkipListMap的總體數據結構是一種多層鏈表結構,在Java中,咱們都知道鏈表有LinkedList,但LinkedList是一個雙向鏈表.由插入時候的如下代碼就能夠看出來。編程

void linkBefore(E e, Node<E> succ) {
    // assert succ != null;
    final Node<E> pred = succ.prev;
    final Node<E> newNode = new Node<>(pred, e, succ);
    succ.prev = newNode;
    if (pred == null)
        first = newNode;
    else
        pred.next = newNode;
    size++;
    modCount++;
}

而ConcurrentSkipListMap中的各層鏈表爲單向鏈表,而且key值有序排列。這裏有其Node節點的代碼能夠看出,它只有一個next的後續節點。後端

static final class Node<K,V> {
    final K key;
    volatile Object value;
    volatile Node<K,V> next;

以及它的兩個構造器數組

Node(K key, Object value, Node<K,V> next) {
    this.key = key;
    this.value = value;
    this.next = next;
}
Node(Node<K,V> next) {
    this.key = null;
    this.value = this;
    this.next = next;
}

咱們先來看一下,當咱們new一個ConcurrentSkipListMap的時候會發生什麼。安全

final Comparator<? super K> comparator; //函數式接口——比較器
public ConcurrentSkipListMap() {
    this.comparator = null;
    initialize();
}

private transient volatile HeadIndex<K,V> head; //HeadIndex繼承於Index,這裏爲原始頭索引(當整個數據結構中尚未分層,沒有鏈表的時候)網絡

private static final Object BASE_HEADER = new Object();
private void initialize() {
    keySet = null;
    entrySet = null;
    values = null;
    descendingMap = null;
    //這裏head會被初始化,他的屬性中,Node的Key爲null,value爲Object對象,下一個節點爲null;
    //head的分層索引down爲null,鏈表的後續索引right爲null,層級level爲第一層。
    head = new HeadIndex<K,V>(new Node<K,V>(null, BASE_HEADER, null),
                              null, null, 1);
}
static final class HeadIndex<K,V> extends Index<K,V> {
    final int level; //層數
    HeadIndex(Node<K,V> node, Index<K,V> down, Index<K,V> right, int level) {
        super(node, down, right);
        this.level = level;
    }
}
static class Index<K,V> {
    final Node<K,V> node; //節點
    final Index<K,V> down; //分層索引,分層索引跟上層索引的Node的key,value相同,next不一樣
    volatile Index<K,V> right; //鏈表後續索引

索引的構造器數據結構

Index(Node<K,V> node, Index<K,V> down, Index<K,V> right) {
    this.node = node;
    this.down = down;
    this.right = right;
}

作爲一個Map,咱們來看一下它的put方法。併發

public V put(K key, V value) {
    //寫入時不容許寫入null值
    if (value == null)
        throw new NullPointerException();
    return doPut(key, value, false);
}
private V doPut(K key, V value, boolean onlyIfAbsent) {
    Node<K,V> z;             // added node
    if (key == null)
        throw new NullPointerException();
    Comparator<? super K> cmp = comparator;
    outer: for (;;) {
        //在最底層鏈表中獲取目標的前置節點
        for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
            //若是該前置節點的後續節點不爲null
            if (n != null) {
                Object v; int c;
                //獲取後續節點的後續節點
                Node<K,V> f = n.next;
                //若是n不爲前置節點的後續節點,從新進入'for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;)'循環
                //由於可能有其餘線程已經插入了其餘節點在b的後續節點
                if (n != b.next)               // inconsistent read
                    break;
                //若是後續節點n的值爲null,刪除該節點n,用n的後續節點頂替n,從新進入'for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;)'循環
                if ((v = n.value) == null) {   // n is deleted
                    n.helpDelete(b, f);
                    break;
                }
                //若是b爲null或者v等於本身的value,說明b已經被其餘線程刪除了
                //從新進入'for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;)'循環
                if (b.value == null || v == n) // b is deleted
                    break;
                //若是咱們查找的key大於後續節點的key,向後續節點推動
                if ((c = cpr(cmp, key, n.key)) > 0) {
                    b = n;
                    n = f;
                    continue;
                }
                //若是咱們查找的key等於後續節點的key
                if (c == 0) {
                    //經過無鎖競爭,將value替換後續節點的值,並返回後續節點的原值
                    //競爭失敗的,從新進入'for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;)'循環
                    //這裏咱們能夠看到修改已存在的key的時候,是不會進行層數變化的
                    if (onlyIfAbsent || n.casValue(v, value)) {
                        @SuppressWarnings("unchecked") V vv = (V)v;
                        return vv;
                    }
                    break; // restart if lost race to replace value
                }
                // else c < 0; fall through
            }
            //若是後續節點n爲null,初始化一個節點,放入咱們要存儲的key,value,該節點的後續節點爲n
            z = new Node<K,V>(key, value, n);
            //經過無鎖競爭,將該新節點替換掉b的後續節點n,此時只有一個線程能夠競爭成功,並替換
            //競爭失敗的線程,從新進入'for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;)'循環
            if (!b.casNext(n, z))
                break;         // restart if lost race to append to b
            //競爭成功的,退出'for (;;)'循環
            break outer;
        }
    }
    //到此時表示已經節點已經put成功了,但對於跳錶來講,來要根據隨機數的值來表示是否向上增長層數與上層節點
    //獲取一個僞隨機的種子
    int rnd = ThreadLocalRandom.nextSecondarySeed();
    //若是該種子的二進制與10000000000000000000000000000001進行與運算爲0
    //即該種子的二進制最高位與最末尾必須爲0,其餘位無所謂
    //若是該種子的二進制最高位與最末位不爲0,不增長新節點的層數
    if ((rnd & 0x80000001) == 0) { // test highest and lowest bits
        //定義層level,從1開始
        int level = 1, max;
        //判斷該種子值的二進制從第二位開始向左有多少個連續的1,層數加多少個1
        //這裏因爲是隨機值,因此層數level是不肯定的
        while (((rnd >>>= 1) & 1) != 0)
            ++level;
        Index<K,V> idx = null;
        //獲取頭索引head
        HeadIndex<K,V> h = head;
        //若是level小於等於頭索引的層數
        if (level <= (max = h.level)) {
            //根據層數level不斷建立新增節點的下層索引
            //注意此時只是新增了新節點的索引,並無關聯到跳錶的真實體中
            for (int i = 1; i <= level; ++i)
                idx = new Index<K,V>(z, idx, null);
        }
        //若是層數level大於頭索引的層數
        else { // try to grow by one level
            //將層數level變動爲頭索引的層數加1
            level = max + 1; // hold in array and later pick the one to use
            //建立一個數量爲level+1的索引數組
            @SuppressWarnings("unchecked")Index<K,V>[] idxs =
                (Index<K,V>[])new Index<?,?>[level+1];
            //根據層數level不斷建立新增節點的下層索引,並放入數組中
            //此時只是新增了新節點的索引,並無關聯到跳錶的真實體中
            for (int i = 1; i <= level; ++i)
                idxs[i] = idx = new Index<K,V>(z, idx, null);
            for (;;) {
                //獲取頭索引
                h = head;
                //獲取頭索引的層數
                int oldLevel = h.level;
                //若是level小於等於oldLevel,說明已經有其餘線程修改了頭索引的層數,退出循環
                if (level <= oldLevel) // lost race to add level
                    break;
                //定義一個新的頭索引,取值h
                HeadIndex<K,V> newh = h;
                //獲取頭索引的節點
                Node<K,V> oldbase = h.node;
                //該段代碼的意思其實只是新增一層新鏈表,這一層新鏈表以原頭索引爲下層索引,新增節點索引爲鏈表後續索引,因此這
                //一層新鏈表只有頭索引和新增節點的索引兩個索引,但因爲有多個線程的參與,該循環體可能會不斷執行
                for (int j = oldLevel+1; j <= level; ++j)
                    newh = new HeadIndex<K,V>(oldbase, newh, idxs[j], j);
                //經過無鎖競爭,使用newh來替代h(此時h不等於newh,只是跳錶的頭索引位置被替換掉了)
                //競爭成功的線程,將h變動爲newh,idx變動爲oldlevel層的新增節點索引,並退出循環'for (;;)'
                //由於此時新鏈表層已經肯定,僅爲頭索引和新節點索引,而以前全部的層的鏈表均要插入新節點索引,因此會做此變動
                //競爭失敗的從新進入循環'for (;;)'
                if (casHead(h, newh)) {
                    h = newh;
                    idx = idxs[level = oldLevel];
                    break;
                }
            }
        }
        // find insertion points and splice in
        //獲取level,該level爲原頭節點的層數,即原跳錶的層數,不包括新層
        splice: for (int insertionLevel = level;;) {
            //獲取頭索引的層數
            int j = h.level;
            for (Index<K,V> q = h, r = q.right, t = idx;;) {
                //若是頭索引爲null或者新增節點索引爲null,退出'for (int insertionLevel = level;;)'
                //此處表示有其餘線程刪除了頭索引或者新增節點的索引
                if (q == null || t == null)
                    break splice;
                //若是頭索引的鏈表後續索引存在,若是是新層則爲新節點索引,若是是老層,則爲原索引
                if (r != null) {
                    //獲取r的節點
                    Node<K,V> n = r.node;
                    // compare before deletion check avoids needing recheck
                    //獲取咱們插入的key和n的key的比較值
                    int c = cpr(cmp, key, n.key);
                    //刪除空值索引,具體解釋見doGet中
                    if (n.value == null) {
                        if (!q.unlink(r))
                            break;
                        r = q.right;
                        continue;
                    }
                    //若是咱們插入的key大於n的key,繼續向後續推動
                    if (c > 0) {
                        q = r;
                        r = r.right;
                        continue;
                    }
                }
                //若是j爲跳錶原層數      
                if (j == insertionLevel) {
                    //此時q爲咱們須要在第j層插入新增節點的前置索引
                    //將新節點索引經過無鎖競爭插入q與r之間,競爭失敗的會從新進入'for (Index<K,V> q = h, r = q.right, t = idx;;)'循環
                    if (!q.link(r, t))
                        break; // restart
                    //若是新增節點的值爲null,表示該節點已經被其餘線程刪除,結束循環'for (int insertionLevel = level;;)'
                    if (t.node.value == null) {
                        findNode(key);
                        break splice;
                    }
                    //插入層逐層自減,當爲最底層時退出循環'for (int insertionLevel = level;;)'
                    if (--insertionLevel == 0)
                        break splice;
                }
                //其餘節點隨着插入節點的層數下移而下移
                if (--j >= insertionLevel && j < level)
                    t = t.down;
                q = q.down;
                r = q.right;
            }
        }
    }
    return null;
}
boolean casValue(Object cmp, Object val) {
    //無鎖競爭,若是cmp等於valueOffset,將val替換cmp
    return UNSAFE.compareAndSwapObject(this, valueOffset, cmp, val);
}
private boolean casHead(HeadIndex<K,V> cmp, HeadIndex<K,V> val) {
    return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val);
}
final boolean link(Index<K,V> succ, Index<K,V> newSucc) {
    //獲取調用索引對象的節點
    Node<K,V> n = node;
    //將新索引的鏈表後續索引(newSucc)設爲老索引(succ)
    newSucc.right = succ;
    //若是調用索引對象的值不爲null,經過無鎖競爭,將新索引替換老索引
    return n.value != null && casRight(succ, newSucc);
}
private Node<K,V> findNode(Object key) {
    if (key == null)
        throw new NullPointerException(); // don't postpone errors
    Comparator<? super K> cmp = comparator;
    outer: for (;;) {
        //找到最底層目標節點的前置節點
        for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
            Object v; int c;
            //若是該前置節點的鏈表後續節點爲null,退出'for (;;)'循環
            if (n == null)
                break outer;
            //獲取後續節點的後續節點
            Node<K,V> f = n.next;
            //若是n不爲前置節點的後續節點,表示已經有其餘線程刪除了該節點,從新進入‘for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;)’循環
            if (n != b.next)                // inconsistent read
                break;
            //若是後續節點的值爲null,無鎖競爭,刪除該節點(將後續節點替代該節點)
            //從新進入‘for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;)’循環
            if ((v = n.value) == null) {    // n is deleted
                n.helpDelete(b, f);
                break;
            }
            //若是前置節點已被其餘線程刪除,從新進入‘for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;)’循環
            if (b.value == null || v == n)  // b is deleted
                break;
            //若是插入的key與後續節點的key相等,返回後續節點
            if ((c = cpr(cmp, key, n.key)) == 0)
                return n;
            //若是插入的節點key小於後續節點key,結束循環‘for (;;)’
            if (c < 0)
                break outer;
            //若是插入的節點key大於後續節點key,向後推動
            b = n;
            n = f;
        }
    }
    return null;
}

關於層數是如何來增長的,這個就依靠於ThreadLocalRandom.nextSecondarySeed()這個隨機數來決定,當該隨機數的二進制最高位與最末位不爲0的時候,咱們put進該Map的數據只會在最底層鏈表中,不會在高層鏈表中構建節點。當該隨機數的二進制最高位與最末位都爲0的時候,且該隨機數從二進制第二位開始向左有多少個1,就表明會在多少層高層鏈表中構建節點,固然超過原跳錶的最高層只會增長一層。app

用一個網上隨機的例子來講明,雖然這個例子中的算法並非以上ConcurrentSkipListMap的真實算法。可是能夠幫助理解

跳躍表的初試狀態以下圖,表中沒有一個元素:

若是咱們要插入元素2,首先是在底部插入元素2,以下圖:

而後咱們拋硬幣,結果是正面,那麼咱們要將2插入到L2層,以下圖: 

繼續拋硬幣,結果是反面,那麼元素2的插入操做就中止了,插入後的表結構就是上圖所示。接下來,咱們插入元素33,跟元素2的插入同樣,如今L1層插入33,以下圖:

而後拋硬幣,結果是反面,那麼元素33的插入操做就結束了,插入後的表結構就是上圖所示。接下來,咱們插入元素55,首先在L1插入55,插入後以下圖:

而後拋硬幣,結果是正面,那麼L2層須要插入55,以下圖:

繼續拋硬幣,結果又是正面,那麼L3層須要插入55,以下圖:

以此類推,咱們插入剩餘的元素。固然由於規模小,結果極可能不是一個理想的跳躍表。可是若是元素個數n的規模很大,學過幾率論的同窗都知道,最終的表結構確定很是接近於理想跳躍表。

而後咱們來看一下它的get方法。

public V get(Object key) {
    return doGet(key);
}
private V doGet(Object key) {
    if (key == null)
        throw new NullPointerException();
    Comparator<? super K> cmp = comparator;
    outer: for (;;) {
        //在最底層鏈表中獲取目標的前置節點
        for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
            Object v; int c;
            //若是找到的前置節點沒有後續節點,直接跳出循環'for (;;)',而後返回null
            if (n == null)
                break outer;
            //若是後續節點不爲null,獲取後續節點到後續節點
            Node<K,V> f = n.next;
            //若是n已經不爲前置節點到後續節點了,從新進入'for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;)'循環
            if (n != b.next)                // inconsistent read
                break;
            //若是後續節點n的值爲null
            if ((v = n.value) == null) {    // n is deleted
                //經過無鎖競爭刪除該節點n,由於只容許有一個線程能夠刪除成功從新進入'for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;)'循環
                n.helpDelete(b, f);
                break;
            }
            //若是前置節點的值爲null或者後續節點的值爲null從新進入'for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;)'循環
            if (b.value == null || v == n)  // b is deleted
                break;
            //若是查找的鍵與後續節點的鍵相同,返回後續節點的值
            if ((c = cpr(cmp, key, n.key)) == 0) {
                @SuppressWarnings("unchecked") V vv = (V)v;
                return vv;
            }
            //若是查找的鍵小於後續節點的鍵,直接跳出循環'for (;;)',而後返回null
            if (c < 0)
                break outer;
            //若是查找的鍵大於後續節點的鍵,繼續向鏈表的後端推動
            b = n;
            n = f;
        }
    }
    return null;
}

如下這段findPredecessor方法總體的意思爲:從最上層的頭索引開始向右查找(鏈表的後續索引),若是後續索引的節點的Key大於咱們要查找的Key.則頭索引移到下層鏈表,在下層鏈表查找,以此反覆,一直查找到沒有下層的分層索引爲止,返回該索引的節點。若是後續索引的節點的Key小於咱們要查找的Key,則在該層鏈表中向後查找。因爲查找的Key永遠小於索引節點的Key,因此只能找到目標的前置索引節點。其中會有空值索引的存在,這裏是經過CAS來進行處理的。這裏須要注意的是咱們要找的值最終都是在最底層鏈表中找到的,它不會在高層和中層鏈表中去找最終值(就算高層、中層鏈表中有這個值)。

比方說咱們要在這個圖中找55,它是不會直接在L3中直接找到的,而是通過箭頭的方向在L1找55的前置節點。

private Node<K,V> findPredecessor(Object key, Comparator<? super K> cmp) {
    if (key == null)
        throw new NullPointerException(); // don't postpone errors
    for (;;) {
        //獲取原始頭索引以及該頭索引的鏈表後續索引,當ConcurrentSkipListMap剛初始化的時候,r爲null
        //可是咱們查找確定不會在空的跳錶中查找,因此咱們認定頭索引的後續索引r不爲null.
        for (Index<K,V> q = head, r = q.right, d;;) {
            //若是鏈表後續索引不爲null
            if (r != null) {
                //獲取鏈表後續索引的節點
                Node<K,V> n = r.node;
                K k = n.key;
                //若是該節點的值value爲null
                if (n.value == null) {
                    //刪除空值索引,即把r的後續索引頂替掉r
                    //若是無鎖競爭失敗,即多個線程都在刪空索引,只有一個線程能刪成功,從頭開始進入循環'for (Index<K,V> q = head, r = q.right, d;;)'
                    if (!q.unlink(r))
                        break;           // restart
                    //無鎖競爭成功,空值索引被成功刪除,從新獲取r值,並跳事後續代碼,進入'for (Index<K,V> q = head, r = q.right, d;;)'下一輪循環
                    //這裏咱們須要注意,這裏的q和r並不表明哪一個固定的索引,他們都是不斷在具體的索引中不斷變化的
                    r = q.right;         // reread r
                    continue;
                }
                //若是該節點的值value不爲null,且查找的key值大於該節點的key值
                //向後續索引推動
                if (cpr(cmp, key, k) > 0) {
                    q = r;
                    r = r.right;
                    continue;
                }
            }
            //若是q的分層索引爲null,返回q的節點
            if ((d = q.down) == null)
                return q.node;
            //若是q的分層索引不爲null,將q賦值爲自身的分層索引
            q = d;
            //r賦值爲分層索引的鏈表後續索引
            r = d.right;
        }
    }
}
final boolean unlink(Index<K,V> succ) {
    //前索引的節點的value是否爲null,以及用後索引替換前索引是否成功
    return node.value != null && casRight(succ, succ.right);
}
final boolean casRight(Index<K,V> cmp, Index<K,V> val) {
    //無鎖競爭,若是前索引cmp等於rightOffset的時候,使用後索引來替換前索引,並返回true;不然返回false
    return UNSAFE.compareAndSwapObject(this, rightOffset, cmp, val);
}
static final int cpr(Comparator c, Object x, Object y) {
    //比較x,y是否相等,若是返回0則相等,爲正數說明x大於y
    return (c != null) ? c.compare(x, y) : ((Comparable)x).compareTo(y);
}

關於比較器,能夠參考本人博客Java函數式編程整理

void helpDelete(Node<K,V> b, Node<K,V> f) {
    /*
     * 若是本節點的後續節點爲f,且自己爲b的後續節點
     */
    if (f == next && this == b.next) {
        //若是後續節點爲null或者後續節點值不爲後續節點
        if (f == null || f.value != f) // not already marked
            //經過無鎖競爭生成一個key爲null,value和next都相同的後續節點(value和next也可能爲null)
            casNext(f, new Node<K,V>(f));
        else
            //若是後續節點不爲空,經過無鎖競爭,將f的後續節點替換掉自己節點,即刪除自己節點。
            b.casNext(this, f.next);
    }
}
boolean casNext(Node<K,V> cmp, Node<K,V> val) {
    return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}

固然咱們最後依然要討論它的併發性能,咱們能夠看到整個put,get中都沒有鎖的參與,徹底經過雙for循環(第一層基本相似於for(;;)的無限循環,第二層加入了各類CAS無鎖競爭,競爭失敗的會從新進入第二層循環)來保證線程安全。咱們都知道純無鎖的性能實際上是要遠遠高過鎖性能的,因此在高併發的讀寫操做的時候,其性能是要好過ConcurrentHashMap,由於ConcurrentHashMap有數組單節點中有節點鎖,具體的性能測試能夠參考Fork/Join框架原理和使用探祕

相關文章
相關標籤/搜索