concurrentHashmap是爲了高併發而實現,內部採用分離鎖的設計,有效地避開了熱點訪問。而對於每一個分段,ConcurrentHashmap採用final和內存可見修飾符volatile關鍵字(內存當即可見:Java 的內存模型能夠保證:某個寫線程對 value 域的寫入立刻能夠被後續的某個讀線程「看」到。注:並不能保證對volatile變量狀態有依賴的其餘操做的原子性)html
借用某博客對concurrentHashmap對結構圖:java
不難看出,concurrenthashmap採用了二次hash的方式,第一次hash將key映射到對應的segment,而第二次hash則是映射到segment的不一樣桶中。web
爲何要用二次hash,主要緣由是爲了構造分離鎖,使得對於map的修改不會鎖住整個容器,提升併發能力。固然,沒有一種東西是絕對完美的,二次hash帶來的問題是整個hash的過程比hashmap單次hash要長,因此,若是不是併發情形,不要使用concurrentHashmap。算法
該數據結構中,最核心的部分是兩個內部類,HashEntry和Segment數組
concurrentHashmap維護一個segment數組,將元素分紅若干段(第一次hash)
數據結構
/** * The segments, each of which is a specialized hash table. */ final Segment<K,V>[] segments;
segments的每個segment維護一個鏈表數組併發
代碼:ssh
再來看看構造方法高併發
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); if (concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS; // Find power-of-two sizes best matching arguments int sshift = 0; int ssize = 1; while (ssize < concurrencyLevel) { ++sshift; ssize <<= 1; } this.segmentShift = 32 - sshift; this.segmentMask = ssize - 1; if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; int c = initialCapacity / ssize; if (c * ssize < initialCapacity) ++c; int cap = MIN_SEGMENT_TABLE_CAPACITY; while (cap < c) cap <<= 1; // create segments and segments[0] Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor), (HashEntry<K,V>[])new HashEntry[cap]); Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize]; UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0] this.segments = ss; }
代碼28行,一旦指定了concurrencyLevel(segments數組大小)便不能改變,這樣,一旦threshold超標,rehash真不會影響segments數組,這樣,在大併發的狀況下,只會影響某一個segment的rehash而其餘segment不會受到影響性能
(put方法都要上鎖)
與hashmap相似,concurrentHashmap也採用了鏈表做爲每一個hash桶中的元素,不過concurrentHashmap又有些不一樣
static final class HashEntry<K,V> { final int hash; final K key; volatile V value; volatile HashEntry<K,V> next; HashEntry(int hash, K key, V value, HashEntry<K,V> next) { this.hash = hash; this.key = key; this.value = value; this.next = next; } /** * Sets next field with volatile write semantics. (See above * about use of putOrderedObject.) */ final void setNext(HashEntry<K,V> n) { UNSAFE.putOrderedObject(this, nextOffset, n); } // Unsafe mechanics static final sun.misc.Unsafe UNSAFE; static final long nextOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class k = HashEntry.class; nextOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("next")); } catch (Exception e) { throw new Error(e); } } }
HashEntry的key,hash採用final,能夠避免併發修改問題,HashEntry鏈的尾部是不能修改的,而next和value採用volatile,能夠避免使用同步形成的併發性能災難,新版(jdk1.7)的concurrentHashmap大量使用java Unsafe類提供的原子操做,直接調用底層操做系統,提升性能(這塊我也不是特別清楚)
1.6
V get(Object key, int hash) { if (count != 0) { // read-volatile HashEntry<K,V> e = getFirst(hash); while (e != null) { if (e.hash == hash && key.equals(e.key)) { V v = e.value; if (v != null) return v; return readValueUnderLock(e); // recheck } e = e.next; } } return null; }
1.6的jdk採用了樂觀鎖的方式處理了get方法,在get的時候put方法正在new對象,而此時value並未賦值,這時判斷爲空則加鎖訪問
1.7
public V get(Object key) { Segment<K,V> s; // manually integrate access methods to reduce overhead HashEntry<K,V>[] tab; int h = hash(key); long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null && (tab = s.table) != null) { for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE); e != null; e = e.next) { K k; if ((k = e.key) == key || (e.hash == h && key.equals(k))) return e.value; } } return null; }
1.7並無判斷value=null的狀況,不知爲什麼
跟同事溝經過,不管是1.6仍是1.7的實現,實際上都是一種樂觀的方式,而樂觀的方式帶來的是性能上的提高,但同時也帶來數據的弱一致性,若是你的業務是強一致性的業務,可能就要考慮另外的解決辦法(用Collections包裝或者像jdk6中同樣二次加鎖獲取)
http://ifeve.com/concurrenthashmap-weakly-consistent/
這篇文章能夠很好地解釋弱一致性問題
public V put(K key, V value) { Segment<K,V> s; if (value == null) throw new NullPointerException(); int hash = hash(key); int j = (hash >>> segmentShift) & segmentMask; if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment s = ensureSegment(j); return s.put(key, hash, value, false); }
對於put,concurrentHashmap採用自旋鎖的方式,不一樣於1.6的直接獲取鎖
注:我的理解,這裏採用自旋鎖可能做者是以爲在分段鎖的狀態下,併發的可能原本就比較小,而且鎖佔用時間又並非特別長,所以自旋鎖能夠減少線程喚醒和切換的開銷
private int hash(Object k) { int h = hashSeed; if ((0 != h) && (k instanceof String)) { return sun.misc.Hashing.stringHash32((String) k); } h ^= k.hashCode(); // Spread bits to regularize both segment and index locations, // using variant of single-word Wang/Jenkins hash. h += (h << 15) ^ 0xffffcd7d; h ^= (h >>> 10); h += (h << 3); h ^= (h >>> 6); h += (h << 2) + (h << 14); return h ^ (h >>> 16); }
concurrentHashMap採用自己hashcode的同時,採用Wang/Jenkins算法對每位都作了處理,使得發生hash衝突的可能性大大減少(不然效率會不好)
而對於concurrentHashMap,segments的大小在初始時肯定,此後不變,而元素所在segments桶序列由hash的高位決定
public V put(K key, V value) { Segment<K,V> s; if (value == null) throw new NullPointerException(); int hash = hash(key); int j = (hash >>> segmentShift) & segmentMask; if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment s = ensureSegment(j); return s.put(key, hash, value, false); }
segmentShift爲(32-segments大小的二進制長度)
concurrentHashmap主要是爲併發設計,與Collections的包裝不一樣,他不是採用全同步的方式,而是採用非鎖get方式,經過數據的弱一致性帶來性能上的大幅提高,同時採用分段鎖的策略,提升併發能力
參考:
http://www.jb51.net/article/49699.htm
http://my.oschina.net/chihz/blog/58035
http://www.ibm.com/developerworks/cn/java/java-lo-concurrenthashmap/