我是這樣給阿里面試官吹 ConcurrentHashMap的

由於上篇文章HashMap已經講解的很詳細了,所以此篇文章會簡單介紹思路,再學習併發HashMap就簡單不少了,上一篇文章中咱們最終知道HashMap是線程不安全的,所以在老版本JDK中提供了HashTable來實現多線程級別的,改變之處重要有如下幾點。java

  1. HashTableput, get, remove等方法是經過 synchronized來修飾保證其線程安全性的。
  2. HashTable是 不容許key跟value爲null的。
  3. 問題是 synchronized是個關鍵字級別的重量鎖,在get數據的時候任何寫入操做都不容許。相對來講性能很差。所以目前主要用的 ConcurrentHashMap來保證線程安全性。

ConcurrentHashMap主要分爲JDK<=7跟JDK>=8的兩個版本,ConcurrentHashMap的空間利用率更低通常只有10%~20%,接下來分別介紹。node

JDK7

先宏觀說下JDK7中的大體組成,ConcurrentHashMap由Segment數組結構和HashEntry數組組成。Segment是一種可重入鎖,是一種數組和鏈表的結構,一個Segment中包含一個HashEntry數組,每一個HashEntry又是一個鏈表結構。正是經過Segment分段鎖,ConcurrentHashMap實現了高效率的併發。缺點是併發程度是有segment數組來決定的,併發度一旦初始化沒法擴容。先繪製個ConcurrentHashMap的形象直觀圖。要想理解currentHashMap,能夠簡單的理解爲將數據「分表分庫」ConcurrentHashMap是由 Segment 數組 結構和HashEntry 數組 結構組成。web

  • Segment 是一種可重入鎖 ReentrantLock的子類 ,在 ConcurrentHashMap 裏扮演鎖的角色, HashEntry則用於存儲鍵值對數據。
  • ConcurrentHashMap 裏包含一個 Segment 數組來實現鎖分離, Segment的結構和 HashMap 相似,一個 Segment裏包含一個 HashEntry 數組,每一個 HashEntry 是一個鏈表結構的元素, 每一個 Segment守護者一個 HashEntry 數組裏的元素,當對 HashEntry數組的數據進行修改時,必須首先得到它對應的 Segment 鎖。
  1. 咱們先看下segment類:
static final class Segment<K,Vextends ReentrantLock implements Serializable {
     transient volatile HashEntry<K,V>[] table; //包含一個HashMap 能夠理解爲
}

能夠理解爲咱們的每一個segment都是實現了Lock功能的HashMap。若是咱們同時有多個segment造成了segment數組那咱們就能夠實現併發咯。算法

咱們看下 currentHashMap的構造函數,先總結幾點。
    1. 每個segment裏面包含的table(HashEntry數組)初始化大小也必定是2的次冪
    2. 這裏設置了若干個用於位計算的參數。
    3. initialCapacity:初始容量大小 ,默認16。
    4. loadFactor: 擴容因子,默認0.75,當一個Segment存儲的元素數量大於initialCapacity* loadFactor時,該Segment會進行一次擴容。
    5. concurrencyLevel:併發度,默認16。併發度能夠理解爲程序運行時可以 「同時更新」ConccurentHashMap且不產生鎖競爭的最大線程數,實際上就是ConcurrentHashMap中的 分段鎖個數,即Segment[]的數組長度。若是併發度設置的太小,會帶來嚴重的鎖競爭問題;若是併發度設置的過大,本來位於同一個Segment內的訪問會擴散到不一樣的Segment中,CPU cache命中率會降低,從而引發程序性能降低。
    6. segment的數組大小最終必定是2的次冪

構造函數詳解:數組

   //initialCapacity 是咱們保存因此KV數據的初始值
   //loadFactor這個就是HashMap的負載因子
   // 咱們segment數組的初始化大小
      @SuppressWarnings("unchecked")
       public ConcurrentHashMap(int initialCapacity,
                                float loadFactor, int concurrencyLevel)
 
{
           if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
               throw new IllegalArgumentException();
           if (concurrencyLevel > MAX_SEGMENTS) // 最大容許segment的個數,不能超過 1< 24
               concurrencyLevel = MAX_SEGMENTS;
           int sshift = 0// 相似擾動函數
           int ssize = 1
           while (ssize < concurrencyLevel) {
               ++sshift;
               ssize <<= 1// 確保segment必定是2次冪
           }
           this.segmentShift = 32 - sshift;  
           //有點相似與擾動函數,跟下面的參數配合使用實現 當前元素落到那個segment上面。
           this.segmentMask = ssize - 1// 爲了 取模 專用
           if (initialCapacity > MAXIMUM_CAPACITY) //不能大於 1< 30
               initialCapacity = MAXIMUM_CAPACITY;
   
           int c = initialCapacity / ssize; //總的數組大小 被 segment 分散後 須要多少個table
           if (c * ssize < initialCapacity)
               ++c; //確保向上取值
           int cap = MIN_SEGMENT_TABLE_CAPACITY; 
           // 每一個table初始化大小爲2
           while (cap < c) // 單獨的一個segment[i] 對應的table 容量大小。
               cap <<= 1;
           // 將table的容量初始化爲2的次冪
           Segment<K,V> s0 =
               new Segment<K,V>(loadFactor, (int)(cap * loadFactor), (HashEntry<K,V>[])new HashEntry[cap]);
               // 負載因子,閾值,每一個segment的初始化大小。跟hashmap 初始值相似。
               // 而且segment的初始化是懶加載模式,剛開始只有一個s0,其他的在須要的時候纔會增長。
           Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
           UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
           this.segments = ss;
       }
  1. hash 無論是咱們的get操做仍是put操做要須要經過hash來對數據進行定位。
   //  總體思想就是經過屢次不一樣方式的位運算來努力將數據均勻的分不到目標table中,都是些擾動函數
   private int hash(Object k) {
       int h = hashSeed;
       if ((0 != h) && (k instanceof String)) {
           return sun.misc.Hashing.stringHash32((String) k);
       }
       h ^= k.hashCode();
       // single-word Wang/Jenkins hash.
       h += (h <<  15) ^ 0xffffcd7d;
       h ^= (h >>> 10);
       h += (h <<   3);
       h ^= (h >>>  6);
       h += (h <<   2) + (h << 14);
       return h ^ (h >>> 16);
   }
  1. get 相對來講比較簡單,無非就是經過 hash找到對應的 segment,繼續經過 hash找到對應的 table,而後就是遍歷這個鏈表看是否能夠找到,而且要注意 get的時候是沒有加鎖的。
   public V get(Object key) {
       Segment<K,V> s;
       HashEntry<K,V>[] tab;
       int h = hash(key); // JDK7中標準的hash值獲取算法
       long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; // hash值如何映射到對應的segment上
       if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null && (tab = s.table) != null) {
           //  無非就是得到hash值對應的segment 是否存在,
           for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                    (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
                e != null; e = e.next) {
               // 看下這個hash值對應的是segment(HashEntry)中的具體位置。而後遍歷查詢該鏈表
               K k;
               if ((k = e.key)  key || (e.hash  h && key.equals(k)))
                   return e.value;
           }
       }
       return null;
   }
  1. put 相同的思路,先找到 hash值對應的 segment位置,而後看該 segment位置是否初始化了(由於segment是懶加載模式)。選擇性初始化,最終執行put操做。
   @SuppressWarnings("unchecked")
   public V put(K key, V value) {
       Segment<K,V> s;
       if (value  null)
           throw new NullPointerException();
       int hash = hash(key);// 仍是得到最終hash值
       int j = (hash >>> segmentShift) & segmentMask; // hash值位操做對應的segment數組位置
       if ((s = (Segment<K,V>)UNSAFE.getObject          
            (segments, (j << SSHIFT) + SBASE))  null)
           s = ensureSegment(j); 
       // 初始化時候由於只有第一個segment,若是落在了其他的segment中 則須要現初始化。
       return s.put(key, hash, value, false);
       // 直接在數據中執行put操做。
   }

