Java併發5:ConcurrentHashMap

爲何要使用 ConcurrentHashMap

HashMap 是非線程安全的,put操做可能致使死循環。其解決方案有 HashTable 和 Collections.synchronizedMap(hashMap) 。這兩種方案都是對讀寫加鎖,獨佔式,效率比較低下。java

HashMap 在併發執行put操做時會引發死循環,由於多線程致使 HashMap 的 Entry 鏈表造成環形數據結構,則 Entry 的 next 節點永遠不爲空,會死循環獲取 Entry。算法

HashTable 使用 synchronized 來保證線程安全,可是在線程競爭激烈的狀況下,效率很是低。其緣由是全部訪問該容器的線程都必須競爭一把鎖。編程

針對上述問題,ConcurrentHashMap 使用鎖分段技術,容器裏有多把鎖,每一把鎖用於其中一部分數據,當多線程訪問不一樣數據段的數據時,線程間就不會存在鎖的競爭。數組

ConcurrentHashMap 實現(JDK 1.7)

在JDK 1.7中,ConcurrentHashMap 採用了 Segment 數組和 HashEntry 數組的方式進行實現。其中 Segment 是一種可重入鎖(ReentrantLock),扮演鎖的角色。而 HashEntry 則是用於存儲鍵值對的數據。結構以下圖所示:安全

一個 Segment 包含一個 HashEntry 數組,每一個 HashEntry 是一個鏈表結構的元素。每一個 Segment 守護一個 HashEntry 數組的元素。數據結構

初始化

初始化時,計算出 Segment 數組的大小 ssize 和每一個 Segment 中 HashEntry 數組的大小 cap,並初始化 Segment 數組的第一個元素。其中 ssize 爲2的冪次方,默認爲16,cap 大小也是2的冪次方,最小值爲2。最終結果根據初始化容量 initialCapacity 計算。多線程

//計算segment數組長度
if (concurrencyLevel > MAX_SEGMENTS)
            concurrencyLevel = MAX_SEGMENTS;
        // Find power-of-two sizes best matching arguments
        int sshift = 0;
        int ssize = 1;
        while (ssize < concurrencyLevel) {
            ++sshift;
            ssize <<= 1;
        }
        //初始化segmentShift和SegmentMask
        this.segmentShift = 32 - sshift;
        this.segmentMask = ssize - 1;
        //計算每一個Segment中HashEntry數組大小cap
        if (initialCapacity > MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY;
        int c = initialCapacity / ssize;
        if (c * ssize < initialCapacity)
            ++c;
        int cap = MIN_SEGMENT_TABLE_CAPACITY;
        while (cap < c)
            cap <<= 1;
        // 初始化segment數組和segment[0]
        Segment<K,V> s0 =
            new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                             (HashEntry<K,V>[])new HashEntry[cap]);
        Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
        UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
        this.segments = ss;
複製代碼

首先,初始化了 segments 數組,其長度 ssize 是經過 concurrencyLevel 計算得出的。須要保證ssize的長度是2的N次方,segments 數組的長度最大是65536。併發

而後初始化了 segmentShift 和 segmentMask 這兩個全局變量,用於定位 segment 的散列算法。segmentShift 是用於散列運算的位數, segmentMask 是散列運算的掩碼。ssh

以後根據 initialCapaicity 和 loadfactor 這兩個參數來計算每一個 Segment 中 HashEntry 數組的大小 cap。函數

最後根據以上肯定的參數,初始化了 segment 數組以及 segment[0]。

get操做

整個 get 操做過程都不須要加鎖,所以很是高效。首先將 key 通過 Hash 以後定位到 Segment,而後再經過一個 Hash 定位到具體元素。不須要加鎖是由於 get 方法將須要使用的共享變量都定義成 volatile 類型,所以能在線程之間保持可見性,在多線程同時讀時能保證不會讀到過時的值。

put操做

