併發編程之 ConcurrentHashMap(JDK 1.8) putVal 源碼分析

前言

咱們以前分析了Hash的源碼,主要是 put 方法。同時,咱們知道,HashMap 在併發的時候是不安全的,爲何呢?由於當多個線程對 Map 進行擴容會致使鏈表成環。不僅僅是這個問題,當多個線程相同一個槽中插入數據,也是不安全的。而在這以後,咱們學習了併發編程,而併發編程中有一個重要的東西,就是JDK 自帶的併發容器,提供了線程安全的特性且比同步容器性能好出不少。一個典型的表明就是 ConcurrentHashMap,對,又是 HashMap ,可是這個 Map 是線程安全的,那麼一樣的,咱們今天就看看該類的 put 方法是如何實現線程安全的。java

源碼加註釋分析 putVal 方法

/** Implementation for put and putIfAbsent */
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
        // 死循環執行
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
                // 初始化
                tab = initTable();
            // 獲取對應下標節點,若是是kong,直接插入
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                // CAS 進行插入
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            // 若是 hash 衝突了,且 hash 值爲 -1,說明是 ForwardingNode 對象(這是一個佔位符對象,保存了擴容後的容器)
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            // 若是 hash 衝突了,且 hash 值不爲 -1
            else {
                V oldVal = null;
                // 同步 f 節點,防止增長鏈表的時候致使鏈表成環
                synchronized (f) {
                    // 若是對應的下標位置 的節點沒有改變
                    if (tabAt(tab, i) == f) {
                        // 而且 f 節點的hash 值 不是大於0
                        if (fh >= 0) {
                            // 鏈表初始長度
                            binCount = 1;
                            // 死循環,直到將值添加到鏈表尾部,並計算鏈表的長度
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        // 若是 f 節點的 hasj 小於0 而且f 是 樹類型
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                // 鏈表長度大於等於8時,將該節點改爲紅黑樹樹
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        // 判斷是否須要擴容
        addCount(1L, binCount);
        return null;
    }
複製代碼

樓主在代碼中寫了不少註釋,可是仍是說一下步驟(該方法和HashMap 的高度類似,可是多了不少同步操做)。編程

  1. 校驗key value 值,都不能是null。這點和 HashMap 不一樣。
  2. 獲得 key 的 hash 值。
  3. 死循環並更新 tab 變量的值。
  4. 若是容器沒有初始化,則初始化。調用 initTable 方法。該方法經過一個變量 + CAS 來控制併發。稍後咱們分析源碼。
  5. 根據 hash 值找到數組下標,若是對應的位置爲空,就建立一個 Node 對象用CAS方式添加到容器。並跳出循環。
  6. 若是 hash 衝突,也就是對應的位置不爲 null,則判斷該槽是否被擴容了(-1 表示被擴容了),若是被擴容了,返回新的數組。
  7. 若是 hash 衝突 且 hash 值不是 -1,表示沒有被擴容。則進行鏈表操做或者紅黑樹操做,注意,這裏的 f 頭節點被鎖住了,保證了同時只有一個線程修改鏈表。防止出現鏈表成環。
  8. 和 HashMap 同樣,若是鏈表樹超過8,則修改鏈表爲紅黑樹。
  9. 將數組加1(CAS方式),若是須要擴容,則調用 transfer 方法(很是複雜,之後再詳解)進行移動和從新散列,該方法中,若是是槽中只有單個節點,則使用CAS直接插入,若是不是,則使用 synchronized 進行同步,防止併發成環。

這裏說一說 initTable 方法:數組

/** * Initializes table, using the size recorded in sizeCtl. */
    private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        while ((tab = table) == null || tab.length == 0) {
            // 小於0說明被其餘線程改了
            if ((sc = sizeCtl) < 0)
                // 自旋等待
                Thread.yield(); // lost initialization race; just spin
            // CAS 修改 sizeCtl 的值爲-1
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    if ((tab = table) == null || tab.length == 0) {
                        // sc 在初始化的時候用戶可能會自定義,若是沒有自定義,則是默認的
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        // 建立數組
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        // sizeCtl 計算後做爲擴容的閥值
                        sc = n - (n >>> 2);
                    }
                } finally {
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }


複製代碼

該方法爲了在併發環境下的安全,加入了一個 sizeCtl 變量來進行判斷,只有當一個線程經過CAS修改該變量成功後(默認爲0,改爲 -1),該線程才能初始化數組。保證了初始化數組時的安全性。安全

總結

ConcurrentHashMap 是併發大師 Doug Lea 的傑做,能夠說鬼斧神工,總的來講,使用了 CAS 加 synchronized 來保證了 put 操做併發時的危險(特別是鏈表),相比 同步容器 hashTable 來講,若是容器大小是16,併發的性能是他的16倍,注意,讀的時候是沒有鎖的,徹底併發,而 HashTable 在 get 方法上直接加上了 synchronized 關鍵字,性能差距不言而喻。併發

固然,樓主這篇文章可能之寫到了 ConcurrentHashMap 的皮毛,關於如何擴容,樓主沒有詳細介紹,而樓主在閱讀源碼的收穫也不少,發現了不少有趣的東西,好比 ThreadLocalRandom 類在 addCount 方法中的應用,你們能夠看看該類,很是的實用。dom

注意:這篇文章僅僅是 ConcurrentHashMap 的開頭,關於 ConcurrentHashMap 裏面的精華太多,值得咱們好好學習。性能

good luck !!!!!學習

相關文章
相關標籤/搜索