JUC源碼分析-集合篇(一)ConcurrentHashMap

JUC源碼分析-集合篇(一)ConcurrentHashMap

1. 概述

《HashMap 源碼詳細分析(JDK1.8)》:http://www.javashuo.com/article/p-hbiddcap-mp.htmljava

Java7 整個 ConcurrentHashMap 是一個 Segment 數組,Segment 經過繼承 ReentrantLock 來進行加鎖,因此每次須要加鎖的操做鎖住的是一個 segment,這樣只要保證每一個 Segment 是線程安全的,也就實現了全局的線程安全。因此不少地方都會將其描述爲分段鎖。segmentfault

Java8 對 ConcurrentHashMap 進行了比較大的改動。結構上和 Java8 的 HashMap 基本上同樣,不過它要保證線程安全性,因此在源碼上確實要複雜一些。數組

Java8 ConcurrentHashMap結構

2. 源碼分析

ConcurrentHashMap 重點關注它是如何保證線程安全的,和 HashMap 相似的地方就再也不贅述。安全

2.1 構造方法

// 這構造函數裏,什麼都不幹
public ConcurrentHashMap() {
}

// HashMap 初始容量
public ConcurrentHashMap(int initialCapacity) {
    if (initialCapacity < 0)
        throw new IllegalArgumentException();
    int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
               MAXIMUM_CAPACITY :
               tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
    this.sizeCtl = cap;
}

這個初始化方法有點意思,經過提供初始容量,計算了 sizeCtl,sizeCtl = 【 (1.5 * initialCapacity + 1),而後向上取最近的 2 的 n 次方】。如 initialCapacity 爲 10,那麼獲得 sizeCtl 爲 16,若是 initialCapacity 爲 11,獲得 sizeCtl 爲 32。多線程

在 HashMap 中初始容量直接使用的是 tableSizeFor(initialCapacity),不知道爲何在 ConcurrentHashMap 改在 1.5 * initialCapacity,至於加 1 估計是考慮 initialCapacity=0 的狀況。併發

sizeCtl 這個屬性使用的場景不少,這裏爲第一個使用場景:ConcurrentHashMap 初始化。sizeCtl=0(也就是無參構造器) 表示使用默認的初始化大小,不然使用自定義的容量。less

2.2 putVal 分析

由 put 入手分析 ConcurrentHashMap 中可能出現的線程安全性問題。dom

2.2.1 put 過程分析

public V put(K key, V value) {
    return putVal(key, value, false);
}

/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    // 1. 計算 hash 值,(h ^ (h >>> 16)) & HASH_BITS
    int hash = spread(key.hashCode());
    int binCount = 0;
    // 2. 經過自旋保證新添加的元素必定會成功添加到 HashMap 中
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        // 2.1 若是數組"空",進行數組初始化。如何保證數組擴容的線程安全(重點)
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        // 2.2 該 hash 對應的槽位(也叫桶bucket) 爲空,直接將這個新值放入其中便可
        //     數組 tab 是 volatile 修辭的,並不能說明其元素是 volatile 的。 U.getObjectVolatile
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            // CAS 操做將這個新值放入該槽位,若是成功就結束了
            // 若是 CAS 失敗,那就是有併發操做,則走第 3 或 4 步
            if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        // 2.3 只有擴容時 f.hash==MOVED,該線程先幫助擴容才添加值
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        // 2.4 鎖住對應的槽位,剩下的操做和 HashMap 就差很少
        else {
            V oldVal = null;
            // 下面的操做都是線程安全的了
            synchronized (f) {
                // f 加鎖後對 f 的全部操做都是線程安全的,但 tab 自己並非線程安全的
                // 也就是說 tab[i] 可能發生變化
                if (tabAt(tab, i) == f) {
                    // 2.4.1 頭結點的 hash>=0,說明是鏈表
                    if (fh >= 0) {
                        ...
                    }
                    // 2.4.2 表示紅黑樹
                    else if (f instanceof TreeBin) {
                        ...
                    }
                }
            }
            // 鏈表binCount表示添加元素後的長度,紅黑樹binCount=2不能進行treeifyBin方法
            if (binCount != 0) {
                // 判斷是否要將鏈表轉換爲紅黑樹,臨界值和 HashMap 同樣,也是 8
                if (binCount >= TREEIFY_THRESHOLD)
                    // 這個方法和 HashMap 中稍微有一點點不一樣,那就是它不是必定會進行紅黑樹轉換,
                    // 若是當前數組的長度小於 64,那麼會選擇進行數組擴容,而不是轉換爲紅黑樹
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    // 3. 元素個數加1,並判斷是否擴容,如何保證線程安全
    addCount(1L, binCount);
    return null;
}

put 的主流程看完了,可是在下面幾個過程當中是如何保證線程安全的:ide

  • 第一個是數組初始化
  • 第二個是擴容
  • 第三個是幫助數據遷移

2.2.1 數據定位 hash

// 總之就一個目的,讓節點在 HashMap 中分佈更均勻
static final int spread(int h) {
    return (h ^ (h >>> 16)) & HASH_BITS;
}

h 是對應 key 的 hashcode,計算節點的槽位是 key.hashcode 對數組的長度取餘(hash%length),但若是數組長度爲 2^n,則能夠直接使用位運算 hash&(length-1)。這個運算實際上只有 hash 的後幾位參與運算,爲了讓 hash 散列的更均勻,也就是 hash 更隨機,讓 hash 的高 16 和低 16 進行異或運算,這樣 hash 的後 16 位就更不容易重複了。函數

注意此時 hashcode 可能爲負值,負數在 ConcurrentHashMap 中有特殊的含義,爲了保證計算的 hash 必定是正數,能夠對於計算獲得的 hash 值,強制把符號位去掉,保證結果只在正數區間。 更多參考 hashcode 可能爲負值

hash = key.hashCode() & Integer.MAX_VALUE;

2.2.2 數組初始化 initTable

初始化方法中的併發問題是經過對 sizeCtl 進行一個 CAS 操做來控制的。

private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {
        if ((sc = sizeCtl) < 0)
            Thread.yield(); // lost initialization race; just spin
        // sizeCtl=-1 表示數組正在初始化
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            try {
                if ((tab = table) == null || tab.length == 0) {
                    // sizeCtl=0(也就是無參構造器) 表示使用默認的初始化大小,不然使用自定義的容量
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = tab = nt;
                    // n-n/4,即 0.75*n,和 HashMap 中閾值的計算方式同樣,只是這裏使用位運算
                    sc = n - (n >>> 2);
                }
            } finally {
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}

sizeCtl 這個屬性以前在 ConcurrentHashMap 初始化化時已經提到過,這裏引出了 sizeCtl 另外的使用場景:

  • sizeCtl 第一個使用場景:數組初始化前。sizeCtl=0(也就是無參構造器) 表示使用默認的初始化大小,不然使用自定義的初始化容量。
  • sizeCtl 第二個使用場景:數組初始化中。sizeCtl=-1 表示正在初始化數組 table
  • sizeCtl 第三個使用場景:數組初始化後。sizeCtl>0 表示 table 擴容的閾值

2.2.3 鏈表轉紅黑樹 treeifyBin

前面咱們在 put 源碼分析也說過,treeifyBin 不必定就會進行紅黑樹轉換,也多是僅僅作數組擴容。咱們仍是進行源碼分析吧。

// index是須要鏈表轉紅黑樹的節點索引
private final void treeifyBin(Node<K,V>[] tab, int index) {
    Node<K,V> b; int n, sc;
    if (tab != null) {
        // MIN_TREEIFY_CAPACITY=64,當數組長度小於64時優先擴容
        if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
            // 後面咱們再詳細分析這個方法
            tryPresize(n << 1);
        // 加鎖後鏈表轉紅黑樹,這個和 HashMap 同樣
        else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
            synchronized (b) {
                if (tabAt(tab, index) == b) {
                    TreeNode<K,V> hd = null, tl = null;
                    for (Node<K,V> e = b; e != null; e = e.next) {
                        TreeNode<K,V> p =
                            new TreeNode<K,V>(e.hash, e.key, e.val, null, null);
                        if ((p.prev = tl) == null)
                            hd = p;
                        else
                            tl.next = p;
                        tl = p;
                    }
                    setTabAt(tab, index, new TreeBin<K,V>(hd));
                }
            }
        }
    }
}

2.3 擴容

2.3.1 擴容 tryPresize

若是說 Java8 ConcurrentHashMap 的源碼不簡單,那麼說的就是擴容操做和遷移操做。數組擴容後真正進行數據遷移的其實是 transfer 方法,讀者應該提早知道這點。這裏的擴容也是作翻倍擴容的,擴容後數組容量爲原來的 2 倍。

// tryPresize 有兩個方法調用 putAll 或 putVal,不論是那個方法 size 都是擴容後的長度
private final void tryPresize(int size) {
    // c:size 的 1.5 倍,再加 1,再往上取最近的 2 的 n 次方。
    int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
        tableSizeFor(size + (size >>> 1) + 1);
    int sc;
    while ((sc = sizeCtl) >= 0) {
        Node<K,V>[] tab = table; int n;
        // 1. 數組沒有初始化先初始化,和數組 initTable 相似
        //    putAll 時可能數組還未初始化
        if (tab == null || (n = tab.length) == 0) {
            n = (sc > c) ? sc : c;
            if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    if (table == tab) {
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = nt;
                        sc = n - (n >>> 2);
                    }
                } finally {
                    sizeCtl = sc;
                }
            }
        }
        // 2. 數組容量已經最大或足夠了,不須要擴容
        else if (c <= sc || n >= MAXIMUM_CAPACITY)
            break;
        // 3. 真正開始擴容
        else if (tab == table) {
            // 每次擴容時都會生成一個相似時間戳的校驗標記,上一次擴容和下一次擴容都不同
            // 這個值由當前數組的長度決定,格式必定爲 0000 0000 100x xxxx
            // 高 16 所有是0,低 16 的第一位是 1,後五位由 n 決定(n轉二進制後最高位1以前的0個數,最大32)
            int rs = resizeStamp(n);
            // sc<0 表示其它線程正在擴容,幫助擴容(sc=-1表示數組初始化,已經處理過了)
            // 擴容時 sizeCtl 高 16 位表示擴容戳(校驗標記),低 16 位表示(正在參與擴容的線程數+1)
            if (sc < 0) {
                Node<K,V>[] nt;
                // 3.1 若是 sc 的高 16 位不等於標識符(說明sizeCtl已經改變,擴容已經結束)
                // 3.2 若是 sc == 標識符 + 1 (擴容結束了,再也不有線程進行擴容)
                //     第一個線程設置 sc = rs << 16 + 2
                //     第二個線程設置 sc = sc + 1
                //     當一個線程擴容結束 sc = sc -1
                //     最後一個線程擴容結束後 sc == rs + 1(爲何是sc==rs+1,不該該是sc==(rs<<16)+1嗎????)
                // 3.3 若是 sc == 標識符 + 65535(輔助擴容線程數已經達到最大,問題同上????)
                // 3.4 若是 nextTable == null(結束擴容了,或者擴容還未開始)
                // 3.5 若是 transferIndex <= 0 (轉移狀態變化了)
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    break;
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }
            // 開始擴容(網上說sizeCtl=-1表示數組初始化,所以這裏直接+2)
            else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);
        }
    }
}

