Concurrent Hash Map源碼閱讀

PS.本文基於JDK1.8java

前言

你們都知道HashMap是線程不安全的,想要在併發的環境中使用,用什麼呢?HashTable?採用syncgronized加鎖,致使效率及其底下.在java5以後jdk提供了另外一個類,ConcurrentHashMap,極大的提高了併發下的性能.node

此次將閱讀ConcurrentHashMap的源碼並記錄關鍵知識.算法

實現原理

數據結構

與HashMap的數據結構同步,在JDK1.7中使用數組+鏈表,在JDK1.8以後使用數組+鏈表+紅黑樹.數組

併發

在1.7版本,使用鎖分離技術,即ConcurrentHashMap由Segment組成,每一個Segment包含一些Node存儲鍵值對. 而每一個Segment都有一把鎖.併發性能依賴於Segment的粒度,當你將整個HashMap放入同一個Segment,ConcurrentHashMap會退化成HashMap.安全

1.8版本中,摒棄了鎖分離的概念,雖然保留了Segment,可是隻是爲了兼容老的版本.數據結構

1.8中使用CAS算法+鎖來保證併發性能及線程安全併發

CAS 算法

通俗的講(個人理解)就是:在每一次操做的時候參數中帶有預期值(舊值),當且僅當內存中的值與預期值相同的時候,才寫入新值.性能

源碼逐步解析

注意,本文只解讀JDK1.8版本的ConcurrentHashMap,在源碼中與之前版本有關的東西略過.學習

常量

//最大容量
private static final int MAXIMUM_CAPACITY = 1 << 30;

//默認容量,且必須爲2的次冪
private static final int DEFAULT_CAPACITY = 16;

//負載因子,決定什麼時候擴容
private static final float LOAD_FACTOR = 0.75f;

//鏈表轉紅黑樹的閥值,>8
static final int TREEIFY_THRESHOLD = 8;

//紅黑樹轉鏈表的閥值,<6
static final int UNTREEIFY_THRESHOLD = 6;

//樹的最小容量
static final int MIN_TREEIFY_CAPACITY = 64;

//正在擴容的標示位
static final int MOVED     = -1; // hash for forwarding nodes
//樹的根節點標識
static final int TREEBIN   = -2; // hash for roots of trees
複製代碼

常量的定義較爲簡單,這裏只列出了一些經常使用的常量,還有一些在具體使用時再貼.spa

代碼中已加入註釋,一看就懂.

屬性

//node的數組,
transient volatile Node<K,V>[] table;

//node的數組,擴容時候使用
private transient volatile Node<K,V>[] nextTable;

//計數值,也是用CAS修改
private transient volatile long baseCount;

/** * Table initialization and resizing control. When negative, the * table is being initialized or resized: -1 for initialization, * else -(1 + the number of active resizing threads). Otherwise, * when table is null, holds the initial table size to use upon * creation, or 0 for default. After initialization, holds the * next element count value upon which to resize the table. */
 //這是一個標識位,當爲-1的時候表明正在初始化,當爲-N的時候表明有N - 1個線程正在擴容,
 //當爲正數的時候表明下一次擴容後的大小
private transient volatile int sizeCtl;
複製代碼

這裏面有一個重要的屬性sizeCtl,保留了源碼的註釋及添加了個人理解.

數據節點Node類

static class Node<K,V> implements Map.Entry<K,V> {
     final int hash;
     final K key;
     volatile V val;
     volatile Node<K,V> next;
}
複製代碼

對於Node類的構造方法以及getter/setter進行了省略.只保留了屬性.

能夠看到共有四個屬性

  1. final修飾的hash值,初始化後不能再次改變.
  2. final修飾的key,初始化後不能再次改變.
  3. volatile 修飾的值
  4. volatile 修飾的下一節點指針

hash和key都被final修飾,不會存在線程安全問題,而value及next被volatile修飾,保證了線程間的數據可見性.

三個重要的原子方法

//獲取數組i位置的node
@SuppressWarnings("unchecked")
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
    return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}

//cas實現插入
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) {
    return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}

//直接插入,此方法僅在上鎖的區域被調用
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
    U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}
複製代碼

構造方法

構造方法十分簡單,這裏再也不貼代碼,只是須要注意:

在建立對象的時候沒有進行Node數組的初始化,初始化操做在put時進行.

get()方法

