先貼源碼地址html
.NET CORE很大一個好處就是代碼的開源,你能夠詳細的查看你使用類的源代碼,並學習微軟的寫法和實現思路。 node
這裏我對.net core中ConcurrentDictionary源碼進行了分析,裏面採用了Volatile.Read和write(volatile做用:確保本條指令不會因編譯器的優化而省略,且要求每次直接從內存地址讀值,而不走寄存器),而後也使用了lock這種混合鎖,並且還定義了更細顆粒度的鎖。因此多線程使用ConcurrentDictionary集合仍是比較好的選擇。git
原本想把本篇放到個人《C#異步編程系列》,不事後來感受那個系列寫的已經算是收尾了,並且之後還會有寫更多core源碼分析的文字,因此就單獨新增一個系列把。github
先上源碼,再仔細聊web
/// <summary> /// Tables that hold the internal state of the ConcurrentDictionary /// /// Wrapping the three tables in a single object allows us to atomically /// replace all tables at once. /// </summary> private sealed class Tables { // A singly-linked list for each bucket. // 單鏈表數據結構的桶,裏面的節點就是對應字典值 internal readonly Node[] _buckets; // A set of locks, each guarding a section of the table. //鎖的數組 internal readonly object[] _locks; // The number of elements guarded by each lock. internal volatile int[] _countPerLock; internal Tables(Node[] buckets, object[] locks, int[] countPerLock) { _buckets = buckets; _locks = locks; _countPerLock = countPerLock; } } /// <summary> /// A node in a singly-linked list representing a particular hash table bucket. /// 由Dictionary裏的Entry改爲Node,而且把next放到Node裏 /// </summary> private sealed class Node { internal readonly TKey _key; internal TValue _value; internal volatile Node _next; internal readonly int _hashcode; internal Node(TKey key, TValue value, int hashcode, Node next) { _key = key; _value = value; _next = next; _hashcode = hashcode; } } private volatile Tables _tables; // Internal tables of the dictionary private IEqualityComparer<TKey> _comparer; // Key equality comparer // The maximum number of elements per lock before a resize operation is triggered // 每一個鎖對應的元素最大個數,若是超過,要從新進行resize tables private int _budget;
首先,內部類定義爲私有且密封,這樣就保證了沒法從外部進行篡改,並且注意volatile關鍵字的使用,這確保了咱們多線程操做的時候,最終都是去內存中讀取對應地址的值和操做對應地址的值。算法
internal readonly object[] _locks; internal volatile int[] _countPerLock;
以上兩個類是爲了高性能及併發鎖所創建的對象,實際方法上鎖時,使用以下語句編程
lock (tables._locks[lockNo]) Monitor.Enter(tables._locks[lockNo], ref lockTaken);
以上兩種調用方式是等價的,都會阻塞執行,直到獲取到鎖(對於Monitor我不少時候會盡量使用TryEnter,畢竟不阻塞,不過這個類的實現必定要使用阻塞式的,這樣程序邏輯才能繼續往下走。更多關於Monitor我在 《C#異步編程(四)混合模式線程同步》裏面有詳細介紹)windows
這樣,實現了顆粒化到每一個單獨的鍵值的鎖,最大限度的保證了併發。數組
這裏lockNo參數是經過GetBucketAndLockNo方法獲取的,方法經過out變量返回值。
/// <summary> /// Computes the bucket and lock number for a particular key. ///這裏獲取桶的索引和鎖的索引,注意,鎖的索引和桶未必是同一個值。 /// </summary> private static void GetBucketAndLockNo(int hashcode, out int bucketNo, out int lockNo, int bucketCount, int lockCount) { bucketNo = (hashcode & 0x7fffffff) % bucketCount; lockNo = bucketNo % lockCount; }
上面方法中
hashcode 是經過private IEqualityComparer<TKey> _comparer對象的GetHashCode方法經過key獲取到的。
bucketCount是整個table的長度。
lockCount是現有的鎖的數組
咱們從最簡單的TryAdd方法開始介紹,這裏ConcurrentDictionary類的封裝很是合理,暴露出來的方法,不少是經過統一的內部方法進行執行,好比更新刪除等操做等,都有類內部惟一的私有方法進行執行,而後經過向外暴漏各類參數不一樣的方法,來實現不一樣行爲。
public bool TryAdd(TKey key, TValue value) { if (key == null) ThrowKeyNullException(); TValue dummy; return TryAddInternal(key, _comparer.GetHashCode(key), value, false, true, out dummy); }
上面TryAddInternal的參數對應以下
/// <summary> /// Shared internal implementation for inserts and updates. /// If key exists, we always return false; and if updateIfExists == true we force update with value; /// If key doesn't exist, we always add value and return true; /// </summary> private bool TryAddInternal(TKey key, int hashcode, TValue value, bool updateIfExists, bool acquireLock, out TValue resultingValue)
也就說說,updateIfExists爲false,存在值的狀況下,TryAdd不會更新原有值,而是直接返回false。個人多線程併發寫庫就是利用了這個特性,這個案例我會在本文最後介紹。如今咱們來看TryAddInternal內部,廢話很少說,上源碼(大部分都註釋過了,因此直接閱讀便可)
//while包在外面,爲了continue,若是發生了_tables私有變量在操做過程被其餘線程修改的狀況 while (true) { int bucketNo, lockNo; //變量複製到方法本地變量 判斷tables是否在操做過程當中被其餘線程修改。 Tables tables = _tables; //提到過的獲取桶的索引和鎖的索引 GetBucketAndLockNo(hashcode, out bucketNo, out lockNo, tables._buckets.Length, tables._locks.Length); //是否要擴大tables bool resizeDesired = false; //是否成功獲取鎖,成功的話會在final塊中進行退出 bool lockTaken = false; try { if (acquireLock) Monitor.Enter(tables._locks[lockNo], ref lockTaken); // If the table just got resized, we may not be holding the right lock, and must retry. // This should be a rare occurrence. if (tables != _tables) { continue; } // Try to find this key in the bucket Node prev = null; //這裏若是找到對應地址爲空,會直接跳出循環,說明對應的key沒有添加鍋 //不爲空的時候,會進行返回false(具體是否更新根據updateIfExists)(固然也存在會有相同_hashcode值的狀況,因此還要對key進行斷定,key不一樣,繼續日後找,直到找到相同key) for (Node node = tables._buckets[bucketNo]; node != null; node = node._next) { Debug.Assert((prev == null && node == tables._buckets[bucketNo]) || prev._next == node); //對hashcode和key進行斷定,確保找到的就是要更新的 if (hashcode == node._hashcode && _comparer.Equals(node._key, key)) { // The key was found in the dictionary. If updates are allowed, update the value for that key. // We need to create a new node for the update, in order to support TValue types that cannot // be written atomically, since lock-free reads may be happening concurrently. if (updateIfExists) { if (s_isValueWriteAtomic) { node._value = value; } else { Node newNode = new Node(node._key, value, hashcode, node._next); if (prev == null) { Volatile.Write(ref tables._buckets[bucketNo], newNode); } else { prev._next = newNode; } } resultingValue = value; } else { resultingValue = node._value; } return false; } prev = node; } // The key was not found in the bucket. Insert the key-value pair. Volatile.Write<Node>(ref tables._buckets[bucketNo], new Node(key, value, hashcode, tables._buckets[bucketNo])); checked { tables._countPerLock[lockNo]++; } // // If the number of elements guarded by this lock has exceeded the budget, resize the bucket table. // It is also possible that GrowTable will increase the budget but won't resize the bucket table. // That happens if the bucket table is found to be poorly utilized due to a bad hash function. // if (tables._countPerLock[lockNo] > _budget) { resizeDesired = true; } } finally { if (lockTaken) Monitor.Exit(tables._locks[lockNo]); } // // The fact that we got here means that we just performed an insertion. If necessary, we will grow the table. // // Concurrency notes: // - Notice that we are not holding any locks at when calling GrowTable. This is necessary to prevent deadlocks. // - As a result, it is possible that GrowTable will be called unnecessarily. But, GrowTable will obtain lock 0 // and then verify that the table we passed to it as the argument is still the current table. // if (resizeDesired) { GrowTable(tables); } resultingValue = value; return true; }
ContainsKey和TryGetValue其實內部最後調用的都是私有TryGetValueInternal,這裏ContainsKey調用TryGetValue。
ContainsKey方法
/// <summary> /// Determines whether the ConcurrentDictionary{TKey, TValue} contains the specified key. /// </summary> /// <param name="key">The key to locate in the</param> /// <returns>true if the ConcurrentDictionary{TKey, TValue} contains an element withthe specified key; otherwise, false.</returns> public bool ContainsKey(TKey key) { if (key == null) ThrowKeyNullException(); TValue throwAwayValue; return TryGetValue(key, out throwAwayValue); }
TryGetValue方法
/// <summary> /// Attempts to get the value associated with the specified key from the ConcurrentDictionary{TKey,TValue}. /// </summary> /// <param name="key">The key of the value to get.</param> /// <param name="value">When this method returns, <paramref name="value"/> contains the object from /// the ConcurrentDictionary{TKey,TValue} with the specified key or the default value of /// <returns>true if the key was found in the <see cref="ConcurrentDictionary{TKey,TValue}"/>; /// otherwise, false.</returns> public bool TryGetValue(TKey key, out TValue value) { if (key == null) ThrowKeyNullException(); return TryGetValueInternal(key, _comparer.GetHashCode(key), out value); }
TryGetValueInternal方法
private bool TryGetValueInternal(TKey key, int hashcode, out TValue value) { //用本地變量保存這個table的快照。 // We must capture the _buckets field in a local variable. It is set to a new table on each table resize. Tables tables = _tables; //獲取key對應的桶位置 int bucketNo = GetBucket(hashcode, tables._buckets.Length); // We can get away w/out a lock here. // The Volatile.Read ensures that we have a copy of the reference to tables._buckets[bucketNo]. // This protects us from reading fields ('_hashcode', '_key', '_value' and '_next') of different instances. Node n = Volatile.Read<Node>(ref tables._buckets[bucketNo]); //若是key相符 ,賦值,否則繼續尋找下一個。 while (n != null) { if (hashcode == n._hashcode && _comparer.Equals(n._key, key)) { value = n._value; return true; } n = n._next; } value = default(TValue);//沒找到就賦默認值 return false; }
TryRemove方法
public bool TryRemove(TKey key, out TValue value) { if (key == null) ThrowKeyNullException(); return TryRemoveInternal(key, out value, false, default(TValue)); }
這個方法會調用內部私用的TryRemoveInternal
/// <summary> /// Removes the specified key from the dictionary if it exists and returns its associated value. /// If matchValue flag is set, the key will be removed only if is associated with a particular /// value. /// </summary> /// <param name="key">The key to search for and remove if it exists.</param> /// <param name="value">The variable into which the removed value, if found, is stored.</param> /// <param name="matchValue">Whether removal of the key is conditional on its value.</param> /// <param name="oldValue">The conditional value to compare against if <paramref name="matchValue"/> is true</param> /// <returns></returns> private bool TryRemoveInternal(TKey key, out TValue value, bool matchValue, TValue oldValue) { int hashcode = _comparer.GetHashCode(key); while (true) { Tables tables = _tables; int bucketNo, lockNo; //這裏獲取桶的索引和鎖的索引,注意,鎖的索引和桶未必是同一個值,具體算法看源碼。 GetBucketAndLockNo(hashcode, out bucketNo, out lockNo, tables._buckets.Length, tables._locks.Length); //這裏鎖住的只是對應這個index指向的鎖,而不是全部鎖。 lock (tables._locks[lockNo]) { //這裏table可能被從新分配,因此這裏再次獲取,看獲得的是否是同一個table // If the table just got resized, we may not be holding the right lock, and must retry. // This should be a rare occurrence. if (tables != _tables) { continue; } Node prev = null; //這裏同一個桶,可能由於連地址,有不少值,因此要對比key for (Node curr = tables._buckets[bucketNo]; curr != null; curr = curr._next) { Debug.Assert((prev == null && curr == tables._buckets[bucketNo]) || prev._next == curr); //對比是否是要刪除的的那個元素 if (hashcode == curr._hashcode && _comparer.Equals(curr._key, key)) { if (matchValue) { bool valuesMatch = EqualityComparer<TValue>.Default.Equals(oldValue, curr._value); if (!valuesMatch) { value = default(TValue); return false; } } //執行刪除,判斷有沒有上一個節點。而後修改節點指針或地址。 if (prev == null) { Volatile.Write<Node>(ref tables._buckets[bucketNo], curr._next); } else { prev._next = curr._next; } value = curr._value; tables._countPerLock[lockNo]--; return true; } prev = curr; } } value = default(TValue); return false; } }
以前作項目時候,有個奇怪的場景,就是打電話的時候回調接口保存通話記錄,這裏經過CallId來惟一識別每次通話,可是前端程序是經過websocket跟通話服務創建鏈接(通話服務是另一個公司作的)。客戶是呼叫中心,通常在網頁端都是多個頁面操做,因此會有多個websocket鏈接,這時候每次通話,每一個頁面都會回調接口端,保存相同的通話記錄,併發是同一時間的。
咱們最先考慮使用消息隊列來過濾重複的請求,可是我仔細考慮了下,發現使用ConcurrentDictionary方式的實現更簡單,具體實現以下(我精簡了下代碼):
private static ConcurrentDictionary<string,string> _strDic=new ConcurrentDictionary<string, string>(); public async Task<BaseResponse> AddUserByAccount(string callId) { if ( _strDic.ContainsKey(callId)) { return BaseResponse.GetBaseResponse(BusinessStatusType.Failed,"鍵值已存在"); } //成功寫入 if (_strDic.TryAdd(callId,callId)) { var recordExist =await _userRepository.FirstOrDefaultAsync(c => c.CallId == callId); if (recordExist ==null) { Record record=new Record { CallId = callId, ………… ………… IsVerify=1 }; _userRepository.Insert(record); _userRepository.SaveChanges(); } return BaseResponse.GetBaseResponse(BusinessStatusType.OK); } //嘗試競爭線程,寫入失敗 return BaseResponse.GetBaseResponse(BusinessStatusType.Failed,"寫入失敗"); }
這裏若是進行同時的併發請求,最後請求均可以經過if ( _strDic.ContainsKey(callId))的斷定,由於全部線程同時讀取,都是未寫入狀態。可是多個線程會在TryAdd時有競爭,並且ConcurrentDictionary的實現保證了只有一個線程能夠成功更新,其餘的都返回失敗。
這個是我寫完本篇文字,無心瀏覽博客園時候看到的(文字地址https://www.cnblogs.com/CreateMyself/p/6086752.html),本身試了下,確實會出現線程不安全。本來實例以下
基本程序
class Program { private static readonly ConcurrentDictionary<string, string> _dictionary = new ConcurrentDictionary<string, string>(); private static int _runCount = 0; public static void Main(string[] args) { var task1 = Task.Run(() => PrintValue("JeffckWang")); var task2 = Task.Run(() => PrintValue("cnblogs")); Task.WaitAll(task1, task2); PrintValue("JeffckyWang from cnblogs"); Console.WriteLine(string.Format("運行次數爲:{0}", _runCount)); Console.ReadKey(); } public static void PrintValue(string valueToPrint) { var valueFound = _dictionary.GetOrAdd("key", x => { Interlocked.Increment(ref _runCount); return valueToPrint; }); Console.WriteLine(valueFound); } }
運行結果
我截圖了下GetOrAdd的源碼,問題出如今紅框部位。多線程同時運行的狀況下,這個判斷都會爲true,由於同時都拿不到值,而後2個線程就同時進行新增,最後就致使可能出現的結果不一致。
對於這個問題,其實windows團隊也是知道的,目前已開源的 Microsoft.AspNetCore.Mvc.Core ,咱們能夠查看中間件管道源代碼以下:
/// <summary> /// Builds a middleware pipeline after receiving the pipeline from a pipeline provider /// </summary> public class MiddlewareFilterBuilder { // 'GetOrAdd' call on the dictionary is not thread safe and we might end up creating the pipeline more // once. To prevent this Lazy<> is used. In the worst case multiple Lazy<> objects are created for multiple // threads but only one of the objects succeeds in creating a pipeline. private readonly ConcurrentDictionary<Type, Lazy<RequestDelegate>> _pipelinesCache = new ConcurrentDictionary<Type, Lazy<RequestDelegate>>(); private readonly MiddlewareFilterConfigurationProvider _configurationProvider; public IApplicationBuilder ApplicationBuilder { get; set; } }
經過ConcurrentDictionary類調用上述方法沒法保證委託調用的次數,在對於mvc中間管道只能初始化一次因此ASP.NET Core團隊使用Lazy<>來初始化,此時咱們將上述也進行上述對應的修改,以下:
class Program { private static readonly ConcurrentDictionary<string, Lazy<string>> _lazyDictionary = new ConcurrentDictionary<string, Lazy<string>>(); private static int _runCount = 0; public static void Main(string[] args) { var task1 = Task.Run(() => PrintValue("JeffckWang")); var task2 = Task.Run(() => PrintValue("cnblogs")); Task.WaitAll(task1, task2); PrintValue("JeffckyWang from cnblogs"); Console.WriteLine(_runCount); Console.ReadKey(); } public static void PrintValue(string valueToPrint) { var valueFound = _lazyDictionary.GetOrAdd("key", x => new Lazy<string>( () => { Interlocked.Increment(ref _runCount); return valueToPrint; })); Console.WriteLine(valueFound.Value); } }
運行結果以下
咱們將第二個參數修改成Lazy<string>,最終調用valueFound.value將調用次數輸出到控制檯上。此時咱們再來解釋上述整個過程發生了什麼。
(1)線程1調用GetOrAdd方法時,此鍵不存在,此時會調用valueFactory這個委託。
(2)線程2也調用GetOrAdd方法,此時線程1還未完成,此時也會調用valueFactory這個委託。
(3)線程1完成調用,返回一個未初始化的Lazy<string>對象,此時在Lazy<string>對象上的委託還未進行調用,此時檢查未存在鍵key的值,因而將Lazy<striing>插入到字典中,並返回給調用者。
(4)線程2也完成調用,此時返回一個未初始化的Lazy<string>對象,在此以前檢查到已存在鍵key的值經過線程1被保存到了字典中,因此會中斷建立(由於方法的updateIfExists爲false),因而其值會被線程1中的值所代替並返回給調用者。
(5)線程1調用Lazy<string>.Value,委託的調用以線程安全的方式運行,因此若是被兩個線程同時調用則只運行一次。
(6)線程2調用Lazy<string>.Value,此時相同的Lazy<string>剛被線程1初始化過,此時則不會再進行第二次委託調用,若是線程1的委託初始化還未完成,此時線程2將被阻塞,直到完成爲止,線程2才進行調用。(也就是Lazy寫法強制使相同的委託同一時間只能執行一個,不知道我這個理解對不對)
(7)線程3調用GetOrAdd方法,此時已存在鍵key則再也不調用委託,直接返回鍵key保存的結果給調用者。
上述使用Lazy來強迫咱們運行委託只運行一次,若是調用委託比較耗時此時不利用Lazy來實現那麼將調用屢次,結果可想而知,如今咱們只須要運行一次,雖然兩者結果是同樣的。咱們經過調用Lazy<string>.Value來促使委託以線程安全的方式運行,從而保證在某一個時刻只有一個線程在運行,其餘調用Lazy<string>.Value將會被阻塞直到第一個調用執行完,其他的線程將使用相同的結果。
咱們接下來看看Lazy對象。方便演示咱們定義一個博客類
public class Blog { public string BlogName { get; set; } public Blog() { Console.WriteLine("博客構造函數被調用"); BlogName = "JeffckyWang"; } }
接下來在控制檯進行調用:
var blog = new Lazy<Blog>(); Console.WriteLine("博客對象被定義"); if (!blog.IsValueCreated) Console.WriteLine("博客對象還未被初始化"); Console.WriteLine("博客名稱爲:" + (blog.Value as Blog).BlogName); if (blog.IsValueCreated) Console.WriteLine("博客對象如今已經被初始化完畢");
打印以下:
經過上述打印咱們知道當調用blog.Value時,此時博客對象才被建立並返回對象中的屬性字段的值,上述布爾屬性即IsValueCreated顯示代表Lazy對象是否已經被初始化,上述初始化對象過程能夠簡述以下:
var lazyBlog = new Lazy<Blog> ( () => { var blogObj = new Blog() { BlogName = "JeffckyWang" }; return blogObj; } );
打印結果和上述一致。上述運行都是在非線程安全的模式下進行,要是在多線程環境下對象只被建立一次咱們須要用到以下構造函數:
public Lazy(LazyThreadSafetyMode mode); public Lazy(Func<T> valueFactory, LazyThreadSafetyMode mode);
經過指定LazyThreadSafetyMode的枚舉值來進行。
(1)None = 0【線程不安全】
(2)PublicationOnly = 1【針對於多線程,有多個線程運行初始化方法時,當第一個線程完成時其值則會設置到其餘線程】
(3)ExecutionAndPublication = 2【針對單線程,加鎖機制,每一個初始化方法執行完畢,其值則相應的輸出】
默認的模式爲 LazyThreadSafetyMode.ExecutionAndPublication【針對單線程,加鎖機制,每一個初始化方法執行完畢,其值則相應的輸出】保證委託只執行一次。爲了避免破壞原生調用ConcurrentDictionary的GetOrAdd方法,可是又爲了保證線程安全,咱們封裝一個方法來方便進行調用。
public class LazyConcurrentDictionary<TKey, TValue> { private readonly ConcurrentDictionary<TKey, Lazy<TValue>> concurrentDictionary; public LazyConcurrentDictionary() { this.concurrentDictionary = new ConcurrentDictionary<TKey, Lazy<TValue>>(); } public TValue GetOrAdd(TKey key, Func<TKey, TValue> valueFactory) { var lazyResult = this.concurrentDictionary.GetOrAdd(key, k => new Lazy<TValue>(() => valueFactory(k), LazyThreadSafetyMode.ExecutionAndPublication)); return lazyResult.Value; } }
原封不動的進行方法調用:
private static int _runCount = 0; private static readonly LazyConcurrentDictionary<string, string> _lazyDictionary = new LazyConcurrentDictionary<string, string>(); public static void Main(string[] args) { var task1 = Task.Run(() => PrintValue("JeffckyWang")); var task2 = Task.Run(() => PrintValue("cnblogs")); Task.WaitAll(task1, task2); PrintValue("JeffckyWang from cnblogs"); Console.WriteLine(string.Format("運行次數爲:{0}", _runCount)); Console.Read(); } public static void PrintValue(string valueToPrint) { var valueFound = _lazyDictionary.GetOrAdd("key", x => { Interlocked.Increment(ref _runCount); Thread.Sleep(100); return valueToPrint; }); Console.WriteLine(valueFound); }
最終正確打印只運行一次的結果,以下: