HashMap、ConcurrentHashMap解析

1、HashMap分析node

  在JDK1.8以前,hashMap由數組+鏈表組成,1.8以後,對hashMap進行了一些修改,最大的不一樣就是利用了紅黑樹,因此其由數組+鏈表+紅黑樹組成。查找時,根據hash值咱們可以快速定位到數組的具體下標,可是以後的話,須要順着鏈表一個個比較下去才能找到咱們須要的,時間複雜度取決於鏈表的長度爲O(n),爲了下降這部分的開銷,在Java8中,當鏈表中的元素達到了8個時,會將鏈表轉換爲紅黑樹,在這些位置進行查找時能夠下降時間複雜度爲O(logn)。算法

1.put過程:(JDK1.8)數組

   第一次put值時,會觸發resize(),相似Java7的第一次put也是要初始化數組長度的。安全

   第一次resize和後續的擴容有些不同,由於此次是數組從null初始化到默認的16或自定義的初始容量,找到具體的數據下標,若是此位置沒有值,那麼直接初始化一下Node並放置在這個位置就能夠了。若是數組改成只有數據:首先,判斷該位置的第一個數據和咱們要插入的數據,key是否是「相等」,若是是,取出這個節點,若是該節點是表明紅黑樹的節點,調用紅黑樹的插值方法,插入到鏈表的最後面(Java7是插入到鏈表的最前面),當treeify_threshold爲8時,若是新插入的值是鏈表中的第8個,會觸發下面的treeifyBin,也就是將鏈表轉換爲紅黑樹;若是在該鏈表中找到了「相等」的key(==或equals);若是hashMap因爲新插入這個值致使size已經超過了閾值,則須要進行擴容。多線程

 JDK1.7併發

  當插入第一個元素時,須要先初始化數組大小;app

  1)求key的hash值(數組索引 index=hash&(size-1));ssh

  2)找到對應的數組下標;
函數

  3)遍歷一下對應下標處的鏈表,看是否有重複的key已經存在,若是有,直接覆蓋,put方法返回舊值就結束了;高併發

  4)不存在重複的key,將此entry添加到鏈表中。

2.數組擴容(當前的size已經達到了閾值,而且要插入的數組位置上已經有元素,那麼就會觸發擴容,擴容後,數組大小爲原來的2倍)(JDK1.8)

  resize()方法用於初始化數組或數組擴容,每次擴容後,容量爲原來的2倍,並進行數據遷移。

  1)對數組進行擴容,將數組擴大一倍,將閾值擴大一倍;

  2)第一次put時初始化數組;

  3)開始遍歷數組進行數據遷移;

  若是該數組位置上只有單個元素:那就簡單了,直接遷移這個元素就能夠了。

  若是是鏈表:須要將此鏈表拆成兩個鏈表,放到新的數組中,而且保留原來的前後順序,loHead,loTail對應一條鏈表,hiHead、hiTail對應另外一條鏈表。

 JDK 1.7 

  擴容就是一個新的大數組替換原來的小數組,並將原來數組中的值遷移到新的數組中。

  因爲是雙倍擴容,遷移過程當中,會將原來table[i] 中的鏈表的全部節點,分拆到新的數組的newTable[i] 和 newTable[i+oldLength]位置上。如原來數組長度爲16,那麼擴容後,原來table[0]出的鏈表中的全部元素會被分配到新數組中 newTable[0] 和 newTable[16] 這兩個位置。

3.get過程(JDK1.8)

  1)計算key的hash值,根據hash值找到對應的數組下標:hash&(length-1)

  2)判斷數組該位置處的元素是否恰好是咱們要找的,若是不是,走第三步;

  3)判斷該元素類型是不是TreeNode,若是是,用紅黑樹的方法取數據,若是不是,走第四步;

  4)遍歷鏈表,直到找到相等(==或equals)的key。

 JDK 1.7

  1)根據key計算hash值;

  2)找到對應的數組下標:hash&(length-1)

  3)遍歷該數組位置處的鏈表,直到找到相等的key。

