圖解ConcurrentHashMap

概述

上篇文章介紹了 HashMap 在多線程併發狀況下是不安全的,多線程併發推薦使用 ConcurrentHashMap ,那麼 ConcurrentHashMap 是什麼?它的設計思想是什麼,源碼是怎麼實現的?node

ConcurrentHashMap是什麼

Concurrent翻譯過來是併發的意思,字面理解它的做用是處理併發狀況的 HashMap,在介紹它以前先回顧下以前的知識。經過前面兩篇學習,咱們知道多線程併發下 HashMap 是不安全的(如死循環),更廣泛的是多線程併發下,因爲堆內存對於各個線程是共享的,而 HashMap 的 put 方法不是原子操做,假設Thread1先 put 值,而後 sleep 2秒(也能夠是系統時間片切換失去執行權),在這2秒內值被Thread2改了,Thread1「醒來」再 get 的時候發現已經不是原來的值了,這就容易出問題。數組

那麼如何避免這種多線程「奧迪變奧拓」的狀況呢?常規思路就是給 HashMap 的 put 方法加鎖(synchronized),保證同一個時刻只容許一個線程擁有對 hashmap 有寫的操做權限便可。然而假如線程1中操做耗時,佔着茅坑半天不出來,其餘須要操做該 hashmap 的線程就須要在門口排隊半天,嚴重影響用戶體驗(HashTable 就是這麼幹的)。舉個生活中的例子,不少銀行除了存取錢,還支持存取貴重物品,貴重物品都放在保險箱裏,把 HashMap 和 HashTable 比做銀行,結構:安全

把線程比做人,對應的狀況以下:bash

  • HashMap牌銀行:咱們的服務宗旨是不用排隊,同一時間多人都有機會修改保險櫃裏的東西,你覺得你存的是美圓?取出來的實際上是日元,破產就在一瞬間,刺不刺激。
  • HashTable牌銀行:咱們的服務宗旨是要排隊,同一時間只有一我的有機會修改保險櫃裏的東西,其他的人只能看不能動手改,保你存的是美圓取得仍是美圓。什麼?你說若是那人在裏面睡着了不出來怎麼辦?不要着急,來,坐下來打會麻將等他出來。

多線程下用 HashMap 不肯定性過高,有破產的風險,不能選;用 HashTable 不會破產,可是用戶體驗不太好,那麼怎樣才能作到多人存取既不影響他人存值,又不用排隊呢?有人提議搞個「銀行者聯盟」,多開幾個像HashTable 這種「帶鎖」的銀行就行了,有多少人辦理業務,就開多少個銀行,一對一服務,這個區都是大老闆,開銀行的成本都是小錢,因而「銀行者聯盟」成立了。多線程

接下來的狀況是這樣的:好比蓋倫和亞索一塊兒去銀行存他們的大寶劍,這個「銀行者聯盟」一頓操做,而後對蓋倫說,1號銀行如今沒人,你能夠去那存,不用排隊,而後蓋倫就去1號銀行存他的大寶劍,1號銀行把蓋倫接進門,立刻拉閘,一頓操做,而後把蓋倫的大寶劍放在第x行第x個保險箱,等蓋倫辦妥離開後,再開閘;一樣「銀行者聯盟」對亞索說,2號銀行如今沒人,你能夠去那存,不用排隊,而後亞索去2號銀行存他的大寶劍,2號銀行把亞索接進門,立刻拉閘,一頓操做把亞索的大寶劍放在第x行第x號保險箱,等亞索離開後再開閘,此時無論蓋倫和亞索在各自銀行裏面待多久都不會影響到彼此,不用擔憂本身的大寶劍被人偷換了。這就是ConcurrentHashMap的設計思路,用一個圖來理解併發

從上圖能夠看出,此時鎖的是對應的單個銀行,而不是整個「銀行者聯盟」。分析下這種設計的特色:ssh

  • 多個銀行組成的「銀行者聯盟」
  • 當有人來辦理業務時,「銀行者聯盟」須要肯定這我的去哪一個銀行
  • 當此人去到指定銀行辦理業務後,該銀行上鎖,其餘人不能同時執行修改操做,直到此人離開後解鎖

由這幾點基本思想能夠引起一些思考,好比:函數

