ConcurrentHashMap源碼閱讀

1. 前言

HashMap是非線程安全的,在多線程訪問時沒有同步機制,併發場景下put操做可能致使同一數組下的鏈表造成閉環,get時候出現死循環,致使CPU利用率接近100%。html

HashTable是線程安全的,使用synchronized鎖住整個table的方式來保證併發訪問下的線程安全,但效率卻比較低下。由於線程1調用HashTable的put同步方法時,線程2的put或get等方法則進入阻塞狀態,因此競爭越激烈,效率越低。node

ConcurrentHashMap是支持高併發、高吞吐量的線程安全的Map實現。下面會經過閱讀 ConcurrentHashMap 在 JDK1.7 和 JDK1.8 的源碼來了解它的演變過程。算法

2. ConcurrentHashMap在JDK1.7中的設計

2.1. 數據結構和鎖分段

HashTable在競爭激烈的併發環境中效率低下的緣由是:訪問HashTable的線程都競爭同一把鎖。數組

ConcurrentHashMap卻容許多個修改操做併發的進行,其關鍵是運用了鎖分段技術:將容器內的數據分爲一段段(Segment)的來存儲,每段能夠當作一個小的HashTable,每段數據都分配一把鎖。當一個線程佔用鎖訪問這一段數據時,其餘線程能夠訪問其餘段的數據。那麼當多線程併發訪問容器內不一樣鎖鎖住的數據時,線程間就不存在鎖競爭,從而有效的提高效率。安全

ConcurrentHashMap的數據結構以下:數據結構

ConcurrentHashMap 由 Segment 數組和 HashEntry 數組組成。一個 ConcurrentHashMap 裏包含一個 Segment 數組,Segment 的結構和 HashMap 相似,是一種數組和鏈表結構。一個 Segment 裏包含一個 HashEntry 數組,每一個 HashEntry 用於存儲 key-value 鍵值對數據,同時指向下一個 HashEntry 節點。當定位 key 在 ConcurrentHashMap 中的位置時,須要先通過一次 hash 定位到 Segment 的位置,而後在 hash 定位到指定的HashEntry。多線程

Segment 是一種可重入鎖 ReentrantLock,在 ConcurrentHashMap 裏扮演鎖的角色,每一個 Segment 守護着一個HashEntry 數組裏的元素,當對 HashEntry 數組的數據進行修改時,必須首先得到它對應的Segment鎖。而大多讀操做是不加鎖的,可是 size()、containsValue() 遇到併發修改競爭時須要全表加鎖。併發

2.2. 結構

ConcurrentHashMap:下面是ConcurrentHashMap中的數據成員以及構造函數源碼:app

構造函數主要作了兩件事:1)參數的校驗;2)table初始化長度ssh

public class ConcurrentHashMap<K, V> extends AbstractMap<K, V> 
        implements ConcurrentMap<K, V>, Serializable { 
    /**
     * Mask value for indexing into segments. The upper bits of a
     * key's hash code are used to choose the segment.
     */
    final int segmentMask;
 
    /**
     * Shift value for indexing within segments.
     */
    final int segmentShift;
 
    /**
     * The segments, each of which is a specialized hash table.
     */
    final Segment<K,V>[] segments;
 
    // 構造函數
    public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        if (concurrencyLevel > MAX_SEGMENTS)
            concurrencyLevel = MAX_SEGMENTS;
        // Find power-of-two sizes best matching arguments
        // 找到第一個 >= concurrencyLevel 的 2次方數,做爲後續 Segment數組大小
        int sshift = 0;
        int ssize = 1;
        while (ssize < concurrencyLevel) {
            ++sshift;
            ssize <<= 1;
        }
        this.segmentShift = 32 - sshift;
        this.segmentMask = ssize - 1;
        if (initialCapacity > MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY;
        int c = initialCapacity / ssize;
        if (c * ssize < initialCapacity)
            ++c;
        int cap = MIN_SEGMENT_TABLE_CAPACITY;
        while (cap < c)
            cap <<= 1;
        // create segments and segments[0]
        Segment<K,V> s0 =
            new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                             (HashEntry<K,V>[])new HashEntry[cap]);
        Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize]; // 上文中經過比較concurrencyLevel而計算出的ssize做爲數組大小
        // 使用的是StoreStore內存屏障,而不是較慢的StoreLoad內存屏障(使用在volatile寫操做上)。實現寫入不會被JIT從新排序指令,性能雖然提高,但寫結果不會被當即看到。
        UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0] 
        this.segments = ss;
    }
}

全部成員都是final修飾的,保證線程的安全發佈。segmentMask 和 segmentShift 主要是爲了定位段。

