Java容器系列-ConcurrentHashMap源碼分析

ConcurrentHashMap 用法上與 HashMap 差異不大,但 ConcurrentHashMap 是線程安全的,能夠在多線程環境中使用。這篇文章主要會說明 ConcurrentHashMap 專有的一些特色,與 HashMap 相似部分將再也不贅述。java

本文基於 JDK1.8數組

成員變量及常量

ConcurrentHashMap 的代碼複雜度高了很多,用到了不少的成員變量和常量,先認識一下(HashMap 已經存在的變量或常量就再也不贅述)。安全

常量

// 默認併發度,同時容許多少個線程訪問
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
// 擴容時每一個線程擴容時至少要遷移的桶的數量,最低不能少於 16
private static final int MIN_TRANSFER_STRIDE = 16;
// 輔助變量,沒啥用
private static int RESIZE_STAMP_BITS = 16;
// 可用於擴容的最大線程數,但通常確定到不了這個數
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
// 會用來計算一個標誌位,實際上也沒什麼用
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
// 下面三個常量是幾個特殊的哈希值
// MOVED:表示桶正在經被遷移
// TREEBIN:表示桶正在進行樹化
// RESERVED:表示節點運行 computeIfAbsent 等方法
static final int MOVED     = -1;
static final int TREEBIN   = -2;
static final int RESERVED  = -3;
// 用於計算 key 的 hash 值
static final int HASH_BITS = 0x7fffffff; 
// CPU 的數量
static final int NCPU = Runtime.getRuntime().availableProcessors();
複製代碼

變量

這些變量基本都使用了 volatile 關鍵字,那是由於這些變量的再併發環境中必須都保持可見性。微信

// 桶數組,與 HashMap 基本一致,也是延遲加載,不過這裏使用了 volatile 關鍵字
transient volatile Node<K,V>[] table;
// 桶數組,用於擴容
private transient volatile Node<K,V>[] nextTable;
// 記錄全部的元素的個數,相似於 HashMap 的 size
private transient volatile long baseCount;
// 初始化和擴容的標誌位
// 默認值:0
// 初始化前:初始化容量大小
// 正在初始化:-1
// 擴容前:觸發擴容操做的元素個數,至關於 HashMap 的 threshold
// 正在擴容:-(1 + 參與擴容的線程數量)
private transient volatile int sizeCtl;
// 擴容的時候須要對桶內的元素進行遷移,這個變量用來記錄桶的下標,表示遷移的進度,下面會詳細介紹這個變量
private transient volatile int transferIndex;
// 更新 counterCells 時使用的自旋鎖
private transient volatile int cellsBusy;
// 計數用,用於計算還沒來的及更新到 baseCount 中的變化
private transient volatile CounterCell[] counterCells;
複製代碼

具體實現

與 HashMap 相比,ConcurrentHashMap 是線程安全的。容許多個線程併發的訪問容器的不一樣部分來減小線程間的競爭。這個容器設計出來不是爲了替代 HashMap,而是爲了在知足多線程環境下的需求,它有兩個設計目標:多線程

  • 使容器能夠支持併發讀(好比 get(), 迭代器等)同時讓多線程更新競爭的代價最小化。
  • 對空間的消耗與 HashMap 持平甚至更好

總的來講就是 ConcurrentHashMap 既要能支持高併發,也要有高性能。具體實現也通過了屢次變化,特別是在 JDK1.8,幾乎進行了重寫,底層的存儲機制也徹底不一致。JDK 1.7 和 JDK1.8 底層存儲的差別:併發

// JDK1.7
final Segment<K,V>[] segments;
transient volatile HashEntry<K,V>[] table; // 每個分段鎖都會有一個 table
複製代碼
// JDK1.8
transient volatile Node<K,V>[] table;
複製代碼

在 JDK1.8 中,併發的粒度更細一些,能夠認爲 table 的長度就是併發數,而以前的版本中,Segment 的數量是併發度。dom