其中put操做基本思路跟HashMap幾乎同樣,只是在開始跟結束進行了加鎖的操做tryLock and unlock,而後JDK7中都是先擴容再添加數據的,而且得到不到鎖也會進行自旋的tryLock或者lock阻塞排隊進行等待(同時得到鎖前提早new出新數據)。安全

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    // 在往該 segment 寫入前,須要先獲取該 segment 的獨佔鎖,獲取失敗嘗試獲取自旋鎖
    HashEntry<K,V> node = tryLock() ? null :
        scanAndLockForPut(key, hash, value);
    V oldValue;
    try {
        // segment 內部的數組
        HashEntry<K,V>[] tab = table;
        // 利用 hash 值,求應該放置的數組下標
        int index = (tab.length - 1) & hash;
        // first 是數組該位置處的鏈表的表頭
        HashEntry<K,V> first = entryAt(tab, index);
 
        for (HashEntry<K,V> e = first;;) {
            if (e != null) {
                K k;
                if ((k = e.key)  key ||
                    (e.hash  hash && key.equals(k))) {
                    oldValue = e.value;
                    if (!onlyIfAbsent) {
                        // 覆蓋舊值
                        e.value = value;
                        ++modCount;
                    }
                    break;
                }
                // 繼續順着鏈表走
                e = e.next;
            }
            else {
                // node 是否是 null,這個要看獲取鎖的過程。沒得到鎖的線程幫咱們建立好了節點,直接頭插法
                // 若是不爲 null,那就直接將它設置爲鏈表表頭;若是是 null,初始化並設置爲鏈表表頭。
                if (node != null)
                    node.setNext(first);
                else
                    node = new HashEntry<K,V>(hash, key, value, first);
 
                int c = count + 1;
                // 若是超過了該 segment 的閾值,這個 segment 須要擴容
                if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                    rehash(node); // 擴容
                else
                    // 沒有達到閾值,將 node 放到數組 tab 的 index 位置,
                    // 將新的結點設置成原鏈表的表頭
                    setEntryAt(tab, index, node);
                ++modCount;
                count = c;
                oldValue = null;
                break;
            }
        }
    } finally {
        // 解鎖
        unlock();
    }
    return oldValue;
}

若是加鎖失敗了調用scanAndLockForPut,完成查找或新建節點的工做。當獲取到鎖後直接將該節點加入鏈表便可,「提高」了put操做的性能,這裏涉及到自旋。大體過程:微信

  1. 在我獲取不到鎖的時候我進行tryLock,準備好new的數據,同時還有必定的次數限制,還要考慮別的已經得到線程的節點修改該頭節點。
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
    HashEntry<K,V> first = entryForHash(this, hash);
    HashEntry<K,V> e = first;
    HashEntry<K,V> node = null;
    int retries = -1// negative while locating node
 
    // 循環獲取鎖
    while (!tryLock()) {
        HashEntry<K,V> f; // to recheck first below
        if (retries < 0) {
            if (e  null) {
                if (node  null// speculatively create node
              // 進到這裏說明數組該位置的鏈表是空的,沒有任何元素
             // 固然,進到這裏的另外一個緣由是 tryLock() 失敗,因此該槽存在併發,不必定是該位置
                    node = new HashEntry<K,V>(hash, key, value, null);
                retries = 0;
            }
            else if (key.equals(e.key))
                retries = 0;
            else
                // 順着鏈表往下走
                e = e.next;
        }
    // 重試次數若是超過 MAX_SCAN_RETRIES(單核 1 次多核 64 次),那麼不搶了,進入到阻塞隊列等待鎖
    //    lock() 是阻塞方法,直到獲取鎖後返回
        else if (++retries > MAX_SCAN_RETRIES) {
            lock();
            break;
        }
        else if ((retries & 1)  0 &&
                 // 進入這裏,說明有新的元素進到了鏈表,而且成爲了新的表頭
                 // 這邊的策略是,從新執行 scanAndLockForPut 方法
                 (f = entryForHash(this, hash)) != first) {
            e = first = f; // re-traverse if entry changed
            retries = -1;
        }
    }
    return node;
}
  1. Size多線程

    這個size方法比較有趣,他是先無鎖的統計下全部的數據量看下先後兩次是否數據同樣,若是同樣則返回數據,若是不同則要把所有的segment進行加鎖,統計,解鎖。而且size方法只是返回一個統計性的數字,所以size謹慎使用哦。併發

public int size() {
       // Try a few times to get accurate count. On failure due to
       // continuous async changes in table, resort to locking.
       final Segment<K,V>[] segments = this.segments;
       int size;
       boolean overflow; // true if size overflows 32 bits
       long sum;         // sum of modCounts
       long last = 0L;   // previous sum
       int retries = -1// first iteration isn't retry
       try {
           for (;;) {
               if (retries++  RETRIES_BEFORE_LOCK) {  //  超過2次則所有加鎖
                   for (int j = 0; j < segments.length; ++j)
                       ensureSegment(j).lock(); // 直接對所有segment加鎖消耗性太大
               }
               sum = 0L;
               size = 0;
               overflow = false;
               for (int j = 0; j < segments.length; ++j) {
                   Segment<K,V> seg = segmentAt(segments, j);
                   if (seg != null) {
                       sum += seg.modCount; // 統計的是modCount,涉及到增刪該都會加1
                       int c = seg.count;
                       if (c < 0 || (size += c) < 0)
                           overflow = true;
                   }
               }
               if (sum  last) // 每個先後的修改次數同樣 則認爲同樣,但凡是有一個不同則直接break。
                   break;
               last = sum;
           }
       } finally {
           if (retries > RETRIES_BEFORE_LOCK) {
               for (int j = 0; j < segments.length; ++j)
                   segmentAt(segments, j).unlock();
           }
       }
       return overflow ? Integer.MAX_VALUE : size;
   }
  1. rehash segment 數組初始化後就不可變了,也就是說 「併發性不可變」,不過 segment裏的 table能夠擴容爲2倍,該方法沒有考慮併發,由於執行該方法以前已經獲取了鎖。其中JDK7中的 rehash思路跟JDK8 中擴容後處理鏈表的思路同樣,我的不過感受沒有8寫的精髓好看。
