Task
在C#編程中,實現並行能夠直接使用線程,但使用起來很繁瑣;也可使用線程池,線程池很大程度上簡化了線程的使用,可是也有着一些侷限,好比咱們不知道做業何時完成,也取不到做業的返回值;解決線程池侷限性的方案是使用任務
。本文將總結C#中Task
的使用。編程
相似於線程池工做項對異步操做的封裝,任務是對異步操做的另外一種形式的封裝,這種封裝抽象層次更高,讓咱們可以對異步操做進行更多的控制。異步
任務啓動後,經過任務調度器TaskScheduler
來調度。.NET中提供兩種任務調度器,一種是線程池任務調度器,也是默認調度器,它會將任務派發給線程池工做者線程;另外一種是上下文同步任務調度器,它會將任務派發給當前上下文線程,例如GUI線程。此外,咱們也能自定義任務調度器,例如能夠將異步IO任務派發給線程池IO線程。測試
Parallel
靜態類除了提供並行循環的各類重載,還提供了一個方法Parallel.Invoke
。這個方法能夠建立並執行一個或多個異步任務,使用方法以下:this
/// <summary> /// 任務模擬 /// </summary> private static void DoWork(int workId = 0) { Console.WriteLine($"{DateTime.Now}=> Thread[{Thread.CurrentThread.ManagedThreadId}] started work[{workId}]."); Thread.Sleep(3000); Console.WriteLine($"{DateTime.Now}=> Thread[{Thread.CurrentThread.ManagedThreadId}] done work[{workId}]."); } /// <summary> /// 任務的隱式使用 /// </summary> public static void ImplicitUsingOfTask() { Parallel.Invoke(()=>DoWork(1),()=>DoWork(2),() => DoWork(3)); }
上例的運行結果以下:pwa
2019/3/27 20:40:18=> Thread[9] started work[1].
2019/3/27 20:40:18=> Thread[12] started work[3].
2019/3/27 20:40:18=> Thread[10] started work[2].
2019/3/27 20:40:21=> Thread[9] done work[1].
2019/3/27 20:40:21=> Thread[12] done work[3].
2019/3/27 20:40:21=> Thread[10] done work[2].線程
對於簡單的多任務並行,使用上述的方式很方便,可是這種方式與線程池同樣,咱們不能控制任務的執行或者獲取任務返回值。code
相對於使用Parallel.Invoke
執行並行操做,更經常使用的是使用Task
和Task<T>
提供的方法進行異步和並行處理。下面是任務最基本的使用:繼承
Task.Run(() => { //TODO }); Task.Factory.StartNew(() => { //TODO });
具備返回值的任務使用Task<T>
,T
可根據咱們的需求指定,下面是獲取任務返回值的方法。token
Task<int> task = Task<int>.Factory.StartNew(() => { Thread.Sleep(3000);//模擬操做用時 return DateTime.Now.Day; }); int day = task.Result;
須要說明的是,獲取任務的結果會阻塞當前線程。ci
有時候,咱們須要等待一些任務所有完成後才能執行後續操做,有時候只要多個任務中的一個完成了,就能夠執行後續操做。Task
提供了Wait
、WaitAll
和WaitAny
等方法知足咱們的需求。下面的例子展現了各類等待方法的使用。
/// <summary> /// 任務等待測試 /// </summary> public static void TaskWait() { Stopwatch watch = new Stopwatch(); #region 場景1:等待一個任務完成 Task task = Task.Run(() => DoWorkOfTask(1000)); Console.WriteLine("start wait. work duration: 1000"); watch.Start(); task.Wait();//等待1秒左右 watch.Stop(); Console.WriteLine($"end wait. time: {watch.ElapsedMilliseconds}"); #endregion #region 場景2:等待多個任務完成 Task[] tasks = new Task[3] { Task.Run(() => DoWorkOfTask(1000)), Task.Run(() => DoWorkOfTask(2000)), Task.Run(() => DoWorkOfTask(3000)), }; Console.WriteLine("start wait all. work duration: min 1000 max 3000."); watch.Restart(); Task.WaitAll(tasks);//等待3秒左右 watch.Stop(); Console.WriteLine($"end wait. time: {watch.ElapsedMilliseconds}"); #endregion #region 場景3:等待某個任務完成 tasks = new Task[3] { Task.Run(() => DoWorkOfTask(1000)), Task.Run(() => DoWorkOfTask(2000)), Task.Run(() => DoWorkOfTask(3000)), }; Console.WriteLine("start wait any. work duration: min 1000 max 3000."); watch.Restart(); Task.WaitAny(tasks);//等待1秒左右 watch.Stop(); Console.WriteLine($"end wait. time: {watch.ElapsedMilliseconds}"); #endregion } /// <summary> /// 作任務 /// </summary> /// <param name="workDuration">任務時長</param> private static void DoWorkOfTask(int workDuration) { Console.WriteLine($"{DateTime.Now}=> Thread[{Thread.CurrentThread.ManagedThreadId}] started task[{Task.CurrentId}]."); Thread.Sleep(workDuration); Console.WriteLine($"{DateTime.Now}=> Thread[{Thread.CurrentThread.ManagedThreadId}] completed task[{Task.CurrentId}]."); }
使用Wait
、WaitAll
和WaitAny
方法時,咱們能夠設置超時時間或者傳入取消Token,以控制等待時間。但這些方法返回布爾值,只能代表是否等待成功;假如咱們須要知道所等待的任務返回值,則可使用WhenAll
或WhenAny
方法,這兩個方法不能控制等待時間,但會返回一個完成的任務。以下例:
Task<int>[] tasks = new Task<int>[3] { Task<int>.Factory.StartNew(() => { Console.WriteLine($"task #{Task.CurrentId} run"); Thread.Sleep(100); Console.WriteLine($"task #{Task.CurrentId} done"); return 100; }), Task<int>.Factory.StartNew(() => { Console.WriteLine($"task #{Task.CurrentId} run"); Thread.Sleep(500); Console.WriteLine($"task #{Task.CurrentId} done"); return 1000; }), Task<int>.Factory.StartNew(() => { Console.WriteLine($"task #{Task.CurrentId} run"); Thread.Sleep(1000); Console.WriteLine($"task #{Task.CurrentId} done"); return 10000; }), }; //int[] results = Task.WhenAll(tasks).Result; //Console.WriteLine($"[{string.Join(",",results)}]"); Task<int> task = Task.WhenAny(tasks).Result; Console.WriteLine($"task #{task.Id}. result {task.Result}");
Task.WhenAll
和Task.WhenAny
在等待結束時,都會建立一個完成狀態的任務,WhenAll
將等待的全部已完成任務的結果放入建立任務的結果中,WhenAny
則將等待的已完成任務放到建立任務的結果中。
有時候,咱們須要在一個任務完成時開始另外一個任務。對於這種需求,咱們可使用Task
的ContinueWith
等方法來處理。
Task task = Task.Run(() => DoWorkOfTask(3000)); task.ContinueWith(t => DoWorkOfTask(1000));
運行結果:
2019/3/27 21:25:09=> Thread[10] started task[1].
2019/3/27 21:25:12=> Thread[10] completed task[1].
2019/3/27 21:25:12=> Thread[11] started task[2].
2019/3/27 21:25:13=> Thread[11] completed task[2].
咱們還能夠經過TaskContinuationOptions
指定延續任務的執行條件,如任務取消時或者任務出現異常時才執行,等。
有時候,咱們要在一個任務裏面建立一些其餘任務,而且還要在任務裏面等待建立的任務完成,此時咱們可使用子任務。
Task parent = Task.Factory.StartNew(() => { Console.WriteLine($"parent task #{Task.CurrentId} run."); for (int i = 0; i < 10; i++) { Task.Factory.StartNew(() => { Console.WriteLine($"child task #{Task.CurrentId} run."); Thread.Sleep(1000); Console.WriteLine($"child task #{Task.CurrentId} done."); }, TaskCreationOptions.AttachedToParent); } }); parent.Wait(); Console.WriteLine($"parent task #{parent.Id} done.");
在一個任務中建立的新任務,默認狀況下與父級任務是分離的,各自的運行不受影響,除非在建立任務時顯式附加到父級任務中。例如,上例中若是不指定TaskCreationOptions.AttachedToParent
,parent.Wait()
就不會持續到全部子任務都執行完成。
咱們在啓動任務時,傳入取消令牌CancellationToken
,當收到取消請求時,拋出取消異常並在等待任務完成時捕獲異常TaskCanceledException
。咱們經過這種方式控制任務的取消。
/// <summary> /// 任務取消 /// </summary> public static void TaskCancle() { Console.WriteLine("Press any key to begin. Press 'c' to cancel. "); Console.ReadKey(true); Console.WriteLine(); CancellationTokenSource tokenSource = new CancellationTokenSource(); ConcurrentBag<Task> tasks = new ConcurrentBag<Task>(); //單任務取消 Task task1 = Task.Factory.StartNew(() => DoWorkOfTask(5000, tokenSource.Token), tokenSource.Token); tasks.Add(task1); //嵌套任務取消 Task task2 = Task.Factory.StartNew(() => { for (int i = 0; i < 10; i++) { int duration = 1000 * i; tasks.Add(Task.Factory.StartNew(()=>DoWorkOfTask(duration, tokenSource.Token), tokenSource.Token)); } DoWorkOfTask(5000,tokenSource.Token); }, tokenSource.Token); tasks.Add(task2); char ch = Console.ReadKey().KeyChar; if (ch == 'c' || ch == 'C') { tokenSource.Cancel(); Console.WriteLine($"{DateTime.Now}=> Task cancellation requested."); } try { Task.WaitAll(tasks.ToArray()); } catch (AggregateException ae) { foreach (Exception ex in ae.InnerExceptions) {//任務取消經過拋出TaskCanceledException實現 TaskCanceledException tce = ex as TaskCanceledException; string cancelledTask = tce == null ? string.Empty : $"Task #{tce.Task.Id}"; Console.WriteLine($"Exception: {ex.GetType().Name}. {cancelledTask}"); } } finally { tokenSource.Dispose(); } Console.WriteLine(); //顯示任務狀態 foreach (Task task in tasks) { Console.WriteLine($"Task: #{task.Id} now is {task.Status}"); } } /// <summary> /// 帶取消令牌的做業 /// </summary> /// <param name="workDuration">做業時長</param> /// <param name="cancleToken">取消令牌</param> private static void DoWorkOfTask(int workDuration, CancellationToken cancleToken) { if (cancleToken.IsCancellationRequested) {//開始以前取消 Console.WriteLine($"{DateTime.Now}=> Task #{Task.CurrentId} was cancelled before it got started."); cancleToken.ThrowIfCancellationRequested(); } Console.WriteLine($"{DateTime.Now}=> Thread[{Thread.CurrentThread.ManagedThreadId}] started task #{Task.CurrentId}."); Thread.Sleep(workDuration); if (cancleToken.IsCancellationRequested) {//開始以後取消 Console.WriteLine($"{DateTime.Now}=> Task #{Task.CurrentId} was cancelled."); cancleToken.ThrowIfCancellationRequested(); } Console.WriteLine($"{DateTime.Now}=> Thread[{Thread.CurrentThread.ManagedThreadId}] completed task #{Task.CurrentId}."); }
上面提到經過取消令牌拋出TaskCanceledException
的方式控制任務的取消,實際上,Task會把自身執行過程當中的全部異常都包裝到一個AggregateException
中,並傳回調用線程。咱們在主線程中經過捕獲AggregateException
來進行異常處理。
咱們能夠在任務的調用線程捕獲並遍歷AggregateException
的內部異常,或者使用AggregateException
提供的Handle方法進行處理,以下:
Task task = Task.Run(() => { throw new Exception($"Task #{Task.CurrentId} thrown an exception"); }); try { task.Wait(); } catch (AggregateException ae) { //處理方式1:遍歷內部異常進行處理 foreach (Exception ex in ae.InnerExceptions) { Console.WriteLine($"foreach: {ex.Message}"); } //處理方式2:使用AggregateException的Handle方法 ae.Handle(ex=> { Console.WriteLine($"handle: {ex.Message}"); return true ; }); }
有時候,咱們能夠給任務附加一個任務異常時纔會執行的延續任務,並在延續任務中進行異常處理。
Task.Run(() => { throw new Exception($"Task #{Task.CurrentId} thrown an exception"); }) .ContinueWith(t => { Console.WriteLine($"{t.Exception?.InnerException?.Message}"); }, TaskContinuationOptions.OnlyOnFaulted);
下面是一個3層嵌套的任務。
Task parent = Task.Factory.StartNew(() => {//父級任務 for (int i = 0; i < 10; i++) { Task.Factory.StartNew(() => {//1代子任務 for (int j = 0; j < 10; j++) { Task.Factory.StartNew(() => {//2代子任務 throw new Exception($"Task #{Task.CurrentId} thrown an exception. "); }/*, TaskCreationOptions.AttachedToParent*/); } throw new Exception($"Task #{Task.CurrentId} thrown an exception. "); }/*, TaskCreationOptions.AttachedToParent*/); } throw new Exception($"Task #{Task.CurrentId} thrown an exception. "); }); try { parent.Wait(); } catch (AggregateException ae) { ae.Flatten().Handle(ex => { Console.WriteLine(ex.Message); return true; }); }
運行上面的代碼只會獲得一行輸出:
Task #1 thrown an exception.
看起來有點奇怪,爲何只捕獲到一個異常呢?其實也是在情理之中的:任務默認只會把自身異常傳遞到它本身的調用線程,子任務是在父任務中調用的,其異常只會傳遞到父任務的執行線程,因此咱們在父任務的調用線程,也就是咱們的主線程中是捕獲不到子任務的異常的。
取消上面代碼的兩處/*, TaskCreationOptions.AttachedToParent*/
,就會捕獲到全部異常。
任務是由TaskScheduler
調度的,啓動任務時,默認使用線程池任務調度器,任務將會被派發到線程池工做線程。線程池的調度前面已經總結過,這裏再也不展開。.NET提供的另外一種任務調度器是同步上下文調度器,用TaskScheduler.FromCurrentSynchronizationContext()
獲取,這個調度器會把任務派發給當前的上下文線程,經常使用在GUI應用程序中。
例如,咱們在一個窗體中新建一個ListBox,新建幾個任務向其中添加項,代碼以下:
this.lbxMsg.Items.Add($"{DateTime.Now:O}=>Current thread is thread #{Thread.CurrentThread.ManagedThreadId} ."); for (int i = 0; i < 10; i++) { new Task(() => { for (int j = 0; j < 3; j++) { this.lbxMsg.Items.Add($"{DateTime.Now:O}=> Task #{Task.CurrentId} add an item with thread #{Thread.CurrentThread.ManagedThreadId}."); } }).Start(TaskScheduler.FromCurrentSynchronizationContext()); }
運行上面的代碼能夠發現建立的任務都是由界面線程執行的。這裏若是使用默認的任務調度器將產生"線程間操做無效"的異常。
實際使用時,能夠給一個異步任務添加延續任務,來處理異步任務的結果或者異常等。以下:
Task.Run(() => { Thread.Sleep(3000); // 模擬操做過程 return 1000; // 模擬結果 }).ContinueWith(t => { this.lbxMsg.Items.Add(t.Result); // 在界面呈現結果或作其餘處理 }, TaskScheduler.FromCurrentSynchronizationContext());
除了使用.NET提供的調度器外,咱們可以繼承類TaskScheduler
來實現本身的任務調度器。這裏再也不展開,須要瞭解的能夠參考Samples for Parallel Programming with the .NET Framework。