由於使用了 CAS,因此在 ConcurrentHashMap 中存在大量的自旋操做,自旋操做其實就是一個死循環,等到完成操做時就會經過 break 跳出循環。ide

ConcurrentHashMap 的 hash 函數與 HashMap 的相差不大,不過除了與自身進行 XOR(異或) 操做,還會與 HASH_BITES 進行與運算:函數

// ConcurrentHashMap.spread()
static final int spread(int h) {
    return (h ^ (h >>> 16)) & HASH_BITS;
}
複製代碼

HASH_BITS 的二進制表示是:高併發

01111111 11111111 11111111 11111111
複製代碼

CAS

在 JDK1.8 之前,ConcurrentHashMap 主要使用分段鎖的機制來實現,在 JDK 1.8 及之後,主要使用了 CAS(sun.misc.Unsafe) + synchronized 來實現。CAS 是一種無鎖的併發技術,以高效率著稱,CAS 須要硬件的支持,現在的 CPU 都支持這一特性。

但 ConcurrentHashMap 並無實現本身的 CAS,而是直接使用了 sun.misc.Unsafe(最新的 JDK 中已經換成 jdk.internal.misc.Unsafe)。

ConcurrentHashMap 利用 CAS 實現了了如下三個原子方法來訪問桶的第一個元素

// 獲取桶的某個位置,任何狀況下可使用
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
    return (Node<K,V>)U.getObjectAcquire(tab, ((long)i << ASHIFT) + ABASE);
}
// 插入桶的第一個鍵值對,能夠在併發環境下,任何狀況下可使用
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,Node<K,V> c, Node<K,V> v) {
    return U.compareAndSetObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
// 把鍵值對插入到桶中,只在有鎖的的區域使用
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
    U.putObjectRelease(tab, ((long)i << ASHIFT) + ABASE, v);
}
複製代碼

桶的第一個元素有特殊的意義,在 ConcurrentHashMap 中一般被用做桶的鎖

CAS 除了用來訪問桶以外,在用在其餘須要併發更新變量的地方。好比更新 sizeCtl 變量:

// ConcurrentHashMap.initTable()
// 將容器的狀態設置爲正在擴容
U.compareAndSetInt(this, SIZECTL, sc, -1)
複製代碼

synchronized 給人的印象是很慢,很臃腫,其實這是一個誤解,synchronized 底層通過不斷的優化,目前性能已經與可重入鎖至關。並且 synchronized 使用簡單,也不會形成是死鎖的狀況,因此通常狀況下能用 synchronized 就別用鎖了,除非知足不了需求再考慮用鎖。

在 ConcurrentHashMap 中 synchronized 使用時粒度都比較小,被 synchronized 包裹的代碼不是不少,因此仍是能夠保持高性能。這也是 ConcurrentHashMap 與 Hashtable 的最大區別。Hashtable 也是使用 synchronized 來保證線程安全,可是 synchronized 都是在方法級別使用,這樣就會讓整個容器的併發級別很低。

擴容機制

擴容是一個很慢的操做,能夠事先預估好大小,能夠減小擴容的次數。擴容機制與 HashMap 有些不一樣,由於 ConcurrentHashMap 能夠併發訪問,因此在擴容時寫操做的線程都不能繼續,可是這些線程也能夠被利用起來,參與到擴容操做中。

對容器的擴容分爲兩種狀況:

  • **初始化:**首次插入元素時初始化,也稱之爲延時加載
  • **擴容:**存儲的元素達到臨界值開始擴容

初始化和擴容這兩個過程不是獨立存在的,經過下面這個圖來看清總體流程時如何進行的:

實例化時會肯定table大小,可是不初始化table,以及肯定下一次擴容的臨界點,若是構造函數傳入的是另外一個 Map,調用 tryPresize 來擴容。