// 方法參數上的 node 是此次擴容後,須要添加到新的數組中的數據。
private void rehash(HashEntry<K,V> node) {
    HashEntry<K,V>[] oldTable = table;
    int oldCapacity = oldTable.length;
    // 2 倍
    int newCapacity = oldCapacity << 1;
    threshold = (int)(newCapacity * loadFactor);
    // 建立新數組
    HashEntry<K,V>[] newTable =
        (HashEntry<K,V>[]) new HashEntry[newCapacity];
    // 新的掩碼,如從 16 擴容到 32,那麼 sizeMask 爲 31,對應二進制 ‘000...00011111’
    int sizeMask = newCapacity - 1;
    // 遍歷原數組,將原數組位置 i 處的鏈表拆分到 新數組位置 i 和 i+oldCap 兩個位置
    for (int i = 0; i < oldCapacity ; i++) {
        // e 是鏈表的第一個元素
        HashEntry<K,V> e = oldTable[i];
        if (e != null) {
            HashEntry<K,V> next = e.next;
            // 計算應該放置在新數組中的位置,
            // 假設原數組長度爲 16,e 在 oldTable[3] 處,那麼 idx 只多是 3 或者是 3 + 16 = 19
            int idx = e.hash & sizeMask; // 新位置
            if (next  null)   // 該位置處只有一個元素
                newTable[idx] = e;
            else { // Reuse consecutive sequence at same slot
                // e 是鏈表表頭
                HashEntry<K,V> lastRun = e;
                // idx 是當前鏈表的頭結點 e 的新位置
                int lastIdx = idx;
                // for 循環找到一個 lastRun 結點,這個結點以後的全部元素是將要放到一塊兒的
                for (HashEntry<K,V> last = next;
                     last != null;
                     last = last.next) {
                    int k = last.hash & sizeMask;
                    if (k != lastIdx) {
                        lastIdx = k;
                        lastRun = last;
                    }
                }
                // 將 lastRun 及其以後的全部結點組成的這個鏈表放到 lastIdx 這個位置
                newTable[lastIdx] = lastRun;
                // 下面的操做是處理 lastRun 以前的結點,
                //這些結點可能分配在另外一個鏈表中,也可能分配到上面的那個鏈表中
                for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                    V v = p.value;
                    int h = p.hash;
                    int k = h & sizeMask;
                    HashEntry<K,V> n = newTable[k];
                    newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                }
            }
        }
    }
    // 將新來的 node 放到新數組中剛剛的 兩個鏈表之一 的 頭部
    int nodeIndex = node.hash & sizeMask; // add the new node
    node.setNext(newTable[nodeIndex]);
    newTable[nodeIndex] = node;
    table = newTable;
}
  1. CAS操做 在JDK7裏在 ConcurrentHashMap中經過原子操做 sun.misc.Unsafe查找元素、替換元素和設置元素。經過這樣的硬件級別得到數據能夠保證及時是多線程我也每次得到的數據是最新的。這些原子操做起着很是關鍵的做用,你能夠在全部 ConcurrentHashMap的基本功能中看到,隨機距離以下:
     final void setNext(HashEntry<K,V> n) {
            UNSAFE.putOrderedObject(this, nextOffset, n);
        }
    static final <K,V> HashEntry<K,V> entryAt(HashEntry<K,V>[] tab, int i) {
        return (tab  null) ? null :
            (HashEntry<K,V>) UNSAFE.getObjectVolatile
            (tab, ((long)i << TSHIFT) + TBASE);
    }
   static final <K,V> void setEntryAt(HashEntry<K,V>[] tab, int i,
                                       HashEntry<K,V> e)
 
{
        UNSAFE.putOrderedObject(tab, ((long)i << TSHIFT) + TBASE, e);
    }

常見問題

  1. ConcurrentHashMap實現原理是怎麼樣的或者ConcurrentHashMap如何在保證高併發下線程安全的同時實現了性能提高?

ConcurrentHashMap容許多個修改操做併發進行,其關鍵在於使用了鎖分離技術。它使用了多個鎖來控制對hash表的不一樣部分進行的修改。內部使用段(Segment)來表示這些不一樣的部分,每一個段其實就是一個小的HashTable,只要多個修改操做發生在不一樣的段上,它們就能夠併發進行。app

  1. 在高併發下的狀況下如何保證取得的元素是最新的?

用於存儲鍵值對數據的HashEntry,在設計上它的成員變量value跟next都是volatile類型的,這樣就保證別的線程對value值的修改,get方法能夠立刻看到。

ConcurrentHashMap的弱一致性體如今迭代器,clear和get方法,緣由在於沒有加鎖。

    1. 好比迭代器在遍歷數據的時候是一個Segment一個Segment去遍歷的,若是在遍歷完一個Segment時正好有一個線程在剛遍歷完的Segment上插入數據,就會體現出不一致性。clear也是同樣。
    2. get方法和containsKey方法都是遍歷對應索引位上全部節點,都是不加鎖來判斷的,若是是修改性質的由於可見性的存在能夠直接得到最新值,不過若是是新添加值則沒法保持一致性。

JDK8

JDK8相比與JDK7主要區別以下:

  1. 取消了segment數組,直接用table保存數據,鎖的粒度更小,減小併發衝突的機率。採用table數組元素做爲鎖,從而實現了對每一行數據進行加鎖,進一步減小併發衝突的機率,併發控制使用Synchronized和CAS來操做。
  2. 存儲數據時採用了數組+ 鏈表+紅黑樹的形式。
  1. CurrentHashMap重要參數:

private static final int MAXIMUM_CAPACITY = 1 << 30; // 數組的最大值 

private static final int DEFAULT_CAPACITY = 16; // 默認數組長度 

static final int TREEIFY_THRESHOLD = 8; // 鏈表轉紅黑樹的一個條件 

static final int UNTREEIFY_THRESHOLD = 6; // 紅黑樹轉鏈表的一個條件 

