線程 Z

原文:http://www.albahari.com/threading/part5.aspxhtml

專題:C#中的多線程前端

1並行編程Permalink

在這一部分,咱們討論 Framework 4.0 加入的多線程 API,它們能夠充分利用多核處理器。程序員

這些 API 能夠統稱爲 PFX(Parallel Framework,並行框架)。Parallel類與任務並行構造一塊兒被稱爲 TPL(Task Parallel Library,任務並行庫)。web

Framework 4.0 也增長了一些更底層的線程構造,它們針對傳統的多線程。咱們以前講過的:算法

在繼續閱讀前,你須要瞭解第 1 部分 - 第 4 部分中的基本原理,特別是線程安全shell

並行編程這一部分提供的全部代碼均可以在LINQPad中試驗。LINQPad 是一個 C# 代碼草稿板,能夠用來測試代碼段,而無需建立類、項目或解決方案。想要獲取這些示例代碼,能夠在 LINQPad 左下方的 Samples 標籤頁中點擊 Download More Samples,而且選擇 C# 4.0 in a Nutshell: More Chapters。(譯者注:如今應該是 C# 5.0 in a Nutshell 和 C# 6.0 in a Nutshell 了)數據庫

2爲什麼須要 PFX?Permalink

近年來,CPU 時鐘頻率發展陷於停滯,製造商已經將重心轉移至增長核心數量。這對咱們程序員來講是個問題,由於標準的單線程代碼沒法自動利用那些增長的核心來提高程序運行速度。編程

利用多個核心對大多數服務端應用程序來講很容易,每一個線程能夠獨立處理單獨的客戶端請求,但在桌面環境下就不那麼容易了,由於一般這須要你優化計算密集型代碼,按以下步驟進行:數組

  1. 將工做分解成塊。
  2. 多線程並行處理這些工做塊。
  3. 以線程安全和高效的方式整理結果。

儘管你能夠使用傳統的多線程構造,但那比較笨拙,尤爲是在分解工做和整理結果的步驟。而且,爲確保線程安全,一般的策略是使用,而它在不少線程同時訪問一份數據時會致使大量競爭。緩存

PFX 庫就是專門被設計用來爲這些場景提供幫助的。

利用多核心或多處理器的編程被稱爲並行編程(parallel programming)。它是多線程這個更寬泛概念的子集。

2.1PFX 概念Permalink

有兩種分解工做的策略:數據並行(data parallelism)和任務並行(task parallelism)。

當一系列任務須要處理不少數據時,可讓每一個線程都執行這一系列(相同的)任務來處理一部分數據(即全部數據的一個子集)。這樣實現的並行化稱爲數據並行,由於咱們是爲線程分解了數據。與此相對,任務並行是指對任務進行分解,換句話說就是讓每一個線程執行不一樣的任務。

一般,對高度並行的硬件來講,數據並行更簡單,可伸縮性也更好,由於它減小或消除了共享數據(也就減小了競爭和線程安全問題)。而且,事實上通常都是數據比任務要多,因此數據並行能夠增長併發的可能。

數據並行也有利於結構化並行(structured parallelism),意思是說並行工做單元的啓動和完成是在程序中的同一位置。相對的,任務並行趨向於非結構化,就是說並行工做單元的啓動和完成可 能分散在程序各處。結構化並行比較簡單,而且不易出錯,也讓你能夠把工做分解和線程協調(甚至包括結果整理)這些複雜的任務交給 PFX 庫來完成。

2.2PFX 組件Permalink

PFX 包含兩層功能。上層是由結構化數據並行 API:PLINQParallel類組成。下層包含任務並行的類,以及一組額外的構造,來幫助你實現並行編程。

並行編程組件

PLINQ 提供了最豐富的功能:它可以自動化並行的全部步驟,包括分解工做、多線程執行、最後把結果整理成一個輸出序列。它被稱爲聲明式(declarative) 的,由於你只是聲明但願並行化你的工做(構造一個 LINQ 查詢),而後讓 Framework 來處理實現細節。相對的,另外一種方式是指令式(imperative)的,這種方式是須要你顯式編寫代碼來處理工做分解和結果整理。例如使用Parallel類時,你必須本身整理結果,而若是使用任務並行構造,你還必須本身分解工做。

  分解工做 整理結果
PLINQ    
Parallel   -
PFX 的任務並行 - -

併發集合自旋基元可 以幫助你實現低層次的並行編程。這很重要,由於 PFX 不只被設計適用於當今的硬件,也適用於將來更多核心的處理器。若是你但願搬運一堆木塊,而且有 32 個工人,最麻煩的是如何讓工人們搬運木塊時不互相擋道。這與把算法分解運行在 32 個核心上相似:若是普通的鎖被用於保護公共資源,所產生的阻塞可能意味着同時只有一小部分核心真正在工做。併發集合專門針對於高併發訪問,致力於最小化或消除阻塞。PLINQ 和 Parallel類就依賴於併發集合和自旋基元來實現高效的工做管理。

PFX 與傳統的多線程Permalink

傳統多線程的場景是,即便在單核的機器上,使用多線程也有好處,而此時並無真正的並行發生。就像咱們以前討論過的:保持用戶界面的響應以及同時下載兩個網頁。

這一部分將要講到的一些構造有時對於傳統多線程也有用。特別是:

2.3什麼時候使用 PFXPermalink

PFX 主要用於並行編程:充分利用多核處理器來加速執行計算密集型代碼。

充分利用多個核心的挑戰在於阿姆達爾定律(Amdahl’s law),它指出經過並行化產生的最大性能提高,取決於有多少必須順序執行的代碼段。例如,若是一個算法只有三分之二的執行時間能夠並行,即便有無數核心,也沒法得到超過三倍的性能提高。

所以,在使用 PFX 前,有必要先檢查可並行代碼中的瓶頸。還須要考慮下,你的代碼是否有必要是計算密集的,優化這裏每每是最簡單有效的方法。然而,這也須要平衡,由於一些優化技術會使代碼難以並行化。

最容易獲益的是「很差意思不併行的問題(embarrassingly parallel problems)」:工做能夠很容易地被分解爲多個任務,每一個任務本身能夠高效執行(結構化並行很是適合這種問題)。例如:不少圖片處理任務、光線跟蹤 算法、數學和密碼學方面的暴力計算和破解。而相反的例子是:實現快速排序算法的優化版本,想把它實現得好須要必定思考,而且可能須要非結構化並行。

3PLINQPermalink

PLINQ 會自動並行化本地的 LINQ 查詢。其優點在於使用簡單,由於將工做分解和結果整理的負擔交給了 Framework。

使用 PLINQ 時,只要在輸入序列上調用AsParallel(),而後像日常同樣繼續 LINQ 查詢就能夠了。下邊的查詢計算 3 到 100,000 內的素數,這會充分利用目標機器上的全部核心。

// 使用一個簡單的(未優化)算法計算素數。 // // 注意:這一部分提供的全部代碼均可以在 LINQPad 中試驗。 IEnumerable<int> numbers = Enumerable.Range (3, 100000-3); var parallelQuery = from n in numbers.AsParallel() where Enumerable.Range (2, (int) Math.Sqrt (n)).All (i => n % i > 0) select n; int[] primes = parallelQuery.ToArray(); 

AsParallelSystem.Linq.ParallelEnumerable中的一個擴展方法。它使用ParallelQuery<TSource>來封裝輸入,就會將你隨後調用的 LINQ 查詢操做符綁定在ParallelEnumerable中定義的另一組擴展方法上。它們提供了全部標準查詢操做符的並行化實現。本質上,它們就是將輸入序列進行分區,造成工做塊,並在不一樣的線程上執行,以後再將結果整理成一個輸出序列:

PLINQ 執行

調用AsSequential()能夠拆封ParallelQuery,使隨後的查詢操做符綁定到標準查詢操做符來順序執行。在調用有反作用或非線程安全的方法前,有必要這樣作。

對於那些接受兩個輸入序列的查詢操做符(JoinGroupJoinContactUnionIntersectZip)來講,必須在這兩個輸入序列上都使用AsParallel()(不然將拋出異常)。然而,不須要爲中間過程的查詢使用AsParallel,由於 PLINQ 的查詢操做符會輸出另外一個ParallelQuery序列。實際上,在這個輸出序列上再次調用AsParallel會下降效率,它會強制對序列進行合併和從新分區。

mySequence.AsParallel() // 使用 ParallelQuery<int> 封裝序列 .Where (n => n > 100) // 輸出另外一個 ParallelQuery<int> .AsParallel() // 不須要,會下降效率! .Select (n => n * n) 

並不是全部的查詢操做符均可以被有效地並行化。對於那些不能的,PLINQ 使用了順序的實現。若是 PLINQ 認爲並行化的開銷實際會使查詢變慢,它也會順序執行。

PLINQ 僅適用於本地集合:它沒法在 LINQ to SQL 或 Entity Framework 中使用,由於在那些場景中,LINQ 會被翻譯成 SQL 語句,而後在數據庫服務器上執行。然而,你能夠使用 PLINQ 對從數據庫查詢得到的結果執行進一步的本地查詢。

若是 PLINQ 查詢拋出異常,它會被封裝進AggregateException從新拋出,其InnerExceptions屬性包含真正的異常。詳見使用 AggregateException

爲何 AsParallel 不是默認的?Permalink

咱們知道AsParallel能夠透明的並行化 LINQ 查詢,那麼問題來了,「微軟爲何不直接並行化標準查詢操做符,使 PLINQ 成爲默認的?」

有不少緣由使其成爲這種選擇使用(opt-in)的方式。首先,要使 PLINQ 有用,必需要有必定數量的計算密集型任務,它們能夠被分配到多個工做線程。大多數 LINQ to Objects 的查詢執行很是快,根本不須要並行化,並行化過程當中的任務分區、結果整理以及線程協調反而會使程序變慢。

其次:

  • PLINQ 查詢的輸出(默認狀況下)在元素排序方面不一樣於 LINQ 查詢
  • PLINQ 將異常封裝在AggregateException中(可以處理拋出的多個異常)。
  • 若是查詢引用了非線程安全的方法,PLINQ 會給出不可靠的結果。

最後,PLINQ 爲了進行微調提供了一些鉤子(hook)。把這些累贅加入標準的 LINQ to Objects 的 API 會增長使用障礙。

3.1並行執行的特徵Permalink

與普通的 LINQ 查詢同樣,PLINQ 查詢也是延遲估值的。這意味着只有當結果開始被使用時,查詢纔會被觸發執行。一般結果是經過一個foreach循環被使用(經過轉換操做符也會觸發,例如ToArray,還有返回單個元素或值的操做符)。

當枚舉結果時,執行過程與普通的順序查詢略有不一樣。順序查詢徹底由使用方經過「拉」的方式驅動:每一個元素都在使用方須要時從輸入序列中被提取。並行 查詢一般使用獨立的線程從輸入序列中提取元素,這可能比使用方的須要稍微提早了一些(很像一個給播報員使用的提詞機,或者 CD 機中的防震緩衝區)。而後經過查詢鏈並行處理這些元素,將結果保存在一個小緩衝區中,以準備在須要的時候提供給使用方。若是使用方在枚舉過程當中暫停或中 斷,查詢也會暫停或中止,這樣能夠不浪費 CPU 時間或內存。

你能夠經過在AsParallel以後調用WithMergeOptions來調整 PLINQ 的緩衝行爲。默認值AutoBuffered一般能產生最佳的總體效果;NotBuffered禁用緩衝,若是你但願儘快看到結果能夠使用這個;FullyBuffered在呈現給使用方前緩存整個查詢的輸出(OrderByReverse操做符天生以這種方式工做,取元素、聚合和轉換操做符也是同樣)。

3.2PLINQ 與排序Permalink

並行化查詢操做符的一個反作用是:當整理結果時,不必定能與它們提交時的順序保持一致,就如同以前圖中所示的那樣。換句話說,就是沒法像普通的 LINQ 那樣能保證序列的正常順序。

若是你須要保持序列順序,能夠經過在AsParallel後調用AsOrdered()來強制它保證:

myCollection.AsParallel().AsOrdered()... 

在大量元素的狀況下調用AsOrdered會形成必定性能損失,由於 PLINQ 必須跟蹤每一個元素原始位置。

