繼上篇 Dictionary 源碼分析,上篇講過的內容在這裏不會再重複。
ConcurrentDictionary 源碼地址:參見 .NET 官方源碼倉庫中的 ConcurrentDictionary.cs。
ConcurrentDictionary一大特色是線程安全,在沒有ConcurrentDictionary以前在多線程下用Dictionary,無論讀寫都要加個鎖,不但麻煩,性能上也不是很好。由於在上篇分析中咱們知道Dictionary內部是由多個bucket組成,不一樣bucket的操做即便在多線程下也能夠不互相影響,若是一個鎖把整個Dictionary都鎖住實在有點浪費。
不過凡事都有兩面性,給每一個Bucket都加一個鎖也不可取,Bucket的數量和Dictionary元素數量是同樣的,而Bucket可能會有一部分是空的,並且訪問Dictionary的線程若是數量不是太多也根本用不上這麼多鎖。想一想即便有10個線程在不停地操做這個Dictionary,同時操做的最多也就10個,即便兩兩衝突訪問同一個Bucket,5個鎖就夠了,固然這是最好的狀況,最壞狀況是這5個bucket用同一個鎖。因此,要獲得最好的結果須要嘗試取一個最優解,而影響因素則是bucket數量和線程數量。咱們想要的結果是鎖夠用但又不浪費。
微軟得出的結果是:默認的鎖的數量是CPU核的個數,這和線程池默認的線程數量同樣。隨着Dictionary的擴容,鎖的個數也能夠跟着增長,這個能夠在構造函數中本身指定。
下面看看ConcurrentDictionary裏元素是作了怎樣的封裝。
// Unlike Dictionary's plain bucket array, all table state is wrapped in one Tables object.
// The field is volatile so a resize (which swaps in a brand-new Tables) is published to all
// threads and readers never observe a half-initialized table.
private volatile Tables _tables;

private sealed class Tables
{
    internal readonly Node[] _buckets;      // bucket array; each non-null entry heads a linked list chained via Node._next
    internal readonly object[] _locks;      // the lock array; one lock may guard several buckets
    internal volatile int[] _countPerLock;  // number of elements currently guarded by each lock

    internal Tables(Node[] buckets, object[] locks, int[] countPerLock)
    {
        _buckets = buckets;
        _locks = locks;
        _countPerLock = countPerLock;
    }
}

// Dictionary's Entry becomes Node here, and the next pointer lives in the node itself.
private sealed class Node
{
    internal readonly TKey _key;
    internal TValue _value;
    internal volatile Node _next;  // volatile so list links are published/observed in order during lock-free reads
    internal readonly int _hashcode;

    internal Node(TKey key, TValue value, int hashcode, Node next)
    {
        _key = key;
        _value = value;
        _next = next;
        _hashcode = hashcode;
    }
}
private readonly bool _growLockArray;   // whether the lock array should also grow when the dictionary resizes
private int _budget;                    // max elements a single lock may guard before a resize is requested
private const int DefaultCapacity = 31; // default capacity of ConcurrentDictionary — unlike List/Dictionary defaults
private const int MaxLockNumber = 1024; // cap on the lock count when growing; a larger value can be passed to the constructor, but is rarely needed
// Core constructor: concurrencyLevel is the number of locks, capacity the initial bucket count,
// growLockArray whether the lock array grows on resize, comparer the key equality comparer.
internal ConcurrentDictionary(int concurrencyLevel, int capacity, bool growLockArray, IEqualityComparer<TKey> comparer)
{
    if (concurrencyLevel < 1)
    {
        throw new ArgumentOutOfRangeException(nameof(concurrencyLevel), SR.ConcurrentDictionary_ConcurrencyLevelMustBePositive);
    }
    if (capacity < 0)
    {
        throw new ArgumentOutOfRangeException(nameof(capacity), SR.ConcurrentDictionary_CapacityMustNotBeNegative);
    }
    if (comparer == null) throw new ArgumentNullException(nameof(comparer));

    // The capacity should be at least as large as the concurrency level. Otherwise, we would have locks that don't guard
    // any buckets.
    if (capacity < concurrencyLevel) // concurrencyLevel is the lock count; fewer buckets than locks would leave some locks useless
    {
        capacity = concurrencyLevel; // so capacity must be at least the lock count
    }

    object[] locks = new object[concurrencyLevel]; // initialize the lock array
    for (int i = 0; i < locks.Length; i++)
    {
        locks[i] = new object();
    }

    int[] countPerLock = new int[locks.Length]; // per-lock element counters, all zero initially
    Node[] buckets = new Node[capacity];        // the bucket (Node head) array
    _tables = new Tables(buckets, locks, countPerLock); // publish the initial table

    _comparer = comparer;
    _growLockArray = growLockArray;          // whether locks grow along with the table
    _budget = buckets.Length / locks.Length; // each lock guards at most [capacity / lock count] elements before a resize is desired
}
// Lock-free lookup: computes the hash once and delegates to the internal overload.
public bool TryGetValue(TKey key, out TValue value)
{
    if (key == null) ThrowKeyNullException();
    return TryGetValueInternal(key, _comparer.GetHashCode(key), out value);
}

