J.U.C 之 ConcurrentSkipListMap

到目前爲止,咱們在Java世界裏看到了兩種實現key-value的數據結構:Hash、TreeMap,這兩種數據結構各自都有着優缺點。java

  1. Hash表:插入、查找最快,爲O(1);如使用鏈表實現則可實現無鎖;數據有序化須要顯式的排序操做。
  2. 紅黑樹:插入、查找爲O(logn),但常數項較小;無鎖實現的複雜性很高,通常須要加鎖;數據自然有序。 然而,此次介紹第三種實現key-value的數據結構:SkipList。SkipList有着不低於紅黑樹的效率,可是其原理和實現的複雜度要比紅黑樹簡單多了。

SkipList

什麼是SkipList?Skip List ,稱之爲跳錶,它是一種能夠替代平衡樹的數據結構,其數據元素默認按照key值升序,自然有序。Skip list讓已排序的數據分佈在多層鏈表中,以0-1隨機數決定一個數據的向上攀升與否,經過「空間來換取時間」的一個算法,在每一個節點中增長了向前的指針,在插入、刪除、查找時能夠忽略一些不可能涉及到的結點,從而提升了效率。node

咱們先看一個簡單的鏈表,以下:算法

若是咱們須要查詢九、2一、30,則須要比較次數爲3 + 6 + 8 = 17 次,那麼有沒有優化方案呢?有!咱們將該鏈表中的某些元素提煉出來做爲一個比較「索引」,以下:數組

咱們先與這些索引進行比較來決定下一個元素是往右仍是下走,因爲存在「索引」的緣故,致使在檢索的時候會大大減小比較的次數。固然元素不是不少,很難體現出優點,當元素足夠多的時候,這種索引結構就會大顯身手。安全

SkipList的特性

SkipList具有以下特性:數據結構

  1. 由不少層結構組成,level是經過必定的機率隨機產生的
  2. 每一層都是一個有序的鏈表,默認是升序,也能夠根據建立映射時所提供的Comparator進行排序,具體取決於使用的構造方法
  3. 最底層(Level 1)的鏈表包含全部元素
  4. 若是一個元素出如今Level i 的鏈表中,則它在Level i 之下的鏈表也都會出現
  5. 每一個節點包含兩個指針,一個指向同一鏈表中的下一個元素,一個指向下面一層的元素

咱們將上圖再作一些擴展就能夠變成一個典型的SkipList結構了併發

SkipList的查找

SkipListd的查找算法較爲簡單,對於上面咱們咱們要查找元素21,其過程以下:app

  1. 比較3,大於3,日後找(9),
  2. 比9大,繼續日後找(25),可是比25小,則從9的下一層開始找(16)
  3. 16的後面節點依然爲25,則繼續從16的下一層找
  4. 找到21

紅色虛線表明路徑。dom

SkipList的插入

SkipList的插入操做主要包括:函數

  1. 查找合適的位置。這裏須要明確一點就是在確認新節點要佔據的層次K時,採用丟硬幣的方式,徹底隨機。若是佔據的層次K大於鏈表的層次,則從新申請新的層,不然插入指定層次
  2. 申請新的節點
  3. 調整指針

假定咱們要插入的元素爲23,通過查找能夠確認她是位於25前,九、1六、21後。固然須要考慮申請的層次K。

若是層次K > 3

須要申請新層次(Level 4)

若是層次 K = 2

直接在Level 2 層插入便可

這裏會涉及到以個算法:經過丟硬幣決定層次K,該算法咱們經過後面ConcurrentSkipListMap源碼來分析。還有一個須要注意的地方就是,在K層插入元素後,須要確保全部小於K層的層次都應該出現新節點。

SkipList的刪除

刪除節點和插入節點思路基本一致:找到節點,刪除節點,調整指針。

好比刪除節點9,以下:

ConcurrentSkipListMap

經過上面咱們知道SkipList採用空間換時間的算法,其插入和查找的效率O(logn),其效率不低於紅黑樹,可是其原理和實現的複雜度要比紅黑樹簡單多了。通常來講會操做鏈表List,就會對SkipList毫無壓力。

