接上一篇 學習 ConcurrentHashMap1.8 併發寫機制, 本文主要學習 Segment分段鎖
的實現原理。java
雖然 JDK1.7
在生產環境已逐漸被 JDK1.8
替代,然而一些好的思想仍是須要進行學習的。比方說位圖中尋找 bit
位的思路是否是和 ConcurrentHashMap1.7
有點類似?node
接下來,本文基於 OpenJDK7
來作源碼解析。算法
ConcurrentHashMap 中 put()是線程安全的。可是不少時候, 因爲業務需求, 須要先 get()
操做再 put()
操做,這 2 個操做沒法保證原子性,這樣就會產生線程安全問題了。你們在開發中必定要注意。數組
ConcurrentHashMap 的結構示意圖以下:安全
在進行數據的定位時,會首先找到 segment
, 而後在 segment
中定位 bucket
。若是多線程操做同一個 segment
, 就會觸發 segment
的鎖 ReentrantLock
, 這就是分段鎖的基本實現原理。多線程
HashEntry
是 ConcurrentHashMap
的基礎單元(節點),是實際數據的載體。併發
static final class HashEntry<K,V> { final int hash; final K key; volatile V value; volatile HashEntry<K,V> next; HashEntry(int hash, K key, V value, HashEntry<K,V> next) { this.hash = hash; this.key = key; this.value = value; this.next = next; } /** * Sets next field with volatile write semantics. (See above * about use of putOrderedObject.) */ final void setNext(HashEntry<K,V> n) { UNSAFE.putOrderedObject(this, nextOffset, n); } // Unsafe mechanics static final sun.misc.Unsafe UNSAFE; static final long nextOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class k = HashEntry.class; nextOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("next")); } catch (Exception e) { throw new Error(e); } } }
Segment
繼承 ReentrantLock
鎖,用於存放數組 HashEntry[]
。在這裏能夠看出, 不管 1.7 仍是 1.8 版本, ConcurrentHashMap
底層並非對 HashMap
的擴展, 而是一樣從底層基於數組+鏈表進行功能實現。ssh
static final class Segment<K,V> extends ReentrantLock implements Serializable { private static final long serialVersionUID = 2249069246763182397L; static final int MAX_SCAN_RETRIES = Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1; // 數據節點存儲在這裏(基礎單元是數組) transient volatile HashEntry<K,V>[] table; transient int count; transient int modCount; transient int threshold; final float loadFactor; Segment(float lf, int threshold, HashEntry<K,V>[] tab) { this.loadFactor = lf; this.threshold = threshold; this.table = tab; } // 具體方法不在這裏討論... }
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); // 對於concurrencyLevel的理解, 能夠理解爲segments數組的長度,即理論上多線程併發數(分段鎖), 默認16 if (concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS; // Find power-of-two sizes best matching arguments int sshift = 0; int ssize = 1; // 默認concurrencyLevel = 16, 因此ssize在默認狀況下也是16,此時 sshift = 4 // ssize = 2^sshift 即 ssize = 1 << sshift while (ssize < concurrencyLevel) { ++sshift; ssize <<= 1; } // 段偏移量,32是由於hash是int值,int值32位,默認值狀況下此時segmentShift = 28 this.segmentShift = 32 - sshift; // 散列算法的掩碼,默認值狀況下segmentMask = 15, 定位segment的時候須要根據segment[]長度取模, 即hash(key)&(ssize - 1) this.segmentMask = ssize - 1; if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; // 計算每一個segment中table的容量, 初始容量=16, 併發數=16, 則segment中的Entry[]長度爲1。 int c = initialCapacity / ssize; // 處理沒法整除的狀況,取上限 if (c * ssize < initialCapacity) ++c; // MIN_SEGMENT_TABLE_CAPACITY默認時2,cap是2的n次方 int cap = MIN_SEGMENT_TABLE_CAPACITY; while (cap < c) cap <<= 1; // create segments and segments[0] // 建立segments並初始化第一個segment數組,其他的segment延遲初始化 Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor), (HashEntry<K,V>[])new HashEntry[cap]); // 默認併發數=16 Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize]; UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0] this.segments = ss; }
由圖和源碼可知,當用默認構造函數時,最大併發數是 16,即最大容許 16 個線程同步寫操做,且沒法擴展。因此若是咱們的場景數據量比較大時,應該設置合適的併發數,避免頻繁鎖衝突。函數
public V put(K key, V value) { Segment<K,V> s; if (value == null) throw new NullPointerException(); // 根據key的hash再次進行hash運算 int hash = hash(key.hashCode()); // 基於hash定位segment數組的索引。 // hash值是int值,32bits。segmentShift=28,無符號右移28位,剩下高4位,其他補0。 // segmentMask=15,二進制低4位所有是1,因此j至關於hash右移後的低4位。 int j = (hash >>> segmentShift) & segmentMask; if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment // 找到對應segment s = ensureSegment(j); // 將新節點插入segment中 return s.put(key, hash, value, false); }
找出對應 segment,若是不存在就建立並初始化源碼分析
@SuppressWarnings("unchecked") private Segment<K,V> ensureSegment(int k) { // 當前的segments數組 final Segment<K,V>[] ss = this.segments; // 計算原始偏移量,在segments數組的位置 long u = (k << SSHIFT) + SBASE; // raw offset Segment<K,V> seg; // 判斷沒有被初始化 if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { // 獲取第一個segment ss[0]做爲原型 Segment<K,V> proto = ss[0]; // use segment 0 as prototype int cap = proto.table.length; // 容量 float lf = proto.loadFactor; // 負載因子 int threshold = (int)(cap * lf); // 閾值 // 初始化ss[k] 內部的tab數組 // recheck HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap]; // 再次檢查這個ss[k] 有沒有被初始化 if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { // recheck Segment<K,V> s = new Segment<K,V>(lf, threshold, tab); // 自旋。getObjectVolatile 保證了讀的可見性,因此一旦有一個線程初始化了,那麼就結束自旋 while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s)) break; } } } return seg; }
上一步找到 segment 位置後計算節點在 segment 中的位置。
final V put(K key, int hash, V value, boolean onlyIfAbsent) { // 是否獲取鎖,失敗自旋獲取鎖(直到成功) HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value); // 失敗了纔會scanAndLockForPut V oldValue; try { HashEntry<K,V>[] tab = table; int index = (tab.length - 1) & hash; // 獲取到bucket位置的第一個節點 HashEntry<K,V> first = entryAt(tab, index); for (HashEntry<K,V> e = first;;) { // hash衝突 if (e != null) { K k; // key相等則覆蓋 if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { oldValue = e.value; if (!onlyIfAbsent) { e.value = value; ++modCount; } break; } // 不相等則遍歷鏈表 e = e.next; } else { if (node != null) // 將新節點插入鏈表做爲表頭 node.setNext(first); else // 建立新節點並插入表頭 node = new HashEntry<K,V>(hash, key, value, first); int c = count + 1; // 判斷元素個數是否超過了閾值或者segment中數組的長度超過了MAXIMUM_CAPACITY,若是知足條件則rehash擴容! if (c > threshold && tab.length < MAXIMUM_CAPACITY) // 擴容 rehash(node); else setEntryAt(tab, index, node); ++modCount; count = c; oldValue = null; break; } } } finally { // 解鎖 unlock(); } return oldValue; }
若是加鎖失敗則先走 scanAndLockForPut()
方法。
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) { // 根據hash獲取頭結點 HashEntry<K,V> first = entryForHash(this, hash); HashEntry<K,V> e = first; HashEntry<K,V> node = null; int retries = -1; // negative while locating node // 嘗試獲取鎖,成功就返回,失敗就開始自旋 while (!tryLock()) { HashEntry<K,V> f; // to recheck first below if (retries < 0) { // 若是頭結點不存在 if (e == null) { if (node == null) // speculatively create node node = new HashEntry<K,V>(hash, key, value, null); retries = 0; } // 和頭結點key相等 else if (key.equals(e.key)) retries = 0; else // 下一個節點 直到爲null e = e.next; } // 達到自旋的最大次數 else if (++retries > MAX_SCAN_RETRIES) { // lock()是阻塞方法。進入加鎖方法,失敗進入隊列,阻塞當前線程 lock(); break; } // TODO (retries & 1) == 0 沒理解 else if ((retries & 1) == 0 && (f = entryForHash(this, hash)) != first) { // 頭結點變化,須要從新遍歷,說明有新的節點加入或者移除 e = first = f; // re-traverse if entry changed retries = -1; } } return node; }
(retries & 1) == 0 沒理解是在作什麼,有小夥伴看明白了請賜教。
本文到此結束,主要是學習分段鎖是如何工做的。謝謝你們的觀看。