併發編程概述--C#併發編程經典實例

優秀軟件的一個關鍵特徵就是具備併發性。過去的幾十年,咱們能夠進行併發編程,可是難度很大。之前,併發性軟件的編寫、調試和維護都很難,這致使不少開發人員爲圖省事放棄了併發編程。新版.NET 中的程序庫和語言特徵,已經讓併發編程變得簡單多了。隨着Visual Studio 2012 的發佈,微軟明顯下降了併發編程的門檻。之前只有專家才能作併發編程,而今天,每個開發人員都可以(並且應該)接受併發編程。react

1.1簡介

首先,我來解釋幾個貫穿本書始終的術語。先來介紹併發。算法

  • 併發 
    同時作多件事情

這個解釋直接代表了併發的做用。終端用戶程序利用併發功能,在輸入數據庫的同時響應用戶輸入。服務器應用利用併發,在處理第一個請求的同時響應第二個請求。只要你但願程序同時作多件事情,你就須要併發。幾乎每一個軟件程序都會受益於併發。大多數開發人員一看到「併發」就會想到「多線程」。對這兩個概念,須要作一下區分。數據庫

  • 多線程 
    併發的一種形式,它採用多個線程來執行程序。

從字面上看,多線程就是使用多個線程,多線程是併發的一種形式,但不是惟一的形式。實際上,直接使用底層線程類型在現代程序中基本不起做用。比起老式的多線程機制,採用高級的抽象機制會讓程序功能更增強大、效率更高所以,這裏儘可能不涉及一些過期的技術。書中全部多線程的方法都採用高級類型,而不是Thread或BackgroundWorker。編程

一旦你輸入new Thread(),那就糟糕了,說明項目中的代碼太過期了。數組

可是,不要認爲多線程已經完全被淘汰了!由於線程池要求多線程繼續存在。線程池存聽任務的隊列,這個隊列可以根據須要自行調整。相應地,線程池產生了另外一個重要的併發形式:並行處理promise

  • 並行處理 
    把正在執行的大量的任務分割成小塊,分配給多個同時運行的線程。

爲了讓處理器的利用效率最大化,並行處理(或並行編程)採用多線程。當現代多核CPU執行大量任務時,若只用一個核執行全部任務,而其餘覈保持空閒,這顯然是不合理的。並行處理把任務分割成小塊並分配給多個線程,讓它們在不一樣的核上獨立運行。並行處理是多線程的一種,而多線程是併發的一種。在現代程序中,還有一種很是重要但不少人還不熟悉的併發類型:異步編程緩存

  • 異步編程 
    併發的一種形式,它採用future 模式或回調(callback)機制,以免產生沒必要要的線程。

一個future(或promise)類型表明一些即將完成的操做。在.NET 中,新版future 類型有Task 和Task。在老式異步編程API 中,採用回調或事件(event),而不是future。異步編程的核心理念是異步操做:啓動了的操做將會在一段時間後完成。這個操做正在執行時,不會阻塞原來的線程。啓動了這個操做的線程,能夠繼續執行其餘任務。當操做完成時,會通知它的future,或者調用回調函數,以便讓程序知道操做已經結束。異步編程是一種功能強大的併發形式,但直至不前,實現異步編程仍須要特別複雜的代碼。VS2012 支持async 和await,這讓異步編程變得幾乎和同步(非併發)編程同樣容易。併發編程的另外一種形式是響應式編程(reactive programming)。異步編程意味着程序啓動一個操做,而該操做將會在一段時間後完成。響應式編程與異步編程很是相似,不過它是基於異步事件(asynchronous event)的,而不是異步操做(asynchronous operation)。異步事件能夠沒有一個實際的「開始」,能夠在任什麼時候間發生,而且能夠發生屢次,例如用戶輸入。安全

  • 響應式編程 
    一種聲明式的編程模式,程序在該模式中對事件作出響應。

