ConcurrentHashMap源碼探究 (JDK 1.8)

很早就知道在多線程環境中,HashMap不安全,應該使用ConcurrentHashMap等併發安全的容器代替,對於ConcurrentHashMap也有必定的瞭解,可是因爲沒有深刻到源碼層面,不少理解都是浮於表面,稍微深一點的東西就不是很懂。這兩天終於下定決心將ConcurrentHashMap的源碼探究了一遍,記錄一下心得體會,算是對閱讀源碼的一個總結吧。須要提醒讀者注意,由於我的水平有限,且本文本質上來說是留給將來的本身進行查閱的總結,因此不免會有錯漏,一經發現,本人會盡快糾正,也歡迎你們提出寶貴的意見。數組

1.構造器

先從構造器講起。ConcurrentHashMap共有5個構造器,不管是哪一個構造器,最後初始化後的容量都是2的整數冪,這些構造器簽名分別以下:安全

public ConcurrentHashMap();  //a
public ConcurrentHashMap(int initialCapacity);  //b
public ConcurrentHashMap(Map<? extends K, ? extends V> m);  //c
public ConcurrentHashMap(int initialCapacity, float loadFactor);  //d
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel);  //e

構造器a的方法體是空的,像容量、加載因子這些參數都取默認值;構造器b須要一個初始容量做爲參數,代碼以下:數據結構

public ConcurrentHashMap(int initialCapacity) {
        if (initialCapacity < 0)
            throw new IllegalArgumentException();
        int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
                   MAXIMUM_CAPACITY :
                   tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
        this.sizeCtl = cap;
    }

首先檢查參數合法性,若是參數initialCapacity超過了最大容許容量(MAXIMUM_CAPACITY = 1 << 30)的一半,則將容量設置爲MAXIMUM_CAPACITY,不然使用tableSizeFor方法來計算容量,最後將sizeCtl參數設置爲容量的值。關於sizeCtltableSizeFor等將在後文介紹。
構造器c使用一個外部的map進行初始化,sizeCtl設置爲默認容量,而後調用putAll方法進行容器初始化和複製操做。多線程

public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
        this.sizeCtl = DEFAULT_CAPACITY;
        putAll(m);
    }

構造器d在內部之間調用構造器e,這兩個構造器惟一不一樣的是,構造器e額外提供了一個concurrencyLevel參數,構造器d將這個參數設置爲1併發

public ConcurrentHashMap(int initialCapacity, float loadFactor) {
        this(initialCapacity, loadFactor, 1);
    }

    public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        if (initialCapacity < concurrencyLevel)   // Use at least as many bins
            initialCapacity = concurrencyLevel;   // as estimated threads
        long size = (long)(1.0 + (long)initialCapacity / loadFactor);
        int cap = (size >= (long)MAXIMUM_CAPACITY) ?
            MAXIMUM_CAPACITY : tableSizeFor((int)size);
        this.sizeCtl = cap;
    }

構造器e的邏輯仍然是先檢查參數合法性,concurrencyLevel參數的惟一做用是做爲initialCapacity的下限,除此以外別無他用。從這裏能夠看出 JDK 1.7 版本的併發度參數DEFAULT_CONCURRENCY_LEVEL已棄用。
構造器be在設置容器初始容量的時候有一點不一樣,構造器b使用的是initialCapacity + (initialCapacity >>> 1) + 1(即1.5*initialCapacity+1)做爲基礎容量,而構造器e使用的是1.0 + (long)initialCapacity / loadFactor(這裏的initialCapacity實際上至關於HashMap中的threshold,當loadFactor = 2/3時二者是相等的),在使用的時候須要注意這一點。less

