jdk8 ConcurrentHashMap 源碼解析

why

今天面試新同窗, 整理面試題的時候, 看到ConcurrentHashMap, 好久以前瞭解過, 記得是按segment分段鎖提升併發效率,jdk8重寫了這個類, 日常業務代碼中用到的也比較少, 忽略了,今天從新拾起來看一下, 作一個筆記, 有錯誤之處, 歡迎批評指正java

jdk7 和 jdk8 的差別

jdk7 使用 ReentrantLock + segment + hashentry + unsafe
jdk8 使用 Synchronized + CAS + Node + NodeTree + Unsafenode

重點方法

從兩個最重要的方法提及, get, put面試

先說重點put方法, 對於併發而言, 讀取比較簡單,不涉及到數據改動, 就不須要鎖。瞭解在put數據邏輯就能更清楚的知道ConcurrentHashMap是如何工做的算法

put 方法

採用無限循環邏輯,檢查table中當前下標的值數組

  1. 檢查table 是否初始, 沒有的話初始化table,從新循環
  2. 根據hash值模運算,計算出數組下標, 取出數組下標所在的值,若是值是null, 則用CAS設置到該下標處, 若是設置成功結束, 若是設置失敗(失敗緣由多是其它線程設置該下標的值) 從新循環
  3. 待定
  4. 若是當前下標的值不爲空,進入同步代碼塊
    1. 再次檢查當前下標的值是否有改變,有改變結束當前,從新循環, 沒有改變且是鏈表狀況,邏輯比較好理解取出下標的值, 比較key 是否至關, 相等則設置新值, 不相等掛載鏈表, 同時記錄鏈表長度
    2. 若是是紅黑樹,則把值設置到紅黑樹(紅黑樹這裏不作展開)
    3. 根據鏈表長度,判斷是否須要轉換成紅黑樹, 默認閥值是8

上圖更清晰

源碼(關鍵部分加了註釋)

final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            // 若是table爲空, 初始化table, 詳見下面
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            // 判斷當前hash 的位置有沒有值,沒有值, 直接使使cas 無阻塞設置
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                // 只是鎖住單個對象, 鎖粒度更小
                synchronized (f) {
                    // 再次檢查是否有變動
                    if (tabAt(tab, i) == f) {
                        // 若是這個節點hash 值不爲0, 意思是當前節點爲普通節點的時候, 這裏應該比較容易理解, 比較hash 值, key equals 是否相等, 若是hash 衝突就添加鏈表, 記錄鏈表長度(binCount),以後會根據長度調整, 是否使用紅黑樹代替鏈表
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        // 若是已是樹結構, 就按照樹的結構來了
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                // 檢查說閥值,默認是8, 超過會轉換成樹
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);
        return null;
    }
複製代碼

get 方法(註釋說明)

get 方法相對簡潔不少, 主要邏輯已經put方法中處理bash

public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        int h = spread(key.hashCode());
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {
            if ((eh = e.hash) == h) {
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            // node 是紅黑樹時,查找對應節點
            else if (eh < 0)
                return (p = e.find(h, key)) != null ? p.val : null;
                
            // 爲鏈表時, 循環找出對應節點
            while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }
複製代碼

初始化map底層數組 table(選讀)

須要瞭解的兩個前置基本概念併發

  1. Unsafe

簡單講一下這個類。Java沒法直接訪問底層操做系統,而是經過本地(native)方法來訪問。不過儘管如此,JVM仍是開了一個後門,JDK中有一個類Unsafe,它提供了硬件級別的原子操做。高併發

這個類儘管裏面的方法都是public的,可是並無辦法使用它們,JDK API文檔也沒有提供任何關於這個類的方法的解釋。總而言之,對於Unsafe類的使用都是受限制的,只有授信的代碼才能得到該類的實例,固然JDK庫裏面的類是能夠隨意使用的。ui

  1. CAS

CAS,Compare and Swap即比較並交換,設計併發算法時經常使用到的一種技術,java.util.concurrent包全完創建在CAS之上,沒有CAS也就沒有此包,可見CAS的重要性。this

當前的處理器基本都支持CAS,只不過不一樣的廠家的實現不同罷了。CAS有三個操做數:內存值V、舊的預期值A、要修改的值B,當且僅當預期值A和內存值V相同時,將內存值修改成B並返回true,不然什麼都不作並返回false。

  1. 源碼

初始化數組大小時,沒有加鎖,由於用了個 sizeCtl 變量,將這個變量置爲-1,就代表table正在初始化。

private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        while ((tab = table) == null || tab.length == 0) {
        // sizeCtl: table 初始化和resize的標誌位,表初始化和調整大小控件。當爲負值時,將初始化或調整表的大小
            if ((sc = sizeCtl) < 0)
                // 若是是-1 表示正在初始化或者調整大小, 這時放棄cpu使用, 進行下一次循環檢查
                Thread.yield(); // lost initialization race; just spin
            // 設置SIZECTL爲-1,設置成功開始初始化, 不成功繼續循環。  
            // compareAndSwapInt 非阻塞同步原語: arg0, arg1, arg2, arg3 分別爲對象實例,目標對象屬性,當前預期值,要設的值, 設置成功返回 true, 失敗 false
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    if ((tab = table) == null || tab.length == 0) {
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        sc = n - (n >>> 2);
                    }
                } finally {
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }
複製代碼

總結

  1. 用 Synchronized + CAS + Node + NodeTree 代替 Segment ,只有在hash 衝突, 或者修改已經值的時候纔去加鎖, 鎖的粒度更小,大幅減小阻塞

  2. 鏈表節點數量大於8時,會將鏈表轉化爲紅黑樹進行存儲,查詢時間複雜度從O(n),變成遍歷紅黑樹O(logN)。

以前也看過幾篇別的幾篇關於ConcurrentHashMap 的貼子, 看完以後容易忘記, 看源碼就像精讀書同樣, 若是能仔細看一遍,理解了,就能熟記於心, 寫中間件, 底層的同窗更值得讀一下

相關文章
相關標籤/搜索