J.U.C 之ConcurrentHashMap(JDK1.7)

Hashmap多線程會致使HashMap的Entry鏈表造成環形數據結構,一旦造成環形數據結構,Entry的next節點永遠不爲空,就會產生死循環獲取Entry。java

HashTable使用synchronized來保證線程安全,但在線程競爭激烈的狀況下HashTable的效率很是低下。由於當一個線程訪問HashTable的同步方法,其餘線程也訪問HashTable的同步方法時,會進入阻塞或輪詢狀態。如線程1使用put進行元素添加,線程2不但不能使用put方法添加元素,也不能使用get方法來獲取元素,因此競爭越激烈效率越低。node

原理和實現

分段鎖技術

HashTable容器在競爭激烈的併發環境下表現出效率低下的緣由,是由於全部訪問HashTable的線程都必須競爭同一把鎖。算法

那假如容器裏有多把鎖,每一把鎖用於鎖容器其中一部分數據,那麼當多線程訪問容器裏不一樣數據段的數據時,線程間就不會存在鎖競爭,從而能夠有效的提升併發訪問效率,這就是ConcurrentHashMap所使用的鎖分段技術,首先將數據分紅一段一段的存儲,而後給每一段數據配一把鎖,當一個線程佔用鎖訪問其中一個段數據的時候,其餘段的數據也能被其餘線程訪問。數組

另外,ConcurrentHashMap能夠作到讀取數據不加鎖,而且其內部的結構可讓其在進行寫操做的時候可以將鎖的粒度保持地儘可能地小,不用對整個ConcurrentHashMap加鎖。安全

ConcurrentHashMap的內部結構

ConcurrentHashMap是由Segment數組結構和HashEntry數組結構組成。數據結構

Segment是一種可重入鎖ReentrantLock,在ConcurrentHashMap裏扮演鎖的角色,HashEntry則用於存儲鍵值對數據。多線程

一個ConcurrentHashMap裏包含一個Segment數組,Segment的結構和HashMap相似,是一種數組和鏈表結構,併發

一個Segment裏包含一個HashEntry數組,每一個HashEntry是一個鏈表結構的元素,app

每一個Segment守護着一個HashEntry數組裏的元素,當對HashEntry數組的數據進行修改時,必須首先得到它對應的Segment鎖。ssh

結構圖以下:

從上面的結構咱們能夠了解到,ConcurrentHashMap定位一個元素的過程須要進行兩次Hash操做,第一次Hash定位到Segment,第二次Hash定位到元素所在的鏈表的頭部,所以,這一種結構的帶來的反作用是Hash的過程要比普通的HashMap要長,可是帶來的好處是寫操做的時候能夠只對元素所在的Segment進行加鎖便可,不會影響到其餘的Segment,這樣,在最理想的狀況下,ConcurrentHashMap能夠最高同時支持Segment數量大小的寫操做(恰好這些寫操做都很是平均地分佈在全部的Segment上),因此,經過這一種結構,ConcurrentHashMap的併發能力能夠大大的提升。

ConcurrentHashMap源碼分析

Segment

static final class Segment<K,V> extends ReentrantLock implements Serializable { 
    transient volatile int count; 
    transient int modCount; 
    transient int threshold; 
    transient volatile HashEntry<K,V>[] table; 
    final float loadFactor; 
} 
複製代碼
  1. count:Segment中元素的數量
  2. modCount:對table的大小形成影響的操做的數量(好比put或者remove操做)
  3. threshold:閾值,Segment裏面元素的數量超過這個值依舊就會對Segment進行擴容
  4. table:鏈表數組,數組中的每個元素表明了一個鏈表的頭部
  5. loadFactor:負載因子,用於肯定threshold

count用來統計該段數據的個數,它是volatile變量,它用來協調修改和讀取操做,以保證讀取操做可以讀取到幾乎最新的修改。協調方式是這樣的,每次修改操做作告終構上的改變,如增長/刪除節點(修改節點的值不算結構上的改變),都要寫count值,每次讀取操做開始都要讀取count的值。這利用了 Java 5中對volatile語義的加強,對同一個volatile變量的寫和讀存在happens-before關係。

