深刻Java集合學習系列:ConcurrentSkipListMap實現原理




背景(註釋):java


一個併發的相似ConcurrentNavigableMap的實現。node

這個map經過實現Comparable或者提供一個Comparator來實現排列的,經過構造函數來提供。算法

這個實現是一個SkipLists的併發版本而且爲containsKey/get/put/remove操做提供了log(n)的消耗。插入、刪除、更新和讀取能夠在多個線程之間安全併發。數組

Iterators和spliterators是弱兼容的。從小到大的key排列中的視圖比從大到小的要快。安全

全部從這些方法中返回的Map.Entry只是某時刻的一個快照。他們不提供setValue方法。(注意到你能夠經過改變映射經過使用put、putIfAbsent、replace方法,依賴於你本身想要的效果)併發

注意不像其餘大部分的集合那樣,size方法不是一個常量時間運算。由於這個map的異步特性,決定了這個map的元素個數必須經過遍歷獲得,因此在遍歷過程當中改變了map那麼就會獲得一個不精確的結果。而且,這些以大量數據位參數的方法像putAll、equals、toArray、containsValue以及clear不會保證以原子的方式執行。好比,一個遍歷操做和putAll操做併發,那麼只會看到部分添加的元素。app

這個類和它的視圖還有迭代器實現了全部的可選的Map和Iterator接口的方法。像其餘的併發結合,這個類不支持把null做爲key或者value,由於null做爲返回值沒法區分是否缺乏元素。dom


算法(註釋):異步

這個類實現了一個相似於樹的二維跳錶,標識段經過連接包含不一樣數據的基本節點來展現。有兩個緣由說明爲什麼使用這種方法代替類數組的結構:函數

  • 數組結構會致使更復雜和消耗更大
  • 咱們爲繁重的段遍歷提供更廉價的算法從而實現可以使得基本鏈表跑得更快。下圖提供一個說明:
     * Head nodes          Index nodes
     * +-+    right        +-+                      +-+
     * |2|---------------->| |--------------------->| |->null
     * +-+                 +-+                      +-+
     *  | down              |                        |
     *  v                   v                        v
     * +-+            +-+  +-+       +-+            +-+       +-+
     * |1|----------->| |->| |------>| |----------->| |------>| |->null
     * +-+            +-+  +-+       +-+            +-+       +-+
     *  v              |    |         |              |         |
     * Nodes  next     v    v         v              v         v
     * +-+  +-+  +-+  +-+  +-+  +-+  +-+  +-+  +-+  +-+  +-+  +-+
     * | |->|A|->|B|->|C|->|D|->|E|->|F|->|G|->|H|->|I|->|J|->|K|->null
     * +-+  +-+  +-+  +-+  +-+  +-+  +-+  +-+  +-+  +-+  +-+  +-+
這個基本的鏈表使用了一個HM鏈表算法的變形。
基本思想是,經過標記被刪除節點的下一個(next)節點從而避免在併發狀況下與插入操做的衝突。以及在遍歷時維持3元組(前驅,當前節點,後繼)從而探測是否須要斷開那些被刪除的節點。

區別因而用位標記的鏈表刪除(AtomicMarkedReference會速度慢而且內存吃緊),節點直接是用CAS操做下一個引用。在刪除時,取代使用標記的方法,咱們經過拼接另外一個節點從而代替標記引用(沒有使用任何域),使用節點從而獲得一個相似於「箱子「的標記實現,可是使用新的節點僅僅當節點須要被刪除時,不會爲每一次連接。這樣使用更少的內存和提供了更快的速度。甚至就算標記引用的方法在JVM層面提供了更好的支持,遍歷使用這個技術仍然更快,由於每一次搜索最多隻須要讀取更多的一個節點,相對於標記方法每讀取一個節點就須要多讀取一個標記域。

這個方法維持了HM算法的一個最重要的性質,經過改變一個節點的next域從而使得在它之上的任何CAS操做都會失敗,可是經過這個主意是經過改變這個節點的引用執行另外一個i節點,不是標記它。這樣能夠更加擠壓內存,經過定義沒有key/value的標記節點(它不須要額外的類型測試消耗)。這個標記節點是在遍歷過程當中極少遇到的而且會被垃圾回收得很是快(注意,這個技術不能在沒有垃圾回收的系統中工做良好)。

爲了使用刪除標記,這個鏈表使用null的方法來指示刪除,一種相似於延遲刪除的模型。若是一個節點的value爲null,那麼它被認爲是局部刪除,就算它仍是可達的。這樣須要組織合適的併發控制在代替vs刪除操做--一個代替操做必須失敗,若是刪除操做首先nulling,以及一個刪除操做必須返回刪除以前的值。(這個刪除是能夠和其餘方法併發的,若是其餘方法返回null說明不存在這個元素)


