ConcurrentHashMap
顧名思義就是同步的HashMap,也就是線程安全的HashMap,因此本篇介紹的ConcurrentHashMap和HashMap有着很重要的關係,因此建議以前沒有了解過HashMap的能夠先看看這篇關於HashMap的原理分析《HashMap從認識到源碼分析》,本篇繼續以JDK1.8
版本的源碼進行分析,最後在介紹完ConcurrentHashMap以後會對ConcurrentHashMap、Hashtable和HashMap作一個比較和總結。html
咱們先看一下ConcurrentHashMap
實現了哪些接口、繼承了哪些類,對ConcurrentHashMap
有一個總體認知。 算法
ConcurrentHashMap
繼承
AbstractMap
接口,這個和
HashMap
同樣,而後實現了
ConcurrentMap
接口,這個和
HashMap
不同,
HashMap
是直接實現的
Map
接口。
ConcurrentHashMap
的結構,這裏列舉幾個重要的成員變量
table
、
nextTable
、
baseCount
、
sizeCtl
、
transferIndex
、
cellsBusy
Map.Entry
接口threshold
的做用,在不一樣的地方有不一樣的值也有不一樣的用途
-1
表明正在初始化-N
表明有N-1
個線程正在進行擴容操做0
表明hash表尚未被初始化ConcurrentHashMap
和HashMap
同樣都是採用拉鍊法處理哈希衝突,且都爲了防止單鏈表過長影響查詢效率,因此當鏈表長度超過某一個值時候將用紅黑樹代替鏈表進行存儲,採用了數組+鏈表+紅黑樹的結構
HashMap
和ConcurrentHashMap
仍是很類似的,只是ConcurrentHashMap
在某些操做上採用了CAS
+ synchronized
來保證併發狀況下的安全。ConcurrentHashMap
處理併發狀況下的線程安全問題,這不得不提到Hashtable
,由於Hashtable
也是線程安全的,那ConcurrentHashMap
和Hashtable
有什麼區別或者有什麼高明之處嘛?以致於官方都推薦使用ConcurrentHashMap
來代替Hashtable
Hashtable
採用對象鎖(synchronized修飾對象方法)來保證線程安全,也就是一個Hashtable
對象只有一把鎖,若是線程1拿了對象A的鎖進行有synchronized
修飾的put
方法,其餘線程是沒法操做對象A中有synchronized
修飾的方法的(如get
方法、remove
方法等),競爭激烈因此效率低下。而ConcurrentHashMap
採用CAS
+ synchronized
來保證併發安全性,且synchronized
關鍵字不是用在方法上而是用在了具體的對象上,實現了更小粒度的鎖,等會源碼分析的時候在細說這個SUN大師們的鬼斧神工Hashtable
採用的是數組 + 鏈表,當鏈表過長會影響查詢效率,而ConcurrentHashMap
採用數組 + 鏈表 + 紅黑樹,當鏈表長度超過某一個值,則將鏈表轉成紅黑樹,提升查詢效率。ConcurrentHashMap
的構造函數有5個,從數量上看就和HashMap
、Hashtable
(4個)的不一樣,多出的那個構造函數是public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel)
,即除了傳入容量大小、負載因子以外還多傳入了一個整型的concurrencyLevel
,這個整型是咱們預先估計的併發量,好比咱們估計併發是30
,那麼就能夠傳入30
。
其餘的4個構造函數的參數和HashMap
的同樣,而具體的初始化過程卻又不相同,HashMap
和Hashtable
傳入的容量大小和負載因子都是爲了計算出初始閾值(threshold),而ConcurrentHashMap
傳入的容量大小和負載因子是爲了計算出sizeCtl用於初始化table
,這個sizeCtl即table數組的大小,不一樣的構造函數計算sizeCtl方法都不同。數組
//無參構造函數,什麼也不作,table的初始化放在了第一次插入數據時,默認容量大小是16和HashMap的同樣,默認sizeCtl爲0
public ConcurrentHashMap() {
}
//傳入容量大小的構造函數。
public ConcurrentHashMap(int initialCapacity) {
//若是傳入的容量大小小於0 則拋出異常。
if (initialCapacity < 0)
throw new IllegalArgumentException();
//若是傳入的容量大小大於容許的最大容量值 則cap取容許的容量最大值 不然cap =
//((傳入的容量大小 + 傳入的容量大小無符號右移1位 + 1)的結果向上取最近的2冪次方),
//即若是傳入的容量大小是12 則 cap = 32(12 + (12 >>> 1) + 1=19
//向上取2的冪次方即32),這裏爲啥必定要是2的冪次方,緣由和HashMap的threshold同樣,都是爲
//了讓位運算和取模運算的結果同樣。
//MAXIMUM_CAPACITY即容許的最大容量值 爲2^30。
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
//tableSizeFor這個函數即實現了將一個整數取2的冪次方。
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
//將上面計算出的cap 賦值給sizeCtl,注意此時sizeCtl爲正數,表明進行擴容的容量大小。
this.sizeCtl = cap;
}
//包含指定Map的構造函數。
//置sizeCtl爲默認容量大小 即16。
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
this.sizeCtl = DEFAULT_CAPACITY;
putAll(m);
}
//傳入容量大小和負載因子的構造函數。
//默認併發數大小是1。
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
this(initialCapacity, loadFactor, 1);
}
//傳入容量大小、負載因子和併發數大小的構造函數
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
//若是傳入的容量大小 小於 傳入的併發數大小,
//則容量大小取併發數大小,這樣作的緣由是確保每個Node只會分配給一個線程,而一個線程則
//能夠分配到多個Node,好比當容量大小爲64,併發數大
//小爲16時,則每一個線程分配到4個Node。
if (initialCapacity < concurrencyLevel) // Use at least as many bins
initialCapacity = concurrencyLevel; // as estimated threads
//size = 1.0 + (long)initialCapacity / loadFactor 這裏計算方法和上面的構造函數不同。
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
//若是size大於容許的最大容量值則 sizeCtl = 容許的最大容量值 不然 sizeCtl =
//size取2的冪次方。
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap;
}
複製代碼
null
,爲null
拋出異常。spread()
方法計算key的hashCode()得到哈希地址,這個HashMap類似。synchronized
,也就是容許多個線程去嘗試初始化table,可是在初始化函數裏面使用了CAS
保證只有一個線程去執行初始化過程。null
,則直接調用實現CAS
原子性操做的casTabAt()
方法將節點插入到table中,若是插入成功則完成put操做,結束返回。插入失敗(被別的線程搶先插入了)則繼續往下執行。helpTransfer()
方法協助擴容。treeifyBin()
方法將鏈表轉成紅黑樹,以避免鏈表過長影響效率。addCount()
方法,做用是將ConcurrentHashMap的鍵值對數量+1,還有另外一個做用是檢查ConcurrentHashMap是否須要擴容。public V put(K key, V value) {
return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
//不容許鍵值爲null,這點與線程安全的Hashtable保持一致,和HashMap不一樣。
if (key == null || value == null) throw new NullPointerException();
//取鍵key的hashCode()和HashMap、Hashtable都同樣,而後再執行spread()方法計算獲得哈希地
//址,這個spread()方法和HashMap的hash()方法同樣,都是將hashCode()作無符號右移16位,只不
//過spread()加多了 &0x7fffffff,讓結果爲正數。
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
//若是table數組爲空或者長度爲0(未初始化),則調用initTable()初始化table,初始化函數
//下面介紹。
if (tab == null || (n = tab.length) == 0)
tab = initTable();
//調用實現了CAS原子性操做的tabAt方法
//tabAt方法的第一個參數是Node數組的引用,第二個參數在Node數組的下標,實現的是在Nod
//e數組中查找指定下標的Node,若是找到則返回該Node節點(鏈表頭節點),不然返回null,
//這裏的i = (n - 1)&hash便是計算待插入的節點在table的下標,即table容量-1的結果和哈
//希地址作與運算,和HashMap的算法同樣。
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
//若是該下標上並無節點(即鏈表爲空),則直接調用實現了CAS原子性操做的
//casTable()方法,
//casTable()方法的第一個參數是Node數組的引用,第二個參數是待操做的下標,第三
//個參數是指望值,第四個參數是待操做的Node節點,實現的是將Node數組下標爲參數二
//的節點替換成參數四的節點,若是指望值和實際值不符返回false,不然參數四的節點成
//功替換上去,返回ture,即插入成功。注意這裏:若是插入成功了則跳出for循環,插入
//失敗的話(其餘線程搶先插入了),那麼會執行到下面的代碼。
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
//若是該下標上的節點的哈希地址爲-1(即鏈表的頭節點爲ForwardingNode節點),則表示
//table須要擴容,值得注意的是ConcurrentHashMap初始化和擴容不是用同一個方法,而
//HashMap和Hashtable都是用同一個方法,當前線程會去協助擴容,擴容過程後面介紹。
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
//若是該下標上的節點既不是空也不是須要擴容,則表示這個鏈表能夠插入值,將進入到鏈表
//中,將新節點插入或者覆蓋舊值。
else {
V oldVal = null;
//經過關鍵字synchroized對該下標上的節點加鎖(至關於鎖住鎖住
//該下標上的鏈表),其餘下標上的節點並無加鎖,因此其餘線程
//能夠安全的得到其餘下標上的鏈表進行操做,也正是由於這個所
//以提升了ConcurrentHashMap的效率,提升了併發度。
synchronized (f) {
if (tabAt(tab, i) == f) {
//若是該下標上的節點的哈希地址大於等於0,則表示這是
//個鏈表。
if (fh >= 0) {
binCount = 1;
//遍歷鏈表。
for (Node<K,V> e = f;; ++binCount) {
K ek;
//若是哈希地址、鍵key相同 或者 鍵key不爲空
//且鍵key相同,則表示存在鍵key和待插入的鍵
//key相同,則執行更新值value的操做。
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
//若是找到了鏈表的最後一個節點都沒有找到相
//同鍵Key的,則是插入操做,將插入的鍵值新建
//個節點而且添加到鏈表尾部,這個和HashMap一
//樣都是插入到尾部。
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
//若是該下標上的節點的哈希地址小於0 且爲樹節點
//則將帶插入鍵值新增到紅黑樹
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
//若是插入的結果不爲null,則表示爲替換
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash,
key,value)) != null){
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
//判斷鏈表的長度是否大於等於鏈表的閾值(8),大於則將鏈表轉成
//紅黑樹,提升效率。這點和HashMap同樣。
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
複製代碼
spread()
方法計算key的hashCode()得到哈希地址。public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
//運用鍵key的hashCode()計算出哈希地址
int h = spread(key.hashCode());
//若是table不爲空 且 table長度大於0 且 計算出的下標上bucket不爲空,
//則表明這個bucket存在,進入到bucket中查找,
//其中(n - 1) & h爲計算出鍵key相對應的數組下標的算法。
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
//若是哈希地址、鍵key相同則表示查找到,返回value,這裏查找到的是頭節點。
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
//若是bucket頭節點的哈希地址小於0,則表明bucket爲紅黑樹,在紅黑樹中查找。
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
//若是bucket頭節點的哈希地址不小於0,則表明bucket爲鏈表,遍歷鏈表,在鏈表中查找。
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
複製代碼
spread()
方法計算出鍵key的哈希地址。null
。addCount
方法,將當前table存儲的鍵值對數量-1。public V remove(Object key) {
return replaceNode(key, null, null);
}
final V replaceNode(Object key, V value, Object cv) {
//計算須要移除的鍵key的哈希地址。
int hash = spread(key.hashCode());
//遍歷table。
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
//table爲空,或者鍵key所在的bucket爲空,則跳出循環返回。
if (tab == null || (n = tab.length) == 0 ||
(f = tabAt(tab, i = (n - 1) & hash)) == null)
break;
//若是當前table正在擴容,則調用helpTransfer方法,去協助擴容。
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
boolean validated = false;
//將鍵key所在的bucket加鎖。
synchronized (f) {
if (tabAt(tab, i) == f) {
//bucket頭節點的哈希地址大於等於0,爲鏈表。
if (fh >= 0) {
validated = true;
//遍歷鏈表。
for (Node<K,V> e = f, pred = null;;) {
K ek;
//找到哈希地址、鍵key相同的節點,進行移除。
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
V ev = e.val;
if (cv == null || cv == ev ||
(ev != null && cv.equals(ev))) {
oldVal = ev;
if (value != null)
e.val = value;
else if (pred != null)
pred.next = e.next;
else
setTabAt(tab, i, e.next);
}
break;
}
pred = e;
if ((e = e.next) == null)
break;
}
}
//若是bucket的頭節點小於0,即爲紅黑樹。
else if (f instanceof TreeBin) {
validated = true;
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> r, p;
//找到節點,而且移除。
if ((r = t.root) != null &&
(p = r.findTreeNode(hash, key, null)) != null) {
V pv = p.val;
if (cv == null || cv == pv ||
(pv != null && cv.equals(pv))) {
oldVal = pv;
if (value != null)
p.val = value;
else if (t.removeTreeNode(p))
setTabAt(tab, i, untreeify(t.first));
}
}
}
}
}
//調用addCount方法,將當前ConcurrentHashMap存儲的鍵值對數量-1。
if (validated) {
if (oldVal != null) {
if (value == null)
addCount(-1L, -1);
return oldVal;
}
break;
}
}
}
return null;
}
複製代碼
table
的初始化主要由initTable()方法實現的,initTable()方法初始化一個合適大小的數組,而後設置sizeCtl。 咱們知道ConcurrentHashMap
是線程安全的,即支持多線程的,那麼一開始不少個線程同時執行put()
方法,而table
又沒初始化,那麼就會不少個線程會去執行initTable()方法嘗試初始化table,而put
方法和initTable
方法都是沒有加鎖的(synchronize),那SUN的大師們是怎麼保證線程安全的呢? 經過源碼能夠看得出,table的初始化只能由一個線程完成,可是每一個線程均可以爭搶去初始化table。安全
null
,即需不須要首次初始化,若是某個線程進到這個方法後,其餘線程已經將table初始化好了,那麼該線程結束該方法返回。null
,進入到while循環,若是sizeCtl
小於0(其餘線程正在對table初始化),那麼該線程調用Thread.yield()
掛起該線程,讓出CPU時間,該線程也從運行態轉成就緒態,等該線程從就緒態轉成運行態的時候,別的線程已經table初始化好了,那麼該線程結束while循環,結束初始化方法返回。若是從就緒態轉成運行態後,table仍然爲null
,則繼續while循環。null
且sizeCtl
不小於0,則調用實現CAS
原子性操做的compareAndSwap()
方法將sizeCtl設置成-1,告訴別的線程我正在初始化table,這樣別的線程沒法對table進行初始化。若是設置成功,則再次判斷table是否爲空,不爲空則初始化table,容量大小爲默認的容量大小(16),或者爲sizeCtl。其中sizeCtl的初始化是在構造函數中進行的,sizeCtl = ((傳入的容量大小 + 傳入的容量大小無符號右移1位 + 1)的結果向上取最近的2冪次方)private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
//若是table爲null或者長度爲0, //則一直循環試圖初始化table(若是某一時刻別的線程將table初始化好了,那table不爲null,該//線程就結束while循環)。
while ((tab = table) == null || tab.length == 0) {
//若是sizeCtl小於0,
//即有其餘線程正在初始化或者擴容,執行Thread.yield()將當前線程掛起,讓出CPU時間,
//該線程從運行態轉成就緒態。
//若是該線程從就緒態轉成運行態了,此時table可能已被別的線程初始化完成,table不爲
//null,該線程結束while循環。
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
//若是此時sizeCtl不小於0,即沒有別的線程在作table初始化和擴容操做,
//那麼該線程就會調用Unsafe的CAS操做compareAndSwapInt嘗試將sizeCtl的值修改爲
//-1(sizeCtl=-1表示table正在初始化,別的線程若是也進入了initTable方法則會執行
//Thread.yield()將它的線程掛起 讓出CPU時間),
//若是compareAndSwapInt將sizeCtl=-1設置成功 則進入if裏面,不然繼續while循環。
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
//再次確認當前table爲null即還未初始化,這個判斷不能少。
if ((tab = table) == null || tab.length == 0) {
//若是sc(sizeCtl)大於0,則n=sc,不然n=默認的容量大
小16,
//這裏的sc=sizeCtl=0,即若是在構造函數沒有指定容量
大小,
//不然使用了有參數的構造函數,sc=sizeCtl=指定的容量大小。
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
//建立指定容量的Node數組(table)。
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
//計算閾值,n - (n >>> 2) = 0.75n當ConcurrentHashMap儲存的鍵值對數量
//大於這個閾值,就會發生擴容。
//這裏的0.75至關於HashMap的默認負載因子,能夠發現HashMap、Hashtable若是
//使用傳入了負載因子的構造函數初始化的話,那麼每次擴容,新閾值都是=新容
//量 * 負載因子,而ConcurrentHashMap無論使用的哪種構造函數初始化,
//新閾值都是=新容量 * 0.75。
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
複製代碼
transfer()
方法爲ConcurrentHashMap
擴容操做的核心方法。因爲ConcurrentHashMap
支持多線程擴容,並且也沒有進行加鎖,因此實現會變得有點兒複雜。整個擴容操做分爲兩步:bash
//協助擴容方法
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
//若是當前table不爲null 且 f爲ForwardingNode節點 且 //新的table即nextTable存在的狀況下才能協助擴容,該方法的做用是讓線程參與擴容的複製。
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
int rs = resizeStamp(tab.length);
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
//更新sizeCtl的值,+1,表明新增一個線程參與擴容
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}
//擴容的方法
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
//根據服務器CPU數量來決定每一個線程負責的bucket數量,避免由於擴容的線程過多反而影響性能。
//若是CPU數量爲1,則stride=1,不然將須要遷移的bucket數量(table大小)除以CPU數量,平分給
//各個線程,可是若是每一個線程負責的bucket數量小於限制的最小是(16)的話,則強制給每一個線程
//分配16個bucket數。
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
//若是nextTable還未初始化,則初始化nextTable,這個初始化和iniTable初始化同樣,只能由
//一個線程完成。
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
//分配任務和控制當前線程的任務進度,這部分是transfer()的核心邏輯,描述瞭如何與其餘線
//程協同工做。
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
//遷移過程(對當前指向的bucket),這部分的邏輯與HashMap相似,拿舊數組的容量當作一
//個掩碼,而後與節點的hash進行與操做,能夠得出該節點的新增有效位,若是新增有效位爲
//0就放入一個鏈表A,若是爲1就放入另外一個鏈表B,鏈表A在新數組中的位置不變(跟在舊數
//組的索引一致),鏈表B在新數組中的位置爲原索引加上舊數組容量。
else {
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
複製代碼
addCount()
作的工做是更新table的size,也就是table存儲的鍵值對數量,在使用put()
和remove()
方法的時候都會在執行成功以後調用addCount()
來更新table的size。對於ConcurrentHashMap
來講,它到底有儲存有多少個鍵值對,誰也不知道,由於他是支持併發的,儲存的數量無時無刻都在變化着,因此說ConcurrentHashMap
也只是統計一個大概的值,爲了統計出這個值也是大費周章才統計出來的。 服務器
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
//若是計算盒子不是空,或者修改baseCount的值+x失敗,則放棄對baseCount的修改。
//這裏的大概意思就是首先嚐試直接修改baseCount,達到計數的目的,若是修改baseCount失敗(
//多個線程同時修改,則失敗)
//則使用CounterCell數組來達到計數的目的。
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
//若是計數盒子是空的 或者隨機取餘一個數組爲空 或者修改這個槽位的變量失敗,
//即表示出現了併發,則執行fullAddCount()方法進行死循環插入,同時返回,
//不然表明修改這個槽位的變量成功了,繼續往下執行,不進入if。
//每一個線程都會經過ThreadLocalRandom.getProbe() & m尋址找到屬於它的CounterCell,
//而後進行計數。ThreadLocalRandom是一個線程私有的僞隨機數生成器,
//每一個線程的probe都是不一樣的。CounterCell數組的大小永遠是一個2的n次方,初始容量
//爲2,每次擴容的新容量都是以前容量乘以二,處於性能考慮,它的最大容量上限是機器
//的CPU數量,因此說CounterCell數組的碰撞衝突是很嚴重的。
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
//併發過大,使用CAS修改CounterCell失敗時候執行fullAddCount,
fullAddCount(x, uncontended);
return;
}
//若是上面對盒子的賦值成功,且check<=1,則直接返回,不然調用sumConut()方法計算
if (check <= 1)
return;
s = sumCount();
}
//若是check>=0,則檢查是否須要擴容。
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}
@sun.misc.Contended static final class CounterCell {
volatile long value;
CounterCell(long x) { value = x; }
}
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
複製代碼
size
和mappingCount
方法都是用來統計table的size的,這二者不一樣的地方在size
返回的是一個int
類型,便可以表示size的範圍是[-2^31,2^31-1],超過這個範圍就返回int能表示的最大值,mappingCount
返回的是一個long
類型,便可以表示size的範圍是[-2^63,2^63-1]。
這兩個方法都是調用的sumCount()方法實現統計。數據結構
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}
public long mappingCount() {
long n = sumCount();
return (n < 0L) ? 0L : n; // ignore transient negative values
}
複製代碼
\ | HashMap | Hashtable | ConcurrentHashMap |
---|---|---|---|
是否線程安全 | 否 | 是 | 是 |
線程安全採用的方式 | 採用synchronized 類鎖,效率低 |
採用CAS + synchronized ,鎖住的只有當前操做的bucket,不影響其餘線程對其餘bucket的操做,效率高 |
|
數據結構 | 數組+鏈表+紅黑樹(鏈表長度超過8則轉紅黑樹) | 數組+鏈表 | 數組+鏈表+紅黑樹(鏈表長度超過8則轉紅黑樹) |
是否容許null 鍵值 |
是 | 否 | 否 |
哈希地址算法 | (key的hashCode)^(key的hashCode無符號右移16位) | key的hashCode | ( (key的hashCode)^(key的hashCode無符號右移16位) )&0x7fffffff |
定位算法 | 哈希地址&(容量大小-1) | (哈希地址&0x7fffffff)%容量大小 | 哈希地址&(容量大小-1) |
擴容算法 | 當鍵值對數量大於閾值,則容量擴容到原來的2倍 | 當鍵值對數量大於等於閾值,則容量擴容到原來的2倍+1 | 當鍵值對數量大於等於sizeCtl,單線程建立新哈希表,多線程複製bucket到新哈希表,容量擴容到原來的2倍 |
鏈表插入 | 將新節點插入到鏈表尾部 | 將新節點插入到鏈表頭部 | 將新節點插入到鏈表尾部 |
繼承的類 | 繼承abstractMap 抽象類 |
繼承Dictionary 抽象類 |
繼承abstractMap 抽象類 |
實現的接口 | 實現Map 接口 |
實現Map 接口 |
實現ConcurrentMap 接口 |
默認容量大小 | 16 | 11 | 16 |
默認負載因子 | 0.75 | 0.75 | 0.75 |
統計size方式 | 直接返回成員變量size |
直接返回成員變量count |
遍歷CounterCell 數組的值進行累加,最後加上baseCount 的值即爲size |
【死磕Java併發】—–J.U.C之Java併發容器:ConcurrentHashMap
Map 你們族的那點事兒 ( 7 ) :ConcurrentHashMap
Java7/8 中的 HashMap 和 ConcurrentHashMap 全解析
Java 8 ConcurrentHashMap源碼分析多線程
原文地址:https://ddnd.cn/2019/03/10/jdk1-8-concurrenthashmap/併發