Java併發系列[9]----ConcurrentHashMap源碼分析

咱們知道哈希表是一種很是高效的數據結構,設計優良的哈希函數可使其上的增刪改查操做達到O(1)級別。Java爲咱們提供了一個現成的哈希結構,那就是HashMap類,在前面的文章中我曾經介紹過HashMap類,知道它的全部方法都未進行同步,所以在多線程環境中是不安全的。爲此,Java爲咱們提供了另一個HashTable類,它對於多線程同步的處理很是簡單粗暴,那就是在HashMap的基礎上對其全部方法都使用synchronized關鍵字進行加鎖。這種方法雖然簡單,但致使了一個問題,那就是在同一時間內只能由一個線程去操做哈希表。即便這些線程都只是進行讀操做也必需要排隊,這在競爭激烈的多線程環境中極爲影響性能。本篇介紹的ConcurrentHashMap就是爲了解決這個問題的,它的內部使用分段鎖將鎖進行細粒度化,從而使得多個線程可以同時操做哈希表,這樣極大的提升了性能。下圖是其內部結構的示意圖。node

1. ConcurrentHashMap有哪些成員變量?數組

 1 //默認初始化容量
 2 static final int DEFAULT_INITIAL_CAPACITY = 16;
 3 
 4 //默認加載因子
 5 static final float DEFAULT_LOAD_FACTOR = 0.75f;
 6 
 7 //默認併發級別
 8 static final int DEFAULT_CONCURRENCY_LEVEL = 16;
 9 
10 //集合最大容量
11 static final int MAXIMUM_CAPACITY = 1 << 30;
12 
13 //分段鎖的最小數量
14 static final int MIN_SEGMENT_TABLE_CAPACITY = 2;
15 
16 //分段鎖的最大數量
17 static final int MAX_SEGMENTS = 1 << 16;
18 
19 //加鎖前的重試次數
20 static final int RETRIES_BEFORE_LOCK = 2;
21 
22 //分段鎖的掩碼值
23 final int segmentMask;
24 
25 //分段鎖的移位值
26 final int segmentShift;
27 
28 //分段鎖數組
29 final Segment<K,V>[] segments;

在閱讀完本篇文章以前,相信讀者不能理解這些成員變量的具體含義和做用,不過請讀者們耐心看下去,後面將會在具體場景中一一介紹到這些成員變量的做用。在這裏讀者只需對這些成員變量留個眼熟便可。可是仍有個別變量是咱們如今須要瞭解的,例如Segment數組表明分段鎖集合,併發級別則表明分段鎖的數量(也意味有多少線程能夠同時操做),初始化容量表明整個容器的容量,加載因子表明容器元素能夠達到多滿的一種程度。安全

2. 分段鎖的內部結構是怎樣的?數據結構

 1 //分段鎖
 2 static final class Segment<K,V> extends ReentrantLock implements Serializable {
 3     //自旋最大次數
 4     static final int MAX_SCAN_RETRIES = Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
 5     //哈希表
 6     transient volatile HashEntry<K,V>[] table;
 7     //元素總數
 8     transient int count;
 9     //修改次數
10     transient int modCount;
11     //元素閥值
12     transient int threshold;
13     //加載因子
14     final float loadFactor;
15     //省略如下內容
16     ...
17 }

Segment是ConcurrentHashMap的靜態內部類,能夠看到它繼承自ReentrantLock,所以它在本質上是一個鎖。它在內部持有一個HashEntry數組(哈希表),而且保證全部對該數組的增刪改查方法都是線程安全的,具體是怎樣實現的後面會講到。全部對ConcurrentHashMap的增刪改查操做均可以委託Segment來進行,所以ConcurrentHashMap可以保證在多線程環境下是安全的。又由於不一樣的Segment是不一樣的鎖,因此多線程能夠同時操做不一樣的Segment,也就意味着多線程能夠同時操做ConcurrentHashMap,這樣就能避免HashTable的缺陷,從而極大的提升性能。多線程

