ConcurrentDictionary併發字典知多少?

背景

在上一篇文章你真的瞭解字典嗎?一文中我介紹了Hash Function和字典的工做的基本原理.
有網友在文章底部評論,說個人Remove和Add方法沒有考慮線程安全問題.
https://docs.microsoft.com/en-us/dotnet/api/system.collections.generic.dictionary-2?redirectedfrom=MSDN&view=netframework-4.7.2
查閱相關資料後,發現字典.net中Dictionary自己時不支持線程安全的,若是要想使用支持線程安全的字典,那麼咱們就要使用ConcurrentDictionary了.
在研究ConcurrentDictionary的源碼後,我以爲在ConcurrentDictionary的線程安全的解決思路頗有意思,其對線程安全的處理對對咱們項目中的其餘高併發場景也有必定的參考價值,在這裏再次分享個人一些學習心得和體會,但願對你們有所幫助.html

Concurrent

ConcurrentDictionary是Dictionary的線程安全版本,位於System.Collections.Concurrent的命名空間下,該命名空間下除了有ConcurrentDictionary,還有如下Class都是咱們經常使用的那些類庫的線程安全版本.node

BlockingCollection :爲實現 IProducerConsumerCollection 的線程安全集合提供阻塞和限制功能。git

ConcurrentBag :表示對象的線程安全的無序集合.github

ConcurrentQueue :表示線程安全的先進先出 (FIFO) 集合。算法

若是讀過我上一篇文章你真的瞭解字典嗎?的小夥伴,對這個ConcurrentDictionary的工做原理應該也不難理解,它是簡簡單單地在讀寫方法加個lock嗎?c#

工做原理

Dictionary

以下圖所示,在字典中,數組entries用來存儲數據,buckets做爲橋樑,每次經過hash function獲取了key的哈希值後,對這個哈希值進行取餘,即hashResult%bucketsLength=bucketIndex,餘數做爲buckets的index,而buckets的value就是這個key對應的entry所在entries中的索引,因此最終咱們就能夠經過這個索引在entries中拿到咱們想要的數據,整個過程不須要對全部數據進行遍歷,的時間複雜度爲1.api

Alt text

ConcurrentDictionary

ConcurrentDictionary的數據存儲相似,只是buckets有個更多的職責,它除了有dictionary中的buckets的橋樑的做用外,負責了數據存儲.數組

Alt text

key的哈希值與buckets的length取餘後hashResult%bucketsLength=bucketIndex,餘數做爲buckets的索引就能找到咱們要的數據所存儲的塊,當出現兩個key指向同一個塊時,即上圖中的John Smith和Sandra Dee他同時指向152怎麼辦呢?存儲節點Node具備Next屬性執行下個Node,上圖中,node 152的Next爲154,即咱們從152開始找Sandra Dee,發現不是咱們想要的,再到154找,便可取到所需數據.安全

因爲官方原版的源碼較爲複雜,理解起來有所難度,我對官方源碼作了一些精簡,下文將圍繞這個精簡版的ConcurrentDictionary展開敘述.
https://github.com/liuzhenyulive/DictionaryMini數據結構

數據結構

Node

ConcurrentDictionary中的每一個數據存儲在一個Node中,它除了存儲value信息,還存儲key信息,以及key對應的hashcode

private class Node
        {
            internal TKey m_key;   //數據的key
            internal TValue m_value;  //數據值
            internal volatile Node m_next;  //當前Node的下級節點
            internal int m_hashcode;  //key的hashcode

            //構造函數
            internal Node(TKey key, TValue value, int hashcode, Node next)
            {
                m_key = key;
                m_value = value;
                m_next = next;
                m_hashcode = hashcode;
            }
        }

Table

而整個ConcurrentDictionary的數據存儲在這樣的一個Table中,其中m_buckets的Index負責映射key,m_locks是線程鎖,下文中會有詳細介紹,m_countPerLock存儲每一個lock鎖負責的node數量.

private class Tables
        {
            internal readonly Node[] m_buckets;   //上文中提到的buckets
            internal readonly object[] m_locks;   //線程鎖
            internal volatile int[] m_countPerLock;  //索格鎖所管理的數據數量
            internal readonly IEqualityComparer<TKey> m_comparer;  //當前key對應的type的比較器

            //構造函數
            internal Tables(Node[] buckets, object[] locks, int[] countPerlock, IEqualityComparer<TKey> comparer)
            {
                m_buckets = buckets;
                m_locks = locks;
                m_countPerLock = countPerlock;
                m_comparer = comparer;
            }
        }