以後你能夠經過調用AsUnordered來取消AsOrdered的效果:這會引入一個「隨機洗牌點(random shuffle point)」,容許查詢從這個點開始更高效的執行。所以,若是你但願僅爲前兩個查詢操做保持輸入序列的順序,能夠這樣作:

inputSequence.AsParallel().AsOrdered() .QueryOperator1() .QueryOperator2() .AsUnordered() // 從這開始順序可有可無 .QueryOperator3() // ... 

AsOrdered不是默認的,由於對於大多數查詢來講,原始的輸入順序可有可無。換句話說,若是AsOrdered是默認的,你就不得不爲大多數並行查詢使用AsUnordered來得到最好的性能,這會成爲負擔。

3.3PLINQ 的限制Permalink

目前,PLINQ 在可以並行化的操做上有些實用性限制。這些限制可能會在以後的更新包或 Framework 版本中解決。

下列查詢操做符會阻止查詢的並行化,除非源元素是在它們原始的索引位置:

  • TakeTakeWhileSkipSkipWhile
  • SelectSelectManyElementAt這幾個操做符的帶索引版本

大多數查詢操做符都會改變元素的索引位置(包括可能移除元素的那些操做符,例如Where)。這意味着若是你但願使用上述操做符,就要在查詢開始的地方使用。

下列查詢操做符能夠並行化,但會使用代價高昂的分區策略,有時可能比順序執行還慢。

  • JoinGroupByGroupJoinDistinctUnionIntersectExcept

Aggregate操做符的帶種子(seed)的重載是不能並行化的,PLINQ 提供了專門的重載來解決。

其它全部操做符都是能夠並行化的,然而使用這些操做符並不能確保你的查詢會被並行化。若是 PLINQ 認爲進行分區的開銷會致使部分查詢變慢,它也許會順序執行查詢。你能夠覆蓋這個行爲,方法是在AsParallel()以後調用以下代碼來強制並行化:

.WithExecutionMode (ParallelExecutionMode.ForceParallelism) 

3.4例:並行拼寫檢查Permalink

假設咱們但願實現一個拼寫檢查程序,它在處理大文檔時,可以經過充分利用全部可用的核心來快速運行。咱們把算法設計成一個 LINQ 查詢,這樣就能夠很容易的並行化它。

第一步是下載英文單詞字典,爲了可以高效查找,將其放在一個HashSet中:

if (!File.Exists ("WordLookup.txt")) // 包含約 150,000 個單詞 new WebClient().DownloadFile ( "http://www.albahari.com/ispell/allwords.txt", "WordLookup.txt"); var wordLookup = new HashSet<string> ( File.ReadAllLines ("WordLookup.txt"), StringComparer.InvariantCultureIgnoreCase); 

而後,使用wordLookup來建立一個測試「文檔」,該「文檔」是個包含了一百萬個隨機單詞的數組。建立完數組後,引入兩個拼寫錯誤:

var random = new Random(); string[] wordList = wordLookup.ToArray(); string[] wordsToTest = Enumerable.Range (0, 1000000) .Select (i => wordList [random.Next (0, wordList.Length)]) .ToArray(); wordsToTest [12345] = "woozsh"; // 引入兩個 wordsToTest [23456] = "wubsie"; // 拼寫錯誤 

如今,經過對比wordLookup檢查wordsToTest,來完成這個並行的拼寫檢查程序。PLINQ 讓這變得很簡單:

var query = wordsToTest .AsParallel() .Select ((word, index) => new IndexedWord { Word=word, Index=index }) .Where (iword => !wordLookup.Contains (iword.Word)) .OrderBy (iword => iword.Index); query.Dump(); // 在 LINQPad 中顯示輸出 

下邊是 LINQPad 中的顯示的輸出:

LINQPad的查詢輸出

IndexedWord是一個自定義的結構體,定義以下:

struct IndexedWord { public string Word; public int Index; } 

斷定器中的wordLookup.Contains方法做爲查詢的主要部分,它使得這個查詢值得並行化。

咱們能夠使用匿名類型來代替IndexedWord結構體,從而稍微簡化下這個查詢。然而這會下降性能,由於匿名類型(是類,所以是引用類型)會產生分配堆內存的開銷,以及以後的垃圾回收。

這個區別對於順序查詢來講沒太大關係,但對於並行查詢來講,基於棧的內存分配則至關有利。這是由於基於棧的內存分配是能夠高度並行化的(由於每一個線程有其本身的棧),反之基於堆的內存分配會使全部線程競爭同一個堆,它是由單一的內存管理器和垃圾回收器管理的。

使用 ThreadLocal<T>Permalink

來擴展一下咱們的例子,讓建立隨機測試單詞列表的過程並行化。咱們把它做爲 LINQ 查詢來構造,這樣事情就簡單多了。如下是順序執行版本:

string[] wordsToTest = Enumerable.Range (0, 1000000) .Select (i => wordList [random.Next (0, wordList.Length)]) .ToArray(); 

不幸的是,對Random.Next的調用不是線程安全的,因此實現並行化不是向查詢語句直接插入AsParallel()這麼簡單。一個可能的解決辦法是寫個方法對random.Next加鎖,然而這會限制併發能力。更好的處理辦法是使用ThreadLocal<Random>爲每一個線程建立獨立的Random對象。而後咱們能夠使用以下代碼來並行化查詢:

var localRandom = new ThreadLocal<Random> ( () => new Random (Guid.NewGuid().GetHashCode()) ); string[] wordsToTest = Enumerable.Range (0, 1000000).AsParallel() .Select (i => wordList [localRandom.Value.Next (0, wordList.Length)]) .ToArray(); 

在實例化Random對象的工廠方法中,咱們傳遞了一個Guid的散列值,用來確保:若是兩個Random對象在很短的時間範圍內被建立,它們能夠生成不一樣的隨機數序列。

什麼時候使用 PLINQPermalink

在你的程序中尋找 LINQ 查詢,嘗試並行化它們貌似是很誘人的。然而這一般沒什麼用,由於絕大多數明顯應該使用 LINQ 的地方執行都很快,因此並行化並無什麼好處。更好的方法是找到 CPU 密集型工做的瓶頸,而後考慮「這能寫成 LINQ 查詢嗎?」(這樣重構的一個好處是 LINQ 一般能夠使代碼變得更短,而且更具可讀性。)

PLINQ 很是適合於「很差意思不併行的問題(embarrassingly parallel problems)」。它也能很好的應用於結構化阻塞任務(structured blocking tasks),例如同時調用多個 web 服務(見調用阻塞或 I/O 密集型功能)。

對於圖像處理來講 PLINQ 是個糟糕的選擇,由於整理幾百萬個像素到輸出序列將造成瓶頸。更好的方法是把像素直接寫入數組或非託管的內存塊,而後使用Parallel類或任務並行來管理多線程。(也能夠使用ForAll來繞過結果整理。若是該圖像處理算法天生適合 LINQ,這麼作可能有益。)

3.5純方法Permalink

(譯者注:pure function 譯爲純方法,是指一個方法 / 函數不能改變任何狀態,也不能進行任何 I/O 操做,它的返回值不能依賴任何可能被改變的狀態,而且使用相同的輸入調用就會產生相同的輸出。)

由於 PLINQ 會在並行的線程上運行查詢,所以必須注意不要執行非線程安全的操做。特別須要注意,對變量進行寫操做有反作用(side-effecting),是非線程安全的。

// 下列查詢將每一個元素與其索引相乘。 // 給定一個 0 到 999 的輸入序列, 它應該輸出元素的平方。 int i = 0; var query = from n in Enumerable.Range(0,999).AsParallel() select n * i++; 

能夠經過使用鎖或Interlocked來確保i的自增是線程安全的,可是問題仍然存在,i並不能保證對應輸入元素的原始索引。而且加上AsOrdered也沒法解決這個問題,由於AsOrdered僅僅確保輸出是按順序的,就像順序執行的輸出順序同樣。但這並不意味着實際的處理過程也是按順序的。

替代方法是將這個查詢重寫,使用帶索引的Select版本。

var query = Enumerable.Range(0,999).AsParallel().Select ((n, i) => n * i); 

爲了達到最佳性能,任何被查詢操做符調用的方法必須是線程安全的:不要給字段或屬性賦值(無反作用,純方法)。若是用來保證線程安全,查詢的並行能力將會受到限制。這個限制能夠經過鎖定的持續時間除以花費在方法上的總時間來計算。

3.6調用阻塞或 I/O 密集型功能Permalink

有時一個查詢的長時間運行並非由於是 CPU 密集型操做,而是由於它在等待某些東西,例如等待網頁下載或是硬件的響應。PLINQ 可以有效地並行化這種類型的查詢,能夠經過在AsParallel後調用WithDegreeOfParallelism來提示這種特徵。例如,假設咱們但願同時 ping 6 個網站。比起使用異步委託或手動讓 6 個線程自旋,使用 PLINQ 查詢能夠輕鬆實現它:

from site in new[] { "www.albahari.com", "www.linqpad.net", "www.oreilly.com", "www.takeonit.com", "stackoverflow.com", "www.rebeccarey.com" } .AsParallel().WithDegreeOfParallelism(6) let p = new Ping().Send (site) select new { site, Result = p.Status, Time = p.RoundtripTime } 

WithDegreeOfParallelism強制 PLINQ 同時運行指定數量的任務。在調用阻塞方法(例如Ping.Send)時有必要這麼作,不然的話,PLINQ 會認爲這個查詢是 CPU 密集型的,並進行相應的任務分配。在雙核機器上,PLINQ 會默認同時運行 2 個任務,對於上述狀況來講這顯然不是咱們但願看到的。

線程池的影響,PLINQ 一般爲每一個任務分配一個線程。能夠經過調用ThreadPool.SetMinThreads來加速初始線程的建立速度。

再給一個例子:假設咱們要實現一個監控系統,但願它不斷未來自 4 個安全攝像頭的圖像合併成一個圖像,並在閉路電視上顯示。使用下邊的類來表示一個攝像頭:

class Camera { public readonly int CameraID; public Camera (int cameraID) { CameraID = cameraID; } // 獲取來自攝像頭的圖像: 返回一個字符串來代替圖像 public string GetNextFrame() { Thread.Sleep (123); // 模擬獲取圖像的時間 return "Frame from camera " + CameraID; } } 

要獲取一個合成圖像,咱們必須分別在 4 個攝像頭對象上調用GetNextFrame。假設操做主要是受 I/O 影響的,經過並行化咱們能將幀率提高 4 倍,即便是在單核機器上。PLINQ 使用一小段程序就能實現它:

Camera[] cameras = Enumerable.Range (0, 4) // 建立 4 個攝像頭對象 .Select (i => new Camera (i)) .ToArray(); while (true) { string[] data = cameras .AsParallel().AsOrdered().WithDegreeOfParallelism (4) .Select (c => c.GetNextFrame()).ToArray(); Console.WriteLine (string.Join (", ", data)); // 顯示數據... } 

GetNextFrame是一個阻塞方法,因此咱們使用了WithDegreeOfParallelism來得到指望的併發度。在咱們的例子中,阻塞是在調用Sleep時發生。而在真實狀況下,阻塞的發生是由於從攝像頭中獲取圖像是 I/O 密集型操做,而不是 CPU 密集型操做。

調用AsOrdered能夠確保圖像按照一致的順序顯示。由於序列中只有 4 個元素,因此它對性能的影響能夠忽略不計。

改變併發度Permalink

在一個 PLINQ 查詢內,僅可以調用WithDegreeOfParallelism一次。若是你須要再次調用它,必須在查詢中經過再次調用AsParallel()強制進行查詢的合併和從新分區:

"The Quick Brown Fox" .AsParallel().WithDegreeOfParallelism (2) .Where (c => !char.IsWhiteSpace (c)) .AsParallel().WithDegreeOfParallelism (3) // 強制合併和從新分區 .Select (c => char.ToUpper (c)) 

3.7取消Permalink

當在foreach循環中使用 PLINQ 查詢的結果時,取消該查詢很簡單:使用break退出循環就能夠了。查詢會被自動取消,由於枚舉器會被隱式銷燬。

對於結束一個使用轉換、取元素或聚合操做符的查詢來講,你能夠在其它線程使用取消標記來取消它。在AsParallel後調用WithCancellation來添加一個標記,並把CancellationTokenSource對象的Token屬性做爲參數傳遞。以後另外一個線程就能夠在這個CancellationTokenSource對象上調用Cancel,它會在查詢的使用方那邊拋出OperationCanceledException異常。

IEnumerable<int> million = Enumerable.Range (3, 1000000); var cancelSource = new CancellationTokenSource(); var primeNumberQuery = from n in million.AsParallel().WithCancellation (cancelSource.Token) where Enumerable.Range (2, (int) Math.Sqrt (n)).All (i => n % i > 0) select n; new Thread (() => { Thread.Sleep (100); // 在 100 毫秒後 cancelSource.Cancel(); // 取消查詢 } ).Start(); try { // 開始運行查詢 int[] primes = primeNumberQuery.ToArray(); // 永遠到不了這裏,由於其它線程會進行取消操做。 } catch (OperationCanceledException) { Console.WriteLine ("Query canceled"); } 

PLINQ 不會直接停止線程,由於這麼作是危險的。在取消時,它會等待全部工做線程處理完當前的元素,而後結束查詢。這意味着查詢調用的任何外部方法都會執行完成。

3.8優化 PLINQPermalink

輸出端優化Permalink

PLINQ 的一個優勢是它可以很容易地將並行化任務的結果整理成一個輸出序列。然而有時,最終要作的是在輸出序列的每一個元素上運行一些方法:

foreach (int n in parallelQuery) DoSomething (n); 

若是是上述狀況,而且不關心元素的處理順序,那麼能夠使用 PLINQ 的ForAll方法來提升效率。

ForAll方法在ParallelQuery的每一個輸出元素上運行一個委託。它直接掛鉤(hook)到 PLINQ 內部,繞過整理和枚舉結果的步驟。舉個栗子:

"abcdef".AsParallel().Select (c => char.ToUpper(c)).ForAll (Console.Write); 

ForAll 方法

整理和枚舉結果的開銷不是很是大,因此當有大量輸入元素且處理執行很快的時候,才能最大化ForAll優化的收益。

輸入端優化Permalink

PLINQ 有 3 種分區策略,用來分配輸入元素到線程:

策略 元素分配 相對性能
塊分區(Chunk partitioning) 動態 平均
範圍分區(Range partitioning) 靜態 差 - 極好
散列分區(Hash partitioning) 靜態

對於那些須要比較元素的查詢操做符(GroupByJoinGroupJoinIntersectExceptUnionDistinct),PLINQ 老是使用散列分區。散列分區相對低效,由於它必須預先計算每一個元素的散列值(擁有一樣散列值的元素會在同一個線程中被處理)。若是發現運行太慢,惟一的選擇是調用AsSequential來禁止並行處理。

對於其它全部查詢操做符,你能夠選擇使用範圍分區或塊分區。默認狀況下:

  • 若是輸入序列能夠經過索引訪問(數組或是IList<T>的實現),PLINQ 選用範圍分區。
  • 不然,PLINQ 選用塊分區。

歸納來說,對於較長的序列且處理每一個元素所需的 CPU 時間比較近似時,範圍分區更快。不然,塊分區一般更快。

若是想強制使用範圍分區:

  • 若是查詢以Enumerable.Range開始,將其替換爲ParallelEnumerable.Range
  • 不然,在輸入序列上調用ToListToArray(顯然,你須要考慮在這裏產生的性能開銷)。

ParallelEnumerable.Range並非對Enumerable.Range(…).AsParallel()的簡單封裝。它經過激活範圍分區改變了查詢的性能。

若是想強制使用塊分區,就經過調用Partitioner.Create(在命名空間System.Collection.Concurrent中)來封裝輸入序列,例如:

int[] numbers = { 3, 4, 5, 6, 7, 8, 9 }; var parallelQuery = Partitioner.Create (numbers, true).AsParallel() .Where (...) 

Partitioner.Create的第二個參數表示:但願對查詢開啓負載均衡(load-balance),這是另外一個使用塊分區的動機。

塊分區的工做方式是按期從輸入序列中抓取小塊元素來處理。PLINQ 一開始會分配很是小的塊(一次 1 到 2 個元素),而後隨着查詢的進行增長塊的大小:這確保小序列可以被有效地並行化,而大序列不會致使過多的抓取工做。若是一個工做線程碰巧拿到了一些「容易」 的元素(處理很快),它最終將拿到更多的塊。這個系統使每一個線程保持均等的繁忙程度(使核心負載均衡)。惟一的不利因素是從共享的輸入序列中獲取元素須要 同步(一般使用一個排它鎖),這會產生必定的開銷和競爭。

分區策略

範圍分區會繞過正常的輸入端枚舉,而且爲每一個工做線程預分配相同數量的元素,避免了在輸入序列上的競爭。可是若是某些線程拿到了容易的元素並很早就 完成了處理,在其它工做線程仍在繼續工做的時候它就會是空閒的。咱們以前的素數計算的例子在使用範圍分區時就性能不高。舉個範圍分區適用的例子,計算 1000 萬之內數字的平方和:

ParallelEnumerable.Range (1, 10000000).Sum (i => Math.Sqrt (i)) 

ParallelEnumerable.Range返回一個ParallelQuery<T>,所以不須要在以後調用AsParallel

範圍分區不是必須把元素分紅相鄰的塊,它也許會選用一種 「條紋式(striping)」策略。例如,有兩個工做線程,一個工做線程可能會處理奇數位置的元素,而另外一個工做線程處理偶數位置的元素。TakeWhile操做符幾乎必定會觸發條紋式策略,用來避免處理序列後邊沒必要要的元素。

3.9並行化自定義聚合Permalink

PLINQ 能夠在無需額外干預的狀況下有效地並行化SumAverageMinMax操做符。然而,Aggregate操做符對於 PLINQ 來講是個特殊的麻煩。

若是不熟悉Aggregate操做符,你能夠認爲它就是一個SumAverageMinMax的泛化版本,換句話說,就是一個能夠使你經過自定義的聚合算法實現非一般聚合操做的操做符。以下代碼展示了Aggregate如何實現Sum操做符的工做:

int[] numbers = { 1, 2, 3 }; int sum = numbers.Aggregate (0, (total, n) => total + n); // 6 

Aggregate的第一個參數是 seed(種子,初值),聚合操做從這裏開始。第二個參數是一個用於更新聚合值的表達式,該表達式生成一個新的元素。第三個參數是可選的,用來表示如何經過聚合值生成最終的結果值。

大多數Aggregate被設計用來解決的問題都可以使用foreach循環輕鬆解決,而且這也是更熟悉的語法。而Aggregate的優勢在於對龐大或複雜的聚合操做能夠使用 PLINQ 來進行聲明式的並行化。

無種子的聚合Permalink

調用Aggregate時能夠省略種子值,這種狀況下第一個元素會被隱式看成種子,以後聚合處理會從第二個元素開始進行。下邊是一個無種子的例子:

int[] numbers = { 1, 2, 3 }; int sum = numbers.Aggregate ((total, n) => total + n); // 6 

這獲得了與以前相同的結果,然而實際上倒是進行了不一樣的計算。以前例子計算的是 0+1+2+3,而如今計算的是1+2+3。經過乘法運算來代替加法運算可以更好地說明這個不一樣:

int[] numbers = { 1, 2, 3 }; int x = numbers.Aggregate (0, (prod, n) => prod * n); // 0*1*2*3 = 0 int y = numbers.Aggregate ( (prod, n) => prod * n); // 1*2*3 = 6 

如同咱們立刻將要看到的,無種子的聚合的優勢在於被並行化時不須要使用特殊的重載。然而,無種子的聚合存在一個陷阱:無種子的聚合方法指望使用的委 託中的計算應知足交換律和結合律。若是用在別的狀況下,結果要否則是反直覺的(普通查詢),要否則是不肯定的(PLINQ 並行化查詢)。例如考慮以下函數:

(total, n) => total + n * n 

它既不知足交換律也不知足結合律。(例如:1+2*2 != 2+1*1)。咱們來看一下使用它來對數字 二、三、4 計算平方和時會發生什麼:

int[] numbers = { 2, 3, 4 }; int sum = numbers.Aggregate ((total, n) => total + n * n); // 27 

原本的計算應該是:

2*2 + 3*3 + 4*4 // 29 

但如今的計算是:

2 + 3*3 + 4*4 // 27 

能夠經過多種方法解決這個問題。首先,咱們能夠在序列最前端加入 0 做爲第一個元素:

int[] numbers = { 0, 2, 3, 4 }; 

這不只不優雅,並且在並行執行的狀況下仍然會產生錯誤的結果,由於 PLINQ 會選擇多個元素做爲種子,這至關於假定了計算知足結合律。爲說明這個問題,用以下方式表示咱們的聚合函數:

f(total, n) => total + n * n 

LINQ to Objects 會這樣計算:

f(f(f(0, 2),3),4) 

PLINQ 可能會這樣計算:

f(f(0,2),f(3,4)) 

結果是:

第一個分區:   a = 0 + 2*2  (= 4)
第二個分區:   b = 3 + 4*4  (= 19)
最終結果:     a + b*b  (= 365)
甚至多是:    b + a*a  (= 35)

有兩種好的解決方案:第一種是將其轉換爲有種子的聚合,使用 0 做爲種子。這種方案帶來的複雜度的提高僅僅是使用 PLINQ 時,咱們須要使用特殊的重載,確保查詢並行執行(立刻會看到)。

第二種解決方案是:重構查詢,使聚合函數知足交換律和結合律:

int sum = numbers.Select (n => n * n).Aggregate ((total, n) => total + n); 

固然,在這種簡單的場景下你能夠(而且應該)使用Sum操做符來代替Aggregate

int sum = numbers.Sum (n => n * n); 

實際上能夠更進一步使用SumAverage。例如,能夠使用Average來計算均方根(root-mean-square):

Math.Sqrt (numbers.Average (n => n * n)) 

甚至是標準差:

double mean = numbers.Average(); double sdev = Math.Sqrt (numbers.Average (n => { double dif = n - mean; return dif * dif; })); 

上述兩個方法都是安全、高效而且可徹底並行化的。

並行化聚合Permalink

咱們剛剛看到了無種子的聚合,提供的委託必須知足交換律和結合律。若是違反這個規則,PLINQ 會給出錯誤的結果,由於它可能使用輸入序列中多個的元素做爲種子,來同時聚合多個分區。

指定種子的聚合也許看起來像是使用 PLINQ 的安全選擇,然而不幸的是,這樣一般會致使順序執行,由於它依賴於單獨一個種子。爲減緩這個問題,PLINQ 提供了另外一個Aggregate的重載,容許你指定多個種子,或者是一個種子工廠方法。對每一個線程,它執行這個方法來生成一個獨立的種子,這就造成了一個線程局部的累加器,經過它在聚合局部元素。

你必須再提供一個方法來指示如何合併局部累加器至主累加器。最後,Aggregate的這個重載還須要一個委託,用來對結果進行任意的最終變換(有些不必,你能夠以後對結果運行一些代碼完成一樣操做)。因此,這裏有 4 個委託,按照它們被傳遞的順序:

  • 種子工廠(seedFactory):
    返回一個新的局部累加器
  • 更新累加器方法(updateAccumulatorFunc):
    聚合元素至局部累加器
  • 合併累加器方法(combineAccumulatorFunc):
    合併局部累加器至主累加器
  • 結果選擇器(resultSelector):
    在結果上應用任意最終變換

在簡單的場景中,你能夠指定一個種子值來代替種子工廠。當種子是你須要改變的引用類型時這種策略行不通,由於同一個實例將在線程間共享。

提供一個簡單的例子,下邊的代碼對numbers數組中的值進行求和:

numbers.AsParallel().Aggregate ( () => 0, // 種子工廠 (localTotal, n) => localTotal + n, // 更新累加器方法 (mainTot, localTot) => mainTot + localTot, // 合併累加器方法 finalResult => finalResult) // 結果選擇器 

這個例子有些刻意,咱們能夠使用更簡單的方式獲取相同的結果(例如無種子的聚合,或者更好的選擇是使用Sum操做符)。給一個更加實際的例子,假設咱們要計算字符串中每一個英文字母的出現頻率。簡單的順序執行方案看起來是這樣:

string text = "Let’s suppose this is a really long string"; var letterFrequencies = new int[26]; foreach (char c in text) { int index = char.ToUpper (c) - 'A'; if (index >= 0 && index <= 26) letterFrequencies [index]++; }; 

基因序列是一個輸入文本可能會很是長的例子,它的「字母表」是由字母 acgt 組成。

爲了將它並行化,咱們能夠把foreach替換爲Parallel.ForEach(在接下來的一節會講到),但這會致使共享數組上的併發問題。對數組的訪問加能夠解決問題,但會下降併發的可能性。

Aggregate提供了一個好的解決方案。這種狀況下,累加器是一個數組,就像是以前例子中letterFrequencies數組。使用Aggregate的順序執行版本以下:

int[] result = text.Aggregate ( new int[26], // 建立「累加器」 (letterFrequencies, c) => // 聚合一個字母至累加器 { int index = char.ToUpper (c) - 'A'; if (index >= 0 && index <= 26) letterFrequencies [index]++; return letterFrequencies; }); 

下面是並行版本,它使用 PLINQ 的專門重載:

int[] result = text.AsParallel().Aggregate ( () => new int[26], // 新建局部累加器 (localFrequencies, c) => // 聚合至局部累加器 { int index = char.ToUpper (c) - 'A'; if (index >= 0 && index <= 26) localFrequencies [index]++; return localFrequencies; }, // 聚合局部累加器至主累加器 (mainFreq, localFreq) => mainFreq.Zip (localFreq, (f1, f2) => f1 + f2).ToArray(), finalResult => finalResult // 對結果進行 ); // 最終變換 

注意:局部累加方法會改動localFrequencies數組。這個優化是很是重要的,也是合法的,由於localFrequencies是每一個線程的局部變量。

4Parallel 類Permalink

PFX 經過Parallel類上的三個靜態方法提供告終構化並行的基本形式:

Parallel.Invoke
並行執行一組委託
Parallel.For
C# for循環的並行版本
Parallel.ForEach
C# foreach循環的並行版本

三個方法都是在工做完成前會阻塞。相似於PLINQ,若是有未處理的異常,其它工做線程會在當前迭代完成以後中止,異常會被封裝在AggregateException中拋給調用方。

4.1Parallel.InvokePermalink

Parallel.Invoke並行執行一組Action類型的委託,而後等待它們完成。這個方法最簡單的版本以下:

public static void Invoke (params Action[] actions); 

下面是使用Parallel.Invoke來同時下載兩個網頁:

Parallel.Invoke ( () => new WebClient().DownloadFile ("http://www.linqpad.net", "lp.html"), () => new WebClient().DownloadFile ("http://www.jaoo.dk", "jaoo.html")); 

這表面上看起來像是建立了兩個Task對象(或異步委託)並等待它們。可是有個重要的區別:Parallel.Invoke在你傳遞一百萬個委託時仍然能高效工做。這是由於它會對大量元素進行分區(partition),造成多個塊,再對其分配底層的Task。而不是直接對每個委託建立獨立的Task

使用Parallel上的全部方法時,都須要自行實現整理結果的代碼。這意味着你須要注意線程安全。例如,下面的代碼不是線程安全的:

var data = new List<string>(); Parallel.Invoke ( () => data.Add (new WebClient().DownloadString ("http://www.foo.com")), () => data.Add (new WebClient().DownloadString ("http://www.far.com"))); 

對添加的過程加能夠解決問題,可是若是你的委託數量更多,它們每個執行的又很快,那麼鎖可能形成瓶頸。更好的解決方案是使用線程安全的集合,好比ConcurrentBag就是這裏的理想方案。

Parallel.Invoke也有接受ParallelOptions對象的重載:

public static void Invoke (ParallelOptions options, params Action[] actions); 

經過ParallelOptions,你能夠添加取消標記、限制最大併發數量和指定自定義任務調度器。若是要執行的委託數量(大體上)大於核心數,那麼使用取消標記纔有意義:在取消時,全部未啓動的委託都會被拋棄。而全部已經在執行的委託會繼續完成。對於如何使用取消標記,能夠參考取消中的例子。

4.2Parallel.For 和 Parallel.ForEachPermalink

Parallel.ForParallel.ForEach與 C# forforeach相似,但會並行執行,而不是順序執行。下面是它們(最簡單的)方法簽名:

public static ParallelLoopResult For ( int fromInclusive, int toExclusive, Action<int> body) public static ParallelLoopResult ForEach<TSource> ( IEnumerable<TSource> source, Action<TSource> body) 

對於下面的for循環:

for (int i = 0; i < 100; i++) Foo (i); 

並行版本是這樣:

Parallel.For (0, 100, i => Foo (i)); 

或更簡潔的:

Parallel.For (0, 100, Foo); 

而對於下面的foreach循環:

foreach (char c in "Hello, world") Foo (c); 

並行版本是這樣:

Parallel.ForEach ("Hello, world", Foo); 

給一個實際點的例子。引入System.Security.Cryptography命名空間,而後咱們能夠像這樣並行生成六組密鑰對的字符串形式:

var keyPairs = new string[6]; Parallel.For (0, keyPairs.Length, i => keyPairs[i] = RSA.Create().ToXmlString (true)); 

Parallel.Invoke一樣,咱們也可讓Parallel.ForParallel.ForEach執行大量工做項,它們也會被分區,分配給任務高效執行。

上面的例子也能夠使用PLINQ來實現:

string[] keyPairs = ParallelEnumerable.Range (0, 6) .Select (i => RSA.Create().ToXmlString (true)) .ToArray(); 

外循環 vs 內循環Permalink

Parallel.ForParallel.ForEach一般更適合用於外循環,而不是內循環。這是由於前者會帶來更大的分區塊,就稀釋了管理並行的開銷。通常沒有必要同時並行內外循環。對於下面的例子,咱們須要 100 個核心才能讓內循環的並行有益處:

Parallel.For (0, 100, i => { Parallel.For (0, 50, j => Foo (i, j)); // 對於內循環, }); // 順序執行更好。 

帶索引的 Parallel.ForEachPermalink

有時須要獲知循環迭代的索引。在順序的foreach中這很簡單:

int i = 0; foreach (char c in "Hello, world") Console.WriteLine (c.ToString() + i++); 

然而在並行環境中,讓共享變量自增並非線程安全的。你必須使用下面這個ForEach版本:

public static ParallelLoopResult ForEach<TSource> ( IEnumerable<TSource> source, Action<TSource,ParallelLoopState,long> body) 

先忽略ParallelLoopState(下一節會講)。如今咱們關注的是Action的第三個long類型的參數,它表明了循環的索引:

Parallel.ForEach ("Hello, world", (c, state, i) => { Console.WriteLine (c.ToString() + i); }); 

爲了把它用到實際場景中,咱們來回顧下使用 PLINQ 的拼寫檢查。下面的代碼加載了一個字典,並生成了一個用來測試的數組,有一百萬個測試項:

if (!File.Exists ("WordLookup.txt")) // 包含約 150,000 個單詞 new WebClient().DownloadFile ( "http://www.albahari.com/ispell/allwords.txt", "WordLookup.txt"); var wordLookup = new HashSet<string> ( File.ReadAllLines ("WordLookup.txt"), StringComparer.InvariantCultureIgnoreCase); var random = new Random(); string[] wordList = wordLookup.ToArray(); string[] wordsToTest = Enumerable.Range (0, 1000000) .Select (i => wordList [random.Next (0, wordList.Length)]) .ToArray(); wordsToTest [12345] = "woozsh"; // 引入兩個 wordsToTest [23456] = "wubsie"; // 拼寫錯誤 

咱們能夠使用帶索引的Parallel.ForEach來對wordsToTest數組進行拼寫檢查,以下:

var misspellings = new ConcurrentBag<Tuple<int,string>>(); Parallel.ForEach (wordsToTest, (word, state, i) => { if (!wordLookup.Contains (word)) misspellings.Add (Tuple.Create ((int) i, word)); }); 

注意,必須使用線程安全的集合來整理結果:這一點是相對於使用 PLINQ 的劣勢。而優點是咱們能夠避免使用帶索引的Select查詢操做符,它沒有帶索引的ForEach高效。

ParallelLoopState:提早退出循環Permalink

由於對於並行的ForForEach循環,循環體是一個委託,因此就沒法使用break語句來提早退出循環。在這裏,你必須使用ParallelLoopState對象上的BreakStop

public class ParallelLoopState { public void Break(); public void Stop(); public bool IsExceptional { get; } public bool IsStopped { get; } public long? LowestBreakIteration { get; } public bool ShouldExitCurrentIteration { get; } } 

獲取ParallelLoopState很容易:全部版本的ForForEach都有重載能夠接受Action<TSource,ParallelLoopState>類型的循環體。因此,若是要並行化:

foreach (char c in "Hello, world") if (c == ',') break; else Console.Write (c); 

能夠使用:

Parallel.ForEach ("Hello, world", (c, loopState) => { if (c == ',') loopState.Break(); else Console.Write (c); }); 

輸出:

Hlloe

從結果中能夠發現,循環體會以隨機順序完成。除這點不一樣之外,調用Break會給出與順序循環至少相同數量的元素:在上例中老是以必定順序至少輸出 Hello 這幾個字母。而若是改成調用Stop,會強制全部線程在當前迭代完成後當即結束。在上例中,若是有些線程滯後了,調用Stop可能給出 Hello 的子集。當發現已經找到了須要的東西時,或是發現出錯了不想看結果的狀況下,Stop比較適用。

Parallel.ForParallel.ForEach方法都返回一個ParallelLoopResult對象,它暴露了IsCompletedLowestBreakIteration屬性。它們能夠告知循環是否完成,若是沒有完成,是在哪一個迭代中斷的。

若是LowestBreakIteration返回null,意味着在循環中調用了Stop(而不是Break)。

若是你的循環體很長,可能會但願其它線程可以在執行中途中斷循環體,來讓使用BreakStop時更快的退出。實現方法是,在代碼中多個地方查詢ShouldExitCurrentIteration屬性,它會在調用Stop後當即爲true,或者是在Break後很快爲true

ShouldExitCurrentIteration在請求取消或者循環中有異常拋出時也會爲true

IsExceptional屬性能夠告知其它線程上是否有異常產生。任何未處理的異常都會致使循環在全部線程完成當前迭代後結束:若是想要避免,必須在代碼中顯式處理異常。

使用局部值進行優化Permalink

Parallel.ForParallel.ForEach都提供了擁有TLocal泛型變量的重載。這是爲了協助你優化密集迭代的循環中的數據整理工做。最簡單的形式以下:

public static ParallelLoopResult For <TLocal> ( int fromInclusive, int toExclusive, Func <TLocal> localInit, Func <int, ParallelLoopState, TLocal, TLocal> body, Action <TLocal> localFinally); 

這些方法在實際中不多用到,由於它們的目標場景基本都被PLINQ覆蓋了(好開森,由於這些重載真可怕!)。

本質上,問題在於:假設咱們要計算從 1 到 10,000,000 的平方根的和。並行計算一千萬個平方根很容易,可是求和是個問題,由於必須像這樣加才能更新和值:

object locker = new object(); double total = 0; Parallel.For (1, 10000000, i => { lock (locker) total += Math.Sqrt (i); }); 

並行化的收益都被獲取一千萬個鎖的開銷抵消了,還不算致使的阻塞。

然而,實際上並不須要一千萬個鎖。想象一隊志願者撿一大堆垃圾的場景,若是你們都共享單獨一個垃圾桶,那衝突就會使整個過程極端低效。明顯的方案是每一個人都有本身「局部」的垃圾桶,偶爾去一趟主垃圾桶傾倒乾淨。

ForForEachTLocal版本就是這樣工做的。志願者就是內部的工做線程,局部值(local value)就是局部垃圾桶。想要讓Parallel以這種方式工做,那麼必須提供兩個額外的委託:

  1. 如何初始化新的局部值
  2. 如何將局部的聚合值合併到主值

另外,循環體委託如今不能返回void,而是應該返回局部值新的聚合結果。下面是重構後的例子:

object locker = new object(); double grandTotal = 0; Parallel.For (1, 10000000, () => 0.0, // 初始化局部值 (i, state, localTotal) => // 循環體委託。注意如今 localTotal + Math.Sqrt (i), // 返回新的局部值 localTotal => // 把局部值 { lock (locker) grandTotal += localTotal; } // 加入主值 ); 

咱們仍是須要鎖,可是隻須要鎖定將局部和加入總和的過程。這讓處理效率有了極大的提高。

前面說過,PLINQ 通常更適合這些場景。咱們的例子若是使用 PLINQ 來並行會很簡單:

ParallelEnumerable.Range(1, 10000000) .Sum (i => Math.Sqrt (i)) 

(注意咱們使用了ParallelEnumerable來強制範圍分區:在這裏能夠提升性能,由於對全部數字的計算都是相等時間的。)

更復雜的場景中,你可能會用到 LINQ 的Aggregate操做符而不是Sum。若是指定了局部種子工廠,那狀況就和使用局部值的Parallel.For差很少了。

5任務並行Permalink

任務並行(task parallelism)是 PFX 中最底層的並行方式。這一層次的類定義在System.Threading.Tasks命名空間中,以下所示:

做用
Task 管理工做單元
Task<TResult> 管理有返回值的工做單元
TaskFactory 建立任務
TaskFactory<TResult> 建立有相同返回類型的任務和任務延續
TaskScheduler 管理任務調度
TaskCompletionSource 手動控制任務的工做流

本質上,任務是用來管理可並行工做單元的輕量級對象。任務使用 CLR 的線程池來避免啓動獨立線程的開銷:它和ThreadPool.QueueUserWorkItem使用的是同一個線程池,在 CLR 4.0 中這個線程池被調節過,讓Task工做的更有效率(通常來講)。

須要並行執行代碼時均可以使用任務。然而,它們是爲了充分利用多核而調節的:事實上,Parallel類和PLINQ內部就是基於任務並行構建的。

任務並不僅是提供了簡單高效的使用線程池的方式。它們還提供了一些強大的功能來管理工做單元,包括:

任務也實現了局部工做隊列(local work queues),這個優化可以讓你高效的建立不少快速執行的子任務,而不會帶來單一工做隊列會致使的競爭開銷。

TPL 可讓你使用極小的開銷建立幾百個(甚至幾千個)任務,但若是你要建立上百萬個任務,那須要把這些任務分紅大一些的工做單元纔能有效率。Parallel類和 PLINQ 能夠自動實現這種工做分解。

Visual Studio 2010 提供了一個新的窗口來監視任務(調試 | 窗口 | 並行任務)。它和線程窗口相似,只是用於任務。並行棧窗口也有一個專門的模式用於任務。

5.1建立與啓動任務Permalink

如同咱們在第 1 部分線程池的討論中那樣,你能夠調用Task.Factory.StartNew,並給它傳遞一個Action委託來建立並啓動Task

Task.Factory.StartNew (() => Console.WriteLine ("Hello from a task!")); 

泛型的版本Task<TResult>Task的子類)可讓你在任務結束時得到返回的數據:

Task<string> task = Task.Factory.StartNew<string> (() => // 開始任務 { using (var wc = new System.Net.WebClient()) return wc.DownloadString ("http://www.linqpad.net"); }); RunSomeOtherMethod(); // 咱們能夠並行的作其它工做... string result = task.Result; // 等待任務結束並獲取結果 

Task.Factory.StartNew是一步建立並啓動任務。你也能夠分解它,先建立Task實例,再調用Start

var task = new Task (() => Console.Write ("Hello")); // ... task.Start(); 

使用這種方式建立的任務也能夠同步運行(在當前線程上):使用RunSynchronously替代Start

能夠使用Status屬性來追蹤任務的執行狀態。

指定狀態對象Permalink

當建立任務實例或調用Task.Factory.StartNew時,能夠指定一個狀態對象(state object),它會被傳遞給目標方法。若是你但願直接調用方法而不是 lambda 表達式,則能夠使用它。

static void Main() { var task = Task.Factory.StartNew (Greet, "Hello"); task.Wait(); // 等待任務結束 } static void Greet (object state) { Console.Write (state); } // 打印 "Hello" 

由於 C# 中有 lambda 表達式,咱們能夠更好的使用狀態對象,用它來給任務賦予一個有意義的名字。而後就能夠使用AsyncState屬性來查詢這個名字:

static void Main() { var task = Task.Factory.StartNew (state => Greet ("Hello"), "Greeting"); Console.WriteLine (task.AsyncState); // 打印 "Greeting" task.Wait(); } static void Greet (string message) { Console.Write (message); } 

Visual Studio 會在並行任務窗口顯示每一個任務的AsyncState屬性,因此指定有意義的名字能夠很大程度的簡化調試。

TaskCreationOptionsPermalink

在調用StartNew(或實例化Task)時,能夠指定一個TaskCreationOptions枚舉來調節線程的執行。TaskCreationOptions是一個按位組合的枚舉,它有下列(可組合的)值:LongRunningPreferFairnessAttachedToParent

LongRunning向調度器建議爲任務使用一個獨立的線程。這對長時間運行的任務有好處,由於它們可能會「霸佔」隊列,強迫短期任務等待過長的時間後才能被調度。LongRunning對於會阻塞的任務也有好處。

因爲任務調度器通常會試圖保持恰好足夠數量的任務在線程上運行,來保持全部 CPU 核心都工做。因此不要超額分配(oversubscribing) CPU,或者說不要使用過多的活動線程,以免因爲操做系統被迫進行大量耗時的時間切片和上下文切換致使的性能降低。

PreferFairness讓調度器試圖確保任務以它們啓動的順序被調度。默認狀況下是使用另外一種方式,由於內部使用了局部工做竊取隊列來優化任務調度。這個優化對於很是小的(細粒度)任務有實際的好處。

AttachedToParent用來建立子任務。

子任務Permalink

當一個任務啓動另外一個任務時,你能夠經過指定TaskCreationOptions.AttachedToParent選擇性地創建父子關係:

Task parent = Task.Factory.StartNew (() => { Console.WriteLine ("I am a parent"); Task.Factory.StartNew (() => // 分離的任務 { Console.WriteLine ("I am detached"); }); Task.Factory.StartNew (() => // 子任務 { Console.WriteLine ("I am a child"); }, TaskCreationOptions.AttachedToParent); }); 

子任務的特殊之處在於,當你等待父任務結束時,也一樣會等待全部子任務。這對於子任務是一個延續任務時很是有用,稍後咱們會看到。

5.2等待任務Permalink

有兩種方式能夠顯式等待任務完成:

  • 調用Wait方法(可選擇指定超時時間)
  • 訪問Result屬性(當使用Task<TResult>時)

也能夠同時等待多個任務:經過靜態方法Task.WaitAll(等待全部指定任務完成)和Task.WaitAny(等待任意一個任務完成)。

WaitAll和依次等待每一個任務相似,但它更高效,由於它只須要(至多)一次上下文切換。而且,若是有一個或多個任務拋出未處理的異常,WaitAll仍然可以等待全部任務,並在以後從新拋出一個AggregateException異常,它聚合了全部出錯任務的異常,功能至關於下面的代碼:

// 假設 t一、t2 和 t3 是任務: var exceptions = new List<Exception>(); try { t1.Wait(); } catch (AggregateException ex) { exceptions.Add (ex); } try { t2.Wait(); } catch (AggregateException ex) { exceptions.Add (ex); } try { t3.Wait(); } catch (AggregateException ex) { exceptions.Add (ex); } if (exceptions.Count > 0) throw new AggregateException (exceptions); 

調用WaitAny至關於在一個ManualResetEventSlim上等待,每一個任務結束時都對它發信號。

除了使用超時時間,你也能夠傳遞一個取消標記Wait方法:這樣能夠取消等待。注意這不是取消任務。

5.3異常處理Permalink

當你等待一個任務結束時(經過調用Wait方法或訪問其Result屬性),全部未處理的異常都會用一個AggregateException對象封裝,方便從新拋給調用方。通常就無需在任務代碼中處理異常,而是這麼作:

int x = 0; Task<int> calc = Task.Factory.StartNew (() => 7 / x); try { Console.WriteLine (calc.Result); } catch (AggregateException aex) { Console.Write (aex.InnerException.Message); // 試圖以 0 爲除數 } 

你仍然須要對獨立的任務(無父任務而且沒有在等待它)進行異常處理,以避免當任務失去做用域被垃圾回收時(見如下注釋)有未處理的異常,那會致使程序結束。若是對任務的等待指定了超時時間,那也是如此,由於全部超時時間事後拋出的異常都是未處理的。

TaskScheduler.UnobservedTaskException靜態事件提供了應對未處理的任務異常的最後手段。經過掛接這個事件,你就能夠攔截這些本來會致使程序結束的異常,而且使用本身的邏輯對它們進行處理。

對於有父子關係的任務,在父任務上等待也會隱式的等待子任務,全部子任務的異常也會傳遞出來。

TaskCreationOptions atp = TaskCreationOptions.AttachedToParent; var parent = Task.Factory.StartNew (() => { Task.Factory.StartNew (() => // 子 { Task.Factory.StartNew (() => { throw null; }, atp); // 孫 }, atp); }); // 下面的調用會拋出 NullReferenceException 異常 (封裝在 // 嵌套的 AggregateExceptions 中): parent.Wait(); 

有趣的是,若是你在任務拋出異常後檢查它的Exception屬性,這個讀取屬性的動做會防止由於該異常致使程序結束。基本原則是:PFX 的設計者不但願你忽略異常,只要採起某種方式接收異常,就不會受到結束程序的懲罰。

任務中的未處理異常不會致使程序當即結束:它會延遲直到垃圾回收器處理到這個任務,並調用它的析構方法時。這個延遲是由於在進行垃圾回收前,還沒法判斷是否會調用Wait,或檢查ResultException屬性。它有時也會誤導你對錯誤源頭的判斷(Visual Studio 的調試器若是開啓了在首個異常處中斷,能夠幫助進行判斷)。

立刻咱們會看處處理異常的另外一種策略,就是使用任務延續

5.4取消任務Permalink

啓動任務時能夠可選的傳遞一個取消標記(cancellation token)。它可讓你經過協做取消模式取消任務,像以前描述的那樣:

var cancelSource = new CancellationTokenSource(); CancellationToken token = cancelSource.Token; Task task = Task.Factory.StartNew (() => { // 作些事情... token.ThrowIfCancellationRequested(); // 檢查取消請求 // 作些事情... }, token); // ... cancelSource.Cancel(); 

若是要檢測任務取消,能夠用以下方式捕捉AggregateException,並檢查它的內部異常:

try { task.Wait(); } catch (AggregateException ex) { if (ex.InnerException is OperationCanceledException) Console.Write ("Task canceled!"); } 

若是但願顯式的拋出OperationCanceledException異常(而不是經過調用ThrowIfCancellationRequested),那麼必須把取消標記傳遞給OperationCanceledException的構造方法。若是不這麼作,這個任務就不會以TaskStatus.Canceled狀態結束,而且也不會觸發使用OnlyOnCanceled條件的任務延續。

若是任務在啓動前被取消,它就不會被調度,而是直接在任務中拋出OperationCanceledException

由於取消標記也能夠被其它 API 識別,因此能夠在其它構造中無縫使用:

var cancelSource = new CancellationTokenSource(); CancellationToken token = cancelSource.Token; Task task = Task.Factory.StartNew (() => { // 傳遞取消標記給 PLINQ 查詢: var query = someSequence.AsParallel().WithCancellation (token)... // ... enumerate query ... }); 

調用cancelSource上的Cancel方法就能夠取消該 PLINQ 查詢,它會在任務中拋出OperationCanceledException異常,從而取消該任務。

也能夠給WaitCancelAndWait這類方法傳遞取消標記,它可讓你取消等待操做,而不是任務自己。

5.5任務延續Permalink

有時,在一個任務完成(或失敗)後立刻啓動另外一個任務會頗有用。Task類上的ContinueWith方法正是實現了這種功能:

Task task1 = Task.Factory.StartNew (() => Console.Write ("antecedant..")); Task task2 = task1.ContinueWith (ant => Console.Write ("..continuation")); 

一旦task1(前項,antecedent)完成、失敗或取消,task2(延續,continuation)會自動啓動。(若是task1在運行第二行代碼前已經結束,那麼task2會被當即調度執行。)傳遞給延續的 lambda 表達式的ant參數是對前項任務的引用。

咱們的例子演示了最簡單的延續,它和如下代碼功能相似:

Task task = Task.Factory.StartNew (() => { Console.Write ("antecedent.."); Console.Write ("..continuation"); }); 

可是經過延續的方式能夠更加靈活,好比先等待task1完成,以後再等待task2。若是task1返回數據,這樣就很是有用。

另外一個(不明顯的)差別是:默認狀況下,前項和延續任務多是在不一樣的線程上執行。你能夠在調用ContinueWith時指定TaskContinuationOptions.ExecuteSynchronously來強制它們在同一個線程執行:若是延續是很是細粒度的,這樣作能夠經過減小開銷來提高性能。

延續和 Task<TResult>Permalink

像普通任務同樣,延續也能夠使用Task<TResult>類型並返回數據。下面的例子中,咱們使用鏈狀任務來計算Math.Sqrt(8*2)並打印結果:

Task.Factory.StartNew<int> (() => 8) .ContinueWith (ant => ant.Result * 2) .ContinueWith (ant => Math.Sqrt (ant.Result)) .ContinueWith (ant => Console.WriteLine (ant.Result)); // 4 

咱們的例子比較簡單,實際應用中,這些 lambda 表達式可能會調用計算密集型的方法。

延續與異常Permalink

延續能夠經過前項的Exception屬性來獲取前項拋出的異常。下面的代碼會輸出NullReferenceException信息:

Task task1 = Task.Factory.StartNew (() => { throw null; }); Task task2 = task1.ContinueWith (ant => Console.Write (ant.Exception)); 

若是前項拋出了異常但延續沒有檢查前項的Exception屬性(而且也沒有在等待前項),那麼異常會被認爲是未處理的,就會致使程序結束(除非使用TaskScheduler.UnobservedTaskException進行了處理)。

安全的模式是從新拋出前項的異常。只要延續被Wait等待,異常就可以傳播並從新拋出給等待方。

Task continuation = Task.Factory.StartNew (() => { throw null; }) .ContinueWith (ant => { if (ant.Exception != null) throw ant.Exception; // 繼續處理... }); continuation.Wait(); // 異常被拋回調用方 

另外一種處理異常的方法是爲異常和正常狀況指定不一樣的延續。須要用到TaskContinuationOptions

Task task1 = Task.Factory.StartNew (() => { throw null; }); Task error = task1.ContinueWith (ant => Console.Write (ant.Exception), TaskContinuationOptions.OnlyOnFaulted); Task ok = task1.ContinueWith (ant => Console.Write ("Success!"), TaskContinuationOptions.NotOnFaulted); 

這種模式在結合子任務使用時很是有用,咱們立刻會看到

下面的擴展方法會「吞掉」任務的未處理異常:

public static void IgnoreExceptions (this Task task) { task.ContinueWith (t => { var ignore = t.Exception; }, TaskContinuationOptions.OnlyOnFaulted); } 

(能夠添加對異常的日誌記錄來進一步改進它。)如下是用法:

Task.Factory.StartNew (() => { throw null; }).IgnoreExceptions(); 

延續與子任務Permalink

延續的一個強大功能是它僅在全部子任務都完成時纔會啓動。這時,全部子任務拋出的異常都會被封送給延續。

接下來的例子中,咱們啓動三個子任務,每一個都拋出NullReferenceException。而後使用父任務的延續來一次性捕捉這些異常:

TaskCreationOptions atp = TaskCreationOptions.AttachedToParent; Task.Factory.StartNew (() => { Task.Factory.StartNew (() => { throw null; }, atp); Task.Factory.StartNew (() => { throw null; }, atp); Task.Factory.StartNew (() => { throw null; }, atp); }) .ContinueWith (p => Console.WriteLine (p.Exception), TaskContinuationOptions.OnlyOnFaulted); 

延續

條件延續Permalink

默認狀況下,延續是被無條件調度的,也就是說不管前項是完成、拋出異常仍是取消,延續都會執行。你能夠經過設置TaskContinuationOptions枚舉中的標識(可組合)來改變這種行爲。三種控制條件延續的核心標識是:

NotOnRanToCompletion = 0x10000, NotOnFaulted = 0x20000, NotOnCanceled = 0x40000, 

這些標識是作減法的,也就是組合的越多,延續越不可能被執行。爲了方便使用,也提供瞭如下預先組合好的值:

OnlyOnRanToCompletion = NotOnFaulted | NotOnCanceled, OnlyOnFaulted = NotOnRanToCompletion | NotOnCanceled, OnlyOnCanceled = NotOnRanToCompletion | NotOnFaulted 

(組合全部Not*標識[NotOnRanToCompletion, NotOnFaulted, NotOnCanceled]沒有意義,這會致使延續始終被取消。)

RanToCompletion表明前項成功完成,沒有被取消,也沒有未處理的異常。

Faulted表明前項中有未處理的異常拋出。

Canceled表明如下兩種狀況之一:

  • 前項經過其取消標記被取消。換句話說,OperationCanceledException在前項中拋出,它的CancellationToken屬性與啓動時傳遞給前項的標記取消匹配。

  • 前項被隱式的取消,由於沒法知足指定的延續條件。

特別須要注意的是,若是這些標識致使延續沒法執行,延續並非被忘記或拋棄,而是被取消。這意味着全部延續任務上的延續就會開始運行,除非你指定了NotOnCanceled。例如:

Task t1 = Task.Factory.StartNew (...); Task fault = t1.ContinueWith (ant => Console.WriteLine ("fault"), TaskContinuationOptions.OnlyOnFaulted); Task t3 = fault.ContinueWith (ant => Console.WriteLine ("t3")); 

像以前說的同樣,t3始終會被調度,即便是t1沒有拋出異常也是如此。由於t1成功完成,fault任務會被取消,而t3上並無定義任何限制延續的條件,因此t3就會被無條件執行。

條件延續

若是但願僅在fault真正運行的狀況下執行t3,須要把代碼改爲:

Task t3 = fault.ContinueWith (ant => Console.WriteLine ("t3"), TaskContinuationOptions.NotOnCanceled); 

(此外,也能夠指定OnlyOnRanToCompletion,不一樣之處就是t3fault拋出異常的狀況下不會執行。)

多前項的延續Permalink

延續的另外一個有用的功能是它能夠在多個前項完成後調度執行。ContinueWhenAll是在多個前項都完成後調度,而ContinueWhenAny是在任意一個前項完成後調度。這兩個方法都定義在TaskFactory類上:

var task1 = Task.Factory.StartNew (() => Console.Write ("X")); var task2 = Task.Factory.StartNew (() => Console.Write ("Y")); var continuation = Task.Factory.ContinueWhenAll ( new[] { task1, task2 }, tasks => Console.WriteLine ("Done")); 

上面的例子會在打印 「 XY 「 或 「 YX 「 以後打印 「 Done 「。Lambda 表達式中的tasks參數能夠用來訪問完成的任務數組,當前項返回數據時能夠用到。下面的例子對兩個前項返回的數字求和:

// 真實場景中 task1 和 task2 可能調用複雜的功能: Task<int> task1 = Task.Factory.StartNew (() => 123); Task<int> task2 = Task.Factory.StartNew (() => 456); Task<int> task3 = Task<int>.Factory.ContinueWhenAll ( new[] { task1, task2 }, tasks => tasks.Sum (t => t.Result)); Console.WriteLine (task3.Result); // 579 

在這個例子中,咱們使用了<int>類型參數來調用Task.Factory是爲了演示得到了一個泛型的任務工廠。這個類型參數不是必須的,它能夠被編譯器推斷。

單前項的多個延續Permalink

對一個任務調用一次以上的ContinueWith會建立單前項的多個延續。當該前項完成時,全部延續會一塊兒啓動(除非指定了TaskContinuationOptions.ExecuteSynchronously,這會致使延續順序執行)。

下面的代碼會等待一秒,而後打印 「 XY 「 或者 「 YX 「:

var t = Task.Factory.StartNew (() => Thread.Sleep (1000)); t.ContinueWith (ant => Console.Write ("X")); t.ContinueWith (ant => Console.Write ("Y")); 

5.6任務調度器與 UIPermalink

任務調度器(task scheduler)爲任務分配線程,其由抽象類TaskScheduler類表明,全部任務都會和一個任務調度器關聯。Framework 提供了兩種具體實現:默認調度器(default scheduler)是使用 CLR 線程池工做,還有同步上下文調度器(synchronization context scheduler),它(主要)是爲了對於使用 WPF 和 Windows Forms 的場景提供幫助,這裏的線程模型須要 UI 控件只能在建立它們的線程上訪問。例如,假設咱們須要在後臺從一個 web 服務獲取數據,而後使用它更新一個叫作lblResult的 WPF 標籤。這能夠分解爲兩個任務:

  1. 調用方法從 web 服務獲取數據(前項任務)。
  2. 使用結果更新lblResult延續任務)。

若是對延續任務指定了窗口建立時獲取的同步上下文調度器,那麼就能夠安全的更新lblResult

public partial class MyWindow : Window { TaskScheduler _uiScheduler; // 定義一個字段以便於 // 在類中使用 public MyWindow() { InitializeComponent(); // 從建立窗口的線程獲取 UI 調度器: _uiScheduler = TaskScheduler.FromCurrentSynchronizationContext(); Task.Factory.StartNew<string> (SomeComplexWebService) .ContinueWith (ant => lblResult.Content = ant.Result, _uiScheduler); } string SomeComplexWebService() { ... } } 

也能夠實現本身的任務調度器(經過繼承TaskScheduler),可是通常只會在很是特殊的場景下才會這麼作。對於自定義調度,須要常用TaskCompletionSource,咱們立刻會講到。

5.7TaskFactoryPermalink

當調用Task.Factory時,就是經過Task上的靜態屬性獲取了默認的TaskFactory對象。這個任務工廠的做用就是建立任務,具體的說,有三種任務:

  • 普通任務(經過StartNew
  • 多前項的延續(經過ContinueWhenAllContinueWhenAny
  • 封裝了異步編程模型(APM)的任務(經過FromAsync

有趣的是,TaskFactory是建立後兩種任務的惟一方法。而對於StartNewTaskFactory純粹是爲了方便,技術上說是多餘的,這徹底等同於建立Task對象而後調用其Start方法。

建立本身的任務工廠Permalink

TaskFactory不是抽象工廠:你能夠實例化這個類,在但願重複使用一樣的(非默認的)TaskCreationOptions值、TaskContinuationOptions值或者TaskScheduler時有用。例如,若是但願重複建立長時間運行的子任務,咱們能夠這樣建立一個自定義工廠:

var factory = new TaskFactory ( TaskCreationOptions.LongRunning | TaskCreationOptions.AttachedToParent, TaskContinuationOptions.None); 

而後建立任務就能夠僅調用這個工廠上的StartNew

Task task1 = factory.StartNew (Method1); Task task2 = factory.StartNew (Method2); // ... 

在調用ContinueWhenAllContinueWhenAny時,自定義的延續選項會被應用。

5.8TaskCompletionSourcePermalink

Task類作了兩件事情:

  • 它能夠調度一個委託到線程池線程上運行。
  • 它提供了管理工做項的豐富功能(延續、子任務、異常封送等等)。

有趣的是,這兩件事能夠是分離的:能夠只利用任務的管理工做項的功能而不讓它調度到線程池上運行。TaskCompletionSource類開啓了這個模式。

使用TaskCompletionSource時,就建立它的實例。它暴露一個Task屬性來返回一個任務,你能夠對其等待或附加延續,就和對通常的任務同樣。然而這個任務能夠經過TaskCompletionSource對象的下列方法進行徹底控制:

public class TaskCompletionSource<TResult> { public void SetResult (TResult result); public void SetException (Exception exception); public void SetCanceled(); public bool TrySetResult (TResult result); public bool TrySetException (Exception exception); public bool TrySetCanceled(); // ... } 

若是調用屢次,SetResultSetExceptionSetCanceled會拋出異常,而Try*方法會返回false

TResult對應任務的返回類型,因此TaskCompletionSource<int>會給你一個Task<int>。若是須要不返回結果的任務,能夠使用object類型來建立TaskCompletionSource,並在調用SetResult時傳遞null。能夠把Task<object>轉換爲Task類型來使用。

下面的代碼在等待五秒以後打印 「 123 「:

var source = new TaskCompletionSource<int>(); new Thread (() => { Thread.Sleep (5000); source.SetResult (123); }) .Start(); Task<int> task = source.Task; // 咱們的「奴隸」任務 Console.WriteLine (task.Result); // 123 

稍後,咱們會展現使用如何BlockingCollection寫一個生產者 / 消費者隊列。而後會演示使用TaskCompletionSource來改進這個方案,它能夠使隊列中的工做項能夠被等待和取消。

6使用 AggregateExceptionPermalink

如前所屬,PLINQParallel類和Task都會自動封送異常給使用者。爲了明白這麼作的重要性,考慮如下 LINQ 查詢,它在第一次迭代時會拋出DivideByZeroException

try { var query = from i in Enumerable.Range (0, 1000000) select 100 / i; // ... } catch (DivideByZeroException) { // ... } 

若是咱們使用 PLINQ 來並行化查詢而假設它並無進行異常處理,那麼DivideByZeroException可能會在一個線程中被拋出,就會無視catch塊從而致使程序結束。

所以,異常會被自動捕捉並從新拋給調用方。然而不幸的是,狀況並非就像捕捉一個DivideByZeroException那般簡單。由於這些類庫會利用不少線程,極可能有兩個或更多的異常被同時拋出。爲了確保可以報告全部異常,就使用了AggregateException做爲容器來封裝它們,並經過InnerExceptions屬性來暴露:

try { var query = from i in ParallelEnumerable.Range (0, 1000000) select 100 / i; // 對查詢進行枚舉 // ... } catch (AggregateException aex) { foreach (Exception ex in aex.InnerExceptions) Console.WriteLine (ex.Message); } 

PLINQ 和Parallel類都會在遇到第一個異常時中止查詢或循環執行,它使用的方式是不處理以後的元素或循環體。而在本輪循環結束前,還有可能拋出更多的異常。第一個異常能夠經過AggregateException上的InnerException屬性獲取。

6.1Flatten 和 HandlePermalink

AggregateException類提供了一對方法來簡化異常處理:FlattenHandle

FlattenPermalink

AggregateException常常會包含其它的AggregateException。好比在子任務拋出異常時就可能如此。你能夠經過調用Flatten來消除任意層級的嵌套以簡化處理。這個方法會返回一個新的AggregateException,它的InnerExceptions就是展平以後的結果:

catch (AggregateException aex) { foreach (Exception ex in aex.Flatten().InnerExceptions) myLogWriter.LogException (ex); } 

HandlePermalink

有時只須要捕捉特定類型的異常,並從新拋出其它類型的異常。AggregateException上的Handle方法提供了一個快捷方案。它接受一個異常斷定器,來對全部封裝的異常進行斷定:

public void Handle (Func<Exception, bool> predicate) 

若是斷定器返回true,則該異常被認爲是「已處理」。對於全部異常都運行斷定以後,接下來會發生:

  • 若是全部異常都「已處理」(斷定器返回true),則不會從新拋出異常。
  • 若是有異常被斷定爲false(「未處理」),則會生成一個新的AggregateException來封裝這些異常,並從新拋出。

例如,下面的代碼最後會從新拋出一個AggregateException,而且其中僅包含一個NullReferenceException

var parent = Task.Factory.StartNew (() => { // 咱們使用 3 個子任務同時拋出 3 個異常: int[] numbers = { 0 }; var childFactory = new TaskFactory (TaskCreationOptions.AttachedToParent, TaskContinuationOptions.None); childFactory.StartNew (() => 5 / numbers[0]); // 除數爲零 childFactory.StartNew (() => numbers [1]); // 索引越界 childFactory.StartNew (() => { throw null; }); // 空引用 }); try { parent.Wait(); } catch (AggregateException aex) { aex.Flatten().Handle (ex => // 注意這裏仍是須要調用 Flatten { if (ex is DivideByZeroException) { Console.WriteLine ("Divide by zero"); return true; // 該異常「已處理」 } if (ex is IndexOutOfRangeException) { Console.WriteLine ("Index out of range"); return true; // 該異常「已處理」 } return false; // 其它全部異常會被從新拋出 }); } 

7併發集合Permalink

Framework 4.0 在System.Collections.Concurrent命名空間中提供了一組新的集合。它們都是徹底線程安全的:

併發集合 對應的非併發集合
ConcurrentStack<T> Stack<T>
ConcurrentQueue<T> Queue<T>
ConcurrentBag<T> ( none )
BlockingCollection<T> ( none )
ConcurrentDictionary<TKey,TValue> Dictionary<TKey,TValue>

在通常的多線程場景中,須要線程安全的集合時可能會用到這些併發集合。可是,有些注意事項:

  • 併發集合是爲了並行編程而調整的。除了高併發場景,傳統的集合都比它們更高效。
  • 線程安全的集合並不能確保使用它的代碼也是線程安全的
  • 若是在對併發集合進行枚舉的同時有其它線程修改了集合,並不會產生異常,而是會獲得一個新舊內容的混合結果。
  • 沒有List<T>的併發版本。
  • 併發的棧、隊列和包(bag)類內部都是使用鏈表實現的。這使得它們的空間效率不如非併發的StackQueue類,可是這對於併發訪問更好,由於鏈表有助於實現無鎖或更少的鎖。(這是由於向鏈表中插入一個節點只須要更新兩個引用,而對於List<T>這種結構插入一個元素可能須要移動幾千個已存在的元素。)

換句話說,這些集合並非提供了加鎖使用普通集合的快捷辦法。爲了演示這一點,若是咱們在單一線程上執行如下代碼:

var d = new ConcurrentDictionary<int,int>(); for (int i = 0; i < 1000000; i++) d[i] = 123; 

它會比下面的代碼慢三倍:

var d = new Dictionary<int,int>(); for (int i = 0; i < 1000000; i++) lock (d) d[i] = 123; 

(可是對ConcurrentDictionary讀取會更快,由於讀是無鎖的。)

併發集合與普通集合的另外一個不一樣之處是它們暴露了一些特殊的方法,來進行原子的檢查並行動(test-and-act)的操做,例如TryPop。這些方法中的大部分都是由IProducerConsumerCollection<T>接口統一的。

7.1IProducerConsumerCollection<T>Permalink

生產者 / 消費者集合有兩個主要用例:

  • 添加一個元素(「生產」)
  • 獲取一個元素並移除它(「消費」)

典型的例子是棧和隊列。生產者 / 消費者集合在並行編程中很是重要,由於它有助於高效的無鎖實現。

IProducerConsumerCollection<T>接口表明了線程安全的生產者 / 消費者集合。如下類實現了該接口:ConcurrentStack<T>ConcurrentQueue<T>ConcurrentBag<T>

IProducerConsumerCollection<T>擴展自ICollection,並加入瞭如下方法:

void CopyTo (T[] array, int index); T[] ToArray(); bool TryAdd (T item); bool TryTake (out T item); 

TryAddTryTake方法檢查是否能進行添加 / 移除操做,若是能夠,就進行添加 / 移除。檢查和操做是原子的,因此無需像普通集合那樣使用鎖:

int result; lock (myStack) if (myStack.Count > 0) result = myStack.Pop(); 

TryTake在集合爲空時返回falseTryAdd在三種實現中都總會成功並返回true。而若是你要寫本身的不容許重複元素的併發集合,就能夠在元素已存在時讓TryAdd返回false(好比本身寫併發集(set))。

TryTake移除的具體元素是在子類中定義的:

  • 對於棧,TryTake移除最新添加的元素。
  • 對於隊列,TryTake移除最先添加的元素。
  • 對於包,TryTake移除能夠最快移除的元素。

這三個具體類基本都是顯式實現了TryTakeTryAdd方法,也經過更具體的的名字暴露了一樣的功能,好比TryDequeueTryPop

7.2ConcurrentBag<T>Permalink

ConcurrentBag<T>用來存儲一組無序的對象(容許重複)。它適用於你不關心調用TakeTryTake會返回哪一個元素的場景。

ConcurrentBag<T>相比並發隊列和棧的好處是它的Add方法被不少線程同時調用時幾乎沒有競爭衝突。而對於併發隊列和棧,並行調用Add會有一些競爭衝突(可是比對非併發集合加鎖的方式要小得多)。併發包的Take方法也很是高效,只要每一個線程不要拿出比它添加的數量更多的元素。

在併發包的內部,每個線程都有其私有的鏈表。元素會加入到調用Add的線程對應的私有鏈表中,就消除了競爭衝突。在對包進行枚舉時,枚舉器會遍歷全部線程的私有鏈表,返回其中的每個元素。

調用Take時,包會首先檢查當前線程的私有鏈表。若是其中有至少一個元素,就能夠沒有衝突的輕鬆完成任務(大多數狀況都是如此)。可是若是鏈表沒有元素,它就必須從其它線程的私有鏈表中「偷」一個元素,就可能致使競爭衝突。

因此,準確的說,調用Take會返回當前線程最新添加的元素,若是當前線程沒有對應的元素,就會隨機取一個其它線程,返回它最新添加的元素。

若是你的並行操做基本都是在添加元素,或者每一個線程的AddTake是平衡的,那麼使用併發包就很理想。咱們來看前面的一個例子,是使用Parallel.ForEach來實現並行拼寫檢查:

var misspellings = new ConcurrentBag<Tuple<int,string>>(); Parallel.ForEach (wordsToTest, (word, state, i) => { if (!wordLookup.Contains (word)) misspellings.Add (Tuple.Create ((int) i, word)); }); 

對於實現生產者 / 消費者隊列,併發包就不是一個好的選擇,由於元素是在不一樣的線程進行添加和移除的。

7.3BlockingCollection<T>Permalink

若是在ConcurrentStack<T>ConcurrentQueue<T>ConcurrentBag<T>這些生產者 / 消費者集合上調用TryTake時,集合爲空,該方法會返回false。這種場景下,有時可能等待一個元素被添加會更有用。

與其重載TryTake方法來實現這個功能(若是還要容許取消和超時就可能須要大量成員),不如使用 PFX 的設計者已經實現好的BlockingCollection<T>類。阻塞集合能夠封裝任意實現了IProducerConsumerCollection<T>接口的對象,就能夠調用這個封裝上面的Take方法,它在沒有元素時會阻塞。

阻塞集合也可讓你限制集合的大小,若是超過限制就阻塞生產者。這樣限制了大小的集合被稱爲有界阻塞集合(bounded blocking collection)。

使用BlockingCollection<T>時:

  1. 建立其實例,可選的指定一個IProducerConsumerCollection<T>來封裝,還有集合的最大大小(上界)。
  2. 調用AddTryAdd來對底層集合添加元素。
  3. 調用TakeTryTake來移除(消費)底層集合中的元素。

若是調用構造方法的時候沒有指定目標集合,就會自動使用一個ConcurrentQueue<T>的實例。進行生成和消費的方法均可以指定取消標記和超時時間。AddTryAdd在集合有界時可能會阻塞,TakeTryTake在集合爲空時會阻塞。

另外一種消費元素的方式是調用GetConsumingEnumerable。它會返回一個(可能)無限的序列,當有元素時就能夠返回它。你能夠調用CompleteAdding來強行結束這個序列,它也會阻止以後再添加元素。

前面咱們寫過一個使用 WaitPulse的生產者 / 消費者隊列。這裏使用BlockingCollection<T>來重構同一個類(不考慮異常處理):

public class PCQueue : IDisposable { BlockingCollection<Action> _taskQ = new BlockingCollection<Action>(); public PCQueue (int workerCount) { // 爲每一個消費者建立並啓動單獨的任務: for (int i = 0; i < workerCount; i++) Task.Factory.StartNew (Consume); } public void Dispose() { _taskQ.CompleteAdding(); } public void EnqueueTask (Action action) { _taskQ.Add (action); } void Consume() { // 沒有元素時,對序列的枚舉就會被阻塞, // 而調用 CompleteAdding 能夠結束枚舉。 foreach (Action action in _taskQ.GetConsumingEnumerable()) action(); // 進行任務 } } 

由於沒有給BlockingCollection的構造方法傳遞任何參數,因此會自動建立一個併發隊列。而若是傳遞一個ConcurrentStack,咱們就會獲得生產者 / 消費者棧。

BlockingCollection還提供了AddToAnyTakeFromAny這些靜態方法,它們可讓你對指定的多個阻塞集合進行添加或移除元素。操做會對第一個可以進行操做的集合進行。

利用 TaskCompletionSourcePermalink

咱們以前實現的生產者 / 消費者模式還不夠靈活,由於工做項添加後沒法追蹤它們。若是可以實現如下功能會更好:

  • 可以獲知工做項的完成。
  • 取消未啓動的工做項。
  • 優雅的處理工做項拋出的異常。

理想的解決方案是讓EnqueueTask方法返回一個對象,來提供咱們上面描述的功能。好消息是這個類已經存在,正是Task類。咱們須要作的只是經過TaskCompletionSource來操控它:

public class PCQueue : IDisposable { class WorkItem { public readonly TaskCompletionSource<object> TaskSource; public readonly Action Action; public readonly CancellationToken? CancelToken; public WorkItem ( TaskCompletionSource<object> taskSource, Action action, CancellationToken? cancelToken) { TaskSource = taskSource; Action = action; CancelToken = cancelToken; } } BlockingCollection<WorkItem> _taskQ = new BlockingCollection<WorkItem>(); public PCQueue (int workerCount) { // 爲每一個消費者建立並啓動單獨的任務: for (int i = 0; i < workerCount; i++) Task.Factory.StartNew (Consume); } public void Dispose() { _taskQ.CompleteAdding(); } public Task EnqueueTask (Action action) { return EnqueueTask (action, null); } public Task EnqueueTask (Action action, CancellationToken? cancelToken) { var tcs = new TaskCompletionSource<object>(); _taskQ.Add (new WorkItem (tcs, action, cancelToken)); return tcs.Task; } void Consume() { foreach (WorkItem workItem in _taskQ.GetConsumingEnumerable()) if (workItem.CancelToken.HasValue && workItem.CancelToken.Value.IsCancellationRequested) { workItem.TaskSource.SetCanceled(); } else try { workItem.Action(); workItem.TaskSource.SetResult (null); // 表示完成 } catch (OperationCanceledException ex) { if (ex.CancellationToken == workItem.CancelToken) workItem.TaskSource.SetCanceled(); else workItem.TaskSource.SetException (ex); } catch (Exception ex) { workItem.TaskSource.SetException (ex); } } } 

EnqueueTask中,咱們入隊一個工做項,它封裝了目標委託和任務完成源,從而讓咱們以後能夠控制返回給消費者的任務。

Consume中,咱們在出隊一個工做項後先檢查任務是否被取消。若是沒有,就運行委託而後調用任務完成源上的SetResult來表示任務完成。

下面是如何使用這個類:

var pcQ = new PCQueue (1); Task task = pcQ.EnqueueTask (() => Console.WriteLine ("Easy!")); // ... 

咱們如今能夠對task等待、附加延續、讓延續中的異常傳播給父任務等等。換句話說,咱們得到了任務模型的豐富功能,同時也至關於自行實現了一個調度器。

8SpinLock 和 SpinWaitPermalink

在並行編程中,短暫的自旋常常比阻塞更好,由於它避免了上下文切換和內核模式轉換的開銷。SpinLockSpinWait被設計用來在這種場景下提供幫助。它們的主要用途是實現自定義的同步構造。

SpinLockSpinWait是結構體而不是類!這個設計是一種避免間址和垃圾回收的極限優化技術。它意味着你必須當心,不能不經意地複製了實例,好比不使用ref修飾符把它們傳遞給另外一個方法,或者把它們定義成了readonly的字段。這在使用SpinLock時十分重要。

8.1SpinLockPermalink

SpinLock結構體可讓你進行鎖定,而無需上下文切換的開銷,它的代價是保持一個線程自旋(空忙)。這種方式適用於高競爭的場景下鎖定很是短暫的狀況(好比,從頭寫一個線程安全的鏈表)。

若是讓自旋鎖等待的過久(最可能是幾毫秒),它會和普通的鎖同樣出讓其時間片,致使上下文切換。再被從新調度後,它會繼續出讓,就這樣不斷的「自旋出讓」。這比徹底使用自旋消耗的 CPU 資源要少得多,可是比阻塞要高。

在單核的機器上,自旋鎖在遇到競爭時會當即開始「自旋出讓」。

使用SpinLock和普通的鎖差很少,除了如下幾點:

  • 自旋鎖是結構體(前面有提到)。
  • 自旋鎖不可重入,意味着不能在一個線程上連續兩次調用同一個SpinLock上的Enter方法。若是違反了這條規則,要否則會拋出異常(啓用全部者追蹤(owner tracking)時),要否則會死鎖(禁用全部者追蹤時)。在構造自旋鎖時,能夠指定是否啓用全部者追蹤,啓用會影響性能。
  • SpinLock可讓你經過IsHeld屬性查詢鎖是否已被獲取,若是啓用了全部者追蹤,那麼使用IsHeldByCurrentThread屬性。
  • 沒有像 C# 的lock語句那樣的語法糖來簡化SpinLock的使用。

另外一個不一樣之處是當調用Enter時,你必須遵循提供 lockTaken 參數的健壯模式(幾乎老是使用try / finally一塊兒實現)。

下面是個例子:

var spinLock = new SpinLock (true); // 啓用全部者追蹤 bool lockTaken = false; try { spinLock.Enter (ref lockTaken); // 作些事情... } finally { if (lockTaken) spinLock.Exit(); } 

和普通的鎖同樣,當(且僅當)Enter方法拋出異常而且鎖沒有被獲取時,lockTaken會爲false。這種場景很是罕見(當調用該線程的Abort,或者OutOfMemoryException異常被拋出時),但可讓你肯定以後是否須要調用Exit

SpinLock也提供了接受超時時間的TryEnter方法。

因爲SpinLock笨拙的值類型語義和缺少語法支持,幾乎每次想用它都是受罪!在替換掉普通的鎖前請三思。

SpinLock在須要寫本身的可重用同步構造時最有意義。即使如此,自旋鎖也不像看上去那麼有用。它仍然限制了併發。而且會什麼都不作的浪費 CPU 時間。常常更好的選擇都是把時間花在一些「投機」的事情上,並使用SpinWait來輔助。(譯者注:這裏「投機」是指先進行操做並檢測搶佔,若是發現被搶佔就重試,詳見SpinWait

8.2SpinWaitPermalink

SpinWait能夠幫助實現無鎖的代碼,使用自旋而非阻塞。它實現了安全措施來避免普通自旋可能會形成的資源飢餓和優先級倒置。

使用SpinWait的無鎖編程是多線程中最難的,它是爲了應對沒有其它高層構造能夠使用的場景。先決條件是理解非阻塞同步

爲何須要 SpinWaitPermalink

假設咱們寫了一個純粹基於一個簡單標識的自旋信號系統:

bool _proceed; void Test() { // 自旋直到其它線程把 _proceed 設置爲 true: while (!_proceed) Thread.MemoryBarrier(); // ... } 

若是Test運行時_proceed已經爲true,或者幾回循環內就能變爲true,那麼這個實現就會很高效。可是如今假設_proceed在幾秒內保持false,而且有四個線程同時調用Test。這個自旋就會徹底佔用一個四核的 CPU!這會致使其它線程運行緩慢(資源飢餓),包括那個會把_proceed設置爲true的線程(優先級倒置)。在單核機器上,情況會進一步惡化,由於自旋幾乎老是致使優先級倒置。(雖然如今單核機器已經不多見了,但是單核的虛擬機並很多。)

SpinWait使用兩種方法解決這個問題。首先,它會限制消耗 CPU 的自旋,在通過必定次數的自旋後,就會每次循環都出讓其時間片(經過調用Thread.YieldThread.Sleep),從而減小資源消耗。其次,它會檢測是不是在單核機器上運行,若是是,就會每次循環都出讓其時間片。

如何使用 SpinWaitPermalink

有兩種方式使用SpinWait。第一種是調用靜態方法SpinUntil。這個方法接受一個斷定器(和一個可選的超時時間):

bool _proceed; void Test() { SpinWait.SpinUntil (() => { Thread.MemoryBarrier(); return _proceed; }); // ... } 

另外一種(更靈活)的方式是建立SpinWait結構體的實例,並在循環中調用SpinOnce

bool _proceed; void Test() { var spinWait = new SpinWait(); while (!_proceed) { Thread.MemoryBarrier(); spinWait.SpinOnce(); } // ... } 

前者就是使用後者提供的快捷方式。

SpinWait 如何工做Permalink

在當前的實現中,SpinWait會在出讓以前進行 10 次消耗 CPU 的迭代。但它並不會在每次迭代後當即返回調用方,而是調用Thread.SpinWait來 經過 CLR(最終是經過操做系統)再自旋必定時間。這個時間最初是幾十納秒,每次迭代都會加倍,直到 10 次迭代結束。這在必定程度上保證了消耗 CPU 的自旋階段的總時間的可預測性,CLR 和操做系統能夠根據狀況來調節。通常來講,這會在幾十微秒的區間,很小,可是要大於上下文切換的開銷。

在單核機器上,SpinWait每次迭代都會出讓。能夠經過NextSpinWillYield屬性來檢查SpinWait在下一次自旋時會不會出讓。

若是SpinWait在自旋出讓模式保持了好久(大概 20 次),就會按期Sleep幾微秒來進一步節約資源給其它線程使用。

使用 SpinWait 和 Interlocked.CompareExchange 進行無鎖更新Permalink

結合SpinWaitInterlocked.CompareExchange能夠原子的更新一個經過本身的值進行計算的字段(讀 - 改 - 寫)。例如,假設咱們要把字段 x 乘 10。非線程安全的簡單代碼就是:

x = x * 10; 

它不是線程同步的緣由就和咱們在非阻塞同步中看到的對字段自增不是線程同步的緣由同樣。

正確的無鎖方式以下:

  1. 使用局部變量獲取 x 的一個「快照」。
  2. 計算新值(這裏就是將快照乘 10)。
  3. 若是快照仍是最新的,就將計算後的值寫回(這一步必須是原子的,經過調用Interlocked.CompareExchange實現)。
  4. 若是快照過時了,自旋並返回第 1 步。

例如:

int x; void MultiplyXBy (int factor) { var spinWait = new SpinWait(); while (true) { int snapshot1 = x; Thread.MemoryBarrier(); int calc = snapshot1 * factor; int snapshot2 = Interlocked.CompareExchange (ref x, calc, snapshot1); if (snapshot1 == snapshot2) return; // 沒有被搶佔 spinWait.SpinOnce(); } } 

咱們能夠去掉對Thread.MemoryBarrier的調用來略微提升性能。這是由於CompareExchange也會生成內存屏障。最壞的狀況就是若是snapshot1在第一次迭代時就讀取了一個過時的值,那麼會多進行一次自旋。

Interlocked.CompareExchange是在字段的當前值與第三個參數相等時使用指定的值來更新字段。它會返回字段的舊值,就能夠用來與原快照比較,檢查是否過時。若是值不相等,意味着被另外一個線程搶佔,就須要自旋重試。

CompareExchange也有重載能夠對於object類型使用。咱們能夠利用這個重載來實現對全部引用類型的無鎖更新方法:

static void LockFreeUpdate<T> (ref T field, Func <T, T> updateFunction) where T : class { var spinWait = new SpinWait(); while (true) { T snapshot1 = field; T calc = updateFunction (snapshot1); T snapshot2 = Interlocked.CompareExchange (ref field, calc, snapshot1); if (snapshot1 == snapshot2) return; spinWait.SpinOnce(); } } 

下面是如何使用這個方法來寫一個無鎖的線程安全的事件(實際上,這是 C# 4.0 的編譯器對於事件默認的處理):

EventHandler _someDelegate; public event EventHandler SomeEvent { add { LockFreeUpdate (ref _someDelegate, d => d + value); } remove { LockFreeUpdate (ref _someDelegate, d => d - value); } } 

SpinWait vs SpinLockPermalink

咱們也能夠經過把對共享的字段的訪問放進SpinLock裏來解決上面的問題。問題是自旋鎖同一時間只容許一個線程進入,儘管它(一般)可以消除上下文切換的開銷。而使用SpinWait時,咱們能夠假設沒有競爭,投機的運行。若是被搶佔就重試。花費 CPU 時間作事情也許比在自旋鎖中浪費 CPU 時間好!

最後,考慮下面的類:

class Test { ProgressStatus _status = new ProgressStatus (0, "Starting"); class ProgressStatus // 不可變類 { public readonly int PercentComplete; public readonly string StatusMessage; public ProgressStatus (int percentComplete, string statusMessage) { PercentComplete = percentComplete; StatusMessage = statusMessage; } } } 

咱們能夠使用LockFreeUpdate方法來增長_statusPercentComplete字段的值:

LockFreeUpdate (ref _status, s => new ProgressStatus (s.PercentComplete + 1, s.StatusMessage)); 

注意咱們基於現有值建立了新的ProgressStatus對象。要感謝LockFreeUpdate方法,讀取PercentComplete的值、增長它並寫回的操做不會被不安全的搶佔:任何搶佔均可以被可靠的檢測到,觸發自旋重試。

相關文章
相關標籤/搜索