學習ConcurrentHashMap1.7分段鎖原理

1. 概述

接上一篇 學習 ConcurrentHashMap1.8 併發寫機制, 本文主要學習 Segment分段鎖 的實現原理。java

雖然 JDK1.7 在生產環境已逐漸被 JDK1.8 替代,然而一些好的思想仍是須要進行學習的。比方說位圖中尋找 bit 位的思路是否是和 ConcurrentHashMap1.7 有點類似?node

接下來,本文基於 OpenJDK7 來作源碼解析。算法

2. ConcurrentHashMap1.7 初認識

ConcurrentHashMap 中 put()是線程安全的。可是不少時候, 因爲業務需求, 須要先 get() 操做再 put() 操做,這 2 個操做沒法保證原子性,這樣就會產生線程安全問題了。你們在開發中必定要注意。數組

ConcurrentHashMap 的結構示意圖以下:安全

在進行數據的定位時,會首先找到 segment, 而後在 segment 中定位 bucket。若是多線程操做同一個 segment, 就會觸發 segment 的鎖 ReentrantLock, 這就是分段鎖的基本實現原理多線程

3. 源碼分析

3.1 HashEntry

HashEntryConcurrentHashMap 的基礎單元(節點),是實際數據的載體。併發

static final class HashEntry<K,V> {
        final int hash;
        final K key;
        volatile V value;
        volatile HashEntry<K,V> next;

        HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
            this.hash = hash;
            this.key = key;
            this.value = value;
            this.next = next;
        }

        /**
         * Sets next field with volatile write semantics.  (See above
         * about use of putOrderedObject.)
         */
        final void setNext(HashEntry<K,V> n) {
            UNSAFE.putOrderedObject(this, nextOffset, n);
        }

        // Unsafe mechanics
        static final sun.misc.Unsafe UNSAFE;
        static final long nextOffset;
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class k = HashEntry.class;
                nextOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("next"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }

3.2 Segment

Segment 繼承 ReentrantLock 鎖,用於存放數組 HashEntry[]。在這裏能夠看出, 不管 1.7 仍是 1.8 版本, ConcurrentHashMap 底層並非對 HashMap 的擴展, 而是一樣從底層基於數組+鏈表進行功能實現。ssh

static final class Segment<K,V> extends ReentrantLock implements Serializable {

        private static final long serialVersionUID = 2249069246763182397L;

        static final int MAX_SCAN_RETRIES =
            Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;

        // 數據節點存儲在這裏(基礎單元是數組)
        transient volatile HashEntry<K,V>[] table;

        transient int count;

        transient int modCount;

        transient int threshold;

        final float loadFactor;

        Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
            this.loadFactor = lf;
            this.threshold = threshold;
            this.table = tab;
        }
        // 具體方法不在這裏討論...
    }

3.3 構造方法

public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        // 對於concurrencyLevel的理解, 能夠理解爲segments數組的長度,即理論上多線程併發數(分段鎖), 默認16
        if (concurrencyLevel > MAX_SEGMENTS)
            concurrencyLevel = MAX_SEGMENTS;
        // Find power-of-two sizes best matching arguments
        int sshift = 0;
        int ssize = 1;
        // 默認concurrencyLevel = 16, 因此ssize在默認狀況下也是16,此時 sshift = 4
        // ssize = 2^sshift 即 ssize = 1 << sshift
        while (ssize < concurrencyLevel) {
            ++sshift;
            ssize <<= 1;
        }
        // 段偏移量,32是由於hash是int值,int值32位,默認值狀況下此時segmentShift = 28
        this.segmentShift = 32 - sshift;
        // 散列算法的掩碼,默認值狀況下segmentMask = 15, 定位segment的時候須要根據segment[]長度取模, 即hash(key)&(ssize - 1)
        this.segmentMask = ssize - 1;
        if (initialCapacity > MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY;
        // 計算每一個segment中table的容量, 初始容量=16, 併發數=16, 則segment中的Entry[]長度爲1。
        int c = initialCapacity / ssize;
        // 處理沒法整除的狀況,取上限
        if (c * ssize < initialCapacity)
            ++c;
        // MIN_SEGMENT_TABLE_CAPACITY默認時2,cap是2的n次方
        int cap = MIN_SEGMENT_TABLE_CAPACITY;
        while (cap < c)
            cap <<= 1;
        // create segments and segments[0]
        // 建立segments並初始化第一個segment數組,其他的segment延遲初始化
        Segment<K,V> s0 =
            new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                             (HashEntry<K,V>[])new HashEntry[cap]);
        // 默認併發數=16
        Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
        UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
        this.segments = ss;
    }

由圖和源碼可知,當用默認構造函數時,最大併發數是 16,即最大容許 16 個線程同步寫操做,且沒法擴展。因此若是咱們的場景數據量比較大時,應該設置合適的併發數,避免頻繁鎖衝突。函數

3.4 put()操做

