原文:http://www.albahari.com/threading/part5.aspxhtml
專題:C#中的多線程前端
在這一部分,咱們討論 Framework 4.0 加入的多線程 API,它們能夠充分利用多核處理器。程序員
Parallel
類SpinLock
和 SpinWait
這些 API 能夠統稱爲 PFX(Parallel Framework,並行框架)。Parallel
類與任務並行構造一塊兒被稱爲 TPL(Task Parallel Library,任務並行庫)。web
Framework 4.0 也增長了一些更底層的線程構造,它們針對傳統的多線程。咱們以前講過的:算法
SemaphoreSlim
、ManualResetEventSlim
、CountdownEvent
以及Barrier
)ThreadLocal<T>
在繼續閱讀前,你須要瞭解第 1 部分 - 第 4 部分中的基本原理,特別是鎖和線程安全。shell
並行編程這一部分提供的全部代碼均可以在LINQPad中試驗。LINQPad 是一個 C# 代碼草稿板,能夠用來測試代碼段,而無需建立類、項目或解決方案。想要獲取這些示例代碼,能夠在 LINQPad 左下方的 Samples 標籤頁中點擊 Download More Samples,而且選擇 C# 4.0 in a Nutshell: More Chapters。(譯者注:如今應該是 C# 5.0 in a Nutshell 和 C# 6.0 in a Nutshell 了)數據庫
近年來,CPU 時鐘頻率發展陷於停滯,製造商已經將重心轉移至增長核心數量。這對咱們程序員來講是個問題,由於標準的單線程代碼沒法自動利用那些增長的核心來提高程序運行速度。編程
利用多個核心對大多數服務端應用程序來講很容易,每一個線程能夠獨立處理單獨的客戶端請求,但在桌面環境下就不那麼容易了,由於一般這須要你優化計算密集型代碼,按以下步驟進行:數組
儘管你能夠使用傳統的多線程構造,但那比較笨拙,尤爲是在分解工做和整理結果的步驟。而且,爲確保線程安全,一般的策略是使用鎖,而它在不少線程同時訪問一份數據時會致使大量競爭。緩存
PFX 庫就是專門被設計用來爲這些場景提供幫助的。
利用多核心或多處理器的編程被稱爲並行編程(parallel programming)。它是多線程這個更寬泛概念的子集。
有兩種分解工做的策略:數據並行(data parallelism)和任務並行(task parallelism)。
當一系列任務須要處理不少數據時,可讓每一個線程都執行這一系列(相同的)任務來處理一部分數據(即全部數據的一個子集)。這樣實現的並行化稱爲數據並行,由於咱們是爲線程分解了數據。與此相對,任務並行是指對任務進行分解,換句話說就是讓每一個線程執行不一樣的任務。
一般,對高度並行的硬件來講,數據並行更簡單,可伸縮性也更好,由於它減小或消除了共享數據(也就減小了競爭和線程安全問題)。而且,事實上通常都是數據比任務要多,因此數據並行能夠增長併發的可能。
數據並行也有利於結構化並行(structured parallelism),意思是說並行工做單元的啓動和完成是在程序中的同一位置。相對的,任務並行趨向於非結構化,就是說並行工做單元的啓動和完成可 能分散在程序各處。結構化並行比較簡單,而且不易出錯,也讓你能夠把工做分解和線程協調(甚至包括結果整理)這些複雜的任務交給 PFX 庫來完成。
PFX 包含兩層功能。上層是由結構化數據並行 API:PLINQ和Parallel
類組成。下層包含任務並行的類,以及一組額外的構造,來幫助你實現並行編程。
PLINQ 提供了最豐富的功能:它可以自動化並行的全部步驟,包括分解工做、多線程執行、最後把結果整理成一個輸出序列。它被稱爲聲明式(declarative) 的,由於你只是聲明但願並行化你的工做(構造一個 LINQ 查詢),而後讓 Framework 來處理實現細節。相對的,另外一種方式是指令式(imperative)的,這種方式是須要你顯式編寫代碼來處理工做分解和結果整理。例如使用Parallel
類時,你必須本身整理結果,而若是使用任務並行構造,你還必須本身分解工做。
分解工做 | 整理結果 | |
---|---|---|
PLINQ | ||
Parallel類 | - | |
PFX 的任務並行 | - | - |
併發集合和自旋基元可 以幫助你實現低層次的並行編程。這很重要,由於 PFX 不只被設計適用於當今的硬件,也適用於將來更多核心的處理器。若是你但願搬運一堆木塊,而且有 32 個工人,最麻煩的是如何讓工人們搬運木塊時不互相擋道。這與把算法分解運行在 32 個核心上相似:若是普通的鎖被用於保護公共資源,所產生的阻塞可能意味着同時只有一小部分核心真正在工做。併發集合專門針對於高併發訪問,致力於最小化或消除阻塞。PLINQ 和 Parallel
類就依賴於併發集合和自旋基元來實現高效的工做管理。
傳統多線程的場景是,即便在單核的機器上,使用多線程也有好處,而此時並無真正的並行發生。就像咱們以前討論過的:保持用戶界面的響應以及同時下載兩個網頁。
這一部分將要講到的一些構造有時對於傳統多線程也有用。特別是:
Parallel
類在你並行執行操做以及等待它們完成(結構化並行)時有用。這包括非計算密集型任務,例如調用 web 服務。BlockingCollection
提供了一個簡單的工具來實現生產者 / 消費者結構。PFX 主要用於並行編程:充分利用多核處理器來加速執行計算密集型代碼。
充分利用多個核心的挑戰在於阿姆達爾定律(Amdahl’s law),它指出經過並行化產生的最大性能提高,取決於有多少必須順序執行的代碼段。例如,若是一個算法只有三分之二的執行時間能夠並行,即便有無數核心,也沒法得到超過三倍的性能提高。
所以,在使用 PFX 前,有必要先檢查可並行代碼中的瓶頸。還須要考慮下,你的代碼是否有必要是計算密集的,優化這裏每每是最簡單有效的方法。然而,這也須要平衡,由於一些優化技術會使代碼難以並行化。
最容易獲益的是「很差意思不併行的問題(embarrassingly parallel problems)」:工做能夠很容易地被分解爲多個任務,每一個任務本身能夠高效執行(結構化並行很是適合這種問題)。例如:不少圖片處理任務、光線跟蹤 算法、數學和密碼學方面的暴力計算和破解。而相反的例子是:實現快速排序算法的優化版本,想把它實現得好須要必定思考,而且可能須要非結構化並行。
PLINQ 會自動並行化本地的 LINQ 查詢。其優點在於使用簡單,由於將工做分解和結果整理的負擔交給了 Framework。
使用 PLINQ 時,只要在輸入序列上調用AsParallel()
,而後像日常同樣繼續 LINQ 查詢就能夠了。下邊的查詢計算 3 到 100,000 內的素數,這會充分利用目標機器上的全部核心。
// 使用一個簡單的(未優化)算法計算素數。 // // 注意:這一部分提供的全部代碼均可以在 LINQPad 中試驗。 IEnumerable<int> numbers = Enumerable.Range (3, 100000-3); var parallelQuery = from n in numbers.AsParallel() where Enumerable.Range (2, (int) Math.Sqrt (n)).All (i => n % i > 0) select n; int[] primes = parallelQuery.ToArray();
AsParallel
是System.Linq.ParallelEnumerable
中的一個擴展方法。它使用ParallelQuery<TSource>
來封裝輸入,就會將你隨後調用的 LINQ 查詢操做符綁定在ParallelEnumerable
中定義的另一組擴展方法上。它們提供了全部標準查詢操做符的並行化實現。本質上,它們就是將輸入序列進行分區,造成工做塊,並在不一樣的線程上執行,以後再將結果整理成一個輸出序列:
調用AsSequential()
能夠拆封ParallelQuery
,使隨後的查詢操做符綁定到標準查詢操做符來順序執行。在調用有反作用或非線程安全的方法前,有必要這樣作。
對於那些接受兩個輸入序列的查詢操做符(Join
、GroupJoin
、Contact
、Union
、Intersect
和Zip
)來講,必須在這兩個輸入序列上都使用AsParallel()
(不然將拋出異常)。然而,不須要爲中間過程的查詢使用AsParallel
,由於 PLINQ 的查詢操做符會輸出另外一個ParallelQuery
序列。實際上,在這個輸出序列上再次調用AsParallel
會下降效率,它會強制對序列進行合併和從新分區。
mySequence.AsParallel() // 使用 ParallelQuery<int> 封裝序列 .Where (n => n > 100) // 輸出另外一個 ParallelQuery<int> .AsParallel() // 不須要,會下降效率! .Select (n => n * n)
並不是全部的查詢操做符均可以被有效地並行化。對於那些不能的,PLINQ 使用了順序的實現。若是 PLINQ 認爲並行化的開銷實際會使查詢變慢,它也會順序執行。
PLINQ 僅適用於本地集合:它沒法在 LINQ to SQL 或 Entity Framework 中使用,由於在那些場景中,LINQ 會被翻譯成 SQL 語句,而後在數據庫服務器上執行。然而,你能夠使用 PLINQ 對從數據庫查詢得到的結果執行進一步的本地查詢。
若是 PLINQ 查詢拋出異常,它會被封裝進AggregateException
從新拋出,其InnerExceptions
屬性包含真正的異常。詳見使用 AggregateException
。
咱們知道AsParallel
能夠透明的並行化 LINQ 查詢,那麼問題來了,「微軟爲何不直接並行化標準查詢操做符,使 PLINQ 成爲默認的?」
有不少緣由使其成爲這種選擇使用(opt-in)的方式。首先,要使 PLINQ 有用,必需要有必定數量的計算密集型任務,它們能夠被分配到多個工做線程。大多數 LINQ to Objects 的查詢執行很是快,根本不須要並行化,並行化過程當中的任務分區、結果整理以及線程協調反而會使程序變慢。
其次:
AggregateException
中(可以處理拋出的多個異常)。最後,PLINQ 爲了進行微調提供了一些鉤子(hook)。把這些累贅加入標準的 LINQ to Objects 的 API 會增長使用障礙。
與普通的 LINQ 查詢同樣,PLINQ 查詢也是延遲估值的。這意味着只有當結果開始被使用時,查詢纔會被觸發執行。一般結果是經過一個foreach
循環被使用(經過轉換操做符也會觸發,例如ToArray
,還有返回單個元素或值的操做符)。
當枚舉結果時,執行過程與普通的順序查詢略有不一樣。順序查詢徹底由使用方經過「拉」的方式驅動:每一個元素都在使用方須要時從輸入序列中被提取。並行 查詢一般使用獨立的線程從輸入序列中提取元素,這可能比使用方的須要稍微提早了一些(很像一個給播報員使用的提詞機,或者 CD 機中的防震緩衝區)。而後經過查詢鏈並行處理這些元素,將結果保存在一個小緩衝區中,以準備在須要的時候提供給使用方。若是使用方在枚舉過程當中暫停或中 斷,查詢也會暫停或中止,這樣能夠不浪費 CPU 時間或內存。
你能夠經過在AsParallel
以後調用WithMergeOptions
來調整 PLINQ 的緩衝行爲。默認值AutoBuffered
一般能產生最佳的總體效果;NotBuffered
禁用緩衝,若是你但願儘快看到結果能夠使用這個;FullyBuffered
在呈現給使用方前緩存整個查詢的輸出(OrderBy
和Reverse
操做符天生以這種方式工做,取元素、聚合和轉換操做符也是同樣)。
並行化查詢操做符的一個反作用是:當整理結果時,不必定能與它們提交時的順序保持一致,就如同以前圖中所示的那樣。換句話說,就是沒法像普通的 LINQ 那樣能保證序列的正常順序。
若是你須要保持序列順序,能夠經過在AsParallel
後調用AsOrdered()
來強制它保證:
myCollection.AsParallel().AsOrdered()...
在大量元素的狀況下調用AsOrdered
會形成必定性能損失,由於 PLINQ 必須跟蹤每一個元素原始位置。
以後你能夠經過調用AsUnordered
來取消AsOrdered
的效果:這會引入一個「隨機洗牌點(random shuffle point)」,容許查詢從這個點開始更高效的執行。所以,若是你但願僅爲前兩個查詢操做保持輸入序列的順序,能夠這樣作:
inputSequence.AsParallel().AsOrdered() .QueryOperator1() .QueryOperator2() .AsUnordered() // 從這開始順序可有可無 .QueryOperator3() // ...
AsOrdered
不是默認的,由於對於大多數查詢來講,原始的輸入順序可有可無。換句話說,若是AsOrdered
是默認的,你就不得不爲大多數並行查詢使用AsUnordered
來得到最好的性能,這會成爲負擔。
目前,PLINQ 在可以並行化的操做上有些實用性限制。這些限制可能會在以後的更新包或 Framework 版本中解決。
下列查詢操做符會阻止查詢的並行化,除非源元素是在它們原始的索引位置:
Take
、TakeWhile
、Skip
和SkipWhile
Select
、SelectMany
和ElementAt
這幾個操做符的帶索引版本大多數查詢操做符都會改變元素的索引位置(包括可能移除元素的那些操做符,例如Where
)。這意味着若是你但願使用上述操做符,就要在查詢開始的地方使用。
下列查詢操做符能夠並行化,但會使用代價高昂的分區策略,有時可能比順序執行還慢。
Join
、GroupBy
、GroupJoin
、Distinct
、Union
、Intersect
和Except
Aggregate
操做符的帶種子(seed)的重載是不能並行化的,PLINQ 提供了專門的重載來解決。
其它全部操做符都是能夠並行化的,然而使用這些操做符並不能確保你的查詢會被並行化。若是 PLINQ 認爲進行分區的開銷會致使部分查詢變慢,它也許會順序執行查詢。你能夠覆蓋這個行爲,方法是在AsParallel()
以後調用以下代碼來強制並行化:
.WithExecutionMode (ParallelExecutionMode.ForceParallelism)
假設咱們但願實現一個拼寫檢查程序,它在處理大文檔時,可以經過充分利用全部可用的核心來快速運行。咱們把算法設計成一個 LINQ 查詢,這樣就能夠很容易的並行化它。
第一步是下載英文單詞字典,爲了可以高效查找,將其放在一個HashSet
中:
if (!File.Exists ("WordLookup.txt")) // 包含約 150,000 個單詞 new WebClient().DownloadFile ( "http://www.albahari.com/ispell/allwords.txt", "WordLookup.txt"); var wordLookup = new HashSet<string> ( File.ReadAllLines ("WordLookup.txt"), StringComparer.InvariantCultureIgnoreCase);
而後,使用wordLookup
來建立一個測試「文檔」,該「文檔」是個包含了一百萬個隨機單詞的數組。建立完數組後,引入兩個拼寫錯誤:
var random = new Random(); string[] wordList = wordLookup.ToArray(); string[] wordsToTest = Enumerable.Range (0, 1000000) .Select (i => wordList [random.Next (0, wordList.Length)]) .ToArray(); wordsToTest [12345] = "woozsh"; // 引入兩個 wordsToTest [23456] = "wubsie"; // 拼寫錯誤
如今,經過對比wordLookup
檢查wordsToTest
,來完成這個並行的拼寫檢查程序。PLINQ 讓這變得很簡單:
var query = wordsToTest .AsParallel() .Select ((word, index) => new IndexedWord { Word=word, Index=index }) .Where (iword => !wordLookup.Contains (iword.Word)) .OrderBy (iword => iword.Index); query.Dump(); // 在 LINQPad 中顯示輸出
下邊是 LINQPad 中的顯示的輸出:
IndexedWord
是一個自定義的結構體,定義以下:
struct IndexedWord { public string Word; public int Index; }
斷定器中的wordLookup.Contains
方法做爲查詢的主要部分,它使得這個查詢值得並行化。
咱們能夠使用匿名類型來代替IndexedWord
結構體,從而稍微簡化下這個查詢。然而這會下降性能,由於匿名類型(是類,所以是引用類型)會產生分配堆內存的開銷,以及以後的垃圾回收。
這個區別對於順序查詢來講沒太大關係,但對於並行查詢來講,基於棧的內存分配則至關有利。這是由於基於棧的內存分配是能夠高度並行化的(由於每一個線程有其本身的棧),反之基於堆的內存分配會使全部線程競爭同一個堆,它是由單一的內存管理器和垃圾回收器管理的。
來擴展一下咱們的例子,讓建立隨機測試單詞列表的過程並行化。咱們把它做爲 LINQ 查詢來構造,這樣事情就簡單多了。如下是順序執行版本:
string[] wordsToTest = Enumerable.Range (0, 1000000) .Select (i => wordList [random.Next (0, wordList.Length)]) .ToArray();
不幸的是,對Random.Next
的調用不是線程安全的,因此實現並行化不是向查詢語句直接插入AsParallel()
這麼簡單。一個可能的解決辦法是寫個方法對random.Next
加鎖,然而這會限制併發能力。更好的處理辦法是使用ThreadLocal<Random>
爲每一個線程建立獨立的Random
對象。而後咱們能夠使用以下代碼來並行化查詢:
var localRandom = new ThreadLocal<Random> ( () => new Random (Guid.NewGuid().GetHashCode()) ); string[] wordsToTest = Enumerable.Range (0, 1000000).AsParallel() .Select (i => wordList [localRandom.Value.Next (0, wordList.Length)]) .ToArray();
在實例化Random
對象的工廠方法中,咱們傳遞了一個Guid
的散列值,用來確保:若是兩個Random
對象在很短的時間範圍內被建立,它們能夠生成不一樣的隨機數序列。
在你的程序中尋找 LINQ 查詢,嘗試並行化它們貌似是很誘人的。然而這一般沒什麼用,由於絕大多數明顯應該使用 LINQ 的地方執行都很快,因此並行化並無什麼好處。更好的方法是找到 CPU 密集型工做的瓶頸,而後考慮「這能寫成 LINQ 查詢嗎?」(這樣重構的一個好處是 LINQ 一般能夠使代碼變得更短,而且更具可讀性。)
PLINQ 很是適合於「很差意思不併行的問題(embarrassingly parallel problems)」。它也能很好的應用於結構化阻塞任務(structured blocking tasks),例如同時調用多個 web 服務(見調用阻塞或 I/O 密集型功能)。
對於圖像處理來講 PLINQ 是個糟糕的選擇,由於整理幾百萬個像素到輸出序列將造成瓶頸。更好的方法是把像素直接寫入數組或非託管的內存塊,而後使用Parallel
類或任務並行來管理多線程。(也能夠使用ForAll
來繞過結果整理。若是該圖像處理算法天生適合 LINQ,這麼作可能有益。)
(譯者注:pure function 譯爲純方法,是指一個方法 / 函數不能改變任何狀態,也不能進行任何 I/O 操做,它的返回值不能依賴任何可能被改變的狀態,而且使用相同的輸入調用就會產生相同的輸出。)
由於 PLINQ 會在並行的線程上運行查詢,所以必須注意不要執行非線程安全的操做。特別須要注意,對變量進行寫操做有反作用(side-effecting),是非線程安全的。
// 下列查詢將每一個元素與其索引相乘。 // 給定一個 0 到 999 的輸入序列, 它應該輸出元素的平方。 int i = 0; var query = from n in Enumerable.Range(0,999).AsParallel() select n * i++;
能夠經過使用鎖或Interlocked
來確保i
的自增是線程安全的,可是問題仍然存在,i
並不能保證對應輸入元素的原始索引。而且加上AsOrdered
也沒法解決這個問題,由於AsOrdered
僅僅確保輸出是按順序的,就像順序執行的輸出順序同樣。但這並不意味着實際的處理過程也是按順序的。
替代方法是將這個查詢重寫,使用帶索引的Select
版本。
var query = Enumerable.Range(0,999).AsParallel().Select ((n, i) => n * i);
爲了達到最佳性能,任何被查詢操做符調用的方法必須是線程安全的:不要給字段或屬性賦值(無反作用,純方法)。若是用鎖來保證線程安全,查詢的並行能力將會受到限制。這個限制能夠經過鎖定的持續時間除以花費在方法上的總時間來計算。
有時一個查詢的長時間運行並非由於是 CPU 密集型操做,而是由於它在等待某些東西,例如等待網頁下載或是硬件的響應。PLINQ 可以有效地並行化這種類型的查詢,能夠經過在AsParallel
後調用WithDegreeOfParallelism
來提示這種特徵。例如,假設咱們但願同時 ping 6 個網站。比起使用異步委託或手動讓 6 個線程自旋,使用 PLINQ 查詢能夠輕鬆實現它:
from site in new[] { "www.albahari.com", "www.linqpad.net", "www.oreilly.com", "www.takeonit.com", "stackoverflow.com", "www.rebeccarey.com" } .AsParallel().WithDegreeOfParallelism(6) let p = new Ping().Send (site) select new { site, Result = p.Status, Time = p.RoundtripTime }
WithDegreeOfParallelism
強制 PLINQ 同時運行指定數量的任務。在調用阻塞方法(例如Ping.Send
)時有必要這麼作,不然的話,PLINQ 會認爲這個查詢是 CPU 密集型的,並進行相應的任務分配。在雙核機器上,PLINQ 會默認同時運行 2 個任務,對於上述狀況來講這顯然不是咱們但願看到的。
受線程池的影響,PLINQ 一般爲每一個任務分配一個線程。能夠經過調用ThreadPool.SetMinThreads
來加速初始線程的建立速度。
再給一個例子:假設咱們要實現一個監控系統,但願它不斷未來自 4 個安全攝像頭的圖像合併成一個圖像,並在閉路電視上顯示。使用下邊的類來表示一個攝像頭:
class Camera { public readonly int CameraID; public Camera (int cameraID) { CameraID = cameraID; } // 獲取來自攝像頭的圖像: 返回一個字符串來代替圖像 public string GetNextFrame() { Thread.Sleep (123); // 模擬獲取圖像的時間 return "Frame from camera " + CameraID; } }
要獲取一個合成圖像,咱們必須分別在 4 個攝像頭對象上調用GetNextFrame
。假設操做主要是受 I/O 影響的,經過並行化咱們能將幀率提高 4 倍,即便是在單核機器上。PLINQ 使用一小段程序就能實現它:
Camera[] cameras = Enumerable.Range (0, 4) // 建立 4 個攝像頭對象 .Select (i => new Camera (i)) .ToArray(); while (true) { string[] data = cameras .AsParallel().AsOrdered().WithDegreeOfParallelism (4) .Select (c => c.GetNextFrame()).ToArray(); Console.WriteLine (string.Join (", ", data)); // 顯示數據... }
GetNextFrame
是一個阻塞方法,因此咱們使用了WithDegreeOfParallelism
來得到指望的併發度。在咱們的例子中,阻塞是在調用Sleep
時發生。而在真實狀況下,阻塞的發生是由於從攝像頭中獲取圖像是 I/O 密集型操做,而不是 CPU 密集型操做。
調用AsOrdered
能夠確保圖像按照一致的順序顯示。由於序列中只有 4 個元素,因此它對性能的影響能夠忽略不計。
在一個 PLINQ 查詢內,僅可以調用WithDegreeOfParallelism
一次。若是你須要再次調用它,必須在查詢中經過再次調用AsParallel()
強制進行查詢的合併和從新分區:
"The Quick Brown Fox" .AsParallel().WithDegreeOfParallelism (2) .Where (c => !char.IsWhiteSpace (c)) .AsParallel().WithDegreeOfParallelism (3) // 強制合併和從新分區 .Select (c => char.ToUpper (c))
當在foreach
循環中使用 PLINQ 查詢的結果時,取消該查詢很簡單:使用break
退出循環就能夠了。查詢會被自動取消,由於枚舉器會被隱式銷燬。
對於結束一個使用轉換、取元素或聚合操做符的查詢來講,你能夠在其它線程使用取消標記來取消它。在AsParallel
後調用WithCancellation
來添加一個標記,並把CancellationTokenSource
對象的Token
屬性做爲參數傳遞。以後另外一個線程就能夠在這個CancellationTokenSource
對象上調用Cancel
,它會在查詢的使用方那邊拋出OperationCanceledException
異常。
IEnumerable<int> million = Enumerable.Range (3, 1000000); var cancelSource = new CancellationTokenSource(); var primeNumberQuery = from n in million.AsParallel().WithCancellation (cancelSource.Token) where Enumerable.Range (2, (int) Math.Sqrt (n)).All (i => n % i > 0) select n; new Thread (() => { Thread.Sleep (100); // 在 100 毫秒後 cancelSource.Cancel(); // 取消查詢 } ).Start(); try { // 開始運行查詢 int[] primes = primeNumberQuery.ToArray(); // 永遠到不了這裏,由於其它線程會進行取消操做。 } catch (OperationCanceledException) { Console.WriteLine ("Query canceled"); }
PLINQ 不會直接停止線程,由於這麼作是危險的。在取消時,它會等待全部工做線程處理完當前的元素,而後結束查詢。這意味着查詢調用的任何外部方法都會執行完成。
PLINQ 的一個優勢是它可以很容易地將並行化任務的結果整理成一個輸出序列。然而有時,最終要作的是在輸出序列的每一個元素上運行一些方法:
foreach (int n in parallelQuery) DoSomething (n);
若是是上述狀況,而且不關心元素的處理順序,那麼能夠使用 PLINQ 的ForAll
方法來提升效率。
ForAll
方法在ParallelQuery
的每一個輸出元素上運行一個委託。它直接掛鉤(hook)到 PLINQ 內部,繞過整理和枚舉結果的步驟。舉個栗子:
"abcdef".AsParallel().Select (c => char.ToUpper(c)).ForAll (Console.Write);
整理和枚舉結果的開銷不是很是大,因此當有大量輸入元素且處理執行很快的時候,才能最大化ForAll
優化的收益。
PLINQ 有 3 種分區策略,用來分配輸入元素到線程:
策略 | 元素分配 | 相對性能 |
---|---|---|
塊分區(Chunk partitioning) | 動態 | 平均 |
範圍分區(Range partitioning) | 靜態 | 差 - 極好 |
散列分區(Hash partitioning) | 靜態 | 差 |
對於那些須要比較元素的查詢操做符(GroupBy
、Join
、GroupJoin
、Intersect
、Except
、Union
和Distinct
),PLINQ 老是使用散列分區。散列分區相對低效,由於它必須預先計算每一個元素的散列值(擁有一樣散列值的元素會在同一個線程中被處理)。若是發現運行太慢,惟一的選擇是調用AsSequential
來禁止並行處理。
對於其它全部查詢操做符,你能夠選擇使用範圍分區或塊分區。默認狀況下:
IList<T>
的實現),PLINQ 選用範圍分區。歸納來說,對於較長的序列且處理每一個元素所需的 CPU 時間比較近似時,範圍分區更快。不然,塊分區一般更快。
若是想強制使用範圍分區:
Enumerable.Range
開始,將其替換爲ParallelEnumerable.Range
。ToList
或ToArray
(顯然,你須要考慮在這裏產生的性能開銷)。ParallelEnumerable.Range
並非對Enumerable.Range(…).AsParallel()
的簡單封裝。它經過激活範圍分區改變了查詢的性能。
若是想強制使用塊分區,就經過調用Partitioner.Create
(在命名空間System.Collection.Concurrent
中)來封裝輸入序列,例如:
int[] numbers = { 3, 4, 5, 6, 7, 8, 9 }; var parallelQuery = Partitioner.Create (numbers, true).AsParallel() .Where (...)
Partitioner.Create
的第二個參數表示:但願對查詢開啓負載均衡(load-balance),這是另外一個使用塊分區的動機。
塊分區的工做方式是按期從輸入序列中抓取小塊元素來處理。PLINQ 一開始會分配很是小的塊(一次 1 到 2 個元素),而後隨着查詢的進行增長塊的大小:這確保小序列可以被有效地並行化,而大序列不會致使過多的抓取工做。若是一個工做線程碰巧拿到了一些「容易」 的元素(處理很快),它最終將拿到更多的塊。這個系統使每一個線程保持均等的繁忙程度(使核心負載均衡)。惟一的不利因素是從共享的輸入序列中獲取元素須要 同步(一般使用一個排它鎖),這會產生必定的開銷和競爭。
範圍分區會繞過正常的輸入端枚舉,而且爲每一個工做線程預分配相同數量的元素,避免了在輸入序列上的競爭。可是若是某些線程拿到了容易的元素並很早就 完成了處理,在其它工做線程仍在繼續工做的時候它就會是空閒的。咱們以前的素數計算的例子在使用範圍分區時就性能不高。舉個範圍分區適用的例子,計算 1000 萬之內數字的平方和:
ParallelEnumerable.Range (1, 10000000).Sum (i => Math.Sqrt (i))
ParallelEnumerable.Range
返回一個ParallelQuery<T>
,所以不須要在以後調用AsParallel
。
範圍分區不是必須把元素分紅相鄰的塊,它也許會選用一種 「條紋式(striping)」策略。例如,有兩個工做線程,一個工做線程可能會處理奇數位置的元素,而另外一個工做線程處理偶數位置的元素。TakeWhile
操做符幾乎必定會觸發條紋式策略,用來避免處理序列後邊沒必要要的元素。
PLINQ 能夠在無需額外干預的狀況下有效地並行化Sum
、Average
、Min
和Max
操做符。然而,Aggregate
操做符對於 PLINQ 來講是個特殊的麻煩。
若是不熟悉Aggregate
操做符,你能夠認爲它就是一個Sum
、Average
、Min
和Max
的泛化版本,換句話說,就是一個能夠使你經過自定義的聚合算法實現非一般聚合操做的操做符。以下代碼展示了Aggregate
如何實現Sum
操做符的工做:
int[] numbers = { 1, 2, 3 }; int sum = numbers.Aggregate (0, (total, n) => total + n); // 6
Aggregate
的第一個參數是 seed(種子,初值),聚合操做從這裏開始。第二個參數是一個用於更新聚合值的表達式,該表達式生成一個新的元素。第三個參數是可選的,用來表示如何經過聚合值生成最終的結果值。
大多數Aggregate
被設計用來解決的問題都可以使用foreach
循環輕鬆解決,而且這也是更熟悉的語法。而Aggregate
的優勢在於對龐大或複雜的聚合操做能夠使用 PLINQ 來進行聲明式的並行化。
調用Aggregate
時能夠省略種子值,這種狀況下第一個元素會被隱式看成種子,以後聚合處理會從第二個元素開始進行。下邊是一個無種子的例子:
int[] numbers = { 1, 2, 3 }; int sum = numbers.Aggregate ((total, n) => total + n); // 6
這獲得了與以前相同的結果,然而實際上倒是進行了不一樣的計算。以前例子計算的是 0+1+2+3
,而如今計算的是1+2+3
。經過乘法運算來代替加法運算可以更好地說明這個不一樣:
int[] numbers = { 1, 2, 3 }; int x = numbers.Aggregate (0, (prod, n) => prod * n); // 0*1*2*3 = 0 int y = numbers.Aggregate ( (prod, n) => prod * n); // 1*2*3 = 6
如同咱們立刻將要看到的,無種子的聚合的優勢在於被並行化時不須要使用特殊的重載。然而,無種子的聚合存在一個陷阱:無種子的聚合方法指望使用的委 託中的計算應知足交換律和結合律。若是用在別的狀況下,結果要否則是反直覺的(普通查詢),要否則是不肯定的(PLINQ 並行化查詢)。例如考慮以下函數:
(total, n) => total + n * n
它既不知足交換律也不知足結合律。(例如:1+2*2 != 2+1*1
)。咱們來看一下使用它來對數字 二、三、4 計算平方和時會發生什麼:
int[] numbers = { 2, 3, 4 }; int sum = numbers.Aggregate ((total, n) => total + n * n); // 27
原本的計算應該是:
2*2 + 3*3 + 4*4 // 29
但如今的計算是:
2 + 3*3 + 4*4 // 27
能夠經過多種方法解決這個問題。首先,咱們能夠在序列最前端加入 0 做爲第一個元素:
int[] numbers = { 0, 2, 3, 4 };
這不只不優雅,並且在並行執行的狀況下仍然會產生錯誤的結果,由於 PLINQ 會選擇多個元素做爲種子,這至關於假定了計算知足結合律。爲說明這個問題,用以下方式表示咱們的聚合函數:
f(total, n) => total + n * n
LINQ to Objects 會這樣計算:
f(f(f(0, 2),3),4)
PLINQ 可能會這樣計算:
f(f(0,2),f(3,4))
結果是:
第一個分區: a = 0 + 2*2 (= 4)
第二個分區: b = 3 + 4*4 (= 19)
最終結果: a + b*b (= 365)
甚至多是: b + a*a (= 35)
有兩種好的解決方案:第一種是將其轉換爲有種子的聚合,使用 0 做爲種子。這種方案帶來的複雜度的提高僅僅是使用 PLINQ 時,咱們須要使用特殊的重載,確保查詢並行執行(立刻會看到)。
第二種解決方案是:重構查詢,使聚合函數知足交換律和結合律:
int sum = numbers.Select (n => n * n).Aggregate ((total, n) => total + n);
固然,在這種簡單的場景下你能夠(而且應該)使用Sum
操做符來代替Aggregate
:
int sum = numbers.Sum (n => n * n);
實際上能夠更進一步使用Sum
和Average
。例如,能夠使用Average
來計算均方根(root-mean-square):
Math.Sqrt (numbers.Average (n => n * n))
甚至是標準差:
double mean = numbers.Average(); double sdev = Math.Sqrt (numbers.Average (n => { double dif = n - mean; return dif * dif; }));
上述兩個方法都是安全、高效而且可徹底並行化的。
咱們剛剛看到了無種子的聚合,提供的委託必須知足交換律和結合律。若是違反這個規則,PLINQ 會給出錯誤的結果,由於它可能使用輸入序列中多個的元素做爲種子,來同時聚合多個分區。
指定種子的聚合也許看起來像是使用 PLINQ 的安全選擇,然而不幸的是,這樣一般會致使順序執行,由於它依賴於單獨一個種子。爲減緩這個問題,PLINQ 提供了另外一個Aggregate
的重載,容許你指定多個種子,或者是一個種子工廠方法。對每一個線程,它執行這個方法來生成一個獨立的種子,這就造成了一個線程局部的累加器,經過它在聚合局部元素。
你必須再提供一個方法來指示如何合併局部累加器至主累加器。最後,Aggregate
的這個重載還須要一個委託,用來對結果進行任意的最終變換(有些不必,你能夠以後對結果運行一些代碼完成一樣操做)。因此,這裏有 4 個委託,按照它們被傳遞的順序:
seedFactory
):updateAccumulatorFunc
):combineAccumulatorFunc
):resultSelector
):在簡單的場景中,你能夠指定一個種子值來代替種子工廠。當種子是你須要改變的引用類型時這種策略行不通,由於同一個實例將在線程間共享。
提供一個簡單的例子,下邊的代碼對numbers
數組中的值進行求和:
numbers.AsParallel().Aggregate ( () => 0, // 種子工廠 (localTotal, n) => localTotal + n, // 更新累加器方法 (mainTot, localTot) => mainTot + localTot, // 合併累加器方法 finalResult => finalResult) // 結果選擇器
這個例子有些刻意,咱們能夠使用更簡單的方式獲取相同的結果(例如無種子的聚合,或者更好的選擇是使用Sum
操做符)。給一個更加實際的例子,假設咱們要計算字符串中每一個英文字母的出現頻率。簡單的順序執行方案看起來是這樣:
string text = "Let’s suppose this is a really long string"; var letterFrequencies = new int[26]; foreach (char c in text) { int index = char.ToUpper (c) - 'A'; if (index >= 0 && index <= 26) letterFrequencies [index]++; };
基因序列是一個輸入文本可能會很是長的例子,它的「字母表」是由字母 a、c、g、t 組成。
爲了將它並行化,咱們能夠把foreach
替換爲Parallel.ForEach
(在接下來的一節會講到),但這會致使共享數組上的併發問題。對數組的訪問加鎖能夠解決問題,但會下降併發的可能性。
Aggregate
提供了一個好的解決方案。這種狀況下,累加器是一個數組,就像是以前例子中letterFrequencies
數組。使用Aggregate
的順序執行版本以下:
int[] result = text.Aggregate ( new int[26], // 建立「累加器」 (letterFrequencies, c) => // 聚合一個字母至累加器 { int index = char.ToUpper (c) - 'A'; if (index >= 0 && index <= 26) letterFrequencies [index]++; return letterFrequencies; });
下面是並行版本,它使用 PLINQ 的專門重載:
int[] result = text.AsParallel().Aggregate ( () => new int[26], // 新建局部累加器 (localFrequencies, c) => // 聚合至局部累加器 { int index = char.ToUpper (c) - 'A'; if (index >= 0 && index <= 26) localFrequencies [index]++; return localFrequencies; }, // 聚合局部累加器至主累加器 (mainFreq, localFreq) => mainFreq.Zip (localFreq, (f1, f2) => f1 + f2).ToArray(), finalResult => finalResult // 對結果進行 ); // 最終變換
注意:局部累加方法會改動localFrequencies
數組。這個優化是很是重要的,也是合法的,由於localFrequencies
是每一個線程的局部變量。
PFX 經過Parallel
類上的三個靜態方法提供告終構化並行的基本形式:
Parallel.Invoke
Parallel.For
for
循環的並行版本
Parallel.ForEach
foreach
循環的並行版本
三個方法都是在工做完成前會阻塞。相似於PLINQ,若是有未處理的異常,其它工做線程會在當前迭代完成以後中止,異常會被封裝在AggregateException
中拋給調用方。
Parallel.Invoke
並行執行一組Action
類型的委託,而後等待它們完成。這個方法最簡單的版本以下:
public static void Invoke (params Action[] actions);
下面是使用Parallel.Invoke
來同時下載兩個網頁:
Parallel.Invoke ( () => new WebClient().DownloadFile ("http://www.linqpad.net", "lp.html"), () => new WebClient().DownloadFile ("http://www.jaoo.dk", "jaoo.html"));
這表面上看起來像是建立了兩個Task
對象(或異步委託)並等待它們。可是有個重要的區別:Parallel.Invoke
在你傳遞一百萬個委託時仍然能高效工做。這是由於它會對大量元素進行分區(partition),造成多個塊,再對其分配底層的Task
。而不是直接對每個委託建立獨立的Task
。
使用Parallel
上的全部方法時,都須要自行實現整理結果的代碼。這意味着你須要注意線程安全。例如,下面的代碼不是線程安全的:
var data = new List<string>(); Parallel.Invoke ( () => data.Add (new WebClient().DownloadString ("http://www.foo.com")), () => data.Add (new WebClient().DownloadString ("http://www.far.com")));
對添加的過程加鎖能夠解決問題,可是若是你的委託數量更多,它們每個執行的又很快,那麼鎖可能形成瓶頸。更好的解決方案是使用線程安全的集合,好比ConcurrentBag
就是這裏的理想方案。
Parallel.Invoke
也有接受ParallelOptions
對象的重載:
public static void Invoke (ParallelOptions options, params Action[] actions);
經過ParallelOptions
,你能夠添加取消標記、限制最大併發數量和指定自定義任務調度器。若是要執行的委託數量(大體上)大於核心數,那麼使用取消標記纔有意義:在取消時,全部未啓動的委託都會被拋棄。而全部已經在執行的委託會繼續完成。對於如何使用取消標記,能夠參考取消中的例子。
Parallel.For
和Parallel.ForEach
與 C# for
和foreach
相似,但會並行執行,而不是順序執行。下面是它們(最簡單的)方法簽名:
public static ParallelLoopResult For ( int fromInclusive, int toExclusive, Action<int> body) public static ParallelLoopResult ForEach<TSource> ( IEnumerable<TSource> source, Action<TSource> body)
對於下面的for
循環:
for (int i = 0; i < 100; i++) Foo (i);
並行版本是這樣:
Parallel.For (0, 100, i => Foo (i));
或更簡潔的:
Parallel.For (0, 100, Foo);
而對於下面的foreach
循環:
foreach (char c in "Hello, world") Foo (c);
並行版本是這樣:
Parallel.ForEach ("Hello, world", Foo);
給一個實際點的例子。引入System.Security.Cryptography
命名空間,而後咱們能夠像這樣並行生成六組密鑰對的字符串形式:
var keyPairs = new string[6]; Parallel.For (0, keyPairs.Length, i => keyPairs[i] = RSA.Create().ToXmlString (true));
與Parallel.Invoke
一樣,咱們也可讓Parallel.For
和Parallel.ForEach
執行大量工做項,它們也會被分區,分配給任務高效執行。
上面的例子也能夠使用PLINQ來實現:
string[] keyPairs = ParallelEnumerable.Range (0, 6) .Select (i => RSA.Create().ToXmlString (true)) .ToArray();
Parallel.For
和Parallel.ForEach
一般更適合用於外循環,而不是內循環。這是由於前者會帶來更大的分區塊,就稀釋了管理並行的開銷。通常沒有必要同時並行內外循環。對於下面的例子,咱們須要 100 個核心才能讓內循環的並行有益處:
Parallel.For (0, 100, i => { Parallel.For (0, 50, j => Foo (i, j)); // 對於內循環, }); // 順序執行更好。
有時須要獲知循環迭代的索引。在順序的foreach
中這很簡單:
int i = 0; foreach (char c in "Hello, world") Console.WriteLine (c.ToString() + i++);
然而在並行環境中,讓共享變量自增並非線程安全的。你必須使用下面這個ForEach
版本:
public static ParallelLoopResult ForEach<TSource> ( IEnumerable<TSource> source, Action<TSource,ParallelLoopState,long> body)
先忽略ParallelLoopState
(下一節會講)。如今咱們關注的是Action
的第三個long
類型的參數,它表明了循環的索引:
Parallel.ForEach ("Hello, world", (c, state, i) => { Console.WriteLine (c.ToString() + i); });
爲了把它用到實際場景中,咱們來回顧下使用 PLINQ 的拼寫檢查。下面的代碼加載了一個字典,並生成了一個用來測試的數組,有一百萬個測試項:
if (!File.Exists ("WordLookup.txt")) // 包含約 150,000 個單詞 new WebClient().DownloadFile ( "http://www.albahari.com/ispell/allwords.txt", "WordLookup.txt"); var wordLookup = new HashSet<string> ( File.ReadAllLines ("WordLookup.txt"), StringComparer.InvariantCultureIgnoreCase); var random = new Random(); string[] wordList = wordLookup.ToArray(); string[] wordsToTest = Enumerable.Range (0, 1000000) .Select (i => wordList [random.Next (0, wordList.Length)]) .ToArray(); wordsToTest [12345] = "woozsh"; // 引入兩個 wordsToTest [23456] = "wubsie"; // 拼寫錯誤
咱們能夠使用帶索引的Parallel.ForEach
來對wordsToTest
數組進行拼寫檢查,以下:
var misspellings = new ConcurrentBag<Tuple<int,string>>(); Parallel.ForEach (wordsToTest, (word, state, i) => { if (!wordLookup.Contains (word)) misspellings.Add (Tuple.Create ((int) i, word)); });
注意,必須使用線程安全的集合來整理結果:這一點是相對於使用 PLINQ 的劣勢。而優點是咱們能夠避免使用帶索引的Select
查詢操做符,它沒有帶索引的ForEach
高效。
由於對於並行的For
和ForEach
循環,循環體是一個委託,因此就沒法使用break
語句來提早退出循環。在這裏,你必須使用ParallelLoopState
對象上的Break
或Stop
:
public class ParallelLoopState { public void Break(); public void Stop(); public bool IsExceptional { get; } public bool IsStopped { get; } public long? LowestBreakIteration { get; } public bool ShouldExitCurrentIteration { get; } }
獲取ParallelLoopState
很容易:全部版本的For
和ForEach
都有重載能夠接受Action<TSource,ParallelLoopState>
類型的循環體。因此,若是要並行化:
foreach (char c in "Hello, world") if (c == ',') break; else Console.Write (c);
能夠使用:
Parallel.ForEach ("Hello, world", (c, loopState) => { if (c == ',') loopState.Break(); else Console.Write (c); });
輸出:
Hlloe
從結果中能夠發現,循環體會以隨機順序完成。除這點不一樣之外,調用Break
會給出與順序循環至少相同數量的元素:在上例中老是以必定順序至少輸出 H、e、l、l、o 這幾個字母。而若是改成調用Stop
,會強制全部線程在當前迭代完成後當即結束。在上例中,若是有些線程滯後了,調用Stop
可能給出 H、e、l、l、o 的子集。當發現已經找到了須要的東西時,或是發現出錯了不想看結果的狀況下,Stop
比較適用。
Parallel.For
和Parallel.ForEach
方法都返回一個ParallelLoopResult
對象,它暴露了IsCompleted
和LowestBreakIteration
屬性。它們能夠告知循環是否完成,若是沒有完成,是在哪一個迭代中斷的。
若是LowestBreakIteration
返回null
,意味着在循環中調用了Stop
(而不是Break
)。
若是你的循環體很長,可能會但願其它線程可以在執行中途中斷循環體,來讓使用Break
或Stop
時更快的退出。實現方法是,在代碼中多個地方查詢ShouldExitCurrentIteration
屬性,它會在調用Stop
後當即爲true
,或者是在Break
後很快爲true
。
ShouldExitCurrentIteration
在請求取消或者循環中有異常拋出時也會爲true
。
IsExceptional
屬性能夠告知其它線程上是否有異常產生。任何未處理的異常都會致使循環在全部線程完成當前迭代後結束:若是想要避免,必須在代碼中顯式處理異常。
Parallel.For
和Parallel.ForEach
都提供了擁有TLocal
泛型變量的重載。這是爲了協助你優化密集迭代的循環中的數據整理工做。最簡單的形式以下:
public static ParallelLoopResult For <TLocal> ( int fromInclusive, int toExclusive, Func <TLocal> localInit, Func <int, ParallelLoopState, TLocal, TLocal> body, Action <TLocal> localFinally);
這些方法在實際中不多用到,由於它們的目標場景基本都被PLINQ覆蓋了(好開森,由於這些重載真可怕!)。
本質上,問題在於:假設咱們要計算從 1 到 10,000,000 的平方根的和。並行計算一千萬個平方根很容易,可是求和是個問題,由於必須像這樣加鎖才能更新和值:
object locker = new object(); double total = 0; Parallel.For (1, 10000000, i => { lock (locker) total += Math.Sqrt (i); });
並行化的收益都被獲取一千萬個鎖的開銷抵消了,還不算致使的阻塞。
然而,實際上並不須要一千萬個鎖。想象一隊志願者撿一大堆垃圾的場景,若是你們都共享單獨一個垃圾桶,那衝突就會使整個過程極端低效。明顯的方案是每一個人都有本身「局部」的垃圾桶,偶爾去一趟主垃圾桶傾倒乾淨。
For
和ForEach
的TLocal
版本就是這樣工做的。志願者就是內部的工做線程,局部值(local value)就是局部垃圾桶。想要讓Parallel
以這種方式工做,那麼必須提供兩個額外的委託:
另外,循環體委託如今不能返回void
,而是應該返回局部值新的聚合結果。下面是重構後的例子:
object locker = new object(); double grandTotal = 0; Parallel.For (1, 10000000, () => 0.0, // 初始化局部值 (i, state, localTotal) => // 循環體委託。注意如今 localTotal + Math.Sqrt (i), // 返回新的局部值 localTotal => // 把局部值 { lock (locker) grandTotal += localTotal; } // 加入主值 );
咱們仍是須要鎖,可是隻須要鎖定將局部和加入總和的過程。這讓處理效率有了極大的提高。
前面說過,PLINQ 通常更適合這些場景。咱們的例子若是使用 PLINQ 來並行會很簡單:
ParallelEnumerable.Range(1, 10000000) .Sum (i => Math.Sqrt (i))
(注意咱們使用了ParallelEnumerable
來強制範圍分區:在這裏能夠提升性能,由於對全部數字的計算都是相等時間的。)
更復雜的場景中,你可能會用到 LINQ 的Aggregate
操做符而不是Sum
。若是指定了局部種子工廠,那狀況就和使用局部值的Parallel.For
差很少了。
任務並行(task parallelism)是 PFX 中最底層的並行方式。這一層次的類定義在System.Threading.Tasks
命名空間中,以下所示:
類 | 做用 |
---|---|
Task |
管理工做單元 |
Task<TResult> |
管理有返回值的工做單元 |
TaskFactory |
建立任務 |
TaskFactory<TResult> |
建立有相同返回類型的任務和任務延續 |
TaskScheduler |
管理任務調度 |
TaskCompletionSource |
手動控制任務的工做流 |
本質上,任務是用來管理可並行工做單元的輕量級對象。任務使用 CLR 的線程池來避免啓動獨立線程的開銷:它和ThreadPool.QueueUserWorkItem
使用的是同一個線程池,在 CLR 4.0 中這個線程池被調節過,讓Task
工做的更有效率(通常來講)。
須要並行執行代碼時均可以使用任務。然而,它們是爲了充分利用多核而調節的:事實上,Parallel
類和PLINQ內部就是基於任務並行構建的。
任務並不僅是提供了簡單高效的使用線程池的方式。它們還提供了一些強大的功能來管理工做單元,包括:
任務也實現了局部工做隊列(local work queues),這個優化可以讓你高效的建立不少快速執行的子任務,而不會帶來單一工做隊列會致使的競爭開銷。
TPL 可讓你使用極小的開銷建立幾百個(甚至幾千個)任務,但若是你要建立上百萬個任務,那須要把這些任務分紅大一些的工做單元纔能有效率。Parallel
類和 PLINQ 能夠自動實現這種工做分解。
Visual Studio 2010 提供了一個新的窗口來監視任務(調試 | 窗口 | 並行任務)。它和線程窗口相似,只是用於任務。並行棧窗口也有一個專門的模式用於任務。
如同咱們在第 1 部分線程池的討論中那樣,你能夠調用Task.Factory.StartNew
,並給它傳遞一個Action
委託來建立並啓動Task
:
Task.Factory.StartNew (() => Console.WriteLine ("Hello from a task!"));
泛型的版本Task<TResult>
(Task
的子類)可讓你在任務結束時得到返回的數據:
Task<string> task = Task.Factory.StartNew<string> (() => // 開始任務 { using (var wc = new System.Net.WebClient()) return wc.DownloadString ("http://www.linqpad.net"); }); RunSomeOtherMethod(); // 咱們能夠並行的作其它工做... string result = task.Result; // 等待任務結束並獲取結果
Task.Factory.StartNew
是一步建立並啓動任務。你也能夠分解它,先建立Task
實例,再調用Start
:
var task = new Task (() => Console.Write ("Hello")); // ... task.Start();
使用這種方式建立的任務也能夠同步運行(在當前線程上):使用RunSynchronously
替代Start
。
能夠使用Status
屬性來追蹤任務的執行狀態。
當建立任務實例或調用Task.Factory.StartNew
時,能夠指定一個狀態對象(state object),它會被傳遞給目標方法。若是你但願直接調用方法而不是 lambda 表達式,則能夠使用它。
static void Main() { var task = Task.Factory.StartNew (Greet, "Hello"); task.Wait(); // 等待任務結束 } static void Greet (object state) { Console.Write (state); } // 打印 "Hello"
由於 C# 中有 lambda 表達式,咱們能夠更好的使用狀態對象,用它來給任務賦予一個有意義的名字。而後就能夠使用AsyncState
屬性來查詢這個名字:
static void Main() { var task = Task.Factory.StartNew (state => Greet ("Hello"), "Greeting"); Console.WriteLine (task.AsyncState); // 打印 "Greeting" task.Wait(); } static void Greet (string message) { Console.Write (message); }
Visual Studio 會在並行任務窗口顯示每一個任務的AsyncState
屬性,因此指定有意義的名字能夠很大程度的簡化調試。
在調用StartNew
(或實例化Task
)時,能夠指定一個TaskCreationOptions
枚舉來調節線程的執行。TaskCreationOptions
是一個按位組合的枚舉,它有下列(可組合的)值:LongRunning
、PreferFairness
和AttachedToParent
。
LongRunning
向調度器建議爲任務使用一個獨立的線程。這對長時間運行的任務有好處,由於它們可能會「霸佔」隊列,強迫短期任務等待過長的時間後才能被調度。LongRunning
對於會阻塞的任務也有好處。
因爲任務調度器通常會試圖保持恰好足夠數量的任務在線程上運行,來保持全部 CPU 核心都工做。因此不要超額分配(oversubscribing) CPU,或者說不要使用過多的活動線程,以免因爲操做系統被迫進行大量耗時的時間切片和上下文切換致使的性能降低。
PreferFairness
讓調度器試圖確保任務以它們啓動的順序被調度。默認狀況下是使用另外一種方式,由於內部使用了局部工做竊取隊列來優化任務調度。這個優化對於很是小的(細粒度)任務有實際的好處。
AttachedToParent
用來建立子任務。
當一個任務啓動另外一個任務時,你能夠經過指定TaskCreationOptions.AttachedToParent
選擇性地創建父子關係:
Task parent = Task.Factory.StartNew (() => { Console.WriteLine ("I am a parent"); Task.Factory.StartNew (() => // 分離的任務 { Console.WriteLine ("I am detached"); }); Task.Factory.StartNew (() => // 子任務 { Console.WriteLine ("I am a child"); }, TaskCreationOptions.AttachedToParent); });
子任務的特殊之處在於,當你等待父任務結束時,也一樣會等待全部子任務。這對於子任務是一個延續任務時很是有用,稍後咱們會看到。
有兩種方式能夠顯式等待任務完成:
Wait
方法(可選擇指定超時時間)Result
屬性(當使用Task<TResult>
時)也能夠同時等待多個任務:經過靜態方法Task.WaitAll
(等待全部指定任務完成)和Task.WaitAny
(等待任意一個任務完成)。
WaitAll
和依次等待每一個任務相似,但它更高效,由於它只須要(至多)一次上下文切換。而且,若是有一個或多個任務拋出未處理的異常,WaitAll
仍然可以等待全部任務,並在以後從新拋出一個AggregateException
異常,它聚合了全部出錯任務的異常,功能至關於下面的代碼:
// 假設 t一、t2 和 t3 是任務: var exceptions = new List<Exception>(); try { t1.Wait(); } catch (AggregateException ex) { exceptions.Add (ex); } try { t2.Wait(); } catch (AggregateException ex) { exceptions.Add (ex); } try { t3.Wait(); } catch (AggregateException ex) { exceptions.Add (ex); } if (exceptions.Count > 0) throw new AggregateException (exceptions);
調用WaitAny
至關於在一個ManualResetEventSlim
上等待,每一個任務結束時都對它發信號。
除了使用超時時間,你也能夠傳遞一個取消標記給Wait
方法:這樣能夠取消等待。注意這不是取消任務。
當你等待一個任務結束時(經過調用Wait
方法或訪問其Result
屬性),全部未處理的異常都會用一個AggregateException
對象封裝,方便從新拋給調用方。通常就無需在任務代碼中處理異常,而是這麼作:
int x = 0; Task<int> calc = Task.Factory.StartNew (() => 7 / x); try { Console.WriteLine (calc.Result); } catch (AggregateException aex) { Console.Write (aex.InnerException.Message); // 試圖以 0 爲除數 }
你仍然須要對獨立的任務(無父任務而且沒有在等待它)進行異常處理,以避免當任務失去做用域被垃圾回收時(見如下注釋)有未處理的異常,那會致使程序結束。若是對任務的等待指定了超時時間,那也是如此,由於全部超時時間事後拋出的異常都是未處理的。
TaskScheduler.UnobservedTaskException
靜態事件提供了應對未處理的任務異常的最後手段。經過掛接這個事件,你就能夠攔截這些本來會致使程序結束的異常,而且使用本身的邏輯對它們進行處理。
對於有父子關係的任務,在父任務上等待也會隱式的等待子任務,全部子任務的異常也會傳遞出來。
TaskCreationOptions atp = TaskCreationOptions.AttachedToParent; var parent = Task.Factory.StartNew (() => { Task.Factory.StartNew (() => // 子 { Task.Factory.StartNew (() => { throw null; }, atp); // 孫 }, atp); }); // 下面的調用會拋出 NullReferenceException 異常 (封裝在 // 嵌套的 AggregateExceptions 中): parent.Wait();
有趣的是,若是你在任務拋出異常後檢查它的Exception
屬性,這個讀取屬性的動做會防止由於該異常致使程序結束。基本原則是:PFX 的設計者不但願你忽略異常,只要採起某種方式接收異常,就不會受到結束程序的懲罰。
任務中的未處理異常不會致使程序當即結束:它會延遲直到垃圾回收器處理到這個任務,並調用它的析構方法時。這個延遲是由於在進行垃圾回收前,還沒法判斷是否會調用Wait
,或檢查Result
或Exception
屬性。它有時也會誤導你對錯誤源頭的判斷(Visual Studio 的調試器若是開啓了在首個異常處中斷,能夠幫助進行判斷)。
立刻咱們會看處處理異常的另外一種策略,就是使用任務延續。
啓動任務時能夠可選的傳遞一個取消標記(cancellation token)。它可讓你經過協做取消模式取消任務,像以前描述的那樣:
var cancelSource = new CancellationTokenSource(); CancellationToken token = cancelSource.Token; Task task = Task.Factory.StartNew (() => { // 作些事情... token.ThrowIfCancellationRequested(); // 檢查取消請求 // 作些事情... }, token); // ... cancelSource.Cancel();
若是要檢測任務取消,能夠用以下方式捕捉AggregateException
,並檢查它的內部異常:
try { task.Wait(); } catch (AggregateException ex) { if (ex.InnerException is OperationCanceledException) Console.Write ("Task canceled!"); }
若是但願顯式的拋出OperationCanceledException
異常(而不是經過調用ThrowIfCancellationRequested
),那麼必須把取消標記傳遞給OperationCanceledException
的構造方法。若是不這麼作,這個任務就不會以TaskStatus.Canceled
狀態結束,而且也不會觸發使用OnlyOnCanceled
條件的任務延續。
若是任務在啓動前被取消,它就不會被調度,而是直接在任務中拋出OperationCanceledException
。
由於取消標記也能夠被其它 API 識別,因此能夠在其它構造中無縫使用:
var cancelSource = new CancellationTokenSource(); CancellationToken token = cancelSource.Token; Task task = Task.Factory.StartNew (() => { // 傳遞取消標記給 PLINQ 查詢: var query = someSequence.AsParallel().WithCancellation (token)... // ... enumerate query ... });
調用cancelSource
上的Cancel
方法就能夠取消該 PLINQ 查詢,它會在任務中拋出OperationCanceledException
異常,從而取消該任務。
也能夠給Wait
或CancelAndWait
這類方法傳遞取消標記,它可讓你取消等待操做,而不是任務自己。
有時,在一個任務完成(或失敗)後立刻啓動另外一個任務會頗有用。Task
類上的ContinueWith
方法正是實現了這種功能:
Task task1 = Task.Factory.StartNew (() => Console.Write ("antecedant..")); Task task2 = task1.ContinueWith (ant => Console.Write ("..continuation"));
一旦task1
(前項,antecedent)完成、失敗或取消,task2
(延續,continuation)會自動啓動。(若是task1
在運行第二行代碼前已經結束,那麼task2
會被當即調度執行。)傳遞給延續的 lambda 表達式的ant
參數是對前項任務的引用。
咱們的例子演示了最簡單的延續,它和如下代碼功能相似:
Task task = Task.Factory.StartNew (() => { Console.Write ("antecedent.."); Console.Write ("..continuation"); });
可是經過延續的方式能夠更加靈活,好比先等待task1
完成,以後再等待task2
。若是task1
返回數據,這樣就很是有用。
另外一個(不明顯的)差別是:默認狀況下,前項和延續任務多是在不一樣的線程上執行。你能夠在調用ContinueWith
時指定TaskContinuationOptions.ExecuteSynchronously
來強制它們在同一個線程執行:若是延續是很是細粒度的,這樣作能夠經過減小開銷來提高性能。
像普通任務同樣,延續也能夠使用Task<TResult>
類型並返回數據。下面的例子中,咱們使用鏈狀任務來計算Math.Sqrt(8*2)
並打印結果:
Task.Factory.StartNew<int> (() => 8) .ContinueWith (ant => ant.Result * 2) .ContinueWith (ant => Math.Sqrt (ant.Result)) .ContinueWith (ant => Console.WriteLine (ant.Result)); // 4
咱們的例子比較簡單,實際應用中,這些 lambda 表達式可能會調用計算密集型的方法。
延續能夠經過前項的Exception
屬性來獲取前項拋出的異常。下面的代碼會輸出NullReferenceException
信息:
Task task1 = Task.Factory.StartNew (() => { throw null; }); Task task2 = task1.ContinueWith (ant => Console.Write (ant.Exception));
若是前項拋出了異常但延續沒有檢查前項的Exception
屬性(而且也沒有在等待前項),那麼異常會被認爲是未處理的,就會致使程序結束(除非使用TaskScheduler.UnobservedTaskException
進行了處理)。
安全的模式是從新拋出前項的異常。只要延續被Wait
等待,異常就可以傳播並從新拋出給等待方。
Task continuation = Task.Factory.StartNew (() => { throw null; }) .ContinueWith (ant => { if (ant.Exception != null) throw ant.Exception; // 繼續處理... }); continuation.Wait(); // 異常被拋回調用方
另外一種處理異常的方法是爲異常和正常狀況指定不一樣的延續。須要用到TaskContinuationOptions
:
Task task1 = Task.Factory.StartNew (() => { throw null; }); Task error = task1.ContinueWith (ant => Console.Write (ant.Exception), TaskContinuationOptions.OnlyOnFaulted); Task ok = task1.ContinueWith (ant => Console.Write ("Success!"), TaskContinuationOptions.NotOnFaulted);
下面的擴展方法會「吞掉」任務的未處理異常:
public static void IgnoreExceptions (this Task task) { task.ContinueWith (t => { var ignore = t.Exception; }, TaskContinuationOptions.OnlyOnFaulted); }
(能夠添加對異常的日誌記錄來進一步改進它。)如下是用法:
Task.Factory.StartNew (() => { throw null; }).IgnoreExceptions();
延續的一個強大功能是它僅在全部子任務都完成時纔會啓動。這時,全部子任務拋出的異常都會被封送給延續。
接下來的例子中,咱們啓動三個子任務,每一個都拋出NullReferenceException
。而後使用父任務的延續來一次性捕捉這些異常:
TaskCreationOptions atp = TaskCreationOptions.AttachedToParent; Task.Factory.StartNew (() => { Task.Factory.StartNew (() => { throw null; }, atp); Task.Factory.StartNew (() => { throw null; }, atp); Task.Factory.StartNew (() => { throw null; }, atp); }) .ContinueWith (p => Console.WriteLine (p.Exception), TaskContinuationOptions.OnlyOnFaulted);
默認狀況下,延續是被無條件調度的,也就是說不管前項是完成、拋出異常仍是取消,延續都會執行。你能夠經過設置TaskContinuationOptions
枚舉中的標識(可組合)來改變這種行爲。三種控制條件延續的核心標識是:
NotOnRanToCompletion = 0x10000, NotOnFaulted = 0x20000, NotOnCanceled = 0x40000,
這些標識是作減法的,也就是組合的越多,延續越不可能被執行。爲了方便使用,也提供瞭如下預先組合好的值:
OnlyOnRanToCompletion = NotOnFaulted | NotOnCanceled, OnlyOnFaulted = NotOnRanToCompletion | NotOnCanceled, OnlyOnCanceled = NotOnRanToCompletion | NotOnFaulted
(組合全部Not*
標識[NotOnRanToCompletion, NotOnFaulted, NotOnCanceled]
沒有意義,這會致使延續始終被取消。)
RanToCompletion
表明前項成功完成,沒有被取消,也沒有未處理的異常。
Faulted
表明前項中有未處理的異常拋出。
Canceled
表明如下兩種狀況之一:
前項經過其取消標記被取消。換句話說,OperationCanceledException
在前項中拋出,它的CancellationToken
屬性與啓動時傳遞給前項的標記取消匹配。
前項被隱式的取消,由於沒法知足指定的延續條件。
特別須要注意的是,若是這些標識致使延續沒法執行,延續並非被忘記或拋棄,而是被取消。這意味着全部延續任務上的延續就會開始運行,除非你指定了NotOnCanceled
。例如:
Task t1 = Task.Factory.StartNew (...); Task fault = t1.ContinueWith (ant => Console.WriteLine ("fault"), TaskContinuationOptions.OnlyOnFaulted); Task t3 = fault.ContinueWith (ant => Console.WriteLine ("t3"));
像以前說的同樣,t3
始終會被調度,即便是t1
沒有拋出異常也是如此。由於t1
成功完成,fault
任務會被取消,而t3
上並無定義任何限制延續的條件,因此t3
就會被無條件執行。
若是但願僅在fault
真正運行的狀況下執行t3
,須要把代碼改爲:
Task t3 = fault.ContinueWith (ant => Console.WriteLine ("t3"), TaskContinuationOptions.NotOnCanceled);
(此外,也能夠指定OnlyOnRanToCompletion
,不一樣之處就是t3
在fault
拋出異常的狀況下不會執行。)
延續的另外一個有用的功能是它能夠在多個前項完成後調度執行。ContinueWhenAll
是在多個前項都完成後調度,而ContinueWhenAny
是在任意一個前項完成後調度。這兩個方法都定義在TaskFactory
類上:
var task1 = Task.Factory.StartNew (() => Console.Write ("X")); var task2 = Task.Factory.StartNew (() => Console.Write ("Y")); var continuation = Task.Factory.ContinueWhenAll ( new[] { task1, task2 }, tasks => Console.WriteLine ("Done"));
上面的例子會在打印 「 XY 「 或 「 YX 「 以後打印 「 Done 「。Lambda 表達式中的tasks
參數能夠用來訪問完成的任務數組,當前項返回數據時能夠用到。下面的例子對兩個前項返回的數字求和:
// 真實場景中 task1 和 task2 可能調用複雜的功能: Task<int> task1 = Task.Factory.StartNew (() => 123); Task<int> task2 = Task.Factory.StartNew (() => 456); Task<int> task3 = Task<int>.Factory.ContinueWhenAll ( new[] { task1, task2 }, tasks => tasks.Sum (t => t.Result)); Console.WriteLine (task3.Result); // 579
在這個例子中,咱們使用了<int>
類型參數來調用Task.Factory
是爲了演示得到了一個泛型的任務工廠。這個類型參數不是必須的,它能夠被編譯器推斷。
對一個任務調用一次以上的ContinueWith
會建立單前項的多個延續。當該前項完成時,全部延續會一塊兒啓動(除非指定了TaskContinuationOptions.ExecuteSynchronously
,這會致使延續順序執行)。
下面的代碼會等待一秒,而後打印 「 XY 「 或者 「 YX 「:
var t = Task.Factory.StartNew (() => Thread.Sleep (1000)); t.ContinueWith (ant => Console.Write ("X")); t.ContinueWith (ant => Console.Write ("Y"));
任務調度器(task scheduler)爲任務分配線程,其由抽象類TaskScheduler
類表明,全部任務都會和一個任務調度器關聯。Framework 提供了兩種具體實現:默認調度器(default scheduler)是使用 CLR 線程池工做,還有同步上下文調度器(synchronization context scheduler),它(主要)是爲了對於使用 WPF 和 Windows Forms 的場景提供幫助,這裏的線程模型須要 UI 控件只能在建立它們的線程上訪問。例如,假設咱們須要在後臺從一個 web 服務獲取數據,而後使用它更新一個叫作lblResult
的 WPF 標籤。這能夠分解爲兩個任務:
lblResult
(延續任務)。若是對延續任務指定了窗口建立時獲取的同步上下文調度器,那麼就能夠安全的更新lblResult
:
public partial class MyWindow : Window { TaskScheduler _uiScheduler; // 定義一個字段以便於 // 在類中使用 public MyWindow() { InitializeComponent(); // 從建立窗口的線程獲取 UI 調度器: _uiScheduler = TaskScheduler.FromCurrentSynchronizationContext(); Task.Factory.StartNew<string> (SomeComplexWebService) .ContinueWith (ant => lblResult.Content = ant.Result, _uiScheduler); } string SomeComplexWebService() { ... } }
也能夠實現本身的任務調度器(經過繼承TaskScheduler
),可是通常只會在很是特殊的場景下才會這麼作。對於自定義調度,須要常用TaskCompletionSource
,咱們立刻會講到。
當調用Task.Factory
時,就是經過Task
上的靜態屬性獲取了默認的TaskFactory
對象。這個任務工廠的做用就是建立任務,具體的說,有三種任務:
StartNew
)ContinueWhenAll
和ContinueWhenAny
)FromAsync
)有趣的是,TaskFactory
是建立後兩種任務的惟一方法。而對於StartNew
,TaskFactory
純粹是爲了方便,技術上說是多餘的,這徹底等同於建立Task
對象而後調用其Start
方法。
TaskFactory
不是抽象工廠:你能夠實例化這個類,在但願重複使用一樣的(非默認的)TaskCreationOptions
值、TaskContinuationOptions
值或者TaskScheduler
時有用。例如,若是但願重複建立長時間運行的子任務,咱們能夠這樣建立一個自定義工廠:
var factory = new TaskFactory ( TaskCreationOptions.LongRunning | TaskCreationOptions.AttachedToParent, TaskContinuationOptions.None);
而後建立任務就能夠僅調用這個工廠上的StartNew
:
Task task1 = factory.StartNew (Method1); Task task2 = factory.StartNew (Method2); // ...
在調用ContinueWhenAll
和ContinueWhenAny
時,自定義的延續選項會被應用。
Task
類作了兩件事情:
有趣的是,這兩件事能夠是分離的:能夠只利用任務的管理工做項的功能而不讓它調度到線程池上運行。TaskCompletionSource
類開啓了這個模式。
使用TaskCompletionSource
時,就建立它的實例。它暴露一個Task
屬性來返回一個任務,你能夠對其等待或附加延續,就和對通常的任務同樣。然而這個任務能夠經過TaskCompletionSource
對象的下列方法進行徹底控制:
public class TaskCompletionSource<TResult> { public void SetResult (TResult result); public void SetException (Exception exception); public void SetCanceled(); public bool TrySetResult (TResult result); public bool TrySetException (Exception exception); public bool TrySetCanceled(); // ... }
若是調用屢次,SetResult
、SetException
和SetCanceled
會拋出異常,而Try*
方法會返回false
。
TResult
對應任務的返回類型,因此TaskCompletionSource<int>
會給你一個Task<int>
。若是須要不返回結果的任務,能夠使用object
類型來建立TaskCompletionSource
,並在調用SetResult
時傳遞null
。能夠把Task<object>
轉換爲Task
類型來使用。
下面的代碼在等待五秒以後打印 「 123 「:
var source = new TaskCompletionSource<int>(); new Thread (() => { Thread.Sleep (5000); source.SetResult (123); }) .Start(); Task<int> task = source.Task; // 咱們的「奴隸」任務 Console.WriteLine (task.Result); // 123
稍後,咱們會展現使用如何BlockingCollection
來寫一個生產者 / 消費者隊列。而後會演示使用TaskCompletionSource
來改進這個方案,它能夠使隊列中的工做項能夠被等待和取消。
如前所屬,PLINQ、Parallel
類和Task
都會自動封送異常給使用者。爲了明白這麼作的重要性,考慮如下 LINQ 查詢,它在第一次迭代時會拋出DivideByZeroException
:
try { var query = from i in Enumerable.Range (0, 1000000) select 100 / i; // ... } catch (DivideByZeroException) { // ... }
若是咱們使用 PLINQ 來並行化查詢而假設它並無進行異常處理,那麼DivideByZeroException
可能會在一個線程中被拋出,就會無視catch
塊從而致使程序結束。
所以,異常會被自動捕捉並從新拋給調用方。然而不幸的是,狀況並非就像捕捉一個DivideByZeroException
那般簡單。由於這些類庫會利用不少線程,極可能有兩個或更多的異常被同時拋出。爲了確保可以報告全部異常,就使用了AggregateException
做爲容器來封裝它們,並經過InnerExceptions
屬性來暴露:
try { var query = from i in ParallelEnumerable.Range (0, 1000000) select 100 / i; // 對查詢進行枚舉 // ... } catch (AggregateException aex) { foreach (Exception ex in aex.InnerExceptions) Console.WriteLine (ex.Message); }
PLINQ 和Parallel
類都會在遇到第一個異常時中止查詢或循環執行,它使用的方式是不處理以後的元素或循環體。而在本輪循環結束前,還有可能拋出更多的異常。第一個異常能夠經過AggregateException
上的InnerException
屬性獲取。
AggregateException
類提供了一對方法來簡化異常處理:Flatten
和Handle
。
AggregateException
常常會包含其它的AggregateException
。好比在子任務拋出異常時就可能如此。你能夠經過調用Flatten
來消除任意層級的嵌套以簡化處理。這個方法會返回一個新的AggregateException
,它的InnerExceptions
就是展平以後的結果:
catch (AggregateException aex) { foreach (Exception ex in aex.Flatten().InnerExceptions) myLogWriter.LogException (ex); }
有時只須要捕捉特定類型的異常,並從新拋出其它類型的異常。AggregateException
上的Handle
方法提供了一個快捷方案。它接受一個異常斷定器,來對全部封裝的異常進行斷定:
public void Handle (Func<Exception, bool> predicate)
若是斷定器返回true
,則該異常被認爲是「已處理」。對於全部異常都運行斷定以後,接下來會發生:
true
),則不會從新拋出異常。false
(「未處理」),則會生成一個新的AggregateException
來封裝這些異常,並從新拋出。例如,下面的代碼最後會從新拋出一個AggregateException
,而且其中僅包含一個NullReferenceException
:
var parent = Task.Factory.StartNew (() => { // 咱們使用 3 個子任務同時拋出 3 個異常: int[] numbers = { 0 }; var childFactory = new TaskFactory (TaskCreationOptions.AttachedToParent, TaskContinuationOptions.None); childFactory.StartNew (() => 5 / numbers[0]); // 除數爲零 childFactory.StartNew (() => numbers [1]); // 索引越界 childFactory.StartNew (() => { throw null; }); // 空引用 }); try { parent.Wait(); } catch (AggregateException aex) { aex.Flatten().Handle (ex => // 注意這裏仍是須要調用 Flatten { if (ex is DivideByZeroException) { Console.WriteLine ("Divide by zero"); return true; // 該異常「已處理」 } if (ex is IndexOutOfRangeException) { Console.WriteLine ("Index out of range"); return true; // 該異常「已處理」 } return false; // 其它全部異常會被從新拋出 }); }
Framework 4.0 在System.Collections.Concurrent
命名空間中提供了一組新的集合。它們都是徹底線程安全的:
併發集合 | 對應的非併發集合 |
---|---|
ConcurrentStack<T> |
Stack<T> |
ConcurrentQueue<T> |
Queue<T> |
ConcurrentBag<T> |
( none ) |
BlockingCollection<T> |
( none ) |
ConcurrentDictionary<TKey,TValue> |
Dictionary<TKey,TValue> |
在通常的多線程場景中,須要線程安全的集合時可能會用到這些併發集合。可是,有些注意事項:
List<T>
的併發版本。Stack
和Queue
類,可是這對於併發訪問更好,由於鏈表有助於實現無鎖或更少的鎖。(這是由於向鏈表中插入一個節點只須要更新兩個引用,而對於List<T>
這種結構插入一個元素可能須要移動幾千個已存在的元素。)換句話說,這些集合並非提供了加鎖使用普通集合的快捷辦法。爲了演示這一點,若是咱們在單一線程上執行如下代碼:
var d = new ConcurrentDictionary<int,int>(); for (int i = 0; i < 1000000; i++) d[i] = 123;
它會比下面的代碼慢三倍:
var d = new Dictionary<int,int>(); for (int i = 0; i < 1000000; i++) lock (d) d[i] = 123;
(可是對ConcurrentDictionary
讀取會更快,由於讀是無鎖的。)
併發集合與普通集合的另外一個不一樣之處是它們暴露了一些特殊的方法,來進行原子的檢查並行動(test-and-act)的操做,例如TryPop
。這些方法中的大部分都是由IProducerConsumerCollection<T>
接口統一的。
生產者 / 消費者集合有兩個主要用例:
典型的例子是棧和隊列。生產者 / 消費者集合在並行編程中很是重要,由於它有助於高效的無鎖實現。
IProducerConsumerCollection<T>
接口表明了線程安全的生產者 / 消費者集合。如下類實現了該接口:ConcurrentStack<T>
、ConcurrentQueue<T>
和ConcurrentBag<T>
。
IProducerConsumerCollection<T>
擴展自ICollection
,並加入瞭如下方法:
void CopyTo (T[] array, int index); T[] ToArray(); bool TryAdd (T item); bool TryTake (out T item);
TryAdd
和TryTake
方法檢查是否能進行添加 / 移除操做,若是能夠,就進行添加 / 移除。檢查和操做是原子的,因此無需像普通集合那樣使用鎖:
int result; lock (myStack) if (myStack.Count > 0) result = myStack.Pop();
TryTake
在集合爲空時返回false
。TryAdd
在三種實現中都總會成功並返回true
。而若是你要寫本身的不容許重複元素的併發集合,就能夠在元素已存在時讓TryAdd
返回false
(好比本身寫併發集(set))。
TryTake
移除的具體元素是在子類中定義的:
TryTake
移除最新添加的元素。TryTake
移除最先添加的元素。TryTake
移除能夠最快移除的元素。這三個具體類基本都是顯式實現了TryTake
和TryAdd
方法,也經過更具體的的名字暴露了一樣的功能,好比TryDequeue
和TryPop
。
ConcurrentBag<T>
用來存儲一組無序的對象(容許重複)。它適用於你不關心調用Take
或TryTake
會返回哪一個元素的場景。
ConcurrentBag<T>
相比並發隊列和棧的好處是它的Add
方法被不少線程同時調用時幾乎沒有競爭衝突。而對於併發隊列和棧,並行調用Add
會有一些競爭衝突(可是比對非併發集合加鎖的方式要小得多)。併發包的Take
方法也很是高效,只要每一個線程不要拿出比它添加的數量更多的元素。
在併發包的內部,每個線程都有其私有的鏈表。元素會加入到調用Add
的線程對應的私有鏈表中,就消除了競爭衝突。在對包進行枚舉時,枚舉器會遍歷全部線程的私有鏈表,返回其中的每個元素。
調用Take
時,包會首先檢查當前線程的私有鏈表。若是其中有至少一個元素,就能夠沒有衝突的輕鬆完成任務(大多數狀況都是如此)。可是若是鏈表沒有元素,它就必須從其它線程的私有鏈表中「偷」一個元素,就可能致使競爭衝突。
因此,準確的說,調用Take
會返回當前線程最新添加的元素,若是當前線程沒有對應的元素,就會隨機取一個其它線程,返回它最新添加的元素。
若是你的並行操做基本都是在添加元素,或者每一個線程的Add
和Take
是平衡的,那麼使用併發包就很理想。咱們來看前面的一個例子,是使用Parallel.ForEach
來實現並行拼寫檢查:
var misspellings = new ConcurrentBag<Tuple<int,string>>(); Parallel.ForEach (wordsToTest, (word, state, i) => { if (!wordLookup.Contains (word)) misspellings.Add (Tuple.Create ((int) i, word)); });
對於實現生產者 / 消費者隊列,併發包就不是一個好的選擇,由於元素是在不一樣的線程進行添加和移除的。
若是在ConcurrentStack<T>
、ConcurrentQueue<T>
和ConcurrentBag<T>
這些生產者 / 消費者集合上調用TryTake
時,集合爲空,該方法會返回false
。這種場景下,有時可能等待一個元素被添加會更有用。
與其重載TryTake
方法來實現這個功能(若是還要容許取消和超時就可能須要大量成員),不如使用 PFX 的設計者已經實現好的BlockingCollection<T>
類。阻塞集合能夠封裝任意實現了IProducerConsumerCollection<T>
接口的對象,就能夠調用這個封裝上面的Take
方法,它在沒有元素時會阻塞。
阻塞集合也可讓你限制集合的大小,若是超過限制就阻塞生產者。這樣限制了大小的集合被稱爲有界阻塞集合(bounded blocking collection)。
使用BlockingCollection<T>
時:
IProducerConsumerCollection<T>
來封裝,還有集合的最大大小(上界)。Add
或TryAdd
來對底層集合添加元素。Take
或TryTake
來移除(消費)底層集合中的元素。若是調用構造方法的時候沒有指定目標集合,就會自動使用一個ConcurrentQueue<T>
的實例。進行生成和消費的方法均可以指定取消標記和超時時間。Add
和TryAdd
在集合有界時可能會阻塞,Take
和TryTake
在集合爲空時會阻塞。
另外一種消費元素的方式是調用GetConsumingEnumerable
。它會返回一個(可能)無限的序列,當有元素時就能夠返回它。你能夠調用CompleteAdding
來強行結束這個序列,它也會阻止以後再添加元素。
前面咱們寫過一個使用 Wait
和 Pulse
的生產者 / 消費者隊列。這裏使用BlockingCollection<T>
來重構同一個類(不考慮異常處理):
public class PCQueue : IDisposable { BlockingCollection<Action> _taskQ = new BlockingCollection<Action>(); public PCQueue (int workerCount) { // 爲每一個消費者建立並啓動單獨的任務: for (int i = 0; i < workerCount; i++) Task.Factory.StartNew (Consume); } public void Dispose() { _taskQ.CompleteAdding(); } public void EnqueueTask (Action action) { _taskQ.Add (action); } void Consume() { // 沒有元素時,對序列的枚舉就會被阻塞, // 而調用 CompleteAdding 能夠結束枚舉。 foreach (Action action in _taskQ.GetConsumingEnumerable()) action(); // 進行任務 } }
由於沒有給BlockingCollection
的構造方法傳遞任何參數,因此會自動建立一個併發隊列。而若是傳遞一個ConcurrentStack
,咱們就會獲得生產者 / 消費者棧。
BlockingCollection
還提供了AddToAny
和TakeFromAny
這些靜態方法,它們可讓你對指定的多個阻塞集合進行添加或移除元素。操做會對第一個可以進行操做的集合進行。
咱們以前實現的生產者 / 消費者模式還不夠靈活,由於工做項添加後沒法追蹤它們。若是可以實現如下功能會更好:
理想的解決方案是讓EnqueueTask
方法返回一個對象,來提供咱們上面描述的功能。好消息是這個類已經存在,正是Task
類。咱們須要作的只是經過TaskCompletionSource
來操控它:
public class PCQueue : IDisposable { class WorkItem { public readonly TaskCompletionSource<object> TaskSource; public readonly Action Action; public readonly CancellationToken? CancelToken; public WorkItem ( TaskCompletionSource<object> taskSource, Action action, CancellationToken? cancelToken) { TaskSource = taskSource; Action = action; CancelToken = cancelToken; } } BlockingCollection<WorkItem> _taskQ = new BlockingCollection<WorkItem>(); public PCQueue (int workerCount) { // 爲每一個消費者建立並啓動單獨的任務: for (int i = 0; i < workerCount; i++) Task.Factory.StartNew (Consume); } public void Dispose() { _taskQ.CompleteAdding(); } public Task EnqueueTask (Action action) { return EnqueueTask (action, null); } public Task EnqueueTask (Action action, CancellationToken? cancelToken) { var tcs = new TaskCompletionSource<object>(); _taskQ.Add (new WorkItem (tcs, action, cancelToken)); return tcs.Task; } void Consume() { foreach (WorkItem workItem in _taskQ.GetConsumingEnumerable()) if (workItem.CancelToken.HasValue && workItem.CancelToken.Value.IsCancellationRequested) { workItem.TaskSource.SetCanceled(); } else try { workItem.Action(); workItem.TaskSource.SetResult (null); // 表示完成 } catch (OperationCanceledException ex) { if (ex.CancellationToken == workItem.CancelToken) workItem.TaskSource.SetCanceled(); else workItem.TaskSource.SetException (ex); } catch (Exception ex) { workItem.TaskSource.SetException (ex); } } }
在EnqueueTask
中,咱們入隊一個工做項,它封裝了目標委託和任務完成源,從而讓咱們以後能夠控制返回給消費者的任務。
在Consume
中,咱們在出隊一個工做項後先檢查任務是否被取消。若是沒有,就運行委託而後調用任務完成源上的SetResult
來表示任務完成。
下面是如何使用這個類:
var pcQ = new PCQueue (1); Task task = pcQ.EnqueueTask (() => Console.WriteLine ("Easy!")); // ...
咱們如今能夠對task
等待、附加延續、讓延續中的異常傳播給父任務等等。換句話說,咱們得到了任務模型的豐富功能,同時也至關於自行實現了一個調度器。
在並行編程中,短暫的自旋常常比阻塞更好,由於它避免了上下文切換和內核模式轉換的開銷。SpinLock
和SpinWait
被設計用來在這種場景下提供幫助。它們的主要用途是實現自定義的同步構造。
SpinLock
和SpinWait
是結構體而不是類!這個設計是一種避免間址和垃圾回收的極限優化技術。它意味着你必須當心,不能不經意地複製了實例,好比不使用ref
修飾符把它們傳遞給另外一個方法,或者把它們定義成了readonly
的字段。這在使用SpinLock
時十分重要。
SpinLock
結構體可讓你進行鎖定,而無需上下文切換的開銷,它的代價是保持一個線程自旋(空忙)。這種方式適用於高競爭的場景下鎖定很是短暫的狀況(好比,從頭寫一個線程安全的鏈表)。
若是讓自旋鎖等待的過久(最可能是幾毫秒),它會和普通的鎖同樣出讓其時間片,致使上下文切換。再被從新調度後,它會繼續出讓,就這樣不斷的「自旋出讓」。這比徹底使用自旋消耗的 CPU 資源要少得多,可是比阻塞要高。
在單核的機器上,自旋鎖在遇到競爭時會當即開始「自旋出讓」。
使用SpinLock
和普通的鎖差很少,除了如下幾點:
SpinLock
上的Enter
方法。若是違反了這條規則,要否則會拋出異常(啓用全部者追蹤(owner tracking)時),要否則會死鎖(禁用全部者追蹤時)。在構造自旋鎖時,能夠指定是否啓用全部者追蹤,啓用會影響性能。SpinLock
可讓你經過IsHeld
屬性查詢鎖是否已被獲取,若是啓用了全部者追蹤,那麼使用IsHeldByCurrentThread
屬性。lock
語句那樣的語法糖來簡化SpinLock
的使用。另外一個不一樣之處是當調用Enter
時,你必須遵循提供 lockTaken
參數的健壯模式(幾乎老是使用try / finally
一塊兒實現)。
下面是個例子:
var spinLock = new SpinLock (true); // 啓用全部者追蹤 bool lockTaken = false; try { spinLock.Enter (ref lockTaken); // 作些事情... } finally { if (lockTaken) spinLock.Exit(); }
和普通的鎖同樣,當(且僅當)Enter
方法拋出異常而且鎖沒有被獲取時,lockTaken
會爲false
。這種場景很是罕見(當調用該線程的Abort
,或者OutOfMemoryException
異常被拋出時),但可讓你肯定以後是否須要調用Exit
。
SpinLock
也提供了接受超時時間的TryEnter
方法。
因爲SpinLock
笨拙的值類型語義和缺少語法支持,幾乎每次想用它都是受罪!在替換掉普通的鎖前請三思。
SpinLock
在須要寫本身的可重用同步構造時最有意義。即使如此,自旋鎖也不像看上去那麼有用。它仍然限制了併發。而且會什麼都不作的浪費 CPU 時間。常常更好的選擇都是把時間花在一些「投機」的事情上,並使用SpinWait
來輔助。(譯者注:這裏「投機」是指先進行操做並檢測搶佔,若是發現被搶佔就重試,詳見SpinWait
)
SpinWait
能夠幫助實現無鎖的代碼,使用自旋而非阻塞。它實現了安全措施來避免普通自旋可能會形成的資源飢餓和優先級倒置。
使用SpinWait
的無鎖編程是多線程中最難的,它是爲了應對沒有其它高層構造能夠使用的場景。先決條件是理解非阻塞同步。
假設咱們寫了一個純粹基於一個簡單標識的自旋信號系統:
bool _proceed; void Test() { // 自旋直到其它線程把 _proceed 設置爲 true: while (!_proceed) Thread.MemoryBarrier(); // ... }
若是Test
運行時_proceed
已經爲true
,或者幾回循環內就能變爲true
,那麼這個實現就會很高效。可是如今假設_proceed
在幾秒內保持false
,而且有四個線程同時調用Test
。這個自旋就會徹底佔用一個四核的 CPU!這會致使其它線程運行緩慢(資源飢餓),包括那個會把_proceed
設置爲true
的線程(優先級倒置)。在單核機器上,情況會進一步惡化,由於自旋幾乎老是致使優先級倒置。(雖然如今單核機器已經不多見了,但是單核的虛擬機並很多。)
SpinWait
使用兩種方法解決這個問題。首先,它會限制消耗 CPU 的自旋,在通過必定次數的自旋後,就會每次循環都出讓其時間片(經過調用Thread.Yield
和 Thread.Sleep
),從而減小資源消耗。其次,它會檢測是不是在單核機器上運行,若是是,就會每次循環都出讓其時間片。
有兩種方式使用SpinWait
。第一種是調用靜態方法SpinUntil
。這個方法接受一個斷定器(和一個可選的超時時間):
bool _proceed; void Test() { SpinWait.SpinUntil (() => { Thread.MemoryBarrier(); return _proceed; }); // ... }
另外一種(更靈活)的方式是建立SpinWait
結構體的實例,並在循環中調用SpinOnce
:
bool _proceed; void Test() { var spinWait = new SpinWait(); while (!_proceed) { Thread.MemoryBarrier(); spinWait.SpinOnce(); } // ... }
前者就是使用後者提供的快捷方式。
在當前的實現中,SpinWait
會在出讓以前進行 10 次消耗 CPU 的迭代。但它並不會在每次迭代後當即返回調用方,而是調用Thread.SpinWait
來 經過 CLR(最終是經過操做系統)再自旋必定時間。這個時間最初是幾十納秒,每次迭代都會加倍,直到 10 次迭代結束。這在必定程度上保證了消耗 CPU 的自旋階段的總時間的可預測性,CLR 和操做系統能夠根據狀況來調節。通常來講,這會在幾十微秒的區間,很小,可是要大於上下文切換的開銷。
在單核機器上,SpinWait
每次迭代都會出讓。能夠經過NextSpinWillYield
屬性來檢查SpinWait
在下一次自旋時會不會出讓。
若是SpinWait
在自旋出讓模式保持了好久(大概 20 次),就會按期Sleep
幾微秒來進一步節約資源給其它線程使用。
結合SpinWait
和Interlocked.CompareExchange
能夠原子的更新一個經過本身的值進行計算的字段(讀 - 改 - 寫)。例如,假設咱們要把字段 x 乘 10。非線程安全的簡單代碼就是:
x = x * 10;
它不是線程同步的緣由就和咱們在非阻塞同步中看到的對字段自增不是線程同步的緣由同樣。
正確的無鎖方式以下:
Interlocked.CompareExchange
實現)。例如:
int x; void MultiplyXBy (int factor) { var spinWait = new SpinWait(); while (true) { int snapshot1 = x; Thread.MemoryBarrier(); int calc = snapshot1 * factor; int snapshot2 = Interlocked.CompareExchange (ref x, calc, snapshot1); if (snapshot1 == snapshot2) return; // 沒有被搶佔 spinWait.SpinOnce(); } }
咱們能夠去掉對Thread.MemoryBarrier
的調用來略微提升性能。這是由於CompareExchange
也會生成內存屏障。最壞的狀況就是若是snapshot1
在第一次迭代時就讀取了一個過時的值,那麼會多進行一次自旋。
Interlocked.CompareExchange
是在字段的當前值與第三個參數相等時使用指定的值來更新字段。它會返回字段的舊值,就能夠用來與原快照比較,檢查是否過時。若是值不相等,意味着被另外一個線程搶佔,就須要自旋重試。
CompareExchange
也有重載能夠對於object
類型使用。咱們能夠利用這個重載來實現對全部引用類型的無鎖更新方法:
static void LockFreeUpdate<T> (ref T field, Func <T, T> updateFunction) where T : class { var spinWait = new SpinWait(); while (true) { T snapshot1 = field; T calc = updateFunction (snapshot1); T snapshot2 = Interlocked.CompareExchange (ref field, calc, snapshot1); if (snapshot1 == snapshot2) return; spinWait.SpinOnce(); } }
下面是如何使用這個方法來寫一個無鎖的線程安全的事件(實際上,這是 C# 4.0 的編譯器對於事件默認的處理):
EventHandler _someDelegate; public event EventHandler SomeEvent { add { LockFreeUpdate (ref _someDelegate, d => d + value); } remove { LockFreeUpdate (ref _someDelegate, d => d - value); } }
咱們也能夠經過把對共享的字段的訪問放進SpinLock
裏來解決上面的問題。問題是自旋鎖同一時間只容許一個線程進入,儘管它(一般)可以消除上下文切換的開銷。而使用SpinWait
時,咱們能夠假設沒有競爭,投機的運行。若是被搶佔就重試。花費 CPU 時間作事情也許比在自旋鎖中浪費 CPU 時間好!
最後,考慮下面的類:
class Test { ProgressStatus _status = new ProgressStatus (0, "Starting"); class ProgressStatus // 不可變類 { public readonly int PercentComplete; public readonly string StatusMessage; public ProgressStatus (int percentComplete, string statusMessage) { PercentComplete = percentComplete; StatusMessage = statusMessage; } } }
咱們能夠使用LockFreeUpdate
方法來增長_status
的PercentComplete
字段的值:
LockFreeUpdate (ref _status, s => new ProgressStatus (s.PercentComplete + 1, s.StatusMessage));
注意咱們基於現有值建立了新的ProgressStatus
對象。要感謝LockFreeUpdate
方法,讀取PercentComplete
的值、增長它並寫回的操做不會被不安全的搶佔:任何搶佔均可以被可靠的檢測到,觸發自旋重試。