3. ConcurrentHashMap初始化時作了些什麼?併發

 1 //核心構造器
 2 @SuppressWarnings("unchecked")
 3 public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
 4     if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) {
 5         throw new IllegalArgumentException();
 6     }
 7     //確保併發級別不大於限定值
 8     if (concurrencyLevel > MAX_SEGMENTS) {
 9         concurrencyLevel = MAX_SEGMENTS;
10     }
11     int sshift = 0;
12     int ssize = 1;
13     //保證ssize爲2的冪, 且是最接近的大於等於併發級別的數
14     while (ssize < concurrencyLevel) {
15         ++sshift;
16         ssize <<= 1;
17     }
18     //計算分段鎖的移位值
19     this.segmentShift = 32 - sshift;
20     //計算分段鎖的掩碼值
21     this.segmentMask = ssize - 1;
22     //總的初始容量不能大於限定值
23     if (initialCapacity > MAXIMUM_CAPACITY) {
24         initialCapacity = MAXIMUM_CAPACITY;
25     }
26     //獲取每一個分段鎖的初始容量
27     int c = initialCapacity / ssize;
28     //分段鎖容量總和不小於初始總容量
29     if (c * ssize < initialCapacity) {
30         ++c;
31     }
32     int cap = MIN_SEGMENT_TABLE_CAPACITY;
33     //保證cap爲2的冪, 且是最接近的大於等於c的數
34     while (cap < c) {
35         cap <<= 1;
36     }
37     //新建一個Segment對象模版
38     Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor), (HashEntry<K,V>[])new HashEntry[cap]);
39     //新建指定大小的分段鎖數組
40     Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
41     //使用UnSafe給數組第0個元素賦值
42     UNSAFE.putOrderedObject(ss, SBASE, s0);
43     this.segments = ss;
44 }

ConcurrentHashMap有多個構造器,可是上面貼出的是它的核心構造器,其餘構造器都經過調用它來完成初始化。核心構造器須要傳入三個參數,分別是初始容量,加載因子和併發級別。在前面介紹成員變量時咱們能夠知道默認的初始容量爲16,加載因子爲0.75f,併發級別爲16。如今咱們看到核心構造器的代碼,首先是經過傳入的concurrencyLevel來計算出ssize,ssize是Segment數組的長度,它必須保證是2的冪,這樣就能夠經過hash&ssize-1來計算分段鎖在數組中的下標。因爲傳入的concurrencyLevel不能保證是2的冪,因此不能直接用它來看成Segment數組的長度,所以咱們要找到一個最接近concurrencyLevel的2的冪,用它來做爲數組的長度。假如如今傳入的concurrencyLevel=15,經過上面代碼能夠計算出ssize=16,sshift=4。接下來立馬能夠算出segmentShift=16,segmentMask=15。注意這裏的segmentShift是分段鎖的移位值,segmentMask是分段鎖的掩碼值,這兩個值是用來計算分段鎖在數組中的下標,在下面咱們會講到。在算出分段鎖的個數ssize以後,就能夠根據傳入的總容量來計算每一個分段鎖的容量,它的值c = initialCapacity / ssize。分段鎖的容量也就是HashEntry數組的長度,一樣也必須保證是2的冪,而上面算出的c的值不能保證這一點,因此不能直接用c做爲HashEntry數組的長度,須要另外找到一個最接近c的2的冪,將這個值賦給cap,而後用cap來做爲HashEntry數組的長度。如今咱們有了ssize和cap,就能夠新建分段鎖數組Segment[]和元素數組HashEntry[]了。注意,與JDK1.6不一樣是的,在JDK1.7中只新建了Segment數組,並無對它初始化,初始化Segment的操做留到了插入操做時進行。ssh

4. 經過怎樣的方式來定位鎖和定位元素?函數

 1 //根據哈希碼獲取分段鎖
 2 @SuppressWarnings("unchecked")
 3 private Segment<K,V> segmentForHash(int h) {
 4     long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
 5     return (Segment<K,V>) UNSAFE.getObjectVolatile(segments, u);
 6 }
 7 
 8 //根據哈希碼獲取元素
 9 @SuppressWarnings("unchecked")
