看了上一篇《C# Task 是什麼?返回值如何實現? Wait如何實現》,咱們提到FinishContinuations方法中會調用TaskContinuation實例,那麼咱們的ContinueWith就應該非常簡單,只需要把Task放到TaskContinuation集合中就可以了。ContinueWith的參數可以是 Action<Task<TResult>>,也可以是 Func<Task<TResult>,TNewResult>,其中Task<TResult>的實現如下:
public class Task<TResult> : Task
{
    /// <summary>
    /// Creates a continuation that executes when the target <see cref="Task{TResult}"/> completes.
    /// </summary>
    /// <param name="continuationAction">Action invoked with this task once it has completed.</param>
    /// <returns>The task that represents the continuation.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="continuationAction"/> is null.</exception>
    public Task ContinueWith(Action<Task<TResult>> continuationAction)
    {
        // The stack mark is created in the public entry point and handed on by reference.
        StackCrawlMark stackMark = StackCrawlMark.LookForMyCaller;

        return ContinueWith(
            continuationAction,
            TaskScheduler.Current,
            CancellationToken.None,
            TaskContinuationOptions.None,
            ref stackMark);
    }

    /// <summary>
    /// Same as the public <c>Action</c> overload, but with an explicit scheduler, token,
    /// continuation options and the caller's stack mark.
    /// </summary>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="continuationAction"/> or <paramref name="scheduler"/> is null.
    /// </exception>
    internal Task ContinueWith(
        Action<Task<TResult>> continuationAction,
        TaskScheduler scheduler,
        CancellationToken cancellationToken,
        TaskContinuationOptions continuationOptions,
        ref StackCrawlMark stackMark)
    {
        // The delegate is validated before the scheduler, so a caller passing two
        // nulls is told about the delegate first.
        if (continuationAction == null)
        {
            throw new ArgumentNullException("continuationAction");
        }

        if (scheduler == null)
        {
            throw new ArgumentNullException("scheduler");
        }

        // Translate the continuation options into the option sets used to construct the task.
        TaskCreationOptions taskCreationOptions;
        InternalTaskOptions taskInternalOptions;
        CreationOptionsFromContinuationOptions(
            continuationOptions,
            out taskCreationOptions,
            out taskInternalOptions);

        // Wrap the delegate in a task that remembers this task as its antecedent.
        Task wrapped = new ContinuationTaskFromResultTask<TResult>(
            this,
            continuationAction,
            null,
            taskCreationOptions,
            taskInternalOptions,
            ref stackMark);

        // Register the wrapper with this task.
        ContinueWithCore(wrapped, scheduler, cancellationToken, continuationOptions);
        return wrapped;
    }

    /// <summary>
    /// Creates a continuation that executes when the target <see cref="Task{TResult}"/> completes
    /// and produces a <typeparamref name="TNewResult"/>.
    /// </summary>
    /// <param name="continuationFunction">Function invoked with this task and <paramref name="state"/>.</param>
    /// <param name="state">Caller-supplied object handed to <paramref name="continuationFunction"/>.</param>
    /// <returns>The task that represents the continuation.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="continuationFunction"/> is null.</exception>
    public Task<TNewResult> ContinueWith<TNewResult>(
        Func<Task<TResult>, Object, TNewResult> continuationFunction,
        Object state)
    {
        StackCrawlMark stackMark = StackCrawlMark.LookForMyCaller;

        return ContinueWith<TNewResult>(
            continuationFunction,
            state,
            TaskScheduler.Current,
            CancellationToken.None,
            TaskContinuationOptions.None,
            ref stackMark);
    }

    /// <summary>
    /// Same as the public <c>Func</c> overload, just with a stack mark and explicit
    /// scheduler, token and continuation options.
    /// </summary>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="continuationFunction"/> or <paramref name="scheduler"/> is null.
    /// </exception>
    internal Task<TNewResult> ContinueWith<TNewResult>(
        Func<Task<TResult>, Object, TNewResult> continuationFunction,
        Object state,
        TaskScheduler scheduler,
        CancellationToken cancellationToken,
        TaskContinuationOptions continuationOptions,
        ref StackCrawlMark stackMark)
    {
        if (continuationFunction == null)
        {
            throw new ArgumentNullException("continuationFunction");
        }

        if (scheduler == null)
        {
            throw new ArgumentNullException("scheduler");
        }

        TaskCreationOptions taskCreationOptions;
        InternalTaskOptions taskInternalOptions;
        CreationOptionsFromContinuationOptions(
            continuationOptions,
            out taskCreationOptions,
            out taskInternalOptions);

        Task<TNewResult> wrapped = new ContinuationResultTaskFromResultTask<TResult, TNewResult>(
            this,
            continuationFunction,
            state,
            taskCreationOptions,
            taskInternalOptions,
            ref stackMark);

        ContinueWithCore(wrapped, scheduler, cancellationToken, continuationOptions);
        return wrapped;
    }
}
ContinueWith的核心是調用Task的ContinueWithCore方法,這裏把咱們的Action或Func包裝成一個新的Task(延續任務),比如這裏的ContinuationResultTaskFromResultTask,它的實現【非常標準】,如下:
/// <summary>
/// Task that wraps a result-producing continuation delegate together with the
/// antecedent <see cref="Task{TAntecedentResult}"/> it is to be invoked with.
/// </summary>
internal sealed class ContinuationResultTaskFromResultTask<TAntecedentResult, TResult> : Task<TResult>
{
    // Antecedent handed to the delegate; reset to null when InnerInvoke runs.
    private Task<TAntecedentResult> m_antecedent;

    /// <summary>Stores the antecedent and forwards the delegate, state and options to the base task.</summary>
    /// <param name="antecedent">Task passed to the delegate when this continuation runs.</param>
    /// <param name="function">
    /// Either a <c>Func&lt;Task&lt;TAntecedentResult&gt;, TResult&gt;</c> or a
    /// <c>Func&lt;Task&lt;TAntecedentResult&gt;, object, TResult&gt;</c>.
    /// </param>
    /// <param name="state">Object passed as the second argument of the state-taking delegate form.</param>
    public ContinuationResultTaskFromResultTask(
        Task<TAntecedentResult> antecedent,
        Delegate function,
        object state,
        TaskCreationOptions creationOptions,
        InternalTaskOptions internalOptions,
        ref StackCrawlMark stackMark)
        : base(
            function,
            state,
            Task.InternalCurrentIfAttached(creationOptions),
            default(CancellationToken),
            creationOptions,
            internalOptions,
            null)
    {
        Contract.Requires(function is Func<Task<TAntecedentResult>, TResult> || function is Func<Task<TAntecedentResult>, object, TResult>, "Invalid delegate type in ContinuationResultTaskFromResultTask");

        m_antecedent = antecedent;
        PossiblyCaptureContext(ref stackMark);
    }

    /// <summary>
    /// Invokes the wrapped delegate with the antecedent and stores the value it returns in <c>m_result</c>.
    /// </summary>
    internal override void InnerInvoke()
    {
        // Take a local copy of the antecedent, then clear the field.
        Task<TAntecedentResult> finishedAntecedent = m_antecedent;
        Contract.Assert(finishedAntecedent != null, "No antecedent was set for the ContinuationResultTaskFromResultTask.");
        m_antecedent = null;

        finishedAntecedent.NotifyDebuggerOfWaitCompletionIfNecessary();

        Contract.Assert(m_action != null);

        // The one-argument delegate shape is tried first, then the shape that also takes the state object.
        var resultFunc = m_action as Func<Task<TAntecedentResult>, TResult>;
        if (resultFunc != null)
        {
            m_result = resultFunc(finishedAntecedent);
        }
        else
        {
            var resultFuncWithState = m_action as Func<Task<TAntecedentResult>, object, TResult>;
            if (resultFuncWithState != null)
            {
                m_result = resultFuncWithState(finishedAntecedent, m_stateObject);
            }
            else
            {
                Contract.Assert(false, "Invalid m_action in ContinuationResultTaskFromResultTask");
            }
        }
    }
}
ContinuationResultTaskFromResultTask<TAntecedentResult, TResult> 重寫了基類Task的InnerInvoke方法。現在回到Task的ContinueWithCore方法:
public class Task : IThreadPoolWorkItem, IAsyncResult, IDisposable
{
    /// <summary>
    /// Registers the continuation and possibly runs it (if the task is already finished).
    /// </summary>
    /// <param name="continuationTask">The not-yet-completed task that represents the continuation.</param>
    /// <param name="scheduler">Scheduler stored in the continuation object created for <paramref name="continuationTask"/>.</param>
    /// <param name="cancellationToken">Token assigned to <paramref name="continuationTask"/> when it can be canceled.</param>
    /// <param name="options">Continuation options stored in the continuation object.</param>
    internal void ContinueWithCore(Task continuationTask, TaskScheduler scheduler,CancellationToken cancellationToken, TaskContinuationOptions options)
    {
        Contract.Requires(continuationTask != null, "Task.ContinueWithCore(): null continuationTask");
        Contract.Requires(scheduler != null, "Task.ContinueWithCore(): null scheduler");
        Contract.Requires(!continuationTask.IsCompleted, "Did not expect continuationTask to be completed");

        // Create a TaskContinuation
        TaskContinuation continuation = new StandardTaskContinuation(continuationTask, options, scheduler);

        // If cancellationToken is cancellable, then assign it.
        if (cancellationToken.CanBeCanceled)
        {
            if (IsCompleted || cancellationToken.IsCancellationRequested)
            {
                // This task is already done, or the token is already signalled: no
                // antecedent/continuation pair is handed over, so the cancel callback
                // (if one gets registered) receives only the continuation task.
                continuationTask.AssignCancellationToken(cancellationToken, null, null);
            }
            else
            {
                // Pass this task and the continuation along so that the cancel callback
                // can call RemoveContinuation on this task.
                continuationTask.AssignCancellationToken(cancellationToken, this, continuation);
            }
        }

        // In the case of a pre-canceled token, continuationTask will have been completed
        // in a Canceled state by now. If such is the case, there is no need to go through
        // the motions of queuing up the continuation for eventual execution.
        if (!continuationTask.IsCompleted)
        {
            // Tracing only: emitted when this task carries the PromiseTask option, is not an
            // ITaskCompletionAction, and the ETW provider is enabled.
            if ((this.Options & (TaskCreationOptions)InternalTaskOptions.PromiseTask) != 0 && !(this is ITaskCompletionAction))
            {
                var etwLog = TplEtwProvider.Log;
                if (etwLog.IsEnabled())
                {
                    etwLog.AwaitTaskContinuationScheduled(TaskScheduler.Current.Id, Task.CurrentId ?? 0, continuationTask.Id);
                }
            }

            // Attempt to enqueue the continuation
            bool continuationQueued = AddTaskContinuation(continuation, addBeforeOthers: false);

            // If the continuation was not queued (because the task completed), then run it now.
            if (!continuationQueued) continuation.Run(this, bCanInlineContinuationTask: true);
        }
    }

    /// <summary>
    /// Tries to store <paramref name="tc"/> in <c>m_continuationObject</c>.
    /// </summary>
    /// <param name="tc">The continuation object to store; must not be null.</param>
    /// <param name="addBeforeOthers">Forwarded to <c>AddTaskContinuationComplex</c> when the simple store is not possible.</param>
    /// <returns>
    /// <c>false</c> if this task is already completed; <c>true</c> if <paramref name="tc"/> was placed
    /// directly into the empty field; otherwise the result of <c>AddTaskContinuationComplex</c>.
    /// </returns>
    private bool AddTaskContinuation(object tc, bool addBeforeOthers)
    {
        Contract.Requires(tc != null);

        if (IsCompleted) return false;

        // Try to just jam tc into m_continuationObject
        // (the compare-exchange only stores tc when the field is still null; a non-null
        // previous value means something is already stored there, so take the complex path).
        if ((m_continuationObject != null) || (Interlocked.CompareExchange(ref m_continuationObject, tc, null) != null))
        {
            return AddTaskContinuationComplex(tc, addBeforeOthers);
        }
        else return true;
    }

    /// <summary>
    /// Adds <paramref name="tc"/> when <c>m_continuationObject</c> already holds something:
    /// a single continuation, a <c>List&lt;object&gt;</c> of continuations, or <c>s_taskCompletionSentinel</c>.
    /// </summary>
    /// <param name="tc">The continuation object to add; must not be null.</param>
    /// <param name="addBeforeOthers"><c>true</c> to insert at index 0 of the list, <c>false</c> to append.</param>
    /// <returns><c>true</c> if the continuation was added to the list; <c>false</c> otherwise.</returns>
    private bool AddTaskContinuationComplex(object tc, bool addBeforeOthers)
    {
        Contract.Requires(tc != null, "Expected non-null tc object in AddTaskContinuationComplex");

        object oldValue = m_continuationObject;

        // Logic for the case where we were previously storing a single continuation
        if ((oldValue != s_taskCompletionSentinel) && (!(oldValue is List<object>)))
        {
            List<object> newList = new List<object>();
            newList.Add(oldValue);
            // The result of the exchange is not inspected; the field is re-read below.
            Interlocked.CompareExchange(ref m_continuationObject, newList, oldValue);
        }

        // m_continuationObject is guaranteed at this point to be either a List or
        // s_taskCompletionSentinel.
        List<object> list = m_continuationObject as List<object>;
        Contract.Assert((list != null) || (m_continuationObject == s_taskCompletionSentinel),"Expected m_continuationObject to be list or sentinel");

        if (list != null)
        {
            lock (list)
            {
                // Re-check under the lock: nothing is added once the field holds the sentinel.
                if (m_continuationObject != s_taskCompletionSentinel)
                {
                    // Before growing the list we remove possible null entries that are the
                    // result from RemoveContinuations()
                    if (list.Count == list.Capacity)
                    {
                        list.RemoveAll(s_IsTaskContinuationNullPredicate);
                    }

                    if (addBeforeOthers) list.Insert(0, tc);
                    else list.Add(tc);

                    return true; // continuation successfully queued, so return true.
                }
            }
        }

        // We didn't succeed in queuing the continuation, so return false.
        return false;
    }

    /// <summary>
    /// Handles everything needed for associating a CancellationToken with a task which is being constructed.
    /// This method is meant to be called either from the TaskConstructorCore or from ContinueWithCore.
    /// </summary>
    /// <param name="cancellationToken">Token stored in this task's contingent properties.</param>
    /// <param name="antecedent">
    /// Antecedent task, or null. When non-null it is packed with <paramref name="continuation"/> into the
    /// state object given to the cancel callback.
    /// </param>
    /// <param name="continuation">Continuation object passed along with <paramref name="antecedent"/>.</param>
    private void AssignCancellationToken(CancellationToken cancellationToken, Task antecedent, TaskContinuation continuation)
    {
        ContingentProperties props = EnsureContingentPropertiesInitialized(needsProtection: false);
        props.m_cancellationToken = cancellationToken;

        try
        {
            if (AppContextSwitches.ThrowExceptionIfDisposedCancellationTokenSource)
            {
                cancellationToken.ThrowIfSourceDisposed();
            }

            // Cancel or register only for tasks that have none of the QueuedByRuntime,
            // PromiseTask or LazyCancellation options set.
            if ((((InternalTaskOptions)Options & (InternalTaskOptions.QueuedByRuntime | InternalTaskOptions.PromiseTask | InternalTaskOptions.LazyCancellation)) == 0))
            {
                if (cancellationToken.IsCancellationRequested)
                {
                    // Fast path for an already-canceled cancellationToken
                    this.InternalCancel(false);
                }
                else
                {
                    // Regular path for an uncanceled cancellationToken
                    CancellationTokenRegistration ctr;
                    if (antecedent == null)
                    {
                        // Callback state is just this task.
                        ctr = cancellationToken.InternalRegisterWithoutEC(s_taskCancelCallback, this);
                    }
                    else
                    {
                        // Callback state is (this task, antecedent, continuation); TaskCancelCallback unpacks it.
                        ctr = cancellationToken.InternalRegisterWithoutEC(s_taskCancelCallback,new Tuple<Task, Task, TaskContinuation>(this, antecedent, continuation));
                    }

                    props.m_cancellationRegistration = new Shared<CancellationTokenRegistration>(ctr);
                }
            }
        }
        catch
        {
            // On any failure, tell an attached parent (one that does not deny child attach)
            // to disregard this child, then let the exception propagate unchanged.
            if ((m_parent != null) &&((Options & TaskCreationOptions.AttachedToParent) != 0)&& ((m_parent.Options & TaskCreationOptions.DenyChildAttach) == 0))
            {
                m_parent.DisregardChild();
            }
            throw;
        }
    }

    // Cached delegate for TaskCancelCallback; this is the callback registered in AssignCancellationToken.
    private readonly static Action<Object> s_taskCancelCallback = new Action<Object>(TaskCancelCallback);

    /// <summary>
    /// Cancellation callback. <paramref name="o"/> is either the task to cancel, or a
    /// (task, antecedent, continuation) tuple; in the tuple case the continuation is first
    /// removed from the antecedent, then the task is canceled.
    /// </summary>
    /// <param name="o">The state object supplied at registration time.</param>
    private static void TaskCancelCallback(Object o)
    {
        var targetTask = o as Task;
        if (targetTask == null)
        {
            var tuple = o as Tuple<Task, Task, TaskContinuation>;
            if (tuple != null)
            {
                targetTask = tuple.Item1;

                Task antecedentTask = tuple.Item2;
                TaskContinuation continuation = tuple.Item3;
                antecedentTask.RemoveContinuation(continuation);
            }
        }

        Contract.Assert(targetTask != null,"targetTask should have been non-null, with the supplied argument being a task or a tuple containing one");
        targetTask.InternalCancel(false);
    }
}
ContinueWithCore的實現也比較簡單:首先把當前的continuationTask包裝爲StandardTaskContinuation,然後把CancellationToken賦給continuationTask;如果continuationTask沒有完成,那麼調用AddTaskContinuation把continuation加到等待對象(m_continuationObject)中;如果AddTaskContinuation添加失敗,就直接調用continuation.Run。讓咱們來看看StandardTaskContinuation的實現:
/// <summary>Base type for the continuation objects a task stores and later runs.</summary>
internal abstract class TaskContinuation
{
    /// <summary>Runs this continuation for the given completed task.</summary>
    /// <param name="completedTask">The completed task.</param>
    /// <param name="bCanInlineContinuationTask">Whether the continuation can be inlined.</param>
    internal abstract void Run(Task completedTask, bool bCanInlineContinuationTask);

    /// <summary>Tries to run the task on the current thread, if possible; otherwise, schedules it.</summary>
    /// <param name="task">The task to run or queue; its scheduler must already be set.</param>
    /// <param name="needsProtection">
    /// <c>true</c> to mark the task started through <c>MarkStarted()</c> (and stop if that fails);
    /// <c>false</c> to set the started flag directly.
    /// </param>
    protected static void InlineIfPossibleOrElseQueue(Task task, bool needsProtection)
    {
        Contract.Requires(task != null);
        Contract.Assert(task.m_taskScheduler != null);

        if (!needsProtection)
        {
            task.m_stateFlags |= Task.TASK_STATE_STARTED;
        }
        else if (!task.MarkStarted())
        {
            // task has been previously started or canceled. Stop processing.
            return;
        }

        // Prefer inline execution; fall back to queuing on the task's scheduler.
        try
        {
            bool ranInline = task.m_taskScheduler.TryRunInline(task, taskWasPreviouslyQueued: false);
            if (!ranInline)
            {
                task.m_taskScheduler.InternalQueueTask(task);
            }
        }
        catch (Exception schedulingFailure)
        {
            // A ThreadAbortException is skipped only when the task is already flagged as
            // thread-aborted; this ensures TAEs from QueueTask will be wrapped in TSE.
            bool abortAlreadyFlagged =
                schedulingFailure is ThreadAbortException &&
                (task.m_stateFlags & Task.TASK_STATE_THREAD_WAS_ABORTED) != 0;

            if (!abortAlreadyFlagged)
            {
                TaskSchedulerException wrappedFailure = new TaskSchedulerException(schedulingFailure);
                task.AddException(wrappedFailure);
                task.Finish(false);
            }
        }
    }

    /// <summary>Returns the delegates behind this continuation, for debugger use.</summary>
    internal abstract Delegate[] GetDelegateContinuationsForDebugger();
}

/// <summary>Provides the standard implementation of a task continuation.</summary>
internal class StandardTaskContinuation : TaskContinuation
{
    // The continuation task itself.
    internal readonly Task m_task;

    // Options that decide whether and how the continuation runs.
    internal readonly TaskContinuationOptions m_options;

    // Scheduler assigned to the continuation task when it is run.
    private readonly TaskScheduler m_taskScheduler;

    /// <summary>Captures the continuation task, its options and its scheduler.</summary>
    /// <param name="task">The continuation task; must not be null.</param>
    /// <param name="options">The continuation options.</param>
    /// <param name="scheduler">The scheduler for the continuation; must not be null.</param>
    internal StandardTaskContinuation(Task task, TaskContinuationOptions options, TaskScheduler scheduler)
    {
        Contract.Requires(task != null, "TaskContinuation ctor: task is null");
        Contract.Requires(scheduler != null, "TaskContinuation ctor: scheduler is null");

        m_task = task;
        m_options = options;
        m_taskScheduler = scheduler;

        if (AsyncCausalityTracer.LoggingOn)
        {
            AsyncCausalityTracer.TraceOperationCreation(
                CausalityTraceLevel.Required,
                m_task.Id,
                "Task.ContinueWith: " + ((Delegate)task.m_action).Method.Name,
                0);
        }

        if (Task.s_asyncDebuggingEnabled)
        {
            Task.AddToActiveTasks(m_task);
        }
    }

    /// <summary>Invokes the continuation for the target completion task.</summary>
    /// <param name="completedTask">The completed task.</param>
    /// <param name="bCanInlineContinuationTask">Whether the continuation can be inlined.</param>
    internal override void Run(Task completedTask, bool bCanInlineContinuationTask)
    {
        Contract.Assert(completedTask != null);
        Contract.Assert(completedTask.IsCompleted, "ContinuationTask.Run(): completedTask not completed");

        // Pick the "NotOn..." flag that corresponds to how the antecedent finished;
        // the continuation may run only when that flag is absent from the options.
        TaskContinuationOptions requested = m_options;
        TaskContinuationOptions vetoFlag;
        if (completedTask.IsRanToCompletion)
        {
            vetoFlag = TaskContinuationOptions.NotOnRanToCompletion;
        }
        else if (completedTask.IsCanceled)
        {
            vetoFlag = TaskContinuationOptions.NotOnCanceled;
        }
        else
        {
            vetoFlag = TaskContinuationOptions.NotOnFaulted;
        }

        bool outcomeAccepted = (requested & vetoFlag) == 0;
        Task continuationTask = m_task;

        if (!outcomeAccepted)
        {
            // The final state of the antecedent does not match the desired
            // continuation activation criteria; cancel it to denote this.
            continuationTask.InternalCancel(false);
            return;
        }

        if (!continuationTask.IsCanceled && AsyncCausalityTracer.LoggingOn)
        {
            // Log now that we are sure that this continuation is being ran
            AsyncCausalityTracer.TraceOperationRelation(
                CausalityTraceLevel.Important,
                continuationTask.Id,
                CausalityRelation.AssignDelegate);
        }

        continuationTask.m_taskScheduler = m_taskScheduler;

        // Inline only if the caller allows it AND the continuation's creator asked for
        // synchronous execution.
        bool inlineRequested =
            bCanInlineContinuationTask &&
            (requested & TaskContinuationOptions.ExecuteSynchronously) != 0;

        if (inlineRequested)
        {
            InlineIfPossibleOrElseQueue(continuationTask, needsProtection: true);
            return;
        }

        try
        {
            continuationTask.ScheduleAndStart(needsProtection: true);
        }
        catch (TaskSchedulerException)
        {
            // No further action is necessary -- ScheduleAndStart() already transitioned the
            // task to faulted. But we want to make sure that no exception is thrown from here.
        }
    }

    /// <summary>
    /// Returns the continuation task's own delegate when it has one; otherwise asks the
    /// continuation task for its delegate continuations.
    /// </summary>
    internal override Delegate[] GetDelegateContinuationsForDebugger()
    {
        if (m_task.m_action != null)
        {
            return new Delegate[] { m_task.m_action as Delegate };
        }

        return m_task.GetDelegateContinuationsForDebugger();
    }
}
StandardTaskContinuation的實現非常簡單,而Task的AssignCancellationToken方法也沒什麼可說的,只是需要注意一下回調s_taskCancelCallback。Task的AddTaskContinuation方法首先檢查當前Task是否已經結束,結束了就不用再調用AddTaskContinuationComplex方法了,直接返回false,由ContinueWithCore調用continuation.Run方法;否則會把continuation保存到m_continuationObject中(已經存有其他continuation時由AddTaskContinuationComplex加入列表),最後FinishContinuations再調用m_continuationObject中的TaskContinuation.Run方法。
總結一下:ContinueWith方法主要調用ContinueWithCore方法,ContinueWithCore方法主要是調用AddTaskContinuation,AddTaskContinuation方法把continuation加到m_continuationObject【如果主Task已經完成,那麼這裏AddTaskContinuation返回false,則直接調用TaskContinuation.Run】;當主Task完成時會調用FinishContinuations方法,FinishContinuations方法會檢測m_continuationObject中的TaskContinuation對象,依次調用它們的Run方法。