jdk1.7.0_79node
HashMap能夠說是每一個Java程序員用的最多的數據結構之一了,無處不見它的身影。關於HashMap,一般也能說出它不是線程安全的。這篇文章要提到的是在多線程併發環境下的HashMap——ConcurrentHashMap,顯然它必然是線程安全的,一樣咱們不可避免的要討論散列表,以及它是如何實現線程安全的,它的效率又是怎樣的,由於對於映射容器還有一個Hashtable也是線程安全的但它彷佛只出如今筆試、面試題裏,在現實編碼中它已經基本被遺棄。程序員
關於HashMap的線程不安全,在多線程併發環境下它所帶來的影響毫不僅僅是出現髒數據等數據不一致的狀況,嚴重的是它有可能帶來程序死循環,這可能有點難以想象,但確實在不久前的項目裏同事有遇到了CPU100%滿負荷運行,分析結果是在多線程環境下HashMap致使程序死循環。對於Hashtable,查看其源碼可知,Hashtable保證線程安全的方式就是利用synchronized關鍵字,這樣會致使效率低下,但對於ConcurrentHashMap則採用了不一樣的線程安全保證方式——分段鎖。它不像Hashtable那樣將整個table鎖住而是將數組元素分段加鎖,若是線程1訪問的元素在分段segment1,而線程2訪問的元素在分段segment2,則它們互不影響能夠同時進行操做。若是合理的進行分段就是其關鍵問題。面試
ConcurrentHashMap和HashMap的結果基本一致,一樣也是Entry做爲存放數據的對象,另一個就是上面提到的分段鎖——Segment。它繼承自ReentrantLock(關於ReentrantLock,可參考《5.Lock接口及其實現ReentrantLock》),故它具備ReentrantLock一切特性——可重入,獨佔等。算法
ConcurrentHashMap的結構圖以下所示:數組
能夠看到相比較於HashMap,ConcurrentHashMap在Entry數組之上是Segment,這個就是咱們上面提到的分段鎖,合理的肯定分段數就能更好的提升併發效率,咱們來看ConcurrentHashMap是如何肯定分段數的。 安全
ConcurrentHashMap的初始化時經過其構造函數public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel)完成的,若在不指定各參數的狀況下,初始容量initialCapacity=DAFAULT_INITIAL_CAPACITY=16,負載因子loadFactor=DEFAULT_LOAD_FACTOR=0.75f,併發等級concurrencyLevel=DEFAULT_CONCURRENCY_LEVEL=16,前二者和HashMap相同。至於負載因子表示一個散列表的空間的使用程度,initialCapacity(總容量) * loadFactor(負載因子) = 數據量,有此公式可知,若負載因子越大,則散列表的裝填程度越高,也就是能容納更多的元素,但這樣元素就多,鏈表就大,此時索引效率就會下降。若負載因子越小,則相反,索引效率就會高,換來的代價就是浪費的空間越多。併發等級它表示估計最多有多少個線程來共同修改這個Map,稍後能夠看到它和segment數組相關,segment數組的長度就是經過concurrencyLevel計算得出。數據結構
1 //以默認參數爲例initalCapacity=16,loadFactor=0.75,concurrencyLevel=16 2 public ConcurrentHashMap(int initalCapacity, float loadFactor, int concurrencyLevel) { 3 if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) 4 throw new IllegalArgumentException(); 5 if (concurrencyLevel > MAX_SEGMENTS) 6 concurrencyLevel = MAX_SEGMENTS; 7 int sshift = 0; 8 int ssize = 1;//segment數組長度 9 while (ssize < concurrencyLevel) { 10 ++sshift; 11 ssize <= 1; 12 }//通過ssize左移4位後,ssize=16,ssift=4 13 /*segmentShift用於參與散列運算的位數,segmentMask是散列運算的掩碼,這裏有關的散列函數運算和HashMap有相似之處*/ 14 this.segmentShift = 32 – ssift;//段偏移量segmentShift=28 15 this.segmentMask = ssize – 1;//段掩碼segmentMask=15(1111) 16 if (initialCapacity > MAXIMUM_CAPACITY) 17 initialCapacity = MAXIMUM_CAPACITY; 18 int c = initialCapacity / ssize;//c = 1 19 if (c * ssize < initialCapacity) 20 ++c; 21 int cap = MIN_SEGMENT_TABLE_CAPACITY;//MIN_SEGMENT_TABLE_CAPACITY=2 22 while (cap < c)//cap = 2, c = 1,false 23 cap <<= 1;//cap是segment裏HashEntry數組的長度,最小爲2 24 /*建立segments數組和segment[0]*/ 25 Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor), (HashEntry<K,V>[]) new HashEntry[cap]);//參數意爲:負載因子=1,數據容量=(int)(2 * 0.75)=1,總容量=2,故每一個Segment的HashEntry總容量爲2,實際數據容量爲1 26 Segment<K,V> ss = (Segment<K,V>[])new Segment[ssize];//segments數組大小爲16 27 UNSAFE.putOrderedObject(ss, SBASE, s0); 28 this.segments = ss; 29 }
以上就是整個初始化過程,主要是初始化segments的長度大小以及經過負載因子肯定每一個Segment的容量大小。肯定好Segment事後,接下來的重點就是如何準肯定位Segment。定位Segment的方法就是經過散列函數來定位,先經過hash方法對元素進行二次散列,這個算法較爲複雜,其目的只有一個——減小散列衝突,使元素能均勻分佈在不一樣的Segment上,提升容器的存取效率。 多線程
咱們經過最直觀最經常使用的put方法來觀察ConcurrentHashMap是如何經過key值計算hash值在定位到Segment的: 併發
1 //ConcurrentHashMap#put 2 public V put(K key, V value) { 3 Segment<K,V> s; 4 if (value == null) 5 throw new NullPointerException(); 6 int hash = hash(key);//根據散列函數,計算出key值的散列值 7 int j = (hash >>> segmentShift) & segmentMask;//這個操做就是定位Segment的數組下標,jdk1.7以前是segmentFor返回Segment,1.7以後直接就取消了這個方法,直接計算數組下標,而後經過偏移量底層操做獲取Segment 8 if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck 9 (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment 10 s = ensureSegment(j);//經過便宜量定位不到就調用ensureSegment方法定位Segment 11 return s.put(key, hash, value, false); 12 }
Segment.put方法就是將鍵、值構造爲Entry節點加入到對應的Segment段裏,若是段中已經有元素(即表示兩個key鍵值的hash值重複)則將最新加入的放到鏈表的頭),整個過程必然是加鎖安全的。 ssh
不妨繼續深刻Segment.put方法 :
1 //Segment#put 2 final V put(K key, int hash, V value, boolean onlyIfAbsent) { 3 HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value);//非阻塞獲取鎖,獲取成功node=null,失敗 4 V oldValue; 5 try { 6 HashEntry<K,V>[] tab = table;//Segment對應的HashEntry數組長度 7 int index = (tab.length - 1) & hash; 8 HashEntry<K,V> first = entryAt(tab, index);//獲取HashEntry數組的第一個值 9 for (HashEntry<K,V> e = first;;) { 10 if (e != null) {//HashEntry數組已經存在值 11 K k; 12 if ((k = e.key) == key || (e.hash == hash && key.equals(k))) {//key值和hash值都相等,則直接替換舊值 13 oldValue = e.value; 14 if (!onlyIfAbsent) { 15 e.value = value; 16 ++modCount; 17 } 18 break; 19 } 20 e = e.next;//不是同一個值則繼續遍歷,直到找到相等的key值或者爲null的HashEntry數組元素 21 } 22 else {//HashEntry數組中的某個位置元素爲null 23 if (node != null) 24 node.setNext(first);//將新加入節點(key)的next引用指向HashEntry數組第一個元素 25 else//已經獲取到了Segment鎖 26 node = new HashEntry<K,V>(hash, key, value, first) 27 int c = count + 1; 28 if (c > threshold && tab.lenth < MAXIUM_CAPACITY)//插入前先判斷是否擴容,ConcurrentHashMap擴容與HashMap不一樣,ConcurrentHashMap只擴Segment的容量,HashMap則是整個擴容 29 rehash(node); 30 else 31 setEntryAt(tab, index, node);//設置爲頭節點 32 ++modCount;//總容量 33 count = c; 34 oldValue = null; 35 break; 36 } 37 } 38 } finally { 39 unlock(); 40 } 41 return oldValue; 42 }
上面大體就是ConcurrentHashMap加入一個元素的過程,須要明白的就是ConcurrentHashMap分段鎖的概念。在JDK1.6中定位Segment較爲簡單,直接計算出Segment數組下標後就返回具體的Segment,而JDK1.7則經過偏移量來計算,算出爲空時,還有一次檢查獲取Segment,猜想是1.7使用底層native是爲了提升效率,JDK1.8的ConcurrentHashMap又有不一樣,暫未深刻研究,它的數據結果彷佛變成了紅黑樹。
有關ConcurrentHashMap的get方法再也不分析,過程總結爲一句話:根據key值計算出hash值,根據hash值計算出對應的Segment,再在Segment下的HashEntry鏈表遍歷查找。