4.爲何HashMap線程不安全?(hash衝突和擴容致使的)

  hashMap的實現使用了一個數組,每一個數組項裏面由一個鏈表方式來實現,由於hashMap使用key的hashcode來尋找存儲位置,不一樣的key可能具備相同的hashcode,這時就出現了哈希衝突,也叫哈希碰撞。爲了解決哈希衝突,有開放地址法,以及鏈地址法。hashMap的實現選取了鏈地址法,也就是將哈希值同樣的entry保存在同一個數組裏面,能夠把一個數組項看成一個桶,桶裏面裝的entry的key的hashcode是同樣的。

 擴容致使的不安全:

  1)put時致使的多線程數據不一致。好比有兩個線程A和B,首先A但願插入一個key-value對到hashMap中,首先計算記錄所要落到的桶裏面的鏈表頭結點,此時線程A的時間片用完了,而此時線程B被調用得以執行,和線程A同樣執行,只不過線程B成功將記錄插到了桶裏面,假設線程A插入的記錄計算出來的桶所引和線程B要插入的記錄計算出來的桶索引是同樣的,那麼當線程B成功插入後,線程A再次被調用運行時,它依然持有過時的鏈表頭可是它對此一無所知,以致於它認爲應該這樣作,如此一來就覆蓋了線程B插入的記錄,這樣線程B插入的記錄就憑空消失了,形成了數據不一致的行爲。

  2)get:擴容時將數組擴爲二倍後,原數組中的數組的索引會發生變化,在進行get時仍然用原來的索引進行尋找,致使找不到要尋找的值。

2、ConcurrentHashMap分析

  ConcurrentHashMap是線程安全且高效的HashMap。

1.線程不安全的HashMap

  hashMap是Java中最經常使用的一個map類,性能好,速度快,可是不能保證線程安全,它可用null做爲key/value。

  在多線程環境下,使用hashMap進行put操做會引發死循環,是由於多線程會致使hashMap的entry鏈表造成環,一旦成環,entry的next節點永遠不爲空,產生死循環。因此在併發狀況下不能使用hashMap。

2.效率低下的HashTable

  線程安全的Map類,其public方法均用synchronize修飾,這表示在多線程操做時,每一個線程在操做以前都會鎖住整個map,待操做完成後才釋放。

  如線程1使用put操做進行元素添加,線程2不但不能使用put方法進行添加元素,也不能使用get方法來獲取元素,因此競爭越激烈效率越低,這必然致使多線程時性能不佳。另外,hashTable不能使用null做爲key/value。

3. 鎖分段技術可有效提高併發訪問效率

  hashTable在競爭激烈的併發環境中表現出效率低下的緣由是所訪問hashTable的線程都必須競爭同一把鎖,假如容器中有多把鎖,每一把鎖用於鎖容器其中一部分的數據,那麼當多線程訪問容器裏不一樣數據段的數據時,線程間就不會存在鎖競爭,從而能夠有效提升併發訪問效率,這就是ConcurrentHashMap所使用的鎖分段技術

  - 首先將數據分紅一段一段地存儲;

  - 而後給每一個數據段配一把鎖;

  - 當一個線程佔用鎖訪問其中一個段數據時,其餘段的數據也能被其餘線程訪問。

  

  

  ConcurrentHashMap是由Segment數組和HashEntry數組組成。

  Segment是一種可重入鎖,在ConsurrentHashMap裏扮演鎖的角色;HashEntry則用於存儲鍵值對數據。

  一個ConsurrentHashMap裏包含一個Segment數組,Segment的機構和HashMap相似,是一種數組和鏈表結構。一個Segment裏包含一個HashEntry數組,每一個HashEntry是一個鏈表結構的元素,每一個Segment守護着一個HashEntry數組裏的元素,當對HashEntry數組的數據進行修改時,必須首先得到與它對應的Segment鎖。如圖:

  