ConcurrentDictionary會在構造函數中建立Table,這裏我對原有的構造函數進行了簡化,經過默認值進行建立,其中DefaultConcurrencyLevel默認併發級別爲當前計算機處理器的線程數.

//構造函數
        public ConcurrentDictionaryMini() : this(DefaultConcurrencyLevel, DEFAULT_CAPACITY, true,
            EqualityComparer<TKey>.Default)
        {
        }

        /// <summary>
        ///
        /// </summary>
        /// <param name="concurrencyLevel">併發等級,默認爲CPU的線程數</param>
        /// <param name="capacity">默認容量,31,超過31後會自動擴容</param>
        /// <param name="growLockArray">時否動態擴充鎖的數量</param>
        /// <param name="comparer">key的比較器</param>
        internal ConcurrentDictionaryMini(int concurrencyLevel, int capacity, bool growLockArray, IEqualityComparer<TKey> comparer)
        {
            if (concurrencyLevel < 1)
            {
                throw new Exception("concurrencyLevel 必須爲正數");
            }

            if (capacity < 0)
            {
                throw new Exception("capacity 不能爲負數.");
            }

            if (capacity < concurrencyLevel)
            {
                capacity = concurrencyLevel;
            }

            object[] locks = new object[concurrencyLevel];
            for (int i = 0; i < locks.Length; i++)
            {
                locks[i] = new object();
            }

            int[] countPerLock = new int[locks.Length];
            Node[] buckets = new Node[capacity];
            m_tables = new Tables(buckets, locks, countPerLock, comparer);

            m_growLockArray = growLockArray;
            m_budget = buckets.Length / locks.Length;
        }

方法

ConcurrentDictionary中較爲基礎重點的方法分別位Add,Get,Remove,Grow Table方法,其餘方法基本上是創建在這四個方法的基礎上進行的擴充.

Add

向Table中添加元素有如下亮點值得咱們關注.

  • 開始操做前會聲明一個tables變量來存儲操做開始前的m_tables,在正式開始操做後(進入lock)的時候,會檢查tables在準備工做階段是否別的線程改變,若是改變了,則從新開始準備工做並重新開始.

  • 經過GetBucketAndLockNo方法獲取bucket索引以及lock索引,其內部就是取餘操做.

private void GetBucketAndLockNo(
            int hashcode, out int bucketNo, out int lockNo, int bucketCount, int lockCount)
        {
            //0x7FFFFFFF 是long int的最大值 與它按位與數據小於等於這個最大值
            bucketNo = (hashcode & 0x7fffffff) % bucketCount;
            lockNo = bucketNo % lockCount;
        }
  • 對數據進行操做前會從m_locks取出第lockNo個對象最爲lock,操做完成後釋放該lock.多個lock必定程度上減小了阻塞的可能性.

  • 在對數據進行更新時,若是該Value的Type爲容許原子性寫入的,則直接更新該Value,不然建立一個新的node進行覆蓋.

/// <summary>
        /// Determines whether type TValue can be written atomically
        /// </summary>
        private static bool IsValueWriteAtomic()
        {
            Type valueType = typeof(TValue);

            //
            // Section 12.6.6 of ECMA CLI explains which types can be read and written atomically without
            // the risk of tearing.
            //
            // See http://www.ecma-international.org/publications/files/ECMA-ST/ECMA-335.pdf
            //
            if (valueType.IsClass)
            {
                return true;
            }
            switch (Type.GetTypeCode(valueType))
            {
                case TypeCode.Boolean:
                case TypeCode.Byte:
                case TypeCode.Char:
                case TypeCode.Int16:
                case TypeCode.Int32:
                case TypeCode.SByte:
                case TypeCode.Single:
                case TypeCode.UInt16:
                case TypeCode.UInt32:
                    return true;

                case TypeCode.Int64:
                case TypeCode.Double:
                case TypeCode.UInt64:
                    return IntPtr.Size == 8;

                default:
                    return false;
            }
        }

該方法依據CLI規範進行編寫,簡單來講,32位的計算機,對32字節如下的數據類型寫入時能夠一次寫入的而不須要移動內存指針,64位計算機對64位如下的數據可一次性寫入,不須要移動內存指針.保證了寫入的安全.
詳見12.6.6 http://www.ecma-international.org/publications/files/ECMA-ST/ECMA-335.pdf

