原做者: Pamela Vagata, Parallel Computing Platform Group, Microsoft Corporationweb
原文pdf:http://download.csdn.NET/detail/sqlchen/7509513算法
====================================================================sql
當須要爲多核機器進行優化的時候,最好先檢查下你的程序是否有處理可以分割開來進行並行處理。(例如,有一個巨大的數據集合,其中的元素須要一個一個進行彼此獨立的耗時計算)。編程
.net framework 4 中提供了 Parallel.ForEach 和 PLINQ 來幫助咱們進行並行處理,本文探討這二者的差異及適用的場景。數組
Parallel.ForEach 是 foreach 的多線程實現,他們都能對 IEnumerable<T> 類型對象進行遍歷,Parallel.ForEach 的特殊之處在於它使用多線程來執行循環體內的代碼段。緩存
Parallel.ForEach 最經常使用的形式以下:服務器
public static ParallelLoopResult ForEach<TSource>( IEnumerable<TSource> source, Action<TSource> body)
PLINQ 也是一種對數據進行並行處理的編程模型,它經過 LINQ 的語法來實現相似 Parallel.ForEach 的多線程並行處理。
數據結構
示例代碼:多線程
public static void IndependentAction(IEnumerable<T> source, Action<T> action) { Parallel.ForEach(source, element => action(element)); }
理由:oop
1. 雖然 PLINQ 也提供了一個相似的 ForAll 接口,但它對於簡單的獨立操做過重量化了。
2. 使用 Parallel.ForEach 你還可以設定 ParallelOptions.MaxDegreeOfParalelism 參數(指定最多須要多少個線程),這樣當 ThreadPool 資源匱乏(甚至當可用線程數<MaxDegreeOfParalelism)的時候, Parallel.ForEach 依然可以順利運行,而且當後續有更多可用線程出現時,Parallel.ForEach 也能及時地利用這些線程。PLINQ 只能經過WithDegreeOfParallelism 方法來要求固定的線程數,即:要求了幾個就是幾個,不會多也不會少。
當輸出的數據序列須要保持原始的順序時採用 PLINQ 的 AsOrdered 方法很是簡單高效。
示例代碼:
public static void GrayscaleTransformation(IEnumerable<Frame> Movie) { var ProcessedMovie = Movie .AsParallel() .AsOrdered() .Select(frame => ConvertToGrayscale(frame)); foreach (var grayscaleFrame in ProcessedMovie) { // Movie frames will be evaluated lazily } }
理由:
1. Parallel.ForEach 實現起來須要繞一些彎路,首先你須要使用如下的重載在方法:
public static ParallelLoopResult ForEach<TSource >( IEnumerable<TSource> source, Action<TSource, ParallelLoopState, Int64> body)
這個重載的 Action 多包含了 index 參數,這樣你在輸出的時候就能利用這個值來維持原先的序列順序。請看下面的例子:
public static double [] PairwiseMultiply(double[] v1, double[] v2) { var length = Math.Min(v1.Length, v2.Lenth); double[] result = new double[length]; Parallel.ForEach(v1, (element, loopstate, elementIndex) => result[elementIndex] = element * v2[elementIndex]); return result; }
你可能已經意識到這裏有個明顯的問題:咱們使用了固定長度的數組。若是傳入的是 IEnumerable 那麼你有4個解決方案:
(1) 調用 IEnumerable.Count() 來獲取數據長度,而後用這個值實例化一個固定長度的數組,而後使用上例的代碼。
(2) The second option would be to materialize the original collection before using it; in the event that your input data set is prohibitively large, neither of the first two options will be feasible.(沒看懂貼原文)
(3) 第三種方式是採用返回一個哈希集合的方式,這種方式下一般須要至少2倍於傳入數據的內存,因此處理大數據時請慎用。
(4) 本身實現排序算法(保證傳入數據與傳出數據通過排序後次序一致)
2. 相比之下 PLINQ 的 AsOrdered 方法如此簡單,並且該方法能處理流式的數據,從而容許傳入數據是延遲實現的(lazy materialized)
PLINQ 能輸出流數據,這個特性在一下場合很是有用:
1. 結果集不須要是一個完整的處理完畢的數組,即:任什麼時候間點下內存中僅保持數組中的部分信息
2. 你可以在一個單線程上遍歷輸出結果(就好像他們已經存在/處理完了)
示例:
public static void AnalyzeStocks(IEnumerable<Stock> Stocks) { var StockRiskPortfolio = Stocks .AsParallel() .AsOrdered() .Select(stock => new { Stock = stock, Risk = ComputeRisk(stock)}) .Where(stockRisk => ExpensiveRiskAnalysis(stockRisk.Risk)); foreach (var stockRisk in StockRiskPortfolio) { SomeStockComputation(stockRisk.Risk); // StockRiskPortfolio will be a stream of results } }
這裏使用一個單線程的 foreach 來對 PLINQ 的輸出進行後續處理,一般狀況下 foreach 不須要等待 PLINQ 處理完全部數據就能開始運做。
PLINQ 也容許指定輸出緩存的方式,具體可參照 PLINQ 的 WithMergeOptions 方法,及 ParallelMergeOptions 枚舉
PLINQ 的 Zip 方法提供了同時遍歷兩個集合並進行結合元算的方法,而且它能夠與其餘查詢處理操做結合,實現很是複雜的機能。
示例:
public static IEnumerable<T> Zipping<T>(IEnumerable<T> a, IEnumerable<T> b) { return a .AsParallel() .AsOrdered() .Select(element => ExpensiveComputation(element)) .Zip( b .AsParallel() .AsOrdered() .Select(element => DifferentExpensiveComputation(element)), (a_element, b_element) => Combine(a_element,b_element)); }
示例中的兩個數據源可以並行處理,當雙方都有一個可用元素時提供給 Zip 進行後續處理(Combine)。
Parallel.ForEach 也能實現相似的 Zip 處理:
public static IEnumerable<T> Zipping<T>(IEnumerable<T> a, IEnumerable<T> b) { var numElements = Math.Min(a.Count(), b.Count()); var result = new T[numElements]; Parallel.ForEach(a, (element, loopstate, index) => { var a_element = ExpensiveComputation(element); var b_element = DifferentExpensiveComputation(b.ElementAt(index)); result[index] = Combine(a_element, b_element); }); return result; }
固然使用 Parallel.ForEach 後你就得本身確認是否要維持原始序列,而且要注意數組越界訪問的問題。
Parallel.ForEach 提供了一個線程局部變量的重載,定義以下:
public static ParallelLoopResult ForEach<TSource, TLocal>( IEnumerable<TSource> source, Func<TLocal> localInit, Func<TSource, ParallelLoopState, TLocal,TLocal> body, Action<TLocal> localFinally)
使用的示例:
public static List<R> Filtering<T,R>(IEnumerable<T> source) { var results = new List<R>(); using (SemaphoreSlim sem = new SemaphoreSlim(1)) { Parallel.ForEach(source, () => new List<R>(), (element, loopstate, localStorage) => { bool filter = filterFunction(element); if (filter) localStorage.Add(element); return localStorage; }, (finalStorage) => { lock(myLock) { results.AddRange(finalStorage) }; }); } return results; }
線程局部變量有什麼優點呢?請看下面的例子(一個網頁抓取程序):
public static void UnsafeDownloadUrls () { WebClient webclient = new WebClient(); Parallel.ForEach(urls, (url,loopstate,index) => { webclient.DownloadFile(url, filenames[index] + ".dat"); Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url); }); }
一般初版代碼是這麼寫的,可是運行時會報錯「System.NotSupportedException -> WebClient does not support concurrent I/O operations.」。這是由於多個線程沒法同時訪問同一個 WebClient 對象。因此咱們會把 WebClient 對象定義到線程中來:
public static void BAD_DownloadUrls () { Parallel.ForEach(urls, (url,loopstate,index) => { WebClient webclient = new WebClient(); webclient.DownloadFile(url, filenames[index] + ".dat"); Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url); }); }
修改以後依然有問題,由於你的機器不是服務器,大量實例化的 WebClient 迅速達到你機器容許的虛擬鏈接上限數。線程局部變量能夠解決這個問題:
public static void downloadUrlsSafe() { Parallel.ForEach(urls, () => new WebClient(), (url, loopstate, index, webclient) => { webclient.DownloadFile(url, filenames[index]+".dat"); Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url); return webclient; }, (webclient) => { }); }
這樣的寫法保證了咱們能得到足夠的 WebClient 實例,同時這些 WebClient 實例彼此隔離僅僅屬於各自關聯的線程。
雖然 PLINQ 提供了 ThreadLocal<T> 對象來實現相似的功能:
public static void downloadUrl() { var webclient = new ThreadLocal<WebClient>(()=> new WebClient ()); var res = urls .AsParallel() .ForAll( url => { webclient.Value.DownloadFile(url, host[url] +".dat")); Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url); }); }
可是請注意:ThreadLocal<T> 相對而言開銷更大!
Parallel.ForEach 有個重載聲明以下,其中包含一個 ParallelLoopState 對象:
public static ParallelLoopResult ForEach<TSource >( IEnumerable<TSource> source, Action<TSource, ParallelLoopState> body)
ParallelLoopState.Stop() 提供了退出循環的方法,這種方式要比其餘兩種方法更快。這個方法通知循環不要再啓動執行新的迭代,並儘量快的推出循環。
ParallelLoopState.IsStopped 屬性可用來斷定其餘迭代是否調用了 Stop 方法。
示例:
public static boolean FindAny<T,T>(IEnumerable<T> TSpace, T match) where T: IEqualityComparer<T> { var matchFound = false; Parallel.ForEach(TSpace, (curValue, loopstate) => { if (curValue.Equals(match) ) { matchFound = true; loopstate.Stop(); } }); return matchFound; }
ParallelLoopState.Break() 通知循環繼續執行本元素前的迭代,但不執行本元素以後的迭代。最前調用 Break 的起做用,並被記錄到 ParallelLoopState.LowestBreakIteration 屬性中。這種處理方式一般被應用在一個有序的查找處理中,好比你有一個排序過的數組,你想在其中查找匹配元素的最小 index,那麼可使用如下的代碼:
public static int FindLowestIndex<T,T>(IEnumerable<T> TSpace, T match) where T: IEqualityComparer<T> { var loopResult = Parallel.ForEach(source, (curValue, loopState, curIndex) => { if (curValue.Equals(match)) { loopState.Break(); } }); var matchedIndex = loopResult.LowestBreakIteration; return matchedIndex.HasValue ? matchedIndex : -1; }
雖然 PLINQ 也提供了退出的機制(cancellation token),但相對來講退出的時機並無 Parallel.ForEach 那麼及時。