Java ConcurrentHashMap 高併發安全實現原理解析

本文首發於 vivo互聯網技術 微信公衆號
連接: https://mp.weixin.qq.com/s/4sz6sTPvBigR_1g8piFxug
做者:vivo 遊戲技術團隊

1、概述

ConcurrentHashMap (如下簡稱C13Map) 是併發編程出場率最高的數據結構之一,大量的併發CASE背後都有C13Map的支持,同時也是JUC包中代碼量最大的組件(6000多行),自JDK8開始Oracle對其進行了大量優化工做。node

本文從 HashMap 的基礎知識開始,嘗試逐一分析C13Map中各個組件的實現和安全性保證。編程

2、HashMap基礎知識 

分析C13MAP前,須要瞭解如下的HashMap知識或者約定:數組

  • 哈希表的長度永遠都是2的冪次方,緣由是hashcode%tableSize==hashcode&(tableSize-1),也就是哈希槽位的肯定能夠用一次與運算來替代取餘運算。
  • 會對hashcode調用若干次擾動函數,將高16位與低16位作異或運算,由於高16位的隨機性更強。
  • 當表中的元素總數超過tableSize * 0.75時,哈希表會發生擴容操做,每次擴容的tableSize是原先的兩倍。
  • 下文提到的槽位(bucket)、哈希分桶、BIN均表示同一個概念,即哈希table上的某一列。
  • 舊錶在作搬運時i槽位的node能夠根據其哈希值的第tableSize位的bit決定在新表上的槽位是i仍是i+tableSize。
  • 每一個槽位上有可能會出現哈希衝突,在未達到某個閾值時它是一個鏈表結構,達到閾值後會升級到紅黑樹結構。
  • HashMap自己並不是爲多線程環境設計,永遠不要嘗試在併發環境下直接使用HashMap,C13Map不存在這個安全問題。

3、C13Map的字段定義

C13Map的字段定義緩存

//最大容量
private static final int MAXIMUM_CAPACITY = 1 << 30;
 
//默認初始容量
private static final int DEFAULT_CAPACITY = 16;
 
//數組的最大容量,防止拋出OOM
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
 
//最大並行度,僅用於兼容JDK1.7之前版本
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
 
//擴容因子
private static final float LOAD_FACTOR = 0.75f;
 
//鏈表轉紅黑樹的閾值
static final int TREEIFY_THRESHOLD = 8;
 
//紅黑樹退化閾值
static final int UNTREEIFY_THRESHOLD = 6;
 
//鏈表轉紅黑樹的最小總量
static final int MIN_TREEIFY_CAPACITY = 64;
 
//擴容搬運時批量搬運的最小槽位數
private static final int MIN_TRANSFER_STRIDE = 16;
 
 
//當前待擴容table的郵戳位,一般是高16位
private static final int RESIZE_STAMP_BITS = 16;
 
//同時搬運的線程數自增的最大值
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
 
//搬運線程數的標識位,一般是低16位
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
 
static final int MOVED     = -1; // 說明是forwardingNode
static final int TREEBIN   = -2; // 紅黑樹
static final int RESERVED  = -3; // 原子計算的佔位Node
static final int HASH_BITS = 0x7fffffff; // 保證hashcode擾動計算結果爲正數
 
//當前哈希表
transient volatile Node<K,V>[] table;
 
//下一個哈希表
private transient volatile Node<K,V>[] nextTable;
 
//計數的基準值
private transient volatile long baseCount;
 
//控制變量,不一樣場景有不一樣用途,參考下文
private transient volatile int sizeCtl;
 
//併發搬運過程當中CAS獲取區段的下限值
private transient volatile int transferIndex;
 
//計數cell初始化或者擴容時基於此字段使用自旋鎖
private transient volatile int cellsBusy;
 
//加速多核CPU計數的cell數組
private transient volatile CounterCell[] counterCells;

4、安全操做Node<K,V>數組

