在使用異步方法中最好不要使用void當作返回值,無返回值也應使用Task做爲返回值,由於使用void做爲返回值具備如下缺點ios
- 沒法得知異步函數的狀態機在何時執行完畢
- 若是異步函數中出現異常,則會致使進程崩潰
❌異步函數不該該返回voidgit
static void Main(string[] args) { try { // 若是Run方法無異常正常執行,那麼程序沒法得知其狀態機何時執行完畢 Run(); } catch (Exception ex) { Console.WriteLine(ex.Message); } Console.Read(); } static async void Run() { // 因爲方法返回的爲void,因此在調用此方法時沒法捕捉異常,使得進程崩潰 throw new Exception("異常了"); await Task.Run(() => { }); }
☑️應該將異步函數返回Taskgithub
static async Task Main(string[] args) { try { // 由於在此進行await,因此主程序知道何時狀態機執行完成 await RunAsync(); Console.Read(); } catch (Exception ex) { Console.WriteLine(ex.Message); } } static async Task RunAsync() { // 由於此異步方法返回的爲Task,因此此異常能夠被捕捉 throw new Exception("異常了"); await Task.Run(() => { }); }
注:事件是一個例外,異步事件也是返回void編程
對於一些預先知道的結果或者只是一個簡單的計算函數,使用Task,FromResult要比Task.Run性能要好,由於Task.FromResult只是建立了一個包裝已計算任務的任務,而Task.Run會將一個工做項在線程池進行排隊,計算,返回.而且使用Task.FromResult在具備SynchronizationContext 程序中(例如WinForm)調用Result或wait()並不會死鎖(雖然並不建議這麼幹)c#
❌對於預計算或普通計算的函數不該該這麼寫api
public async Task<int> RunAsync() { return await Task.Run(()=>1+1); }
☑️而應該使用Task.FromResult代替緩存
public async Task<int> RunAsync() { return await Task.FromResult(1 + 1); }
還有另一種代替方法,那就是使用ValueTask
☑️使用ValueTask
static async Task Main(string[] args) { await AddAsync(1, 1); } static ValueTask<int> AddAsync(int a, int b) { // 返回一個可被等待的ValueTask類型 return new ValueTask<int>(a + b); }
注: ValueTask
結構是C#7.0加入的,存在於Sysntem,Threading.Task.Extensions包中async
長時間運行的工做是指在應用程序生命週期執行後臺工做的線程,如:執行processing queue items,執行sleeping,執行waiting或者處理某些數據,此類線程不建議使用Task.Run方法執行,由於Task.Run方法是將任務在線程池內進行排隊執行,若是線程池線程進行長時間堵塞,會致使線程池增加,進而浪費性能,因此若是想要運行長時間的工做建議直接建立一個新線程進行工做
❌下面這個例子就利用了線程池執行長時間的阻塞工做
public class QueueProcessor { private readonly BlockingCollection<Message> _messageQueue = new BlockingCollection<Message>(); public void StartProcessing() { Task.Run(ProcessQueue); } public void Enqueue(Message message) { _messageQueue.Add(message); } private void ProcessQueue() { foreach (var item in _messageQueue.GetConsumingEnumerable()) { ProcessItem(item); } } private void ProcessItem(Message message) { } }
☑️因此應該改爲這樣
public class QueueProcessor { private readonly BlockingCollection<Message> _messageQueue = new BlockingCollection<Message>(); public void StartProcessing() { var thread = new Thread(ProcessQueue) { // 設置線程爲背後線程,使得在主線程結束時此線程也會自動結束 IsBackground = true }; thread.Start(); } public void Enqueue(Message message) { _messageQueue.Add(message); } private void ProcessQueue() { foreach (var item in _messageQueue.GetConsumingEnumerable()) { ProcessItem(item); } } private void ProcessItem(Message message) { } }
🔔線程池內線程增長會致使在執行時大量的進行上下文切換,從而浪費程序的總體性能, 線程池詳細信息請參考CLR第27章
🔔Task.Factory.StartNew方法中有一個TaskCreationOptions參數重載,若是設置爲LongRunning,則會建立一個新線程執行
// 此方法會建立一個新線程進行執行 Task.Factory.StartNew(() => { }, TaskCreationOptions.LongRunning);
使用Task.Result和Task.Wait()兩個方法進行阻塞異步同步化比直接同步方法阻塞還要MUCH worse(更糟),這種方式被稱爲Sync over async 此方式操做步驟以下
1.異步線程啓動
2.調用線程調用Result或者Wait()進行阻塞
3.異步完成時,將一個延續代碼調度到線程池,恢復等待該操做的代碼
雖然看起來並無什麼關係,可是其實這裏倒是使用了兩個線程來完成同步操做,這樣一般會致使線程飢餓和死鎖
🔔線程飢餓(starvation):指等待時間已經影響到進程運行,若是等待時間過長,致使進程使命沒有意義時,稱之爲餓死
🔔死鎖(deadlock):指兩個或兩個以上的線程相互爭奪資源,致使進程永久堵塞,
🔔使用Task.Result和Task.Wait()會在winform和ASP.NET中會死鎖,由於它們SynchronizationContext
具備對象,兩個線程在SynchronizationContext
爭奪致使死鎖,而ASP.NET Core則不會產生死鎖,由於ASP.NET Core本質是一個控制檯應用程序,並無上下文
❌下面的例子,雖然都不會產生死鎖,可是依然具備不少問題
async Task<string> RunAsync() { // 此線程ID輸出與UI線程ID不一致 Debug.WriteLine("UI線程:"+Thread.CurrentThread.ManagedThreadId); return await Task.Run(() => "Run"); } string DoOperationBlocking() { // 這種方法雖然擺脫了死鎖的問題,可是也致使了上下文問題,RunAsync不在以UI線程調用 // Result和Wait()方法若是出現異常,異常將被包裝爲AggregateException進行拋出, return Task.Run(() => RunAsync()).Result; } } private async void button1_Click(object sender, EventArgs e) { Debug.WriteLine("RunAsync:" + Thread.CurrentThread.ManagedThreadId); Debug.WriteLine(DoOperationBlocking()); }
public string DoOperationBlocking2() { // 此方法也是會致使上下文問題, // GetAwaiter()方法對異常不會包裝 return Task.Run(() => RunAsync()).GetAwaiter().GetResult(); }
在async和await,當時可使用continueWith來延遲執行一些方法,可是continueWith並不會捕捉`SynchronizationContext
`,因此建議使用await代替continueWith
❌下面例子就是使用continueWith
private void button1_Click(object sender, EventArgs e) { Debug.WriteLine("UI線程:" + Thread.CurrentThread.ManagedThreadId); RunAsync().ContinueWith(task => { Console.WriteLine("RunAsync returned:"+task.Result); // 由於是使用的continueWith,因此線程ID與UI線程並不一致 Debug.WriteLine("ContinueWith:" + Thread.CurrentThread.ManagedThreadId); }); } public async Task<int> RunAsync() { return await Task.FromResult(1 + 1); }
☑️應該使用await來代替continueWith
private async void button1_Click(object sender, EventArgs e) { Debug.WriteLine("UI線程:" + Thread.CurrentThread.ManagedThreadId); Debug.WriteLine("RunAsync returned:"+ await RunAsync()); Debug.WriteLine("UI線程:" + Thread.CurrentThread.ManagedThreadId); } public async Task<int> RunAsync() { return await Task.FromResult(1 + 1); }
對於編寫類庫的人來講TaskCompletionSource<T>
是一個具備很是重要的做用,默認狀況下任務延續可能會在調用try/set(Result/Exception/Cancel)的線程上進行運行,這也就是說做爲編寫類庫的人來講必須須要考慮上下文,這一般是很是危險,可能就會致使死鎖' 線程池飢餓 *數據結構損壞(若是代碼異常運行)
因此在建立TaskCompletionSourece<T>
時,應該使用TaskCreationOption.RunContinuationAsyncchronously
參數將後續任務交給線程池進行處理
❌下面例子就沒有使用TaskCreationOptions.RunComtinuationsAsynchronously
,
static void Main(string[] args) { ThreadPool.SetMinThreads(100, 100); Console.WriteLine("Main CurrentManagedThreadId:" + Environment.CurrentManagedThreadId); var tcs = new TaskCompletionSource<bool>(); // 使用TaskContinuationOptions.ExecuteSynchronously來測試延續任務 ContinueWith(1, tcs.Task); // 測試await延續任務 ContinueAsync(2, tcs.Task); Task.Run(() => { Console.WriteLine("Task Run CurrentManagedThreadId:" + Environment.CurrentManagedThreadId ); tcs.TrySetResult(true); }); Console.ReadLine(); } static void print(int id) => Console.WriteLine($"continuation:{id}\tCurrentManagedThread:{Environment.CurrentManagedThreadId}"); static async Task ContinueAsync(int id, Task task) { await task.ConfigureAwait(false); print(id); } static Task ContinueWith(int id, Task task) { return task.ContinueWith( t => print(id), CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default); }
☑️因此應該改成使用TaskCreationOptions.RunComtinuationsAsynchronously
參數進行設置TaskCompletionSoure
static void Main(string[] args) { ThreadPool.SetMinThreads(100, 100); Console.WriteLine("Main CurrentManagedThreadId:" + Environment.CurrentManagedThreadId); var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously); // 使用TaskContinuationOptions.ExecuteSynchronously來測試延續任務 ContinueWith(1, tcs.Task); // 測試await延續任務 ContinueAsync(2, tcs.Task); Task.Run(() => { Console.WriteLine("Task Run CurrentManagedThreadId:" + Environment.CurrentManagedThreadId); tcs.TrySetResult(true); }); Console.ReadLine(); } static void print(int id) => Console.WriteLine($"continuation:{id}\tCurrentManagedThread:{Environment.CurrentManagedThreadId}"); static async Task ContinueAsync(int id, Task task) { await task.ConfigureAwait(false); print(id); } static Task ContinueWith(int id, Task task) { return task.ContinueWith( t => print(id), CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default); }
🔔TaskCreationOptions.RunContinuationsAsynchronously
屬性和TaskContinuationOptions.RunContinuationsAsynchronously
很類似,但請注意它們的使用方式
用於進行超時的CancellationTokenSources,若是不釋放,則會增長timer queue(計時器隊列)
的壓力
❌下面例子由於沒有釋放,因此在每次請求發出以後,計時器在隊列中停留10秒鐘
public async Task<Stream> HttpClientAsyncWithCancellationBad() { var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); using (var client = _httpClientFactory.CreateClient()) { var response = await client.GetAsync("http://backend/api/1", cts.Token); return await response.Content.ReadAsStreamAsync(); } }
☑️因此應該及時的釋放CancellationSoure,使得正確的從隊列中刪除計時器
public async Task<Stream> HttpClientAsyncWithCancellationGood() { using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10))) { using (var client = _httpClientFactory.CreateClient()) { var response = await client.GetAsync("http://backend/api/1", cts.Token); return await response.Content.ReadAsStreamAsync(); } } }
🔔設置延遲時間具備兩種方式
1.構造器參數
public CancellationTokenSource(TimeSpan delay); public CancellationTokenSource(int millisecondsDelay);2.調用實例對象CancelAfter()
public void CancelAfter(TimeSpan delay);
public void CancelAfter(int millisecondsDelay);
因爲在.NET中取消操做必須顯示的傳遞CancellationToken
,因此若是想取消全部調用的異步函數,那麼應該將CancllationToken
傳遞給此調用鏈中的全部函數
❌下面例子在調用ReadAsync時並無傳遞CancellationToken
,因此不能有效的取消
public async Task<string> DoAsyncThing(CancellationToken cancellationToken = default) { byte[] buffer = new byte[1024]; // 使用FileOptions.Asynchronous參數指定異步通訊 using(Stream stream = new FileStream( @"d:\資料\Blogs\Task\TaskTest", FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.None, 1024, options:FileOptions.Asynchronous)) { // 因爲並無將cancellationToken傳遞給ReadAsync,因此沒法進行有效的取消 int read = await stream.ReadAsync(buffer, 0, buffer.Length); return Encoding.UTF8.GetString(buffer, 0, read); } }
☑️因此應該將CancellationToken
傳遞給ReadAsync(),以達到有效的取消
public async Task<string> DoAsyncThing(CancellationToken cancellationToken = default) { byte[] buffer = new byte[1024]; // 使用FileOptions.Asynchronous參數指定異步通訊 using(Stream stream = new FileStream( @"d:\資料\Blogs\Task\TaskTest", FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.None, 1024, options:FileOptions.Asynchronous)) { // 因爲並無將cancellationToken傳遞給ReadAsync,因此沒法進行有效的取消 int read = await stream.ReadAsync(buffer, 0, buffer.Length,cancellationToken); return Encoding.UTF8.GetString(buffer, 0, read); } }
🔔在使用異步IO時,應該將options參數設置爲FileOptions.Asynchronous,不然會產生額外的線程浪費,詳細信息請參考CLR中28.12節
在異步編程時出現了一種模式cancelling an uncancellable operation,這個用於取消像CancellationTokenRegistry
和timer
這樣的東西,一般是在被取消或超時時建立另一個線程進行操做,而後使用Task.WhenAny進行判斷是完成仍是被取消了
:x: 下面例子使用了Task.delay(-1,token)建立在觸發CancellationToken時觸發的任務,可是若是CancellationToken不觸發,則沒有辦法釋放CancellationTokenRegistry,就有可能會致使內存泄露
public static async Task<T> WithCancellation<T>(this Task<T> task, CancellationToken cancellationToken) { // 沒有方法釋放cancellationToken註冊 var delayTask = Task.Delay(-1, cancellationToken); var resultTask = await Task.WhenAny(task, delayTask); if (resultTask == delayTask) { // 取消異步操做 throw new OperationCanceledException(); } return await task; }
:ballot_box_with_check:因此應該改爲下面這樣,在任務一完成,就釋放CancellationTokenRegistry
public static async Task<T> WithCancellation<T>(this Task<T> task, CancellationToken cancellationToken) { var tcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously); using (cancellationToken.Register(state => { // 這樣將在其中一個任務觸發時當即釋放CancellationTokenRegistry ((TaskCompletionSource<object>)state).TrySetResult(null); }, tcs)) { var resultTask = await Task.WhenAny(task, tcs.Task); if (resultTask == tcs.Task) { // 取消異步操做 throw new OperationCanceledException(cancellationToken); } return await task; } }
:x:下面這個例子即便在操做完成以後,也不會取消定時器,這也就是說最終會在計時器隊列中產生大量的計時器,從而浪費性能
public static async Task<T> TimeoutAfter<T>(this Task<T> task, TimeSpan timeout) { var delayTask = Task.Delay(timeout); var resultTask = await Task.WhenAny(task, delayTask); if (resultTask == delayTask) { // 取消異步操做 throw new OperationCanceledException(); } return await task; }
:ballot_box_with_check:應改爲下面這樣,這樣將在任務完成以後,取消計時器的操做
public static async Task<T> TimeoutAfter<T>(this Task<T> task, TimeSpan timeout) { using (var cts = new CancellationTokenSource()) { var delayTask = Task.Delay(timeout, cts.Token); var resultTask = await Task.WhenAny(task, delayTask); if (resultTask == delayTask) { // 取消異步操做 throw new OperationCanceledException(); } else { // 取消計時器任務 cts.Cancel(); } return await task; } }
當使用Stream和StreamWriter進行異步寫入時,底層數據也有可能被緩衝,當數據被緩衝時,Stream和StreamWriter將使用同步的方式進行write/flush
,這將會致使線程阻塞,而且有可能致使線程池內線程不足(線程池飢餓)
❌下面例子因爲沒有調用FlushAsync(),因此最後是以同步方式進行write/flush的
public async static Task RunAsync() { using (var streamWriter = new StreamWriter(@"C:\資料\Blogs\Task")) { // 因爲沒有調用FlushAsync,因此最後是以同步方式進行write/flush的 await streamWriter.WriteAsync("Hello World"); } }
☑️因此應該改成下面這樣,在Dispose以前調用FlushAsync()
public async static Task RunAsync() { using (var streamWriter = new StreamWriter(@"C:\資料\Blogs\Task")) { await streamWriter.WriteAsync("Hello World"); // 調用FlushAsync() 使其使用異步write/flush await streamWriter.FlushAsync(); } }
使用async/await 代替直接返回Task具備以上好處
using
)❌下面這個錯誤的例子是將Task直接返回給了調用者
public Task<int> RunAsync() { return Task.FromResult(1 + 1); }
☑️因此應該使用async/await來代替返回Task
public async Task<int> RunAsync() { return await Task.FromResult(1 + 1); }
🔔使用async/await來代替返回Task時,還有性能上的考慮,雖然直接Task會更快,可是最終卻改變了異步的行爲,失去了異步狀態機的一些好處
❌下面例子使用一個返回值爲void的異步,將其傳遞給Timer進行,所以,若是其中任務拋出異常,則整個進程將退出
public class Pinger { private readonly Timer _timer; private readonly HttpClient _client; public Pinger(HttpClient client) { _client = new HttpClient(); _timer = new Timer(Heartbeat, null, 1000, 1000); } public async void Heartbeat(object state) { await httpClient.GetAsync("http://mybackend/api/ping"); } }
❌下面例子將阻止計時器回調,這有可能致使線程池中線程耗盡,這也是一個異步差於同步的例子
public class Pinger { private readonly Timer _timer; private readonly HttpClient _client; public Pinger(HttpClient client) { _client = new HttpClient(); _timer = new Timer(Heartbeat, null, 1000, 1000); } public void Heartbeat(object state) { httpClient.GetAsync("http://mybackend/api/ping").GetAwaiter().GetResult(); } }
☑️下面例子是使用基於的異步的方法,並在定時器回調函數中丟棄該任務,而且若是此方法拋出異常,則也不會關閉進程,而是會觸發TaskScheduler.UnobservedTaskException
事件
public class Pinger { private readonly Timer _timer; private readonly HttpClient _client; public Pinger(HttpClient client) { _client = new HttpClient(); _timer = new Timer(Heartbeat, null, 1000, 1000); } public void Heartbeat(object state) { _ = DoAsyncPing(); } private async Task DoAsyncPing() { // 異步等待 await _client.GetAsync("http://mybackend/api/ping"); }
假若有BackgroudQueue
類中有一個接收回調函數的FireAndForget
方法,該方法在某個時候執行調用
❌下面這個錯誤例子將強制調用者要麼阻塞要麼使用async void異步方法
public class BackgroundQueue { public static void FireAndForget(Action action) { } }
static async Task Main(string[] args) { var httpClient = new HttpClient(); // 由於方法類型是Action,因此只能使用async void BackgroundQueue.FireAndForget(async () => { await httpClient.GetAsync("http://pinger/api/1"); }); }
☑️因此應該構建一個回調異步方法的重載
public class BackgroundQueue { public static void FireAndForget(Action action) { } public static void FireAndForget(Func<Task> action) { } }
緩存異步結果是一種很常見的作法,ConcurrentDictionary是一個很好的集合,而GetOrAdd也是一個很方便的方法,它用於嘗試獲取已經存在的項,若是沒有則添加項.由於回調是同步的,因此很容易編寫Task.Result
的代碼,從而生成異步的結果值,可是這樣很容易致使線程池飢餓
❌下面這個例子就有可能致使線程池飢餓,由於當若是沒有緩存人員數據時,將阻塞請求線程
public class PersonController : Controller { private AppDbContext _db; private static ConcurrentDictionary<int, Person> _cache = new ConcurrentDictionary<int, Person>(); public PersonController(AppDbContext db) { _db = db; } public IActionResult Get(int id) { // 若是不存在緩存數據,則會進入堵塞狀態 var person = _cache.GetOrAdd(id, (key) => db.People.FindAsync(key).Result); return Ok(person); } }
☑️能夠改爲緩存線程自己,而不是結果,這樣將不會致使線程池飢餓
public class PersonController : Controller { private AppDbContext _db; private static ConcurrentDictionary<int, Task<Person>> _cache = new ConcurrentDictionary<int, Task<Person>>(); public PersonController(AppDbContext db) { _db = db; } public async Task<IActionResult> Get(int id) { // 由於緩存的是線程自己,因此沒有進行堵塞,也就不會產生線程池飢餓 var person = await _cache.GetOrAdd(id, (key) => db.People.FindAsync(key)); return Ok(person); } }
🔔這種方法,在最後,GetOrAdd()可能並行屢次來執行緩存回調,這可能致使啓動屢次昂貴的計算
☑️可使用async lazy
模式來取代屢次執行回調問題
public class PersonController : Controller { private AppDbContext _db; private static ConcurrentDictionary<int, AsyncLazy<Person>> _cache = new ConcurrentDictionary<int, AsyncLazy<Person>>(); public PersonController(AppDbContext db) { _db = db; } public async Task<IActionResult> Get(int id) { // 使用Lazy進行了延遲加載(使用時調用),解決了屢次執行回調問題 var person = await _cache.GetOrAdd(id, (key) => new AsyncLazy<Person>(() => db.People.FindAsync(key))); return Ok(person); } private class AsyncLazy<T> : Lazy<Task<T>> { public AsyncLazy(Func<Task<T>> valueFactory) : base(valueFactory) { } }
構造函數是同步,下面看看在構造函數中處理異步狀況
下面是使用客戶端API的例子,固然,在使用API以前須要異步進行鏈接
public interface IRemoteConnectionFactory { Task<IRemoteConnection> ConnectAsync(); } public interface IRemoteConnection { Task PublishAsync(string channel, string message); Task DisposeAsync(); }
❌下面例子使用Task.Result在構造函數中進行鏈接,這有可能致使線程池飢餓和死鎖現象
public class Service : IService { private readonly IRemoteConnection _connection; public Service(IRemoteConnectionFactory connectionFactory) { _connection = connectionFactory.ConnectAsync().Result; } }
☑️正確的方式應該使用靜態工廠模式進行異步鏈接
public class Service : IService { private readonly IRemoteConnection _connection; private Service(IRemoteConnection connection) { _connection = connection; } public static async Task<Service> CreateAsync(IRemoteConnectionFactory connectionFactory) { return new Service(await connectionFactory.ConnectAsync()); } }