ConcurrentHashMap源碼

  推薦文章:
  網上的文章,jdk7版本的比較多,因爲本身本地是jdk8,因此仍是整理jdk8的邏輯吧。
------------------------------------------------------------------------------------------
1、結構跟思路說明,主要字段解釋
  HashMap的源碼咱們比較清楚,大概結構就是數組加鏈表,數組默認長度爲16,負載因子爲0.75,超事後擴容,長度爲當前數組長度*2,數組下的鏈表元素超過8個時轉換爲紅黑樹結構,優化查找性能。jdk8中ConcurrentHashMap的結構跟HashMap同樣,也是直接數組加鏈表,每一個鏈表的首節點做爲synchronized的同步對象使用,對該鏈表數據的訪問都要通過同步塊synchronized,跟jdk7不同,這點最後再說。jdk8的難點在於多線程擴容部分。
  主要字段說明:
  int DEFAULT_CAPACITY = 16; // 默認容量,哈希表數組的初始長度
  int DEFAULT_CONCURRENCY_LEVEL = 16; // 默認併發級別,也就是容許的最大併發線程數
  float LOAD_FACTOR = 0.75f; // 負載因子
  int TREEIFY_THRESHOLD = 8; // 轉化爲紅黑樹的閾值
  volatile Node<K,V>[] table; // 哈希表數組
  volatile int sizeCtl; // 用來控制初始化跟擴容的字段。-1表示初始化,-(1+擴容線程數)表示在擴容或者縮容,默認值0,未初始化且大於0則表示初始容量,擴容後值爲下一次應該擴容的閾值(當前容量*0.75)
2、put源碼
  put總體邏輯就是根據key哈希值計算在哈希桶的位置,而後將數據放入鏈表,具體以下:
  一、若是未初始化則先進行初始化,完成後再次進入循環;
  二、若找到位置,該處元素爲null,則用cas方式進行設置,成功則退出,不然進入下一次循環;
  三、若發現對應位置的鏈表首節點hash值爲-1,說明在進行擴容,則當前線程也幫助進行擴容,擴容完成後繼續進行put操做;
  四、以上判斷都結束後,synchronized鎖定鏈表首節點,進入鏈表遍歷,進行設置。
  相關代碼:
public V put(K key, V value) {
    return putVal(key, value, false);
}

