2、並行編程 - Task任務

任務,基於線程池。其使咱們對並行編程變得更簡單,且不用關心底層是怎麼實現的。
System.Threading.Tasks.Task類是Task Programming Library(TPL)中最核心的一個類。
html

1、任務與線程

1:任務是架構在線程之上的,也就是說任務最終仍是要拋給線程去執行。spring

2:任務跟線程不是一對一的關係,好比開10個任務並非說會開10個線程,這一點任務有點相似線程池,可是任務相比線程池有很小的開銷和精確的控制。編程

咱們用VS裏面的「並行任務」看一看,快捷鍵Ctrl+D,K,或者找到「調試"->"窗口「->"並行任務「,咱們在WaitAll方法處插入一個斷點,最終咱們發現任務確實託管給了線程。api

image

2、初識Task

兩種構建Task的方式,只是StartNew方法直接構建出了一個Task以後又調用了其Start方法。安全

    Task.Factory.StartNew (() =>
    {
        Console.WriteLine("Hello word!");
    });

    Task task = 
new Task
(() =>
    {
        Console.WriteLine("Hello,Word!");
    });
    task.Start();

在Task內部執行的內容咱們稱做爲Task的Body,Task提供了多個初始化重載的方法。架構

public Task(Action action);
public Task(Action<object> action, 
object state
);給action傳參數
public Task(Action action, CancellationToken cancellationToken);
public Task(Action action, TaskCreationOptions creationOptions);

例如使用了重載方法的State參數:函數

    Task task2 = new Task((obj ) => 
     { Console.WriteLine("Message: {0}", obj); }, 
"Say \"Hello\" from task2
");
    task2.Start();

補充細節oop

  在建立Task的時候,Task有不少的構造函數的重載,一個主要的重載就是傳入TaskCreateOptions的枚舉:  spa

  •     TaskCreateOptions.None:用默認的方式建立一個Task
  •     TaskCreateOptions.PreferFairness:請求scheduler儘可能公平的執行Task(後續文章會將是,Task和線程同樣,有優先級的)
  •     TaskCreateOptions.LongRunning:聲明Task將會長時間的運行。
  •     TaskCreateOptions.AttachToParent:由於Task是能夠嵌套的,因此這個枚舉就是把一個子task附加到一個父task中。

3、任務的結果

任務結束時,它能夠把一些有用的狀態信總寫到共享對象中。這個共享對象必須是線程安全的。
另外一個方式是使用返回某個結果的任務。使用Task類的泛型版本,就能夠定義返冋某個結果的任務的返回類型。線程

使用返回值的Result屬性可獲取是在一個Task運行完成纔會獲取的,因此task2是在task1運行完成後,纔開始運行,也就是說上面的兩個result的值無論運行多少次都是不會變的。其中咱們也能夠經過CurrentId來獲取當前運行的Task的編號。

    var loop = 0;
    var task1 = new Task<int>(() => 
    {
        for (var i = 0; i < 1000; i++)
            loop += i;
        return loop;
    });
    task1.Start();           
    var loopResut = task1.Result;
 
    var task2 = new Task<long>(obj=>
    {
        long res = 0;
        var looptimes = (int)obj;
        for (var i = 0; i < looptimes; i++)
            res += i;
        return res;
    },loopResut);
     
    task2.Start();
    var resultTask2 = task2.Result;
 
    Console.WriteLine("任務1的結果':{0}\n任務2的結果:{1}",   loopResut,resultTask2);

 .NET 4.5 :Task.Run

在 .NET Framework 4.5 及更高版本(包括 .NET Core 和 .NET Standard)中,使用靜態 Task.Run 方法做爲 TaskFactory.StartNew 的快捷方式。

Task.Run的跟Task.Factory.StarNew和new Task相差很少,不一樣的是前兩種是放進線程池當即執行,而Task.Run則是等線程池空閒後在執行。

Run方法只接受無參的Action和Func委託,另外兩個接受一個object類型的參數。

 在msdn中TaskFactory.StartNew的備註信息以下:

4、連續任務

所謂的延續的Task就是在第一個Task完成後自動啓動下一個Task。咱們經過ContinueWith方法來建立延續的Task。咱們假設有一個接受xml解析的服務,首先從某個地方接受文件,而後解析入庫,最後發送是否解析正確的回執。在每次調用ContinueWith方法時,每次會把上次Task的引用傳入進來,以便檢測上次Task的狀態,好比咱們可使用上次Task的Result屬性來獲取返回值。

    var ReceiveTask  = new Task(() => ReceiveXml());
    var ResolveTask = ReceiveTask .ContinueWith <bool>((r) => ResolveXml());
    var SendFeedBackTask = ResolveTask.ContinueWith <string>((s) => SendFeedBack(s.Result));
    ReceiveTask.Start();
    Console.WriteLine(SendFeedBackTask.Result);

上面的代碼咱們也能夠這麼寫:

   var SendFeedBackTask = Task.Factory.StartNew(() => ReceiveXml())
                            .ContinueWith<bool>(s => ResolveXml())
                            .ContinueWith<string>(r => SendFeedBack(r.Result));
    Console.WriteLine(SendFeedBackTask.Result);

不管前一個任務是如何結束的,前面的連續任務老是在前一個任務結束時啓動。使用 TaskContinuationOptions枚舉中的值,能夠指定,連續任務只有在起始任務成功(或失敗)結束吋啓動。可能的值是 OnlyOnFaulted、NotOoFaulted、Onl)OnCanceIed、NotOnCanceled 和 OnlyOnRanToCompletion