private bool TryAddInternal(TKey key, TValue value, bool updateIfExists, bool acquireLock, out TValue resultingValue)
        {
            while (true)
            {
                int bucketNo, lockNo;
                int hashcode;

                //https://www.cnblogs.com/blurhkh/p/10357576.html
                //須要瞭解一下值傳遞和引用傳遞
                Tables tables = m_tables;
                IEqualityComparer<TKey> comparer = tables.m_comparer;
                hashcode = comparer.GetHashCode(key);

                GetBucketAndLockNo(hashcode, out bucketNo, out lockNo, tables.m_buckets.Length, tables.m_locks.Length);

                bool resizeDesired = false;
                bool lockTaken = false;

                try
                {
                    if (acquireLock)
                        Monitor.Enter(tables.m_locks[lockNo], ref lockTaken);

                    //若是表剛剛調整了大小,咱們可能沒有持有正確的鎖,必須重試。
                    //固然這種狀況不多見
                    if (tables != m_tables)
                        continue;

                    Node prev = null;
                    for (Node node = tables.m_buckets[bucketNo]; node != null; node = node.m_next)
                    {
                        if (comparer.Equals(node.m_key, key))
                        {
                            //key在字典裏找到了。若是容許更新,則更新該key的值。
                            //咱們須要爲更新建立一個node,以支持不能以原子方式寫入的TValue類型,由於free-lock 讀取可能同時發生。
                            if (updateIfExists)
                            {
                                if (s_isValueWriteAtomic)
                                {
                                    node.m_value = value;
                                }
                                else
                                {
                                    Node newNode = new Node(node.m_key, value, hashcode, node.m_next);
                                    if (prev == null)
                                    {
                                        tables.m_buckets[bucketNo] = newNode;
                                    }
                                    else
                                    {
                                        prev.m_next = newNode;
                                    }
                                }

                                resultingValue = value;
                            }
                            else
                            {
                                resultingValue = node.m_value;
                            }

                            return false;
                        }

                        prev = node;
                    }

                    //key沒有在bucket中找到,則插入該數據
                    Volatile.Write(ref tables.m_buckets[bucketNo], new Node(key, value, hashcode, tables.m_buckets[bucketNo]));
                    //當m_countPerLock超過Int Max時會拋出OverflowException
                    checked
                    {
                        tables.m_countPerLock[lockNo]++;
                    }

                    //
                    // 若是m_countPerLock[lockNo] > m_budget,則須要調整buckets的大小。
                    // GrowTable也可能會增長m_budget,但不會調整bucket table的大小。.
                    // 若是發現bucket table利用率很低,也會發生這種狀況。
                    //
                    if (tables.m_countPerLock[lockNo] > m_budget)
                    {
                        resizeDesired = true;
                    }
                }
                finally
                {
                    if (lockTaken)
                        Monitor.Exit(tables.m_locks[lockNo]);
                }

                if (resizeDesired)
                {
                    GrowTable(tables, tables.m_comparer, false, m_keyRehashCount);
                }

                resultingValue = value;
                return true;
            }
        }

Get

從Table中獲取元素的的流程與前文介紹ConcurrentDictionary工做原理時一致,但有如下亮點值得關注.

  • 讀取bucket[i]在Volatile.Read()方法中進行,該方法會自動對讀取出來的數據加鎖,避免在讀取的過程當中,數據被其餘線程remove了.
  • Volatile讀取指定字段時,在讀取的內存中插入一個內存屏障,阻止處理器從新排序內存操做,若是在代碼中此方法以後出現讀取或寫入,則處理器沒法在此方法以前移動它。
public bool TryGetValue(TKey key, out TValue value)
        {
            if (key == null) throw new ArgumentNullException("key");

            // We must capture the m_buckets field in a local variable. It is set to a new table on each table resize.
            Tables tables = m_tables;
            IEqualityComparer<TKey> comparer = tables.m_comparer;
            GetBucketAndLockNo(comparer.GetHashCode(key), out var bucketNo, out _, tables.m_buckets.Length, tables.m_locks.Length);

            // We can get away w/out a lock here.
            // The Volatile.Read ensures that the load of the fields of 'n' doesn't move before the load from buckets[i].
            Node n = Volatile.Read(ref tables.m_buckets[bucketNo]);

            while (n != null)
            {
                if (comparer.Equals(n.m_key, key))
                {
                    value = n.m_value;
                    return true;
                }
                n = n.m_next;
            }

            value = default(TValue);
            return false;
        }

Remove

Remove方法實現其實也並不複雜,相似咱們鏈表操做中移除某個Node.移除節點的同時,還要對先後節點進行連接,相信一塊小夥伴們確定很好理解.

