Java併發編程筆記之ConcurrentHashMap原理探究

在多線程環境下,使用HashMap進行put操做時存在丟失數據的狀況,爲了不這種bug的隱患,強烈建議使用ConcurrentHashMap代替HashMap。node

HashTable是一個線程安全的類,它使用synchronized來鎖住整張Hash表來實現線程安全,即每次鎖住整張表讓線程獨佔,至關於全部線程進行讀寫時都去競爭一把鎖,致使效率很是低下。ConcurrentHashMap能夠作到讀取數據不加鎖,而且其內部的結構可讓其在進行寫操做的時候可以將鎖的粒度保持地儘可能地小,容許多個修改操做併發進行,其關鍵在於使用了鎖分離技術。它使用了多個鎖來控制對hash表的不一樣部分進行的修改。ConcurrentHashMap內部使用段(Segment)來表示這些不一樣的部分,每一個段其實就是一個小的Hashtable,它們有本身的鎖。只要多個修改操做發生在不一樣的段上,它們就能夠併發進行。面試

 

CouncurrentHashMap實現原理算法

ConcurrentHashMap 爲了提升自己的併發能力,在內部採用了一個叫作 Segment 的結構,一個 Segment 其實就是一個類 Hash Table 的結構,Segment 內部維護了一個鏈表數組,咱們用下面這一幅圖來看下 ConcurrentHashMap 的內部結構,從下面的結構咱們能夠了解到,ConcurrentHashMap 定位一個元素的過程須要進行兩次Hash操做,第一次 Hash 定位到 Segment,第二次 Hash 定位到元素所在的鏈表的頭部,所以,這一種結構的帶來的反作用是 Hash 的過程要比普通的 HashMap 要長,可是帶來的好處是寫操做的時候能夠只對元素所在的 Segment 進行操做便可,不會影響到其餘的 Segment,這樣,在最理想的狀況下,ConcurrentHashMap 能夠最高同時支持 Segment 數量大小的寫操做(恰好這些寫操做都很是平均地分佈在全部的 Segment上),因此,經過這一種結構,ConcurrentHashMap 的併發能力能夠大大的提升。咱們用下面這一幅圖來看下ConcurrentHashMap的內部結構詳情圖,以下:編程

不難看出,ConcurrentHashMap採用了二次hash的方式,第一次hash將key映射到對應的segment,而第二次hash則是映射到segment的不一樣桶(bucket)中。數組

爲何要用二次hash,主要緣由是爲了構造分離鎖,使得對於map的修改不會鎖住整個容器,提升併發能力。固然,沒有一種東西是絕對完美的,二次hash帶來的問題是整個hash的過程比hashmap單次hash要長,因此,若是不是併發情形,不要使用concurrentHashmap。安全

JAVA7以前ConcurrentHashMap主要採用鎖機制,在對某個Segment進行操做時,將該Segment鎖定,不容許對其進行非查詢操做,而在JAVA8以後採用CAS無鎖算法,這種樂觀操做在完成前進行判斷,若是符合預期結果纔給予執行,對併發操做提供良好的優化.數據結構

讓咱們先看JDK1.7的ConcurrentHashMap的原理分析多線程

JDK1.7的ConcurrentHashMap

如上所示,是由 Segment 數組、HashEntry 組成,和 HashMap 同樣,仍然是數組加鏈表。併發

讓咱們看看Segment裏面的成員變量,源碼以下:app

static final class Segment<K,V> extends ReentrantLock implements Serializable {
    transient volatile int count;    //Segment中元素的數量
    transient int modCount;          //對table的大小形成影響的操做的數量(好比put或者remove操做)
    transient int threshold;        //閾值,Segment裏面元素的數量超過這個值那麼就會對Segment進行擴容
    final float loadFactor;         //負載因子,用於肯定threshold
    transient volatile HashEntry<K,V>[] table;    //鏈表數組,數組中的每個元素表明了一個鏈表的頭部
}

 

