ConcurrentHashMap 1.8 源碼分析

         前兩已經分析過幾篇關於CHM的源碼,本篇分析下1.8中的實現,已經棄用以前 segment 雙桶的機制。可是本質仍是將鎖細化達到性能的提高,可是不是以前版本中定義的segment 上鎖,而是用了synchronized關鍵字鎖住 table (1.8中只維護了和hashmap中相似的數據結構。一個數據,數組內的結構是鏈表或者紅黑樹)中本次所操做對象的鏈表頭。一樣大量使用了UnSafe的 本地方法。如CAS,putOr而且最大的改進在於實現了併發的擴容。同時內部數據結構與Java8中HashMap同樣,增長了紅黑樹。當鏈表中的長度大於必定值後,轉化爲紅黑樹(紅黑樹結構的同時也仍然保持了鏈表的結構,下面會詳細介紹)。html

其實對於1.8的源碼分析 我所引用 的兩篇分章已經分析的十分詳細了,本篇本章就不對每一個方法或參數在詳細說明了,只對其中我認爲難以理解的對方加以歸納,重點對如何實現併發擴容進行了分析。若有不對的地方,歡迎你們一塊兒來交流。node

 

       先簡單分析下put操做。咱們能夠猜測,在以前的實現中,首先定位segment,上鎖。然後操做在對segment中的map進一步處理。如今的實現中並無用segment,而是延用 hashMap中單桶,定位到鏈表後,直接 上鎖,然後對鏈表就行操做,一樣是將鎖細化。簡化了不少的計算操做。 從put的源碼能夠看出整體與hashmap的put操做相差不大,除了加鎖,另外就是增長了判斷當前結點是不是ForwardingNode。表示當前正在擴容,且該結點已經擴容完畢。然後經過helpTransfer 判斷是否參與擴容的過程。數組

 

紅黑樹

與1.8中的HashMap同樣,當鏈表長度超過必定長度時,會轉換成紅黑樹。可是 二者之間有一點細微的區別。以下代碼,表示HashMap的樹節點的數據結構。安全

HashMap
/**
 * Entry for Tree bins. Extends LinkedHashMap.Entry (which in turn
 * extends Node) so can be used as extension of either regular or
 * linked node.
 */   也是繼承於HashMap中的Node,能夠將該節點作爲數組中的鏈表節點。
static final class TreeNode<K,V> extends LinkedHashMap.Entry<K,V> {
    TreeNode<K,V> parent;  // red-black tree links
    TreeNode<K,V> left;
    TreeNode<K,V> right;
    TreeNode<K,V> prev;    // needed to unlink next upon deletion
    boolean red;   。。。}

HashMap中TreeNode的實現了紅黑樹功能的方法。數據結構

ConcurrentHashMap中的TreeNode相對來講很簡單,只定義了基本的數據結構和必定查找的方法。以下併發

/**
 * Nodes for use in TreeBins
 */
ConcurrentHashMap  樹的成員與上面相似,可是
static final class TreeNode<K,V> extends Node<K,V> {
    TreeNode<K,V> parent;  // red-black tree links
    TreeNode<K,V> left;
    TreeNode<K,V> right;
    TreeNode<K,V> prev;    // needed to unlink next upon deletion
    boolean red;
  }

並無實現紅黑樹應實現的方法。可是二者有一個共通點,就是與單純的紅黑樹的數據結構多了一個變量,就是prev,增長這個變量的意義在於刪除紅黑樹的node時,須要找到被刪node的上一個結點,由於若是隻是單向鏈表與紅黑樹的結構。刪除紅黑樹的結點時,單向鏈表就不能維護了,由於找不到被冊結點的上一個結點。所以這裏增長了一個prev的結構,造成雙向鏈表。less

而這樣作是其增長了一個TreeBin來包裝TreeNode,而這個容器不直接保存用戶的key,value信息。hash值爲定值-2,在遍歷時可經過hash值判斷是當前Node是哪一種結構。-1表示正在ForwardingNode,-2爲TreeBin,大於0爲鏈表。ide

ConcurrentHashMap  
/**
 * TreeNodes used at the heads of bins. TreeBins do not hold user
 * keys or values, but instead point to list of TreeNodes and
 * their root. They also maintain a parasitic read-write lock
 * forcing writers (who hold bin lock) to wait for readers (who do
 * not) to complete before tree restructuring operations.
 */
static final class TreeBin<K,V> extends Node<K,V> {
    TreeNode<K,V> root;     //紅黑樹的根結點。
    volatile TreeNode<K,V> first;   // 指向鏈表的頭部,雖然爲紅黑樹,但保持了鏈表的結構。在unTree化時簡化。
    volatile Thread waiter;
    volatile int lockState; 。。。}

 

 