put 方法須要對共享變量進行寫入操做,爲了線程安全,必須加鎖。 put 方法首先定位到 Segment,而後在 Segment 裏進行插入操做。插入操做首先要判斷是否須要對 Segment 裏的 HashEntry 數組進行擴容,而後定位添加元素的位置,將其放入到 HashEntry 數組。

Segment 的擴容比 HashMap 更恰當,由於後者是插入元素後判斷是否已經到達容量,若是到達了就擴容,可是可能擴容後沒有插入,進行了無效的擴容。Segment 在擴容時,首先建立一個原來容量兩倍的數組,而後將原數組裏的元素進行再散列後插入到新的數組。同時爲了高效, ConcurrentHashMap 不會對整個容器進行擴容,而是隻對某個 segment 進行擴容。

size方法

每一個 Segment 都有一個 volatile 修飾的全局變量 count ,求整個 ConcurrentHashMap 的 size 時很明顯就是將全部的 count 累加便可。可是 volatile 修飾的變量卻不能保證多線程的原子性,全部直接累加很容易出現併發問題。可是若是在調用 size 方法時鎖住其他的操做,效率也很低。

ConcurrentHashMap 的作法是先嚐試兩次經過不加鎖的方式進行計算,若是兩次結果相同,說明結果正確。若是計算結果不一樣,則給每一個 Segment 加鎖,進行統計。

ConcurrentHashMap 實現(JDK 1.8)

在JDK 1.8中,改變了分段鎖的思路,採用了 Node數組 + CAS + Synchronized 來保證併發安全。底層仍然採用了數組+鏈表+紅黑樹的存儲結構。

Node

在JDK 1.8中,使用 Node 替換 HashEntry,二者做用相同。在 Node 中, val 和 next 兩個變量都是 volatile 修飾的,保證了可見性。

使用 table 變量存放 Node 節點數組,默認爲 null, 默認大小爲16,且每次擴容時大小老是2的冪次方。在擴容時,使用 nextTable 存放新生成的數據,數組爲 table 的兩倍。

ForwardingNode 是一個特殊的 Node 節點,hash 值爲-1,存儲了 nextTable 的引用。只有table 發生擴容時,其發生做用,做爲佔位符放在 table 中表示當前節點爲null或者已經被移動。

TreeNode

在 HashMap 中,其核心的數據結構是鏈表。而在 ConcurrentHashMap 中,若是鏈表的數據過長會轉換爲紅黑樹來處理。經過將鏈表的節點包裝成 TreeNode 放在 TreeBin 中,而後經由 TreeBin 完成紅黑樹的轉換。

TreeBin

TreeBin 不負責鍵值對的包裝,用於在鏈表轉換爲紅黑樹時包裝 TreeNode 節點,用來構建紅黑樹。

初始化:initTable()

在構造函數中,ConcurrentHashMap 僅僅設置了一些參數。當首次插入元素時,才經過 initTable() 方法進行了初始化。

private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        while ((tab = table) == null || tab.length == 0) {
        //有其餘線程在初始化,掛起當前線程
            if ((sc = sizeCtl) < 0)
                Thread.yield(); // lost initialization race; just spin
        //得到了初始化的權利,使用CAS將sizeCtl設置爲-1,表示本線程正在初始化
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            //進行初始化
                try {
                    if ((tab = table) == null || tab.length == 0) {
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        //下次擴容的大小,至關於0.75*n
                        sc = n - (n >>> 2);
                    }
                } finally {
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }
複製代碼

該方法的關鍵爲sizeCtl。

sizeCtl:控制標識符,用來控制table初始化和擴容操做的,在不一樣的地方有不一樣的用途,其值也不一樣,所表明的含義也不一樣:

  • 負數表明正在進行初始化或擴容操做
  • -1表明正在初始化
  • -N 表示有N-1個線程正在進行擴容操做
  • 正數或0表明hash表尚未被初始化,這個數值表示初始化或下一次進行擴容的大小

sizeCtl 默認爲0。若是該值小於0,表示有其餘線程在初始化,須要暫停該線程。若是該線程獲取了初始化的權利,先將其設置爲-1。最後將 sizeCtl 設置爲 0.75*n,表示擴容的閾值。

put操做

put操做的核心思想依然是根據 hash 計算節點插入在 table 的位置,若是爲空,直接插入,不然插入到鏈表或樹中。

首先計算hash值,而後進入循環中遍歷table,嘗試插入。

int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
    Node<K,V> f; int n, i, fh;
//詳細代碼接下來分別講述
}
複製代碼

