解讀Java8中ConcurrentHashMap是如何保證線程安全的

HashMap是工做中使用頻度很是高的一個K-V存儲容器。在多線程環境下,使用HashMap是不安全的,可能產生各類非指望的結果。java

關於HashMap線程安全問題,可參考筆者的另外一篇文章: 深刻解讀HashMap線程安全性問題node

針對HashMap在多線程環境下不安全這個問題,HashMap的做者認爲這並非bug,而是應該使用線程安全的HashMap。編程

目前有以下一些方式能夠得到線程安全的HashMap:數組

  • Collections.synchronizedMap
  • HashTable
  • ConcurrentHashMap

其中,前兩種方式因爲全局鎖的問題,存在很嚴重的性能問題。因此,著名的併發編程大師Doug Lea在JDK1.5的java.util.concurrent包下面添加了一大堆併發工具。其中就包含ConcurrentHashMap這個線程安全的HashMap。安全

本文就來簡單介紹一下ConcurrentHashMap的實現原理。微信

PS:基於JDK8數據結構

0 ConcurrentHashMap在JDK7中的回顧

ConcurrentHashMap在JDK7和JDK8中的實現方式上有較大的不一樣。首先咱們先來大概回顧一下ConcurrentHashMap在JDK7中的原理是怎樣的。多線程

0.1 分段鎖技術

針對HashTable會鎖整個hash表的問題,ConcurrentHashMap提出了分段鎖的解決方案。併發

分段鎖的思想就是:鎖的時候不鎖整個hash表,而是隻鎖一部分。app

如何實現呢?這就用到了ConcurrentHashMap中最關鍵的Segment。

ConcurrentHashMap中維護着一個Segment數組,每一個Segment能夠看作是一個HashMap。

而Segment自己繼承了ReentrantLock,它自己就是一個鎖。

在Segment中經過HashEntry數組來維護其內部的hash表。

每一個HashEntry就表明了map中的一個K-V,用HashEntry能夠組成一個鏈表結構,經過next字段引用到其下一個元素。

上述內容在源碼中的表示以下:

public class ConcurrentHashMap<K, V> extends AbstractMap<K, V> implements ConcurrentMap<K, V>, Serializable {

    // ... 省略 ...
    /** * The segments, each of which is a specialized hash table. */
    final Segment<K,V>[] segments;

    // ... 省略 ...

    /** * Segment是ConcurrentHashMap的靜態內部類 * * Segments are specialized versions of hash tables. This * subclasses from ReentrantLock opportunistically, just to * simplify some locking and avoid separate construction. */
    static final class Segment<K,V> extends ReentrantLock implements Serializable {
    	// ... 省略 ...
    	/** * The per-segment table. Elements are accessed via * entryAt/setEntryAt providing volatile semantics. */
        transient volatile HashEntry<K,V>[] table;
        // ... 省略 ...
    }
    // ... 省略 ...

    /** * ConcurrentHashMap list entry. Note that this is never exported * out as a user-visible Map.Entry. */
    static final class HashEntry<K,V> {
        final int hash;
        final K key;
        volatile V value;
        volatile HashEntry<K,V> next;
        // ... 省略 ...
    }
}
複製代碼

因此,JDK7中,ConcurrentHashMap的總體結構能夠描述爲下圖這樣子。

JDK7_ConcurrentHashMap結構

由上圖可見,只要咱們的hash值足夠分散,那麼每次put的時候就會put到不一樣的segment中去。 而segment本身自己就是一個鎖,put的時候,當前segment會將本身鎖住,此時其餘線程沒法操做這個segment, 但不會影響到其餘segment的操做。這個就是鎖分段帶來的好處。

0.2 線程安全的put

ConcurrentHashMap的put方法源碼以下:

public V put(K key, V value) {
    Segment<K,V> s;
    if (value == null)
        throw new NullPointerException();
    int hash = hash(key);
    int j = (hash >>> segmentShift) & segmentMask;

    // 根據key的hash定位出一個segment,若是指定index的segment還沒初始化,則調用ensureSegment方法初始化
    if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
         (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
        s = ensureSegment(j);
    // 調用segment的put方法
    return s.put(key, hash, value, false);
}
複製代碼

最終會調用segment的put方法,將元素put到HashEntry數組中,這裏的註釋中只給出鎖相關的說明

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    // 由於segment自己就是一個鎖
    // 這裏調用tryLock嘗試獲取鎖
    // 若是獲取成功,那麼其餘線程都沒法再修改這個segment
    // 若是獲取失敗,會調用scanAndLockForPut方法根據key和hash嘗試找到這個node,若是不存在,則建立一個node並返回,若是存在則返回null
    // 查看scanAndLockForPut源碼會發現他在查找的過程當中會嘗試獲取鎖,在多核CPU環境下,會嘗試64次tryLock(),若是64次還沒獲取到,會直接調用lock()
    // 也就是說這一步必定會獲取到鎖
    HashEntry<K,V> node = tryLock() ? null :
        scanAndLockForPut(key, hash, value);
    V oldValue;
    try {
        HashEntry<K,V>[] tab = table;
        int index = (tab.length - 1) & hash;
        HashEntry<K,V> first = entryAt(tab, index);
        for (HashEntry<K,V> e = first;;) {
            if (e != null) {
                K k;
                if ((k = e.key) == key ||
                    (e.hash == hash && key.equals(k))) {
                    oldValue = e.value;
                    if (!onlyIfAbsent) {
                        e.value = value;
                        ++modCount;
                    }
                    break;
                }
                e = e.next;
            }
            else {
                if (node != null)
                    node.setNext(first);
                else
                    node = new HashEntry<K,V>(hash, key, value, first);
                int c = count + 1;
                if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                    // 擴容
                    rehash(node);
                else
                    setEntryAt(tab, index, node);
                ++modCount;
                count = c;
                oldValue = null;
                break;
            }
        }
    } finally {
        // 釋放鎖
        unlock();
    }
    return oldValue;
}
複製代碼

0.3 線程安全的擴容(Rehash)

HashMap的線程安全問題大部分出在擴容(rehash)的過程當中。

ConcurrentHashMap的擴容只針對每一個segment中的HashEntry數組進行擴容。

由上述put的源碼可知,ConcurrentHashMap在rehash的時候是有鎖的,因此在rehash的過程當中,其餘線程沒法對segment的hash表作操做,這就保證了線程安全。

1 JDK8中ConcurrentHashMap的初始化

以無參數構造函數爲例,來看一下ConcurrentHashMap類初始化的時候會作些什麼。

ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
複製代碼

首先會執行靜態代碼塊和初始化類變量。 主要會初始化如下這些類變量:

// Unsafe mechanics
private static final sun.misc.Unsafe U;
private static final long SIZECTL;
private static final long TRANSFERINDEX;
private static final long BASECOUNT;
private static final long CELLSBUSY;
private static final long CELLVALUE;
private static final long ABASE;
private static final int ASHIFT;

static {
    try {
        U = sun.misc.Unsafe.getUnsafe();
        Class<?> k = ConcurrentHashMap.class;
        SIZECTL = U.objectFieldOffset
            (k.getDeclaredField("sizeCtl"));
        TRANSFERINDEX = U.objectFieldOffset
            (k.getDeclaredField("transferIndex"));
        BASECOUNT = U.objectFieldOffset
            (k.getDeclaredField("baseCount"));
        CELLSBUSY = U.objectFieldOffset
            (k.getDeclaredField("cellsBusy"));
        Class<?> ck = CounterCell.class;
        CELLVALUE = U.objectFieldOffset
            (ck.getDeclaredField("value"));
        Class<?> ak = Node[].class;
        ABASE = U.arrayBaseOffset(ak);
        int scale = U.arrayIndexScale(ak);
        if ((scale & (scale - 1)) != 0)
            throw new Error("data type scale not a power of two");
        ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
    } catch (Exception e) {
        throw new Error(e);
    }
}
複製代碼

這裏用到了Unsafe類,其中objectFieldOffset方法用於獲取指定Field(例如sizeCtl)在內存中的偏移量。

