C# SemaphoreSlim 實現

當多個任務或線程並行運行時,難以免的對某些有限的資源進行併發的訪問。能夠考慮使用信號量來進行這方面的控制(System.Threading.Semaphore)是表示一個Windows內核的信號量對象。若是預計等待的時間較短,能夠考慮使用SemaphoreSlim,它則帶來的開銷更小。.NetFrameWork中的信號量經過跟蹤進入和離開的任務或線程來協調對資源的訪問。信號量須要知道資源的最大數量,當一個任務進入時,資源計數器會被減1,當計數器爲0時,若是有任務訪問資源,它會被阻塞,直到有任務離開爲止。
若是須要有跨進程或AppDomain的同步時,能夠考慮使用Semaphore。Semaphore是取得的Windows 內核的信號量,因此在整個系統中是有效的。它主要的接口是Release和WaitOne,使用的方式和SemaphoreSlim是一致的
信號量Semaphore是另一個CLR中的內核同步對象。在.net中,類Semaphore封裝了這個對象。與標準的排他鎖對象(Monitor,Mutex,SpinLock)不一樣的是,它不是一個排他的鎖對象,它與SemaphoreSlim,ReaderWriteLock等同樣容許多個有限的線程同時訪問共享內存資源。多線程

Semaphore就好像一個柵欄,有必定的容量,當裏面的線程數量到達設置的最大值時候,就沒有線程能夠進去。而後,若是一個線程工做完成之後出來了,那下一個線程就能夠進去了。Semaphore的WaitOne或Release等操做分別將自動地遞減或者遞增信號量的當前計數值。當線程試圖對計數值已經爲0的信號量執行WaitOne操做時,線程將阻塞直到計數值大於0。在構造Semaphore時,最少須要2個參數。信號量的初始容量和最大的容量。併發

Semaphore的WaitOne或者Release方法的調用大約會耗費1微秒的系統時間,而優化後的SemaphoreSlim則須要大體四分之一微秒。在計算中大量頻繁使用它的時候SemaphoreSlim仍是優點明顯,加上SemaphoreSlim還豐富了很多接口,更加方便咱們進行控制,因此在4.0之後的多線程開發中,推薦使用SemaphoreSlim。SemaphoreSlim的實現以下:異步