10 static final <K,V> HashEntry<K,V> entryForHash(Segment<K,V> seg, int h) {
11     HashEntry<K,V>[] tab;
12     return (seg == null || (tab = seg.table) == null) ? null :
13     (HashEntry<K,V>) UNSAFE.getObjectVolatile(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
14 }

在JDK1.7中是經過UnSafe來獲取數組元素的,所以這裏比JDK1.6多了些計算數組元素偏移量的代碼,這些代碼咱們暫時不關注,如今咱們只需知道下面這兩點:
a. 經過哈希碼計算分段鎖在數組中的下標:(h >>> segmentShift) & segmentMask。
b. 經過哈希碼計算元素在數組中的下標:(tab.length - 1) & h。
如今咱們假設傳給構造器的兩個參數爲initialCapacity=128, concurrencyLevel=16。根據計算能夠獲得ssize=16, sshift=4,segmentShift=28,segmentMask=15。一樣,算得每一個分段鎖內的HashEntry數組的長度爲8,因此tab.length-1=7。根據這些值,咱們經過下圖來解釋如何根據同一個哈希碼來定位分段鎖和元素。高併發

能夠看到分段鎖和元素的定位都是經過元素的哈希碼來決定的。定位分段鎖是取哈希碼的高位值(從32位處取起),定位元素是取的哈希碼的低位值。如今有個問題,它們一個從32位的左端取起,一個從32位的右端取起,那麼會在某個時刻產生衝突嗎?咱們在成員變量裏能夠找到MAXIMUM_CAPACITY = 1 << 30,MAX_SEGMENTS = 1 << 16,這說明定位分段鎖和定位元素使用的總的位數不超過30,而且定位分段鎖使用的位數不超過16,因此至少還隔着2位的空餘,所以是不會產生衝突的。性能

5. 查找元素具體是怎樣實現的?

 1 //根據key獲取value
 2 public V get(Object key) {
 3     Segment<K,V> s;
 4     HashEntry<K,V>[] tab;
 5     //使用哈希函數計算哈希碼
 6     int h = hash(key);
 7     //根據哈希碼計算分段鎖的索引
 8     long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
 9     //獲取分段鎖和對應的哈希表
10     if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null && (tab = s.table) != null) {
11         //根據哈希碼獲取鏈表頭結點, 再對鏈表進行遍歷
12         for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
13                  (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
14              e != null; e = e.next) {
15             K k;
16             //根據key和hash找到對應元素後返回value值
17             if ((k = e.key) == key || (e.hash == h && key.equals(k))) {
18                 return e.value;
19             }
20         }
21     }
22     return null;
23 }

在JDK1.6中分段鎖的get方法是經過下標來訪問數組元素的,而在JDK1.7中是經過UnSafe的getObjectVolatile方法來讀取數組中的元素。爲啥要這樣作?咱們知道雖然Segment對象持有的HashEntry數組引用是volatile類型的,可是數組內的元素引用不是volatile類型的,所以多線程對數組元素的修改是不安全的,可能會在數組中讀取到還沒有構造完成的對象。在JDK1.6中是經過第二次加鎖讀取來保證安全的,而JDK1.7中經過UnSafe的getObjectVolatile方法來讀取一樣也是爲了保證這一點。使用getObjectVolatile方法讀取數組元素須要先得到元素在數組中的偏移量,在這裏根據哈希碼計算獲得分段鎖在數組中的偏移量爲u,而後經過偏移量u來嘗試讀取分段鎖。因爲分段鎖數組在構造時沒進行初始化,所以可能讀出來一個空值,因此須要先進行判斷。在肯定分段鎖和它內部的哈希表都不爲空以後,再經過哈希碼讀取HashEntry數組的元素,根據上面的結構圖能夠看到,這時得到的是鏈表的頭結點。以後再從頭至尾的對鏈表進行遍歷查找,若是找到對應的值就將其返回,不然就返回null。以上就是整個查找元素的過程。

