理解ConcurrentHashMap1.8源碼

ConcurrentHashMap源碼分析

其實ConcurrentHashMap我本身已經看過不少遍了,可是今天在面試阿里的時候本身在描述ConcurrentHashMap發現本身根本講不清楚什麼是ConcurrentHashMap,以及裏面是怎麼實現的,搞的我忽然發現本身什麼都不懂,因此我想要再次的來分析一下這個源碼,徹底理解ConcurrentHashMap,而不是覺得本身懂了,實際上本身不懂。html

首先咱們看一下put方法,put方法會調用到putVal方法上面。java

final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    int hash = spread(key.hashCode());
	  //若是put進去的是個鏈表,這個參數表示鏈表的大小
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0)
			  //初始化鏈表
            tab = initTable();
			//若是這個槽位沒有數據
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {		
				//使用CAS將這個新的node設置到hash桶裏面去
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
			//幫助遷移
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else {
				//獲取鎖
            V oldVal = null;
            synchronized (f) {
					//雙重檢查鎖
                if (tabAt(tab, i) == f) {
						//若是hash值大於等於0,那麼表明這個節點裏的數據是鏈表
                    if (fh >= 0) {
                        binCount = 1;
							//每次遍歷完後binCount加1,表示鏈表長度
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
								//若是hash值和key值都相同,那麼覆蓋,break結束循環
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
								//下一個節點爲null,說明遍歷到尾節點了,那麼直接在尾節點設值一個新的值
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
						 //若是是紅黑樹
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            if (binCount != 0) {
                if (binCount >= TREEIFY_THRESHOLD)
					  //若是鏈表個數大於8,那麼就調用這個方法
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    addCount(1L, binCount);
    return null;
}

解釋一下上面的源碼作了什麼:node

  1. 首先作一下判斷,不容許key和value中任意一個爲空,不然拋出異常
  2. 計算key的hash值,而後遍歷table數組
  3. 若是table數組爲null或爲空,那麼就調用initTable作初始化
  4. 爲了保證可見性,會使用tab去table數組裏獲取數據,若是沒有數據,那麼用casTabAt經過CAS將新Node設置到table數組裏。(注:這裏也體現了和hashmap不同的地方,hashmap直接經過數據拿就行了, 這個獲取數據和設值都要保證可見性和線程安全性)
  5. 若是當前槽位所對應的hash值是MOVED,說明當前的table正在擴容遷移節點,那麼就調用helpTransfer幫助遷移
  6. 走到這裏,說明這個槽位裏面的元素不止一個,有不少個,因此給頭節點加上鎖
  7. 若是當前的hash所對應的的槽位不是空的,而且hash值大於等於0,那麼就說明這個槽位裏面的對象是一個鏈表,那麼就遍歷鏈表
    1. 若是所遍歷的鏈表裏面有元素的hash值而且key和當前要插入的數據的是同樣的,那麼就覆蓋原來的值
    2. 若是遍歷到最後的節點都沒有元素和要插入的值key是同樣的,那麼就新建一個Node節點,插入到鏈表的最後
    3. 每遍歷一個節點就把binCount+1
  8. 若是當前的節點是TreeBin,那麼說明該槽位裏面的數據是紅黑樹,那麼調用相應方法插入數據
  9. 最後若是binCount已經大於或等於8了,那麼就調用treeifyBin

接下來咱們先看initTable 方法,再看treeifyBin和helpTransfer面試

private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {
			//一開始的時候sizeCtl爲0
        if ((sc = sizeCtl) < 0)
            Thread.yield(); // lost initialization race; just spin
			//將sizeCtl用CAS設置成-1
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            try {
                if ((tab = table) == null || tab.length == 0) {
						//由於sc一開始爲0,因此n取DEFAULT_CAPACITY爲16
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
						//將table賦值爲大小爲16的Node數組
                    table = tab = nt;
						//將sc的設置爲總容量的75%,若是 n 爲 16 的話,那麼這裏 sc = 12
                    sc = n - (n >>> 2);
                }
            } finally {
					//最後將sizeCtl設置爲sc的值
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}

這個方法裏面初始化了一個很重要的變量sizeCtl,初始值爲總容量的75%,table初始化爲一個容量爲16的數組數組

下面咱們在看看treeifyBin方法安全

private final void treeifyBin(Node<K,V>[] tab, int index) {
    Node<K,V> b; int n, sc;
    if (tab != null) {
			//若是數據的長度小於64,那麼調用tryPresize進行擴容
        if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
            tryPresize(n << 1);
			//若是這個槽位裏面的元素是鏈表
        else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {			
				//給鏈表頭加上鎖
            synchronized (b) {
                if (tabAt(tab, index) == b) {
                    TreeNode<K,V> hd = null, tl = null;
					 //遍歷鏈表,而後初始化紅黑樹對象
                    for (Node<K,V> e = b; e != null; e = e.next) {
                        TreeNode<K,V> p =
                            new TreeNode<K,V>(e.hash, e.key, e.val,
                                              null, null);
                        if ((p.prev = tl) == null)
                            hd = p;
                        else
                            tl.next = p;
                        tl = p;
                    }
						//給tab槽位爲index的元素設置新的對象
                    setTabAt(tab, index, new TreeBin<K,V>(hd));
                }
            }
        }
    }
}

treeifyBin這個方法裏面並非只是將鏈表轉化爲紅黑樹,而是當tab的長度大於64的時候纔會將鏈表轉成紅黑樹,不然的話,會調用tryPresize方法。多線程

而後咱們進入到tryPresize方法裏面看看,tryPresize傳入的參數是當前tab數組長度的兩倍。ide

private final void tryPresize(int size) {
		//本來傳進來的size已是兩倍了,這裏會再往上取最近的 2 的 n 次方
    int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
        tableSizeFor(size + (size >>> 1) + 1);
    int sc;
    while ((sc = sizeCtl) >= 0) {
        Node<K,V>[] tab = table; int n;
			// 這個 if 分支和以前說的初始化數組的代碼基本上是同樣的,在這裏,咱們能夠不用管這塊代碼
        if (tab == null || (n = tab.length) == 0) {
            n = (sc > c) ? sc : c;
            if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    if (table == tab) {
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = nt;
                        sc = n - (n >>> 2);
                    }
                } finally {
                    sizeCtl = sc;
                }
            }
        }
        else if (c <= sc || n >= MAXIMUM_CAPACITY)
            break;
        else if (tab == table) {
            int rs = resizeStamp(n);
				//一開始進來的時候sc是大於0的
            if (sc < 0) {
                Node<K,V>[] nt;
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    break;
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }
				//將SIZECTL設置爲一個很大的複數
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);
        }
    }
}

這個方法裏面,會對tab數據進行校驗,若是沒有初始化的話會從新進行初始化大小,若是是第一次進來的話會將SIZECTL設置成一個很大的複數,而後調用transfer方法,傳如當前的tab數據和null。源碼分析

接着咱們來看transfer方法,這個方法比較長,主要的擴容和轉移節點都在這個方法裏面實現,咱們將這個長方法分紅代碼塊,一步步分析:this

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
		//若是當前tab數組長度爲16
    int n = tab.length, stride;
		//那麼(n >>> 3) / NCPU  = 0 小於MIN_TRANSFER_STRIDE
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
			//將stride設置爲 16 
        stride = MIN_TRANSFER_STRIDE; // subdivide range
    if (nextTab == null) {            // initiating
        try {
            @SuppressWarnings("unchecked")
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
			//若是n是16,那麼nextTab就是一個容量爲32的空數組
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        nextTable = nextTab;
			//將transferIndex賦值爲16
        transferIndex = n;
    }
		...
}

這個代碼塊主要是作nextTable、transferIndex 、stride的賦值操做。

...
//初始化nextn爲32
int nextn = nextTab.length;
//新建一個ForwardingNode對象,裏面放入長度爲32的nextTab數組
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false;
//初始化bound爲0
for (int i = 0, bound = 0;;) {
	...
}

下面的代碼會所有包裹在這個for循環裏面,因此咱們來分析一下這個for循環裏面的代碼

for (int i = 0, bound = 0;;) {
		
        Node<K,V> f; int fh;
        while (advance) {
            int nextIndex, nextBound;
            if (--i >= bound || finishing)
                advance = false;
				//將nextIndex設置爲transferIndex,一開始16
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }
				//一開始的時候nextIndex是和stride相同,那麼nextBound爲0,TRANSFERINDEX也爲0
            else if (U.compareAndSwapInt
                     (this, TRANSFERINDEX, nextIndex,
                      nextBound = (nextIndex > stride ?
                                   nextIndex - stride : 0))) {
					//這裏bound也直接爲0
                bound = nextBound;
					//i = 15
                i = nextIndex - 1;
                advance = false;
            }
        }
		...
}

這個方法是爲了設置transferIndex這個屬性,transferIndex一開始是原tab數組的長度,每次會向前移動stride大小的值,若是transferIndex減到了0或小於0,那麼就設置I等於-1,i在下面的代碼會說到。

for (int i = 0, bound = 0;;) {
		...
		//在上面一段代碼塊中,若是transferIndex已經小於等於0了,就會把i設置爲-1
		if (i < 0 || i >= n || i + n >= nextn) {
		    int sc;
				//表示遷移已經完成
		    if (finishing) {
					//將nextTable置空,表示不須要遷移了
		        nextTable = null;
					//將table設置爲新的數組
		        table = nextTab;
					//sizeCtl設置爲n的 1.5倍
		        sizeCtl = (n << 1) - (n >>> 1);
		        return;
		    }
		    if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
		        if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
		            return;
		        // 到這裏,說明 (sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT,
		         // 也就是說,全部的遷移任務都作完了,也就會進入到上面的 if(finishing){} 分支了
		        finishing = advance = true;
		        i = n; // recheck before commit
		    }
		}
...
}

這個方法是用來表示已經遷移完畢了,能夠退出。

for (int i = 0, bound = 0;;) {
	...
	//若是該槽位沒有元素,那麼直接把tab的i槽位設置爲fwd
	else if ((f = tabAt(tab, i)) == null)
	    advance = casTabAt(tab, i, null, fwd);
	//說明這個槽位已經有其餘線程遷移過了
	else if ((fh = f.hash) == MOVED)
	    advance = true; // already processed
	//走到這裏,說明tab的這個槽位裏面有數據,那麼咱們須要得到槽位的頭節點的監視器鎖
	else {
	    synchronized (f) {	
			if (tabAt(tab, i) == f) {
				...
			} 
		  }
	}
	...
}

在這個代碼塊中,i會從最後一個元素一個個往前移動,而後根據i這個index來判斷tab裏面槽位的狀況。

下面的代碼咱們來分析監視器鎖裏面的內容:

synchronized (f) {
	if (tabAt(tab, i) == f) {
		//fh是當前節點的hash值
		if (fh >= 0) {
		    int runBit = fh & n;
			//lastRun設置爲頭節點
		    Node<K,V> lastRun = f;
        // 須要將鏈表一分爲二,
        //   找到原鏈表中的 lastRun,而後 lastRun 及其以後的節點是一塊兒進行遷移的
        //   lastRun 以前的節點須要進行克隆,而後分到兩個鏈表中
		    for (Node<K,V> p = f.next; p != null; p = p.next) {
		        int b = p.hash & n;
		        if (b != runBit) {
		            runBit = b;
		            lastRun = p;
		        }
		    }
		    if (runBit == 0) {
		        ln = lastRun;
		        hn = null;
		    }
		    else {
		        hn = lastRun;
		        ln = null;
		    }
		    for (Node<K,V> p = f; p != lastRun; p = p.next) {
		        int ph = p.hash; K pk = p.key; V pv = p.val;
		        if ((ph & n) == 0)
		            ln = new Node<K,V>(ph, pk, pv, ln);
		        else
		            hn = new Node<K,V>(ph, pk, pv, hn);
		    }
			//其中的一個鏈表放在新數組的位置 i
		    setTabAt(nextTab, i, ln);
			//另外一個鏈表放在新數組的位置 i+n
		    setTabAt(nextTab, i + n, hn);
			//將原數組該位置處設置爲 fwd,表明該位置已經處理完畢
			//其餘線程一旦看到該位置的 hash 值爲 MOVED,就不會進行遷移了
		    setTabAt(tab, i, fwd);
			//advance 設置爲 true,表明該位置已經遷移完畢
		    advance = true;
		}
		//下面紅黑樹的遷移和上面差很少
		else if (f instanceof TreeBin) {
			....
		}
	} 
}

這個方法主要是將頭節點裏面的鏈表拆分紅兩個鏈表,而後設置到新的數組中去,再給老的數組設置爲fwd,表示這個節點已經遷移過了。

到這裏transfer方法已經分析完畢了。 這裏我再舉個例子,讓你們根據透徹的明白多線程之間是怎麼進行遷移工做的。

咱們假設stride仍是默認的16,第一次進來nextTab爲null,可是tab的長度爲32。

一開始的賦值:
1. n會設置成32,而且n只會賦值一次,表明被遷移的數組長度 
2. nextTab會被設置成一個大小爲64的數組,並塞入到新的ForwardingNode對象中去。
3. transferIndex會被賦值爲32

進入循環:
	初始化i爲0,bound爲0;
	第一次循環:
		1. 因爲advance初始化爲true,因此會進入到while循環中,循環出來後,transferIndex會被設置成16,bound被設置成16,i設置成31。這裏你可能會問
		2. 將原來tab[i]的元素遷移到新的數組中去,並將tab[i]設置爲fwd,將advance設置成爲true

	第二次循環:
		1. --i,變爲30,--i >= bound成立,並將advance設置成false
		2. 將原來tab[i]的元素遷移到新的數組中去,並將tab[i]設置爲fwd,將advance設置成爲true
	。。。
	第十六次循環:
		1. --i,變爲15,將transferIndex設置爲0,bound也設置爲0,i設置爲15
		2. 將原來tab[i]的元素遷移到新的數組中去,並將tab[i]設置爲fwd,將advance設置成爲true
	第三十二次循環:
		1. 這個時候--i等於-1,而且(nextIndex = transferIndex) <= 0成立,那麼會將i設置爲-1,advance設置爲false
		2. 會把SIZECTL用CAS設置爲原來的值加1,而後設置finishing爲true

	第三十三次循環:
		1. 因爲finishing爲true,那麼nextTable設置爲null,table設置爲新的數組值,sizeCtl設置爲舊tab的長度的1.5倍

原文出處:https://www.cnblogs.com/luozhiyun/p/11406557.html

相關文章
相關標籤/搜索