接着再看看HashEntry中的組成,源碼以下:

  /**
     * ConcurrentHashMap列表Entry。注意,這不會做爲用戶可見的Map.Entry導出。
     */
    static final class HashEntry<K,V> {
        final int hash;
        final K key;
        volatile V value;
        volatile HashEntry<K,V> next;

        HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
            this.hash = hash;
            this.key = key;
            this.value = value;
            this.next = next;
        }

        /**
         * 設置具備volatile寫語義的next字段。
        final void setNext(HashEntry<K,V> n) {
            UNSAFE.putOrderedObject(this, nextOffset, n);
        }

        // Unsafe mechanics
        static final sun.misc.Unsafe UNSAFE;
     //下一個HashEntry的偏移量
static final long nextOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class k = HashEntry.class;
          //獲取HashEntry next在內存中的偏移量 nextOffset
= UNSAFE.objectFieldOffset (k.getDeclaredField("next")); } catch (Exception e) { throw new Error(e); } } }

和 HashMap 很是相似,惟一的區別就是其中的核心數據如 value ,以及鏈表都是 volatile 修飾的,保證了獲取時的可見性。

原理上來講:ConcurrentHashMap 採用了分段鎖技術,其中 Segment 繼承於 ReentrantLock。不會像 HashTable 那樣無論是 put 仍是 get 操做都須要作同步處理,理論上 ConcurrentHashMap 支持 CurrencyLevel (Segment 數組數量)的線程併發。每當一個線程佔用鎖訪問一個 Segment 時,不會影響到其餘的 Segment。

接着讓咱們繼續看看JDK1.7中ConcurrentHashMap的成員變量和構造函數,源碼以下:

// 默認初始容量
static final int DEFAULT_INITIAL_CAPACITY = 16;
// 默認加載因子
static final float DEFAULT_LOAD_FACTOR = 0.75f;
// 默認segment層級
static final int DEFAULT_CONCURRENCY_LEVEL = 16;
// 最大容量
static final int MAXIMUM_CAPACITY = 1 << 30;
// segment最小容量
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;
// 一個segment最大容量
static final int MAX_SEGMENTS = 1 << 16;
// 鎖以前重試次數
static final int RETRIES_BEFORE_LOCK = 2;

public ConcurrentHashMap() {
        this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}

public ConcurrentHashMap(int initialCapacity) {
        this(initialCapacity, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}

public ConcurrentHashMap(int initialCapacity, float loadFactor) {
        this(initialCapacity, loadFactor, DEFAULT_CONCURRENCY_LEVEL);
}

public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        if (concurrencyLevel > MAX_SEGMENTS)
            concurrencyLevel = MAX_SEGMENTS;
        // 找到兩種大小的最匹配參數
        int sshift = 0;
        // segment數組的長度是由concurrentLevel計算來的,segment數組的長度是2的N次方,
        // 默認concurrencyLevel = 16, 因此ssize在默認狀況下也是16,此時 sshift = 4
        // sshift至關於ssize從1向左移的次數
        int ssize = 1;
        while (ssize < concurrencyLevel) {
            ++sshift; 
            ssize <<= 1;
        }
        // 段偏移量,默認值狀況下此時segmentShift = 28
        this.segmentShift = 32 - sshift;
        // 散列算法的掩碼,默認值狀況下segmentMask = 15
        this.segmentMask = ssize - 1;

        if (initialCapacity > MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY;

        int c = initialCapacity / ssize;
        if (c * ssize < initialCapacity)
            ++c;
        int cap = MIN_SEGMENT_TABLE_CAPACITY;
        while (cap < c)
            cap <<= 1;
        // create segments and segments[0]
        Segment<K,V> s0 =
            new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                             (HashEntry<K,V>[])new HashEntry[cap]);
        // 建立ssize長度的Segment數組
        Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
        UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
        this.segments = ss;
 }

其中,concurrencyLevel 一經指定,不可改變,後續若是ConcurrentHashMap的元素數量增長致使ConrruentHashMap須要擴容,ConcurrentHashMap不會增長Segment的數量,而只會增長Segment中鏈表數組的容量大小,這樣的好處是擴容過程不須要對整個ConcurrentHashMap作rehash,而只須要對Segment裏面的元素作一次rehash就能夠了。

  整個ConcurrentHashMap的初始化方法仍是很是簡單的,先是根據concurrencyLevel來new出Segment,這裏Segment的數量是不大於concurrencyLevel的最大的2的指數,就是說Segment的數量永遠是2的指數個,這樣的好處是方便採用移位操做來進行hash,加快hash的過程。接下來就是根據intialCapacity肯定Segment的容量的大小,每個Segment的容量大小也是2的指數,一樣使爲了加快hash的過程。

注意一下兩個變量segmentShift和segmentMask,這兩個變量在後面將會起到很大的做用,假設構造函數肯定了Segment的數量是2的n次方,那麼segmentShift就等於32減去n,而segmentMask就等於2的n次方減一。

接下來讓咱們看看JDK1.7中的ConcurrentHashMap的核心方法 put 方法和get 方法。

 public V put(K key, V value) {
        Segment<K,V> s;
        if (value == null)
            throw new NullPointerException();
     //(1)
int hash = hash(key);
     //(2)
int j = (hash >>> segmentShift) & segmentMask; if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment s = ensureSegment(j);
     //(3)
return s.put(key, hash, value, false); }

代碼(1)計算key的hash值

代碼(2)根據hash值,segmentShift,segmentMask定位到哪一個Segment。

代碼(3)將鍵值對保存到對應的segment中。

能夠看到首先是經過 key 定位到 Segment,以後在對應的 Segment 中進行具體的 put。 Segment 中進行具體的 put的源碼以下:

  final V put(K key, int hash, V value, boolean onlyIfAbsent) {
       //(1) HashEntry
<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value); V oldValue; try {
          //(2) HashEntry
<K,V>[] tab = table;
          //(3)
int index = (tab.length - 1) & hash;
          //(4) HashEntry
<K,V> first = entryAt(tab, index);
          //(5)
for (HashEntry<K,V> e = first;;) { if (e != null) { K k; if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { oldValue = e.value; if (!onlyIfAbsent) { e.value = value; ++modCount; } break; } e = e.next; }
            //(6)
else {
              
if (node != null)
                 //(7) node.setNext(first);
else //(8) node = new HashEntry<K,V>(hash, key, value, first); int c = count + 1;
              //(9)
if (c > threshold && tab.length < MAXIMUM_CAPACITY) rehash(node); else //(10) setEntryAt(tab, index, node); ++modCount; count = c; oldValue = null; break; } } } finally {
         //(11) unlock(); }
return oldValue; }

雖然 HashEntry 中的 value 是用 volatile 關鍵詞修飾的,可是並不能保證併發的原子性,因此 put 操做時仍然須要加鎖處理。

代碼(1)首先第一步的時候會嘗試獲取鎖,若是獲取失敗確定就有其餘線程存在競爭,則利用 scanAndLockForPut() 自旋獲取鎖。

代碼(2)每個Segment對應一個HashEntry[ ]數組。

代碼(3)計算對應HashEntry數組的下標 ,每一個segment中數組的長度都是2的N次方,因此這裏通過運算以後,取的是hash的低幾位數據。

代碼(4)定位到HashEntry的某一個結點(對應鏈表的表頭結點)。

代碼(5)遍歷鏈表。

代碼(6)若是鏈表爲空(即表頭爲空)

代碼(7)將新節點插入到鏈表做爲鏈表頭。、

代碼(8)根據key和value 建立結點並插入鏈表。

代碼(9)判斷元素個數是否超過了閾值或者segment中數組的長度超過了MAXIMUM_CAPACITY,若是知足條件則rehash擴容!

代碼(10)不須要擴容時,將node放到數組(HashEntry[])中對應的位置

代碼(11)最後釋放鎖。

總的來講,put 的流程以下:

  1. 將當前 Segment 中的 table 經過 key 的 hashcode 定位到 HashEntry。
  2. 遍歷該 HashEntry,若是不爲空則判斷傳入的 key 和當前遍歷的 key 是否相等,相等則覆蓋舊的 value。
  3. 不爲空則須要新建一個 HashEntry 並加入到 Segment 中,同時會先判斷是否須要擴容。
  4. 最後會解除在 代碼(1) 中所獲取當前 Segment 的鎖。

接着讓咱們看看其擴容,rehash源碼以下:

       /**
         * 兩倍於以前的容量
         */
        @SuppressWarnings("unchecked")
        private void rehash(HashEntry<K,V> node) {

            HashEntry<K,V>[] oldTable = table;
            int oldCapacity = oldTable.length;
            // 擴大1倍(左移一位)
            int newCapacity = oldCapacity << 1;
            // 計算新的閾值
            threshold = (int)(newCapacity * loadFactor);
            // 建立新的數組
            HashEntry<K,V>[] newTable =
                (HashEntry<K,V>[]) new HashEntry[newCapacity];
            // mask
            int sizeMask = newCapacity - 1;
            // 遍歷舊數組數據
            for (int i = 0; i < oldCapacity ; i++) {
                HashEntry<K,V> e = oldTable[i]; // 對應一個鏈表的表頭結點
                if (e != null) {
                    HashEntry<K,V> next = e.next;
                    // 計算e對應的這條鏈表在新數組中對應的下標
                    int idx = e.hash & sizeMask; 
                    if (next == null)   //  只有一個結點時直接放入(新的)數組中
                        newTable[idx] = e;
                    else { // 鏈表有多個結點時:
                        HashEntry<K,V> lastRun = e; // 就鏈表的表頭結點作爲新鏈表的尾結點
                        int lastIdx = idx;
                        for (HashEntry<K,V> last = next;
                             last != null;
                             last = last.next) {
                            // 舊數組中一個鏈表中的數據並不必定在新數組中屬於同一個鏈表,因此這裏須要每次都從新計算
                            int k = last.hash & sizeMask;
                            if (k != lastIdx) {
                                lastIdx = k;
                                lastRun = last;
                            }
                        }
                        // lastRun(和以後的元素)插入數組中。
                        newTable[lastIdx] = lastRun;
                        // 從(舊鏈表)頭結點向後遍歷,遍歷到最後一組不一樣於前面hash值的組頭。
                        for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                            V v = p.value;
                            int h = p.hash;
                            int k = h & sizeMask;
                            HashEntry<K,V> n = newTable[k];
                            newTable[k] = new HashEntry<K,V>(h, p.key, v, n); // 拼接鏈表
                        }
                    }
                }
            }
            // 將以前的舊數據都添加到新的結構中以後,纔會插入新的結點(依舊是插入表頭)
            int nodeIndex = node.hash & sizeMask; // add the new node
            node.setNext(newTable[nodeIndex]);
            newTable[nodeIndex] = node;
            table = newTable;
      }

 

