之前寫過介紹HashMap的文章,文中提到過HashMap在put的時候,插入的元素超過了容量(由負載因子決定)的範圍就會觸發擴容操做,就是rehash,這個會從新將原數組的內容從新hash到新的擴容數組中,在多線程的環境下,存在同時其餘的元素也在進行put操做,若是hash值相同,可能出現同時在同一數組下用鏈表表示,形成閉環,致使在get時會出現死循環,因此HashMap是線程不安全的。java
咱們來了解另外一個鍵值存儲集合HashTable,它是線程安全的,它在全部涉及到多線程操做的都加上了synchronized關鍵字來鎖住整個table,這就意味着全部的線程都在競爭一把鎖,在多線程的環境下,它是安全的,可是無疑是效率低下的。node
其實HashTable有不少的優化空間,鎖住整個table這麼粗暴的方法能夠變相的柔和點,好比在多線程的環境下,對不一樣的數據集進行操做時其實根本就不須要去競爭一個鎖,由於他們不一樣hash值,不會由於rehash形成線程不安全,因此互不影響,這就是鎖分離技術,將鎖的粒度下降,利用多個鎖來控制多個小的table,這就是這篇文章的主角ConcurrentHashMap JDK1.7版本的核心思想數組
在JDK1.7版本中,ConcurrentHashMap的數據結構是由一個Segment數組和多個HashEntry組成,以下圖所示:安全
Segment數組的意義就是將一個大的table分割成多個小的table來進行加鎖,也就是上面的提到的鎖分離技術,而每個Segment元素存儲的是HashEntry數組+鏈表,這個和HashMap的數據存儲結構同樣數據結構
ConcurrentHashMap的初始化是會經過位與運算來初始化Segment的大小,用ssize來表示,以下所示多線程
1
2
3
4
5
6
|
int
sshift =
0
;
int
ssize =
1
;
while
(ssize < concurrencyLevel) {
++sshift;
ssize <<=
1
;
}
|
如上所示,由於ssize用位於運算來計算(ssize <<=1
),因此Segment的大小取值都是以2的N次方,無關concurrencyLevel的取值,固然concurrencyLevel最大隻能用16位的二進制來表示,即65536,換句話說,Segment的大小最多65536個,沒有指定concurrencyLevel元素初始化,Segment的大小ssize默認爲16併發
每個Segment元素下的HashEntry的初始化也是按照位於運算來計算,用cap來表示,以下所示dom
1
2
3
|
int
cap =
1
;
while
(cap < c)
cap <<=
1
;
|
如上所示,HashEntry大小的計算也是2的N次方(cap <<=1), cap的初始值爲1,因此HashEntry最小的容量爲2ssh
對於ConcurrentHashMap的數據插入,這裏要進行兩次Hash去定位數據的存儲位置ide
1
|
static
class
Segment<K,V>
extends
ReentrantLock
implements
Serializable {
|
從上Segment的繼承體系能夠看出,Segment實現了ReentrantLock,也就帶有鎖的功能,當執行put操做時,會進行第一次key的hash來定位Segment的位置,若是該Segment尚未初始化,即經過CAS操做進行賦值,而後進行第二次hash操做,找到相應的HashEntry的位置,這裏會利用繼承過來的鎖的特性,在將數據插入指定的HashEntry位置時(鏈表的尾端),會經過繼承ReentrantLock的tryLock()方法嘗試去獲取鎖,若是獲取成功就直接插入相應的位置,若是已經有線程獲取該Segment的鎖,那當前線程會以自旋的方式去繼續的調用tryLock()方法去獲取鎖,超過指定次數就掛起,等待喚醒
ConcurrentHashMap的get操做跟HashMap相似,只是ConcurrentHashMap第一次須要通過一次hash定位到Segment的位置,而後再hash定位到指定的HashEntry,遍歷該HashEntry下的鏈表進行對比,成功就返回,不成功就返回null
計算ConcurrentHashMap的元素大小是一個有趣的問題,由於他是併發操做的,就是在你計算size的時候,他還在併發的插入數據,可能會致使你計算出來的size和你實際的size有相差(在你return size的時候,插入了多個數據),要解決這個問題,JDK1.7版本用兩種方案
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
try
{
for
(;;) {
if
(retries++ == RETRIES_BEFORE_LOCK) {
for
(
int
j =
0
; j < segments.length; ++j) ensureSegment(j).lock();
// force creation
}
sum = 0L;
size =
0
;
overflow =
false
;
for
(
int
j =
0
; j < segments.length; ++j) {
Segment<K,V> seg = segmentAt(segments, j);
if
(seg !=
null
) { sum += seg.modCount;
int
c = seg.count;
if
(c <
0
|| (size += c) <
0
)
overflow =
true
;
} }
if
(sum == last)
break
;
last = sum; } }
finally
{
if
(retries > RETRIES_BEFORE_LOCK) {
for
(
int
j =
0
; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
|
JDK1.8的實現已經摒棄了Segment的概念,而是直接用Node數組+鏈表+紅黑樹的數據結構來實現,併發控制使用Synchronized和CAS來操做,整個看起來就像是優化過且線程安全的HashMap,雖然在JDK1.8中還能看到Segment的數據結構,可是已經簡化了屬性,只是爲了兼容舊版本
在深刻JDK1.8的put和get實現以前要知道一些常量設計和數據結構,這些是構成ConcurrentHashMap實現結構的基礎,下面看一下基本屬性:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
// node數組最大容量:2^30=1073741824
private
static
final
int
MAXIMUM_CAPACITY =
1
<<
30
;
// 默認初始值,必須是2的幕數
private
static
final
int
DEFAULT_CAPACITY =
16
;
//數組可能最大值,須要與toArray()相關方法關聯
static
final
int
MAX_ARRAY_SIZE = Integer.MAX_VALUE -
8
;
//併發級別,遺留下來的,爲兼容之前的版本
private
static
final
int
DEFAULT_CONCURRENCY_LEVEL =
16
;
// 負載因子
private
static
final
float
LOAD_FACTOR =
0
.75f;
// 鏈表轉紅黑樹閥值,> 8 鏈表轉換爲紅黑樹
static
final
int
TREEIFY_THRESHOLD =
8
;
//樹轉鏈表閥值,小於等於6(tranfer時,lc、hc=0兩個計數器分別++記錄原bin、新binTreeNode數量,<=UNTREEIFY_THRESHOLD 則untreeify(lo))
static
final
int
UNTREEIFY_THRESHOLD =
6
;
static
final
int
MIN_TREEIFY_CAPACITY =
64
;
private
static
final
int
MIN_TRANSFER_STRIDE =
16
;
private
static
int
RESIZE_STAMP_BITS =
16
;
// 2^15-1,help resize的最大線程數
private
static
final
int
MAX_RESIZERS = (
1
<< (
32
- RESIZE_STAMP_BITS)) -
1
;
// 32-16=16,sizeCtl中記錄size大小的偏移量
private
static
final
int
RESIZE_STAMP_SHIFT =
32
- RESIZE_STAMP_BITS;
// forwarding nodes的hash值
static
final
int
MOVED = -
1
;
// 樹根節點的hash值
static
final
int
TREEBIN = -
2
;
// ReservationNode的hash值
static
final
int
RESERVED = -
3
;
// 可用處理器數量
static
final
int
NCPU = Runtime.getRuntime().availableProcessors();
//存放node的數組
transient
volatile
Node<K,V>[] table;
/*控制標識符,用來控制table的初始化和擴容的操做,不一樣的值有不一樣的含義
*當爲負數時:-
1
表明正在初始化,-N表明有N-
1
個線程正在 進行擴容
*當爲
0
時:表明當時的table尚未被初始化
*當爲正數時:表示初始化或者下一次進行擴容的大小
private
transient
volatile
int
sizeCtl;
|
基本屬性定義了ConcurrentHashMap的一些邊界以及操做時的一些控制,下面看一些內部的一些結構組成,這些是整個ConcurrentHashMap整個數據結構的核心
Node是ConcurrentHashMap存儲結構的基本單元,繼承於HashMap中的Entry,用於存儲數據,源代碼以下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
|
static
class
Node<K,V>
implements
Map.Entry<K,V> {
//鏈表的數據結構
final
int
hash;
final
K key;
//val和next都會在擴容時發生變化,因此加上volatile來保持可見性和禁止重排序
volatile
V val;
volatile
Node<K,V> next;
Node(
int
hash, K key, V val, Node<K,V> next) {
this
.hash = hash;
this
.key = key;
this
.val = val;
this
.next = next;
}
public
final
K getKey() {
return
key; }
public
final
V getValue() {
return
val; }
public
final
int
hashCode() {
return
key.hashCode() ^ val.hashCode(); }
public
final
String toString(){
return
key +
"="
+ val; }
//不容許更新value
public
final
V setValue(V value) {
throw
new
UnsupportedOperationException();
}
public
final
boolean
equals(Object o) {
Object k, v, u; Map.Entry<?,?> e;
return
((o
instanceof
Map.Entry) &&
(k = (e = (Map.Entry<?,?>)o).getKey()) !=
null
&&
(v = e.getValue()) !=
null
&&
(k == key || k.equals(key)) &&
(v == (u = val) || v.equals(u)));
}
//用於map中的get()方法,子類重寫
Node<K,V> find(
int
h, Object k) {
Node<K,V> e =
this
;
if
(k !=
null
) {
do
{
K ek;
if
(e.hash == h &&
((ek = e.key) == k || (ek !=
null
&& k.equals(ek))))
return
e;
}
while
((e = e.next) !=
null
);
}
return
null
;
}
}
|
Node數據結構很簡單,從上可知,就是一個鏈表,可是隻容許對數據進行查找,不容許進行修改
TreeNode繼承與Node,可是數據結構換成了二叉樹結構,它是紅黑樹的數據的存儲結構,用於紅黑樹中存儲數據,當鏈表的節點數大於8時會轉換成紅黑樹的結構,他就是經過TreeNode做爲存儲結構代替Node來轉換成黑紅樹源代碼以下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
static
final
class
TreeNode<K,V>
extends
Node<K,V> {
//樹形結構的屬性定義
TreeNode<K,V> parent;
// red-black tree links
TreeNode<K,V> left;
TreeNode<K,V> right;
TreeNode<K,V> prev;
// needed to unlink next upon deletion
boolean
red;
//標誌紅黑樹的紅節點
TreeNode(
int
hash, K key, V val, Node<K,V> next,
TreeNode<K,V> parent) {
super
(hash, key, val, next);
this
.parent = parent;
}
Node<K,V> find(
int
h, Object k) {
return
findTreeNode(h, k,
null
);
}
//根據key查找 從根節點開始找出相應的TreeNode,
final
TreeNode<K,V> findTreeNode(
int
h, Object k, Class<?> kc) {
if
(k !=
null
) {
TreeNode<K,V> p =
this
;
do
{
int
ph, dir; K pk; TreeNode<K,V> q;
TreeNode<K,V> pl = p.left, pr = p.right;
if
((ph = p.hash) > h)
p = pl;
else
if
(ph < h)
p = pr;
else
if
((pk = p.key) == k || (pk !=
null
&& k.equals(pk)))
return
p;
else
if
(pl ==
null
)
p = pr;
else
if
(pr ==
null
)
p = pl;
else
if
((kc !=
null
||
(kc = comparableClassFor(k)) !=
null
) &&
(dir = compareComparables(kc, k, pk)) !=
0
)
p = (dir <
0
) ? pl : pr;
else
if
((q = pr.findTreeNode(h, k, kc)) !=
null
)
return
q;
else
p = pl;
}
while
(p !=
null
);
}
return
null
;
}
}
|
TreeBin從字面含義中能夠理解爲存儲樹形結構的容器,而樹形結構就是指TreeNode,因此TreeBin就是封裝TreeNode的容器,它提供轉換黑紅樹的一些條件和鎖的控制,部分源碼結構以下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
|
static
final
class
TreeBin<K,V>
extends
Node<K,V> {
//指向TreeNode列表和根節點
TreeNode<K,V> root;
volatile
TreeNode<K,V> first;
volatile
Thread waiter;
volatile
int
lockState;
// 讀寫鎖狀態
static
final
int
WRITER =
1
;
// 獲取寫鎖的狀態
static
final
int
WAITER =
2
;
// 等待寫鎖的狀態
static
final
int
READER =
4
;
// 增長數據時讀鎖的狀態
/**
* 初始化紅黑樹
*/
TreeBin(TreeNode<K,V> b) {
super
(TREEBIN,
null
,
null
,
null
);
this
.first = b;
TreeNode<K,V> r =
null
;
for
(TreeNode<K,V> x = b, next; x !=
null
; x = next) {
next = (TreeNode<K,V>)x.next;
x.left = x.right =
null
;
if
(r ==
null
) {
x.parent =
null
;
x.red =
false
;
r = x;
}
else
{
K k = x.key;
int
h = x.hash;
Class<?> kc =
null
;
for
(TreeNode<K,V> p = r;;) {
int
dir, ph;
K pk = p.key;
if
((ph = p.hash) > h)
dir = -
1
;
else
if
(ph < h)
dir =
1
;
else
if
((kc ==
null
&&
(kc = comparableClassFor(k)) ==
null
) ||
(dir = compareComparables(kc, k, pk)) ==
0
)
dir = tieBreakOrder(k, pk);
TreeNode<K,V> xp = p;
if
((p = (dir <=
0
) ? p.left : p.right) ==
null
) {
x.parent = xp;
if
(dir <=
0
)
xp.left = x;
else
xp.right = x;
r = balanceInsertion(r, x);
break
;
}
}
}
}
this
.root = r;
assert
checkInvariants(root);
}
......
}
|
介紹了ConcurrentHashMap主要的屬性與內部的數據結構,如今經過一個簡單的例子以debug的視角看看ConcurrentHashMap的具體操做細節
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
public
class
TestConcurrentHashMap{
public
static
void
main(String[] args){
ConcurrentHashMap<String,String> map =
new
ConcurrentHashMap();
//初始化ConcurrentHashMap
//新增我的信息
map.put(
"id"
,
"1"
);
map.put(
"name"
,
"andy"
);
map.put(
"sex"
,
"男"
);
//獲取姓名
String name = map.get(
"name"
);
Assert.assertEquals(name,
"andy"
);
//計算大小
int
size = map.size();
Assert.assertEquals(size,
3
);
}
}
|
咱們先經過new ConcurrentHashMap()
來進行初始化
1
2
|
public
ConcurrentHashMap() {
}
|
由上你會發現ConcurrentHashMap的初始化實際上是一個空實現,並無作任何事,這裏後面會講到,這也是和其餘的集合類有區別的地方,初始化操做並非在構造函數實現的,而是在put操做中實現,固然ConcurrentHashMap還提供了其餘的構造函數,有指定容量大小或者指定負載因子,跟HashMap同樣,這裏就不作介紹了
在上面的例子中咱們新增我的信息會調用put方法,咱們來看下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
|
public
V put(K key, V value) {
return
putVal(key, value,
false
);
}
/** Implementation for put and putIfAbsent */
final
V putVal(K key, V value,
boolean
onlyIfAbsent) {
if
(key ==
null
|| value ==
null
)
throw
new
NullPointerException();
int
hash = spread(key.hashCode());
//兩次hash,減小hash衝突,能夠均勻分佈
int
binCount =
0
;
for
(Node<K,V>[] tab = table;;) {
//對這個table進行迭代
Node<K,V> f;
int
n, i, fh;
//這裏就是上面構造方法沒有進行初始化,在這裏進行判斷,爲null就調用initTable進行初始化,屬於懶漢模式初始化
if
(tab ==
null
|| (n = tab.length) ==
0
)
tab = initTable();
else
if
((f = tabAt(tab, i = (n -
1
) & hash)) ==
null
) {
//若是i位置沒有數據,就直接無鎖插入
if
(casTabAt(tab, i,
null
,
new
Node<K,V>(hash, key, value,
null
)))
break
;
// no lock when adding to empty bin
}
else
if
((fh = f.hash) == MOVED)
//若是在進行擴容,則先進行擴容操做
tab = helpTransfer(tab, f);
else
{
V oldVal =
null
;
//若是以上條件都不知足,那就要進行加鎖操做,也就是存在hash衝突,鎖住鏈表或者紅黑樹的頭結點
synchronized
(f) {
if
(tabAt(tab, i) == f) {
if
(fh >=
0
) {
//表示該節點是鏈表結構
binCount =
1
;
for
(Node<K,V> e = f;; ++binCount) {
K ek;
//這裏涉及到相同的key進行put就會覆蓋原先的value
if
(e.hash == hash &&
((ek = e.key) == key ||
(ek !=
null
&& key.equals(ek)))) {
oldVal = e.val;
if
(!onlyIfAbsent)
e.val = value;
break
;
}
Node<K,V> pred = e;
if
((e = e.next) ==
null
) {
//插入鏈表尾部
pred.next =
new
Node<K,V>(hash, key,
value,
null
);
break
;
}
}
}
else
if
(f
instanceof
TreeBin) {
//紅黑樹結構
Node<K,V> p;
binCount =
2
;
//紅黑樹結構旋轉插入
if
((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) !=
null
) {
oldVal = p.val;
if
(!onlyIfAbsent)
p.val = value;
}
}
}
}
if
(binCount !=
0
) {
//若是鏈表的長度大於8時就會進行紅黑樹的轉換
if
(binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if
(oldVal !=
null
)
return
oldVal;
break
;
}
}
}
addCount(1L, binCount);
//統計size,而且檢查是否須要擴容
return
null
;
}
|
這個put的過程很清晰,對當前的table進行無條件自循環直到put成功,能夠分紅如下六步流程來概述
如今咱們來對每一步的細節進行源碼分析,在第一步中,符合條件會進行初始化操做,咱們來看看initTable()方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
/**
* Initializes table, using the size recorded in sizeCtl.
*/
private
final
Node<K,V>[] initTable() {
Node<K,V>[] tab;
int
sc;
while
((tab = table) ==
null
|| tab.length ==
0
) {
//空的table才能進入初始化操做
if
((sc = sizeCtl) <
0
)
//sizeCtl<0表示其餘線程已經在初始化了或者擴容了,掛起當前線程
Thread.yield();
// lost initialization race; just spin
else
if
(U.compareAndSwapInt(
this
, SIZECTL, sc, -
1
)) {
//CAS操做SIZECTL爲-1,表示初始化狀態
try
{
if
((tab = table) ==
null
|| tab.length ==
0
) {
int
n = (sc >
0
) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings
(
"unchecked"
)
Node<K,V>[] nt = (Node<K,V>[])
new
Node<?,?>[n];
//初始化
table = tab = nt;
sc = n - (n >>>
2
);
//記錄下次擴容的大小
}
}
finally
{
sizeCtl = sc;
}
break
;
}
}
return
tab;
}
|
在第二步中沒有hash衝突就直接調用Unsafe的方法CAS插入該元素,進入第三步若是容器正在擴容,則會調用helpTransfer()方法幫助擴容,如今咱們跟進helpTransfer()方法看看
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
/**
*幫助從舊的table的元素複製到新的table中
*/
final
Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab;
int
sc;
if
(tab !=
null
&& (f
instanceof
ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) !=
null
) {
//新的table nextTba已經存在前提下才能幫助擴容
int
rs = resizeStamp(tab.length);
while
(nextTab == nextTable && table == tab &&
(sc = sizeCtl) <
0
) {
if
((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs +
1
||
sc == rs + MAX_RESIZERS || transferIndex <=
0
)
break
;
if
(U.compareAndSwapInt(
this
, SIZECTL, sc, sc +
1
)) {
transfer(tab, nextTab);
//調用擴容方法
break
;
}
}
return
nextTab;
}
return
table;
}
|
其實helpTransfer()方法的目的就是調用多個工做線程一塊兒幫助進行擴容,這樣的效率就會更高,而不是隻有檢查到要擴容的那個線程進行擴容操做,其餘線程就要等待擴容操做完成才能工做
既然這裏涉及到擴容的操做,咱們也一塊兒來看看擴容方法transfer()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
|
private
final
void
transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int
n = tab.length, stride;
// 每核處理的量小於16,則強制賦值16
if
((stride = (NCPU >
1
) ? (n >>>
3
) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE;
// subdivide range
if
(nextTab ==
null
) {
// initiating
try
{
@SuppressWarnings
(
"unchecked"
)
Node<K,V>[] nt = (Node<K,V>[])
new
Node<?,?>[n <<
1
];
//構建一個nextTable對象,其容量爲原來容量的兩倍
nextTab = nt;
}
catch
(Throwable ex) {
// try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return
;
}
nextTable = nextTab;
transferIndex = n;
}
int
nextn = nextTab.length;
// 鏈接點指針,用於標誌位(fwd的hash值爲-1,fwd.nextTable=nextTab)
ForwardingNode<K,V> fwd =
new
ForwardingNode<K,V>(nextTab);
// 當advance == true時,代表該節點已經處理過了
boolean
advance =
true
;
boolean
finishing =
false
;
// to ensure sweep before committing nextTab
for
(
int
i =
0
, bound =
0
;;) {
Node<K,V> f;
int
fh;
// 控制 --i ,遍歷原hash表中的節點
while
(advance) {
int
nextIndex, nextBound;
if
(--i >= bound || finishing)
advance =
false
;
else
if
((nextIndex = transferIndex) <=
0
) {
i = -
1
;
advance =
false
;
}
// 用CAS計算獲得的transferIndex
else
if
(U.compareAndSwapInt
(
this
, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride :
0
))) {
bound = nextBound;
i = nextIndex -
1
;
advance =
false
;
}
}
if
(i <
0
|| i >= n || i + n >= nextn) {
int
sc;
// 已經完成全部節點複製了
if
(finishing) {
nextTable =
null
;
table = nextTab;
// table 指向nextTable
sizeCtl = (n <<
1
) - (n >>>
1
);
// sizeCtl閾值爲原來的1.5倍
return
;
// 跳出死循環,
}
// CAS 更擴容閾值,在這裏面sizectl值減一,說明新加入一個線程參與到擴容操做
if
(U.compareAndSwapInt(
this
, SIZECTL, sc = sizeCtl, sc -
1
)) {
if
((sc -
2
) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return
;
finishing = advance =
true
;
i = n;
// recheck before commit
}
}
// 遍歷的節點爲null,則放入到ForwardingNode 指針節點
else
if
((f = tabAt(tab, i)) ==
null
)
advance = casTabAt(tab, i,
null
, fwd);
// f.hash == -1 表示遍歷到了ForwardingNode節點,意味着該節點已經處理過了
// 這裏是控制併發擴容的核心
else
if
((fh = f.hash) == MOVED)
advance =
true
;
// already processed
else
{
// 節點加鎖
synchronized
(f) {
// 節點複製工做
if
(tabAt(tab, i) == f) {
Node<K,V> ln, hn;
// fh >= 0 ,表示爲鏈表節點
if
(fh >=
0
) {
// 構造兩個鏈表 一個是原鏈表 另外一個是原鏈表的反序排列
int
runBit = fh & n;
Node<K,V> lastRun = f;
for
(Node<K,V> p = f.next; p !=
null
; p = p.next) {
int
b = p.hash & n;
if
(b != runBit) {
runBit = b;
lastRun = p;
}
}
if
(runBit ==
0
) {
ln = lastRun;
hn =
null
;
}
else
{
hn = lastRun;
ln =
null
;
}
for
(Node<K,V> p = f; p != lastRun; p = p.next) {
int
ph = p.hash; K pk = p.key; V pv = p.val;
if
((ph & n) ==
0
)
ln =
new
Node<K,V>(ph, pk, pv, ln);
else
hn =
new
Node<K,V>(ph, pk, pv, hn);
}
// 在nextTable i 位置處插上鍊表
setTabAt(nextTab, i, ln);
// 在nextTable i + n 位置處插上鍊表
setTabAt(nextTab, i + n, hn);
// 在table i 位置處插上ForwardingNode 表示該節點已經處理過了
setTabAt(tab, i, fwd);
// advance = true 能夠執行--i動做,遍歷節點
advance =
true
;
}
// 若是是TreeBin,則按照紅黑樹進行處理,處理邏輯與上面一致
else
if
(f
instanceof
TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo =
null
, loTail =
null
;
TreeNode<K,V> hi =
null
, hiTail =
null
;
int
lc =
0
, hc =
0
;
for
(Node<K,V> e = t.first; e !=
null
; e = e.next) {
int
h = e.hash;
TreeNode<K,V> p =
new
TreeNode<K,V>
(h, e.key, e.val,
null
,
null
);
if
((h & n) ==
0
) {
if
((p.prev = loTail) ==
null
)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else
{
if
((p.prev = hiTail) ==
null
)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
// 擴容後樹節點個數若<=6,將樹轉鏈表
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc !=
0
) ?
new
TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc !=
0
) ?
new
TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance =
true
;
}
}
}
}
}
}
|
其實helpTransfer()方法的目的就是調用多個工做線程一塊兒幫助進行擴容,這樣的效率就會更高,而不是隻有檢查到要擴容的那個線程進行擴容操做,其餘線程就要等待擴容操做完成才能工做
既然這裏涉及到擴容的操做,咱們也一塊兒來看看擴容方法transfer()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
|
private
final
void
transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int
n = tab.length, stride;
// 每核處理的量小於16,則強制賦值16
if
((stride = (NCPU >
1
) ? (n >>>
3
) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE;
// subdivide range
if
(nextTab ==
null
) {
// initiating
try
{
@SuppressWarnings
(
"unchecked"
)
Node<K,V>[] nt = (Node<K,V>[])
new
Node<?,?>[n <<
1
];
//構建一個nextTable對象,其容量爲原來容量的兩倍
nextTab = nt;
}
catch
(Throwable ex) {
// try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return
;
}
nextTable = nextTab;
transferIndex = n;
}
int
nextn = nextTab.length;
// 鏈接點指針,用於標誌位(fwd的hash值爲-1,fwd.nextTable=nextTab)
ForwardingNode<K,V> fwd =
new
ForwardingNode<K,V>(nextTab);
// 當advance == true時,代表該節點已經處理過了
boolean
advance =
true
;
boolean
finishing =
false
;
// to ensure sweep before committing nextTab
for
(
int
i =
0
, bound =
0
;;) {
Node<K,V> f;
int
fh;
// 控制 --i ,遍歷原hash表中的節點
while
(advance) {
int
nextIndex, nextBound;
if
(--i >= bound || finishing)
advance =
false
;
else
if
((nextIndex = transferIndex) <=
0
) {
i = -
1
;
advance =
false
;
}
// 用CAS計算獲得的transferIndex
else
if
(U.compareAndSwapInt
(
this
, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride :
0
))) {
bound = nextBound;
i = nextIndex -
1
;
advance =
false
;
}
}
if
(i <
0
|| i >= n || i + n >= nextn) {
int
sc;
// 已經完成全部節點複製了
if
(finishing) {
nextTable =
null
;
table = nextTab;
// table 指向nextTable
sizeCtl = (n <<
1
) - (n >>>
1
);
// sizeCtl閾值爲原來的1.5倍
return
;
// 跳出死循環,
}
// CAS 更擴容閾值,在這裏面sizectl值減一,說明新加入一個線程參與到擴容操做
if
(U.compareAndSwapInt(
this
, SIZECTL, sc = sizeCtl, sc -
1
)) {
if
((sc -
2
) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return
;
finishing = advance =
true
;
i = n;
// recheck before commit
}
}
// 遍歷的節點爲null,則放入到ForwardingNode 指針節點
else
if
((f = tabAt(tab, i)) ==
null
)
advance = casTabAt(tab, i,
null
, fwd);
// f.hash == -1 表示遍歷到了ForwardingNode節點,意味着該節點已經處理過了
// 這裏是控制併發擴容的核心
else
if
((fh = f.hash) == MOVED)
advance =
true
;
// already processed
else
{
// 節點加鎖
synchronized
(f) {
// 節點複製工做
if
(tabAt(tab, i) == f) {
Node<K,V> ln, hn;
// fh >= 0 ,表示爲鏈表節點
if
(fh >=
0
) {
// 構造兩個鏈表 一個是原鏈表 另外一個是原鏈表的反序排列
int
runBit = fh & n;
Node<K,V> lastRun = f;
for
(Node<K,V> p = f.next; p !=
null
; p = p.next) {
int
b = p.hash & n;
if
(b != runBit) {
runBit = b;
lastRun = p;
}
}
if
(runBit ==
0
) {
ln = lastRun;
hn =
null
;
}
else
{
hn = lastRun;
ln =
null
;
}
for
(Node<K,V> p = f; p != lastRun; p = p.next) {
int
ph = p.hash; K pk = p.key; V pv = p.val;
if
((ph & n) ==
0
)
ln =
new
Node<K,V>(ph, pk, pv, ln);
else
hn =
new
Node<K,V>(ph, pk, pv, hn);
}
// 在nextTable i 位置處插上鍊表
setTabAt(nextTab, i, ln);
// 在nextTable i + n 位置處插上鍊表
setTabAt(nextTab, i + n, hn);
// 在table i 位置處插上ForwardingNode 表示該節點已經處理過了
setTabAt(tab, i, fwd);
// advance = true 能夠執行--i動做,遍歷節點
advance =
true
;
}
// 若是是TreeBin,則按照紅黑樹進行處理,處理邏輯與上面一致
else
if
(f
instanceof
TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo =
null
, loTail =
null
;
TreeNode<K,V> hi =
null
, hiTail =
null
;
int
lc =
0
, hc =
0
;
for
(Node<K,V> e = t.first; e !=
null
; e = e.next) {
int
h = e.hash;
TreeNode<K,V> p =
new
TreeNode<K,V>
(h, e.key, e.val,
null
,
null
);
if
((h & n) ==
0
) {
if
((p.prev = loTail) ==
null
)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else
{
if
((p.prev = hiTail) ==
null
)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
// 擴容後樹節點個數若<=6,將樹轉鏈表
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc !=
0
) ?
new
TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc !=
0
) ?
new
TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance =
true
;
}
}
}
}
}
}
|
擴容過程有點複雜,這裏主要涉及到多線程併發擴容,ForwardingNode的做用就是支持擴容操做,將已處理的節點和空節點置爲ForwardingNode,併發處理時多個線程通過ForwardingNode就表示已經遍歷了,就日後遍歷,下圖是多線程合做擴容的過程:
介紹完擴容過程,咱們再次回到put流程,在第四步中是向鏈表或者紅黑樹里加節點,到第五步,會調用treeifyBin()方法進行鏈表轉紅黑樹的過程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
private
final
void
treeifyBin(Node<K,V>[] tab,
int
index) {
Node<K,V> b;
int
n, sc;
if
(tab !=
null
) {
//若是整個table的數量小於64,就擴容至原來的一倍,不轉紅黑樹了
//由於這個閾值擴容能夠減小hash衝突,沒必要要去轉紅黑樹
if
((n = tab.length) < MIN_TREEIFY_CAPACITY)
tryPresize(n <<
1
);
else
if
((b = tabAt(tab, index)) !=
null
&& b.hash >=
0
) {
synchronized
(b) {
if
(tabAt(tab, index) == b) {
TreeNode<K,V> hd =
null
, tl =
null
;
for
(Node<K,V> e = b; e !=
null
; e = e.next) {
//封裝成TreeNode
TreeNode<K,V> p =
new
TreeNode<K,V>(e.hash, e.key, e.val,
null
,
null
);
if
((p.prev = tl) ==
null
)
hd = p;
else
tl.next = p;
tl = p;
}
//經過TreeBin對象對TreeNode轉換成紅黑樹
setTabAt(tab, index,
new
TreeBin<K,V>(hd));
}
}
}
}
}
|
到第六步表示已經數據加入成功了,如今調用addCount()方法計算ConcurrentHashMap的size,在原來的基礎上加一,如今來看看addCount()方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
private
final
void
addCount(
long
x,
int
check) {
CounterCell[] as;
long
b, s;
//更新baseCount,table的數量,counterCells表示元素個數的變化
if
((as = counterCells) !=
null
||
!U.compareAndSwapLong(
this
, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a;
long
v;
int
m;
boolean
uncontended =
true
;
//若是多個線程都在執行,則CAS失敗,執行fullAddCount,所有加入count
if
(as ==
null
|| (m = as.length -
1
) <
0
||
(a = as[ThreadLocalRandom.getProbe() & m]) ==
null
||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return
;
}
if
(check <=
1
)
return
;
s = sumCount();
}
//check>=0表示須要進行擴容操做
if
(check >=
0
) {
Node<K,V>[] tab, nt;
int
n, sc;
while
(s >= (
long
)(sc = sizeCtl) && (tab = table) !=
null
&&
(n = tab.length) < MAXIMUM_CAPACITY) {
int
rs = resizeStamp(n);
if
(sc <
0
) {
if
((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs +
1
||
sc == rs + MAX_RESIZERS || (nt = nextTable) ==
null
||
transferIndex <=
0
)
break
;
if
(U.compareAndSwapInt(
this
, SIZECTL, sc, sc +
1
))
transfer(tab, nt);
}
//當前線程發起庫哦哦讓操做,nextTable=null
else
if
(U.compareAndSwapInt(
this
, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) +
2
))
transfer(tab,
null
);
s = sumCount();
}
}
}
|
put的流程如今已經分析完了,你能夠從中發現,他在併發處理中使用的是樂觀鎖,當有衝突的時候才進行併發處理,並且流程步驟很清晰,可是細節設計的很複雜,畢竟多線程的場景也複雜
咱們如今要回到開始的例子中,咱們對我的信息進行了新增以後,咱們要獲取所新增的信息,使用String name = map.get(「name」)獲取新增的name信息,如今咱們依舊用debug的方式來分析下ConcurrentHashMap的獲取方法get()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
public
V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p;
int
n, eh; K ek;
int
h = spread(key.hashCode());
//計算兩次hash
if
((tab = table) !=
null
&& (n = tab.length) >
0
&&
(e = tabAt(tab, (n -
1
) & h)) !=
null
) {
//讀取首節點的Node元素
if
((eh = e.hash) == h) {
//若是該節點就是首節點就返回
if
((ek = e.key) == key || (ek !=
null
&& key.equals(ek)))
return
e.val;
}
//hash值爲負值表示正在擴容,這個時候查的是ForwardingNode的find方法來定位到nextTable來
//查找,查找到就返回
else
if
(eh <
0
)
return
(p = e.find(h, key)) !=
null
? p.val :
null
;
while
((e = e.next) !=
null
) {
//既不是首節點也不是ForwardingNode,那就往下遍歷
if
(e.hash == h &&
((ek = e.key) == key || (ek !=
null
&& key.equals(ek))))
return
e.val;
}
}
return
null
;
}
|
ConcurrentHashMap的get操做的流程很簡單,也很清晰,能夠分爲三個步驟來描述
最後咱們來看下例子中最後獲取size的方式int size = map.size();,如今讓咱們看下size()方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
public
int
size() {
long
n = sumCount();
return
((n < 0L) ?
0
:
(n > (
long
)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(
int
)n);
}
final
long
sumCount() {
CounterCell[] as = counterCells; CounterCell a;
//變化的數量
long
sum = baseCount;
if
(as !=
null
) {
for
(
int
i =
0
; i < as.length; ++i) {
if
((a = as[i]) !=
null
)
sum += a.value;
}
}
return
sum;
}
|
在JDK1.8版本中,對於size的計算,在擴容和addCount()方法就已經有處理了,JDK1.7是在調用size()方法纔去計算,其實在併發集合中去計算size是沒有多大的意義的,由於size是實時在變的,只能計算某一刻的大小,可是某一刻太快了,人的感知是一個時間段,因此並非很精確
其實能夠看出JDK1.8版本的ConcurrentHashMap的數據結構已經接近HashMap,相對而言,ConcurrentHashMap只是增長了同步的操做來控制併發,從JDK1.7版本的ReentrantLock+Segment+HashEntry,到JDK1.8版本中synchronized+CAS+HashEntry+紅黑樹,相對而言,總結以下思考