final V putVal(K key, V value, boolean onlyIfAbsent) { // onlyIfAbsent,是否保留原來的值,默認false,也就是覆蓋舊值
    if (key == null || value == null) throw new NullPointerException();  // 不容許null的key跟value,由於有可能會做爲對象鎖使用,synchronized是基於monitor機制的,null沒有對象頭
    int hash = spread(key.hashCode()); // 對hashcode進行散列,獲取一個相對分佈更加均勻的hash值。這個散列是用hashcode的高16位跟低16位進行異或運算獲得的一個結果。
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) { // 哈希數組賦值給tab----爲啥要
        Node<K,V> f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0) // 哈希數組默認是空的,要初始化
            tab = initTable(); // 見下文,new了一個node數組
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // 根據hash值取table中元素,該元素爲空
            if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) // cas設置該元素的值,失敗則說明有線程競爭,break進入下次循環
                break;                   // no lock when adding to empty bin
        }
        else if ((fh = f.hash) == MOVED) // 從名字來看,正在擴容,MOVED 值爲-1,這個擴容部分再看
            tab = helpTransfer(tab, f); // 
        else {  // 找到哈希表中的位置,並且該位置元素不爲null,hash值也不是-1,進入正常的遍歷鏈表賦值
            V oldVal = null;
            synchronized (f) { // 就是這個操做,把鏈表首節點做爲鎖對象進行同步(也多是紅黑樹頭節點)
                if (tabAt(tab, i) == f) { // 普通Node節點
                    if (fh >= 0) { // 首節點的hash值
                        binCount = 1; 
                        for (Node<K,V> e = f;; ++binCount) { 
                            K ek;
                            if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { // key 的hash值相等且equals比較也相同,則覆蓋
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) { // 在鏈表追加
                                pred.next = new Node<K,V>(hash, key, value, null);
                                break;
                            }
                        }
                    }
                    else if (f instanceof TreeBin) { // 紅黑樹節點
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            if (binCount != 0) {
                if (binCount >= TREEIFY_THRESHOLD) // 鏈表長度達到8個,轉換爲紅黑樹節點--這個並不徹底是
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    addCount(1L, binCount);
    return null;
}
  對於鏈表過長而轉換爲紅黑樹,實際是並非,還要看哈希表容量,容量小於64的話,哈希表擴容,通常來講就解決了單節點過長問題。若單鏈表過長,且哈希表長度超過64,這纔會轉換爲紅黑樹。
  哈希桶(數組)初始化過程,也是整個map的初始化過程:
private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) { // 數組爲空
        if ((sc = sizeCtl) < 0)  // 初始化的"功勞"被別人搶走了
            Thread.yield(); // lost initialization race; just spin
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {  // 搶奪初始化"功勞"的操做,cas設置sizeCtl = -1
            try {
                if ((tab = table) == null || tab.length == 0) {
                      int n = (sc > 0) ? sc : DEFAULT_CAPACITY; // 數組大小默認爲16
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; //建了個node數組,長度爲sizeCtl大小(默認16)
                    table = tab = nt;  // 數組賦給哈希表
                    sc = n - (n >>> 2); // sc 變爲原來的3/4,爲啥是這個數,由於負載因子0.75,正好用這個數作記錄
                }
            } finally {
                sizeCtl = sc;  // 第12行的數據給了sizeCtl
            }
            break;
        }
    }
    return tab; 
}
  這裏的這個spread(),就是把高位拿下來也參與了hash。由於若是僅僅取餘的話,其實看的只是最低的幾位,即便高位不一樣,最後也會被分到同一個位置,這會致使數據在hash表中分佈不均勻。將高位跟低位一塊兒參與運算,能夠適當的減輕這種狀況。
3、get源碼
  get的主要邏輯也比較清晰,就是根據哈希值進行取餘來肯定位置,而後根據equals來比較是否相等,是則取出。hash值爲負數,則認爲是在進行擴容致使的數據遷移。數據遷移的總體邏輯最後整理。邏輯以下:
  一、判斷是否已初始化,若未初始化,則直接返回null;hash值對應的鏈表首節點爲null,也直接返回null;
  二、hash值跟鏈表首節點相等,且key地址相同或者equals相等,則認爲鏈表首節點就是要找的節點,返回其值;
  三、若鏈表首節點的hash值爲負數,說明map在擴容,鏈表可能數據不全;經過節點Node的find方法進行查找;(這個方法須要分析一下)
  四、正常的遍歷鏈表進行查找;
  相關代碼:
public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    int h = spread(key.hashCode());  // 跟put同樣的散列操做,結果最後會被修正爲正數
    if ((tab = table) != null && (n = tab.length) > 0 &&  (e = tabAt(tab, (n - 1) & h)) != null) { //table已經初始化,並且hash值對應的位置,首節點有值,未初始化直接返回null
        if ((eh = e.hash) == h) { // 首節點hash值跟要查找的key的hash值同樣
            if ((ek = e.key) == key || (ek != null && key.equals(ek))) // 地址或者equals結果相同
                return e.val;
        }
        else if (eh < 0)  // hash值不同的狀況(元素能被分到這個地方,按道理hash應該是取餘相等):hash值  < 0,說明在擴容或者爲紅黑樹
            return (p = e.find(h, key)) != null ? p.val : null; //查找元素,支持紅黑樹節點
        while ((e = e.next) != null) { // hash值不同,挨個查找
            if (e.hash == h &&  ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}
4、擴容源碼
  先理清楚方法中幾個變量的含義:
  int size ---- 本次想要擴充的數量(通常是數組長度的一半(右移1位),putAll的時候是新map的長度)
  int c ---- 當前數組擴容後的大小
private final void tryPresize(int size) {
    int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(size + (size >>> 1) + 1);  // 計算本次擴容的目標值
    int sc;
    while ((sc = sizeCtl) >= 0) { // 目標值不必定足夠大,要跟sizeCtl進行比較
        Node<K,V>[] tab = table; int n;
        if (tab == null || (n = tab.length) == 0) { //沒有初始化
            n = (sc > c) ? sc : c;  //擴容的閾值跟計算的目標值比較,較大者爲目標值(這裏至關於用一個map來初始化concurrentHashMap了)
            if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {// 搶奪初始化,Unsafe的這個方法說明見下文
                try {
                    if (table == tab) {
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = nt;
                        sc = n - (n >>> 2);
                    }
                } finally {
                    sizeCtl = sc;
                }
            }
        }
        else if (c <= sc || n >= MAXIMUM_CAPACITY) // 沒到達擴容閾值或者表已經太大,沒法再擴容
            break;
        else if (tab == table) { // 
            int rs = resizeStamp(n);
            if (sc < 0) {
                Node<K,V>[] nt;
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    break;
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }
            else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);
        }
    }
}
  這裏是擴容的主要方法,咱們先理清楚擴容的容量是怎麼變化的:
  直接調用這個tryPresize(int size)的有兩個地方putAll(Map m)跟treeifyBin(Node[] tab, int index),前者傳進來的參數是m.size(),後者是tab.length << 1。結合initTable,初始化後的數組長度爲16,若是觸發了treeifyBin,因爲數組長度小於64,會進行數組擴容而不是轉換爲紅黑樹,擴容時傳值過來的是16 << 1也就是32,而後這個tableSizeFor(32+(32 >>> 1) +1)也就是tableSizeFor(49),tableSizeFor()是個對入參往上取2的最小冪的過程,也就是64。看到沒,由於初始數組長度是16,若是在數組長度不是很長的狀況下觸發了紅黑樹轉換機制,那麼數組長度直接成了64,而不是32!!這是爲了省事兒吧,由於這種狀況下,hash表元素很少,而單鏈表的長度竟然到了8個,能夠說是元素分佈的很是不均勻了,極端一點一下擴容多點,相對來講能使元素分佈的分散一點。之後再有鏈表過長的,也不會觸發這個tryPresize了,由於第一次擴容後就已經達到紅黑樹要求的數組長度最低值了。
而後看putAll(Map m)這個方法,這個是將另外的map添加到ConcurrentHashMap中來的方法,擴容的目標數值是size + (size >>> 1) + 1,size爲新來的map的元素個數,這個具體最終要擴展到多少要跟sizeCtl進行比較,若是sizeCtl大則說明不用擴容,直接退出,不然進行擴容。
  U.compareAndSwapInt(this, SIZECTL, sc, -1)這一行是調用的jdk的Unsafe類的一個方法,看名字是cas操做修改一個int值。Unsafe類是jdk用於操做內存等一些精細操做的類,僅限於jdk使用,咱們調用會報錯(原理是檢查類加載對象,咱們能夠有方法繞過去),有四個參數compareAndSwapInt(this, offset, expect, update),第一個參數是被修改的對象,offset是被修改字段在對象中的內存偏移量(從對象在內存中的起始位置算到該字段的偏移量),第三個是指望值,是被修改字段的初始值,至關於樂觀鎖的版本號,update是若是cas判斷符合後字段要新後的新值。這一部分的相關介紹,請參考: https://blog.csdn.net/sherld/article/details/42492259
到這裏,總體思路相對來講比較明確。複雜的是擴容的具體執行transfer的代碼;這裏要考慮多個線程的併發擴容問題。讀了幾遍仍是有的地方沒太明白:
transfer的思路:
  爲了便於多線程並行處理,不會引發衝突,這裏對哈希桶進行了分段,每一個線程處理的數據爲一個「步長」--也就是一個stride的長度,數據遷移完成後再繼續處理下一個分段。須要注意的是對於整個哈希桶來講,數據的遷移是從末尾開始倒着往前進行的。好比哈希桶數組的長度是64,則先進行遷移的是49-64這16個鏈表對應的數據,這也正是一個步長的長度。對於舊數組,擴容線程每完成一個鏈表的數據遷移,就會將該鏈表首節點元素修改成ForwardingNode,查找元素能夠經過該對象的find方法進行,remove,pudate,put則要優先執行擴容,而後再進行相關操做。
  int stride :步長,每一個擴容線程要處理的哈希桶的位數;
  int transferIndex :遷移下標,下一個要進行擴容的線程應該獲取步長的起始位置;注意是倒着來的;
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { //每次擴容結束,nextTab 會被置空,所以第一個擴容的線程的nextTab應該是null
    int n = tab.length, stride;
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)    // 步長的計算,默認最小值是16,so,數組不是很大的狀況下,通常是16。從計算過程看出,對8線程cpu來講,這個值要比16大,n最小應該是64*16=1024
        stride = MIN_TRANSFER_STRIDE; // subdivide range  
    if (nextTab == null) {            // initiating    //新數組的初始化,直接容量翻倍
        try {
            @SuppressWarnings("unchecked")
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        nextTable = nextTab;
        transferIndex = n;  //由於是倒着來的,注意這裏是n而不是0
    }
    int nextn = nextTab.length;
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    boolean advance = true;   // hash桶完成的標誌位,true已經處理過了,false未處理
    boolean finishing = false; // to ensure sweep before committing nextTab   //整個擴容完成的標誌位
    for (int i = 0, bound = 0;;) {
        Node<K,V> f; int fh;
        while (advance) { // 更新待遷移的hash桶索引
            int nextIndex, nextBound;
            if (--i >= bound || finishing)
                advance = false;
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }
            else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex,
                      nextBound = (nextIndex > stride ? nextIndex - stride : 0))) {  //本線程任務搶佔成功,將下個線程要開始的位置設置好
                bound = nextBound;
                i = nextIndex - 1;
                advance = false;
            }
        }
        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
            if (finishing) {  // 全部節點完成
                nextTable = null;
                table = nextTab;
                sizeCtl = (n << 1) - (n >>> 1);
                return;
            }
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { // 本線程遷移完成,
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) // 最後一個線程
                    return;
                finishing = advance = true;
                i = n; // recheck before commit
            }
        }
        else if ((f = tabAt(tab, i)) == null)  //該位置爲空,不須要遷移,直接放一個fwd對象,表示遷移過了
            advance = casTabAt(tab, i, null, fwd);
        else if ((fh = f.hash) == MOVED)   //已是fwd對象,說明遷移完了,不用處理
            advance = true; // already processed
        else {
            synchronized (f) { // 正常的遷移邏輯,跟hashmap相似,利用按位與操做將鏈表數據巧妙的分到兩個鏈表中,並且位置也已經計算好了
                if (tabAt(tab, i) == f) {
                    Node<K,V> ln, hn;
                    if (fh >= 0) {
                        //普通節點遷移,略
                        advance = true;
                    }
                    else if (f instanceof TreeBin) {
                        //樹節點遷移,略
                        advance = true;
                    }
                }
            }
        }
    }
}
  第3一、32行代碼的解釋:第一個擴容的線程,執行transfer方法以前,會設置 sizeCtl = (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2),後續幫其擴容的線程,執行transfer方法以前,會設置 sizeCtl = sizeCtl+1,每個退出transfer的方法的線程,退出以前,會設置 sizeCtl = sizeCtl-1,那麼最後一個線程退出時:必然有sc == (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2),即 (sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT。
  本覺得ConcurrentHashMap的結構瞭解了,翻看源碼應該會很容易,結果實際花費的時間比預期多的多。主要緣由是對於擴容的transfer思路以及其中各類標誌位的處理以及這麼處理的緣由不甚瞭解,須要進行反覆猜想推理。也幸而有各路大神提早研究過,這纔跟在大神身後亦步亦趨,終於理清楚。
相關文章
相關標籤/搜索