6. 插入元素具體是怎樣實現的?

 1 //向集合添加鍵值對(若存在則替換)
 2 @SuppressWarnings("unchecked")
 3 public V put(K key, V value) {
 4     Segment<K,V> s;
 5     //傳入的value不能爲空
 6     if (value == null) throw new NullPointerException();
 7     //使用哈希函數計算哈希碼
 8     int hash = hash(key);
 9     //根據哈希碼計算分段鎖的下標
10     int j = (hash >>> segmentShift) & segmentMask;
11     //根據下標去嘗試獲取分段鎖
12     if ((s = (Segment<K,V>)UNSAFE.getObject(segments, (j << SSHIFT) + SBASE)) == null) {
13         //得到的分段鎖爲空就去構造一個
14         s = ensureSegment(j);
15     }
16     //調用分段鎖的put方法
17     return s.put(key, hash, value, false);
18 }
19 
20 //向集合添加鍵值對(不存在才添加)
21 @SuppressWarnings("unchecked")
22 public V putIfAbsent(K key, V value) {
23     Segment<K,V> s;
24     //傳入的value不能爲空
25     if (value == null) throw new NullPointerException();
26     //使用哈希函數計算哈希碼
27     int hash = hash(key);
28     //根據哈希碼計算分段鎖的下標
29     int j = (hash >>> segmentShift) & segmentMask;
30     //根據下標去嘗試獲取分段鎖
31     if ((s = (Segment<K,V>)UNSAFE.getObject(segments, (j << SSHIFT) + SBASE)) == null) {
32         //得到的分段鎖爲空就去構造一個
33         s = ensureSegment(j);
34     }
35     //調用分段鎖的put方法
36     return s.put(key, hash, value, true);
37 }

ConcurrentHashMap中有兩個添加鍵值對的方法,經過put方法添加時若是存在則會進行覆蓋,經過putIfAbsent方法添加時若是存在則不進行覆蓋,這兩個方法都是調用分段鎖的put方法來完成操做,只是傳入的最後一個參數不一樣而已。在上面代碼中咱們能夠看到首先是根據key的哈希碼來計算出分段鎖在數組中的下標,而後根據下標使用UnSafe類getObject方法來讀取分段鎖。因爲在構造ConcurrentHashMap時沒有對Segment數組中的元素初始化,因此可能讀到一個空值,這時會先經過ensureSegment方法新建一個分段鎖。獲取到分段鎖以後再調用它的put方法完成添加操做,下面咱們來看看具體是怎樣操做的。

 1 //添加鍵值對
 2 final V put(K key, int hash, V value, boolean onlyIfAbsent) {
 3     //嘗試獲取鎖, 若失敗則進行自旋
 4     HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value);
 5     V oldValue;
 6     try {
 7         HashEntry<K,V>[] tab = table;
 8         //計算元素在數組中的下標
 9         int index = (tab.length - 1) & hash;
10         //根據下標獲取鏈表頭結點
11         HashEntry<K,V> first = entryAt(tab, index);
12         for (HashEntry<K,V> e = first;;) {
13             //遍歷鏈表尋找該元素, 找到則進行替換
14             if (e != null) {
15                 K k;
16                 if ((k = e.key) == key || (e.hash == hash && key.equals(k))) {
17                     oldValue = e.value;
18                     //根據參數決定是否替換舊值
19                     if (!onlyIfAbsent) {
20                         e.value = value;
21                         ++modCount;
22                     }
23                     break;
24                 }
25                 e = e.next;
26             //沒找到則在鏈表添加一個結點
27             } else {
28                 //將node結點插入鏈表頭部
29                 if (node != null) {
30                     node.setNext(first);
31                 } else {
32                     node = new HashEntry<K,V>(hash, key, value, first);
33                 }
34                 //插入結點後將元素老是加1
35                 int c = count + 1;
36                 //元素超過閥值則進行擴容
37                 if (c > threshold && tab.length < MAXIMUM_CAPACITY) {
38                     rehash(node);
39                 //不然就將哈希表指定下標替換爲node結點
40                 } else {
41                     setEntryAt(tab, index, node);
42                 }
43                 ++modCount;
44                 count = c;
45                 oldValue = null;
46                 break;
47             }
48         }
49     } finally {
50         unlock();
51     }
52     return oldValue;
53 }