在首次插入元素時,會初始化 table(延遲加載),調用 initTable() 進行初始化。

若是不是首次插入元素,判斷是否正在擴容,若是是,則中止操做(除了 get() 操做),參與擴容流程。擴容完成後,經過自旋再次進行操做(插入或者更新),插入元素時須要檢查是否達到樹化的條件,若是知足,將鏈表轉成樹。插入完成後調用 addCount() 檢查容器狀態,若是元素大於等於擴容臨界點的值,則開始擴容

初始化經過 initTable() 方法來完成。

// ConcurrentHashMap.initTable()
private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    // 檢查當前桶是否爲空,爲空則開始初始化
    while ((tab = table) == null || tab.length == 0) {
        // 發現正在初始化或者在擴容,則什麼也不作,進入自旋狀態等待表被初始化(或擴容)完成
        if ((sc = sizeCtl) < 0)
            Thread.yield(); // 放棄 CPU 資源
        // 線程開始擴容時會把 sizeCtl 的值置爲 -1,讓其餘線程發現正在進行初始化
        else if (U.compareAndSetInt(this, SIZECTL, sc, -1)) {
            try {
                if ((tab = table) == null || tab.length == 0) {
                    // 肯定初始化桶的數量,若是 sizeCtl 大於 0 則使用 sizeCtl 的值,不然使用默認容量
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = tab = nt;
                    // 設置擴容閥值,ConcurrentHashMap 中的裝載因子僅僅在構造函數中使用
                    sc = n - (n >>> 2); // 至關於 n * 0.75
                }
            } finally {
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}
複製代碼

擴容操做經過下面兩個方法來發起:

  • addcount
  • tryPresize

addcount() 在改變容器元素的方法中被調用,主要就是檢查容器當前的狀態,判斷是否須要擴容,若是須要,就會進行擴容。

// ConcurrentHashMap.addcount()
// 這個方法主要用來給當前容器的數量進行計數順便檢查一下是否須要擴容
private final void addCount(long x, int check) {
    CounterCell[] cs; long b, s;
    // 給容器中的元素進行增或者減
    // 若是 cs 不爲 null(說明有併發狀況)或者 baseCount 增減運算失敗,
    if ((cs = counterCells) != null ||
        !U.compareAndSetLong(this, BASECOUNT, b = baseCount, s = b + x)) {
        CounterCell c; long v; int m;
        boolean uncontended = true;
        // 那麼就會經過 cs 來進行計數,
        // 若是 cs 是空(還不是併發)或者 (cs 中隨機取餘一個數組位置爲空 或者 cs 這個位置的變量失敗)
        // 說明經過 cs 來計數也失敗了,最後纔會調用 fullAddCount 來進行計數
        if (cs == null || (m = cs.length - 1) < 0 ||
            (c = cs[ThreadLocalRandom.getProbe() & m]) == null ||
            !(uncontended =
                U.compareAndSetLong(c, CELLVALUE, v = c.value, v + x))) {
            // 與 LongAdder 實現一致,能夠理解爲併發狀況下的一個計數器
            fullAddCount(x, uncontended);
            return;
        }
        if (check <= 1)
            return;
        // 統計當前節點的數量
        s = sumCount();
    }
    // 在增長元素的操做中 check 都會知足這個條件
    if (check >= 0) {
        Node<K,V>[] tab, nt; int n, sc;
        // 檢查擴容條件:
        // 1. 是否達到閥值: s >= sizeCtl (上文已經解釋了 sizeCtl,sizeCtl 大於 0 時表示下次擴容的臨界點)
        // 2. 是否能夠擴容: tab != null && tab 當前的長度小於 1 << 30
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                (n = tab.length) < MAXIMUM_CAPACITY) {
            // 根據當前桶的數量生成一個標誌位
            int rs = resizeStamp(n);
            // 若是正在擴容
            if (sc < 0) {
                // 檢查當前擴容的進展:
                // 1. 若是 sc 的低 16 位不等於標識位( sizeCtl 變化了,說明容器狀態已經變化),退出
                // 2. 若是 sc == 標識位 + 1 (經過下面代碼可知,剛開始擴容時, sc = rs + 2,若是 sc = rs + 1,說明已經沒有線程在擴容),退出
                // 3. 若是 sc == 標識符 + 65535,參與擴容的線程已經達到最大數量,當前線程再也不參與,退出
                // 4. 若是 nextTable == null 說明擴容結束(nextTable 在擴容中起中轉做用,全部的元素會被限移到 nextTable 中,最後讓 tab = nextTable,nextTable == null 來完成擴容),退出
                // 5. transferIndex <= 0 說明沒有桶還須要遷移了(transferIndex 用於標識當前遷移到哪一個桶了,小於等於 0 說明已經遷移到最後一個桶或者已經遷移完成,遷移的順序是從最後一個桶開始),退出。
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    break;
                // 若是遷移仍是進行,當前線程嘗試參與擴容
                if (U.compareAndSetInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }
            // 若是當前不在擴容中,則發起一個新的擴容
            else if (U.compareAndSetInt(this, SIZECTL, sc,
                                            (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);
            // 統計當前節點的數量
            s = sumCount();
        }
    }
}
複製代碼

tryPresize 相比於 addcount 方法相對簡單,就是嘗試進行擴容:

// ConcurrentHashMap.tryPresize()
private final void tryPresize(int size) {
    // 根據 size 計算擴容的容量
    int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
        tableSizeFor(size + (size >>> 1) + 1);
    int sc;
    // 判斷是否能夠進行擴容,若是 sizeCtl <= 0,說明已經在擴容中,那麼久不會再進行擴容
    while ((sc = sizeCtl) >= 0) {
        Node<K,V>[] tab = table; int n;
        // 若是當前容器尚未初始化,則進行初始化,與 initTable 相同
        if (tab == null || (n = tab.length) == 0) {
            // 當前的擴容閥值與傳入的值之間選大的做爲此次初始化的大小
            n = (sc > c) ? sc : c;
            // 進入初始化狀態
            if (U.compareAndSetInt(this, SIZECTL, sc, -1)) {
                try {
                    if (table == tab) {
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = nt;
                        sc = n - (n >>> 2); // 至關於 n * 0.75
                    }
                } finally {
                    sizeCtl = sc;
                }
            }
        }
        // 若是還每達到擴容的閥值或者超過了最大容量,則中止擴容
        else if (c <= sc || n >= MAXIMUM_CAPACITY)
            break;
        else if (tab == table) {
            // 開始進行擴容
            int rs = resizeStamp(n);
            if (U.compareAndSetInt(this, SIZECTL, sc,
                                    (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);
        }
    }
}
複製代碼

擴容的具體操做是經過 transfer() 方法來完成。

// ConcurrentHashMap.transfer() 該方法用於將元素都遷移到 nextTable 中
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    // 在遷移元素時,會將桶分段,stride 表示每段的長度,最小值爲 16
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; 
    // 初始化 nextTable
    if (nextTab == null) {
        try {
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        nextTable = nextTab;
        // 這個變量用於記錄當前遷移的進度,須要注意的是遷移元素從最後一個桶開始
        transferIndex = n;
    }
    int nextn = nextTab.length;
    // fwd 是一個特殊的 Node,沒有 key,也沒有 val,hash 值爲 MOVED,用來標識一個桶已經遷移完畢
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    // 用來控制遷移的進展,若是爲 true 說明當前此次循環要乾的事情已經完成,能夠開始下一個循環
    boolean advance = true;
    // 標示當前線程全部桶的遷移是否完成
    boolean finishing = false;
    // 當前線程須要處理的桶的範圍 [nextBound, nextindex)
    for (int i = 0, bound = 0;;) {
        Node<K,V> f; int fh;
        while (advance) {
            int nextIndex, nextBound;
            if (--i >= bound || finishing)
                advance = false;
            // transferIndex <= 0 表示已經遷移完成
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }
            else if (U.compareAndSetInt
                     (this, TRANSFERINDEX, nextIndex,
                      nextBound = (nextIndex > stride ?
                                   nextIndex - stride : 0))) {
                // 爲當前線程分配桶的區間,當前線程須要將負責這個區間內的桶元素遷移到 nextTable 中
                bound = nextBound;
                i = nextIndex - 1;
                advance = false;
            }
        }
        // 判斷當前線程是否完成全部桶的遷移
        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
            // 若是爲 true,說明全部的遷移任務已經完成
            if (finishing) {
                nextTable = null;
                table = nextTab;
                sizeCtl = (n << 1) - (n >>> 1); // 至關於 n * 0.75
                return;
            }
            // 將參與擴容的線程數量減 1
            if (U.compareAndSetInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                // 若是不相等說明還有其餘的線程在參與擴容,當前線程直接退出就行,這行代碼與 tryPresize() 中傳入的參數有關,第一個進行擴容的線程傳入的 sc = (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2,因此若是這是最後一個線程,那麼 sc - 2 == resizeStamp(n) << RESIZE_STAMP_SHIFT
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                    return;
                // 最後退出的線程須要再檢查一遍容器的狀態
                finishing = advance = true;
                i = n;
            }
        }
        // 若是桶中的元素都遷移完成了,則在桶的節點置爲 MOVED,表示桶中的元素都遷移完成了
        else if ((f = tabAt(tab, i)) == null)
            advance = casTabAt(tab, i, null, fwd);
        else if ((fh = f.hash) == MOVED)
            advance = true; // 當前桶已經被處理
        else {
            // 若是上面條件都不知足說明要開始遷移桶中的元素
            synchronized (f) {
             // 省略搬運元素的代碼...
            }
        }
    }
}
複製代碼

樹化操做

樹化的方式與時機和 HashMap 基本一致。在單個桶的鏈表元素個數大於 8 時嘗試進行樹化操做,可是若是此時整個容器的容量少於 64 時,會進行擴容操做,而不是進行樹化操做,樹化後一樣也維護元素的 next 指針來保持鏈接關係。

樹化操做只須要對當前線程所訪問的桶進行操做,因此整個過程比擴容要簡單不少,是經過 CAS + synchronized 來完成。

// ConcurrentHashMap.treeifyBin()
private final void treeifyBin(Node<K,V>[] tab, int index) {
    Node<K,V> b; int n;
    if (tab != null) {
        // 若是容器的容量小於 64,則會進行擴容操做,而不是進行樹化操做
        if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
            tryPresize(n << 1);
        // 利用 CAS + synchronized 來把鏈表轉成紅黑樹
        else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
            synchronized (b) {
                if (tabAt(tab, index) == b) {
                    TreeNode<K,V> hd = null, tl = null;
                    for (Node<K,V> e = b; e != null; e = e.next) {
                        TreeNode<K,V> p =
                            new TreeNode<K,V>(e.hash, e.key, e.val,
                                                  null, null);
                        if ((p.prev = tl) == null)
                            hd = p;
                        else
                            tl.next = p;
                        tl = p;
                    }
                    // 把轉換好的樹放到桶上
                    setTabAt(tab, index, new TreeBin<K,V>(hd));
                }
            }
        }
    }
}
複製代碼

增刪改查操做

在每一個桶上插入第一個元素的時候使用 CAS 就夠了。若是插入的不是桶上的第一個元素,或者是刪除或者更新操做,就仍是要用到 synchronized。但不會爲每個元素都建立一個鎖對象,而是使用桶上的第一個元素做爲鎖對象。可是僅僅將第一個元素上鎖還不夠,在更新以前,還須要驗證它依然是這個桶的第一個節點,若是不是,就要進行重試。

除了 get() 操做以外,其餘的 put()、clear() 等操做,都須要使用 CAS + synchronized 來進行併發訪問。get 操做相對簡單,直接經過 tabAt 方法獲取就行。其餘的操做邏輯總體就是同樣的。這裏主要介紹 putVal() 方法,put()、add()等向容器中增長或者更新元素的方法都是經過 putVal() 方法來完成的。

// ConcurrentHashMap.putVal()
final V putVal(K key, V value, boolean onlyIfAbsent) {
    // key 和 value 都不容許爲 null
    if (key == null || value == null) throw new NullPointerException();
    // 作 hash 運算
    int hash = spread(key.hashCode());
    int binCount = 0;
    // 進入自旋
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh; K fk; V fv;
        // 若是桶尚未被初始化,則進入初始化(延遲加載)
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        // 若是這個桶爲空,直接使用 CAS 方式來插入元素
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
                break;                  
        }
        // 若是發現正在擴容,則參與進擴容,擴容完成以後,經過自旋的方式再次執行插入操做
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        // 執行 computeOnlyAbsent 之類的方法
        else if (onlyIfAbsent
                    && fh == hash
                    && ((fk = f.key) == key || (fk != null && key.equals(fk)))
                    && (fv = f.val) != null)
            return fv;
        else {
            V oldVal = null;
            // 使用 CAS + synchronized 機制插入元素
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) {
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            // 對現有的鍵值對進行更新
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                    (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            // 使用尾插法插入新的元素
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key, value);
                                break;
                            }
                        }
                    }
                    // 若是桶上掛的是樹,那就按照樹的方法來插入節點
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                        value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                    // 若是發現這個節點正在進行 computeIfAbsent 之類的操做,則拋出異常
                    else if (f instanceof ReservationNode)
                        throw new IllegalStateException("Recursive update");
                }
            }
            if (binCount != 0) {
                // 檢查桶上節點的數量,若是超過 8 了,則嘗試進行樹化操做
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                // 若是是更新節點操做,那麼節點數量就沒有增長,直接返回便可
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    // 用這個方法來檢查是否知足擴容的條件,與上面的 helpTransfer 方法不一樣,addCount 是在鍵值對插入以後再去檢查是否須要擴容
    addCount(1L, binCount);
    return null;
}
複製代碼