Task t5 = t1.ContinueWith(DoOnError,
TaskContinuationOptions.OnlyOnFaulted);

5、分離嵌套任務

有些狀況下咱們須要建立嵌套的Task,嵌套裏面又分爲分離的和不分離的。其建立的方式很簡單,就是在Task的body裏面建立一個新的Task。若是新的Task未指定AttachedToParent選項,那麼就是分離嵌套的。咱們看下面這段代碼。下面的代碼中outTask.Wait()表示等待outTask執行完成。

var outTask = Task.Factory.StartNew(() =>
{
    Console.WriteLine("Outer task beginning...");
    var childTask = Task.Factory.StartNew(() =>
    {
        Thread.SpinWait(3000000);
        Console.WriteLine("Detached nested task completed.");
    });
});
outTask.Wait();
Console.WriteLine("Outer task completed.");
Console.ReadKey();

咱們能夠看到運行結果是:

image_thumb[2]

6、子任務

咱們將上面的代碼加上TaskCreationOptions選項:

若是父任務在子任務以前結束,父任務的狀態就顯示爲WaitingForChildrenToComplete。只要子任務也結束時,父任務的狀態就變成RanToCompletion。.、固然,若是父任務用TaskCreatiooOptions 枚舉中的DetachedFromParent建立子任務時,這就無效。

var outTask = Task.Factory.StartNew(() =>
{
    Console.WriteLine("Outer task beginning...");
    var childTask = Task.Factory.StartNew(() =>
    {
        Thread.SpinWait(3000000);
        Console.WriteLine("Detached nested task completed.");
    },TaskCreationOptions.AttachedToParent);
});
outTask.Wait();
Console.WriteLine("Outer task completed.");

看到運行結果:

image_thumb_1[1]

7、取消任務

在4.0中給咱們提供一個「取消標記」叫作CancellationTokenSource.Token在建立task的時候傳入此參數,就能夠將主線程和任務相關聯。咱們經過cancellation的tokens來取消一個Task。

有點要特別注意的,當咱們調用了Cancel()方法以後,.NET Framework不會強制性的去關閉運行的Task。咱們本身必須去檢測以前在建立Task時候傳入的那個CancellationToken。

