concurrentHashMap 是用的最多的一個concurrent包數據結構,瞭解內部設計對高併發有幫助。node
Segment繼承ReentrantLock用來充當鎖的角色,每一個 Segment 對象守護每一個散列映射表的若干個桶 。數組
HashEntry 用來封裝映射表的鍵 / 值對數據結構
每一個桶是由若干個 HashEntry 對象連接起來的鏈表併發
ConcurrentHashMap定位一個元素的過程須要進行兩次Hash操做,第一次Hash定位到Segment,第二次Hash定位到元素所在的鏈表的頭部。這一種結構的帶來的反作用是Hash的過程要比普通的HashMap要長,可是帶來的好處是寫操做的時候能夠只對元素所在的Segment進行加鎖便可,不會影響到其餘的Segmentssh
擴容:因爲HashEntry.next都是/final/函數
public class ConcurrentHashMap<K, V> extends AbstractMap<K, V> implements ConcurrentMap<K, V>, Serializable { '用於分段' //容許的最大段數;用於綁定構造函數參數。必須是小於1小於24的冪。 static final int MAX_SEGMENTS = 1 << 16; // slightly conservative //根據這個數來計算segment的個數,segment的個數是僅小於這個數且是2的幾回方的一個數(ssize) static final int DEFAULT_CONCURRENCY_LEVEL = 16; //每一個分段表的最小容量。必定要有2個,至少2個,以免在懶惰的建築後使用下一個使用。 static final int MIN_SEGMENT_TABLE_CAPACITY = 2; '用於hashEntry' //默認的用於計算Segment數組中的每個segment的HashEntry[]的容量,可是並非每個segment的HashEntry[]的容量 static final int DEFAULT_INITIAL_CAPACITY = 16; //這個表的默認加載因子,在構造函數中沒有指定的時候使用 static final float DEFAULT_LOAD_FACTOR = 0.75f; // 用於計算Segment數組中的每個segment的HashEntry[]的最大容量(2的30次方) static final int MAXIMUM_CAPACITY = 1 << 30; //在使用鎖以前,在大小和容器值方法上進行不一樣步的重試。若是表進行連續修改,就會避免無界重試,這樣就不可能得到準確的結果。 static final int RETRIES_BEFORE_LOCK = 2; static final class HashEntry<K,V> { final int hash; final K key; volatile V value; volatile HashEntry<K,V> next; HashEntry(int hash, K key, V value, HashEntry<K,V> next) { this.hash = hash; this.key = key; this.value = value; this.next = next; } } static final class Segment<K,V> extends ReentrantLock implements Serializable { private static final long serialVersionUID = 2249069246763182397L; static final int MAX_SCAN_RETRIES = Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1; //存儲表 transient volatile HashEntry<K,V>[] table; //元素的數量。 transient int count; //併發標記 transient int modCount; //容量閾值,用於擴容 transient int threshold; //負載因子,用於肯定threshold final float loadFactor; Segment(float lf, int threshold, HashEntry<K,V>[] tab) { this.loadFactor = lf; this.threshold = threshold; this.table = tab; } '真實的插入' final V put(K key, int hash, V value, boolean onlyIfAbsent) { HashEntry<K,V> node = tryLock() ? null : '加鎖成功在插入' scanAndLockForPut(key, hash, value); V oldValue; try { HashEntry<K,V>[] tab = table; int index = (tab.length - 1) & hash; HashEntry<K,V> first = entryAt(tab, index); for (HashEntry<K,V> e = first;;) { if (e != null) { K k; if ((k = e.key) / key || (e.hash / hash && key.equals(k))) { oldValue = e.value; if (!onlyIfAbsent) { e.value = value; ++modCount; } break; } e = e.next; } else { if (node != null) node.setNext(first); else node = new HashEntry<K,V>(hash, key, value, first); int c = count + 1; if (c > threshold && tab.length < MAXIMUM_CAPACITY) rehash(node); else setEntryAt(tab, index, node); ++modCount; count = c; oldValue = null; break; } } } finally { unlock(); '釋放鎖' } return oldValue; } } '插入' public V put(K key, V value) { Segment<K,V> s; if (value / null) throw new NullPointerException(); int hash = hash(key); 'key的hash值' 'segmentShift:28' 'segmentMask:1111' int j = (hash >>> segmentShift) & segmentMask; '無符號右位移28位,模運算' if ((s = (Segment<K,V>)UNSAFE.getObject(segments, (j << SSHIFT) + SBASE)) / null) // in ensureSegment s = ensureSegment(j); return s.put(key, hash, value, false); '調用segment的put ' } '獲取' public V get(Object key) { Segment<K,V> s; // manually integrate access methods to reduce overhead HashEntry<K,V>[] tab; int h = hash(key); long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null && (tab = s.table) != null) { for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE); e != null; e = e.next) { K k; if ((k = e.key) / key || (e.hash / h && key.equals(k))) return e.value; } } return null; } }
public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); '設置最大併發級別:65535' if (concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS; // Find power-of-two sizes best matching arguments int sshift = 0; '' int ssize = 1; '段數組大小' while (ssize < concurrencyLevel) { ++sshift; 'sshift=4' ssize <<= 1; '左移1位,,ssize *=2 /16' } this.segmentShift = 32 - sshift; '偏移量:28' this.segmentMask = ssize - 1; '掩碼:15:1111' if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; '最大爲:1073741824。默' int c = initialCapacity / ssize; 'C=1' if (c * ssize < initialCapacity) '判斷是有沒除盡,保證:c * ssize >initialCapacity' ++c; int cap = MIN_SEGMENT_TABLE_CAPACITY; '最小HashEntry大小:' while (cap < c) cap <<= 1; // create segments and segments[0] // 建立段S0 'loadFactor=0.75' 'threshold=(int)0.75*2=1' 'HashEntry[2]' Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor), (HashEntry<K,V>[])new HashEntry[cap]); //建立 segment 數組 Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize]; UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0] this.segments = ss; }
'調用' public V put(K key, V value) { '判斷值' Segment<K,V> s; if (value / null) throw new NullPointerException(); int hash = hash(key); //計算hash值 int j = (hash >>> segmentShift) & segmentMask; if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck (segments, (j << SSHIFT) + SBASE)) / null) // in ensureSegment s = ensureSegment(j); '' return s.put(key, hash, value, false); } '新建segment' private Segment<K,V> ensureSegment(int k) { final Segment<K,V>[] ss = this.segments; long u = (k << SSHIFT) + SBASE; // raw offset Segment<K,V> seg; if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) / null) { Segment<K,V> proto = ss[0]; // use segment 0 as prototype int cap = proto.table.length; 'cap =2' float lf = proto.loadFactor; ' 0.75' int threshold = (int)(cap * lf); 'threshold = 1' HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap]; if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) / null) { // recheck Segment<K,V> s = new Segment<K,V>(lf, threshold, tab); while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) / null) { if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s)) break; } } } return seg; } '添加' final V put(K key, int hash, V value, boolean onlyIfAbsent) { HashEntry<K,V> node = tryLock() ? null : '加鎖' scanAndLockForPut(key, hash, value); V oldValue; try { HashEntry<K,V>[] tab = table; '獲取table' int index = (tab.length - 1) & hash; '獲取table中的偏移量' HashEntry<K,V> first = entryAt(tab, index); '從table中,找出鏈表頭' '無限循環添加進去' for (HashEntry<K,V> e = first;;) { if (e != null) { '頭部位null' K k; "若是key相同,則直接替換" if ((k = e.key) / key ||(e.hash / hash && key.equals(k))) { oldValue = e.value; if (!onlyIfAbsent) { e.value = value; ++modCount; } break; } e = e.next; "若是key不一樣,則操做e後面的元素" }else { "頭爲null" if (node != null) node.setNext(first); else node = new HashEntry<K,V>(hash, key, value, first); '新建,並將頭放在新HashEntry的後面' int c = count + 1; if (c > threshold && tab.length < MAXIMUM_CAPACITY) rehash(node); '擴容' else setEntryAt(tab, index, node); '將新HashEntry直接添加在頭上面' ++modCount; count = c; oldValue = null; break; } } } finally { unlock(); } return oldValue; }
private void rehash(HashEntry<K,V> node) { HashEntry<K,V>[] oldTable = table; "獲取舊的段table" int oldCapacity = oldTable.length; '舊的HashEntry數量' int newCapacity = oldCapacity << 1; '擴大一倍' threshold = (int)(newCapacity * loadFactor); '剩擴展因子:0.75' HashEntry<K,V>[] newTable =(HashEntry<K,V>[]) new HashEntry[newCapacity]; int sizeMask = newCapacity - 1; "位模運算:1111或者11111。每次擴大2被" for (int i = 0; i < oldCapacity ; i++) { HashEntry<K,V> e = oldTable[i]; "舊數組的璉表頭" if (e != null) { HashEntry<K,V> next = e.next; int idx = e.hash & sizeMask; "新的index" if (next / null) // 只有一個節點 newTable[idx] = e; "若是隻有一個數據,直接放到新的index中" else { "在相同的槽內重用連續序列" HashEntry<K,V> lastRun = e; "新的起始點" int lastIdx = idx; "新的index" '找到e節點第一個索引值相同的HashEntry節點' for (HashEntry<K,V> last = next;last != null;last = last.next) { int k = last.hash & sizeMask; "後繼的index" if (k != lastIdx) { "若是後繼的新index和當前結點index相同的元素" lastIdx = k; lastRun = last; } } newTable[lastIdx] = lastRun; "而後將相同的結點到結束,直接放在新的槽中" '複製e節點到第一個索引值相同(該結點和後繼結點index相同)節之間的HashEntry節點' for (HashEntry<K,V> p = e; p != lastRun; p = p.next) { V v = p.value; int h = p.hash; int k = h & sizeMask; HashEntry<K,V> n = newTable[k]; newTable[k] = new HashEntry<K,V>(h, p.key, v, n); } } } } int nodeIndex = node.hash & sizeMask; // add the new node node.setNext(newTable[nodeIndex]); newTable[nodeIndex] = node; table = newTable; }