HashTable是一個線程安全的類,它使用synchronized來鎖住整張Hash表來實現線程安全,即每次鎖住整張表讓線程獨佔。ConcurrentHashMap容許多個修改操做併發進行,其關鍵在於使用了鎖分離技術。它使用了多個鎖來控制對hash表的不一樣部分進行的修改。ConcurrentHashMap內部使用段(Segment)來表示這些不一樣的部分,每一個段其實就是一個小的Hashtable,它們有本身的鎖。只要多個修改操做發生在不一樣的段上,它們就能夠併發進行。node
有些方法須要跨段,好比size()和containsValue(),它們可能須要鎖定整個表而而不只僅是某個段,這須要按順序鎖定全部段,操做完畢後,又按順序釋放全部段的鎖。這裏「按順序」是很重要的,不然極有可能出現死鎖,在ConcurrentHashMap內部,段數組是final的,而且其成員變量實際上也是final的,可是,僅僅是將數組聲明爲final的並不保證數組成員也是final的,這須要實現上的保證。這能夠確保不會出現死鎖,由於得到鎖的順序是固定的。算法
ConcurrentHashMap使用分段鎖技術,將數據分紅一段一段的存儲,而後給每一段數據配一把鎖,當一個線程佔用鎖訪問其中一個段數據的時候,其餘段的數據也能被其餘線程訪問,可以實現真正的併發訪問。以下圖是ConcurrentHashMap的內部結構圖:數組
從圖中能夠看到,ConcurrentHashMap內部分爲不少個Segment,每個Segment擁有一把鎖,而後每一個Segment(繼承ReentrantLock)安全
static final class Segment<K,V> extends ReentrantLock implements Serializable
Segment繼承了ReentrantLock,代表每一個segment均可以當作一個鎖。(ReentrantLock前文已經提到,不瞭解的話就把當作synchronized的替代者吧)這樣對每一個segment中的數據須要同步操做的話都是使用每一個segment容器對象自身的鎖來實現。只有對全局須要改變時鎖定的是全部的segment。多線程
Segment下面包含不少個HashEntry列表數組。對於一個key,須要通過三次(爲何要hash三次下文會詳細講解)hash操做,才能最終定位這個元素的位置,這三次hash分別爲:併發
對於一個key,先進行一次hash操做,獲得hash值h1,也即h1 = hash1(key);app
將獲得的h1的高几位進行第二次hash,獲得hash值h2,也即h2 = hash2(h1高几位),經過h2可以肯定該元素的放在哪一個Segment;ssh
將獲得的h1進行第三次hash,獲得hash值h3,也即h3 = hash3(h1),經過h3可以肯定該元素放置在哪一個HashEntry。async
ConcurrentHashMap中主要實體類就是三個:ConcurrentHashMap(整個Hash表),Segment(桶),HashEntry(節點),對應上面的圖能夠看出之間的關係ide
/** * The segments, each of which is a specialized hash table */ final Segment<K,V>[] segments;
不變(Immutable)和易變(Volatile)ConcurrentHashMap徹底容許多個讀操做併發進行,讀操做並不須要加鎖。若是使用傳統的技術,如HashMap中的實現,若是容許能夠在hash鏈的中間添加或刪除元素,讀操做不加鎖將獲得不一致的數據。ConcurrentHashMap實現技術是保證HashEntry幾乎是不可變的。HashEntry表明每一個hash鏈中的一個節點,其結構以下所示:
1 static final class HashEntry<K,V> { 2 final K key; 3 final int hash; 4 volatile V value; 5 volatile HashEntry<K,V> next; 6 }
在JDK 1.6中,HashEntry中的next指針也定義爲final,而且每次插入將新添加節點做爲鏈的頭節點(同HashMap實現),並且每次刪除一個節點時,會將刪除節點以前的全部節點 拷貝一份組成一個新的鏈,而將當前節點的上一個節點的next指向當前節點的下一個節點,從而在刪除之後 有兩條鏈存在,於是能夠保證即便在同一條鏈中,有一個線程在刪除,而另外一個線程在遍歷,它們都能工做良好,由於遍歷的線程能繼續使用原有的鏈。於是這種實現是一種更加細粒度的happens-before關係,即若是遍歷線程在刪除線程結束後開始,則它能看到刪除後的變化,若是它發生在刪除線程正在執行中間,則它會使用原有的鏈,而不會等到刪除線程結束後再執行,即看不到刪除線程的影響。若是這不符合你的需求,仍是乖乖的用Hashtable或HashMap的synchronized版本,Collections.synchronizedMap()作的包裝。
而HashMap中的Entry只有key是final的
1 static class Entry<K,V> implements Map.Entry<K,V> {2 final K key;3 V value;4 Entry<K,V> next;5 int hash;
不變模式(immutable)是多線程安全裏最簡單的一種保障方式。由於你拿他沒有辦法,想改變它也沒有機會。
不變模式主要經過final關鍵字來限定的。在JMM中final關鍵字還有特殊的語義。Final域使得確保初始化安全性(initialization safety)成爲可能,初始化安全性讓不可變形對象不須要同步就能自由地被訪問和共享。
先看看ConcurrentHashMap的初始化作了哪些事情,構造函數的源碼以下:
1 public ConcurrentHashMap(int initialCapacity, 2 float loadFactor, int concurrencyLevel) { 3 if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) 4 throw new IllegalArgumentException(); 5 if (concurrencyLevel > MAX_SEGMENTS) 6 concurrencyLevel = MAX_SEGMENTS; 7 // Find power-of-two sizes best matching arguments 8 int sshift = 0; 9 int ssize = 1;10 while (ssize < concurrencyLevel) {11 ++sshift;12 ssize <<= 1;13 }14 this.segmentShift = 32 - sshift;15 this.segmentMask = ssize - 1;16 if (initialCapacity > MAXIMUM_CAPACITY)17 initialCapacity = MAXIMUM_CAPACITY;18 int c = initialCapacity / ssize;19 if (c * ssize < initialCapacity)20 ++c;21 int cap = MIN_SEGMENT_TABLE_CAPACITY;22 while (cap < c)23 cap <<= 1;24 // create segments and segments[0]25 Segment<K,V> s0 =26 new Segment<K,V>(loadFactor, (int)(cap * loadFactor),27 (HashEntry<K,V>[])new HashEntry[cap]);28 Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];29 UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]30 this.segments = ss;31 }
傳入的參數有initialCapacity,loadFactor,concurrencyLevel這三個。
initialCapacity表示新建立的這個ConcurrentHashMap的初始容量,也就是上面的結構圖中的Entry數量。默認值爲static final int DEFAULT_INITIAL_CAPACITY = 16;
loadFactor表示負載因子,就是當ConcurrentHashMap中的元素個數大於loadFactor * 最大容量時就須要rehash,擴容。默認值爲static final float DEFAULT_LOAD_FACTOR = 0.75f;
concurrencyLevel表示併發級別,這個值用來肯定Segment的個數,Segment的個數是大於等於concurrencyLevel的第一個2的n次方的數。好比,若是concurrencyLevel爲12,13,14,15,16這些數,則Segment的數目爲16(2的4次方)。默認值爲static final int DEFAULT_CONCURRENCY_LEVEL = 16;。理想狀況下ConcurrentHashMap的真正的併發訪問量可以達到concurrencyLevel,由於有concurrencyLevel個Segment,假若有concurrencyLevel個線程須要訪問Map,而且須要訪問的數據都剛好分別落在不一樣的Segment中,則這些線程可以無競爭地自由訪問(由於他們不須要競爭同一把鎖),達到同時訪問的效果。這也是爲何這個參數起名爲「併發級別」的緣由。
初始化的一些動做:
驗證參數的合法性,若是不合法,直接拋出異常。
concurrencyLevel也就是Segment的個數不能超過規定的最大Segment的個數,默認值爲static final int MAX_SEGMENTS = 1 << 16;,若是超過這個值,設置爲這個值。
而後使用循環找到大於等於concurrencyLevel的第一個2的n次方的數ssize,這個數就是Segment數組的大小,並記錄一共向左按位移動的次數sshift,並令segmentShift = 32 - sshift,而且segmentMask的值等於ssize - 1,segmentMask的各個二進制位都爲1,目的是以後能夠經過key的hash值與這個值作&運算肯定Segment的索引。
檢查給的容量值是否大於容許的最大容量值,若是大於該值,設置爲該值。最大容量值爲static final int MAXIMUM_CAPACITY = 1 << 30;。
而後計算每一個Segment平均應該放置多少個元素,這個值c是向上取整的值。好比初始容量爲15,Segment個數爲4,則每一個Segment平均須要放置4個元素。
最後建立一個Segment實例,將其當作Segment數組的第一個元素。
put操做的源碼以下:
1 public V put(K key, V value) { 2 Segment<K,V> s; 3 if (value == null) 4 throw new NullPointerException(); 5 int hash = hash(key); 6 int j = (hash >>> segmentShift) & segmentMask; 7 if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck 8 (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment 9 s = ensureSegment(j);10 return s.put(key, hash, value, false);11 }
操做步驟以下:
判斷value是否爲null,若是爲null,直接拋出異常。
key經過一次hash運算獲得一個hash值。(這個hash運算下文詳說)
將獲得hash值向右按位移動segmentShift位,而後再與segmentMask作&運算獲得segment的索引j。
在初始化的時候咱們說過segmentShift的值等於32-sshift,例如concurrencyLevel等於16,則sshift等於4,則segmentShift爲28。hash值是一個32位的整數,將其向右移動28位就變成這個樣子:
0000 0000 0000 0000 0000 0000 0000 xxxx,而後再用這個值與segmentMask作&運算,也就是取最後四位的值。這個值肯定Segment的索引。
使用Unsafe的方式從Segment數組中獲取該索引對應的Segment對象。
向這個Segment對象中put值,這個put操做也基本是同樣的步驟(經過&運算獲取HashEntry的索引,而後set)。
1 final V put(K key, int hash, V value, boolean onlyIfAbsent) { 2 HashEntry<K,V> node = tryLock() ? null : 3 scanAndLockForPut(key, hash, value); 4 V oldValue; 5 try { 6 HashEntry<K,V>[] tab = table; 7 int index = (tab.length - 1) & hash; 8 HashEntry<K,V> first = entryAt(tab, index); 9 for (HashEntry<K,V> e = first;;) {10 if (e != null) {11 K k;12 if ((k = e.key) == key ||13 (e.hash == hash && key.equals(k))) {14 oldValue = e.value;15 if (!onlyIfAbsent) {16 e.value = value;17 ++modCount;18 }19 break;20 }21 e = e.next;22 }23 else {24 if (node != null)25 node.setNext(first);26 else27 node = new HashEntry<K,V>(hash, key, value, first);28 int c = count + 1;29 if (c > threshold && tab.length < MAXIMUM_CAPACITY)30 rehash(node);31 else32 setEntryAt(tab, index, node);33 ++modCount;34 count = c;35 oldValue = null;36 break;37 }38 }39 } finally {40 unlock();41 }42 return oldValue;43 }
put操做是要加鎖的。
get操做的源碼以下:
1 public V get(Object key) { 2 Segment<K,V> s; // manually integrate access methods to reduce overhead 3 HashEntry<K,V>[] tab; 4 int h = hash(key); 5 long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; 6 if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null && 7 (tab = s.table) != null) { 8 for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile 9 (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);10 e != null; e = e.next) {11 K k;12 if ((k = e.key) == key || (e.hash == h && key.equals(k)))13 return e.value;14 }15 }16 return null;17 }
操做步驟爲:
和put操做同樣,先經過key進行兩次hash肯定應該去哪一個Segment中取數據。
使用Unsafe獲取對應的Segment,而後再進行一次&運算獲得HashEntry鏈表的位置,而後從鏈表頭開始遍歷整個鏈表(由於Hash可能會有碰撞,因此用一個鏈表保存),若是找到對應的key,則返回對應的value值,若是鏈表遍歷完都沒有找到對應的key,則說明Map中不包含該key,返回null。
值得注意的是,get操做是不須要加鎖的(若是value爲null,會調用readValueUnderLock,只有這個步驟會加鎖),經過前面提到的volatile和final來確保數據安全。
size操做與put和get操做最大的區別在於,size操做須要遍歷全部的Segment才能算出整個Map的大小,而put和get都只關心一個Segment。假設咱們當前遍歷的Segment爲SA,那麼在遍歷SA過程當中其餘的Segment好比SB可能會被修改,因而這一次運算出來的size值可能並非Map當前的真正大小。因此一個比較簡單的辦法就是計算Map大小的時候全部的Segment都Lock住,不能更新(包含put,remove等等)數據,計算完以後再Unlock。這是普通人可以想到的方案,可是牛逼的做者還有一個更好的Idea:先給3次機會,不lock全部的Segment,遍歷全部Segment,累加各個Segment的大小獲得整個Map的大小,若是某相鄰的兩次計算獲取的全部Segment的更新的次數(每一個Segment都有一個modCount變量,這個變量在Segment中的Entry被修改時會加一,經過這個值能夠獲得每一個Segment的更新操做的次數)是同樣的,說明計算過程當中沒有更新操做,則直接返回這個值。若是這三次不加鎖的計算過程當中Map的更新次數有變化,則以後的計算先對全部的Segment加鎖,再遍歷全部Segment計算Map大小,最後再解鎖全部Segment。源代碼以下:
1 public int size() { 2 // Try a few times to get accurate count. On failure due to 3 // continuous async changes in table, resort to locking. 4 final Segment<K,V>[] segments = this.segments; 5 int size; 6 boolean overflow; // true if size overflows 32 bits 7 long sum; // sum of modCounts 8 long last = 0L; // previous sum 9 int retries = -1; // first iteration isn't retry10 try {11 for (;;) {12 if (retries++ == RETRIES_BEFORE_LOCK) {13 for (int j = 0; j < segments.length; ++j)14 ensureSegment(j).lock(); // force creation15 }16 sum = 0L;17 size = 0;18 overflow = false;19 for (int j = 0; j < segments.length; ++j) {20 Segment<K,V> seg = segmentAt(segments, j);21 if (seg != null) {22 sum += seg.modCount;23 int c = seg.count;24 if (c < 0 || (size += c) < 0)25 overflow = true;26 }27 }28 if (sum == last)29 break;30 last = sum;31 }32 } finally {33 if (retries > RETRIES_BEFORE_LOCK) {34 for (int j = 0; j < segments.length; ++j)35 segmentAt(segments, j).unlock();36 }37 }38 return overflow ? Integer.MAX_VALUE : size;39 }
舉個例子:
containsValue操做採用了和size操做同樣的想法:
1 public boolean containsValue(Object value) { 2 // Same idea as size() 3 if (value == null) 4 throw new NullPointerException(); 5 final Segment<K,V>[] segments = this.segments; 6 boolean found = false; 7 long last = 0; 8 int retries = -1; 9 try {10 outer: for (;;) {11 if (retries++ == RETRIES_BEFORE_LOCK) {12 for (int j = 0; j < segments.length; ++j)13 ensureSegment(j).lock(); // force creation14 }15 long hashSum = 0L;16 int sum = 0;17 for (int j = 0; j < segments.length; ++j) {18 HashEntry<K,V>[] tab;19 Segment<K,V> seg = segmentAt(segments, j);20 if (seg != null && (tab = seg.table) != null) {21 for (int i = 0 ; i < tab.length; i++) {22 HashEntry<K,V> e;23 for (e = entryAt(tab, i); e != null; e = e.next) {24 V v = e.value;25 if (v != null && value.equals(v)) {26 found = true;27 break outer;28 }29 }30 }31 sum += seg.modCount;32 }33 }34 if (retries > 0 && sum == last)35 break;36 last = sum;37 }38 } finally {39 if (retries > RETRIES_BEFORE_LOCK) {40 for (int j = 0; j < segments.length; ++j)41 segmentAt(segments, j).unlock();42 }43 }44 return found;45 }
看看hash的源代碼:
1 private int hash(Object k) { 2 int h = hashSeed; 3 4 if ((0 != h) && (k instanceof String)) { 5 return sun.misc.Hashing.stringHash32((String) k); 6 } 7 8 h ^= k.hashCode(); 9 10 // Spread bits to regularize both segment and index locations,11 // using variant of single-word Wang/Jenkins hash.12 h += (h << 15) ^ 0xffffcd7d;13 h ^= (h >>> 10);14 h += (h << 3);15 h ^= (h >>> 6);16 h += (h << 2) + (h << 14);17 return h ^ (h >>> 16);18 }
源碼中的註釋是這樣的:
這裏用到了Wang/Jenkins hash算法的變種,主要的目的是爲了減小哈希衝突,使元素可以均勻的分佈在不一樣的Segment上,從而提升容器的存取效率。假如哈希的質量差到極點,那麼全部的元素都在一個Segment中,不只存取元素緩慢,分段鎖也會失去意義。
舉個簡單的例子:
1 System.out.println(Integer.parseInt("0001111", 2) & 15);2 System.out.println(Integer.parseInt("0011111", 2) & 15);3 System.out.println(Integer.parseInt("0111111", 2) & 15);4 System.out.println(Integer.parseInt("1111111", 2) & 15);
這些數字獲得的hash值都是同樣的,全是15,因此若是不進行第一次預hash,發生衝突的概率仍是很大的,可是若是咱們先把上例中的二進制數字使用hash()函數先進行一次預hash,獲得的結果是這樣的:
上面這個例子引用自: InfoQ
能夠看到每一位的數據都散開了,而且ConcurrentHashMap中是使用預hash值的高位參與運算的。好比以前說的先將hash值向右按位移動28位,再與15作&運算,獲得的結果都別爲:4,15,7,8,沒有衝突!
ConcurrentHashMap中的key和value值都不能爲null,HashMap中key能夠爲null,HashTable中key不能爲null。
ConcurrentHashMap是線程安全的類並不能保證使用了ConcurrentHashMap的操做都是線程安全的!
ConcurrentHashMap的get操做不須要加鎖,put操做須要加鎖