接下來介紹下由鏈表是怎麼轉化爲紅黑樹的。首先經過以下的方法將原來的Node轉換成TreeNode,從且從單身鏈表轉成雙向鏈表。此時尚未發生紅黑樹的操做。源碼分析

/* ---------------- Conversion from/to TreeBins -------------- */

/**
 * Replaces all linked nodes in bin at given index unless table is
 * too small, in which case resizes instead.
 */
private final void treeifyBin(Node<K,V>[] tab, int index) {
    Node<K,V> b; int n, sc;
    if (tab != null) {
        if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
            tryPresize(n << 1);
        else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
            synchronized (b) {
                if (tabAt(tab, index) == b) {
                    TreeNode<K,V> hd = null, tl = null;
                    for (Node<K,V> e = b; e != null; e = e.next) {
                        TreeNode<K,V> p =
                            new TreeNode<K,V>(e.hash, e.key, e.val,
                                              null, null);
                        if ((p.prev = tl) == null)
                            hd = p;
                        else
                            tl.next = p;
                        tl = p;
                    }
                    setTabAt(tab, index, new TreeBin<K,V>(hd));
                }
            }
        }
    }
}
setTabAt(tab, index, new TreeBin<K,V>(hd));

而後new封裝轉換的TreeNode,在構造 方法裏轉換成紅黑樹。hd爲原鏈表的頭部。如今作爲紅黑樹的first。在構造方法裏執行的過程就和hashMap中轉紅黑樹的過程相似 ,根據key的 systemCode的大小,決定在樹中的位置,造成一個顆二叉樹。最後經過r = balanceInsertion(r, x);  由二叉樹轉爲紅黑樹。r爲root指針。性能

而在HashMap中就沒有first指針的概念,雖然其內部一樣仍是個雙向鏈表(經過prev實現)。它是在轉換成紅黑樹以後 ,經過將紅黑樹的root結點作爲first node。

//   HashMap 中treeify後執行的方法,將root作爲鏈表的first node
/**
 * Ensures that the given root is the first node of its bin.
 */
static <K,V> void moveRootToFront(Node<K,V>[] tab, TreeNode<K,V> root) {
    int n;
    if (root != null && tab != null && (n = tab.length) > 0) {
        int index = (n - 1) & root.hash;
        TreeNode<K,V> first = (TreeNode<K,V>)tab[index];
        if (root != first) {
            Node<K,V> rn;
            tab[index] = root;
            TreeNode<K,V> rp = root.prev;
            if ((rn = root.next) != null)
                ((TreeNode<K,V>)rn).prev = rp;
            if (rp != null)
                rp.next = rn;
            if (first != null)
                first.prev = root;
            root.next = first;
            root.prev = null;
        }
        assert checkInvariants(root);
    }
}

UNSAFE   

以下,這三個操做是實現併發訪問的關鍵。經過UNSAFE的本地方法,1.7中就已經引入過,而在1.8中繼續扮演着重要的角色。而且大量使用了CAS的操做。

關於  UNSAFE方法  在以前1.7就已經分析過
@SuppressWarnings("unchecked")  //定位tab中index爲i的 Node
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
    return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
//CAS設置tab中index爲i的結點爲node
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                    Node<K,V> c, Node<K,V> v) {
    return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
//Volatile  寫 tab中的 Node。在加鎖的狀況下調用 
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
    U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}

併發擴容

擴容這一操做相比於以前的實現,1.8中最大的不一樣在於實現了併發擴容。其核心 是經過了一個特殊的Node。其定義以下 若是遍歷時發現node的 hash值是 -1,表示當前正在擴容。且當前table中的該node已經容完成。遍歷下一個node進行擴容。

/**
 * A node inserted at head of bins during transfer operations.
 */
static final class ForwardingNode<K,V> extends Node<K,V> {
    final Node<K,V>[] nextTable;
    ForwardingNode(Node<K,V>[] tab) {
        super(MOVED, null, null, null);
        this.nextTable = tab;
    }
    Node<K,V> find(int h, Object k) {
       。。。。
    }
}

而其併發到底又是怎麼實現的呢 ? 接下來看,詳細代碼就不貼了,這裏分析關鍵地方,

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; // subdivide range   
。。。。。。。
}

stride 很是關鍵,大體的意思 是每一個處理器處理的node個數不小於16。而這一步的實如今下面的方法 。

