關鍵要點
- 異步編程技術提供了一種提升程序響應能力的方法。
- Async/Await模式在C# 5中首次亮相,但只能返回單個標量值。
- C# 8添加了異步流(Async Streams),容許異步方法返回多個值,從而擴展了其可用性。
- 異步流提供了一種用於表示異步數據源的絕佳方法。
- 異步流是Java和JavaScript中使用的反應式編程模型的替代方案。
C# 5引入了Async/Await,用以提升用戶界面響應能力和對Web資源的訪問能力。換句話說,異步方法用於執行不阻塞線程並返回一個標量結果的異步操做。html
微軟屢次嘗試簡化異步操做,由於Async/Await模式易於理解,因此在開發人員當中得到了良好的承認。git
現有異步方法的一個重要不足是它必須提供一個標量返回結果(一個值)。好比這個方法async Task<int> DoAnythingAsync(),DoAnythingAsync的結果是一個整數(一個值)。github
因爲存在這個限制,你不能將這個功能與yield關鍵字一塊兒使用,而且也不能將其與async IEnumerable<int>(返回異步枚舉)一塊兒使用。數據庫
若是能夠將Async/Await特性與yield操做符一塊兒使用,咱們就可使用很是強大的編程模型(如異步數據拉取或基於拉取的枚舉,在F#中被稱爲異步序列)。編程
C# 8中新提出的Async Streams去掉了標量結果的限制,並容許異步方法返回多個結果。數組
這個變動將使異步模式變得更加靈活,這樣就能夠按照延遲異步序列的方式從數據庫中獲取數據,或者按照異步序列的方式下載數據(這些數據在可用時以塊的形式返回)。服務器
例如:架構
foreach await (var streamChunck in asyncStreams) { Console.WriteLine($「Received data count = {streamChunck.Count}」); }
Reactive Extensions(Rx)是解決異步編程問題的另外一種方法。Rx愈來愈受到開發人員的歡迎。不少其餘編程語言(如Java和JavaScript)已經實現了這種技術(RxJava、RxJS)。Rx基於推送式編程模型(Push Programming Model),也稱爲反應式編程。反應式編程是事件驅動編程的一種類型,它處理的是數據而不是通知。app
一般,在推送式編程模型中,你不須要控制Publisher。數據被異步推送到隊列中,消費者在數據到達時消費數據。與Rx不一樣,Async Streams能夠按需被調用,並生成多個值,直到達到枚舉的末尾。異步
在本文中,我將對拉取模型和推送模型進行比較,並演示每一種技術各自的適用場景。我將使用不少代碼示例向你展現整個概念和它們的優勢,最後,我將討論Async Streams功能,並向你展現示例代碼。
圖-1-拉取式編程模型與推送式編程模型
我使用的例子是著名的生產者和消費者問題,但在咱們的場景中,生產者不是生成食物,而是生成數據,消費者消費的是生成的數據,如圖-1所示。拉取模型很容易理解。消費者詢問並拉取生產者的數據。另外一種方法是使用推送模型。生產者將數據發佈到隊列中,消費者經過訂閱隊列來接收所需的數據。
拉取模型更合適「快生產者和慢消費者」的場景,由於消費者能夠從生產者那裏拉取其所需的數據,避免消費者出現溢出。推送模型更適合「慢生產者和快消費者」的場景,由於生產者能夠將數據推送給消費者,避免消費者沒必要要的等待時間。
Rx和Akka Streams(流式編程模型)使用了回壓技術(一種流量控制機制)。它使用拉取模型或推送模型來解決上面提到的生產者和消費者問題。
在下面的示例中,我使用了一個慢消費者從快生產者那裏異步拉取數據序列。消費者在處理完一個元素後,會向生產者請求下一個元素,依此類推,直到到達序列的末尾。
要了解咱們爲何須要Async Streams,讓咱們來看下面的代碼。
// 對參數(count)進行循環相加操做 static int SumFromOneToCount(int count) { ConsoleExt.WriteLine("SumFromOneToCount called!"); var sum = 0; for (var i = 0; i <= count; i++) { sum = sum + i; } return sum; }
方法調用:
const int count = 5; ConsoleExt.WriteLine($"Starting the application with count: {count}!"); ConsoleExt.WriteLine("Classic sum starting."); ConsoleExt.WriteLine($"Classic sum result: {SumFromOneToCount(count)}"); ConsoleExt.WriteLine("Classic sum completed."); ConsoleExt.WriteLine("################################################"); ConsoleExt.WriteLine(Environment.NewLine);
輸出:
咱們能夠經過使用yield運算符讓這個方法變成惰性的,以下所示。
static IEnumerable<int> SumFromOneToCountYield(int count) { ConsoleExt.WriteLine("SumFromOneToCountYield called!"); var sum = 0; for (var i = 0; i <= count; i++) { sum = sum + i; yield return sum; } }
調用方法:
const int count = 5; ConsoleExt.WriteLine("Sum with yield starting."); foreach (var i in SumFromOneToCountYield(count)) { ConsoleExt.WriteLine($"Yield sum: {i}"); } ConsoleExt.WriteLine("Sum with yield completed."); ConsoleExt.WriteLine("################################################"); ConsoleExt.WriteLine(Environment.NewLine);
輸出:
正如你在輸出窗口中看到的那樣,結果被分紅幾個部分返回,而不是做爲一個值返回。以上顯示的累積結果被稱爲惰性枚舉。可是,仍然存在一個問題,即sum方法阻塞了代碼的執行。若是你查看線程,能夠看到全部東西都在主線程中運行。
如今,讓咱們將async應用於第一個方法SumFromOneToCount上(沒有yield關鍵字)。
static async Task<int> SumFromOneToCountAsync(int count) { ConsoleExt.WriteLine("SumFromOneToCountAsync called!"); var result = await Task.Run(() => { var sum = 0; for (var i = 0; i <= count; i++) { sum = sum + i; } return sum; }); return result; }
調用方法:
const int count = 5; ConsoleExt.WriteLine("async example starting."); // 相加操做是異步進行得!這樣還不夠,咱們要求不只是異步的,還必須是惰性的。 var result = await SumFromOneToCountAsync(count); ConsoleExt.WriteLine("async Result: " + result); ConsoleExt.WriteLine("async completed."); ConsoleExt.WriteLine("################################################"); ConsoleExt.WriteLine(Environment.NewLine);
輸出:
咱們能夠看到計算過程是在另外一個線程中運行,但結果仍然是做爲一個值返回!
想象一下,咱們能夠按照命令式風格將惰性枚舉(yield return)與異步方法結合起來。這種組合稱爲Async Streams。這是C# 8中新提出的功能。這個新功能爲咱們提供了一種很好的技術來解決拉取式編程模型問題,例如從網站下載數據或從文件或數據庫中讀取記錄。
讓咱們嘗試使用當前的C# 版本。我將async關鍵字添加到SumFromOneToCountYield方法中,以下所示。
圖-2 組合使用async關鍵字和yield發生錯誤
咱們試着將async添加到SumFromOneToCountYield,但直接出現錯誤,如上所示!
讓咱們試試別的吧。咱們能夠將IEnumerable放入任務中並刪除yield關鍵字,以下所示:
static async Task<IEnumerable<int>> SumFromOneToCountTaskIEnumerable(int count) { ConsoleExt.WriteLine("SumFromOneToCountAsyncIEnumerable called!"); var collection = new Collection<int>(); var result = await Task.Run(() => { var sum = 0; for (var i = 0; i <= count; i++) { sum = sum + i; collection.Add(sum); } return collection; }); return result; }
調用方法:
const int count = 5; ConsoleExt.WriteLine("SumFromOneToCountAsyncIEnumerable started!"); var scs = await SumFromOneToCountTaskIEnumerable(count); ConsoleExt.WriteLine("SumFromOneToCountAsyncIEnumerable done!"); foreach (var sc in scs) { // 這不是咱們想要的,結果將做爲塊返回!!!! ConsoleExt.WriteLine($"AsyncIEnumerable Result: {sc}"); } ConsoleExt.WriteLine("################################################"); ConsoleExt.WriteLine(Environment.NewLine);
輸出:
能夠看到,咱們異步計算全部的內容,但仍然存在一個問題。結果(全部結果都在集合中累積)做爲一個塊返回,但這不是咱們想要的惰性行爲,咱們的目標是將惰性行爲與異步計算風格相結合。
爲了實現所需的行爲,你須要使用外部庫,如Ix(Rx的一部分),或者你必須使用新提出的C#特性Async Streams。
回到咱們的代碼示例。我使用了一個外部庫來顯示異步行爲。
static async Task ConsumeAsyncSumSeqeunc(IAsyncEnumerable<int> sequence) { ConsoleExt.WriteLineAsync("ConsumeAsyncSumSeqeunc Called"); await sequence.ForEachAsync(value => { ConsoleExt.WriteLineAsync($"Consuming the value: {value}"); // 模擬延遲! Task.Delay(TimeSpan.FromSeconds(1)).Wait(); }); } static IEnumerable<int> ProduceAsyncSumSeqeunc(int count) { ConsoleExt.WriteLineAsync("ProduceAsyncSumSeqeunc Called"); var sum = 0; for (var i = 0; i <= count; i++) { sum = sum + i; // 模擬延遲! Task.Delay(TimeSpan.FromSeconds(0.5)).Wait(); yield return sum; } }
調用方法:
const int count = 5; ConsoleExt.WriteLine("Starting Async Streams Demo!"); // 啓動一個新任務,用於生成異步數據序列! IAsyncEnumerable<int> pullBasedAsyncSequence = ProduceAsyncSumSeqeunc(count).ToAsyncEnumerable(); ConsoleExt.WriteLineAsync("X#X#X#X#X#X#X#X#X#X# Doing some other work X#X#X#X#X#X#X#X#X#X#"); // 啓動另外一個新任務,用於消費異步數據序列! var consumingTask = Task.Run(() => ConsumeAsyncSumSeqeunc(pullBasedAsyncSequence)); // 出於演示目的,等待任務完成! consumingTask.Wait(); ConsoleExt.WriteLineAsync("Async Streams Demo Done!");
輸出:
最後,咱們實現了咱們想要的行爲!咱們能夠在枚舉上進行異步迭代。
源代碼在這裏。
我將使用一個更現實的例子來解釋這個概念。客戶端/服務器端架構是演示這一功能優點的絕佳方法。
客戶端向服務器端發送請求,客戶端必須等待(客戶端被阻塞),直到服務器端作出響應,如圖-3所示。
圖-3 同步數據拉取,客戶端等待請求完成
客戶端發出數據請求而後繼續執行其餘操做。一旦有數據到達,客戶端就繼續處理達到的數據。
圖-4 異步數據拉取,客戶端能夠在請求數據時執行其餘操做
客戶端發出數據塊請求,而後繼續執行其餘操做。一旦數據塊到達,客戶端就處理接收到的數據塊並詢問下一個數據塊,依此類推,直到達到最後一個數據塊爲止。這正是Async Streams想法的來源。圖-5顯示了客戶端能夠在收到任何數據時執行其餘操做或處理數據塊。
圖-5 異步序列數據拉取(Async Streams),客戶端未被阻塞!
與IEnumerable<T>和IEnumerator<T>相似,Async Streams提供了兩個新接口IAsyncEnumerable<T>和IAsyncEnumerator<T>,定義以下:
public interface IAsyncEnumerable<out T> { IAsyncEnumerator<T> GetAsyncEnumerator(); } public interface IAsyncEnumerator<out T> : IAsyncDisposable { Task<bool> MoveNextAsync(); T Current { get; } } // Async Streams Feature能夠被異步銷燬 public interface IAsyncDisposable { Task DiskposeAsync(); }
Jonathan Allen已經在InfoQ網站上介紹過這個主題,我不想在這裏再重複一遍,因此我建議你也閱讀一下他的文章。
關鍵在於Task<bool> MoveNextAsync()的返回值(從bool改成Task<bool>,bool IEnumerator.MoveNext())。這樣可讓整個計算和迭代都保持異步。大多數狀況下,這仍然是拉取模型,即便它是異步的。IAsyncDisposable接口可用於進行異步清理。有關異步的更多信息,請點擊此處。
最終語法應以下所示:
foreach await (var dataChunk in asyncStreams) { // 處理數據塊或作一些其餘的事情! }
如上所示,咱們如今能夠按順序計算多個值,而不僅是計算單個值,同時還可以等待其餘異步操做結束。
我重寫了微軟的演示代碼,你能夠從個人GitHub下載相關代碼。
這個例子背後的想法是建立一個大的MemoryStream(20000字節的數組),並按順序異步迭代集合中的元素或MemoryStream。每次迭代從數組中拉取8K字節。
在(1)處,咱們建立了一個大字節數組並填充了一些虛擬值。在(2)處,咱們定義了一個叫做checksum的變量。咱們將使用checksum來確保計算的總和是正確的。數組和checksum位於內存中,並經過一個元組返回,如(3)所示。
在(4)處,AsEnumarble(或者叫AsAsyncEnumarble)是一種擴展方法,用於模擬由8KB塊組成的異步流( (6)處所示的BufferSize = 8000)。
一般,你沒必要繼承IAsyncEnumerable,但在上面的示例中,微軟這樣作是爲了簡化演示,如(5)處所示。
(7)處是「foreach」,它從異步內存流中拉取8KB的塊數據。當消費者(foreach代碼塊)準備好接收更多數據時,拉取過程是順序進行的,而後它從生產者(內存流數組)中拉取更多的數據。最後,當迭代完成後,應用程序將’c’的校驗和與checksum進行比較,若是它們匹配,就打印出「Checksums match!」,如(8)所示!
微軟演示的輸出窗口:
咱們已經討論過Async Streams,它是一種出色的異步拉取技術,可用於進行生成多個值的異步計算。
Async Streams背後的編程概念是異步拉取模型。咱們請求獲取序列的下一個元素,並最終獲得答覆。這與IObservable<T>的推送模型不一樣,後者生成與消費者狀態無關的值。Async Streams提供了一種表示異步數據源的絕佳方法,例如,當消費者還沒有準備好處理更多數據時。示例包含了Web應用程序或從數據庫中讀取記錄。