.NET 並行編程——任務並行

本文內容

  • 並行編程
  • 任務並行
    • 隱式建立和運行任務
    • 顯式建立和運行任務
    • 任務 ID
    • 任務建立選項
    • 建立任務延續
    • 建立分離的子任務
    • 建立子任務
    • 等待任務完成
    • 組合任務
    • 任務中的異常處理
    • 取消任務
    • TaskFactory 類
    • 無委託的任務
    • 相關數據結構
  • 參考資料

下載 Demo

下載 Samples for Parallel Programming with .net framework

並行編程


多核 CPU 已經至關廣泛,使得多個線程可以同時執行。將代碼並行化,工做也就分攤到多個 CPU 上。express

過去,並行化須要線程和鎖的低級操做。而 Visual Studio 2010 和 .NET Framework 4 開始提供了新的運行時、新的類庫類型以及新的診斷工具,從而加強了對並行編程的支持。這些功能簡化了並行開發,經過固有方法編寫高效、細化且可伸縮的並行代碼,而沒必要直接處理線程或線程池。編程

下圖從較高層面上概述了 .NET Framework 4 中的並行編程體系結構。數組

211715268596419

任務並行庫(The Task Parallel Library,TPL)是 System.ThreadingSystem.Threading.Tasks 空間中的一組公共類型和 API。TPL 的目的是經過簡化將並行和併發添加到應用程序的過程來提升開發人員的工做效率。TPL 能動態地最有效地使用全部可用的處理器。此外,TPL 還處理工做分區、ThreadPool 上的線程調度、取消支持、狀態管理以及其餘低級別的細節操做。經過使用 TPL,你能夠將精力集中於程序要完成的工做,同時最大程度地提升代碼的性能。 安全

從 .NET Framework 4 開始,TPL 是編寫多線程代碼和並行代碼的首選方法。但並非全部代碼都適合並行化,例如,若是某個循環在每次迭代時只執行少許工做,或它在不少次迭代時都不運行,那麼並行化的開銷可能致使代碼運行更慢。 此外,像任何多線程代碼同樣,並行化會增長程序執行的複雜性。 儘管 TPL 簡化了多線程方案,但建議對線程處理概念(例如,鎖、死鎖和爭用條件)進行基本瞭解,以便可以有效地使用 TPL。數據結構

任務並行


「任務並行」是指一個或多個獨立的任務同時運行。好處固然是系統資源的使用效率更高,可伸縮性更好;對於線程或工做項,可使用更多的編程控件。所以,在 .NET Framework 中,TPL 是用於編寫多線程、異步和並行代碼的首選 API。多線程

隱式建立和運行任務

Parallel.Invoke 方法提供了一種簡便方式,可同時運行任意數量的任意語句。只需爲每一個工做項傳入 Action 委託便可。併發

下面的示例演示 Invoke 調用,建立並啓動同時運行三個任務。將對共享數據達爾文的《物種起源》執行三項操做:最長的詞、最頻繁出現的詞和字數。這些操做都不修改源,所以它們能夠直接並行執行。app

using System;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
 
namespace ParallelInvokeDemo
{
    class Program
    {
        static void Main(string[] args)
        {
            // Retrieve Darwin's "Origin of the Species" from Gutenberg.org.          
            string[] words = CreateWordArray();
 
 
            // Perform three tasks in parallel on the source array
            Parallel.Invoke(() =>
                            {
                                Console.WriteLine("Begin first task...");
                                GetLongestWord(words);
                            },  // close first Action
 
                             () =>
                             {
                                 Console.WriteLine("Begin second task...");
                                 GetMostCommonWords(words);
                             }, //close second Action
 
                             () =>
                             {
                                 Console.WriteLine("Begin third task...");
                                 GetCountForWord(words, "species");
                             }  //close third Action
                         ); //close parallel.invoke
 
            Console.WriteLine("Returned from Parallel.Invoke");
 
            Console.WriteLine("Press any key to exit.");
            Console.ReadKey();
        }
 
        private static string GetLongestWord(string[] words)
        {
            var longestWord = (from w in words
                               orderby w.Length descending
                               select w).First();
 
            Console.WriteLine("Task 1 -- The longest word is {0}", longestWord);
            return longestWord;
        }
 
