實踐基於Task的異步模式

Await

返回該系列目錄《基於Task的異步模式--全面介紹》


 

     在API級別,實現沒有阻塞的等待的方法是提供callback(回調函數)。對於Tasks來講,這是經過像ContinueWith的方法實現的。基於語言的異步支持經過容許在正常控制流內部等待異步操做隱藏callbacks,具備和編譯器生成的代碼相同的API級別的支持。html

    在.Net 4.5,C#直接異步地支持等待的Task和Task<TResult>,在C#中使用"await"關鍵字。若是等待一個Task,那麼await表達式是void類型。若是等待一個Task<TResult>,那麼await表達式是TResult類型。await表達式必須出如今返回類型是void,Task,Task<TResult>的異步方法體內。web

   在幕後,await功能經過在該Task上的一個continuation(後續操做)安裝了一個callback(回調函數)。此回調將恢復暫停的異步方法。當該異步方法恢復時,若是等待的異步操做成功地完成,而且它是一個Task<TResult>,那它就返回一個TResult。若是等待的Task或者Task<TResult>以Canceled狀態結束,那麼會引起OperationCanceledException。若是等待的Task或者Task<TResult>以Faulted狀態結束,那麼會引起形成它失敗的異常。對於一個Task來講,可能因爲多個異常形成失敗,在這種狀況下,這些異常只有一個會傳遞。然而,該Task的Exception屬性會返回一個包含全部錯誤的AggregateException。設計模式

若是一個同步上下文和執行在掛起狀態的異步方法相關聯(如SynchronizationContext.Current是非空的),那麼異步方法的恢復會經過使用該下上文的Post方法發生在相同的同步上下文上。不然,它會依賴當前在掛起點的System.Threading.Tasks.TaskScheduler是什麼(針對.Net線程池,通常這個默認是TaskScheduler.Default)。它取決於該TaskScheduler是否容許恢復在等待異步操做完成的地方或者強制恢復被調度的地方執行。默認的調度通常會容許後續操做在等待的操做完成的線程上執行。緩存

當調用的時候,異步方法同步地執行方法體,直到遇到在尚未完成的等待的實例上的第一個await表達式,此時控制權返回給調用者。若是該異步方法沒有返回void,那麼就會返回Task或Task<TResult>表示正在進行的計算。在一個非void的異步方法中,若是遇到了return語句或者到達了方法體的末尾,那麼該task會以RanToCompletion的最終狀態完成。若是一個未處理的異常形成控制權離開了異步方法體,該任務會以Faulted狀態結束(若是異常是OperationCanceledException,任務會以Canceled狀態結束)。結果或異常最終以這種方式發佈。數據結構

Yield 和 ConfigureAwait

一些成員對異步方法的執行提供了更多的控制。Task類提供了一個Yield方法,可使用它把一個屈服點(yield point)引入異步方法。併發

public class Task : … 異步

{ async

public static YieldAwaitable Yield(); 函數

post

}

這個等價於異步地推送(post)或調度回到當前的上下文。

Task.Run(async delegate

{

for(int i=0; i<1000000; i++)

{

await Task.Yield(); // fork the continuation into a separate work item

...

}
});

Task類也提供了ConfigureAwait方法更多的控制在異步方法中如何發生掛起和恢復。正如以前提到的,默認異步方法掛起時,當前上下文被捕獲,用捕獲的上下文在恢復時調用異步方法的後續操做。在許多案例中,這是你想要的確切的行爲。然而,在某些狀況下你不關心你在哪裏結束,結果你能夠經過避免這些返回到原始上下文的posts來實現更好的性能。爲了開啓這個,可使用ConfigureAwait通知await操做不捕獲和恢復上下文,而是更傾向於,不管異步操做在哪兒等待完成,都繼續執行:

await someTask.ConfigureAwait(continueOnCapturedContext:false);

Cancellation(撤銷)

支持可撤銷的TAP方法都至少公開了接受一個CancellationToken的重載,該類型在是在.Net 4的System.Threading中引入的。

CancellationToken是經過CancellationTokenSource建立的。當CancellationTokenSource的Cancel方法調用時,它的Token屬性會返回接收到信號的CancellationToken。好比,思考一下,下載一個單獨的web頁面,而後想要取消該操做。咱們建立一個CancellationTokenSource,再把它的token傳給TAP方法,之後會可能調用它的Cancel方法:

var cts = new CancellationTokenSource();

string result = await DownloadStringAsync(url, cts.Token);

// at some point later, potentially on another thread

cts.Cancel();

爲了取消多個異步的調用,能夠將相同的token傳給多有的調用:

var cts = new CancellationTokenSource();

IList<string> results = await Task.WhenAll(

from url in urls select DownloadStringAsync(url, cts.Token));

cts.Cancel();

相似地,相同的token也能夠有選擇性地傳給異步操做的子集:

var cts = new CancellationTokenSource();

byte [] data = await DownloadDataAsync(url, cts.Token);

await SaveToDiskAsync(outputPath, data, CancellationToken.None);

cts.Cancel();

來自任何線程的Cancellation請求均可以被初始化。

爲了代表cancellation請求永遠不會發出,CancellationToken.None能夠傳給任何接受一個CancellationToken的方法。被調用者會發現cancellationToken的CanBeCanceled屬性會返回false,所以它起到了優化。

相同的CancellationToken能夠分發給任何數量的異步和同步操做。這是CancellationToken 方法強項之一:cancellation可能使同步方法的調用請求的,而且相同的cancellation請求可能激增到任何數量的監聽器。另外一個好處是異步API的開發者能夠徹底地控制cancellation是否可能被請求以及cancellation什麼時候生效,還有該API的消費者能夠選擇性地決定多個異步調用的cancellation請求中哪個會被傳播。

Progress

一些異步方法經過把progress(進度)接口傳給該異步方法來公開進度。好比,思考一下異步下載一個文本字符串的函數,而後會引起包含至今已完成下載的百分比的進度的更新。下面的WPF應用的一個例子用到了這樣一個方法:

private async void btnDownload_Click(object sender, RoutedEventArgs e)

{

btnDownload.IsEnabled = false;

try

{

txtResult.Text = await DownloadStringAsync(txtUrl.Text,

new Progress<int>(p => pbDownloadProgress.Value = p));

}

finally { btnDownload.IsEnabled = true; }

}

 

使用內置的基於Task的鏈接器

System.Threading.Tasks命名空間包含了幾種處理和組合tasks的主要方法。

Task.Run

Task類公開了幾種Run方法,它們能夠輕易地做爲線程池的Task或Task<TResult>進行卸載工做,如:

public async void button1_Click(object sender, EventArgs e)

{

textBox1.Text = await Task.Run(() =>

{

// … 這裏處理一些計算受限的任務

return answer;

});

}

其中一些Run方法是自從.Net 4 就存在的TaskFactory.StartNew方法的簡寫。然而,其餘的重載(如Run<TResult>(Func<Task<TResult>>))能夠在卸載工做中使用await關鍵字,如:

public async void button1_Click(object sender, EventArgs e)

{

pictureBox1.Image = await Task.Run(() =>

{

using(Bitmap bmp1 = await DownloadFirstImageAsync())

using(Bitmap bmp2 = await DownloadSecondImageAsync())

return Mashup(bmp1, bmp2);

});

}

Task.FromResult

對於已是可以使用的或只須要從返回task的方法返回的數據提高到Task<TResult>的場景,可使用Task.FromResult方法:

public Task<int>GetValueAsync(string key)

{

int cachedValue;

return TryGetCachedValue(out cachedValue) ?

Task.FromResult(cachedValue) :

GetValueAsyncInternal();

}

private async Task<int> GetValueAsyncInternal(string key)

{

}

Task.WhenAll

WhenAll方法用於異步等待多個表明Tasks的異步操做。爲了適應一系列的非泛型的tasks或者一系列不均勻的泛型tasks(例如,異步等待多個返回void的操做,或者異步等待多個返回值類型的方法,每一個值能夠是不一樣的類型)以及一系列均勻 的泛型tasks(如,異步等待多個返回TResult的方法)。

思考給多個顧客發送郵件的需求。咱們能夠重疊全部的郵件發送(在發送下一封郵件以前不須要等待前一封郵件已經完成發送),而後咱們須要知道發送什麼時候完成和是否有錯誤發生。

IEnumerable<Task> asyncOps = from addr in addrs select SendMailAsync(addr);

await Task.WhenAll(asyncOps);

