上一篇文章我講了一下HashMap的相關源碼實現,而且咱們知道它是線程不安全的,在併發環境中使用時,HashMap在擴容的時候有可能會生成一個環形鏈表,從而致使get造成死循環超時。那這篇咱們就來介紹一下併發環境下使用的HashMap——ConcurrentHashMap,下面是它的類關係圖。java
Segment是一種可重入鎖,在ConcurrentHashMap裏扮演鎖的角色;HashEntry則用於存儲鍵值對數據。node
一個ConcurrentHashMap裏包含一個Segment數組。Segment的結構和HashMap相似,是一種數組和鏈表結構。一個Segment裏包含一個HashEntry數組,每個HashEntry是一個鏈表結構的元素,每一個Segment守護着一個HashEntry數組裏的元素,當對HashEntry數組的數據進行修改時,必須首先得到與它對應的Segment鎖。算法
ConcurrentHashMap經過使用分段鎖技術,將數據分紅一段一段的存儲,而後給每一段數據配一把鎖,當一個線程佔用鎖訪問其中一個段數據的時候,其餘段的數據也能被其餘線程訪問,可以實現真正的併發訪問。數組
static final class Segment extends ReentrantLock implements Serializable {
private static final long serialVersionUID = 2249069246763182397L;
static final int MAX_SCAN_RETRIES =
Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
transient volatile HashEntry[] table;
transient int count;
transient int modCount;
transient int threshold;
final float loadFactor;
... ...
}複製代碼
static final class HashEntry<K,V> {
final int hash;
final K key;
volatile V value;
volatile HashEntry<K,V> next;
}
複製代碼
static final class Segment<K,V> extends ReentrantLock implements Serializable {
private static final long serialVersionUID = 2249069246763182397L;
static final int MAX_SCAN_RETRIES =
Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
transient volatile HashEntry<K,V>[] table;
transient int count;
transient int modCount;
transient int threshold;
final float loadFactor;
}
final Segment<K,V>[] segments;複製代碼
public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel)複製代碼
static final int DEFAULT_CONCURRENCY_LEVEL = 16;複製代碼
接下來咱們看一下ConcurrentHashMap中的幾個關鍵函數,get,put,rehash(擴容), size方法,看看他是如何實現併發的。
安全
segment的put方法實現bash
/** * The number of elements. Accessed only either within locks * or among other volatile reads that maintain visibility. */
transient int count;複製代碼
static final int RETRIES_BEFORE_LOCK = 2;
public int size() {
// Try a few times to get accurate count. On failure due to
// continuous async changes in table, resort to locking.
final Segment<K,V>[] segments = this.segments;
int size;
boolean overflow; // true if size overflows 32 bits
long sum; // sum of modCounts
long last = 0L; // previous sum
int retries = -1; // first iteration isn't retry
try {
for (;;) {
// 超過嘗試次數,則對每一個 Segment 加鎖
if (retries++ == RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creation
}
sum = 0L;
size = 0;
overflow = false;
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null) {
sum += seg.modCount;
int c = seg.count;
if (c < 0 || (size += c) < 0)
overflow = true;
}
}
// 連續兩次獲得的結果一致,則認爲這個結果是正確的
if (sum == last)
break;
last = sum;
}
} finally {
if (retries > RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
return overflow ? Integer.MAX_VALUE : size;
}複製代碼
ConcurrentHashMap如何判斷統計過程當中Segment的cout發生了變化?數據結構
數據結構採用數組 + 鏈表 + 紅黑樹的方式實現。當鏈表中(bucket)的節點個數超過8個時,會轉換成紅黑樹的數據結構存儲,這樣設計的目的是爲了提升同一個鏈表衝突過大狀況下的讀取效率。併發
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
}複製代碼
ConcurrentHashMap中有三個核心的CAS操做async
//獲取索引i處Node
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
//利用CAS算法設置i位置上的Node節點(將c和table[i]比較,相同則插入v)
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
//利用volatile設置節點位置i的值,僅在上鎖區被調用
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
} 複製代碼
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
//若是一個線程發現sizeCtl<0,意味着另外的線程
//執行CAS操做成功,當前線程只須要讓出cpu時間片
if ((sc = sizeCtl) < 0)
Thread.yield();
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
//CAS方法把sizectl置爲-1,表示本線程正在進行初始化
try {
if ((tab = table) == null || tab.length == 0) {
//DEFAULT_CAPACITY 默認初始容量是 16
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
//初始化數組,長度爲 16 或初始化時提供的長度
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
//將這個數組賦值給 table,table 是 volatile 的
table = tab = nt;
//若是 n 爲 16 的話,那麼這裏 sc = 12
//其實就是 0.75 * n
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}複製代碼
調用initTable會判斷sizeCtl的值,若值爲-1則表示正在初始化,會調用yield()去等待。函數
若值爲0,這時先調用CAS算法去設置爲-1,再初始化。
因此執行第一次put操做的線程會執行Unsafe.compareAndSwapInt方法修改sizeCtl爲-1,有且只有一個線程可以修改爲功,其它線程經過Thread.yield()讓出CPU時間片等待table初始化完成。
綜上所述,能夠知道初始化是單線程操做。
public V put(K key, V value) {
return putVal(key, value, false);
}
/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
//不容許key、value爲空
if (key == null || value == null) throw new NullPointerException();
//返回 (h ^ (h >>> 16)) & HASH_BITS;
int hash = spread(key.hashCode());
int binCount = 0;
//循環,直到插入成功
for (Node[] tab = table;;) {
Node f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
//table爲空,初始化table
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
//索引處無值
if (casTabAt(tab, i, null,
new Node(hash, key, value, null)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)// MOVED=-1;
//檢測到正在擴容,則幫助其擴容
tab = helpTransfer(tab, f);
else {
V oldVal = null;
//上鎖(hash值相同的鏈表的頭節點)
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
//遍歷鏈表節點
binCount = 1;
for (Node e = f;; ++binCount) {
K ek;
// hash和key相同,則修改value
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
//僅putIfAbsent()方法中onlyIfAbsent爲true
if (!onlyIfAbsent)
//putIfAbsent()包含key則返回get,不然put並返回
e.val = value;
break;
}
Node pred = e;
//已遍歷到鏈表尾部,直接插入
if ((e = e.next) == null) {
pred.next = new Node(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {// 樹節點
Node p;
binCount = 2;
if ((p = ((TreeBin)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
//判斷是否要將鏈表轉換爲紅黑樹,臨界值和HashMap同樣也是8
if (binCount >= TREEIFY_THRESHOLD)
//若length<64,直接tryPresize,兩倍table.length;不轉樹
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}複製代碼
static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}
複製代碼
int index = (n - 1) & hash複製代碼
4. 若是f爲null,說明table中這個位置第一次插入元素,利用Unsafe.compareAndSwapObject方法插入Node節點。
6. 其他狀況把新的Node節點按鏈表或紅黑樹的方式插入到合適的位置,這個過程採用同步內置鎖實現併發,代碼如上。
在節點f上進行同步,節點插入以前,再次利用tabAt(tab, i) == f判斷,防止被其它線程修改。
private final void treeifyBin(Node[] tab, int index) {
Node b; int n, sc;
if (tab != null) {
// MIN_TREEIFY_CAPACITY 爲 64
// 因此,若是數組長度小於 64 的時候,其實也就是 32 或者 16 或者更小的時候,會進行數組擴容
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
// 後面咱們再詳細分析這個方法
tryPresize(n << 1);
// b 是頭結點
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
// 加鎖
synchronized (b) {
if (tabAt(tab, index) == b) {
// 下面就是遍歷鏈表,創建一顆紅黑樹
TreeNode hd = null, tl = null;
for (Node e = b; e != null; e = e.next) {
TreeNode p =
new TreeNode(e.hash, e.key, e.val,
null, null);
if ((p.prev = tl) == null)
hd = p;
else
tl.next = p;
tl = p;
}
// 將紅黑樹設置到數組相應位置中
setTabAt(tab, index, new TreeBin(hd));
}
}
}
}
}複製代碼
這裏的擴容也是作翻倍擴容,擴容後數組容量爲原來的 2 倍。
// 首先要說明的是,方法參數 size 傳進來的時候就已經翻了倍了
private final void tryPresize(int size) {
// c:size 的 1.5 倍,再加 1,再往上取最近的 2 的 n 次方。
int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
tableSizeFor(size + (size >>> 1) + 1);
int sc;
while ((sc = sizeCtl) >= 0) {
Node<K,V>[] tab = table; int n;
// 這個 if 分支和以前說的初始化數組的代碼基本上是同樣的
if (tab == null || (n = tab.length) == 0) {
n = (sc > c) ? sc : c;
if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if (table == tab) {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = nt;
sc = n - (n >>> 2); // 0.75 * n
}
} finally {
sizeCtl = sc;
}
}
}
else if (c <= sc || n >= MAXIMUM_CAPACITY)
break;
else if (tab == table) {
int rs = resizeStamp(n);
if (sc < 0) {
Node<K,V>[] nt;
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
// 2. 用 CAS 將 sizeCtl 加 1,而後執行 transfer 方法
// 此時 nextTab 不爲 null
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
// 1. 將 sizeCtl 設置爲 (rs << RESIZE_STAMP_SHIFT) + 2)
// 調用 transfer 方法,此時 nextTab 參數爲 null
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
}
}
}複製代碼
至於transfer()方法的源碼這裏我就不分析了,它的大概功能就是將原來的 tab 數組的元素遷移到新的 nextTab 數組中。
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {//tabAt(i),獲取索引i處Node
// 判斷頭結點是否就是咱們須要的節點
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
// 若是頭結點的 hash<0,說明正在擴容,或者該位置是紅黑樹
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
//遍歷鏈表
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}複製代碼
Node<K,V> find(int h, Object k) {
Node<K,V> e = this;
if (k != null) {
do {
K ek;
if (e.hash == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
} while ((e = e.next) != null);
}
return null;
}複製代碼
到這裏我就基本把ConcurrentHashMap在 JDK 1.7和1.8中的實現大概捋了一遍,並詳細分析了幾個重要的方法實現:初始化、put、get。在 JDK1.8中ConcurrentHashMap發生了較大的變化,經過使用CAS+synchronized的實現取代了原先 1.7 中的Segment分段鎖機制,從而支持更高的併發量。
這只是我對ConcurrentHashMap的第二次學習,若想更好地理解掌握ConcurrentHashMap的實現之精妙,我的以爲還需之後再多看幾回,相信每次都會有新的收穫。