        private static void GetMostCommonWords(string[] words)
        {
            var frequencyOrder = from word in words
                                 where word.Length > 6
                                 group word by word into g
                                 orderby g.Count() descending
                                 select g.Key;
 
            var commonWords = frequencyOrder.Take(10);
 
            StringBuilder sb = new StringBuilder();
            sb.AppendLine("Task 2 -- The most common words are:");
            foreach (var v in commonWords)
            {
                sb.AppendLine("  " + v);
            }
            Console.WriteLine(sb.ToString());
        }
 
        private static void GetCountForWord(string[] words, string term)
        {
            var findWord = from word in words
                           where word.ToUpper().Contains(term.ToUpper())
                           select word;
 
            Console.WriteLine(@"Task 3 -- The word ""{0}"" occurs {1} times.", term, findWord.Count());
        }
 
        static string[] CreateWordArray()
        {
            string s = System.IO.File.ReadAllText("Origin of the Species.txt");
            // Separate string into an array of words, removing some common punctuation.
            return s.Split(
                new char[] { ' ', '\u000A', ',', '.', ';', ':', '-', '_', '/' },
                StringSplitOptions.RemoveEmptyEntries);
        }
    }
}
//RESULT:
//Begin second task...
//Begin first task...
//Begin third task...
//Task 2 -- The most common words are:
//  species
//  selection
//  varieties
//  natural
//  animals
//  between
//  different
//  distinct
//  several
//  conditions
 
//Task 1 -- The longest word is characteristically
//Task 3 -- The word "species" occurs 1927 times.
//Returned from Parallel.Invoke
//Press any key to exit.

爲了更好地控制任務執行或從任務返回值,必須更加顯式地使用 Task 對象。dom

顯式建立和運行任務

不返回值的任務由 System.Threading.Tasks.Task 類表示。返回值的任務由 System.Threading.Tasks.Task<TResult> 類表示,該類從 Task 繼承。 異步

任務對象處理基礎結構詳細信息,並提供可在任務的整個生存期內從調用線程訪問的方法和屬性。 例如,能夠隨時訪問任務的 Status 屬性,以肯定它是已開始運行、已完成運行、已取消仍是引起了異常。 狀態由 TaskStatus 枚舉表示。

在建立任務時,你賦予它一個用戶委託,該委託封裝該任務將執行的代碼。 該委託能夠表示爲命名的委託、匿名方法或 lambda 表達式。 lambda 表達式能夠包含對命名方法的調用,以下面的示例所示。

using System;
using System.Threading;
using System.Threading.Tasks;
 
namespace ExplicitTaskDemo
{
    class Program
    {
        static void Main(string[] args)
        {
            Thread.CurrentThread.Name = "Main";
            // Create a task and supply a user delegate by using a lambda expression. 
            Task taskA = new Task(() => Console.WriteLine("Hello from taskA."));
            // Start the task.
            taskA.Start();
            // Output a message from the calling thread.
            Console.WriteLine("Hello from thread '{0}'.", Thread.CurrentThread.Name);
            taskA.Wait();
 
 
 
            //Thread.CurrentThread.Name = "Main";
            //// Define and run the task.
            //Task taskA = Task.Run(() => Console.WriteLine("Hello from taskA."));
            //// Output a message from the calling thread.
            //Console.WriteLine("Hello from thread '{0}'.", Thread.CurrentThread.Name);
            //taskA.Wait();
 
 
 
            //Thread.CurrentThread.Name = "Main";
            //// Better: Create and start the task in one operation. 
            //Task taskA = Task.Factory.StartNew(() => Console.WriteLine("Hello from taskA."));
            //// Output a message from the calling thread.
            //Console.WriteLine("Hello from thread '{0}'.", Thread.CurrentThread.Name);
            //taskA.Wait();
 
 
 
            Console.WriteLine("Press any Key to Exit.");
            Console.ReadKey();
        }
    }
}
//TaskParallel1 RESULT:
//Hello from thread 'Main'.
//Hello from taskA.
//Press any Key to Exit.

你能夠用 new Task(),也能夠用 Task.Run(),還能夠用 Task.Factory.StartNew(),建立並啓動一個任務。其中,Task.Run() 是首選方法;沒必要將建立和計劃分開而且你須要其餘任務建立選項或使用特定計劃程序時,或者須要經過 AsyncState 屬性將其餘狀態傳遞到任務時,使用 Task.Factory.StartNew()

任務建立選項

