1.7 的concurrentHashMap內部結構

concurrentHashMap 是用的最多的一個concurrent包數據結構,瞭解內部設計對高併發有幫助。node

ConcurrentHashMap:非阻塞Map

要點

  • 1.7採用分段鎖的機制
  • 1.8取消分段鎖機制,減小了鎖競爭
  • 效率:1.8>1.7
1.7 的源碼:
  • Segment繼承ReentrantLock用來充當鎖的角色,每一個 Segment 對象守護每一個散列映射表的若干個桶 。數組

  • HashEntry 用來封裝映射表的鍵 / 值對數據結構

  • 每一個桶是由若干個 HashEntry 對象連接起來的鏈表併發

  • ConcurrentHashMap定位一個元素的過程須要進行兩次Hash操做,第一次Hash定位到Segment,第二次Hash定位到元素所在的鏈表的頭部。這一種結構的帶來的反作用是Hash的過程要比普通的HashMap要長,可是帶來的好處是寫操做的時候能夠只對元素所在的Segment進行加鎖便可,不會影響到其餘的Segmentssh

  • 擴容:因爲HashEntry.next都是/final/函數

    • 刪除:將待刪除元素前面的元素所有複製一遍,再將後面的一個一個從新接到鏈表上去
    • 新增:直接放在頭部,將以前的頭掛在next上
    • 擴容
      • 若是table槽中只有一個元素,直接根據新的index。遷移到新table中的槽中
      • 若是table槽中右多個元素:
        • 從鏈表中找到/結點p和後繼結點e/,這兩個結點的新table槽同樣。
        • 相同index的結點到結束結點,直接放在新的槽中
        • 將其餘不一樣的結點,從璉表頭放在新的槽中

輸入圖片說明

  • 基本屬性
public class ConcurrentHashMap<K, V> extends AbstractMap<K, V>
        implements ConcurrentMap<K, V>, Serializable {

    '用於分段'

    //容許的最大段數;用於綁定構造函數參數。必須是小於1小於24的冪。
    static final int MAX_SEGMENTS = 1 << 16; // slightly conservative
    
    //根據這個數來計算segment的個數,segment的個數是僅小於這個數且是2的幾回方的一個數(ssize)
    static final int DEFAULT_CONCURRENCY_LEVEL = 16;

    //每一個分段表的最小容量。必定要有2個,至少2個,以免在懶惰的建築後使用下一個使用。
    static final int MIN_SEGMENT_TABLE_CAPACITY = 2;    

    '用於hashEntry'
    
    //默認的用於計算Segment數組中的每個segment的HashEntry[]的容量,可是並非每個segment的HashEntry[]的容量
    static final int DEFAULT_INITIAL_CAPACITY = 16;
    
    //這個表的默認加載因子,在構造函數中沒有指定的時候使用
    static final float DEFAULT_LOAD_FACTOR = 0.75f;
    
    // 用於計算Segment數組中的每個segment的HashEntry[]的最大容量(2的30次方)
    static final int MAXIMUM_CAPACITY = 1 << 30;


    
    //在使用鎖以前,在大小和容器值方法上進行不一樣步的重試。若是表進行連續修改,就會避免無界重試,這樣就不可能得到準確的結果。
    static final int RETRIES_BEFORE_LOCK = 2;

    static final class HashEntry<K,V> {
        final int hash;
        final K key;
        volatile V value;
        volatile HashEntry<K,V> next;

        HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
            this.hash = hash;
            this.key = key;
            this.value = value;
            this.next = next;
        }

    }        
    
    
    static final class Segment<K,V> extends ReentrantLock implements Serializable {


        private static final long serialVersionUID = 2249069246763182397L;
        
        static final int MAX_SCAN_RETRIES =
            Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
        //存儲表
        transient volatile HashEntry<K,V>[] table;
        //元素的數量。
        transient int count;
        //併發標記
        transient int modCount;
        
        //容量閾值,用於擴容
        transient int threshold;
        //負載因子,用於肯定threshold
        final float loadFactor;

        Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
            this.loadFactor = lf;
            this.threshold = threshold;
            this.table = tab;
        }
        
        '真實的插入'
        final V put(K key, int hash, V value, boolean onlyIfAbsent) {
            HashEntry<K,V> node = tryLock() ? null :    '加鎖成功在插入'
                scanAndLockForPut(key, hash, value);
            V oldValue;
            try {
                HashEntry<K,V>[] tab = table;
                int index = (tab.length - 1) & hash;
                HashEntry<K,V> first = entryAt(tab, index);
                for (HashEntry<K,V> e = first;;) {
                    if (e != null) {
                        K k;
                        if ((k = e.key) / key ||
                            (e.hash / hash && key.equals(k))) {
                            oldValue = e.value;
                            if (!onlyIfAbsent) {
                                e.value = value;
                                ++modCount;
                            }
                            break;
                        }
                        e = e.next;
                    }
                    else {
                        if (node != null)
                            node.setNext(first);
                        else
                            node = new HashEntry<K,V>(hash, key, value, first);
                        int c = count + 1;
                        if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                            rehash(node);
                        else
                            setEntryAt(tab, index, node);
                        ++modCount;
                        count = c;
                        oldValue = null;
                        break;
                    }
                }
            } finally {
                unlock();   '釋放鎖'
            }
            return oldValue;
        }        

    }    
    
    '插入'
    public V put(K key, V value) {
        Segment<K,V> s;
        if (value / null)
            throw new NullPointerException();
        int hash = hash(key);   'key的hash值'
        'segmentShift:28'
        'segmentMask:1111'
        int j = (hash >>> segmentShift) & segmentMask;  '無符號右位移28位,模運算'
        if ((s = (Segment<K,V>)UNSAFE.getObject(segments, (j << SSHIFT) + SBASE)) / null) //  in ensureSegment
            s = ensureSegment(j);
        return s.put(key, hash, value, false);  '調用segment的put   '
    }
    
    '獲取'
    public V get(Object key) {
        Segment<K,V> s; // manually integrate access methods to reduce overhead
        HashEntry<K,V>[] tab;
        int h = hash(key);
        long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
        if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
            (tab = s.table) != null) {
            for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                     (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
                 e != null; e = e.next) {
                K k;
                if ((k = e.key) / key || (e.hash / h && key.equals(k)))
                    return e.value;
            }
        }
        return null;
    }
}
  • 建立
    • initialCapacity=16,(加載因子):loadFactor=0.75,(併發級別,):concurrencyLevel=16
      • initialCapacity:段中HashEntry的大小
      • loadFactor:加載因子,當段中的數據超過initialCapacity
      • concurrencyLevel:併發個數(線程數)。由於每個段都是一把鎖,這也是Segment數組大小
    • ssize:段數組的大小
      • 位運算ssize <<= 1:ssize=ssize<<1(左移覺得,增大ssize*2)
    • segmentShift(段segment右偏移):分段內索引的移位值。32 - sshift
    • segmentMask(段segment掩碼):將索引值用於索引段。鍵的哈希碼的上位被用來選擇這個段。 段segment.ssize - 1