首先要注意方法名爲 tryPresize,既然方法名中包含了 try 就說明擴容有多是不成功的。事實上也正是如此,擴容的條件是 sizeCtl>=0,也就是說此時沒有線程在進行數組初始化或擴容的操做纔會進行擴容。

真正的擴容是由方法 transfer 實現的,這個方法的第二個參數表示擴容後的數組。若是是由當前線程發起的擴容第二個參數爲 null,若是其它線程已經在擴容,則當前線程也加入到擴容中去,擴容後的數組已經存在 nextTable。

sizeCtl

前面已經講解了 sizeCtl 在數組初始化前中後值的變化,這裏須要重點關注一下 sizeCtl 在擴容中的使用。sizeCtl 分爲兩部分,高 16 位表示擴容戳(校驗標記),低 16 位表示正在參與擴容的線程數(線程數+1)。

高 16 位表示擴容戳(校驗標記)

static final int resizeStamp(int n) {
    // Integer.numberOfLeadingZeros(n) 表示 n 轉二進制後最高位 1 以前的 0 個數,這個數必定小於 32
    // 1 << (RESIZE_STAMP_BITS - 1) 表示 0x7fff,也就是將低 16 位的第一位改爲 1
    // 最終結果爲 0000 0000 100x xxxx
    return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}

這個值由當前數組的長度決定,格式必定爲 0000 0000 100x xxxx,這個數 rs<<16 後最高位必定是 1,也就是說 sizeCtl 是一個負數。

低 16 位正在擴容的線程數 + 1

初始化時直接 +2,以後每多一個線程參與擴容 +1,這個線程擴容線束則 -1,最終擴容完成則是 1。

2.3.2 數據遷移 helpTransfer

putVal 時發現節點爲節點的 f.hash=MOVED,說明有其它線程在對數組進行擴容則會調用 helpTransfer,也就是當前線程先幫助數組擴容後再添加元素。