其餘操做如 clear、comput、remove 等會改變容器元素的方法原理都相似,都是經過 CAS + synchronized 來更新元素,最後調用 addcount 方法來更新計數以及判斷是否須要擴容。

其餘功能

由於是支持併發的,因此 size 方法的實現也會有點不同,size 實際調用的是 sumCount 方法:

//ConcurrentHashMap.sumCount()
final long sumCount() {
    // 統計 cs 和 baseCount 的和
    CounterCell[] cs = counterCells;
    long sum = baseCount;
    if (cs != null) {
        for (CounterCell c : cs)
            if (c != null)
                sum += c.value;
    }
    return sum;
}
複製代碼

在擴容代碼中咱們看到了 cs 和 baseCount 其實都是用來的統計容器個數,在併發狀況下,會先記錄到 cs 最後可是須要注意的是,由於 sumCount 沒有加鎖,因此最後返回的值也不是徹底準確的。

另外 ConcurrentHashMap 使用的是 fail-safe 的機制,也就是說在迭代的過程當中若是容器中的元素變化,也不會拋出 ConcurrentModificationException 異常。

最後說一下迭代器的問題,KeySetView,ValuesView,EntrySetView 這三個類分別能夠迭代鍵、值、和鍵值對。具體的實現相對比較簡單,並且對於迭代的過程也沒有加上併發的控制,因此最後遍歷的結果也不必定是準確的。

原文

相關文章

關注微信公衆號,聊點其餘的

相關文章
相關標籤/搜索