接着,再看看scanAndLockForPut() 自旋獲取鎖,源碼以下:

 

private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
            HashEntry<K,V> first = entryForHash(this, hash);
            HashEntry<K,V> e = first;
            HashEntry<K,V> node = null;
            int retries = -1; // 定位節點時爲負數
       //(1) while (!tryLock()) { HashEntry<K,V> f; // 首先在下面從新檢查 if (retries < 0) { if (e == null) { if (node == null) // 推測性地建立節點 node = new HashEntry<K,V>(hash, key, value, null); retries = 0; } else if (key.equals(e.key)) retries = 0; else e = e.next; }
          //(2)
else if (++retries > MAX_SCAN_RETRIES) { lock(); break; } else if ((retries & 1) == 0 && (f = entryForHash(this, hash)) != first) { e = first = f; // 若是Entry改變則從新遍歷 retries = -1; } } return node; }

掃描包含給定key的節點,同時嘗試獲取鎖,若是沒有找到,則建立並返回一個。

返回時,保證鎖被持有。

與大多數方法不一樣,對方法equals的調用不進行篩選:因爲遍歷速度可有可無,咱們還能夠幫助預熱相關代碼和訪問。

代碼(1)嘗試自旋獲取鎖。

代碼(2)若是重試的次數達到了 MAX_SCAN_RETRIES 則改成阻塞鎖獲取,保證能獲取成功。

 

接下來,再讓咱們看看JDK1.7中的get方法,源碼以下:

   public V get(Object key) {
        Segment<K,V> s; // manually integrate access methods to reduce overhead
        HashEntry<K,V>[] tab;
        int h = hash(key);
        // 首先計算出segment數組的下標  ((h >>> segmentShift) & segmentMask))
        long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
        if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
            (tab = s.table) != null) { // 根據下標找到segment
            // 而後(tab.length - 1) & h) 獲得對應HashEntry數組的下標
            // 遍歷鏈表
            for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                     (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
                 e != null; e = e.next) {
                K k;

                if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                    return e.value;
            }
        }
        return null;
    }