private bool TryRemoveInternal(TKey key, out TValue value, bool matchValue, TValue oldValue)
        {
            while (true)
            {
                Tables tables = m_tables;

                IEqualityComparer<TKey> comparer = tables.m_comparer;

                int bucketNo, lockNo;

                GetBucketAndLockNo(comparer.GetHashCode(key), out bucketNo, out lockNo, tables.m_buckets.Length, tables.m_locks.Length);

                lock (tables.m_locks[lockNo])
                {
                    if (tables != m_tables)
                        continue;

                    Node prev = null;
                    for (Node curr = tables.m_buckets[bucketNo]; curr != null; curr = curr.m_next)
                    {
                        if (comparer.Equals(curr.m_key, key))
                        {
                            if (matchValue)
                            {
                                bool valuesMatch = EqualityComparer<TValue>.Default.Equals(oldValue, curr.m_value);
                                if (!valuesMatch)
                                {
                                    value = default(TValue);
                                    return false;
                                }
                            }
                            if (prev == null)
                                Volatile.Write(ref tables.m_buckets[bucketNo], curr.m_next);
                            else
                            {
                                prev.m_next = curr.m_next;
                            }

                            value = curr.m_value;
                            tables.m_countPerLock[lockNo]--;
                            return true;
                        }

                        prev = curr;
                    }
                }

                value = default(TValue);
                return false;
            }
        }

Grow table

當table中任何一個m_countPerLock的數量超過了設定的閾值後,會觸發此操做對Table進行擴容.

private void GrowTable(Tables tables, IEqualityComparer<TKey> newComparer, bool regenerateHashKeys,
            int rehashCount)
        {
            int locksAcquired = 0;
            try
            {
                //首先鎖住第一個lock進行resize操做.
                AcquireLocks(0, 1, ref locksAcquired);

                if (regenerateHashKeys && rehashCount == m_keyRehashCount)
                {
                    tables = m_tables;
                }
                else
                {
                    if (tables != m_tables)
                        return;

                    long approxCount = 0;
                    for (int i = 0; i < tables.m_countPerLock.Length; i++)
                    {
                        approxCount += tables.m_countPerLock[i];
                    }

                    //若是bucket數組太空,則將預算加倍,而不是調整表的大小
                    if (approxCount < tables.m_buckets.Length / 4)
                    {
                        m_budget = 2 * m_budget;
                        if (m_budget < 0)
                        {
                            m_budget = int.MaxValue;
                        }

                        return;
                    }
                }

                int newLength = 0;
                bool maximizeTableSize = false;
                try
                {
                    checked
                    {
                        newLength = tables.m_buckets.Length * 2 + 1;
                        while (newLength % 3 == 0 || newLength % 5 == 0 || newLength % 7 == 0)
                        {
                            newLength += 2;
                        }
                    }
                }
                catch (OverflowException)
                {
                    maximizeTableSize = true;
                }

                if (maximizeTableSize)
                {
                    newLength = int.MaxValue;

                    m_budget = int.MaxValue;
                }

                AcquireLocks(1, tables.m_locks.Length, ref locksAcquired);

                object[] newLocks = tables.m_locks;

                //Add more locks
                if (m_growLockArray && tables.m_locks.Length < MAX_LOCK_NUMBER)
                {
                    newLocks = new object[tables.m_locks.Length * 2];
                    Array.Copy(tables.m_locks, newLocks, tables.m_locks.Length);

                    for (int i = tables.m_locks.Length; i < newLocks.Length; i++)
                    {
                        newLocks[i] = new object();
                    }
                }

                Node[] newBuckets = new Node[newLength];
                int[] newCountPerLock = new int[newLocks.Length];

                for (int i = 0; i < tables.m_buckets.Length; i++)
                {
                    Node current = tables.m_buckets[i];
                    while (current != null)
                    {
                        Node next = current.m_next;
                        int newBucketNo, newLockNo;
                        int nodeHashCode = current.m_hashcode;

                        if (regenerateHashKeys)
                        {
                            //Recompute the hash from the key
                            nodeHashCode = newComparer.GetHashCode(current.m_key);
                        }

                        GetBucketAndLockNo(nodeHashCode, out newBucketNo, out newLockNo, newBuckets.Length,
                            newLocks.Length);

                        newBuckets[newBucketNo] = new Node(current.m_key, current.m_value, nodeHashCode,
                            newBuckets[newBucketNo]);
                        checked
                        {
                            newCountPerLock[newLockNo]++;
                        }

                        current = next;
                    }
                }

                if (regenerateHashKeys)
                {
                    unchecked
                    {
                        m_keyRehashCount++;
                    }
                }

                m_budget = Math.Max(1, newBuckets.Length / newLocks.Length);

                m_tables = new Tables(newBuckets, newLocks, newCountPerLock, newComparer);
            }
            finally
            {
                ReleaseLocks(0, locksAcquired);
            }
        }