modCount統計段結構改變的次數,主要是爲了檢測對多個段進行遍歷過程當中某個段是否發生改變。

threashold用來表示須要進行rehash的界限值。

table數組存儲段中節點,每一個數組元素是個hash鏈,用HashEntry表示。table也是volatile,這使得可以讀取到最新的 table值而不須要同步。loadFactor表示負載因子。

HashEntry

Segment中的元素是以HashEntry的形式存放在鏈表數組中的,看一下HashEntry的結構:

static final class HashEntry<K,V> { 
    final K key; 
    final int hash; 
    volatile V value; 
    final HashEntry<K,V> next; 
} 
複製代碼

能夠看到HashEntry的一個特色,除了value之外,其餘的幾個變量都是final的,這意味着不能從hash鏈的中間或尾部添加或刪除節點,由於這須要修改next 引用值,全部的節點的修改只能從頭部開始(頭插法)

對於put操做,能夠一概添加到Hash鏈的頭部。

可是對於remove操做,可能須要從中間刪除一個節點,這就須要將要刪除節點的前面全部節點整個複製一遍,最後一個節點指向要刪除結點的下一個結點。。爲了確保讀操做可以看到最新的值,將value設置成volatile,這避免了加鎖。

ConcurrentHashMap的成員變量

...
    //初始的容量
    static final int DEFAULT_INITIAL_CAPACITY = 16;
    //初始的加載因子
    static final float DEFAULT_LOAD_FACTOR = 0.75f;
    //初始的併發等級,表示當前更新線程的估計數
    static final int DEFAULT_CONCURRENCY_LEVEL = 16;
    //最大容量
    static final int MAXIMUM_CAPACITY = 1 << 30;
    //最小的segment數量
    static final int MIN_SEGMENT_TABLE_CAPACITY = 2;
    //最大的segment數量
    static final int MAX_SEGMENTS = 1 << 16; 
    //
    static final int RETRIES_BEFORE_LOCK = 2;
    // segments 的掩碼值, key 的散列碼的高位用來選擇具體的 segment
    final int segmentMask; 
    // 偏移量
    final int segmentShift; 
    final Segment<K,V>[] segments; 
複製代碼

ConcurrentHashMap的初始化

// 建立一個帶有指定初始容量、加載因子和併發級別的新的空映射
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    if (concurrencyLevel > MAX_SEGMENTS)
        concurrencyLevel = MAX_SEGMENTS;
    // 尋找最佳匹配參數(不小於給定參數的最接近的 2^n)
    int sshift = 0; // 用來記錄向左按位移動的次數
    int ssize = 1; // 用來記錄Segment數組的大小
    // 計算並行級別 ssize,由於要保持並行級別是 2^n
    while (ssize < concurrencyLevel) {
        ++sshift;
        ssize <<= 1;
    }
     //用於定位元素所在segment。
    //segmentShift表示偏移位數,經過前面的int類型的位的描述咱們能夠得知,int類型的數字在變大的過程當中,
    //低位老是比高位先填滿的,爲保證元素在segment級別分佈的儘可能均勻,計算元素所在segment時,
    //老是取hash值的高位進行計算。segmentMask做用就是爲了利用位運算中取模的操做:
    //a % (Math.pow(2,n)) 等價於 a&( Math.pow(2,n)-1)
    // 若爲默認值,concurrencyLevel 爲 16,sshift 爲 4
    // 那麼計算出 segmentShift 爲 28,segmentMask 爲 15
    this.segmentShift = 32 - sshift;
    this.segmentMask = ssize - 1;
    if (initialCapacity > MAXIMUM_CAPACITY)
        initialCapacity = MAXIMUM_CAPACITY;
    // 記錄每一個 Segment 上要放置多少個元素
    int c = initialCapacity / ssize;
    // 假若有餘數,則Segment數量加1
    if (c * ssize < initialCapacity)
        ++c;
    //保證每一個Segment中tabel數組的大小,必定爲2的冪,初始化的三個參數取默認值時,table數組大小爲2
    int cap = MIN_SEGMENT_TABLE_CAPACITY; 
    while (cap < c) 
        cap <<= 1; 
   
    // create segments and segments[0]
    //初始化Segment數組,並實際只填充Segment數組的第0個元素。
        Segment<K,V> s0 =
            new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                             (HashEntry<K,V>[])new HashEntry[cap]);
        Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
        UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
        this.segments = ss;
} 
複製代碼