這裏有一個當節點刪除時的事件序列(b:前驅,n:當前節點,f:後繼),初始化:

            +------+       +------+      +------+
       ...  |   b  |------>|   n  |----->|   f  | ...
            +------+       +------+      +------+

     1  首先經過CAS操做讓n的value從non-null變爲null。如今沒有公共操做會認爲這個映射(n)會存在了。固然,其餘的不間斷的插入或者刪除操做仍是可能改變n的指向下一個的引用的。

     2  CAS操做n的next引用指向一個新的標記節點。如今沒有其餘節點可以被附加到n的後面。從而可以在基於CAS的鏈表中避免刪除錯誤。

           +------+       +------+      +------+       +------+
      ...  |   b  |------>|   n  |----->|marker|------>|   f  | ...
           +------+       +------+      +------+       +------+

     3  CAS操做b的next引用從而忽略了n和他的標記節點。如今,沒有新的遍歷會遭遇n,最後n和marker會被垃圾回收。

           +------+                                    +------+
      ...  |   b  |----------------------------------->|   f  | ...
           +------+                                    +------+

第一步的失敗會致使簡單的重試(由於另外一個操做而競爭失敗)。2、三兩步失敗是由於其餘線程在遍歷的過程當中注意到一個節點有null值,經過協做的方式幫忙marking或者unlinking了。這種協做的方式保證了沒有線程會由於執行刪除的線程尚未進展而卡住等待。這種標記節點的用法稍微複雜化了協做代碼,由於遍歷過程必須確保一直地讀取四個節點(b,n,marker,f),不是僅僅(b,n,f),當一個節點的next域指向了一個marker,它就不會改變。

跳錶的模型中增長了段,因此基礎遍歷開始於接近目的地--常常只須要遍歷不多的節點。不須要改變算法除了只要確保遍歷開始於前驅(here,b),沒有被刪除(結構上),不然在處理這個刪除以後重試。

段層級以鏈表的形式經過volatile的next來使用CAS操做。在段上的競爭會好比新增/刪除節點會致使連接失敗。就算這個發生時,段鏈表依然保持有序,從而能夠做爲劃分。這個會影響性能,可是跳錶自己就是依賴機率的,結果就是"p"值可能會小於虛值。這個競爭窗口會保持得足夠小,從而在實際上失敗是很是少的,甚至在大量競爭的狀況下。

由於使用了一些重試邏輯從而使得base和index鏈表的重試是比較廉價的。遍歷會在嘗試了大多數」協做「CAS操做以後執行。這個不是很是必要,可是隱含的價值是能夠幫助減小其餘下游的CAS失敗操做,從而好於從新開始的開銷。這樣惡化了壞狀況,可是改進了高度競爭的狀況。

區別於其餘的跳錶實現,段插入和刪除須要一個分開的遍歷過程在基本層面的動做以後,增長或者刪除段節點。這樣增長了一個線程的消耗,可是提高了多個線程的競爭性能,經過縮小干擾窗口,刪除使得全部index節點不可達在刪除操做返回後,從而也避免了垃圾回收。這個在這裏是很是重要的,由於咱們不可以直接把擁有key的節點直接去除,由於他們仍然可能被讀取。


段使用了保持良好性能的稀疏策略:初始的k=1,p=0.5意味着四分之一的節點有段中的下標。。。。。。這個期待的總共的空間比咱們的java.util.TreeMap稍微少點。

改變段的層級(這個相似樹結構的高度)使用CAS操做。初始化的高度爲1.當建立一個比當前層級高的段時會在頭上增長一個層級。爲了保持良好的性能,刪除方法中會使用啓發式的方法去下降層級,若是最高的層級上是空的。這樣可能出如今沒有層級的段上遭遇競爭。這樣不會形成多大傷害,實際上相對於沒有限制的提高層級,這是個更好的選擇。

實現這些的代碼比你想象的更詳細。大多數運算會涉及定位元素(或者插入元素的位置)。這些代碼不能被很是好的分塊,由於子運算須要立刻獲得前面運算的結果,不這樣作的話會增長GC的負擔。(這是又一個我但願JAVA提供宏的地方)findPredecessor()操做搜僅僅索段節點,返回最底層節點的前驅。findNode()操做完成最底層節點的搜索。這種方法一樣出現了一點代碼的複製。

爲了在線程之間參數隨機的值,咱們使用了JDK中的隨機支持(經過"secondary seed")。



實現:

讓咱們來看源代碼,首先是put方法:

    public V put(K key, V value) {
        if (value == null)
            throw new NullPointerException();
        return doPut(key, value, false);
    }

    private V doPut(K key, V value, boolean onlyIfAbsent) {
        Node<K,V> z;             // added node
        if (key == null)
            throw new NullPointerException();
        Comparator<? super K> cmp = comparator;
        outer: for (;;) {
            for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
                if (n != null) {
                    Object v; int c;
                    Node<K,V> f = n.next;
                    if (n != b.next)               // inconsistent read
                        break;
                    if ((v = n.value) == null) {   // n is deleted
                        n.helpDelete(b, f);
                        break;
                    }
                    if (b.value == null || v == n) // b is deleted
                        break;
                    if ((c = cpr(cmp, key, n.key)) > 0) {
                        b = n;
                        n = f;
                        continue;
                    }
                    if (c == 0) {
                        if (onlyIfAbsent || n.casValue(v, value)) {
                            @SuppressWarnings("unchecked") V vv = (V)v;
                            return vv;
                        }
                        break; // restart if lost race to replace value
                    }
                    // else c < 0; fall through
                }

                z = new Node<K,V>(key, value, n);
                if (!b.casNext(n, z))
                    break;         // restart if lost race to append to b
                break outer;
            }
        }

        int rnd = ThreadLocalRandom.nextSecondarySeed();
        if ((rnd & 0x80000001) == 0) { // test highest and lowest bits
            int level = 1, max;
            while (((rnd >>>= 1) & 1) != 0)
                ++level;
            Index<K,V> idx = null;
            HeadIndex<K,V> h = head;
            if (level <= (max = h.level)) {
                for (int i = 1; i <= level; ++i)
                    idx = new Index<K,V>(z, idx, null);
            }
            else { // try to grow by one level
                level = max + 1; // hold in array and later pick the one to use
                @SuppressWarnings("unchecked")Index<K,V>[] idxs =
                    (Index<K,V>[])new Index<?,?>[level+1];
                for (int i = 1; i <= level; ++i)
                    idxs[i] = idx = new Index<K,V>(z, idx, null);
                for (;;) {
                    h = head;
                    int oldLevel = h.level;
                    if (level <= oldLevel) // lost race to add level
                        break;
                    HeadIndex<K,V> newh = h;
                    Node<K,V> oldbase = h.node;
                    for (int j = oldLevel+1; j <= level; ++j)
                        newh = new HeadIndex<K,V>(oldbase, newh, idxs[j], j);
                    if (casHead(h, newh)) {
                        h = newh;
                        idx = idxs[level = oldLevel];
                        break;
                    }
                }
            }
            // find insertion points and splice in
            splice: for (int insertionLevel = level;;) {
                int j = h.level;
                for (Index<K,V> q = h, r = q.right, t = idx;;) {
                    if (q == null || t == null)
                        break splice;
                    if (r != null) {
                        Node<K,V> n = r.node;
                        // compare before deletion check avoids needing recheck
                        int c = cpr(cmp, key, n.key);
                        if (n.value == null) {
                            if (!q.unlink(r))
                                break;
                            r = q.right;
                            continue;
                        }
                        if (c > 0) {
                            q = r;
                            r = r.right;
                            continue;
                        }
                    }

                    if (j == insertionLevel) {
                        if (!q.link(r, t))
                            break; // restart
                        if (t.node.value == null) {
                            findNode(key);
                            break splice;
                        }
                        if (--insertionLevel == 0)
                            break splice;
                    }

                    if (--j >= insertionLevel && j < level)
                        t = t.down;
                    q = q.down;
                    r = q.right;
                }
            }
        }
        return null;
    }

首先put方法直接調用doPut方法,這裏的關鍵加入了參數onlyIfAbsent,用於指明是否只在缺乏的狀況下添加。

因爲doPut方法比較長(超過100行),咱們把它分爲兩部分來看(詳情以下):

第一部分是outer嵌套的雙層循環:

  • 首先調用findPredecessor(稍後介紹),從而經過上層的Index取得底層的節點Node,b。
  • 取得next:n,在n不爲空的狀況下取得n.next:f,從而三元組:b/n/f。
  • 接着在n!=b.next、n.value==null、b.value=null、n.value=n的狀況下說明n或者b應該被刪除的,從而break重試。
  • 不然,若key要大於當前節點n的值,則重置b/n從而比較下一個節點。
  • 不然若是key等於當前節點n的值,則在onlyIfAbsent爲true時返回當前值,不然在CAS操做成功後返回當前值,不然重試。
  • 不然構造新節點z,試着添加到b的後面,而後break outer。
走完上面的過程,那麼要麼在key存在的狀況下完成了全部操做。要麼已經在最底層Node處添加了新節點z(key),那麼還須要第二部分的操做即在上層Index裏面添加一系列元素。
  • 經過ThreadLocalRandom.nextSecondarySeed()取得線程無關的隨機數。
  • 與0x80000001作&操做爲0的狀況下執行方法(極大機率,意思是說32位的rnd最高位和最低位不爲1)。
  • 用最低位連續1的個數來取得須要的層級數level。假如層級數小於等於當前head的level,則直接構造Index而且完成down方向上的構造,idx爲添加節點最高層的Index。
  • 不然,層級數大於當前level,那麼就使用層級數level+1,這個時候須要先構造層級數組以及相似以前的Index數組,而後再構造HeadIndex從而拼接添加層的Index。這裏使用CAS操做,最後成功置換head以後退出,一樣保持idx爲最高層的Index。
  • 接下來就要完成最後一步,試着在每一層添加Index:注意,加入當前線程完成置換head,那麼就只須要添加oldLevel個級別,不然須要添加level個級別。這個數值由level來記錄。
  • 這裏採起的方式爲:(q,r)爲當前層的可能將要被idx插入的節點,t用於標識idx的值。
  • insertionLevel爲開始插入的節點,當level抵達以前不會作link操做。因爲過程當中可能回遇到被刪除節點,因此這裏也使用了雙重循環for以及失敗重試的策略。
  • 最後當insertionLevel爲0時全部層的拼接都完成。(探測「刪除」---》節點前進---》拼接當前Index---》探測刪除---》拼接下一層)
  • 最終返回null。