4.ConcurrentHashMap的初始化

  1)ConcurrentHashMap類中包含三個與Segment相關的成員變量:

    final int segmentMask;

    final int segmentShift;

    final Segment<K,V>[] segments;

  其中segments是Segment的原生數組,此數組的長度能夠在ConcurrentHashMap的構造函數中使用併發度參數指定,其默認值爲default_concurrency_level=16;segmentShift是用來計算segments數組索引的位移量,而segmentMask則是用來計算索引的掩碼值。

  例如併發度爲16時(即segments數組長度爲16),segmentShift爲32-4=28(由於2的4次冪爲16),而segmentMask則爲1111(二進制),索引的計算式以下:

    int j=(hash>>>segmentShift) & segmentMask;

  2)在多線程併發訪問一個共享變量時,爲了保證邏輯的正確,能夠採用如下方法:

    加鎖,性能最低,能保證原子性、可見性,防止指令重排;

    volatile修飾,性能中等,能保證原子性,防止指令重排;

    使用getObjectVolatile,性能最好,可防止指令重排;

   所以ConcurrentHashMap選擇了使用Unsafe的getObjectVolatile來讀取segments中的元素。

1 private Segment<K,V> segmentForHash(int h) { 2     long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; 3     return (Segment<K,V>) UNSAFE.getObjectVolatile(segments, u); 4 }

 

  3)Segment鎖

    Segment繼承了ReentrantLock(可重入鎖),所以它其實是一把鎖。在進行put、remove、replace、clear等須要改動內容的操做時,都要進行加鎖操做,其代碼通常是這樣的:

 1 final V put(K key, int hash, V value, boolean onlyIfAbsent) {
 2     HashEntry<K,V> node = tryLock() ?  :  scanAndLockForPut(key, hash, value);
 3     V oldValue;
 4     try {
 5 //實際代碼……
 6         }
 7     } finally {
 8         unlock();
 9     }
10     return oldValue;
11 }

    首先調用tryLock,若是加鎖失敗,則進入scanAndLockForPut(key,hash,value),該方法其實是先自旋等待其餘線程解鎖,直至指定的次數MAX_SCAN_RETRIES;若自旋過程當中,其餘線程釋放了鎖,致使本線程直接得到了鎖,就避免了本線程進入等待鎖的場景,提升了效率。若自旋必定次數後,仍未獲取鎖,則調用lock方法進入等待鎖的場景。

  優勢:採用這種自旋鎖和獨佔鎖結合的方法,在不少場景下可以提升Segment併發操做數據的效率。

  初始化方法是經過initialCapacity、loadFactor和concurrencyLevel等參數來初始化segment數組,段偏移量segmentShift、段掩碼segmentMask和每一個segment裏的HashEntry數組來實現的。

  4)初始化segments數組 

 1  if (concurrencyLevel > MAX_SEGMENTS)  2             concurrencyLevel = MAX_SEGMENTS;  3             int sshift = 0;  4             int ssize = 1;  5             while (ssize < concurrencyLevel) {  6                     ++sshift;  7                     ssize <<= 1;  8  }  9        segmentShift = 32 - sshift; 10        segmentMask = ssize - 1; 11        this.segments = Segment.newArray(ssize);

 

    segments數組的長度ssize是經過concurrencyLevel計算得出的;爲了能經過按位與的散列算法來定位segments數組的索引,必須保證segments數組的長度爲2的N次方,因此必須計算出一個大於或等於concurrencyLevel的最小的2的N次方值來做爲segments數組的長度。concurrencyLevel的最大值爲65535,這意味着segments數組的長度最大爲65536,對應的二進制是16位。

  5)初始化segmentShift和segmentMask

    這兩個全局變量須要在定位segment時的散列算法裏使用;sshift等於ssize從1向左移位的次數,默認concurrencyLevel等於16,1須要向左位移動4次,因此sshift爲4.

    segmentShift用於定位參與散列算法的位數,segmentShift等於32減sshift,因此等於28。這裏之因此是32,是由於ConcurrentHashMap裏的hash()方法輸出的最大位數爲32位。

    segmentMask是散列運算的掩碼,等於ssize減1,即15;掩碼的二進制各個位的值都是1,由於ssize的最大長度爲65536,因此segmentShift最大值是16,segmentMask最大值是65535,對應的二進制爲16位,每一個位都是1。

  6)初始化每一個segment

    輸入參數initialCapacity是ConcurrentHashMap的初始化容量,loadfactor是每一個segment的負載因子,在構造方法裏須要經過這兩個參數來初始化數組中的每一個segment。

 1    if (initialCapacity > MAXIMUM_CAPACITY)  2             initialCapacity = MAXIMUM_CAPACITY;  3         int c = initialCapacity / ssize;  4         if (c * ssize < initialCapacity)  5             ++c;  6         int cap = 1;  7         while (cap < c)  8             cap <<= 1;  9         for (int i = 0; i < this.segments.length; ++i) 10             this.segments[i] = new Segment<K, V>(cap, loadFactor);

    上面代碼中的變量cap就是segment裏HashEntry數組的長度,它等於initialCapacity除以ssize的倍數c,若是c大於1,就會取大於等於c的2的N次方值,因此cap不是1,就是2的N次方。

    segment的容量threshold=(int) cap*loadFactor,默認initialCapacity等於16,loadfactor等於0.75,經過運算cap等於1,threshold等於零。

  7)定位Segment

    既然ConcurrentHashMap使用分段鎖Segment來保護不一樣段的數據,那麼在插入和獲取元素時,必須先經過散列算法定位到Segment。能夠看到ConcurrentHashMap會首先使用hash的變種算法對元素的hashcode進行一次再散列。

