一、實現原理介紹java
ConcurrentHashMap採用分段鎖的思想,將整個table分紅不一樣的segment。每一個Segment配有一個鎖,這樣每一個段都能並行的作增刪改查等功能。算法
二、ConcurrentHashMap在代碼實現上有哪些技巧編程
我的理解ConcurrentHashMap是比較好的線程安全容器不只僅是由於它的段鎖。還在於他代碼中包含了許多好的編程思想。數組
首先:ConcurrentHashMap在併發訪問時採用了CAS技術,因此性能較好。安全
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { // recheck Segment<K,V> s = new Segment<K,V>(lf, threshold, tab); while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s)) break; } }
其次:ConcurrentHashMap的字段MAX_SCAN_RETRIES,用來限制嘗試得到鎖的次數。這樣能夠避免在嘗試鎖的時候發生修改而一直進行獲取鎖的操做。併發
第三:在isEmpty()方法的時候巧妙的使用了modCount字段判斷是否爲空,並無添加任何所信息。app
public boolean isEmpty() { long sum = 0L; final Segment<K,V>[] segments = this.segments; //遍歷各個段信息 for (int j = 0; j < segments.length; ++j) { //得到第j個段 Segment<K,V> seg = segmentAt(segments, j); if (seg != null) { if (seg.count != 0) return false; //modCount相加 sum += seg.modCount; } } //若是不爲0,繼續判斷一次每一個段是否爲空 if (sum != 0L) { // recheck unless no modifications for (int j = 0; j < segments.length; ++j) { Segment<K,V> seg = segmentAt(segments, j); if (seg != null) { if (seg.count != 0) return false; sum -= seg.modCount; } } //sum加一次、減一次 兩次事後看看sum是否爲0L。若是不是說明在判斷過程當中有修改,而且數據非空 if (sum != 0L) return false; } //不然爲空 return true; }
三、Unsafe對象字段的內存定位。less
在ConcurrentHashMap源碼中有不少Unsafe方法得到物理地址的代碼:從hash值定位段,從段內地址到定位HashEntry地址都須要Unsafe的方法。性能
Unsafe實例可以作兩件事情,this
第1、經過Unsafe類能夠分配內存,能夠釋放內存。allocateMemory、reallocateMemory、freeMemory分別用於分配內存,擴充內存和釋放內存
第2、能夠定位對象某字段的內存位置,也能夠修改對象的字段值,即便它是私有的。
static final <K,V> Segment<K,V> segmentAt(Segment<K,V>[] ss, int j) { long u = (j << SSHIFT) + SBASE; return ss == null ? null : (Segment<K,V>) UNSAFE.getObjectVolatile(ss, u); } static final <K,V> Segment<K,V> segmentAt(Segment<K,V>[] ss, int j) { long u = (j << SSHIFT) + SBASE; return ss == null ? null : (Segment<K,V>) UNSAFE.getObjectVolatile(ss, u); } 這些SSHIFT、SBASE就是相對地址偏移量。他們的得到以下: static { int ss, ts; try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class tc = HashEntry[].class; Class sc = Segment[].class; //arrayBaseOffset方法是一個本地方法,能夠獲取數組第一個元素的偏移地址 //TBASE 第一個HashEntry的地址偏移量 TBASE = UNSAFE.arrayBaseOffset(tc); //SBASE 第一個 Segment的地址偏移量 SBASE = UNSAFE.arrayBaseOffset(sc); //arrayIndexScale是數組中元素的增量地址 ts = UNSAFE.arrayIndexScale(tc); ss = UNSAFE.arrayIndexScale(sc); //long objectFieldOffset (Field field); 獲得 filed在對象中的偏移 HASHSEED_OFFSET = UNSAFE.objectFieldOffset( ConcurrentHashMap.class.getDeclaredField("hashSeed")); SEGSHIFT_OFFSET = UNSAFE.objectFieldOffset( ConcurrentHashMap.class.getDeclaredField("segmentShift")); SEGMASK_OFFSET = UNSAFE.objectFieldOffset( ConcurrentHashMap.class.getDeclaredField("segmentMask")); SEGMENTS_OFFSET = UNSAFE.objectFieldOffset( ConcurrentHashMap.class.getDeclaredField("segments")); } catch (Exception e) { throw new Error(e); } if ((ss & (ss-1)) != 0 || (ts & (ts-1)) != 0) throw new Error("data type scale not a power of two"); //Segemnt增量按位數據非零的位數 SSHIFT = 31 - Integer.numberOfLeadingZeros(ss); //HashEntry增量按位數據非零的值。 TSHIFT = 31 - Integer.numberOfLeadingZeros(ts); }
有了上面的說明,咱們來看看最終定位一個key值對應的Entry的方法是如何實現的。
段定位:
private Segment<K,V> segmentForHash(int h) { //hash值向右無符號必定segmentShift個單位得到了h的高32-segmentShift位,而後將該值左移動SSHIFT位+段首地址,得到段偏移地址 long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE //根據段偏移地址和段首信息定位具體的Segment。 return (Segment<K,V>) UNSAFE.getObjectVolatile(segments, u); } Entry定位: static final <K,V> HashEntry<K,V> entryAt(HashEntry<K,V>[] tab, int i) { //將hash值右移動 TSHIFT個位置得到HashEntry偏移量 +HashEntry基本地址 return (tab == null) ? null :(HashEntry<K,V>) UNSAFE.getObjectVolatile (tab, ((long)i << TSHIFT) + TBASE); }
四、爲何說ConcurrentHashMap存在弱一致的問題
jdk1.7版本ConcurrentHashMap已經不存在弱一致性問題了。那麼什麼是弱一致性問題,jdk1.7又是如何解決的呢?
所謂的弱一致性問題,就是在某些狀況下,放置進去的數據不能準確的訪問到該數據。下文的put、get方法在(2)、(8)和(3)、(9)這兩個地方因爲執行順序的問題會產生某些已經放置進容器的數據沒有被正確的獲取到。這是由於ConcurrentHashMap的段裏面put方法加鎖執行、而get方法並無加鎖執行,有可能和put方法在指令執行順序上即知足happen-before原則又產生了數據的不一致性問題。
如圖所示,在上面的執行路徑下是沒法正確得到放置的數據的。
jdk1.7以後Segement裏面的get方法已經被拿掉,接下來,重點來解讀下jdk1.7 ConcurrentHashMap的放置和獲取數據的過程。
public V get(Object key) { Segment<K,V> s; // manually integrate access methods to reduce overhead HashEntry<K,V>[] tab; //得到該key值的再hash值,該算法散列的更好,減小碰撞 int h = hash(key); //得到該hash值對應的段偏移量 long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; //得到具體段地址 if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null && (tab = s.table) != null) { //得到該段內 該hash值對應的具體HashEntry的地址 for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE); e != null; e = e.next) { K k; //若是hash值和key值和須要查詢的key一致,則返回該值 if ((k = e.key) == key || (e.hash == h && key.equals(k))) return e.value; } } //不然返回null,未查詢到信息 return null; } @SuppressWarnings("unchecked") public V put(K key, V value) { Segment<K,V> s; //若是放置null值,則跑錯 if (value == null) throw new NullPointerException(); //根據key值計算出hash值 int hash = hash(key); //得到該hash值的段偏移量 int j = (hash >>> segmentShift) & segmentMask; //根據偏移量計算出具體的短地址 if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment //首先判斷該段數據數量是否超過了段能承受的閾值,超過則擴容將舊信息hash映射過去,沒有超過則返回就segment s = ensureSegment(j); //將數據放置到該段的對應hashEntry中去。或者放置table上的HashEntry或者放置到table掛載的hashEntry中去。 return s.put(key, hash, value, false); }
參考文獻:
http://ifeve.com/concurrenthashmap-weakly-consistent/
http://blog.csdn.net/aesop_wubo/article/details/7537278