因爲這個過程當中用到了findPredecessor和findNode(該方法是private控制符,僅用於內部操做),因此咱們接下來首先看這兩個。
    private Node<K,V> findPredecessor(Object key, Comparator<? super K> cmp) {
        if (key == null)
            throw new NullPointerException(); // don't postpone errors
        for (;;) {
            for (Index<K,V> q = head, r = q.right, d;;) {
                if (r != null) {
                    Node<K,V> n = r.node;
                    K k = n.key;
                    if (n.value == null) {
                        if (!q.unlink(r))
                            break;           // restart
                        r = q.right;         // reread r
                        continue;
                    }
                    if (cpr(cmp, key, k) > 0) {
                        q = r;
                        r = r.right;
                        continue;
                    }
                }
                if ((d = q.down) == null)
                    return q.node;
                q = d;
                r = d.right;
            }
        }
    }
實際上這裏的工做方式與doPut裏面作的事情相似(因此不展開詳細),它只是在發現刪除節點時試着刪除那一層的Index而且重試或者前進,最後返回了最下層的Node。

    private Node<K,V> findNode(Object key) {
        if (key == null)
            throw new NullPointerException(); // don't postpone errors
        Comparator<? super K> cmp = comparator;
        outer: for (;;) {
            for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
                Object v; int c;
                if (n == null)
                    break outer;
                Node<K,V> f = n.next;
                if (n != b.next)                // inconsistent read
                    break;
                if ((v = n.value) == null) {    // n is deleted
                    n.helpDelete(b, f);
                    break;
                }
                if (b.value == null || v == n)  // b is deleted
                    break;
                if ((c = cpr(cmp, key, n.key)) == 0)
                    return n;
                if (c < 0)
                    break outer;
                b = n;
                n = f;
            }
        }
        return null;
    }
findNode的原理實際上也在doPut中出現過了,它會進行以下操做:
  • 調用findPredecessor方法尋找前驅。
  • 當發現目標節點時(c==0)直接返回節點n。
  • 若是已經key小於了當前節點,那麼就說明不存在,直接返回null。
也就是說,findPredecessor和findNode都有消除刪除節點的反作用,也是被這麼用的。

最後咱們來看刪除操做的源代碼:

    public V remove(Object key) {
        return doRemove(key, null);
    }
    final V doRemove(Object key, Object value) {
        if (key == null)
            throw new NullPointerException();
        Comparator<? super K> cmp = comparator;
        outer: for (;;) {
            for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
                Object v; int c;
                if (n == null)
                    break outer;
                Node<K,V> f = n.next;
                if (n != b.next)                    // inconsistent read
                    break;
                if ((v = n.value) == null) {        // n is deleted
                    n.helpDelete(b, f);
                    break;
                }
                if (b.value == null || v == n)      // b is deleted
                    break;
                if ((c = cpr(cmp, key, n.key)) < 0)
                    break outer;
                if (c > 0) {
                    b = n;
                    n = f;
                    continue;
                }
                if (value != null && !value.equals(v))
                    break outer;
                if (!n.casValue(v, null))
                    break;
                if (!n.appendMarker(f) || !b.casNext(n, f))
                    findNode(key);                  // retry via findNode
                else {
                    findPredecessor(key, cmp);      // clean index
                    if (head.right == null)
                        tryReduceLevel();
                }
                @SuppressWarnings("unchecked") V vv = (V)v;
                return vv;
            }
        }
        return null;
    }

這裏的remove操做調用doRemove操做,用第三個參數value來指示是否須要值匹配。
doRemove工做原理以下:
  • 首先根據findPredecessor來定位節點Node,而後相似其餘操做同樣處理刪除節點,當key不存在時跳出。
  • 當找到所須要的節點時,試着使用CAS操做將value變爲null,而後試着在其後添加一個標記節點(appendMarker)以及經過CAS操做講刪除節點和標記節點一同清除(假如之一失敗則經過findNode的方式協做清除)。
  • 成功後使用findPredecessor來清除屬於該節點的上層Index,而且假如此時層數過多則試着下降層數(tryReduceLevel)。
  • 而後返回被刪除的key所對應的值,若不存在則返回null。

