最近同事對 .net core memcached 緩存客戶端 EnyimMemcachedCore 進行了高併發下的壓力測試,發如今 linux 上高併發下使用 async 異步方法讀取緩存數據會出現大量失敗的狀況,好比在一次測試中,100萬次讀取緩存,只有12次成功,999988次失敗,好恐怖。若是改成同步方法,沒有一次失敗,100%成功。奇怪的是,一樣的壓力測試程序在 Windows 上異步讀取卻沒問題,100%成功。html
排查後發現是2個地方使用的鎖引發的,一個是 ManualResetEventSlim ,一個是 Semaphore ,這2個鎖是在同步方法中使用的,但 aync 異步方法中調用了這2個同步方法,咱們來分別看一下。linux
使用 ManualResetEventSlim 是在建立 Socket 鏈接時用於控制鏈接超時git
var args = new SocketAsyncEventArgs(); using (var mres = new ManualResetEventSlim()) { args.Completed += (s, e) => mres.Set(); if (socket.ConnectAsync(args)) { if (!mres.Wait(timeout)) { throw new TimeoutException("Could not connect to " + endpoint); } } }
使用 Semaphore 是在從 EnyimMemcachedCore 本身實現的 Socket 鏈接池獲取 Socket 鏈接時github
if (!this.semaphore.WaitOne(this.queueTimeout)) { message = "Pool is full, timeouting. " + _endPoint; if (_isDebugEnabled) _logger.LogDebug(message); result.Fail(message, new TimeoutException()); // everyone is so busy return result; }
爲了棄用這個2個鎖形成的異步併發問題,採起了下面2個改進措施:api
1)對於 ManualResetEventSlim ,參考 corefx 中 SqlClient 的 SNITcpHandle 的實現,改用 CancellationTokenSource 控制鏈接超時緩存
var cts = new CancellationTokenSource(); cts.CancelAfter(timeout); void Cancel() { if (!socket.Connected) { socket.Dispose(); } } cts.Token.Register(Cancel); socket.Connect(endpoint); if (socket.Connected) { connected = true; } else { socket.Dispose(); }
2)對於 Semaphore ,根據同事提交的 PR ,將 Semaphore 換成 SemaphoreSlim ,用 SemaphoreSlim.WaitAsync 方法等待信號量鎖併發
if (!await this.semaphore.WaitAsync(this.queueTimeout)) { message = "Pool is full, timeouting. " + _endPoint; if (_isDebugEnabled) _logger.LogDebug(message); result.Fail(message, new TimeoutException()); // everyone is so busy return result; }
改進後,壓力測試結果立馬與同步方法同樣,100% 成功!app
爲何會這樣?異步
咱們到 github 的 coreclr 倉庫(針對 .net core 2.2)中看看 ManualResetEventSlim 與 Semaphore 的實現源碼,看可否找到一些線索。socket
(一)
先看看 ManualResetEventSlim.Wait 方法的實現代碼(523開始):
1)先 SpinWait 等待
var spinner = new SpinWait(); while (spinner.Count < spinCount) { spinner.SpinOnce(sleep1Threshold: -1); if (IsSet) { return true; } }
SpinWait 等待時間比較短,不會形成長時間阻塞線程。
在高併發下大量線程在爭搶鎖,因此大量線程在這個階段等不到鎖。
2)而後 Monitor.Wait 等待
try { // ** the actual wait ** if (!Monitor.Wait(m_lock, realMillisecondsTimeout)) return false; //return immediately if the timeout has expired. } finally { // Clean up: we're done waiting. Waiters = Waiters - 1; }
Monitor.Wait 對應的實現代碼
[MethodImplAttribute(MethodImplOptions.InternalCall)] private static extern bool ObjWait(bool exitContext, int millisecondsTimeout, object obj); public static bool Wait(object obj, int millisecondsTimeout, bool exitContext) { if (obj == null) throw (new ArgumentNullException(nameof(obj))); return ObjWait(exitContext, millisecondsTimeout, obj); }
最終調用的是一個本地庫的 ObjWait 方法。
查閱一下 Monitor.Wait 方法的幫助文檔:
Releases the lock on an object and blocks the current thread until it reacquires the lock. If the specified time-out interval elapses, the thread enters the ready queue.
Monitor.Wait 的確會阻塞當前線程,這在異步高併發下會帶來問題,詳見一碼阻塞,萬碼等待:ASP.NET Core 同步方法調用異步方法「死鎖」的真相。
(二)
再看看 Semaphore 的實現代碼,它繼承自 WaitHandle , Semaphore.Wait 實際調用的是 WaitHandle.Wait ,後者調用的是 WaitOneNative ,這是一個本地庫的方法
[MethodImplAttribute(MethodImplOptions.InternalCall)] private static extern int WaitOneNative(SafeHandle waitableSafeHandle, uint millisecondsTimeout, bool hasThreadAffinity, bool exitContext);
.net core 3.0 中有些變化,這裏調用的是 WaitOneCore 方法
[MethodImpl(MethodImplOptions.InternalCall)] private static extern int WaitOneCore(IntPtr waitHandle, int millisecondsTimeout);
查閱一下 WaitHandle.Wait 方法的幫助文檔:
Blocks the current thread until the current WaitHandle receives a signal, using a 32-bit signed integer to specify the time interval in milliseconds.
WaitHandle.Wait 也會阻塞當前線程。
2個地方在等待鎖時都會阻塞線程,難怪高併發下會出問題。
(三)
接着閱讀 SemaphoreSlim 的源碼學習它是如何在 WaitAsync 中實現異步等待鎖的?
public Task<bool> WaitAsync(int millisecondsTimeout, CancellationToken cancellationToken) { //... lock (m_lockObj!) { // If there are counts available, allow this waiter to succeed. if (m_currentCount > 0) { --m_currentCount; if (m_waitHandle != null && m_currentCount == 0) m_waitHandle.Reset(); return s_trueTask; } else if (millisecondsTimeout == 0) { // No counts, if timeout is zero fail fast return s_falseTask; } // If there aren't, create and return a task to the caller. // The task will be completed either when they've successfully acquired // the semaphore or when the timeout expired or cancellation was requested. else { Debug.Assert(m_currentCount == 0, "m_currentCount should never be negative"); var asyncWaiter = CreateAndAddAsyncWaiter(); return (millisecondsTimeout == Timeout.Infinite && !cancellationToken.CanBeCanceled) ? asyncWaiter : WaitUntilCountOrTimeoutAsync(asyncWaiter, millisecondsTimeout, cancellationToken); } } }
重點看 else 部分的代碼,SemaphoreSlim.WaitAsync 造了一個專門用於等待鎖的 Task —— TaskNode ,CreateAndAddAsyncWaiter 就用於建立 TaskNode 的實例
private TaskNode CreateAndAddAsyncWaiter() { // Create the task var task = new TaskNode(); // Add it to the linked list if (m_asyncHead == null) { m_asyncHead = task; m_asyncTail = task; } else { m_asyncTail.Next = task; task.Prev = m_asyncTail; m_asyncTail = task; } // Hand it back return task; }
從上面的代碼看到 TaskNode 用到了鏈表,神奇的等鎖專用 Task —— TaskNode 是如何實現的呢?
private sealed class TaskNode : Task<bool> { internal TaskNode? Prev, Next; internal TaskNode() : base((object?)null, TaskCreationOptions.RunContinuationsAsynchronously) { } }
好簡單!
那 SemaphoreSlim.WaitAsync 如何用 TaskNode 實現指定了超時時間的鎖等待?
看 WaitUntilCountOrTimeoutAsync 方法的實現源碼:
private async Task<bool> WaitUntilCountOrTimeoutAsync(TaskNode asyncWaiter, int millisecondsTimeout, CancellationToken cancellationToken) { // Wait until either the task is completed, timeout occurs, or cancellation is requested. // We need to ensure that the Task.Delay task is appropriately cleaned up if the await // completes due to the asyncWaiter completing, so we use our own token that we can explicitly // cancel, and we chain the caller's supplied token into it. using (var cts = cancellationToken.CanBeCanceled ? CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, default(CancellationToken)) : new CancellationTokenSource()) { var waitCompleted = Task.WhenAny(asyncWaiter, Task.Delay(millisecondsTimeout, cts.Token)); if (asyncWaiter == await waitCompleted.ConfigureAwait(false)) { cts.Cancel(); // ensure that the Task.Delay task is cleaned up return true; // successfully acquired } } // If we get here, the wait has timed out or been canceled. // If the await completed synchronously, we still hold the lock. If it didn't, // we no longer hold the lock. As such, acquire it. lock (m_lockObj) { // Remove the task from the list. If we're successful in doing so, // we know that no one else has tried to complete this waiter yet, // so we can safely cancel or timeout. if (RemoveAsyncWaiter(asyncWaiter)) { cancellationToken.ThrowIfCancellationRequested(); // cancellation occurred return false; // timeout occurred } } // The waiter had already been removed, which means it's already completed or is about to // complete, so let it, and don't return until it does. return await asyncWaiter.ConfigureAwait(false); }
用 Task.WhenAny 等待 TaskNode 與 Task.Delay ,等其中任一者先完成,簡單到可怕。
又一次經過 .net core 源碼欣賞了高手是怎麼玩轉 Task 的。
【2019-5-6更新】
今天將 Task.WhenAny + Task.Delay 的招式用到了異步鏈接 Socket 的超時控制中
var connTask = _socket.ConnectAsync(_endpoint); if (await Task.WhenAny(connTask, Task.Delay(_connectionTimeout)) == connTask) { await connTask; }