ConcurrentHashMap 1.8原理解析

JDK 1.8中,Hash家族有這麼一些存在,HashMap,HashTable,LinkedHashMap,ConcurrentHashMap。這裏面支持線程安全的有HashTable以及ConcurrentHashMap。對Hash有一個基本瞭解能夠參考本人的從Hash到一致性Hash原理(深度好文) 。java

那既然說到ConcurrentHashMap,天然要討論的就是它的線程安全性和效能。咱們先來看一下HashTable的線程安全性以及效能的低下。算法

public synchronized V put(K key, V value) {
    // Make sure the value is not null
    if (value == null) {
        throw new NullPointerException();
    }

    // Makes sure the key is not already in the hashtable.
    Entry<?,?> tab[] = table;
    int hash = key.hashCode();
    int index = (hash & 0x7FFFFFFF) % tab.length;
    @SuppressWarnings("unchecked")
    Entry<K,V> entry = (Entry<K,V>)tab[index];
    for(; entry != null ; entry = entry.next) {
        if ((entry.hash == hash) && entry.key.equals(key)) {
            V old = entry.value;
            entry.value = value;
            return old;
        }
    }

    addEntry(hash, key, value, index);
    return null;
}
public synchronized V get(Object key) {
    Entry<?,?> tab[] = table;
    int hash = key.hashCode();
    int index = (hash & 0x7FFFFFFF) % tab.length;
    for (Entry<?,?> e = tab[index] ; e != null ; e = e.next) {
        if ((e.hash == hash) && e.key.equals(key)) {
            return (V)e.value;
        }
    }
    return null;
}

咱們對比一下HashMap 1.7的這兩個方法(由於1.8點HashMap會生成紅黑樹,咱們暫時先不考慮紅黑樹的問題)數組

public V put(K key, V value) {
        if (key == null)
            return putForNullKey(value);
        int hash = hash(key);
        int i = indexFor(hash, table.length);
        for (Entry<K,V> e = table[i]; e != null; e = e.next) {
            Object k;
            if (e.hash == hash && ((k = e.key) == key || key.equals(k))) {
                V oldValue = e.value;
                e.value = value;
                e.recordAccess(this);
                return oldValue;
            }
        }

        modCount++;
        addEntry(hash, key, value, i);
        return null;
    }
public V get(Object key) {
        if (key == null)
            return getForNullKey();
        Entry<K,V> entry = getEntry(key);

        return null == entry ? null : entry.getValue();
    }
final int hash(Object k) {
        int h = 0;
        if (useAltHashing) {
            if (k instanceof String) {
                return sun.misc.Hashing.stringHash32((String) k);
            }
            h = hashSeed;
        }

        h ^= k.hashCode();

        // This function ensures that hashCodes that differ only by
        // constant multiples at each bit position have a bounded
        // number of collisions (approximately 8 at default load factor).
        //一種算法,進行4次位移,獲得相對比較分散的鏈表
        h ^= (h >>> 20) ^ (h >>> 12);
        return h ^ (h >>> 7) ^ (h >>> 4);
    }

在這裏咱們能夠看到,他們除了key的hash算法不一樣,HashTable的比較簡單,只是用key的哈希值與0x7FFFFFFF(十進制2147483647,二進制1111111111111111111111111111111)進行一次與運算,即爲只要相同二進制位上不論0,1所有都變成1,再對table數組的長度取模。而HashMap 1.7則爲key的哈希值進行各位無符號位移加異或運算,取得最終哈希值。而後是HashTable不接收null的key,而HashMap接受。他們最大的區別就在於synchronized顯示器鎖了。安全

synchronized的本質是全部對象的字節碼中有一個monitor的對象頭,任何線程拿到了這個monitor的對象頭就能夠對這個對象進行操做,而拿不到monitor對象頭的線程就只能等待,直到拿到了monitor的線程放棄,其餘線程才能爭奪這個對象頭來對對象進行操做。那麼問題來了,當大量線程高併發的時候,只要有一個線程拿到了這個對象頭,其餘線程對這個對象是既不能讀也不能寫。而對於HashMap來講,若是多線程對其進行操做,那麼任意線程均可以胡亂修改裏面值的內容形成髒讀,因此HashMap是線程不安全的。多線程

那麼咱們今天的主角登場了ConcurrentHashMap。咱們一樣來看一下這兩個方法。如下是1.8的源碼,首先咱們要清楚的是1.8跟1.7已經徹底不一樣,1.7是使用segements(16個segement),每一個segement都有一個table(Map.Entry數組),至關於16個HashMap,同步機制爲分段鎖,每一個segment繼承ReentrantLock;而1.8只有1個table(Map.Entry數組),同步機制爲CAS + synchronized保證併發更新。若是不搞清楚這個問題,那麼你看1.8的源碼可能會很懵逼。併發

public V put(K key, V value) {
    return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
    //不管key仍是value,不容許空
    if (key == null || value == null) throw new NullPointerException();
    //此處獲取hash值的方法與HashTable相似
    int hash = spread(key.hashCode());
    int binCount = 0;
    //無限循環
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        //若是節點數組爲null,或者長度爲0,初始化節點數組
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        //若是節點數組的某個節點爲null,則put的時候就會採用無鎖競爭來獲取該節點的頭把交椅
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        //須要擴容的時候先擴容,再寫入
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else { //若是hash衝突的時候,即多線程操做時,你們都有同樣的hash值
            V oldVal = null;
            synchronized (f) { //鎖定節點數組的該節點
                if (tabAt(tab, i) == f) {
                    //若是當前該節點爲鏈表形態
                    if (fh >= 0) {
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            //找鏈表中找到相同的key,把新value替代老value
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            //若是找不到key,就添加到鏈表到末尾
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    //若是當前爲紅黑樹形態,進行紅黑樹到查找和替代(存在相同的key),或者放入紅黑樹到新葉節點上(key不存在)
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            if (binCount != 0) {
                //若是鏈表長度超過了8,鏈表轉紅黑樹
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    //統計節點個數,檢查是否須要擴容
    addCount(1L, binCount);
    return null;
}
public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    int h = spread(key.hashCode());
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {
        if ((eh = e.hash) == h) {
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        else if (eh < 0)
            return (p = e.find(h, key)) != null ? p.val : null;
        while ((e = e.next) != null) {
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}

讀取的時候,咱們沒有看見鎖到存在,說明讀不受多線程影響。app

對比ConcurrentHashMap和HashTable,咱們能夠明顯的看到,ConcurrentHashMap在寫的時候,並無鎖住整個節點數組,在新節點上使用的是無鎖競爭,在老節點上鎖住的僅僅是一個節點,讀的時候若是不是剛好讀到寫線程寫入相同Hash值的位置,不受影響(能夠認爲咱們的操做通常是讀多寫少,這種概率也比較低)。而HashTable是對整個節點數組進行鎖定,讀到時候不能寫,寫的時候不能讀,這麼一對比就能夠明顯感受到性能差距是巨大的。框架

雖然ConcurrentHashMap的併發性能還算比較優異,但在億級計算中,卻依然會成爲性能瓶頸,具體能夠參考本人的Fork/Join框架原理和使用探祕 高併發

至於這裏爲何會慢,我認爲在這種超高併發下,節點數組的單節點的的寫寫競爭是互斥的,其次,因爲紅黑樹具備讀快寫慢的特性,它要不斷保持樹的平衡而不斷返轉,因此纔會使得高併發寫的性能急劇降低。性能

相關文章
相關標籤/搜索