上次寫了一文看懂HashMap,談到HashMap的線程安全問題就不得不聊聊ConcurrentHashMap
,若你不瞭解HashMap的話能夠看看上面那篇文章,ConcurrentHashMap和HashMap在不少地方是相似的,好比底層都是數組+鏈表+紅黑樹、數組大小都是2的冪次方等.......一些重複的知識點在這裏就不細講了。這篇文章主要會解決如下幾個問題:node
其實ConcurrentHashMap相比HashMap複雜了許多,主要是由於會涉及到許多併發層面的知識點,好比CAS
算法、volitale
以及synchronized
關鍵字等,本文會粗略介紹一下相關知識點,接下來咱們先聊聊HashMap的線程安全問題以及爲何要使用ConcurrentHashMap。面試
HashMap在併發環境下主要有如下幾個問題:算法
在1.7版本,當擴容後生成新數組,在轉移元素的過程當中,使用的是頭插法,也就是鏈表的順序會翻轉,當多個線程執行插入操做時可能會發生死循環。在1.8版本時將頭插法改爲了尾插法,解決了死循環的問題。segmentfault
當兩個線程同時插入元素時可能會發生數據被覆蓋的狀況數組
先看下源碼安全
if ((p = tab[i = (n - 1) & hash]) == null) tab[i] = newNode(hash, key, value, null);
當兩個線程同時執行到以上代碼時,發現沒有發生哈希衝突,因而新建Node節點插入,這時先插入的節點會被後插入的節點覆蓋,致使數據丟失。數據結構
給全部方法加synchronized
鎖,很是低效,如今已經淘汰。多線程
Collections
包提供的一個方法,會同步整個對象,也不推薦使用架構
儘管沒有同步整個Map,可是它仍然是線程安全的,讀操做很是快,而寫操做則是經過加鎖完成的,推薦使用併發
在開始以前須要先介紹下CAS
算法,這也是ConcurrentHashMap實現線程安全的一個關鍵點。
CAS能夠看作是樂觀鎖
的一種實現方式,Java原子類中的遞增操做就經過CAS自旋實現的。
CAS全稱 Compare And Swap(比較與交換),是一種無鎖算法。在不使用鎖(沒有線程被阻塞)的狀況下實現多線程之間的變量同步。
CAS底層就是經過Unsafe類中的方法來實現的,以下所示:
unsafe.compareAndSwapInt(this, valueOffset, expect, update)
下面介紹一下各個參數
經過valueOffset能夠拿到value的值,當且僅當value的值等於expect時,CAS經過原子方式用新值update來更新value的值,不然不會執行任何操做。
整個「比較+更新」操做封裝在
compareAndSwapInt()
中,在JNI裏是藉助於一個CPU指令完成的,屬於原子操做,能夠保證多個線程都可以看到同一個變量的修改值。
ConcurrentHashMap的源碼中除了普通的CAS操做,還定義了三個原子操做,用於對指定位置的節點進行操做。正是這些原子操做保證了ConcurrentHashMap的線程安全,以下所示:
//獲取tab數組的第i個node static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) { return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE); } //使用CAS嘗試更新table[i] static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) { return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v); } //寫入table[i] static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) { U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v); }
ConcurrentHashMap支持併發的讀寫。跟1.7版本相比,JDK1.8的實現已經摒棄了Segment的概念,而是直接用Node數組+鏈表+紅黑樹的數據結構來實現,併發控制使用Synchronized
和CAS
來操做,雖然源碼裏面還保留了,也只是爲了兼容性的考慮,所以本文主要講解的是JDK1.8版本的ConcurrentHashMap。
先來介紹一個核心屬性sizeCtl
private transient volatile int sizeCtl;
用途:控制table數組的初始化和擴容的操做,不一樣的值有不一樣的含義
其它屬性
transient volatile Node<K,V>[] table;//哈希數組,保存Ndode節點 private transient volatile Node<K,V>[] nextTable;//擴容用的數組,只有在擴容時纔不爲null private static final int DEFAULT_CAPACITY = 16;//默認大小 private static final float LOAD_FACTOR = 0.75f;//負載因子 static final int MOVED = -1; //表示正在擴容
在上面咱們能夠看到volatile
關鍵字,這裏先簡單介紹一下該關鍵字的做用:
在多線程環境下,某個共享變量若是被其中一個線程給修改了,其餘線程可以當即知道這個共享變量已經被修改了,當其餘線程要讀取這個變量的時候,最終會去內存中讀取,而不是從本身的工做空間中讀取
虛擬機在進行代碼編譯優化的時候,對於那些改變順序以後不會對最終變量的值形成影響的代碼,是有可能將他們進行重排序的,可是在多線程下可能會引起線程安全問題,使用volatile能夠禁止重排序。
注意:volatile關鍵字沒法保證變量的原子性。
在面試中volatile底層實現機制也是常考的一個知識點,因爲篇幅有限這裏只是簡單介紹一下概念,若是對原理感興趣的同窗能夠上網搜索一下相關資料。
ConcurrentHashMap和HashMap都是由數組+鏈表+紅黑樹構成,不過有一個不一樣的是ConcurrentHashMap的數組中放入的不是TreeNode結點,而是將TreeNode包裝起來的TreeBin對象,以下圖所示:
public ConcurrentHashMap() { } public ConcurrentHashMap(int initialCapacity) { if (initialCapacity < 0) throw new IllegalArgumentException(); int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1)); this.sizeCtl = cap; }
和HashMap實現差很少,也是用tableSizeFor
方法來確保數組大小爲2的冪次方, 能夠看出構造函數主要是設定sizeCtl
的值,並未對錶進行初始化。當表未初始化的時候,sizeCtl的值其實指定的是表的長度。
tableSizeFor方法用來保證數組爲2的冪次方,若是不瞭解其實現能夠參考一文看懂HashMap
在ConcurrentHashMap裏table數組第一次初始化是在initTable
裏執行的,這點和HashMap有點不一樣,簡單看下初始化步驟:
sizeCtl < 0
說明有別的線程正在初始化或擴容,自旋等待CAS
去更新sizeCtl
的值sizeCtl
設置爲容量閾值(也就是HashMap的threshold)private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; while ((tab = table) == null || tab.length == 0) { //當sizeCtl<0說明有別的線程正在初始化或擴容,自旋等待 if ((sc = sizeCtl) < 0) Thread.yield(); //SIZECTL:表示當前對象的內存偏移量,sc表示指望值,-1表示要替換的值,設定爲-1表示要初始化表 else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { //檢查table數組是否已經被初始化 if ((tab = table) == null || tab.length == 0) { //若sc=0則設置默認容量16,不然設置爲指定容量大小 int n = (sc > 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];//初始化數組 table = tab = nt; sc = n - (n >>> 2);//n - (n >>> 2) = 0.75n,也就是說sc的值等於threshold } } finally { sizeCtl = sc; } break; } } return tab; }
這裏須要注意的一點是在else if塊裏面須要從新判斷一次table是否未初始化,由於在finally塊裏改變了sizeCtl值,這時候其它線程是可以進入else if塊中的,這樣就會執行兩次初始化操做了。
在介紹get方法以前先來看看ConurrentHashMap如何計算key的hash值,ConcurrentHashMap用了spread函數來求hash值,它與HashMap的hash函數有略微不一樣,代碼以下:
static final int spread(int h) { return (h ^ (h >>> 16)) & HASH_BITS; }
除了高16位和低16位或操做以外,最後還和HASH_BITS
相與,其值爲0x7fffffff
。它的做用主要是使hash值爲正數。在ConcurrentHashMap中,Hash值爲負數有特別的意義,如-1表示ForwardingNode結點,-2表示TreeBin結點。
什麼是ForwardingNode結點和TreeBin結點?
//只在擴容時出現,實現了擴容時舊錶和新表的鏈接 static final class ForwardingNode<K,V> extends Node<K,V> { final Node<K,V>[] nextTable; ForwardingNode(Node<K,V>[] tab) { super(MOVED, null, null, null);//MOVED = -1 this.nextTable = tab; } ...... }
ForwardingNode節點是Node節點的子類,hash值固定爲-1,只在擴容 transfer的時候出現,當舊數組中所有的節點都遷移到新數組中時,舊數組就在數組中放置一個ForwardingNode。讀操做或者迭代讀時碰到ForwardingNode時,將操做轉發到擴容後的新的table數組上去執行,寫操做遇見它時,則嘗試幫助擴容。
至於TreeBin節點也是繼承自Node,hash值固定爲-2,是紅黑樹的包裝結點。(有關紅黑樹因爲篇幅有限這裏就不展開講了)
查詢步驟
(n-1)&hash
計算下標)hash = -1
則調用ForwardingNode的find函數轉發到nextTable上查找;若 hash = -2
則調用TreeBin的find函數查找元素接下來看看get方法源碼
public V get(Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; int h = spread(key.hashCode());//計算hash值 if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { if ((eh = e.hash) == h) {//判斷第一個節點是否就是目標節點 if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } //若是hash值小於0,有兩種狀況 //-1是ForwardingNode,則用find函數轉發到nextTable上查找 //-2是TreeBin,調用TreeBin的find函數。 else if (eh < 0) return (p = e.find(h, key)) != null ? p.val : null; //正常節點則遍歷查找 while ((e = e.next) != null) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; }
tabAt方法使用volatile來獲取數組上的元素,在介紹CA時已經說過了,若是忘記了請翻到上面查看。
從代碼也能夠看出get方法是不加鎖的,這裏比較須要注意的一點是hash值爲-1的ForwardingNode節點,當讀操做碰到ForwardingNode時會調用find方法轉發到擴容後的新的table數組上去執行,咱們來看看find方法的實現:
static final class ForwardingNode<K,V> extends Node<K,V> { final Node<K,V>[] nextTable; ForwardingNode(Node<K,V>[] tab) { super(MOVED, null, null, null); this.nextTable = tab; } //到新數組上查找元素 Node<K,V> find(int h, Object k) { //使用循環,避免屢次碰到ForwardingNode致使遞歸過深 outer: for (Node<K,V>[] tab = nextTable;;) { Node<K,V> e; int n; if (k == null || tab == null || (n = tab.length) == 0 || (e = tabAt(tab, (n - 1) & h)) == null) return null; for (;;) { int eh; K ek; //第一個節點就是要找的節點 if ((eh = e.hash) == h && ((ek = e.key) == k || (ek != null && k.equals(ek)))) return e; if (eh < 0) { //繼續遇見ForwardingNode的狀況,這裏至關因而遞歸調用一次本方法 if (e instanceof ForwardingNode) { tab = ((ForwardingNode<K,V>)e).nextTable; continue outer; } else//遇見特殊節點,調用其find方法進行查找 return e.find(h, k); } if ((e = e.next) == null)//普通節點直接循環遍歷鏈表 return null; } } } }
擴容時當數組爲空或完成擴容後將ForwardingNode結點插入數組槽中,而find操做在新表中進行查詢。巧妙利用ForwardingNode將舊錶和新錶鏈接起來,保證了其餘線程擴容時也能對結點正常訪問。
仍是同樣先來看看插入過程:
tabAt
方法讀取節點,若沒有發生hash衝突則用CAS
插入節點ForwardingNode
節點,說明在擴容,調用hlepTransfer
幫助擴容synchronized
對節點加鎖,以後遍歷鏈表,若元素已存在則更新舊值,不然在尾部插入節點TreeBin
節點則調用putTreeVal
方法插入addCount
方法對節點數量+1,在該方法裏面也會判斷是否須要擴容put方法源碼:
public V put(K key, V value) { return putVal(key, value, false);//false表示若已存在則進行覆蓋 } final V putVal(K key, V value, boolean onlyIfAbsent) { //不容許key或value爲null,這點和HashMap不一樣 if (key == null || value == null) throw new NullPointerException(); int hash = spread(key.hashCode()); int binCount = 0;//記錄當前鏈表或紅黑樹長度 for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; //判斷是否須要初始化 if (tab == null || (n = tab.length) == 0) tab = initTable(); //用tabAt方法讀取table[i],若沒有發生hash衝突則用CAS插入節點 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; } //若hash值爲-1,則爲ForwardingNode結點,說明在擴容,調用hlepTransfer幫助擴容 else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { //若是是普通鏈表結點或樹結點,使用synchronized對節點加鎖 V oldVal = null; synchronized (f) { if (tabAt(tab, i) == f) {//二次檢查,相似於單例模式的雙重檢查 if (fh >= 0) { binCount = 1; for (Node<K,V> e = f;; ++binCount) { K ek; //若元素已存在則更新value值 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } //插入節點到鏈表尾部 Node<K,V> pred = e; if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } }//若是是TreeBin節點則調用putTreeVal方法插入 else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { //若鏈表長度太長,則調用treeifyBin將鏈表轉換爲紅黑樹 if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } addCount(1L, binCount);//節點數量+1,檢查是否須要進行擴容 return null; }
插入過程也不是很難,不少地方和HashMap差很少,能夠照着註釋多看幾遍就懂了。
在put源碼最後會調用addCount
方法來修改元素個數,在addCount方法裏面又會檢查是否須要調用transfer
方法來擴容,ConcurrentHashMap的併發擴容是設計的一個精髓,因爲博主能力有限至今還未搞懂,如有興趣的能夠參考這篇:阿里十年架構師,教你深度分析ConcurrentHashMap原理分析
源碼不是很難,主要是replaceNode方法的幾個參數搞懂就行,這裏再也不細講。
public V remove(Object key) { return replaceNode(key, null, null); } //cv是指望值,當待刪除節點的值等於cv時,用value替換舊值 final V replaceNode(Object key, V value, Object cv) { int hash = spread(key.hashCode()); for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; //table未初始化 if (tab == null || (n = tab.length) == 0 || (f = tabAt(tab, i = (n - 1) & hash)) == null) break; //正在擴容 else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null; boolean validated = false; synchronized (f) { if (tabAt(tab, i) == f) { if (fh >= 0) { validated = true; for (Node<K,V> e = f, pred = null;;) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { V ev = e.val; //符合更新value或者刪除節點的條件 if (cv == null || cv == ev || (ev != null && cv.equals(ev))) { oldVal = ev; //更新value if (value != null) e.val = value; else if (pred != null) pred.next = e.next; else //CAS設置節點 setTabAt(tab, i, e.next); } break; } //當前節點不是目標節點,繼續遍歷下一個節點 pred = e; if ((e = e.next) == null) //到達鏈表尾部,依舊沒有找到,跳出循環 break; } } //紅黑樹 else if (f instanceof TreeBin) { validated = true; TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> r, p; if ((r = t.root) != null && (p = r.findTreeNode(hash, key, null)) != null) { V pv = p.val; if (cv == null || cv == pv || (pv != null && cv.equals(pv))) { oldVal = pv; if (value != null) p.val = value; else if (t.removeTreeNode(p)) setTabAt(tab, i, untreeify(t.first)); } } } } } if (validated) { if (oldVal != null) { //若是刪除了節點,更新size if (value == null) addCount(-1L, -1); return oldVal; } break; } } } return null; }
有關ConcurrentHashMap到這裏暫時先告一段落了,博主當初還覺得三天能夠寫完,到今天已經第五天了沒想到還沒搞定,其中還有一些經典的設計好比transfer擴容方法因爲能力有限沒列出來,不過本篇對於面試來講應該已經夠用了,若是有哪裏寫得不對歡迎各位指出來!