若是把一個程序看做一個大型的狀態機,則該程序的行爲即可視爲它對一系列事件作出響應,即每換一個事件,它就更新一次本身的狀態。這聽起來很抽象和空洞,但實際上並不是如此。利用現代的程序框架,響應式編程已經在實際開發中普遍使用。響應式編程不必定是併發的,但它與併發編程聯繫緊密,所以本書介紹了響應式編程的基礎知識。一般狀況下,一個併發程序要使用多種技術。大多數程序至少使用了多線程(經過線程池)和異步編程。要大膽地把各類併發編程形式進行混合和匹配,在程序的各個部分使用合適的工具。服務器

 

1.2 異步編程簡介

異步編程有兩大好處。第一個好處是對於面向終端用戶的GUI程序:異步編程提升了響應能力。咱們都遇到過在運行時會臨時鎖定界面的程序,異步編程可使程序在執行任務時仍能響應用戶的輸入。第二個好處是對於服務器端應用:異步編程實現了可擴展性。服務器應用能夠利用線程池知足其可擴展性,使用異步編程後,可擴展性一般能夠提升一個數量級。markdown

現代的異步.NET程序使用兩個關鍵字:asyncawaitasync關鍵字加在方法聲明上,它的主要目的是使方法內的await關鍵字生效(爲了保持向後兼容,同時引入了這兩個關鍵字)。若是async方法有返回值,應返回Task<T>;若是沒有返回值,應返回Task。這些task類型至關於future,用來在異步方法結束時通知主程序。

圖像說明文字不要用void做爲async方法的返回類型!async方法能夠返回void,可是這僅限於編寫事件處理程序。一個普通的async方法若是沒有返回值,要返回Task,而不是void

有了上述背景知識,咱們來快速看一個例子:

async Task DoSomethingAsync() { int val = 13; //異步方式等待1秒 await Task.Delay(TimeSpan.FromSeconds(1)); val *= 2; //異步方式等待1秒 await Task.Delay(TimeSpan.FromSeconds(1)); Trace.WriteLine(val); }

和其餘方法同樣,async方法在開始時以同步方式執行。在async方法內部,await關鍵字對它的參數執行一個異步等待。它首先檢查操做是否已經完成,若是完成了,就繼續運行(同步方式)。不然,它會暫停async方法,並返回,留下一個未完成的task。一段時間後,操做完成,async方法就恢復運行。

一個async方法是由多個同步執行的程序塊組成的,每一個同步程序塊之間由await語句分隔。第一個同步程序塊在調用這個方法的線程中運行,但其餘同步程序塊在哪裏運行呢?狀況比較複雜。

最多見的狀況是,用await語句等待一個任務完成,當該方法在await處暫停時,就能夠捕捉上下文(context)。若是當前SynchronizationContext不爲空,這個上下文就是當前SynchronizationContext。若是當前SynchronizationContext爲空,則這個上下文爲當前TaskScheduler。該方法會在這個上下文中繼續運行。通常來講,運行UI線程時採用UI上下文,處理ASP.NET請求時採用ASP.NET請求上下文,其餘不少狀況下則採用線程池上下文。

所以,在上面的代碼中,每一個同步程序塊會試圖在原始的上下文中恢復運行。若是在UI線程中調用DoSomethingAsync,這個方法的每一個同步程序塊都將在此UI線程上運行。可是,若是在線程池線程中調用,每一個同步程序塊將在線程池線程上運行。

要避免這種錯誤行爲,能夠在await中使用ConfigureAwait方法,將參數continueOnCapturedContext設爲false。接下來的代碼剛開始會在調用的線程裏運行,在被await暫停後,則會在線程池線程裏繼續運行:

async Task DoSomethingAsync() { int val = 13; //異步方式等待1秒 await Task.Delay(TimeSpan.FromSeconds(1)).ConfigureAwait(false); val *= 2; //異步方式等待1秒 await Task.Delay(TimeSpan.FromSeconds(1)).ConfigureAwait(false); Trace.WriteLine(val.ToString()); }

圖像說明文字最好的作法是,在覈心庫代碼中一直使用ConfigureAwait。在外圍的用戶界面代碼中,只在須要時才恢復上下文。

關鍵字await不只能用於任務,還能用於全部遵循特定模式的awaitable類型。例如,Windows Runtime API定義了本身專用的異步操做接口。這些接口不能轉化爲Task類型,但確實遵循了可等待的(awaitable)模式,所以能夠直接使用await。這種awaitable類型在Windows應用商店程序中更加常見,可是在大多數狀況下,await使用TaskTask<T>

有兩種基本的方法能夠建立Task實例。有些任務表示CPU須要實際執行的指令,建立這種計算類的任務時,使用Task.Run(如須要按照特定的計劃運行,則用TaskFactory.StartNew)。其餘的任務表示一個通知(notification),建立這種基於事件的任務時,使用TaskCompletionSource<T>。大部分I/O型任務採用TaskCompletionSource<T>

使用asyncawait時,天然要處理錯誤。在下面的代碼中,PossibleExceptionAsync會拋出一個NotSupportedException異常,而TrySomethingAsync方法可很順利地捕捉到這個異常。這個捕捉到的異常完整地保留了棧軌跡,沒有人爲地將它封裝進TargetInvocationExceptionAggregateException類:

async Task TrySomethingAsync() { try { await PossibleExceptionAsync(); } catch(NotSupportedException ex) { LogException(ex); throw; } }

一旦異步方法拋出(或傳遞出)異常,該異常會放在返回的Task對象中,而且這個Task對象的狀態變爲「已完成」。當await調用該Task對象時,await會得到並(從新)拋出該異常,而且保留着原始的棧軌跡。所以,若是PossibleExceptionAsync是異步方法,如下代碼就能正常運行:

async Task TrySomethingAsync() { //發生異常時,任務結束。不會直接拋出異常。 Task task = PossibleExceptionAsync(); try { //Task對象中的異常,會在這條await語句中引起 await task; } catch(NotSupportedException ex) { LogException(ex); throw; } }

關於異步方法,還有一條重要的準則:你一旦在代碼中使用了異步,最好一直使用。調用異步方法時,應該(在調用結束時)用await等待它返回的task對象。必定要避免使用Task.WaitTask<T>.Result方法,由於它們會致使死鎖。參考一下下面這個方法:

async Task WaitAsync() { //這裏awati會捕獲當前上下文…… await Task.Delay(TimeSpan.FromSeconds(1)); // ……這裏會試圖用上面捕獲的上下文繼續執行 } void Deadlock() { //開始延遲 Task task = WaitAsync(); //同步程序塊,正在等待異步方法完成 task.Wait(); }

若是從UI或ASP.NET的上下文調用這段代碼,就會發生死鎖。這是由於,這兩種上下文每次只能運行一個線程。Deadlock方法調用WaitAsync方法,WaitAsync方法開始調用delay語句。而後,Deadlock方法(同步)等待WaitAsync方法完成,同時阻塞了上下文線程。當delay語句結束時,await試圖在已捕獲的上下文中繼續運行WaitAsync方法,但這個步驟沒法成功,由於上下文中已經有了一個阻塞的線程,而且這種上下文只容許同時運行一個線程。這裏有兩個方法能夠避免死鎖:在WaitAsync中使用ConfigureAwait(false)(致使await忽略該方法的上下文),或者用await語句調用WaitAsync方法(讓Deadlock變成一個異步方法)。

圖像說明文字若是使用了async,最好就一直使用它。

若想更全面地瞭解關於異步編程的知識,可參閱Alex Davies(O'Reilly)編寫的Async in C# 5.0,這本書很是不錯。另外,微軟公司有關異步編程的在線文檔也很不錯,建議你至少讀一讀「async overview」和「Task-based Asynchronous Pattern(TAP) overview」這兩篇。若是要深刻了解,官方FAQ和博客上也有大量的信息。

1.3 並行編程簡介

若是程序中有大量的計算任務,而且這些任務能分割成幾個互相獨立的任務塊,那就應該使用並行編程。並行編程可臨時提升CPU利用率,以提升吞吐量,若客戶端系統中的CPU常常處於空閒狀態,這個方法就很是有用,但一般並不適合服務器系統。大多數服務器自己具備並行處理能力,例如ASP.NET可並行地處理多個請求。某些狀況下,在服務器系統中編寫並行代碼仍然有用(若是你知道併發用戶數量會一直是少數)。但一般狀況下,在服務器系統上進行並行編程,將下降自己的並行處理能力,而且不會有實際的好處。

並行的形式有兩種:數據並行(data parallelism)和任務並行(task parallelim)。數據並行是指有大量的數據須要處理,而且每一塊數據的處理過程基本上是彼此獨立的。任務並行是指須要執行大量任務,而且每一個任務的執行過程基本上是彼此獨立的。任務並行能夠是動態的,若是一個任務的執行結果會產生額外的任務,這些新增的任務也能夠加入任務池。

實現數據並行有幾種不一樣的作法。一種作法是使用Parallel.ForEach方法,它相似於foreach循環,應儘量使用這種作法。在3.1節將會詳細介紹Parallel.ForEach方法。Parallel類也提供Parallel.For方法,這相似於for循環,當數據處理過程基於一個索引時,可以使用這個方法。下面是使用Parallel.ForEach的代碼例子:

void RotateMatrices(IEnumerable<Matrix> matrices, float degrees) { Parallel.ForEach(matrices, matrix => matrix.Rotate(degrees)); }

另外一種作法是使用PLINQ(Parallel LINQ),它爲LINQ查詢提供了AsParallel擴展。跟PLINQ相比,Parallel對資源更加友好,Parallel與系統中的其餘進程配合得比較好,而PLINQ會試圖讓全部的CPU來執行本進程。Parallel的缺點是它太明顯。不少狀況下,PLINQ的代碼更加優美。PLINQ在3.5節有詳細介紹:

IEnumerable<bool> PrimalityTest(IEnumerable<int> values) { return values.AsParallel().Select(val => IsPrime(val)); }

無論選用哪一種方法,在並行處理時有一個很是重要的準則。

圖像說明文字每一個任務塊要儘量的互相獨立。

只要任務塊是互相獨立的,並行性就能作到最大化。一旦你在多個線程中共享狀態,就必須以同步方式訪問這些狀態,那樣程序的並行性就變差了。第11章將詳細講述同步。

有多種方式能夠控制並行處理的輸出。能夠把結果存在某些併發集合,或者對結果進行聚合。聚合在並行處理中很常見,Parallel類的重載方法,也支持這種map/reduce函數。關於聚合的詳細內容在3.2節。

下面講任務並行。數據並行重點在處理數據,任務並行則關注執行任務。

Parallel類的Parallel.Invoke方法能夠執行「分叉/聯合」(fork/join)方式的任務並行。3.3節將詳細介紹這個方法。調用該方法時,把要並行執行的委託(delegate)做爲傳入參數:

void ProcessArray(double[] array) { Parallel.Invoke( () => ProcessPartialArray(array, 0, array.Length / 2), () => ProcessPartialArray(array, array.Length / 2, array.Length) ); } void ProcessPartialArray(double[] array, int begin, int end) { // CPU密集型的操做…… }

如今Task這個類也被用於異步編程,但當初它是爲了任務並行而引入的。任務並行中使用的一個Task實例表示一些任務。可使用Wait方法等待任務完成,還可使用ResultException屬性來檢查任務執行的結果。直接使用Task類型的代碼比使用Parallel類要複雜,可是,若是在運行前不知道並行任務的結構,就須要使用Task類型。若是使用動態並行機制,在開始處理時,任務塊的個數是不肯定的,只有繼續執行後才能肯定。一般狀況下,一個動態任務塊要啓動它所需的全部子任務,而後等待這些子任務執行完畢。爲實現這個功能,可使用Task類型中的一個特殊標誌:TaskCreationOptions.AttachedToParent。動態並行機制在3.4節中詳述。

跟數據並行同樣,任務並行也強調任務塊的獨立性。委託(delegate)的獨立性越強,程序的執行效率就越高。在編寫任務並行程序時,要格外留意下閉包(closure)捕獲的變量。記住閉包捕獲的是引用(不是值),所以能夠在結束時以不明顯地方式地分享這些變量。

對全部並行處理類型來說,錯誤處理的方法都差很少。因爲操做是並行執行的,多個異常就會同時發生,系統會把這些異常封裝在AggregateException類裏,在程序中拋給代碼。這一特色對全部方法都是同樣的,包括Parallel.ForEachParalle.InvokeTask.Wait等。AggregateException類型有幾個實用的FlattenHandle方法,用來簡化錯誤處理的代碼:

try { Parallel.Invoke(() => { throw new Exception(); }, () => { throw new Exception(); }); } catch (AggregateException ex) { ex.Handle(exception => { Trace.WriteLine(exception); return true; // "已經處理" }); }

一般狀況下,不必關心線程池處理任務的具體作法。數據並行和任務並行都使用動態調整的分割器,把任務分割後分配給工做線程。線程池在須要的時候會增長線程數量。線程池線程使用工做竊取隊列(work-stealing queue)。微軟公司爲了讓每一個部分儘量高效,作了不少優化。要讓程序獲得最佳的性能,有不少參數能夠調節。只要任務時長不是特別短,採用默認設置就會運行得很好。

圖像說明文字任務不要特別短,也不要特別長。

若是任務過短,把數據分割進任務和在線程池中調度任務的開銷會很大。若是任務太長,線程池就不能進行有效的動態調整以達到工做量的平衡。很難肯定「過短」和「太長」的判斷標準,這取決於程序所解決問題的類型以及硬件的性能。根據一個通用的準則,只要沒有致使性能問題,我會讓任務儘量短(若是任務過短,程序性能會忽然下降)。更好的作法是使用Parallel類型或者PLINQ,而不是直接使用任務。這些並行處理的高級形式,自帶有自動分配任務的算法(而且會在運行時自動調整)。

要更深刻的瞭解並行編程,這方面最好的書是Colin Campbell等人編寫的Parallel Programming with Microsoft.NET(微軟出版社)。

1.4 響應式編程簡介

跟併發編程的其餘形式相比,響應式編程的學習難度較大。若是對響應式編程不是很是熟悉,代碼維護相對會更難一點。一旦你學會了,就會發現響應式編程的功能特別強大。響應式編程能夠像處理數據流同樣處理事件流。根據經驗,若是事件中帶有參數,那麼最好採用響應式編程,而不是常規的事件處理程序。

響應式編程基於「可觀察的流」(observable stream)這一律念。你一旦申請了可觀察流,就能夠收到任意數量的數據項(OnNext),而且流在結束時會發出一個錯誤(OnError)或一個「流結束」的通知(OnCompleted)。有些可觀察流是不會結束的。實際的接口就像這樣:

interface IObserver<in T> { void OnNext(T item); void OnCompleted(); void OnError(Exception error); } interface IObservable<out T> { IDisposable Subscribe(IObserver<T> observer); }

不過,開發人員不須要實現這些接口。微軟的Reactive Extensions(Rx)庫已經實現了全部接口。響應式編程的最終代碼很是像LINQ,能夠認爲它就是「LINQ to events」。下面的代碼中,前面是咱們不熟悉的操做符(IntervalTimestamp),最後是一個Subscribe,可是中間部分是咱們在LINQ中熟悉的操做符:WhereSelect。LINQ具備的特性,Rx也都有。Rx在此基礎上增長了不少它本身的操做符,特別是與時間有關的操做符:

Observable.Interval(TimeSpan.FromSeconds(1)) .Timestamp() .Where(x => x.Value % 2 == 0) .Select(x => x.Timestamp) .Subscribe(x => Trace.WriteLine(x));

上面的代碼中,首先是一個延時一段時間的計數器(Interval),隨後爲每一個事件加了一個時間戳(Timestamp)。接着對事件進行過濾,只包含偶數值(Where),選擇了時間戳的值(Timestamp),而後當每一個時間戳值到達時,把它輸入調試器(Subscribe)。若是沒有理解上述新的操做符(例如Interval),沒關係,咱們會在後面講述。如今只要記住這是一個LINQ查詢,與你之前見過的LINQ查詢很相似。主要區別在於:LINQ to Object和LINQ to Entity使用「拉取」模式,LINQ的枚舉經過查詢拉出數據。而LINQ to event(Rx)使用「推送」模式,事件到達後就自行穿過查詢。

可觀察流的定義和其訂閱是互相獨立的。上面最後一個例子與下面的代碼等效:

IObservable<DateTimeOffset> timestamps = Observable.Interval(TimeSpan.FromSeconds(1)) .Timestamp() .Where(x => x.Value % 2 == 0) .Select(x => x.Timestamp); timestamps.Subscribe(x => Trace.WriteLine(x));

一種常規的作法是把可觀察流定義爲一種類型,而後將其做爲IObservable<T>資源使用。其餘類型能夠訂閱這些流,或者把這些流與其餘操做符組合,建立另外一個可觀察流。

Rx的訂閱也是一個資源。Subscribe操做符返回一個IDisposable,即表示訂閱完成。當你響應了那個可觀察流,就得處理這個訂閱。

對於hot observable(熱可觀察流)和cold observable(冷可觀察流)這兩種對象,訂閱的作法各有不一樣。一個hot observable對象是指一直在發生的事件流,若是在事件到達時沒有訂閱者,事件就丟失了。例如,鼠標的移動就是一個hot observable對象。cold observable對象是始終沒有輸入事件(不會主動產生事件)的觀察流,它只會經過啓動一個事件隊列來響應訂閱。例如,HTTP下載是一個cold observable對象,只有在訂閱後纔會發出HTTP請求。

一樣,全部Subscribe操做符都須要有處理錯誤的參數。前面的例子沒有錯誤處理參數。下面則是一個更好的例子,在可觀察流發生錯誤時,它能正確處理:

Observable.Interval(TimeSpan.FromSeconds(1)) .Timestamp() .Where(x => x.Value % 2 == 0) .Select(x => x.Timestamp) .Subscribe(x => Trace.WriteLine(x), ex => Trace.WriteLine(ex));

在進行Rx實驗性編程時,Subject<T>這個類型頗有用。這個「subject」就像手動實現一個可觀察流。能夠在代碼中調用OnNextOnErrorOnCompleted,這個subject會把這些調用傳遞給訂閱者。Subject<T>用於實驗時效果很是不錯,但在實際產品開發時,應該使用第5章介紹的操做符。

Rx的操做符很是多,本書只介紹了一部分。想了解關於Rx的更多信息,建議閱讀優秀的在線圖書Introduction to Rx。

1.5 數據流簡介

TPL數據流頗有意思,它把異步編程和並行編程這兩種技術結合起來。若是須要對數據進行一連串的處理,TPL數據流就頗有用。例如,須要從一個URL上下載數據,接着解析數據,而後把它與其餘數據一塊兒作並行處理。TPL數據流一般做爲一個簡易的管道,數據從管道的一端進入,在管道中穿行,最後從另外一端出來。不過,TPL數據流的功能比普通管道要強大多了。對於處理各類類型的網格(mesh),在網格中定義分叉(fork)、鏈接(join)、循環(loop)的工做,TPL數據流都能正確地處理。固然了,大多數時候TPL數據流網格仍是被用做管道。

數據流網格的基本組成單元是數據流塊(dataflow block)。數據流塊能夠是目標塊(接收數據)或源塊(生成數據),或二者皆可。源塊能夠鏈接到目標塊,建立網格。鏈接的具體內容在4.1節介紹。數據流塊是半獨立的,當數據到達時,數據流塊會試圖對數據進行處理,而且把處理結果推送給下一個流程。使用TPL數據流的常規方法是建立全部的塊,再把它們連接起來,而後開始在一端填入數據。而後,數據會自行從另外一端出來。再強調一次,數據流的功能比這要強大得多,數據穿過的同時,可能會斷開鏈接、建立新的塊並加入到網格,不過這是很是高級的使用場景。

目標塊帶有緩衝區,用來存放收到的數據。所以,在還來不及處理數據的時候,它仍能接收新的數據項,這就讓數據能夠持續地在網格上流動。在有分叉的狀況下,一個源塊連接了兩個目標塊,這種緩衝機制就會產生問題。當源塊有數據須要傳遞下去時,它會把數據傳給與它連接的塊,而且一次只傳一個數據。默認狀況下,第一個目標塊會接收數據並緩存起來,而第二個目標塊就收不到任何數據。解決這個問題的方法是把目標塊設置爲「非貪婪」模式,以限制緩衝區的數量,這部分將在4.4節介紹。

若是某些步驟出錯,例如委託在處理數據項時拋出異常,數據流塊就會出錯。數據流塊出錯後就會中止接收數據。默認狀況下,一個塊出錯不會摧毀整個網格。這讓程序有能力重建部分網格,或者對數據從新定向。然而這是一個高級用法。一般來說,你是但願這些錯誤經過連接傳遞給目標塊。數據流也提供這個選擇,惟一比較難辦的地方是當異常經過連接傳遞時,它就會被封裝在AggregateException類中。所以,若是管道很長,最後異常的嵌套層次會很是多,這時就可使用AggregateException.Flatten方法:

try { var multiplyBlock = new TransformBlock<int, int>(item => { if (item == 1) throw new InvalidOperationException("Blech."); return item * 2; }); var subtractBlock = new TransformBlock<int, int>(item => item - 2); multiplyBlock.LinkTo(subtractBlock, new DataflowLinkOptions { PropagateCompletion = true }); multiplyBlock.Post(1); subtractBlock.Completion.Wait(); } catch (AggregateException exception) { AggregateException ex = exception.Flatten(); Trace.WriteLine(ex.InnerException); }

數據流錯誤的處理方法將在4.2節詳細介紹。

數據流網格給人的第一印象是與可觀察流很是相似,實際上它們確實有不少共同點。網格和流都有「數據項」這一律念,數據項從網格或流的中間穿過。還有,網格和流都有「正常完成」(表示沒有更多數據須要接收時發出的通知)和「不正常完成」(在處理數據中發生錯誤時發出的通知)這兩個概念。可是,Rx和TPL數據流的性能並不相同。若是執行須要計時的任務,最好使用Rx的observable對象,而不是數據流塊。若是進行並行處理,最好使用數據流塊,而不是Rx的observable對象。從概念上說,Rx更像是創建回調函數:observable對象中的每一個步驟都會直接調用下一步。相反,數據流網格中的每一塊都是互相獨立的。Rx和TPL數據流有各自的應用領域,也有一些交叉的領域。另外一方面,Rx和TPL數據流也很是適合同時使用。Rx和TPL數據流的互操做性將在7.7節詳細介紹。

最經常使用的塊類型有TransformBlock<TInput, TOutput>(與LINQ的Select相似)、TransformManyBlock<TInput, Toutput>(與LINQ的SelectMany相似)和ActionBlock<T>(爲每一個數據項運行一個委託)。要了解TPL數據流的更多知識,建議閱讀MSDN的文檔和Guide to Implementing Custom TPL Dataflow Blocks。

1.6 多線程編程簡介

線程是一個獨立的運行單元,每一個進程內部有多個線程,每一個線程能夠各自同時執行指令。每一個線程有本身獨立的棧,可是與進程內的其餘線程共享內存。對某些程序來講,其中有一個線程是特殊的,例如用戶界面程序有一個UI線程,控制檯程序有一個main線程。

每一個.NET程序都有一個線程池,線程池維護着必定數量的工做線程,這些線程等待着執行分配下來的任務。線程池能夠隨時監測線程的數量。配置線程池的參數多達幾十個,可是建議採用默認設置,線程池的默認設置是通過仔細調整的,適用於絕大多數現實中的應用場景。

應用程序幾乎不須要自行建立新的線程。你若要爲COM interop程序建立STA線程,就得建立線程,這是惟一須要線程的狀況。

線程是低級別的抽象,線程池是稍微高級一點的抽象,當代碼段遵循線程池的規則運行時,線程池就會在須要時建立線程。本書介紹的技術抽象級別更高:並行和數據流的處理隊列會根據狀況遵循線程池運行。抽象級別更高,正確代碼的編寫就更容易。

基於這個緣由,本書根本不介紹ThreadBackgroundWorker這兩種類型。它們曾經很是流行,但那個時代已通過去了。

1.7 併發編程的集合

併發編程所用到的集合有兩類:併發集合和不可變集合。這兩種類別的集合將在第8章詳細介紹。多個線程能夠用安全的方式同時更新併發集合。大多數併發集合使用快照(snapshot),當一個線程在增長或刪除數據時,另外一個線程也能枚舉數據。比起給常規集合加鎖以保護數據的方式,採用併發集合的方式要高效得多。

不可變集合則有些不一樣。不可變集合其實是沒法修改的。要修改一個不可變集合,須要創建一個新的集合來表明這個被修改了的集合。這看起來效率很是低,可是不可變集合的各個實例之間儘量多地共享存儲區,所以實際上效率沒想象得那麼差。不可變集合的優勢之一,就是全部的操做都是簡潔的,所以特別適合在函數式代碼中使用。

1.8 現代設計

大多數併發編程技術有一個相似點:它們本質上都是函數式(functional)的。這裏「functional」的意思不是「實用,能完成任務」1,而是把它做爲一種基於函數組合的編程模式。若是你接受函數式的編程理念,併發編程的設計就會簡單得多。

1英文中「函數式」和「實用」是同一個單詞functional。——譯者注

函數式編程的一個原則就是簡潔(換言之,就是避免反作用)。解決方案中的每個片斷都用一些值做爲輸入,生成一些值做爲輸出。應該儘量避免讓這些段落依賴於全局(或共享)變量,或者修改全局(或共享)數據結構。不論這個片斷是異步方法、並行任務、Rx操做仍是數據流塊,都應該這麼作。固然了,具體作法早晚會受到計算內容的影響,但若是能用簡潔的段落來處理,而後用結果來執行更新,代碼就會更加清晰。

函數式編程的另外一個原則是不變性。不變性是指一段數據是不能被修改的。在併發編程中使用不可變數據的緣由之一,是程序永遠不須要對不可變數據進行同步。數據不能修改,這一事實讓同步變得沒有必要。不可變數據也能避免反作用。在編寫本書時(2014年),雖然不可變數據尚未被普遍接受,但本書中有幾節會介紹不可變數據結構。

1.9 技術要點總結

在.NET剛推出時,就對異步編程提供了必定的支持。可是異步編程一直是很難的,直到2012年.NET 4.5(同時發佈C# 5.0和VB 2012)引入asyncawait這兩個關鍵字。本書中的異步編程方法,將所有采用現代的async/await。同時介紹一些方法,來實現async和老式異步編程模式的交互。要支持老式平臺的話,須要下載NuGet包Microsoft.Bcl.Async

圖像說明文字不要在基於.NET 4.0的ASP.NET代碼中使用Microsoft.Bcl.Async進行異步編程!在.NET中,ASP.NET管道已經進行修改以支持async。對於異步ASP.NET項目,必須使用.NET 4.5或更高版本。

.NET 4.0引入了任務並行庫(TPL),徹底支持數據並行和任務並行。可是一些資源較少的平臺(例如手機),一般不支持TPL。TPL是.NET框架自帶的。

Reactive Extensions團隊已經讓它儘量多地支持多種平臺。Reactive Extensions和asyncawait同樣,對全部類型的應用都有好處,包括客戶端和服務器端應用。Rx在NuGet包Rx-Main中。

TPL數據流庫只支持較新的平臺,它的官方版本在NuGet包Microsoft.Tpl.Dataflow中。

併發編程的集合是.NET框架的一部分,可是不可變集合在NuGet包Microsoft.Bcl.Immutable中。表1-1列出了各主流平臺對各類技術的支持狀況。

表1-1:各平臺對併發編程的支持

平臺

async

並行編程

Rx

數據流

併發集合

不可變集合

.NET 4.5

.NET 4.0

×

×

Mono iOS/Droid

Windows Store

Windows Phone Apps 8.1

Windows Phone SL 8.0

×

×

Windows Phone SL 7.1

×

×

×

×

Silverlight 5

×

×

×

×

目錄

相關文章
相關標籤/搜索