爲保證線程安全,分段鎖中的put操做是須要進行加鎖的,因此線程一開始就會去獲取鎖,若是獲取成功就繼續執行,若獲取失敗則調用scanAndLockForPut方法進行自旋,在自旋過程當中會先去掃描哈希表去查找指定的key,若是key不存在就會新建一個HashEntry返回,這樣在獲取到鎖以後就沒必要再去新建了,爲的是在等待鎖的過程當中順便作些事情,不至於白白浪費時間,可見做者的良苦用心。具體自旋方法咱們後面再細講,如今先把關注點拉回來,線程在成功獲取到鎖以後會根據計算到的下標,獲取指定下標的元素。此時獲取到的是鏈表的頭結點,若是頭結點不爲空就對鏈表進行遍歷查找,找到以後再根據onlyIfAbsent參數的值決定是否進行替換。若是遍歷沒找到就會新建一個HashEntry指向頭結點,此時若是自旋時建立了HashEntry,則直接將它的next指向當前頭結點,若是自旋時沒有建立就在這裏新建一個HashEntry並指向頭結點。在向鏈表添加元素以後檢查元素總數是否超過閥值,若是超過就調用rehash進行擴容,沒超過的話就直接將數組對應下標的元素引用指向新添加的node。setEntryAt方法內部是經過調用UnSafe的putOrderedObject方法來更改數組元素引用的,這樣就保證了其餘線程在讀取時能夠讀到最新的值。

7. 刪除元素具體是怎樣實現的?

 1 //刪除指定元素(找到對應元素後直接刪除)
 2 public V remove(Object key) {
 3     //使用哈希函數計算哈希碼
 4     int hash = hash(key);
 5     //根據哈希碼獲取分段鎖的索引
 6     Segment<K,V> s = segmentForHash(hash);
 7     //調用分段鎖的remove方法
 8     return s == null ? null : s.remove(key, hash, null);
 9 }
10 
11 //刪除指定元素(查找值等於給定值才刪除)
12 public boolean remove(Object key, Object value) {
13     //使用哈希函數計算哈希碼
14     int hash = hash(key);
15     Segment<K,V> s;
16     //確保分段鎖不爲空才調用remove方法
17     return value != null && (s = segmentForHash(hash)) != null && s.remove(key, hash, value) != null;
18 }

ConcurrentHashMap提供了兩種刪除操做,一種是找到後直接刪除,一種是找到後先比較再刪除。這兩種刪除方法都是先根據key的哈希碼找到對應的分段鎖後,再經過調用分段鎖的remove方法完成刪除操做。下面咱們來看看分段鎖的remove方法。

 1 //刪除指定元素
 2 final V remove(Object key, int hash, Object value) {
 3     //嘗試獲取鎖, 若失敗則進行自旋
 4     if (!tryLock()) {
 5         scanAndLock(key, hash);
 6     }
 7     V oldValue = null;
 8     try {
 9         HashEntry<K,V>[] tab = table;
10         //計算元素在數組中的下標
11         int index = (tab.length - 1) & hash;
12         //根據下標取得數組元素(鏈表頭結點)
13         HashEntry<K,V> e = entryAt(tab, index);
14         HashEntry<K,V> pred = null;
15         //遍歷鏈表尋找要刪除的元素
16         while (e != null) {
17             K k;
18             //next指向當前結點的後繼結點
19             HashEntry<K,V> next = e.next;
20             //根據key和hash尋找對應結點
21             if ((k = e.key) == key || (e.hash == hash && key.equals(k))) {
22                 V v = e.value;
23                 //傳入的value不等於v就跳過, 其餘狀況就進行刪除操做
24                 if (value == null || value == v || value.equals(v)) {
25                     //若是pred爲空則表明要刪除的結點爲頭結點
26                     if (pred == null) {
27                         //從新設置鏈表頭結點
28                         setEntryAt(tab, index, next);
29                     } else {
30                         //設置pred結點的後繼爲next結點
31                         pred.setNext(next);
32                     }
33                     ++modCount;
34                     --count;
35                     //記錄元素刪除以前的值
36                     oldValue = v;
37                 }
38                 break;
39             }
40             //若e不是要找的結點就將pred引用指向它
41             pred = e;
42             //檢查下一個結點
43             e = next;
44         }
45     } finally {
46         unlock();
47     }
48     return oldValue;
49 }

