JDK源碼分析(12)之 ConcurrentHashMap 詳解

本文將主要講述 JDK1.8 版本 的 ConcurrentHashMap,其內部結構和不少的哈希優化算法,都是和 JDK1.8 版本的 HashMap是同樣的,因此在閱讀本文以前,必定要先了解 HashMap,能夠參考 HashMap 相關;另外 ConcurrentHashMap 中一樣有紅黑樹,這部分能夠先不看不影響總體結構把握,有興趣的能夠查看 紅黑樹html

1、ConcurrentHashMap 結構概述

1. 總體概述

CHM 的源碼有 6k 多行,包含的內容多,精巧,不容易理解;建議在查看源碼的時候,能夠首先把握總體結構脈絡,對於一些精巧的優化,哈希技巧能夠先了解目的就能夠了,不用深究;對總體把握比較清楚後,在逐步分析,能夠比較快速的看懂;java

JDK1.8 版本中的 CHM,和 JDK1.7 版本的差異很是大,在查看資料的時候要注意區分,1.7 中主要是使用 Segment 分段鎖 來解決併發問題的;而在 1.8 中則徹底沒有這些稍顯臃腫的結構,其結構基本和 HashMap 是同樣的,都是 數組 + 鏈表 + 紅黑樹,如圖所示:node

hashmap結構

其主要區別就在 CHM 支持併發:算法

  • 使用 Unsafe 方法操做數組內部元素,保證可見性;(U.getObjectVolatile、U.compareAndSwapObject、U.putObjectVolatile);
  • 在更新和移動節點的時候,直接鎖住對應的哈希桶,鎖粒度更小,且動態擴展;
  • 針對擴容慢操做進行優化,
    • 首先擴容過程的中,節點首先移動到過分表 nextTable ,全部節點移動完畢時替換散列表 table
    • 移動時先將散列表定長等分,而後逆序依次領取任務擴容,設置 sizeCtl 標記正在擴容;
    • 移動完成一個哈希桶或者遇到空桶時,將其標記爲 ForwardingNode 節點,並指向 nextTable
    • 後有其餘線程在操做哈希表時,遇到 ForwardingNode 節點,則先幫助擴容(繼續領取分段任務),擴容完成後再繼續以前的操做;
  • 優化哈希表計數器,採用 LongAdder、Striped64 相似思想;
  • 以及大量的哈希算法優化和狀態變量優化;

以上講的這些不太清楚也沒有關係,主要是有一個印象,大體清楚 CHM 的實現方向,具體細節後面還會結合源碼詳細講解;數組


2. 類定義和成員變量

public class ConcurrentHashMap<K,V> 
             extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable {
  private static final int MAXIMUM_CAPACITY = 1 << 30;       // 最大容量
  private static final int DEFAULT_CAPACITY = 16;            // 默認初始化容量
  private static final int DEFAULT_CONCURRENCY_LEVEL = 16;   // 併發級別,爲兼容1.7,實際未用
  private static final float LOAD_FACTOR = 0.75f;       // 固定負載係數,n - (n >>> 2)
  static final int TREEIFY_THRESHOLD = 8;               // 鏈表超過8時,轉爲紅黑樹
  static final int UNTREEIFY_THRESHOLD = 6;             // 紅黑樹低於6時,轉爲鏈表
  static final int MIN_TREEIFY_CAPACITY = 64;           // 樹化最小容量,容量小於64時,先擴容
  private static final int MIN_TRANSFER_STRIDE = 16;    // 擴容時拆分散列表,最小步長
  private static int RESIZE_STAMP_BITS = 16;            
  private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;  // 可參與擴容的最大線程
  
  static final int NCPU = Runtime.getRuntime().availableProcessors();  // CPU 數
  transient volatile Node<K,V>[] table;                // 散列表
  private transient volatile Node<K,V>[] nextTable;    // 擴容時的過分表
           
  private transient volatile int sizeCtl;              // 最重要的狀態變量,下面詳講
  private transient volatile int transferIndex;        // 擴容進度指示
  
  private transient volatile long baseCount;              // 計數器,基礎基數
  private transient volatile int cellsBusy;               // 計數器,併發標記
  private transient volatile CounterCell[] counterCells;  // 計數器,併發累計
  
  public ConcurrentHashMap() { }
  
  public ConcurrentHashMap(int initialCapacity) {
    if (initialCapacity < 0)
      throw new IllegalArgumentException();
    int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
           MAXIMUM_CAPACITY :
           tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));  // 注意這裏不是0.75,後面介紹
    this.sizeCtl = cap;
  }
  
  public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
    this.sizeCtl = DEFAULT_CAPACITY;
    putAll(m);
  }
  
    public ConcurrentHashMap(int initialCapacity, float loadFactor) {
    this(initialCapacity, loadFactor, 1);
  }
  
  public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0) 
      throw new IllegalArgumentException();
    if (initialCapacity < concurrencyLevel)   // Use at least as many bins
      initialCapacity = concurrencyLevel;   // as estimated threads
    long size = (long)(1.0 + (long)initialCapacity / loadFactor);   // 注意這裏的初始化
    int cap = (size >= (long)MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : tableSizeFor((int)size);
    this.sizeCtl = cap;
  }
  ... 
}