獲取的這個偏移量主要用於幹啥呢?不着急,在下文的分析中,遇到的時候再研究就好。

PS:關於Unsafe的介紹和使用,能夠查看筆者的另外一篇文章 Unsafe類的介紹和使用

2 內部數據結構

先來從源碼角度看一下JDK8中是怎麼定義的存儲結構。

/** * The array of bins. Lazily initialized upon first insertion. * Size is always a power of two. Accessed directly by iterators. * * hash表,在第一次put數據的時候才初始化,他的大小老是2的倍數。 */
transient volatile Node<K,V>[] table;

/** * 用來存儲一個鍵值對 * * Key-value entry. This class is never exported out as a * user-mutable Map.Entry (i.e., one supporting setValue; see * MapEntry below), but can be used for read-only traversals used * in bulk tasks. Subclasses of Node with a negative hash field * are special, and contain null keys and values (but are never * exported). Otherwise, keys and vals are never null. */
static class Node<K,V> implements Map.Entry<K,V> {
    final int hash;
    final K key;
    volatile V val;
    volatile Node<K,V> next;
}
複製代碼

能夠發現,JDK8與JDK7的實現由較大的不一樣,JDK8中不在使用Segment的概念,他更像HashMap的實現方式。

PS:關於HashMap的原理,能夠參考筆者的另外一篇文章 HashMap原理及內部存儲結構

這個結構能夠經過下圖描述出來

JDK8_ConcurrentHashMap結構

3 線程安全的hash表初始化

由上文可知ConcurrentHashMap是用table這個成員變量來持有hash表的。

table的初始化採用了延遲初始化策略,他會在第一次執行put的時候初始化table。

put方法源碼以下(省略了暫時不相關的代碼):

/** * Maps the specified key to the specified value in this table. * Neither the key nor the value can be null. * * <p>The value can be retrieved by calling the {@code get} method * with a key that is equal to the original key. * * @param key key with which the specified value is to be associated * @param value value to be associated with the specified key * @return the previous value associated with {@code key}, or * {@code null} if there was no mapping for {@code key} * @throws NullPointerException if the specified key or value is null */
public V put(K key, V value) {
    return putVal(key, value, false);
}

/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    // 計算key的hash值
    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        // 若是table是空,初始化之
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        // 省略...
    }
    // 省略...
}
複製代碼

initTable源碼以下

/** * Initializes table, using the size recorded in sizeCtl. */
private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    // #1
    while ((tab = table) == null || tab.length == 0) {
        // sizeCtl的默認值是0,因此最早走到這的線程會進入到下面的else if判斷中
        // #2
        if ((sc = sizeCtl) < 0)
            Thread.yield(); // lost initialization race; just spin
        // 嘗試原子性的將指定對象(this)的內存偏移量爲SIZECTL的int變量值從sc更新爲-1
        // 也就是將成員變量sizeCtl的值改成-1
        // #3
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            try {
                // 雙重檢查,緣由會在下文分析
                // #4
                if ((tab = table) == null || tab.length == 0) {
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY; // 默認初始容量爲16
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    // #5
                    table = tab = nt; // 建立hash表,並賦值給成員變量table
                    sc = n - (n >>> 2);
                }
            } finally {
                // #6
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}
複製代碼

成員變量sizeCtl在ConcurrentHashMap中的其中一個做用至關於HashMap中的threshold,當hash表中元素個數超過sizeCtl時,觸發擴容; 他的另外一個做用相似於一個標識,例如,當他等於-1的時候,說明已經有某一線程在執行hash表的初始化了,一個小於-1的值表示某一線程正在對hash表執行resize。

這個方法首先判斷sizeCtl是否小於0,若是小於0,直接將當前線程變爲就緒狀態的線程。

當sizeCtl大於等於0時,當前線程會嘗試經過CAS的方式將sizeCtl的值修改成-1。修改失敗的線程會進入下一輪循環,判斷sizeCtl<0了,被yield住;修改爲功的線程會繼續執行下面的初始化代碼。

在new Node[]以前,要再檢查一遍table是否爲空,這裏作雙重檢查的緣由在於,若是另外一個線程執行完#1代碼後掛起,此時另外一個初始化的線程執行完了#6的代碼,此時sizeCtl是一個大於0的值,那麼再切回這個線程執行的時候,是有可能重複初始化的。關於這個問題會在下圖的併發場景中說明。

而後初始化hash表,並從新計算sizeCtl的值,最終返回初始化好的hash表。

下圖詳細說明了幾種可能致使重複初始化hash表的併發場景,咱們假設Thread2最終成功初始化hash表。

  • Thread1模擬的是CAS更新sizeCtl變量的併發場景
  • Thread2模擬的是table的雙重檢查的必要性

ConcurrentHashMap初始化的併發場景

由上圖能夠看出,在Thread1中若是不對sizeCtl的值更新作併發控制,Thread1是有可能走到new Node[]這一步的。 在Thread3中,若是不作雙重判斷,Thread3也會走到new Node[]這一步。

4 線程安全的put

put操做可分爲如下兩類

  • 當前hash表對應當前key的index上沒有元素時
  • 當前hash表對應當前key的index上已經存在元素時(hash碰撞)

4.1 hash表上沒有元素時

對應源碼以下

else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
    if (casTabAt(tab, i, null,
                 new Node<K,V>(hash, key, value, null)))
        break;                   // no lock when adding to empty bin
}