1 private static int hash(int h) { 2             h += (h << 15) ^ 0xffffcd7d; 3             h ^= (h >>> 10); 4             h += (h << 3); 5             h ^= (h >>> 6); 6             h += (h << 2) + (h << 14); 7             return h ^ (h >>> 16); 8         }

    進行再散列,是爲了減小散列衝突,使元素可以均勻地分佈在不一樣的Segment上,從而提升容器的存取效率。

    假如散列的質量差到極點,那麼全部元素都在一個Segment中,不只存區元素緩慢,分段鎖也會失去意義。

1 //ConcurrentHashMap經過如下散列算法定位segment
2 final Segment<K,V> segmentFor(int hash) { 3       return segments[(hash >>> segmentShift) & segmentMask]; 4 } 5 //默認狀況下segmentShift爲28,segmentMask爲15,再散列後的數最大是32位二進制數據,向右無符號移動28位,即讓高4位參與到散列運算中,(hash>>>segmentShift)&segmentMask的運算結果分別是四、1五、7和8,能夠看到散列值沒有發生衝突.

 

  8)HashEntry

 1 static final class HashEntry<K,V> {  2     final int hash;  3     final K key;  4     volatile V value;  5     volatile HashEntry<K,V> next;  6 
 7     HashEntry(int hash, K key, V value, HashEntry<K,V> next) {  8         this.hash = hash;  9         this.key = key; 10         this.value = value; 11         this.next = next; 12  } 13 final void setNext(HashEntry<K,V> n) { 14         UNSAFE.putOrderedObject(this, nextOffset, n); 15  } 16 
17     static final long nextOffset; 18     static { 19         try { 20             UNSAFE = sun.misc.Unsafe.getUnsafe(); 21             Class k = HashEntry.class; 22             nextOffset = UNSAFE.objectFieldOffset 23                 (k.getDeclaredField("next")); 24         } catch (Exception e) { 25             throw new Error(e); 26  } 27  } 28 } 29 @SuppressWarnings("unchecked") 30 static final <K,V> HashEntry<K,V> entryAt(HashEntry<K,V>[] tab, int i) { 31     return (tab == ) ? : 32         (HashEntry<K,V>) UNSAFE.getObjectVolatile 33         (tab, ((long)i << TSHIFT) + TBASE); 34 } 35 static final <K,V> void setEntryAt(HashEntry<K,V>[] tab, int i, HashEntry<K,V> e) { 36         UNSAFE.putOrderedObject(tab, ((long)i << TSHIFT) + TBASE, e); 37 }

    與Segment相似,HashEntry使用UNSAFE.putOrderedObject來設置它的next成員變量,這樣既能夠提升性能,又能保持併發可見性。同時entryAt方法和setEntryAt方法也使用了UNSAFE.getObjectVolatile和UNSAFE.putOrderedObject來獲取和寫入指定索引的HashEntry。

    總之,Segment數組和HashEntry數組的讀取寫入通常都是使用UNSAFE。