concurrencyLevel 參數表示指望併發的修改 ConcurrentHashMap 的線程數量,用於決定 Segment 的數量,具體方式是經過找到第一個 >= concurrentcyLevel 的 2的次方數做爲 Segment 數組的大小。默認值爲16,Segment數組大小也爲16,若是設置 concurrentcyLevel = 100,那麼 Segment 數組大小則爲128。

Segment:每一個Segment至關於ConcurrentHashMap的一個子 hash表,Segment繼承了ReetrantLock,爲了方便使用加鎖的功能,如lock,tryLock等。數據成員以下:
static final class Segment<K,V> extends ReentrantLock implements Serializable {   
        transient volatile HashEntry<K,V>[] table;
        transient int count;
        transient int modCount;
        transient int threshold;
        final float loadFactor;
 }

table:鏈表數組,每一個數組元素是一個hash鏈表的頭部。table是volatile的,使得每次都能讀取到最新的table值而不須要同步。

count:Segment中元素的數量。每次修改操做作告終構上的改變,如添加/刪除節點(更新節點的value不算結構上的改變),都要更新count值。

modCount:對table的結構進行修改的次數。

threshold:若Segment裏的元素數量超過這個值,則就會對Segment進行擴容。

loadFactor:負載因子,threshold = capacity * threshold。

HashEntry:Segment中的類,表明 hash 鏈表中的一個節點,源碼以下:

static final class HashEntry<K,V> { 
    final int hash;
    final K key;
    volatile V value;
    volatile HashEntry<K,V> next;
}

2.3. put

下面是CouncurrentHashMap的put操做的源碼:

public V put(K key, V value) {
    Segment<K,V> s;
    if (value == null)
        throw new NullPointerException();
    int hash = hash(key);      
    int j = (hash >>> segmentShift) & segmentMask; // 對key求hash,並取模,而後找到對應Segment數組下標位置
    if ((s = (Segment<K,V>)UNSAFE.getObject         
         (segments, (j << SSHIFT) + SBASE)) == null)
        s = ensureSegment(j);                       // 當 Segment 爲空時,會建立
    return s.put(key, hash, value, false);          // put操做會委託給Segment的put
}

下面是ensureSegment的代碼:

private Segment<K,V> ensureSegment(int k) {
    final Segment<K,V>[] ss = this.segments;
    long u = (k << SSHIFT) + SBASE; // raw offset
    Segment<K,V> seg;
    if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {  // UNSAFE進行volatile讀Segment數組對應位置的值
        Segment<K,V> proto = ss[0]; // use segment 0 as prototype   若是爲空,則從Segment[0]複製Entry數組長度capacity,loadFactor
        int cap = proto.table.length;
        float lf = proto.loadFactor;
        int threshold = (int)(cap * lf);
        HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
        if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))== null) { // recheck 再次volatile讀Segment數組,若是爲空,繼續新建Segment對象
            Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
            while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
                // 使用while循環和UNSAFE的CAS原子性替換Segment數組對應下標的元素。使用樂觀鎖的方式
                // 當線程t1和t2都讀取Segment[u]==null時,只有一個線程能經過CAS替換成功。假設t1替換成功了,下一次while循環t2能volatile讀取到替換的值
                if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))     
                    break;
            }
        }
    }
    return seg;
}

put最終交給 Segment 的 put 方法,每一個Segment至關於一個HashMap,put操做就是要在HashMap中尋找對應的key是否存在,若是存在則更新value,如不存在則新建一個HashEntry。put須要加鎖,使用了ReetrantLock的tryLock的非阻塞加鎖方法。源碼以下:

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    // tryLock嘗試加鎖,若是加鎖成功,node=null。不然自旋等待並尋找對應key的節點是否存在
    HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value);
    V oldValue;
    try {
        HashEntry<K,V>[] tab = table;
        int index = (tab.length - 1) & hash;
        HashEntry<K,V> first = entryAt(tab, index);   // 獲取鎖後,對HashEntry的table數組取模,獲取數組下標index的第一個節點
        for (HashEntry<K,V> e = first;;) {            // first節點後面的鏈表,向後遍歷,尋找與key相同的節點
            if (e != null) {
                K k;
                if ((k = e.key) == key ||
                    (e.hash == hash && key.equals(k))) {
                    oldValue = e.value;
                    if (!onlyIfAbsent) {
                        e.value = value;            // 找到與key相同的節點,按onlyIfAbsent判斷是否替換舊的value值
                        ++modCount;
                    }
                    break;
                }
                e = e.next;
            }
            else { 
                // e==null,表明沒有找到。新建HashEntry節點或使用scanAndLockForPut中建立的節點做爲新的鏈表頭節點
                if (node != null)
                    node.setNext(first);
                else
                    node = new HashEntry<K,V>(hash, key, value, first);  
                int c = count + 1;
                if (c > threshold && tab.length < MAXIMUM_CAPACITY)   // 若是當前總節點個數 > threashold,則rehash擴容
                // Segment rehash是獲取鎖以後進行的,是將數組長度擴大一倍,將舊的數組元素複製到新數組中(先後有獲取鎖和釋放鎖的語義,不須要考慮多線程問題)
                    rehash(node);
                else
                    setEntryAt(tab, index, node);   // 設置新HashEntry節點到table對應的位置中
                ++modCount;
                count = c;
                oldValue = null;
                break;
            }
        }
    } finally {
        unlock();       // 最後解鎖
    }
    return oldValue;
}

