Kill_Java -- ConcurrentHashMap源碼分析

KillCode系列 -- Java篇

原文發佈在個人我的博客中killCodehtml

由於JDK1.8 與 1.7 裏對ConcurrentHashMap 有不少不一樣的更改以提升性能。因此特別找出相似的方面,進行分析。java

1. 內部參數

//初始容積爲 16 
private static final int DEFAULT_CAPACITY = 16;
//加載因子 0.75
private static final float LOAD_FACTOR = 0.75f;

/** 
* 盛裝Node元素的數組 它的大小是2的整數次冪 
* Size is always a power of two. Accessed directly by iterators. 
*/  
transient volatile Node<K,V>[] table;  

/*
 *   hash表初始化或擴容時的一個控制位標識量。 
 *   負數表明正在進行初始化或擴容操做 
 *   -1表明正在初始化 
 *   -N 表示有N-1個線程正在進行擴容操做 
 *   正數或0表明hash表尚未被初始化,這個數值表示初始化或下一次進行擴容的大小 
 *   
 *   **既表明 HashMap 的 threshold**
 *   又表明 **進行擴容時的進程數**
*/
private transient volatile int sizeCtl;

// 如下兩個是用來控制擴容的時候 單線程進入的變量  
// resize校驗碼
private static int RESIZE_STAMP_BITS = 16;  
// resize校驗碼的位移量。
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;  
    
    
  /* 
   * Encodings for Node hash fields. See above for explanation. 
   */  
  static final int MOVED     = -1; // hash值是-1,表示這是一個forwardNode節點  
  static final int TREEBIN   = -2; // hash值是-2  表示這時一個TreeBin節點  
  static final int RESERVED  = -3; // hash for transient reservations
  //在 spread() 方法中 用來對 hashcode 進行 高位hash 減小可能發生的碰撞。
  static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash

上面的 sizectl 很重要。是解決 concurrenthashmap 擴容的基礎node

2. 內部類

2.1. Node

HashMap 最大的區別是 加入了對val 與 next 用了volatile關鍵字修飾
而且 setValue() 方法 直接拋出異常,能夠看出,val 是不能直接改變的。
是經過 Unsafe 類的 方法進行所有替換算法

static class Node<K,V> implements Map.Entry<K,V> {
    final int hash;
    final K key;
    //相比於 HashMap ,加入了 volatile 關鍵字
    volatile V val;
    volatile Node<K,V> next;

    Node(int hash, K key, V val, Node<K,V> next) {
        this.hash = hash;
        this.key = key;
        this.val = val;
        this.next = next;
    }

    public final K getKey()       { return key; }
    public final V getValue()     { return val; }
    public final int hashCode()   { return key.hashCode() ^ val.hashCode(); }
    public final String toString(){ return key + "=" + val; }
    public final V setValue(V value) {
        throw new UnsupportedOperationException();
    }

2.2 TreeNode

HashMap 不一樣的是編程

  1. 此次 TreeNode 再也不是繼承自 LinkedHashMap.Entry 而是繼承自本類中的 Node.數組

  2. 並不直接用於紅黑樹的結點,而是將 結點包裝成 TreeNode 後,用下面的 TreeBin 進行二次包裝。安全

  3. 優勢是可使用 Node 類的 next 指針,方便TreeBin 後續 從 鏈表紅黑樹 的轉換。
    構造函數能夠看出,原先對TreeNode 的初始化只是設置了其的後續結點。組成了鏈表。多線程

static final class TreeNode<K,V> extends Node<K,V> {
    TreeNode<K,V> parent;  // red-black tree links
    TreeNode<K,V> left;
    TreeNode<K,V> right;
    TreeNode<K,V> prev;    // needed to unlink next upon deletion
    boolean red;