static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
    return (Node<K,V>)U.getReferenceAcquire(tab, ((long)i << ASHIFT) + ABASE);
}
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                    Node<K,V> c, Node<K,V> v) {
    return U.compareAndSetReference(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
    U.putReferenceRelease(tab, ((long)i << ASHIFT) + ABASE, v);
}

對Node<K,V>[] 上任意一個index的讀取和寫入都使用了Unsafe輔助類,table自己是volatile類型的並不能保證其下的每一個元素的內存語義也是volatile類型;安全

須要藉助於Unsafe來保證Node<K,V>[]元素的「讀/寫/CAS」操做在多核併發環境下的原子或者可見性。微信

5、讀操做get爲何是線程安全的

首先須要明確的是,C13Map的讀操做通常是不加鎖的(TreeBin的讀寫鎖除外),而讀操做與寫操做有可能並行;能夠保證的是,由於C13Map的寫操做都要獲取bin頭部的syncronized互斥鎖,能保證最多隻有一個線程在作更新,這實際上是一個單線程寫、多線程讀的併發安全性的問題。數據結構

C13Map的get方法多線程

public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    //執行擾動函數
    int h = spread(key.hashCode());
    if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) {
        if ((eh = e.hash) == h) {
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        else if (eh < 0)
            return (p = e.find(h, key)) != null ? p.val : null;
        while ((e = e.next) != null) {
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}

一、若是當前哈希表table爲null

哈希表未初始化或者正在初始化未完成,直接返回null;雖然line5和line18之間其它線程可能經歷了千山萬水,至少在判斷tab==null的時間點key確定是不存在的,返回null符合某一時刻的客觀事實。併發

二、若是讀取的bin頭節點爲null

說明該槽位還沒有有節點,直接返回null。app

三、若是讀取的bin是一個鏈表

說明頭節點是個普通Node。

(1)若是正在發生鏈表向紅黑樹的treeify工做,由於treeify自己並不破壞舊的鏈表bin的結構,只是在所有treeify完成後將頭節點一次性替換爲新建立的TreeBin,能夠放心讀取。

(2)若是正在發生resize且當前bin正在被transfer,由於transfer自己並不破壞舊的鏈表bin的結構,只是在所有transfer完成後將頭節點一次性替換爲ForwardingNode,能夠放心讀取。

(3)若是其它線程正在操做鏈表,在當前線程遍歷鏈表的任意一個時間點,都有可能同時在發生add/replace/remove操做。

  • 若是是add操做,由於鏈表的節點新增從JDK8之後都採用了後入式,無非是多遍歷或者少遍歷一個tailNode。
  • 若是是remove操做,存在遍歷到某個Node時,正好有其它線程將其remove,致使其孤立於整個鏈表以外;但由於其next引用未發生變動,整個鏈表並無斷開,仍是能夠照常遍歷鏈表直到tailNode。
  • 若是是replace操做,鏈表的結構未變,只是某個Node的value發生了變化,沒有安全問題。

結論:對於鏈表這種線性數據結構,單線程寫且插入操做保證是後入式的前提下,併發讀取是安全的;不會存在誤讀、鏈表斷開致使的漏讀、讀到環狀鏈表等問題。

四、若是讀取的bin是一個紅黑樹

說明頭節點是個TreeBin節點。

(1)若是正在發生紅黑樹向鏈表的untreeify操做,由於untreeify自己並不破壞舊的紅黑樹結構,只是在所有untreeify完成後將頭節點一次性替換爲新建立的普通Node,能夠放心讀取。

(2)若是正在發生resize且當前bin正在被transfer,由於transfer自己並不破壞舊的紅黑樹結構,只是在所有transfer完成後將頭節點一次性替換爲ForwardingNode,能夠放心讀取。

(3)若是其餘線程在操做紅黑樹,在當前線程遍歷紅黑樹的任意一個時間點,均可能有單個的其它線程發生add/replace/remove/紅黑樹的翻轉等操做,參考下面的紅黑樹的讀寫鎖實現。

TreeBin中的讀寫鎖實現

TreeNode<K,V> root;
    volatile TreeNode<K,V> first;
    volatile Thread waiter;
    volatile int lockState;
    // values for lockState
    static final int WRITER = 1; // set while holding write lock
    static final int WAITER = 2; // set when waiting for write lock
    static final int READER = 4; // increment value for setting read lock
 
    private final void lockRoot() {
        //若是一次性獲取寫鎖失敗,進入contendedLock循環體,循環獲取寫鎖或者休眠等待
        if (!U.compareAndSetInt(this, LOCKSTATE, 0, WRITER))
            contendedLock(); // offload to separate method
    }
 
    private final void unlockRoot() {
        lockState = 0;
    }
    //對紅黑樹加互斥鎖,也就是寫鎖
    private final void contendedLock() {
        boolean waiting = false;
        for (int s;;) {
            //若是lockState除了第二位外其它位上都爲0,表示紅黑樹當前既沒有上讀鎖,又沒有上寫鎖,僅有可能存在waiter,能夠嘗試直接獲取寫鎖
            if (((s = lockState) & ~WAITER) == 0) {
                if (U.compareAndSetInt(this, LOCKSTATE, s, WRITER)) {
                    if (waiting)
                        waiter = null;
                    return;
                }
            }
            //若是lockState第二位是0,表示當前沒有線程在等待寫鎖
            else if ((s & WAITER) == 0) {
                //將lockState的第二位設置爲1,至關於打上了waiter的標記,表示有線程在等待寫鎖
                if (U.compareAndSetInt(this, LOCKSTATE, s, s | WAITER)) {
                    waiting = true;
                    waiter = Thread.currentThread();
                }
            }
            //休眠當前線程
            else if (waiting)
                LockSupport.park(this);
        }
    }
     
    //查找紅黑樹中的某個節點
    final Node<K,V> find(int h, Object k) {
        if (k != null) {
            for (Node<K,V> e = first; e != null; ) {
                int s; K ek;
                //若是當前有waiter或者有寫鎖,走線性檢索,由於紅黑樹雖然替代了鏈表,但其內部依然保留了鏈表的結構,雖然鏈表的查詢性能通常,但根據先前的分析其讀取的安全性有保證。
                //發現有寫鎖改走線性檢索,是爲了不等待寫鎖釋放花去過久時間; 而發現有waiter改走線性檢索,是爲了不讀鎖疊加的太多,致使寫鎖線程須要等待太長的時間; 本質上都是爲了減小讀寫碰撞
                //線性遍歷的過程當中,每遍歷到下一個節點都作一次判斷,一旦發現鎖競爭的可能性減小就改走tree檢索以提升性能
                if (((s = lockState) & (WAITER|WRITER)) != 0) {
                    if (e.hash == h &&
                        ((ek = e.key) == k || (ek != null && k.equals(ek))))
                        return e;
                    e = e.next;
                }
                //對紅黑樹加共享鎖,也就是讀鎖,CAS一次性增長4,也就是增長的只是3~32位
                else if (U.compareAndSetInt(this, LOCKSTATE, s,
                                             s + READER)) {
                    TreeNode<K,V> r, p;
                    try {
                        p = ((r = root) == null ? null :
                             r.findTreeNode(h, k, null));
                    } finally {
                        Thread w;
                        //釋放讀鎖,若是釋放完畢且有waiter,則將其喚醒
                        if (U.getAndAddInt(this, LOCKSTATE, -READER) ==
                            (READER|WAITER) && (w = waiter) != null)
                            LockSupport.unpark(w);
                    }
                    return p;
                }
            }
        }
        return null;
    }
    //更新紅黑樹中的某個節點
    final TreeNode<K,V> putTreeVal(int h, K k, V v) {
        Class<?> kc = null;
        boolean searched = false;
        for (TreeNode<K,V> p = root;;) {
            int dir, ph; K pk;
            //...省略處理紅黑樹數據結構的代碼若干          
                else {
                    //寫操做前加互斥鎖
                    lockRoot();
                    try {
                        root = balanceInsertion(root, x);
                    } finally {
                        //釋放互斥鎖
                        unlockRoot();
                    }
                }
                break;
            }
        }
        assert checkInvariants(root);
        return null;
    }
}

紅黑樹內置了一套讀寫鎖的邏輯,其內部定義了32位的int型變量lockState,第1位是寫鎖標誌位,第2位是寫鎖等待標誌位,從3~32位則是共享鎖標誌位。

讀寫操做是互斥的,容許多個線程同時讀取,但不容許讀寫操做並行,同一時刻只容許一個線程進行寫操做;這樣任意時間點讀取的都是一個合法的紅黑樹,總體上是安全的。

有的同窗會產生疑惑,寫鎖釋放時爲什麼沒有將waiter喚醒的操做呢?是否有可能A線程進入了等待區,B線程獲取了寫鎖,釋放寫鎖時僅作了lockState=0的操做。

那麼A線程是否就沒有機會被喚醒了,只有等待下一個讀鎖釋放時的喚醒了呢 ?

顯然這種狀況違背常理,C13Map不會出現這樣的疏漏,再進一步觀察,紅黑樹的變動操做的外圍,也就是在putValue/replaceNode那一層,都是對BIN的頭節點加了synchornized互斥鎖的,同一時刻只能有一個寫線程進入TreeBin的方法範圍內,當寫線程發現當前waiter不爲空,其實此waiter只能是當前線程本身,能夠放心的獲取寫鎖,不用擔憂沒法被喚醒的問題。

