正在 await 一批任務,但願在每一個任務完成時對它作一些處理。另外,但願在任務一完成就當即進行處理,而不須要等待其餘任務。html
問題的重點在於但願任務完成以後當即進行處理,而不去等待其餘任務。
這裏還沿用文中的例子。
等待幾秒鐘以後返回等待的秒數,以後當即打印任務等待的秒數。
等待的函數以下git
static async Task<int> DelayAndReturnAsync(int val) { await Task.Delay(TimeSpan.FromSeconds(val)); return val; }
如下方法執行以後的打印結果是「2」, 「3」, 「1」。想獲得結果「1」, 「2」, 「3」應該如何實現。github
static async Task ProcessTasksAsync() { // 建立任務隊列。 Task<int> taskA = DelayAndReturnAsync(2); Task<int> taskB = DelayAndReturnAsync(3); Task<int> taskC = DelayAndReturnAsync(1); var tasks = new[] { taskA, taskB, taskC }; // 按順序 await 每一個任務。 foreach (var task in tasks) { var result = await task; Trace.WriteLine(result); } }
文中給了兩種解決方案。一種是抽出更高級的async方法,一種是藉助做者的nuget拓展。做者還推薦了另外兩個博客文章。
Processing tasks as they complete
ORDERING BY COMPLETION, AHEAD OF TIME
這兩篇文章介紹了更多處理方法。算法
static async Task AwaitAndProcessAsync(Task<int> task) { var result = await task; Trace.WriteLine(result); }
將執行和處理抽象出來,藉助Task.WhenAll和LINQ併發執行。併發
var processingTasks = (from t in tasks select AwaitAndProcessAsync(t)).ToArray(); // 等待所有處理過程的完成。 await Task.WhenAll(processingTasks);
或者async
var processingTasks = tasks.Select(async t => { var result = await t; Trace.WriteLine(result); }).ToArray(); // 等待所有處理過程的完成。 await Task.WhenAll(processingTasks);
推薦預發佈版本:https://www.nuget.org/packages/Nito.AsyncEx/5.0.0-pre-06
須要添加引用using Nito.AsyncEx;
函數
static async Task UseOrderByCompletionAsync() { // 建立任務隊列。 Task<int> taskA = DelayAndReturnAsync(2); Task<int> taskB = DelayAndReturnAsync(3); Task<int> taskC = DelayAndReturnAsync(1); var tasks = new[] { taskA, taskB, taskC }; // 等待每個任務完成。 foreach (var task in tasks.OrderByCompletion()) { var result = await task; Trace.WriteLine(result); } }
使用ConcurrentExclusiveSchedulerPair,使任務串行執行,結果是「2」, 「3」, 「1」。code
var scheduler = new ConcurrentExclusiveSchedulerPair().ExclusiveScheduler; foreach (var t in tasks) { await t.ContinueWith(completed => { switch (completed.Status) { case TaskStatus.RanToCompletion: Trace.WriteLine(completed.Result); //Process(completed.Result); break; case TaskStatus.Faulted: //Handle(completed.Exception.InnerException); break; } }, scheduler); }
上篇文章中提到了使用Task.WhenAny處理已完成的任務:http://www.javashuo.com/article/p-uxgvedou-cy.html
文中的例子從算法層面是不推薦使用的,做者推薦了他本身的拓展Nito.AsyncEx
,源碼地址:https://github.com/StephenCleary/AsyncEx/blob/master/src/Nito.AsyncEx.Tasks/TaskExtensions.cs。
另外兩種實現的實現方法差很少,都是藉助TaskCompletionSource<T>
和Interlocked.Incrementa處理Task。
這裏只列出ORDERING BY COMPLETION, AHEAD OF TIME的解決方案。htm
/// <summary> /// 返回一系列任務,這些任務的輸入類型相同和返回結果類型一致 /// 返回的任務將以完成順序返回 /// </summary> private static IEnumerable<Task<T>> OrderByCompletion<T>(IEnumerable<Task<T>> inputTasks) { // 複製輸入,如下的處理將不須要考慮是否會對輸入有影響 var inputTaskList = inputTasks.ToList(); var completionSourceList = new List<TaskCompletionSource<T>>(inputTaskList.Count); for (int i = 0; i < inputTaskList.Count; i++) { completionSourceList.Add(new TaskCompletionSource<T>()); } // 索引 // 索引最好是從0開始,可是 Interlocked.Increment 返回的是遞增以後的值,因此這裏應該賦值-1 int prevIndex = -1; // 能夠不用再循環以外處理Action,這樣會讓代碼更清晰。如今有C#7.0的新特性本地方法可使用 /* //本地方法 void continuation(Task<T> completedTask) { int index = Interlocked.Increment(ref prevIndex); var source = completionSourceList[index]; PropagateResult(completedTask, source); }*/ Action<Task<T>> continuation = completedTask => { int index = Interlocked.Increment(ref prevIndex); var source = completionSourceList[index]; PropagateResult(completedTask, source); }; foreach (var inputTask in inputTaskList) { inputTask.ContinueWith(continuation, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default); } return completionSourceList.Select(source => source.Task); } /// <summary> /// 對 TaskCompletionSource 進行處理 /// </summary> private static void PropagateResult<T>(Task<T> completedTask, TaskCompletionSource<T> completionSource) { switch (completedTask.Status) { case TaskStatus.Canceled: completionSource.TrySetCanceled(); break; case TaskStatus.Faulted: completionSource.TrySetException(completedTask.Exception.InnerExceptions); break; case TaskStatus.RanToCompletion: completionSource.TrySetResult(completedTask.Result); break; default: throw new ArgumentException("Task was not completed"); } }