學習感悟

  • lock[]:在以往的線程安全上,咱們對數據的保護每每是對數據的修改寫入等地方加上lock,這個lock常常上整個上下文中惟一的,這樣的設計下就可能會出現多個線程,寫入的根本不是一塊數據,卻要等待前一個線程寫入完成下一個線程才能繼續操做.在ConcurrentDictionary中,經過哈希算法,從數組lock[]中找出key的準確lock,若是不一樣的key,使用的不是同一個lock,那麼這多個線程的寫入時互不影響的.

  • 寫入要考慮線程安全,讀取呢?不能否認,在大部分場景下,讀取沒必要去考慮線程安全,可是在咱們這樣的鏈式讀取中,須要自上而下地查找,是否是有種可能在查找個過程當中,鏈路被修改了呢?因此ConcurrentDictionary中使用Volatile.Read來讀取出數據,該方法從指定字段讀取對象引用,在須要它的系統上,插入一個內存屏障,阻止處理器從新排序內存操做,若是在代碼中此方法以後出現讀取或寫入,則處理器沒法在此方法以前移動它。

  • 在ConcurrentDictionary的更新方法中,對數據進行更新時,會判斷該數據是否能夠原子寫入,若是時能夠原子寫入的,那麼就直接更新數據,若是不是,那麼會建立一個新的node覆蓋原有node,起初看到這裏時候,我百思不得其解,不知道這麼操做的目的,後面在jeo duffy的博客中Thread-safety, torn reads, and the like中找到了答案,這樣操做時爲了防止torn reads(撕裂讀取),什麼叫撕裂讀取呢?通俗地說,就是有的數據類型寫入時,要分屢次寫入,寫一次,移動一次指針,那麼就有可能寫了一半,這個結果被另一個線程讀取走了.好比說我把 劉振宇三個字改爲周杰倫的過程當中,我先把劉改爲周了,正在我準備去把振改爲傑的時候,另一個線程過來讀取結果了,讀到的數據是周振宇,這顯然是不對的.因此對這種,更安全的作法是先把周杰倫三個字寫好在一張紙條上,而後直接替換掉劉振宇.更多信息在CLI規範12.6.6有詳細介紹.

  • checkedunckecked關鍵字.很是量的運算(non-constant)運算在編譯階段和運行時下不會作溢出檢查,以下這樣的代碼時不會拋出異常的,算錯了也不會報錯。

int ten = 10;
int i2 = 2147483647 + ten;

可是咱們知道,int的最大值是2147483647,若是咱們將上面這樣的代碼嵌套在checked就會作溢出檢查了.

checked
{
int ten = 10;
int i2 = 2147483647 + ten;
}

相反,對於常量,編譯時是會作溢出檢查的,下面這樣的代碼在編譯時就會報錯的,若是咱們使用unckeck標籤進行標記,則在編譯階段不會作移除檢查.

int a = int.MaxValue * 2;

那麼問題來了,咱們固然知道checked頗有用,那麼uncheck呢?若是咱們只是須要那麼一個數而已,至於溢出不溢出的關係不大,好比說生成一個對象的HashCode,好比說根據一個算法計算出一個相對隨機數,這都是不須要準確結果的,ConcurrentDictionary中對於m_keyRehashCount++這個運算就使用了unchecked,就是由於m_keyRehashCount是用來生成哈希值的,咱們並不關心它有沒有溢出.

  • volatile關鍵字,表示一個字段多是由在同一時間執行多個線程進行修改。出於性能緣由,編譯器\運行時系統甚至硬件能夠從新排列對存儲器位置的讀取和寫入。聲明的字段volatile不受這些優化的約束。添加volatile修飾符可確保全部線程都能按照執行順序由任何其餘線程執行的易失性寫入,易失性寫入是一件瘋狂的事情的事情:普通玩家慎用.

本博客所涉及的代碼都保存在github中,Take it easy to enjoy it!
https://github.com/liuzhenyulive/DictionaryMini/blob/master/DictionaryMini/DictionaryMini/ConcurrentDictionaryMini.cs

相關文章
相關標籤/搜索