// 和 tryPresize 差很少,最複雜的部分仍是這個if條件判斷
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
    Node<K,V>[] nextTab; int sc;
    if (tab != null && (f instanceof ForwardingNode) &&
        (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
        int rs = resizeStamp(tab.length);
        while (nextTab == nextTable && table == tab &&
               (sc = sizeCtl) < 0) {
            // 即然節點已經修改成 ForwardingNode 則說明擴容後的數組已經建立
            // 因此這裏的條件判斷少了一個 nextTable=null
            if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                sc == rs + MAX_RESIZERS || transferIndex <= 0)
                break;
            if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                transfer(tab, nextTab);
                break;
            }
        }
        return nextTab;
    }
    return table;
}

2.3.3 數據遷移 transfer

閱讀源碼以前,先要理解併發操做的機制。原數組長度爲 n,因此咱們有 n 個遷移任務,讓每一個線程每次負責一個小任務是最簡單的,每作完一個任務再檢測是否有其餘沒作完的任務,幫助遷移就能夠了,而 Doug Lea 使用了一個 stride,簡單理解就是步長,每一個線程每次負責遷移其中的一部分,如每次遷移 16 個小任務。因此,咱們就須要一個全局的調度者來安排哪一個線程執行哪幾個任務,這個就是屬性 transferIndex 的做用。

第一個發起數據遷移的線程會將 transferIndex 指向原數組最後的位置,而後從後往前的 stride 個任務屬於第一個線程,而後將 transferIndex 指向新的位置,再往前的 stride 個任務屬於第二個線程,依此類推。固然,這裏說的第二個線程不是真的必定指代了第二個線程,也能夠是同一個線程,這個讀者應該能理解吧。其實就是將一個大的遷移任務分爲了一個個任務包。

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    // 1. stride 能夠理解爲」步長「,有 n 個位置是須要進行遷移的
    //    將這 n 個任務分爲多個任務包,每一個任務包有 stride 個任務
    //    stride 在單核下直接等於 n,多核模式下爲 (n>>>3)/NCPU,最小值是 16
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; // subdivide range

    // 2. 若是 nextTab 爲 null,先進行一次初始化,爲何是線程安全的????
    //    前面咱們說了,外圍會保證第一個發起遷移的線程調用此方法時,參數 nextTab=null
    //    以後參與遷移的線程調用此方法時,nextTab!=null
    if (nextTab == null) {            // initiating
        try {
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        nextTable = nextTab;
        transferIndex = n;
    }
    int nextn = nextTab.length;

    // 3. ForwardingNode 是佔位用的,標記該節點已經處理過了
    //    這個構造方法會生成一個 Node,key、value 和 next 都爲 null,關鍵是 hash 爲 MOVED
    //    後面咱們會看到,原數組中位置 i 處的節點完成遷移工做後,
    //    就會將位置 i 處設置爲這個 ForwardingNode,用來告訴其餘線程該位置已經處理過了
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    // advance=true 表示一個節點的數據已經處理完了,準備獲取下一個節點
    boolean advance = true;
    boolean finishing = false; // to ensure sweep before committing nextTab

    // i 每次任務的上邊界,bound 是下邊界,注意是從後往前
    // --i < bound 時就領取一次的任務,直到任務處理完畢
    for (int i = 0, bound = 0;;) {
        Node<K,V> f; int fh;
        // 4. advance 爲 true 表示能夠進行下一個位置的遷移了
        //    第一次while循環:當前線程領取任務,走第三個else(一直自旋嘗試領取任務)
        //          最終的結果爲:i 指向了 transferIndex,bound 指向了 transferIndex-stride
        //    以後每處理完一個節點:走第一個if,處理的下一個槽位的節點,直到當前線程領取的任務處理完畢
        //    再次走第三個else,領取步長stride的任務直到transferIndex<=0
        while (advance) {
            int nextIndex, nextBound;
            // 4.1 --i表示處理一下槽位的節點
            if (--i >= bound || finishing)
                advance = false;
            // 4.2 transferIndex每領取一次任務減去一個步長stride
            //     transferIndex初始值爲table.length
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }
            // 4.3 自旋嘗試領取步長stride的任務
            else if (U.compareAndSwapInt
                     (this, TRANSFERINDEX, nextIndex,
                      nextBound = (nextIndex > stride ?
                                   nextIndex - stride : 0))) {
                // nextBound 是此次遷移任務的下邊界,注意,是從後往前
                bound = nextBound;
                // i 是此次遷移任務的上邊界
                i = nextIndex - 1;
                advance = false;
            }
        }
        // 5. 若是 i 小於0 (不在 tab 下標內,按照上面的判斷,領取最後一段區間的線程擴容結束)
        //    若是 i >= tab.length(不知道爲何這麼判斷)
        //    若是 i + tab.length >= nextTable.length(不知道爲何這麼判斷)
        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
            // 5.1 完成擴容
            if (finishing) {                    // 完成擴容
                nextTable = null;
                table = nextTab;                // 更新 table
                sizeCtl = (n << 1) - (n >>> 1); // 更新閾值
                return;
            }
            // 5.2 sizeCtl 在遷移前會設置爲 (rs << RESIZE_STAMP_SHIFT) + 2
            //     而後,每有一個線程參與遷移就會將 sizeCtl 加 1
            //     這裏使用 CAS 操做對 sizeCtl 進行減 1,表明作完了屬於本身的任務
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                // 不相等說明有其它線程在輔助擴容,當前線程直接返回,注意 sc 是-1以前的值
                // 還有其它線程在參與擴容,也就是說擴容還未結束,當前線程直接返回
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                    return;
                // 相等說明沒有線程在幫助他們擴容了。也就是說,擴容結束了。
                finishing = advance = true;
                i = n; // recheck before commit
            }
        }
        // 6. 若是位置 i 處是空的,沒有任何節點,那麼放入剛剛初始化的 ForwardingNode 」空節點「
        else if ((f = tabAt(tab, i)) == null)
            advance = casTabAt(tab, i, null, fwd);
        // 7. 該位置處是一個 ForwardingNode,表明該位置已經遷移過了
        else if ((fh = f.hash) == MOVED)
            advance = true; // already processed
        // 8. 數組遷移,原理和 HashMap 同樣
        else {
            // 對數組該位置處的結點加鎖,開始處理數組該位置處的遷移工做
            synchronized (f) {
                ...
            }
        }
    }
}