public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    //獲取hash值
    int h = spread(key.hashCode());
    //經過tabat獲取hash桶
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {
        //若是該hash桶的第一個節點就是查找結果,則返回
        if ((eh = e.hash) == h) {
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        //第一個節點是樹的根節點,按照樹的方式進行遍歷查找
        else if (eh < 0)
            return (p = e.find(h, key)) != null ? p.val : null;
        //第一個節點是鏈表的根節點,按照鏈表的方式遍歷查找
        while ((e = e.next) != null) {
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}
複製代碼

能夠看到,在get()方法的過程當中,是沒有進行加鎖操做的,那麼是如何保證線程安全的呢?

  1. 首先經過tabat獲取hash桶的根節點
  2. 遍歷的時候根據node的volatile屬性next.
  3. 返回時讀取node的volatile屬性val.

全部的操做屬性都是volatile,由該關鍵字保證內存的可見性,進一步保證讀取時的線程安全.

put()方法

public V put(K key, V value) {
    return putVal(key, value, false);
}

/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    //獲取hash值
    int hash = spread(key.hashCode());
    int binCount = 0;
    //遍歷數組
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        //若是當前數組還未初始化,則進行初始化操做
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        //若是已經初始化且要插入的位置爲null,則直接使用cas方式進行插入,沒有加鎖
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        //若是當前節點爲擴容標識節點,則幫助擴容
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else {
            //對該hash桶進行加鎖
            V oldVal = null;
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    //鏈表狀況的插入
                    if (fh >= 0) {
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    //紅黑樹的插入
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            //檢查長度是否超過閥值,若是超過則由鏈表轉成紅黑樹
            if (binCount != 0) {
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    //size屬性加1,若是過長則擴容
    addCount(1L, binCount);
    return null;
}
複製代碼

put方法中的流程:

  1. 獲取hash值
  2. 遍歷數組
  3. 若是未初始化則初始化
  4. 若是要插入的位置爲null,則使用cas插入,不加鎖
  5. 若是要插入的位置爲擴容標識節點,則幫助其擴容
  6. 對插入的hash桶加鎖
  7. 按照紅黑樹或者鏈表的方式進行插入
  8. 檢查插入後鏈表長度是否超過閥值,若是超過則轉爲紅黑樹
  9. 添加計數,若是添加後的數量大於擴容閥值,則進行擴容.

remove()方法

public V remove(Object key) {
        return replaceNode(key, null, null);
    }

    /** 參數value:當 value==null 時 ,刪除節點 。不然 更新節點的值爲value 參數cv:一個指望值, 當 map[key].value 等於指望值cv 或者 cv==null的時候 ,刪除節點,或者更新節點的值 */
    final V replaceNode(Object key, V value, Object cv) {
        int hash = spread(key.hashCode());
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            //table尚未初始化或者key對應的hash桶爲空
            if (tab == null || (n = tab.length) == 0 ||
                (f = tabAt(tab, i = (n - 1) & hash)) == null)
                break;
            //正在擴容
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                boolean validated = false;
                synchronized (f) {
                    //cas獲取tab[i],若是此時tab[i]!=f,說明其餘線程修改了tab[i]。回到for循環開始處,從新執行
                    if (tabAt(tab, i) == f) {
                        //node鏈表
                        if (fh >= 0) {
                            validated = true;
                            for (Node<K,V> e = f, pred = null;;) {
                                K ek;
                                //找的key對應的node
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    V ev = e.val;
                                    //cv參數表明指望值
                                    //cv==null:表示直接更新value/刪除節點
                                    //cv不爲空,則只有在key的oldValue等於指望值的時候,才更新value/刪除節點

                                    //符合更新value或者刪除節點的條件
                                    if (cv == null || cv == ev ||
                                        (ev != null && cv.equals(ev))) {
                                        oldVal = ev;
                                        //更新value
                                        if (value != null)
                                            e.val = value;
                                        //刪除非頭節點
                                        else if (pred != null)
                                            pred.next = e.next;
                                        //刪除頭節點
                                        else
                                            //由於已經獲取了頭結點鎖,因此此時不須要使用casTabAt
                                            setTabAt(tab, i, e.next);
                                    }
                                    break;
                                }
                                //當前節點不是目標節點,繼續遍歷下一個節點
                                pred = e;
                                if ((e = e.next) == null)
                                    //到達鏈表尾部,依舊沒有找到,跳出循環
                                    break;
                            }
                        }
                        //紅黑樹
                        else if (f instanceof TreeBin) {
                            validated = true;
                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            TreeNode<K,V> r, p;
                            if ((r = t.root) != null &&
                                (p = r.findTreeNode(hash, key, null)) != null) {
                                V pv = p.val;
                                if (cv == null || cv == pv ||
                                    (pv != null && cv.equals(pv))) {
                                    oldVal = pv;
                                    if (value != null)
                                        p.val = value;
                                    else if (t.removeTreeNode(p))
                                        setTabAt(tab, i, untreeify(t.first));
                                }
                            }
                        }
                    }
                }
                if (validated) {
                    if (oldVal != null) {
                        //若是刪除了節點,更新size
                        if (value == null)
                            addCount(-1L, -1);
                        return oldVal;
                    }
                    break;
                }
            }
        }
        return null;
    }
複製代碼

remove方法中流程:

  1. 獲取hash值
  2. 若是未初始化或者該hash值對應的hash桶爲空,則直接返回
  3. 若是正在擴容則幫助擴容
  4. 對該hash桶加鎖
  5. 遍歷該hash桶處的鏈表或者紅黑樹,更新或者刪除節點.

後話

本文記錄了ConcurrentHashMap的基本原理及幾個經常使用方法的實現,但因爲才疏學淺以及ConcurrentHashMap的複雜性,文中可能會有些許疏漏,若有錯誤歡迎隨時指出.

對於ConcurrentHashMap,建議仍是先學會使用,在有必定的併發基礎後再學習源碼,至少要了解volatile及synchronized關鍵字的實現機制以及JMM(java內存模型)的一些基礎知識.不然學習起來十分費勁(我看了很久,,),而且囫圇吞棗,學習以後收穫也不必定很大.

參考連接

www.jianshu.com/p/cf5e024d9… blog.csdn.net/u010723709/… www.jianshu.com/p/5bc70d9e5…


完。



ChangeLog

2018-11-18 完成

以上皆爲我的所思所得,若有錯誤歡迎評論區指正。

歡迎轉載,煩請署名並保留原文連接。

聯繫郵箱:huyanshi2580@gmail.com

更多學習筆記見我的博客------>呼延十

相關文章
相關標籤/搜索