    TreeNode(int hash, K key, V val, Node<K,V> next,
                TreeNode<K,V> parent) {
        super(hash, key, val, next);
        this.parent = parent;
    }

2.3. TreeBin

特色: 1. 不持有key與val ,指向TreeNode 的 root 與 list。併發

2. 加入讀寫鎖。方便併發的訪問。
static final class TreeBin<K,V> extends Node<K,V> {
        TreeNode<K,V> root;
        volatile TreeNode<K,V> first;
        volatile Thread waiter;
        //經過鎖的狀態 , 判斷鎖的類型。
        volatile int lockState;
        // values for lockState
        static final int WRITER = 1; // set while holding write lock
        static final int WAITER = 2; // set when waiting for write lock
        static final int READER = 4; // increment value for setting read lock

構造方法以下
root 表明 TreeNode 的根結點
使用first ,是用於第一次初始化時,由於root的特殊性,因此不便於 this.root = b 所以經過 first代替第一次的初始化過程。
而後在 過程當中 用r 表明root ,直到結束 紅黑樹的初始化後,再 root =r 保證root的安全性。app

TreeBin(TreeNode<K,V> b) {  
        super(TREEBIN, null, null, null);  
        this.first = b;  
        TreeNode<K,V> r = null;  
        for (TreeNode<K,V> x = b, next; x != null; x = next) {  
            next = (TreeNode<K,V>)x.next;  
            x.left = x.right = null;  
            if (r == null) {  
                x.parent = null;  
                x.red = false;  
                r = x;  
            }  
            else {  
                K k = x.key;  
                int h = x.hash;  
                Class<?> kc = null;  
                for (TreeNode<K,V> p = r;;) {  
                    int dir, ph;  
                    K pk = p.key;  
                    if ((ph = p.hash) > h)  
                        dir = -1;  
                    else if (ph < h)  
                        dir = 1;  
                    else if ((kc == null &&  
                                (kc = comparableClassFor(k)) == null) ||  
                                (dir = compareComparables(kc, k, pk)) == 0)  
                        dir = tieBreakOrder(k, pk);  
                        TreeNode<K,V> xp = p;  
                    if ((p = (dir <= 0) ? p.left : p.right) == null) {  
                        x.parent = xp;  
                        if (dir <= 0)  
                            xp.left = x;  
                        else  
                            xp.right = x;  
                        r = balanceInsertion(r, x);  
                        break;  
                    }  
                }  
            }  
        }  
        this.root = r;  
        assert checkInvariants(root);  
    }

2.4. ForwardingNode

做用是在 transfer() 過程當中,插入到 TreeBin 之間,用做連接做用。

static final class ForwardingNode<K,V> extends Node<K,V> {
        final Node<K,V>[] nextTable;
        ForwardingNode(Node<K,V>[] tab) {
            super(MOVED, null, null, null);
            this.nextTable = tab;
        }

3. Unsafe 類 與 經常使用的操做

3.1. Unsafe 與 靜態代碼塊

Unsafe提供了硬件級別的原子操做。內部的方法均爲 native方法 ,能夠訪問系統底層。
這裏用了 CAS 算法(compare and swap) 大大的避免了使用時對性能的消耗,以及保證了使用時的安全性。

**注:** CAS 算法的核心是 將須要改變的參數,與內存中已經存在的變量的值進行對比,一致就改變,不一致就放棄此次操做。與之相相似的優化操做還有 LL/SC(Load-Linked/Store-Conditional : 加載連接/條件存儲) 、 Test-and-Set(測試並設置)

這裏額外介紹一下 Unsafe 類的 compareAndSwapInt 方法。

/**
* 比較obj的offset處內存位置中的值和指望的值,若是相同則更新。此更新是不可中斷的。
* 
* @param obj 須要更新的對象
* @param offset obj中整型field的偏移量
* @param expect 但願field中存在的值
* @param update 若是指望值expect與field的當前值相同,設置filed的值爲這個新值
* @return 若是field的值被更改返回true
*/
public native boolean compareAndSwapInt(Object obj, long offset, int expect, int update);

下面是 ConcurrentHashMap 中有關的應用

// Unsafe mechanics
    private static final sun.misc.Unsafe U;
    //對應於 類中的 sizectl
    private static final long SIZECTL;
    //在 transfer() 方法的使用時,計算索引
    private static final long TRANSFERINDEX;
    // 用於對 ConcurrentHashMap 的 size 統計。
    // 下文 第8點關於 size 會說明。
    private static final long BASECOUNT;
    // 輔助類 countercell 類中的屬性,用於分佈式計算
    // 是實現  java8 中 londAddr 的基礎
    private static final long CELLSBUSY;
    private static final long CELLVALUE;
    // 用來肯定在數組中的位置
    // 數組中的偏移地址
    private static final long ABASE;
    // 數組中的增量地址
    private static final int ASHIFT;

