前段時間把 JDK 1.6 中的 HashMap
主要的一些操做源碼分析了一次。既然把 HashMap
源碼分析了, 就順便把 JDK 1.6 中 ConcurrentHashMap
的主要一些操做源碼分析一下。由於其中有不少思想是值得咱們去借鑑的。 ConcurrentHashMap
中的分段鎖。這個思想在 JDK 1.8 中 爲了優化 JUC 下的原子鎖 CAS 高併發狀況下致使自旋次數太多效率低下。引用 Adder
。其中就是借鑑了分段鎖的思想。AtomicLong
對比 LongAdder
。 有興趣能夠查看。java
若是有人問你瞭解 ConcurrentHashMap
嗎? 你能夠這樣回答,瞭解。 ConcurrentHashMap
是爲了 取代 HashMap
非線程安全的,一種線程安全實現類。它有一個 Segment
數組,Segment
自己就是至關於一個 HashMap
對象。裏面是一個 HashEntry
數組,數組中的每個 HashEntry
都是一個鍵值對,也是一個鏈表的表頭。若是別人問你,那 ConcurrentHashMap
get
或者 put
一個對象的時候是怎麼操做的 ,你該怎麼回答。emmm..... 繼續往下看。會有你要的答案。c++
分析源碼,先從構造函數開始。直接研究帶全部參數的構造方法,其餘一些重載的構造方法,最裏面仍是調用了該構造方法。在看構造方法以前,須要 明白 sshift 是表示併發數的2的幾回方 好比並發數是16 那麼他的值就是 4 。ssize 是 segment
數組的大小。算法
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// Find power-of-two sizes best matching arguments
int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
// 用來與 key的hashCode >>> 運算 獲取HashCode的高位
segmentShift = 32 - sshift;
// 高位與 它作與運算 eg 假如 默認的建立該對象 那麼 segmentShift = 28 segmentMask=15(二進制爲1111) 假如如今put一個值 他的key的HashCode值爲2的32次方 那麼 他在segment裏面的定位時 2的32次方 無符號 高位補零 右移28個 那麼就等於 10000(二進制) 等於 16 與 1111 作與運算 等於0 也就是定位在 segment[0]上 。
segmentMask = ssize - 1;
// segment數組大小爲 16
this.segments = Segment.newArray(ssize);
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
// segment數組中 每一個HashEntry數組的大小,
int cap = 1;
while (cap < c)
cap <<= 1;
// 爲segment數組中的每一個HashEntry數組初始化大小,每一個semengt中只有一個HashEntry數組。若是你設置的 ConcurrentHashMap 初始化大小爲16的話,則 segment數組中每一個的HashEntry的大小爲1,若是你初始化他的大小爲28 的話。它會根據上面的運算,cap的大小爲2,也就是segment數組中的每一個HashEntry數組的大小爲2 ,總的大小爲32。
for (int i = 0; i < this.segments.length; ++i)
this.segments[i] = new Segment<K,V>(cap, loadFactor);
}
複製代碼
上面的註釋應該都挺清楚了,要注意的是 ConcurrentHashMap
的大小 是全部 Segment
數組中每一個HashEntry
數組的大小相加的和。數組
ConcurrentHashMap
每次 put
的時候都是須要加鎖的,只不過會鎖住他所在的那個Segment
數組位置。其餘的不鎖,這也就是分段鎖,默認支持16個併發。提及put,以數組的形式存儲的數據,就會涉及到擴容。這樣是接下來須要好好討論的一個事情。安全
public V put(K key, V value) {
// key value 不能爲null
if (value == null)
throw new NullPointerException();
// 獲取hash值
int hash = hash(key.hashCode());
// 先獲取hash二進制數的高位與15的二進制作與運算,獲得segment數組的位置。
return segmentFor(hash).put(key, hash, value, false);
}
V put(K key, int hash, V value, boolean onlyIfAbsent) {
// 鎖住
lock();
try {
int c = count;
if (c++ > threshold) // ensure capacity
// 擴容操做
rehash();
// 獲取 Segment數組中的其中的HashEntry數組
HashEntry<K,V>[] tab = table;
// 獲取在在HashEntry數組中的位置。
int index = hash & (tab.length - 1);
HashEntry<K,V> first = tab[index];
HashEntry<K,V> e = first;
// 判斷是不是該key。
while (e != null && (e.hash != hash || !key.equals(e.key)))
e = e.next;
V oldValue;
// 若是存在該key的數據 ,那麼更新該值 返回舊值
if (e != null) {
oldValue = e.value;
if (!onlyIfAbsent)
e.value = value;
}
else {
oldValue = null;
++modCount;
//頭插法插入 tab[index]
tab[index] = new HashEntry<K,V>(key, hash, first, value);
count = c; // write-volatile
}
return oldValue;
} finally {
unlock();
}
}
// 看下擴容操做的細節
void rehash() {
HashEntry<K,V>[] oldTable = table;
int oldCapacity = oldTable.length;
if (oldCapacity >= MAXIMUM_CAPACITY)
return;
// HashEntry數組,新的數組爲它的兩倍
HashEntry<K,V>[] newTable = HashEntry.newArray(oldCapacity<<1);
// 閾值
threshold = (int)(newTable.length * loadFactor);
//他的二進制添加覺得 原來他的大小爲3 那麼二進制就是11 如今就爲 7 二進制爲 111
int sizeMask = newTable.length - 1;
for (int i = 0; i < oldCapacity ; i++) {
// 舊的HashEntry。
HashEntry<K,V> e = oldTable[i];
if (e != null) {
// 下一個 該HashEntry數組上的 HashEntry是否爲鏈表,有下一個值。
HashEntry<K,V> next = e.next;
// 該HashEntry的新位置 若是高位爲1 那麼他在HashEntry數組中的位置就是老的HashEntry數組中的加上這個倍數。舉個例子
// 假如e.hash 原來的的二進制位...111 老的HashEntry數組的大小爲 4 那麼e.hash和 4-1 也就是3 作與運算 獲得的值也就是二進制的11
// 值位3 如今新的HashEntry數組的大小爲 8 也就是 e.hash 和 8-1 作與運算 獲得的值 也就是二進制位 111 位 7 。
int idx = e.hash & sizeMask;
// 沒有的話就直接放入該位置了,若是有的話往下看:
if (next == null)
newTable[idx] = e;
else {
HashEntry<K,V> lastRun = e;
// 假如idx 等於 7
int lastIdx = idx;
// 找到最後一個 HashEntry中的位置,而且後面的HashEntry的位置都是同樣的。舉個例子
// 假如這個鏈表中的全部HashEntry的Hash值爲 1-5-1-5-5-5 。那麼最後lastIdx = 5 也就是1-5-1後面的這個5 。lastRun 爲 1-5-1後面的這個5的HashEnrty。
for (HashEntry<K,V> last = next;
last != null;
last = last.next) {
int k = last.hash & sizeMask;
if (k != lastIdx) {
lastIdx = k;
//
lastRun = last;
}
}
// 將 lastRun 複製給 這個新的Table 那麼後面還有 5-5-5 這些的就不用移動了 直接帶過來了。 這就是上面那個for循環作的事情
newTable[lastIdx] = lastRun;
// 對前面的 1-5-1作操做了 1就是在新HashEntry書中的1的位置 5的後就是頭插法 ,查到新HashEntry的頭部了
for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
int k = p.hash & sizeMask;
HashEntry<K,V> n = newTable[k];
newTable[k] = new HashEntry<K,V>(p.key, p.hash,
n, p.value);
}
}
}
}
table = newTable;
}
複製代碼
其實put
方法中有點難理解的就是 把查找到後面若是有全部相同的 HashEntry
的key
的位置是同樣的話,就不用額外的進行Hash
從新定位了。不知道我描述的清不清楚。若是還有不清楚的話,能夠私信一下我。併發
ConcurrentHashMap
中 get
方法是不會加鎖的,若是get
的值爲null的時候,這個時候會對這個HashEntry
進行加鎖。預防此時併發出現的問題。ssh
public V get(Object key) {
//定位Segment數組中的HashEntry數組爲位置
int hash = hash(key.hashCode());
return segmentFor(hash).get(key, hash);
}
V get(Object key, int hash) {
// 曾經put進去過,也就是裏面有值
if (count != 0) { // read-volatile
// 定位HashEntry數組中的HashEntry。
HashEntry<K,V> e = getFirst(hash);
while (e != null) {
if (e.hash == hash && key.equals(e.key)) {
V v = e.value;
if (v != null)
return v;
return readValueUnderLock(e); // recheck
}
e = e.next;
}
}
return null;
}
複製代碼
ConcurrentHashMap
中get
方法是比較簡單的。看一看就知道了。函數
這一遍ConcurrentHashMap
源碼分析,能夠說是本身寫了大半個月吧。好早以前就準備寫了。老是寫一點,而後就停筆了。加上本身換了公司的緣由,又忙上了一段時間,致使一拖再落。哇,嚴重拖延症患者。上面本身也是所有透徹以後寫下來的,若是有些表達不夠清晰的還得多加包涵,若是有不一樣的能夠下方瀏覽討論一下。上面不少關鍵的代碼我都寫上了註釋,能夠配合着註釋,而後本身對源碼進行研究,查看,若是還有不是很透徹的話,本身多翻一翻其餘人寫的。最近一直在寫LeetCode上的動態規劃這些算法題。其實也就是抄一遍。等之後有了感悟再來寫這一些吧。高併發