有一個點:HashEntry<K,V>[] tab = table。一開始將table賦值給tab局部變量,能夠減小直接引用table時帶來的性能損失,由於table是一個volatile變量,不能進行優化,而賦值給tab普通變量後,能夠實現編譯、運行時的優化。

2.4. get

ConcurrentHashMap的get操做,源碼以下:

public V get(Object key) {
    Segment<K,V> s; // manually integrate access methods to reduce overhead
    HashEntry<K,V>[] tab;
    int h = hash(key);  // 對key求hash
    long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;   // key對應的Segment的數組下標
    if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&  // 先找對應的Segment。volatile讀語義保證內存的可見性
        (tab = s.table) != null) {
        for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                 (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);       // 再在Segment中找對應的HashEntry
             e != null; e = e.next) {
            K k;
            if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                return e.value;
        }
    }
    return null;
}

get操做不須要加鎖,經過volatile保證可見性,若是同時有put併發操做增長HashEntry,因爲是在鏈表頭部添加(頭插法),不會對get形成影響。

2.5. size

計算ConcurrentHashMap的 size 是一個有趣的問題,由於在計算的時候,還會在併發的插入數據,可能致使計算出的size和實際的size有出入。

JDK1.7中前後採起了兩個方案:

第一種方案:先使用不加鎖的模式先嚐試遍歷兩次ConcurrentHashMap計算size,若是兩次遍歷過程當中全部segment中的modCount的和是一致的,則能夠認爲整個計算過程當中的Map沒有發生變化(添加或刪除HashEntry節點),返回size。

第二種方案:若是第一種方案不符合(Map發生告終構變化),就給每一個Segment加鎖,而後計算ConcurrentHashMap的size,解鎖,最後返回。

public int size() {
    // Try a few times to get accurate count. On failure due to
    // continuous async changes in table, resort to locking.
    final Segment<K,V>[] segments = this.segments;   
    int size;
    boolean overflow; // true if size overflows 32 bits
    long sum;         // sum of modCounts
    long last = 0L;   // previous sum
    int retries = -1; // first iteration isn't retry
    try {
        for (;;) {
            // 若是for循環達到RETRIES_BEFORE_LOCK,則表示前民幾回累計的modCount都不相等,其餘線程併發修改ConcurrentHashMap致使數據結構一直在改變。
            // 降級爲依次對Segment進行加鎖,此時其餘線程改變數據結構就會阻塞等待
            if (retries++ == RETRIES_BEFORE_LOCK) {    
                for (int j = 0; j < segments.length; ++j)
                    ensureSegment(j).lock(); // force creation
            }
            sum = 0L;
            size = 0;
            overflow = false;
            for (int j = 0; j < segments.length; ++j) {      // 循環每一個Segment,累加Segment中的count和modCount
                Segment<K,V> seg = segmentAt(segments, j);
                if (seg != null) {
                    sum += seg.modCount;
                    int c = seg.count;
                    if (c < 0 || (size += c) < 0)
                        overflow = true;
                }
            }
            if (sum == last)    // 若是當前sum和以前計算的sum相等,即各Segment累加的modCount相等。就能夠認爲兩次for循環間沒有其餘線程修改內部數據結構。直接返回size
                break;
            last = sum;     // last等於最近一次計算的sum值
        }
    } finally {
        if (retries > RETRIES_BEFORE_LOCK) { // 若是加鎖了,最終解鎖
            for (int j = 0; j < segments.length; ++j)
                segmentAt(segments, j).unlock();
        }
    }
    return overflow ? Integer.MAX_VALUE : size;
}

3. ConcurrentHashMap在JDK1.8中的改進

3.1. 改進