背景(註釋):


一個併發的相似ConcurrentNavigableMap的實現。

這個map經過實現Comparable或者提供一個Comparator來實現排列的,經過構造函數來提供。

這個實現是一個SkipLists的併發版本而且爲containsKey/get/put/remove操做提供了log(n)的消耗。插入、刪除、更新和讀取能夠在多個線程之間安全併發。

Iterators和spliterators是弱兼容的。從小到大的key排列中的視圖比從大到小的要快。

全部從這些方法中返回的Map.Entry只是某時刻的一個快照。他們不提供setValue方法。(注意到你能夠經過改變映射經過使用put、putIfAbsent、replace方法,依賴於你本身想要的效果)

注意不像其餘大部分的集合那樣,size方法不是一個常量時間運算。由於這個map的異步特性,決定了這個map的元素個數必須經過遍歷獲得,因此在遍歷過程當中改變了map那麼就會獲得一個不精確的結果。而且,這些以大量數據位參數的方法像putAll、equals、toArray、containsValue以及clear不會保證以原子的方式執行。好比,一個遍歷操做和putAll操做併發,那麼只會看到部分添加的元素。

這個類和它的視圖還有迭代器實現了全部的可選的Map和Iterator接口的方法。像其餘的併發結合,這個類不支持把null做爲key或者value,由於null做爲返回值沒法區分是否缺乏元素。


算法(註釋):

這個類實現了一個相似於樹的二維跳錶,標識段經過連接包含不一樣數據的基本節點來展現。有兩個緣由說明爲什麼使用這種方法代替類數組的結構:

  • 數組結構會致使更復雜和消耗更大
  • 咱們爲繁重的段遍歷提供更廉價的算法從而實現可以使得基本鏈表跑得更快。下圖提供一個說明:
     * Head nodes          Index nodes
     * +-+    right        +-+                      +-+
     * |2|---------------->| |--------------------->| |->null
     * +-+                 +-+                      +-+
     *  | down              |                        |
     *  v                   v                        v
     * +-+            +-+  +-+       +-+            +-+       +-+
     * |1|----------->| |->| |------>| |----------->| |------>| |->null
     * +-+            +-+  +-+       +-+            +-+       +-+
     *  v              |    |         |              |         |
     * Nodes  next     v    v         v              v         v
     * +-+  +-+  +-+  +-+  +-+  +-+  +-+  +-+  +-+  +-+  +-+  +-+
     * | |->|A|->|B|->|C|->|D|->|E|->|F|->|G|->|H|->|I|->|J|->|K|->null
     * +-+  +-+  +-+  +-+  +-+  +-+  +-+  +-+  +-+  +-+  +-+  +-+
這個基本的鏈表使用了一個HM鏈表算法的變形。
基本思想是,經過標記被刪除節點的下一個(next)節點從而避免在併發狀況下與插入操做的衝突。以及在遍歷時維持3元組(前驅,當前節點,後繼)從而探測是否須要斷開那些被刪除的節點。

區別因而用位標記的鏈表刪除(AtomicMarkedReference會速度慢而且內存吃緊),節點直接是用CAS操做下一個引用。在刪除時,取代使用標記的方法,咱們經過拼接另外一個節點從而代替標記引用(沒有使用任何域),使用節點從而獲得一個相似於「箱子「的標記實現,可是使用新的節點僅僅當節點須要被刪除時,不會爲每一次連接。這樣使用更少的內存和提供了更快的速度。甚至就算標記引用的方法在JVM層面提供了更好的支持,遍歷使用這個技術仍然更快,由於每一次搜索最多隻須要讀取更多的一個節點,相對於標記方法每讀取一個節點就須要多讀取一個標記域。

這個方法維持了HM算法的一個最重要的性質,經過改變一個節點的next域從而使得在它之上的任何CAS操做都會失敗,可是經過這個主意是經過改變這個節點的引用執行另外一個i節點,不是標記它。這樣能夠更加擠壓內存,經過定義沒有key/value的標記節點(它不須要額外的類型測試消耗)。這個標記節點是在遍歷過程當中極少遇到的而且會被垃圾回收得很是快(注意,這個技術不能在沒有垃圾回收的系統中工做良好)。

爲了使用刪除標記,這個鏈表使用null的方法來指示刪除,一種相似於延遲刪除的模型。若是一個節點的value爲null,那麼它被認爲是局部刪除,就算它仍是可達的。這樣須要組織合適的併發控制在代替vs刪除操做--一個代替操做必須失敗,若是刪除操做首先nulling,以及一個刪除操做必須返回刪除以前的值。(這個刪除是能夠和其餘方法併發的,若是其餘方法返回null說明不存在這個元素)


