從ConcurrentHashMap的演進看Java多線程核心技術 Java進階(六)

本文分析了HashMap的實現原理,以及resize可能引發死循環和Fast-fail等線程不安全行爲。同時結合源碼從數據結構,尋址方式,同步方式,計算size等角度分析了JDK 1.7和JDK 1.8中ConcurrentHashMap的實現原理。java

原創文章,同步首發自做者我的博客,轉載請在文章開頭處以超連接註明出處。http://www.jasongj.com/java/concurrenthashmap/react

線程不安全的HashMap

衆所周知,HashMap是非線程安全的。而HashMap的線程不安全主要體如今resize時的死循環及使用迭代器時的fast-fail上。數組

注:本章的代碼均基於JDK 1.7.0_67緩存

HashMap工做原理

HashMap數據結構

經常使用的底層數據結構主要有數組和鏈表。數組存儲區間連續,佔用內存較多,尋址容易,插入和刪除困難。鏈表存儲區間離散,佔用內存較少,尋址困難,插入和刪除容易。安全

HashMap要實現的是哈希表的效果,儘可能實現O(1)級別的增刪改查。它的具體實現則是同時使用了數組和鏈表,能夠認爲最外層是一個數組,數組的每一個元素是一個鏈表的表頭。數據結構

HashMap尋址方式

對於新插入的數據或者待讀取的數據,HashMap將Key的哈希值對數組長度取模,結果做爲該Entry在數組中的index。在計算機中,取模的代價遠高於位操做的代價,所以HashMap要求數組的長度必須爲2的N次方。此時將Key的哈希值對2^N-1進行與運算,其效果即與取模等效。HashMap並不要求用戶在指定HashMap容量時必須傳入一個2的N次方的整數,而是會經過Integer.highestOneBit算出比指定整數小的最大的2^N值,其實現方法以下。多線程

public static int highestOneBit(int i) {
  i |= (i >>  1);
  i |= (i >>  2);
  i |= (i >>  4);
  i |= (i >>  8);
  i |= (i >> 16);
  return i - (i >>> 1);
}

因爲Key的哈希值的分佈直接決定了全部數據在哈希表上的分佈或者說決定了哈希衝突的可能性,所以爲防止糟糕的Key的hashCode實現(例如低位都相同,只有高位不相同,與2^N-1取與後的結果都相同),JDK 1.7的HashMap經過以下方法使得最終的哈希值的二進制形式中的1儘可能均勻分佈從而儘量減小哈希衝突。併發

int h = hashSeed;
h ^= k.hashCode();
h ^= (h >>> 20) ^ (h >>> 12);
return h ^ (h >>> 7) ^ (h >>> 4);

resize死循環

transfer方法

當HashMap的size超過Capacity*loadFactor時,須要對HashMap進行擴容。具體方法是,建立一個新的,長度爲原來Capacity兩倍的數組,保證新的Capacity仍爲2的N次方,從而保證上述尋址方式仍適用。同時須要經過以下transfer方法將原來的全部數據所有從新插入(rehash)到新的數組中。ssh

void transfer(Entry[] newTable, boolean rehash) {
  int newCapacity = newTable.length;
  for (Entry<K,V> e : table) {
    while(null != e) {
      Entry<K,V> next = e.next;
      if (rehash) {
        e.hash = null == e.key ? 0 : hash(e.key);
      }
      int i = indexFor(e.hash, newCapacity);
      e.next = newTable[i];
      newTable[i] = e;
      e = next;
    }
  }
}

該方法並不保證線程安全,並且在多線程併發調用時,可能出現死循環。其執行過程以下。從步驟2可見,轉移時鏈表順序反轉。高併發

  1. 遍歷原數組中的元素
  2. 對鏈表上的每個節點遍歷:用next取得要轉移那個元素的下一個,將e轉移到新數組的頭部,使用頭插法插入節點
  3. 循環2,直到鏈表節點所有轉移
  4. 循環1,直到全部元素所有轉移

單線程rehash

單線程狀況下,rehash無問題。下圖演示了單線程條件下的rehash過程
HashMap rehash single thread

多線程併發下的rehash

這裏假設有兩個線程同時執行了put操做並引起了rehash,執行了transfer方法,並假設線程一進入transfer方法並執行完next = e.next後,由於線程調度所分配時間片用完而「暫停」,此時線程二完成了transfer方法的執行。此時狀態以下。

HashMap rehash multi thread step 1

接着線程1被喚醒,繼續執行第一輪循環的剩餘部分

e.next = newTable[1] = null
newTable[1] = e = key(5)
e = next = key(9)

結果以下圖所示
HashMap rehash multi thread step 2

接着執行下一輪循環,結果狀態圖以下所示
HashMap rehash multi thread step 3

繼續下一輪循環,結果狀態圖以下所示
HashMap rehash multi thread step 4

此時循環鏈表造成,而且key(11)沒法加入到線程1的新數組。在下一次訪問該鏈表時會出現死循環。

Fast-fail

產生緣由