1.成立「銀行者聯盟」時初識銀行數是多少?怎麼設計合理?源碼分析

上面這張圖沒有給出是否須要排隊的結論,這是由於須要結合實際狀況分析,好比初識化有16個銀行,只有兩我的來辦理業務,那天然不須要排隊;若是如今16個銀行都有人在辦理業務,這時候來了第17我的,那麼他仍是須要排隊的。因爲「銀行者聯盟」事先沒法得知會有多少人來辦理業務,因此在它創立的時候須要制定一個「標準」,即初始銀行數量,人多的狀況「銀行者聯盟」應該多開幾家銀行,避免別人排隊;人少的狀況應該少開,避免浪費錢(什麼,你說不差錢?那也不行)性能

2.當有人來辦理業務的時候,「銀行者聯盟」怎麼肯定此人去哪一個銀行?

正常狀況下,若是全部銀行都是未上鎖狀態,那麼有人來辦理業務去哪都不用排隊,當其中有些銀行已經上鎖,那麼後續「銀行者聯盟」給人推薦的時候就不能把客戶往上鎖的銀行引了,不然分分鐘給人錘成麻瓜。所以「銀行者聯盟」須要時刻保持清醒的頭腦,對本身的銀行空閒狀況瞭如指掌,每次給用戶推薦都應該是最好的選擇。

3.「銀行者聯盟」怎麼保證同一時間不會有兩我的在同一個銀行擁有存權限?

經過對指定銀行加鎖/解鎖的方式實現。

源碼分析

Java7 源碼分析

經過 Java7 的源碼分析下代碼實現,先看下一些重要的成員

//默認的數組大小16(HashMap裏的那個數組)
static final int DEFAULT_INITIAL_CAPACITY = 16;

//擴容因子0.75
static final float DEFAULT_LOAD_FACTOR = 0.75f;
 
//ConcurrentHashMap中的數組
final Segment<K,V>[] segments

//默認併發標準16
static final int DEFAULT_CONCURRENCY_LEVEL = 16;

//Segment是ReentrantLock子類,所以擁有鎖的操做
 static final class Segment<K,V> extends ReentrantLock implements Serializable {
  //HashMap的那一套,分別是數組、鍵值對數量、閾值、負載因子
  transient volatile HashEntry<K,V>[] table;
  transient int count;
  transient int threshold;
  final float loadFactor;

  Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
            this.loadFactor = lf;
            this.threshold = threshold;
            this.table = tab;
        }
 }
 
 //換了馬甲仍是認識你!!!HashEntry對象,存key、value、hash值以及下一個節點
 static final class HashEntry<K,V> {
        final int hash;
        final K key;
        volatile V value;
        volatile HashEntry<K,V> next;
 }
//segment中HashEntry[]數組最小長度
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;

//用於定位在segments數組中的位置,下面介紹
final int segmentMask;
final int segmentShift;
複製代碼

上面這些一下出來有點接受不了不要緊,下面都會介紹到。

接下來從最簡單的初識化開始分析

ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
複製代碼

默認構造函數會調用帶三個參數的構造函數

public ConcurrentHashMap() {
        this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
    }

    public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        if (concurrencyLevel > MAX_SEGMENTS)
            concurrencyLevel = MAX_SEGMENTS;
        // Find power-of-two sizes best matching arguments
        //步驟① start
        int sshift = 0;
        int ssize = 1;
        while (ssize < concurrencyLevel) {
            ++sshift;
            ssize <<= 1;
        }
        this.segmentShift = 32 - sshift;
        this.segmentMask = ssize - 1;
        //步驟① end
        //步驟② start
        if (initialCapacity > MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY;
        int c = initialCapacity / ssize;
        if (c * ssize < initialCapacity)
            ++c;
        int cap = MIN_SEGMENT_TABLE_CAPACITY;
        while (cap < c)
            cap <<= 1;
        //步驟② end
        // create segments and segments[0]
        //步驟③ start
        Segment<K,V> s0 =
            new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                             (HashEntry<K,V>[])new HashEntry[cap]);
        Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
        UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
        this.segments = ss;
        //步驟③ end
    }

複製代碼