建立任務的大多數 API 提供接受 TaskCreationOptions 枚舉參數。 經過指定下列選項之一,可指示任務計劃程序如何在線程池中安排任務計劃。

using System;
using System.Threading.Tasks;
 
namespace TaskCreationOptionsDemo
{
    class Program
    {
        static void Main(string[] args)
        {
            var task3 = new Task(() => MyLongRunningMethod(),
                    TaskCreationOptions.LongRunning | TaskCreationOptions.PreferFairness);
            task3.Start();
 
            task3.Wait();
 
            Console.WriteLine("Press any key to Exit.");
            Console.ReadKey();
 
        }
        static void MyLongRunningMethod()
        {
            Console.WriteLine("A long, long time ago......");
        }
    }
}

TaskCreationOptions 參數值

說明

None

未指定任何選項時的默認值。 計劃程序將使用其默認試探法來計劃任務。

PreferFairness

指定應當計劃任務,以使越早建立的任務將更可能越早執行,而越晚建立的任務將更可能越晚執行。

LongRunning

指定該任務表示長時間運行的運算。

AttachedToParent

指定應將任務建立爲當前任務(若是存在)的附加子級。 有關更多信息,請參見已附加和已分離的子任務。

DenyChildAttach

指定若是內部任務指定 AttachedToParent 選項,則該任務不會成爲附加的子任務。

HideScheduler

指定經過調用特定任務內部的 TaskFactory.StartNew 或 Task<TResult>.ContinueWith 等方法建立的任務的任務計劃程序是默認計劃程序,而不是正在運行此任務的計劃程序。

建立任務延續

使用 Task.ContinueWithTask<TResult>.ContinueWith 方法能夠指定在先行任務完成時要啓動的任務。 延續任務的委託已傳遞了對先行任務的引用,所以它能夠檢查先行任務的狀態,並經過檢索 Task<TResult>.Result 屬性的值將先行任務的輸出用做延續任務的輸入。

var getData = Task.Factory.StartNew(() =>
{
    Random rnd = new Random();
    int[] values = new int[100];
    for (int ctr = 0; ctr <= values.GetUpperBound(0); ctr++)
        values[ctr] = rnd.Next();
 
    return values;
});
 
 
var processData = getData.ContinueWith((x) =>
{
    int n = x.Result.Length;
    long sum = 0;
    double mean;
 
    for (int ctr = 0; ctr <= x.Result.GetUpperBound(0); ctr++)
        sum += x.Result[ctr];
 
    mean = sum / (double)n;
    return Tuple.Create(n, sum, mean);
});
 
 
var displayData = processData.ContinueWith((x) =>
{
    return String.Format("N={0:N0}, Total = {1:N0}, Mean = {2:N2}",
                         x.Result.Item1, x.Result.Item2, x.Result.Item3);
});
 
 
Console.WriteLine(displayData.Result);
//N=100, Total = 108,192,345,466, Mean = 1,081,923,454.66
//Press any key to Exit.

也能夠用鏈式寫法:

var displayData = Task.Factory.StartNew(() =>
{
    Random rnd = new Random();
    int[] values = new int[100];
    for (int ctr = 0; ctr <= values.GetUpperBound(0); ctr++)
        values[ctr] = rnd.Next();
 
    return values;
}).
ContinueWith((x) =>
{
    int n = x.Result.Length;
    long sum = 0;
    double mean;
 
    for (int ctr = 0; ctr <= x.Result.GetUpperBound(0); ctr++)
        sum += x.Result[ctr];
 
    mean = sum / (double)n;
    return Tuple.Create(n, sum, mean);
}).
ContinueWith((x) =>
{
    return String.Format("N={0:N0}, Total = {1:N0}, Mean = {2:N2}",
                            x.Result.Item1, x.Result.Item2,
                            x.Result.Item3);
});
 
Console.WriteLine(displayData.Result);

建立分離的子任務

若是在任務中運行的用戶代碼建立一個新任務,且未指定 AttachedToParent 選項,則該新任務不採用任何特殊方式與父任務同步。 這種不一樣步的任務類型稱爲「分離的嵌套任務」或「分離的子任務」。

var outer = Task.Factory.StartNew(() =>
{
    Console.WriteLine("Outer task beginning.");
 
    var child = Task.Factory.StartNew(() =>
    {
        Thread.SpinWait(5000000);
        Console.WriteLine("Detached task completed.");
    });
 
});
 