在刪除分段鎖中的元素時須要先獲取鎖,若是獲取失敗就調用scanAndLock方法進行自旋,若是獲取成功就執行下一步,首先計算數組下標而後經過下標獲取HashEntry數組的元素,這裏得到了鏈表的頭結點,接下來就是對鏈表進行遍歷查找,在此以前先用next指針記錄當前結點的後繼結點,而後對比key和hash看看是不是要找的結點,若是是的話就執行下一個if判斷。知足value爲空或者value的值等於結點當前值這兩個條件就會進入到if語句中進行刪除操做,不然直接跳過。在if語句中執行刪除操做時會有兩種狀況,若是當前結點爲頭結點則直接將next結點設置爲頭結點,若是當前結點不是頭結點則將pred結點的後繼設置爲next結點。這裏的pred結點表示當前結點的前繼結點,每次在要檢查下一個結點以前就將pred指向當前結點,這就保證了pred結點老是當前結點的前繼結點。注意,與JDK1.6不一樣,在JDK1.7中HashEntry對象的next變量不是final的,所以這裏能夠經過直接修改next引用的值來刪除元素,因爲next變量是volatile類型的,因此讀線程能夠立刻讀到最新的值。

8. 替換元素具體是怎樣實現的?

 1 //替換指定元素(CAS操做)
 2 public boolean replace(K key, V oldValue, V newValue) {
 3     //使用哈希函數計算哈希碼
 4     int hash = hash(key);
 5     //保證oldValue和newValue不爲空
 6     if (oldValue == null || newValue == null) throw new NullPointerException();
 7     //根據哈希碼獲取分段鎖的索引
 8     Segment<K,V> s = segmentForHash(hash);
 9     //調用分段鎖的replace方法
10     return s != null && s.replace(key, hash, oldValue, newValue);
11 }
12 
13 //替換元素操做(CAS操做)
14 final boolean replace(K key, int hash, V oldValue, V newValue) {
15     //嘗試獲取鎖, 若失敗則進行自旋
16     if (!tryLock()) {
17         scanAndLock(key, hash);
18     }
19     boolean replaced = false;
20     try {
21         HashEntry<K,V> e;
22         //經過hash直接找到頭結點而後對鏈表遍歷
23         for (e = entryForHash(this, hash); e != null; e = e.next) {
24             K k;
25             //根據key和hash找到要替換的結點
26             if ((k = e.key) == key || (e.hash == hash && key.equals(k))) {
27                 //若是指定的當前值正確則進行替換
28                 if (oldValue.equals(e.value)) {
29                     e.value = newValue;
30                     ++modCount;
31                     replaced = true;
32                 }
33                 //不然不進行任何操做直接返回
34                 break;
35             }
36         }
37     } finally {
38         unlock();
39     }
40     return replaced;
41 }

ConcurrentHashMap一樣提供了兩種替換操做,一種是找到後直接替換,另外一種是找到後先比較再替換(CAS操做)。這兩種操做的實現大體是相同的,只是CAS操做在替換前多了一層比較操做,所以咱們只需簡單瞭解其中一種操做便可。這裏拿CAS操做進行分析,仍是老套路,首先根據key的哈希碼找到對應的分段鎖,而後調用它的replace方法。進入分段鎖中的replace方法後須要先去獲取鎖,若是獲取失敗則進行自旋,若是獲取成功則進行下一步。首先根據hash碼獲取鏈表頭結點,而後根據key和hash進行遍歷查找,找到了對應的元素以後,比較給定的oldValue是不是當前值,若是不是則放棄修改,若是是則用新值進行替換。因爲HashEntry對象的value域是volatile類型的,所以能夠直接替換。

9. 自旋時具體作了些什麼?

 1 //自旋等待獲取鎖(put操做)
 2 private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
 3     //根據哈希碼獲取頭結點
 4     HashEntry<K,V> first = entryForHash(this, hash);
 5     HashEntry<K,V> e = first;
 6     HashEntry<K,V> node = null;
 7     int retries = -1;
 8     //在while循環內自旋
 9     while (!tryLock()) {
10         HashEntry<K,V> f;
11         if (retries < 0) {
12             //若是頭結點爲空就新建一個node
13             if (e == null) {
14                 if (node == null) {
15                     node = new HashEntry<K,V>(hash, key, value, null);
16                 }
17                 retries = 0;
18             //不然就遍歷鏈表定位該結點
19             } else if (key.equals(e.key)) {
20                 retries = 0;
21             } else {
22                 e = e.next;
23             }
24           //retries每次在這加1, 並判斷是否超過最大值
25         } else if (++retries > MAX_SCAN_RETRIES) {
26             lock();
27             break;
28           //retries爲偶數時去判斷first有沒有改變
29         } else if ((retries & 1) == 0 && (f = entryForHash(this, hash)) != first) {
30             e = first = f;
31             retries = -1;
32         }
33     }
34     return node;
35 }
36 
37 //自旋等待獲取鎖(remove和replace操做)
38 private void scanAndLock(Object key, int hash) {
39     //根據哈希碼獲取鏈表頭結點
40     HashEntry<K,V> first = entryForHash(this, hash);
41     HashEntry<K,V> e = first;
42     int retries = -1;
43     //在while循環裏自旋
44     while (!tryLock()) {
45         HashEntry<K,V> f;
46         if (retries < 0) {
47             //遍歷鏈表定位到該結點
48             if (e == null || key.equals(e.key)) {
49                 retries = 0;
50             } else {
51                 e = e.next;
52             }
53           //retries每次在這加1, 並判斷是否超過最大值
54         } else if (++retries > MAX_SCAN_RETRIES) {
55             lock();
56             break;
57           //retries爲偶數時去判斷first有沒有改變
58         } else if ((retries & 1) == 0 && (f = entryForHash(this, hash)) != first) {
59             e = first = f;
60             retries = -1;
61         }
62     }
63 }

在前面咱們講到過,分段鎖中的put,remove,replace這些操做都會要求先去獲取鎖,只有成功得到鎖以後才能進行下一步操做,若是獲取失敗就會進行自旋。自旋操做也是在JDK1.7中添加的,爲了不線程頻繁的掛起和喚醒,以此提升併發操做時的性能。在put方法中調用的是scanAndLockForPut,在remove和replace方法中調用的是scanAndLock。這兩種自旋方法大體是相同的,這裏咱們只分析scanAndLockForPut方法。首先仍是先根據hash碼得到鏈表頭結點,以後線程會進入while循環中執行,退出該循環的惟一方式是成功獲取鎖,而在這期間線程不會被掛起。剛進入循環時retries的值爲-1,這時線程不會立刻再去嘗試獲取鎖,而是先去尋找到key對應的結點(沒找到會新建一個),而後再將retries設爲0,接下來就會一次次的嘗試獲取鎖,對應retries的值也會每次加1,直到超過最大嘗試次數若是還沒獲取到鎖,就會調用lock方法進行阻塞獲取。在嘗試獲取鎖的期間,還會每隔一次(retries爲偶數)去檢查頭結點是否被改變,若是被改變則將retries重置回-1,而後再重走一遍剛纔的流程。這就是線程自旋時所作的操做,需注意的是若是在自旋時檢測到頭結點已被改變,則會延長線程的自旋時間。