上面定義了許多臨時變量,註釋寫的又少,第一次看名字根本不知道這鬼東西表明什麼意思,不過咱們能夠把已知的數據代進去,算出這些變量的值,再分析能不能找出一些貓膩。假設這是第一次默認建立:

  • 步驟① concurrencyLevel = 16 ,能夠計算出 sshift = 4,ssize = 16,segmentShift = 28,segmentMask = 15;
  • 步驟② c = 16/16 = 1,cap = 2;
  • 步驟③有句註釋,建立 Segment 數組 segments 並初始化 segments [0] ,因此 s0 初始化後數組長度爲2,負載因子0.75,閾值爲1;再看這裏的ss的初始化(重點,圈起來要考!!!), ssize 此時爲16,因此默認數組長度16,給人一種感受正好和咱們傳的 concurrencyLevel 同樣?看下下面的例子
例子1 例子2
ssize = 1,concurrencyLevel = 10 ssize = 1,concurrencyLevel = 8
ssize <<= 1 —> 2<10 知足 ssize <<= 1 —> 2<10 知足
ssize <<= 1 —> 4<10 知足 ssize <<= 1 —> 4<10 知足
ssize <<= 1 —> 8<10 知足 ssize <<= 1 —> 8<10 不知足 ssize = 8
ssize <<= 1 —> 16<10 不知足 ssize = 16

因此咱們傳 concurrencyLevel 不必定就是最後數組的長度,長度的計算公式:

長度 = 2的n次方(2的n次方 >= concurrencyLevel)

到這裏只是建立了一個長度爲16的Segment 數組,並初始化數組0號位置,segmentShift和segmentMask還沒派上用場,畫圖存檔:

接着看 put 方法

public V put(K key, V value) {
        Segment<K,V> s;
        //步驟①注意valus不能爲空!!!
        if (value == null)
            throw new NullPointerException();
        //根據key計算hash值,key也不能爲null,不然hash(key)報空指針
        int hash = hash(key);
        //步驟②派上用場了,根據hash值計算在segments數組中的位置
        int j = (hash >>> segmentShift) & segmentMask;
        //步驟③查看當前數組中指定位置Segment是否爲空
        //若爲空,先建立初始化Segment再put值,不爲空,直接put值。
        if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
             (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
            s = ensureSegment(j);
        return s.put(key, hash, value, false);
    }
複製代碼

步驟①能夠看到和 HashMap 的區別,這裏的 key/value 爲空會報空指針異常;步驟②先根據 key 值計算 hash 值,再和前面算出來的兩個變量計算出這個 key 應該放在哪一個Segment中(具體怎麼計算的有興趣能夠去研究下,先高位運算再取與),假設咱們算出來該鍵值對應該放在5號,步驟③判斷5號爲空,看下 ensureSegment() 方法

private Segment<K,V> ensureSegment(int k) {
        //獲取segments
        final Segment<K,V>[] ss = this.segments;
        long u = (k << SSHIFT) + SBASE; // raw offset
        Segment<K,V> seg;
        if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
            //拷貝一份和segment 0同樣的segment
            Segment<K,V> proto = ss[0]; // use segment 0 as prototype
            //大小和segment 0一致,爲2
            int cap = proto.table.length;
            //負載因子和segment 0一致,爲0.75
            float lf = proto.loadFactor;
            //閾值和segment 0一致,爲1
            int threshold = (int)(cap * lf);
            //根據大小建立HashEntry數組tab
            HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
            //再次檢查
            if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                == null) { // recheck
                根據已有屬性建立指定位置的Segment
                Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
                while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                       == null) {
                    if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
                        break;
                }
            }
        }
        return seg;
    }
複製代碼

該方法重點在於拷貝了segments[0],所以新建立的Segment與segment[0]的配置相同,因爲多個線程都會有可能執行該方法,所以這裏經過UNSAFE的一些原子性操做的方法作了屢次的檢查,到目前爲止畫圖存檔:

如今「舞臺」也有了,請開始你的表演,看下 Segment 的put方法

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
            //步驟① start
            HashEntry<K,V> node = tryLock() ? null :
                scanAndLockForPut(key, hash, value);
            //步驟① end
            V oldValue;
            try {
                //步驟② start
                //獲取Segment中的HashEntry[]
                HashEntry<K,V>[] tab = table;
                //算出在HashEntry[]中的位置
                int index = (tab.length - 1) & hash;
                //找到HashEntry[]中的指定位置的第一個節點
                HashEntry<K,V> first = entryAt(tab, index);
                for (HashEntry<K,V> e = first;;) {
                    //若是不爲空,遍歷這條鏈
                    if (e != null) {
                        K k;
                        //狀況① 以前已存過,則替換原值
                        if ((k = e.key) == key ||
                            (e.hash == hash && key.equals(k))) {
                            oldValue = e.value;
                            if (!onlyIfAbsent) {
                                e.value = value;
                                ++modCount;
                            }
                            break;
                        }
                        e = e.next;
                    }
                    else {
                        //狀況② 另外一個線程的準備工做
                        if (node != null)
                            //鏈表頭插入方式
                            node.setNext(first);
                        else //狀況③ 該位置爲空,則新建一個節點(注意這裏採用鏈表頭插入方式)
                            node = new HashEntry<K,V>(hash, key, value, first);
                        //鍵值對數量+1
                        int c = count + 1;
                        //若是鍵值對數量超過閾值
                        if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                            //擴容
                            rehash(node);
                        else //未超過閾值,直接放在指定位置
                            setEntryAt(tab, index, node);
                        ++modCount;
                        count = c;
                        //插入成功返回null
                        oldValue = null;
                        break;
                    }
                }
            //步驟② end
            } finally {
                //步驟③
                //解鎖
                unlock();
            }
            //修改爲功,返回原值
            return oldValue;
        }
複製代碼

上面的 put 方法其實和 Java7 HashMap裏大體是同樣的,只是多了加鎖/解鎖兩步,也正由於這樣才保證了同一時刻只有一個線程擁有修改的權限。按步驟分析下上面的流程:

  • 步驟① 執行 tryLock 方法獲取鎖,拿到鎖返回null,沒拿到鎖執行 scanAndLockForPut 方法;
  • 步驟② 和 HashMap 裏的那一套思路是同樣的,不理解能夠看下以前的文章介紹(狀況②下面介紹);
  • 步驟③ 執行 unLock 方法解鎖

假設如今Thread1進來存值,前面沒人來過,它能夠成功拿到鎖,根據計算,得出它要存的鍵值對應該放在HashEntry[] 的0號位置,0號位置爲空,因而新建一個 HashEntry,並經過 setEntryAt() 方法,放在0號位置,然而還沒等 Thread1 釋放鎖,系統的時間片切到了 Thread2 ,先畫圖存檔

Thread2 也來存值,經過前面的計算,剛好 Thread2 也被定位到 segments[5],接下來 Thread2 嘗試獲取鎖,沒有成功(Thread1 還未釋放),執行 scanAndLockForPut() 方法:

private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
            //經過Segment和hash值尋找匹配的HashEntry
            HashEntry<K,V> first = entryForHash(this, hash);
            HashEntry<K,V> e = first;
            HashEntry<K,V> node = null;
            //重試次數
            int retries = -1; // negative while locating node
            //循環嘗試獲取鎖
            while (!tryLock()) {
                HashEntry<K,V> f; // to recheck first below
                //步驟①
                if (retries < 0) {
                    //狀況① 沒找到,以前表中不存在
                    if (e == null) {
                        if (node == null) // speculatively create node
                            //新建 HashEntry 備用,retries改爲0
                            node = new HashEntry<K,V>(hash, key, value, null);
                        retries = 0;
                    }
                    //狀況② 找到,恰好第一個節點就是,retries改爲0
                    else if (key.equals(e.key))
                        retries = 0;
                    //狀況③ 第一個節點不是,移到下一個,retries仍是-1,繼續找
                    else
                        e = e.next;
                }
                //步驟②
                //嘗試了MAX_SCAN_RETRIES次還沒拿到鎖,簡直B了dog!
                else if (++retries > MAX_SCAN_RETRIES) {
                    //泉水掛機
                    lock();
                    break;
                }
                //步驟③
                //在MAX_SCAN_RETRIES次過程當中,key對應的entry發生了變化,則從頭開始
                else if ((retries & 1) == 0 &&
                         (f = entryForHash(this, hash)) != first) {
                    e = first = f; // re-traverse if entry changed
                    retries = -1;
                }
            }
            return node;
        }