// Reads without taking any lock: a snapshot of _tables plus volatile reads of the
// bucket head make the traversal safe against concurrent writers.
private bool TryGetValueInternal(TKey key, int hashcode, out TValue value)
{
    Debug.Assert(_comparer.GetHashCode(key) == hashcode);

    // Snapshot _tables into a local so a concurrent resize (which replaces _tables) cannot swap it mid-read.
    Tables tables = _tables;

    // Map the hash to a bucket index; equivalent to:
    //   int bucketNo = (hashcode & 0x7fffffff) % bucketCount;
    int bucketNo = GetBucket(hashcode, tables._buckets.Length);

    // Volatile read so we observe the most recently published head node of this bucket.
    Node n = Volatile.Read<Node>(ref tables._buckets[bucketNo]);

    // Walk the bucket's linked list.
    while (n != null)
    {
        // Found: compare hash first (cheap), then keys.
        if (hashcode == n._hashcode && _comparer.Equals(n._key, key))
        {
            value = n._value;
            return true;
        }
        n = n._next;
    }

    // Not present in this bucket.
    value = default(TValue);
    return false;
}
// Returns the existing value for key, or adds the value produced by valueFactory.
// NOTE(review): valueFactory is invoked outside any lock, so it may run even when another
// thread wins the race — TryAddInternal then returns that thread's value instead.
public TValue GetOrAdd(TKey key, Func<TKey, TValue> valueFactory)
{
    if (key == null) ThrowKeyNullException();
    if (valueFactory == null) throw new ArgumentNullException(nameof(valueFactory));

    int hashcode = _comparer.GetHashCode(key);

    TValue resultingValue;
    // Fast path: lock-free TryGet first; only fall into the locked add path when absent.
    if (!TryGetValueInternal(key, hashcode, out resultingValue))
    {
        TryAddInternal(key, hashcode, valueFactory(key), false, true, out resultingValue);
    }
    return resultingValue;
}