    static {
        try {
            //經過反射調用 類中的值,從而對 這些變量賦值
            U = sun.misc.Unsafe.getUnsafe();
            Class<?> k = ConcurrentHashMap.class;
            SIZECTL = U.objectFieldOffset
                (k.getDeclaredField("sizeCtl"));
            TRANSFERINDEX = U.objectFieldOffset
                (k.getDeclaredField("transferIndex"));
            BASECOUNT = U.objectFieldOffset
                (k.getDeclaredField("baseCount"));
            CELLSBUSY = U.objectFieldOffset
                (k.getDeclaredField("cellsBusy"));
            Class<?> ck = CounterCell.class;
            CELLVALUE = U.objectFieldOffset
                (ck.getDeclaredField("value"));
            Class<?> ak = Node[].class;
            ABASE = U.arrayBaseOffset(ak);
            int scale = U.arrayIndexScale(ak);
            if ((scale & (scale - 1)) != 0)
                throw new Error("data type scale not a power of two");
            ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
        } catch (Exception e) {
            throw new Error(e);
        }
    }

3.2 經常使用方法

在操做過程當中,常常會看到如下幾個,或者相相似的方法。
其核心是

//得到 i 位置上的 Node 節點
 static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {  
       return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);  
   }  
    //利用CAS算法設置i位置上的Node節點。
   static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,  
                                       Node<K,V> c, Node<K,V> v) {  
       return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);  
   }  
    //利用volatile方法設置節點位置的值  
   static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {  
       U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);  
   }

4. 初始化函數 initTable

調用ConcurrentHashMap的構造方法僅僅是設置了一些參數而已,而整個table的初始化是在向ConcurrentHashMap中插入元素的時候發生的。
當向 map 插入數據的時候 table == null , 則會調用 initTable()方法 。
put 方法 簡單展現一下。

final V putVal(K key, V value, boolean onlyIfAbsent) {
        ...
        ...
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
        ...
        ...
    }

initTable() 方法展現以下
其中有 sizectl 變量,這裏回顧一下

hash表初始化或擴容時的一個控制位標識量。 
負數表明正在進行初始化或擴容操做 
-1表明正在初始化 
-N 表示有N-1個線程正在進行擴容操做 
正數或0表明hash表尚未被初始化,這個數值表示初始化或下一次進行擴容的大小
/** 
     * Initializes table, using the size recorded in sizeCtl. 
     */  
    private final Node<K,V>[] initTable() {  
        Node<K,V>[] tab; int sc;  
        while ((tab = table) == null || tab.length == 0) {  

                //sizeCtl <0 表示有其餘線程正在進行初始化操做,把線程掛起。對於table的初始化工做,只能有一個線程在進行。  
            if ((sc = sizeCtl) < 0)  
                Thread.yield(); // lost initialization race; just spin  

                //利用CAS方法把sizectl的值置爲-1 表示本線程正在進行初始化  
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {  
                    if ((tab = table) == null || tab.length == 0) {  
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;  
                        @SuppressWarnings("unchecked")  
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];  
                        table = tab = nt;  
                        //至關於0.75*n 設置一個擴容的閾值  
                        // sc = n - n/4
                        sc = n - (n >>> 2);
                    }  
                } finally {  
                    // 更新 sizectl
                    sizeCtl = sc;  
                }  
                break;  
            }  
        }  
        return tab;  
    }

5. transfer() 擴容操做

當ConcurrentHashMap容量不足的時候,須要對table進行擴容。這個方法的基本思想跟HashMap是很像的,可是因爲它是支持併發擴容的,因此要複雜的多。緣由是它支持多線程進行擴容操做,而並無加鎖。我想這樣作的目的不只僅是爲了知足concurrent的要求,而是但願利用併發處理去減小擴容帶來的時間影響。由於在擴容的時候,老是會涉及到從一個「數組」到另外一個「數組」拷貝的操做,若是這個操做可以併發進行,那真真是極好的了。

整個擴容操做分爲兩個部分:

1. 第一部分是構建一個nextTable,它的容量是原來的兩倍,這個操做是單線程完成的。這個單線程的保證是經過RESIZE_STAMP_SHIFT這個常量通過一次運算來保證的,這個地方在後面會有提到;
2. 第二個部分就是將原來table中的元素複製到nextTable中,這裏容許多線程進行操做。

先來看一下單線程是如何完成的:
它的大致思想就是遍歷、複製的過程。首先根據運算獲得須要遍歷的次數i,而後利用tabAt方法得到i位置的元素:

1. 若是這個位置爲空,就在原table中的i位置放入forwardNode節點,這個也是觸發併發擴容的關鍵點;
2. 若是這個位置是Node節點(fh>=0),就構造兩個鏈表,一個表明高位爲 0 , 一個表明高位爲 1 。將原來的結點 分別放在nextTable的i和i+n的位置上,而且除了lastRun的位置相對位於鏈表的底部外,其他元素均爲 **反序** 。
3. 若是這個位置是TreeBin節點(fh<0),也作一個處理,而且判斷是否須要untreefi,把處理的結果分別放在nextTable的i和i+n的位置上

遍歷過全部的節點之後就完成了複製工做,這時讓nextTable做爲新的table,而且更新sizeCtl爲新容量的0.75倍 ,完成擴容。

再看一下多線程是如何完成的:

//若是遍歷到ForwardingNode節點  說明這個點已經被處理過了,直接跳過  這裏是控制併發擴容的核心
            else if ((fh = f.hash) == MOVED)
                advance = true; // already processed

這是一個判斷,若是遍歷到的節點是forward節點,就向後繼續遍歷,再加上給節點上鎖的機制,就完成了多線程的控制。多線程遍歷節點,處理了一個節點,就把對應點的值set爲forward,另外一個線程看到forward,就向後遍歷。這樣交叉就完成了複製工做。並且還很好的解決了線程安全的問題。

如圖:
clipboard.png
clipboard.png

下面是源碼:

/**
     * 一個過渡的table表  只有在擴容的時候纔會使用
     */
    private transient volatile Node<K, V>[] nextTable;

    /**
     * Moves and/or copies the nodes in each bin to new table. See
     * above for explanation.
     */
    private final void transfer(Node<K, V>[] tab, Node<K, V>[] nextTab) {
        int n = tab.length, stride;
        // 經過計算 NCPU CPU的核心數與 表的大小的比值,將表進行範圍的細分,以方便 併發。
        // 感受上 有點像 segment 分段鎖的意思。
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE; // subdivide range
        if (nextTab == null) {            // initiating
            try {
                //構造一個nextTable對象 它的容量是原來的兩倍。
                @SuppressWarnings("unchecked")
                Node<K, V>[] nt = (Node<K, V>[]) new Node<?, ?>[n << 1];
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                //原來的 容量限制爲 1<<30
                //HashMap 在擴容時,會用 resize() 方法,擴大 threshold 的值
                //當大於 MAXIMUM_CAPACITY 時,會將 threshold 設置爲 Integer.MAX_VALUE
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            nextTable = nextTab;
            transferIndex = n;
        }
        int nextn = nextTab.length;
        ForwardingNode<K, V> fwd = new ForwardingNode<K, V>(nextTab);//構造一個連節點指針 用於標誌位
        boolean advance = true;//併發擴容的關鍵屬性 若是等於true 說明這個節點已經處理過
        boolean finishing = false; // to ensure sweep before committing nextTab

        for (int i = 0, bound = 0; ; ) {
            Node<K, V> f;
            int fh;
            //這個while循環體的做用就是在控制i遞減  經過i能夠依次遍歷原hash表中的節點
            while (advance) {
                int nextIndex, nextBound;
                if (--i >= bound || finishing)
                    advance = false;
                else if ((nextIndex = transferIndex) <= 0) {
                    i = -1;
                    advance = false;
                } else if (U.compareAndSwapInt
                        (this, TRANSFERINDEX, nextIndex,
                                nextBound = (nextIndex > stride ?
                                        nextIndex - stride : 0))) {
                    bound = nextBound;
                    i = nextIndex - 1;
                    advance = false;
                }
            }
            if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
                if (finishing) {
                    //若是全部的節點都已經完成複製工做  就把nextTable賦值給table 清空臨時對象nextTable
                    nextTable = null;
                    table = nextTab;
                    sizeCtl = (n << 1) - (n >>> 1);//擴容閾值設置爲原來容量的1.5倍  依然至關於如今容量的0.75倍
                    return;
                }
                //利用CAS方法更新這個擴容閾值,在這裏面sizectl值減一,說明新加入一個線程參與到擴容操做
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                        return;
                    finishing = advance = true;
                    i = n; // recheck before commit
                }
            }
            //若是遍歷到的節點爲空 則放入ForwardingNode指針
            else if ((f = tabAt(tab, i)) == null)
                advance = casTabAt(tab, i, null, fwd);
            //若是遍歷到ForwardingNode節點  說明這個點已經被處理過了,直接跳過  這裏是控制併發擴容的核心
            else if ((fh = f.hash) == MOVED)
                advance = true; // already processed
            else {
                //節點上鎖
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        Node<K, V> ln, hn;
                        //若是fh>=0 證實這是一個Node節點
                        if (fh >= 0) {
                            // runBit 表明正在 運行的 Node 節點的 分類
                            // 所以鏈表根據高位爲0或者1分爲兩個子鏈表,高位爲0的節點桶位置沒有發生變化,高位爲1的節點桶位置增長了n,
                            // 因此有setTabAt(nextTab, i, ln);和 setTabAt(nextTab, i + n, hn);
                            // n = 2的冪 。 二進制 0001000
                            // fh & n =  1. 1000
                            //           2. 0000  因此劃分出兩個鏈表。
                            int runBit = fh & n;
                            // lastRun 是正在運行的節點
                            Node<K, V> lastRun = f;
                            //如下的部分在完成的工做是構造兩個鏈表  一個是高位爲 0 的鏈表  另外一個是高位爲 1 的鏈表 
                            // 找出最後一個 與後面的結點不一樣的 結點
                            for (Node<K, V> p = f.next; p != null; p = p.next) {
                                int b = p.hash & n;
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                            // 將最後一個 結點保存起來
                            if (runBit == 0) {
                                ln = lastRun;
                                hn = null;
                            } else {
                                hn = lastRun;
                                ln = null;
                            }

                            for (Node<K, V> p = f; p != lastRun; p = p.next) {
                                int ph = p.hash;
                                K pk = p.key;
                                V pv = p.val;
                                //這個鏈表是從低層向上構建
                                // ln 或 hn = lastRun, 構建一個 node 結點
                                // 其下一個結點爲 lastRun 。
                                if ((ph & n) == 0) // 構建低位鏈表
                                    ln = new Node<K, V>(ph, pk, pv, ln);
                                else     // 構建高位鏈表
                                    hn = new Node<K, V>(ph, pk, pv, hn);
                            }

                            //在nextTable的i位置上插入一個鏈表
                            setTabAt(nextTab, i, ln);
                            //在nextTable的i+n的位置上插入另外一個鏈表
                            setTabAt(nextTab, i + n, hn);
                            //在table的i位置上插入forwardNode節點  表示已經處理過該節點
                            setTabAt(tab, i, fwd);
                            //設置advance爲true 返回到上面的while循環中 就能夠執行 --i 操做
                            advance = true;
                        }
                        //對TreeBin對象進行處理  與上面的過程相似
                        else if (f instanceof TreeBin) {
                            TreeBin<K, V> t = (TreeBin<K, V>) f;
                            TreeNode<K, V> lo = null, loTail = null;
                            TreeNode<K, V> hi = null, hiTail = null;
                            int lc = 0, hc = 0;
                            //構造高位和低位兩個鏈表
                            for (Node<K, V> e = t.first; e != null; e = e.next) {
                                int h = e.hash;
                                TreeNode<K, V> p = new TreeNode<K, V>
                                        (h, e.key, e.val, null, null);
                                if ((h & n) == 0) {
                                    if ((p.prev = loTail) == null)
                                        lo = p;
                                    else
                                        loTail.next = p;
                                    loTail = p;
                                    ++lc;
                                } else {
                                    if ((p.prev = hiTail) == null)
                                        hi = p;
                                    else
                                        hiTail.next = p;
                                    hiTail = p;
                                    ++hc;
                                }
                            }
                            //若是擴容後已經再也不須要tree的結構 反向轉換爲鏈表結構
                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                    (hc != 0) ? new TreeBin<K, V>(lo) : t;
                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                    (lc != 0) ? new TreeBin<K, V>(hi) : t;
                            //在nextTable的i位置上插入一個鏈表
                            setTabAt(nextTab, i, ln);
                            //在nextTable的i+n的位置上插入另外一個鏈表
                            setTabAt(nextTab, i + n, hn);
                            //在table的i位置上插入forwardNode節點  表示已經處理過該節點
                            setTabAt(tab, i, fwd);
                            //設置advance爲true 返回到上面的while循環中 就能夠執行 --i 操做
                            advance = true;
                        }
                    }
                }
            }
        }
    }

6. put 方法

put方法依然沿用HashMap的put方法的思想,根據hash值計算這個新插入的點在table中的位置i。