能夠看到get 邏輯沒有前面的方法複雜:

只須要將 Key 經過 Hash 以後定位到具體的 Segment ,再經過一次 Hash 定位到具體的元素上。

因爲 HashEntry 中的 value 屬性是用 volatile 關鍵詞修飾的,保證了內存可見性,因此每次獲取時都是最新值。

ConcurrentHashMap 的 get 方法是很是高效的,由於整個過程都不須要加鎖

 

接着再看看remove方法,源碼以下:

public V remove(Object key) {
        // 計算hash值
        int hash = hash(key);
        // 根據hash值找到對應的segment
        Segment<K,V> s = segmentForHash(hash);
        // 調用Segment.remove 函數
        return s == null ? null : s.remove(key, hash, null);
}
public boolean remove(Object key, Object value) {
        int hash = hash(key);
        Segment<K,V> s;
        return value != null && (s = segmentForHash(hash)) != null &&
            s.remove(key, hash, value) != null;
}

Segment.remove函數的源碼以下:

     /**
         * Remove; match on key only if value null, else match both.
         */
        final V remove(Object key, int hash, Object value) {
            if (!tryLock())
                scanAndLock(key, hash);
            V oldValue = null;
            try {
                HashEntry<K,V>[] tab = table;
                // 計算HashEntry數組下標
                int index = (tab.length - 1) & hash;
                // 找到頭結點
                HashEntry<K,V> e = entryAt(tab, index);
                HashEntry<K,V> pred = null;
                while (e != null) {
                    K k;
                    HashEntry<K,V> next = e.next;
                    if ((k = e.key) == key ||
                        (e.hash == hash && key.equals(k))) { // 找到對應節點
                        V v = e.value;
                        if (value == null || value == v || value.equals(v)) {
                            if (pred == null)
                                // 當pred爲空時,表示要移除的是鏈表的表頭節點,從新設置鏈表
                                setEntryAt(tab, index, next);
                            else
                                pred.setNext(next);
                            ++modCount;
                            --count;
                            // 記錄舊value值
                            oldValue = v;
                        }
                        break;
                    }
                    pred = e;
                    e = next;
                }
            } finally {
                unlock();
            }
            return oldValue;
        }

 

