跳錶(skip list)是在1990年由William Pugh提出的一種數據結構.JAVA裏也有對應的數據結構--ConcurrentSkipListMap.引用wiki中的一個gif來展現跳錶的插入過程.node
在ConcurrentSkipListMap中有三種類型的節點:Node<K,V>、Index<K,V>和HeadIndex<K,V>.數組
static final class Node<K,V> {
final K key;
volatile Object value;
volatile Node<K,V> next;
// 建立一個普通節點
Node(K key, Object value, Node<K,V> next) {
this.key = key;
this.value = value;
this.next = next;
}
// 建立一個新的marker節點,它的value字段指向它本身,而且key是null(base-level-header的key也是null)
Node(Node<K,V> next) {
this.key = null;
this.value = this;
this.next = next;
}
// 經過cas的方式設置value字段
boolean casValue(Object cmp, Object val) {
return UNSAFE.compareAndSwapObject(this, valueOffset, cmp, val);
}
// 經過cas的方式設置next字段.
boolean casNext(Node<K,V> cmp, Node<K,V> val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
boolean isMarker() {
return value == this;
}
boolean isBaseHeader() {
return value == BASE_HEADER;
}
// 該操做分兩步.
// 1.建立一個marker節點,並將該marker節點的next指向f.
// 2.將當前節點的next字段由f替換爲建立的marker節點.
boolean appendMarker(Node<K,V> f) {
return casNext(f, new Node<K,V>(f));
}
// 刪除當前節點
// 若是未進行標記,則在當前節點以後添加標記節點.
// 若是已作過標記,那麼直接調用cas操做,將b的next字段指向f.next(此時f是marker節點,f.next纔是真正的後繼節點)
void helpDelete(Node<K,V> b, Node<K,V> f) {
// 這裏分兩步操做,主要是減小協助刪除的線程之間的CAS干擾.
if (f == next && this == b.next) {
if (f == null || f.value != f) // not already marked
casNext(f, new Node<K,V>(f));
else
b.casNext(this, f.next);
}
}
V getValidValue() {
Object v = value;
if (v == this || v == BASE_HEADER)
return null;
@SuppressWarnings("unchecked") V vv = (V)v;
return vv;
}
AbstractMap.SimpleImmutableEntry<K,V> createSnapshot() {
Object v = value;
if (v == null || v == this || v == BASE_HEADER)
return null;
@SuppressWarnings("unchecked") V vv = (V)v;
return new AbstractMap.SimpleImmutableEntry<K,V>(key, vv);
}
// UNSAFE mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long valueOffset;
private static final long nextOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = Node.class;
// 獲取value字段的偏移量
valueOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("value"));
// 獲取next字段的偏移量
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}
複製代碼
static class Index<K,V> {
final Node<K,V> node; // 指向base-level Node節點
final Index<K,V> down; // 指向下一層Index節點.
volatile Index<K,V> right; // 指向右側Index節點.
/**
* Creates index node with given values.
*/
Index(Node<K,V> node, Index<K,V> down, Index<K,V> right) {
this.node = node;
this.down = down;
this.right = right;
}
// cas right字段
final boolean casRight(Index<K,V> cmp, Index<K,V> val) {
return UNSAFE.compareAndSwapObject(this, rightOffset, cmp, val);
}
final boolean indexesDeletedNode() {
return node.value == null;
}
final boolean link(Index<K,V> succ, Index<K,V> newSucc) {
Node<K,V> n = node;
newSucc.right = succ;
// 當前Index節點的node節點未被刪除,才經過cas將right字段指向newSucc節點.
return n.value != null && casRight(succ, newSucc);
}
// 解綁,使當前Index節點的right字段跳過succ節點.
final boolean unlink(Index<K,V> succ) {
return node.value != null && casRight(succ, succ.right);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long rightOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = Index.class;
rightOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("right"));
} catch (Exception e) {
throw new Error(e);
}
}
}
複製代碼
HeadIndex<K,V>:頭結點.其屬性level用於記錄層數.bash
結構數據結構
1.head頭結點位於最左側.HeadIndex節點的個數決定了層數.
2.上層的鏈表由Index節點組成.
3.最底層的鏈表結構由Node節點組成.
4.其中淺黃的N表明指向底層Node節點的指針.
5.D表明Down指針,指向下一層的Index節點.
6.淺綠色的N表明next指針,指向下個一個Node節點.
7.L1和L2後面的數字表明HeadIndex節點處於的層數.
複製代碼
ConcurrentSkipListMap的put操做(doPut方法)能夠分解爲兩個步驟.app
Node<K,V> z; // added node
if (key == null)
throw new NullPointerException();
Comparator<? super K> cmp = comparator;
// 0.外層outer循環.
outer: for (;;) {
// 1.每次循環查詢key的前置Node節點(b),並獲取前置節點的下一個節點(n)
for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
//2.判斷b的後繼節點n是否爲null.爲null跳轉到8
if (n != null) {
Object v; int c;
Node<K,V> f = n.next;
// 3.若是n再也不是b的下一個節點,說明結構已被其餘線程修改.跳到1繼續循環
if (n != b.next) // inconsistent read
break;
//4. 節點n被刪除的狀況,刪除節點n,跳到1繼續循環
if ((v = n.value) == null) { // n is deleted
n.helpDelete(b, f);
break;
}
//5. 節點b被刪除,跳到1繼續執行
if (b.value == null || v == n) // b is deleted
break;
//6. 比較傳入的key和n的key,比較結果大於0.將b和n都後移,而後跳到2繼續執行
if ((c = cpr(cmp, key, n.key)) > 0) {
b = n;
n = f;
continue;
}
//7. key相等
if (c == 0) {
// 若是onlyIfAbsent爲true或者value替換成功,則返回老的value值,結束doPut方法.
if (onlyIfAbsent || n.casValue(v, value)) {
@SuppressWarnings("unchecked") V vv = (V)v;
return vv;
}
// 跳轉到1.
break; // restart if lost race to replace value
}
// else c < 0; fall through
//8. 若是n爲null,直接新建Node節點z
z = new Node<K,V>(key, value, n);
//9. 經過cas操做將b的next指針指向z.
if (!b.casNext(n, z))
//10. cas操做失敗,跳到1繼續執行.
break; // restart if lost race to append to b
//11. cas成功,跳出outer循環,進行Index節點的構建.
break outer;
}
}
複製代碼
流程:dom
// 獲取隨機數,用於計算層數
int rnd = ThreadLocalRandom.nextSecondarySeed();
//0. 判斷最高位和最低位是否都爲0,rnd的最高位和最低位都爲0的狀況才構建Index
if ((rnd & 0x80000001) == 0) { // test highest and lowest bits
int level = 1, max;
//1. 對rnd的每一位進行判斷,當bit位爲1時將level+1
while (((rnd >>>= 1) & 1) != 0)
++level;
Index<K,V> idx = null;
HeadIndex<K,V> h = head;
//2. 判斷level是否小於等於當前的層數.true,執行3;false,執行4.
if (level <= (max = h.level)) {
//3. 建立level個Index節點,每一個Index節點的down指針指向上一個Index節點.第一個Index節點的down指針指向null.
for (int i = 1; i <= level; ++i)
// z爲須要插入的Node節點,讓全部建立的index節點都指向z.
idx = new Index<K,V>(z, idx, null);
}
else { // try to grow by one level
//4. 將level設置爲max + 1.若是當前跳錶層數是4,那麼level就是5.
level = max + 1; // hold in array and later pick the one to use
//5. 建立Index數組,這裏的數組大小是level+1,由於下標0不存儲數據.
@SuppressWarnings("unchecked")Index<K,V>[] idxs =
(Index<K,V>[])new Index<?,?>[level+1];
//6. 建立Index節點,並放入idxs數組中.
for (int i = 1; i <= level; ++i)
idxs[i] = idx = new Index<K,V>(z, idx, null);
//7. 循環.
for (;;) {
//8. 從新獲取頭節點
h = head;
//9. 從新獲取跳錶的層數
int oldLevel = h.level;
//10. 比較level和跳錶層數.true,執行14;false,執行11.
if (level <= oldLevel) // lost race to add level
break;
HeadIndex<K,V> newh = h;
Node<K,V> oldbase = h.node;
//11. 構造新的Head節點.此時level是大於oldLevel的.這裏舉個例子(E1):好比原來的層數是3,而level是4,那麼就須要新增1個HeadIndex節點,那麼HeadIndex也就變成了4層.最後獲得的newh也就是最上層的HeadIndex節點.
for (int j = oldLevel+1; j <= level; ++j)
newh = new HeadIndex<K,V>(oldbase, newh, idxs[j], j);
//12. 經過cas操做將head節點替換爲新的最上層的headIndex節點.操做成功,執行14;操做失敗,跳到7繼續循環.
if (casHead(h, newh)) {
h = newh;
//*13*. 這裏將level設置爲了oldLevel.並將idx指向idxs[level],參照E1,那麼這裏idx就是指向idxs[3].之因此要這樣作,是由於以前跳錶只有3層,如今變成了4層,而且在step11的時候建立了新的HeadIndex,而且直接將右指針指向了idxs[4],所以後面的操做只須要將idxs[1]、idxs[2]、idxs[3]插入到對應層的Index鏈表中便可.
idx = idxs[level = oldLevel];
break;
}
}
}
// find insertion points and splice in
splice: for (int insertionLevel = level;;) {
//14.
int j = h.level;
//15. 初始化數據.
for (Index<K,V> q = h, r = q.right, t = idx;;) {
//16. q(當前層的headIndex節點)爲null或t(要插入的Index節點)爲null.跳出splice循環,結束方法.
if (q == null || t == null)
break splice;
//17. 判斷r是否爲null.true,執行18;false,執行25.
if (r != null) {
//18. n爲r所指向的Node節點.
Node<K,V> n = r.node;
// compare before deletion check avoids needing recheck
//19. 獲取key值比較的結果.
int c = cpr(cmp, key, n.key);
//20. 判斷n是否被刪除.
if (n.value == null) {
//21.斷開q與r的鏈接,也就是經過cas操做使q的right指針指向r的右繼Index節點.
if (!q.unlink(r))
//22. 操做失敗,有其餘線程在修改,跳到14繼續執行.
break;
//23. 操做成功,讓r從新指向q的右繼節點,跳到16繼續執行.
r = q.right;
continue;
}
if (c > 0) {
//24. c大於0.將q和r右移,跳到16繼續執行.
q = r;
r = r.right;
continue;
}
}
//25. 判斷j和insertionLevel是否相等.相等才插入Index節點.
// 這裏j爲當前頭結點h的層數(有可能執行到這裏時,其餘線程又增長了HeadIndex節點的個數,所以此時的h指向的並不必定是最高層的HeadIndex節點,可是不要緊,咱們此次的插入操做只須要關心h所指向的HeadIndex節點及其下層的HeadIndex節點所在的Index鏈表).
// insertionLevel指的是須要插入t的層數.
// 仍是拿E1來舉例,若是操做都成功,那麼j就是4,而insertionLevel爲3,因此這時不能插入,由於此時t指向的Index節點是屬於第3層的,須要將q = q.down,直到q也是第3層時才能對t進行插入操做.
if (j == insertionLevel) {
//26. 插入t
if (!q.link(r, t))
// 不成功則跳到14繼續執行.
break; // restart
// 若是新插入的節點被刪除
if (t.node.value == null) {
// 這裏的findNode方式實際上是爲了輔助刪除,在遍歷的鏈路上的被標記刪除的節點都會被完全刪除掉.並跳出splice循環,結束put操做.
findNode(key);
break splice;
}
//27. insertionLevel自減,等於0說明操做完成,跳出splice循環,結束方法.
if (--insertionLevel == 0)
break splice;
}
//*28*. j自減.
// 正常狀況,當執行完27以後,--j應該是等於insertionLevel,而且j也是小於level的,這個時候將t下移沒有問題.
// 可是有一種異常狀況,仍是拿E1舉例.當j爲4,insertionLevel爲3時,因爲j != insertionLevel,所以j會一直自減直到j也爲3的時候,會執行25.
// 若是j爲3的時候執行插入成功,那麼27執行後insertionLevel會變爲2,j也會變爲2,此時t將下移.
// 而後繼續循環,好比當insertionLevel = 1的時候,若是走到26的時候插入失敗,那麼就會跳到14繼續執行,此時j和t都會被初始化.也就是說此時j又變成4了,而且t也再也不指向第1層的Index節點,而是指向第3層的Index節點(而此時第3層節點其實已經插入成功了),可是insertionLevel仍是1,因此在這種狀況下,--j > insertionLevel && j < level的狀況下,也須要將t下移,使其指向最後將要插入的Index節點.
// 經過上面兩種狀況,所以if表達式中須要寫成 --j >= insertionLevel && j < level,而不是--j == insertionLevel && j < level
if (--j >= insertionLevel && j < level)
t = t.down;
// q下移,以進行下一層的插入操做.
q = q.down;
r = q.right;
}
}
}
複製代碼
流程:ui
final V doRemove(Object key, Object value) {
if (key == null)
throw new NullPointerException();
Comparator<? super K> cmp = comparator;
outer: for (;;) {
// 0. 獲取前置Node節點b以及b的next節點n.
for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
// 1.
Object v; int c;
if (n == null)
// n爲null,由於b.key < key <= n.key,所以若是n爲null,那麼key所對應的節點應該也被刪除,所以直接跳到20.
break outer;
Node<K,V> f = n.next;
if (n != b.next) // inconsistent read
// 2. 說明在以前有線程對跳錶的數據結構進行了修改,致使n != b.next,因此跳到0繼續執行.
break;
if ((v = n.value) == null) { // n is deleted
//3.value爲null,說明節點n被刪除了,因此此時協助刪除.參考數據結構中的Node類的方法解析.
// 跳轉到0繼續執行.
n.helpDelete(b, f);
break;
}
if (b.value == null || v == n) // b is deleted
//4. b的value爲null或者v == n(說明n是一個marker節點)都說明節點b已被刪除.
// 跳轉到0繼續執行.
break;
if ((c = cpr(cmp, key, n.key)) < 0)
//5. key < n.key說明沒有須要刪除的節點,直接跳轉到20返回.
break outer;
if (c > 0) {
//6. key > n.key,將b、n右移,而後跳轉到1繼續執行.
b = n;
n = f;
continue;
}
// 參考remove方法
if (value != null && !value.equals(v))
break outer;
//7. 經過cas操做將n的value設置爲null
if (!n.casValue(v, null))
//8. 失敗,跳轉到0繼續執行.
break;
if (!n.appendMarker(f) || !b.casNext(n, f))
//9. 添加marker節點失敗或直接刪除節點n失敗,調用findNode方法
findNode(key); // retry via findNode
else {
//10. 調用findPredecessor
findPredecessor(key, cmp); // clean index
if (head.right == null)
//11. head的右邊節點爲null,則嘗試減少level.
tryReduceLevel();
}
@SuppressWarnings("unchecked") V vv = (V)v;
return vv;
}
}
// 20.
return null;
}
複製代碼