This article analyzes the implementation of ConcurrentHashMap based on JDK 7. The code in this version is fairly clear: about 1622 lines including comments, which makes it well suited for study.
ConcurrentHashMap is essentially a thread-safe version of HashMap. Using HashMap in a multi-threaded environment can cause problems such as infinite loops during resizing, which is explained well in the blog post 老生常談,HashMap的死循環 ("the old story of HashMap's infinite loop"). Besides HashMap there is also the thread-safe Hashtable, whose implementation matches HashMap's except that every method is marked synchronized to guarantee thread safety; under heavy contention that is very inefficient. ConcurrentHashMap instead uses lock striping: the hash table is divided into multiple segments, and thread safety is enforced per segment. Here is the JDK's own description of ConcurrentHashMap:
A hash table supporting full concurrency of retrievals and high expected concurrency for updates. This class obeys the same functional specification as Hashtable, and includes versions of methods corresponding to each method of Hashtable. However, even though all operations are thread-safe, retrieval operations do not entail locking, and there is not any support for locking the entire table in a way that prevents all access. This class is fully interoperable with Hashtable in programs that rely on its thread safety but not on its synchronization details.
In short: ConcurrentHashMap supports concurrent reads and writes, provides all the methods of Hashtable, and achieves concurrent access without ever locking the whole table.
Recall the data structure of HashMap (JDK 7): its core is an array of Entry key-value pairs, and each key is mapped onto the array by its hash value:
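The mapping from hash to array slot works the same way here as it will inside each segment below: because the table length is always a power of two, index = hash & (length - 1) is a cheap substitute for a modulo. A small sketch of that property (the helper name indexFor is illustrative, though JDK 7's HashMap has a method of the same shape):

```java
public class BucketIndexDemo {
    // For a power-of-two table length, (length - 1) is an all-ones bit mask,
    // so (hash & (length - 1)) equals (hash mod length) but is cheaper and
    // also well-defined for negative hashes.
    static int indexFor(int hash, int tableLength) {
        return hash & (tableLength - 1);
    }

    public static void main(String[] args) {
        System.out.println(indexFor(21, 16)); // 5, same as 21 % 16
        System.out.println(indexFor(-7, 16)); // 9, masked into [0, 15]
    }
}
```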
When a ConcurrentHashMap is initialized, the concurrencyLevel argument determines the length of the Segment array, i.e. the concurrency level: the maximum number of threads that can update the map at the same time. It defaults to 16. Each Segment contains a HashEntry array, which is where the key-value pairs are actually stored. This is the data structure of ConcurrentHashMap.
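A minimal usage sketch of the three-argument constructor described above (the class and method names here are just for illustration):

```java
import java.util.concurrent.ConcurrentHashMap;

public class ConstructionDemo {
    // Build a map with explicit initialCapacity, loadFactor and concurrencyLevel.
    // With concurrencyLevel = 8 the segment array is sized to the smallest
    // power of two >= 8, so up to 8 threads can write without contending.
    static ConcurrentHashMap<String, Integer> build() {
        return new ConcurrentHashMap<String, Integer>(32, 0.75f, 8);
    }

    public static void main(String[] args) {
        ConcurrentHashMap<String, Integer> map = build();
        map.put("answer", 42);
        System.out.println(map.get("answer")); // 42
    }
}
```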
As the diagram shows, ConcurrentHashMap is built on Segment, a static inner class of ConcurrentHashMap. Segment extends the reentrant lock ReentrantLock, so a thread must acquire the lock before it can modify a segment. Its structure is as follows:
static final class Segment<K,V> extends ReentrantLock implements Serializable {
    // maximum number of times to spin while trying to acquire the lock
    static final int MAX_SCAN_RETRIES =
        Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
    // the HashEntry array, i.e. the key-value pairs
    transient volatile HashEntry<K,V>[] table;
    // number of elements
    transient int count;
    // number of operations that modify this segment, e.g. put/remove
    transient int modCount;
    // the table is resized when its size exceeds this threshold,
    // which equals capacity * loadFactor
    transient int threshold;
    // load factor
    final float loadFactor;

    Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
        this.loadFactor = lf;
        this.threshold = threshold;
        this.table = tab;
    }
}
The key-value pair HashEntry is ConcurrentHashMap's basic building block; multiple HashEntry nodes form a linked list to resolve hash collisions.
static final class HashEntry<K,V> {
    // hash value
    final int hash;
    // key
    final K key;
    // value
    volatile V value;
    // next entry in the list
    volatile HashEntry<K,V> next;

    HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
        this.hash = hash;
        this.key = key;
        this.value = value;
        this.next = next;
    }
}
The fields and constructor of ConcurrentHashMap are as follows:
public class ConcurrentHashMap<K, V> extends AbstractMap<K, V>
        implements ConcurrentMap<K, V>, Serializable {
    private static final long serialVersionUID = 7249069246763182397L;
    // default initial capacity
    static final int DEFAULT_INITIAL_CAPACITY = 16;
    // default load factor
    static final float DEFAULT_LOAD_FACTOR = 0.75f;
    // default concurrency level, i.e. default length of the Segment array
    static final int DEFAULT_CONCURRENCY_LEVEL = 16;
    // maximum capacity of the map
    static final int MAXIMUM_CAPACITY = 1 << 30;
    // length of the table array in each segment; must be a power of two, at least 2
    static final int MIN_SEGMENT_TABLE_CAPACITY = 2;
    // maximum number of segments, bounding concurrencyLevel; must be a power of two
    static final int MAX_SEGMENTS = 1 << 16; // slightly conservative
    // number of unsynchronized retries in size and contains-style methods before
    // resorting to locking, to avoid unbounded retries while the table is
    // being modified continuously
    static final int RETRIES_BEFORE_LOCK = 2;
    // mask value used when computing the segment index
    final int segmentMask;
    // shift applied to the hash when computing the segment index
    // (how many high bits of the hash participate)
    final int segmentShift;
    // the Segment array
    final Segment<K,V>[] segments;

    public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
        // argument checks
        if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        if (concurrencyLevel > MAX_SEGMENTS)
            concurrencyLevel = MAX_SEGMENTS;
        // Find power-of-two sizes best matching arguments:
        // ssize is the smallest power of two >= concurrencyLevel
        // and becomes the length of the Segment array
        int sshift = 0;
        int ssize = 1;
        while (ssize < concurrencyLevel) {
            ++sshift;
            ssize <<= 1;
        }
        this.segmentShift = 32 - sshift;
        this.segmentMask = ssize - 1;
        if (initialCapacity > MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY;
        // compute the capacity of each segment's table
        int c = initialCapacity / ssize;
        if (c * ssize < initialCapacity)
            ++c;
        int cap = MIN_SEGMENT_TABLE_CAPACITY;
        // ensure cap is a power of two
        while (cap < c)
            cap <<= 1;
        // create segments and segments[0]:
        // only the first segment is created here; the rest are initialized lazily
        Segment<K,V> s0 =
            new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                             (HashEntry<K,V>[])new HashEntry[cap]);
        Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
        UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
        this.segments = ss;
    }
}
The concurrencyLevel parameter expresses the expected number of threads concurrently modifying the ConcurrentHashMap and determines the number of Segments; from the loop above, the segment count is the smallest power of two greater than or equal to concurrencyLevel. segmentMask and segmentShift look a little cryptic at first: their job is to locate, from a key's hash value, which Segment the key belongs to.
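The two fields can be derived exactly as the constructor does. For the default concurrencyLevel of 16, sshift ends up as 4, so segmentShift = 28 and segmentMask = 15: the top 4 bits of the hash select one of the 16 segments. A sketch mirroring that loop (the class and method names are illustrative):

```java
public class SegmentLocator {
    // Mirror of the constructor's loop: ssize is the smallest power of two
    // >= concurrencyLevel; sshift counts how many doublings that took.
    static int[] shiftAndMask(int concurrencyLevel) {
        int sshift = 0, ssize = 1;
        while (ssize < concurrencyLevel) {
            ++sshift;
            ssize <<= 1;
        }
        return new int[] { 32 - sshift, ssize - 1 }; // {segmentShift, segmentMask}
    }

    // Segment index as used by put/get: shift the high bits of the hash
    // down and mask them into the segment range.
    static int segmentIndex(int hash, int segmentShift, int segmentMask) {
        return (hash >>> segmentShift) & segmentMask;
    }

    public static void main(String[] args) {
        int[] sm = shiftAndMask(16);
        System.out.println(sm[0] + "/" + sm[1]);                 // 28/15
        System.out.println(segmentIndex(0xABCD1234, sm[0], sm[1])); // top 4 bits: 0xA = 10
    }
}
```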
For a hash table, the most important methods are put and get; let's analyze each in turn.
put really has only two steps: 1. use the key's hash to locate which segment the key-value pair belongs to; 2. call that Segment's put method.
public V put(K key, V value) {
    Segment<K,V> s;
    if (value == null)
        throw new NullPointerException();
    // compute the key's hash value
    int hash = hash(key);
    // use the hash to locate the entry's segment, segments[j]
    int j = (hash >>> segmentShift) & segmentMask;
    // check whether segments[j] has been initialized;
    // if not, call ensureSegment to initialize it
    if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
         (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
        s = ensureSegment(j);
    // insert the key-value pair into the segment
    return s.put(key, hash, value, false);
}
As the constructor showed, only segments[0] is created eagerly; the remaining segments are created on first use, i.e. lazily, and the method that performs this lazy initialization is ensureSegment:
private Segment<K,V> ensureSegment(int k) {
    final Segment<K,V>[] ss = this.segments;
    long u = (k << SSHIFT) + SBASE; // raw offset
    Segment<K,V> seg;
    // initialize segments[k] using segments[0]'s table length and load factor
    if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
        Segment<K,V> proto = ss[0]; // use segment 0 as prototype
        int cap = proto.table.length;
        float lf = proto.loadFactor;
        int threshold = (int)(cap * lf);
        HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
        if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { // recheck
            Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
            // CAS segments[k] from null to the new segment; loop until some thread wins
            while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
                if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
                    break;
            }
        }
    }
    return seg;
}
Segment's put method then inserts the key-value pair into the segment's HashEntry array:
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    // Segment extends ReentrantLock; try to acquire the exclusive lock
    HashEntry<K,V> node = tryLock() ? null :
        scanAndLockForPut(key, hash, value);
    V oldValue;
    try {
        HashEntry<K,V>[] tab = table;
        // locate the entry's slot in the HashEntry array
        int index = (tab.length - 1) & hash;
        // get the first entry at that slot
        HashEntry<K,V> first = entryAt(tab, index);
        for (HashEntry<K,V> e = first;;) {
            if (e != null) { // walk the linked list until e == null
                K k;
                // an entry with the same key exists: replace its value
                if ((k = e.key) == key ||
                    (e.hash == hash && key.equals(k))) {
                    oldValue = e.value;
                    if (!onlyIfAbsent) { // onlyIfAbsent is false for put
                        e.value = value;
                        ++modCount;
                    }
                    break;
                }
                e = e.next;
            }
            else {
                // node != null: link it in front of first as the new list head
                if (node != null)
                    node.setNext(first);
                // node == null: create a new head entry whose next is first
                else
                    node = new HashEntry<K,V>(hash, key, value, first);
                int c = count + 1;
                // resize when (1) the entry count exceeds the threshold and
                // (2) the current table is smaller than the maximum capacity
                if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                    // resize
                    rehash(node);
                else
                    // store node at position index in tab
                    setEntryAt(tab, index, node);
                ++modCount;
                count = c;
                oldValue = null;
                break;
            }
        }
    } finally {
        unlock();
    }
    return oldValue;
}
scanAndLockForPut spins, attempting to acquire the lock via tryLock, for at most MAX_SCAN_RETRIES attempts:
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
    // first/e: the first entry at the slot the hash maps to
    HashEntry<K,V> first = entryForHash(this, hash);
    HashEntry<K,V> e = first;
    HashEntry<K,V> node = null;
    int retries = -1; // negative while locating node
    // spin, trying to acquire the lock
    while (!tryLock()) {
        HashEntry<K,V> f; // to recheck first below
        if (retries < 0) {
            // once e == null or key.equals(e.key), set retries = 0
            // and leave this branch
            if (e == null) {
                if (node == null) // speculatively create node
                    // pre-create the entry, with next pointing to null
                    node = new HashEntry<K,V>(hash, key, value, null);
                retries = 0;
            }
            else if (key.equals(e.key))
                retries = 0;
            else
                e = e.next;
        }
        // spun more than the maximum number of times: block
        else if (++retries > MAX_SCAN_RETRIES) {
            lock();
            break;
        }
        // the head entry changed: re-traverse
        else if ((retries & 1) == 0 &&
                 (f = entryForHash(this, hash)) != first) {
            e = first = f; // re-traverse if entry changed
            retries = -1;
        }
    }
    return node;
}
rehash resizes a Segment's table array; the new array is twice the length of the old one.
private void rehash(HashEntry<K,V> node) {
    // the table before resizing
    HashEntry<K,V>[] oldTable = table;
    // length before resizing
    int oldCapacity = oldTable.length;
    // length after resizing (twice the old)
    int newCapacity = oldCapacity << 1;
    // compute the new threshold
    threshold = (int)(newCapacity * loadFactor);
    // the new table
    HashEntry<K,V>[] newTable =
        (HashEntry<K,V>[]) new HashEntry[newCapacity];
    // the new mask
    int sizeMask = newCapacity - 1;
    // iterate over every element of the old table
    for (int i = 0; i < oldCapacity; i++) {
        HashEntry<K,V> e = oldTable[i];
        if (e != null) {
            // e's next node; non-null when there was a hash collision
            HashEntry<K,V> next = e.next;
            // e's index in the new table
            int idx = e.hash & sizeMask;
            // only one entry in this bucket: move it directly
            if (next == null) // Single node on list
                newTable[idx] = e;
            // the bucket holds a list of more than one entry
            else { // Reuse consecutive sequence at same slot
                HashEntry<K,V> lastRun = e;
                // lastIdx is the new index of the current head e
                int lastIdx = idx;
                for (HashEntry<K,V> last = next;
                     last != null;
                     last = last.next) {
                    // k is this list entry's index in the new table
                    int k = last.hash & sizeMask;
                    // lastRun ends up as the last entry whose new index
                    // differs from its predecessor's
                    if (k != lastIdx) {
                        lastIdx = k;
                        lastRun = last;
                    }
                }
                // lastRun and everything after it land in the same new bucket;
                // reuse that suffix as-is
                newTable[lastIdx] = lastRun;
                // Clone remaining nodes: only the entries before lastRun need copying
                for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                    V v = p.value;
                    int h = p.hash;
                    int k = h & sizeMask;
                    HashEntry<K,V> n = newTable[k];
                    newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                }
            }
        }
    }
    // finally insert the node whose addition triggered the resize
    int nodeIndex = node.hash & sizeMask; // add the new node
    node.setNext(newTable[nodeIndex]);
    newTable[nodeIndex] = node;
    // point the Segment's table at the resized table
    table = newTable;
}
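Why can rehash reuse the lastRun suffix unchanged? Because newCapacity = 2 * oldCapacity and indexing is done by masking, every entry in old bucket i lands either at i or at i + oldCapacity in the new table. A list therefore splits into at most two lists, and the longest suffix whose entries agree on their new bucket can be relinked wholesale. A sketch of this index property (class and method names are illustrative):

```java
public class RehashIndexDemo {
    // After doubling, the new index is hash & (newCapacity - 1): the old
    // index plus, possibly, the old capacity, depending on one extra hash bit.
    static int newIndex(int hash, int newCapacity) {
        return hash & (newCapacity - 1);
    }

    public static void main(String[] args) {
        int oldCap = 8, newCap = oldCap << 1;
        // four hashes that all collide in old bucket 3 (hash & 7 == 3)
        for (int hash : new int[] { 3, 11, 19, 27 }) {
            System.out.println(hash + ": bucket 3 -> " + newIndex(hash, newCap));
        }
        // 3 and 19 stay in bucket 3; 11 and 27 move to bucket 3 + 8 = 11.
        // Each entry goes to one of exactly two buckets, which is what lets
        // rehash keep the lastRun tail linked together.
    }
}
```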
get needs no locking, which makes it fast: both locating the segment for the key and traversing the HashEntry list in the table use UNSAFE.getObjectVolatile, which reads the latest value of the volatile fields without acquiring a lock.
public V get(Object key) {
    Segment<K,V> s; // manually integrate access methods to reduce overhead
    HashEntry<K,V>[] tab;
    // compute the key's hash value
    int h = hash(key);
    // use the hash to locate which segment holds the key
    long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
    // read the segment and its table array
    if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
        (tab = s.table) != null) {
        // traverse the HashEntry list at the key's slot
        for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                 (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
             e != null; e = e.next) {
            K k;
            // found the matching key: return its value
            if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                return e.value;
        }
    }
    return null;
}
size computes the number of entries stored in the ConcurrentHashMap. Must every segment be locked while counting? Without locks, other threads may concurrently insert or remove entries during the count; with locks, read/write throughput suffers. ConcurrentHashMap takes a middle path: it makes lock-free passes over the segments, adding every segment's modCount into sum. If sum is unchanged from the previous pass, no other thread modified the map between the two passes, and the sum of the segments' counts is returned. If the passes keep disagreeing (more than RETRIES_BEFORE_LOCK, i.e. two, retries), it locks all segments and counts under the locks.
public int size() {
    // Try a few times to get accurate count. On failure due to
    // continuous async changes in table, resort to locking.
    final Segment<K,V>[] segments = this.segments;
    int size;
    boolean overflow; // true if size overflows 32 bits
    long sum;         // sum of modCounts
    long last = 0L;   // previous sum
    int retries = -1; // first iteration isn't retry
    try {
        for (;;) {
            // reached RETRIES_BEFORE_LOCK (two retries): lock every segment
            if (retries++ == RETRIES_BEFORE_LOCK) {
                for (int j = 0; j < segments.length; ++j)
                    ensureSegment(j).lock(); // force creation
            }
            sum = 0L;
            size = 0;
            overflow = false;
            for (int j = 0; j < segments.length; ++j) {
                Segment<K,V> seg = segmentAt(segments, j);
                // accumulate each segment's modCount and count
                if (seg != null) {
                    sum += seg.modCount;
                    int c = seg.count;
                    // detect overflow of the int range
                    if (c < 0 || (size += c) < 0)
                        overflow = true;
                }
            }
            // last is the previous pass's sum; if equal, we are done
            if (sum == last)
                break;
            last = sum;
        }
    } finally {
        // unlock if we locked above
        if (retries > RETRIES_BEFORE_LOCK) {
            for (int j = 0; j < segments.length; ++j)
                segmentAt(segments, j).unlock();
        }
    }
    return overflow ? Integer.MAX_VALUE : size;
}
remove delegates to the corresponding Segment's remove method:
public V remove(Object key) {
    int hash = hash(key);
    Segment<K,V> s = segmentForHash(hash);
    return s == null ? null : s.remove(key, hash, null);
}
Segment.remove acquires the lock, then removes the matching key-value pair:
final V remove(Object key, int hash, Object value) {
    // acquire the lock
    if (!tryLock())
        scanAndLock(key, hash);
    V oldValue = null;
    try {
        HashEntry<K,V>[] tab = table;
        int index = (tab.length - 1) & hash;
        HashEntry<K,V> e = entryAt(tab, index);
        // pred tracks the previous node while walking the list
        HashEntry<K,V> pred = null;
        while (e != null) {
            K k;
            HashEntry<K,V> next = e.next;
            // found the entry for the key
            if ((k = e.key) == key ||
                (e.hash == hash && key.equals(k))) {
                V v = e.value;
                // the entry's value matches the given value, or no value was given
                if (value == null || value == v || value.equals(v)) {
                    // removing the head: make its successor the new head
                    if (pred == null)
                        setEntryAt(tab, index, next);
                    // otherwise link the predecessor to the successor
                    else
                        pred.setNext(next);
                    ++modCount;
                    --count;
                    oldValue = v;
                }
                break;
            }
            pred = e;
            e = next;
        }
    } finally {
        unlock();
    }
    return oldValue;
}
scanAndLock scans for the given key while acquiring the lock; by the time the method returns, the lock is guaranteed to be held. The loop exits in one of two ways: 1. tryLock succeeds; 2. the spin count exceeds MAX_SCAN_RETRIES, the thread blocks in lock(), and breaks out of the loop once it is woken from the wait queue holding the lock.
private void scanAndLock(Object key, int hash) {
    // similar to but simpler than scanAndLockForPut
    HashEntry<K,V> first = entryForHash(this, hash);
    HashEntry<K,V> e = first;
    int retries = -1;
    while (!tryLock()) {
        HashEntry<K,V> f;
        if (retries < 0) {
            if (e == null || key.equals(e.key))
                retries = 0;
            else
                e = e.next;
        }
        else if (++retries > MAX_SCAN_RETRIES) {
            lock();
            break;
        }
        else if ((retries & 1) == 0 &&
                 (f = entryForHash(this, hash)) != first) {
            e = first = f;
            retries = -1;
        }
    }
}
isEmpty checks whether the ConcurrentHashMap is empty, again without locking, using two passes: 1. check every segment's count; if any is non-zero, return false, otherwise accumulate each segment's modCount into sum. 2. If the first pass did not return, the map may be empty; traverse again, returning false if any segment's count has become non-zero, while subtracting each segment's modCount from sum. If the second loop completes, compare sum with 0: sum == 0 means no elements were inserted between the two passes and the map really is empty; otherwise the map is not empty.
public boolean isEmpty() {
    // accumulate the segments' modCount values
    long sum = 0L;
    final Segment<K,V>[] segments = this.segments;
    for (int j = 0; j < segments.length; ++j) {
        Segment<K,V> seg = segmentAt(segments, j);
        if (seg != null) {
            if (seg.count != 0)
                return false;
            sum += seg.modCount;
        }
    }
    // second pass
    if (sum != 0L) { // recheck unless no modifications
        for (int j = 0; j < segments.length; ++j) {
            Segment<K,V> seg = segmentAt(segments, j);
            if (seg != null) {
                if (seg.count != 0)
                    return false;
                sum -= seg.modCount;
            }
        }
        if (sum != 0L)
            return false;
    }
    return true;
}
By introducing segment locking, ConcurrentHashMap raises the achievable concurrency: a thread modifying the table does not lock the whole table but only the segment it operates on, synchronizing on that segment alone. Narrowing the lock granularity in this way improves throughput and makes ConcurrentHashMap far better suited to multi-threaded environments than Hashtable, which synchronizes the entire table.
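To close, a small sketch of the practical effect (class and method names are illustrative): several threads write disjoint key ranges at once, and because writers mostly hit different segments they proceed in parallel with no lost updates, something plain HashMap does not guarantee.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ConcurrentPutDemo {
    // Fill the map from several threads at once and return its final size.
    static int fillConcurrently(final int threads, final int perThread)
            throws InterruptedException {
        final ConcurrentHashMap<Integer, Integer> map =
                new ConcurrentHashMap<Integer, Integer>();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            final int base = t * perThread; // distinct key range per thread
            pool.execute(new Runnable() {
                public void run() {
                    for (int i = 0; i < perThread; i++)
                        map.put(base + i, i);
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(30, TimeUnit.SECONDS);
        return map.size();
    }

    public static void main(String[] args) throws InterruptedException {
        // all threads * perThread distinct keys survive: no lost updates
        System.out.println(fillConcurrently(4, 1000)); // 4000
    }
}
```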