TreeBin在find讀操做檢索時,在linearSearch(線性檢索)和treeSearch(樹檢索)間作了折衷,前者性能差但併發安全,後者性能佳但要作併發控制,可能致使鎖競爭;設計者使用線性檢索來儘可能避免讀寫碰撞致使的鎖競爭,但評估到race condition已消失時,又當即趨向於改用樹檢索來提升性能,在安全和性能之間作到了極佳的平衡。具體的折衷策略請參考find方法及註釋。

因爲有線性檢索這樣一個抄底方案,以及入口處bin頭節點的synchornized機制,保證了進入到TreeBin總體代碼塊的寫線程只有一個;TreeBin中讀寫鎖的總體設計與ReentrantReadWriteLock相比仍是簡單了很多,好比並未定義用於存放待喚醒線程的threadQueue,以及讀線程僅會自旋而不會阻塞等等, 能夠看作是特定條件下ReadWriteLock的簡化版本。

五、若是讀取的bin是一個ForwardingNode

說明當前bin已遷移,調用其find方法到nextTable讀取數據。

forwardingNode的find方法

static final class ForwardingNode<K,V> extends Node<K,V> {
    final Node<K,V>[] nextTable;
    ForwardingNode(Node<K,V>[] tab) {
        super(MOVED, null, null);
        this.nextTable = tab;
    }
     
    //遞歸檢索哈希錶鏈
    Node<K,V> find(int h, Object k) {
        // loop to avoid arbitrarily deep recursion on forwarding nodes
        outer: for (Node<K,V>[] tab = nextTable;;) {
            Node<K,V> e; int n;
            if (k == null || tab == null || (n = tab.length) == 0 ||
                (e = tabAt(tab, (n - 1) & h)) == null)
                return null;
            for (;;) {
                int eh; K ek;
                if ((eh = e.hash) == h &&
                    ((ek = e.key) == k || (ek != null && k.equals(ek))))
                    return e;
                if (eh < 0) {
                    if (e instanceof ForwardingNode) {
                        tab = ((ForwardingNode<K,V>)e).nextTable;
                        continue outer;
                    }
                    else
                        return e.find(h, k);
                }
                if ((e = e.next) == null)
                    return null;
            }
        }
    }
 
 
}

ForwardingNode中保存了nextTable的引用,會轉向下一個哈希表進行檢索,但並不能保證nextTable就必定是currentTable,由於在高併發插入的狀況下,極短期內就能夠致使哈希表的屢次擴容,內存中極有可能駐留一條哈希錶鏈,彼此以bin的頭節點上的ForwardingNode相連,線程剛讀取時拿到的是table1,遍歷時卻有可能經歷了哈希表的鏈條。

eh<0有三種狀況:

  • 若是是ForwardingNode繼續遍歷下一個哈希表。
  • 若是是TreeBin,調用其find方法進入TreeBin讀寫鎖的保護區讀取數據。
  • 若是是ReserveNode,說明當前有compute計算中,整條bin仍是一個空結構,直接返回null。

六、若是讀取的bin是一個ReserveNode

ReserveNode用於compute/computeIfAbsent原子計算的方法,在BIN的頭節點爲null且計算還沒有完成時,先在bin的頭節點打上一個ReserveNode的佔位標記。

讀操做發現ReserveNode直接返回null,寫操做會由於爭奪ReserveNode的互斥鎖而進入阻塞態,在compute完成後被喚醒後循環重試。

6、寫操做putValue/replaceNode爲何是線程安全的

典型的編程範式以下:

C13Map的putValue方法

Node<K,V>[] tab = table;  //將堆中的table變量賦給線程堆棧中的局部變量
Node f = tabAt(tab, i );
if(f==null){
 //當前槽位沒有頭節點,直接CAS寫入
 if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
    break;
}else if(f.hash == MOVED){
 //加入協助搬運行列
 helpTransfer(tab,f);
}
//不是forwardingNode
else if(f.hash != MOVED){
    //先鎖住I槽位上的頭節點
    synchronized (f) {
    //再doubleCheck看此槽位上的頭節點是否仍是f
    if (tabAt(tab, i) == f) {
       ...各類寫操做
    }
  }
}

一、當前槽位若是頭節點爲null時,直接CAS寫入

有人也許會質疑,若是寫入時resize操做已完成,發生了table向nextTable的轉變,是否會存在寫入的是舊錶的bin致使數據丟失的可能 ? 

這種可能性是不存在的,由於一個table在resize完成後全部的BIN都會被打上ForwardingNode的標記,能夠形象的理解爲全部槽位上都插滿了紅旗,而此處在CAS時的compare的變量null,可以保證至少在CAS原子操做發生的時間點table並未發生變動。

二、當前槽位若是頭節點不爲null

這裏採用了一個小技巧:先鎖住I槽位上的頭節點,進入同步代碼塊後,再doubleCheck看此槽位上的頭節點是否有變化。

進入同步塊後還須要doubleCheck的緣由:雖然一開始獲取到的頭節點f並不是ForwardingNode,但在獲取到f的同步鎖以前,可能有其它線程提早獲取了f的同步鎖並完成了transfer工做,並將I槽位上的頭節點標記爲ForwardingNode,此時的f就成了一個過期的bin的頭節點。

然而由於標記操做與transfer做爲一個總體在同步的代碼塊中執行,若是doubleCheck的結果是此槽位上的頭節點仍是f,則代表至少在當前時間點該槽位尚未被transfer到新表(假如當前有transfer in progress的話),能夠放心的對該bin進行put/remove/replace等寫操做。

只要未發生transfer或者treeify操做,鏈表的新增操做都是採起後入式,頭節點一旦肯定不會輕易改變,這種後入式的更新方式保證了鎖定頭節點就等於鎖住了整個bin。

若是不做doubleCheck判斷,則有可能當前槽位已被transfer,寫入的仍是舊錶的BIN,從而致使寫入數據的丟失;也有可能在獲取到f的同步鎖以前,其它線程對該BIN作了treeify操做,並將頭節點替換成了TreeBin, 致使寫入的是舊的鏈表,而非新的紅黑樹;

三、doubleCheck是否有ABA問題

也許有人會質疑,若是有其它線程提早對當前bin進行了的remove/put的操做,引入了新的頭節點,而且剛好發生了JVM的內存釋放和從新分配,致使新的Node的引用地址剛好跟舊的相同,也就是存在所謂的ABA問題。

這個能夠經過反證法來推翻,在帶有GC機制的語言環境下一般不會發生ABA問題,由於當前線程包含了對頭節點f的引用,當前線程並未消亡,不可能存在f節點的內存被GC回收的可能性。

還有人會質疑,若是在寫入過程當中主哈希表發生了變化,是否可能寫入的是舊錶的bin致使數據丟失,這個也能夠經過反證法來推翻,由於table向nextTable的轉化(也就是將resize後的新哈希表正式commit)只有在全部的槽位都已經transfer成功後纔會進行,只要有一個bin未transfer成功,則說明當前的table未發生變化,在當前的時間點能夠放心的向table的bin內寫入數據。

四、如何操做才安全

能夠總結出規律,在對table的槽位成功進行了CAS操做且compare值爲null,或者對槽位的非forwardingNode的頭節點加鎖後,doubleCheck頭節點未發生變化,對bin的寫操做都是安全的。

7、原子計算相關方法

原子計算主要包括:computeIfAbsent、computeIfPresent、compute、merge四個方法。

一、幾個方法的比較 

主要區別以下:

(1)computeIfAbsent只會在判斷到key不存在時纔會插入,判空與插入是一個原子操做,提供的FunctionalInterface是一個二元的Function, 接受key參數,返回value結果;若是計算結果爲null則不作插入。

(2)computeIfPresent只會在判讀單到Key非空時纔會作更新,判斷非空與插入是一個原子操做,提供的FunctionalInterface是一個三元的BiFunction,接受key,value兩個參數,返回新的value結果;若是新的value爲null則刪除key對應節點。

(3)compute則不加key是否存在的限制,提供的FunctionalInterface是一個三元的BiFunction,接受key,value兩個參數,返回新的value結果;若是舊的value不存在則以null替代進行計算;若是新的value爲null則保證key對應節點不會存在。

(4)merge不加key是否存在的限制,提供的FunctionalInterface是一個三元的BiFunction,接受oldValue, newVALUE兩個參數,返回merge後的value;若是舊的value不存在,直接以newVALUE做爲最終結果,存在則返回merge後的結果;若是最終結果爲null,則保證key對應節點不會存在。

二、什麼時候會使用ReserveNode佔位

若是目標bin的頭節點爲null,須要寫入的話有兩種手段:一種是生成好新的節點r後使用casTabAt(tab, i, null, r)原子操做,由於compare的值爲null能夠保證併發的安全;

另一種方式是建立一個佔位的ReserveNode,鎖住該節點並將其CAS設置到bin的頭節點,再進行進一步的原子計算操做;這兩種辦法都有可能在CAS的時候失敗,須要自旋反覆嘗試。

(1)爲何只有computeIfAbsent/compute方法使用佔位符的方式

computeIfPresent只有在BIN結構非空的狀況下才會展開原子計算,天然不存在須要ReserveNode佔位的狀況;鎖住已有的頭節點便可。

computeIfAbsent/compute方法在BIN結構爲空時,須要展開Function或者BiFunction的運算,這個操做是外部引入的須要耗時多久沒法準確評估;這種狀況下若是採用先計算,再casTabAt(tab, i, null, r)的方式,若是有其它線程提早更新了這個BIN,那麼就須要從新鎖定新加入的頭節點,並重復一次原子計算(C13Map沒法幫你緩存上次計算的結果,由於計算的入參有可能會變化),這個開銷是比較大的。

而使用ReserveNode佔位的方式無需等到原子計算出結果,能夠第一時間先搶佔BIN的全部權,使其餘併發的寫線程阻塞。

(2)merge方法爲什麼不須要佔位

緣由是若是BIN結構爲空時,根據merge的處理策略,老的value爲空則直接使用新的value替代,這樣就省去了BiFunction中新老value進行merge的計算,這個消耗幾乎是沒有的;所以可使用casTabAt(tab, i, null, r)的方式直接修改,避免了使用ReserveNode佔位,鎖定該佔位ReserveNode後再進行CAS修改的兩次CAS無謂的開銷。

C13Map的compute方法

public V compute(K key,
                 BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
    if (key == null || remappingFunction == null)
        throw new nullPointerException();
    int h = spread(key.hashCode());
    V val = null;
    int delta = 0;
    int binCount = 0;
    for (Node<K, V>[] tab = table; ; ) {
        Node<K, V> f;
        int n, i, fh;
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        else if ((f = tabAt(tab, i = (n - 1) & h)) == null) {
            //建立佔位Node
            Node<K, V> r = new ReservationNode<K, V>();
           //先鎖定該佔位Node
            synchronized (r) {
                //將其設置到BIN的頭節點
                if (casTabAt(tab, i, null, r)) {
                    binCount = 1;
                    Node<K, V> node = null;
                    try {
                        //開始原子計算
                        if ((val = remappingFunction.apply(key, null)) != null) {
                            delta = 1;
                            node = new Node<K, V>(h, key, val, null);
                        }
                    } finally {
                        //設置計算後的最終節點
                        setTabAt(tab, i, node);
                    }
                }
            }
            if (binCount != 0)
                break;
        } else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else {
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) {
                       //此處省略對普通鏈表的變動操做
                    } else if (f instanceof TreeBin) {
                       //此處省略對紅黑樹的變動操做
                    }
                }
            }
            
        }
    }
    if (delta != 0)
        addCount((long) delta, binCount);
    return val;
}

三、如何保證原子性

computeIfAbsent/computeIfPresent中判空與計算是原子操做,根據上述分析主要是經過casTabAt(tab, i, null, r)原子操做,或者使用ReserveNode佔位並鎖定的方式,或者鎖住bin的頭節點的方式來實現的。

也就是說整個bin一直處於鎖定狀態,在獲取到目標KEY的value是否爲空之後,其它線程沒法變動目標KEY的值,判空與計算天然是原子的。

而casTabAt(tab, i, null, r)是由硬件層面的原子指令來保證的,可以保證同一個內存區域在compare和set操做之間不會有任何其它指令對其進行變動。

8、resize過程當中的併發transfer

C13Map中總共有三處地方會觸發transfer方法的調用,分別是addCount、tryPresize、helpTransfer三個函數。

  • addCount用於寫操做完成後檢驗元素數量,若是超過了sizeCtl中的閾值,則觸發resize擴容和舊錶向新表的transfer。
  • tryPresize是putAll一次性插入一個集合前的自檢,若是集合數目較大,則預先觸發一次resize擴容和舊錶向新表的transfer。
  • helpTransfer是寫操做過程當中發現bin的頭節點是ForwardingNode, 則調用helpTransfer加入協助搬運的行列。

一、開始transfer前的檢查工做 

以addCount中的檢查邏輯爲例:

addCount中的transfer檢查

Node<K, V>[] tab, nt;
int n, sc;
//當前的tableSize已經超過sizeCtl閾值,且小於最大值
while (s >= (long) (sc = sizeCtl) && (tab = table) != null &&
        (n = tab.length) < MAXIMUM_CAPACITY) {
    int rs = resizeStamp(n);
    //已經在搬運中
    if (sc < 0) {
        if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                transferIndex <= 0)
            break;
        //搬運線程數加一
        if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
            transfer(tab, nt);
    } else if (U.compareAndSwapInt(this, SIZECTL, sc,
            (rs << RESIZE_STAMP_SHIFT) + 2))
        //還沒有搬運,當前線程是本次resize工做的第一個線程,設置初始值爲2,很是巧妙的設計
        transfer(tab, null);
    s = sumCount();
}

多處應用了對變量sizeCtl的CAS操做,sizeCtl是一個全局控制變量。

參考下此變量的定義:private transient volatile int sizeCtl;

  • 初始值是0表示哈希表還沒有初始化
  • 若是是-1表示正在初始化,只容許一個線程進入初始化代碼塊
  • 初始化或者reSize成功後,sizeCtl=loadFactor * tableSize也就是觸發再次擴容的閾值,是一個正整數
  • 在擴容過程當中,sizeCtrl是一個負整數,其高16位是與當前的tableSize關聯的郵戳resizeStamp,其低16位是當前從事搬運工做的線程數加1

在方法的循環體中每次都將table、sizeCtrl、nextTable賦給局部變量以保證讀到的是當前的最新值,且保證邏輯計算過程當中變量的穩定。

若是sizeCtrl中高16位的郵戳與當前tableSize不匹配,或者搬運線程數達到了最大值,或者全部搬運的線程都已經退出(只有在遍歷完全部槽位後纔會退出,不然會一直循環),或者nextTable已經被清空,跳過搬運操做。

若是知足搬運條件,則對sizeCtrl作CAS操做,sizeCtrl>=0時設置初始線程數爲2,sizeCtrl<0時將其值加1,CAS成功後開始搬運操做,失敗則進入下一次循環從新判斷。

首個線程設置初始值爲2的緣由是:線程退出時會經過CAS操做將參與搬運的總線程數-1,若是初始值按照常規作法設置成1,那麼減1後就會變爲0。

此時其它線程發現線程數爲0時,沒法區分是沒有任何線程作過搬運,仍是有線程作完搬運但都退出了,也就沒法判斷要不要加入搬運的行列。

值得注意的是,代碼中的「sc == rs + 1 || sc == rs + MAX_RESIZERS「是JDK8中的明顯的BUG,少了rs無符號左移16位的操做;JDK12已經修復了此問題。

二、併發搬運過程和退出機制  

C13Map的transfer方法

private final void transfer(Node<K, V>[] tab, Node<K, V>[] nextTab) {
    int n = tab.length, stride;
    //一次搬運多少個槽位
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE;
    if (nextTab == null) {           
        try {
            //首個搬運線程,負責初始化nextTable
            Node<K, V>[] nt = (Node<K, V>[]) new Node<?, ?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {     
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        nextTable = nextTab;
        //初始化當前搬運索引
        transferIndex = n;
    }
    int nextn = nextTab.length;
    //公共的forwardingNode
    ForwardingNode<K, V> fwd = new ForwardingNode<K, V>(nextTab);
    boolean advance = true;
    boolean finishing = false; // 保證提交nextTable以前已遍歷舊錶的全部槽位
    for (int i = 0, bound = 0; ; ) {
        Node<K, V> f;
        int fh;
        //循環CAS獲取下一個搬運區段
        while (advance) {
            int nextIndex, nextBound;
            //搬運已結束,或者當前區段還沒有完成,退出循環體;最後一次抄底掃描時,僅輔助作i減一的運算
            if (--i >= bound || finishing)
                advance = false;
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            } else if (U.compareAndSwapInt
                    (this, TRANSFERINDEX, nextIndex,
                            nextBound = (nextIndex > stride ?
                                    nextIndex - stride : 0))) {
                 
                bound = nextBound;
                i = nextIndex - 1;
                advance = false;
            }
        }
        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
            if (finishing) {
                nextTable = null;
                table = nextTab;
                sizeCtl = (n << 1) - (n >>> 1);
                return;
            }
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                //並不是最後一個退出的線程
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                    return;               
                finishing = advance = true;
                //異常巧妙的設計,最後一個線程推出前將i回退到最高位,等因而強制作最後一次的全表掃描;程序直接執行後續的else if代碼,看有沒有哪一個槽位漏掉了,或者說是否所有是forwardingNode標記;
                //能夠視爲抄底邏輯,雖然檢測到漏掉槽位的機率基本是0
                i = n;
            }
        } else if ((f = tabAt(tab, i)) == null)
            //空槽位直接打上forwardingNode標記,CAS失敗下一次循環繼續搬運該槽位,成功則進入下一個槽位
            advance = casTabAt(tab, i, null, fwd);
        else if ((fh = f.hash) == MOVED)
            advance = true; //最後一次抄底遍歷時,正常狀況下全部的槽位應該都被打上forwardingNode標記
        else {
            //鎖定頭節點
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    Node<K, V> ln, hn;
                    if (fh >= 0) {
                        //......此處省略鏈表搬運代碼:職責是將鏈表拆成兩份,搬運到nextTable的i和i+n槽位
                        setTabAt(nextTab, i, ln); 
                        setTabAt(nextTab, i + n, hn);
                        //設置舊錶對應槽位的頭節點爲forwardingNode
                        setTabAt(tab, i, fwd);
                        advance = true;
                    } else if (f instanceof TreeBin) {
                        //......此處省略紅黑樹搬運代碼:職責是將紅黑樹拆成兩份,搬運到nextTable的i和i+n槽位,若是知足紅黑樹的退化條件,順便將其退化爲鏈表
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        //設置舊錶對應槽位的頭節點爲forwardingNode
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                }
            }
        }
    }
}

多個線程併發搬運時,若是是首個搬運線程,負責nextTable的初始化工做;而後藉助於全局的transferIndex變量從當前table的n-1槽位開始依次向低位掃描搬運,經過對transferIndex的CAS操做一次獲取一個區段(默認是16),當transferIndex達到最低位時,再也不可以獲取到新的區段,線程開始退出,退出時會在sizeCtl上將總的線程數減一,最後一個退出的線程將掃描座標i回退到最高位,強迫作一次抄底的全局掃描。

三、transfer過程當中的讀寫安全性分析