一旦cancel被調用,task將會拋出OperationCanceledException來中斷此任務的執行,最後將當前task的Status的IsCanceled屬性設爲true。

一、在不少Task的Body裏面包含循環,咱們能夠在輪詢的時候判斷IsCancellationRequested屬性是否爲True,若是是True的話,就能夠中止循環以及釋放資源,同時拋出OperationCanceledException異常出來。

二、或者在任務中設置「取消信號「叫作ThrowIfCancellationRequested,來等待主線程使用Cancel來通知。

三、檢測task是否被cancel就是調用CancellationToken.WaitHandle屬性。CancellationToken的WaitOne()方法會阻止task的運行,只有CancellationToken的cancel()方法被調用後,這種阻止纔會釋放。

var cts = new CancellationTokenSource();

var ct =  cts.Token;

var task = Task.Factory.StartNew(() =>
{
   
for (var i = 0; i < 10000000; i++)
    {
       
if (ct.IsCancellationRequested)
        {
            Console.WriteLine(
"任務開始取消...");
           
throw new OperationCanceledException(ct);
        }
         //或者直接在檢測到異常時,扔出異常:  token.ThrowIfCancellationRequested();
         //或者等待 WaitHandle:           token.WaitHandle.WaitOne();


    }
},ct);//傳入CancellationToken做爲Task第二個參數

ct.Register(()
=>
{
    Console.WriteLine(
"已經取消");
});

Thread.Sleep(
5000);
cts.Cancel();//若是想要取消一個Task的運行,只要調用CancellationToken實例的Cancel()方法就能夠了。

try
{
    task.Wait();
}
catch (AggregateException e)
{
   
foreach (var v in e.InnerExceptions)
        Console.WriteLine(
"msg: " + v.Message);
}

8、 休眠:等待時間執行

在TPL中咱們能夠經過三種方式進行等待,一是經過CancellTaken的WaitHanle進行等待、第二種則是經過傳統的Tread.Sleep方法、第三種則經過Thread.SpainWait方法。

一、CancellToken方式:每次咱們等待十秒鐘以後,再進行下次輸出。

有一點要注意:WaitOne()方法只有在設定的時間間隔到了,或者Cancel方法被調用,此時task纔會被喚醒。若是若是cancel()方法被調用而致使task被喚醒,那麼CancellationToken.WaitHandle.WaitOne()方法就會返回true,若是是由於設定的時間到了而致使task喚醒,那麼CancellationToken.WaitHandle.WaitOne()方法返回false。

    var cts = new CancellationTokenSource();
    var ct = cts.Token;
 
    var task = new Task(() =>
    {
        for (var i = 0; i < 100000; i++)
        {
            var cancelled = ct.WaitHandle.WaitOne(1000 );
            Console.WriteLine(" {0}. Cancelled? {1}", i, cancelled);
            if (cancelled)
            {
                throw new OperationCanceledException(ct);
            }
        }
    }, ct);
    task.Start();

image

二、上面的功能若是咱們要是經過Tread.Sleep方式實現:

            var task = new Task(() =>
            {
                for (var i = 0; i < 100000; i++)
                {
                    Thread.Sleep(10000);
                    var cancelled =ct.IsCancellationRequested; Console.WriteLine(" {0}. Cancelled? {1}", i, cancelled); if (cancelled) { throw new OperationCanceledException(ct); } } },ct);

三、Thread.SpainWait則跟上面兩種方式徹底不一樣,上面的兩種方式都是會在線程調度程序不考慮改線程,直等到運行結束。而Thread.SpainWait的做用實質上會將處理器置於十分緊密的循環中,主要的做用是來實現同步鎖的做用。並不經常使用,大部分狀況下咱們能夠經過Lock的方式來實現。

Thread.SpinWait(10000);

9、等待任務執行