JDK1.8中的ConcurrentHashMap原理分析

1.7 已經解決了併發問題,而且能支持 N 個 Segment 這麼屢次數的併發,但依然存在 HashMap 在 1.7 版本中的問題。那麼是什麼問題呢?

  很明顯那就是查詢遍歷鏈表效率過低。

所以 1.8 作了一些數據結構上的調整。,在 JAVA8 中它摒棄了 Segment(鎖段)的概念,而是啓用了一種全新的方式實現,利用 CAS 算法。底層依然由「數組」+鏈表+紅黑樹的方式思想,可是爲了作到併發,又增長了不少輔助的類,例如 TreeBin、Traverser等對象內部類。

如何讓多線程之間,對象的狀態對於各線程的「可視性」是順序一致的:ConcurrentHashMap 使用了 happens-before 規則來實現。 happens-before規則(摘取自 JAVA 併發編程):

  • 程序次序法則:線程中的每一個動做A都 happens-before 於該線程中的每個動做B,其中,在程序中,全部的動做B都能出如今A以後。
  • 監視器鎖法則:對一個監視器鎖的解鎖 happens-before 於每個後續對同一監視器鎖的加鎖。
  • volatile 變量法則:對 volatile 域的寫入操做 happens-before 於每個後續對同一個域的讀寫操做。
  • 線程啓動法則:在一個線程裏,對 Thread.start 的調用會 happens-before 於每一個啓動線程的動做。
  • 線程終結法則:線程中的任何動做都 happens-before 於其餘線程檢測到這個線程已經終結、或者從 Thread.join 調用中成功返回,或 Thread.isAlive 返回 false。
  • 中斷法則:一個線程調用另外一個線程的 interrupt happens-before 於被中斷的線程發現中斷。
  • 終結法則:一個對象的構造函數的結束 happens-before 於這個對象 finalizer 的開始。
  • 傳遞性:若是 A happens-before 於 B,且 B happens-before 於 C,則 A happens-before於C:

        假設代碼有兩條語句,代碼順序是語句1先於語句2執行;那麼只要語句之間不存在依賴關係,那麼打亂它們的順序對最終的結果沒有影響的話,那麼真正交給CPU去執行時,他們的執行順序能夠是先執行語句2而後語句1。

首先來看下底層的組成結構(下圖是百度來的,懶得畫了):

能夠看到JDK1.8ConcurrentHashMap 和JDK1.8的HashMap是很類似的。其中拋棄了原有的 Segment 分段鎖,而採用了 CAS + synchronized 來保證併發安全性。

//鍵值輸入。 此類永遠不會做爲用戶可變的Map.Entry導出(即,一個支持setValue;請參閱下面的MapEntry),
//但能夠用於批量任務中使用的只讀遍歷。 具備負哈希字段的節點的子類是特殊的,而且包含空鍵和值(但永遠不會導出)。 不然,鍵和val永遠不會爲空。
static class Node<K,V> implements Map.Entry<K,V> { final int hash; final K key; volatile V val; volatile Node<K,V> next; Node(int hash, K key, V val, Node<K,V> next) { this.hash = hash; this.key = key; this.val = val; this.next = next; } public final K getKey() { return key; } public final V getValue() { return val; } public final int hashCode() { return key.hashCode() ^ val.hashCode(); } public final String toString(){ return key + "=" + val; } public final V setValue(V value) { throw new UnsupportedOperationException(); } public final boolean equals(Object o) { Object k, v, u; Map.Entry<?,?> e; return ((o instanceof Map.Entry) && (k = (e = (Map.Entry<?,?>)o).getKey()) != null && (v = e.getValue()) != null && (k == key || k.equals(key)) && (v == (u = val) || v.equals(u))); } /** * 對map.get()的虛擬化支持; 在子類中重寫。 */ Node<K,V> find(int h, Object k) { Node<K,V> e = this; if (k != null) { do { K ek; if (e.hash == h && ((ek = e.key) == k || (ek != null && k.equals(ek)))) return e; } while ((e = e.next) != null); } return null; } }