上面的代碼沒有顯示地處理可能發生的異常,反而選擇讓異常在WhenAll產生的task上的await傳播出來。爲了處理這些異常,開發者可使用像下面這樣的代碼:

IEnumerable<Task> asyncOps = from addr in addrs select SendMailAsync(addr);

try

{

await Task.WhenAll(asyncOps);

}

catch(Exception exc)

{

...

}

思考另外一個從web上異步地下載多個文件的例子。在這種狀況下,全部的異步操做有同類的結果類型,而且這些結果的訪問很簡單:

string [] pages = await Task.WhenAll(

from url in urls select DownloadStringAsync(url));

做爲以前返回void的案例,這裏可使用相同的異步處理技巧:

Task [] asyncOps =

(from url in urls select DownloadStringAsync(url)).ToArray();

try

{

string [] pages = await Task.WhenAll(asyncOps);

...

}

catch(Exception exc)

{

foreach(Task<string> faulted in asyncOps.Where(t => t.IsFaulted))

{

// work with faulted and faulted.Exception

}

}

Task.WhenAny

WhenAny API異步地等待表示多個異步的操做,而且只是異步地等待它們中的一個完成。WhenAny主要有四種用法:

  1. 冗餘。屢次執行一個操做,而且選擇最早完成的那個(例如,聯繫多個會產生惟一結果的股票報價的Web服務並選擇完成最快的那個)。
  2. 交叉。啓動多個操做,並須要它們都要完成,可是當它們完成時再處理。
  3. 調節。當其它操做完成時容許多餘的操做開始。這是交叉狀況的擴展。
  4. 提前應急。t1表明的操做能夠組合進伴有另外一個task t2的WhenAny,而後咱們能夠等待WhenAny task。t2能夠表明一個超時,撤銷或某些其餘的信號來使WhenAny在t1完成以前完成。

冗餘

思考一下,咱們是否買一隻股票的決定的狀況。咱們有多個咱們信賴的股票推薦Web服務,可是基於每日負荷,每一種服務可能終將在不一樣的時間變得至關緩慢。咱們能夠發揮WhenAny的優點來知曉任意一個操做什麼時候完成:

var recommendations = new List<Task<bool>>()

{

GetBuyRecommendation1Async(symbol),

GetBuyRecommendation2Async(symbol),

GetBuyRecommendation3Async(symbol)

};

Task<bool> recommendation = await Task.WhenAny(recommendations);

if (await recommendation) BuyStock(symbol);

不像WhenAll在全部的tasks成功完成的狀況下返回它們爲包裝的結果集,WhenAny返回完成的Task:若是一個task失敗了,知道哪個task失敗很重要;若是一個task成功了,知道返回的值和哪個task相關很重要。由於這個緣由,咱們須要訪問返回的task的Result屬性,或者進一步等待它直到它完成。

自從有了WhenAll,咱們須要可以適應異常狀況。因爲已經接收到了返回的已完成的task,爲了讓錯誤傳播,咱們能夠等待返回的task,而後適當地try/catch,例如:

Task<bool> [] recommendations = …;

while(recommendations.Count > 0)

{

Task<bool> recommendation = await Task.WhenAny(recommendations);

try

{

if (await recommendation) BuyStock(symbol);

break;

}

catch(WebException exc)

{

recommendations.Remove(recommendation);

}

}

除此以外,即便第一個task成功完成了,隨後 的task也可能失敗。此時,咱們有多個處理這些異常的選擇。一種用例可能要求直到全部啓動的tasks已經完成才作進一步的向前進展,在這種狀況咱們可使用WhenAll。另外一種用例要求全部的異常必需要記錄。對於這個,當tasks已經異步完成時,咱們能夠利用後續操做直接接收一個通知:

foreach(Task recommendation in recommendations)

{

var ignored = recommendation.ContinueWith(

t => { if (t.IsFaulted) Log(t.Exception); });

}

或者

foreach(Task recommendation in recommendations)

{

var ignored = recommendation.ContinueWith(

t => Log(t.Exception), TaskContinuationOptions.OnlyOnFaulted);

}

或者

private static async void LogCompletionIfFailed(IEnumerable<Task> tasks)

{

foreach(var task in tasks)

{

try { await task; }

catch(Exception exc) { Log(exc); }

}
}

LogCompletionIfFailed(recommendations);

最後,開發者可能實際想要取消全部的保留的操做。

