ConcurrentHashMap源碼分析(1.8)

0、說明 java

1、ConcurrentHashMap跟HashMap,HashTable的對比node

二、ConcurrentHashMap原理概覽數組

三、ConcurrentHashMap幾個重要概念安全

四、ConcurrentHashMap幾個重要方法多線程

五、ConcurrentHashMap的初始化併發

六、ConcurrentHashMap的put操做詳解less

七、ConcurrentHashMap的擴容詳解ide

八、ConcurrentHashMap的get操做詳解源碼分析

九、ConcurrentHashMap的同步機制性能

十、鏈表轉爲紅黑樹的過程

 

0、說明

 ※爲了分析源碼的時候方便調試,把ConcurrentHashMap的源碼放在本地了,名字改成了ConcurrentHashMapDebug

因爲源碼中的unsafe有不少限制,不能直接在本地使用,因此,在源碼的最後面的靜態代碼塊處修改了U的初始化方法。

private static final sun.misc.Unsafe U;
static{
    U = getUnsafe();
    ....          
}
static sun.misc.Unsafe getUnsafe() throws Exception {
        java.lang.reflect.Field field = Unsafe.class.getDeclaredField("theUnsafe");
        field.setAccessible(true);
        Unsafe unsafe=(Unsafe) field.get(null);
        return unsafe;
       }

 

 

一、ConcurrentHashMap跟HashMap,HashTable的對比

  咱們都知道HashMap不是線程安全的,因此在處理併發的時候會出現問題。

  而HashTable雖然是線程安全的,可是是經過整個來加鎖的方式,當一個線程在寫操做的時候,另外的線程則不能進行讀寫。

  而ConcurrentHashMap則能夠支持併發的讀寫。跟1.7版本相比,1.8版本又有了很大的變化,已經拋棄了Segment的概念,雖然源碼裏面還保留了,也只是爲了兼容性的考慮。

 

二、ConcurrentHashMap原理概覽

   在ConcurrentHashMap中經過一個Node<K,V>[]數組來保存添加到map中的鍵值對,而在同一個數組位置是經過鏈表和紅黑樹的形式來保存的。可是這個數組只有在第一次添加元素的時候纔會初始化,不然只是初始化一個ConcurrentHashMap對象的話,只是設定了一個sizeCtl變量,這個變量用來判斷對象的一些狀態和是否須要擴容,後面會詳細解釋。

  第一次添加元素的時候,默認初期長度爲16,當往map中繼續添加元素的時候,經過hash值跟數組長度取與來決定放在數組的哪一個位置,若是出現放在同一個位置的時候,優先以鏈表的形式存放,在同一個位置的個數又達到了8個以上,若是數組的長度還小於64的時候,則會擴容數組。若是數組的長度大於等於64了的話,在會將該節點的鏈表轉換成樹。

  經過擴容數組的方式來把這些節點給分散開。而後將這些元素複製到擴容後的新的數組中,同一個鏈表中的元素經過hash值的數組長度位來區分,是仍是放在原來的位置仍是放到擴容的長度的相同位置去 。在擴容完成以後,若是某個節點的是樹,同時如今該節點的個數又小於等於6個了,則會將該樹轉爲鏈表。

  取元素的時候,相對來講比較簡單,經過計算hash來肯定該元素在數組的哪一個位置,而後在經過遍歷鏈表或樹來判斷key和key的hash,取出value值。

  往ConcurrentHashMap中添加元素的時候,裏面的數據以數組的形式存放的樣子大概是這樣的:

 

 

  這個時候由於數組的長度才爲16,則不會轉化爲樹,而是會進行擴容。

  擴容後數組大概是這樣的:

  

  須要注意的是,擴容以後的長度不是32,擴容後的長度在後面細說。

  若是數組擴張後長度達到64了,且繼續在某個節點的後面添加元素達到8個以上的時候,則會出現轉化爲紅黑樹的狀況。

  轉化以後大概是這樣:

 

三、ConcurrentHashMap幾個重要概念 

下面是幾個重要的屬性