outer.Wait();
Console.WriteLine("Outer task completed.");
//The example displays the following output:
//Outer task beginning.
//Outer task completed.
//Detached task completed.

從輸出信息看,outer 沒有等 child。

建立子任務

若是在一個任務中運行的用戶代碼建立任務時指定了 AttachedToParent 選項,則該新任務稱爲父任務的「附加子任務」。 由於父任務隱式地等待全部附加子任務完成,因此你可使用 AttachedToParent 選項表示結構化的任務並行。

var parent = Task.Factory.StartNew(() =>
{
    Console.WriteLine("Parent task beginning.");
    for (int ctr = 0; ctr < 10; ctr++)
    {
        int taskNo = ctr;
        Task.Factory.StartNew((x) =>
        {
            Thread.SpinWait(5000000);
            Console.WriteLine("Attached child #{0} completed.",
                              x);
        },
        taskNo, TaskCreationOptions.AttachedToParent);
    }
});
 
parent.Wait();
Console.WriteLine("Parent task completed.");
//The example displays the following output:
//Parent task beginning.
//Attached child #9 completed.
//Attached child #0 completed.
//Attached child #8 completed.
//Attached child #1 completed.
//Attached child #3 completed.
//Attached child #7 completed.
//Attached child #4 completed.
//Attached child #5 completed.
//Attached child #2 completed.
//Attached child #6 completed.
//Parent task completed.

等待任務完成

System.Threading.Tasks.Task 類型和 System.Threading.Tasks.Task<TResult> 類型提供了 Task.WaitTask<TResult>.Wait 方法的若干重載,使你可以等待任務完成。

此外,使用靜態 Task.WaitAllTask.WaitAny 方法的重載能夠等待一批任務中的任一任務或全部任務完成。

一般,會出於如下某個緣由等待任務:

  • 主線程依賴於任務計算的最終結果。
  • 你必須處理可能從任務引起的異常。
  • 應用程序能夠在全部任務執行完畢以前終止。 例如,執行 Main(應用程序入口點)中的全部同步代碼後,控制檯應用程序將當即終止。

組合任務

Task 類和 Task<TResult> 類提供多種方法,這些方法可以幫助你組合多個任務以實現常見模式,並更好地使用由 C#、Visual Basic 和 F# 提供的異步語言功能。 本節介紹了 WhenAllWhenAnyDelayFromResult<TResult> 方法。

Task.WhenAll 和 Task.WhenAny

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
 
namespace TaskWaitAllWaitAnyDemo
{
    class Program
    {
        static Random rand = new Random();
 
        static void Main(string[] args)
        {
            // Wait on a single task with no timeout specified.
            Task taskA = Task.Factory.StartNew(() => DoSomeWork(10000000));
            taskA.Wait();
            Console.WriteLine("taskA has completed.");
 
 
            // Wait on a single task with a timeout specified.
            Task taskB = Task.Factory.StartNew(() => DoSomeWork(10000000));
            taskB.Wait(100); //Wait for 100 ms.
 
            if (taskB.IsCompleted)
                Console.WriteLine("taskB has completed.");
            else
                Console.WriteLine("Timed out before taskB completed.");
 
            // Wait for all tasks to complete.
            Task[] tasks = new Task[10];
            for (int i = 0; i < 10; i++)
            {
                tasks[i] = Task.Factory.StartNew(() => DoSomeWork(10000000));
            }
            Task.WaitAll(tasks);
 
            // Wait for first task to complete.
            Task<double>[] tasks2 = new Task<double>[3];
 
            // Try three different approaches to the problem. Take the first one.
            tasks2[0] = Task<double>.Factory.StartNew(() => TrySolution1());
            tasks2[1] = Task<double>.Factory.StartNew(() => TrySolution2());
            tasks2[2] = Task<double>.Factory.StartNew(() => TrySolution3());
 
 
            int index = Task.WaitAny(tasks2);
            double d = tasks2[index].Result;
            Console.WriteLine("task[{0}] completed first with result of {1}.", index, d);
 
            Console.ReadKey();
        }
 
        static void DoSomeWork(int val)
        {
            // Pretend to do something.
            Thread.SpinWait(val);
        }
 