public class SemaphoreSlim : IDisposable
    {  
        private volatile int m_currentCount; //可用數的資源數,<=0開始阻塞
        private readonly int m_maxCount;
        private volatile int m_waitCount; //阻塞的線程數
        private object m_lockObj;
        private volatile ManualResetEvent m_waitHandle;
        private const int NO_MAXIMUM = Int32.MaxValue;
        //Head of list representing asynchronous waits on the semaphore.
        private TaskNode m_asyncHead;
        // Tail of list representing asynchronous waits on the semaphore.
        private TaskNode m_asyncTail;
         // A pre-completed task with Result==true
        private readonly static Task<bool> s_trueTask =
            new Task<bool>(false, true, (TaskCreationOptions)InternalTaskOptions.DoNotDispose, default(CancellationToken));

        public SemaphoreSlim(int initialCount) : this(initialCount, NO_MAXIMUM){ }        
        public SemaphoreSlim(int initialCount, int maxCount)
        {
            if (initialCount < 0 || initialCount > maxCount)
            {
                throw new ArgumentOutOfRangeException("initialCount", initialCount, GetResourceString("SemaphoreSlim_ctor_InitialCountWrong"));
            }
            if (maxCount <= 0)
            {
                throw new ArgumentOutOfRangeException("maxCount", maxCount, GetResourceString("SemaphoreSlim_ctor_MaxCountWrong"));
            }
          m_maxCount = maxCount;
            m_lockObj = new object();
            m_currentCount = initialCount;
        }
        public void Wait(){Wait(Timeout.Infinite, new CancellationToken());}
        public bool Wait(int millisecondsTimeout, CancellationToken cancellationToken)
        {
            CheckDispose();
            if (millisecondsTimeout < -1)
            {
                throw new ArgumentOutOfRangeException("totalMilliSeconds", millisecondsTimeout, GetResourceString("SemaphoreSlim_Wait_TimeoutWrong"));
            }
            cancellationToken.ThrowIfCancellationRequested();
            uint startTime = 0;
            if (millisecondsTimeout != Timeout.Infinite && millisecondsTimeout > 0)
            {
                startTime = TimeoutHelper.GetTime();
            }

            bool waitSuccessful = false;
            Task<bool> asyncWaitTask = null;
            bool lockTaken = false;

            CancellationTokenRegistration cancellationTokenRegistration = cancellationToken.InternalRegisterWithoutEC(s_cancellationTokenCanceledEventHandler, this);
            try
            {
                SpinWait spin = new SpinWait();
                while (m_currentCount == 0 && !spin.NextSpinWillYield)
                {
                   spin.SpinOnce();
                }
                try { }
                finally
                {
                   Monitor.Enter(m_lockObj, ref lockTaken);
                    if (lockTaken)
                    {
                        m_waitCount++;
                    }
                }

                // If there are any async waiters, for fairness we'll get in line behind
                if (m_asyncHead != null)
                {
                    Contract.Assert(m_asyncTail != null, "tail should not be null if head isn't");
                   asyncWaitTask = WaitAsync(millisecondsTimeout, cancellationToken);
                }
                // There are no async waiters, so we can proceed with normal synchronous waiting.
                else
                {
                    // If the count > 0 we are good to move on.
                    // If not, then wait if we were given allowed some wait duration
                    OperationCanceledException oce = null;
                    if (m_currentCount == 0)
                    {
                        if (millisecondsTimeout == 0)
                        {
                            return false;
                        }
                        // Prepare for the main wait...
                        // wait until the count become greater than zero or the timeout is expired
                        try
                        {
                           waitSuccessful = WaitUntilCountOrTimeout(millisecondsTimeout, startTime, cancellationToken);
                        }
                        catch (OperationCanceledException e) { oce = e; }
                    }
                 
                    Contract.Assert(!waitSuccessful || m_currentCount > 0, "If the wait was successful, there should be count available.");
                    if (m_currentCount > 0)
                    {
                        waitSuccessful = true;
                        m_currentCount--;
                    }
                    else if (oce != null)
                    {
                        throw oce;
                    }
                    if (m_waitHandle != null && m_currentCount == 0)
                    {
                       m_waitHandle.Reset();
                    }
                }
            }
            finally
            {
                // Release the lock
                if (lockTaken)
                {
                   m_waitCount--; Monitor.Exit(m_lockObj);
                }

                // Unregister the cancellation callback.
                cancellationTokenRegistration.Dispose();
            }
            return (asyncWaitTask != null) ? asyncWaitTask.GetAwaiter().GetResult() : waitSuccessful;
        }
        
        private bool WaitUntilCountOrTimeout(int millisecondsTimeout, uint startTime, CancellationToken cancellationToken)
        {
            int remainingWaitMilliseconds = Timeout.Infinite;
            //Wait on the monitor as long as the count is zero
            while (m_currentCount == 0)
            {
                // If cancelled, we throw. Trying to wait could lead to deadlock.
                cancellationToken.ThrowIfCancellationRequested();
                if (millisecondsTimeout != Timeout.Infinite)
                {
                    remainingWaitMilliseconds = TimeoutHelper.UpdateTimeOut(startTime, millisecondsTimeout);
                    if (remainingWaitMilliseconds <= 0)
                    {
                        // The thread has expires its timeout
                        return false;
                    }
                }
                // ** the actual wait **
                if (!Monitor.Wait(m_lockObj, remainingWaitMilliseconds))
                {
                    return false;
                }
            }
            return true;
        }
        public Task<bool> WaitAsync(int millisecondsTimeout, CancellationToken cancellationToken)
        {
            CheckDispose();
            // Validate input
            if (millisecondsTimeout < -1)
            {
                throw new ArgumentOutOfRangeException("totalMilliSeconds", millisecondsTimeout, GetResourceString("SemaphoreSlim_Wait_TimeoutWrong"));
            }
            // Bail early for cancellation
            if (cancellationToken.IsCancellationRequested)
                return Task.FromCancellation<bool>(cancellationToken);

            lock (m_lockObj)
            {
                // If there are counts available, allow this waiter to succeed.
                if (m_currentCount > 0)
                {
                    --m_currentCount;
                    if (m_waitHandle != null && m_currentCount == 0) m_waitHandle.Reset(); return s_trueTask;
                }
                    // If there aren't, create and return a task to the caller.
                    // The task will be completed either when they've successfully acquired
                    // the semaphore or when the timeout expired or cancellation was requested.
                else
                {
                    Contract.Assert(m_currentCount == 0, "m_currentCount should never be negative");
                    var asyncWaiter = CreateAndAddAsyncWaiter();
                    return (millisecondsTimeout == Timeout.Infinite && !cancellationToken.CanBeCanceled) ?
                        asyncWaiter :
                       WaitUntilCountOrTimeoutAsync(asyncWaiter, millisecondsTimeout, cancellationToken);
                }
            }
        }

        /// <summary>Creates a new task and stores it into the async waiters list.</summary>
        /// <returns>The created task.</returns>
        private TaskNode CreateAndAddAsyncWaiter()
        {
            Contract.Assert(Monitor.IsEntered(m_lockObj), "Requires the lock be held");
            // Create the task
            var task = new TaskNode();
            // Add it to the linked list
            if (m_asyncHead == null)
            {
                Contract.Assert(m_asyncTail == null, "If head is null, so too should be tail");
                m_asyncHead = task;
                m_asyncTail = task;
            }
            else
            {
                Contract.Assert(m_asyncTail != null, "If head is not null, neither should be tail");
             m_asyncTail.Next = task;
                task.Prev = m_asyncTail;
                m_asyncTail = task;
            }
            // Hand it back
            return task;
        }
        
        private async Task<bool> WaitUntilCountOrTimeoutAsync(TaskNode asyncWaiter, int millisecondsTimeout, CancellationToken cancellationToken)
        {
            Contract.Assert(asyncWaiter != null, "Waiter should have been constructed");
            Contract.Assert(Monitor.IsEntered(m_lockObj), "Requires the lock be held");
            using (var cts = cancellationToken.CanBeCanceled ?
                CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, default(CancellationToken)) :
                new CancellationTokenSource())
            {
                var waitCompleted = Task.WhenAny(asyncWaiter, Task.Delay(millisecondsTimeout, cts.Token));
                if (asyncWaiter == await waitCompleted.ConfigureAwait(false))
                {
                    cts.Cancel(); // ensure that the Task.Delay task is cleaned up
                    return true; // successfully acquired
                }
            }

            // If we get here, the wait has timed out or been canceled.

            // If the await completed synchronously, we still hold the lock.  If it didn't,
            // we no longer hold the lock.  As such, acquire it.
            lock (m_lockObj)
            {
                // Remove the task from the list.  If we're successful in doing so,
                // we know that no one else has tried to complete this waiter yet,
                // so we can safely cancel or timeout.
                if (RemoveAsyncWaiter(asyncWaiter))
                {
                    cancellationToken.ThrowIfCancellationRequested(); // cancellation occurred
                    return false; // timeout occurred
                }
            }

            // The waiter had already been removed, which means it's already completed or is about to
            // complete, so let it, and don't return until it does.
            return await asyncWaiter.ConfigureAwait(false) await asyncWaiter.ConfigureAwait(false);
        }
        public int Release(){ return Release(1);}

        public int Release(int releaseCount)
        {
            CheckDispose();

            // Validate input
            if (releaseCount < 1)
            {
                throw new ArgumentOutOfRangeException( "releaseCount", releaseCount, GetResourceString("SemaphoreSlim_Release_CountWrong"));
            }
            int returnCount;

            lock (m_lockObj)
            {
                // Read the m_currentCount into a local variable to avoid unnecessary volatile accesses inside the lock.
                int currentCount = m_currentCount;
                returnCount = currentCount;

                // If the release count would result exceeding the maximum count, throw SemaphoreFullException.
                if (m_maxCount - currentCount < releaseCount)
                {
                    throw new SemaphoreFullException();
                }

                // Increment the count by the actual release count
                currentCount += releaseCount;

                // Signal to any synchronous waiters
                int waitCount = m_waitCount;
                if (currentCount == 1 || waitCount == 1)
                {
                  Monitor.Pulse(m_lockObj);
                }
                else if (waitCount > 1)
                {
                   Monitor.PulseAll(m_lockObj);
                }

                // Now signal to any asynchronous waiters, if there are any.  While we've already
                // signaled the synchronous waiters, we still hold the lock, and thus
                // they won't have had an opportunity to acquire this yet.  So, when releasing
                // asynchronous waiters, we assume that all synchronous waiters will eventually
                // acquire the semaphore.  That could be a faulty assumption if those synchronous
                // waits are canceled, but the wait code path will handle that.
                if (m_asyncHead != null)
                {
                    Contract.Assert(m_asyncTail != null, "tail should not be null if head isn't null");
                    int maxAsyncToRelease = currentCount - waitCount;
                    while (maxAsyncToRelease > 0 && m_asyncHead != null)
                    {
                        --currentCount;
                        --maxAsyncToRelease;

                        // Get the next async waiter to release and queue it to be completed
                        var waiterTask = m_asyncHead;
                      RemoveAsyncWaiter(waiterTask); // ensures waiterTask.Next/Prev are null
 QueueWaiterTask(waiterTask);
                    }
                }
                m_currentCount = currentCount;

                // Exposing wait handle if it is not null
                if (m_waitHandle != null && returnCount == 0 && currentCount > 0)
                {
                    m_waitHandle.Set();
                }
            }

            // And return the count
            return returnCount;
        }
        
        ///Removes the waiter task from the linked list.</summary>
        private bool RemoveAsyncWaiter(TaskNode task)
        {
            Contract.Requires(task != null, "Expected non-null task");
            Contract.Assert(Monitor.IsEntered(m_lockObj), "Requires the lock be held");

            // Is the task in the list?  To be in the list, either it's the head or it has a predecessor that's in the list.
            bool wasInList = m_asyncHead == task || task.Prev != null;

            // Remove it from the linked list
            if (task.Next != null) task.Next.Prev = task.Prev;
            if (task.Prev != null) task.Prev.Next = task.Next;
            if (m_asyncHead == task) m_asyncHead = task.Next;
            if (m_asyncTail == task) m_asyncTail = task.Prev;
            Contract.Assert((m_asyncHead == null) == (m_asyncTail == null), "Head is null iff tail is null");

            // Make sure not to leak
            task.Next = task.Prev = null;

            // Return whether the task was in the list
            return wasInList;
        }
        private static void QueueWaiterTask(TaskNode waiterTask)
        {
            ThreadPool.UnsafeQueueCustomWorkItem(waiterTask, forceGlobal: false);
        }
        public int CurrentCount
        {
            get { return m_currentCount; }
        }
        public WaitHandle AvailableWaitHandle
        {
            get
            {
                CheckDispose();
                if (m_waitHandle != null)
                    return m_waitHandle;
                lock (m_lockObj)
                {
                    if (m_waitHandle == null)
                    {
                        m_waitHandle = new ManualResetEvent(m_currentCount != 0);
                    }
                }
                return m_waitHandle;
            }
        }
        private sealed class TaskNode : Task<bool>, IThreadPoolWorkItem
        {
            internal TaskNode Prev, Next;
            internal TaskNode() : base() {}

            [SecurityCritical]
            void IThreadPoolWorkItem.ExecuteWorkItem()
            {
                bool setSuccessfully = TrySetResult(true);
                Contract.Assert(setSuccessfully, "Should have been able to complete task");
            }

            [SecurityCritical]
            void IThreadPoolWorkItem.MarkAborted(ThreadAbortException tae) { /* nop */ }
        }
    }