改進1:取消segment分段鎖,直接使用 transient volatile Node<K,V>[] table; 來保存數組。採用table數組元素做爲鎖,從而實現對每一行數據進行加鎖,進一步減小了衝突的機率。

改進2:將原先table數組 + 鏈表的數據結構,變動爲table數組 + 鏈表 / 紅黑樹 的結構,同HashMap在JDK1.8的數據結構的改進。優化爲紅黑樹的好處是:當一個鏈表長度過長時,查詢某個節點的時間複雜度爲O(N),而當鏈表長度超過8時,將鏈表轉化爲紅黑樹,查詢節點的時間複雜度能夠下降爲O(logN),從而提高了性能。

改進3:併發控制使用synchronized和CAS,使用synchronized替換ReetrantLock。在ConcurrentHashMap中能夠看到不少的 U.compareAndSwapXXX,經過CAS算法實現無鎖化修改值的操做,下降了鎖的性能消耗。CAS的思想是不斷的比較當前內存中的變量值和所指定的變量值是否相等,若是相等,則接受指定的修改值;不然,拒絕操做,相似與樂觀鎖。

數據結構以下:

 

3.2. 結構:Node和TreeBin

Node是ConcurrentHashMap存儲結構的基本單元,用於存儲key-value鍵值對,是一個鏈表,但只容許查找數據,不容許修改數據。源碼以下:

/**
 * Key-value entry.  This class is never exported out as a
 * user-mutable Map.Entry (i.e., one supporting setValue; see
 * MapEntry below), but can be used for read-only traversals used
 * in bulk tasks.  Subclasses of Node with a negative hash field
 * are special, and contain null keys and values (but are never
 * exported).  Otherwise, keys and vals are never null.
 */
static class Node<K,V> implements Map.Entry<K,V> {
    final int hash;
    final K key;
    // value和next使用volatile來保證可見性和禁止重排序
    volatile V val;
    volatile Node<K,V> next;  // 指向下一個Node節點
 
    Node(int hash, K key, V val, Node<K,V> next) {
        this.hash = hash;
        this.key = key;
        this.val = val;
        this.next = next;
    }
 
    public final K getKey()       { return key; }
    public final V getValue()     { return val; }
    public final int hashCode()   { return key.hashCode() ^ val.hashCode(); }
    public final String toString(){ return key + "=" + val; }
    // 不容許更新value
    public final V setValue(V value) {
        throw new UnsupportedOperationException();
    }
 
    /**
     * Virtualized support for map.get(); overridden in subclasses.
     */
    // 用於map.get()方法,子類重寫
    Node<K,V> find(int h, Object k) {
        Node<K,V> e = this;
        if (k != null) {
            do {
                K ek;
                if (e.hash == h &&
                    ((ek = e.key) == k || (ek != null && k.equals(ek))))
                    return e;
            } while ((e = e.next) != null);
        }
        return null;
    }
}

TreeBin是封裝TreeNode的容器,提供了轉化紅黑樹的一些條件和鎖的控制,部分源碼以下:

/**
 * TreeNodes used at the heads of bins. TreeBins do not hold user
 * keys or values, but instead point to list of TreeNodes and
 * their root. They also maintain a parasitic read-write lock
 * forcing writers (who hold bin lock) to wait for readers (who do
 * not) to complete before tree restructuring operations.
 */
static final class TreeBin<K,V> extends Node<K,V> {
    TreeNode<K,V> root;       // 指向TreeNode的根節點
    volatile TreeNode<K,V> first;
    volatile Thread waiter;
    volatile int lockState;
    // values for lockState
    static final int WRITER = 1; // set while holding write lock
    static final int WAITER = 2; // set when waiting for write lock
    static final int READER = 4; // increment value for setting read lock
    ...
}

3.3. put

put源碼以下:

/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    // 求key的hash值,兩次hash,使hash值能均勻分佈
    int hash = spread(key.hashCode()); 
    int binCount = 0;
    // 迭代Node[] table數組
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        // 1) 若是table爲空,則初始化,ConcurrentHashMap構造方法未初始化Node數組,而是在put中實現,屬於懶漢式初始化
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        // 2) 若是table不爲空,根據hash值計算獲得key在table中的索引i,若是table[i]爲null,使用CAS方式新建Node節點(table[i]爲鏈表或紅黑樹的首節點)
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin,不進行加鎖
        }
        // 3) 若是table[i]不爲空,而且hash值爲MOVED(-1),代表該鏈表正在進行transfer擴容操做,幫助擴容完成
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            // 4) 以上條件都不知足,也就是存在hash衝突。對鏈表或紅黑樹的頭節點進行加鎖操做(再也不是segment,進一步減小了線程衝突)
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    // hash值>0,表示該節點是鏈表結構。只需向後遍歷便可
                    if (fh >= 0) {      
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            // 若是在鏈表中找到值爲key的節點,按onlyIfAbsent判斷是否替換value
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            // 沒有找到值爲key的節點,直接新建Node並加入鏈表尾部
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }  
                    // 若是首節點爲紅黑樹結構,putTreeValue存放key-value對
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            if (binCount != 0) {
                // 若是鏈表節點數>8,則將鏈表結構轉換爲紅黑樹結構
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    // size加1,會檢查當前size是否超過sizeCtl,若是是則觸發transfer擴容操做
    addCount(1L, binCount);
    return null;
}

