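Parallel.Invoke is probably the simplest of the Parallel methods. Let's look at its implementation; to make it easier to follow, I have kept the comments from the source wherever possible: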
public static class Parallel
{
    internal static int s_forkJoinContextID;
    internal const int DEFAULT_LOOP_STRIDE = 16;
    internal static ParallelOptions s_defaultParallelOptions = new ParallelOptions();

    public static void Invoke(params Action[] actions)
    {
        Invoke(s_defaultParallelOptions, actions);
    }

    //Executes each of the provided actions, possibly in parallel.
    public static void Invoke(ParallelOptions parallelOptions, params Action[] actions)
    {
        if (actions == null)
        {
            throw new ArgumentNullException("actions");
        }
        if (parallelOptions == null)
        {
            throw new ArgumentNullException("parallelOptions");
        }

        // Throw an ODE if we're passed a disposed CancellationToken.
        if (parallelOptions.CancellationToken.CanBeCanceled && AppContextSwitches.ThrowExceptionIfDisposedCancellationTokenSource)
        {
            parallelOptions.CancellationToken.ThrowIfSourceDisposed();
        }

        // Quit early if we're already canceled -- avoid a bunch of work.
        if (parallelOptions.CancellationToken.IsCancellationRequested)
            throw new OperationCanceledException(parallelOptions.CancellationToken);

        // We must validate that the actions array contains no null elements, and also
        // make a defensive copy of the actions array.
        Action[] actionsCopy = new Action[actions.Length];
        for (int i = 0; i < actionsCopy.Length; i++)
        {
            actionsCopy[i] = actions[i];
            if (actionsCopy[i] == null)
            {
                throw new ArgumentException(Environment.GetResourceString("Parallel_Invoke_ActionNull"));
            }
        }

        // ETW event for Parallel Invoke Begin
        int forkJoinContextID = 0;
        Task callerTask = null;
        if (TplEtwProvider.Log.IsEnabled())
        {
            forkJoinContextID = Interlocked.Increment(ref s_forkJoinContextID);
            callerTask = Task.InternalCurrent;
            TplEtwProvider.Log.ParallelInvokeBegin((callerTask != null ? callerTask.m_taskScheduler.Id : TaskScheduler.Current.Id), (callerTask != null ? callerTask.Id : 0),
                                                   forkJoinContextID, TplEtwProvider.ForkJoinOperationType.ParallelInvoke, actionsCopy.Length);
        }

        // If we have no work to do, we are done.
        if (actionsCopy.Length < 1) return;

        // In the algorithm below, if the number of actions is greater than this, we automatically
        // use Parallel.For() to handle the actions, rather than the Task-per-Action strategy.
        const int SMALL_ACTIONCOUNT_LIMIT = 10;

        try
        {
            // If we've gotten this far, it's time to process the actions.
            if ((actionsCopy.Length > SMALL_ACTIONCOUNT_LIMIT) ||
                (parallelOptions.MaxDegreeOfParallelism != -1 && parallelOptions.MaxDegreeOfParallelism < actionsCopy.Length))
            {
                // Used to hold any exceptions encountered during action processing
                ConcurrentQueue<Exception> exceptionQ = null; // will be lazily initialized if necessary

                // This is more efficient for a large number of actions, or for enforcing MaxDegreeOfParallelism.
                try
                {
                    // Launch a self-replicating task to handle the execution of all actions.
                    // The use of a self-replicating task allows us to use as many cores
                    // as are available, and no more. The exception to this rule is
                    // that, in the case of a blocked action, the ThreadPool may inject
                    // extra threads, which means extra tasks can run.
                    int actionIndex = 0;
                    ParallelForReplicatingTask rootTask = new ParallelForReplicatingTask(parallelOptions, delegate
                    {
                        // Each for-task will pull an action at a time from the list
                        int myIndex = Interlocked.Increment(ref actionIndex); // = index to use + 1
                        while (myIndex <= actionsCopy.Length)
                        {
                            // Catch and store any exceptions. If we don't catch them, the self-replicating
                            // task will exit, and that may cause other SR-tasks to exit.
                            // And (absent cancellation) we want all actions to execute.
                            try
                            {
                                actionsCopy[myIndex - 1]();
                            }
                            catch (Exception e)
                            {
                                LazyInitializer.EnsureInitialized<ConcurrentQueue<Exception>>(ref exceptionQ, () => { return new ConcurrentQueue<Exception>(); });
                                exceptionQ.Enqueue(e);
                            }

                            // Check for cancellation. If it is encountered, then exit the delegate.
                            if (parallelOptions.CancellationToken.IsCancellationRequested)
                                throw new OperationCanceledException(parallelOptions.CancellationToken);

                            // You're still in the game. Grab your next action index.
                            myIndex = Interlocked.Increment(ref actionIndex);
                        }
                    }, TaskCreationOptions.None, InternalTaskOptions.SelfReplicating);

                    rootTask.RunSynchronously(parallelOptions.EffectiveTaskScheduler);
                    rootTask.Wait();
                }
                catch (Exception e)
                {
                    LazyInitializer.EnsureInitialized<ConcurrentQueue<Exception>>(ref exceptionQ, () => { return new ConcurrentQueue<Exception>(); });

                    // Since we're consuming all action exceptions, there are very few reasons that
                    // we would see an exception here. Two that come to mind:
                    // (1) An OCE thrown by one or more actions (AggregateException thrown)
                    // (2) An exception thrown from the ParallelForReplicatingTask constructor
                    //     (regular exception thrown).
                    // We'll need to cover them both.
                    AggregateException ae = e as AggregateException;
                    if (ae != null)
                    {
                        // Strip off outer container of an AggregateException, because downstream
                        // logic needs OCEs to be at the top level.
                        foreach (Exception exc in ae.InnerExceptions) exceptionQ.Enqueue(exc);
                    }
                    else
                    {
                        exceptionQ.Enqueue(e);
                    }
                }

                // If we have encountered any exceptions, then throw.
                if ((exceptionQ != null) && (exceptionQ.Count > 0))
                {
                    ThrowIfReducableToSingleOCE(exceptionQ, parallelOptions.CancellationToken);
                    throw new AggregateException(exceptionQ);
                }
            }
            else
            {
                // This is more efficient for a small number of actions and no DOP support

                // Initialize our array of tasks, one per action.
                Task[] tasks = new Task[actionsCopy.Length];

                // One more check before we begin...
                if (parallelOptions.CancellationToken.IsCancellationRequested)
                    throw new OperationCanceledException(parallelOptions.CancellationToken);

                // Launch all actions as tasks
                for (int i = 1; i < tasks.Length; i++)
                {
                    tasks[i] = Task.Factory.StartNew(actionsCopy[i], parallelOptions.CancellationToken, TaskCreationOptions.None,
                                                     InternalTaskOptions.None, parallelOptions.EffectiveTaskScheduler);
                }

                // Optimization: Use current thread to run something before we block waiting for all tasks.
                tasks[0] = new Task(actionsCopy[0]);
                tasks[0].RunSynchronously(parallelOptions.EffectiveTaskScheduler);

                // Now wait for the tasks to complete. This will not unblock until all of
                // them complete, and it will throw an exception if one or more of them also
                // threw an exception. We let such exceptions go completely unhandled.
                try
                {
                    if (tasks.Length <= 4)
                    {
                        // for 4 or less tasks, the sequential waitall version is faster
                        Task.FastWaitAll(tasks);
                    }
                    else
                    {
                        // otherwise we revert to the regular WaitAll which delegates the multiple wait to the cooperative event.
                        Task.WaitAll(tasks);
                    }
                }
                catch (AggregateException aggExp)
                {
                    // see if we can combine it into a single OCE. If not propagate the original exception
                    ThrowIfReducableToSingleOCE(aggExp.InnerExceptions, parallelOptions.CancellationToken);
                    throw;
                }
                finally
                {
                    for (int i = 0; i < tasks.Length; i++)
                    {
                        if (tasks[i].IsCompleted) tasks[i].Dispose();
                    }
                }
            }
        }
        finally
        {
            // ETW event for Parallel Invoke End
            if (TplEtwProvider.Log.IsEnabled())
            {
                TplEtwProvider.Log.ParallelInvokeEnd((callerTask != null ? callerTask.m_taskScheduler.Id : TaskScheduler.Current.Id), (callerTask != null ? callerTask.Id : 0), forkJoinContextID);
            }
        }
    }
}
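The exception and cancellation handling in the listing above can be observed from the outside through the public API. The following is a minimal sketch, not part of the original source: the variable names (`ran`, `failures`, `canceledEarly`, `ranAfterCancel`) are made up for illustration, and it uses top-level statements, which assumes C# 9 or later.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Part 1: every action runs even if some of them throw; the failures
// surface together in a single AggregateException.
int ran = 0;
int failures = 0;
try
{
    Parallel.Invoke(
        () => { Interlocked.Increment(ref ran); throw new InvalidOperationException("first"); },
        () => { Interlocked.Increment(ref ran); },
        () => { Interlocked.Increment(ref ran); throw new ArgumentException("third"); });
}
catch (AggregateException ae)
{
    failures = ae.InnerExceptions.Count;
}
Console.WriteLine(ran);      // 3
Console.WriteLine(failures); // 2

// Part 2: a token that is already canceled makes Invoke throw before any action runs
// (the "Quit early if we're already canceled" check).
bool canceledEarly = false;
int ranAfterCancel = 0;
using (var cts = new CancellationTokenSource())
{
    cts.Cancel();
    var canceledOptions = new ParallelOptions { CancellationToken = cts.Token };
    try
    {
        Parallel.Invoke(canceledOptions, () => { Interlocked.Increment(ref ranAfterCancel); });
    }
    catch (OperationCanceledException)
    {
        canceledEarly = true;
    }
}
Console.WriteLine(canceledEarly);  // True
Console.WriteLine(ranAfterCancel); // 0
```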
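The implementation of Parallel.Invoke is quite simple. If the number of actions exceeds 10 (SMALL_ACTIONCOUNT_LIMIT), or if we specify a MaxDegreeOfParallelism other than -1 that is smaller than the number of actions, the work is done by a ParallelForReplicatingTask. Otherwise each action is wrapped directly in a Task: all but the first are started with Task.Factory.StartNew, and the first one runs on the calling thread via RunSynchronously. Here we mainly look at the implementation of ParallelForReplicatingTask.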
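To make the branch condition concrete, here is a small sketch. `UsesReplicatingTask` is a hypothetical helper written for this article; it only mirrors the `if` condition from the listing and is not an API of the framework. Other runtime versions may choose their strategy differently, so treat it as a reading aid for this particular source. The sketch uses top-level statements (C# 9 or later).

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper mirroring the condition in the listing.
// maxDop == -1 means "no limit", as with ParallelOptions.MaxDegreeOfParallelism.
bool UsesReplicatingTask(int actionCount, int maxDop)
{
    const int SmallActionCountLimit = 10; // SMALL_ACTIONCOUNT_LIMIT in the source
    return actionCount > SmallActionCountLimit
        || (maxDop != -1 && maxDop < actionCount);
}

Console.WriteLine(UsesReplicatingTask(3, -1));  // False: one Task per action
Console.WriteLine(UsesReplicatingTask(11, -1)); // True: more than 10 actions
Console.WriteLine(UsesReplicatingTask(3, 2));   // True: MaxDegreeOfParallelism < action count

// Whichever branch is taken, the public call looks the same
// and returns only after every action has finished.
int completed = 0;
var options = new ParallelOptions { MaxDegreeOfParallelism = 2 };
Parallel.Invoke(options,
    () => { Interlocked.Increment(ref completed); },
    () => { Interlocked.Increment(ref completed); },
    () => { Interlocked.Increment(ref completed); });
Console.WriteLine(completed); // 3
```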
internal class ParallelForReplicatingTask : Task
{
    private int m_replicationDownCount; // downcounter to control replication

    internal ParallelForReplicatingTask(
        ParallelOptions parallelOptions, Action action, TaskCreationOptions creationOptions, InternalTaskOptions internalOptions)
        : base(action, null, Task.InternalCurrent, default(CancellationToken), creationOptions, internalOptions | InternalTaskOptions.SelfReplicating, null)
    {
        m_replicationDownCount = parallelOptions.EffectiveMaxConcurrencyLevel;
        StackCrawlMark stackMark = StackCrawlMark.LookForMyCaller;
        PossiblyCaptureContext(ref stackMark);
    }

    internal override bool ShouldReplicate()
    {
        if (m_replicationDownCount == -1) return true; // "run wild"

        if (m_replicationDownCount > 0) // Decrement and return true if not called with 0 downcount
        {
            m_replicationDownCount--;
            return true;
        }

        return false; // We're done replicating
    }

    internal override Task CreateReplicaTask(Action<object> taskReplicaDelegate, Object stateObject, Task parentTask, TaskScheduler taskScheduler,
                                             TaskCreationOptions creationOptionsForReplica, InternalTaskOptions internalOptionsForReplica)
    {
        return new ParallelForReplicaTask(taskReplicaDelegate, stateObject, parentTask, taskScheduler,
                                          creationOptionsForReplica, internalOptionsForReplica);
    }
}

internal class ParallelForReplicaTask : Task
{
    internal object m_stateForNextReplica;
    internal object m_stateFromPreviousReplica;
    internal Task m_handedOverChildReplica;

    internal ParallelForReplicaTask(Action<object> taskReplicaDelegate, Object stateObject, Task parentTask, TaskScheduler taskScheduler,
                                    TaskCreationOptions creationOptionsForReplica, InternalTaskOptions internalOptionsForReplica)
        : base(taskReplicaDelegate, stateObject, parentTask, default(CancellationToken), creationOptionsForReplica, internalOptionsForReplica, taskScheduler)
    {
    }

    internal override Object SavedStateForNextReplica
    {
        get { return m_stateForNextReplica; }
        set { m_stateForNextReplica = value; }
    }

    internal override Object SavedStateFromPreviousReplica
    {
        get { return m_stateFromPreviousReplica; }
        set { m_stateFromPreviousReplica = value; }
    }

    internal override Task HandedOverChildReplica
    {
        get { return m_handedOverChildReplica; }
        set { m_handedOverChildReplica = value; }
    }
}
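The ShouldReplicate method of ParallelForReplicatingTask reports whether the task may keep replicating. Each replication decrements the down-counter by 1, except when the counter is -1, which means there is no limit. CreateReplicaTask creates a new ParallelForReplicaTask instance. Finally, Task's RunSynchronously method is called; RunSynchronously [ExecuteSelfReplicating] is where the core of the implementation lives.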
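The down-counter logic is easy to try in isolation. The sketch below is a stand-in written for this article, not the framework's code: `MakeShouldReplicate` and `CountGranted` are hypothetical names, and a captured local plays the role of `m_replicationDownCount`. It uses top-level statements (C# 9 or later).

```csharp
using System;

// Hypothetical stand-in for ShouldReplicate: downCount plays the role
// of m_replicationDownCount and is captured by the returned delegate.
Func<bool> MakeShouldReplicate(int downCount)
{
    return () =>
    {
        if (downCount == -1) return true; // "run wild": no limit on replication
        if (downCount > 0)
        {
            downCount--;                  // one replication slot used up
            return true;
        }
        return false;                     // counter exhausted, stop replicating
    };
}

// Asks the delegate `attempts` times and counts how often it says yes.
int CountGranted(Func<bool> shouldReplicate, int attempts)
{
    int granted = 0;
    for (int i = 0; i < attempts; i++)
    {
        if (shouldReplicate()) granted++;
    }
    return granted;
}

Console.WriteLine(CountGranted(MakeShouldReplicate(3), 10));  // 3
Console.WriteLine(CountGranted(MakeShouldReplicate(-1), 10)); // 10
```

With a starting value of 3, only the first three requests are granted; with -1, every request is granted.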
public class Task : IThreadPoolWorkItem, IAsyncResult, IDisposable
{
    /*
    Runs the Task synchronously on the current TaskScheduler.
    A task may only be started and run only once. Any attempts to schedule a task a second time will result in an exception.
    If the target scheduler does not support running this Task on the current thread, the Task will
    be scheduled for execution on the scheduler, and the current thread will block until the Task has completed execution.
    */
    public void RunSynchronously()
    {
        InternalRunSynchronously(TaskScheduler.Current, waitForCompletion: true);
    }

    public void RunSynchronously(TaskScheduler scheduler)
    {
        if (scheduler == null)
        {
            throw new ArgumentNullException("scheduler");
        }
        Contract.EndContractBlock();
        InternalRunSynchronously(scheduler, waitForCompletion: true);
    }

    internal void InnerInvokeWithArg(Task childTask)
    {
        InnerInvoke();
    }

    private static void ExecuteSelfReplicating(Task root)
    {
        TaskCreationOptions creationOptionsForReplicas = root.CreationOptions | TaskCreationOptions.AttachedToParent;
        InternalTaskOptions internalOptionsForReplicas =
            InternalTaskOptions.ChildReplica |      // child replica flag disables self replication for the replicas themselves.
            InternalTaskOptions.SelfReplicating |   // we still want to identify this as part of a self replicating group
            InternalTaskOptions.QueuedByRuntime;    // we queue and cancel these tasks internally, so don't allow CT registration to take place

        // Important Note: The child replicas we launch from here will be attached the root replica (by virtue of the root.CreateReplicaTask call)
        // because we need the root task to receive all their exceptions, and to block until all of them return

        // This variable is captured in a closure and shared among all replicas.
        bool replicasAreQuitting = false;

        // Set up a delegate that will form the body of the root and all recursively created replicas.
        Action<object> taskReplicaDelegate = null;
        taskReplicaDelegate = delegate
        {
            Task currentTask = Task.InternalCurrent;

            // Check if a child task has been handed over by a prematurely quiting replica that we might be a replacement for.
            Task childTask = currentTask.HandedOverChildReplica;

            if (childTask == null)
            {
                // Apparently we are not a replacement task. This means we need to queue up a child task for replication to progress

                // Down-counts a counter in the root task.
                if (!root.ShouldReplicate()) return;

                // If any of the replicas have quit, we will do so ourselves.
                if (Volatile.Read(ref replicasAreQuitting))
                {
                    return;
                }

                // Propagate a copy of the context from the root task. It may be null if flow was suppressed.
                ExecutionContext creatorContext = root.CapturedContext;

                childTask = root.CreateReplicaTask(taskReplicaDelegate, root.m_stateObject, root, root.ExecutingTaskScheduler,
                                                   creationOptionsForReplicas, internalOptionsForReplicas);
                childTask.CapturedContext = CopyExecutionContext(creatorContext);
                childTask.ScheduleAndStart(false);
            }

            // Finally invoke the meat of the task.
            // Note that we are directly calling root.InnerInvoke() even though we are currently be in the action delegate of a child replica
            // This is because the actual work was passed down in that delegate, and the action delegate of the child replica simply contains this
            // replication control logic.
            try
            {
                // passing in currentTask only so that the parallel debugger can find it
                root.InnerInvokeWithArg(currentTask);
            }
            catch (Exception exn)
            {
                // Record this exception in the root task's exception list
                root.HandleException(exn);

                if (exn is ThreadAbortException)
                {
                    // If this is a ThreadAbortException it will escape this catch clause, causing us to skip the regular Finish codepath
                    // In order not to leave the task unfinished, we now call FinishThreadAbortedTask here
                    currentTask.FinishThreadAbortedTask(false, true);
                }
            }

            Object savedState = currentTask.SavedStateForNextReplica;

            // check for premature exit
            if (savedState != null)
            {
                // the replica decided to exit early
                // we need to queue up a replacement, attach the saved state, and yield the thread right away
                Task replacementReplica = root.CreateReplicaTask(taskReplicaDelegate, root.m_stateObject, root, root.ExecutingTaskScheduler,
                                                                 creationOptionsForReplicas, internalOptionsForReplicas);

                // Propagate a copy of the context from the root task to the replacement task
                ExecutionContext creatorContext = root.CapturedContext;
                replacementReplica.CapturedContext = CopyExecutionContext(creatorContext);

                replacementReplica.HandedOverChildReplica = childTask;
                replacementReplica.SavedStateFromPreviousReplica = savedState;

                replacementReplica.ScheduleAndStart(false);
            }
            else
            {
                // The replica finished normally, which means it can't find more work to grab.
                // Time to mark replicas quitting
                replicasAreQuitting = true;

                // InternalCancel() could conceivably throw in the underlying scheduler's TryDequeue() method.
                // If it does, then make sure that we record it.
                try
                {
                    childTask.InternalCancel(true);
                }
                catch (Exception e)
                {
                    // Apparently TryDequeue threw an exception. Before propagating that exception, InternalCancel should have
                    // attempted an atomic state transition and a call to CancellationCleanupLogic() on this task. So we know
                    // the task was properly cleaned up if it was possible.
                    //
                    // Now all we need to do is to Record the exception in the root task.
                    root.HandleException(e);
                }

                // No specific action needed if the child could not be canceled
                // because we attached it to the root task, which should therefore be receiving any exceptions from the child,
                // and root.wait will not return before this child finishes anyway.
            }
        };

        //
        // Now we execute as the root task
        //
        taskReplicaDelegate(null);
    }
}
Task.RunSynchronously reaches Task.Execute along one of the following two paths:
Task.RunSynchronously->Task.InternalRunSynchronously->TaskScheduler.TryRunInline->ThreadPoolTaskScheduler.TryExecuteTaskInline->Task.ExecuteWithThreadLocal->Task.ExecuteEntry->Task.Execute
Task.RunSynchronously->Task.InternalRunSynchronously->TaskScheduler.TryRunInline (returns false)->TaskScheduler.InternalQueueTask->ThreadPoolTaskScheduler.QueueTask->Task.IThreadPoolWorkItem.ExecuteWorkItem()->Task.ExecuteWithThreadLocal->Task.ExecuteEntry->Task.Execute. Put simply, both paths end up calling Task's Execute method. Execute checks whether IsSelfReplicatingRoot is true [the InternalTaskOptions.SelfReplicating flag was passed when the ParallelForReplicatingTask was constructed] and, if so, calls ExecuteSelfReplicating.
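The first (inline) path can be observed with the public API alone. This is a small sketch under the assumption that the default thread-pool scheduler is used and that it accepts the inline request; the variable names are made up for illustration, and the code uses top-level statements (C# 9 or later).

```csharp
using System;
using System.Threading.Tasks;

int callerThreadId = Environment.CurrentManagedThreadId;
int taskThreadId = -1;

// A cold task: created but not yet scheduled.
var task = new Task(() => { taskThreadId = Environment.CurrentManagedThreadId; });

// RunSynchronously asks the scheduler to run the task inline on the calling
// thread and does not return until the task has completed.
task.RunSynchronously(TaskScheduler.Default);

Console.WriteLine(task.IsCompleted);               // True
Console.WriteLine(taskThreadId == callerThreadId); // True when the task was inlined
```

A custom scheduler may refuse to inline, in which case the task is queued (the second path) and the thread ids can differ, while the call still blocks until completion.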
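ExecuteSelfReplicating builds a delegate that serves as the body of the root task and of every replica. The delegate first checks whether the current task's HandedOverChildReplica property is null [the property is itself a Task; a non-null value means the current task is a replacement for a replica that exited early and has taken over its child]. When it is null, we check whether the root task may still replicate [root.ShouldReplicate(), implemented by ParallelForReplicatingTask] and return if it may not; then we check the shared variable replicasAreQuitting [if (Volatile.Read(ref replicasAreQuitting)), a read that is safe across threads] and return if it is set; otherwise we call ParallelForReplicatingTask's CreateReplicaTask to create and schedule a child task. Finally root.InnerInvokeWithArg(currentTask) is called, which runs the delegate passed in by Parallel.Invoke. That delegate claims one action at a time with Interlocked.Increment and keeps looping until no actions are left. currentTask.SavedStateForNextReplica has no real use in Parallel.Invoke, but in Parallel.For it carries the state that a replica exiting early hands over to its replacement. The root ParallelForReplicatingTask runs this delegate and can create a child replica (a ParallelForReplicaTask); each replica runs the same delegate and can create one more. So the number of Task.Execute calls follows the number of replicas rather than the number of actions, and the number of replicas depends largely on the degree-of-parallelism parameter EffectiveMaxConcurrencyLevel, which in turn determines how many times the replica delegate from ExecuteSelfReplicating runs. The thread call flow is as follows: flow 1 is only ever called once [rootTask.RunSynchronously], flow 2 is the number of ExecuteSelfReplicating calls, and flow 3 is the number of ordinary InnerInvoke calls.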
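The work-claiming loop inside the Parallel.Invoke delegate can be reproduced without any internal types. The sketch below is an approximation written for this article: four ordinary `Task.Run` workers stand in for the self-replicating replicas (an assumption, since the replication machinery itself is internal), and `Worker`, `runCounts` and `exceptions` are hypothetical names. It uses top-level statements (C# 9 or later).

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// 25 actions; each records that it ran, and actions 0, 10 and 20 also throw.
int[] runCounts = new int[25];
Action[] actions = Enumerable.Range(0, runCounts.Length)
    .Select(i => (Action)(() =>
    {
        Interlocked.Increment(ref runCounts[i]);
        if (i % 10 == 0) throw new InvalidOperationException("action " + i + " failed");
    }))
    .ToArray();

int actionIndex = 0;                               // shared by all workers
var exceptions = new ConcurrentQueue<Exception>(); // failures are stored, not rethrown

// Same shape as the delegate in the listing: claim an index atomically,
// run that action, store any exception, then claim the next index.
void Worker()
{
    int myIndex = Interlocked.Increment(ref actionIndex); // = index to use + 1
    while (myIndex <= actions.Length)
    {
        try { actions[myIndex - 1](); }
        catch (Exception e) { exceptions.Enqueue(e); }
        myIndex = Interlocked.Increment(ref actionIndex);
    }
}

// Four plain tasks stand in for the replicas.
Task[] workers = Enumerable.Range(0, 4).Select(_ => Task.Run(() => Worker())).ToArray();
Task.WaitAll(workers);

Console.WriteLine(runCounts.All(c => c == 1)); // True: each action ran exactly once
Console.WriteLine(exceptions.Count);           // 3
```

Because each index is handed out by an atomic increment, no action is run twice or skipped, no matter how many workers take part or how the work happens to be split among them.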