static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
    return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}

static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) {
    return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
複製代碼

tabAt方法經過Unsafe.getObjectVolatile()的方式獲取數組對應index上的元素,getObjectVolatile做用於對應的內存偏移量上,是具有volatile內存語義的。

若是獲取的是空,嘗試用cas的方式在數組的指定index上建立一個新的Node。

4.2 hash碰撞時

對應源碼以下

else {
    V oldVal = null;
    // 鎖f是在4.1中經過tabAt方法獲取的
    // 也就是說,當發生hash碰撞時,會以鏈表的頭結點做爲鎖
    synchronized (f) {
        // 這個檢查的緣由在於:
        // tab引用的是成員變量table,table在發生了rehash以後,原來index上的Node可能會變
        // 這裏就是爲了確保在put的過程當中,沒有收到rehash的影響,指定index上的Node仍然是f
        // 若是不是f,那這個鎖就沒有意義了
        if (tabAt(tab, i) == f) {
            // 確保put沒有發生在擴容的過程當中,fh=-1時表示正在擴容
            if (fh >= 0) {
                binCount = 1;
                for (Node<K,V> e = f;; ++binCount) {
                    K ek;
                    if (e.hash == hash &&
                        ((ek = e.key) == key ||
                         (ek != null && key.equals(ek)))) {
                        oldVal = e.val;
                        if (!onlyIfAbsent)
                            e.val = value;
                        break;
                    }
                    Node<K,V> pred = e;
                    if ((e = e.next) == null) {
                        // 在鏈表後面追加元素
                        pred.next = new Node<K,V>(hash, key,
                                                  value, null);
                        break;
                    }
                }
            }
            else if (f instanceof TreeBin) {
                Node<K,V> p;
                binCount = 2;
                if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                               value)) != null) {
                    oldVal = p.val;
                    if (!onlyIfAbsent)
                        p.val = value;
                }
            }
        }
    }
    if (binCount != 0) {
        // 若是鏈表長度超過8個,將鏈表轉換爲紅黑樹,與HashMap相同,相對於JDK7來講,優化了查找效率
        if (binCount >= TREEIFY_THRESHOLD)
            treeifyBin(tab, i);
        if (oldVal != null)
            return oldVal;
        break;
    }
}
複製代碼

不一樣於JDK7中segment的概念,JDK8中直接用鏈表的頭節點作爲鎖。 JDK7中,HashMap在多線程併發put的狀況下可能會造成環形鏈表,ConcurrentHashMap經過這個鎖的方式,使同一時間只有有一個線程對某一鏈表執行put,解決了併發問題。

5 線程安全的擴容