這裏有一個當節點刪除時的事件序列(b:前驅,n:當前節點,f:後繼),初始化:

            +------+       +------+      +------+
       ...  |   b  |------>|   n  |----->|   f  | ...
            +------+       +------+      +------+

     1  首先經過CAS操做讓n的value從non-null變爲null。如今沒有公共操做會認爲這個映射(n)會存在了。固然,其餘的不間斷的插入或者刪除操做仍是可能改變n的指向下一個的引用的。

     2  CAS操做n的next引用指向一個新的標記節點。如今沒有其餘節點可以被附加到n的後面。從而可以在基於CAS的鏈表中避免刪除錯誤。

           +------+       +------+      +------+       +------+
      ...  |   b  |------>|   n  |----->|marker|------>|   f  | ...
           +------+       +------+      +------+       +------+

     3  CAS操做b的next引用從而忽略了n和他的標記節點。如今,沒有新的遍歷會遭遇n,最後n和marker會被垃圾回收。

           +------+                                    +------+
      ...  |   b  |----------------------------------->|   f  | ...
           +------+                                    +------+

第一步的失敗會致使簡單的重試(由於另外一個操做而競爭失敗)。2、三兩步失敗是由於其餘線程在遍歷的過程當中注意到一個節點有null值,經過協做的方式幫忙marking或者unlinking了。這種協做的方式保證了沒有線程會由於執行刪除的線程尚未進展而卡住等待。這種標記節點的用法稍微複雜化了協做代碼,由於遍歷過程必須確保一直地讀取四個節點(b,n,marker,f),不是僅僅(b,n,f),當一個節點的next域指向了一個marker,它就不會改變。

跳錶的模型中增長了段,因此基礎遍歷開始於接近目的地--常常只須要遍歷不多的節點。不須要改變算法除了只要確保遍歷開始於前驅(here,b),沒有被刪除(結構上),不然在處理這個刪除以後重試。

段層級以鏈表的形式經過volatile的next來使用CAS操做。在段上的競爭會好比新增/刪除節點會致使連接失敗。就算這個發生時,段鏈表依然保持有序,從而能夠做爲劃分。這個會影響性能,可是跳錶自己就是依賴機率的,結果就是"p"值可能會小於虛值。這個競爭窗口會保持得足夠小,從而在實際上失敗是很是少的,甚至在大量競爭的狀況下。

由於使用了一些重試邏輯從而使得base和index鏈表的重試是比較廉價的。遍歷會在嘗試了大多數」協做「CAS操做以後執行。這個不是很是必要,可是隱含的價值是能夠幫助減小其餘下游的CAS失敗操做,從而好於從新開始的開銷。這樣惡化了壞狀況,可是改進了高度競爭的狀況。

區別於其餘的跳錶實現,段插入和刪除須要一個分開的遍歷過程在基本層面的動做以後,增長或者刪除段節點。這樣增長了一個線程的消耗,可是提高了多個線程的競爭性能,經過縮小干擾窗口,刪除使得全部index節點不可達在刪除操做返回後,從而也避免了垃圾回收。這個在這裏是很是重要的,由於咱們不可以直接把擁有key的節點直接去除,由於他們仍然可能被讀取。


段使用了保持良好性能的稀疏策略:初始的k=1,p=0.5意味着四分之一的節點有段中的下標。。。。。。這個期待的總共的空間比咱們的java.util.TreeMap稍微少點。

改變段的層級(這個相似樹結構的高度)使用CAS操做。初始化的高度爲1.當建立一個比當前層級高的段時會在頭上增長一個層級。爲了保持良好的性能,刪除方法中會使用啓發式的方法去下降層級,若是最高的層級上是空的。這樣可能出如今沒有層級的段上遭遇競爭。這樣不會形成多大傷害,實際上相對於沒有限制的提高層級,這是個更好的選擇。

實現這些的代碼比你想象的更詳細。大多數運算會涉及定位元素(或者插入元素的位置)。這些代碼不能被很是好的分塊,由於子運算須要立刻獲得前面運算的結果,不這樣作的話會增長GC的負擔。(這是又一個我但願JAVA提供宏的地方)findPredecessor()操做搜僅僅索段節點,返回最底層節點的前驅。findNode()操做完成最底層節點的搜索。這種方法一樣出現了一點代碼的複製。

爲了在線程之間參數隨機的值,咱們使用了JDK中的隨機支持(經過"secondary seed")。



實現:

讓咱們來看源代碼,首先是put方法:

    public V put(K key, V value) {
        if (value == null)
            throw new NullPointerException();
        return doPut(key, value, false);
    }

    private V doPut(K key, V value, boolean onlyIfAbsent) {
        Node<K,V> z;             // added node
        if (key == null)
            throw new NullPointerException();
        Comparator<? super K> cmp = comparator;
        outer: for (;;) {
            for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
                if (n != null) {
                    Object v; int c;
                    Node<K,V> f = n.next;
                    if (n != b.next)               // inconsistent read
                        break;
                    if ((v = n.value) == null) {   // n is deleted
                        n.helpDelete(b, f);
                        break;
                    }
                    if (b.value == null || v == n) // b is deleted
                        break;
                    if ((c = cpr(cmp, key, n.key)) > 0) {
                        b = n;
                        n = f;
                        continue;
                    }
                    if (c == 0) {
                        if (onlyIfAbsent || n.casValue(v, value)) {
                            @SuppressWarnings("unchecked") V vv = (V)v;
                            return vv;
                        }
                        break; // restart if lost race to replace value
                    }
                    // else c < 0; fall through
                }

                z = new Node<K,V>(key, value, n);
                if (!b.casNext(n, z))
                    break;         // restart if lost race to append to b
                break outer;
            }
        }

        int rnd = ThreadLocalRandom.nextSecondarySeed();
        if ((rnd & 0x80000001) == 0) { // test highest and lowest bits
            int level = 1, max;
            while (((rnd >>>= 1) & 1) != 0)
                ++level;
            Index<K,V> idx = null;
            HeadIndex<K,V> h = head;
            if (level <= (max = h.level)) {
                for (int i = 1; i <= level; ++i)
                    idx = new Index<K,V>(z, idx, null);
            }
            else { // try to grow by one level
                level = max + 1; // hold in array and later pick the one to use
                @SuppressWarnings("unchecked")Index<K,V>[] idxs =
                    (Index<K,V>[])new Index<?,?>[level+1];
                for (int i = 1; i <= level; ++i)
                    idxs[i] = idx = new Index<K,V>(z, idx, null);
                for (;;) {
                    h = head;
                    int oldLevel = h.level;
                    if (level <= oldLevel) // lost race to add level
                        break;
                    HeadIndex<K,V> newh = h;
                    Node<K,V> oldbase = h.node;
                    for (int j = oldLevel+1; j <= level; ++j)
                        newh = new HeadIndex<K,V>(oldbase, newh, idxs[j], j);
                    if (casHead(h, newh)) {
                        h = newh;
                        idx = idxs[level = oldLevel];
                        break;
                    }
                }
            }
            // find insertion points and splice in
            splice: for (int insertionLevel = level;;) {
                int j = h.level;
                for (Index<K,V> q = h, r = q.right, t = idx;;) {
                    if (q == null || t == null)
                        break splice;
                    if (r != null) {
                        Node<K,V> n = r.node;
                        // compare before deletion check avoids needing recheck
                        int c = cpr(cmp, key, n.key);
                        if (n.value == null) {
                            if (!q.unlink(r))
                                break;
                            r = q.right;
                            continue;
                        }
                        if (c > 0) {
                            q = r;
                            r = r.right;
                            continue;
                        }
                    }

                    if (j == insertionLevel) {
                        if (!q.link(r, t))
                            break; // restart
                        if (t.node.value == null) {
                            findNode(key);
                            break splice;
                        }
                        if (--insertionLevel == 0)
                            break splice;
                    }

                    if (--j >= insertionLevel && j < level)
                        t = t.down;
                    q = q.down;
                    r = q.right;
                }
            }
        }
        return null;
    }

首先put方法直接調用doPut方法,這裏的關鍵加入了參數onlyIfAbsent,用於指明是否只在缺乏的狀況下添加。

因爲doPut方法比較長(超過100行),咱們把它分爲兩部分來看(詳情以下):

第一部分是outer嵌套的雙層循環:

  • 首先調用findPredecessor(稍後介紹),從而經過上層的Index取得底層的節點Node,b。
  • 取得next:n,在n不爲空的狀況下取得n.next:f,從而三元組:b/n/f。
  • 接着在n!=b.next、n.value==null、b.value=null、n.value=n的狀況下說明n或者b應該被刪除的,從而break重試。
  • 不然,若key要大於當前節點n的值,則重置b/n從而比較下一個節點。
  • 不然若是key等於當前節點n的值,則在onlyIfAbsent爲true時返回當前值,不然在CAS操做成功後返回當前值,不然重試。
  • 不然構造新節點z,試着添加到b的後面,而後break outer。
走完上面的過程,那麼要麼在key存在的狀況下完成了全部操做。要麼已經在最底層Node處添加了新節點z(key),那麼還須要第二部分的操做即在上層Index裏面添加一系列元素。
  • 經過ThreadLocalRandom.nextSecondarySeed()取得線程無關的隨機數。
  • 與0x80000001作&操做爲0的狀況下執行方法(極大機率,意思是說32位的rnd最高位和最低位不爲1)。
  • 用最低位連續1的個數來取得須要的層級數level。假如層級數小於等於當前head的level,則直接構造Index而且完成down方向上的構造,idx爲添加節點最高層的Index。
  • 不然,層級數大於當前level,那麼就使用層級數level+1,這個時候須要先構造層級數組以及相似以前的Index數組,而後再構造HeadIndex從而拼接添加層的Index。這裏使用CAS操做,最後成功置換head以後退出,一樣保持idx爲最高層的Index。
  • 接下來就要完成最後一步,試着在每一層添加Index:注意,加入當前線程完成置換head,那麼就只須要添加oldLevel個級別,不然須要添加level個級別。這個數值由level來記錄。
  • 這裏採起的方式爲:(q,r)爲當前層的可能將要被idx插入的節點,t用於標識idx的值。
  • insertionLevel爲開始插入的節點,當level抵達以前不會作link操做。因爲過程當中可能回遇到被刪除節點,因此這裏也使用了雙重循環for以及失敗重試的策略。
  • 最後當insertionLevel爲0時全部層的拼接都完成。(探測「刪除」---》節點前進---》拼接當前Index---》探測刪除---》拼接下一層)
  • 最終返回null。