5.ConcurrentHashMap的操做

  5.1 get操做

    先通過一次再散列,而後使用這個散列值經過散列運算定位到Segment,再經過散列算法定位到元素。

 1 public V get(Object key) {  2     Segment<K,V> s;  3     HashEntry<K,V>[] tab;  4     int h = hash(key);  5 //找到segment的地址 long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;  6 //取出segment,並找到其hashtable if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
 7         (tab = s.table) != ) {  8 //遍歷此鏈表,直到找到對應的值 for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
 9                  (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE); e != ; e = e.next) { 10  K k; 11                     if ((k = e.key) == key || (e.hash == h && key.equals(k))) 12                             return e.value; 13  } 14  } 15     return ; 16 }

    整個get方法不須要加鎖,只須要計算兩次hash值,而後遍歷一個單向鏈表(此鏈表長度平均小於2),所以get性能很高。高效之處在於整個過程不須要加鎖,除非讀到的值是空纔會加鎖重讀。

  HashTable容器的get方法是須要加鎖的,那ConcurrentHashMap的get操做是如何作到不加鎖的呢?

    緣由是它的get方法將要使用的共享變量都定義成了volatile類型,如用於統計當前Segment大小的count字段和用於存儲值得HashEntry的value。

    定義成volatile的變量,可以在線程之間保持可見性,可以被多線程同時讀,而且保證不會讀到過時的值,可是隻能被單線程寫(有一種狀況能夠被多線程寫,就是寫入的值不依賴於原值),在get操做裏只須要讀不須要寫共享變量count和value,因此能夠不用加鎖。

    之因此不會讀到過時的值,是由於根據Java內存模型的happen before原則,對volatile字段的寫操做先於讀操做,即便兩個線程同時修改和獲取volatile變量,get操做也能拿到最新的值,這是用volatile替換鎖的經典應用場景。

      transient volatile int count;

      volatile V value;

    在定位元素的代碼裏能夠實現,定位HashEntry和定位Segment的散列算法雖然同樣,都與數組的長度減去1再相「與」,可是相「與」的值不同。

    定位Segment使用的是元素的hashcode再散列後獲得的值的高位,定位HashEntry直接使用再散列後的值。其目的是避免兩次散列後的值同樣,雖然元素在Segment裏散列開了,可是卻沒有在HashEntry中散列開。

      hash>>>segmentShift & segmentMask;  //定位Segment所使用的hash算法

      int index=hash & (tab.length-1);  //定位HashEntry所使用的hash算法

  5.2 put操做

    因爲須要對共享變量進行寫操做,因此爲了線程安全,在操做共享變量時必須加鎖。put方法首先定位到Segment,而後在Segment裏進行插入操做。 

    插入操做須要經歷的兩個步驟:

      判斷是否須要對Segment裏的HashEntry數組進行擴容;定位添加元素的位置,而後將其放在HashEntry數組裏;

    1)是否須要擴容?

      在插入元素前會先判斷Segment裏的HashEntry數組是否超過容量,若是超過閾值,則對數組進行擴容。Segment的擴容判斷比HashMap更恰當,由於HashMap是在插入元素後判斷是否已經到達容量,若是到達了就進行擴容,可是頗有可能擴容以後沒有新元素插入,這時HashMap就進行了一次無效的擴容。

    2)如何擴容?

      在擴容時,首先會建立一個容量是原來兩倍的數組,而後將原數組裏的元素進行再散列後插入到新的數組裏。爲了高效,ConcurrentHashMap不會對整個容器進行擴容,而只對某個Segment擴容。

    put方法的第一步:計算segment數組的索引,並找到該segment,而後調用該segment的put方法。

 1 public V put(K key, V value) {  2     Segment<K,V> s;  3     if (value == )  4         throw new NullPointerException();  5     int hash = hash(key);  6 //計算segment數組的索引,並找到該segment int j = (hash >>> segmentShift) & segmentMask;
 7     if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
 8          (segments, (j << SSHIFT) + SBASE)) == ) // in ensureSegment
 9         s = ensureSegment(j); 10 //調用該segment的put方法 return s.put(key, hash, value, false);