public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    '設置最大併發級別:65535'
    if (concurrencyLevel > MAX_SEGMENTS)
        concurrencyLevel = MAX_SEGMENTS;
    // Find power-of-two sizes best matching arguments
    
    int sshift = 0; ''
    int ssize = 1;  '段數組大小'
    while (ssize < concurrencyLevel) {
        ++sshift;       'sshift=4'
        ssize <<= 1;    '左移1位,,ssize *=2 /16'
    }
    this.segmentShift = 32 - sshift;    '偏移量:28'
    this.segmentMask = ssize - 1;       '掩碼:15:1111'
    
    if (initialCapacity > MAXIMUM_CAPACITY)
        initialCapacity = MAXIMUM_CAPACITY; '最大爲:1073741824。默'
        
    int c = initialCapacity / ssize;    'C=1'
    if (c * ssize < initialCapacity)    '判斷是有沒除盡,保證:c * ssize >initialCapacity'
        ++c;
        
    int cap = MIN_SEGMENT_TABLE_CAPACITY;   '最小HashEntry大小:'
    while (cap < c)
        cap <<= 1;
        
    // create segments and segments[0]
    // 建立段S0
        'loadFactor=0.75'
        'threshold=(int)0.75*2=1'
        'HashEntry[2]'
    Segment<K,V> s0 =
        new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                         (HashEntry<K,V>[])new HashEntry[cap]);
    //建立 segment 數組
    Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
    UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
    this.segments = ss;
}
  • PUT(K,V)
    • segmentShift=28,segmentMask=15
    • >>>:無符號右移,
'調用'
    public V put(K key, V value) {
        '判斷值'
        Segment<K,V> s;
        if (value / null)
            throw new NullPointerException();
        int hash = hash(key);   //計算hash值
        int j = (hash >>> segmentShift) & segmentMask;
        
        if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
             (segments, (j << SSHIFT) + SBASE)) / null) //  in ensureSegment
            s = ensureSegment(j);   ''
        return s.put(key, hash, value, false);
    }
    