CurrentHashMap的初始化一共有三個參數:

  1. 一個initialCapacity,表示初始的容量,
  2. 一個loadFactor,表示負載參數,
  3. 最後一個是concurrentLevel,表明ConcurrentHashMap內部的Segment的數量,ConcurrentLevel一經指定,不可改變,後續若是ConcurrentHashMap的元素數量增長致使ConrruentHashMap須要擴容,ConcurrentHashMap不會增長Segment的數量,而只會增長Segment中鏈表數組的容量大小,這樣的好處是擴容過程不須要對整個ConcurrentHashMap作rehash,而只須要對Segment裏面的元素作一次rehash就能夠了。

整個ConcurrentHashMap的初始化方法仍是很是簡單的,先是根據concurrentLevel來new出Segment,這裏Segment的數量是不大於concurrentLevel的最大的2的指數,就是說Segment的數量永遠是2的指數個,這樣的好處是方便採用移位操做來進行hash,加快hash的過程。

接下來就是根據intialCapacity肯定Segment的容量的大小,每個Segment的容量大小也是2的指數,一樣使爲了加快hash的過程。

這邊須要特別注意一下兩個變量,分別是segmentShift和segmentMask,這兩個變量在後面將會起到很大的做用,假設構造函數肯定了Segment的數量是2的n次方,那麼segmentShift就等於32減去n,而segmentMask就等於2的n次方減一。

當用 new ConcurrentHashMap() 無參構造函數進行初始化的,那麼初始化完成後:

  1. Segment 數組長度爲 16,不能夠擴容
  2. Segment[i] 的默認大小爲 2,負載因子是 0.75,得出初始閾值爲 1.5,也就是之後插入第一個元素不會觸發擴容,插入第二個會進行第一次擴容
  3. 這裏初始化了 segment[0],其餘位置仍是 null
  4. 當前 segmentShift 的值爲 32 – 4 = 28,segmentMask 爲 16 – 1 = 15,姑且把它們簡單 翻譯 爲移位數和掩碼,這兩個值立刻就會用到

hash()方法

private int hash(Object k) {
        int h = hashSeed;
        //若是Key是字符串類型,則使用專門爲字符串設計的Hash方法,不然使用一連串的異或操做增長hash隨機性
        if ((0 != h) && (k instanceof String)) {
            return sun.misc.Hashing.stringHash32((String) k);
        }

        h ^= k.hashCode();

        // Spread bits to regularize both segment and index locations,
        // using variant of single-word Wang/Jenkins hash.
        h += (h <<  15) ^ 0xffffcd7d;
        h ^= (h >>> 10);
        h += (h <<   3);
        h ^= (h >>>  6);
        h += (h <<   2) + (h << 14);
        return h ^ (h >>> 16);
    }
複製代碼

初始化Segment

ConcurrentHashMap 初始化的時候會初始化第一個槽 segment[0],對於其餘槽來講,在插入第一個值的時候進行初始化。

這裏須要考慮併發,由於極可能會有多個線程同時進來初始化同一個槽 segment[k],不過只要有一個成功了就能夠。

private Segment<K,V> ensureSegment(int k) {
    final Segment<K,V>[] ss = this.segments;
    long u = (k << SSHIFT) + SBASE; // raw offset
    Segment<K,V> seg;
    if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
        // 這裏看到爲何以前要初始化 segment[0] 了,
        // 使用當前 segment[0] 處的數組長度和負載因子來初始化 segment[k]
        // 爲何要用「當前」,由於 segment[0] 可能早就擴容過了
        Segment<K,V> proto = ss[0]; // use segment 0 as prototype
        int cap = proto.table.length;
        float lf = proto.loadFactor;
        int threshold = (int)(cap * lf);
        // 初始化 segment[k] 內部的數組
        HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
        if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
            == null) { // recheck Segment[k] 是否被其它線程初始化了
            Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
            // 使用 while 循環,內部用 CAS,當前線程成功設值或其餘線程成功設值後,退出
            while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                   == null) {
                if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
                    break;
            }
        }
    }
    return seg;
}
複製代碼