private static final int MAXIMUM_CAPACITY = 1 << 30;
private static final int DEFAULT_CAPACITY = 16;
static final int TREEIFY_THRESHOLD = 8;
static final int UNTREEIFY_THRESHOLD = 6;
static final int MIN_TREEIFY_CAPACITY = 64;
static final int MOVED     = -1; // 表示正在轉移
static final int TREEBIN   = -2; // 表示已經轉換成樹
static final int RESERVED  = -3; // hash for transient reservations
static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash
transient volatile Node<K,V>[] table;//默認沒初始化的數組,用來保存元素
private transient volatile Node<K,V>[] nextTable;//轉移的時候用的數組
/**
     * 用來控制表初始化和擴容的,默認值爲0,當在初始化的時候指定了大小,這會將這個大小保存在sizeCtl中,大小爲數組的0.75
     * 當爲負的時候,說明表正在初始化或擴張,
     *     -1表示初始化
     *     -(1+n) n:表示活動的擴張線程
     */
    private transient volatile int sizeCtl;

幾個重要的類

Node<K,V>,這是構成每一個元素的基本類。

static class Node<K,V> implements Map.Entry<K,V> {
        final int hash;    //key的hash值
        final K key;       //key
        volatile V val;    //value
        volatile Node<K,V> next; //表示鏈表中的下一個節點

        Node(int hash, K key, V val, Node<K,V> next) {
            this.hash = hash;
            this.key = key;
            this.val = val;
            this.next = next;
        }
        public final K getKey()       { return key; }
        public final V getValue()     { return val; }
        public final int hashCode()   { return key.hashCode() ^ val.hashCode(); }
    }

 TreeNode,構造樹的節點

static final class TreeNode<K,V> extends Node<K,V> {
        TreeNode<K,V> parent;  // red-black tree links
        TreeNode<K,V> left;
        TreeNode<K,V> right;
        TreeNode<K,V> prev;    // needed to unlink next upon deletion
        boolean red;

        TreeNode(int hash, K key, V val, Node<K,V> next,
                 TreeNode<K,V> parent) {
            super(hash, key, val, next);
            this.parent = parent;
        }
}

TreeBin 用做樹的頭結點,只存儲root和first節點,不存儲節點的key、value值。

static final class TreeBin<K,V> extends Node<K,V> {
        TreeNode<K,V> root;
        volatile TreeNode<K,V> first;
        volatile Thread waiter;
        volatile int lockState;
        // values for lockState
        static final int WRITER = 1; // set while holding write lock
        static final int WAITER = 2; // set when waiting for write lock
        static final int READER = 4; // increment value for setting read lock
}

ForwardingNode在轉移的時候放在頭部的節點,是一個空節點

static final class ForwardingNode<K,V> extends Node<K,V> {
        final Node<K,V>[] nextTable;
        ForwardingNode(Node<K,V>[] tab) {
            super(MOVED, null, null, null);
            this.nextTable = tab;
        }
}

 

四、ConcurrentHashMap幾個重要方法

  在ConcurrentHashMap中使用了unSafe方法,經過直接操做內存的方式來保證併發處理的安全性,使用的是硬件的安全機制。

/*
     * 用來返回節點數組的指定位置的節點的原子操做
     */
    @SuppressWarnings("unchecked")
    static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
        return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
    }

    /*
     * cas原子操做,在指定位置設定值
     */
    static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                        Node<K,V> c, Node<K,V> v) {
        return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
    }
    /*
     * 原子操做,在指定位置設定值
     */
    static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
        U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
    }

 

5、ConcurrentHashMap的初始化

   首先咱們看看構造方法

//空的構造
public ConcurrentHashMapDebug() {
    }
//若是在實例化對象的時候指定了容量,則初始化sizeCtl
public ConcurrentHashMapDebug(int initialCapacity) {
        if (initialCapacity < 0)
            throw new IllegalArgumentException();
        int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
                   MAXIMUM_CAPACITY :
                   tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
        this.sizeCtl = cap;
    }
//當出入一個Map的時候,先設定sizeCtl爲默認容量,在添加元素
public ConcurrentHashMapDebug(Map<? extends K, ? extends V> m) {
        this.sizeCtl = DEFAULT_CAPACITY;
        putAll(m);
    }

  能夠看到,在任何一個構造方法中,都沒有對存儲Map元素Node的table變量進行初始化。而是在第一次put操做的時候在進行初始化。

  下面來看看數組的初始化方法initTable