put方法的最後一步是統計hash表中元素的個數,若是超過sizeCtl的值,觸發擴容。

擴容的代碼略長,可大體看一下里面的中文註釋,再參考下面的分析。 其實咱們主要的目的是弄明白ConcurrentHashMap是如何解決HashMap的併發問題的。 帶着這個問題來看源碼就好。關於HashMap存在的問題,參考本文一開始說的筆者的另外一篇文章便可。

其實HashMap的併發問題多半是因爲put和擴容併發致使的。

這裏咱們就來看一下ConcurrentHashMap是如何解決的。

擴容涉及的代碼以下:

/** * The array of bins. Lazily initialized upon first insertion. * Size is always a power of two. Accessed directly by iterators. * 業務中使用的hash表 */
transient volatile Node<K,V>[] table;

/** * The next table to use; non-null only while resizing. * 擴容時才使用的hash表,擴容完成後賦值給table,並將nextTable重置爲null。 */
private transient volatile Node<K,V>[] nextTable;

/** * Adds to count, and if table is too small and not already * resizing, initiates transfer. If already resizing, helps * perform transfer if work is available. Rechecks occupancy * after a transfer to see if another resize is already needed * because resizings are lagging additions. * * @param x the count to add * @param check if <0, don't check resize, if <= 1 only check if uncontended */
private final void addCount(long x, int check) {
    // ----- 計算鍵值對的個數 start -----
    CounterCell[] as; long b, s;
    if ((as = counterCells) != null ||
        !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
        CounterCell a; long v; int m;
        boolean uncontended = true;
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
            !(uncontended =
              U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
            fullAddCount(x, uncontended);
            return;
        }
        if (check <= 1)
            return;
        s = sumCount();
    }
    // ----- 計算鍵值對的個數 end -----
    // ----- 判斷是否須要擴容 start -----
    if (check >= 0) {
        Node<K,V>[] tab, nt; int n, sc;
        // 當上面計算出來的鍵值對個數超出sizeCtl時,觸發擴容,調用核心方法transfer
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
               (n = tab.length) < MAXIMUM_CAPACITY) {
            int rs = resizeStamp(n);
            if (sc < 0) {
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    break;
                // 若是有已經在執行的擴容操做,nextTable是正在擴容中的新的hash表
                // 若是併發擴容,transfer直接使用正在擴容的新hash表,保證了不會出現hash表覆蓋的狀況
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }
            // 更新sizeCtl的值,更新成功後爲負數,擴容開始
            // 此時沒有併發擴容的狀況,transfer中會new一個新的hash表來擴容
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);
            s = sumCount();
        }
    }
    // ----- 判斷是否須要擴容 end -----
}