注:1. hash = spread(key.hashCode())
    2. spread(int h) --> return (h ^ (h >>> 16)) & HASH_BITS;  --> 經過hashCode()的高16位異或低16位優化高位運算的算法
    3. else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {     
            if (casTabAt(tab, i, null,      
                         new Node<K,V>(hash, key, value, null)))    
                break;                   // no lock when adding to empty bin    
        }

若是i位置是空的,直接放進去,不然進行判斷,
若是i位置是樹節點,按照樹的方式插入新的節點,不然把i插入到鏈表的末尾
不一樣點:ConcurrentHashMap不容許key或value爲null值。

多線程的狀況下:

  1. 若是一個或多個線程正在對ConcurrentHashMap進行擴容操做,當前線程也要進入擴容的操做中。這個擴容的操做之因此能被檢測到,是由於transfer方法中在空結點上插入forward節點,若是檢測到須要插入的位置被forward節點佔有,就幫助進行擴容; --> helpTransfer() 方法。

  2. 若是檢測到要插入的節點是非空且不是forward節點,就對這個節點加鎖,這樣就保證了線程安全。儘管這個有一些影響效率,可是仍是會比hashTable的synchronized要好得多。

    1. 首先判斷這個節點的類型。若是是鏈表節點(fh>0),則獲得的結點就是hash值相同的節點組成的鏈表的頭節點。須要依次向後遍歷肯定這個新加入的值所在位置。若是遇到hash值與key值都與新加入節點是一致的狀況,則只須要更新value值便可。不然依次向後遍歷,直到鏈表尾插入這個結點。

    2. 若是加入這個節點之後鏈表長度大於8,就把這個鏈表轉換成紅黑樹。

    3. 若是這個節點的類型已是樹節點的話,直接調用樹節點的插入方法進行插入新的值。

源碼以下:

public V put(K key, V value) {
    return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
    //不容許 key或value爲null  
    if (key == null || value == null) throw new NullPointerException();
    //計算hash值  
    int hash = spread(key.hashCode());
    //計算該鏈表 節點的數量
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        // 第一次 put 操做的時候初始化,若是table爲空的話,初始化table  
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();

        //根據hash值計算出在table裏面的位置   
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            // 根據對應的key hash 到具體的索引,若是該索引對應的 Node 爲 null,則採用 CAS 操做更新整個 table
            // 若是這個位置沒有值 ,直接放進去,不須要加鎖  
            if (casTabAt(tab, i, null,
                        new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        //當遇到錶鏈接點時,須要進行整合表的操做  
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            // 結點上鎖,只是對鏈表頭結點做鎖操做
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    //fh > 0 說明這個節點是一個鏈表的節點 不是樹的節點  
                    if (fh >= 0) {
                        binCount = 1;
                        //在這裏遍歷鏈表全部的結點  
                        //而且計算鏈表裏結點的數量
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            //若是hash值和key值相同  則修改對應結點的value值  
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            //若是遍歷到了最後一個結點,那麼就證實新的節點須要插入 就把它插入在鏈表尾部  
                            if ((e = e.next) == null) {
                                // 插入到鏈表尾
                                pred.next = new Node<K,V>(hash, key,
                                                            value, null);
                                break;
                            }
                        }
                    }
                    //若是這個節點是樹節點,就按照樹的方式插入值  
                    else if (f instanceof TreeBin) {
                        // 若是是紅黑樹結點,按照紅黑樹的插入
                        Node<K,V> p;
                        // 若是爲樹節點, binCount一直爲2,不會引起擴容。
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                        value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            if (binCount != 0) {
                // 若是這個鏈表結點達到了臨界值8,那麼把這個鏈表轉換成紅黑樹
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    //將當前ConcurrentHashMap的元素數量+1,table的擴容是在這裏發生的
    addCount(1L, binCount);
    return null;
}

6.1 helpTransfer() 方法

出現於 put 方法 以下地點

//當遇到錶鏈接點時,須要進行整合表的操做  
    else if ((fh = f.hash) == MOVED)  
        tab = helpTransfer(tab, f);

helpTransfer() 方法的源碼以下

final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {  
       Node<K,V>[] nextTab; int sc;  
       // 當前 table 不爲 null , 且 f 爲 forwardingNode 結點 , 且存在下一張表
       if (tab != null && (f instanceof ForwardingNode) &&  
           (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {  
           int rs = resizeStamp(tab.length);//計算一個擴容校驗碼  
            // 當 sizeCtl < 0 時,表示有線程在 transfer().
           while (nextTab == nextTable && table == tab &&  
                  (sc = sizeCtl) < 0) {  
                //正常狀況下 sc >>> RESIZE_STAMP_SHIFT  == resizeStamp(tab.length);
               if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||  
                   sc == rs + MAX_RESIZERS || transferIndex <= 0)  
                   break;  
                //將 擴容的線程先行減一,表示,這是來輔助 transfer,而非進行 transfer的線程。
               if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {  
                   transfer(tab, nextTab);  
                   break;  
               }  
           }  
           return nextTab;  
       }  
       return table;  
   }

6.2 treeifyBin() 方法

涉及變量 MIN_TREEIFY_CAPACITY = 64;
若是數組長度n小於閾值MIN_TREEIFY_CAPACITY,默認是64,則會調用tryPresize方法把數組長度擴大到原來的兩倍,並觸發transfer方法,從新調整節點的位置。
出現於 put 方法 以下地點

if (binCount != 0) {
        // TREEIFY_THRESHOLD 默認爲 8.
        if (binCount >= TREEIFY_THRESHOLD)
            treeifyBin(tab, i);
        if (oldVal != null)
            return oldVal;
        break;
    }

其中源碼以下:

private final void treeifyBin(Node<K,V>[] tab, int index) {
    Node<K,V> b; int n, sc;
    if (tab != null) {
        if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
            // 將原來的數組擴大爲原來的兩倍
            tryPresize(n << 1);

        else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
            synchronized (b) {
                if (tabAt(tab, index) == b) {
                    TreeNode<K,V> hd = null, tl = null;
                    for (Node<K,V> e = b; e != null; e = e.next) {
                        TreeNode<K,V> p =
                            new TreeNode<K,V>(e.hash, e.key, e.val,
                                              null, null);
                        if ((p.prev = tl) == null)
                            hd = p;
                        else
                            tl.next = p;
                        tl = p;
                    }
                    setTabAt(tab, index, new TreeBin<K,V>(hd));
                }
            }
        }
    }
}

6.3 tableSizeFor 方法

這裏講一個 JDK8 中設計的很是巧妙的算法。看了很久纔看懂。
出自 tryPresize 方法中的如下位置

//數組的最大容積爲 1<<30 。若是數組大小超過 1<<29 ,則將最大大小設置爲 MAXIMUM_CAPACITY
    //不然,設置爲原來的兩倍。
    private final void tryPresize(int size) {
        int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
            tableSizeFor(size + (size >>> 1) + 1);

下面讓咱們來分析一下,tableSizeFor()
這個算法的目的,是得出相比較於給定參數,返回一個恰好比參數大的 2次冪 整數。

static final int tableSizeFor(int cap) {
    int n = cap - 1;
    n |= n >>> 1;
    n |= n >>> 2;
    n |= n >>> 4;
    n |= n >>> 8;
    n |= n >>> 16;
    return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}

先來分析有關n位操做部分:先來假設n的二進制爲01xxx...xxx。接着
對n右移1位:001xx...xxx,再位或:011xx...xxx
對n右移2爲:00011...xxx,再位或:01111...xxx
此時前面已經有四個1了,再右移4位且位或可得8個1
同理,有8個1,右移8位確定會讓後八位也爲1。
綜上可得,該算法讓最高位的1後面的位全變爲1。

最後再讓結果n+1,即獲得了2的整數次冪的值了。

如今回來看看第一條語句:
int n = cap - 1;
  讓cap-1再賦值給n的目的是另找到的目標值大於或等於原值。例如二進制1000,十進制數值爲8。若是不對它減1而直接操做,將獲得答案10000,即16。顯然不是結果。減1後二進制爲111,再進行操做則會獲得原來的數值1000,即8。

引用自(http://www.cnblogs.com/loadin...

7. get 方法

經過 key值 搜索 value 值。
而且要 經過分辨 結點的種類,進行不一樣形式的尋找。

public V get(Object key) {  
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;  
        //計算hash值  
        int h = spread(key.hashCode());  
        //根據hash值肯定節點位置  
        if ((tab = table) != null && (n = tab.length) > 0 &&  
            (e = tabAt(tab, (n - 1) & h)) != null) {  
            //若是搜索到的節點key與傳入的key相同且不爲null,直接返回這個節點    
            if ((eh = e.hash) == h) {  
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))  
                    return e.val;  
            }  
            //若是eh<0 說明這個節點在樹上 直接尋找  
            else if (eh < 0)  
                return (p = e.find(h, key)) != null ? p.val : null;  
             //不然遍歷鏈表 找到對應的值並返回  
            while ((e = e.next) != null) {  
                if (e.hash == h &&  
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))  
                    return e.val;  
            }  
        }  
        return null;  
    }

8. Size相關

《併發編程實戰》中有提到,size返回的結果在計算時可能已通過期了,它實際上只是一個估計值,所以容許size返回一個近似值,而不是一個精確值。

8.1 CounterCell 類

從註釋中能夠看出,這是從 LongAdder 類中的思想,拷貝過來的一個類。
LongAdder 類 是 JDK 1.8 新引進的類,其思想:

多個線程持有本身的加數(cell),線程個數增長時,會自動提供新的加數。
當全部工做作完後,再提供新的加數。

有時間寫一篇相關的源碼分析~ 逃~

不過,這裏同樣不能精確統計,這裏的 CounterCell 等同於 LongAdder.Cell sumCount() 等同於 LongAdder.sum()方法。
執行邏輯是同樣的。
就 LongAdder 類中的 sum 方法所說, 當有線程在運行時,同樣只是估計值,只有當全部線程執行完畢,纔是實際值。
而統計 Size ,不可以像垃圾清除同樣,有 Safe point 或 Safe region ,因此,這個假設不成立。。。

其相關的源碼以下。

/**
     * A padded cell for distributing counts.  Adapted from LongAdder
     * and Striped64.  See their internal docs for explanation.
     */
    @sun.misc.Contended static final class CounterCell {
        volatile long value;
        CounterCell(long x) { value = x; }
    }

    //執行邏輯
    final long sumCount() {
    CounterCell[] as = counterCells; CounterCell a;
    long sum = baseCount;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}

8.2 mappingCount 方法

就官方文檔中所說, mappingCount 方法,應該取代 size 方法,
但這個方法得出的值同樣在線程運行的時候,只是一個估計的值。
從源碼中就能夠看出,使用的是上文分析的 sumCount() 方法。

public long mappingCount() {
        long n = sumCount();
        return (n < 0L) ? 0L : n; // ignore transient negative values
    }

8.3 addCount 方法

出自於 put 方法的以下位置

//將當前ConcurrentHashMap的元素數量+1  
        addCount(1L, binCount);  
        return null;  
    }

統計上:
這裏用到 CounterCell類,而且統計的值的計算同樣是採用的 sumCount() 方法。
因此缺點如上,再也不闡述。
擴容上:
邏輯與 helpTransfer() 相似,都是判斷是否有多個線程在執行擴容,而後判斷是否須要輔助 transfer();
源碼以下

private final void addCount(long x, int check) {  
        //用到了 CounterCell 類
        CounterCell[] as; long b, s;  
        //利用CAS方法更新baseCount的值   
        if ((as = counterCells) != null ||  
            !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {  
            CounterCell a; long v; int m;  
            boolean uncontended = true;  
            if (as == null || (m = as.length - 1) < 0 ||  
                (a = as[ThreadLocalRandom.getProbe() & m]) == null ||  
                !(uncontended =  
                  U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {  
                fullAddCount(x, uncontended);  
                return;  
            }  
            if (check <= 1)  
                return;  
            s = sumCount();  
        }  
        //若是check值大於等於0 則須要檢驗是否須要進行擴容操做  
        //下面的邏輯與 helpTransfer() 相似,能夠與 helpTransfer() 一塊兒參考。
        if (check >= 0) {  
            Node<K,V>[] tab, nt; int n, sc;  
            while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&  
                   (n = tab.length) < MAXIMUM_CAPACITY) {  
                int rs = resizeStamp(n);  
                //若是已經有其餘線程在執行擴容操做  
                if (sc < 0) {  
                    //校驗失效,直接退出。
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||  
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||  
                        transferIndex <= 0)  
                        break;  
                    
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))  
                        transfer(tab, nt);  
                }  
                //當前線程是惟一的或是第一個發起擴容的線程  此時nextTable=null  
                else if (U.compareAndSwapInt(this, SIZECTL, sc,  
                                             (rs << RESIZE_STAMP_SHIFT) + 2))  
                    transfer(tab, null);  
                s = sumCount();  
            }  
        }  
    }
相關文章
相關標籤/搜索