ConcurrentSkipListMap其內部採用SkipLis數據結構實現。爲了實現SkipList,ConcurrentSkipListMap提供了三個內部類來構建這樣的鏈表結構:Node、Index、HeadIndex。其中Node表示最底層的單鏈表有序節點、Index表示爲基於Node的索引層,HeadIndex用來維護索引層次。到這裏咱們能夠這樣說ConcurrentSkipListMap是經過HeadIndex維護索引層次,經過Index從最上層開始往下層查找,一步一步縮小查詢範圍,最後到達最底層Node時,就只須要比較很小一部分數據了。在JDK中的關係以下圖:

Node

static final class Node<K,V> {
    final K key;
    volatile Object value;
    volatile ConcurrentSkipListMap.Node<K, V> next;

    /** 省略些許代碼 */
}
複製代碼

Node的結構和通常的單鏈表毫無區別,key-value和一個指向下一個節點的next。

Index

static class Index<K,V> {
    final ConcurrentSkipListMap.Node<K,V> node;
    final ConcurrentSkipListMap.Index<K,V> down;
    volatile ConcurrentSkipListMap.Index<K,V> right;

    /** 省略些許代碼 */
}
複製代碼

Index提供了一個基於Node節點的索引Node,一個指向下一個Index的right,一個指向下層的down節點。

HeadIndex

static final class HeadIndex<K,V> extends Index<K,V> {
    final int level;  //索引層,從1開始,Node單鏈表層爲0
    HeadIndex(Node<K,V> node, Index<K,V> down, Index<K,V> right, int level) {
        super(node, down, right);
        this.level = level;
    }
}
複製代碼

HeadIndex內部就一個level來定義層級。

ConcurrentSkipListMap提供了四個構造函數,每一個構造函數都會調用initialize()方法進行初始化工做。

final void initialize() {
    keySet = null;
    entrySet = null;
    values = null;
    descendingMap = null;
    randomSeed = seedGenerator.nextInt() | 0x0100; // ensure nonzero
    head = new ConcurrentSkipListMap.HeadIndex<K,V>(new ConcurrentSkipListMap.Node<K,V>(null, BASE_HEADER, null),
            null, null, 1);
}
複製代碼

注意,initialize()方法不只僅只在構造函數中被調用,如clone,clear、readObject時都會調用該方法進行初始化步驟。這裏須要注意randomSeed的初始化。

private transient int randomSeed;
randomSeed = seedGenerator.nextInt() | 0x0100; // ensure nonzero
複製代碼

randomSeed一個簡單的隨機數生成器(在後面介紹)。

put操做

CoucurrentSkipListMap提供了put()方法用於將指定值與此映射中的指定鍵關聯。源碼以下:

public V put(K key, V value) {
    if (value == null)
        throw new NullPointerException();
    return doPut(key, value, false);
}
複製代碼

首先判斷value若是爲null,則拋出NullPointerException,不然調用doPut方法,其實若是各位看過JDK的源碼的話,應該對這樣的操做很熟悉了,JDK源碼裏面不少方法都是先作一些必要性的驗證後,而後經過調用do**()方法進行真正的操做。

doPut()方法內容較多,咱們分步分析。