上面這部分代碼看懂了,下面這部分數據遷移的代碼能夠先不看,和 HashMap 是同樣的。

2.4 計數器累加 addCount

只要對 ConcurrentHashMap 中的元素進行增刪,元素的個數就會發生變化,這時就須要調用 addCount 方法。在看這個方法以前先看一下 ConcurrentHashMap 是如何記錄元素的個數的。

2.4.1 ConcurrentHashMap 元素個數記錄

public int size() {
    long n = sumCount();
    return ((n < 0L) ? 0 :
            (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)n);
}

// sumCount 用於統計當前的元素個數
final long sumCount() {
    CounterCell[] as = counterCells; CounterCell a;
    long sum = baseCount;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}

能夠看到元素的個數分爲兩部分,一是 baseCount;二是 counterCells 數組中的元素累加。爲何這麼複雜?弄個 int 自增,或者多線程環境使用 AtomicInteger 不就能夠了?

AtomicLong缺點

咱們都知道 AtomicLong 是經過 CAS 自旋的方法去設置 value,直到成功爲止。那麼當併發數比較多時,就會致使 CAS 的失敗機率變高,重試次數更多,越多的線程重試,CAS 失敗的機率越高,造成惡性循環,從而下降了效率。

《LongAddr源碼解析》:https://www.jianshu.com/p/d9d4be67aa56

2.4.2 addCount 元素個數統計

private final void addCount(long x, int check) {
    CounterCell[] as; long b, s;
    // 1. 統計元素個數
    // 1.1 嘗試直接使用 cas 修改 baseCount 的值,若是不存在鎖競爭,元素個數修改爲功,直接結束
    //     若是失敗則存在鎖競爭,使用更小粒度的 cas 操做
    if ((as = counterCells) != null ||
        !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
        CounterCell a; long v; int m;
        // 1.1 uncontended表示是否存在鎖競爭
        boolean uncontended = true;
        // 1.2 將線程分流到 CounterCell[] 數組中,使用更小粒度的 cas 操做
        //     (1) CounterCell[]未初始化
        //     (2) CounterCell[]中的節點未初始化
        //     (3) 對 as[random] 進行 cas 操做,失敗則存在鎖競爭問題uncontended=false,成功直接返回
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
            !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
                fullAddCount(x, uncontended);
                return;
        }
        if (check <= 1)
            return;
        // 1.3 統計當前元素的個數
        s = sumCount();
    }
    // 2. 擴容操做,見 2.3 節
    if (check >= 0) {
        Node<K,V>[] tab, nt; int n, sc;
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
               (n = tab.length) < MAXIMUM_CAPACITY) {
            int rs = resizeStamp(n);
            // sc<0 表示有其它線程在擴容
            if (sc < 0) {
                // 擴容已經結束
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    break;
                // 加入到擴容中,擴容的線程數+1
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }
            // 當前線程發起擴容,注意線程數是直接+2
            else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);
            s = sumCount();
        }
    }
}