static final int MIN_TREEIFY_CAPACITY = 64; // 鏈表轉紅黑樹的另外一個條件

static final int MOVED     = -1;  // 表示正在擴容轉移 

static final int TREEBIN   = -2; // 表示已經轉換成樹 

static final int RESERVED  = -3; // hash for transient reservations 

static final int HASH_BITS = 0x7fffffff; // 得到hash值的輔助參數

transient volatile Node<K,V>[] table;// 默認沒初始化的數組,用來保存元素 

private transient volatile Node<K,V>[] nextTable; // 轉移的時候用的數組 

static final int NCPU = Runtime.getRuntime().availableProcessors();// 獲取可用的CPU個數 

private transient volatile Node<K,V>[] nextTable; // 鏈接表,用於哈希表擴容,擴容完成後會被重置爲 null 

private transient volatile long baseCount;保存着整個哈希表中存儲的全部的結點的個數總和,有點相似於 HashMap 的 size 屬性。private transient volatile int sizeCtl

負數:表示進行初始化或者擴容,-1:表示正在初始化,-N:表示有 N-1 個線程正在進行擴容 正數:0 表示尚未被初始化,> 0的數:初始化或者是下一次進行擴容的閾值,有點相似HashMap中的threshold,不過功能「更強大」

  1. 若干重要類
  • 構成每一個元素的基本類 Node
      static class Node<K,Vimplements Map.Entry<K,V{
              final int hash;    // key的hash值
              final K key;       // key
              volatile V val;    // value
              volatile Node<K,V> next; 
               //表示鏈表中的下一個節點
      }
  • TreeNode繼承於Node,用來存儲紅黑樹節點
      static final class TreeNode<K,Vextends Node<K,V{
              TreeNode<K,V> parent;  
              // 紅黑樹的父親節點
              TreeNode<K,V> left;
              // 左節點
              TreeNode<K,V> right;
             // 右節點
              TreeNode<K,V> prev;    
             // 前節點
              boolean red;
             // 是否爲紅點
      }
  • ForwardingNode 在 Node 的子類 ForwardingNode 的構造方法中,能夠看到此變量的hash = 「-1」 ,類中還存儲 nextTable的引用。該初始化方法只在 transfer方法被調用,若是一個類被設置成此種狀況而且hash = -1 則說明該節點不須要resize了。
static final class ForwardingNode<K,Vextends Node<K,V{
        final Node<K,V>[] nextTable;
        ForwardingNode(Node<K,V>[] tab) {
            //注意這裏
            super(MOVED, nullnullnull);
            this.nextTable = tab;
        }
 //.....
}
  • TreeBin TreeBin從字面含義中能夠理解爲存儲樹形結構的容器,而樹形結構就是指TreeNode,因此TreeBin就是封裝TreeNode的容器,它提供轉換黑紅樹的一些條件和鎖的控制.
static final class TreeBin<K,Vextends Node<K,V{
        TreeNode<K,V> root;
        volatile TreeNode<K,V> first;
        volatile Thread waiter;
        volatile int lockState;
        // values for lockState
        static final int WRITER = 1// set while holding write lock
        static final int WAITER = 2// set when waiting for write lock
        static final int READER = 4// increment value for setting read lock
}

構造函數

總體的構造狀況基本跟HashMap相似,而且爲了跟原來的JDK7中的兼容性還能夠傳入併發度。不過JDK8中併發度已經有table的具體長度來控制了。

  1. ConcurrentHashMap():建立一個帶有默認初始容量 (16)、加載因子 (0.75) 和 concurrencyLevel (16) 的新的空映射
  2. ConcurrentHashMap(int):建立一個帶有指定初始容量 tableSizeFor、默認加載因子 (0.75) 和 concurrencyLevel (16) 的新的空映射
  3. ConcurrentHashMap(Map<? extends K, ? extends V> m):構造一個與給定映射具備相同映射關係的新映射
  4. ConcurrentHashMap(int initialCapacity, float loadFactor):建立一個帶有指定初始容量、加載因子和默認 concurrencyLevel (1) 的新的空映射
  5. ConcurrentHashMap(int, float, int):建立一個帶有指定初始容量、加載因子和併發級別的新的空映射。

put

假設table已經初始化完成,put操做採用 CAS + synchronized 實現併發插入或更新操做,具體實現以下。

  1. 作一些邊界處理,而後得到hash值。
  2. 沒初始化就初始化,初始化後看下對應的桶是否爲空,爲空就原子性的嘗試插入。
  3. 若是當前節點正在擴容還要去幫忙擴容,騷操做。
  4. syn來加鎖當前節點,而後操做幾乎跟就跟hashmap同樣了。