(1)首先是transfer過程當中是否有可能全局的哈希表table發生屢次resize,或者說存在過時的風險?

觀察nextTable提交到table的代碼,發現只有在全部線程均搬運完畢退出後纔會commit,因此但凡是有一個線程在transfer代碼塊中,table都不可能被替換;因此不存在table過時的風險。

(2)有併發的寫操做時,是否存在安全風險?

由於transfer操做與寫操做都要競爭bin的頭節點的syncronized鎖,二者是互斥串行的;當寫線程獲得鎖後,還要作doubleCheck,發現不是一開始的頭節點時什麼事情都不會作,發現是forwardingNode,就會加入搬運行列直到新表被提交,而後去直接操做新表。

nextTable的提交老是在全部的槽位都已經搬運完畢,插上ForwardingNode的標識以後的,所以只要新表已提交,舊錶一定沒法寫入;這樣就可以有效的避免數據寫入舊錶。

推理:獲取到bin頭節點的同步鎖開始寫操做----------> transfer必然未完成--------->新表必然未提交-------→寫入的必然是當前表。

也就說永遠不可能存在新舊兩張表同時被寫入的狀況,table被寫入時nextTable永遠都只能被讀取。

(3)有併發的讀操做時,是否存在安全風險?

transfer操做並不破壞舊的bin結構,若是還沒有開始搬運,將會照常遍歷舊的BIN結構;若是已搬運完畢,會調用到forwadingNode的find方法到新表中遞歸查詢,參考上文中的forwadingNode介紹。

9、Traverser遍歷器

由於iterator或containsValue等通用API的存在,以及某些業務場景確實須要遍歷整個Map,設計一種安全且有性能保證的遍歷機制顯得理所固然。

C13Map遍歷器實現的難點在於讀操做與transfer可能並行,在掃描各個bin時若是遇到forwadingNode該如何處理的問題。

因爲併發transfer機制的存在,在某個槽位上遇到了forwadingNode,僅代表當前槽位已被搬運,並不能表明其後的槽位必定被搬運或者還沒有被搬運;也就是說其後的若干槽位是一個不可控的狀態。

解決辦法是引入了相似於方法調用堆棧的機制,在跳轉到nextTable時記錄下當前table和已經抵達的槽位並進行入棧操做,而後開始遍歷下一個table的i和i+n槽位,若是遇到forwadingNode再一次入棧,周而復始循環往復;

每次若是i+n槽位若是到了右半段快要溢出的話就會遵循原來的入棧規則進行出棧,也就是回到上一個上下文節點,最終會回到初始的table也就是initialTable中的節點。

C13Map的Traverser組件

static class Traverser<K,V> {
    Node<K,V>[] tab;        // current table; updated if resized
    Node<K,V> next;         // the next entry to use
    TableStack<K,V> stack, spare; // to save/restore on ForwardingNodes
    int index;              // index of bin to use next
    int baseIndex;          // current index of initial table
    int baseLimit;          // index bound for initial table
    final int baseSize;     // initial table size
 
    Traverser(Node<K,V>[] tab, int size, int index, int limit) {
        this.tab = tab;
        this.baseSize = size;
        this.baseIndex = this.index = index;
        this.baseLimit = limit;
        this.next = null;
    }
 
    /**
     * 返回下一個節點
     */
    final Node<K,V> advance() {
        Node<K,V> e;
        if ((e = next) != null)
            e = e.next;
        for (;;) {
            Node<K,V>[] t; int i, n;  // 局部變量保證穩定性
            if (e != null)
                return next = e;
            if (baseIndex >= baseLimit || (t = tab) == null ||
                (n = t.length) <= (i = index) || i < 0)
                return next = null;
            if ((e = tabAt(t, i)) != null && e.hash < 0) {
                if (e instanceof ForwardingNode) {
                    tab = ((ForwardingNode<K,V>)e).nextTable;
                    e = null;
                    pushState(t, i, n);
                    continue;
                }
                else if (e instanceof TreeBin)
                    e = ((TreeBin<K,V>)e).first;
                else
                    e = null;
            }
            //當前若是有跳轉堆棧直接回放
            if (stack != null)
                recoverState(n);
            //沒有跳轉堆棧說明已經到initalTable
            else if ((index = i + baseSize) >= n)
                index = ++baseIndex; // visit upper slots if present
        }
    }
 
    /**
     * 遇到ForwardingNode時保存當前上下文
     */
    private void pushState(Node<K,V>[] t, int i, int n) {
        TableStack<K,V> s = spare;  // reuse if possible
        if (s != null)
            spare = s.next;
        else
            s = new TableStack<K,V>();
        s.tab = t;
        s.length = n;
        s.index = i;
        s.next = stack;
        stack = s;
    }
 
    /**
     * 彈出上下文
     *
     */
    private void recoverState(int n) {
        TableStack<K,V> s; int len;
        //若是當前有堆棧,且index已經到達右半段後溢出當前table,說明該回去了
        //若是index還在左半段,則只輔助作index+=s.length操做
        while ((s = stack) != null && (index += (len = s.length)) >= n) {
            n = len;
            index = s.index;
            tab = s.tab;
            s.tab = null;
            TableStack<K,V> next = s.next;
            s.next = spare; // save for reuse
            stack = next;
            spare = s;
        }
        //已經到initialTable,索引自增
        if (s == null && (index += baseSize) >= n)
            index = ++baseIndex;
    }
}

假設在整個遍歷過程當中初始表initalTable=table1,遍歷到結束時最大的表爲table5,也就是在遍歷過程當中經歷了四次擴容,屬於一邊遍歷一邊擴容的最複雜場景;

那麼整個遍歷過程就是一個以初始化表initalTable爲基準表,如下一張表的i和i+n槽位爲forwadingNode的跳轉目標,相似於粒子裂變通常的從最低表向最高表放射的過程;

traverser並不能保證必定遍歷某張表的全部的槽位,但若是假設低階表的某個槽位在最高階表老是有相應的投影,好比table1的一個節點在table5中就會對應16個投影;

traverser可以保證一次遍歷的全部槽位在最高階表上的投影,能夠佈滿整張最高階表,而不會有任何遺漏。

10、併發計數

與HashMap中直接定義了size字段相似,獲取元素的totalCount在C13MAP中確定不會去遍歷完整的數據結構;那樣元素較多時性能會很是差,C13MAP設計了CounterCell[]數組來解決併發計數的問題。

CounterCell[]機制並不理會新舊table的更迭,無論是操做的新表仍是舊錶,對於計數而言沒有本質的差別,CounterCell[]只關注總量的增長或減小。

一、從LongAdder到CounterCell內存對齊

