ConcurrentHashMap的瞭解

概述

從JDK1.2起,就有了HashMap,正如前一篇文章所說,HashMap不是線程安全的,所以多線程操做時須要格外當心。 面試

在JDK1.5中,偉大的Doug Lea給咱們帶來了concurrent包,今後Map也有安全的了。 算法

ConcurrentHashMap具體是怎麼實現線程安全的呢,確定不多是每一個方法加synchronized,那樣就變成了HashTable。 數組

從ConcurrentHashMap代碼中能夠看出,它引入了一個「分段鎖」的概念,具體能夠理解爲把一個大的Map拆分紅N個小的HashTable,根據key.hashCode()來決定把key放到哪一個HashTable中。 安全

在ConcurrentHashMap中,就是把Map分紅了N個Segment,put和get的時候,都是現根據key.hashCode()算出放到哪一個Segment中: 多線程


HashMap是非線程安全的,HashTable是線程安全的。 併發

那個時候沒怎麼寫Java代碼,因此根本就沒有據說過ConcurrentHashMap,只知道面試的時候就記住這句話就好了…至於爲何是線程安全的,內部怎麼實現的,統統不瞭解。

今天咱們將深刻剖析一個比HashTable性能更優的線程安全的Map類,它就是ConcurrentHashMap,本文基於Java 7的源碼作剖析ssh

ConcurrentHashMap的目的