首先判斷 table 是否爲空,若是爲空或者是 null,則先進行初始化操做。

if (tab == null || (n = tab.length) == 0)
                tab = initTable();
複製代碼

若是已經初始化過,且插入的位置沒有節點,直接插入該節點。使用CAS嘗試插入該節點。

else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null,
                        new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
複製代碼

若是有線程在擴容,先幫助擴容。

//當前位置的hashcode等於-1,須要擴容
 else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
複製代碼

若是都不知足,使用 synchronized 鎖寫入數據。根據數據結構的不一樣,若是是鏈表則插入尾部;若是是樹節點,使用樹的插入操做。

else {
V oldVal = null;
//對該節點進行加鎖處理(hash值相同的鏈表的頭節點)
synchronized (f) {
    if (tabAt(tab, i) == f) {
        //fh > 0 表示爲鏈表,將該節點插入到鏈表尾部
        if (fh >= 0) {
            binCount = 1;
            for (Node<K,V> e = f;; ++binCount) {
                K ek;
                //hash 和 key 都同樣,替換value
                if (e.hash == hash &&
                        ((ek = e.key) == key || (ek != null && key.equals(ek)))) {
                    oldVal = e.val;
                    //putIfAbsent()
                    if (!onlyIfAbsent)
                        e.val = value;
                    break;
                }
                Node<K,V> pred = e;
                //鏈表尾部 直接插入
                if ((e = e.next) == null) {
                    pred.next = new Node<K,V>(hash, key,
                            value, null);
                    break;
                }
            }
        }
        //樹節點,按照樹的插入操做進行插入
        else if (f instanceof TreeBin) {
            Node<K,V> p;
            binCount = 2;
            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                    value)) != null) {
                oldVal = p.val;
                if (!onlyIfAbsent)
                    p.val = value;
            }
        }
    }
}
複製代碼

在for循環的最後,判斷鏈表的長度是否須要鏈表轉換爲樹結構。

if (binCount != 0) {
    // 若是鏈表長度已經達到臨界值8,把鏈表轉換爲樹結構
    if (binCount >= TREEIFY_THRESHOLD)
        treeifyBin(tab, i);
    if (oldVal != null)
        return oldVal;
    break;
}
複製代碼

最後,若是是更新節點,前邊已經返回了 oldVal,不然就是插入新的節點。還須要使用 addCount() 方法,爲 size 加一。

總結步驟以下:

  1. 判斷 key 和 value 是否爲 null,若是是則拋出異常
  2. 計算 hash
  3. 遍歷 table,插入節點
    • 若是 table 爲空,進行初始化
    • 插入位置爲空,直接插入,無需加鎖
    • 若是是 ForwardingNode 節點,表示有其餘線程正在擴容,幫助線程一塊兒進行擴容。
    • 若是是鏈表結構,遍歷鏈表,若是存在 key 則更新 value,不然插入到鏈表尾部;若是是 TreeBin 節點,按照紅黑樹的方法更新或增長節點。
    • 若是完成後發現鏈表長度大於設定的閾值,將其裝換爲紅黑樹
  4. 若是是更新,返回oddVal;若是是插入,使用 addCount() 方法,增長 size, 返回 null。

get方法

  1. 計算 hash 值
  2. 判斷 table 是否爲空,爲空返回 null
  3. 根據 hash 獲取 Node 節點,若是是該節點則返回 value
  4. 根據鏈表或紅黑樹查找到對應節點,返回 value
  5. 找不到則返回 null

參考資料

相關文章
相關標籤/搜索