Java併發編程:併發容器之ConcurrentHashMap(轉載)

下面這部份內容轉載自:java

  http://www.haogongju.net/art/2350374node

  JDK5中添加了新的concurrent包,相對同步容器而言,併發容器經過一些機制改進了併發性能。由於同步容器將全部對容器狀態的訪問都c++

串行化了,這樣保證了線程的安全性,因此這種方法的代價就是嚴重下降了併發性,當多個線程競爭容器時,吞吐量嚴重下降。所以Java5.0開數組

始針對多線程併發訪問設計,提供了併發性能較好的併發容器,引入了java.util.concurrent包。與Vector和Hashtable、安全

Collections.synchronizedXxx()同步容器等相比,util.concurrent中引入的併發容器主要解決了兩個問題: 
1)根據具體場景進行設計,儘可能避免synchronized,提供併發性。 
2)定義了一些併發安全的複合操做,而且保證併發環境下的迭代操做不會出錯。數據結構

  util.concurrent中容器在迭代時,能夠不封裝在synchronized中,能夠保證不拋異常,可是未必每次看到的都是"最新的、當前的"數據。多線程

  下面是對併發容器的簡單介紹:併發

  ConcurrentHashMap代替同步的Map(Collections.synchronized(new HashMap())),衆所周知,HashMap是根據散列值分段存儲的,同步Map在同步的時候鎖住了全部的段,而ConcurrentHashMap加鎖的時候根據散列值鎖住了散列值鎖對應的那段,所以提升了併發性能。ConcurrentHashMap也增長了對經常使用複合操做的支持,好比"若沒有則添加":putIfAbsent(),替換:replace()。這2個操做都是原子操做。app

  CopyOnWriteArrayList和CopyOnWriteArraySet分別代替List和Set,主要是在遍歷操做爲主的狀況下來代替同步的List和同步的Set,這也就是上面所述的思路:迭代過程要保證不出錯,除了加鎖,另一種方法就是"克隆"容器對象。ssh

  ConcurrentLinkedQuerue是一個先進先出的隊列。它是非阻塞隊列。

    ConcurrentSkipListMap能夠在高效併發中替代SoredMap(例如用Collections.synchronzedMap包裝的TreeMap)。

  ConcurrentSkipListSet能夠在高效併發中替代SoredSet(例如用Collections.synchronzedSet包裝的TreeMap)。

  

  本篇文章着重講解2個併發容器:ConcurrentHashMap和CopyOnWriteArrayList其中的ConcurrentHashMap,CopyOnWriteArrayList在下一篇文章中講述。

  原文連接:http://www.iteye.com/topic/1103980

  你們都知道HashMap是非線程安全的,Hashtable是線程安全的,可是因爲Hashtable是採用synchronized進行同步,至關於全部線程進行讀寫時都去競爭一把鎖,致使效率很是低下。

  ConcurrentHashMap能夠作到讀取數據不加鎖,而且其內部的結構可讓其在進行寫操做的時候可以將鎖的粒度保持地儘可能地小,不用對整個ConcurrentHashMap加鎖。

ConcurrentHashMap的內部結構

  ConcurrentHashMap爲了提升自己的併發能力,在內部採用了一個叫作Segment的結構,一個Segment其實就是一個類Hash Table的結構,Segment內部維護了一個鏈表數組,咱們用下面這一幅圖來看下ConcurrentHashMap的內部結構:
圖表1
  從上面的結構咱們能夠了解到,ConcurrentHashMap定位一個元素的過程須要進行兩次Hash操做,第一次Hash定位到Segment,第二次Hash定位到元素所在的鏈表的頭部,所以,這一種結構的帶來的反作用是Hash的過程要比普通的HashMap要長,可是帶來的好處是寫操做的時候能夠只對元素所在的Segment進行加鎖便可,不會影響到其餘的Segment,這樣,在最理想的狀況下,ConcurrentHashMap能夠最高同時支持Segment數量大小的寫操做(恰好這些寫操做都很是平均地分佈在全部的Segment上),因此,經過這一種結構,ConcurrentHashMap的併發能力能夠大大的提升。

Segment

  咱們再來具體瞭解一下Segment的數據結構:

static  final  class  Segment<K,V>  extends  ReentrantLock  implements  Serializable {
     transient  volatile  int  count;
     transient  int  modCount;
     transient  int  threshold;
     transient  volatile  HashEntry<K,V>[] table;
     final  float  loadFactor;
}

詳細解釋一下Segment裏面的成員變量的意義:

count:Segment中元素的數量

  • modCount:對table的大小形成影響的操做的數量(好比put或者remove操做)

  • threshold:閾值,Segment裏面元素的數量超過這個值依舊就會對Segment進行擴容

  • table:鏈表數組,數組中的每個元素表明了一個鏈表的頭部

  • loadFactor:負載因子,用於肯定threshold

HashEntry

  Segment中的元素是以HashEntry的形式存放在鏈表數組中的,看一下HashEntry的結構:

static  final  class  HashEntry<K,V> {
     final  K key;
     final  int  hash;
     volatile  V value;
     final  HashEntry<K,V> next;
}

能夠看到HashEntry的一個特色,除了value之外,其餘的幾個變量都是final的,這樣作是爲了防止鏈表結構被破壞,出現ConcurrentModification的狀況。

ConcurrentHashMap的初始化

  下面咱們來結合源代碼來具體分析一下ConcurrentHashMap的實現,先看下初始化方法:

public ConcurrentHashMap(int initialCapacity,
                         float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
  
    if (concurrencyLevel > MAX_SEGMENTS)
        concurrencyLevel = MAX_SEGMENTS;
  
    // Find power-of-two sizes best matching arguments
    int sshift = 0;
    int ssize = 1;
    while (ssize < concurrencyLevel) {
        ++sshift;
        ssize <<= 1;
    }
    segmentShift = 32 - sshift;
    segmentMask = ssize - 1;
    this.segments = Segment.newArray(ssize);
  
    if (initialCapacity > MAXIMUM_CAPACITY)
        initialCapacity = MAXIMUM_CAPACITY;
    int c = initialCapacity / ssize;
    if (c * ssize < initialCapacity)
        ++c;
    int cap = 1;
    while (cap < c)
        cap <<= 1;
  
    for (int i = 0; i < this.segments.length; ++i)
        this.segments[i] = new Segment<K,V>(cap, loadFactor);
}

CurrentHashMap的初始化一共有三個參數,一個initialCapacity,表示初始的容量,一個loadFactor,表示負載參數,最後一個是concurrentLevel,表明ConcurrentHashMap內部的Segment的數量,ConcurrentLevel一經指定,不可改變,後續若是ConcurrentHashMap的元素數量增長致使ConrruentHashMap須要擴容,ConcurrentHashMap不會增長Segment的數量,而只會增長Segment中鏈表數組的容量大小,這樣的好處是擴容過程不須要對整個ConcurrentHashMap作rehash,而只須要對Segment裏面的元素作一次rehash就能夠了。

  整個ConcurrentHashMap的初始化方法仍是很是簡單的,先是根據concurrentLevel來new出Segment,這裏Segment的數量是不大於concurrentLevel的最大的2的指數,就是說Segment的數量永遠是2的指數個,這樣的好處是方便採用移位操做來進行hash,加快hash的過程。接下來就是根據intialCapacity肯定Segment的容量的大小,每個Segment的容量大小也是2的指數,一樣使爲了加快hash的過程。

  這邊須要特別注意一下兩個變量,分別是segmentShift和segmentMask,這兩個變量在後面將會起到很大的做用,假設構造函數肯定了Segment的數量是2的n次方,那麼segmentShift就等於32減去n,而segmentMask就等於2的n次方減一。

ConcurrentHashMap的get操做

  前面提到過ConcurrentHashMap的get操做是不用加鎖的,咱們這裏看一下其實現:

public V get(Object key) {
    int hash = hash(key.hashCode());
    return segmentFor(hash).get(key, hash);
}

看第三行,segmentFor這個函數用於肯定操做應該在哪個segment中進行,幾乎對ConcurrentHashMap的全部操做都須要用到這個函數,咱們看下這個函數的實現:

final Segment<K,V> segmentFor(int hash) {
    return segments[(hash >>> segmentShift) & segmentMask];
}

這個函數用了位操做來肯定Segment,根據傳入的hash值向右無符號右移segmentShift位,而後和segmentMask進行與操做,結合咱們以前說的segmentShift和segmentMask的值,就能夠得出如下結論:假設Segment的數量是2的n次方,根據元素的hash值的高n位就能夠肯定元素到底在哪個Segment中。

  在肯定了須要在哪個segment中進行操做之後,接下來的事情就是調用對應的Segment的get方法:

V get(Object key, int hash) {
    if (count != 0) { // read-volatile
        HashEntry<K,V> e = getFirst(hash);
        while (e != null) {
            if (e.hash == hash && key.equals(e.key)) {
                V v = e.value;
                if (v != null)
                    return v;
                return readValueUnderLock(e); // recheck
            }
            e = e.next;
        }
    }
    return null;
}

先看第二行代碼,這裏對count進行了一次判斷,其中count表示Segment中元素的數量,咱們能夠來看一下count的定義:

transient volatile int count;

能夠看到count是volatile的,實際上這裏裏面利用了volatile的語義:

  對volatile字段的寫入操做happens-before於每個後續的同一個字段的讀操做。

  由於實際上put、remove等操做也會更新count的值,因此當競爭發生的時候,volatile的語義能夠保證寫操做在讀操做以前,也就保證了寫操做對後續的讀操做都是可見的,這樣後面get的後續操做就能夠拿到完整的元素內容。而後,在第三行,調用了getFirst()來取得鏈表的頭部:

HashEntry<K,V> getFirst(int hash) {
    HashEntry<K,V>[] tab = table;
    return tab[hash & (tab.length - 1)];
}

