C# Task WaitAll和WaitAny

Task 有靜態方法WaitAll和WaitAny,主要用於等待其餘Task完成後作一些事情,先看看其實現部分吧:數組

public class Task : IThreadPoolWorkItem, IAsyncResult, IDisposable
{
   //Waits for all of the provided Task objects to complete execution.
    public static void WaitAll(params Task[] tasks)
    {
        WaitAll(tasks, Timeout.Infinite);
    }
    
    //Waits for any of the provided Task objects to complete execution.Return The index of the completed task in the tasks array argument.
   public static int WaitAny(params Task[] tasks)
    {
        int waitResult = WaitAny(tasks, Timeout.Infinite);
        Contract.Assert(tasks.Length == 0 || waitResult != -1, "expected wait to succeed");
        return waitResult;
    }
    
    //true if all of the Task instances completed execution within the allotted time; otherwise, false.
    public static bool WaitAll(Task[] tasks, int millisecondsTimeout, CancellationToken cancellationToken)
    {
        if (tasks == null)
        {
            throw new ArgumentNullException("tasks");
        }
        if (millisecondsTimeout < -1)
        {
            throw new ArgumentOutOfRangeException("millisecondsTimeout");
        }
        Contract.EndContractBlock();
        cancellationToken.ThrowIfCancellationRequested(); // early check before we make any allocations

        List<Exception> exceptions = null;
        List<Task> waitedOnTaskList = null;
        List<Task> notificationTasks = null;

        // If any of the waited-upon tasks end as Faulted or Canceled, set these to true.
        bool exceptionSeen = false, cancellationSeen = false;
        bool returnValue = true;

        // Collects incomplete tasks in "waitedOnTaskList"
        for (int i = tasks.Length - 1; i >= 0; i--)
        {
            Task task = tasks[i];
            if (task == null)
            {
                throw new ArgumentException(Environment.GetResourceString("Task_WaitMulti_NullTask"), "tasks");
            }

            bool taskIsCompleted = task.IsCompleted;
            if (!taskIsCompleted)
            {
                // try inlining the task only if we have an infinite timeout and an empty cancellation token
                if (millisecondsTimeout != Timeout.Infinite || cancellationToken.CanBeCanceled)
                {
                    AddToList(task, ref waitedOnTaskList, initSize: tasks.Length);
                }
                else
                {
                    // We are eligible for inlining.  If it doesn't work, we'll do a full wait.
                    taskIsCompleted = task.WrappedTryRunInline() && task.IsCompleted; // A successful TryRunInline doesn't guarantee completion
                    if (!taskIsCompleted) AddToList(task, ref waitedOnTaskList, initSize: tasks.Length);
                }
            }
            if (taskIsCompleted)
            {
                if (task.IsFaulted) exceptionSeen = true;
                else if (task.IsCanceled) cancellationSeen = true;
                if (task.IsWaitNotificationEnabled) AddToList(task, ref notificationTasks, initSize: 1);
            }
        }

        if (waitedOnTaskList != null)
        {
            // Block waiting for the tasks to complete.
            returnValue = WaitAllBlockingCore(waitedOnTaskList, millisecondsTimeout, cancellationToken);

            // If the wait didn't time out, ensure exceptions are propagated, and if a debugger is
            // attached and one of these tasks requires it, that we notify the debugger of a wait completion.
            if (returnValue)
            {
                foreach (var task in waitedOnTaskList)
                {
                    if (task.IsFaulted) exceptionSeen = true;
                    else if (task.IsCanceled) cancellationSeen = true;
                    if (task.IsWaitNotificationEnabled) AddToList(task, ref notificationTasks, initSize: 1);
                }
            }

           GC.KeepAlive(tasks);
        }

        if (returnValue && notificationTasks != null)
        {
            foreach (var task in notificationTasks)
            {
                if (task.NotifyDebuggerOfWaitCompletionIfNecessary()) break;
            }
        }

        // If one or more threw exceptions, aggregate and throw them.
        if (returnValue && (exceptionSeen || cancellationSeen))
        {    
            if (!exceptionSeen) cancellationToken.ThrowIfCancellationRequested();

            // Now gather up and throw all of the exceptions.
            foreach (var task in tasks) AddExceptionsForCompletedTask(ref exceptions, task);
            Contract.Assert(exceptions != null, "Should have seen at least one exception");
            throw new AggregateException(exceptions);
        }

        return returnValue;
    }
    