// Node 節點的 hash值在HashMap中存儲的就是hash值,在currenthashmap中可能有多種狀況哦!
final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key  null || value  nullthrow new NullPointerException(); //邊界處理
    int hash = spread(key.hashCode());// 最終hash值計算
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) { //循環表
        Node<K,V> f; int n, i, fh;
        if (tab  null || (n = tab.length)  0)
            tab = initTable(); // 初始化表 若是爲空,懶漢式
        else if ((f = tabAt(tab, i = (n - 1) & hash))  null) {
        // 若是對應桶位置爲空
            if (casTabAt(tab, i, nullnew Node<K,V>(hash, key, value, null))) 
                         // CAS 原子性的嘗試插入
                break;
        } 
        else if ((fh = f.hash)  MOVED) 
        // 若是當前節點正在擴容。還要幫着去擴容。
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            synchronized (f) { //  桶存在數據 加鎖操做進行處理
                if (tabAt(tab, i)  f) {
                    if (fh >= 0) { // 若是存儲的是鏈表 存儲的是節點的hash值
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            // 遍歷鏈表去查找,若是找到key同樣則選擇性
                            if (e.hash  hash &&
                                ((ek = e.key)  key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            if ((e = e.next)  null) {// 找到尾部插入
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    else if (f instanceof TreeBin) {// 若是桶節點類型爲TreeBin
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) { 
                             // 嘗試紅黑樹插入,同時也要防止節點原本就有,選擇性覆蓋
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            if (binCount != 0) { // 若是鏈表數量
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i); //  鏈表轉紅黑樹哦!
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    addCount(1L, binCount); // 統計大小 而且檢查是否要擴容。
    return null;
}

涉及到重要函數initTabletabAtcasTabAthelpTransferputTreeValtreeifyBinaddCount函數。

initTable

「只容許一個線程」對錶進行初始化,若是不巧有其餘線程進來了,那麼會讓其餘線程交出 CPU 等待下次系統調度Thread.yield。這樣,保證了表同時只會被一個線程初始化,對於table的大小,會根據sizeCtl的值進行設置,若是沒有設置szieCtl的值,那麼默認生成的table大小爲16,不然,會根據sizeCtl的大小設置table大小。

// 容器初始化 操做
private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table)  null || tab.length  0) {
        if ((sc = sizeCtl) < 0// 若是正在初始化-1,-N 正在擴容。
            Thread.yield(); // 進行線程讓步等待
     // 讓掉當前線程 CPU 的時間片,使正在運行中的線程從新變成就緒狀態,並從新競爭 CPU 的調度權。
     // 它可能會獲取到,也有可能被其餘線程獲取到。
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { 
          //  比較sizeCtl的值與sc是否相等,相等則用 -1 替換,這代表我這個線程在進行初始化了!
            try {
                if ((tab = table)  null || tab.length  0) {
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY; // 默認爲16
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = tab = nt;
                    sc = n - (n >>> 2); // sc = 0.75n
                }
            } finally {
                sizeCtl = sc; //設置sizeCtl 相似threshold
            }
            break;
        }
    }
    return tab;
}

unsafe

ConcurrentHashMap中使用了unSafe方法,經過直接操做內存的方式來保證併發處理的安全性,使用的是硬件的安全機制。

 // 用來返回節點數組的指定位置的節點的原子操做
@SuppressWarnings("unchecked")
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
    return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}

// cas原子操做,在指定位置設定值
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                    Node<K,V> c, Node<K,V> v)
 
{
    return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}

// 原子操做,在指定位置設定值
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
    U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}

// 比較table數組下標爲i的結點是否爲c,若爲c,則用v交換操做。不然,不進行交換操做。
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                    Node<K,V> c, Node<K,V> v)
 
{
    return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}

能夠看到得到table[i]數據是經過Unsafe對象經過反射獲取的,取數據直接table[index]不能夠麼,爲何要這麼複雜?在java內存模型中,咱們已經知道每一個線程都有一個工做內存,裏面存儲着table的「副本」,雖然table是volatile修飾的,但不能保證線程每次都拿到table中的最新元素,Unsafe.getObjectVolatile能夠直接獲取指定內存的數據,「保證了每次拿到數據都是最新的」

helpTransfer

// 可能有多個線程在同時幫忙運行helpTransfer
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
    Node<K,V>[] nextTab; int sc;
    if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
        // table不是空  且 node節點是轉移類型,而且轉移類型的nextTable 不是空 說明還在擴容ing
        int rs = resizeStamp(tab.length); 
        // 根據 length 獲得一個前16位的標識符,數組容量大小。
        // 肯定新table指向沒有變,老table數據也沒變,而且此時 sizeCtl小於0 還在擴容ing
        while (nextTab  nextTable && table  tab && (sc = sizeCtl) < 0) {
            if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc  rs + 1 || sc  rs + MAX_RESIZERS || transferIndex <= 0)
            // 1. sizeCtl 無符號右移16位得到高16位若是不等 rs 標識符變了
            // 2. 若是擴容結束了 這裏能夠看 trePresize 函數第一次擴容操做:
            // 默認第一個線程設置 sc = rs 左移 16 位 + 2,當第一個線程結束擴容了,
            // 就會將 sc 減一。這個時候,sc 就等於 rs + 1。
            // 3. 若是達到了最大幫助線程個數 65535個
            // 4. 若是轉移下標調整ing 擴容已經結束了
                break;
            if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
            // 若是以上都不是, 將 sizeCtl + 1,增長一個線程來擴容
                transfer(tab, nextTab); // 進行轉移
                break;// 結束循環
            }
        }
        return nextTab;
    }
    return table;
}
  • Integer.numberOfLeadingZeros(n)

該方法的做用是「返回無符號整型i的最高非零位前面的0的個數」,包括符號位在內;若是i爲負數,這個方法將會返回0,符號位爲1. 好比說,10的二進制表示爲 0000 0000 0000 0000 0000 0000 0000 1010 java的整型長度爲32位。那麼這個方法返回的就是28

  • resizeStamp 主要用來得到標識符,能夠簡單理解是對當前系統容量大小的一種監控。
static final int resizeStamp(int n) {
   return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1)); 
   //RESIZE_STAMP_BITS = 16
}

addCount

主要就2件事:一是更新 baseCount,二是判斷是否須要擴容。

private final void addCount(long x, int check) {
 CounterCell[] as; long b, s;
 // 首先若是沒有併發 此時countCells is null, 此時嘗試CAS設置數據值。
 if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
     // 若是 counterCells不爲空覺得此時有併發的設置 或者 CAS設置 baseCount 失敗了
     CounterCell a; long v; int m;
     boolean uncontended = true;
     if (as  null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m])  null ||
         !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
         // 1. 若是沒出現併發 此時計數盒子爲 null
         // 2. 隨機取出一個數組位置發現爲空
         // 3. 出現併發後修改這個cellvalue 失敗了
         // 執行funAddCount
         fullAddCount(x, uncontended);// 死循環操做
         return;
     }
     if (check <= 1)
         return;
     s = sumCount(); // 吧counterCells數組中的每個數據進行累加給baseCount。
 }
 // 若是須要擴容
 if (check >= 0) {
  Node<K,V>[] tab, nt; int n, sc;
  while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) {
   int rs = resizeStamp(n);// 得到高位標識符
   if (sc < 0) { // 是否須要幫忙去擴容
    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc  rs + 1 ||
     sc  rs + MAX_RESIZERS || (nt = nextTable)  null || transferIndex <= 0)
     break;
    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
     transfer(tab, nt);
   } // 第一次擴容
   else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2))
    transfer(tab, null);
   s = sumCount();
  }
 }
}
  1. baseCount添加 ConcurrentHashMap提供了 baseCountcounterCells 兩個輔助變量和一個 CounterCell輔助內部類。sumCount() 就是迭代 counterCells來統計 sum 的過程。put 操做時,確定會影響 size(),在 put() 方法最後會調用 addCount()方法。總體的思惟方法跟LongAdder相似,用的思惟就是借鑑的 ConcurrentHashMap。每個 Cell都用Contended修飾來避免僞共享。
  1. JDK1.7 和 JDK1.8 對 size 的計算是不同的。1.7 中是先不加鎖計算三次,若是三次結果不同在加鎖。
  2. JDK1.8 size 是經過對 baseCount 和 counterCell 進行 CAS 計算,最終經過 baseCount 和 遍歷 CounterCell 數組得出 size。
  3. JDK 8 推薦使用mappingCount 方法,由於這個方法的返回值是 long 類型,不會由於 size 方法是 int 類型限制最大值。
  1. 關於擴容 addCount第一次擴容時候會有騷操做 sc=rs << RESIZE_STAMP_SHIFT) + 2)其中 rs = resizeStamp(n)。這裏須要核心說一點,