/**
     * 初始化數組table,
     * 若是sizeCtl小於0,說明別的數組正在進行初始化,則讓出執行權
     * 若是sizeCtl大於0的話,則初始化一個大小爲sizeCtl的數組
     * 不然的話初始化一個默認大小(16)的數組
     * 而後設置sizeCtl的值爲數組長度的3/4
     */
    private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc; while ((tab = table) == null || tab.length == 0) { //第一次put的時候,table還沒被初始化,進入while if ((sc = sizeCtl) < 0) //sizeCtl初始值爲0,當小於0的時候表示在別的線程在初始化表或擴展表 Thread.yield(); // lost initialization race; just spin else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { //SIZECTL:表示當前對象的內存偏移量,sc表示指望值,-1表示要替換的值,設定爲-1表示要初始化表了 try { if ((tab = table) == null || tab.length == 0) { int n = (sc > 0) ? sc : DEFAULT_CAPACITY; //指定了大小的時候就建立指定大小的Node數組,不然建立指定大小(16)的Node數組 @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = tab = nt; sc = n - (n >>> 2); } } finally { sizeCtl = sc; //初始化後,sizeCtl長度爲數組長度的3/4  } break; } } return tab; }

 

 六、ConcurrentHashMap的put操做詳解

  下面看看put方法的源碼

/*
     *    單純的額調用putVal方法,而且putVal的第三個參數設置爲false
     *  當設置爲false的時候表示這個value必定會設置
     *  true的時候,只有當這個key的value爲空的時候纔會設置
     */
    public V put(K key, V value) {
        return putVal(key, value, false);
    }

  再來看putVal

/*
     * 當添加一對鍵值對的時候,首先會去判斷保存這些鍵值對的數組是否是初始化了,
     * 若是沒有的話就初始化數組
     *  而後經過計算hash值來肯定放在數組的哪一個位置
     * 若是這個位置爲空則直接添加,若是不爲空的話,則取出這個節點來
     * 若是取出來的節點的hash值是MOVED(-1)的話,則表示當前正在對這個數組進行擴容,複製到新的數組,則當前線程也去幫助複製
     * 最後一種狀況就是,若是這個節點,不爲空,也不在擴容,則經過synchronized來加鎖,進行添加操做
     *    而後判斷當前取出的節點位置存放的是鏈表仍是樹
     *    若是是鏈表的話,則遍歷整個鏈表,直到取出來的節點的key來個要放的key進行比較,若是key相等,而且key的hash值也相等的話,
     *          則說明是同一個key,則覆蓋掉value,不然的話則添加到鏈表的末尾
     *    若是是樹的話,則調用putTreeVal方法把這個元素添加到樹中去
     *  最後在添加完成以後,會判斷在該節點處共有多少個節點(注意是添加前的個數),若是達到8個以上了的話,
     *  則調用treeifyBin方法來嘗試將處的鏈表轉爲樹,或者擴容數組
     */
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();//K,V都不能爲空,不然的話跑出異常
        int hash = spread(key.hashCode());    //取得key的hash值
        int binCount = 0;    //用來計算在這個節點總共有多少個元素,用來控制擴容或者轉移爲樹
        for (Node<K,V>[] tab = table;;) {    //
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)    
                tab = initTable();    //第一次put的時候table沒有初始化,則初始化table
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {    //經過哈希計算出一個表中的位置由於n是數組的長度,因此(n-1)&hash確定不會出現數組越界
                if (casTabAt(tab, i, null,        //若是這個位置沒有元素的話,則經過cas的方式嘗試添加,注意這個時候是沒有加鎖的
                             new Node<K,V>(hash, key, value, null)))        //建立一個Node添加到數組中區,null表示的是下一個節點爲空
                    break;                   // no lock when adding to empty bin
            }
            /*
             * 若是檢測到某個節點的hash值是MOVED,則表示正在進行數組擴張的數據複製階段,
             * 則當前線程也會參與去複製,經過容許多線程複製的功能,一次來減小數組的複製所帶來的性能損失
             */
            else if ((fh = f.hash) == MOVED)    
                tab = helpTransfer(tab, f);
            else {
                /*
                 * 若是在這個位置有元素的話,就採用synchronized的方式加鎖,
                 *     若是是鏈表的話(hash大於0),就對這個鏈表的全部元素進行遍歷,
                 *         若是找到了key和key的hash值都同樣的節點,則把它的值替換到
                 *         若是沒找到的話,則添加在鏈表的最後面
                 *  不然,是樹的話,則調用putTreeVal方法添加到樹中去
                 *  
                 *  在添加完以後,會對該節點上關聯的的數目進行判斷,
                 *  若是在8個以上的話,則會調用treeifyBin方法,來嘗試轉化爲樹,或者是擴容
                 */
                V oldVal = null;
                synchronized (f) {
                    if (tabAt(tab, i) == f) {        //再次取出要存儲的位置的元素,跟前面取出來的比較
                        if (fh >= 0) {                //取出來的元素的hash值大於0,當轉換爲樹以後,hash值爲-2
                            binCount = 1;            
                            for (Node<K,V> e = f;; ++binCount) {    //遍歷這個鏈表
                                K ek;
                                if (e.hash == hash &&        //要存的元素的hash,key跟要存儲的位置的節點的相同的時候,替換掉該節點的value便可
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)        //當使用putIfAbsent的時候,只有在這個key沒有設置值得時候才設置
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {    //若是不是一樣的hash,一樣的key的時候,則判斷該節點的下一個節點是否爲空,
                                    pred.next = new Node<K,V>(hash, key,        //爲空的話把這個要加入的節點設置爲當前節點的下一個節點
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {    //表示已經轉化成紅黑樹類型了
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,    //調用putTreeVal方法,將該元素添加到樹中去
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)    //當在同一個節點的數目達到8個的時候,則擴張數組或將給節點的數據轉爲tree
                        treeifyBin(tab, i);    
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);    //計數
        return null;
    }

 

 七、ConcurrentHashMap的擴容詳解

   在put方法的詳解中,咱們能夠看到,在同一個節點的個數超過8個的時候,會調用treeifyBin方法來看看是擴容仍是轉化爲一棵樹

  同時在每次添加完元素的addCount方法中,也會判斷當前數組中的元素是否達到了sizeCtl的量,若是達到了的話,則會進入transfer方法去擴容

/**
     * Replaces all linked nodes in bin at given index unless table is
     * too small, in which case resizes instead.
     * 當數組長度小於64的時候,擴張數組長度一倍,不然的話把鏈表轉爲樹
     */
    private final void treeifyBin(Node<K,V>[] tab, int index) {
        Node<K,V> b; int n, sc;
        if (tab != null) {
                System.out.println("treeifyBin方\t==>數組長:"+tab.length);
            if ((n = tab.length) < MIN_TREEIFY_CAPACITY)    //MIN_TREEIFY_CAPACITY 64
                tryPresize(n << 1);        // 數組擴容
            else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
                synchronized (b) {    //使用synchronized同步器,將該節點出的鏈表轉爲樹
                    if (tabAt(tab, index) == b) {
                        TreeNode<K,V> hd = null, tl = null;    //hd:樹的頭(head)
                        for (Node<K,V> e = b; e != null; e = e.next) {
                            TreeNode<K,V> p =
                                new TreeNode<K,V>(e.hash, e.key, e.val,
                                                  null, null);
                            if ((p.prev = tl) == null)        //把Node組成的鏈表,轉化爲TreeNode的鏈表,頭結點任然放在相同的位置
                                hd = p;    //設置head
                            else
                                tl.next = p;
                            tl = p;
                        }
                        setTabAt(tab, index, new TreeBin<K,V>(hd));//把TreeNode的鏈表放入容器TreeBin中
                    }
                }
            }
        }
    }

  能夠看到當須要擴容的時候,調用的時候tryPresize方法,看看trePresize的源碼

    /**
     * 擴容表爲指能夠容納指定個數的大小(老是2的N次方)
     * 假設原來的數組長度爲16,則在調用tryPresize的時候,size參數的值爲16<<1(32),此時sizeCtl的值爲12
     * 計算出來c的值爲64,則要擴容到sizeCtl≥爲止
     *  第一次擴容以後 數組長:32 sizeCtl:24
     *  第二次擴容以後 數組長:64 sizeCtl:48
     *  第二次擴容以後 數組長:128 sizeCtl:94 --> 這個時候纔會退出擴容
     */
    private final void tryPresize(int size) {
            /*
             * MAXIMUM_CAPACITY = 1 << 30
             * 若是給定的大小大於等於數組容量的一半,則直接使用最大容量,
             * 不然使用tableSizeFor算出來
             * 後面table一直要擴容到這個值小於等於sizeCtrl(數組長度的3/4)才退出擴容
             */
        int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
            tableSizeFor(size + (size >>> 1) + 1);
        int sc;
        while ((sc = sizeCtl) >= 0) {
            Node<K,V>[] tab = table; int n;
//            printTable(tab);    調試用的
            /*
             * 若是數組table尚未被初始化,則初始化一個大小爲sizeCtrl和剛剛算出來的c中較大的一個大小的數組
             * 初始化的時候,設置sizeCtrl爲-1,初始化完成以後把sizeCtrl設置爲數組長度的3/4
             * 爲何要在擴張的地方來初始化數組呢?這是由於若是第一次put的時候不是put單個元素,
             * 而是調用putAll方法直接put一個map的話,在putALl方法中沒有調用initTable方法去初始化table,
             * 而是直接調用了tryPresize方法,因此這裏須要作一個是否是須要初始化table的判斷
             */
            if (tab == null || (n = tab.length) == 0) {
                n = (sc > c) ? sc : c;
                if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {    //初始化tab的時候,把sizeCtl設爲-1
                    try {
                        if (table == tab) {
                            @SuppressWarnings("unchecked")
                            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                            table = nt;
                            sc = n - (n >>> 2);
                        }
                    } finally {
                        sizeCtl = sc;
                    }
                }
            }
            /*
             * 一直擴容到的c小於等於sizeCtl或者數組長度大於最大長度的時候,則退出
             * 因此在一次擴容以後,不是原來長度的兩倍,而是2的n次方倍
             */
            else if (c <= sc || n >= MAXIMUM_CAPACITY) {
                    break;    //退出擴張
            }
            else if (tab == table) {
                int rs = resizeStamp(n);
                /*
                 * 若是正在擴容Table的話,則幫助擴容
                 * 不然的話,開始新的擴容
                 * 在transfer操做,將第一個參數的table中的元素,移動到第二個元素的table中去,
                 * 雖然此時第二個參數設置的是null,可是,在transfer方法中,當第二個參數爲null的時候,
                 * 會建立一個兩倍大小的table
                 */
                if (sc < 0) {
                    Node<K,V>[] nt;
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                        break;
                    /*
                     * transfer的線程數加一,該線程將進行transfer的幫忙
                     * 在transfer的時候,sc表示在transfer工做的線程數
                     */
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt);
                }
                /*
                 * 沒有在初始化或擴容,則開始擴容
                 */
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                             (rs << RESIZE_STAMP_SHIFT) + 2)) {
                        transfer(tab, null);
                }
            }
        }
    }

在tryPresize方法中,並無加鎖,容許多個線程進入,若是數組正在擴張,則當前線程也去幫助擴容。

數組擴容的主要方法就是transfer方法

 

/**
     * Moves and/or copies the nodes in each bin to new table. See
     * above for explanation.
     * 把數組中的節點複製到新的數組的相同位置,或者移動到擴張部分的相同位置
     * 在這裏首先會計算一個步長,表示一個線程處理的數組長度,用來控制對CPU的使用,
     * 每一個CPU最少處理16個長度的數組元素,也就是說,若是一個數組的長度只有16,那只有一個線程會對其進行擴容的複製移動操做
     * 擴容的時候會一直遍歷,知道複製完全部節點,沒處理一個節點的時候會在鏈表的頭部設置一個fwd節點,這樣其餘線程就會跳過他,
     * 複製後在新數組中的鏈表不是絕對的反序的
     */
    private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
        int n = tab.length, stride;
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)    //MIN_TRANSFER_STRIDE 用來控制不要佔用太多CPU
            stride = MIN_TRANSFER_STRIDE; // subdivide range    //MIN_TRANSFER_STRIDE=16
        /*
         * 若是複製的目標nextTab爲null的話,則初始化一個table兩倍長的nextTab
         * 此時nextTable被設置值了(在初始狀況下是爲null的)
         * 由於若是有一個線程開始了表的擴張的時候,其餘線程也會進來幫忙擴張,
         * 而只是第一個開始擴張的線程須要初始化下目標數組
         */
        if (nextTab == null) {            // initiating
            try {
                @SuppressWarnings("unchecked")
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            nextTable = nextTab;
            transferIndex = n;
        }
        int nextn = nextTab.length;
        /*
         * 建立一個fwd節點,這個是用來控制併發的,當一個節點爲空或已經被轉移以後,就設置爲fwd節點
         * 這是一個空的標誌節點
         */
        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
        boolean advance = true;    //是否繼續向前查找的標誌位
        boolean finishing = false; // to ensure sweep(清掃) before committing nextTab,在完成以前從新在掃描一遍數組,看看有沒完成的沒
        for (int i = 0, bound = 0;;) {
            Node<K,V> f; int fh;
            while (advance) {
                int nextIndex, nextBound;
                if (--i >= bound || finishing) {
                    advance = false;
                }
                else if ((nextIndex = transferIndex) <= 0) {
                    i = -1;
                    advance = false;
                }
                else if (U.compareAndSwapInt
                         (this, TRANSFERINDEX, nextIndex,
                          nextBound = (nextIndex > stride ?
                                       nextIndex - stride : 0))) {
                    bound = nextBound;
                    i = nextIndex - 1;
                    advance = false;
                }
            }
            if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
                if (finishing) {        //已經完成轉移
                    nextTable = null;
                    table = nextTab;
                    sizeCtl = (n << 1) - (n >>> 1);    //設置sizeCtl爲擴容後的0.75
                    return;
                }
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) {
                            return;
                    }
                    finishing = advance = true;
                    i = n; // recheck before commit
                }
            }
            else if ((f = tabAt(tab, i)) == null)            //數組中把null的元素設置爲ForwardingNode節點(hash值爲MOVED[-1])
                advance = casTabAt(tab, i, null, fwd);
            else if ((fh = f.hash) == MOVED)
                advance = true; // already processed
            else {
                synchronized (f) {                //加鎖操做
                    if (tabAt(tab, i) == f) {
                        Node<K,V> ln, hn;
                        if (fh >= 0) {        //該節點的hash值大於等於0,說明是一個Node節點
                                /*
                                 * 由於n的值爲數組的長度,且是power(2,x)的,因此,在&操做的結果只多是0或者n
                                 * 根據這個規則
                                 *         0-->  放在新表的相同位置
                                 *         n-->  放在新表的(n+原來位置)
                                 */
                            int runBit = fh & n; 
                            Node<K,V> lastRun = f;
                            /*
                             * lastRun 表示的是須要複製的最後一個節點
                             * 每當新節點的hash&n -> b 發生變化的時候,就把runBit設置爲這個結果b
                             * 這樣for循環以後,runBit的值就是最後不變的hash&n的值
                             * 而lastRun的值就是最後一次致使hash&n 發生變化的節點(假設爲p節點)
                             * 爲何要這麼作呢?由於p節點後面的節點的hash&n 值跟p節點是同樣的,
                             * 因此在複製到新的table的時候,它確定仍是跟p節點在同一個位置
                             * 在複製完p節點以後,p節點的next節點仍是指向它原來的節點,就不須要進行復制了,本身就被帶過去了
                             * 這也就致使了一個問題就是複製後的鏈表的順序並不必定是原來的倒序
                             */
                            for (Node<K,V> p = f.next; p != null; p = p.next) {
                                int b = p.hash & n;    //n的值爲擴張前的數組的長度
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                            if (runBit == 0) {
                                ln = lastRun;
                                hn = null;
                            }
                            else {
                                hn = lastRun;
                                ln = null;
                            }
                            /*
                             * 構造兩個鏈表,順序大部分和原來是反的
                             * 分別放到原來的位置和新增長的長度的相同位置(i/n+i)
                             */
                            for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                                if ((ph & n) == 0)
                                        /*
                                         * 假設runBit的值爲0,
                                         * 則第一次進入這個設置的時候至關於把舊的序列的最後一次發生hash變化的節點(該節點後面可能還有hash計算後同爲0的節點)設置到舊的table的第一個hash計算後爲0的節點下一個節點
                                         * 而且把本身返回,而後在下次進來的時候把它本身設置爲後面節點的下一個節點
                                         */
                                    ln = new Node<K,V>(ph, pk, pv, ln);
                                else
                                        /*
                                         * 假設runBit的值不爲0,
                                         * 則第一次進入這個設置的時候至關於把舊的序列的最後一次發生hash變化的節點(該節點後面可能還有hash計算後同不爲0的節點)設置到舊的table的第一個hash計算後不爲0的節點下一個節點
                                         * 而且把本身返回,而後在下次進來的時候把它本身設置爲後面節點的下一個節點
                                         */
                                    hn = new Node<K,V>(ph, pk, pv, hn);    
                            }
                            setTabAt(nextTab, i, ln);    
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                        else if (f instanceof TreeBin) {    //不然的話是一個樹節點
                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            TreeNode<K,V> lo = null, loTail = null;
                            TreeNode<K,V> hi = null, hiTail = null;
                            int lc = 0, hc = 0;
                            for (Node<K,V> e = t.first; e != null; e = e.next) {
                                int h = e.hash;
                                TreeNode<K,V> p = new TreeNode<K,V>
                                    (h, e.key, e.val, null, null);
                                if ((h & n) == 0) {
                                    if ((p.prev = loTail) == null)
                                        lo = p;
                                    else
                                        loTail.next = p;
                                    loTail = p;
                                    ++lc;
                                }
                                else {
                                    if ((p.prev = hiTail) == null)
                                        hi = p;
                                    else
                                        hiTail.next = p;
                                    hiTail = p;
                                    ++hc;
                                }
                            }
                            /*
                             * 在複製完樹節點以後,判斷該節點處構成的樹還有幾個節點,
                             * 若是≤6個的話,就轉回爲一個鏈表
                             */
                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                (hc != 0) ? new TreeBin<K,V>(lo) : t;
                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                (lc != 0) ? new TreeBin<K,V>(hi) : t;
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                    }
                }
            }
        }
    }

  到這裏,ConcurrentHashMap的put操做和擴容都介紹的差很少了,

  下面的兩點必定要注意:

    ·複製以後的新鏈表不是舊鏈表的絕對倒序。

    ·在擴容的時候每一個線程都有處理的步長,最少爲16,在這個步長範圍內的數組節點只有本身一個線程來處理

