ConcurrentHashMap源碼解析

1、ConcurrentHashMap源碼註解

我的站點:www.mycookies.cn​
/**
 * A hash table supporting full concurrency of retrievals and
 * adjustable expected concurrency for updates. This class obeys the
 * same functional specification as {@link java.util.Hashtable}, and
 * includes versions of methods corresponding to each method of
 * <tt>Hashtable</tt>. However, even though all operations are
 * thread-safe, retrieval operations do <em>not</em> entail locking,
 * and there is <em>not</em> any support for locking the entire table
 * in a way that prevents all access.  This class is fully
 * interoperable with <tt>Hashtable</tt> in programs that rely on its
 * thread safety but not on its synchronization details.
 *
 * <p> Retrieval operations (including <tt>get</tt>) generally do not
 * block, so may overlap with update operations (including
 * <tt>put</tt> and <tt>remove</tt>). Retrievals reflect the results
 * of the most recently <em>completed</em> update operations holding
 * upon their onset.  For aggregate operations such as <tt>putAll</tt>
 * and <tt>clear</tt>, concurrent retrievals may reflect insertion or
 * removal of only some entries.  Similarly, Iterators and
 * Enumerations return elements reflecting the state of the hash table
 * at some point at or since the creation of the iterator/enumeration.
 * They do <em>not</em> throw {@link ConcurrentModificationException}.
 * However, iterators are designed to be used by only one thread at a time.
 *
 * <p> The allowed concurrency among update operations is guided by
 * the optional <tt>concurrencyLevel</tt> constructor argument
 * (default <tt>16</tt>), which is used as a hint for internal sizing.  The
 * table is internally partitioned to try to permit the indicated
 * number of concurrent updates without contention. Because placement
 * in hash tables is essentially random, the actual concurrency will
 * vary.  Ideally, you should choose a value to accommodate as many
 * threads as will ever concurrently modify the table. Using a
 * significantly higher value than you need can waste space and time,
 * and a significantly lower value can lead to thread contention. But
 * overestimates and underestimates within an order of magnitude do
 * not usually have much noticeable impact. A value of one is
 * appropriate when it is known that only one thread will modify and
 * all others will only read. Also, resizing this or any other kind of
 * hash table is a relatively slow operation, so, when possible, it is
 * a good idea to provide estimates of expected table sizes in
 * constructors.
 */

 

 

一個哈希表支持徹底併發的檢索和可更新的預期併發性。這個類服從與{@link java.util.Hashtable}相同的功能規範  包括對應於每種方法的版本  的HashTable的。可是,即便全部的操做都是 線程安全的檢索操做不須要加鎖,  而且沒有任何對鎖定整個表的支持, 阻止全部訪問的方式。這這個類在依賴線程安全性但不一樣步細節,在程序中徹底與Hashtable 互操做。java

  檢索操做(包括get )一般不會阻塞,所以可能會與更新操做併發  (添加 和刪除)。檢索反映結果  是最近完成更新操做持有在他們併發訪問時時。對於像<tt> putAll </ tt>這樣的集合操做  和<tt>清除</ tt>,併發檢索可能反映插入或  只刪除一些條目。一樣,迭代器和  枚舉返回反映散列表狀態的元素  在建立迭代器/枚舉時或以後的某個時間點。  它們不會<em>拋出ConcurrentModificationException。  可是,迭代器被設計爲一次只能由一個線程使用。node

更新操做中容許的併發性由指導 可選的concurrencyLevel構造函數參數(默認16 ),用做內部大小調整的提示。該  表內部分區以嘗試容許指示 沒有爭用的併發更新數量。由於安置 在散列表中基本上是隨機的,實際的併發會 變化。理想狀況下,您應該選擇一個值來容納儘量多的值線程將永遠同時修改表。用一個  明顯高於你須要的價值會浪費空間和時間  而顯着較低的值可能會致使線程爭用。但  在一個數量級內太高估計和低估  一般不會有太明顯的影響。值爲1  當知道只有一個線程會修改時適用  全部其餘人只會閱讀。此外,調整這個或任何其餘類型的  散列表是一個相對較慢的操做,因此,若是可能的話,在構造函數中提供預期表格大小的估計值的一個好主意。git

2、源碼剖析

重要的類

ConcurrentHashMap的內部類HashEntry github

 
//用來存儲鍵值對,與hashtable中不一樣的是 value設置爲volatile
static final class HashEntry<K,V> {
    final int hash;
    final K key;
    volatile V value;
    volatile HashEntry<K,V> next;
​
    HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
        this.hash = hash;
        this.key = key;
        this.value = value;
        this.next = next;
    }