大體流程爲:

  • 若是table爲空,沒有初始化,則先經過initTable()初始化
  • 若是table[i]爲null,沒有hash衝突,則CAS新建節點到table[i]
  • 若是table在擴容中,則等待擴容結束再操做
  • 若是存在hash衝突,則使用synchronized對Node首節點加鎖來保證線程安全,此時有2種方式:
    • 1)若是是鏈表形式,則根據key是否存在來判斷,若是key存在,則覆蓋value;key不存在,則新建Node插入到鏈表尾端
    • 2)若是是紅黑樹形式,就按照紅黑樹的結構插入
  • 最後判斷鏈表長度是否 >8?若是大於,則將鏈表轉換爲紅黑樹結構
  • put成功,則經過addCount方法統計size,並檢查是否須要擴容

下面看initTable的源碼:

sizeCtl是控制ConcurrentHashMap的控制標示符,用來控制初始化和擴容操做的,其不一樣的含義以下:

  • 負數:表明正在進行初始化和擴容操做
  • -1:正在初始化
  • -N:有 N - 1 個線程正在進行擴容操做
  • 正數或0:hash表尚未被初始化,這個數值表示初始化或下一次擴容的大小
private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {
        // sizeCtl < 0表示其餘線程正在初始化,當前線程掛起
        if ((sc = sizeCtl) < 0)
            Thread.yield(); // lost initialization race; just spin
        // CAS 將 sizeCtl置爲 -1,表示正在初始化
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            try {
                if ((tab = table) == null || tab.length == 0) {
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];   // 初始化
                    table = tab = nt;
                    sc = n - (n >>> 2);    // 下次擴容的閥值,等於 0.75*n
                }
            } finally {
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}

3.4. get

get的源碼以下:

/**
 * Returns the value to which the specified key is mapped,
 * or {@code null} if this map contains no mapping for the key.
 *
 * <p>More formally, if this map contains a mapping from a key
 * {@code k} to a value {@code v} such that {@code key.equals(k)},
 * then this method returns {@code v}; otherwise it returns
 * {@code null}.  (There can be at most one such mapping.)
 *
 * @throws NullPointerException if the specified key is null
 */
public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    // 計算兩次hash
    int h = spread(key.hashCode());    
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {        // 讀取首節點Node元素
        // 若是首節點Node.key相等,返回value
        if ((eh = e.hash) == h) {  
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        // 樹的find()來查找
        else if (eh < 0)
            return (p = e.find(h, key)) != null ? p.val : null;
        // 在鏈表中遍歷查找
        while ((e = e.next) != null) {
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
} 

3.5. size

在JDK1.8版本中,對於size的計算,在擴容和addCount()時已經在處理了。JDK1.7是在調用時纔去計算。

爲了幫助統計size,ConcurrentHashMap提供了baseCount和counterCells兩個輔助變量和CounterCell輔助類:

@sun.misc.Contended static final class CounterCell {
    volatile long value;
    CounterCell(long x) { value = x; }
}
 
//ConcurrentHashMap中元素個數,但返回的不必定是當前Map的真實元素個數。基於CAS無鎖更新
private transient volatile long baseCount;
 
private transient volatile CounterCell[] counterCells;  // 部分元素變化的個數保存在此數組中

經過累計baseCount和 counterCells數組中的數量,便可獲得元素的總個數。size源碼以下:

public int size() {
    long n = sumCount();
    return ((n < 0L) ? 0 :
            (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
            (int)n);
}
  
final long sumCount() {
    CounterCell[] as = counterCells; CounterCell a;
    long sum = baseCount;
    if (as != null) {
        // 遍歷,累加全部counterCells.value
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}

4. 參考

http://www.cnblogs.com/study-everyday/p/6430462.html

http://blog.csdn.net/u010723709/article/details/48007881

http://blog.csdn.net/jianghuxiaojin/article/details/52006118

相關文章
相關標籤/搜索