因爲這個過程當中用到了findPredecessor和findNode(該方法是private控制符,僅用於內部操做),因此咱們接下來首先看這兩個。
    private Node<K,V> findPredecessor(Object key, Comparator<? super K> cmp) {
        if (key == null)
            throw new NullPointerException(); // don't postpone errors
        for (;;) {
            for (Index<K,V> q = head, r = q.right, d;;) {
                if (r != null) {
                    Node<K,V> n = r.node;
                    K k = n.key;
                    if (n.value == null) {
                        if (!q.unlink(r))
                            break;           // restart
                        r = q.right;         // reread r
                        continue;
                    }
                    if (cpr(cmp, key, k) > 0) {
                        q = r;
                        r = r.right;
                        continue;
                    }
                }
                if ((d = q.down) == null)
                    return q.node;
                q = d;
                r = d.right;
            }
        }
    }
實際上這裏的工做方式與doPut裏面作的事情相似(因此不展開詳細),它只是在發現刪除節點時試着刪除那一層的Index而且重試或者前進,最後返回了最下層的Node。

    private Node<K,V> findNode(Object key) {
        if (key == null)
            throw new NullPointerException(); // don't postpone errors
        Comparator<? super K> cmp = comparator;
        outer: for (;;) {
            for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
                Object v; int c;
                if (n == null)
                    break outer;
                Node<K,V> f = n.next;
                if (n != b.next)                // inconsistent read
                    break;
                if ((v = n.value) == null) {    // n is deleted
                    n.helpDelete(b, f);
                    break;
                }
                if (b.value == null || v == n)  // b is deleted
                    break;
                if ((c = cpr(cmp, key, n.key)) == 0)
                    return n;
                if (c < 0)
                    break outer;
                b = n;
                n = f;
            }
        }
        return null;
    }
findNode的原理實際上也在doPut中出現過了,它會進行以下操做:
  • 調用findPredecessor方法尋找前驅。
  • 當發現目標節點時(c==0)直接返回節點n。
  • 若是已經key小於了當前節點,那麼就說明不存在,直接返回null。
也就是說,findPredecessor和findNode都有消除刪除節點的反作用,也是被這麼用的。

最後咱們來看刪除操做的源代碼:

    public V remove(Object key) {
        return doRemove(key, null);
    }
    final V doRemove(Object key, Object value) {
        if (key == null)
            throw new NullPointerException();
        Comparator<? super K> cmp = comparator;
        outer: for (;;) {
            for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
                Object v; int c;
                if (n == null)
                    break outer;
                Node<K,V> f = n.next;
                if (n != b.next)                    // inconsistent read
                    break;
                if ((v = n.value) == null) {        // n is deleted
                    n.helpDelete(b, f);
                    break;
                }
                if (b.value == null || v == n)      // b is deleted
                    break;
                if ((c = cpr(cmp, key, n.key)) < 0)
                    break outer;
                if (c > 0) {
                    b = n;
                    n = f;
                    continue;
                }
                if (value != null && !value.equals(v))
                    break outer;
                if (!n.casValue(v, null))
                    break;
                if (!n.appendMarker(f) || !b.casNext(n, f))
                    findNode(key);                  // retry via findNode
                else {
                    findPredecessor(key, cmp);      // clean index
                    if (head.right == null)
                        tryReduceLevel();
                }
                @SuppressWarnings("unchecked") V vv = (V)v;
                return vv;
            }
        }
        return null;
    }

這裏的remove操做調用doRemove操做,用第三個參數value來指示是否須要值匹配。
doRemove工做原理以下:
  • 首先根據findPredecessor來定位節點Node,而後相似其餘操做同樣處理刪除節點,當key不存在時跳出。
  • 當找到所須要的節點時,試着使用CAS操做將value變爲null,而後試着在其後添加一個標記節點(appendMarker)以及經過CAS操做講刪除節點和標記節點一同清除(假如之一失敗則經過findNode的方式協做清除)。
  • 成功後使用findPredecessor來清除屬於該節點的上層Index,而且假如此時層數過多則試着下降層數(tryReduceLevel)。
  • 而後返回被刪除的key所對應的值,若不存在則返回null。
相關文章
相關標籤/搜索