2.主要字段

  • sizeCtl:該字段出鏡率很是高,取值複雜,要讀懂源碼,該字段是必需要弄清楚的。歸納來講,該字段控制內部數組初始化和擴容操做,其取值以下:
    • 負數
      • -1:表示容器正在初始化。
      • -NN-1個線程正在執行擴容。
      • 擴容前會被修改爲 (resizeStamp(tab.length) << RESIZE_STAMP_SHIFT) + 2,而且每增長一個擴容線程,sizeCtl的值加1,擴容線程完成對應桶的遷移工做,sizeCtl1,擴容完成後該值再次被設置成擴容閾值。
    • 正數和0
      • 在初始化的時候表示容器初始容量。
      • 初始化以後表示容器下次擴容的閾值(相似於HashMap中的threshold)。
  • RESIZE_STAMP_BITSRESIZE_STAMP_SHIFTsizeCtl中記錄stamp的兩個字段,這兩個字段在源碼中沒有任何位置會修改,它們的值目前都是16(不太明白這個stamp是什麼意思)。
  • 紅黑樹相關字段:
    • TREEIFY_THRESHOLD = 8:桶由鏈表轉換成紅黑樹結構的閾值,表示桶中的元素個數大於等於8個時將轉換成紅黑樹。
    • UNTREEIFY_THRESHOLD = 6: 桶由紅黑樹轉換成鏈表結構的閾值,表示桶中的元素個數小於等於6個時將轉換成鏈表。
    • MIN_TREEIFY_CAPACITY = 64:啓用紅黑樹的最小元素個數,當集合中的元素個數不足64時,即便某個桶中的元素已經達到8個,也只是執行擴容操做,而不是升級爲紅黑樹。
  • 容量相關字段:
    • MAXIMUM_CAPACITY = 1 << 30:最大容量
    • DEFAULT_CAPACITY = 16:默認容量
    • LOAD_FACTOR:加載因子,默認爲0.75f
  • table: 存放容器元素的數組對象
  • nextTable:平時爲null,在擴容時指向擴容後的數組。
  • baseCount:記錄元素個數,注意該計數器在多線程環境下不許,須要配合countCells使用。
  • counterCells:多線程環境下,用來暫時存放元素用。(???)
  • transferIndex:表示擴容時將數據從就數組向新數組遷移時的下標,多線程時根據該字段給各個線程分配各自獨立的遷移區間,以實現多線程協做擴容。

3.核心方法

ConcurrentHashMap是用來存儲元素的,最經常使用的就是一些增刪改查方法,此外,在容量不足時,會自動觸發擴容操做。 接下來將對ConcurrentHashMap中的主要方法進行分析。dom

  • Unsafe類相關方法
    ConcurrentHashMap廢棄了分段鎖,改用 CAS + Synchronized + valatile保證線程安全,而Java主要經過Unsafe類實現CAS,所以源代碼大量使用了Unsafe類的三個CAS方法,以下:
- compareAndSwapObject(Object o, long offset, Object expected, Object x);
   - compareAndSwapInt(Object o, long offset, int expected, int x);
   - compareAndSwapLong(Object o, long offset, long expected, long x);

這些方法很是類似,區別只是參數expectedx的類型。它們表達的意思是,若是對象ooffset位置的值是expected,則把值修改成x,不然不修改。其中o是給定的對象,offset表示對象內存偏移量,expected表示當前位置的指望值,x表示修改後的新值。
此外,ConcurrentHashMap封裝了三個數組元素訪問方法,底層依然是調用Unsafe類:ide

//從主存獲取tab[i],避免讀到髒數據
    static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
        return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
    }
    //將tab[i]的值從c改爲v
    static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                        Node<K,V> c, Node<K,V> v) {
        return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
    }
    //將tab[i]的值v寫到主存
    static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
        U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
    }
  • tableSizeFor
private static final int tableSizeFor(int c) {
        int n = c - 1;
        n |= n >>> 1;
        n |= n >>> 2;
        n |= n >>> 4;
        n |= n >>> 8;
        n |= n >>> 16;
        return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
    }

tableSizeFor的做用是計算第一個大於等於c2的整數冪,ConcurrentHashMap用這個方法計算獲得的結果來做爲內部數組的長度。關於這個方法的介紹,網上已經有許多資料,這裏再也不贅述,僅僅記錄下源碼,由於代碼確實太驚豔了,提醒本身要時常學習源碼精髓。學習

  • 獲取元素(get(Object key))的源碼以下:
public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        //計算key的hash值
        int h = spread(key.hashCode());
        //數組長度大於0,且對應的桶不爲空,其中(n-1) & h是計算哈希值h對應的數組位置
        if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) {
            //若是頭結點就是目標節點,則返回該節點的值
            if ((eh = e.hash) == h) {
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            //在紅黑樹或者nextTable中進行查找
            //若是桶中的結構是紅黑樹,那麼root節點的hash值是-2,若是容器正在擴容,會把ForwardingNode節點放在桶裏做佔位符,這種類型的節點hash值爲-1
            else if (eh < 0)
                return (p = e.find(h, key)) != null ? p.val : null;
            //走到這裏說明table裏都是正常的鏈表節點,按順序查找便可
            while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }

get方法先進行一系列驗證,以後先判斷頭節點是否是key對應的節點,是就返回,不然檢查頭節點的hash值是否是負數,是負數的話就去紅黑樹或者擴容後的nextTable查找,不是負數則說明key對應的桶裏是鏈表結構,則按順序查找。其中方法spread方法用於計算keyhash值,其代碼以下:this

static final int spread(int h) {
        return (h ^ (h >>> 16)) & HASH_BITS;
    }

h ^ (h >>> 16) 是將h右移16位以後,再與h進行亦或,結果中高16位保持不變,低16位是保存的是原來高16位和低16位的亦或結果。HASH_BITS的值是0x7fffffff,即2^31-1,只在spread方法裏用到,(h ^ (h >>> 16)) & HASH_BITS的結果至關於只是去掉了h ^ (h >>> 16)的符號位,後面在計算下標時再次與數組長度做按位與操做,完整的下標計算至關於((h ^ (h >>> 16)) & HASH_BITS) & (n-1),因爲n32位整數,其最大值也不會大於HASH_BITS,因此((h ^ (h >>> 16)) & HASH_BITS) & (n-1)的效果和(h ^ (h >>> 16)) & (n-1)同樣,彷佛這裏並不須要HASH_BITS,和HashMap中的hash()方法保持一致不就好了?這裏留個疑問待之後解答。

  • 添加/修改元素
    put方法底層調用的putVal,源碼以下:
public V put(K key, V value) {
        return putVal(key, value, false);
    }

    //onlyIfAbsent表示key不存在才插入,存在則不更新
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        //計算hash
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            //若是數組還沒初始化,則先初始化
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            //若是key對應的桶是空的,而且經過原子操做成功的將新節點插入桶中,則本次插入結束,轉入後續的addCount操做
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            //若是當前有線程正在轉移數據,則幫助其轉移
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                synchronized (f) {
                    //這裏再次判斷頭結點有沒有發生變化,由於從上次賦值到加鎖期間,極可能有其餘線程對tab[i]這個桶進行了操做
                    if (tabAt(tab, i) == f) {
                        //fh>=0表示桶內結構是鏈表
                        if (fh >= 0) {
                            //桶內元素計數器
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                //找到key,則更新對應的值
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                //key不存在,則將新節點插入鏈表結尾
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        //桶內結構是紅黑樹
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            //紅黑樹的binCount直接賦值爲2,我的理解是root節點只佔位置,不保存數據?留個疑問後面來解答。
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    //桶內元素大於等於8,則轉換成紅黑樹
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        //修改計數器的值,成功添加新元素纔會走到這裏
        addCount(1L, binCount);
        return null;
    }

若是數組沒有初始化,須要先執行initTable()進行初始化,從這裏能夠看到,ConcurrentHashMap採用延遲初始化的策略,第一次添加元素的時候纔對內部數組進行初始化。initTable()源碼以下:

private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        while ((tab = table) == null || tab.length == 0) {
            //sizeCtl < 0 表示當前有其餘線程正在執行初始化,調用Thread.yield()方法讓當前線程讓出CPU時間片,保證了只能有一個線程對數組進行初始化
            if ((sc = sizeCtl) < 0)
                Thread.yield(); // lost initialization race; just spin
            //若是當前線程成功將sizeCtl的值從sc更新爲-1,則由當前線程執行初始化操做,從這裏能夠看出sizeCtl=-1表示當前正在執行初始化
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    if ((tab = table) == null || tab.length == 0) {
                        //計算數組容量
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        //至關於sc = 0.75*n,即數組長度*0.75,相似於HashMap中的threshold
                        sc = n - (n >>> 2);
                    }
                } finally {
                    //初始化完成後,將sizeCtl的值更新成擴容閾值
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }
  • helpTransfer
    putVal方法中,一個比較有意思的地方在於,若是當前線程發現有其餘線程正在進行數據轉移工做(即在擴容中),就幫助轉移數據,該方法源碼以下:
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
        Node<K,V>[] nextTab; int sc;
        //先確認已初始化,再確認f是ForwardingNode類型節點,即當前確實有線程在執行擴容和遷移數據的操做,最後確認擴容後的新數組是否已初始化完畢
        if (tab != null && (f instanceof ForwardingNode) &&
            (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
            int rs = resizeStamp(tab.length);
            //由於存在多線程擴容的狀況,每次都須要從新判斷擴容條件是否還知足
            //若是擴容已完成,那麼table=nextTable, nextTable=null,而且sizeCtl變成下次擴容的閾值,下面的三項檢查分別與這裏的狀況對應
            while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) {
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || transferIndex <= 0)
                    break;
                //若是成功將sizeCtl的值加1,則進入transfer執行數據遷移工做,從這裏也能夠看出,每當有新線程協助擴容時,會將sizeCtl的值加1
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                    transfer(tab, nextTab);
                    break;
                }
            }
            return nextTab;
        }
        return table;
    }