C13MAP借鑑了JUC中LongAdder和Striped64的計數機制,有大量代碼與LongAdder和Striped64是重複的,其核心思想是多核環境下對於64位long型數據的計數操做,雖然藉助於volatile和CAS操做可以保證併發的安全性,可是由於多核操做的是同一內存區域,而每一個CPU又有本身的本地cache,例如LV1 Cache,LVL2 Cache,寄存器等。

因爲內存一致性協議MESI的存在,會致使本地Cache的頻繁刷新影響性能,一個比較好的解決思路是每一個CPU只操做固定的一塊內存對齊區域,最終採用求和的方式來計數。

這種方式能提升性能,可是並不是全部場景都適用,由於其最終的value是求和估算出來的,CounterCell累加求和的過程並不是原子,不能表明某個時刻的精準value,因此像compareAndSet這樣的原子操做就沒法支持。

二、CounterCell[] 、cellBusy、baseCount的做用 

CounterCell[]中存放2的指數冪個CounterCell,併發操做期間有可能會擴容,每次擴容都是原有size的兩倍,一旦超過了CPU的核數即再也不擴容,由於CPU的總數一般也是2的指數冪,因此其size每每等於CPU的核數CounterCell[]初始化、擴容、填充元素時,藉助cellBusy其進行spinLock控制baseCount是基礎數據。

在併發量不那麼大,CAS沒有出現失敗時直接基於baseCount變量作計數;一旦出現CAS失敗,說明有併發衝突,就開始考慮CounterCell[]的初始化或者擴容操做,但在初始化未完成時,仍是會將其視爲抄底方案進行計數。

因此最終的技術總和=baseCount+全部CounterCell中的value。

C13Map的addCount方法

private final void addCount(long x, int check) {
    CounterCell[] cs; long b, s;
   //初始時老是直接對baseCount計數,直到出現第一次失敗,或者已經有現成的CounterCell[]數組可用
    if ((cs = counterCells) != null ||
        !U.compareAndSetLong(this, BASECOUNT, b = baseCount, s = b + x)) {
        CounterCell c; long v; int m;
        //是否存在競態,爲true時表示無競態
        boolean uncontended = true;
        if (cs == null || (m = cs.length - 1) < 0 ||
            //先生成隨機數再對CounterCell[]數組size求餘,也就是隨機分配到其中某個槽位
            (c = cs[ThreadLocalRandom.getProbe() & m]) == null ||
            !(uncontended =
              U.compareAndSetLong(c, CELLVALUE, v = c.value, v + x))) {
            //該槽位還沒有初始化或者CAS操做又出現競態
            fullAddCount(x, uncontended);
            return;
        }
        if (check <= 1)
            return;
        s = sumCount();
    }
    //檢測元素總數是否超過sizeCtl閾值
    if (check >= 0) {
        Node<K,V>[] tab, nt; int n, sc;
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
               (n = tab.length) < MAXIMUM_CAPACITY) {
            int rs = resizeStamp(n) << RESIZE_STAMP_SHIFT;
            if (sc < 0) {
                if (sc == rs + MAX_RESIZERS || sc == rs + 1 ||
                    (nt = nextTable) == null || transferIndex <= 0)
                    break;
                if (U.compareAndSetInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }
            else if (U.compareAndSetInt(this, SIZECTL, sc, rs + 2))
                transfer(tab, null);
            s = sumCount();
        }
    }
}

其中ThreadLocalRandom是線程上下文內的隨機數生成器,能夠不受其它線程的影響,提升隨機數生成的性能;老是在CAS失敗之後,也就是明確感知到存在多線程的競爭的前提下,纔會對CounterCell[]進行初始化或者擴容操做。

C13Map的fullAddCount方法

//完整的計數,與LongAdder的代碼基本雷同
private final void fullAddCount(long x, boolean wasUncontended) {
    int h;
    if ((h = ThreadLocalRandom.getProbe()) == 0) {
        ThreadLocalRandom.localInit();      // force initialization
        h = ThreadLocalRandom.getProbe();
        wasUncontended = true;
    }
    boolean collide = false;                // 是否有新的衝突
    for (;;) {
        CounterCell[] cs; CounterCell c; int n; long v;
        if ((cs = counterCells) != null && (n = cs.length) > 0) {       
            if ((c = cs[(n - 1) & h]) == null) {
                //隨機匹配的槽位還沒有有CounterCell元素則初始化之
                if (cellsBusy == 0) {            // Try to attach new Cell
                    CounterCell r = new CounterCell(x); // Optimistic create
                    if (cellsBusy == 0 &&
                        U.compareAndSetInt(this, CELLSBUSY, 0, 1)) {
                        boolean created = false;
                        try {               // Recheck under lock
                            CounterCell[] rs; int m, j;
                            if ((rs = counterCells) != null &&
                                (m = rs.length) > 0 &&
                                rs[j = (m - 1) & h] == null) {
                                rs[j] = r;
                                created = true;
                            }
                        } finally {
                            cellsBusy = 0;
                        }
                        if (created)
                            break;
                        continue;           // Slot is now non-empty
                    }
                }
                collide = false;
            }
            else if (!wasUncontended)      
                wasUncontended = true;      //fullAddCount前已經存在cas失敗但並不當即擴容,從新生成一個隨機數進行CAS重試
            else if (U.compareAndSetLong(c, CELLVALUE, v = c.value, v + x))
                break;
            else if (counterCells != cs || n >= NCPU)
                collide = false;            // 超過CPU的最大核數,或者檢測到counterCells已擴容,都將衝突狀態置爲無
            else if (!collide)
                collide = true;             // 以上的若干條件都不知足,能夠斷定一定有衝突,再生成一個隨機數試探一下
            else if (cellsBusy == 0 &&
                     U.compareAndSetInt(this, CELLSBUSY, 0, 1)) {
                try {
                    if (counterCells == cs)   //對counterCells進行doubleCheck
                        counterCells = Arrays.copyOf(cs, n << 1);   //擴容,容量翻倍
                } finally {
                    cellsBusy = 0;
                }
                collide = false;
                continue;                   // 對性的counterCell[]進行重試CAS操做
            }
            h = ThreadLocalRandom.advanceProbe(h);   //以舊的隨機數爲基數生成一個新的隨機數
        }
        else if (cellsBusy == 0 && counterCells == cs &&
                 U.compareAndSetInt(this, CELLSBUSY, 0, 1)) {
            //第一次初始化工做,初始的數組大小爲2
            boolean init = false;
            try {                           // Initialize table
                if (counterCells == cs) {
                    CounterCell[] rs = new CounterCell[2];
                    rs[h & 1] = new CounterCell(x);
                    counterCells = rs;
                    init = true;
                }
            } finally {
                cellsBusy = 0;
            }
            if (init)
                break;
        }
        //初始化過程當中其它線程的抄底方案
        else if (U.compareAndSetLong(this, BASECOUNT, v = baseCount, v + x))
            break;                        
    }
}

