《C#併發編程經典實例》學習筆記—2.6 任務完成時的處理

問題

正在 await 一批任務,但願在每一個任務完成時對它作一些處理。另外,但願在任務一完成就當即進行處理,而不須要等待其餘任務。html

問題的重點在於但願任務完成以後當即進行處理,而不去等待其餘任務。
這裏還沿用文中的例子。
等待幾秒鐘以後返回等待的秒數,以後當即打印任務等待的秒數。
等待的函數以下git

static async Task<int> DelayAndReturnAsync(int val)
{
      await Task.Delay(TimeSpan.FromSeconds(val));
      return val;
}

如下方法執行以後的打印結果是「2」, 「3」, 「1」。想獲得結果「1」, 「2」, 「3」應該如何實現。github

static async Task ProcessTasksAsync()
{
    // 建立任務隊列。
    Task<int> taskA = DelayAndReturnAsync(2);
    Task<int> taskB = DelayAndReturnAsync(3);
    Task<int> taskC = DelayAndReturnAsync(1);
    var tasks = new[] { taskA, taskB, taskC };
    // 按順序 await 每一個任務。
    foreach (var task in tasks)
    {
        var result = await task;
        Trace.WriteLine(result);
    }
}

文中給了兩種解決方案。一種是抽出更高級的async方法,一種是藉助做者的nuget拓展。做者還推薦了另外兩個博客文章。
Processing tasks as they complete
ORDERING BY COMPLETION, AHEAD OF TIME
這兩篇文章介紹了更多處理方法。算法

抽象方法,併發執行

static async Task AwaitAndProcessAsync(Task<int> task)
{
    var result = await task;
    Trace.WriteLine(result);
}

將執行和處理抽象出來,藉助Task.WhenAll和LINQ併發執行。併發

var processingTasks = (from t in tasks
select AwaitAndProcessAsync(t)).ToArray();
// 等待所有處理過程的完成。
await Task.WhenAll(processingTasks);

或者async

var processingTasks = tasks.Select(async t =>
{
var result = await t;
Trace.WriteLine(result);
}).ToArray();
// 等待所有處理過程的完成。
await Task.WhenAll(processingTasks);

藉助nuget拓展:Nito.AsyncEx

推薦預發佈版本:https://www.nuget.org/packages/Nito.AsyncEx/5.0.0-pre-06
須要添加引用using Nito.AsyncEx;函數

static async Task UseOrderByCompletionAsync()
{
      // 建立任務隊列。
      Task<int> taskA = DelayAndReturnAsync(2);
      Task<int> taskB = DelayAndReturnAsync(3);
      Task<int> taskC = DelayAndReturnAsync(1);
      var tasks = new[] { taskA, taskB, taskC };
      // 等待每個任務完成。
      foreach (var task in tasks.OrderByCompletion())
      {
           var result = await task;
           Trace.WriteLine(result);
      }
}

串行執行

使用ConcurrentExclusiveSchedulerPair,使任務串行執行,結果是「2」, 「3」, 「1」。code

var scheduler = new ConcurrentExclusiveSchedulerPair().ExclusiveScheduler;
foreach (var t in tasks)
{
    await t.ContinueWith(completed =>
    {
             switch (completed.Status)
             {
                   case TaskStatus.RanToCompletion:
                   Trace.WriteLine(completed.Result);
                   //Process(completed.Result);
                   break;
                   case TaskStatus.Faulted:
                   //Handle(completed.Exception.InnerException);
                   break;
               }
     }, scheduler);
}

上篇文章中提到了使用Task.WhenAny處理已完成的任務:http://www.javashuo.com/article/p-uxgvedou-cy.html
文中的例子從算法層面是不推薦使用的,做者推薦了他本身的拓展Nito.AsyncEx,源碼地址:https://github.com/StephenCleary/AsyncEx/blob/master/src/Nito.AsyncEx.Tasks/TaskExtensions.cs
另外兩種實現的實現方法差很少,都是藉助TaskCompletionSource<T>和Interlocked.Incrementa處理Task。
這裏只列出ORDERING BY COMPLETION, AHEAD OF TIME的解決方案。htm

/// <summary> 
/// 返回一系列任務,這些任務的輸入類型相同和返回結果類型一致
/// 返回的任務將以完成順序返回
/// </summary> 
private static IEnumerable<Task<T>> OrderByCompletion<T>(IEnumerable<Task<T>> inputTasks) 
{ 
    // 複製輸入,如下的處理將不須要考慮是否會對輸入有影響
    var inputTaskList = inputTasks.ToList();
    var completionSourceList = new List<TaskCompletionSource<T>>(inputTaskList.Count); 
    for (int i = 0; i < inputTaskList.Count; i++) 
    { 
        completionSourceList.Add(new TaskCompletionSource<T>()); 
    }

    // 索引
    // 索引最好是從0開始,可是 Interlocked.Increment 返回的是遞增以後的值,因此這裏應該賦值-1
    int prevIndex = -1;

    // 能夠不用再循環以外處理Action,這樣會讓代碼更清晰。如今有C#7.0的新特性本地方法可使用
     /* //本地方法
     void continuation(Task<T> completedTask)
     {
          int index = Interlocked.Increment(ref prevIndex);
          var source = completionSourceList[index];
          PropagateResult(completedTask, source);
     }*/ 
   
    Action<Task<T>> continuation = completedTask => 
    { 
        int index = Interlocked.Increment(ref prevIndex); 
        var source = completionSourceList[index]; 
        PropagateResult(completedTask, source); 
    };

    foreach (var inputTask in inputTaskList) 
    {  
        inputTask.ContinueWith(continuation, 
                               CancellationToken.None, 
                               TaskContinuationOptions.ExecuteSynchronously, 
                               TaskScheduler.Default); 
    }

    return completionSourceList.Select(source => source.Task); 
}

/// <summary> 
/// 對 TaskCompletionSource 進行處理
/// </summary> 
private static void PropagateResult<T>(Task<T> completedTask, 
    TaskCompletionSource<T> completionSource) 
{ 
    switch (completedTask.Status) 
    { 
        case TaskStatus.Canceled: 
            completionSource.TrySetCanceled(); 
            break; 
        case TaskStatus.Faulted: 
            completionSource.TrySetException(completedTask.Exception.InnerExceptions); 
            break; 
        case TaskStatus.RanToCompletion: 
            completionSource.TrySetResult(completedTask.Result); 
            break; 
        default: 
            throw new ArgumentException("Task was not completed"); 
    } 
}
相關文章
相關標籤/搜索