HashMap是非線程安全的,在併發場景中若是不保持足夠的同步,就有可能在執行HashMap.get時進入死循環,將CPU的消耗到100%。java
HashMap採用鏈表解決Hash衝突。由於是鏈表結構,那麼就很容易造成閉合的鏈路,這樣在循環的時候只要有線程對這個HashMap進行get操做就會產生死循環,node
單線程狀況下,只有一個線程對HashMap的數據結構進行操做,是不可能產生閉合的迴路的。c++
只有在多線程併發的狀況下才會出現這種狀況,那就是在put操做的時候,若是size>initialCapacity*loadFactor,hash表進行擴容,那麼這時候HashMap就會進行rehash操做,隨之HashMap的結構就會很大的變化。頗有可能就是在兩個線程在這個時候同時觸發了rehash操做,產生了閉合的迴路。算法
推薦使用currentHashMap編程
多線程下[HashMap]的問題:數組
一、多線程put操做後,get操做致使死循環。
二、多線程put非NULL元素後,get操做獲得NULL值。
三、多線程put操做,致使元素丟失。安全
Java的HashMap是非線程安全的,因此在併發下必然出現問題,如下作詳細的解釋:數據結構
從前咱們的Java代碼由於一些緣由使用了HashMap這個東西,可是當時的程序是單線程的,一切都沒有問題。由於考慮到程序性能,因此須要變成多線程的,因而,變成多線程後到了線上,發現程序常常佔了100%的CPU,查看堆棧,你會發現程序都Hang在了HashMap.get()這個方法上了,重啓程序後問題消失。可是過段時間又會來。並且,這個問題在測試環境裏可能很難重現。多線程
咱們簡單的看一下咱們本身的代碼,咱們就知道HashMap被多個線程操做。而Java的文檔說HashMap是非線程安全的,應該用ConcurrentHashMap。併發
簡單地說一下HashMap這個經典的數據結構。
HashMap一般會用一個指針數組(假設爲table[])來作分散全部的key,當一個key被加入時,會經過Hash算法經過key算出這個數組的下標i,而後就把這個<key, value>插到table[i]中,若是有兩個不一樣的key被算在了同一個i,那麼就叫衝突,又叫碰撞,這樣會在table[i]上造成一個鏈表。
咱們知道,若是table[]的尺寸很小,好比只有2個,若是要放進10個keys的話,那麼碰撞很是頻繁,因而一個O(1)的查找算法,就變成了鏈表遍歷,性能變成了O(n),這是Hash表的缺陷(可參看《Hash Collision DoS 問題》)。
因此,Hash表的尺寸和容量很是的重要。通常來講,Hash表這個容器當有數據要插入時,都會檢查容量有沒有超過設定的thredhold,若是超過,須要增大Hash表的尺寸,這樣一來,整個Hash表裏的無素都須要被重算一遍。這叫rehash,這個成本至關的大。
下面,咱們來看一下Java的HashMap的源代碼。
Put一個Key,Value對到Hash表中:
public V put(K key, V value) { ...... //算Hash值 int hash = hash(key.hashCode()); int i = indexFor(hash, table.length); //若是該key已被插入,則替換掉舊的value (連接操做) for (Entry<K,V> e = table[i]; e != null; e = e.next) { Object k; if (e.hash == hash && ((k = e.key) == key || key.equals(k))) { V oldValue = e.value; e.value = value; e.recordAccess(this); return oldValue; } } modCount++; //該key不存在,須要增長一個結點 addEntry(hash, key, value, i); return null; }
檢查容量是否超標
void addEntry(int hash, K key, V value, int bucketIndex) { Entry<K,V> e = table[bucketIndex]; table[bucketIndex] = new Entry<K,V>(hash, key, value, e); //查看當前的size是否超過了咱們設定的閾值threshold,若是超過,須要resize if (size++ >= threshold) resize(2 * table.length); }
新建一個更大尺寸的hash表,而後把數據從老的Hash表中遷移到新的Hash表中。
void resize(int newCapacity) { Entry[] oldTable = table; int oldCapacity = oldTable.length; ...... //建立一個新的Hash Table Entry[] newTable = new Entry[newCapacity]; //將Old Hash Table上的數據遷移到New Hash Table上 transfer(newTable); table = newTable; threshold = (int)(newCapacity * loadFactor); }
遷移的源代碼,注意高亮處:
void transfer(Entry[] newTable) { Entry[] src = table; int newCapacity = newTable.length; //下面這段代碼的意思是: // 從OldTable裏摘一個元素出來,而後放到NewTable中 for (int j = 0; j < src.length; j++) { Entry<K,V> e = src[j]; if (e != null) { src[j] = null; do { Entry<K,V> next = e.next; int i = indexFor(e.hash, newCapacity); e.next = newTable[i]; newTable[i] = e; e = next; } while (e != null); } } }
好了,這個代碼算是比較正常的。並且沒有什麼問題。
畫了個圖作了個演示。
併發下的Rehash
1)假設咱們有兩個線程。我用紅色和淺藍色標註了一下。
咱們再回頭看一下咱們的 transfer代碼中的這個細節:
而咱們的線程二執行完成了。因而咱們有下面的這個樣子。
注意,由於Thread1的 e 指向了key(3),而next指向了key(7),其在線程二rehash後,指向了線程二重組後的鏈表。咱們能夠看到鏈表的順序被反轉後。
2)線程一被調度回來執行。
3)一切安好。
線程一接着工做。把key(7)摘下來,放到newTable[i]的第一個,而後把e和next往下移。
4)環形連接出現。
e.next = newTable[i] 致使 key(3).next 指向了 key(7)
注意:此時的key(7).next 已經指向了key(3), 環形鏈表就這樣出現了。
因而,當咱們的線程一調用到,HashTable.get(11)時,悲劇就出現了——Infinite Loop。
有人把這個問題報給了Sun,不過Sun不認爲這個是一個問題。由於HashMap原本就不支持併發。要併發就用ConcurrentHashmap
我在這裏把這個事情記錄下來,只是爲了讓你們瞭解並體會一下併發環境下的危險。
ConcurrentHashMap的讀取併發,由於在讀取的大多數時候都沒有用到鎖定,因此讀取操做幾乎是徹底的併發操做,而寫操做鎖定的粒度又很是細。只有在求size等操做時才須要鎖定整個表。而在迭代時,ConcurrentHashMap使用了不一樣於傳統集合的快速失敗迭代器的弱一致迭代器。在這種迭代方式中,當iterator被建立後集合再發生改變就再也不是拋出ConcurrentModificationException,取而代之的是在改變時new新的數據從而不影響原有的數據,iterator完成後再將頭指針替換爲新的數據,這樣iterator線程可使用原來老的數據,而寫線程也能夠併發的完成改變,更重要的,這保證了多個線程併發執行的連續性和擴展性,是性能提高的關鍵。
Hashtable繼承的是Dictionary(Hashtable是其惟一公開的子類),HashTable容器使用synchronized來保證線程安全,但在線程競爭激烈的狀況下HashTable的效率很是低下。由於當一個線程訪問HashTable的同步方法時,其餘線程訪問HashTable的同步方法時,可能會進入阻塞或輪詢狀態。如線程1使用put進行添加元素,線程2不但不能使用put方法添加元素,而且也不能使用get方法來獲取元素,因此競爭越激烈效率越低。
Hashtable的實現方式---鎖整個hash表;而ConcurrentHashMap的實現方式---鎖桶(或段)
HashTable容器在競爭激烈的併發環境下表現出效率低下的緣由,是由於全部訪問HashTable的線程都必須競爭同一把鎖,那假如容器裏有多把鎖,每一把鎖用於鎖容器其中一部分數據,那麼當多線程訪問容器裏不一樣數據段的數據時,線程間就不會存在鎖競爭,從而能夠有效的提升併發訪問效率,這就是ConcurrentHashMap所使用的鎖分段技術,首先將數據分紅一段一段的存儲,而後給每一段數據配一把鎖,當一個線程佔用鎖訪問其中一個段數據的時候,其餘段的數據也能被其餘線程訪問。
從上面看出,ConcurrentHashMap定位一個元素的過程須要進行兩次Hash操做,第一次Hash定位到Segment,第二次Hash定位到元素所在的鏈表的頭部,所以,這一種結構的帶來的反作用是Hash的過程要比普通的HashMap要長,可是帶來的好處是寫操做的時候能夠只對元素所在的Segment進行加鎖便可,不會影響到其餘的Segment,這樣,在最理想的狀況下,ConcurrentHashMap能夠最高同時支持Segment數量大小的寫操做(恰好這些寫操做都很是平均地分佈在全部的Segment上),併發能力大大提升。
ConcurrentHashMap是由Segment數組結構和HashEntry數組結構組成。Segment是一種可重入鎖ReentrantLock,在ConcurrentHashMap裏扮演鎖的角色,HashEntry則用於存儲鍵值對數據。一個ConcurrentHashMap裏包含一個Segment數組,Segment的結構和HashMap相似,是一種數組和鏈表結構, 一個Segment裏包含一個HashEntry數組,每一個HashEntry是一個鏈表結構的元素, 每一個Segment守護者一個HashEntry數組裏的元素,當對HashEntry數組的數據進行修改時,必須首先得到它對應的Segment鎖。
3、ConcurrentHashMap實現原理
鎖分離 (Lock Stripping)
ConcurrentHashMap內部使用段(Segment)來表示這些不一樣的部分,每一個段其實就是一個小的hash table,它們有本身的鎖。只要多個修改操做發生在不一樣的段上,它們就能夠併發進行。一樣當一個線程佔用鎖訪問其中一個段數據的時候,其餘段的數據也能被其餘線程訪問。
ConcurrentHashMap有些方法須要跨段,好比size()和containsValue(),它們可能須要鎖定整個表而而不只僅是某個段,這須要按順序鎖定全部段,操做完畢後,又按順序釋放全部段的鎖。這裏"按順序"是很重要的,不然極有可能出現死鎖,在ConcurrentHashMap內部,段數組是final的,而且其成員變量實際上也是final的,可是,僅僅是將數組聲明爲final的並不保證數組成員也是final的,這須要實現上的保證。這能夠確保不會出現死鎖,由於得到鎖的順序是固定的。不變性是多線程編程佔有很重要的地位,下面還要談到。
final Segment<K,V>[] segments;
不變(Immutable)和易變(Volatile)
ConcurrentHashMap徹底容許多個讀操做併發進行,讀操做並不須要加鎖。若是使用傳統的技術,如HashMap中的實現,若是容許能夠在hash鏈的中間添加或刪除元素,讀操做不加鎖將獲得不一致的數據。ConcurrentHashMap實現技術是保證HashEntry幾乎是不可變的。HashEntry表明每一個hash鏈中的一個節點,其結構以下所示:
static final class HashEntry<K,V> {
final K key;
final int hash;
volatile V value;
final HashEntry<K,V> next;
}
能夠看到除了value不是final的,其它值都是final的,爲了防止鏈表結構被破壞,出現ConcurrentModification的狀況。這意味着不能從hash鏈的中間或尾部添加或刪除節點,由於這須要修改next引用值,全部的節點的修改只能從頭部開始。對於put操做,能夠一概添加到Hash鏈的頭部。可是對於remove操做,可能須要從中間刪除一個節點,這就須要將要刪除節點的前面全部節點整個複製一遍,最後一個節點指向要刪除結點的下一個結點,爲了確保讀操做可以看到最新的值,將value設置成volatile,這避免了加鎖。
下面咱們來結合源代碼來具體分析一下ConcurrentHashMap的實現,先看下初始化方法:
CurrentHashMap的初始化一共有三個參數,一個initialCapacity,表示初始的容量,一個loadFactor,表示負載參數,最後一個是concurrentLevel,表明ConcurrentHashMap內部的Segment的數量,ConcurrentLevel一經指定,不可改變,後續若是ConcurrentHashMap的元素數量增長致使ConrruentHashMap須要擴容,ConcurrentHashMap不會增長Segment的數量,而只會增長Segment中鏈表數組的容量大小,這樣的好處是擴容過程不須要對整個ConcurrentHashMap作rehash,而只須要對Segment裏面的元素作一次rehash就能夠了。
整個ConcurrentHashMap的初始化方法仍是很是簡單的,先是根據concurrentLevel來new出Segment,這裏Segment的數量是不大於concurrentLevel的最大的2的指數,就是說Segment的數量永遠是2的指數個,這樣的好處是方便採用移位操做來進行hash,加快hash的過程。接下來就是根據intialCapacity肯定Segment的容量的大小,每個Segment的容量大小也是2的指數,一樣使爲了加快hash的過程。
這邊須要特別注意一下兩個變量,分別是segmentShift和segmentMask,這兩個變量在後面將會起到很大的做用,假設構造函數肯定了Segment的數量是2的n次方,那麼segmentShift就等於32減去n,而segmentMask就等於2的n次方減一。
前面提到過ConcurrentHashMap的get操做是不用加鎖的,咱們這裏看一下其實現:
public V get(Object key) { int hash = hash(key.hashCode()); return segmentFor(hash).get(key, hash); }
segmentFor這個函數用於肯定操做應該在哪個segment中進行,幾乎對ConcurrentHashMap的全部操做都須要用到這個函數,咱們看下這個函數的實現:
final Segment<K,V> segmentFor(int hash) { return segments[(hash >>> segmentShift) & segmentMask]; }
這個函數用了位操做來肯定Segment,根據傳入的hash值向右無符號右移segmentShift位,而後和segmentMask進行與操做,結合咱們以前說的segmentShift和segmentMask的值,就能夠得出如下結論:假設Segment的數量是2的n次方,根據元素的hash值的高n位就能夠肯定元素到底在哪個Segment中。
在肯定了須要在哪個segment中進行操做之後,接下來的事情就是調用對應的Segment的get方法:
V get(Object key, int hash) { if (count != 0) { // read-volatile HashEntry<K,V> e = getFirst(hash); while (e != null) { if (e.hash == hash && key.equals(e.key)) { V v = e.value; if (v != null) return v; return readValueUnderLock(e); // recheck } e = e.next; } } return null; }
get操做不須要鎖。第一步是訪問count變量,這是一個volatile變量,因爲全部的修改操做在進行結構修改時都會在最後一步寫count變量,經過這種機制保證get操做可以獲得幾乎最新的結構更新。對於非結構更新,也就是結點值的改變,因爲HashEntry的value變量是volatile的,也能保證讀取到最新的值。接下來就是對hash鏈進行遍歷找到要獲取的結點,若是沒有找到,直接訪回null。對hash鏈進行遍歷不須要加鎖的緣由在於鏈指針next是final的。可是頭指針卻不是final的,這是經過getFirst(hash)方法返回,也就是存在table數組中的值。這使得getFirst(hash)可能返回過期的頭結點,例如,當執行get方法時,剛執行完getFirst(hash)以後,另外一個線程執行了刪除操做並更新頭結點,這就致使get方法中返回的頭結點不是最新的。這是能夠容許,經過對count變量的協調機制,get能讀取到幾乎最新的數據,雖然可能不是最新的。要獲得最新的數據,只有採用徹底的同步。
V readValueUnderLock(HashEntry<K,V> e) { lock(); try { return e.value; } finally { unlock(); } }
最後,若是找到了所求的結點,判斷它的值若是非空就直接返回,不然在有鎖的狀態下再讀一次。這彷佛有些費解,理論上結點的值不可能爲空,這是由於put的時候就進行了判斷,若是爲空就要拋NullPointerException。空值的惟一源頭就是HashEntry中的默認值,由於HashEntry中的value不是final的,非同步讀取有可能讀取到空值。仔細看下put操做的語句:tab[index] = new HashEntry<K,V>(key, hash, first, value),在這條語句中,HashEntry構造函數中對value的賦值以及對tab[index]的賦值可能被從新排序,這就可能致使結點的值爲空。這種狀況應當很罕見,一旦發生這種狀況,ConcurrentHashMap採起的方式是在持有鎖的狀況下再讀一遍,這可以保證讀到最新的值,而且必定不會爲空值。
由於實際上put、remove等操做也會更新count的值,因此當競爭發生的時候,volatile的語義能夠保證寫操做在讀操做以前,也就保證了寫操做對後續的讀操做都是可見的,這樣後面get的後續操做就能夠拿到完整的元素內容。
看完了get操做,再看下put操做,put操做的前面也是肯定Segment的過程,直接看關鍵的segment的put方法:
V put(K key, int hash, V value, boolean onlyIfAbsent) {
lock(); //加鎖
try {
int c = count;
if (c++ > threshold) // ensure capacity
rehash(); //看是否須要rehash
HashEntry<K,V>[] tab = table;
int index = hash & (tab.length - 1);
HashEntry<K,V> first = tab[index]; //肯定鏈表頭部的位置
HashEntry<K,V> e = first;
while (e != null && (e.hash != hash || !key.equals(e.key)))
e = e.next;
V oldValue;
if (e != null) { //若是存在,替換掉value
oldValue = e.value;
if (!onlyIfAbsent)
e.value = value;
}
else {
oldValue = null;
++modCount; //修改modCount和count?
tab[index] = new HashEntry<K,V>(key, hash, first, value); //建立一個新的結點並添加到hash鏈的頭部
count = c; // write-volatile
}
return oldValue;
} finally {
unlock();
}
}
首先對Segment的put操做是加鎖完成的,而後在第五行,若是Segment中元素的數量超過了閾值(由構造函數中的loadFactor算出)這須要進行對Segment擴容,而且要進行rehash。
第8和第9行的操做就是getFirst的過程,肯定鏈表頭部的位置。
第11行這裏的這個while循環是在鏈表中尋找和要put的元素相同key的元素,若是找到,就直接更新更新key的value,若是沒有找到,則進入21行這裏,生成一個新的HashEntry而且把它加到整個Segment的頭部,而後再更新count的值。
修改操做還有putAll和replace。putAll就是屢次調用put方法。replace甚至不用作結構上的更改,實現要比put和delete要簡單得多.
Remove操做的前面一部分和前面的get和put操做同樣,都是定位Segment的過程,而後再調用Segment的remove方法:
V remove(Object key, int hash, Object value) { lock(); try { int c = count - 1; HashEntry<K,V>[] tab = table; int index = hash & (tab.length - 1); HashEntry<K,V> first = tab[index]; HashEntry<K,V> e = first; while (e != null && (e.hash != hash || !key.equals(e.key))) e = e.next; //空白行以前的行主要是定位到要刪除的節點e V oldValue = null; if (e != null) { V v = e.value; if (value == null || value.equals(v)) { oldValue = v; // All entries following removed node can stay // in list, but all preceding ones need to be // cloned. ++modCount; HashEntry<K,V> newFirst = e.next; for (HashEntry<K,V> p = first; p != e; p = p.next) newFirst = new HashEntry<K,V>(p.key, p.hash, newFirst, p.value); tab[index] = newFirst; count = c; // write-volatile } } return oldValue; } finally { unlock(); } }
整個操做是先定位到段,而後委託給段的remove操做。當多個刪除操做併發進行時,只要它們所在的段不相同,它們就能夠同時進行。下面是Segment的remove方法實現
首先remove操做也是肯定須要刪除的元素的位置,HashEntry中的next是final的,一經賦值之後就不可修改,在定位到待刪除元素的位置之後,程序就將待刪除元素前面的那一些元素所有複製一遍,而後再一個一個從新接到鏈表上去.
將e前面的結點複製一遍,尾結點指向e的下一個結點。e後面的結點不須要複製,它們能夠重用.
假設鏈表中原來的元素如上圖所示,如今要刪除元素3,那麼刪除元素3之後的鏈表就以下圖所示:
在前面的章節中,咱們涉及到的操做都是在單個Segment中進行的,可是ConcurrentHashMap有一些操做是在多個Segment中進行,好比size操做,ConcurrentHashMap的size操做也採用了一種比較巧的方式,來儘可能避免對全部的Segment都加鎖。
前面咱們提到了一個Segment中的有一個modCount變量,表明的是對Segment中元素的數量形成影響的操做的次數,這個值只增不減,size操做就是遍歷了兩次Segment,每次記錄Segment的modCount值,而後將兩次的modCount進行比較,若是相同,則表示期間沒有發生過寫入操做,就將原先遍歷的結果返回,若是不相同,則把這個過程再重複作一次,若是再不相同,則就須要將全部的Segment都鎖住,而後一個一個遍歷了.
參考:http://www.iteye.com/topic/344876