addCount 方法分爲兩部分:一是統計元素個數,二是查看更不須要擴容。這裏只關注第一部分:

  • 首先嚐試直接使用 cas 修改 baseCount 的值,若是不存在鎖競爭,元素個數修改爲功,直接結束
  • 若是存在鎖競爭,將線程分流到 CounterCell[] 中,使用使用更小粒度的 cas 操做,若是成功直接返回
  • 上述操做失敗,則uncontended=false表示存在鎖競爭,調用 fullAddCount
//基礎值,沒有競爭時會使用這個值
private transient volatile long baseCount;
//存放Cell的hash表,大小爲2的冪
private transient volatile CounterCell[] counterCells;
// 經過cas實現的鎖,0無鎖,1得到鎖
private transient volatile int cellsBusy;

2.4.3 fullAddCount

private final void fullAddCount(long x, boolean wasUncontended) {
    int h;
    if ((h = ThreadLocalRandom.getProbe()) == 0) {
        ThreadLocalRandom.localInit();      // force initialization
        h = ThreadLocalRandom.getProbe();
        wasUncontended = true;
    }
    boolean collide = false;                // True if last slot nonempty
    for (;;) {
        CounterCell[] as; CounterCell a; int n; long v;
        // 1. counterCells 已經初始化
        if ((as = counterCells) != null && (n = as.length) > 0) {
            // 1.1 counterCells 子節點未初始化
            if ((a = as[(n - 1) & h]) == null) {
                if (cellsBusy == 0) {            // Try to attach new Cell
                    CounterCell r = new CounterCell(x); // Optimistic create
                    if (cellsBusy == 0 &&
                        U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                        boolean created = false;
                        try {               // Recheck under lock
                            CounterCell[] rs; int m, j;
                            if ((rs = counterCells) != null &&
                                (m = rs.length) > 0 &&
                                rs[j = (m - 1) & h] == null) {
                                rs[j] = r;
                                created = true;
                            }
                        } finally {
                            cellsBusy = 0;
                        }
                        if (created)
                            break;
                        continue;           // Slot is now non-empty
                    }
                }
                collide = false;
            }
            // 1.2 執行到這裏說明 cell 位置不爲空
            //     wasUncontended=false時表示有鎖競爭,直接空輪詢一次????
            else if (!wasUncontended)       // CAS already known to fail
                wasUncontended = true;      // Continue after rehash
            // 1.3 經過cas將x值加到a的value上
            else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
                break;
            // 1.4 counterCells表大於可用cpu數量,或者as數組過期;設置碰撞標識爲false
            else if (counterCells != as || n >= NCPU)
                collide = false;  
            // 1.5 collide=false則空輪詢一次
            else if (!collide)
                collide = true;
            // 1.6 counterCells擴容一倍
            else if (cellsBusy == 0 &&
                     U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                try {
                    if (counterCells == as) {// Expand table unless stale
                        CounterCell[] rs = new CounterCell[n << 1];
                        for (int i = 0; i < n; ++i)
                            rs[i] = as[i];
                        counterCells = rs;
                    }
                } finally {
                    cellsBusy = 0;
                }
                collide = false;
                continue;                   // Retry with expanded table
            }
            h = ThreadLocalRandom.advanceProbe(h);
        }
        // 2. counterCells 初始化長度爲 2
        else if (cellsBusy == 0 && counterCells == as &&
                 U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
            boolean init = false;
            try {                           // Initialize table
                if (counterCells == as) {
                    CounterCell[] rs = new CounterCell[2];
                    rs[h & 1] = new CounterCell(x);
                    counterCells = rs;
                    init = true;
                }
            } finally {
                cellsBusy = 0;
            }
            if (init)
                break;
        }
        // 3. 再次嘗試修改 baseCount,失敗就一直自旋直至成功
        else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
            break;                          // Fall back on using base
    }
}

天天用心記錄一點點。內容也許不重要,但習慣很重要!

相關文章
相關標籤/搜索