在使用迭代器的過程當中若是HashMap被修改,那麼ConcurrentModificationException將被拋出,也即Fast-fail策略。

當HashMap的iterator()方法被調用時,會構造並返回一個新的EntryIterator對象,並將EntryIterator的expectedModCount設置爲HashMap的modCount(該變量記錄了HashMap被修改的次數)。

HashIterator() {
  expectedModCount = modCount;
  if (size > 0) { // advance to first entry
  Entry[] t = table;
  while (index < t.length && (next = t[index++]) == null)
    ;
  }
}

在經過該Iterator的next方法訪問下一個Entry時,它會先檢查本身的expectedModCount與HashMap的modCount是否相等,若是不相等,說明HashMap被修改,直接拋出ConcurrentModificationException。該Iterator的remove方法也會作相似的檢查。該異常的拋出意在提醒用戶及早意識到線程安全問題。

線程安全解決方案

單線程條件下,爲避免出現ConcurrentModificationException,須要保證只經過HashMap自己或者只經過Iterator去修改數據,不能在Iterator使用結束以前使用HashMap自己的方法修改數據。由於經過Iterator刪除數據時,HashMap的modCount和Iterator的expectedModCount都會自增,不影響兩者的相等性。若是是增長數據,只能經過HashMap自己的方法完成,此時若是要繼續遍歷數據,須要從新調用iterator()方法從而從新構造出一個新的Iterator,使得新Iterator的expectedModCount與更新後的HashMap的modCount相等。

多線程條件下,可以使用Collections.synchronizedMap方法構造出一個同步Map,或者直接使用線程安全的ConcurrentHashMap。

Java 7基於分段鎖的ConcurrentHashMap

注:本章的代碼均基於JDK 1.7.0_67

數據結構

Java 7中的ConcurrentHashMap的底層數據結構仍然是數組和鏈表。與HashMap不一樣的是,ConcurrentHashMap最外層不是一個大的數組,而是一個Segment的數組。每一個Segment包含一個與HashMap數據結構差很少的鏈表數組。總體數據結構以下圖所示。
JAVA 7 ConcurrentHashMap

尋址方式

在讀寫某個Key時,先取該Key的哈希值。並將哈希值的高N位對Segment個數取模從而獲得該Key應該屬於哪一個Segment,接着如同操做HashMap同樣操做這個Segment。爲了保證不一樣的值均勻分佈到不一樣的Segment,須要經過以下方法計算哈希值。

private int hash(Object k) {
  int h = hashSeed;
  if ((0 != h) && (k instanceof String)) {
    return sun.misc.Hashing.stringHash32((String) k);
  }
  h ^= k.hashCode();
  h += (h <<  15) ^ 0xffffcd7d;
  h ^= (h >>> 10);
  h += (h <<   3);
  h ^= (h >>>  6);
  h += (h <<   2) + (h << 14);
  return h ^ (h >>> 16);
}

一樣爲了提升取模運算效率,經過以下計算,ssize即爲大於concurrencyLevel的最小的2的N次方,同時segmentMask爲2^N-1。這一點跟上文中計算數組長度的方法一致。對於某一個Key的哈希值,只須要向右移segmentShift位以取高sshift位,再與segmentMask取與操做便可獲得它在Segment數組上的索引。

int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) {
  ++sshift;
  ssize <<= 1;
}
this.segmentShift = 32 - sshift;
this.segmentMask = ssize - 1;
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];

同步方式

Segment繼承自ReentrantLock,因此咱們能夠很方便的對每個Segment上鎖。

對於讀操做,獲取Key所在的Segment時,須要保證可見性(請參考如何保證多線程條件下的可見性)。具體實現上可使用volatile關鍵字,也可以使用鎖。但使用鎖開銷太大,而使用volatile時每次寫操做都會讓全部CPU內緩存無效,也有必定開銷。ConcurrentHashMap使用以下方法保證可見性,取得最新的Segment。

Segment<K,V> s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)

獲取Segment中的HashEntry時也使用了相似方法

HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
  (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE)

對於寫操做,並不要求同時獲取全部Segment的鎖,由於那樣至關於鎖住了整個Map。它會先獲取該Key-Value對所在的Segment的鎖,獲取成功後就能夠像操做一個普通的HashMap同樣操做該Segment,並保證該Segment的安全性。
同時因爲其它Segment的鎖並未被獲取,所以理論上可支持concurrencyLevel(等於Segment的個數)個線程安全的併發讀寫。

獲取鎖時,並不直接使用lock來獲取,由於該方法獲取鎖失敗時會掛起(參考可重入鎖)。事實上,它使用了自旋鎖,若是tryLock獲取鎖失敗,說明鎖被其它線程佔用,此時經過循環再次以tryLock的方式申請鎖。若是在循環過程當中該Key所對應的鏈表頭被修改,則重置retry次數。若是retry次數超過必定值,則使用lock方法申請鎖。

這裏使用自旋鎖是由於自旋鎖的效率比較高,可是它消耗CPU資源比較多,所以在自旋次數超過閾值時切換爲互斥鎖。