多線程環境下,使用Hashmap進行put操做會引發死循環,致使CPU利用率接近100%,因此在併發狀況下不能使用HashMap。雖然已經有一個線程安全的HashTable,可是HashTable容器使用synchronized(他的get和put方法的實現代碼以下)來保證線程安全,在線程競爭激烈的狀況下HashTable的效率很是低下。由於當一個線程訪問HashTable的同步方法時,訪問其餘同步方法的線程就可能會進入阻塞或者輪訓狀態。如線程1使用put進行添加元素,線程2不但不能使用put方法添加元素,而且也不能使用get方法來獲取元素,因此競爭越激烈效率越低。 async

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
public synchronized V get(Object key) {  Entry<?,?> tab[] = table;  int hash = key.hashCode();  int index = (hash & 0x7FFFFFFF) % tab.length;  for (Entry<?,?> e = tab[index] ; e != null ; e = e.next) {  if ((e.hash == hash) && e.key.equals(key)) {  return (V)e.value;  }  }  return null; } public synchronized V put(K key, V value) {  // Make sure the value is not null  if (value == null) {  throw new NullPointerException();  }   // Makes sure the key is not already in the hashtable.  Entry<?,?> tab[] = table;  int hash = key.hashCode();  int index = (hash & 0x7FFFFFFF) % tab.length;  @SuppressWarnings("unchecked")  Entry<K,V> entry = (Entry<K,V>)tab[index];  for(; entry != null ; entry = entry.next) {  if ((entry.hash == hash) && entry.key.equals(key)) {  V old = entry.value;  entry.value = value;  return old;  }  }   addEntry(hash, key, value, index);  return null; }

在這麼惡劣的環境下,ConcurrentHashMap應運而生。 ide

實現原理

ConcurrentHashMap使用分段鎖技術,將數據分紅一段一段的存儲,而後給每一段數據配一把鎖,當一個線程佔用鎖訪問其中一個段數據的時候,其餘段的數據也能被其餘線程訪問,可以實現真正的併發訪問。以下圖是ConcurrentHashMap的內部結構圖:

從圖中能夠看到,ConcurrentHashMap內部分爲不少個Segment,每個Segment擁有一把鎖,而後每一個Segment(繼承ReentrantLock)下面包含不少個HashEntry列表數組。對於一個key,須要通過三次(爲何要hash三次下文會詳細講解)hash操做,才能最終定位這個元素的位置,這三次hash分別爲: 函數

  1. 對於一個key,先進行一次hash操做,獲得hash值h1,也即h1 = hash1(key);
  2. 將獲得的h1的高几位進行第二次hash,獲得hash值h2,也即h2 = hash2(h1高几位),經過h2可以肯定該元素的放在哪一個Segment;
  3. 將獲得的h1進行第三次hash,獲得hash值h3,也即h3 = hash3(h1),經過h3可以肯定該元素放置在哪一個HashEntry。

初始化

先看看ConcurrentHashMap的初始化作了哪些事情,構造函數的源碼以下:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
public ConcurrentHashMap(int initialCapacity,  float loadFactor, int concurrencyLevel) {  if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)  throw new IllegalArgumentException();  if (concurrencyLevel > MAX_SEGMENTS)  concurrencyLevel = MAX_SEGMENTS;  // Find power-of-two sizes best matching arguments  int sshift = 0;  int ssize = 1;  while (ssize < concurrencyLevel) {  ++sshift;  ssize <<= 1;  }  this.segmentShift = 32 - sshift;  this.segmentMask = ssize - 1;  if (initialCapacity > MAXIMUM_CAPACITY)  initialCapacity = MAXIMUM_CAPACITY;  int c = initialCapacity / ssize;  if (c * ssize < initialCapacity)  ++c;  int cap = MIN_SEGMENT_TABLE_CAPACITY;  while (cap < c)  cap <<= 1;  // create segments and segments[0]  Segment<K,V> s0 =  new Segment<K,V>(loadFactor, (int)(cap * loadFactor),  (HashEntry<K,V>[])new HashEntry[cap]);  Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];  UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]  this.segments = ss;  }

傳入的參數有initialCapacity,loadFactor,concurrencyLevel這三個。

  • initialCapacity表示新建立的這個ConcurrentHashMap的初始容量,也就是上面的結構圖中的Entry數量。默認值爲static final int DEFAULT_INITIAL_CAPACITY = 16;
  • loadFactor表示負載因子,就是當ConcurrentHashMap中的元素個數大於loadFactor * 最大容量時就須要rehash,擴容。默認值爲static final float DEFAULT_LOAD_FACTOR = 0.75f;
  • concurrencyLevel表示併發級別,這個值用來肯定Segment的個數,Segment的個數是大於等於concurrencyLevel的第一個2的n次方的數。好比,若是concurrencyLevel爲12,13,14,15,16這些數,則Segment的數目爲16(2的4次方)。默認值爲static final int DEFAULT_CONCURRENCY_LEVEL = 16;。理想狀況下ConcurrentHashMap的真正的併發訪問量可以達到concurrencyLevel,由於有concurrencyLevel個Segment,假若有concurrencyLevel個線程須要訪問Map,而且須要訪問的數據都剛好分別落在不一樣的Segment中,則這些線程可以無競爭地自由訪問(由於他們不須要競爭同一把鎖),達到同時訪問的效果。這也是爲何這個參數起名爲「併發級別」的緣由。

初始化的一些動做:

  1. 驗證參數的合法性,若是不合法,直接拋出異常。
  2. concurrencyLevel也就是Segment的個數不能超過規定的最大Segment的個數,默認值爲static final int MAX_SEGMENTS = 1 << 16;,若是超過這個值,設置爲這個值。
  3. 而後使用循環找到大於等於concurrencyLevel的第一個2的n次方的數ssize,這個數就是Segment數組的大小,並記錄一共向左按位移動的次數sshift,並令segmentShift = 32 - sshift,而且segmentMask的值等於ssize - 1,segmentMask的各個二進制位都爲1,目的是以後能夠經過key的hash值與這個值作&運算肯定Segment的索引。
  4. 檢查給的容量值是否大於容許的最大容量值,若是大於該值,設置爲該值。最大容量值爲static final int MAXIMUM_CAPACITY = 1 << 30;。
  5. 而後計算每一個Segment平均應該放置多少個元素,這個值c是向上取整的值。好比初始容量爲15,Segment個數爲4,則每一個Segment平均須要放置4個元素。
  6. 最後建立一個Segment實例,將其當作Segment數組的第一個元素。

put操做

put操做的源碼以下:

1 2 3 4 5 6 7 8 9 10 11
public V put(K key, V value) {  Segment<K,V> s;  if (value == null)  throw new NullPointerException();  int hash = hash(key);  int j = (hash >>> segmentShift) & segmentMask;  if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck  (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment  s = ensureSegment(j);  return s.put(key, hash, value, false);  }

操做步驟以下:

  1. 判斷value是否爲null,若是爲null,直接拋出異常。
  2. key經過一次hash運算獲得一個hash值。(這個hash運算下文詳說)
  3. 將獲得hash值向右按位移動segmentShift位,而後再與segmentMask作&運算獲得segment的索引j。
    在初始化的時候咱們說過segmentShift的值等於32-sshift,例如concurrencyLevel等於16,則sshift等於4,則segmentShift爲28。hash值是一個32位的整數,將其向右移動28位就變成這個樣子:
    0000 0000 0000 0000 0000 0000 0000 xxxx,而後再用這個值與segmentMask作&運算,也就是取最後四位的值。這個值肯定Segment的索引。
  4. 使用Unsafe的方式從Segment數組中獲取該索引對應的Segment對象。
  5. 向這個Segment對象中put值,這個put操做也基本是同樣的步驟(經過&運算獲取HashEntry的索引,而後set)。

get操做

get操做的源碼以下:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
public V get(Object key) {  Segment<K,V> s; // manually integrate access methods to reduce overhead  HashEntry<K,V>[] tab;  int h = hash(key);  long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;  if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&  (tab = s.table) != null) {  for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile  (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);  e != null; e = e.next) {  K k;  if ((k = e.key) == key || (e.hash == h && key.equals(k)))  return e.value;  }  }  return null;  }

操做步驟爲:

  1. 和put操做同樣,先經過key進行兩次hash肯定應該去哪一個Segment中取數據。
  2. 使用Unsafe獲取對應的Segment,而後再進行一次&運算獲得HashEntry鏈表的位置,而後從鏈表頭開始遍歷整個鏈表(由於Hash可能會有碰撞,因此用一個鏈表保存),若是找到對應的key,則返回對應的value值,若是鏈表遍歷完都沒有找到對應的key,則說明Map中不包含該key,返回null。

size操做

size操做與put和get操做最大的區別在於,size操做須要遍歷全部的Segment才能算出整個Map的大小,而put和get都只關心一個Segment。假設咱們當前遍歷的Segment爲SA,那麼在遍歷SA過程當中其餘的Segment好比SB可能會被修改,因而這一次運算出來的size值可能並非Map當前的真正大小。因此一個比較簡單的辦法就是計算Map大小的時候全部的Segment都Lock住,不能更新(包含put,remove等等)數據,計算完以後再Unlock。這是普通人可以想到的方案,可是牛逼的做者還有一個更好的Idea:先給3次機會,不lock全部的Segment,遍歷全部Segment,累加各個Segment的大小獲得整個Map的大小,若是某相鄰的兩次計算獲取的全部Segment的更新的次數(每一個Segment都有一個modCount變量,這個變量在Segment中的Entry被修改時會加一,經過這個值能夠獲得每一個Segment的更新操做的次數)是同樣的,說明計算過程當中沒有更新操做,則直接返回這個值。若是這三次不加鎖的計算過程當中Map的更新次數有變化,則以後的計算先對全部的Segment加鎖,再遍歷全部Segment計算Map大小,最後再解鎖全部Segment。源代碼以下:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
public int size() {  // Try a few times to get accurate count. On failure due to  // continuous async changes in table, resort to locking.  final Segment<K,V>[] segments = this.segments;  int size;  boolean overflow; // true if size overflows 32 bits  long sum; // sum of modCounts  long last = 0L; // previous sum  int retries = -1; // first iteration isn't retry  try {  for (;;) {  if (retries++ == RETRIES_BEFORE_LOCK) {  for (int j = 0; j < segments.length; ++j)  ensureSegment(j).lock(); // force creation  }  sum = 0L;  size = 0;  overflow = false;  for (int j = 0; j < segments.length; ++j) {  Segment<K,V> seg = segmentAt(segments, j);  if (seg != null) {  sum += seg.modCount;  int c = seg.count;  if (c < 0 || (size += c) < 0)  overflow = true;  }  }  if (sum == last)  break;  last = sum;  }  } finally {  if (retries > RETRIES_BEFORE_LOCK) {  for (int j = 0; j < segments.length; ++j)  segmentAt(segments, j).unlock();  }  }  return overflow ? Integer.MAX_VALUE : size;  }

舉個例子:

一個Map有4個Segment,標記爲S1,S2,S3,S4,如今咱們要獲取Map的size。計算過程是這樣的:第一次計算,不對S1,S2,S3,S4加鎖,遍歷全部的Segment,假設每一個Segment的大小分別爲1,2,3,4,更新操做次數分別爲:2,2,3,1,則此次計算能夠獲得Map的總大小爲1+2+3+4=10,總共更新操做次數爲2+2+3+1=8;第二次計算,不對S1,S2,S3,S4加鎖,遍歷全部Segment,假設此次每一個Segment的大小變成了2,2,3,4,更新次數分別爲3,2,3,1,由於兩次計算獲得的Map更新次數不一致(第一次是8,第二次是9)則能夠判定這段時間Map數據被更新,則此時應該再試一次;第三次計算,不對S1,S2,S3,S4加鎖,遍歷全部Segment,假設每一個Segment的更新操做次數仍是爲3,2,3,1,則由於第二次計算和第三次計算獲得的Map的更新操做的次數是一致的,就能說明第二次計算和第三次計算這段時間內Map數據沒有被更新,此時能夠直接返回第三次計算獲得的Map的大小。最壞的狀況:第三次計算獲得的數據更新次數和第二次也不同,則只能先對全部Segment加鎖再計算最後解鎖。

containsValue操做

containsValue操做採用了和size操做同樣的想法:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
public boolean containsValue(Object value) {  // Same idea as size()  if (value == null)  throw new NullPointerException();  final Segment<K,V>[] segments = this.segments;  boolean found = false;  long last = 0;  int retries = -1;  try {  outer: for (;;) {  if (retries++ == RETRIES_BEFORE_LOCK) {  for (int j = 0; j < segments.length; ++j)  ensureSegment(j).lock(); // force creation  }  long hashSum = 0L;  int sum = 0;  for (int j = 0; j < segments.length; ++j) {  HashEntry<K,V>[] tab;  Segment<K,V> seg = segmentAt(segments, j);  if (seg != null && (tab = seg.table) != null) {  for (int i = 0 ; i < tab.length; i++) {  HashEntry<K,V> e;  for (e = entryAt(tab, i); e != null; e = e.next) {  V v = e.value;  if (v != null && value.equals(v)) {  found = true;  break outer;  }  }  }  sum += seg.modCount;  }  }  if (retries > 0 && sum == last)  break;  last = sum;  }  } finally {  if (retries > RETRIES_BEFORE_LOCK) {  for (int j = 0; j < segments.length; ++j)  segmentAt(segments, j).unlock();  }  }  return found;  }

關於hash

你們必定還記得使用一個key定位Segment以前進行過一次hash操做吧?此次hash的做用是什麼呢?看看hash的源代碼:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
private int hash(Object k) {  int h = hashSeed;   if ((0 != h) && (k instanceof String)) {  return sun.misc.Hashing.stringHash32((String) k);  }   h ^= k.hashCode();   // Spread bits to regularize both segment and index locations,  // using variant of single-word Wang/Jenkins hash.  h += (h << 15) ^ 0xffffcd7d;  h ^= (h >>> 10);  h += (h << 3);  h ^= (h >>> 6);  h += (h << 2) + (h << 14);  return h ^ (h >>> 16);  }

源碼中的註釋是這樣的:

Applies a supplemental hash function to a given hashCode, which defends against poor quality hash functions. This is critical because ConcurrentHashMap uses power-of-two length hash tables, that otherwise encounter collisions for hashCodes that do not differ in lower or upper bits.

這裏用到了Wang/Jenkins hash算法的變種,主要的目的是爲了減小哈希衝突,使元素可以均勻的分佈在不一樣的Segment上,從而提升容器的存取效率。假如哈希的質量差到極點,那麼全部的元素都在一個Segment中,不只存取元素緩慢,分段鎖也會失去意義。

舉個簡單的例子:

1 2 3 4
System.out.println(Integer.parseInt("0001111", 2) & 15); System.out.println(Integer.parseInt("0011111", 2) & 15); System.out.println(Integer.parseInt("0111111", 2) & 15); System.out.println(Integer.parseInt("1111111", 2) & 15);

這些數字獲得的hash值都是同樣的,全是15,因此若是不進行第一次預hash,發生衝突的概率仍是很大的,可是若是咱們先把上例中的二進制數字使用hash()函數先進行一次預hash,獲得的結果是這樣的:

0100|0111|0110|0111|1101|1010|0100|1110
1111|0111|0100|0011|0000|0001|1011|1000
0111|0111|0110|1001|0100|0110|0011|1110
1000|0011|0000|0000|1100|1000|0001|1010

上面這個例子引用自: InfoQ
能夠看到每一位的數據都散開了,而且ConcurrentHashMap中是使用預hash值的高位參與運算的。好比以前說的先將hash值向右按位移動28位,再與15作&運算,獲得的結果都別爲:4,15,7,8,沒有衝突!

注意事項

  • ConcurrentHashMap中的key和value值都不能爲null。
  • ConcurrentHashMap的整個操做過程當中大量使用了Unsafe類來獲取Segment/HashEntry,這裏Unsafe的主要做用是提供原子操做。Unsafe這個類比較恐怖,破壞力極強,通常場景不建議使用,若是有興趣能夠到這裏作詳細的瞭解Java中不爲人知的特性
  • ConcurrentHashMap是線程安全的類並不能保證使用了ConcurrentHashMap的操做都是線程安全的!

http://qifuguang.me/2015/09/10/[Java%E5%B9%B6%E5%8F%91%E5%8C%85%E5%AD%A6%E4%B9%A0%E5%85%AB]%E6%B7%B1%E5%BA%A6%E5%89%96%E6%9E%90ConcurrentHashMap/

相關文章
相關標籤/搜索