        static double TrySolution1()
        {
            int i = rand.Next(1000000);
            // Simulate work by spinning
            Thread.SpinWait(i);
            return DateTime.Now.Millisecond;
        }
        static double TrySolution2()
        {
            int i = rand.Next(1000000);
            // Simulate work by spinning
            Thread.SpinWait(i);
            return DateTime.Now.Millisecond;
        }
        static double TrySolution3()
        {
            int i = rand.Next(1000000);
            // Simulate work by spinning
            Thread.SpinWait(i);
            Thread.SpinWait(1000000);
            return DateTime.Now.Millisecond;
        }
    }
}
//The example displays the following output:
//taskA has completed.
//taskB has completed.
//task[0] completed first with result of 353.

最後一個輸出結果是不肯定的,由於 int index = Task.WaitAny(tasks2); 語句,task2 任務數組中有一個完成,就能夠執行該語句下面的語句。

Task.FromResult<TResult>

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading.Tasks;
 
namespace TaskFromResultDemo
{
    /// <summary>
    /// Demonstrates how to use Task<TResult>.FromResult to create a task that holds a pre-computed result.
    /// </summary>
    class Program
    {
        // Holds the results of download operations.
        static ConcurrentDictionary<string, string> cachedDownloads = new ConcurrentDictionary<string, string>();
 
        // Asynchronously downloads the requested resource as a string.
        public static Task<string> DownloadStringAsync(string address)
        {
            // First try to retrieve the content from cache.
            string content;
            if (cachedDownloads.TryGetValue(address, out content))
            {
                return Task.FromResult<string>(content);
            }
 
            // If the result was not in the cache, download the 
            // string and add it to the cache.
            return Task.Run(async () =>
            {
                content = await new WebClient().DownloadStringTaskAsync(address);
                cachedDownloads.TryAdd(address, content);
                return content;
            });
        }
 
        static void Main(string[] args)
        {
            // The URLs to download.
            string[] urls = new string[]
      {
         "http://msdn.microsoft.com",
         "http://www.contoso.com",
         "http://www.microsoft.com"
      };
 
            // Used to time download operations.
            Stopwatch stopwatch = new Stopwatch();
 
            // Compute the time required to download the URLs.
            stopwatch.Start();
            var downloads = from url in urls
                            select DownloadStringAsync(url);
            Task.WhenAll(downloads).ContinueWith(results =>
            {
                stopwatch.Stop();
 
                // Print the number of characters download and the elapsed time.
                Console.WriteLine("Retrieved {0} characters. Elapsed time was {1} ms.",
                   results.Result.Sum(result => result.Length),
                   stopwatch.ElapsedMilliseconds);
            })
            .Wait();
 
            // Perform the same operation a second time. The time required
            // should be shorter because the results are held in the cache.
            stopwatch.Restart();
            downloads = from url in urls
                        select DownloadStringAsync(url);
            Task.WhenAll(downloads).ContinueWith(results =>
            {
                stopwatch.Stop();
 
                // Print the number of characters download and the elapsed time.
                Console.WriteLine("Retrieved {0} characters. Elapsed time was {1} ms.",
                   results.Result.Sum(result => result.Length),
                   stopwatch.ElapsedMilliseconds);
            })
            .Wait();
 
            Console.WriteLine("Press any key to Exit.");
            Console.ReadKey();
        }
    }
}
//The example displays the following output:
//Retrieved 45462 characters. Elapsed time was 23734 ms.
//Retrieved 45462 characters. Elapsed time was 0 ms.
//Press any key to Exit.

任務中的異常處理

當某個任務拋出一個或多個異常時,異常包裝在 AggregateException 異常中。 該異常傳播回與該任務聯接的線程,一般該線程正在等待該任務完成或該線程訪問 Result 屬性。此行爲用於強制實施 .NET Framework 策略 - 默認全部未處理的異常應終止進程。

能夠經過將 WaitWaitAllWaitAny 方法,以及 Result 屬性放入 try/catch 塊中來處理異常。

捕獲 System.AggregateException,而後檢查其 InnerExceptions 以查看是否存在任何可由程序代碼處理的異常。以下代碼所示:

static void HandleExceptions()
{
    // Assume this is a user-entered string.
    string path = @"C:\";
 
    // Use this line to throw UnauthorizedAccessException, which we handle.
    Task<string[]> task1 = Task<string[]>.Factory.StartNew(() => GetAllFiles(path));
 
    // Use this line to throw an exception that is not handled.
    //  Task task1 = Task.Factory.StartNew(() => { throw new IndexOutOfRangeException(); } );
    try
    {
        task1.Wait();
    }
    catch (AggregateException ae)
    {
        ae.Handle((x) =>
        {
            if (x is UnauthorizedAccessException) // This we know how to handle.
            {
                Console.WriteLine("You do not have permission to access all folders in this path.");
                Console.WriteLine("See your network administrator or try another path.");
                return true;
            }
            return false; // Let anything else stop the application.
        });
 
    }
 
    Console.WriteLine("task1 has completed.");
}
 
static string[] GetAllFiles(string str)
{
    // Should throw an AccessDenied exception on Vista.
    return System.IO.Directory.GetFiles(str, "*.txt", System.IO.SearchOption.AllDirectories);
}

在此示例中,捕獲 System.AggregateException,但不嘗試處理任何其內部異常。 而是使用 Flatten 方法從任何嵌套的 AggregateException 實例中提取內部異常,並再次引起直接包含全部內部未處理異常的單個 AggregateException。 展平異常將使其更便於供客戶端代碼處理。

static void RethrowAllExceptions()
{
    // Assume this is a user-entered string.
    string path = @"C:\";
 
 
    Task<string[]>[] tasks = new Task<string[]>[3];
    tasks[0] = Task<string[]>.Factory.StartNew(() => GetAllFiles(path));
    tasks[1] = Task<string[]>.Factory.StartNew(() => GetValidExtensions(path));
    tasks[2] = Task<string[]>.Factory.StartNew(() => new string[10]);
 
 
    //int index = Task.WaitAny(tasks2);
    //double d = tasks2[index].Result;
    try
    {
        Task.WaitAll(tasks);
    }
    catch (AggregateException ae)
    {
        throw ae.Flatten();
    }
 
    Console.WriteLine("task1 has completed.");
}
 