上面有幾個重要的地方這裏單獨講:緩存

LOAD_FACTOR:多線程

這裏的負載係數,同 HashMap 等其餘 Map 的係數有明顯區別:併發


sizeCtl:

sizeCtl 是 CHM 中最重要的狀態變量,其中包括不少中狀態,這裏先總體介紹幫助後面源碼理解;

  • sizeCtl = 0 :初始值,還未指定初始容量;

  • sizeCtl > 0 :

    • table 未初始化,表示初始化容量;
    • table 已初始化,表示擴容閾值(0.75n);
  • sizeCtl = -1 :表示正在初始化;

  • sizeCtl < -1 :表示正在擴容,具體結構如圖所示:

    chm-sizectl

計算代碼以下:

/*
 * n=64
 * Integer.numberOfLeadingZeros(n)=26
 * resizeStamp(64) = 0001 1010 | 1000 0000 0000 0000 = 1000 0000 0001 1010
 */
static final int resizeStamp(int n) {
  return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}

因此 resizeStamp(64) << RESIZE_STAMP_SHIFT) + 2 ,表示擴容目標爲 64,有一個線程正在擴容;


3. Node 節點

static class Node<K,V> implements Map.Entry<K,V> {  // 哈希表普通節點
  final int hash;
  final K key;
  volatile V val;
  volatile Node<K,V> next;
  
  Node<K,V> find(int h, Object k) {}   // 主要在擴容時,利用多態查詢已轉移節點
}

static final class ForwardingNode<K,V> extends Node<K,V> {  // 標識擴容節點
  final Node<K,V>[] nextTable;  // 指向成員變量 ConcurrentHashMap.nextTable
  
  ForwardingNode(Node<K,V>[] tab) {
    super(MOVED, null, null, null);  // hash = -1,快速肯定 ForwardingNode 節點
    this.nextTable = tab;
  }
  
  Node<K,V> find(int h, Object k) {}
}

static final class TreeBin<K,V> extends Node<K,V> { // 紅黑樹根節點
  TreeBin(TreeNode<K,V> b) {
    super(TREEBIN, null, null, null);  // hash = -2,快速肯定紅黑樹,
    ...
  }
}  
static final class TreeNode<K,V> extends Node<K,V> { } // 紅黑樹普通節點,其 hash 同 Node 普通節點 > 0;


4. 哈希計算

static final int MOVED     = -1;          // hash for forwarding nodes
static final int TREEBIN   = -2;          // hash for roots of trees
static final int RESERVED  = -3;          // hash for transient reservations
static final int HASH_BITS = 0x7fffffff;  // usable bits of normal node hash

// 讓高位16位,參與哈希桶定位運算的同時,保證 hash 爲正
static final int spread(int h) {
  return (h ^ (h >>> 16)) & HASH_BITS;
}

除此以外還有,

  • tableSizeFor : 將容量轉爲大於n,且最小的2的冪;
  • 除留餘數法 :hash % length = hash & (length-1)
  • 擴容後哈希桶定位:(e.hash & oldCap),0 - 位置不變,1 - 原來的位置 + oldCap;

以上這些哈希優化的具體原理,都在以前的博客講過了,就不在重複了,HashMap 相關

5. 哈希桶可見性

咱們都知道一個數組即便聲明爲 volatile,也只能保證這個數組引用自己的可見性,其內部元素的可見性是沒法保證的,若是每次都加鎖,則效率必然大大下降,在 CHM 中則使用 Unsafe 方法來保證:

static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
  return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}

static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) {
  return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}

static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
  U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}


2、源碼解析

1. initTable 方法

