一說到HashMap與Hashtable區別就會想到前者線程不安全,後者線程安全。可是當咱們須要線程安全的時候,Hashtable並非一個良好的選擇,concurrentHashMap纔是。node
獲得線程安全的HashMap有如下三種方式:
①.使用Hashtable
②.使用Collections.synchronizedMap()方法
③.使用concurrentHashMap
有這3中方式,爲何推薦使用第三種,而不是其他兩個方法?答案:效率,咱們來看看那兩種方式的源碼
算法
咱們能夠看到也使用synchronized關鍵字將HashMap包裝成一個線程安全的map與Hashtable相似,每次只有一個線程能訪問此對象。數組
public static Map synchronizedMap(Map m) { return new SynchronizedMap<>(m); }複製代碼private static class SynchronizedMap<K,V> implements Map<K,V>, Serializable { private final Map<K,V> m; // Backing Map final Object mutex; // Object on which to synchronize SynchronizedMap(Map<K,V> m) { this.m = Objects.requireNonNull(m); mutex = this; } public int size() { synchronized (mutex) {return m.size();} } public boolean isEmpty() { synchronized (mutex) {return m.isEmpty();} } public boolean containsKey(Object key) { synchronized (mutex) {return m.containsKey(key);} } 複製代碼複製代碼private static class SynchronizedMap<K,V> implements Map<K,V>, Serializable { private final Map<K,V> m; // Backing Map final Object mutex; // Object on which to synchronize SynchronizedMap(Map<K,V> m) { this.m = Objects.requireNonNull(m); mutex = this; } public int size() { synchronized (mutex) {return m.size();} } public boolean isEmpty() { synchronized (mutex) {return m.isEmpty();} } public boolean containsKey(Object key) { synchronized (mutex) {return m.containsKey(key);} } 複製代碼
/**
* 存放node的數組,大小是2的冪次方
*/
transient volatile Node[] table;
/**
* 擴容時用於存放數據的變量,平時爲null
*/
private transient volatile Node[] nextTable;
/**
* 經過CAS更新,記錄容器的容量大小
*/
private transient volatile long baseCount;
/**
* 控制標誌符
* 負數: 表明正在進行初始化或擴容操做,其中-1表示正在初始化,-N 表示有N-1個線程正在進行擴容操做
* 正數或0: 表明hash表尚未被初始化,這個數值表示初始化或下一次進行擴容的大小,相似於擴容閾值
* 它的值始終是當前ConcurrentHashMap容量的0.75倍,這與loadfactor是對應的。
* 實際容量 >= sizeCtl,則擴容
*/
private transient volatile int sizeCtl;
/**
* 下次transfer方法的起始下標index加上1以後的值
*/
private transient volatile int transferIndex;
/**
* CAS自旋鎖標誌位
*/
private transient volatile int cellsBusy;
/**
* counter cell表,長度總爲2的冪次
*/
private transient volatile CounterCell[] counterCells;
複製代碼
static class Node implements Map.Entry {
final int hash;
final K key;
volatile V val;
volatile Node next;
...
}
複製代碼
static final class TreeNode extends LinkedHashMap.Entry {
TreeNode parent; // red-black tree links
TreeNode left;
TreeNode right;
TreeNode prev; // needed to unlink next upon deletion
boolean red;
...
}
複製代碼
static final class TreeBin extends Node {
TreeNode root;
volatile TreeNode first;
volatile Thread waiter;
volatile int lockState;
// values for lockState
static final int WRITER = 1; // set while holding write lock
static final int WAITER = 2; // set when waiting for write lock
static final int READER = 4; // increment value for setting read lock
...
}
複製代碼
static final class ForwardingNode extends Node {
final Node[] nextTable;
//ForwardingNode節點hash爲-1,若操做中遇到此類型節點,代表有線程正在擴容
ForwardingNode(Node[] tab) {
super(MOVED, null, null, null);
this.nextTable = tab;
}
...
}
複製代碼
/**
* 默認構造方法
*/
public ConcurrentHashMap() {}
/**
* 指定容量
*/
public ConcurrentHashMap(int initialCapacity) {
//異常狀況
if (initialCapacity < 0)
throw new IllegalArgumentException();
//計算容量
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
}
/**
* 指定map
*/
public ConcurrentHashMap(Map m) {
this.sizeCtl = DEFAULT_CAPACITY;
putAll(m);
}
/**
* 指定容量、負載因子
*/
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
this(initialCapacity, loadFactor, 1);
}
/**
* 指定容量、負載因子、併發級別
*/
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel) // Use at least as many bins
initialCapacity = concurrencyLevel; // as estimated threads
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap;
}
複製代碼
concurrentHashMap調用Unsafe類中的方法實現CAS(這個算法的基本思想就是不斷地去比較當前內存中的變量值與你指定的一個變量值是否相等,若是相等,則接受你指定的修改的值,不然拒絕你的操做),其內部方法大多爲native方法即直接調用操做系統底層資源執行相應任務,提供了一些能夠直接操控內存和線程的底層操做。安全
initTable方法判斷sizeCtl值,若sizeCtl值爲-1即有其餘線程正在初始化,調用Thread.yield()讓出CPU時間片,而正在初始化的線程經過Unsafe.compareAndSwapInt方法修改sizeCtl爲-1,初始化數組和擴容閾值。數據結構
private final Node[] initTable() {
Node[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
//若sizeCtl<0,即存在其餘線程正在初始化操做,確保只有一個線程進行初始化
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
//利用CAS方法把sizectl的值置爲-1,代表已有線程進行初始化
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
//得到桶容量
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
//初始化node數組
Node[] nt = (Node[])new Node[n];
table = tab = nt;
//計算擴容閾值0.75n
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
複製代碼
從源碼咱們能夠看出最外層的流程控制採用while循環,而非if條件分支,由於Thread.yield()會讓出cpu時間,目的在於確保數組初始化成功多線程
ConcurrentHashMap的put操做與HashMap很類似,但ConcurrentHashMap不容許null做爲key和value,而且因爲須要保證線程安全,有如下兩個多線程狀況:
併發
①.若是一個或多個線程正在對ConcurrentHashMap進行擴容操做,當前線程也要進入擴容的操做中。這個擴容的操做之因此能被檢測到,是由於transfer方法會將已經操做過擴容桶頭結點置爲ForwardingNode節點,若是檢測到須要插入的位置被該節點佔有,就幫助進行擴容。
dom
②.若是檢測到要插入的節點是非空且不是ForwardingNode節點,就對這個節點加鎖,這樣就保證了線程安全。
ide
final V putVal(K key, V value, boolean onlyIfAbsent) {
//與HashMap不一樣,ConcurrentHashMap不容許null做爲key或value
if (key == null || value == null) throw new NullPointerException();
//計算hash值
int hash = spread(key.hashCode());
int binCount = 0;
for (Node[] tab = table;;) {
Node f; int n, i, fh;
//若table爲空的話,初始化table
if (tab == null || (n = tab.length) == 0)
tab = initTable();
//若當前數組i位置上的節點爲null
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
//CAS插入節點(V:當前數組i位置上的節點; O:null; N:新Node對象)
if (casTabAt(tab, i, null,
new Node(hash, key, value, null)))
break; // no lock when adding to empty bin
}
//當前正在擴容
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
//鎖住當前數組i位置上的節點
synchronized (f) {
//判斷是否節點f是否爲當前數組i位置上的節點,防止被其它線程修改
if (tabAt(tab, i) == f) {
//當前位置桶的結構爲鏈表
if (fh >= 0) {
binCount = 1;
//遍歷鏈表節點
for (Node e = f;; ++binCount) {
K ek;
//若hash值與key值相同,進行替換
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node pred = e;
//若鏈表中找不到,尾插節點
if ((e = e.next) == null) {
pred.next = new Node(hash, key,
value, null);
break;
}
}
}
//當前位置桶結構爲紅黑樹,TreeBin哈希值固定爲-2
else if (f instanceof TreeBin) {
Node p;
binCount = 2;
//遍歷紅黑樹上節點,更新或增長節點
if ((p = ((TreeBin)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
//若鏈表長度超過8,將鏈表轉爲紅黑樹
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
//節點數+1,若超過閾值則擴容
addCount(1L, binCount);
return null;
}
/**
* hash算法
*/
static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}
複製代碼
大體流程:
①.首先判斷key和value是否爲null,爲null拋異常。
與HashMap不一樣,ConcurrentHashMap不容許null做爲key或value,爲何這樣設計?
性能
由於ConcurrentHashmap是支持併發的,這樣會有一個問題,當你經過get(k)獲取對應的value時,若是獲取到的是null時,你沒法判斷,它是put(k,v)的時候value爲null,仍是這個key歷來沒有作過映射。HashMap是非併發的,能夠經過contains(key)來作這個判斷。而支持併發的Map在調用m.contains(key)和m.get(key),m可能已經不一樣了。參考
②.從新計算hash值,由於1.8concurrentHashMap引入了紅黑樹來處理較多的哈希衝突,簡化了hash算法,不過對比了1.8HashMap的hash算法,除了由於concurrentHashMap不容許null爲key,沒有把null的hash值算爲0之外,其多了一步位運算,之因此這樣是爲了確保重哈希算出的必定不爲負數,我認爲是由於若算出負值,可能會影響後續的節點類型判斷
③.判斷當前table是否爲空,空的話初始化table,初始化方法已闡述過再也不多提
④.根據重哈希算出的值經過與運算獲得桶索引,利用Unsafe類直接獲取內存內存中對應位置上的節點,若沒有碰撞即桶中無結點CAS直接添加
⑤.若發生碰撞,判斷桶中第一個節點類型。是ForwardingNode節點的話,代表有其它線程正在擴容,則一塊兒進行擴容操做
⑥.剩下場景,按節點相應結構鏈表或紅黑樹的方式插入或更新新節點
⑦.若新增節點後鏈表長度大於8,就把這個鏈表轉換成紅黑樹
⑧.最後節點數量+1,校驗是否超過閾值,若超過則擴容
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
//利用CAS方法更新baseCount的值
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
//多線程CAS發生失敗的時候執行
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
///若是check值大於等於0 則須要檢驗是否須要進行擴容操做
if (check >= 0) {
Node[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}
複製代碼
咱們能夠看到在更新baseCount時用了2次CAS操做,一次直接更新basecount,若更新失敗則更新CELLVALUE,若還更新失敗會調用fullAddCount()方法,這個方法會死循環一直將因爲多線程競爭致使CAS失敗的容量變化值存到CounterCell數組中,爲size()方法作準備
對於ConcurrentHashMap來講,這個table裏到底裝了多少東西實際上是個不肯定的數量,由於不可能在調用size()方法的時候像GC的「stop the world」同樣讓其餘線程都停下來讓你去統計,所以只能說這個數量是個估計值。
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
複製代碼
爲了統計元素個數,ConcurrentHashMap經過累加baseCount和CounterCell數組中的數量,元素個數保存baseCount中,部分元素的變化個數保存在CounterCell數組中,獲得元素的總個數。
支持多線程擴容,沒有加鎖 ,其目的不只爲了知足concurrent的要求,而是但願利用併發處理去減小擴容帶來的時間影響。
如下兩個場景可能會觸發擴容機制(與1.8HashMap相同):
①.當桶中鏈表長度達到閾值8,但整個ConcurrentHashMap節點數量小於64
②.新增節點後,整個ConcurrentHashMap節點數量超過閾值
private final void transfer(Node[] tab, Node[] nextTab) {
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
// 建立node數組,容量爲當前的兩倍
Node[] nt = (Node[])new Node[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
// 若擴容時出現OOM異常,則將閾值設爲最大,代表不支持擴容
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
// 建立ForwardingNode節點,做爲標記位,代表當前位置桶已作過處理
ForwardingNode fwd = new ForwardingNode(nextTab);
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
Node f; int fh;
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
//經過CAS設置transferIndex屬性值,並初始化i和bound值
//i指當前處理的槽位序號,bound指須要處理的槽位邊界
//先處理最後一個桶的節點;
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
// 將原數組中節點複製到新數組中去
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
//若是全部的節點都已經完成複製工做 就把nextTable賦值給table 清空臨時對象nextTable
if (finishing) {
nextTable = null;
table = nextTab;
//設置新擴容閾值
sizeCtl = (n << 1) - (n >>> 1);
return;
}
//利用CAS方法更新擴容閾值,在這裏面sizectl值減一,說明新加入一個線程參與到擴容操做
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
//鎖住i位置上桶的節點
synchronized (f) {
//確保f是i位置上桶的節點
if (tabAt(tab, i) == f) {
Node ln, hn;
//當前桶是鏈式結構
if (fh >= 0) {
//構造兩個鏈表
int runBit = fh & n;
Node lastRun = f;
//相似於1.8HashMap,只須要看新增的1bit是0仍是1進行分類
for (Node p = f.next; p != null; p = p.next) {
//n是就數組長度,不是長度-1
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node(ph, pk, pv, ln);
else
hn = new Node(ph, pk, pv, hn);
}
//在nextTable的i位置上插入一個鏈表
setTabAt(nextTab, i, ln);
//在nextTable的i+n的位置上插入另外一個鏈表
setTabAt(nextTab, i + n, hn);
//在table的i位置上插入forwardNode節點 表示已經處理過該節點
setTabAt(tab, i, fwd);
//設置advance爲true 返回到上面的while循環中 就能夠執行i--操做
advance = true;
}
//當前桶是紅黑樹結構,操做和上面的相似
else if (f instanceof TreeBin) {
TreeBin t = (TreeBin)f;
TreeNode lo = null, loTail = null;
TreeNode hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode p = new TreeNode
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
//若是擴容後已經再也不須要tree的結構 反向轉換爲鏈表結構
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
複製代碼
如圖所示(藍色:hash值第X位爲0,灰色:hash值第X位爲1)
看過JDK8 HashMap擴容方法應該會感受有點像,ConcurrentHashMap對鏈表結構的處理相似HashMap的resize()方法最後部分,對紅黑樹結構處理相似HashMap裏TreeNode內部類的split()方法,TreeBin的構造方法相似於TreeNode內部類的treeify()方法,可是因爲ConcurrentHashMap須要保證線程安全其操做仍是複雜得多
問題:爲何要反序構建鏈表?
參考了下1.7HashMap頭插節點思想,由於後插入的數據被使用的頻次更高,沒法隨機訪問只能從頭開始遍歷查詢,而ConcurrentHashMap利用內置鎖synchronized鎖住桶頭節點,由於後插入的數據被使用的頻次更高,倒序使鎖持有時間更短,等待時間也就短了(僅本人見解)
給定一個key來肯定value的時候,必須知足兩個條件 key相同 hash值相同,對於節點可能在鏈表或樹上的狀況,須要分別去查找。
public V get(Object key) {
Node[] tab; Node e, p; int n, eh; K ek;
//計算hash值
int h = spread(key.hashCode());
////根據hash值肯定節點位置
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
//桶首節點的key與查找的key相同,則直接返回
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
//樹節點場景
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
//鏈表場景
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
複製代碼
不考慮細節處理,JDK 7中ConcurrentHashMap最主要採用segment,多線程競爭會先鎖住segment,在其put操做中會先定位segment位置,再定位segment中具體桶位置,而JDK 8直接操做Node數組中的桶,鎖住桶首節點,鎖的粒度變小了。另外segment繼承ReentrantLock,而ReentrantLock相對於synchronized關鍵字多了等待可中斷、公平性、綁定多個條件,但針對於性能而言,因爲synchronized內置鎖不斷在優化,其性能不會差。