i=0開始,第一步CAS  設置  transferIndex,該值初始爲原表的大小,假設n爲64,第一個線程將其CAS設爲64-16等於48.則第一個線程就先執行64到48以前的node的擴容操做,若是在此期間第二個線程要執行到transfer方法,則transferIndex爲 32.其執行48到32之間。可是若是沒有其它線程進來執行,則它們就接着往下,爭取下一個stride數量的擴容操做。可是當最後transferIndex的值小於等於0時。表示此時已經不須要參與擴容了。如此經過CAS設置transferIndex的值,解決併發的衝突。同時每一個node擴容時,要上鎖。以防其它操做改變該鏈表的結構。這樣就能夠有多個線程併發擴容,且不會產生衝突。

Node<K,V> f; int fh;
while (advance) {
    int nextIndex, nextBound;
    if (--i >= bound || finishing)
        advance = false;
    else if ((nextIndex = transferIndex) <= 0) {
        i = -1;
        advance = false;
    }
    else if (U.compareAndSwapInt
             (this, TRANSFERINDEX, nextIndex,
              nextBound = (nextIndex > stride ?
                           nextIndex - stride : 0))) {
        bound = nextBound;
        i = nextIndex - 1;
        advance = false;
    }
}
if (i < 0 || i >= n || i + n >= nextn) {
    int sc;
    if (finishing) {
        nextTable = null;
        table = nextTab;
        sizeCtl = (n << 1) - (n >>> 1);
        return;
    }
    if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
        if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
            return;
        finishing = advance = true;
        i = n; // recheck before commit
    }
}

以下是在transfer過程當中,原node是鏈表的狀況下擴容過程。

if (tabAt(tab, i) == f) {
    Node<K,V> ln, hn;
    if (fh >= 0) {
        int runBit = fh & n;
        Node<K,V> lastRun = f;
        for (Node<K,V> p = f.next; p != null; p = p.next) {
            int b = p.hash & n;
            if (b != runBit) {
                runBit = b;
                lastRun = p;
            }
        }
        if (runBit == 0) {
            ln = lastRun;
            hn = null;
        }
        else {
            hn = lastRun;
            ln = null;
        }
        for (Node<K,V> p = f; p != lastRun; p = p.next) {
            int ph = p.hash; K pk = p.key; V pv = p.val;
            if ((ph & n) == 0)
                ln = new Node<K,V>(ph, pk, pv, ln);
            else
                hn = new Node<K,V>(ph, pk, pv, hn);
        }
        setTabAt(nextTab, i, ln);
        setTabAt(nextTab, i + n, hn);
        setTabAt(tab, i, fwd);
        advance = true;
    }

下圖表示遍歷到第i個結點,上鎖後,判斷是鏈表後,首先是找到lastRun,其表示尾部  hash&n  相同的排在最前面的node。1表示擴容後在高位hn,0表示在低位ln(也就是原來的位置)。而之因此 用hash&n 的 表達式來決定原node在擴容後的位置,而沒有經過hash計算新數組的大小 來決定 位置 。這個實現十分的巧妙,直接將hash值與原表的大小& 一下。 這樣的作的緣由是由於, put操做時決定結點所在位置時經過 hash& n-1。而n咱們這裏定義的是2的x次方。n-1顯然就是 11..11的結點,而n就是 1..00。舉個例子n爲16,則n-1爲 1111。而n爲10000。對應擴容後的大小爲100000,原結點在新數組的位置就是hash& 11111。如今咱們發現 原以前的差異就在於 原數組的&操做。也就是 11111中第一個1。因此擴容時,不須要從新計算。而只須要將原來的hash值與n進行&操做(這樣的操做簡化了計算的複雜度)。就能夠肯定在新table的位置。0表示原索引,而1表示 i+n的位置。且不須要考慮併發的問題,nextTable的  i和 i+n的位置 只會由原table的i中的node給從新佔據,而咱們一開始就對table的 node  i已經上鎖了。因此是安全的。

上面分析了鏈表的反序處理。紅黑樹相似,可是須要判讀是否unTree。

總結:進一步將鎖細化,不在是設置的併發級別,隨着擴容以後 ,鎖粒度進一步細化,而且提供了併發擴容,且大量使用了UNSAFE的本地方法,性能也進一步提高。
 

本文沒有對CHM的全部操做進行分析 ,如get,size,remove等,在下面引用的兩篇文章已經很詳細的分析,上文只是對部門關鍵點代表了個人一點點的見解。若有不正之處但願你們指出來。本文後面接着在完善。

 

 

 

 

http://www.importnew.com/22007.html

http://blog.csdn.net/u010723709/article/details/48007881

相關文章
相關標籤/搜索