咱們首先來看一下ConcurrentSkipListMap的繼承結構圖。node
內部結構以下(圖片來源於網絡),這裏面Node其實就是HeadIndex中的level1,level2,level3中的一個個綠點。算法
ConcurrentSkipListMap的總體數據結構是一種多層鏈表結構,在Java中,咱們都知道鏈表有LinkedList,但LinkedList是一個雙向鏈表.由插入時候的如下代碼就能夠看出來。編程
void linkBefore(E e, Node<E> succ) { // assert succ != null; final Node<E> pred = succ.prev; final Node<E> newNode = new Node<>(pred, e, succ); succ.prev = newNode; if (pred == null) first = newNode; else pred.next = newNode; size++; modCount++; }
而ConcurrentSkipListMap中的各層鏈表爲單向鏈表,而且key值有序排列。這裏有其Node節點的代碼能夠看出,它只有一個next的後續節點。後端
static final class Node<K,V> { final K key; volatile Object value; volatile Node<K,V> next;
以及它的兩個構造器數組
Node(K key, Object value, Node<K,V> next) { this.key = key; this.value = value; this.next = next; }
Node(Node<K,V> next) { this.key = null; this.value = this; this.next = next; }
咱們先來看一下,當咱們new一個ConcurrentSkipListMap的時候會發生什麼。安全
final Comparator<? super K> comparator; //函數式接口——比較器
public ConcurrentSkipListMap() { this.comparator = null; initialize(); }
private transient volatile HeadIndex<K,V> head; //HeadIndex繼承於Index,這裏爲原始頭索引(當整個數據結構中尚未分層,沒有鏈表的時候)網絡
private static final Object BASE_HEADER = new Object();
private void initialize() { keySet = null; entrySet = null; values = null; descendingMap = null; //這裏head會被初始化,他的屬性中,Node的Key爲null,value爲Object對象,下一個節點爲null; //head的分層索引down爲null,鏈表的後續索引right爲null,層級level爲第一層。 head = new HeadIndex<K,V>(new Node<K,V>(null, BASE_HEADER, null), null, null, 1); }
static final class HeadIndex<K,V> extends Index<K,V> { final int level; //層數 HeadIndex(Node<K,V> node, Index<K,V> down, Index<K,V> right, int level) { super(node, down, right); this.level = level; } }
static class Index<K,V> { final Node<K,V> node; //節點 final Index<K,V> down; //分層索引,分層索引跟上層索引的Node的key,value相同,next不一樣 volatile Index<K,V> right; //鏈表後續索引
索引的構造器數據結構
Index(Node<K,V> node, Index<K,V> down, Index<K,V> right) { this.node = node; this.down = down; this.right = right; }
作爲一個Map,咱們來看一下它的put方法。併發
public V put(K key, V value) { //寫入時不容許寫入null值 if (value == null) throw new NullPointerException(); return doPut(key, value, false); }
private V doPut(K key, V value, boolean onlyIfAbsent) { Node<K,V> z; // added node if (key == null) throw new NullPointerException(); Comparator<? super K> cmp = comparator; outer: for (;;) { //在最底層鏈表中獲取目標的前置節點 for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) { //若是該前置節點的後續節點不爲null if (n != null) { Object v; int c; //獲取後續節點的後續節點 Node<K,V> f = n.next; //若是n不爲前置節點的後續節點,從新進入'for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;)'循環 //由於可能有其餘線程已經插入了其餘節點在b的後續節點 if (n != b.next) // inconsistent read break; //若是後續節點n的值爲null,刪除該節點n,用n的後續節點頂替n,從新進入'for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;)'循環 if ((v = n.value) == null) { // n is deleted n.helpDelete(b, f); break; } //若是b爲null或者v等於本身的value,說明b已經被其餘線程刪除了 //從新進入'for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;)'循環 if (b.value == null || v == n) // b is deleted break; //若是咱們查找的key大於後續節點的key,向後續節點推動 if ((c = cpr(cmp, key, n.key)) > 0) { b = n; n = f; continue; } //若是咱們查找的key等於後續節點的key if (c == 0) { //經過無鎖競爭,將value替換後續節點的值,並返回後續節點的原值 //競爭失敗的,從新進入'for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;)'循環 //這裏咱們能夠看到修改已存在的key的時候,是不會進行層數變化的 if (onlyIfAbsent || n.casValue(v, value)) { @SuppressWarnings("unchecked") V vv = (V)v; return vv; } break; // restart if lost race to replace value } // else c < 0; fall through } //若是後續節點n爲null,初始化一個節點,放入咱們要存儲的key,value,該節點的後續節點爲n z = new Node<K,V>(key, value, n); //經過無鎖競爭,將該新節點替換掉b的後續節點n,此時只有一個線程能夠競爭成功,並替換 //競爭失敗的線程,從新進入'for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;)'循環 if (!b.casNext(n, z)) break; // restart if lost race to append to b //競爭成功的,退出'for (;;)'循環 break outer; } } //到此時表示已經節點已經put成功了,但對於跳錶來講,來要根據隨機數的值來表示是否向上增長層數與上層節點 //獲取一個僞隨機的種子 int rnd = ThreadLocalRandom.nextSecondarySeed(); //若是該種子的二進制與10000000000000000000000000000001進行與運算爲0 //即該種子的二進制最高位與最末尾必須爲0,其餘位無所謂 //若是該種子的二進制最高位與最末位不爲0,不增長新節點的層數 if ((rnd & 0x80000001) == 0) { // test highest and lowest bits //定義層level,從1開始 int level = 1, max; //判斷該種子值的二進制從第二位開始向左有多少個連續的1,層數加多少個1 //這裏因爲是隨機值,因此層數level是不肯定的 while (((rnd >>>= 1) & 1) != 0) ++level; Index<K,V> idx = null; //獲取頭索引head HeadIndex<K,V> h = head; //若是level小於等於頭索引的層數 if (level <= (max = h.level)) { //根據層數level不斷建立新增節點的下層索引 //注意此時只是新增了新節點的索引,並無關聯到跳錶的真實體中 for (int i = 1; i <= level; ++i) idx = new Index<K,V>(z, idx, null); } //若是層數level大於頭索引的層數 else { // try to grow by one level //將層數level變動爲頭索引的層數加1 level = max + 1; // hold in array and later pick the one to use //建立一個數量爲level+1的索引數組 @SuppressWarnings("unchecked")Index<K,V>[] idxs = (Index<K,V>[])new Index<?,?>[level+1]; //根據層數level不斷建立新增節點的下層索引,並放入數組中 //此時只是新增了新節點的索引,並無關聯到跳錶的真實體中 for (int i = 1; i <= level; ++i) idxs[i] = idx = new Index<K,V>(z, idx, null); for (;;) { //獲取頭索引 h = head; //獲取頭索引的層數 int oldLevel = h.level; //若是level小於等於oldLevel,說明已經有其餘線程修改了頭索引的層數,退出循環 if (level <= oldLevel) // lost race to add level break; //定義一個新的頭索引,取值h HeadIndex<K,V> newh = h; //獲取頭索引的節點 Node<K,V> oldbase = h.node; //該段代碼的意思其實只是新增一層新鏈表,這一層新鏈表以原頭索引爲下層索引,新增節點索引爲鏈表後續索引,因此這 //一層新鏈表只有頭索引和新增節點的索引兩個索引,但因爲有多個線程的參與,該循環體可能會不斷執行 for (int j = oldLevel+1; j <= level; ++j) newh = new HeadIndex<K,V>(oldbase, newh, idxs[j], j); //經過無鎖競爭,使用newh來替代h(此時h不等於newh,只是跳錶的頭索引位置被替換掉了) //競爭成功的線程,將h變動爲newh,idx變動爲oldlevel層的新增節點索引,並退出循環'for (;;)' //由於此時新鏈表層已經肯定,僅爲頭索引和新節點索引,而以前全部的層的鏈表均要插入新節點索引,因此會做此變動 //競爭失敗的從新進入循環'for (;;)' if (casHead(h, newh)) { h = newh; idx = idxs[level = oldLevel]; break; } } } // find insertion points and splice in //獲取level,該level爲原頭節點的層數,即原跳錶的層數,不包括新層 splice: for (int insertionLevel = level;;) { //獲取頭索引的層數 int j = h.level; for (Index<K,V> q = h, r = q.right, t = idx;;) { //若是頭索引爲null或者新增節點索引爲null,退出'for (int insertionLevel = level;;)' //此處表示有其餘線程刪除了頭索引或者新增節點的索引 if (q == null || t == null) break splice; //若是頭索引的鏈表後續索引存在,若是是新層則爲新節點索引,若是是老層,則爲原索引 if (r != null) { //獲取r的節點 Node<K,V> n = r.node; // compare before deletion check avoids needing recheck //獲取咱們插入的key和n的key的比較值 int c = cpr(cmp, key, n.key); //刪除空值索引,具體解釋見doGet中 if (n.value == null) { if (!q.unlink(r)) break; r = q.right; continue; } //若是咱們插入的key大於n的key,繼續向後續推動 if (c > 0) { q = r; r = r.right; continue; } } //若是j爲跳錶原層數 if (j == insertionLevel) { //此時q爲咱們須要在第j層插入新增節點的前置索引 //將新節點索引經過無鎖競爭插入q與r之間,競爭失敗的會從新進入'for (Index<K,V> q = h, r = q.right, t = idx;;)'循環 if (!q.link(r, t)) break; // restart //若是新增節點的值爲null,表示該節點已經被其餘線程刪除,結束循環'for (int insertionLevel = level;;)' if (t.node.value == null) { findNode(key); break splice; } //插入層逐層自減,當爲最底層時退出循環'for (int insertionLevel = level;;)' if (--insertionLevel == 0) break splice; } //其餘節點隨着插入節點的層數下移而下移 if (--j >= insertionLevel && j < level) t = t.down; q = q.down; r = q.right; } } } return null; }
boolean casValue(Object cmp, Object val) { //無鎖競爭,若是cmp等於valueOffset,將val替換cmp return UNSAFE.compareAndSwapObject(this, valueOffset, cmp, val); }
private boolean casHead(HeadIndex<K,V> cmp, HeadIndex<K,V> val) { return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val); }
final boolean link(Index<K,V> succ, Index<K,V> newSucc) { //獲取調用索引對象的節點 Node<K,V> n = node; //將新索引的鏈表後續索引(newSucc)設爲老索引(succ) newSucc.right = succ; //若是調用索引對象的值不爲null,經過無鎖競爭,將新索引替換老索引 return n.value != null && casRight(succ, newSucc); }
private Node<K,V> findNode(Object key) { if (key == null) throw new NullPointerException(); // don't postpone errors Comparator<? super K> cmp = comparator; outer: for (;;) { //找到最底層目標節點的前置節點 for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) { Object v; int c; //若是該前置節點的鏈表後續節點爲null,退出'for (;;)'循環 if (n == null) break outer; //獲取後續節點的後續節點 Node<K,V> f = n.next; //若是n不爲前置節點的後續節點,表示已經有其餘線程刪除了該節點,從新進入‘for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;)’循環 if (n != b.next) // inconsistent read break; //若是後續節點的值爲null,無鎖競爭,刪除該節點(將後續節點替代該節點) //從新進入‘for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;)’循環 if ((v = n.value) == null) { // n is deleted n.helpDelete(b, f); break; } //若是前置節點已被其餘線程刪除,從新進入‘for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;)’循環 if (b.value == null || v == n) // b is deleted break; //若是插入的key與後續節點的key相等,返回後續節點 if ((c = cpr(cmp, key, n.key)) == 0) return n; //若是插入的節點key小於後續節點key,結束循環‘for (;;)’ if (c < 0) break outer; //若是插入的節點key大於後續節點key,向後推動 b = n; n = f; } } return null; }
關於層數是如何來增長的,這個就依靠於ThreadLocalRandom.nextSecondarySeed()這個隨機數來決定,當該隨機數的二進制最高位與最末位不爲0的時候,咱們put進該Map的數據只會在最底層鏈表中,不會在高層鏈表中構建節點。當該隨機數的二進制最高位與最末位都爲0的時候,且該隨機數從二進制第二位開始向左有多少個1,就表明會在多少層高層鏈表中構建節點,固然超過原跳錶的最高層只會增長一層。app
用一個網上隨機的例子來講明,雖然這個例子中的算法並非以上ConcurrentSkipListMap的真實算法。可是能夠幫助理解
跳躍表的初試狀態以下圖,表中沒有一個元素:
若是咱們要插入元素2,首先是在底部插入元素2,以下圖:
而後咱們拋硬幣,結果是正面,那麼咱們要將2插入到L2層,以下圖:
繼續拋硬幣,結果是反面,那麼元素2的插入操做就中止了,插入後的表結構就是上圖所示。接下來,咱們插入元素33,跟元素2的插入同樣,如今L1層插入33,以下圖:
而後拋硬幣,結果是反面,那麼元素33的插入操做就結束了,插入後的表結構就是上圖所示。接下來,咱們插入元素55,首先在L1插入55,插入後以下圖:
而後拋硬幣,結果是正面,那麼L2層須要插入55,以下圖:
繼續拋硬幣,結果又是正面,那麼L3層須要插入55,以下圖:
以此類推,咱們插入剩餘的元素。固然由於規模小,結果極可能不是一個理想的跳躍表。可是若是元素個數n的規模很大,學過幾率論的同窗都知道,最終的表結構確定很是接近於理想跳躍表。
而後咱們來看一下它的get方法。
public V get(Object key) { return doGet(key); }
private V doGet(Object key) { if (key == null) throw new NullPointerException(); Comparator<? super K> cmp = comparator; outer: for (;;) { //在最底層鏈表中獲取目標的前置節點 for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) { Object v; int c; //若是找到的前置節點沒有後續節點,直接跳出循環'for (;;)',而後返回null if (n == null) break outer; //若是後續節點不爲null,獲取後續節點到後續節點 Node<K,V> f = n.next; //若是n已經不爲前置節點到後續節點了,從新進入'for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;)'循環 if (n != b.next) // inconsistent read break; //若是後續節點n的值爲null if ((v = n.value) == null) { // n is deleted //經過無鎖競爭刪除該節點n,由於只容許有一個線程能夠刪除成功,並從新進入'for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;)'循環 n.helpDelete(b, f); break; } //若是前置節點的值爲null或者後續節點的值爲null從新進入'for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;)'循環 if (b.value == null || v == n) // b is deleted break; //若是查找的鍵與後續節點的鍵相同,返回後續節點的值 if ((c = cpr(cmp, key, n.key)) == 0) { @SuppressWarnings("unchecked") V vv = (V)v; return vv; } //若是查找的鍵小於後續節點的鍵,直接跳出循環'for (;;)',而後返回null if (c < 0) break outer; //若是查找的鍵大於後續節點的鍵,繼續向鏈表的後端推動 b = n; n = f; } } return null; }
如下這段findPredecessor方法總體的意思爲:從最上層的頭索引開始向右查找(鏈表的後續索引),若是後續索引的節點的Key大於咱們要查找的Key.則頭索引移到下層鏈表,在下層鏈表查找,以此反覆,一直查找到沒有下層的分層索引爲止,返回該索引的節點。若是後續索引的節點的Key小於咱們要查找的Key,則在該層鏈表中向後查找。因爲查找的Key永遠小於索引節點的Key,因此只能找到目標的前置索引節點。其中會有空值索引的存在,這裏是經過CAS來進行處理的。這裏須要注意的是咱們要找的值最終都是在最底層鏈表中找到的,它不會在高層和中層鏈表中去找最終值(就算高層、中層鏈表中有這個值)。
比方說咱們要在這個圖中找55,它是不會直接在L3中直接找到的,而是通過箭頭的方向在L1找55的前置節點。
private Node<K,V> findPredecessor(Object key, Comparator<? super K> cmp) { if (key == null) throw new NullPointerException(); // don't postpone errors for (;;) { //獲取原始頭索引以及該頭索引的鏈表後續索引,當ConcurrentSkipListMap剛初始化的時候,r爲null //可是咱們查找確定不會在空的跳錶中查找,因此咱們認定頭索引的後續索引r不爲null. for (Index<K,V> q = head, r = q.right, d;;) { //若是鏈表後續索引不爲null if (r != null) { //獲取鏈表後續索引的節點 Node<K,V> n = r.node; K k = n.key; //若是該節點的值value爲null if (n.value == null) { //刪除空值索引,即把r的後續索引頂替掉r //若是無鎖競爭失敗,即多個線程都在刪空索引,只有一個線程能刪成功,從頭開始進入循環'for (Index<K,V> q = head, r = q.right, d;;)' if (!q.unlink(r)) break; // restart //無鎖競爭成功,空值索引被成功刪除,從新獲取r值,並跳事後續代碼,進入'for (Index<K,V> q = head, r = q.right, d;;)'下一輪循環 //這裏咱們須要注意,這裏的q和r並不表明哪一個固定的索引,他們都是不斷在具體的索引中不斷變化的 r = q.right; // reread r continue; } //若是該節點的值value不爲null,且查找的key值大於該節點的key值 //向後續索引推動 if (cpr(cmp, key, k) > 0) { q = r; r = r.right; continue; } } //若是q的分層索引爲null,返回q的節點 if ((d = q.down) == null) return q.node; //若是q的分層索引不爲null,將q賦值爲自身的分層索引 q = d; //r賦值爲分層索引的鏈表後續索引 r = d.right; } } }
final boolean unlink(Index<K,V> succ) { //前索引的節點的value是否爲null,以及用後索引替換前索引是否成功 return node.value != null && casRight(succ, succ.right); }
final boolean casRight(Index<K,V> cmp, Index<K,V> val) { //無鎖競爭,若是前索引cmp等於rightOffset的時候,使用後索引來替換前索引,並返回true;不然返回false return UNSAFE.compareAndSwapObject(this, rightOffset, cmp, val); }
static final int cpr(Comparator c, Object x, Object y) { //比較x,y是否相等,若是返回0則相等,爲正數說明x大於y return (c != null) ? c.compare(x, y) : ((Comparable)x).compareTo(y); }
關於比較器,能夠參考本人博客Java函數式編程整理
void helpDelete(Node<K,V> b, Node<K,V> f) { /* * 若是本節點的後續節點爲f,且自己爲b的後續節點 */ if (f == next && this == b.next) { //若是後續節點爲null或者後續節點值不爲後續節點 if (f == null || f.value != f) // not already marked //經過無鎖競爭生成一個key爲null,value和next都相同的後續節點(value和next也可能爲null) casNext(f, new Node<K,V>(f)); else //若是後續節點不爲空,經過無鎖競爭,將f的後續節點替換掉自己節點,即刪除自己節點。 b.casNext(this, f.next); } }
boolean casNext(Node<K,V> cmp, Node<K,V> val) { return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); }
固然咱們最後依然要討論它的併發性能,咱們能夠看到整個put,get中都沒有鎖的參與,徹底經過雙for循環(第一層基本相似於for(;;)的無限循環,第二層加入了各類CAS無鎖競爭,競爭失敗的會從新進入第二層循環)來保證線程安全。咱們都知道純無鎖的性能實際上是要遠遠高過鎖性能的,因此在高併發的讀寫操做的時候,其性能是要好過ConcurrentHashMap,由於ConcurrentHashMap有數組單節點中有節點鎖,具體的性能測試能夠參考Fork/Join框架原理和使用探祕 。