/** * Moves and/or copies the nodes in each bin to new table. See * above for explanation. */
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; // subdivide range
    if (nextTab == null) {            // initiating
        try {
            @SuppressWarnings("unchecked")
            // 初始化新的hash表,大小爲以前的2倍,並賦值給成員變量nextTable
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        nextTable = nextTab;
        transferIndex = n;
    }
    int nextn = nextTab.length;
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    boolean advance = true;
    boolean finishing = false; // to ensure sweep before committing nextTab
    for (int i = 0, bound = 0;;) {
        Node<K,V> f; int fh;
        while (advance) {
            int nextIndex, nextBound;
            if (--i >= bound || finishing)
                advance = false;
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }
            else if (U.compareAndSwapInt
                     (this, TRANSFERINDEX, nextIndex,
                      nextBound = (nextIndex > stride ?
                                   nextIndex - stride : 0))) {
                bound = nextBound;
                i = nextIndex - 1;
                advance = false;
            }
        }
        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
            // 擴容完成時,將成員變量nextTable置爲null,並將table替換爲rehash後的nextTable
            if (finishing) {
                nextTable = null;
                table = nextTab;
                sizeCtl = (n << 1) - (n >>> 1);
                return;
            }
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                    return;
                finishing = advance = true;
                i = n; // recheck before commit
            }
        }
        else if ((f = tabAt(tab, i)) == null)
            advance = casTabAt(tab, i, null, fwd);
        else if ((fh = f.hash) == MOVED)
            advance = true; // already processed
        else {
            // 接下來是遍歷每一個鏈表,對每一個鏈表的元素進行rehash
            // 仍然用頭結點做爲鎖,因此在擴容的時候,沒法對這個鏈表執行put操做
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    Node<K,V> ln, hn;
                    if (fh >= 0) {
                        int runBit = fh & n;
                        Node<K,V> lastRun = f;
                        for (Node<K,V> p = f.next; p != null; p = p.next) {
                            int b = p.hash & n;
                            if (b != runBit) {
                                runBit = b;
                                lastRun = p;
                            }
                        }
                        if (runBit == 0) {
                            ln = lastRun;
                            hn = null;
                        }
                        else {
                            hn = lastRun;
                            ln = null;
                        }
                        for (Node<K,V> p = f; p != lastRun; p = p.next) {
                            int ph = p.hash; K pk = p.key; V pv = p.val;
                            if ((ph & n) == 0)
                                ln = new Node<K,V>(ph, pk, pv, ln);
                            else
                                hn = new Node<K,V>(ph, pk, pv, hn);
                        }
                        // setTabAt方法調用了Unsafe.putObjectVolatile來完成hash表元素的替換,具有volatile內存語義
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                    else if (f instanceof TreeBin) {
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> lo = null, loTail = null;
                        TreeNode<K,V> hi = null, hiTail = null;
                        int lc = 0, hc = 0;
                        for (Node<K,V> e = t.first; e != null; e = e.next) {
                            int h = e.hash;
                            TreeNode<K,V> p = new TreeNode<K,V>
                                (h, e.key, e.val, null, null);
                            if ((h & n) == 0) {
                                if ((p.prev = loTail) == null)
                                    lo = p;
                                else
                                    loTail.next = p;
                                loTail = p;
                                ++lc;
                            }
                            else {
                                if ((p.prev = hiTail) == null)
                                    hi = p;
                                else
                                    hiTail.next = p;
                                hiTail = p;
                                ++hc;
                            }
                        }
                        ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                            (hc != 0) ? new TreeBin<K,V>(lo) : t;
                        hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                            (lc != 0) ? new TreeBin<K,V>(hi) : t;
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                }
            }
        }
    }
}
複製代碼

根據上述代碼,對ConcurrentHashMap是如何解決HashMap併發問題這一疑問進行簡要說明。

  • 首先new一個新的hash表(nextTable)出來,大小是原來的2倍。後面的rehash都是針對這個新的hash表操做,不涉及原hash表(table)。
  • 而後會對原hash表(table)中的每一個鏈表進行rehash,此時會嘗試獲取頭節點的鎖。這一步就保證了在rehash的過程當中不能對這個鏈表執行put操做。
  • 經過sizeCtl控制,使擴容過程當中不會new出多個新hash表來。
  • 最後,將全部鍵值對從新rehash到新表(nextTable)中後,用nextTable將table替換。這就避免了HashMap中get和擴容併發時,可能get到null的問題。
  • 在整個過程當中,共享變量的存儲和讀取所有經過volatile或CAS的方式,保證了線程安全。

6 總結

多線程環境下,對共享變量的操做必定要當心。要充分從Java內存模型的角度考慮問題。

ConcurrentHashMap中大量的用到了Unsafe類的方法,咱們本身雖然也能拿到Unsafe的實例,但在生產中不建議這麼作。 多數狀況下,咱們能夠經過併發包中提供的工具來實現,例如Atomic包下面的能夠用來實現CAS操做,lock包下能夠用來實現鎖相關的操做。

善用線程安全的容器工具,例如ConcurrentHashMap、CopyOnWriteArrayList、ConcurrentLinkedQueue等,由於咱們在工做中沒法像ConcurrentHashMap這樣經過Unsafe的getObjectVolatile和setObjectVolatile原子性的更新數組中的元素,因此這些併發工具是很重要的。


歡迎關注個人微信公衆號

公衆號
相關文章
相關標籤/搜索