private final Node<K,V>[] initTable() {
  Node<K,V>[] tab; int sc;
  while ((tab = table) == null || tab.length == 0) {
    if ((sc = sizeCtl) < 0) Thread.yield();  // 有其餘線程在初始化
    else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {  // 設置狀態 -1
      try {
        if ((tab = table) == null || tab.length == 0) {
          int n = (sc > 0) ? sc : DEFAULT_CAPACITY;  // 注意此時的 sizeCtl 表示初始容量,完畢後表示擴容閾值
          @SuppressWarnings("unchecked")
          Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
          table = tab = nt;
          sc = n - (n >>> 2);  // 同 0.75n
        }
      } finally {
        sizeCtl = sc;  // 注意這裏沒有 CAS 更新,這就是狀態變量的高明瞭,由於前面設置了 -1,此時這裏沒有競爭
      }
      break;
    }
  }
  return tab;
}


2. get 方法

get 方法可能看代碼不是很長,可是他卻能 保證無鎖狀態下的內存一致性 ,他的每一句代碼都要仔細理解,多設想一下若是發生競爭會怎樣,如此纔能有所得;

public V get(Object key) {
  Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
  int h = spread(key.hashCode()); // 計算 hash
  if ((tab = table) != null && (n = tab.length) > 0 &&  // 確保 table 已經初始化
    
    // 確保對應的哈希桶不爲空,注意這裏是 Volatile 語義獲取;由於擴容的時候,是徹底拷貝,因此只要不爲空,則鏈表必然完整
    (e = tabAt(tab, (n - 1) & h)) != null) {
    if ((eh = e.hash) == h) {
      if ((ek = e.key) == key || (ek != null && key.equals(ek)))
        return e.val;
    }
    
    // hash < 0,則必然在擴容,原來位置的節點可能所有移動到 i + oldCap 位置,因此利用多態到 nextTable 中查找
    else if (eh < 0) return (p = e.find(h, key)) != null ? p.val : null;
    
    while ((e = e.next) != null) { // 遍歷鏈表
      if (e.hash == h &&
        ((ek = e.key) == key || (ek != null && key.equals(ek))))
        return e.val;
    }
  }
  return null;
}


3. putVal 方法

注意 CHM 的 key 和 value 都不能爲空

final V putVal(K key, V value, boolean onlyIfAbsent) {
  if (key == null || value == null) throw new NullPointerException();
  int hash = spread(key.hashCode());  // hash 計算
  int binCount = 0;                   // 狀態變量,主要表示查找鏈表節點數,最後判斷是否轉爲紅黑樹
  for (Node<K,V>[] tab = table;;) {
    Node<K,V> f; int n, i, fh;
    if (tab == null || (n = tab.length) == 0) tab = initTable();  // 初始化
    else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {      // cas 獲取哈希桶
      if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) // cas 更新,失敗時繼續循環更新
        break;  // no lock when adding to empty bin
    }
    else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f);  // 正在擴容的時候,先幫助擴容
    else {
      V oldVal = null;
      synchronized (f) {  // 注意這裏只鎖定了一個哈希桶,因此比 1.7 中的 Segment 分段鎖 粒度更低
        if (tabAt(tab, i) == f) {  // 確認該哈希桶是否已經移動
          if (fh >= 0) {   // hash >=0 則必然是普通節點,直接遍歷鏈表便可
            binCount = 1;
            for (Node<K,V> e = f;; ++binCount) {
              K ek;
              if(e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { // 查找成功
                oldVal = e.val;
                if (!onlyIfAbsent) e.val = value;
                break;
              }
              Node<K,V> pred = e;
              if ((e = e.next) == null) {  // 查找失敗時,直接在末尾添加新節點
                pred.next = new Node<K,V>(hash, key, value, null);
                break;
              }
            }
          }
          else if (f instanceof TreeBin) {  // 樹根節點
            Node<K,V> p;
            binCount = 2;
            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {  // 紅黑樹查找
              oldVal = p.val;
              if (!onlyIfAbsent) p.val = value;
            }
          }
        }
      }
      if (binCount != 0) {
        if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); // 若是鏈表長度大於8,轉爲紅黑樹
        if (oldVal != null)
          return oldVal;
        break;
      }
    }
  }
  addCount(1L, binCount); // 計數加一,注意這裏使用的是計數器,普通的 Atomic 變量仍然可能稱爲性能瓶頸;
  return null;
}

其具體流程如圖所示:

chmput


4. 擴容