在不少時候咱們也許須要等待同時開啓的幾個線程完成以後再來作其餘事,在TPL中提供了幾種方式來等待任務執行。Task.Wait等待單個任務完成;Task.WaitAll等待全部的Task完成、TaskAny等在其中的任何一個或則多個任務完成。

一、Task.Wait: 等待單獨的一個Task執行完成

共有5個重載:Wait()、Wait(CancellToken)、Wait(Int32)、Wait(TimeSpan)、Wait(TimeSpan、CancellToken)。各個重載方法的含義:

  • 1)Wait():等待整個任務完成或者取消或者出現異常;
  • 2)Wait(CancellToken):等待任務直到CancellToken調用取消或者完成,或者出現異常;
  • 3)Wait(Int32):等待任務,未完成則到指定的時間;
  • 4)Wait(TimeSpan):同上;
  • 5)Wait(TimeSpan、CancellToken):等待任務到指定時間,或者CancellToken調用取消或者任務完成。
static void Main(string[] args)
{
    var tokenSource = new CancellationTokenSource();
    CancellationToken token = tokenSource.Token;
        Task task = createTask(token,6);    task.Start();
    Console.WriteLine("Wait() complete.");
    task.Wait();
    Console.WriteLine("Task Completed.");
    
    task = createTask(token,3);
    task.Start();
    Console.WriteLine("Wait(2) secs for task to complete.");
    bool completed = task.Wait(2000);
    Console.WriteLine("Wait ended - task completed: {0}", completed);
    
    task = createTask(token,4);
    task.Start();
    Console.WriteLine("Wait(2,token) for task to complete.");
    completed = task.Wait(2000, token);
    Console.WriteLine("Wait ended - task completed: {0} task cancelled {1}",
    completed, task.IsCanceled);
    Console.WriteLine("Main method complete. Press enter to finish.");
    Console.ReadLine();
}
static Task createTask(CancellationToken token,int loop)
{
    return new Task(() =>
    {
        for (int i = 0; i < loop; i++)
        {
            token.ThrowIfCancellationRequested();
            Console.WriteLine("Task - Int value {0}", i);
            token.WaitHandle.WaitOne(1000);
        }
    }, token);
}

循環都會等待1秒鐘,這樣咱們能夠看看Wait(2000)的效果,看看運行後的效果:

201103271456583296[1]

 從上面的例子能夠看出,wait方法子task執行完成以後會返回true。
注意:當在執行的task內部拋出了異常以後,這個異常在調用wait方法時會被再次拋出。後面再"異常處理篇"會講述。

二、Task.WaitAll方法: 等待多個task

是等待全部的任務完成,也有5個重載, 也能夠傳遞時間以及Token參數,進行等待時間以及取消Token的控制。

   var tokenSource = new CancellationTokenSource();
    CancellationToken token = tokenSource.Token;
    var task1 = createTask(token,2);
    var task2 = createTask(token, 5);
    task1.Start();
    task2.Start();
    Console.WriteLine("Waiting for tasks to complete.");
    Task.WaitAll(task1, task2);
    Console.WriteLine("Tasks Completed.");

201103271456594866[1]

注意:若是在等在的多個task之中,有一個task拋出了異常,那麼調用WaitAll()方法時就會拋出異常。

ContinueWith結合WaitAll來玩一把

    當這二者結合起來,咱們就能夠玩一些複雜一點的東西,好比說如今有4個任務,其中t1須要串行,t2-t3能夠並行,t4須要串行.

image

ConcurrentStack<int> stack = new ConcurrentStack<int>();

//t1先執行
var t1 = Task.Factory.StartNew(() =>
{
    stack.Push(1);
    stack.Push(2);
});

//t2,t3並行執行
var t2 = t1.
ContinueWith
(t =>
{
    int result;

    stack.TryPop(out result);
});

//t2,t3並行執行
var t3 = t1.
ContinueWith
(t =>
{
    int result;

    stack.TryPop(out result);
});

//等待t2和t3執行完
Task.WaitAll(t2, t3);

 //t4z再執行