也將 1.7 中存放數據的 HashEntry 改成 Node,但做用都是相同的。

其中的 val next 都用了 volatile 修飾,保證了可見性。

接着再看看put方法的源碼,源碼以下:

   public V put(K key, V value) {
        return putVal(key, value, false);
   }

    /** Implementation for put and putIfAbsent */
    final V putVal(K key, V value, boolean onlyIfAbsent) {
     //(1)
if (key == null || value == null) throw new NullPointerException();
     //(2)
int hash = spread(key.hashCode()); int binCount = 0;
     //(3)
for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh;
       //(4)
if (tab == null || (n = tab.length) == 0) tab = initTable();
       //(5)
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin }
       //(6)
else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null;
          //(7) synchronized (f) {
if (tabAt(tab, i) == f) {
              //(8)
if (fh >= 0) { binCount = 1;
                 //(9)
for (Node<K,V> e = f;; ++binCount) { K ek;
                   //(10)
if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e;
                   //(11)若是遍歷到了最後一個結點,那麼就證實新的節點須要插入 就把它插入在鏈表尾部
if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } }
              //(12)
else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) {
            //(13)
if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } }
     //代碼(14) addCount(
1L, binCount); return null; }

代碼(1)若爲空 拋異常

代碼(2)計算hash值

代碼(3)

代碼(4)判斷是否須要進行初始化。

代碼(5)f 即爲當前 key 定位出的 Node,若是爲空表示當前位置能夠寫入數據,利用 CAS 嘗試寫入,失敗則自旋保證成功。

代碼(6)若是當前位置的 hashcode == MOVED == -1,則須要進行擴容。

代碼(7)若是都不知足,則利用 synchronized 鎖寫入數據。結點上鎖 這裏的結點能夠理解爲hash值相同組成的鏈表的頭結點

代碼(8)fh〉0 說明這個節點是一個鏈表的節點 不是樹的節點.

代碼(9)在這裏遍歷鏈表全部的結點

代碼(10)若是hash值和key值相同 則修改對應結點的value值

代碼(11)若是遍歷到了最後一個結點,那麼就證實新的節點須要插入 就把它插入在鏈表尾部

代碼(12)若是這個節點是樹節點,就按照樹的方式插入值

代碼(13)若是鏈表長度已經達到臨界值8 就須要把鏈表轉換爲樹結構。若是數量大於 TREEIFY_THRESHOLD 則要轉換爲紅黑樹。

代碼(14)將當前ConcurrentHashMap的元素數量+1

 

接着我咱們在看看JDK1.8中ConcurrentHashMap的get方法源碼,源碼以下:

// GET方法(JAVA8)
public V get(Object key) {  
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;  
    //計算hash值  
    int h = spread(key.hashCode());  
    //根據hash值肯定節點位置  
    if ((tab = table) != null && (n = tab.length) > 0 &&  
        (e = tabAt(tab, (n - 1) & h)) != null) {  
        //若是搜索到的節點key與傳入的key相同且不爲null,直接返回這個節點    
        if ((eh = e.hash) == h) {  
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))  
                return e.val;  
        }  
        //若是eh<0 說明這個節點在樹上 直接尋找  
        else if (eh < 0)  
            return (p = e.find(h, key)) != null ? p.val : null;  
         //不然遍歷鏈表 找到對應的值並返回  
        while ((e = e.next) != null) {  
            if (e.hash == h &&  
                ((ek = e.key) == key || (ek != null && key.equals(ek))))  
                return e.val;  
        }  
    }  
    return null;  
}  

 

接着再看看JDK1.8中ConcurrentHashMap的remove方法源碼,源碼以下:

// REMOVE OR REPLACE方法(JAVA8)
 final V replaceNode(Object key, V value, Object cv) {
    int hash = spread(key.hashCode());
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        // 數組不爲空,長度不爲0,指定hash碼值爲0
        if (tab == null || (n = tab.length) == 0 ||
            (f = tabAt(tab, i = (n - 1) & hash)) == null)
            break;
        // 是一個 forwardNode
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            boolean validated = false;
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) {
                        validated = true;
                        // 循環尋找
                        for (Node<K,V> e = f, pred = null;;) {
                            K ek;
                            // equal 相同 取出
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                V ev = e.val;
                                 // value爲null或value和查到的值相等  
                                if (cv == null || cv == ev ||
                                    (ev != null && cv.equals(ev))) {
                                    oldVal = ev;
                                    if (value != null)
                                        e.val = value;
                                    else if (pred != null)
                                        pred.next = e.next;
                                    else
                                        setTabAt(tab, i, e.next);
                                }
                                break;
                            }
                            pred = e;
                            if ((e = e.next) == null)
                                break;
                        }
                    }
                    // 如果樹 紅黑樹高效查找/刪除
                    else if (f instanceof TreeBin) {
                        validated = true;
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> r, p;
                        if ((r = t.root) != null &&
                            (p = r.findTreeNode(hash, key, null)) != null) {
                            V pv = p.val;
                            if (cv == null || cv == pv ||
                                (pv != null && cv.equals(pv))) {
                                oldVal = pv;
                                if (value != null)
                                    p.val = value;
                                else if (t.removeTreeNode(p))
                                    setTabAt(tab, i, untreeify(t.first));
                            }
                        }
                    }
                }
            }
            if (validated) {
                if (oldVal != null) {
                    if (value == null)
                        addCount(-1L, -1);
                    return oldVal;
                }
                break;
            }
        }
    }
    return null;
}

能夠看出 JDK1.8 和 JDK1.7 對 ConcurrentHashMap 的實現改變,筆者更喜歡 CAS 無鎖機制,若是隻是看我寫以上代碼註釋明顯不足以瞭解 JAVA8 的 ConcurrentHashMap 的實現,我也僅僅提供源碼閱讀的思路,其中 cas、volatile、final 等注意已經給解釋,因此若是你們真的感興趣仍是寫程序,打斷點,一步步看看這個代碼的實現.

1.8 在 1.7 的數據結構上作了大的改動,採用紅黑樹以後能夠保證查詢效率(O(logn)),甚至取消了 ReentrantLock 改成了 synchronized,這樣能夠看出在新版的 JDK 中對 synchronized 優化是很到位的。

 

相信到這裏爲止,理解上面的內容,遇到面試,問題都迎刃而解,下面是網上找的面試題,以下:

(1)你知道 HashMap 的工做原理嗎?你知道 HashMap 的 get() 方法的工做原理嗎?

HashMap 是基於 hashing 的原理,咱們使用 put(key, value) 存儲對象到 HashMap 中,使用 get(key) 從 HashMap 中獲取對象。當咱們給 put() 方法傳遞鍵和值時,咱們先對鍵調用 hashCode() 方法,返回的 hashCode 用於找到 bucket 位置來儲存 Entry 對象。

(2)你知道 ConcurrentHashMap 的工做原理嗎?你知道 ConcurrentHashMap 在 JAVA8 和 JAVA7 對比有哪些不一樣呢?

ConcurrentHashMap 爲了提升自己的併發能力,在內部採用了一個叫作 Segment 的結構,一個 Segment 其實就是一個類 Hash Table 的結構,Segment 內部維護了一個鏈表數組,咱們用下面這一幅圖來看下 ConcurrentHashMap 的內部結構,從下面的結構咱們能夠了解到,ConcurrentHashMap 定位一個元素的過程須要進行兩次Hash操做,第一次 Hash 定位到 Segment,第二次 Hash 定位到元素所在的鏈表的頭部,所以,這一種結構的帶來的反作用是 Hash 的過程要比普通的 HashMap 要長,可是帶來的好處是寫操做的時候能夠只對元素所在的 Segment 進行操做便可,不會影響到其餘的 Segment,這樣,在最理想的狀況下,ConcurrentHashMap 能夠最高同時支持 Segment 數量大小的寫操做(恰好這些寫操做都很是平均地分佈在全部的 Segment上),因此,經過這一種結構,ConcurrentHashMap 的併發能力能夠大大的提升。

JAVA7以前ConcurrentHashMap主要採用鎖機制,在對某個Segment進行操做時,將該Segment鎖定,不容許對其進行非查詢操做,而在JAVA8以後採用CAS無鎖算法,這種樂觀操做在完成前進行判斷,若是符合預期結果纔給予執行,對併發操做提供良好的優化

(3)當兩個對象的hashcode相同會發生什麼?

由於hashcode相同,因此它們的bucket位置相同,‘碰撞’會發生。由於Map使用LinkedList存儲對象,這個Entry(包含有鍵值對的Map.Entry對象)會存儲在LinkedList中。(當向 Map 中添加 key-value 對,由其 key 的 hashCode() 返回值決定該 key-value 對(就是 Entry 對象)的存儲位置。當兩個 Entry 對象的 key 的 hashCode() 返回值相同時,將由 key 經過 eqauls() 比較值決定是採用覆蓋行爲(返回 true),仍是產生 Entry 鏈(返回 false)),此時若你能講解JDK1.8紅黑樹引入,面試官或許會另眼相看。