static string[] GetValidExtensions(string path)
{
    if (path == @"C:\")
        throw new ArgumentException("The system root is not a valid path.");
 
    return new string[10];
}

取消任務

Task 類支持協做取消,並與 .NET Framework 4 中新增的 System.Threading.CancellationTokenSource 類和 System.Threading.CancellationToken 類徹底集成。

建立任務的大多數 API 提供接受 CancellationToken 參數。

var tokenSource2 = new CancellationTokenSource();
CancellationToken ct = tokenSource2.Token;
 
var task = Task.Factory.StartNew(() =>
{
    // Were we already canceled?
    ct.ThrowIfCancellationRequested();
 
    bool moreToDo = true;
    while (moreToDo)
    {
        // Poll on this property if you have to do
        // other cleanup before throwing.
        if (ct.IsCancellationRequested)
        {
            // Clean up here, then...
            ct.ThrowIfCancellationRequested();
        }
    }
}, tokenSource2.Token); // Pass same token to StartNew.
 
tokenSource2.Cancel();
 
// Just continue on this thread, or Wait/WaitAll with try-catch:
try
{
    task.Wait();
}
catch (AggregateException e)
{
    foreach (var v in e.InnerExceptions)
        Console.WriteLine(e.Message + " " + v.Message);
}

TaskFactory 類

TaskFactory 類提供靜態方法,這些方法封裝了用於建立和啓動任務和延續任務的一些經常使用模式。

  • 最經常使用模式爲 StartNew,它在一個語句中建立並啓動任務。
  • 從多個先行任務建立延續任務時,請使用 ContinueWhenAll 方法或 ContinueWhenAny 方法,或者它們在 Task<TResult> 類中的等效方法。 有關更多信息,請參見延續任務。
  • 若要在 Task Task<TResult> 實例中封裝異步編程模型 BeginXEndX 方法,請使用 FromAsync 方法。

默認的 TaskFactory 可做爲 Task 類或 Task<TResult> 類上的靜態屬性訪問。 你還能夠直接實例化 TaskFactory 並指定各類選項,包括 CancellationTokenTaskCreationOptions 選項、TaskContinuationOptions 選項或 TaskScheduler。 建立任務工廠時所指定的任何選項將應用於它建立的全部任務,除非 Task 是經過使用 TaskCreationOptions 枚舉建立的(在這種狀況下,任務的選項重寫任務工廠的選項)。

無委託的任務

在某些狀況下,可能須要使用 Task 封裝由外部組件(而不是你本身的用戶委託)執行的某個異步操做。 若是該操做基於異步編程模型 Begin/End 模式,你可使用 FromAsync 方法。 若是不是這種狀況,你可使用 TaskCompletionSource<TResult> 對象將該操做包裝在任務中,並於是得到 Task 可編程性的一些好處,例如對異常傳播和延續的支持。

在許多方案中,啓用一個 Task<TResult> 來表示外部異步操做很是有用。 出於此目的,提供了 TaskCompletionSource<TResult>。 它容許建立能夠分發到使用者的任務,這些使用者能夠同其餘狀況下同樣使用該任務的成員。 可是,與大多數任務不一樣,TaskCompletionSource 建立的任務的狀態由 TaskCompletionSource 上的方法顯式控制。 這使得要傳播到基礎任務的外部異步操做可以完成。 分離還確保使用者沒有訪問對應的 TaskCompletionSource 時不能轉換狀態。

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
 
namespace TaskCompletionSourceDemo
{
    class Program
    {
        // Demonstrated features:
        //         TaskCompletionSource ctor()
        //         TaskCompletionSource.SetResult()
        //         TaskCompletionSource.SetException()
        //        Task.Result
        // Expected results:
        //         The attempt to get t1.Result blocks for ~1000ms until tcs1 gets signaled. 15 is printed out.
        //         The attempt to get t2.Result blocks for ~1000ms until tcs2 gets signaled. An exception is printed out.
        // Documentation:
        //        http://msdn.microsoft.com/en-us/library/dd449199(VS.100).aspx
        static void Main(string[] args)
        {
            TaskCompletionSource<int> tcs1 = new TaskCompletionSource<int>();
            Task<int> t1 = tcs1.Task;
 
            // Start a background task that will complete tcs1.Task
            Task.Factory.StartNew(() =>
            {
                Thread.Sleep(1000);
                tcs1.SetResult(15);
            });
 
            // The attempt to get the result of t1 blocks the current thread until the completion source gets signaled.
            // It should be a wait of ~1000 ms.
            Stopwatch sw = Stopwatch.StartNew();
            int result = t1.Result;
            sw.Stop();
 
            Console.WriteLine("(ElapsedTime={0}): t1.Result={1} (expected 15) ", sw.ElapsedMilliseconds, result);
 
            // ------------------------------------------------------------------
 
            // Alternatively, an exception can be manually set on a TaskCompletionSource.Task
            TaskCompletionSource<int> tcs2 = new TaskCompletionSource<int>();
            Task<int> t2 = tcs2.Task;
 
            // Start a background Task that will complete tcs2.Task with an exception
            Task.Factory.StartNew(() =>
            {
                Thread.Sleep(1000);
                tcs2.SetException(new InvalidOperationException("SIMULATED EXCEPTION"));
            });
 
            // The attempt to get the result of t2 blocks the current thread until the completion source gets signaled with either a result or an exception.
            // In either case it should be a wait of ~1000 ms.
            sw = Stopwatch.StartNew();
            try
            {
                result = t2.Result;
 
                Console.WriteLine("t2.Result succeeded. THIS WAS NOT EXPECTED.");
 
                Console.WriteLine("Press any key to Exit.");
                Console.ReadKey();
            }
            catch (AggregateException e)
            {
                Console.Write("(ElapsedTime={0}): ", sw.ElapsedMilliseconds);
                Console.WriteLine("The following exceptions have been thrown by t2.Result: (THIS WAS EXPECTED)");
                for (int j = 0; j < e.InnerExceptions.Count; j++)
                {
                    Console.WriteLine("\n-------------------------------------------------\n{0}", e.InnerExceptions[j].ToString());
                }
 
                Console.WriteLine("Press any key to Exit.");
                Console.ReadKey();
            }
        }
    }
}

相關數據結構

TPL 有幾種在並行和順序方案中都有用的新公共類型。 它們包括 System.Collections.Concurrent 命名空間中的一些線程安全的、快速且可縮放的集合類,還包括一些新的同步類型(例如 System.Threading.Semaphore System.Threading.ManualResetEventSlim),對特定類型的工做負荷而言,這些新同步類型比舊的同步類型效率更高。 .NET Framework 4 中的其餘新類型(例如 System.Threading.BarrierSystem.Threading.SpinLock)提供了早期版本中未提供的功能。

參考資料


 

下載 Demo

下載 Samples for Parallel Programming with .net framework

相關文章
相關標籤/搜索