    public static int WaitAny(Task[] tasks, int millisecondsTimeout, CancellationToken cancellationToken)
    {
        if (tasks == null)
        {
            throw new ArgumentNullException("tasks");
        }
        if (millisecondsTimeout < -1)
        {
            throw new ArgumentOutOfRangeException("millisecondsTimeout");
        }
        Contract.EndContractBlock();
        cancellationToken.ThrowIfCancellationRequested(); // early check before we make any allocations

        int signaledTaskIndex = -1;
        for (int taskIndex = 0; taskIndex < tasks.Length; taskIndex++)
        {
            Task task = tasks[taskIndex];
            if (task == null)
            {
                throw new ArgumentException(Environment.GetResourceString("Task_WaitMulti_NullTask"), "tasks");
            }
            if (signaledTaskIndex == -1 && task.IsCompleted)
            {
                signaledTaskIndex = taskIndex;
            }
        }

        if (signaledTaskIndex == -1 && tasks.Length != 0)
        {
           Task<Task> firstCompleted = TaskFactory.CommonCWAnyLogic(tasks);
            bool waitCompleted = firstCompleted.Wait(millisecondsTimeout, cancellationToken);
            if (waitCompleted)
            {
                Contract.Assert(firstCompleted.Status == TaskStatus.RanToCompletion);
                signaledTaskIndex = Array.IndexOf(tasks, firstCompleted.Result);
                Contract.Assert(signaledTaskIndex >= 0);
            }
        }
        GC.KeepAlive(tasks);
        return signaledTaskIndex;
    }
    
    //Performs a blocking WaitAll on the vetted list of tasks.true if all of the tasks completed; otherwise, false.
    private static bool WaitAllBlockingCore(List<Task> tasks, int millisecondsTimeout, CancellationToken cancellationToken)
    {
        Contract.Assert(tasks != null, "Expected a non-null list of tasks");
        Contract.Assert(tasks.Count > 0, "Expected at least one task");
        bool waitCompleted = false;
        var mres = new SetOnCountdownMres(tasks.Count);
        try
        {
            foreach (var task in tasks)
            {
              task.AddCompletionAction(mres, addBeforeOthers: true);
            }
           waitCompleted = mres.Wait(millisecondsTimeout, cancellationToken);
        }
        finally
        {
            if (!waitCompleted)
            {
                foreach (var task in tasks)
                {
                    if (!task.IsCompleted) task.RemoveContinuation(mres);
                }
            }
        }
        return waitCompleted;
    }
    
    private sealed class SetOnCountdownMres : ManualResetEventSlim, ITaskCompletionAction
    {
        private int _count;

        internal SetOnCountdownMres(int count)
        {
            Contract.Assert(count > 0, "Expected count > 0");
            _count = count;
        }

        public void Invoke(Task completingTask)
        {
            if (Interlocked.Decrement(ref _count) == 0) Set();
            Contract.Assert(_count >= 0, "Count should never go below 0");
        }
    }    
}

咱們首先看看WaitAll的方法,檢查Task數組中每一個Task實例,檢查Task是否已經完成,若是沒有完成就把Task添加到waitedOnTaskList集合中,若是waitedOnTaskList集合有元素那麼,咱們就調用WaitAllBlockingCore來實現真正的等待,當代完畢後咱們須要檢查notificationTasks集合是否有元素,若是有則依次調用Task的NotifyDebuggerOfWaitCompletionIfNecessary方法WaitAllBlockingCore實現阻塞是依靠SetOnCountdownMres實例的【和CountdownEvent思路同樣,每次調用Invoke的時候,就把計數器_count減1,當_count==0時就調用Set方法】,在WaitAllBlockingCore方法退出前,須要檢查Task是否都完成,若是有沒有完成的須要移除相應task的SetOnCountdownMres實例【if (!task.IsCompleted) task.RemoveContinuation(mres);】,SetOnCountdownMres的Invoke方法是在Task的FinishContinuations方法調用的【 ITaskCompletionAction singleTaskCompletionAction = continuationObject as ITaskCompletionAction; singleTaskCompletionAction.Invoke(this);注意FinishContinuations方法是在FinishStageThree中調用】注意裏面的GC.KeepAlive(tasks)promise

如今咱們來看看WaitAny方法的實現,首先咱們須要循環Task[],檢查裏面是否有Task已經完成,若是有則直接返回,否者咱們調用Task<Task> firstCompleted = TaskFactory.CommonCWAnyLogic(tasks);返回一個Task,而後調用該Task的Wait方法【bool waitCompleted = firstCompleted.Wait(millisecondsTimeout, cancellationToken);】,讓咱們來看看CommonCWAnyLogic的實現:app