若是不是第一次擴容則直接將低16位的數字 +1 便可。

putTreeVal

這個操做幾乎跟HashMap的操做徹底同樣,核心思想就是必定要決定向左仍是向右而後最終嘗試放置新數據,而後balance。不一樣點就是有鎖的考慮。

treeifyBin

這裏的基本思路跟HashMap幾乎同樣,不一樣點就是先變成TreeNode,而後是「單向鏈表」串聯。

private final void treeifyBin(Node<K,V>[] tab, int index) {
    Node<K,V> b; int n, sc;
    if (tab != null) {
        //若是整個table的數量小於64,就擴容至原來的一倍,不轉紅黑樹了
        //由於這個閾值擴容能夠減小hash衝突,沒必要要去轉紅黑樹
        if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
            tryPresize(n << 1);
        else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
            synchronized (b) { //鎖定當前桶
                if (tabAt(tab, index)  b) {
                    TreeNode<K,V> hd = null, tl = null;
                    for (Node<K,V> e = b; e != null; e = e.next) {
                        //遍歷這個鏈表而後將每一個節點封裝成TreeNode,最終單鏈表串聯起來,
                        // 最終 調用setTabAt 放置紅黑樹
                        TreeNode<K,V> p =
                            new TreeNode<K,V>(e.hash, e.key, e.val,
                                              nullnull);
                        if ((p.prev = tl)  null)
                            hd = p;
                        else
                            tl.next = p;
                        tl = p;
                    }
                    //經過TreeBin對象對TreeNode轉換成紅黑樹
                    setTabAt(tab, index, new TreeBin<K,V>(hd));
                }
            }
        }
    }
}

TreeBin

主要功能就是鏈表變化爲紅黑樹,這個紅黑樹用TreeBin來包裝。而且要注意 轉成紅黑樹之後之前鏈表的結構信息仍是有的,最終信息以下:

  1. TreeBin.first = 鏈表中第一個節點。
  2. TreeBin.root = 紅黑樹中的root節點。
TreeBin(TreeNode<K,V> b) {
            super(TREEBIN, nullnullnull);   
            //建立空節點 hash = -2 
            this.first = b;
            TreeNode<K,V> r = null// root 節點
            for (TreeNode<K,V> x = b, next; x != null; x = next) {
                next = (TreeNode<K,V>)x.next;
                x.left = x.right = null;
                if (r  null) {
                    x.parent = null;
                    x.red = false;
                    r = x; // root 節點設置爲x 
                }
                else {
                    K k = x.key;
                    int h = x.hash;
                    Class<?> kc = null;
                    for (TreeNode<K,V> p = r;;) {
                   // x表明的是轉換爲樹以前的順序遍歷到鏈表的位置的節點,r表明的是根節點
                        int dir, ph;
                        K pk = p.key;
                        if ((ph = p.hash) > h)
                            dir = -1;
                        else if (ph < h)
                            dir = 1;
                        else if ((kc  null &&
                                  (kc = comparableClassFor(k))  null) ||
                                 (dir = compareComparables(kc, k, pk))  0)
                            dir = tieBreakOrder(k, pk);    
                            // 當key不能夠比較,或者相等的時候採起的一種排序措施
                            TreeNode<K,V> xp = p;
                        // 放必定是放在葉子節點上,若是還沒找到葉子節點則進行循環往下找。
                        // 找到了目前葉子節點纔會進入 再放置數據
                        if ((p = (dir <= 0) ? p.left : p.right)  null) {
                            x.parent = xp;
                            if (dir <= 0)
                                xp.left = x;
                            else
                                xp.right = x;
                            r = balanceInsertion(r, x); 
                     // 每次插入一個元素的時候都調用 balanceInsertion 來保持紅黑樹的平衡
                            break;
                        }
                    }
                }
            }
            this.root = r;
            assert checkInvariants(root);
        }

tryPresize

當數組長度小於64的時候,擴張數組長度一倍,調用此函數。擴容後容量大小的核對,可能涉及到初始化容器大小。而且擴容的時候又跟2的次冪聯繫上了!,其中初始化時候傳入map會調用putAll方法直接put一個map的話,在「putAll」方法中沒有調用initTable方法去初始化table,而是直接調用了tryPresize方法,因此這裏須要作一個是否是須要初始化table的判斷。

PS:默認第一個線程設置 sc = rs 左移 16 位 + 2,當第一個線程結束擴容了,就會將 sc 減一。這個時候,sc 就等於 rs + 1,這個時候說明擴容完畢了。

     /**
     * 擴容表爲指能夠容納指定個數的大小(老是2的N次方)
     * 假設原來的數組長度爲16,則在調用tryPresize的時候,size參數的值爲16<<1(32),此時sizeCtl的值爲12
     * 計算出來c的值爲64, 則要擴容到 sizeCtl ≥ c
     *  第一次擴容以後 數組長:32 sizeCtl:24
     *  第三次擴容以後 數組長:128 sizeCtl:96 退出
     */

    private final void tryPresize(int size) {
        int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
            tableSizeFor(size + (size >>> 1) + 1); // 合理範圍
        int sc;
        while ((sc = sizeCtl) >= 0) {
            Node<K,V>[] tab = table; int n;
                if (tab  null || (n = tab.length)  0) {
                // 初始化傳入map,今天putAll會直接調用這個。
                n = (sc > c) ? sc : c;
                if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {    
                //初始化tab的時候,把 sizeCtl 設爲 -1
                    try {
                        if (table  tab) {
                            @SuppressWarnings("unchecked")
                            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                            table = nt;
                            sc = n - (n >>> 2); // sc=sizeCtl = 0.75n
                        }
                    } finally {
                        sizeCtl = sc;
                    }
                }
            }
             // 初始化時候若是  數組容量<=sizeCtl 或 容量已經最大化了則退出
            else if (c <= sc || n >= MAXIMUM_CAPACITY) {
                    break;//退出擴張
            }
            else if (tab  table) {
                int rs = resizeStamp(n);

                if (sc < 0) { // sc = siztCtl 若是正在擴容Table的話,則幫助擴容
                    Node<K,V>[] nt;
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc  rs + 1 ||
                        sc  rs + MAX_RESIZERS || (nt = nextTable)  null ||
                        transferIndex <= 0)
                        break// 各類條件判斷是否須要加入擴容工做。
                     // 幫助轉移數據的線程數 + 1
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt);
                }
                 // 沒有在初始化或擴容,則開始擴容
                 // 此處切記第一次擴容 直接 +2 
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                      (rs << RESIZE_STAMP_SHIFT) + 2)) {
                        transfer(tab, null);
                }
            }
        }
    }

transfer