擴容操做一直都是比較慢的操做,而 CHM 中巧妙的利用任務劃分,使得多個線程可能同時參與擴容;另外擴容條件也有兩個:

  • 有鏈表長度超過 8,可是容量小於 64 的時候,發生擴容;
  • 節點數超過閾值的時候,發生擴容;

其擴容的過程可描述爲:

  • 首先擴容過程的中,節點首先移動到過分表 nextTable ,全部節點移動完畢時替換散列表 table
  • 移動時先將散列表定長等分,而後逆序依次領取任務擴容,設置 sizeCtl 標記正在擴容;
  • 移動完成一個哈希桶或者遇到空桶時,將其標記爲 ForwardingNode 節點,並指向 nextTable
  • 後有其餘線程在操做哈希表時,遇到 ForwardingNode 節點,則先幫助擴容(繼續領取分段任務),擴容完成後再繼續以前的操做;

圖形化表示以下:

chmput

源碼分析以下:

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
  int n = tab.length, stride;
  if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
    stride = MIN_TRANSFER_STRIDE; // 根據 CPU 數量計算任務步長
  if (nextTab == null) {          // 初始化 nextTab
    try {
      @SuppressWarnings("unchecked")
      Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];  // 擴容一倍
      nextTab = nt;
    } catch (Throwable ex) {
      sizeCtl = Integer.MAX_VALUE; // 發生 OOM 時,再也不擴容
      return;
    }
    nextTable = nextTab;
    transferIndex = n;
  }
  int nextn = nextTab.length;
  ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);  // 標記空桶,或已經轉移完畢的桶
  boolean advance = true;
  boolean finishing = false; // to ensure sweep before committing nextTab
  for (int i = 0, bound = 0;;) {  // 逆向遍歷擴容
    Node<K,V> f; int fh;
    while (advance) {  // 向前獲取哈希桶
      int nextIndex, nextBound;
      if (--i >= bound || finishing)               // 已經取到哈希桶,或已完成時退出
        advance = false;
      else if ((nextIndex = transferIndex) <= 0) { // 遍歷到達頭節點,已經沒有待遷移的桶,線程準備退出
        i = -1;
        advance = false;
      }
      else if (U.compareAndSwapInt
           (this, TRANSFERINDEX, nextIndex,
            nextBound = (nextIndex > stride ? nextIndex - stride : 0))) {  // 當前任務完成,領取下一批哈希桶
        bound = nextBound;
        i = nextIndex - 1;  // 索引指向下一批哈希桶
        advance = false;
      }
    }
    
    // i < 0  :表示擴容結束,已經沒有待移動的哈希桶
    // i >= n :擴容結束,再次檢查確認
    // i + n >= nextn : 在使用 nextTable 替換 table 時,有線程進入擴容就會出現
    if (i < 0 || i >= n || i + n >= nextn) { // 完成擴容準備退出
      int sc;
      if (finishing) {  // 兩次檢查,只有最後一個擴容線程退出時,才更新變量
        nextTable = null;
        table = nextTab;
        sizeCtl = (n << 1) - (n >>> 1); // 0.75*2*n
        return;
      }
      if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {  // 擴容線程減一
        if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return;  // 不是最後一個線程,直接退出
        finishing = advance = true;   // 最後一個線程,再次檢查
        i = n;                        // recheck before commit
      }
    }
    else if ((f = tabAt(tab, i)) == null)  // 當前節點爲空,直接標記爲 ForwardingNode,而後繼續獲取下一個桶
      advance = casTabAt(tab, i, null, fwd);
    
    // 以前的線程已經完成該桶的移動,直接跳過,正常狀況下本身的任務區間,不會出現 ForwardingNode 節點,
    else if ((fh = f.hash) == MOVED)  // 此處爲極端條件下的健壯性檢查
      advance = true; // already processed
    
    // 開始處理鏈表
    else {
      // 注意在 get 的時候,能夠無鎖獲取,是由於擴容是全拷貝節點,完成後最後在更新哈希桶
      // 而在 put 的時候,是直接將節點加入尾部,獲取修改其中的值,此時若是容許 put 操做,最後就會發生髒讀,
      // 因此 put 和 transfer,須要競爭同一把鎖,也就是對應的哈希桶,以保證內存一致性效果
      synchronized (f) { 
        if (tabAt(tab, i) == f) {  // 確認鎖定的是同一個桶
          Node<K,V> ln, hn;
          if (fh >= 0) {  // 正常節點
            int runBit = fh & n;  // hash & n,判斷擴容後的索引
            Node<K,V> lastRun = f;
            
            // 此處找到鏈表最後擴容後處於同一位置的連續節點,這樣最後一節就不用再一次複製了
            for (Node<K,V> p = f.next; p != null; p = p.next) {
              int b = p.hash & n;
              if (b != runBit) {
                runBit = b;
                lastRun = p;
              }
            }
            if (runBit == 0) {
              ln = lastRun;
              hn = null;
            }
            else {
              hn = lastRun;
              ln = null;
            }
            
            // 依次將鏈表拆分紅,lo、hi 兩條鏈表,即位置不變的鏈表,和位置 + oldCap 的鏈表
            // 注意最後一節鏈表沒有new,而是直接使用原來的節點
            // 同時鏈表的順序也被打亂了,lastRun 到最後爲正序,前面一節爲逆序
            for (Node<K,V> p = f; p != lastRun; p = p.next) {
              int ph = p.hash; K pk = p.key; V pv = p.val;
              if ((ph & n) == 0)
                ln = new Node<K,V>(ph, pk, pv, ln);
              else
                hn = new Node<K,V>(ph, pk, pv, hn);
            }
            setTabAt(nextTab, i, ln);      // 插入 lo 鏈表
            setTabAt(nextTab, i + n, hn);  // 插入 hi 鏈表
            setTabAt(tab, i, fwd);         // 哈希桶移動完成,標記爲 ForwardingNode 節點
            advance = true;                // 繼續獲取下一個桶
          }
          else if (f instanceof TreeBin) { // 拆分成黑樹
            TreeBin<K,V> t = (TreeBin<K,V>)f;
            TreeNode<K,V> lo = null, loTail = null; // 爲避免最後在反向遍歷,先留頭結點的引用,
            TreeNode<K,V> hi = null, hiTail = null; // 由於順序的鏈表,能夠加速紅黑樹構造
            int lc = 0, hc = 0;  // 一樣記錄 lo,hi 鏈表的長度
            for (Node<K,V> e = t.first; e != null; e = e.next) {  // 中序遍歷紅黑樹
              int h = e.hash;
              TreeNode<K,V> p = new TreeNode<K,V>(h, e.key, e.val, null, null);  // 構造紅黑樹節點
              if ((h & n) == 0) {
                if ((p.prev = loTail) == null)
                  lo = p;
                else
                  loTail.next = p;
                loTail = p;
                ++lc;
              }
              else {
                if ((p.prev = hiTail) == null)
                  hi = p;
                else
                  hiTail.next = p;
                hiTail = p;
                ++hc;
              }
            }
            
            // 判斷是否須要將其轉化爲紅黑樹,同時若是隻有一條鏈,那麼就能夠不用在構造
            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t;
            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t;
            setTabAt(nextTab, i, ln);
            setTabAt(nextTab, i + n, hn);
            setTabAt(tab, i, fwd);
            advance = true;
          }
        }
      }
    }
  }
}