SemaphoreSlim類有幾個私有字段很重要,m_currentCount表示可用資源,若是m_currentCount>0每次調用Wait都會減1,當m_currentCount<=0時再次調用Wait方法就會阻塞。每次調用Release方法m_currentCount都會加1.m_maxCount表示最大可用資源數,是在構造函數中指定的。m_waitCount表示當前阻塞的線程數。TaskNode m_asyncHead,m_asyncTail這2個變量主要用於異步方法async

咱們首先來看看Wait方法,這裏還有它的異步版本WaitAsync。在Wait方法中首先檢查m_currentCount是否爲0,若是是咱們用SpinWait自旋10次;任意一次Wait都須要鎖住m_lockObj對象,m_asyncHead != null表示當前已經存在異步的對象,因此咱們調用WaitAsync方法,若是沒有那麼咱們調用WaitUntilCountOrTimeout方法,該方法在m_currentCount==0會阻塞到到m_currentCount不爲0或者超時;看到WaitUntilCountOrTimeout方法中【if (!Monitor.Wait(m_lockObj, remainingWaitMilliseconds))】,就很明瞭Wait方法中【CancellationTokenRegistration cancellationTokenRegistration = cancellationToken.InternalRegisterWithoutEC(s_cancellationTokenCanceledEventHandler, this)】存在的緣由了,確實很巧妙【這裏和ManualResetEventSlim類似】。如今咱們回到WaitAsync方法,該方法也是首先檢查m_currentCount是否大於0,大於直接返回。否者調用CreateAndAddAsyncWaiter建立一個Task<bool>【Task<bool>是一個鏈表結構】,若是沒有取消且超時大於-1,那麼就調用WaitUntilCountOrTimeoutAsync方法,該方法首先包裝一個Task【var waitCompleted = Task.WhenAny(asyncWaiter, Task.Delay(millisecondsTimeout, cts.Token))】而後等待線程【await waitCompleted.ConfigureAwait(false)】返回的是asyncWaiter或者另外一個Delay的Task。若是返回的不是asyncWaiter說明已經超時須要調用RemoveAsyncWaiter,而後返回 await asyncWaiter.ConfigureAwait(false),若是返回的是asyncWaiter,那麼就調用Cancel方法。那麼這裏的asyncWaiter.ConfigureAwait(false)何時退出了【或者說不阻塞】,這就要看Release中的QueueWaiterTask方法了。ide