helpTransfer裏面最複雜的就是while循環體內的第一個條件判斷語句,接下來將一一進行分析。判斷條件裏頻繁出現rs這個變量,有必要先分析resizeStam方法,其源碼很是簡單,只有一行:

static final int resizeStamp(int n) {
        return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
    }

Integer.numberOfLeadingZeros(n)是計算當前數組長度n的二進制數表示中,最左邊的1以前有多少個0RESIZE_STAMP_BITS=16,假設n=16,其二進制最左邊有270,那麼resizeStamp返回的結果的二進制就是1000000000011011
瞭解了resizeStamp,接着再回到helpTransfer方法,從前面對sizeCtl的講解知道,在擴容前sizeCtl會被設置爲sc = (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2,這是一個負數,那麼將sc無符號右移16位恰好獲得resizeStamp(n),即rs變量的值。源碼中先作(sc >>> RESIZE_STAMP_SHIFT) != rs的判斷,保證rssc這兩個對應的數組長度相同,即當前擴容還沒結束。sc == rs + 1 || sc == rs + MAX_RESIZERS 這兩個判斷條件還沒看太懂,大概是限制擴容的線程不要超過最大容許線程數,可是具體爲何不是太清楚,有待後續查證。transferIndex是數據遷移的位置變量,從後往前開始遷移數據,當transferIndex <= 0時,說明遷移已經結束了。

  • transfer
    ConcurrentHashMap的擴容實際上就是新建一個兩倍大的數組,而後將老數據遷移到新數組的過程,這也是transfer的字面意思。數據遷移的真正過程都在transfer方法裏,該方法總共一百多行,下面將結合源碼一點點進行解析。
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
        int n = tab.length, stride;
        //stride至關於一個步長變量,每一個線程須要承擔stride個桶數據的遷移工做,這裏初始化的過程會參照機器CPU數量,在多CPU上stride=n/(8*NCPU),單CPU上stride=n,可是stride不能小於16
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE; // subdivide range
        //若是新數組還沒初始化,在這裏進行初始化
        if (nextTab == null) {            // initiating
            try {
                @SuppressWarnings("unchecked")
                //新建兩倍長度的數組
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            //nextTab字段在這裏纔不爲null,而是指向新數組
            nextTable = nextTab;
            //數據遷移的下標,從後往前遷移
            transferIndex = n;
        }
        int nextn = nextTab.length;
        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
        //是否能夠開始遷移下一個桶的標識符,當前桶遷移完畢才能夠接着遷移前一個桶的數據
        boolean advance = true;
        //記錄當前遷移工做是否結束
        boolean finishing = false; // to ensure sweep before committing nextTab
        for (int i = 0, bound = 0;;) {
            Node<K,V> f; int fh;
            //若是當前桶已遷移完畢,開始處理下一個桶
            while (advance) {
                int nextIndex, nextBound;
                //這裏遷移的桶區間是[bound,i-1],所以--i>=bound說明當前線程的遷移任務還沒結束,須要跳出while循環,執行後面的遷移工做
                if (--i >= bound || finishing)
                    advance = false;
                //若是全部的桶都已經分給了相應的線程進行處理,沒有多餘的桶給當前線程了,將i設置爲-1,讓當前線程退出遷移操做
                else if ((nextIndex = transferIndex) <= 0) {
                    i = -1;
                    advance = false;
                }
                //若是當前線程成功分到[nextIndex-stride,nextIndex-1]的區間,則能夠開始進行數據遷移了,在還有多餘的桶沒有分配時,新加入進來的擴容線程會先執行到這裏領取任務,下一個線程進來將會接着從nextIndex-stride往前遷移stride個桶的數據
                else if (U.compareAndSwapInt
                         (this, TRANSFERINDEX, nextIndex,
                          nextBound = (nextIndex > stride ?
                                       nextIndex - stride : 0))) {
                    bound = nextBound;
                    i = nextIndex - 1;
                    advance = false;
                }
            }
            //當出現下面這些狀況時,說明擴容已結束,須要作一些善後工做
            //i<0的狀況在上面的while循環裏出現過,可是i >= n 和 i + n >= nextn這兩個條件會在什麼狀況下出現呢???
            if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
                //若是全部的數據遷移都已完成,則執行這裏的邏輯退出
                if (finishing) {
                    //nextTable從新賦值爲null
                    nextTable = null;
                    //table指向新數組,原來的數組將被GC回收
                    table = nextTab;
                    //sizeCtl = 2*n-0.5*n=1.5*n,這個值實際上就是新數組長度的0.75倍,即下一次擴容的閾值。這裏的作法很巧妙,n是原數組的長度,擴容後的長度是2*n,按照0.75的加載因子來算,新數組的擴容閾值就是2*n*0.75=1.5*n,但位運算顯然更快一些
                    sizeCtl = (n << 1) - (n >>> 1);
                    return;
                }
                //若是當前線程的遷移工做已完成,就將sizeCtl的值減1
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                    //若是sizeCtl的值變成了擴容前的值(即resizeStamp(n) << RESIZE_STAMP_SHIFT + 2),說明擴容完成
                    //問題:上面的原子操做是現將修改前的sizeCtl賦值給sc,而後纔將sizeCtl減1,那麼sc應該永遠取不到resizeStamp(n) << RESIZE_STAMP_SHIFT + 2纔對,下面的return語句不會執行,這裏是否是哪裏理解得不對???留待之後驗證。
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                        return;
                    //走到這裏說明全部的遷移工做都完成了,設置相關字段
                    finishing = advance = true;
                    //將i設置爲n,那麼線程將會接着執行最外層for循環,從後向前逐個檢查是否每一個桶都已完成數據遷移
                    i = n; // recheck before commit
                }
            }
            //若是原來的桶裏沒有數據,插入一個ForwardingNode做佔位符,告訴其餘線程當前正在進行擴容
            else if ((f = tabAt(tab, i)) == null)
                advance = casTabAt(tab, i, null, fwd);
            //這個判斷緊接着上一個判斷,若是已經有佔位符了,說明有線程已經處理過這個桶了,不能再處理這個桶
            else if ((fh = f.hash) == MOVED)
                advance = true; // already processed
            //當前桶既不是佔位符也不是空(即桶裏面是鏈表或紅黑樹),會走到這裏
            else {
                //遷移桶內數據須要加鎖,避免其餘線程同時增長、刪除或修改桶裏的數據
                synchronized (f) {
                    //從上一次取桶的頭節點到加鎖以前,可能該桶已經被其餘線程處理過,須要進行二次判斷
                    if (tabAt(tab, i) == f) {
                        Node<K,V> ln, hn;
                        //桶裏是鏈表結構
                        if (fh >= 0) {
                            //這裏值得注意,回憶一下上面對put方法的分析,插入元素時,元素的hash值是根據spread(key.hashCode())進行計算的,hash值在擴容期間確定不會變,變化的僅僅是數組的長度,而數組的長度在擴容後會左移1位,所以元素在新數組中的位置就由hash值從低位開始的第n位決定,若是hash值第n位是0,則元素還在下標爲i的這個桶裏,不然元素的新位置是i+n。這裏的處理邏輯與HashMap中同樣,都是爲了下降計算新位置的開銷。
                            int runBit = fh & n;
                            Node<K,V> lastRun = f;
                            //這個for循環是爲了不新建沒必要要的節點,即通過計算髮現,若是當前鏈表的某個元素a及其後面的全部節點都在同一個桶內,那麼在進行數據轉移時,只須要處理a節點以前的節點便可,a節點及其以後的節點仍然維持原來的鏈表結構放在新數組的對應桶裏,這裏的lastRun就是待肯定的a節點
                            for (Node<K,V> p = f.next; p != null; p = p.next) {
                                int b = p.hash & n;
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                            //若是最後幾個節點都呆在原來的桶裏,則設置ln指向lastRun節點,不然將hn指向lastRun節點
                            if (runBit == 0) {
                                ln = lastRun;
                                hn = null;
                            }
                            else {
                                hn = lastRun;
                                ln = null;
                            }
                            //這裏纔是真正的數據轉移過程,跟前面的分析同樣,從頭結點處理到lastRun節點便可。
                            for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                                //如下的邏輯代表,數據遷移是採用在鏈表頭部插入數據的作法,lastRun以前的節點順序會被反轉
                                if ((ph & n) == 0)
                                    ln = new Node<K,V>(ph, pk, pv, ln);
                                else
                                    hn = new Node<K,V>(ph, pk, pv, hn);
                            }
                            //ln對應老的桶下標i,hn對應新的桶下標i+n
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            //在元素遷移完以後,會將桶的頭結點設置爲ForwardingNode佔位節點
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                        //桶內是紅黑樹結構,邏輯與鏈表大同小異,再也不贅述
                        else if (f instanceof TreeBin) {
                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            TreeNode<K,V> lo = null, loTail = null;
                            TreeNode<K,V> hi = null, hiTail = null;
                            int lc = 0, hc = 0;
                            for (Node<K,V> e = t.first; e != null; e = e.next) {
                                int h = e.hash;
                                TreeNode<K,V> p = new TreeNode<K,V>
                                    (h, e.key, e.val, null, null);
                                if ((h & n) == 0) {
                                    if ((p.prev = loTail) == null)
                                        lo = p;
                                    else
                                        loTail.next = p;
                                    loTail = p;
                                    ++lc;
                                }
                                else {
                                    if ((p.prev = hiTail) == null)
                                        hi = p;
                                    else
                                        hiTail.next = p;
                                    hiTail = p;
                                    ++hc;
                                }
                            }
                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                (hc != 0) ? new TreeBin<K,V>(lo) : t;
                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                (lc != 0) ? new TreeBin<K,V>(hi) : t;
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                    }
                }
            }
        }
    }

終於分析完transfer的邏輯了~~~如今來整理一下須要注意的地方。首先須要注意的是ForwardingNode節點則這段代碼裏有三處地方會用到,一是在最外層的for循環的邏輯中,當發現桶是null時,會將ForwardingNode節點插入,實際上這種狀況也能夠視爲這個桶數據已遷移完成,另外兩處都是在遷移完數據以後,將ForwardingNode插入做佔位符,所以當遍歷元素遇到佔位符時,都代表當前正在擴容,且這個桶裏的數據已經遷移完了;二是數據遷移的過程當中,鏈表首部的節點會變成倒序。
分析到這裏,put->initTable->helpTransfer->transfer這條線已經分析完了,put方法最後還遺留了一個addCount方法沒有分析,這部份內容跟容器計數有關,先看看addCount的源碼:

private final void addCount(long x, int check) {
        CounterCell[] as; long b, s;
        //這部分跟計數有關
        //知足if判斷的條件:①counterCells!=null ② counterCells=null,且併發修改baseCount的值失敗,說明當前有其餘線程也在修改baseCount
        if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
            CounterCell a; long v; int m;
            boolean uncontended = true;
            //這個判斷是什麼意思目前不太清楚???留給後面確認並補充。
            if (as == null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                !(uncontended =
                  U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
                fullAddCount(x, uncontended);
                return;
            }
            //counterCells!=null,而且counterCells中有數據,而且下標a對應的元素不是null,且成功的將CELLVALUE從a.value修改爲a.value+x,纔會執行到這裏
            if (check <= 1)
                return;
            //計算元素個數
            s = sumCount();
        }
        //這部分跟擴容有關
        if (check >= 0) {
            Node<K,V>[] tab, nt; int n, sc;
            //s>=sizeCtl表示達到擴容閾值,須要進行擴容,條件是數組已經初始化而且沒有達到最大容量
            while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                   (n = tab.length) < MAXIMUM_CAPACITY) {
                int rs = resizeStamp(n);
                //sc < 0 表示擴容已開始
                if (sc < 0) {
                    //這裏的判斷跟helpTransfer方法同樣,再也不贅述
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                        break;
                    //當前線程加入擴容工做
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt);
                }
                //走到這裏說明擴容工做還沒開始,由當前線程開始擴容
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                             (rs << RESIZE_STAMP_SHIFT) + 2))
                    transfer(tab, null);
                s = sumCount();
            }
        }
    }