private V doPut(K key, V value, boolean onlyIfAbsent) {
    Node<K,V> z;             // added node
    if (key == null)
        throw new NullPointerException();
    // 比較器
    Comparator<? super K> cmp = comparator;
    outer: for (;;) {
        for (Node<K, V> b = findPredecessor(key, cmp), n = b.next; ; ) {

        /** 省略代碼 */
複製代碼

doPut()方法有三個參數,除了key,value外還有一個boolean類型的onlyIfAbsent,該參數做用與若是存在當前key時,該作何動做。當onlyIfAbsent爲false時,替換value,爲true時,則返回該value。用代碼解釋爲:

if (!map.containsKey(key))
    return map.put(key, value);
else
     return map.get(key);
複製代碼

首先判斷key是否爲null,若是爲null,則拋出NullPointerException,從這裏咱們能夠確認ConcurrentSkipList是不支持key或者value爲null的。而後調用findPredecessor()方法,傳入key來確認位置。findPredecessor()方法其實就是確認key要插入的位置。

private Node<K,V> findPredecessor(Object key, Comparator<? super K> cmp) {
     if (key == null)
         throw new NullPointerException(); // don't postpone errors
     for (;;) {
         // 從head節點開始,head是level最高級別的headIndex
         for (Index<K,V> q = head, r = q.right, d;;) {

             // r != null,表示該節點右邊還有節點,須要比較
             if (r != null) {
                 Node<K,V> n = r.node;
                 K k = n.key;
                 // value == null,表示該節點已經被刪除了
                 // 經過unlink()方法過濾掉該節點
                 if (n.value == null) {
                     //刪掉r節點
                     if (!q.unlink(r))
                         break;           // restart
                     r = q.right;         // reread r
                     continue;
                 }

                 // value != null,節點存在
                 // 若是key 大於r節點的key 則往前進一步
                 if (cpr(cmp, key, k) > 0) {
                     q = r;
                     r = r.right;
                     continue;
                 }
             }

             // 到達最右邊,若是dowm == null,表示指針已經達到最下層了,直接返回該節點
             if ((d = q.down) == null)
                 return q.node;
             q = d;
             r = d.right;
         }
     }
 }
複製代碼

findPredecessor()方法意思很是明確:尋找前輩。從最高層的headIndex開始向右一步一步比較,直到right爲null或者右邊節點的Node的key大於當前key爲止,而後再向下尋找,依次重複該過程,直到down爲null爲止,即找到了前輩,看返回的結果注意是Node,不是Item,因此插入的位置應該是最底層的Node鏈表。

在這個過程當中ConcurrentSkipListMap賦予了該方法一個其餘的功能,就是經過判斷節點的value是否爲null,若是爲null,表示該節點已經被刪除了,經過調用unlink()方法刪除該節點。

final boolean unlink(Index<K,V> succ) {
    return node.value != null && casRight(succ, succ.right);
}
複製代碼

刪除節點過程很是簡單,更改下right指針便可。

經過findPredecessor()找到前輩節點後,作什麼呢?看下面:

for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
       // 前輩節點的next != null
       if (n != null) {
           Object v; int c;
           Node<K,V> f = n.next;

           // 不一致讀,主要緣由是併發,有節點捷足先登
           if (n != b.next)               // inconsistent read
               break;

           // n.value == null,該節點已經被刪除了
           if ((v = n.value) == null) {   // n is deleted
               n.helpDelete(b, f);
               break;
           }

           // 前輩節點b已經被刪除
           if (b.value == null || v == n) // b is deleted
               break;

           // 節點大於,往前移
           if ((c = cpr(cmp, key, n.key)) > 0) {
               b = n;
               n = f;
               continue;
           }

           // c == 0 表示,找到一個key相等的節點,根據onlyIfAbsent參數來作判斷
           // onlyIfAbsent ==false,則經過casValue,替換value
           // onlyIfAbsent == true,返回該value
           if (c == 0) {
               if (onlyIfAbsent || n.casValue(v, value)) {
                   @SuppressWarnings("unchecked") V vv = (V)v;
                   return vv;
               }
               break; // restart if lost race to replace value
           }
           // else c < 0; fall through
       }

       // 將key-value包裝成一個node,插入
       z = new Node<K,V>(key, value, n);
       if (!b.casNext(n, z))
           break;         // restart if lost race to append to b
       break outer;
   }
複製代碼

找到合適的位置後,就是在該位置插入節點咯。插入節點的過程比較簡單,就是將key-value包裝成一個Node,而後經過casNext()方法加入到鏈表當中。固然是插入以前須要進行一系列的校驗工做。

在最下層插入節點後,下一步工做是什麼?新建索引。前面博主提過,在插入節點的時候,會根據採用拋硬幣的方式來決定新節點所插入的層次,因爲存在併發的可能,ConcurrentSkipListMap採用ThreadLocalRandom來生成隨機數。以下:

int rnd = ThreadLocalRandom.nextSecondarySeed();
複製代碼

拋硬幣決定層次的思想很簡單,就是經過拋硬幣若是硬幣爲正面則層次level + 1 ,不然中止,以下:

// 拋硬幣決定層次
while (((rnd >>>= 1) & 1) != 0)
    ++level;
複製代碼

在闡述SkipList插入節點的時候說明了,決定的層次level會分爲兩種狀況進行處理,一是若是層次level大於最大的層次話則須要新增一層,不然就在相應層次以及小於該level的層次進行節點新增處理。

level <= headIndex.level

// 若是決定的層次level比最高層次head.level小,直接生成最高層次的index
// 因爲須要確認每一層次的down,因此須要從最下層依次往上生成
if (level <= (max = h.level)) {
    for (int i = 1; i <= level; ++i)
        idx = new ConcurrentSkipListMap.Index<K,V>(z, idx, null);
}
複製代碼

從底層開始,小於level的每一層都初始化一個index,每次的node都指向新加入的node,down指向下一層的item,右側next所有爲null。整個處理過程很是簡單:爲小於level的每一層初始化一個index,而後加入到原來的index鏈條中去。

level > headIndex.level

// leve > head.level 則新增一層
 else { // try to grow by one level
     // 新增一層
     level = max + 1;

     // 初始化 level個item節點
     @SuppressWarnings("unchecked")
     ConcurrentSkipListMap.Index<K,V>[] idxs =
             (ConcurrentSkipListMap.Index<K,V>[])new ConcurrentSkipListMap.Index<?,?>[level+1];
     for (int i = 1; i <= level; ++i)
         idxs[i] = idx = new ConcurrentSkipListMap.Index<K,V>(z, idx, null);

     //
     for (;;) {
         h = head;
         int oldLevel = h.level;
         // 層次擴大了,須要從新開始(有新線程節點加入)
         if (level <= oldLevel) // lost race to add level
             break;
         // 新的頭結點HeadIndex
         ConcurrentSkipListMap.HeadIndex<K,V> newh = h;
         ConcurrentSkipListMap.Node<K,V> oldbase = h.node;
         // 生成新的HeadIndex節點,該HeadIndex指向新增層次
         for (int j = oldLevel+1; j <= level; ++j)
             newh = new ConcurrentSkipListMap.HeadIndex<K,V>(oldbase, newh, idxs[j], j);

         // HeadIndex CAS替換
         if (casHead(h, newh)) {
             h = newh;
             idx = idxs[level = oldLevel];
             break;
         }
     }
複製代碼

當拋硬幣決定的level大於最大層次level時,須要新增一層進行處理。處理邏輯以下:

  1. 初始化一個對應的index數組,大小爲level + 1,而後爲每一個單位都建立一個index,箇中參數爲:Node爲新增的Z,down爲下一層index,right爲null
  2. 經過for循環來進行擴容操做。從最高層進行處理,新增一個HeadIndex,箇中參數:節點Node,down都爲最高層的Node和HeadIndex,right爲剛剛建立的對應層次的index,level爲相對應的層次level。最後經過CAS把當前的head與新加入層的head進行替換。 經過上面步驟咱們發現,儘管已經找到了前輩節點,也將node插入了,也肯定肯定了層次並生成了相應的Index,可是並無將這些Index插入到相應的層次當中,因此下面的代碼就是將index插入到相對應的層當中。
// 從插入的層次level開始
  splice: for (int insertionLevel = level;;) {
      int j = h.level;
      // 從headIndex開始
      for (ConcurrentSkipListMap.Index<K,V> q = h, r = q.right, t = idx;;) {
          if (q == null || t == null)
              break splice;

          // r != null;這裏是找到相應層次的插入節點位置,注意這裏只橫向找
          if (r != null) {
              ConcurrentSkipListMap.Node<K,V> n = r.node;

              int c = cpr(cmp, key, n.key);

              // n.value == null ,解除關係,r右移
              if (n.value == null) {
                  if (!q.unlink(r))
                      break;
                  r = q.right;
                  continue;
              }

              // key > n.key 右移
              if (c > 0) {
                  q = r;
                  r = r.right;
                  continue;
              }
          }

          // 上面找到節點要插入的位置,這裏就插入
          // 當前層是最頂層
          if (j == insertionLevel) {
              // 創建聯繫
              if (!q.link(r, t))
                  break; // restart
              if (t.node.value == null) {
                  findNode(key);
                  break splice;
              }
              // 標誌的插入層 -- ,若是== 0 ,表示已經到底了,插入完畢,退出循環
              if (--insertionLevel == 0)
                  break splice;
          }

          // 上面節點已經插入完畢了,插入下一個節點
          if (--j >= insertionLevel && j < level)
              t = t.down;
          q = q.down;
          r = q.right;
      }
  }
複製代碼

這段代碼分爲兩部分看,一部分是找到相應層次的該節點插入的位置,第二部分在該位置插入,而後下移。

至此,ConcurrentSkipListMap的put操做到此就結束了。代碼量有點兒多,這裏總結下:

  1. 首先經過findPredecessor()方法找到前輩節點Node
  2. 根據返回的前輩節點以及key-value,新建Node節點,同時經過CAS設置next
  3. 設置節點Node,再設置索引節點。採起拋硬幣方式決定層次,若是所決定的層次大於現存的最大層次,則新增一層,而後新建一個Item鏈表。
  4. 最後,將新建的Item鏈表插入到SkipList結構中。

get操做

相比於put操做 ,get操做會簡單不少,其過程其實就只至關於put操做的第一步:

private V doGet(Object key) {
      if (key == null)
          throw new NullPointerException();
      Comparator<? super K> cmp = comparator;
      outer: for (;;) {
          for (ConcurrentSkipListMap.Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
              Object v; int c;
              if (n == null)
                  break outer;
              ConcurrentSkipListMap.Node<K,V> f = n.next;
              if (n != b.next)                // inconsistent read
                  break;
              if ((v = n.value) == null) {    // n is deleted
                  n.helpDelete(b, f);
                  break;
              }
              if (b.value == null || v == n)  // b is deleted
                  break;
              if ((c = cpr(cmp, key, n.key)) == 0) {
                  @SuppressWarnings("unchecked") V vv = (V)v;
                  return vv;
              }
              if (c < 0)
                  break outer;
              b = n;
              n = f;
          }
      }
      return null;
  }
複製代碼

與put操做第一步類似,首先調用findPredecessor()方法找到前輩節點,而後順着right一直往右找便可,同時在這個過程當中一樣承擔了一個刪除value爲null的節點的職責。

remove操做

remove操做爲刪除指定key節點,以下:

public V remove(Object key) {
    return doRemove(key, null);
}
複製代碼

直接調用doRemove()方法,這裏remove有兩個參數,一個是key,另一個是value,因此doRemove方法即提供remove key,也提供同時知足key-value。

final V doRemove(Object key, Object value) {
       if (key == null)
           throw new NullPointerException();
       Comparator<? super K> cmp = comparator;
       outer: for (;;) {
           for (ConcurrentSkipListMap.Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
               Object v; int c;
               if (n == null)
                   break outer;
               ConcurrentSkipListMap.Node<K,V> f = n.next;

               // 不一致讀,從新開始
               if (n != b.next)                    // inconsistent read
                   break;

               // n節點已刪除
               if ((v = n.value) == null) {        // n is deleted
                   n.helpDelete(b, f);
                   break;
               }

               // b節點已刪除
               if (b.value == null || v == n)      // b is deleted
                   break;

               if ((c = cpr(cmp, key, n.key)) < 0)
                   break outer;

               // 右移
               if (c > 0) {
                   b = n;
                   n = f;
                   continue;
               }

               /* * 找到節點 */

               // value != null 表示須要同時校驗key-value值
               if (value != null && !value.equals(v))
                   break outer;

               // CAS替換value
               if (!n.casValue(v, null))
                   break;
               if (!n.appendMarker(f) || !b.casNext(n, f))
                   findNode(key);                  // retry via findNode
               else {
                   // 清理節點
                   findPredecessor(key, cmp);      // clean index

                   // head.right == null表示該層已經沒有節點,刪掉該層
                   if (head.right == null)
                       tryReduceLevel();
               }
               @SuppressWarnings("unchecked") V vv = (V)v;
               return vv;
           }
       }
       return null;
   }
複製代碼

調用findPredecessor()方法找到前輩節點,而後經過右移,而後比較,找到後利用CAS把value替換爲null,而後判斷該節點是否是這層惟一的index,若是是的話,調用tryReduceLevel()方法把這層幹掉,完成刪除。

其實從這裏能夠看出,remove方法僅僅是把Node的value設置null,並無真正刪除該節點Node,其實從上面的put操做、get操做咱們能夠看出,他們在尋找節點的時候都會判斷節點的value是否爲null,若是爲null,則調用unLink()方法取消關聯關係,以下:

if (n.value == null) {
    if (!q.unlink(r))
        break;           // restart
    r = q.right;         // reread r
    continue;
}
複製代碼

size操做

ConcurrentSkipListMap的size()操做和ConcurrentHashMap不一樣,它並無維護一個全局變量來統計元素的個數,因此每次調用該方法的時候都須要去遍歷。

public int size() {
    long count = 0;
    for (Node<K,V> n = findFirst(); n != null; n = n.next) {
        if (n.getValidValue() != null)
            ++count;
    }
    return (count >= Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int) count;
}
複製代碼

調用findFirst()方法找到第一個Node,而後利用node的next去統計。最後返回統計數據,最多能返回Integer.MAX_VALUE。注意這裏在線程併發下是安全的。

相關文章
相關標籤/搜索