11 }

    put方法的第二步:在Segment的put方法中進行操做。

 1 final V put(K key, int hash, V value, boolean onlyIfAbsent) {  2 //調用tryLock()嘗試加鎖,若失敗則調用scanAndLockForPut進行加鎖,同時尋找key相應的節點node
 3     HashEntry<K,V> node = tryLock() ? :  4  scanAndLockForPut(key, hash, value);  5 //如下的代碼都運行在加鎖狀態
 6  V oldValue;  7     try {  8         HashEntry<K,V>[] tab = table;  9 //計算hash表的索引值,並取出HashEntry int index = (tab.length - 1) & hash;
10         HashEntry<K,V> first = entryAt(tab, index); 11 //遍歷此鏈表 for (HashEntry<K,V> e = first;;) { 12 //若是鏈表不爲空,在鏈表中尋找對應的node,找到後進行賦值,並退出循環 if (e != null) {
13  K k; 14                 if ((k = e.key) == key ||
15                     (e.hash == hash && key.equals(k))) { 16                     oldValue = e.value; 17                     if (!onlyIfAbsent) { 18                         e.value = value; 19                         ++modCount; 20  } 21                     break; 22  } 23                 e = e.next; 24  } 25 //若是在鏈表中沒有找到對應的node else { 26 //若是scanAndLockForPut方法中已經返回的對應的node,則將其插入first以前 if (node != null)
27  node.setNext(first); 28                 else //不然,new一個新的HashEntry
29                     node = new HashEntry<K,V>(hash, key, value, first); 30                 int c = count + 1; 31 //測試是否須要自動擴容 if (c > threshold && tab.length < MAXIMUM_CAPACITY)
32  rehash(node); 33                 else //設置node到Hash表的index索引處
34  setEntryAt(tab, index, node); 35                 ++modCount; 36                 count = c; 37                 oldValue = ; 38                 break; 39  } 40  } 41     } finally { 42  unlock(); 43  } 44     return oldValue; 45 }

  5.3 size操做

    要統計整個ConcurrentHashMap裏的元素的數量,就必須統計全部Segment裏元素的數量後計總。

    Segment裏的全局變量count是一個volatile,在併發場景下,是否是直接把全部的Segment的count相加就能夠獲得整個ConcurrentHashMap大小了呢?不是的。

    雖然相加時能夠獲取每一個Segment的count的最新值,可是可能累加前使用的count發生了變化,那麼統計結果就不許了。因此,最安全的作法就是在統計size時把全部Segment的put、remove和clear方法所有鎖住,可是這種作法顯然很是低效。

    由於在累加count操做過程當中,以前累加過的count發生變化的機率很是小,因此ConcurrentHashMap的作法是先嚐試2次經過不鎖Segment的方式來統計各個Segment大小,若是統計的過程當中,count發生了變化,則再採用加鎖的方式來統計全部Segment的大小。

  那麼ConcurrentHashMap又是如何判斷在統計時容器是否發生了變化呢?

    使用modCount變量,在put、remove和clear方法裏操做元素前都會將變量modCount進行加1,那麼在統計size先後比較modCount是否發生變化,從而得知容器的大小是否發生了變化。

相關文章
相關標籤/搜索