'新建segment'
    private Segment<K,V> ensureSegment(int k) {
        final Segment<K,V>[] ss = this.segments;
        long u = (k << SSHIFT) + SBASE; // raw offset
        Segment<K,V> seg;
        if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) / null) {
            Segment<K,V> proto = ss[0]; // use segment 0 as prototype
            int cap = proto.table.length;   'cap =2'
            float lf = proto.loadFactor;    ' 0.75'    
            int threshold = (int)(cap * lf); 'threshold = 1'    
            HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
            if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                / null) { // recheck
                Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
                while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                       / null) {
                    if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
                        break;
                }
            }
        }
        return seg;
    }
    '添加'
    final V put(K key, int hash, V value, boolean onlyIfAbsent) {
        HashEntry<K,V> node = tryLock() ? null : '加鎖'
            scanAndLockForPut(key, hash, value);
        V oldValue;
        try {
            HashEntry<K,V>[] tab = table;   '獲取table'
            int index = (tab.length - 1) & hash;    '獲取table中的偏移量'
            HashEntry<K,V> first = entryAt(tab, index); '從table中,找出鏈表頭'
            '無限循環添加進去'
            for (HashEntry<K,V> e = first;;) {
                if (e != null) {    '頭部位null'
                
                    K k;
                    "若是key相同,則直接替換"
                    if ((k = e.key) / key ||(e.hash / hash && key.equals(k))) {
                        oldValue = e.value;
                        if (!onlyIfAbsent) {
                            e.value = value;
                            ++modCount;
                        }
                        break;
                    }
                    e = e.next; "若是key不一樣,則操做e後面的元素"
                    
                }else { "頭爲null"
                
                    if (node != null)
                        node.setNext(first);
                    else
                        node = new HashEntry<K,V>(hash, key, value, first); '新建,並將頭放在新HashEntry的後面'
                    int c = count + 1;
                    if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                        rehash(node);   '擴容'
                    else
                        setEntryAt(tab, index, node);   '將新HashEntry直接添加在頭上面'
                    ++modCount;
                    count = c;
                    oldValue = null;
                    break;
                    
                }
            }
        } finally {
            unlock();
        }
        return oldValue;
    }
  • 擴容:
    • 擴容:默認segement.threshold=1,直接會擴容一次
    • 擴容並非table滿了,才擴容。用的是擴容個數:table.lengthloadFactor (table個數擴容因子)
    • 若是table槽中只有一個元素,直接根據新的index。遷移到新table中的槽中
    • 若是table槽中右多個元素:
      • 從鏈表中找到/結點p和後繼結點e/,這兩個結點的新table槽同樣。
      • 而後將相同的結點到結束,直接放在新的槽中
      • 將其餘不一樣的結點,從璉表頭放在新的槽中
private void rehash(HashEntry<K,V> node) {
    
        HashEntry<K,V>[] oldTable = table;  "獲取舊的段table"
        int oldCapacity = oldTable.length;  '舊的HashEntry數量'
        int newCapacity = oldCapacity << 1; '擴大一倍'
        threshold = (int)(newCapacity * loadFactor);    '剩擴展因子:0.75'
        HashEntry<K,V>[] newTable =(HashEntry<K,V>[]) new HashEntry[newCapacity];
        
        int sizeMask = newCapacity - 1; "位模運算:1111或者11111。每次擴大2被"
        
        for (int i = 0; i < oldCapacity ; i++) {
            HashEntry<K,V> e = oldTable[i]; "舊數組的璉表頭"
            if (e != null) {
                HashEntry<K,V> next = e.next;
                int idx = e.hash & sizeMask;    "新的index"
                
                if (next / null)   //  只有一個節點
                    newTable[idx] = e;  "若是隻有一個數據,直接放到新的index中"
                    
                else { "在相同的槽內重用連續序列"
                    HashEntry<K,V> lastRun = e; "新的起始點"
                    int lastIdx = idx; "新的index"
                    '找到e節點第一個索引值相同的HashEntry節點'
                    for (HashEntry<K,V> last = next;last != null;last = last.next) {
                        int k = last.hash & sizeMask;   "後繼的index"
                        if (k != lastIdx) { "若是後繼的新index和當前結點index相同的元素"
                            lastIdx = k;
                            lastRun = last;
                        }
                    }
                    newTable[lastIdx] = lastRun;    "而後將相同的結點到結束,直接放在新的槽中"
                    '複製e節點到第一個索引值相同(該結點和後繼結點index相同)節之間的HashEntry節點'
                    for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                        V v = p.value;
                        int h = p.hash;
                        int k = h & sizeMask;
                        HashEntry<K,V> n = newTable[k];
                        newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                    }
                }
            }
        }
        
        int nodeIndex = node.hash & sizeMask; // add the new node
        node.setNext(newTable[nodeIndex]);
        newTable[nodeIndex] = node;
        table = newTable;
    }
相關文章
相關標籤/搜索