// Shared add/update core. Returns true if a new node was added, false if the key already
// existed (resultingValue then holds the stored value, updated or not per updateIfExists).
private bool TryAddInternal(TKey key, int hashcode, TValue value, bool updateIfExists, bool acquireLock, out TValue resultingValue)
{
    Debug.Assert(_comparer.GetHashCode(key) == hashcode);

    while (true)
    {
        int bucketNo, lockNo;

        Tables tables = _tables;
        // GetBucketAndLockNo boils down to:
        //   bucketNo = (hashcode & 0x7fffffff) % bucketCount;  // same bucket mapping as Dictionary
        //   lockNo   = bucketNo % lockCount;                   // so one lock may guard several buckets
        GetBucketAndLockNo(hashcode, out bucketNo, out lockNo, tables._buckets.Length, tables._locks.Length);

        bool resizeDesired = false;
        bool lockTaken = false;
        try
        {
            if (acquireLock) // callers that already own the table (e.g. construction-time fills) pass false
                Monitor.Enter(tables._locks[lockNo], ref lockTaken);

            // Validate the snapshot: if another thread resized (publishing a brand-new Tables)
            // between reading _tables and taking the lock, our lock guards the OLD table and is
            // meaningless — retry from the top (hence the enclosing while(true)).
            if (tables != _tables)
            {
                continue;
            }

            Node prev = null;
            // Walk the bucket's linked list, much like Dictionary does.
            for (Node node = tables._buckets[bucketNo]; node != null; node = node._next)
            {
                Debug.Assert((prev == null && node == tables._buckets[bucketNo]) || prev._next == node);
                if (hashcode == node._hashcode && _comparer.Equals(node._key, key)) // key already present
                {
                    // Update in place only when asked to.
                    if (updateIfExists)
                    {
                        if (s_isValueWriteAtomic) // TValue writes are atomic (e.g. byte/short/int on 32-bit, reference types) — safe to assign directly
                        {
                            node._value = value;
                        }
                        else // non-atomic value writes (e.g. long/double on 32-bit): swap in a fresh node so readers never see a torn value
                        {
                            Node newNode = new Node(node._key, value, hashcode, node._next);
                            if (prev == null)
                            {
                                tables._buckets[bucketNo] = newNode; // node was the bucket head
                            }
                            else
                            {
                                prev._next = newNode; // splice past the old node
                            }
                        }
                        resultingValue = value;
                    }
                    else // no update requested: just hand back the existing value
                    {
                        resultingValue = node._value;
                    }
                    return false; // false = found, nothing added ("Get" instead of "Add")
                }
                prev = node;
            }

            // Not found: prepend a new node, publishing the new bucket head with a volatile write.
            Volatile.Write<Node>(ref tables._buckets[bucketNo], new Node(key, value, hashcode, tables._buckets[bucketNo]));
            checked // an overflow here would mean one lock guards over int.MaxValue nodes — only conceivable with a single lock and a saturated table, but fail loudly
            {
                tables._countPerLock[lockNo]++;
            }

            // If this lock now guards more nodes than the budget allows, request a resize.
            if (tables._countPerLock[lockNo] > _budget)
            {
                resizeDesired = true;
            }
        }
        finally
        {
            if (lockTaken) // always release, even on exception
                Monitor.Exit(tables._locks[lockNo]);
        }

        if (resizeDesired)
        {
            GrowTable(tables); // resize outside the bucket lock
        }

        resultingValue = value; // the value we just stored
        return true; // true = a new node was added
    }
}
// Resizes the table (or, when triggered prematurely, just doubles the budget).
// Builds a completely new Tables under all locks and publishes it via the volatile _tables field.
private void GrowTable(Tables tables)
{
    const int MaxArrayLength = 0X7FEFFFFF;
    int locksAcquired = 0;
    try
    {
        // Take only lock 0 up front so that concurrent GrowTable callers serialize here.
        AcquireLocks(0, 1, ref locksAcquired);

        // If the table was already replaced, a thread that beat us to the lock finished the resize — nothing to do.
        if (tables != _tables)
        {
            return;
        }

        // Sum the per-lock counts to approximate the current element count.
        long approxCount = 0;
        for (int i = 0; i < tables._countPerLock.Length; i++)
        {
            approxCount += tables._countPerLock[i];
        }

        // Less than a quarter full means the resize fired too eagerly — the real culprit is a
        // too-small budget, so double the budget instead of growing the table.
        if (approxCount < tables._buckets.Length / 4)
        {
            _budget = 2 * _budget;
            if (_budget < 0) // negative means the doubling overflowed
            {
                _budget = int.MaxValue; // clamp: effectively "never resize on budget again"
            }
            return;
        }

        int newLength = 0;
        bool maximizeTableSize = false;
        try
        {
            checked
            {
                // Double and add one to get an odd candidate length.
                newLength = tables._buckets.Length * 2 + 1;

                // Bump past multiples of 3, 5 and 7 — a cheap way to avoid lengths with small divisors.
                // (List doubles, Dictionary picks a prime above double; this is yet another scheme.)
                while (newLength % 3 == 0 || newLength % 5 == 0 || newLength % 7 == 0)
                {
                    newLength += 2;
                }

                Debug.Assert(newLength % 2 != 0);

                if (newLength > MaxArrayLength)
                {
                    maximizeTableSize = true;
                }
            }
        }
        catch (OverflowException)
        {
            maximizeTableSize = true;
        }

        if (maximizeTableSize) // the doubled length overflowed or exceeded the max array length
        {
            newLength = MaxArrayLength; // use the largest allowed array
            _budget = int.MaxValue;     // no further growth is possible, so never request a resize again
        }

        // Rehash everything, much like Dictionary's resize — but here ALL locks must be held.
        // Lock 0 is already ours; acquire the rest now.
        AcquireLocks(1, tables._locks.Length, ref locksAcquired);

        object[] newLocks = tables._locks;

        // Grow the lock array too, if allowed and still below the 1024 cap.
        if (_growLockArray && tables._locks.Length < MaxLockNumber)
        {
            newLocks = new object[tables._locks.Length * 2];                   // locks also double
            Array.Copy(tables._locks, 0, newLocks, 0, tables._locks.Length);   // keep the old lock objects in their slots
            for (int i = tables._locks.Length; i < newLocks.Length; i++)       // then create the added locks
            {
                newLocks[i] = new object();
            }
        }

        // Fresh bucket array and per-lock counters for the new geometry.
        Node[] newBuckets = new Node[newLength];
        int[] newCountPerLock = new int[newLocks.Length];

        // Redistribute every node into the new buckets.
        for (int i = 0; i < tables._buckets.Length; i++)
        {
            Node current = tables._buckets[i]; // head of the old bucket's list
            while (current != null)
            {
                Node next = current._next;
                int newBucketNo, newLockNo;
                // Compute the new bucket No. and lock No. for this node's hash.
                GetBucketAndLockNo(current._hashcode, out newBucketNo, out newLockNo, newBuckets.Length, newLocks.Length);

                // Rebuild the node, prepending it to the new bucket (next points at the previous head, as in Dictionary).
                newBuckets[newBucketNo] = new Node(current._key, current._value, current._hashcode, newBuckets[newBucketNo]);

                checked
                {
                    newCountPerLock[newLockNo]++; // one more node under this lock
                }
                current = next;
            }
        }

        // Recompute the budget for the new bucket/lock ratio (at least 1).
        _budget = Math.Max(1, newBuckets.Length / newLocks.Length);

        // Publish the new table; _tables is volatile, so other threads observe the swap.
        _tables = new Tables(newBuckets, newLocks, newCountPerLock);
    }
    finally
    {
        // Release exactly the locks we managed to acquire (0..locksAcquired-1).
        ReleaseLocks(0, locksAcquired);
    }
}
經過這幾個函數差很少也就清楚了ConcurrentDictionary整個的原理,其餘函數有興趣的能夠去看看,都差很少這個意思。
說完了,總結下,ConcurrentDictionary能夠說是爲了不一個大鎖鎖住整個Dictionary帶來的性能損失而出來的,固然也是採用空間換時間,不過這空間換得仍是很值得的,一些object而已。
原理在於Dictionary本質上是一個鏈表數組,只有在多線程同時操做到數組裏同一個鏈表時才須要鎖,因此就用到一個鎖數組,每一個鎖罩着幾個小弟(bucket及bucket內的鏈表元素),這樣多線程讀寫不一樣鎖罩的區域的時候能夠同時進行而不會等待,進而提升多線程性能。
不過凡事無絕對,不一樣業務場景的需求不同,Dictionary配合ReaderWriterLockSlim在某些場景(好比讀的機會遠大於寫的)可能會有更好的表現。