var cts = new CancellationTokenSource();

var recommendations = new List<Task<bool>>()

{

GetBuyRecommendation1Async(symbol, cts.Token),

GetBuyRecommendation2Async(symbol, cts.Token),

GetBuyRecommendation3Async(symbol, cts.Token)

};

 

Task<bool> recommendation = await Task.WhenAny(recommendations);

cts.Cancel();

if (await recommendation) BuyStock(symbol);

交叉

思考這樣一個狀況,從Web下載圖片,並對每張圖片作一些處理,例如把它加到UI控件上去。咱們須要按順序處理(在UI控件的例子中,在UI線程上),但咱們想要儘量地併發下載。而且咱們不想全部圖片都下載完畢後再加載到UI上,而是當它們完成時就加載它們:

List<Task<Bitmap>> imageTasks =

(from imageUrl in urls select GetBitmapAsync(imageUrl)).ToList();

while(imageTasks.Count > 0)

{

try

{

Task<Bitmap> imageTask = await Task.WhenAny(imageTasks);

imageTasks.Remove(imageTask);

 

Bitmap image = await imageTask;

panel.AddImage(image);

}

catch{}

}

那相同的交叉能夠應用到涉及下載以及在下載的圖片上的線程池的進行計算密集的處理的場景上,例如:

List<Task<Bitmap>> imageTasks =