這裏代碼量比較大主要分文三部分,而且感受思路很精髓,尤爲「是其餘線程幫着去擴容的騷操做」

  1. 主要是 單個線程能處理的最少桶結點個數的計算和一些屬性的初始化操做。
  2. 每一個線程進來會先領取本身的任務區間 [bound,i],而後開始 --i 來遍歷本身的任務區間,對每一個桶進行處理。若是遇到桶的頭結點是空的,那麼使用 ForwardingNode標識舊table中該桶已經被處理完成了。若是遇到已經處理完成的桶,直接跳過進行下一個桶的處理。若是是正常的桶,對桶首節點加鎖,正常的遷移便可(跟HashMap第三部分同樣思路),遷移結束後依然會將原表的該位置標識位已經處理。

該函數中的finish= true 則說明整張表的遷移操做已經「所有」完成了,咱們只須要重置 table的引用並將 nextTable 賦爲空便可。不然,CAS 式的將 sizeCtl減一,表示當前線程已經完成了任務,退出擴容操做。若是退出成功,那麼須要進一步判斷當前線程是否就是最後一個在執行擴容的。

f ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
   return;

第一次擴容時在addCount中有寫到(resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2 表示當前只有一個線程正在工做,「相對應的」,若是 (sc - 2) resizeStamp(n) << RESIZE_STAMP_SHIFT,說明當前線程就是最後一個還在擴容的線程,那麼會將 finishing 標識爲 true,並在下一次循環中退出擴容方法。

  1. 幾乎跟 HashMap大體思路相似的遍歷鏈表/紅黑樹而後擴容操做。
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)    //MIN_TRANSFER_STRIDE 用來控制不要佔用太多CPU
        stride = MIN_TRANSFER_STRIDE; // subdivide range    //MIN_TRANSFER_STRIDE=16 每一個CPU處理最小長度個數

    if (nextTab  null) { // 新表格爲空則直接新建二倍,別的輔助線程來幫忙擴容則不會進入此if條件
        try {
            @SuppressWarnings("unchecked")
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        nextTable = nextTab;
        transferIndex = n; // transferIndex 指向最後一個桶,方便從後向前遍歷
    }
    int nextn = nextTab.length; // 新表長度
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); // 建立一個fwd節點,這個是用來控制併發的,當一個節點爲空或已經被轉移以後,就設置爲fwd節點
    boolean advance = true;    //是否繼續向前查找的標誌位
    boolean finishing = false// to ensure sweep(清掃) before committing nextTab,在完成以前從新在掃描一遍數組,看看有沒完成的沒
     // 第一部分
    // i 指向當前桶, bound 指向當前線程須要處理的桶結點的區間下限【bound,i】 這樣來跟線程劃分任務。
    for (int i = 0, bound = 0;;) {
        Node<K,V> f; int fh;
       // 這個 while 循環的目的就是經過 --i 遍歷當前線程所分配到的桶結點
       // 一個桶一個桶的處理
        while (advance) {//  每一次成功處理操做都會將advance設置爲true,然裏來處理區間的上一個數據
            int nextIndex, nextBound;
            if (--i >= bound || finishing) { //經過此處進行任務區間的遍歷
                advance = false;
            }
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;// 任務分配完了
                advance = false;
            }
            // 更新 transferIndex
           // 爲當前線程分配任務,處理的桶結點區間爲(nextBound,nextIndex)
            else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex,nextBound = (nextIndex > stride ? nextIndex - stride : 0))) {
               // nextIndex原本等於末尾數字,
                bound = nextBound;
                i = nextIndex - 1;
                advance = false;
            }
        }
        // 當前線程全部任務完成 
        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
            if (finishing) {  // 已經完成轉移 則直接賦值操做
                nextTable = null;
                table = nextTab;
                sizeCtl = (n << 1) - (n >>> 1);    //設置sizeCtl爲擴容後的0.75
                return;
            }
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { // sizeCtl-1 表示當前線程任務完成。
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) { 
                // 判斷當前線程完成的線程是否是最後一個在擴容的,思路精髓
                        return;
                }
                finishing = advance = true;// 若是是則相應的設置參數
                i = n; 
            }
        }
        else if ((f = tabAt(tab, i))  null// 數組中把null的元素設置爲ForwardingNode節點(hash值爲MOVED[-1])
            advance = casTabAt(tab, i, null, fwd); // 若是老節點數據是空的則直接進行CAS設置爲fwd
        else if ((fh = f.hash)  MOVED) //已是個fwd了,由於是多線程操做 可能別人已經給你弄好了,
            advance = true// already processed
        else {
            synchronized (f) { //加鎖操做
                if (tabAt(tab, i)  f) {
                    Node<K,V> ln, hn;
                    if (fh >= 0) { //該節點的hash值大於等於0,說明是一個Node節點
                    // 關於鏈表的操做總體跟HashMap相似不過 感受好像更擾一些。
                        int runBit = fh & n; // fh= f.hash first hash的意思,看第一個點 放老位置仍是新位置
                        Node<K,V> lastRun = f;

                        for (Node<K,V> p = f.next; p != null; p = p.next) {
                            int b = p.hash & n;    //n的值爲擴張前的數組的長度
                            if (b != runBit) {
                                runBit = b;
                                lastRun = p;//最後致使發生變化的節點
                            }
                        }
                        if (runBit  0) { //看最後一個變化點是新仍是舊 舊
                            ln = lastRun;
                            hn = null;
                        }
                        else {
                            hn = lastRun; //看最後一個變化點是新仍是舊 舊
                            ln = null;
                        }
                        /*
                         * 構造兩個鏈表,順序大部分和原來是反的,不過順序也有差別
                         * 分別放到原來的位置和新增長的長度的相同位置(i/n+i)
                         */

                        for (Node<K,V> p = f; p != lastRun; p = p.next) {
                            int ph = p.hash; K pk = p.key; V pv = p.val;
                            if ((ph & n)  0)
                                    /*
                                     * 假設runBit的值爲0,
                                     * 則第一次進入這個設置的時候至關於把舊的序列的最後一次發生hash變化的節點(該節點後面可能還有hash計算後同爲0的節點)設置到舊的table的第一個hash計算後爲0的節點下一個節點
                                     * 而且把本身返回,而後在下次進來的時候把它本身設置爲後面節點的下一個節點
                                     */

                                ln = new Node<K,V>(ph, pk, pv, ln);
                            else
                                    /*
                                     * 假設runBit的值不爲0,
                                     * 則第一次進入這個設置的時候至關於把舊的序列的最後一次發生hash變化的節點(該節點後面可能還有hash計算後同不爲0的節點)設置到舊的table的第一個hash計算後不爲0的節點下一個節點
                                     * 而且把本身返回,而後在下次進來的時候把它本身設置爲後面節點的下一個節點
                                     */

                                hn = new Node<K,V>(ph, pk, pv, hn);    
                        }
                        setTabAt(nextTab, i, ln);    
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                    else if (f instanceof TreeBin) { // 該節點hash值是個負數不然的話是一個樹節點
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> lo = null, loTail = null// 舊 頭尾
                        TreeNode<K,V> hi = null, hiTail = null//新頭圍
                        int lc = 0, hc = 0;
                        for (Node<K,V> e = t.first; e != null; e = e.next) {
                            int h = e.hash;
                            TreeNode<K,V> p = new TreeNode<K,V>
                                (h, e.key, e.val, nullnull);
                            if ((h & n)  0) {
                                if ((p.prev = loTail)  null)
                                    lo = p;
                                else
                                    loTail.next = p; //舊頭尾設置
                                loTail = p;
                                ++lc;
                            }
                            else { // 新頭圍設置
                                if ((p.prev = hiTail)  null)
                                    hi = p;
                                else
                                    hiTail.next = p;
                                hiTail = p;
                                ++hc;
                            }
                        }
                         //ln  若是老位置數字<=6 則要對老位置鏈表進行紅黑樹降級到鏈表,不然就看是否還須要對老位置數據進行新建紅黑樹
                        ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                            (hc != 0) ? new TreeBin<K,V>(lo) : t;
                        hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                            (lc != 0) ? new TreeBin<K,V>(hi) : t;
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd); //老表中i位置節點設置下
                        advance = true;
                    }
                }
            }
        }
    }
}