​
    /**
     * Sets next field with volatile write semantics.  (See above
     * about use of putOrderedObject.)
     */
    final void setNext(HashEntry<K,V> n) {
        UNSAFE.putOrderedObject(this, nextOffset, n);
    }
​
    // Unsafe mechanics
    static final sun.misc.Unsafe UNSAFE;
    static final long nextOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class k = HashEntry.class;
            nextOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("next"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

 

ConcurrentHashMap重要的方法---put

 
public V put(K key, V value) {
    Segment<K,V> s;
    if (value == null)//value不能爲null
        throw new NullPointerException();
    int hash = hash(key);//第一次對key進行hash運算 
  int j = (hash >>> segmentShift) & segmentMask;//映射到hash表中的某個segment
    if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
         (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
        s = ensureSegment(j); //返回給定索引的Segment,建立它並在Segment表中(經過CAS)記錄(若是尚不存在)。
    return s.put(key, hash, value, false);
}

 private Segment<K,V> ensureSegment(int k) {
        final Segment<K,V>[] ss = this.segments;
        long u = (k << SSHIFT) + SBASE; // raw offset
        Segment<K,V> seg;
        //若是當前索引對應segment不存在
        if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
            Segment<K,V> proto = ss[0]; // use segment 0 as prototype
            int cap = proto.table.length;
            float lf = proto.loadFactor;
            int threshold = (int)(cap * lf);
            HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
            if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                == null) { // recheck
              //建立一個Segment
                Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
                while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                       == null) {
                    if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
                        break;
                }
            }
        }
        return seg;
    }

    final V put(K key, int hash, V value, boolean onlyIfAbsent) {
        HashEntry<K,V> node = tryLock() ? null :
            scanAndLockForPut(key, hash, value);//嘗試獲取鎖,當前線程獨家佔有,node賦值爲null,不然一直獲取鎖,直到獲取到鎖而後建立一個鍵值對並返回
        V oldValue;
        try {
            HashEntry<K,V>[] tab = table;
            int index = (tab.length - 1) & hash;
            HashEntry<K,V> first = entryAt(tab, index);
            for (HashEntry<K,V> e = first;;) {
                if (e != null) {
                    K k;
                    if ((k = e.key) == key ||
                        (e.hash == hash && key.equals(k))) {
                        oldValue = e.value;
                        if (!onlyIfAbsent) {
                            e.value = value;
                            ++modCount;
                        }
                        break;
                    }
                    e = e.next;
                }
                else {
                    if (node != null)
                        node.setNext(first);
                    else
                        node = new HashEntry<K,V>(hash, key, value, first);
                    int c = count + 1;
                    if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                        rehash(node);
                    else
                        setEntryAt(tab, index, node);
                    ++modCount;
                    count = c;
                    oldValue = null;
                    break;
                }
            }
        } finally {
            unlock();//釋放鎖
        }
        return oldValue;
    }

 

若是當前線程是該鎖的持有者,則保持計數遞減。 若是保持計數如今爲零,則鎖定被釋放。 若是當前線程不是該鎖的持有者,則拋出{@link IllegalMonitorStateException}數組

 
/**
 * Attempts to release this lock.
 *
 * <p>If the current thread is the holder of this lock then the hold
 * count is decremented.  If the hold count is now zero then the lock
 * is released.  If the current thread is not the holder of this
 * lock then {@link IllegalMonitorStateException} is thrown.
 *
 * @throws IllegalMonitorStateException if the current thread does not
 *         hold this lock
 */
public void unlock() {
    sync.release(1);
}

 

掃描包含給定key的節點 ,同時嘗試獲取鎖,若是找不到則建立並返回一個。返回後,保證持有當前鎖。安全


/**
* Scans for a node containing given key while trying to
* acquire lock, creating and returning one if not found. Upon
* return, guarantees that lock is held. UNlike in most
* methods, calls to method equals are not screened: Since
* traversal speed doesn't matter, we might as well help warm
* up the associated code and accesses as well.
*
* @return a new node if key not found, else null
*/
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {      
       HashEntry<K,V> first = entryForHash(this, hash);
        HashEntry<K,V> e = first;
        HashEntry<K,V> node = null;
        int retries = -1; // negative while locating node
        while (!tryLock()) {
            HashEntry<K,V> f; // to recheck first below
            if (retries < 0) {
                if (e == null) {
                    if (node == null) // speculatively create node
                        node = new HashEntry<K,V>(hash, key, value, null);
                    retries = 0;
                }
                else if (key.equals(e.key))
                    retries = 0;
                else
                    e = e.next;
            }
            else if (++retries > MAX_SCAN_RETRIES) {
                lock();
                break;
            }
            else if ((retries & 1) == 0 &&
                     (f = entryForHash(this, hash)) != first) {
                e = first = f; // re-traverse if entry changed
                retries = -1;
            }
        }
        return node;
    }

 