10. 哈希表擴容時都作了哪些操做?

 1 //再哈希
 2 @SuppressWarnings("unchecked")
 3 private void rehash(HashEntry<K,V> node) {
 4     //獲取舊哈希表的引用
 5     HashEntry<K,V>[] oldTable = table;
 6     //獲取舊哈希表的容量
 7     int oldCapacity = oldTable.length;
 8     //計算新哈希表的容量(爲舊哈希表的2倍)
 9     int newCapacity = oldCapacity << 1;
10     //計算新的元素閥值
11     threshold = (int)(newCapacity * loadFactor);
12     //新建一個HashEntry數組
13     HashEntry<K,V>[] newTable = (HashEntry<K,V>[]) new HashEntry[newCapacity];
14     //生成新的掩碼值
15     int sizeMask = newCapacity - 1;
16     //遍歷舊錶的全部元素
17     for (int i = 0; i < oldCapacity ; i++) {
18         //取得鏈表頭結點
19         HashEntry<K,V> e = oldTable[i];
20         if (e != null) {
21             HashEntry<K,V> next = e.next;
22             //計算元素在新表中的索引
23             int idx = e.hash & sizeMask;
24             //next爲空代表鏈表只有一個結點
25             if (next == null) {
26                 //直接把該結點放到新表中
27                 newTable[idx] = e;
28             }else {
29                 HashEntry<K,V> lastRun = e;
30                 int lastIdx = idx;
31                 //定位lastRun結點, 將lastRun以後的結點直接放到新表中
32                 for (HashEntry<K,V> last = next; last != null; last = last.next) {
33                     int k = last.hash & sizeMask;
34                     if (k != lastIdx) {
35                         lastIdx = k;
36                         lastRun = last;
37                     }
38                 }
39                 newTable[lastIdx] = lastRun;
40                 //遍歷在鏈表lastRun結點以前的元素, 將它們依次複製到新表中
41                 for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
42                     V v = p.value;
43                     int h = p.hash;
44                     int k = h & sizeMask;
45                     HashEntry<K,V> n = newTable[k];
46                     newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
47                 }
48             }
49         }
50     }
51     //計算傳入結點在新表中的下標
52     int nodeIndex = node.hash & sizeMask;
53     //將傳入結點添加到鏈表頭結點
54     node.setNext(newTable[nodeIndex]);
55     //將新表指定下標元素換成傳入結點
56     newTable[nodeIndex] = node;
57     //將哈希表引用指向新表
58     table = newTable;
59 }

rehash方法在put方法中被調用,咱們知道在put方法時會新建元素並添加到哈希數組中,隨着元素的增多發生哈希衝突的可能性越大,哈希表的性能也會隨之降低。所以每次put操做時都會檢查元素總數是否超過閥值,若是超過則調用rehash方法進行擴容。由於數組長度一旦肯定則不能再被改變,所以須要新建一個數組來替換原先的數組。從代碼中能夠知道新建立的數組長度爲原數組的2倍(oldCapacity << 1)。建立好新數組後須要將舊數組中的全部元素移到新數組中,所以須要計算每一個元素在新數組中的下標。計算新下標的過程以下圖所示。

咱們知道下標直接取的是哈希碼的後幾位,因爲新數組的容量是直接用舊數組容量右移1位得來的,所以掩碼位數向右增長1位,取到的哈希碼位數也向右增長1位。如上圖,若舊的掩碼值爲111,則元素下標爲101,擴容後新的掩碼值爲1111,則計算出元素的新下標爲0101。因爲同一條鏈表上的元素下標是相同的,如今假設鏈表全部元素的下標爲101,在擴容後該鏈表元素的新下標只有0101或1101這兩種狀況,所以數組擴容會打亂原先的鏈表並將鏈表元素分紅兩批。在計算出新下標後須要將元素移動到新數組中,在HashMap中經過直接修改next引用致使了多線程的死鎖。雖然在ConcurrentHashMap中經過加鎖避免了這種狀況,可是咱們知道next域是volatile類型的,它的改動能立馬被讀線程讀取到,所以爲保證線程安全採用複製元素來遷移數組。可是對鏈表中每一個元素都進行復制有點影響性能,做者發現鏈表尾部有許多元素的next是不變的,它們在新數組中的下標是相同的,所以能夠考慮總體移動這部分元素。具統計實際操做中只有1/6的元素是必須複製的,因此總體移動鏈表尾部元素(lastRun後面的元素)是能夠提高必定性能的。

注:本篇文章基於JDK1.7版本。

相關文章
相關標籤/搜索