get

這個就很簡單了,得到hash值,而後判斷存在與否,遍歷鏈表便可,注意get沒有任何鎖操做!

    public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        // 計算key的hash值
        int h = spread(key.hashCode()); 
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) { // 表不爲空而且表的長度大於0而且key所在的桶不爲空
            if ((eh = e.hash)  h) { // 表中的元素的hash值與key的hash值相等
                if ((ek = e.key)  key || (ek != null && key.equals(ek))) // 鍵相等
                    // 返回值
                    return e.val;
            }
            else if (eh < 0// 是個TreeBin hash = -2 
                // 在紅黑樹中查找,由於紅黑樹中也保存這一個鏈表順序
                return (p = e.find(h, key)) != null ? p.val : null;
            while ((e = e.next) != null) { // 對於結點hash值大於0的狀況鏈表
                if (e.hash  h &&
                    ((ek = e.key)  key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }

clear

關於清空也相對簡單 ,無非就是遍歷桶數組,而後經過CAS來置空。

public void clear() {
    long delta = 0L;
    int i = 0;
    Node<K,V>[] tab = table;
    while (tab != null && i < tab.length) {
        int fh;
        Node<K,V> f = tabAt(tab, i);
        if (f  null)
            ++i; //這個桶是空的直接跳過
        else if ((fh = f.hash)  MOVED) { // 這個桶的數據還在擴容中,要去擴容同時等待。
            tab = helpTransfer(tab, f);
            i = 0// restart
        }
        else {
            synchronized (f) { // 真正的刪除
                if (tabAt(tab, i)  f) {
                    Node<K,V> p = (fh >= 0 ? f :(f instanceof TreeBin) ?((TreeBin<K,V>)f).first : null);
                        //循環到鏈表/者紅黑樹的尾部
                        while (p != null) {
                            --delta; // 記錄刪除了多少個
                            p = p.next;
                        } 
                        //利用CAS無鎖置null  
                        setTabAt(tab, i++, null);
                    }
                }
            }
        }
        if (delta != 0L)
            addCount(delta, -1); //調整count
    }

end

ConcurrentHashMap是若是來作到「併發安全」,又是如何作到「高效」的併發的呢?

  1. 首先是讀操做,讀源碼發現get方法中根本沒有使用同步機制,也沒有使用unsafe方法,因此讀操做是支持併發操做的。

  2. 寫操做

  • . 數據擴容函數是 transfer,該方法的只有 addCounthelpTransfertryPresize這三個方法來調用。
  1. addCount是在當對數組進行操做,使得數組中存儲的元素個數發生了變化的時候會調用的方法。
  2. helpTransfer是在當一個線程要對table中元素進行操做的時候,若是檢測到節點的·hash·= MOVED 的時候,就會調用 helpTransfer方法,在 helpTransfer中再調用 transfer方法來幫助完成數組的擴容
  1. tryPresize是在 treeIfybinputAll方法中調用, treeIfybin主要是在 put添加元素完以後,判斷該數組節點相關元素是否是已經超過8個的時候,若是超過則會調用這個方法來擴容數組或者把鏈表轉爲樹。注意 putAll在初始化傳入一個大map的時候會調用。·

總結擴容狀況發生:

  1. 在往map中添加元素的時候,在某一個節點的數目已經超過了8個,同時數組的長度又小於64的時候,纔會觸發數組的擴容。
  2. 當數組中元素達到了sizeCtl的數量的時候,則會調用transfer方法來進行擴容

3. 擴容時候是否能夠進行讀寫。

對於讀操做,由於是沒有加鎖的因此能夠的. 對於寫操做,JDK8中已經將鎖的範圍細膩到table[i]l了,當在進行數組擴容的時候,若是當前節點尚未被處理(也就是說尚未設置爲fwd節點),那就能夠進行設置操做。若是該節點已經被處理了,則當前線程也會加入到擴容的操做中去。

  1. 多個線程又是如何同步處理的 在 ConcurrentHashMap中,同步處理主要是經過 Synchronizedunsafe的硬件級別原子性 這兩種方式來完成的。
  1. 在取得sizeCtl跟某個位置的Node的時候,使用的都是 unsafe的方法,來達到併發安全的目的
  2. 當須要在某個位置設置節點的時候,則會經過 Synchronized的同步機制來鎖定該位置的節點。
  3. 在數組擴容的時候,則經過處理的 步長fwd節點來達到併發安全的目的,經過設置hash值爲MOVED=-1。
  4. 當把某個位置的節點複製到擴張後的table的時候,也經過 Synchronized的同步機制來保證線程安全

套路

  1. 談談你理解的 HashMap,講講其中的 get put 過程。
  2. 1.8 作了什麼優化?
  3. 是線程安全的嘛?
  4. 不安全會致使哪些問題?
  5. 如何解決?有沒有線程安全的併發容器?
  6. ConcurrentHashMap 是如何實現的?1.七、1.8 實現有何不一樣,爲何這麼作。
  7. 1.8中ConcurrentHashMap的sizeCtl做用,大體說下協助擴容跟標誌位。
  8. HashMap 爲何不用跳錶替換紅黑樹呢?

參考

HashMap講解 


本文分享自微信公衆號 - sowhat1412(sowhat9094)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。

相關文章
相關標籤/搜索