public V put(K key, V value) {
        Segment<K,V> s;
        if (value == null)
            throw new NullPointerException();
        // 根據key的hash再次進行hash運算
        int hash = hash(key.hashCode());
        // 基於hash定位segment數組的索引。
        // hash值是int值,32bits。segmentShift=28,無符號右移28位,剩下高4位,其他補0。
        // segmentMask=15,二進制低4位所有是1,因此j至關於hash右移後的低4位。
        int j = (hash >>> segmentShift) & segmentMask;
        if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
             (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
        // 找到對應segment
            s = ensureSegment(j);
        // 將新節點插入segment中
        return s.put(key, hash, value, false);
    }

找出對應 segment,若是不存在就建立並初始化源碼分析

@SuppressWarnings("unchecked")
    private Segment<K,V> ensureSegment(int k) {
        // 當前的segments數組
        final Segment<K,V>[] ss = this.segments;
        // 計算原始偏移量,在segments數組的位置
        long u = (k << SSHIFT) + SBASE; // raw offset
        Segment<K,V> seg;
        // 判斷沒有被初始化
        if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
            // 獲取第一個segment ss[0]做爲原型
            Segment<K,V> proto = ss[0]; // use segment 0 as prototype
            int cap = proto.table.length; // 容量
            float lf = proto.loadFactor; // 負載因子
            int threshold = (int)(cap * lf); // 閾值
            // 初始化ss[k] 內部的tab數組 // recheck
            HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
            // 再次檢查這個ss[k]  有沒有被初始化
            if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                == null) { // recheck
                Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
                // 自旋。getObjectVolatile 保證了讀的可見性,因此一旦有一個線程初始化了,那麼就結束自旋
                while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                       == null) {
                    if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
                        break;
                }
            }
        }
        return seg;
    }

3.5 segment 插入節點

上一步找到 segment 位置後計算節點在 segment 中的位置。

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
            // 是否獲取鎖,失敗自旋獲取鎖(直到成功)
            HashEntry<K,V> node = tryLock() ? null :
                scanAndLockForPut(key, hash, value); // 失敗了纔會scanAndLockForPut
            V oldValue;
            try {
                HashEntry<K,V>[] tab = table;
                int index = (tab.length - 1) & hash;
                // 獲取到bucket位置的第一個節點
                HashEntry<K,V> first = entryAt(tab, index);
                for (HashEntry<K,V> e = first;;) {
                    // hash衝突
                    if (e != null) {
                        K k;
                        // key相等則覆蓋
                        if ((k = e.key) == key ||
                            (e.hash == hash && key.equals(k))) {
                            oldValue = e.value;
                            if (!onlyIfAbsent) {
                                e.value = value;
                                ++modCount;
                            }
                            break;
                        }
                        // 不相等則遍歷鏈表
                        e = e.next;
                    }
                    else {
                        if (node != null)
                            // 將新節點插入鏈表做爲表頭
                            node.setNext(first);
                        else
                            // 建立新節點並插入表頭
                            node = new HashEntry<K,V>(hash, key, value, first);
                        int c = count + 1;
                        // 判斷元素個數是否超過了閾值或者segment中數組的長度超過了MAXIMUM_CAPACITY,若是知足條件則rehash擴容!
                        if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                            // 擴容
                            rehash(node);
                        else
                            setEntryAt(tab, index, node);
                        ++modCount;
                        count = c;
                        oldValue = null;
                        break;
                    }
                }
            } finally {
                // 解鎖
                unlock();
            }
            return oldValue;
        }

若是加鎖失敗則先走 scanAndLockForPut() 方法。

private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
            // 根據hash獲取頭結點
            HashEntry<K,V> first = entryForHash(this, hash);
            HashEntry<K,V> e = first;
            HashEntry<K,V> node = null;
            int retries = -1; // negative while locating node
            // 嘗試獲取鎖,成功就返回,失敗就開始自旋
            while (!tryLock()) {
                HashEntry<K,V> f; // to recheck first below
                if (retries < 0) {
                    // 若是頭結點不存在
                    if (e == null) {
                        if (node == null) // speculatively create node
                            node = new HashEntry<K,V>(hash, key, value, null);
                        retries = 0;
                    }
                    // 和頭結點key相等
                    else if (key.equals(e.key))
                        retries = 0;
                    else
                        // 下一個節點 直到爲null
                        e = e.next;
                }
                // 達到自旋的最大次數
                else if (++retries > MAX_SCAN_RETRIES) {
                    // lock()是阻塞方法。進入加鎖方法,失敗進入隊列,阻塞當前線程
                    lock();
                    break;
                }
                // TODO (retries & 1) == 0 沒理解
                else if ((retries & 1) == 0 &&
                         (f = entryForHash(this, hash)) != first) {
                    // 頭結點變化,須要從新遍歷,說明有新的節點加入或者移除
                    e = first = f; // re-traverse if entry changed
                    retries = -1;
                }
            }
            return node;
        }

(retries & 1) == 0 沒理解是在作什麼,有小夥伴看明白了請賜教。

最後

本文到此結束,主要是學習分段鎖是如何工做的。謝謝你們的觀看。

相關文章
相關標籤/搜索