put過程分析

當執行put方法插入數據時,根據key的hash值,在Segment數組中找到相應的位置,若是相應位置的Segment還未初始化,則經過CAS進行賦值,接着執行Segment對象的put方法經過加鎖機制插入數據

場景:線程A和線程B同時執行相同Segment對象的put方法

1. 線程A執行tryLock()方法成功獲取鎖,則把HashEntry對象插入到相應的位置;
2. 線程B獲取鎖失敗,則執行scanAndLockForPut()方法,在scanAndLockForPut方法中,會經過重複執行tryLock()方法嘗試獲取鎖,在多處理器環境下,重複次數爲64,單處理器重複次數爲1,當執行tryLock()方法的次數超過上限時,則執行lock()方法掛起線程B;
3. 當線程A執行完插入操做時,會經過unlock()方法釋放鎖,接着喚醒線程B繼續執行;
複製代碼

put 方法的過程:

  1. 判斷value是否爲null,若是爲null,直接拋出異常。注:不容許key或者value爲null

  2. 經過哈希算法定位到Segment(key經過一次hash運算獲得一個hash值,將獲得hash值向右按位移動segmentShift位,而後再與segmentMask作&運算獲得segment的索引j)。

  3. 使用Unsafe的方式從Segment數組中獲取該索引對應的Segment對象

  4. 向這個Segment對象中put值

注:對共享變量進行寫入操做爲了線程安全,在操做共享變量時必須得加鎖,持有段鎖(鎖定整個segment)的狀況下執行的。修改數據是不能併發進行的

判斷該值的插入是否會致使該 segment 的元素個數超過閾值,以確保容量不足時可以rehash擴容,再插值。

注:rehash 擴容 segment 數組不能擴容,擴容的是 segment 數組某個位置內部的數組 HashEntry[] 擴容爲原來的 2 倍。先進行擴容,再插值

查找是否存在一樣一個key的結點,存在直接替換這個結點的值。不然建立一個新的結點並添加到hash鏈的頭部,修改modCount和count的值,修改count的值必定要放在最後一步。

public V put(K key, V value) {
    Segment<K,V> s;
    if (value == null)
        throw new NullPointerException();
    int hash = hash(key);
    // 根據 hash 值找到 Segment 數組中的位置 j
    // hash 是 32 位,無符號右移 segmentShift(28) 位,剩下高 4 位,
    // 而後和 segmentMask(15) 作一次與操做,也就是說 j 是 hash 值的高 4 位,也就是槽的數組下標
    int j = (hash >>> segmentShift) & segmentMask;
    // 剛剛說了,初始化的時候初始化了 segment[0],可是其餘位置仍是 null,
    // ensureSegment(j) 對 segment[j] 進行初始化
    if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
         (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
        s = ensureSegment(j);
    // 插入新值到 槽 s 中
    return s.put(key, hash, value, false);
}   
複製代碼

Segment 內部是由 數組+鏈表 組成的。

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    // 先獲取該 segment 的獨佔鎖
    // 每個Segment進行put時,都會加鎖
    HashEntry<K,V> node = tryLock() ? null :
        scanAndLockForPut(key, hash, value);
    V oldValue;
    try {
        // segment 內部的數組
        HashEntry<K,V>[] tab = table;
        // 利用 hash 值,求應該放置的數組下標
        int index = (tab.length - 1) & hash;
        // 數組該位置處的鏈表的表頭
        HashEntry<K,V> first = entryAt(tab, index);
        for (HashEntry<K,V> e = first;;) {
            // 若是鏈頭不爲 null
            if (e != null) {
                K k;
                //若是在該鏈中找到相同的key,則用新值替換舊值,並退出循環
                if ((k = e.key) == key ||
                    (e.hash == hash && key.equals(k))) {
                    oldValue = e.value;
                    if (!onlyIfAbsent) {
                        e.value = value;
                        ++modCount;
                    }
                    break;
                }
                //若是沒有和key相同的,一直遍歷到鏈尾,鏈尾的next爲null,進入到else
                e = e.next;
            }
            else {
                // node 究竟是不是 null,這個要看獲取鎖的過程,不過和這裏都沒有關係。
                // 若是不爲 null,那就直接將它設置爲鏈表表頭;若是是null,初始化並設置爲鏈表表頭。
                if (node != null)
                    node.setNext(first);
                else
                    node = new HashEntry<K,V>(hash, key, value, first);
                int c = count + 1;
                // 若是超過了該 segment 的閾值,這個 segment 須要擴容
                if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                    rehash(node);
                else
                    // 沒有達到閾值,將 node 放到數組 tab 的 index 位置,
                    // 其實就是將新的節點設置成原鏈表的表頭
                    setEntryAt(tab, index, node);
                ++modCount;
                count = c;
                oldValue = null;
                break;
            }
        }
    } finally {
        // 解鎖
        unlock();
    }
    return oldValue;
}
複製代碼