QueueWaiterTask方法或調用TaskNode的ExecuteWorkItem方法。
那如今咱們來看看Release方法,該方法會把currentCount加1,而後把等待線程轉爲就緒線程【Monitor.Pulse(m_lockObj)或 Monitor.PulseAll(m_lockObj)】,若是存在異步的話,看看還能夠釋放幾個異步task【 int maxAsyncToRelease = currentCount - waitCount】,這裏Release的註釋很重要,只是沒怎麼明白,現釋同步的waiters,而後在釋放異步的waiters,可是釋放同步後鎖的資源沒有釋放,在釋放異步的waiters時候是把currentCount減1,這樣感受異步waiters優先獲取資源。也不知道個人理解是否正確?
1)當ConfigureAwait(true),代碼由同步執行進入異步執行時,當前同步執行的線程上下文信息(好比HttpConext.Current,Thread.CurrentThread.CurrentCulture)就會被捕獲並保存至SynchronizationContext中,供異步執行中使用,而且供異步執行完成以後(await以後的代碼)的同步執行中使用(雖然await以後是同步執行的,可是發生了線程切換,會在另一個線程中執行「ASP.NET場景」)。這個捕獲固然是有代價的,當時咱們誤覺得性能問題是這個地方的開銷引發,但實際上這個開銷很小,在咱們的應用場景不至於會帶來性能問題。函數

2)當Configurewait(flase),則不進行線程上下文信息的捕獲,async方法中與await以後的代碼執行時就沒法獲取await以前的線程的上下文信息,在ASP.NET中最直接的影響就是HttpConext.Current的值爲null。性能

相關文章
相關標籤/搜索