一樣,這裏也是用位操做來肯定鏈表的頭部,hash值和HashTable的長度減一作與操做,最後的結果就是hash值的低n位,其中n是HashTable的長度以2爲底的結果。

  在肯定了鏈表的頭部之後,就能夠對整個鏈表進行遍歷,看第4行,取出key對應的value的值,若是拿出的value的值是null,則可能這個key,value對正在put的過程當中,若是出現這種狀況,那麼就加鎖來保證取出的value是完整的,若是不是null,則直接返回value。

ConcurrentHashMap的put操做

  看完了get操做,再看下put操做,put操做的前面也是肯定Segment的過程,這裏再也不贅述,直接看關鍵的segment的put方法:

V put(K key, int hash, V value, boolean onlyIfAbsent) {
    lock();
    try {
        int c = count;
        if (c++ > threshold) // ensure capacity
            rehash();
        HashEntry<K,V>[] tab = table;
        int index = hash & (tab.length - 1);
        HashEntry<K,V> first = tab[index];
        HashEntry<K,V> e = first;
        while (e != null && (e.hash != hash || !key.equals(e.key)))
            e = e.next;
  
        V oldValue;
        if (e != null) {
            oldValue = e.value;
            if (!onlyIfAbsent)
                e.value = value;
        }
        else {
            oldValue = null;
            ++modCount;
            tab[index] = new HashEntry<K,V>(key, hash, first, value);
            count = c; // write-volatile
        }
        return oldValue;
    } finally {
        unlock();
    }
}

首先對Segment的put操做是加鎖完成的,而後在第五行,若是Segment中元素的數量超過了閾值(由構造函數中的loadFactor算出)這須要進行對Segment擴容,而且要進行rehash,關於rehash的過程你們能夠本身去了解,這裏不詳細講了。

  第8和第9行的操做就是getFirst的過程,肯定鏈表頭部的位置。

  第11行這裏的這個while循環是在鏈表中尋找和要put的元素相同key的元素,若是找到,就直接更新更新key的value,若是沒有找到,則進入21行這裏,生成一個新的HashEntry而且把它加到整個Segment的頭部,而後再更新count的值。

ConcurrentHashMap的remove操做

  Remove操做的前面一部分和前面的get和put操做同樣,都是定位Segment的過程,而後再調用Segment的remove方法:

V remove(Object key, int hash, Object value) {
    lock();
    try {
        int c = count - 1;
        HashEntry<K,V>[] tab = table;
        int index = hash & (tab.length - 1);
        HashEntry<K,V> first = tab[index];
        HashEntry<K,V> e = first;
        while (e != null && (e.hash != hash || !key.equals(e.key)))
            e = e.next;
  
        V oldValue = null;
        if (e != null) {
            V v = e.value;
            if (value == null || value.equals(v)) {
                oldValue = v;
                // All entries following removed node can stay
                // in list, but all preceding ones need to be
                // cloned.
                ++modCount;
                HashEntry<K,V> newFirst = e.next;
                for (HashEntry<K,V> p = first; p != e; p = p.next)
                    newFirst = new HashEntry<K,V>(p.key, p.hash,
                                                  newFirst, p.value);
                tab[index] = newFirst;
                count = c; // write-volatile
            }
        }
        return oldValue;
    } finally {
        unlock();
    }
}

首先remove操做也是肯定須要刪除的元素的位置,不過這裏刪除元素的方法不是簡單地把待刪除元素的前面的一個元素的next指向後面一個就完事了,咱們以前已經說過HashEntry中的next是final的,一經賦值之後就不可修改,在定位到待刪除元素的位置之後,程序就將待刪除元素前面的那一些元素所有複製一遍,而後再一個一個從新接到鏈表上去,看一下下面這一幅圖來了解這個過程:
1
  假設鏈表中原來的元素如上圖所示,如今要刪除元素3,那麼刪除元素3之後的鏈表就以下圖所示:
2

ConcurrentHashMap的size操做

  在前面的章節中,咱們涉及到的操做都是在單個Segment中進行的,可是ConcurrentHashMap有一些操做是在多個Segment中進行,好比size操做,ConcurrentHashMap的size操做也採用了一種比較巧的方式,來儘可能避免對全部的Segment都加鎖。

  前面咱們提到了一個Segment中的有一個modCount變量,表明的是對Segment中元素的數量形成影響的操做的次數,這個值只增不減,size操做就是遍歷了兩次Segment,每次記錄Segment的modCount值,而後將兩次的modCount進行比較,若是相同,則表示期間沒有發生過寫入操做,就將原先遍歷的結果返回,若是不相同,則把這個過程再重複作一次,若是再不相同,則就須要將全部的Segment都鎖住,而後一個一個遍歷了,具體的實現你們能夠看ConcurrentHashMap的源碼,這裏就不貼了。

  另外2篇講述關於ConcurrentHashMap原理的兩篇文章:

  《ConcurrentHashMap之實現細節》:http://www.iteye.com/topic/344876

  《聊聊併發(四)深刻分析ConcurrentHashMap》:http://ifeve.com/ConcurrentHashMap/

相關文章
相關標籤/搜索