八、ConcurrentHashMap的get操做詳解

  相比put操做,get操做就顯得很簡單了。廢話少說,直接上源碼分析。

/*
     * 相比put方法,get就很單純了,支持併發操做,
     * 當key爲null的時候回拋出NullPointerException的異常
     * get操做經過首先計算key的hash值來肯定該元素放在數組的哪一個位置
     * 而後遍歷該位置的全部節點
     * 若是不存在的話返回null
     */
    public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        int h = spread(key.hashCode());
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {
            if ((eh = e.hash) == h) {
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            else if (eh < 0)
                return (p = e.find(h, key)) != null ? p.val : null;
            while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }

 

九、ConcurrentHashMap的同步機制

  前面分析了下ConcurrentHashMap的源碼,那麼,對於一個映射集合來講,ConcurrentHashMap是若是來作到併發安全,又是如何作到高效的併發的呢?

  首先是讀操做,從源碼中能夠看出來,在get操做中,根本沒有使用同步機制,也沒有使用unsafe方法,因此讀操做是支持併發操做的。

  那麼寫操做呢?

  分析這個以前,先看看什麼狀況下會引發數組的擴容,擴容是經過transfer方法來進行的。而調用transfer方法的只有trePresize、helpTransfer和addCount三個方法。

  這三個方法又是分別在什麼狀況下進行調用的呢?

  ·tryPresize是在treeIfybin和putAll方法中調用,treeIfybin主要是在put添加元素完以後,判斷該數組節點相關元素是否是已經超過8個的時候,若是超過則會調用這個方法來擴容數組或者把鏈表轉爲樹。

  ·helpTransfer是在當一個線程要對table中元素進行操做的時候,若是檢測到節點的HASH值爲MOVED的時候,就會調用helpTransfer方法,在helpTransfer中再調用transfer方法來幫助完成數組的擴容

  ·addCount是在當對數組進行操做,使得數組中存儲的元素個數發生了變化的時候會調用的方法。

  

  因此引發數組擴容的狀況以下

  ·只有在往map中添加元素的時候,在某一個節點的數目已經超過了8個,同時數組的長度又小於64的時候,纔會觸發數組的擴容。

  ·當數組中元素達到了sizeCtl的數量的時候,則會調用transfer方法來進行擴容

  

  那麼在擴容的時候,能夠不能夠對數組進行讀寫操做呢?

  事實上是能夠的。當在進行數組擴容的時候,若是當前節點尚未被處理(也就是說尚未設置爲fwd節點),那就能夠進行設置操做。

  若是該節點已經被處理了,則當前線程也會加入到擴容的操做中去。

  

  那麼,多個線程又是如何同步處理的呢?

  在ConcurrentHashMap中,同步處理主要是經過Synchronized和unsafe兩種方式來完成的。

  ·在取得sizeCtl、某個位置的Node的時候,使用的都是unsafe的方法,來達到併發安全的目的

  ·當須要在某個位置設置節點的時候,則會經過Synchronized的同步機制來鎖定該位置的節點。

  ·在數組擴容的時候,則經過處理的步長和fwd節點來達到併發安全的目的,經過設置hash值爲MOVED

  ·當把某個位置的節點複製到擴張後的table的時候,也經過Synchronized的同步機制來保證現程安全

 

 十、鏈表轉爲紅黑樹的過程 

   前面在講解tryifyBin的源碼的時候講到過,若是在當個bin上的元素超過了8個的時候,就會嘗試去擴容數組或者是將鏈表轉爲紅黑樹。

源碼:

private final void treeifyBin(Node<K,V>[] tab, int index) {
        System.out.println("當前線程:"+Thread.currentThread().getName()+"進入treeifyBin方法");
        Node<K,V> b; int n, sc;
        if (tab != null) {
            if ((n = tab.length) < MIN_TREEIFY_CAPACITY)    //MIN_TREEIFY_CAPACITY 64
                tryPresize(n << 1);        // 數組擴容
            else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
                synchronized (b) {    //使用synchronized同步器,將該節點出的鏈表轉爲樹
                    if (tabAt(tab, index) == b) {
                        TreeNode<K,V> hd = null, tl = null;    //hd:樹的頭(head)
                        for (Node<K,V> e = b; e != null; e = e.next) {
                            TreeNode<K,V> p =
                                new TreeNode<K,V>(e.hash, e.key, e.val,
                                                  null, null);
                            if ((p.prev = tl) == null)        //把Node組成的鏈表,轉化爲TreeNode的鏈表,頭結點任然放在相同的位置
                                hd = p;    //設置head
                            else
                                tl.next = p;
                            tl = p;
                        }
                        setTabAt(tab, index, new TreeBin<K,V>(hd));//把TreeNode的鏈表放入容器TreeBin中
                    }
                }
            }
        }
    }

  首先將Node的鏈表轉化爲一個TreeNode的鏈表,而後將TreeNode鏈表的頭結點來構造一個TreeBin。  

  下面是TreeBin構造方法的源碼:

TreeBin(TreeNode<K,V> b) {
            super(TREEBIN, null, null, null);    //建立的TreeBin是一個空節點,hash值爲TREEBIN(-2)
            this.first = b;
            TreeNode<K,V> r = null;
            for (TreeNode<K,V> x = b, next; x != null; x = next) {
                next = (TreeNode<K,V>)x.next;
                x.left = x.right = null;
                if (r == null) {
                    x.parent = null;
                    x.red = false;
                    r = x;
                }//
                else {
                    K k = x.key;
                    int h = x.hash;
                    Class<?> kc = null;
                    for (TreeNode<K,V> p = r;;) {//x表明的是轉換爲樹以前的順序遍歷到鏈表的位置的節點,r表明的是根節點
                        int dir, ph;
                        K pk = p.key;
                        if ((ph = p.hash) > h)    //
                            dir = -1;
                        else if (ph < h)
                            dir = 1;
                        else if ((kc == null &&
                                  (kc = comparableClassFor(k)) == null) ||
                                 (dir = compareComparables(kc, k, pk)) == 0)
                            dir = tieBreakOrder(k, pk);    //當key不能夠比較,或者相等的時候採起的一種排序措施
                            TreeNode<K,V> xp = p;
                        if ((p = (dir <= 0) ? p.left : p.right) == null) {//在這裏判斷要放的left/right是否爲空,不爲空繼續用left/right節點來判斷
                            x.parent = xp;
                            if (dir <= 0)
                                xp.left = x;
                            else
                                xp.right = x;
                            r = balanceInsertion(r, x); //每次插入一個元素的時候都調用balanceInsertion來保持紅黑樹的平衡
                            break;
                        }
                    }
                }
            }
            this.root = r;
            assert checkInvariants(root);
        }

 

 

 

  轉化的過程大概以下:

 

  接下來,用鏈表頭部的TreeNode來構造一個TreeBin,在TreeBin容器中,將鏈表轉化爲紅黑樹。

      首先是構造一個以下的TreeBin空節點。

  構造完TreeBin這個空節點以後,就開始構造紅黑樹,首先是第一個節點,左右子節點設置爲空,做爲紅黑樹的root節點,設置爲黑色,父節點爲空。

              

接下來遍歷鏈表的後續節點,沒添加一個元素的時候,都會經過判斷hash值來決定是放在根節點的左節點仍是有節點,若是左/右節點不爲空,則繼續以左/右節點來重複判斷,直到左/右節點爲空,則添加到左/右位置。

 

            

 

  而後在每次添加完一個節點以後,都會調用balanceInsertion方法來維持這是一個紅黑樹的屬性和平衡性。紅黑樹全部操做的複雜度都是O(logn),因此當元素量比較大的時候,效率也很高。

相關文章
相關標籤/搜索