這段代碼裏出現了counterCells字段,可是到目前爲止,並無看到在哪裏對這個字段進行了賦值,全局搜索發現,該字段只在fullAddCount方法裏被賦值,而fullAddCount方法也出如今addCount中,所以接下來將分析一下fullAddCount方法。

//See LongAdder version for explanation
    private final void fullAddCount(long x, boolean wasUncontended) {
        int h;
        //若是ThreadLocalRandom尚未初始化,這裏先進行初始化
        if ((h = ThreadLocalRandom.getProbe()) == 0) {
            ThreadLocalRandom.localInit();      // force initialization
            h = ThreadLocalRandom.getProbe();
            wasUncontended = true;
        }
        boolean collide = false;                // True if last slot nonempty
        //這裏會自旋
        for (;;) {
            CounterCell[] as; CounterCell a; int n; long v;
            //若是counterCells已經初始化
            if ((as = counterCells) != null && (n = as.length) > 0) {
                if ((a = as[(n - 1) & h]) == null) {
                    if (cellsBusy == 0) {            // Try to attach new Cell
                        CounterCell r = new CounterCell(x); // Optimistic create
                        //這裏的原子操做限制了每次只能有一個線程執行到此處
                        if (cellsBusy == 0 &&
                            U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                            boolean created = false;
                            try {               // Recheck under lock
                                CounterCell[] rs; int m, j;
                                //若是counterCells在h對應的索引位置還沒初始化,則初始化
                                if ((rs = counterCells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally {
                                cellsBusy = 0;
                            }
                            if (created)
                                break;
                            continue;           // Slot is now non-empty
                        }
                    }
                    collide = false;
                }
                else if (!wasUncontended)       // CAS already known to fail
                    wasUncontended = true;      // Continue after rehash
                else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
                    break;
                else if (counterCells != as || n >= NCPU)
                    collide = false;            // At max size or stale
                else if (!collide)
                    collide = true;
                else if (cellsBusy == 0 &&
                         U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                    try {
                        if (counterCells == as) {// Expand table unless stale
                            CounterCell[] rs = new CounterCell[n << 1];
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            counterCells = rs;
                        }
                    } finally {
                        cellsBusy = 0;
                    }
                    collide = false;
                    continue;                   // Retry with expanded table
                }
                h = ThreadLocalRandom.advanceProbe(h);
            }
            //若是counterCells還沒初始化
            else if (cellsBusy == 0 && counterCells == as &&
                     U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                boolean init = false;
                try {                           // Initialize table
                    if (counterCells == as) {
                        //counterCells要求長度必須是2的整數冪,所以先初始化爲2
                        CounterCell[] rs = new CounterCell[2];
                        rs[h & 1] = new CounterCell(x);
                        counterCells = rs;
                        init = true;
                    }
                } finally {
                    cellsBusy = 0;
                }
                if (init)
                    break;
            }
            else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
                break;                          // Fall back on using base
        }
    }

從源碼的註釋能夠看到,fullAddCount借鑑了LongAdder的思想,所以源碼並無在這裏給出詳細解釋。本人對這塊也是隻知其一;不知其二,所以上面僅僅是把理解的內容進行了分析,而對於addCountfullAddCount的實現原理並無透徹的理解,這部分後面須要補充,此處就再也不進行額外的解讀,以避免誤人子弟。

  • tryPresize
    跟擴容有關的方法還有tryPresize,這個方法在ConcurrentHashMap中只有兩個地方調用,一個是putAll方法,另外一個是treeifyBin。源碼以下:
private final void tryPresize(int size) {
        //計算擴容後的數組長度
        int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
            tableSizeFor(size + (size >>> 1) + 1);
        int sc;
        while ((sc = sizeCtl) >= 0) {
            Node<K,V>[] tab = table; int n;
            //若是數組尚未初始化
            if (tab == null || (n = tab.length) == 0) {
                n = (sc > c) ? sc : c;
                //將sizeCtl設置爲-1,開始執行初始化操做
                if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                    try {
                        if (table == tab) {
                            @SuppressWarnings("unchecked")
                            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                            table = nt;
                            sc = n - (n >>> 2);
                        }
                    } finally {
                        sizeCtl = sc;
                    }
                }
            }
            //若是當前元素個數沒有達到擴容閾值(c<=sc),或者數組長度已經到最大值了,不須要擴容
            else if (c <= sc || n >= MAXIMUM_CAPACITY)
                break;
            else if (tab == table) {
                int rs = resizeStamp(n);
                //擴容已開始
                if (sc < 0) {
                    Node<K,V>[] nt;
                    //若是擴容已結束或者擴容線程已達到最大,當前線程什麼也不幹
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                        break;
                    //當前線程加入擴容工做
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt);
                }
                //擴容還沒開始,就由當前線程開始擴容
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                             (rs << RESIZE_STAMP_SHIFT) + 2))
                    transfer(tab, null);
            }
        }
    }
  • 統計容器內元素個數
    ConcurrentHashMap中,統計元素個數應該使用sumCount()而不是size()方法,緣由在於:方法返回的是int類型,而實際上concurrentHashMap存儲的元素能夠超過這個範圍,sumCount方法的返回值爲long類型能夠說明這一點,其源碼以下:
final long sumCount() {
        CounterCell[] as = counterCells; CounterCell a;
        //元素數量是baseCount和counterCells中保存的元素數量之和
        long sum = baseCount;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }

4.問題整理

在閱讀代碼的過程當中,仍然有些地方的邏輯不是很懂,在這裏將這些問題記錄下來,方便查閱和後期補充。

  • counterCells字段是用來幹什麼的?
  • spread方法裏爲什麼要和HASH_BITS做按位與計算?
  • putVal方法中,當發現桶中的數據結構是紅黑樹時,binCount爲什麼直接賦值爲2
  • transfer方法中,for循環體重的if條件語句中,什麼狀況下會知足i >= ni + n >= nextn這兩個條件?
  • transfer方法中,下面的if語句彷佛永遠爲false,怎麼理解?
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
        if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
            return;
  • addCountfullAddCount方法的原理還不是太理解,須要再研究研究。
  • RESIZE_STAMP_BITSRESIZE_STAMP_SHIFT這兩個字段的做用不太瞭解,以及sizeCtl裏面是哪部分在記錄stamp
  • ⑦源碼中在修改sizeCtl的值時,有時候直接使用sizeCtl,有時候又使用相似於U.compareAndSwapInt(this, SIZECTL, sc, -1)這種,二者有何區別?
  • ⑧源碼中屢次出現if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0)這種判斷,其中的sc == rs + 1 || sc == rs + MAX_RESIZERS該如何理解?

5.總結

讀源碼的過程是痛苦的,尤爲是剛開始閱讀的時候,有許許多多東西都看不懂,以致於根本不知道從哪裏看起,然而一旦可以看下去了,會發現其實源碼並非太可怕,遇到的困難都是能夠克服的。通過JDK 1.8以後,ConcurrentHashMap的源碼的行數達到了6000+,本文只是記錄了ConcurrentHashMap很是有限的內容,還有許多內容並未觸碰,所以能夠說本人對於ConcurrentHashMap的瞭解也很是有限。最開始讀源碼時,並無想過要寫一篇博客來記載,可是很快發現,若是不記下來,過不了多久就會把已經看過的內容忘個七七八八,所以才動了寫篇讀後感的念頭。固然,在寫做的過程當中,本身的思路也捋順了許多,也算是個額外的收穫。

6.參考文獻

從開始看源碼到寫完本文,其實看了很多前人的優秀文章,這裏先佔個坑位,後面會把相關參考資料整理到這裏。

相關文章
相關標籤/搜索