ConcurrentHashMap源碼深度分析

一、基本變量

// Unsafe mechanics
//Unsafe類是用來作cas操做的,都是native方法,代碼由C++實現,下面的變量表示對應變量的偏移
//例如:public final native boolean compareAndSwapInt(Object o, long offset,
                                              int expected,
                                              int x);
// 表示對象o偏移位置offset的變量若是和指望值expected相等,把變量值設置爲x

private static final sun.misc.Unsafe U;
private static final long SIZECTL;
private static final long TRANSFERINDEX;
private static final long BASECOUNT;
private static final long CELLSBUSY;
private static final long CELLVALUE;
private static final long ABASE;
private static final int ASHIFT;

一、table初始化

一、table會延遲到第一次put時初始化,同過使用循環+CAS的套路,能夠保證一次只有一個線程會初始化table。
二、在table爲空的時候若是sizeCtl小於0,則說明已經有線程開始初始化了,其它線程經過Thread.yield()讓出CPU時間片,等待table非空便可。
三、不然使用CAS將sizeCtl的值換爲-1,置換成功則初始化table。
四、注意table的大小爲sizeCtl,初始化後將sizeCtl的值設爲n - (n >>> 2)即0.75n,這個值用來肯定是否須要爲table擴容。java

//Initializes table, using the size recorded in sizeCtl.
private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {
        //判斷是否已經有線程在初始化,若是有則讓出CPU,以後繼續自旋判斷
        if ((sc = sizeCtl) < 0)
            Thread.yield(); // lost initialization race; just spin
         //cas操做設置sizeCtl的值
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            try {
                //繼續判斷雙重檢查
                if ((tab = table) == null || tab.length == 0) {
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    //初始化table
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = tab = nt;
                    sc = n - (n >>> 2);//設置sizeCtl=0.75n
                }
            } finally {
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}

二、put操做(putVal方法)

一、首先計算key的hash值
二、判斷table是否爲空,若是是就初始化
三、根據hash值取餘肯定桶的位置,並判斷桶是否爲空,若是是空,經過cas操做設置進去
四、若是桶的第一個節點非空,而且hash=MOVED,說明有線程正在進行擴容,調用helpTransfer幫助擴容
五、對桶加鎖並判斷節點是鏈表仍是樹,根據不一樣狀況插入節點
六、判斷是否達到鏈表轉樹的閾值
七、統計節點數數組

final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    //一、首先計算key的hash值
    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        //二、判斷table是否爲空,若是是就初始化
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
            //三、根據hash值取餘肯定桶的位置,並判斷桶是否爲空,若是是空
            //經過cas操做設置進去
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        //四、若是桶的第一個節點非空,而且hash=MOVED,說明有線程正在進行擴容,
        //調用helpTransfer幫助擴容
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            //五、對桶加鎖並判斷節點是鏈表仍是樹,根據不一樣狀況插入節點
            synchronized (f) {
                if (tabAt(tab, i) == f) {//保證多線程環境下的安全性,再一次檢查
                    if (fh >= 0) {//hash值大於0,說明是一個鏈表
                        binCount = 1;//記錄鏈表的元素個數
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            //判斷key是否已在鏈表中,若是是直接替換value
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            //不然說明是一個新Key,插入表尾
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    //不是鏈表節點就是樹節點,調用樹結點插入方法
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            if (binCount != 0) {
                //六、判斷是否達到鏈表轉樹的閾值
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    //七、統計節點數
    addCount(1L, binCount);
    return null;
}

三、鏈表轉樹

一、判斷鏈表節點是否小於64,若是是優先使用擴容
二、對桶加鎖
三、依次把鏈表節點集合轉成樹結點集合
四、把樹結點集合轉成紅黑樹,這顆紅黑樹是鏈表和樹結構的結合,它一樣經過next引用串成了一個鏈表安全

private final void treeifyBin(Node<K,V>[] tab, int index) {//將tab[index]鏈表轉樹
    Node<K,V> b; int n, sc;
    if (tab != null) {
        //一、判斷鏈表節點是否小於64,若是是優先使用擴容
        if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
            tryPresize(n << 1);
            //爲了多線程環境下的安全性,再次判斷該桶是否爲空,hash>0表示是鏈表
        else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
            synchronized (b) {//二、對桶加鎖
                if (tabAt(tab, index) == b) {//加鎖後再次判斷
                    TreeNode<K,V> hd = null, tl = null;
                    //三、依次把鏈表節點集合轉成樹結點
                    for (Node<K,V> e = b; e != null; e = e.next) {
                        TreeNode<K,V> p =
                            new TreeNode<K,V>(e.hash, e.key, e.val,
                                              null, null);
                        if ((p.prev = tl) == null)
                            hd = p;
                        else
                            tl.next = p;
                        tl = p;
                    }
                    //四、把樹結點集合轉成紅黑樹
                    setTabAt(tab, index, new TreeBin<K,V>(hd));
                }
            }
        }
    }
}

四、擴容

一、因爲擴容容許併發操做,首先經過CPU數量,計算容許每一個線程能處理的桶的數量stride,最少每一個線程處理16個
二、申請nextTable數組
三、把nextTab設置爲ForwardingNode,表示正在進行擴容操做
四、每一個線程進來的線程去獲取本身能處理區域,每一個線程經過cas操做設置transferIndex變量,設置成功則線程得到table中(transferIndex, transferIndex-strade)這一段的處理權限
相似下圖這樣的,反向分配的,先來的線程從後面獲取桶段
線程獲取桶處理
五、循環依次處理每個桶:
5.1 若是當前桶首節點是鏈表,把鏈表節點從新分配到兩個桶,一部分在原位置i,一分部在i+n位置
5.2 若是是紅黑樹,這裏有點意思,一樣是經過一個for像鏈表同樣遍歷,分紅兩部分,一個將放在i位置,一個將放在i+n位置。多線程

  • 那麼問題來了,這裏既然是樹,爲何能夠像鏈表同樣操做呢?併發

    這裏是做者設計很巧妙的地方,佩服Doug lea大神,樹的遍歷很是不方便,而鏈表遍歷很方便。TreeNode自己繼承了Node,有一個next指針。當把鏈表節點數超過8,轉成紅黑樹的時候,next指針不變,鏈表結構沒有破壞, 因此這顆樹自己是一個樹和鏈表的結合體,既能夠當鏈表用又能夠當樹用(這個設計有點相似LinkedHashMap,即時一個hashMap又是一個鏈表)。
    相似下圖這樣的結構
    紅黑樹與鏈表
    這裏被分紅兩個部分後,再根據這兩個部分是否達到了樹轉列表的閾UNTREEIFY_THRESHOLD=6肯定是轉成鏈表仍是組織成紅黑樹。
    private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
      int n = tab.length, stride;
      //取CPU的數量,肯定每一個線程遷移的Node的數量,確保不會少於MIN_TRANSFER_STRIDE=16個
      if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
          stride = MIN_TRANSFER_STRIDE; // subdivide range
      //申請數組nextTable
      if (nextTab == null) {            // initiating
          try {
              @SuppressWarnings("unchecked")
              Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
              nextTab = nt;
          } catch (Throwable ex) {      // try to cope with OOME
              sizeCtl = Integer.MAX_VALUE;
              return;
          }
          nextTable = nextTab;
          transferIndex = n;
      }
      int nextn = nextTab.length;
      //把nextTab設置爲ForwardingNode,表示正在進行擴容操做
      ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
      boolean advance = true;
      boolean finishing = false; // to ensure sweep before committing nextTab
      for (int i = 0, bound = 0;;) {
          Node<K,V> f; int fh;
          while (advance) {
              int nextIndex, nextBound;
              if (--i >= bound || finishing)
                  advance = false;
              else if ((nextIndex = transferIndex) <= 0) {
                  i = -1;
                  advance = false;
              }
              /**
              nextIndex從n開始
               這裏每一個線程進來的時候經過cas操做設置transferIndex變量,
               若是成功表示該線程處理nextIndex-stride這一段桶的遷移
               即 i=[nextIndex-1, nextIndex-stride] 這一段桶
              */
            //不相等,說明不到最後一個線程,直接退出transfer方法
              else if (U.compareAndSwapInt
                       (this, TRANSFERINDEX, nextIndex,
                        nextBound = (nextIndex > stride ?
                                     nextIndex - stride : 0))) {
                  bound = nextBound;
                  i = nextIndex - 1;
                  advance = false;
              }
          }
          //判斷是否處理完成全部的桶
          if (i < 0 || i >= n || i + n >= nextn) {
              int sc;
              if (finishing) {
                  nextTable = null;
                  table = nextTab;
                  sizeCtl = (n << 1) - (n >>> 1);
                  return;
              }
              if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                  if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                      return;
                  finishing = advance = true;
                  i = n; // recheck before commit
              }
          }
          else if ((f = tabAt(tab, i)) == null)
              advance = casTabAt(tab, i, null, fwd);
          else if ((fh = f.hash) == MOVED)
              advance = true; // already processed
          else {
              synchronized (f) {
                  if (tabAt(tab, i) == f) {
                      Node<K,V> ln, hn;
                      //hash>0表示鏈表,鏈表節點從新分配到兩個桶,一部分在原位置i,一分部在i+n位置
                      if (fh >= 0) {
                          int runBit = fh & n;
                          Node<K,V> lastRun = f;
                          //找到低位和高位鏈表的各自的尾節點,配合後面用頭插法
                          for (Node<K,V> p = f.next; p != null; p = p.next) {
                              int b = p.hash & n;
                              if (b != runBit) {
                                  runBit = b;
                                  lastRun = p;
                              }
                          }
                          if (runBit == 0) {
                              ln = lastRun;
                              hn = null;
                          }
                          else {
                              hn = lastRun;
                              ln = null;
                          }
                          //循環將鏈表拆分紅兩個鏈表,頭插法
                          for (Node<K,V> p = f; p != lastRun; p = p.next) {
                              int ph = p.hash; K pk = p.key; V pv = p.val;
                              if ((ph & n) == 0)
                                  ln = new Node<K,V>(ph, pk, pv, ln);
                              else
                                  hn = new Node<K,V>(ph, pk, pv, hn);
                          }
                          //將拆分後的列表經過cas操做設置爲對應的桶中
                          setTabAt(nextTab, i, ln);
                          setTabAt(nextTab, i + n, hn);
                          setTabAt(tab, i, fwd);
                          advance = true;
                      }
                      //該桶存儲的是紅黑樹
                      else if (f instanceof TreeBin) {
                          TreeBin<K,V> t = (TreeBin<K,V>)f;
                          TreeNode<K,V> lo = null, loTail = null;
                          TreeNode<K,V> hi = null, hiTail = null;
                          int lc = 0, hc = 0;
                          //這裏的代碼比較有意思
                          //因爲這裏的紅黑樹是一個樹和鏈表的結合體,TreeNode同時也是一個Node
                          //因此這裏能夠像鏈表同樣遍歷,使用這種結構是爲了方便遍歷
                          //依次取出樹的節點,根據節點hash的結果分到地位和高位鏈表
                          //後面把鏈表轉成樹,同時會保持鏈表的結構
                          for (Node<K,V> e = t.first; e != null; e = e.next) {
                              int h = e.hash;
                              TreeNode<K,V> p = new TreeNode<K,V>
                                  (h, e.key, e.val, null, null);
                              if ((h & n) == 0) {
                                  if ((p.prev = loTail) == null)
                                      lo = p;
                                  else
                                      loTail.next = p;
                                  loTail = p;
                                  ++lc;
                              }
                              else {
                                  if ((p.prev = hiTail) == null)
                                      hi = p;
                                  else
                                      hiTail.next = p;
                                  hiTail = p;
                                  ++hc;
                              }
                          }
                          //把樹拆分紅了低位(放在位置i)和高位(位置i+n)鏈表,這裏判斷兩個鏈表長度是否達到轉成樹的閾值
                          ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                              (hc != 0) ? new TreeBin<K,V>(lo) : t;
                          hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                              (lc != 0) ? new TreeBin<K,V>(hi) : t;
                          //拆分後的結構設置到相應的桶中
                          setTabAt(nextTab, i, ln);
                          setTabAt(nextTab, i + n, hn);
                          setTabAt(tab, i, fwd);
                          advance = true;
                      }
                  }
              }
          }
      }
    }

五、統計元素數量

一、簡單經過CouterCell數組的和,因爲可能有其餘線程在進行增刪操做,這個數據並非精確的。ide

final long sumCount() {
   CounterCell[] as = counterCells; CounterCell a;
   long sum = baseCount;
   if (as != null) {
       for (int i = 0; i < as.length; ++i) {
           if ((a = as[i]) != null)
               sum += a.value;
       }
   }
   return sum;
}
相關文章
相關標籤/搜索