ConcurrentHashMap是Java中使用很是廣泛的一個Map,ConcurrentHashMap較HashMap而言極大提升了併發操做速度,咱們知道HashMap是線程不安全的,在多線程環境下容易出現死鎖,線程安全的HashTable(包括SynchronizedMap)因爲對每次操做都加鎖致使效率很是低,咱們都知道Map通常都是數組+鏈表結構(JDK1.8可能爲數組+紅黑樹),ConcurrentHashMap避免了對全局加鎖改爲了局部加鎖操做,這樣就極大地提升了併發環境下的操做速度,ConcurrentHashMap在JDK1.7和1.8中的實現很是不一樣,接下來咱們會分別對1.7和1.8中的實現原理作分析。java
ConcurrentHashMap(JDK1.7)node
在JDK1.7中ConcurrentHashMap採用了數組+Segment+分段鎖的方式實現,Segment是ConcurrentHashMap(後續此ConcurrentHashMap都泛指JDK1.7中實現)一個很是重要的部分,Segment繼承了ReentrantLock所以擁有了鎖的功能,Segment中維護了HashEntry數組table,HashEntry本質是一個K-V存儲結構,內部存儲了目標對象的Key和Value,HashEntry同時也是一個鏈式結構內部維護了下一個HashEntry變量next,即Segment是一個數組鏈表結構,而整個ConcurrentHashMap中維護了一個Segment數組segments,所以ConcurrentHashMap的總體結構以下:數組
其中HashEntry基層了可重入鎖,所以對每一個segments中的元素進行操做都會加鎖。接下來咱們分析下ConcurrentHashMap的主要方法的源碼進行分析。安全
put操做多線程
public V put(K key, V value) { Segment<K,V> s; if (value == null) throw new NullPointerException(); //計算hash值 int hash = hash(key); //獲取segments數組位置 int j = (hash >>> segmentShift) & segmentMask; if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment s = ensureSegment(j); //插入數據 return s.put(key, hash, value, false); }
ensureSegment方法的主要功能是找出的segmengs數組位置位於k的Segment,若是不存在將會建立。併發
private Segment<K,V> ensureSegment(int k) { //獲取當前segmegts數組 final Segment<K,V>[] ss = this.segments; long u = (k << SSHIFT) + SBASE; // raw offset Segment<K,V> seg; //CAS獲取當前數組是否存在,存在當即返回 if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { Segment<K,V> proto = ss[0]; // use segment 0 as prototype int cap = proto.table.length; float lf = proto.loadFactor; int threshold = (int)(cap * lf); //實例化一個HashEntry數組 HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap]; //再次獲segments中取位置位於k的值 if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { // recheck //不存在則新建一個Segments並CAS設置位於k處 Segment<K,V> s = new Segment<K,V>(lf, threshold, tab); while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s)) break; } } } return seg; }
插入數據:async
final V put(K key, int hash, V value, boolean onlyIfAbsent) { //這句代碼主要是獲取Segment的鎖, HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value); V oldValue; try { //獲取HashEntry數組 HashEntry<K,V>[] tab = table; //定位元素位於HashEntry數組中的位置 int index = (tab.length - 1) & hash; //獲取HashEntry中數組位於index的位置,也就是鏈表的第一個節點 HashEntry<K,V> first = entryAt(tab, index); //循環遍歷鏈表 for (HashEntry<K,V> e = first;;) { //鏈表頭節點不爲null if (e != null) { K k; //判斷當前節點是否與須要插入的數據是否同樣,同樣的話修改value直接退出循環 if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { oldValue = e.value; if (!onlyIfAbsent) { e.value = value; ++modCount; } break; } e = e.next; } else { //獲取鎖時沒有初始化節點就直接初始化節點 if (node != null) node.setNext(first); else node = new HashEntry<K,V>(hash, key, value, first); int c = count + 1; //數量大於閾值且長度小於最大容量時對HashEntry數組進行擴容處理 if (c > threshold && tab.length < MAXIMUM_CAPACITY) rehash(node); else setEntryAt(tab, index, node); ++modCount; count = c; oldValue = null; break; } } } finally { //釋放鎖 unlock(); } return oldValue; }
上面這段代碼比較簡單,基本就是跟HashMap相似,只是多了個獲取鎖的過程。這裏HashEntry數組數量大於某個閾值時會進行擴容處理,咱們看下擴容方法rehash;高併發
private void rehash(HashEntry<K,V> node) { //獲取老的數組 HashEntry<K,V>[] oldTable = table; int oldCapacity = oldTable.length; //設置新的數組容量爲老的數組容量的兩倍,JDK中大量使用了移位操做,移位比乘除效率高,值得推薦。 int newCapacity = oldCapacity << 1; threshold = (int)(newCapacity * loadFactor); //實例化新的HashEntry數組 HashEntry<K,V>[] newTable = (HashEntry<K,V>[]) new HashEntry[newCapacity]; int sizeMask = newCapacity - 1; //循環遍歷數組 for (int i = 0; i < oldCapacity ; i++) { //獲取老數組中位置處於i的節點,即鏈表的頭節點 HashEntry<K,V> e = oldTable[i]; if (e != null) { HashEntry<K,V> next = e.next; int idx = e.hash & sizeMask; //若是鏈表只有一個節點直接設置須要插入的值爲數組值 if (next == null) // Single node on list newTable[idx] = e; else { // Reuse consecutive sequence at same slot HashEntry<K,V> lastRun = e; int lastIdx = idx; //尋找一個lastRun節點,這個節點以後的全部節點都要放在某個數組下面 for (HashEntry<K,V> last = next; last != null; last = last.next) { int k = last.hash & sizeMask; if (k != lastIdx) { lastIdx = k; lastRun = last; } } //將lastRun後面的全部節點均防止在lastIds這個位置 newTable[lastIdx] = lastRun; // Clone remaining nodes //遷移lastRun以前的節點 for (HashEntry<K,V> p = e; p != lastRun; p = p.next) { V v = p.value; int h = p.hash; int k = h & sizeMask; HashEntry<K,V> n = newTable[k]; newTable[k] = new HashEntry<K,V>(h, p.key, v, n); } } } } //採用頭插入法將鏈表插入到頭部 int nodeIndex = node.hash & sizeMask; // add the new node node.setNext(newTable[nodeIndex]); newTable[nodeIndex] = node; table = newTable; }
get操做性能
public V get(Object key) { Segment<K,V> s; // manually integrate access methods to reduce overhead HashEntry<K,V>[] tab; int h = hash(key); long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null && (tab = s.table) != null) { for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE); e != null; e = e.next) { K k; if ((k = e.key) == key || (e.hash == h && key.equals(k))) return e.value; } } return null; }
get操做就很是簡單了,也不須要加速操做,首先定位到segments數組,再定位到HashEntry數組,若是數組位置就是須要獲取的值直接返回不然遍歷鏈表。this
size操做
size()方法比普通的集合獲取要複雜點,由於在獲取size的時候可能有其它線程正在增長或刪除元素,JDK7實現爲首先兩次不加鎖去獲取這個值大小,每次都是將全部segments中值相加,若兩次相加的結果同樣能夠認爲這個結果是可信的,若兩次相加的結構不一致則進一步對全部segments數組加鎖處理。
public int size() { // Try a few times to get accurate count. On failure due to // continuous async changes in table, resort to locking. final Segment<K,V>[] segments = this.segments; int size; boolean overflow; // true if size overflows 32 bits long sum; // sum of modCounts long last = 0L; // previous sum int retries = -1; // first iteration isn't retry try { for (;;) { //判斷是否須要加鎖 if (retries++ == RETRIES_BEFORE_LOCK) { for (int j = 0; j < segments.length; ++j) ensureSegment(j).lock(); // force creation } sum = 0L; size = 0; overflow = false; //統計每一個segments數組中元素大小 for (int j = 0; j < segments.length; ++j) { Segment<K,V> seg = segmentAt(segments, j); if (seg != null) { sum += seg.modCount; int c = seg.count; if (c < 0 || (size += c) < 0) overflow = true; } } //若兩次相等則退出循環 if (sum == last) break; last = sum; } } finally { if (retries > RETRIES_BEFORE_LOCK) { for (int j = 0; j < segments.length; ++j) segmentAt(segments, j).unlock(); } } return overflow ? Integer.MAX_VALUE : size; }
JDK7中ConcurrentHashMap實現還算比較簡單也容易看到,其主要思想是採用分段鎖的設計思想,對於併發請求寫入只鎖住集合的一部分這樣極大地提升了高併發環境下的性能。
ConcurrentHashMap(JDK1.8)
JDK8中ConcurrentHashMap參考了JDK8 HashMap的實現,採用了數組+鏈表+紅黑樹的實現方式來設計,內部大量採用CAS操做,JDK8中完全放棄了Segment轉而採用的是Node,其設計思想也再也不是JDK1.7中的分段鎖思想了。下面將看下JDK8中ConcurrentHashMap的結構。因爲引入了紅黑樹,使得ConcurrentHashMap的實現很是複雜,咱們都知道,紅黑樹是一種性能很是好的二叉查找樹,其查找性能爲O(logN),可是其實現過程也很是複雜,並且可讀性也很是差,Doug Lea的思惟能力確實不是通常人能比的,早期徹底採用鏈表結構時Map的查找時間複雜度爲O(N),JDK8中ConcurrentHashMap在鏈表的長度大於某個閾值的時候會將鏈表轉換成紅黑樹進一步提升其查找性能。這裏咱們主要關係Put、Get以及Size方法。
put方法
public V put(K key, V value) { return putVal(key, value, false); } /** Implementation for put and putIfAbsent */ final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException(); int hash = spread(key.hashCode()); int binCount = 0; for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0) tab = initTable(); else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin } else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null; synchronized (f) { if (tabAt(tab, i) == f) { if (fh >= 0) { binCount = 1; for (Node<K,V> e = f;; ++binCount) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } addCount(1L, binCount); return null; }
未完待續