AsyncLocal和Async原理解讀

AsyncLocal 的實現很簡單,將AsyncLocal 實例和當前線程的值以鍵值對的形式保存在Thread.CurrentThread.ExecutionContext.m_localValues.中。因爲使用[ThreadStatic] 修飾了 Thread.CurrentThread屬性對應的字段,因此實現了多個線程之間各自維護不一樣的一份數據。同時,在每一次修改AsyncLocal .Value 的時候,都新建了ExecutionContextIAsyncLocalValueMap對象並賦值給當前的線程。異步

如下爲AsyncLocal 的測試代碼

class AsyncLocalTests : Singleton<AsyncLocalTests>,ITestMethod
{
    private readonly AsyncLocal<int> asyncLocalVariable = new AsyncLocal<int>();
    
    public async Task MethodAsync()
    {
        asyncLocalVariable.Value = 88;
        await Task.Run(() =>
        {
            Console.WriteLine($"進入 Task,值:{asyncLocalVariable.Value};線程Id:{Thread.CurrentThread.ManagedThreadId};ExecutionContext:Hashcode:{Thread.CurrentThread.ExecutionContext.GetHashCode()}");
            asyncLocalVariable.Value = 888;
        });
        Console.WriteLine($"await Task 後,值:{asyncLocalVariable.Value};線程Id:{Thread.CurrentThread.ManagedThreadId};ExecutionContext:Hashcode:{Thread.CurrentThread.ExecutionContext.GetHashCode()}");
    }

    public void RunTest()
    {
        asyncLocalVariable.Value = 1;
        Console.WriteLine($"初始值:{asyncLocalVariable.Value};線程Id:{Thread.CurrentThread.ManagedThreadId};ExecutionContext:Hashcode:{Thread.CurrentThread.ExecutionContext.GetHashCode()}");
        MethodAsync();
        Thread.Sleep(1000);
        Console.WriteLine($"async方法後,值:{asyncLocalVariable.Value};線程Id:{Thread.CurrentThread.ManagedThreadId};ExecutionContext:Hashcode:{Thread.CurrentThread.ExecutionContext.GetHashCode()}");
        Console.ReadKey();
    }
}

測試結果

image

從上面的測試結果咱們看出:

  1. MethodAsync() 異步方法先後,AsyncLocal .Value 的值相同
  2. await MethodAsync() 異步方法先後,AsyncLocal .Value 的值相同
  3. await Task.Run() 先後代碼塊中,AsyncLocal .Value 的值相同

因爲AsyncLocal .Value 是從 Thread.CurrentThread.ExecutionContext 獲取實際的值,那麼理解ExecutionContextasync、Task、Thread 的中流動就十分重要。先說結論,並簡單描述一下緣由:async

一、進入Task.Run()先後,ExecutionContext相同(處於不一樣線程)

緣由:這是由於 Task.Run()Thread.Start() 會捕獲當前線程的 ExecutionContext 傳遞給工做線程,而且在工做線程修改 AsyncLocal .Value 的值, 不會影響原線程的ExecutionContext 。由於每次修改 AsyncLocal .Value 的值,都會新建 ExecutionContext 實例並保存到工做線程ide

二、 MethodAsync() 先後代碼塊的 ExecutionContext 相同(不使用await)

緣由:在狀態機第一次執行先後會備份、恢復ExecutionContext(線程並無進行切換)函數

三、await Task.Run() 先後代碼塊的 ExecutionContext 相同(處於不一樣線程)

緣由:咱們知道await Task.Run()確定位於一個異步方法中,該異步方法會被編譯成一個狀態機,經過狀態的切換,將await先後的代碼分紅了兩步來執行。第一次執行由當前線程執行,在開啓新 Task 後、當前線程返回以前,會保存當前線程的 ExecutionContext,供狀態機第二次執行使用(工做線程)。從第一點咱們知道新建Task實例的時候會捕獲一次ExecutionContext給工做線程,碰到await返回以前會捕獲一次ExecutionContext給狀態機,這兩次捕獲的其實是同一個對象。oop

四、 await MethodAsync() 先後代碼塊的 ExecutionContext 相同(處於不一樣線程)

緣由:在狀態機第一次執行(當前線程)的時候,會捕獲當前線程的 ExecutionContext,供狀態機第二次執行使用(工做線程)。測試

2. async關鍵字對ExecutionContext的影響

async關鍵字其實是編譯器的語法糖,能夠經過Dnspy 反編譯查看去除語法糖的原始代碼。
Dnspy配置以下,去除勾選"反編譯異步方法(async/await)"ui

image

原代碼:

public async Task MethodAsyncWithAwait()
{
    asyncLocalVariable.Value = 88;
    await Task.Run(() =>
    {
        asyncLocalVariable.Value = 888;
    });
    asyncLocalVariable.Value = 8888;
}

去除async/await 語法糖代碼:

異步方法代碼:

能夠看到編譯器生成了一個實現 IAsyncStateMachine 接口的異步狀態機,並生成了一個私有方法,保存了Task.Run()中的的代碼塊。異步方法中實際上作了如下四個步驟:this

  1. 實例化異步狀態機, 將狀態置爲 -1
  2. 建立 AsyncTaskMethodBuilder
  3. 經過 AsyncTaskMethodBuilder.Sratr(ref IAsyncStateMachine)啓動狀態機
  4. 返回 Task
public Task MethodAsyncWithAwait()
{
    AsyncLocalTests.<MethodAsyncWithAwait>d__1 <MethodAsyncWithAwait>d__ = new AsyncLocalTests.<MethodAsyncWithAwait>d__1();
    <MethodAsyncWithAwait>d__.<>4__this = this;
    <MethodAsyncWithAwait>d__.<>t__builder = AsyncTaskMethodBuilder.Create();
    <MethodAsyncWithAwait>d__.<>1__state = -1;
    AsyncTaskMethodBuilder <>t__builder = <MethodAsyncWithAwait>d__.<>t__builder;
    <>t__builder.Start<AsyncLocalTests.<MethodAsyncWithAwait>d__1>(ref <MethodAsyncWithAwait>d__);
    return <MethodAsyncWithAwait>d__.<>t__builder.Task;
}

狀態機啓動代碼:

能夠看到在執行 stateMachine.MoveNext() 以前備份了當前線程的 _executionContext_synchronizationContext,而且在 finally 代碼塊中恢復了備份的數據。
這樣也就解釋了:在不使用 await 等待異步方法的狀況下,雖然在原線程修改了AsyncLocal .Value 的值,可是離開async方法後,咱們獲取的仍是原來的值。值得注意的是,這裏的備份恢復針對的都是當前線程,而不涉及到工做線程。spa

public void Start<[Nullable(0)] TStateMachine>(ref TStateMachine stateMachine) where TStateMachine : IAsyncStateMachine
{
    AsyncMethodBuilderCore.Start<TStateMachine>(ref stateMachine);
}
internal static class AsyncMethodBuilderCore
{
[DebuggerStepThrough]
public static void Start<TStateMachine>(ref TStateMachine stateMachine) where TStateMachine : IAsyncStateMachine
{
    if (stateMachine == null)
    {
        ThrowHelper.ThrowArgumentNullException(ExceptionArgument.stateMachine);
    }
    Thread currentThread = Thread.CurrentThread;
    Thread thread = currentThread;
    ExecutionContext executionContext = currentThread._executionContext;
    ExecutionContext executionContext2 = executionContext;
    SynchronizationContext synchronizationContext = currentThread._synchronizationContext;
    try
    {
        stateMachine.MoveNext();
    }
    finally
    {
        SynchronizationContext synchronizationContext2 = synchronizationContext;
        Thread thread2 = thread;
        if (synchronizationContext2 != thread2._synchronizationContext)
        {
            thread2._synchronizationContext = synchronizationContext2;
        }
        ExecutionContext executionContext3 = executionContext2;
        ExecutionContext executionContext4 = thread2._executionContext;
        if (executionContext3 != executionContext4)
        {
            ExecutionContext.RestoreChangedContextToThread(thread2, executionContext3, executionContext4);
        }
    }
}

狀態機代碼:

實際上編譯器將標記爲 async 的方法分紅了兩部分,一部分是 await 以前的代碼(包括新建並啓動啓動Task部分),另外一部分是 await以後的代碼。經過狀態的改變,這兩部分代碼分兩次執行。若是沒有使用await修飾異步方法,那麼該狀態機沒有 else代碼塊, 只會執行一次stateMachine.MoveNext()。能夠看到在第一次執行stateMachine.MoveNext() 以後,當前線程就直接返回了,而後一層層的返回到最外層。這也是爲何說碰到await以後,當前線程就直接返回,固然最內層的返回是在開啓新Task以後。線程

[CompilerGenerated]
private void <MethodAsyncWithAwait>b__1_0()
{
    this.asyncLocalVariable.Value = 888;
}

[CompilerGenerated]
private sealed class <MethodAsyncWithAwait>d__1 : IAsyncStateMachine
{
    void IAsyncStateMachine.MoveNext()
    {
        int num = this.<>1__state;
        try
        {
            TaskAwaiter awaiter;
            if (num != 0)
            {
                this.<>4__this.asyncLocalVariable.Value = 88;
                awaiter = Task.Run(new Action(this.<>4__this.<MethodAsyncWithAwait>b__1_0)).GetAwaiter();
                if (!awaiter.IsCompleted)
                {
                    this.<>1__state = 0;
                    this.<>u__1 = awaiter;
                    AsyncLocalTests.<MethodAsyncWithAwait>d__1 <MethodAsyncWithAwait>d__ = this;
                    this.<>t__builder.AwaitUnsafeOnCompleted<TaskAwaiter, AsyncLocalTests.<MethodAsyncWithAwait>d__1>(ref awaiter, ref <MethodAsyncWithAwait>d__);
                    return;
                }
            }
            else
            {
                awaiter = this.<>u__1;
                this.<>u__1 = default(TaskAwaiter);
                this.<>1__state = -1;
            }
            awaiter.GetResult();
            this.<>4__this.asyncLocalVariable.Value = 8888;
        }
        catch (Exception exception)
        {
            this.<>1__state = -2;
            this.<>t__builder.SetException(exception);
            return;
        }
        this.<>1__state = -2;
        this.<>t__builder.SetResult();
    }
    [DebuggerHidden]
    void IAsyncStateMachine.SetStateMachine(IAsyncStateMachine stateMachine)
    {
    }

    public int <>1__state;

    public AsyncTaskMethodBuilder <>t__builder;

    public AsyncLocalTests <>4__this;

    private TaskAwaiter <>u__1;
}

咱們能夠看到在第一次執行stateMachine.MoveNext()的時候,會經過當前線程執行 await 以前的代碼塊,並經過Task.Run()啓用工做線程去完成任務。IAsyncStateMachine.MoveNext() 裏面有三句代碼比較重要,這裏先大概描述一下做用:

一、 Task.Run():

新建Task實例、捕獲當前線程的ExecutionContext 保存到Task實例中、啓動新任務

二、 AsyncTaskMethodBuilder.AwaitUnsafeOnCompleted(ref awaiter, ref stateMachine):

將狀態機和當前線程的上下文包裝成 AsyncStateMachineBox:Task 對象,保存到在 Task(第一步新建的 Task實例).m_continuationObject 字段中,最後將其傳遞到 stateMachine.AsyncTaskMethodBuilder.Task屬性中, 這樣外層狀態機能夠經過MethodAsync().GetAwaiter().m_task獲取到內層狀態機的AsyncStateMachineBox:Task 對象,一樣的外層狀態機再次執行本步驟,將自身的 AsyncStateMachineBox:Task 對象賦值給內層 AsyncStateMachineBox:Task 對象的 m_continuationObject字段,這樣的話,就構建了一個單向鏈表,該鏈表保存了每一層異步方法的stateMachineExecutionContext

三、 this.<>t__builder.SetResult():
設置異步方法的結果,並檢查 Task.m_continuationObject 是否爲空,不爲空的狀況下,執行外層狀態機的第二次 stateMachine.MoveNext()。最內層 Task.m_continuationObject的執行會在Task完成以後調用,接下來經過SetResult()一層層調用了外層狀態機的第二次 stateMachine.MoveNext()

3. AwaitUnsafeOnCompleted() 代碼分析

如下只放出了簡化的代碼。這裏首先構建 IAsyncStateMachineBox實例,並將其賦值給 m_task 供外層狀態機使用。IAsyncStateMachineBox實例保存了本層的狀態機,並捕獲了當前線程的ExecutionContextawaiter.m_task 是調用內層異步方法返回的 Task實例。最後在 TaskAwaiter.UnsafeOnCompletedInternal() 方法中,將構建的IAsyncStateMachineBox實例保存到 awaiter(內層).m_task.m_continuationObject字段中,使得內層狀態機指向本層狀態機。由於每一層狀態機都會調用AwaitUnsafeOnCompleted 方法,因此一層層構建了 await 後的全部回調,而且每一層回調的 ExecutionContext 都不一樣。

public void AwaitUnsafeOnCompleted<TAwaiter, TStateMachine>(ref TAwaiter awaiter, ref TStateMachine stateMachine)
    where TAwaiter : ICriticalNotifyCompletion where TStateMachine : IAsyncStateMachine
{
    IAsyncStateMachineBox box = GetStateMachineBox(ref stateMachine);

    if ((null != (object)default(TAwaiter)!) && (awaiter is ITaskAwaiter))
    {
        ref TaskAwaiter ta = ref Unsafe.As<TAwaiter, TaskAwaiter>(ref awaiter);
        TaskAwaiter.UnsafeOnCompletedInternal(ta.m_task, box, continueOnCapturedContext: true);
    }
}
private IAsyncStateMachineBox GetStateMachineBox<TStateMachine>(ref TStateMachine stateMachine) where TStateMachine : IAsyncStateMachine
{
    ExecutionContext? currentContext = ExecutionContext.Capture();

    AsyncStateMachineBox<TStateMachine> box = AsyncMethodBuilderCore.TrackAsyncMethodCompletion ?
        CreateDebugFinalizableAsyncStateMachineBox<TStateMachine>() :
        new AsyncStateMachineBox<TStateMachine>();
    m_task = box;
    box.StateMachine = stateMachine;
    box.Context = currentContext;
    
    return box;
}

4. SetResult() 代碼分析

該方法最後調用了 Task .TrySetResult() 方法,讀取了Task.m_continuationObject 來獲取外一層的回調(包括狀態機、ExecutionContext),並經過FinishContinuations()執行外層狀態機的第二次執行。

internal bool TrySetResult([AllowNull] TResult result)
{
    bool result2 = false;
    if (base.AtomicStateUpdate(67108864, 90177536))
    {
        this.m_result = result;
        Interlocked.Exchange(ref this.m_stateFlags, this.m_stateFlags | 16777216);
        Task.ContingentProperties contingentProperties = this.m_contingentProperties;
        if (contingentProperties != null)
        {
            base.NotifyParentIfPotentiallyAttachedTask();
            contingentProperties.SetCompleted();
        }
        base.FinishContinuations();
        result2 = true;
    }
    return result2;
}

FinishContinuations() 方法接下來的調用在 Task.Run() 部分會有介紹。

internal void FinishContinuations()
{
    object obj = Interlocked.Exchange(ref this.m_continuationObject, Task.s_taskCompletionSentinel);
    if (obj != null)
    {
        this.RunContinuations(obj);
    }
}

5. Task.Run() 代碼分析

實際上在Task.Run()內部也是先新建一個Task 實例,而後經過Task.ScheduleAndStart()方法來調度並啓動任務。二者的區別在於傳入的Optionsscheduler 是不相同的。

var task1 = Task.Run(() => { });
//默認配置沒法更改: InternalTaskOptions.QueuedByRuntime, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default
//t.ScheduleAndStart(false);
var task2 = new Task(() => { }, TaskCreationOptions.LongRunning);
//默認配置能夠修改: InternalTaskOptions.None, TaskCreationOptions.None, scheduler:null
task2.Start();
//能夠傳入scheduler 
//默認使用TaskScheduler.Current: 先取[ThreadStatic]Task.InternalCurrent,若是爲空取 TaskScheduler.Default
//t.ScheduleAndStart(true);

經過在Task的構造函數中調用ExecutionContext.Capture() 方法來保存當前線程的ExecutionContextTask實例中,這樣的話,只要將到Task實例做爲參數傳入到工做線程中,工做線程就能夠獲取到ExecutionContext

internal Task(Delegate action, object state, Task parent, CancellationToken cancellationToken, TaskCreationOptions creationOptions, InternalTaskOptions internalOptions, TaskScheduler scheduler)
{
    if (action == null)
    {
        ThrowHelper.ThrowArgumentNullException(ExceptionArgument.action);
    }
    if (parent != null && (creationOptions & TaskCreationOptions.AttachedToParent) != TaskCreationOptions.None)
    {
        this.EnsureContingentPropertiesInitializedUnsafe().m_parent = parent;
    }
    this.TaskConstructorCore(action, state, cancellationToken, creationOptions, internalOptions, scheduler);
    this.CapturedContext = ExecutionContext.Capture();
}

Task.ScheduleAndStart()方法:

Task的調度分兩種狀況:一、配置了TaskCreationOptions.LongRunningTask實例直接新建一個後臺 Thread,並將Task實例做爲啓動參數來啓動工做線程;二、對於沒有配置TaskCreationOptions.LongRunningTask實例,將其加入ThreadPool的線程池,由線程池來調度運行

internal sealed class ThreadPoolTaskScheduler : TaskScheduler
{
    protected internal override void QueueTask(Task task)
    {
        TaskCreationOptions options = task.Options;
        if ((options & TaskCreationOptions.LongRunning) != 0)
        {
            // Run LongRunning tasks on their own dedicated thread.
            Thread thread = new Thread(s_longRunningThreadWork);
            thread.IsBackground = true; // Keep this thread from blocking process shutdown
            thread.Start(task);
        }
        else
        {
            // Normal handling for non-LongRunning tasks.
            bool preferLocal = ((options & TaskCreationOptions.PreferFairness) == 0);
            ThreadPool.UnsafeQueueUserWorkItemInternal(task, preferLocal);
        }
    }
}

雖然已經在Task的構造函數中,捕獲了ExecutionContext,可是對於直接新建的Thread實例,啓動的時候一樣也須要捕獲當前線程的ExecutionContext

public void Start()
{
    this.StartupSetApartmentStateInternal();
    if (this._delegate != null)
    {
        ThreadHelper threadHelper = (ThreadHelper)this._delegate.Target;
        ExecutionContext executionContextHelper = ExecutionContext.Capture();
        threadHelper.SetExecutionContextHelper(executionContextHelper);
    }
    this.StartInternal();
}

線程池調度Task、IThreadPoolWorkItem邏輯:

從線程池取出工做線程,工做線程調用Dispatch()方法,對於IThreadPoolWorkItem,直接執行IThreadPoolWorkItem.Execute() 方法,因此線程池處理IThreadPoolWorkItem是不涉及到上下文切換的。對於 Task ,將ExecutionContext賦值給工做線程,調用委託,而後清除工做線程的上下文,最後調用Finish(true) 來執行任務完成的回調方法。調用鏈路很長,這裏直接跳到RunContinuations()方法。

調用鏈路:

// 最內層 async 狀態機 執行await Task.Run()以後的代碼塊
    //=>Task.Finish(true);
    //=>FinishStageTwo();
    //===>FinishStageThree();
    //=====>FinishContinuations();
    //=======>RunContinuations(continuationObject);
    //=========>AwaitTaskContinuation.RunOrScheduleAction(asyncStateMachineBox, flag); 狀態機的回調執行這個
    //==========>box.ExecuteFromThreadPool(threadPoolThread); 或者 box.MoveNext();

咱們知道,異步方法的最內層確定有一個 await Task。 正是Task.Finish(true)這個方法調用了最內層狀態機,去執行第二次stateMachine.MoveNext()方法,而且在MoveNext()方法中都會調用SetResult() 方法,從而觸發外層狀態機的第二次stateMachine.MoveNext()執行,就這樣一層層的調用完成了全部的層次的回調。能夠看到,工做線程在執行 await Task.Run()/MethodAsnc() 後代碼塊時,傳入的是在 AwaitUnsafeOnCompleted() 方法中捕獲的 ExecutionContext
s_callback字段保存了狀態機的MoveNext()方法。

private class AsyncStateMachineBox<TStateMachine> :Task<TResult>,IAsyncStateMachineBox  where TStateMachine : IAsyncStateMachine
{
    private static readonly ContextCallback s_callback = ExecutionContextCallback;

    private static void ExecutionContextCallback(object? s)
    {
        Unsafe.As<AsyncStateMachineBox<TStateMachine>>(s).StateMachine!.MoveNext();
    }

    public TStateMachine StateMachine = default; 
    
    public ExecutionContext? Context;

    internal sealed override void ExecuteFromThreadPool(Thread threadPoolThread) => MoveNext(threadPoolThread);
        internal sealed override void ExecuteFromThreadPool(Thread threadPoolThread) => MoveNext(threadPoolThread);

    public void MoveNext() => MoveNext(threadPoolThread: null);

    private void MoveNext(Thread? threadPoolThread)
    {
        bool loggingOn = AsyncCausalityTracer.LoggingOn;
        if (loggingOn)
        {
            AsyncCausalityTracer.TraceSynchronousWorkStart(this, CausalitySynchronousWork.Execution);
        }

        ExecutionContext? context = Context;
        if (context == null)
        {
            Debug.Assert(StateMachine != null);
            StateMachine.MoveNext();
        }
        else
        {
            if (threadPoolThread is null)
            {
                ExecutionContext.RunInternal(context, s_callback, this);
            }
            else
            {
                ExecutionContext.RunFromThreadPoolDispatchLoop(threadPoolThread, context, s_callback, this);
            }
        }

        if (IsCompleted)
        {
            if (System.Threading.Tasks.Task.s_asyncDebuggingEnabled)
            {
                System.Threading.Tasks.Task.RemoveFromActiveTasks(this);
            }
            StateMachine = default;
            Context = default;
        }

        if (loggingOn)
        {
            AsyncCausalityTracer.TraceSynchronousWorkCompletion(CausalitySynchronousWork.Execution);
        }
    }
}

最後在 RunInternalRunFromThreadPoolDispatchLoop 中,都會使用在AwaitUnsafeOnCompleted()方法裏面捕獲的 ExecutionContext,這也就解釋了爲何在 await Task.Run()/MethodAsync() 先後的代碼塊中,ExecutionContext始終相同。須要注意的一點是,無論在Task 任務執行以後,仍是 await 回調執行以後,都會把工做線程的上下文清空。

internal static void RunFromThreadPoolDispatchLoop(Thread threadPoolThread, ExecutionContext executionContext, ContextCallback callback, object state)
{
    if (executionContext != null && !executionContext.m_isDefault)
    {
        ExecutionContext.RestoreChangedContextToThread(threadPoolThread, executionContext, null);
    }
    ExceptionDispatchInfo exceptionDispatchInfo = null;
    try
    {
        callback(state);
    }
    catch (Exception source)
    {
        exceptionDispatchInfo = ExceptionDispatchInfo.Capture(source);
    }
    ExecutionContext executionContext2 = threadPoolThread._executionContext;
    threadPoolThread._synchronizationContext = null;
    if (executionContext2 != null)
    {
        ExecutionContext.RestoreChangedContextToThread(threadPoolThread, null, executionContext2);
    }
    if (exceptionDispatchInfo != null)
    {
        exceptionDispatchInfo.Throw();
    }
}
相關文章
相關標籤/搜索