(from imageUrl in urls select GetBitmapAsync(imageUrl)

.ContinueWith(t => ConvertImage(t.Result)).ToList();

while(imageTasks.Count > 0)

{

try

{

Task<Bitmap> imageTask = await Task.WhenAny(imageTasks);

imageTasks.Remove(imageTask);

 

Bitmap image = await imageTask;

panel.AddImage(image);

}

catch{}

}

調節

思考和交叉例子相同的案例,除了用戶正在下載不少圖片,這些下載須要顯示調節,例如,只有15個下載可能同時發生。爲實現這個,異步操做的子集可能會被調用。當操做完成的時候,其餘的操做會取而代之被調用。

const int CONCURRENCY_LEVEL = 15;

Uri [] urls = …;

int nextIndex = 0;

var imageTasks = new List<Task<Bitmap>>();

while(nextIndex < CONCURRENCY_LEVEL && nextIndex < urls.Length)

{

imageTasks.Add(GetBitmapAsync(urls[nextIndex]));

nextIndex++;

}

 

while(imageTasks.Count > 0)

{

try

{

Task<Bitmap> imageTask = await Task.WhenAny(imageTasks);

imageTasks.Remove(imageTask);

 

Bitmap image = await imageTask;

panel.AddImage(image);

}

catch(Exception exc) { Log(exc); }

 

if (nextIndex < urls.Length)

{

imageTasks.Add(GetBitmapAsync(urls[nextIndex]));

nextIndex++;

}

}

提前應急

思考當異步等待一個操做完成的同時,又要響應一個用戶的撤銷請求(例如,點擊UI上的一個取消按鈕)。

private CancellationTokenSource m_cts;

 

public void btnCancel_Click(object sender, EventArgs e)

{

if (m_cts != null) m_cts.Cancel();

}

 

public async void btnRun_Click(object sender, EventArgs e)

{

m_cts = new CancellationTokenSource();

btnRun.Enabled = false;

try

{

Task<Bitmap> imageDownload = GetBitmapAsync(txtUrl.Text);

await UntilCompletionOrCancellation(imageDownload, m_cts.Token);

if (imageDownload.IsCompleted)

{

Bitmap image = await imageDownload;

panel.AddImage(image);

}

else imageDownload.ContinueWith(t => Log(t));

}

finally { btnRun.Enabled = true; }

}

 

private static async Task UntilCompletionOrCancellation(

Task asyncOp, CancellationToken ct)

{

var tcs = new TaskCompletionSource<bool>();

using(ct.Register(() => tcs.TrySetResult(true)))

await Task.WhenAny(asyncOp, tcs.Task);

return asyncOp;

}

咱們一決定拯救而不取消隱含的異步操做,這個實現就會再次開啓UI。當咱們決定拯救的時候,另外一個選擇會取消掛起的操做,然而直到操做最終完成纔會重建UI,這多是由於早期因爲取消請求結束形成的:

private CancellationTokenSource m_cts;

 

public async void btnRun_Click(object sender, EventArgs e)

{

m_cts = new CancellationTokenSource();

 

btnRun.Enabled = false;

try

{

Task<Bitmap> imageDownload = GetBitmapAsync(txtUrl.Text, m_cts.Token);

await UntilCompletionOrCancellation(imageDownload, m_cts.Token);

Bitmap image = await imageDownload;

panel.AddImage(image);

}

catch(OperationCanceledException) {}

finally { btnRun.Enabled = true; }

}

使用 WhenAny 爲早期提供緊急援助的另外一個例子涉及到Task.WhenAny 與 Task.Delay 的一塊兒使用。

Task.Delay

以前演示過,能夠經過調用Task.Delay把中斷引入一個異步方法的執行當中。這對於各類各樣的功能是有用的,包括構建輪詢,對於一個預約時間內用戶輸入處理的延遲等諸如此類。在聯合Task.WhenAny對await實現超時也是有用的。

若是一個task是一個更大的異步操做(如,一個ASP.Net Web服務)的一部分,花費太長時間完成了,那麼總體的操做就會變差,尤爲是若是該操做曾經沒有完成的話。爲此,可以在一個異步操做上超時等待很重要。同步的Task.Wait, WaitAll, 和WaitAny方法接收超時值,但相應的 ContinueWhenAll/Any和上述的 WhenAll/WhenAny APIs不這樣作。相反,Task.Delay和Task.WhenAny能夠聯合使用來實現超時。

思考一個UI應用,該應用想下載一張圖片,而且當圖片在下載的過程當中時,UI不可用。然而,若是下載須要花很長時間,那麼該UI應該可用而且下載的操做應該放棄。

public async void btnDownload_Click(object sender, EventArgs e)

{

btnDownload.Enabled = false;

try

{

Task<Bitmap> download = GetBitmapAsync(url);

if (download == await Task.WhenAny(download, Task.Delay(3000)))

{

Bitmap bmp = await download;

pictureBox.Image = bmp;

status.Text = "Downloaded";

}

else

{

pictureBox.Image = null;

status.Text = "Timed out";

var ignored = download.ContinueWith(

t => Trace("Task finally completed"));

}

}

finally { btnDownload.Enabled = true; }

}

既然WhenAll返回一個task,那麼同樣能夠用到多個下載上:

public async void btnDownload_Click(object sender, RoutedEventArgs e)

{

btnDownload.Enabled = false;

try

{

Task<Bitmap[]> downloads =

Task.WhenAll(from url in urls select GetBitmapAsync(url));

if (downloads == await Task.WhenAny(downloads, Task.Delay(3000)))

{

foreach(var bmp in downloads) panel.AddImage(bmp);

status.Text = "Downloaded";

}

else

{

status.Text = "Timed out";

downloads.ContinueWith(t => Log(t));

}

}

finally { btnDownload.Enabled = true; }

}

 

構建基於Task的鏈接器

因爲一個task徹底有能力表示一個異步操做,並提供同步和異步鏈接,檢索該操做等的能力,於是構建有用的組合建立更大模式的tasks的"鏈接器"庫成爲可能。該文章先前提到過,.Net Framework包括了多個內置的鏈接器,然而,也可能和期待開發者建立他們本身的。這裏咱們提供幾個可能會用到的鏈接器方法和類型的例子。

RetryOnFault(錯誤重試)

在不少場合,若是一個以前的嘗試操做失敗了,很渴望重試一下該操做。對於同步代碼來講,咱們能夠構建一個幫助方法來完成這個:

public static T RetryOnFault<T>(

Func<T> function, int maxTries)

{

for(int i=0; i<maxTries; i++)

{

try { return function(); }

catch { if (i == maxTries-1) throw; }

}

return default(T);

}

咱們能夠構建一個幾乎徹底同樣的幫助方法,可是針對使用TAP實現的異步操做的,於是返回tasks:

public static async Task<T> RetryOnFault<T>(

Func<Task<T>> function, int maxTries)

{

for(int i=0; i<maxTries; i++)

{

try { return await function().ConfigureAwait(false); }

catch { if (i == maxTries-1) throw; }

}

return default(T);

}

有了本身的函數,咱們如今能夠利用此鏈接器將重試編碼到應用邏輯中,如:

// Download the URL, trying up to three times in case of failure

string pageContents = await RetryOnFault(

() => DownloadStringAsync(url), 3);

RetryOnFault函數能夠進一步擴展,例如,爲了決定什麼時候重試更好,能夠接受在重試操做之間的另外一個Func<Task>:

public static async Task<T> RetryOnFault<T>(

Func<Task<T>> function, int maxTries, Func<Task> retryWhen)

{

for(int i=0; i<maxTries; i++)

{

try { return await function(); }

catch { if (i == maxTries-1) throw; }

await retryWhen().ConfigureAwait(false);

}

return default(T);

}

此後可使用像下面的代碼在重試以前再等待一秒:

// Download the URL, trying up to three times in case of failure,

// and delaying for a second between retries

string pageContents = await RetryOnFault(

() => DownloadStringAsync(url), 3, () => Task.Delay(1000));

 

NeedOnlyOne(只須要一個)

有時發揮冗餘的優點能夠提升操做的延遲和成功的機會。思考一下,有多個提供股票報價的Web服務,但在當天的不一樣時間,每個服務可能提供不一樣級別的數量和響應時間。爲處理這些狀況,咱們能夠向全部Web服務發送請求,只要得到了任何響應就取消其餘請求。咱們能夠實現一個函數來簡化這個啓動多個操做,等待任意一個,而後取消其他請求的通用模式:

public static async Task<T> NeedOnlyOne(

params Func<CancellationToken,Task<T>> [] functions)

{

var cts = new CancellationTokenSource();

var tasks = (from function in functions

select function(cts.Token)).ToArray();

var completed = await Task.WhenAny(tasks).ConfigureAwait(false);

cts.Cancel();

foreach(var task in tasks)

{

var ignored = task.ContinueWith(

t => Log(t), TaskContinuationOptions.OnlyOnFaulted);

}

return completed;

}

而後可使用該函數來實現咱們的例子:

double currentPrice = await NeedOnlyOne(

ct => GetCurrentPriceFromServer1Async("msft", ct),

ct => GetCurrentPriceFromServer2Async("msft", ct),

ct => GetCurrentPriceFromServer3Async("msft", ct));

Interleaved(交錯)

當使用很是大的tasks集合時,使用Task.WhenAnyl來支持一個該交錯的場景會有一個潛在的性能問題。WhenAny的每次調用會致使每個task註冊一個後續操做,對於N個tasks的調用會在交錯的操做的生命週期內產生O(N2)數量級的後續操做。爲了解決這個問題,一種方法是使用專一於目標的組合:

static IEnumerable<Task<T>> Interleaved<T>(IEnumerable<Task<T>> tasks)

{

var inputTasks = tasks.ToList();

var sources = (from _ in Enumerable.Range(0, inputTasks.Count)

select new TaskCompletionSource<T>()).ToList();

int nextTaskIndex = -1;

foreach (var inputTask in inputTasks)

{

inputTask.ContinueWith(completed =>

{

var source = sources[Interlocked.Increment(ref nextTaskIndex)];

if (completed.IsFaulted)

source.TrySetException(completed.Exception.InnerExceptions);

else if (completed.IsCanceled)

source.TrySetCanceled();

else

source.TrySetResult(completed.Result);

}, CancellationToken.None,

TaskContinuationOptions.ExecuteSynchronously,

TaskScheduler.Default);

}

return from source in sources

select source.Task;

}

當tasks完成以後,可使用這個能夠處理tasks的結果,如:

IEnumerable<Task<int>> tasks = ...;

foreach(var task in tasks)

{

int result = await task;

}

WhenAllOrFirstException

在肯定的分散/集中場合,可能會想要等待全部的tasks,除非它們中有錯誤,在這種狀況下,只要一出現異常,你就想要中止等待。咱們也可使用鏈接器方法來完成,例如:

public static Task<T[]> WhenAllOrFirstException<T>(IEnumerable<Task<T>> tasks)
{

    var inputs = tasks.ToList();

    var ce = new CountdownEvent(inputs.Count);

    var tcs = new TaskCompletionSource<T[]>();


    Action<Task> onCompleted = (Task completed) =>
    {
        if (completed.IsFaulted)
            tcs.TrySetException(completed.Exception.InnerExceptions);
        if (ce.Signal() && !tcs.Task.IsCompleted)
            tcs.TrySetResult(inputs.Select(t => t.Result).ToArray());
    };

    foreach (var t in inputs) t.ContinueWith(onCompleted);

    return tcs.Task;
}

構建基於Task的數據結構

除了構建自定義的基於task的鏈接器的能力以外,在Task和表明異步操做結果以及鏈接必要的同步的Task<TResult>中使用數據結構可使它成爲很是強大的類型,在此類型上構建的自定義的數據結構能夠用在異步情景中。

AsyncCache(異步緩存)

Task一個重要的方面是它能夠提供給多個消費者,全部的消費者能夠等待它,用它註冊後續操做,得到結果(Task<TResult>的場合)或異常等等。這使得Task和Task<TResult>完美地集成到了異步緩存基礎設施中。這兒是一個小而有力的構建在Task<TResult>之上的異步緩存:

public class AsyncCache<TKey, TValue>

{

private readonly Func<TKey, Task<TValue>> _valueFactory;

private readonly ConcurrentDictionary<TKey, Lazy<Task<TValue>>> _map;

 

public AsyncCache(Func<TKey, Task<TValue>> valueFactory)

{

if (valueFactory == null) throw new ArgumentNullException("loader");

_valueFactory = valueFactory;

_map = new ConcurrentDictionary<TKey, Lazy<Task<TValue>>>();

}

 

public Task<TValue> this[TKey key]

{

get

{

if (key == null) throw new ArgumentNullException("key");

return _map.GetOrAdd(key, toAdd =>

new Lazy<Task<TValue>>(() => _valueFactory(toAdd))).Value;

}

}

}

 

AsyncCache<TKey,TValue>類的構造函數接受一個TKey做爲參數,返回Task<TValue>的方法委託。以前從這個cache訪問到的任何值都存儲在內部的字典中,同時AsyncCache確保每一個key只生成一個task,即便併發訪問cache。

使用這個,咱們能夠構建一個下載web頁面的cache,如:

private AsyncCache<string,string> m_webPages =

new AsyncCache<string,string>(DownloadStringAsync);

如今,不管什麼時候咱們須要web頁面的內容,均可以在異步方法中使用這個,而且AsyncCache會確保咱們儘量同時下載多個頁面,緩存該結果。

private async void btnDownload_Click(object sender, RoutedEventArgs e)

{

btnDownload.IsEnabled = false;

try

{

txtContents.Text = await m_webPages["http://www.microsoft.com"];

}

finally { btnDownload.IsEnabled = true; }

}

AsyncProducerConsumerCollection

Tasks也用來構建數據結構來協調多個異步活動。思考一個典型的平行設計模式:生產者/消費者,生產者生成消費者消費的數據,而且生產者和消費者可能併發運行(例如,消費者處理生產者以前生產的item1時,生產者在生產item2).對於生產者/消費者,咱們始終須要一些數據結構存儲生產者建立的任務,爲的是能夠告知消費者新數據且當它可用時發現它。

這裏有一個簡單的構建於tasks之上的數據結構的例子,它使異步方法用做生產者和消費者:

public class AsyncProducerConsumerCollection<T>

{

private readonly Queue<T> m_collection = new Queue<T>();

private readonly Queue<TaskCompletionSource<T>> m_waiting =

new Queue<TaskCompletionSource<T>>();

 

public void Add(T item)

{

TaskCompletionSource<T> tcs = null;

lock (m_collection)

{

if (m_waiting.Count > 0) tcs = m_waiting.Dequeue();

else m_collection.Enqueue(item);

}

if (tcs != null) tcs.TrySetResult(item);

}

 

public Task<T> Take()

{

lock (m_collection)

{

if (m_collection.Count > 0)

{

return Task.FromResult(m_collection.Dequeue());

}

else

{

var tcs = new TaskCompletionSource<T>();

m_waiting.Enqueue(tcs);

return tcs.Task;

}

}

}

}

如今咱們能夠像下面同樣寫代碼了:

private static AsyncProducerConsumerCollection<int> m_data = …;

private static async Task ConsumerAsync()

{

while(true)

{

int nextItem = await m_data.Take();

ProcessNextItem(nextItem);

}

}

private static void Produce(int data)

{

m_data.Add(data);
}

 

返回該系列目錄《基於Task的異步模式--全面介紹》

相關文章
相關標籤/搜索