AsyncLocal
的實現很簡單,將AsyncLocal
實例和當前線程的值以鍵值對的形式保存在Thread.CurrentThread.ExecutionContext.m_localValues.
中。因爲使用[ThreadStatic]
修飾了 Thread.CurrentThread
屬性對應的字段,因此實現了多個線程之間各自維護不一樣的一份數據。同時,在每一次修改AsyncLocal
的時候,都新建了ExecutionContext
和IAsyncLocalValueMap
對象並賦值給當前的線程。異步
AsyncLocal
的測試代碼class AsyncLocalTests : Singleton<AsyncLocalTests>,ITestMethod { private readonly AsyncLocal<int> asyncLocalVariable = new AsyncLocal<int>(); public async Task MethodAsync() { asyncLocalVariable.Value = 88; await Task.Run(() => { Console.WriteLine($"進入 Task,值:{asyncLocalVariable.Value};線程Id:{Thread.CurrentThread.ManagedThreadId};ExecutionContext:Hashcode:{Thread.CurrentThread.ExecutionContext.GetHashCode()}"); asyncLocalVariable.Value = 888; }); Console.WriteLine($"await Task 後,值:{asyncLocalVariable.Value};線程Id:{Thread.CurrentThread.ManagedThreadId};ExecutionContext:Hashcode:{Thread.CurrentThread.ExecutionContext.GetHashCode()}"); } public void RunTest() { asyncLocalVariable.Value = 1; Console.WriteLine($"初始值:{asyncLocalVariable.Value};線程Id:{Thread.CurrentThread.ManagedThreadId};ExecutionContext:Hashcode:{Thread.CurrentThread.ExecutionContext.GetHashCode()}"); MethodAsync(); Thread.Sleep(1000); Console.WriteLine($"async方法後,值:{asyncLocalVariable.Value};線程Id:{Thread.CurrentThread.ManagedThreadId};ExecutionContext:Hashcode:{Thread.CurrentThread.ExecutionContext.GetHashCode()}"); Console.ReadKey(); } }
MethodAsync()
異步方法先後,AsyncLocal
.Value
的值相同await MethodAsync()
異步方法先後,AsyncLocal
.Value
的值相同await Task.Run()
先後代碼塊中,AsyncLocal
.Value
的值相同因爲AsyncLocal
是從 Thread.CurrentThread.ExecutionContext
獲取實際的值,那麼理解ExecutionContext
在 async、Task、Thread
的中流動就十分重要。先說結論,並簡單描述一下緣由:async
Task.Run()
先後,ExecutionContext
相同(處於不一樣線程)緣由:這是由於 Task.Run()
和Thread.Start()
會捕獲當前線程的 ExecutionContext
傳遞給工做線程,而且在工做線程修改 AsyncLocal
的值, 不會影響原線程的ExecutionContext
。由於每次修改 AsyncLocal
的值,都會新建 ExecutionContext
實例並保存到工做線程ide
MethodAsync()
先後代碼塊的 ExecutionContext
相同(不使用await
)緣由:在狀態機第一次執行先後會備份、恢復ExecutionContext
(線程並無進行切換)函數
await Task.Run()
先後代碼塊的 ExecutionContext
相同(處於不一樣線程)緣由:咱們知道await Task.Run()
確定位於一個異步方法中,該異步方法會被編譯成一個狀態機,經過狀態的切換,將await
先後的代碼分紅了兩步來執行。第一次執行由當前線程執行,在開啓新 Task
後、當前線程返回以前,會保存當前線程的 ExecutionContext
,供狀態機第二次執行使用(工做線程)。從第一點咱們知道新建Task
實例的時候會捕獲一次ExecutionContext
給工做線程,碰到await
返回以前會捕獲一次ExecutionContext
給狀態機,這兩次捕獲的其實是同一個對象。oop
await MethodAsync()
先後代碼塊的 ExecutionContext
相同(處於不一樣線程)緣由:在狀態機第一次執行(當前線程)的時候,會捕獲當前線程的 ExecutionContext
,供狀態機第二次執行使用(工做線程)。測試
async
關鍵字對ExecutionContext
的影響async
關鍵字其實是編譯器的語法糖,能夠經過Dnspy
反編譯查看去除語法糖的原始代碼。
Dnspy
配置以下,去除勾選"反編譯異步方法(async/await)"
ui
public async Task MethodAsyncWithAwait() { asyncLocalVariable.Value = 88; await Task.Run(() => { asyncLocalVariable.Value = 888; }); asyncLocalVariable.Value = 8888; }
async/await
語法糖代碼:能夠看到編譯器生成了一個實現 IAsyncStateMachine
接口的異步狀態機,並生成了一個私有方法,保存了Task.Run()
中的的代碼塊。異步方法中實際上作了如下四個步驟:this
AsyncTaskMethodBuilder
AsyncTaskMethodBuilder.Sratr(ref IAsyncStateMachine)
啓動狀態機Task
public Task MethodAsyncWithAwait() { AsyncLocalTests.<MethodAsyncWithAwait>d__1 <MethodAsyncWithAwait>d__ = new AsyncLocalTests.<MethodAsyncWithAwait>d__1(); <MethodAsyncWithAwait>d__.<>4__this = this; <MethodAsyncWithAwait>d__.<>t__builder = AsyncTaskMethodBuilder.Create(); <MethodAsyncWithAwait>d__.<>1__state = -1; AsyncTaskMethodBuilder <>t__builder = <MethodAsyncWithAwait>d__.<>t__builder; <>t__builder.Start<AsyncLocalTests.<MethodAsyncWithAwait>d__1>(ref <MethodAsyncWithAwait>d__); return <MethodAsyncWithAwait>d__.<>t__builder.Task; }
能夠看到在執行 stateMachine.MoveNext()
以前備份了當前線程的 _executionContext
和 _synchronizationContext
,而且在 finally
代碼塊中恢復了備份的數據。
這樣也就解釋了:在不使用 await
等待異步方法的狀況下,雖然在原線程修改了AsyncLocal
的值,可是離開async
方法後,咱們獲取的仍是原來的值。值得注意的是,這裏的備份恢復針對的都是當前線程,而不涉及到工做線程。spa
public void Start<[Nullable(0)] TStateMachine>(ref TStateMachine stateMachine) where TStateMachine : IAsyncStateMachine { AsyncMethodBuilderCore.Start<TStateMachine>(ref stateMachine); } internal static class AsyncMethodBuilderCore { [DebuggerStepThrough] public static void Start<TStateMachine>(ref TStateMachine stateMachine) where TStateMachine : IAsyncStateMachine { if (stateMachine == null) { ThrowHelper.ThrowArgumentNullException(ExceptionArgument.stateMachine); } Thread currentThread = Thread.CurrentThread; Thread thread = currentThread; ExecutionContext executionContext = currentThread._executionContext; ExecutionContext executionContext2 = executionContext; SynchronizationContext synchronizationContext = currentThread._synchronizationContext; try { stateMachine.MoveNext(); } finally { SynchronizationContext synchronizationContext2 = synchronizationContext; Thread thread2 = thread; if (synchronizationContext2 != thread2._synchronizationContext) { thread2._synchronizationContext = synchronizationContext2; } ExecutionContext executionContext3 = executionContext2; ExecutionContext executionContext4 = thread2._executionContext; if (executionContext3 != executionContext4) { ExecutionContext.RestoreChangedContextToThread(thread2, executionContext3, executionContext4); } } }
實際上編譯器將標記爲 async
的方法分紅了兩部分,一部分是 await
以前的代碼(包括新建並啓動啓動Task
部分),另外一部分是 await
以後的代碼。經過狀態的改變,這兩部分代碼分兩次執行。若是沒有使用await
修飾異步方法,那麼該狀態機沒有 else
代碼塊, 只會執行一次stateMachine.MoveNext()
。能夠看到在第一次執行stateMachine.MoveNext()
以後,當前線程就直接返回了,而後一層層的返回到最外層。這也是爲何說碰到await
以後,當前線程就直接返回,固然最內層的返回是在開啓新Task
以後。線程
[CompilerGenerated] private void <MethodAsyncWithAwait>b__1_0() { this.asyncLocalVariable.Value = 888; } [CompilerGenerated] private sealed class <MethodAsyncWithAwait>d__1 : IAsyncStateMachine { void IAsyncStateMachine.MoveNext() { int num = this.<>1__state; try { TaskAwaiter awaiter; if (num != 0) { this.<>4__this.asyncLocalVariable.Value = 88; awaiter = Task.Run(new Action(this.<>4__this.<MethodAsyncWithAwait>b__1_0)).GetAwaiter(); if (!awaiter.IsCompleted) { this.<>1__state = 0; this.<>u__1 = awaiter; AsyncLocalTests.<MethodAsyncWithAwait>d__1 <MethodAsyncWithAwait>d__ = this; this.<>t__builder.AwaitUnsafeOnCompleted<TaskAwaiter, AsyncLocalTests.<MethodAsyncWithAwait>d__1>(ref awaiter, ref <MethodAsyncWithAwait>d__); return; } } else { awaiter = this.<>u__1; this.<>u__1 = default(TaskAwaiter); this.<>1__state = -1; } awaiter.GetResult(); this.<>4__this.asyncLocalVariable.Value = 8888; } catch (Exception exception) { this.<>1__state = -2; this.<>t__builder.SetException(exception); return; } this.<>1__state = -2; this.<>t__builder.SetResult(); } [DebuggerHidden] void IAsyncStateMachine.SetStateMachine(IAsyncStateMachine stateMachine) { } public int <>1__state; public AsyncTaskMethodBuilder <>t__builder; public AsyncLocalTests <>4__this; private TaskAwaiter <>u__1; }
咱們能夠看到在第一次執行stateMachine.MoveNext()
的時候,會經過當前線程執行 await
以前的代碼塊,並經過Task.Run()
啓用工做線程去完成任務。IAsyncStateMachine.MoveNext()
裏面有三句代碼比較重要,這裏先大概描述一下做用:
Task.Run()
:新建Task
實例、捕獲當前線程的ExecutionContext
保存到Task
實例中、啓動新任務
AsyncTaskMethodBuilder.AwaitUnsafeOnCompleted(ref awaiter, ref stateMachine)
:將狀態機和當前線程的上下文包裝成 AsyncStateMachineBox:Task
對象,保存到在 Task(第一步新建的
字段中,最後將其傳遞到 Task
實例).m_continuationObjectstateMachine.AsyncTaskMethodBuilder.Task
屬性中, 這樣外層狀態機能夠經過MethodAsync().GetAwaiter().m_task
獲取到內層狀態機的AsyncStateMachineBox:Task
對象,一樣的外層狀態機再次執行本步驟,將自身的 AsyncStateMachineBox:Task
對象賦值給內層 AsyncStateMachineBox:Task
對象的 m_continuationObject
字段,這樣的話,就構建了一個單向鏈表,該鏈表保存了每一層異步方法的stateMachine
和 ExecutionContext
。
this.<>t__builder.SetResult()
:Task.m_continuationObject
是否爲空,不爲空的狀況下,執行外層狀態機的第二次 stateMachine.MoveNext()
。最內層 Task.m_continuationObject
的執行會在Task
完成以後調用,接下來經過SetResult()
一層層調用了外層狀態機的第二次 stateMachine.MoveNext()
AwaitUnsafeOnCompleted()
代碼分析如下只放出了簡化的代碼。這裏首先構建 IAsyncStateMachineBox
實例,並將其賦值給 m_task
供外層狀態機使用。IAsyncStateMachineBox
實例保存了本層的狀態機,並捕獲了當前線程的ExecutionContext
。awaiter.m_task
是調用內層異步方法返回的 Task
實例。最後在 TaskAwaiter.UnsafeOnCompletedInternal()
方法中,將構建的IAsyncStateMachineBox
實例保存到 awaiter(內層).m_task.m_continuationObject
字段中,使得內層狀態機指向本層狀態機。由於每一層狀態機都會調用AwaitUnsafeOnCompleted
方法,因此一層層構建了 await
後的全部回調,而且每一層回調的 ExecutionContext
都不一樣。
public void AwaitUnsafeOnCompleted<TAwaiter, TStateMachine>(ref TAwaiter awaiter, ref TStateMachine stateMachine) where TAwaiter : ICriticalNotifyCompletion where TStateMachine : IAsyncStateMachine { IAsyncStateMachineBox box = GetStateMachineBox(ref stateMachine); if ((null != (object)default(TAwaiter)!) && (awaiter is ITaskAwaiter)) { ref TaskAwaiter ta = ref Unsafe.As<TAwaiter, TaskAwaiter>(ref awaiter); TaskAwaiter.UnsafeOnCompletedInternal(ta.m_task, box, continueOnCapturedContext: true); } }
private IAsyncStateMachineBox GetStateMachineBox<TStateMachine>(ref TStateMachine stateMachine) where TStateMachine : IAsyncStateMachine { ExecutionContext? currentContext = ExecutionContext.Capture(); AsyncStateMachineBox<TStateMachine> box = AsyncMethodBuilderCore.TrackAsyncMethodCompletion ? CreateDebugFinalizableAsyncStateMachineBox<TStateMachine>() : new AsyncStateMachineBox<TStateMachine>(); m_task = box; box.StateMachine = stateMachine; box.Context = currentContext; return box; }
SetResult()
代碼分析該方法最後調用了 Task
方法,讀取了Task.m_continuationObject
來獲取外一層的回調(包括狀態機、ExecutionContext
),並經過FinishContinuations()
執行外層狀態機的第二次執行。
internal bool TrySetResult([AllowNull] TResult result) { bool result2 = false; if (base.AtomicStateUpdate(67108864, 90177536)) { this.m_result = result; Interlocked.Exchange(ref this.m_stateFlags, this.m_stateFlags | 16777216); Task.ContingentProperties contingentProperties = this.m_contingentProperties; if (contingentProperties != null) { base.NotifyParentIfPotentiallyAttachedTask(); contingentProperties.SetCompleted(); } base.FinishContinuations(); result2 = true; } return result2; }
FinishContinuations()
方法接下來的調用在 Task.Run()
部分會有介紹。
internal void FinishContinuations() { object obj = Interlocked.Exchange(ref this.m_continuationObject, Task.s_taskCompletionSentinel); if (obj != null) { this.RunContinuations(obj); } }
Task.Run()
代碼分析實際上在Task.Run()
內部也是先新建一個Task
實例,而後經過Task.ScheduleAndStart()
方法來調度並啓動任務。二者的區別在於傳入的Options
和scheduler
是不相同的。
var task1 = Task.Run(() => { }); //默認配置沒法更改: InternalTaskOptions.QueuedByRuntime, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default //t.ScheduleAndStart(false); var task2 = new Task(() => { }, TaskCreationOptions.LongRunning); //默認配置能夠修改: InternalTaskOptions.None, TaskCreationOptions.None, scheduler:null task2.Start(); //能夠傳入scheduler //默認使用TaskScheduler.Current: 先取[ThreadStatic]Task.InternalCurrent,若是爲空取 TaskScheduler.Default //t.ScheduleAndStart(true);
經過在Task
的構造函數中調用ExecutionContext.Capture()
方法來保存當前線程的ExecutionContext
到Task
實例中,這樣的話,只要將到Task
實例做爲參數傳入到工做線程中,工做線程就能夠獲取到ExecutionContext
internal Task(Delegate action, object state, Task parent, CancellationToken cancellationToken, TaskCreationOptions creationOptions, InternalTaskOptions internalOptions, TaskScheduler scheduler) { if (action == null) { ThrowHelper.ThrowArgumentNullException(ExceptionArgument.action); } if (parent != null && (creationOptions & TaskCreationOptions.AttachedToParent) != TaskCreationOptions.None) { this.EnsureContingentPropertiesInitializedUnsafe().m_parent = parent; } this.TaskConstructorCore(action, state, cancellationToken, creationOptions, internalOptions, scheduler); this.CapturedContext = ExecutionContext.Capture(); }
Task.ScheduleAndStart()
方法:Task
的調度分兩種狀況:一、配置了TaskCreationOptions.LongRunning
的Task
實例直接新建一個後臺 Thread
,並將Task
實例做爲啓動參數來啓動工做線程;二、對於沒有配置TaskCreationOptions.LongRunning
的Task
實例,將其加入ThreadPool
的線程池,由線程池來調度運行
internal sealed class ThreadPoolTaskScheduler : TaskScheduler { protected internal override void QueueTask(Task task) { TaskCreationOptions options = task.Options; if ((options & TaskCreationOptions.LongRunning) != 0) { // Run LongRunning tasks on their own dedicated thread. Thread thread = new Thread(s_longRunningThreadWork); thread.IsBackground = true; // Keep this thread from blocking process shutdown thread.Start(task); } else { // Normal handling for non-LongRunning tasks. bool preferLocal = ((options & TaskCreationOptions.PreferFairness) == 0); ThreadPool.UnsafeQueueUserWorkItemInternal(task, preferLocal); } } }
雖然已經在Task的構造函數中,捕獲了ExecutionContext
,可是對於直接新建的Thread實例,啓動的時候一樣也須要捕獲當前線程的ExecutionContext
public void Start() { this.StartupSetApartmentStateInternal(); if (this._delegate != null) { ThreadHelper threadHelper = (ThreadHelper)this._delegate.Target; ExecutionContext executionContextHelper = ExecutionContext.Capture(); threadHelper.SetExecutionContextHelper(executionContextHelper); } this.StartInternal(); }
Task、IThreadPoolWorkItem
邏輯:從線程池取出工做線程,工做線程調用Dispatch()
方法,對於IThreadPoolWorkItem
,直接執行IThreadPoolWorkItem.Execute()
方法,因此線程池處理IThreadPoolWorkItem
是不涉及到上下文切換的。對於 Task
,將ExecutionContext
賦值給工做線程,調用委託,而後清除工做線程的上下文,最後調用Finish(true)
來執行任務完成的回調方法。調用鏈路很長,這裏直接跳到RunContinuations()
方法。
調用鏈路:
// 最內層 async 狀態機 執行await Task.Run()以後的代碼塊 //=>Task.Finish(true); //=>FinishStageTwo(); //===>FinishStageThree(); //=====>FinishContinuations(); //=======>RunContinuations(continuationObject); //=========>AwaitTaskContinuation.RunOrScheduleAction(asyncStateMachineBox, flag); 狀態機的回調執行這個 //==========>box.ExecuteFromThreadPool(threadPoolThread); 或者 box.MoveNext();
咱們知道,異步方法的最內層確定有一個 await Task
。 正是Task.Finish(true)
這個方法調用了最內層狀態機,去執行第二次stateMachine.MoveNext()
方法,而且在MoveNext()
方法中都會調用SetResult()
方法,從而觸發外層狀態機的第二次stateMachine.MoveNext()
執行,就這樣一層層的調用完成了全部的層次的回調。能夠看到,工做線程在執行 await Task.Run()/MethodAsnc()
後代碼塊時,傳入的是在 AwaitUnsafeOnCompleted()
方法中捕獲的 ExecutionContext
。
s_callback
字段保存了狀態機的MoveNext()
方法。
private class AsyncStateMachineBox<TStateMachine> :Task<TResult>,IAsyncStateMachineBox where TStateMachine : IAsyncStateMachine { private static readonly ContextCallback s_callback = ExecutionContextCallback; private static void ExecutionContextCallback(object? s) { Unsafe.As<AsyncStateMachineBox<TStateMachine>>(s).StateMachine!.MoveNext(); } public TStateMachine StateMachine = default; public ExecutionContext? Context; internal sealed override void ExecuteFromThreadPool(Thread threadPoolThread) => MoveNext(threadPoolThread); internal sealed override void ExecuteFromThreadPool(Thread threadPoolThread) => MoveNext(threadPoolThread); public void MoveNext() => MoveNext(threadPoolThread: null); private void MoveNext(Thread? threadPoolThread) { bool loggingOn = AsyncCausalityTracer.LoggingOn; if (loggingOn) { AsyncCausalityTracer.TraceSynchronousWorkStart(this, CausalitySynchronousWork.Execution); } ExecutionContext? context = Context; if (context == null) { Debug.Assert(StateMachine != null); StateMachine.MoveNext(); } else { if (threadPoolThread is null) { ExecutionContext.RunInternal(context, s_callback, this); } else { ExecutionContext.RunFromThreadPoolDispatchLoop(threadPoolThread, context, s_callback, this); } } if (IsCompleted) { if (System.Threading.Tasks.Task.s_asyncDebuggingEnabled) { System.Threading.Tasks.Task.RemoveFromActiveTasks(this); } StateMachine = default; Context = default; } if (loggingOn) { AsyncCausalityTracer.TraceSynchronousWorkCompletion(CausalitySynchronousWork.Execution); } } }
最後在 RunInternal
和 RunFromThreadPoolDispatchLoop
中,都會使用在AwaitUnsafeOnCompleted()
方法裏面捕獲的 ExecutionContext
,這也就解釋了爲何在 await Task.Run()/MethodAsync()
先後的代碼塊中,ExecutionContext
始終相同。須要注意的一點是,無論在Task
任務執行以後,仍是 await
回調執行以後,都會把工做線程的上下文清空。
internal static void RunFromThreadPoolDispatchLoop(Thread threadPoolThread, ExecutionContext executionContext, ContextCallback callback, object state) { if (executionContext != null && !executionContext.m_isDefault) { ExecutionContext.RestoreChangedContextToThread(threadPoolThread, executionContext, null); } ExceptionDispatchInfo exceptionDispatchInfo = null; try { callback(state); } catch (Exception source) { exceptionDispatchInfo = ExceptionDispatchInfo.Capture(source); } ExecutionContext executionContext2 = threadPoolThread._executionContext; threadPoolThread._synchronizationContext = null; if (executionContext2 != null) { ExecutionContext.RestoreChangedContextToThread(threadPoolThread, null, executionContext2); } if (exceptionDispatchInfo != null) { exceptionDispatchInfo.Throw(); } }