只有在當時沒有被另外一個線程佔用的狀況下才會獲取該鎖cookie

若是該鎖沒有被另外一個線程和另外一個線程佔用,則獲取該鎖   當即返回值爲true,將鎖定保持計數設置爲1。 即便此鎖已設置爲使用公平的順序策略,對 tryLock()調用將當即得到該鎖(若是該鎖可用),不管其餘線程當前是否正在等待鎖。 這種強制 行爲在某些狀況下是有用的,即便它違背了公平。 若是您想遵照此鎖的公平性設置,請使用 {@link #tryLock(long,TimeUnit)tryLock(0,TimeUnit.SECONDS)} 他們幾乎相同(它也檢測到中斷)。 若是當前線程已經擁有這個鎖,那麼保持計數增長1,方法返回{true}。 若是該鎖由另外一個線程保存,則此方法將當即以* {false}的值返回*。併發

 
public boolean tryLock() {
    return sync.nonfairTryAcquire(1);
}

    final boolean nonfairTryAcquire(int acquires) {
        //獲取當前線程
        final Thread current = Thread.currentThread();
        int c = getState();//返回statue (state是voltile修飾的)
        if (c == 0) {//若是state==0,即當前鎖空閒
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);//設置當前線程擁有鎖
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }

protected final void setExclusiveOwnerThread(Thread t) {
    exclusiveOwnerThread = t;
}

​
 protected final Thread getExclusiveOwnerThread() {
  return exclusiveOwnerThread;
}

 

Size方法
 
public int size() {
    // Try a few times to get accurate count. On failure due to
    // continuous async changes in table, resort to locking.
    final Segment<K,V>[] segments = this.segments;
    int size;
    boolean overflow; // true if size overflows 32 bits
    long sum;         // sum of modCounts
    long last = 0L;   // previous sum
    int retries = -1; // first iteration isn't retry
    try {
        for (;;) {
            if (retries++ == RETRIES_BEFORE_LOCK) {
                for (int j = 0; j < segments.length; ++j)
                    ensureSegment(j).lock(); // 獲取全部segment的鎖
            }
            sum = 0L;
            size = 0;
            overflow = false;
            for (int j = 0; j < segments.length; ++j) {
                Segment<K,V> seg = segmentAt(segments, j);
                if (seg != null) {
                    sum += seg.modCount;
                    int c = seg.count;
                    if (c < 0 || (size += c) < 0)
                        overflow = true;
                }
            }
            if (sum == last)
                break;
            last = sum;
        }
    } finally {
        if (retries > RETRIES_BEFORE_LOCK) {
            for (int j = 0; j < segments.length; ++j)//釋放全部segment的鎖
                segmentAt(segments, j).unlock();
        }
    }
    return overflow ? Integer.MAX_VALUE : size;
}

 

總結:ConcurrentHashMap是線程安全的哈希表,它是經過「分段」來實現的。ConcurrentHashMap中包括了「Segment(分段)數組」,每一個Segment就是一個哈希表,並且也是可重入的互斥鎖。第一,Segment是哈希表表如今,Segment包含了「HashEntry數組」,而「HashEntry數組」中的每個HashEntry元素是一個單向鏈表。即Segment是經過鏈式哈希表。第二,Segment是可重入的互斥鎖表如今,Segment繼承於ReentrantLock,而ReentrantLock就是可重入的互斥鎖。對於ConcurrentHashMap的添加,刪除操做,在操做開始前,線程都會獲取Segment的互斥鎖;操做完畢以後,纔會釋放。而對於讀取操做,它是經過volatile去實現的,HashEntry數組是volatile類型的,而volatile能保證「即對一個volatile變量的讀,老是能看到(任意線程)對這個volatile變量最後的寫入」,即咱們總能讀到其它線程寫入HashEntry以後的值。 以上這些方式,就是ConcurrentHashMap線程安全的實現原理。app

經過分段方式減少的鎖的粒度,若是整個map使用一個鎖,則就不能並行地操做鍵值對。而ConcurrentHashMap將HashMap分解成段,每一個段有一把鎖,鎖的粒度就少了。可是與此同時,鎖的數量增多了。當須要訪問ConcurrentHashMap的全局屬性時(好比ConcurrentHashMap的size()方法),須要 得到 全部的Segment的鎖。dom

 我的站點:www.mycookies.cn

github:https://github.com/liqianggh

相關文章
相關標籤/搜索