(4)若是兩個鍵的 hashcode 相同,你如何獲取值對象?

當咱們調用get()方法,HashMap 會使用鍵對象的 hashcode 找到 bucket 位置,而後獲取值對象。若是有兩個值對象儲存在同一個 bucket,將會遍歷 LinkedList 直到找到值對象。找到 bucket 位置以後,會調用 keys.equals() 方法去找到 LinkedList 中正確的節點,最終找到要找的值對象。(當程序經過 key 取出對應 value 時,系統只要先計算出該 key 的 hashCode() 返回值,在根據該 hashCode 返回值找出該 key 在 table 數組中的索引,而後取出該索引處的 Entry,最後返回該 key 對應的 value 便可)。

(5)若是HashMap的大小超過了負載因子(load factor)定義的容量,怎麼辦?

當一個map填滿了75%的bucket時候,和其它集合類(如ArrayList等)同樣,將會建立原來HashMap大小的兩倍的bucket數組,來從新調整map的大小,並將原來的對象放入新的bucket數組中。這個過程叫做rehashing,由於它調用hash方法找到新的bucket位置。

(6)你瞭解從新調整HashMap大小存在什麼問題嗎?

當從新調整HashMap大小的時候,確實存在條件競爭,由於若是兩個線程都發現HashMap須要從新調整大小了,它們會同時試着調整大小。在調整大小的過程當中,存儲在LinkedList中的元素的次序會反過來,由於移動到新的bucket位置的時候,HashMap並不會將元素放在LinkedList的尾部,而是放在頭部,這是爲了不尾部遍歷(tail traversing)。若是條件競爭發生了,那麼就死循環了。這個時候,你能夠質問面試官,爲何這麼奇怪,要在多線程的環境下使用HashMap呢?

(7)請問ConcurrentHashMap中變量使用final和volatile修飾有什麼用呢?其中鏈表是final的next屬性,那麼發生刪除某個元素,如何實現的?

使用final來實現不變模式(immutable),他是多線程安全裏最簡單的一種保障方式。由於你拿他沒有辦法,想改變它也沒有機會。不變模式主要經過final關鍵字來限定的。在JMM中final關鍵字還有特殊的語義。Final域使得確保初始化安全性(initialization safety)成爲可能,初始化安全性讓不可變形對象不須要同步就能自由地被訪問和共享。

使用volatile來保證某個變量內存的改變對其餘線程即時可見,在配合CAS能夠實現不加鎖對併發操做的支持

remove執行的開始就將table賦給一個局部變量tab,將tab依次複製出來,最後直到該刪除位置,將指針指向下一個變量。

(8)描述一下ConcurrentHashMap中remove操做,有什麼須要注意的?

須要注意以下幾點。第一,當要刪除的結點存在時,刪除的最後一步操做要將count的值減一。這必須是最後一步操做,不然讀取操做可能看不到以前對段所作的結構性修改。第二,remove執行的開始就將table賦給一個局部變量tab,這是由於table是volatile變量,讀寫volatile變量的開銷很大。編譯器也不能對volatile變量的讀寫作任何優化,直接屢次訪問非volatile實例變量沒有多大影響,編譯器會作相應優化。

(9)HashTable與ConcurrentHashMap有什麼區別,描述鎖分段技術。

HashTable容器在競爭激烈的併發環境下表現出效率低下的緣由,是由於全部訪問HashTable的線程都必須競爭同一把鎖,那假如容器裏有多把鎖,每一把鎖用於鎖容器其中一部分數據,那麼當多線程訪問容器裏不一樣數據段的數據時,線程間就不會存在鎖競爭,從而能夠有效的提升併發訪問效率,這就是ConcurrentHashMap所使用的鎖分段技術,首先將數據分紅一段一段的存儲,而後給每一段數據配一把鎖,當一個線程佔用鎖訪問其中一個段數據的時候,其餘段的數據也能被其餘線程訪問。有些方法須要跨段,好比size()和containsValue(),它們可能須要鎖定整個表而而不只僅是某個段,這須要按順序鎖定全部段,操做完畢後,又按順序釋放全部段的鎖。這裏「按順序」是很重要的,不然極有可能出現死鎖,在ConcurrentHashMap內部,段數組是final的,而且其成員變量實際上也是final的,可是,僅僅是將數組聲明爲final的並不保證數組成員也是final的,這須要實現上的保證。這能夠確保不會出現死鎖,由於得到鎖的順序是固定的。

相關文章
相關標籤/搜索