size操做

put、remove和get操做只須要關心一個Segment,而size操做須要遍歷全部的Segment才能算出整個Map的大小。一個簡單的方案是,先鎖住全部Sgment,計算完後再解鎖。但這樣作,在作size操做時,不只沒法對Map進行寫操做,同時也沒法進行讀操做,不利於對Map的並行操做。

爲更好支持併發操做,ConcurrentHashMap會在不上鎖的前提逐個Segment計算3次size,若是某相鄰兩次計算獲取的全部Segment的更新次數(每一個Segment都與HashMap同樣經過modCount跟蹤本身的修改次數,Segment每修改一次其modCount加一)相等,說明這兩次計算過程當中無更新操做,則這兩次計算出的總size相等,可直接做爲最終結果返回。若是這三次計算過程當中Map有更新,則對全部Segment加鎖從新計算Size。該計算方法代碼以下

public int size() {
  final Segment<K,V>[] segments = this.segments;
  int size;
  boolean overflow; // true if size overflows 32 bits
  long sum;         // sum of modCounts
  long last = 0L;   // previous sum
  int retries = -1; // first iteration isn't retry
  try {
    for (;;) {
      if (retries++ == RETRIES_BEFORE_LOCK) {
        for (int j = 0; j < segments.length; ++j)
          ensureSegment(j).lock(); // force creation
      }
      sum = 0L;
      size = 0;
      overflow = false;
      for (int j = 0; j < segments.length; ++j) {
        Segment<K,V> seg = segmentAt(segments, j);
        if (seg != null) {
          sum += seg.modCount;
          int c = seg.count;
          if (c < 0 || (size += c) < 0)
            overflow = true;
        }
      }
      if (sum == last)
        break;
      last = sum;
    }
  } finally {
    if (retries > RETRIES_BEFORE_LOCK) {
      for (int j = 0; j < segments.length; ++j)
        segmentAt(segments, j).unlock();
    }
  }
  return overflow ? Integer.MAX_VALUE : size;
}

不一樣之處

ConcurrentHashMap與HashMap相比,有如下不一樣點

  • ConcurrentHashMap線程安全,而HashMap非線程安全
  • HashMap容許Key和Value爲null,而ConcurrentHashMap不容許
  • HashMap不容許經過Iterator遍歷的同時經過HashMap修改,而ConcurrentHashMap容許該行爲,而且該更新對後續的遍歷可見

Java 8基於CAS的ConcurrentHashMap

注:本章的代碼均基於JDK 1.8.0_111

數據結構

Java 7爲實現並行訪問,引入了Segment這一結構,實現了分段鎖,理論上最大併發度與Segment個數相等。Java 8爲進一步提升併發性,摒棄了分段鎖的方案,而是直接使用一個大的數組。同時爲了提升哈希碰撞下的尋址性能,Java 8在鏈表長度超過必定閾值(8)時將鏈表(尋址時間複雜度爲O(N))轉換爲紅黑樹(尋址時間複雜度爲O(long(N)))。其數據結構以下圖所示


JAVA 8 ConcurrentHashMap

尋址方式

Java 8的ConcurrentHashMap一樣是經過Key的哈希值與數組長度取模肯定該Key在數組中的索引。一樣爲了不不太好的Key的hashCode設計,它經過以下方法計算獲得Key的最終哈希值。不一樣的是,Java 8的ConcurrentHashMap做者認爲引入紅黑樹後,即便哈希衝突比較嚴重,尋址效率也足夠高,因此做者並未在哈希值的計算上作過多設計,只是將Key的hashCode值與其高16位做異或並保證最高位爲0(從而保證最終結果爲正整數)。

static final int spread(int h) {
  return (h ^ (h >>> 16)) & HASH_BITS;
}

同步方式

對於put操做,若是Key對應的數組元素爲null,則經過CAS操做將其設置爲當前值。若是Key對應的數組元素(也即鏈表表頭或者樹的根元素)不爲null,則對該元素使用synchronized關鍵字申請鎖,而後進行操做。若是該put操做使得當前鏈表長度超過必定閾值,則將該鏈表轉換爲樹,從而提升尋址效率。

對於讀操做,因爲數組被volatile關鍵字修飾,所以不用擔憂數組的可見性問題。同時每一個元素是一個Node實例(Java 7中每一個元素是一個HashEntry),它的Key值和hash值都由final修飾,不可變動,無須關心它們被修改後的可見性問題。而其Value及對下一個元素的引用由volatile修飾,可見性也有保障。

static class Node<K,V> implements Map.Entry<K,V> {
  final int hash;
  final K key;
  volatile V val;
  volatile Node<K,V> next;
}

對於Key對應的數組元素的可見性,由Unsafe的getObjectVolatile方法保證。

static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
  return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}

size操做

put方法和remove方法都會經過addCount方法維護Map的size。size方法經過sumCount獲取由addCount方法維護的Map的size。

Java進階系列

相關文章
相關標籤/搜索