複製代碼

經過上面的註釋分析能夠看出,Thread2 雖然此刻沒有權限修改,可是它也沒閒着,利用等鎖的這個時間,把本身要放的鍵值對在數組中哪一個位置計算出來了,這樣當 Thread2 一拿到鎖就能夠立馬定位到具體位置操做,節省時間。上面的步驟③稍微解釋下,好比 Thread2 經過查找得知本身要修改的值在0號位置,但在 Thread1 裏面又把該值改到了1號位置,若是它還去0號操做那確定出問題了,因此須要從新肯定。

假設 Thread2 put 值爲("亞索",「98」),對應1號位置,那麼在 scanAndLockForPut 方法中對應狀況①,畫圖存檔:

再回到 Segment put 方法中的狀況②,當 Thread1 釋放鎖後,Thread2 持有鎖,並準備把亞索放在1號位置,然而此時 Segment[5] 裏的鍵值對數量2 > 閾值1,因此調用 rehash() 方法擴容,

private void rehash(HashEntry<K,V> node) {
            /*
             * Reclassify nodes in each list to new table.  Because we
             * are using power-of-two expansion, the elements from
             * each bin must either stay at same index, or move with a
             * power of two offset. We eliminate unnecessary node
             * creation by catching cases where old nodes can be
             * reused because their next fields won't change. * Statistically, at the default threshold, only about * one-sixth of them need cloning when a table * doubles. The nodes they replace will be garbage * collectable as soon as they are no longer referenced by * any reader thread that may be in the midst of * concurrently traversing table. Entry accesses use plain * array indexing because they are followed by volatile * table write. */ //舊數組引用 HashEntry<K,V>[] oldTable = table; //舊數組長度 int oldCapacity = oldTable.length; //新數組長度爲舊數組的2倍 int newCapacity = oldCapacity << 1; //修改新的閾值 threshold = (int)(newCapacity * loadFactor); //建立新表 HashEntry<K,V>[] newTable = (HashEntry<K,V>[]) new HashEntry[newCapacity]; int sizeMask = newCapacity - 1; //遍歷舊錶 for (int i = 0; i < oldCapacity ; i++) { HashEntry<K,V> e = oldTable[i]; if (e != null) { HashEntry<K,V> next = e.next; //肯定在新表中的位置 int idx = e.hash & sizeMask; //狀況① 鏈表只有一個節點,指定轉移到新表指定位置 if (next == null) // Single node on list newTable[idx] = e; else { // Reuse consecutive sequence at same slot HashEntry<K,V> lastRun = e; int lastIdx = idx; for (HashEntry<K,V> last = next; last != null; last = last.next) { //狀況② 擴容先後位置發生改變 int k = last.hash & sizeMask; if (k != lastIdx) { lastIdx = k; lastRun = last; } } //將改變的鍵值對放到新表的對應位置 newTable[lastIdx] = lastRun; // Clone remaining nodes //狀況③ 把鏈表中剩下的節點拷到新表中 for (HashEntry<K,V> p = e; p != lastRun; p = p.next) { V v = p.value; int h = p.hash; int k = h & sizeMask; HashEntry<K,V> n = newTable[k]; newTable[k] = new HashEntry<K,V>(h, p.key, v, n); } } } } //添加新的節點(鏈表頭插入方式) int nodeIndex = node.hash & sizeMask; // add the new node node.setNext(newTable[nodeIndex]); newTable[nodeIndex] = node; table = newTable; } 複製代碼

一樣是擴容轉移,這裏的代碼比 HashMap 中的 transfer 多了一些操做,在上上篇學習 HashMap 擴容可知,擴容後鍵值對的新位置要麼和原位置同樣,要麼等於原位置+舊數組的長度,因此畫個圖來理解下上面代碼這麼寫的緣由:

前提:當前 HashEntry[] 長度爲8,閾值爲 8*0.75 = 6,因此 put 第7個鍵值對須要擴容 ,蓋倫和亞索擴容先後位置不變,妖姬和卡特擴容後位置須要加上原數組長度,因此執行上面代碼流程:

上面的代碼先找出擴容先後須要轉移的節點,先執行轉移,而後再把該條鏈上剩下的節點轉移,之因此這麼寫是起到複用的效果,註釋中也說了,在使用默認閾值的狀況下,只有大約 1/6 的節點須要被 clone 。注意到目前爲止,能夠看到不管是擴容轉移仍是新增節點,Java7都是採用的頭插入方式,流程圖以下:

相比之下,get 方法沒有加鎖/解鎖的操做,代碼比較簡單就不分析了。

稍微說下Java8

Java8 對比Java7有很大的不一樣,好比取消了Segments數組,容許併發擴容。

先看下ConcurrentHashMap的初始化

public ConcurrentHashMap() {
}
複製代碼

和Java7不同,這裏是個空方法,那麼它具體的初始化操做呢?直接看下 put 方法

public V put(K key, V value) {
    return putVal(key, value, false);
}

/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
    // key/value不能爲空!!!
    if (key == null || value == null) throw new NullPointerException();
    //計算hash值
    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        //註釋① 表爲null則初始化
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        //CAS方法判斷指定位置是否爲null,爲空則經過建立新節點,經過CAS方法設置在指定位置
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        //當前節點正在擴容
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        //指定位置不爲空
        else {
            V oldVal = null;
            //註釋② 加鎖
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    //節點是鏈表的狀況
                    if (fh >= 0) {
                        binCount = 1;
                        //遍歷總體鏈
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            //若是已存在,替換原值
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            //若是是新加節點,則以尾部插入實現添加
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    //節點是紅黑樹的狀況
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        //遍歷紅黑樹
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                    else if (f instanceof ReservationNode)
                        throw new IllegalStateException("Recursive update");
                }
            }
            if (binCount != 0) {
                //鏈表中節點個數超過8轉成紅黑樹
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    //註釋③ 添加節點
    addCount(1L, binCount);
    return null;
}
複製代碼

代碼有點長,第一次看頗有可能引發身體不適,主要是由於引入了紅黑樹的判斷和操做,以及線程安全的操做。一樣key/value 爲空會報空指針異常,這也是和 HashMap 一個明顯的區別。

註釋①

調用 initTable 初始化數組

private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {
        // sizeCtl小於0,當前線程讓出執行權
        if ((sc = sizeCtl) < 0)
            Thread.yield(); // lost initialization race; just spin
        //CAS 操做將 sizeCtl 值改成-1
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            try {
                if ((tab = table) == null || tab.length == 0) {
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    //默認建立大小爲16的數組
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = tab = nt;
                    sc = n - (n >>> 2);
                }
            } finally {
                //初始化完再改回來
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}
複製代碼

put方法並無加鎖,那麼它是如何保證建立新表的時候併發安全呢?答案就是這裏的 sizeCtl ,sizeCtl 默認值爲0,當一個線程初始化數組時,會將 sizeCtl 改爲 -1,因爲被 volatile 修飾,對於其餘線程來講這個變化是可見的,上面代碼看到後續線程判斷 sizeCtl 小於0 就會讓出執行權。

註釋②

Java8 摒棄了Segment,而是對數組中單個位置加鎖。當指定位置節點不爲 null 時,狀況與 Java8 HashMap 操做相似,新節點的添加仍是尾部插入方式。

註釋③

無論是鏈表的仍是紅黑樹,肯定以後總的節點數會加1,可能會引發擴容,Java8 ConcunrrentHashMap 支持併發擴容,以前擴容老是由一個線程將舊數組中的鍵值對轉移到新的數組中,支持併發的話,轉移所須要的時間就能夠縮短了,固然相應的併發處理控制邏輯也就更復雜了,擴容轉移經過 transfer 方法完成,Java8中該方法很長,感興趣的能夠看下源碼。。。

用一個圖來表示 Java8 ConcurrentHashMap的樣子

總結

經過分析源碼對比了 HashMap 與 ConcurrentHashMap的差異,以及Java7和Java8上 ConcurrentHashMap 設計的不一樣,固然還有不少坑沒有填,好比其中調用了不少UNSAFE的CAS方法,能夠減小性能上的消耗,平時不多用,瞭解的比較少;以及紅黑樹的具體原理和實現,後續慢慢填。。。

相關文章
相關標籤/搜索