get()方法

  1. 計算 hash 值,找到 segment 數組中的具體位置,使用Unsafe獲取對應的Segment

  2. 根據 hash 找到數組中具體的位置

  3. 從鏈表頭開始遍歷整個鏈表(由於Hash可能會有碰撞,因此用一個鏈表保存),若是找到對應的key,則返回對應的value值,不然返回null。

注:get操做不須要鎖,因爲其中涉及到的共享變量都使用volatile修飾,volatile能夠保證內存可見性,因此不會讀取到過時數據。

public V get(Object key) {
    Segment<K,V> s; // manually integrate access methods to reduce overhead
    HashEntry<K,V>[] tab;
    int h = hash(key);
    long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
    if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
        (tab = s.table) != null) {
        for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                 (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
             e != null; e = e.next) {
            K k;
            if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                return e.value;
        }
    }
    return null;
}
複製代碼

remove操做

Remove操做的前面一部分和前面的get和put操做同樣,都是定位Segment的過程,而後再調用Segment的remove方法:

final V remove(Object key, int hash, Object value) {
    if (!tryLock())
        scanAndLock(key, hash);
    V oldValue = null;
    try {
        HashEntry<K,V>[] tab = table;
        int index = (tab.length - 1) & hash;
        HashEntry<K,V> e = entryAt(tab, index);
        HashEntry<K,V> pred = null;
        while (e != null) {
            K k;
            HashEntry<K,V> next = e.next;
            if ((k = e.key) == key || (e.hash == hash && key.equals(k))) {
                V v = e.value;
                if (value == null || value == v || value.equals(v)) {
                    if (pred == null)
                        setEntryAt(tab, index, next);
                    else
                        pred.setNext(next);
                    ++modCount;
                    --count;
                    oldValue = v;
                }
                break;
            }
            pred = e;
            e = next;
        }
    } finally {
        unlock();
    }
    return oldValue;
}
複製代碼

首先remove操做也是肯定須要刪除的元素的位置,不過這裏刪除元素的方法不是簡單地把待刪除元素的前面的一個元素的next指向後面一個就完事了,前面已經說過HashEntry中的next是final的,一經賦值之後就不可修改,在定位到待刪除元素的位置之後,程序就將待刪除元素前面的那一些元素所有複製一遍,而後再一個一個從新接到鏈表上去,看一下下面這一幅圖來了解這個過程:

假設鏈表中原來的元素如上圖所示,如今要刪除元素3,那麼刪除元素3之後的鏈表就以下圖所示:

注意:圖1和2的元素順序相反了,爲何這樣,不防再仔細看看源碼或者再讀一遍上面remove的分析過程,元素複製是從待刪除元素位置起將前面的元素逐一複製的,而後再將後面的連接起來。

size 操做

size操做須要遍歷全部的Segment才能算出整個Map的大小。先採用不加鎖的方式,循環全部的Segment(經過Unsafe的getObjectVolatile()以保證原子讀語義)連續計算元素的個數,最多計算3次:

  1. 若是先後兩次計算結果相同,則說明計算出來的元素個數是準確的;
  2. 若是先後兩次計算結果都不一樣,則給每一個Segment進行加鎖,再計算一次元素的個數;

注:在put,remove和clean方法裏操做元素前都會將變量modCount進行加1,那麼在統計size先後比較modCount是否發生變化,從而得知容器的大小是否發生變化。

相關文章
相關標籤/搜索