1、HashMap分析node
在JDK1.8以前,hashMap由數組+鏈表組成,1.8以後,對hashMap進行了一些修改,最大的不一樣就是利用了紅黑樹,因此其由數組+鏈表+紅黑樹組成。查找時,根據hash值咱們可以快速定位到數組的具體下標,可是以後的話,須要順着鏈表一個個比較下去才能找到咱們須要的,時間複雜度取決於鏈表的長度爲O(n),爲了下降這部分的開銷,在Java8中,當鏈表中的元素達到了8個時,會將鏈表轉換爲紅黑樹,在這些位置進行查找時能夠下降時間複雜度爲O(logn)。算法
1.put過程:(JDK1.8)數組
第一次put值時,會觸發resize(),相似Java7的第一次put也是要初始化數組長度的。安全
第一次resize和後續的擴容有些不同,由於此次是數組從null初始化到默認的16或自定義的初始容量,找到具體的數據下標,若是此位置沒有值,那麼直接初始化一下Node並放置在這個位置就能夠了。若是數組改成只有數據:首先,判斷該位置的第一個數據和咱們要插入的數據,key是否是「相等」,若是是,取出這個節點,若是該節點是表明紅黑樹的節點,調用紅黑樹的插值方法,插入到鏈表的最後面(Java7是插入到鏈表的最前面),當treeify_threshold爲8時,若是新插入的值是鏈表中的第8個,會觸發下面的treeifyBin,也就是將鏈表轉換爲紅黑樹;若是在該鏈表中找到了「相等」的key(==或equals);若是hashMap因爲新插入這個值致使size已經超過了閾值,則須要進行擴容。多線程
JDK1.7併發
當插入第一個元素時,須要先初始化數組大小;app
1)求key的hash值(數組索引 index=hash&(size-1));ssh
2)找到對應的數組下標;
函數
3)遍歷一下對應下標處的鏈表,看是否有重複的key已經存在,若是有,直接覆蓋,put方法返回舊值就結束了;高併發
4)不存在重複的key,將此entry添加到鏈表中。
2.數組擴容(當前的size已經達到了閾值,而且要插入的數組位置上已經有元素,那麼就會觸發擴容,擴容後,數組大小爲原來的2倍)(JDK1.8)
resize()方法用於初始化數組或數組擴容,每次擴容後,容量爲原來的2倍,並進行數據遷移。
1)對數組進行擴容,將數組擴大一倍,將閾值擴大一倍;
2)第一次put時初始化數組;
3)開始遍歷數組進行數據遷移;
若是該數組位置上只有單個元素:那就簡單了,直接遷移這個元素就能夠了。
若是是鏈表:須要將此鏈表拆成兩個鏈表,放到新的數組中,而且保留原來的前後順序,loHead,loTail對應一條鏈表,hiHead、hiTail對應另外一條鏈表。
JDK 1.7
擴容就是一個新的大數組替換原來的小數組,並將原來數組中的值遷移到新的數組中。
因爲是雙倍擴容,遷移過程當中,會將原來table[i] 中的鏈表的全部節點,分拆到新的數組的newTable[i] 和 newTable[i+oldLength]位置上。如原來數組長度爲16,那麼擴容後,原來table[0]出的鏈表中的全部元素會被分配到新數組中 newTable[0] 和 newTable[16] 這兩個位置。
3.get過程(JDK1.8)
1)計算key的hash值,根據hash值找到對應的數組下標:hash&(length-1)
2)判斷數組該位置處的元素是否恰好是咱們要找的,若是不是,走第三步;
3)判斷該元素類型是不是TreeNode,若是是,用紅黑樹的方法取數據,若是不是,走第四步;
4)遍歷鏈表,直到找到相等(==或equals)的key。
JDK 1.7
1)根據key計算hash值;
2)找到對應的數組下標:hash&(length-1)
3)遍歷該數組位置處的鏈表,直到找到相等的key。
4.爲何HashMap線程不安全?(hash衝突和擴容致使的)
hashMap的實現使用了一個數組,每一個數組項裏面由一個鏈表方式來實現,由於hashMap使用key的hashcode來尋找存儲位置,不一樣的key可能具備相同的hashcode,這時就出現了哈希衝突,也叫哈希碰撞。爲了解決哈希衝突,有開放地址法,以及鏈地址法。hashMap的實現選取了鏈地址法,也就是將哈希值同樣的entry保存在同一個數組裏面,能夠把一個數組項看成一個桶,桶裏面裝的entry的key的hashcode是同樣的。
擴容致使的不安全:
1)put時致使的多線程數據不一致。好比有兩個線程A和B,首先A但願插入一個key-value對到hashMap中,首先計算記錄所要落到的桶裏面的鏈表頭結點,此時線程A的時間片用完了,而此時線程B被調用得以執行,和線程A同樣執行,只不過線程B成功將記錄插到了桶裏面,假設線程A插入的記錄計算出來的桶所引和線程B要插入的記錄計算出來的桶索引是同樣的,那麼當線程B成功插入後,線程A再次被調用運行時,它依然持有過時的鏈表頭可是它對此一無所知,以致於它認爲應該這樣作,如此一來就覆蓋了線程B插入的記錄,這樣線程B插入的記錄就憑空消失了,形成了數據不一致的行爲。
2)get:擴容時將數組擴爲二倍後,原數組中的數組的索引會發生變化,在進行get時仍然用原來的索引進行尋找,致使找不到要尋找的值。
2、ConcurrentHashMap分析
ConcurrentHashMap是線程安全且高效的HashMap。
1.線程不安全的HashMap
hashMap是Java中最經常使用的一個map類,性能好,速度快,可是不能保證線程安全,它可用null做爲key/value。
在多線程環境下,使用hashMap進行put操做會引發死循環,是由於多線程會致使hashMap的entry鏈表造成環,一旦成環,entry的next節點永遠不爲空,產生死循環。因此在併發狀況下不能使用hashMap。
2.效率低下的HashTable
線程安全的Map類,其public方法均用synchronize修飾,這表示在多線程操做時,每一個線程在操做以前都會鎖住整個map,待操做完成後才釋放。
如線程1使用put操做進行元素添加,線程2不但不能使用put方法進行添加元素,也不能使用get方法來獲取元素,因此競爭越激烈效率越低,這必然致使多線程時性能不佳。另外,hashTable不能使用null做爲key/value。
3. 鎖分段技術可有效提高併發訪問效率
hashTable在競爭激烈的併發環境中表現出效率低下的緣由是所訪問hashTable的線程都必須競爭同一把鎖,假如容器中有多把鎖,每一把鎖用於鎖容器其中一部分的數據,那麼當多線程訪問容器裏不一樣數據段的數據時,線程間就不會存在鎖競爭,從而能夠有效提升併發訪問效率,這就是ConcurrentHashMap所使用的鎖分段技術。
- 首先將數據分紅一段一段地存儲;
- 而後給每一個數據段配一把鎖;
- 當一個線程佔用鎖訪問其中一個段數據時,其餘段的數據也能被其餘線程訪問。
ConcurrentHashMap是由Segment數組和HashEntry數組組成。
Segment是一種可重入鎖,在ConsurrentHashMap裏扮演鎖的角色;HashEntry則用於存儲鍵值對數據。
一個ConsurrentHashMap裏包含一個Segment數組,Segment的機構和HashMap相似,是一種數組和鏈表結構。一個Segment裏包含一個HashEntry數組,每一個HashEntry是一個鏈表結構的元素,每一個Segment守護着一個HashEntry數組裏的元素,當對HashEntry數組的數據進行修改時,必須首先得到與它對應的Segment鎖。如圖:
4.ConcurrentHashMap的初始化
1)ConcurrentHashMap類中包含三個與Segment相關的成員變量:
final int segmentMask;
final int segmentShift;
final Segment<K,V>[] segments;
其中segments是Segment的原生數組,此數組的長度能夠在ConcurrentHashMap的構造函數中使用併發度參數指定,其默認值爲default_concurrency_level=16;segmentShift是用來計算segments數組索引的位移量,而segmentMask則是用來計算索引的掩碼值。
例如併發度爲16時(即segments數組長度爲16),segmentShift爲32-4=28(由於2的4次冪爲16),而segmentMask則爲1111(二進制),索引的計算式以下:
int j=(hash>>>segmentShift) & segmentMask;
2)在多線程併發訪問一個共享變量時,爲了保證邏輯的正確,能夠採用如下方法:
加鎖,性能最低,能保證原子性、可見性,防止指令重排;
volatile修飾,性能中等,能保證原子性,防止指令重排;
使用getObjectVolatile,性能最好,可防止指令重排;
所以ConcurrentHashMap選擇了使用Unsafe的getObjectVolatile來讀取segments中的元素。
1 private Segment<K,V> segmentForHash(int h) { 2 long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; 3 return (Segment<K,V>) UNSAFE.getObjectVolatile(segments, u); 4 }
3)Segment鎖
Segment繼承了ReentrantLock(可重入鎖),所以它其實是一把鎖。在進行put、remove、replace、clear等須要改動內容的操做時,都要進行加鎖操做,其代碼通常是這樣的:
1 final V put(K key, int hash, V value, boolean onlyIfAbsent) { 2 HashEntry<K,V> node = tryLock() ? : scanAndLockForPut(key, hash, value); 3 V oldValue; 4 try { 5 //實際代碼…… 6 } 7 } finally { 8 unlock(); 9 } 10 return oldValue; 11 }
首先調用tryLock,若是加鎖失敗,則進入scanAndLockForPut(key,hash,value),該方法其實是先自旋等待其餘線程解鎖,直至指定的次數MAX_SCAN_RETRIES;若自旋過程當中,其餘線程釋放了鎖,致使本線程直接得到了鎖,就避免了本線程進入等待鎖的場景,提升了效率。若自旋必定次數後,仍未獲取鎖,則調用lock方法進入等待鎖的場景。
優勢:採用這種自旋鎖和獨佔鎖結合的方法,在不少場景下可以提升Segment併發操做數據的效率。
初始化方法是經過initialCapacity、loadFactor和concurrencyLevel等參數來初始化segment數組,段偏移量segmentShift、段掩碼segmentMask和每一個segment裏的HashEntry數組來實現的。
4)初始化segments數組
1 if (concurrencyLevel > MAX_SEGMENTS) 2 concurrencyLevel = MAX_SEGMENTS; 3 int sshift = 0; 4 int ssize = 1; 5 while (ssize < concurrencyLevel) { 6 ++sshift; 7 ssize <<= 1; 8 } 9 segmentShift = 32 - sshift; 10 segmentMask = ssize - 1; 11 this.segments = Segment.newArray(ssize);
segments數組的長度ssize是經過concurrencyLevel計算得出的;爲了能經過按位與的散列算法來定位segments數組的索引,必須保證segments數組的長度爲2的N次方,因此必須計算出一個大於或等於concurrencyLevel的最小的2的N次方值來做爲segments數組的長度。concurrencyLevel的最大值爲65535,這意味着segments數組的長度最大爲65536,對應的二進制是16位。
5)初始化segmentShift和segmentMask
這兩個全局變量須要在定位segment時的散列算法裏使用;sshift等於ssize從1向左移位的次數,默認concurrencyLevel等於16,1須要向左位移動4次,因此sshift爲4.
segmentShift用於定位參與散列算法的位數,segmentShift等於32減sshift,因此等於28。這裏之因此是32,是由於ConcurrentHashMap裏的hash()方法輸出的最大位數爲32位。
segmentMask是散列運算的掩碼,等於ssize減1,即15;掩碼的二進制各個位的值都是1,由於ssize的最大長度爲65536,因此segmentShift最大值是16,segmentMask最大值是65535,對應的二進制爲16位,每一個位都是1。
6)初始化每一個segment
輸入參數initialCapacity是ConcurrentHashMap的初始化容量,loadfactor是每一個segment的負載因子,在構造方法裏須要經過這兩個參數來初始化數組中的每一個segment。
1 if (initialCapacity > MAXIMUM_CAPACITY) 2 initialCapacity = MAXIMUM_CAPACITY; 3 int c = initialCapacity / ssize; 4 if (c * ssize < initialCapacity) 5 ++c; 6 int cap = 1; 7 while (cap < c) 8 cap <<= 1; 9 for (int i = 0; i < this.segments.length; ++i) 10 this.segments[i] = new Segment<K, V>(cap, loadFactor);
上面代碼中的變量cap就是segment裏HashEntry數組的長度,它等於initialCapacity除以ssize的倍數c,若是c大於1,就會取大於等於c的2的N次方值,因此cap不是1,就是2的N次方。
segment的容量threshold=(int) cap*loadFactor,默認initialCapacity等於16,loadfactor等於0.75,經過運算cap等於1,threshold等於零。
7)定位Segment
既然ConcurrentHashMap使用分段鎖Segment來保護不一樣段的數據,那麼在插入和獲取元素時,必須先經過散列算法定位到Segment。能夠看到ConcurrentHashMap會首先使用hash的變種算法對元素的hashcode進行一次再散列。
1 private static int hash(int h) { 2 h += (h << 15) ^ 0xffffcd7d; 3 h ^= (h >>> 10); 4 h += (h << 3); 5 h ^= (h >>> 6); 6 h += (h << 2) + (h << 14); 7 return h ^ (h >>> 16); 8 }
進行再散列,是爲了減小散列衝突,使元素可以均勻地分佈在不一樣的Segment上,從而提升容器的存取效率。
假如散列的質量差到極點,那麼全部元素都在一個Segment中,不只存區元素緩慢,分段鎖也會失去意義。
1 //ConcurrentHashMap經過如下散列算法定位segment
2 final Segment<K,V> segmentFor(int hash) { 3 return segments[(hash >>> segmentShift) & segmentMask]; 4 } 5 //默認狀況下segmentShift爲28,segmentMask爲15,再散列後的數最大是32位二進制數據,向右無符號移動28位,即讓高4位參與到散列運算中,(hash>>>segmentShift)&segmentMask的運算結果分別是四、1五、7和8,能夠看到散列值沒有發生衝突.
8)HashEntry
1 static final class HashEntry<K,V> { 2 final int hash; 3 final K key; 4 volatile V value; 5 volatile HashEntry<K,V> next; 6
7 HashEntry(int hash, K key, V value, HashEntry<K,V> next) { 8 this.hash = hash; 9 this.key = key; 10 this.value = value; 11 this.next = next; 12 } 13 final void setNext(HashEntry<K,V> n) { 14 UNSAFE.putOrderedObject(this, nextOffset, n); 15 } 16
17 static final long nextOffset; 18 static { 19 try { 20 UNSAFE = sun.misc.Unsafe.getUnsafe(); 21 Class k = HashEntry.class; 22 nextOffset = UNSAFE.objectFieldOffset 23 (k.getDeclaredField("next")); 24 } catch (Exception e) { 25 throw new Error(e); 26 } 27 } 28 } 29 @SuppressWarnings("unchecked") 30 static final <K,V> HashEntry<K,V> entryAt(HashEntry<K,V>[] tab, int i) { 31 return (tab == ) ? : 32 (HashEntry<K,V>) UNSAFE.getObjectVolatile 33 (tab, ((long)i << TSHIFT) + TBASE); 34 } 35 static final <K,V> void setEntryAt(HashEntry<K,V>[] tab, int i, HashEntry<K,V> e) { 36 UNSAFE.putOrderedObject(tab, ((long)i << TSHIFT) + TBASE, e); 37 }
與Segment相似,HashEntry使用UNSAFE.putOrderedObject來設置它的next成員變量,這樣既能夠提升性能,又能保持併發可見性。同時entryAt方法和setEntryAt方法也使用了UNSAFE.getObjectVolatile和UNSAFE.putOrderedObject來獲取和寫入指定索引的HashEntry。
總之,Segment數組和HashEntry數組的讀取寫入通常都是使用UNSAFE。
5.ConcurrentHashMap的操做
5.1 get操做
先通過一次再散列,而後使用這個散列值經過散列運算定位到Segment,再經過散列算法定位到元素。
1 public V get(Object key) { 2 Segment<K,V> s; 3 HashEntry<K,V>[] tab; 4 int h = hash(key); 5 //找到segment的地址 long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; 6 //取出segment,並找到其hashtable if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
7 (tab = s.table) != ) { 8 //遍歷此鏈表,直到找到對應的值 for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
9 (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE); e != ; e = e.next) { 10 K k; 11 if ((k = e.key) == key || (e.hash == h && key.equals(k))) 12 return e.value; 13 } 14 } 15 return ; 16 }
整個get方法不須要加鎖,只須要計算兩次hash值,而後遍歷一個單向鏈表(此鏈表長度平均小於2),所以get性能很高。高效之處在於整個過程不須要加鎖,除非讀到的值是空纔會加鎖重讀。
HashTable容器的get方法是須要加鎖的,那ConcurrentHashMap的get操做是如何作到不加鎖的呢?
緣由是它的get方法將要使用的共享變量都定義成了volatile類型,如用於統計當前Segment大小的count字段和用於存儲值得HashEntry的value。
定義成volatile的變量,可以在線程之間保持可見性,可以被多線程同時讀,而且保證不會讀到過時的值,可是隻能被單線程寫(有一種狀況能夠被多線程寫,就是寫入的值不依賴於原值),在get操做裏只須要讀不須要寫共享變量count和value,因此能夠不用加鎖。
之因此不會讀到過時的值,是由於根據Java內存模型的happen before原則,對volatile字段的寫操做先於讀操做,即便兩個線程同時修改和獲取volatile變量,get操做也能拿到最新的值,這是用volatile替換鎖的經典應用場景。
transient volatile int count;
volatile V value;
在定位元素的代碼裏能夠實現,定位HashEntry和定位Segment的散列算法雖然同樣,都與數組的長度減去1再相「與」,可是相「與」的值不同。
定位Segment使用的是元素的hashcode再散列後獲得的值的高位,定位HashEntry直接使用再散列後的值。其目的是避免兩次散列後的值同樣,雖然元素在Segment裏散列開了,可是卻沒有在HashEntry中散列開。
hash>>>segmentShift & segmentMask; //定位Segment所使用的hash算法
int index=hash & (tab.length-1); //定位HashEntry所使用的hash算法
5.2 put操做
因爲須要對共享變量進行寫操做,因此爲了線程安全,在操做共享變量時必須加鎖。put方法首先定位到Segment,而後在Segment裏進行插入操做。
插入操做須要經歷的兩個步驟:
判斷是否須要對Segment裏的HashEntry數組進行擴容;定位添加元素的位置,而後將其放在HashEntry數組裏;
1)是否須要擴容?
在插入元素前會先判斷Segment裏的HashEntry數組是否超過容量,若是超過閾值,則對數組進行擴容。Segment的擴容判斷比HashMap更恰當,由於HashMap是在插入元素後判斷是否已經到達容量,若是到達了就進行擴容,可是頗有可能擴容以後沒有新元素插入,這時HashMap就進行了一次無效的擴容。
2)如何擴容?
在擴容時,首先會建立一個容量是原來兩倍的數組,而後將原數組裏的元素進行再散列後插入到新的數組裏。爲了高效,ConcurrentHashMap不會對整個容器進行擴容,而只對某個Segment擴容。
put方法的第一步:計算segment數組的索引,並找到該segment,而後調用該segment的put方法。
1 public V put(K key, V value) { 2 Segment<K,V> s; 3 if (value == ) 4 throw new NullPointerException(); 5 int hash = hash(key); 6 //計算segment數組的索引,並找到該segment int j = (hash >>> segmentShift) & segmentMask;
7 if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
8 (segments, (j << SSHIFT) + SBASE)) == ) // in ensureSegment
9 s = ensureSegment(j); 10 //調用該segment的put方法 return s.put(key, hash, value, false);
11 }
put方法的第二步:在Segment的put方法中進行操做。
1 final V put(K key, int hash, V value, boolean onlyIfAbsent) { 2 //調用tryLock()嘗試加鎖,若失敗則調用scanAndLockForPut進行加鎖,同時尋找key相應的節點node
3 HashEntry<K,V> node = tryLock() ? : 4 scanAndLockForPut(key, hash, value); 5 //如下的代碼都運行在加鎖狀態
6 V oldValue; 7 try { 8 HashEntry<K,V>[] tab = table; 9 //計算hash表的索引值,並取出HashEntry int index = (tab.length - 1) & hash;
10 HashEntry<K,V> first = entryAt(tab, index); 11 //遍歷此鏈表 for (HashEntry<K,V> e = first;;) { 12 //若是鏈表不爲空,在鏈表中尋找對應的node,找到後進行賦值,並退出循環 if (e != null) {
13 K k; 14 if ((k = e.key) == key ||
15 (e.hash == hash && key.equals(k))) { 16 oldValue = e.value; 17 if (!onlyIfAbsent) { 18 e.value = value; 19 ++modCount; 20 } 21 break; 22 } 23 e = e.next; 24 } 25 //若是在鏈表中沒有找到對應的node else { 26 //若是scanAndLockForPut方法中已經返回的對應的node,則將其插入first以前 if (node != null)
27 node.setNext(first); 28 else //不然,new一個新的HashEntry
29 node = new HashEntry<K,V>(hash, key, value, first); 30 int c = count + 1; 31 //測試是否須要自動擴容 if (c > threshold && tab.length < MAXIMUM_CAPACITY)
32 rehash(node); 33 else //設置node到Hash表的index索引處
34 setEntryAt(tab, index, node); 35 ++modCount; 36 count = c; 37 oldValue = ; 38 break; 39 } 40 } 41 } finally { 42 unlock(); 43 } 44 return oldValue; 45 }
5.3 size操做
要統計整個ConcurrentHashMap裏的元素的數量,就必須統計全部Segment裏元素的數量後計總。
Segment裏的全局變量count是一個volatile,在併發場景下,是否是直接把全部的Segment的count相加就能夠獲得整個ConcurrentHashMap大小了呢?不是的。
雖然相加時能夠獲取每一個Segment的count的最新值,可是可能累加前使用的count發生了變化,那麼統計結果就不許了。因此,最安全的作法就是在統計size時把全部Segment的put、remove和clear方法所有鎖住,可是這種作法顯然很是低效。
由於在累加count操做過程當中,以前累加過的count發生變化的機率很是小,因此ConcurrentHashMap的作法是先嚐試2次經過不鎖Segment的方式來統計各個Segment大小,若是統計的過程當中,count發生了變化,則再採用加鎖的方式來統計全部Segment的大小。
那麼ConcurrentHashMap又是如何判斷在統計時容器是否發生了變化呢?
使用modCount變量,在put、remove和clear方法裏操做元素前都會將變量modCount進行加1,那麼在統計size先後比較modCount是否發生變化,從而得知容器的大小是否發生了變化。