循環生成新的隨機數匹配到新的槽位進行CAS的計數操做,出現CAS失敗後並不急於擴容;而是老是在連續出現CAS失敗的狀況纔會嘗試擴容。

CounterCell[]的總體方案相對獨立,與C13Map的關係並不大,能夠視爲一種成熟的高性能技術方案在各個場景使用。

11、與stream相似的bulk操做支持

一、bulkTask類的子類

全部的批量任務執行類均爲bulkTask的子類, bulkTask內置了與traverser相似的實現,用以支持對C13Map的遍歷;同時它也是ForkJoinTask的子類,支持以fork/join的方式來完成各類批量任務的執行。

由於ForkJoinTask並不是本文的重點,這裏僅列出幾種有表明性的批量方法,以及相應的的task實現。

二、幾種有表明性的批量方法

C13Map的批量任務

//將全部的entry按照transformer函數進行二元計算,再對全部生成的結果執行action一元函數
public <U> void forEach(long parallelismThreshold,
                        BiFunction<? super K, ? super V, ? extends U> transformer,
                        Consumer<? super U> action);
 
 
//對全部的entry執行searchFunction二元計算,一旦發現任意一個計算結果不爲null,即全盤返回
public <U> U search(long parallelismThreshold,
                    BiFunction<? super K, ? super V, ? extends U> searchFunction);
 
//對全部的entry執行transformer二元計算,再對全部的結果執行reducer收斂函數
public <U> U reduce(long parallelismThreshold,
                    BiFunction<? super K, ? super V, ? extends U> transformer,
                    BiFunction<? super U, ? super U, ? extends U> reducer)
 
 
//對全部的entry中的value執行transformer二元計算,再對全部的結果執行reducer收斂函數
public <U> U reduceValues(long parallelismThreshold,
                          Function<? super V, ? extends U> transformer,
                          BiFunction<? super U, ? super U, ? extends U> reducer)

以上全部的批量方法都有惟一與其對應的批量task執行類,背後均是基於fork/join思想實現。

三、批量task的實現

以2中列出的reduce方法所對應的MapReduceMappingsTask爲例,有關fork/join中的實現細節不屬於本文的範疇,不作詳細討論。

C13Map的MapReduceMappingsTask

static final class MapReduceMappingsTask<K,V,U> extends BulkTask<K,V,U> {
    final BiFunction<? super K, ? super V, ? extends U> transformer;
    final BiFunction<? super U, ? super U, ? extends U> reducer;
    U result;
    MapReduceMappingsTask<K,V,U> rights, nextRight;
    MapReduceMappingsTask
        (BulkTask<K,V,?> p, int b, int i, int f, Node<K,V>[] t,
         MapReduceMappingsTask<K,V,U> nextRight,
         BiFunction<? super K, ? super V, ? extends U> transformer,
         BiFunction<? super U, ? super U, ? extends U> reducer) {
        super(p, b, i, f, t); this.nextRight = nextRight;
        this.transformer = transformer;
        this.reducer = reducer;
    }
    public final U getRawResult() { return result; }
    public final void compute() {
        final BiFunction<? super K, ? super V, ? extends U> transformer;
        final BiFunction<? super U, ? super U, ? extends U> reducer;
        if ((transformer = this.transformer) != null &&
            (reducer = this.reducer) != null) {
            for (int i = baseIndex, f, h; batch > 0 &&
                     (h = ((f = baseLimit) + i) >>> 1) > i;) {
                addToPendingCount(1);
                //裂變出新的fork-join任務
                (rights = new MapReduceMappingsTask<K,V,U>
                 (this, batch >>>= 1, baseLimit = h, f, tab,
                  rights, transformer, reducer)).fork();
            }
            U r = null;
            //遍歷本batch元素
            for (Node<K,V> p; (p = advance()) != null; ) {
                U u;
                //對本batch作reduce收斂操做
                if ((u = transformer.apply(p.key, p.val)) != null)
                    r = (r == null) ? u : reducer.apply(r, u);
            }
            //對本身和本身fork出的子任務作reducer收斂操做
            result = r;
            CountedCompleter<?> c;
            for (c = firstComplete(); c != null; c = c.nextComplete()) {
                @SuppressWarnings("unchecked")
                MapReduceMappingsTask<K,V,U>
                    t = (MapReduceMappingsTask<K,V,U>)c,
                    s = t.rights;
                while (s != null) {
                    U tr, sr;
                    if ((sr = s.result) != null)
                        t.result = (((tr = t.result) == null) ? sr :
                                    reducer.apply(tr, sr));
                    s = t.rights = s.nextRight;
                }
            }
        }
    }
}

12、小結

自JDK8開始C13Map摒棄了JDK7中的Segment段實現方案,將鎖的粒度細化到了每一個bin上,鎖的粒度更小併發能力更強。用syncronized關鍵字代替原先的ReentrantLock互斥鎖,因JDK8中對syncronized作了大量優化,能夠達到比ReentrantLock更優的性能。

引入併發transfer的機制支持多線程搬運,寫操做和transfer操做在不一樣bin上可並行。引入ForwardingNode支持讀操做和transfer並行,並進一步支持transfer過程有可能存在的哈希錶鏈的遍歷。引入ReserveNode在compute原子計算可能耗時較長的狀況下搶先佔位,避免重複計算。

引入紅黑樹來優化哈希衝突時的檢索性能,其內部實現了輕量級的讀寫鎖保證讀寫安全,在線性檢索和tree檢索之間作了智能切換,達到了性能與安全的極佳的平衡。引入CounterCell機制優化多核場景的計數,解決內存僞共享問題。

引入 ForkJoinTask的子類優化bulk計算時的性能。整個C13Map的實現過程大量使用volatile保證可見,使用CAS保證原子,是一種局部無鎖的lockFree dataStructure的典範實現。

與HashMap的單線程讀寫操做不一樣的是,HashMap讀到的數據在下一次寫操做間是一直穩定的,在多個寫操做之間是一個穩定的snapshot,而C13Map由於併發線程的存在,數據瞬息萬變,讀到的永遠只是某個時間點的正確數據,寫入成功也只是在某個時間點保證寫入是安全的,所以C13Map一般只談安全而不談實時,這極大提升了編程的難度,也是單線程和併發數據結構之間的明顯差別。

相關文章
相關標籤/搜索