在ConcurrentHashmap,已經有一個線程安全的容器HashTable,可是ConcurrentHashMap比HashTable更加高效html
HashTable容器使用synchronized來保證線程安全:
HashTable容器使用synchronized來保證線程安全,但在線程競爭激烈的狀況下HashTable的效率很是低下,由於在一個線程訪問HashTable的同步方法,其它線程也訪問HashTable的同步方法是,會進入阻塞或輪詢狀態
java
ConcurrentHashMap採用CAS+Synchronized方式,來保證線程安全:
在jdk8,之前ConcurrentHashMap採用**「鎖分段技術」**,首先將數據分紅一段一段的存儲,而後給每一段數據配一把鎖,當一個線程佔用鎖訪問其中一個段數據的時候,其餘段的數據也能被其餘線程訪問。在jdk8之後,採用CAS+SynChronized方式來保證線程安全。
關於爲何用「CAS+Synchronized」代替「ReentrantLick+Segment」,看這篇文章ConcurrentHashMap 1.8爲何要使用CAS+Synchronized取代Segment+ReentrantLock?
接下來則是關於插入操做的具體分析node
//put()中調用了putVal,咱們直接對putval進行分析
final V putVal(K key, V value, boolean onlyIfAbsent) {
//從這裏能夠看出ConcurrentHashMap並不容許 k,v爲null
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
//一個死循環,當插入操做完成後纔會跳成循環
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh; K fk; V fv;
//第一次插入時,初始化table,initTable()的分析,見後邊
if (tab == null || (n = tab.length) == 0)
tab = initTable();
//若是要插入的數組的節點爲null,直接進行插入操做,casTabAt()爲原子操做,保證了線程安全
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
break; // no lock when adding to empty bin
}
//若是table在擴容,就讓當前線程幫助table擴容,提高效率,helpTransfer()中調用了transfer方法,這兩個方法的分析見後邊
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
//傳入的 onlyIfAbsent=false,因此不會走這個部分
else if (onlyIfAbsent // check first node without acquiring lock
&& fh == hash
&& ((fk = f.key) == key || (fk != null && key.equals(fk)))
&& (fv = f.val) != null)
return fv;
//數組的插入節點不爲null,則要向後查找
else {
V oldVal = null;
//同步代碼塊,f爲這條bins或則tree的頭節點,爲何鎖對象爲這個頭節點?
synchronized (f) {
//tabAt()也爲原子操做,爲何加鎖以後還要採用原子操做?由於判斷的是當前這個節點,也就是這個鎖對象,是否已經改變
//再次判斷頭節點是否爲先前得出的節點,由於以前操做沒有加鎖,可能這個節點已經被改變
if (tabAt(tab, i) == f) {
//fh=f.hash>0,也就是說沒有進行擴容操做
if (fh >= 0) {
//鏈表的長度
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
//在這條bin中有相同的node,則進行更新
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
//不斷向後查找,若是沒有節點和插入節點相同,則將節點插入到末尾
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key, value);
break;
}
}
}
//f,爲TreeBin,實際上就表明了,這個節點爲TreeNode,即爲紅黑樹,
//ConcurrenthashMap數組中放入的實際是TreeBin,treeBin完成了對紅黑樹的包裝
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
else if (f instanceof ReservationNode)
throw new IllegalStateException("Recursive update");
}
}
//最後在判斷一次鏈表長度是否超過閾值,超過則進行轉換位紅黑樹的操做
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
//進行計數,並檢查是否須要擴容,或者正在擴容時,幫助進行擴容
addCount(1L, binCount);
return null;
}
複製代碼
/**
* Initializes table, using the size recorded in sizeCtl.
官方註釋中的sizeCtl很是重要
private transient volatile int sizeCtl;
負數表明正在進行初始化或擴容操做
-1表明正在初始化
-N 表示有N-1個線程正在進行擴容操做
正數或0表明hash表尚未被初始化,這個數值表示初始化或下一次進行擴容的大小,這一點相似於擴容閾值的概念
*/
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
//sc<0,表明正在進行初始化,將線程掛起
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
//CAS操做,將sizeCtl置爲-1,表明搶到了鎖,進行init
else if (U.compareAndSetInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;//DEFAULT_CAPACITY=16
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
//對sizeCtl也進行增大,n-n>>>2等價於 n*o.75
sc = n - (n >>> 2);
}
} finally {
//對sizeCtl更新
sizeCtl = sc;
}
break;
}
}
return tab;
}
複製代碼
//helpTransfer調用了transfer方法
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
/* *ForwardingNode:官方註釋 A node inserted at head of bins during transfer operations. * 噹噹前節點完成轉移操做後就會將當前節點設爲ForwardingNode,來表示當前節點已經完成轉移操做 * nextTab:ForwardingNode中的一個變量,新的table,ForwardingNode會在transfer中進行初始化,所以nextTab會在那個時候賦值 */
//節點正在進行轉移操做
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
//resizeStamp(),產生一個標誌位
int rs = resizeStamp(tab.length);//實際上高16位爲0,只有低16位有效
//若是 nextTab 沒有被併發修改 且 tab 也沒有被併發修改
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
/* *sc>>>RESIZE_STAMP_SHIFT(16)!=rs,sc左移16位不等於rs,標識符發生了變化,從這裏能夠看出sc即sizeCtl的高16位標識符 *sc==rs+1,表示擴容已經結束了,爲何表示擴容結束了?具體分析見後面sizeCtl的分析 *sc=rs+MAX_RESIZERS(65535),表示達到最大線程數 *transferIndex,轉移的下標,表示正在調整下標 */
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
//調用transfer增長一個線程爲其擴容
if (U.compareAndSetInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}
複製代碼
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
//stride,能夠理解爲步長,當數組長度太長時,就會將數組分段,一個線程處理一段
//這個stride即爲每段的長度
int n = tab.length, stride;
//對數組分段得出stride的大小,MIN_TRANSFER_STRIDE(16),stride最小值爲16
//從MIN_TRANSFER_STRIDE的介紹中能夠看出,是爲了防止將stride設置的過小,就會產生過多線程,進行過分的內存競爭
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
//nextTab==null,進行擴容操做,爲原table2倍
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
//從這裏能夠看出,轉移操做是從數組末尾開始的
transferIndex = n;
}
int nextn = nextTab.length;
//初始化fwd,將以前初始化的nextTab傳進去
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
//advance標誌位表示作完了一個位置的轉移操做,能夠進行下一個位置的轉移操做
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
//將transferIndex賦值給nextIndex,transferIndex<=0,表示原數組的全部位置都有線程進行處理了
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
//這裏進行,nextIndex的賦值 = nextBound,nextBound=nextIndex-stride爲上一次的邊界
else if (U.compareAndSetInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
//全部的轉移操做以及完成
if (finishing) {
nextTable = null;
table = nextTab;
//從新計算sizeCtl
sizeCtl = (n << 1) - (n >>> 1);
return;
}
//採用CAS,更新sc的值,每一個線程完成操做後就會將sc-1,
if (U.compareAndSetInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
//全部的操做已經完成,爲何這裏表示全部操做以及完成?見後邊sc的分析
//簡單說一下,在第一個線程進入是, sc=rs<<16+2;每次增長一條線程sc+1,減小一條sc-1,當sc=rs<<16+2時表示全部線程完成操做
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
//若是位置 i=null,那麼放入剛剛初始化的 ForwardingNode 」空節點「,表明已經完成操做
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
//加鎖處理轉移操做
synchronized (f) {
if (tabAt(tab, i) == f) {
//和hashmap相同,將一個鏈表分爲兩個,一個的索引是原來的位置,另外一個是原索引+n;
Node<K,V> ln, hn;
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
//構建兩條反序鏈表
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
//放在原索引的鏈表
setTabAt(nextTab, i, ln);
//放在索引爲原索引+n的鏈表
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
//若是爲treeNode,進行treenode的相關split操做
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
//若是擴容以後不爲長度小於UNTREEIFY_THRESHOLD,則轉換爲鏈表結構
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
//將,兩條鏈表賦值到新數組
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
複製代碼
sizectl的分析
1. rs=resizeStamp(table.length);
static final int resizeStamp(int n) {
return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}
Integer.numberOfLeadingZeros()返回最高位之前0的個數,例如16 00**010000,返回27
RESIZE_STAMP_BITS=16
因此咱們能夠得出, rs實際是一個16有效值的數字,由於高16位全爲0;
2. addCount()
部分源碼
···
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
//正在進行轉移操做
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
//若是能夠幫助進行transfer則將sc+1,表明多了一條線程,幫助轉移操做
if (U.compareAndSetInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
//若是,沒在擴容,或第一次進行擴容時,sc=re<<16+2,即sc的初始值
else if (U.compareAndSetInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
3.結論
從這部分源碼中咱們能夠看出sc,和rs的關係,
即sc 高16位表示length生成的標識符,低16位則表示正在幫助擴容的線程數,初始值爲2
因此在前邊 sc-2=rs<<16,來判斷是否已經結束擴容操做
sc=rs+1,當第一個線程結束後,sc-1=rs+2-1=rs+1;也表示擴容已經結束
複製代碼
(1)ConcurrentHashMap的get()沒有加鎖,如何保證讀到的數據的正確性?
數組
實際上 volatile V val;
volatile Node<K,V> next;
transient volatile Node<K,V>[] table;
對於Node節點的val,next,以及table都用volatile修飾
可是table用volatile修飾是保證數組在擴容時的可見性,而不能保證對數組中元素的可見性,
由於table[i]保證的是 table[i]這個對應的地址的可見性
而真正保證讀操做正確的是,Node節點中的val,next被volatile修飾
複製代碼
(2)在進行擴容時,鎖住的對象爲每一個hash桶的頭節點,保證了多線程對數組的併發分段修改安全