public class TaskFactory
{
    internal static Task<Task> CommonCWAnyLogic(IList<Task> tasks)
    {
        Contract.Requires(tasks != null);
        var promise = new CompleteOnInvokePromise(tasks);
        bool checkArgsOnly = false;
        int numTasks = tasks.Count;
        for(int i=0; i<numTasks; i++)
        {
            var task = tasks[i];
            if (task == null) throw new ArgumentException(Environment.GetResourceString("Task_MultiTaskContinuation_NullTask"), "tasks");

            if (checkArgsOnly) continue;

            // If the promise has already completed, don't bother with checking any more tasks.
            if (promise.IsCompleted)
            {
                checkArgsOnly = true;
            }
            // If a task has already completed, complete the promise.
            else if (task.IsCompleted)
            {
                promise.Invoke(task);
                checkArgsOnly = true;
            }
            // Otherwise, add the completion action and keep going.
            else task.AddCompletionAction(promise);
        }
        return promise;
    }
     internal sealed class CompleteOnInvokePromise : Task<Task>, ITaskCompletionAction
    {
        private IList<Task> _tasks; // must track this for cleanup
        private int m_firstTaskAlreadyCompleted;

        public CompleteOnInvokePromise(IList<Task> tasks) : base()
        {
            Contract.Requires(tasks != null, "Expected non-null collection of tasks");
            _tasks = tasks;
            
            if (AsyncCausalityTracer.LoggingOn)
                AsyncCausalityTracer.TraceOperationCreation(CausalityTraceLevel.Required, this.Id, "TaskFactory.ContinueWhenAny", 0);

            if (Task.s_asyncDebuggingEnabled)
            {
                AddToActiveTasks(this);
            }
        }

        public void Invoke(Task completingTask)
        {
            if (Interlocked.CompareExchange(ref m_firstTaskAlreadyCompleted, 1, 0) == 0)
            {
                if (AsyncCausalityTracer.LoggingOn)
                {
                    AsyncCausalityTracer.TraceOperationRelation(CausalityTraceLevel.Important, this.Id, CausalityRelation.Choice);
                    AsyncCausalityTracer.TraceOperationCompletion(CausalityTraceLevel.Required, this.Id, AsyncCausalityStatus.Completed);
                }

                if (Task.s_asyncDebuggingEnabled)
                {
                    RemoveFromActiveTasks(this.Id);
                }

                bool success = TrySetResult(completingTask);
                Contract.Assert(success, "Only one task should have gotten to this point, and thus this must be successful.");

                var tasks = _tasks;
                int numTasks = tasks.Count;
                for (int i = 0; i < numTasks; i++)
                {
                    var task = tasks[i];
                    if (task != null && // if an element was erroneously nulled out concurrently, just skip it; worst case is we don't remove a continuation
                        !task.IsCompleted) task.RemoveContinuation(this);
                }
                _tasks = null;

            }
        }
    }
}

CommonCWAnyLogic首先實例化CompleteOnInvokePromise【var promise = new CompleteOnInvokePromise(tasks)】,檢查promise 是否完成,檢查每一個Task是否完成,否者就把promise做爲Task的Continue Task【這裏能夠理解爲每一個Task都有一個相同Continue Task】,而CompleteOnInvokePromise本身的wait是在WaitAny中的firstCompleted.Wait(millisecondsTimeout, cancellationToken)方法,當其中其中一個Task完成後,在Task的FinishContinuations方法調用的CompleteOnInvokePromise的Invoke【一旦觸發後就須要移調其餘task上的CompleteOnInvokePromise,如這裏的task.RemoveContinuation(this)】。在CompleteOnInvokePromise的Invoke方法咱們調用TrySetResult(completingTask)方法,期實現以下:async

public class Task<TResult> : Task
{
    internal bool TrySetResult(TResult result)
    {
        if (IsCompleted) return false;
        Contract.Assert(m_action == null, "Task<T>.TrySetResult(): non-null m_action");
        if (AtomicStateUpdate(TASK_STATE_COMPLETION_RESERVED,
                TASK_STATE_COMPLETION_RESERVED | TASK_STATE_RAN_TO_COMPLETION | TASK_STATE_FAULTED | TASK_STATE_CANCELED))
        {
            m_result = result;
            Interlocked.Exchange(ref m_stateFlags, m_stateFlags | TASK_STATE_RAN_TO_COMPLETION);
            
            var cp = m_contingentProperties;
            if (cp != null) cp.SetCompleted();
            FinishStageThree();
            return true;
        }
        return false;
    }
}

這裏的TrySetResult方法裏面調用FinishStageThree方法,以保證Task後面的Continue Task的執行。ide

相關文章
相關標籤/搜索