var t4 = Task.Factory.StartNew(() =>
{
    Console.WriteLine("當前集合元素個數:" + stack.Count);
});

三、Task.WaitAny

等待任何一個任務完成,完成以後返回其完成的任務的Index:

var tokenSource = new CancellationTokenSource();
    CancellationToken token = tokenSource.Token;
    var task1 = createTask(token,2);
    var task2 = createTask(token, 5);
    task1.Start();
    task2.Start();
    Console.WriteLine("Waiting for tasks to complete.");
    var 
index
 = Task.WaitAny(task1, task2);
    Console.WriteLine("Tasks Completed.Index is {0}",index);

image

10、異常處理

在TPL中,異常的觸發器主要是這幾個:
Task.Wait(), Task.WaitAll(), Task,WaitAny(),Task.Result。而在TPL出現的異常都會以AggregateException的示例拋出,咱們在進行基本的異常處理時,能夠經過查看AggregateException的InnerExceptions來進行內部異常的捕獲:

var tokenSource = new CancellationTokenSource();
    var token = tokenSource.Token;
    var task1 = new Task(() =>
    {
        throw new NullReferenceException() { 
Source
="task1"};
    });
    var task2 = new Task(() =>
    {
        throw new ArgumentNullException("a", "a para can not be null") { Source="task2"};
    });
    task1.Start(); task2.Start(); 
    try
    {
        Task.WaitAll(task1, task2);
    }
    catch(AggregateException ex)
    {
        foreach (Exception inner in ex.InnerExceptions)
        {
            Console.WriteLine("Exception type {0} from {1}",
            inner.GetType(), inner.Source);
        }
    }

同時,咱們還能夠經過Task的幾個屬性來判斷Task的狀態,如:IsCompleted, IsFaulted, IsCancelled,Exception。

另外,AggregateException中還提供了Handle方法來給咱們方法來給咱們處理每一個內部 異常,每一個異常發生時都會調用Handle傳入的delegate ,同時咱們須要經過返回True,False來告訴異常是否已經被處理,好比對於OperationCanceledException咱們知道是取消了Task,是確定能夠處理的:

try
    {
        Task.WaitAll(task1, task2, task3, task4);
    }
    catch(AggregateException ex)
    {
        ex.Handle((e) =>
        {
            if (e is OperationCanceledException)
            {
                return true;
            }
            else
            {
                return false;
            }
        });
        
    }

11、執行晚加載的Task(Lazily Task)

晚加載,或者又名延遲初始化,主要的好處就是避免沒必要要的系統開銷。在並行編程中,能夠聯合使用Lazy變量和Task<>.Factory.StartNew()作到這點。(Lazy變量時.NET 4中的一個新特性,這裏你們不用知道Lazy的具體細節)。

  Lazy變量只有在用到的時候纔會被初始化。因此咱們能夠把Lazy變量和task的建立結合:只有這個task要被執行的時候纔去初始化。

// do the same thing in a single statement
            Lazy<Task<string>> lazyData2 = new Lazy<Task<string>>(
            () => Task<string>.Factory.StartNew(() =>
            {
                Console.WriteLine("Task body working...");
                return "Task Result";
            }));

            Console.WriteLine("Calling second lazy variable");
            Console.WriteLine("Result from task: {0}", lazyData2.Value.Result);

 首先咱們回想一下,在以前的系列文章中咱們是怎麼定義一個task的:直接new,或者經過task的factory來建立,由於建立task的代碼是在main函數中的,因此只要new了一個task,那麼這個task就被初始化。如今若是用了Lazy的task,那麼如今咱們初始化的就是那個Lazy變量了,而沒有初始化task,(初始化Lazy變量的開銷小於初始化task),只有當調用了lazyData.Value時,Lazy變量中包含的那個task纔會初始化。(這裏歡迎你們提出本身的理解)

相關文章
相關標籤/搜索