還有其餘相關方法不是很複雜,就不詳細講了,好比 tryPresize,helpTransfer,addCount


5. 計數器

當獲取 Map.size 的時候,若是使用 Atomic 變量,很容易致使過分競爭,產生性能瓶頸,因此 CHM 中使用了,計數器的方式:

public int size() {
  long n = sumCount();
  return ((n < 0L) ? 0 : (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)n);
}
private transient volatile CounterCell[] counterCells;  // 計數器

@sun.misc.Contended static final class CounterCell {  // @sun.misc.Contended 避免僞緩存
  volatile long value;
  CounterCell(long x) { value = x; }
}

final long sumCount() {
  CounterCell[] as = counterCells; CounterCell a;
  long sum = baseCount;
  if (as != null) {
    for (int i = 0; i < as.length; ++i) {  // 累計計數
      if ((a = as[i]) != null)
        sum += a.value;
    }
  }
  return sum;
}

具體細節還比較多,以後在單獨開一篇博客詳細講解;


總結

  • 首先 JDK1.8 的 CHM,沒有使用 Segment 分段鎖,而是直接鎖定單個哈希桶
  • 對數組中的哈希桶使用 CAS 操做,保證其可見性
  • 對擴容是用,任務拆分,多線程同時擴容的方式,加速擴容
  • 對 size 使用計數器思想
  • CHM 中對狀態變量的應用,使得不少操做都得以無所化進行
相關文章
相關標籤/搜索