[譯]什麼時候使用 Parallel.ForEach,什麼時候使用 PLINQ

原做者: Pamela Vagata, Parallel Computing Platform Group, Microsoft Corporationweb

原文pdf:http://download.csdn.NET/detail/sqlchen/7509513算法

 

====================================================================sql

簡介

當須要爲多核機器進行優化的時候,最好先檢查下你的程序是否有處理可以分割開來進行並行處理。(例如,有一個巨大的數據集合,其中的元素須要一個一個進行彼此獨立的耗時計算)。編程

.net framework 4 中提供了 Parallel.ForEach 和 PLINQ 來幫助咱們進行並行處理,本文探討這二者的差異及適用的場景。數組

Parallel.ForEach

Parallel.ForEach 是 foreach 的多線程實現,他們都能對 IEnumerable<T> 類型對象進行遍歷,Parallel.ForEach 的特殊之處在於它使用多線程來執行循環體內的代碼段。緩存

Parallel.ForEach 最經常使用的形式以下:服務器

public static ParallelLoopResult ForEach<TSource>(  IEnumerable<TSource> source,        Action<TSource> body)   

PLINQ

PLINQ 也是一種對數據進行並行處理的編程模型,它經過 LINQ 的語法來實現相似 Parallel.ForEach 的多線程並行處理。
數據結構


場景一:簡單數據 之 獨立操做的並行處理(使用 Parallel.ForEach)

示例代碼:多線程

    public static void IndependentAction(IEnumerable<T> source, Action<T> action)  
    {  
        Parallel.ForEach(source, element => action(element));  
    }  

 理由:oop

1. 雖然 PLINQ 也提供了一個相似的 ForAll 接口,但它對於簡單的獨立操做過重量化了。
2. 使用 Parallel.ForEach 你還可以設定 ParallelOptions.MaxDegreeOfParalelism 參數(指定最多須要多少個線程),這樣當 ThreadPool 資源匱乏(甚至當可用線程數<MaxDegreeOfParalelism)的時候, Parallel.ForEach 依然可以順利運行,而且當後續有更多可用線程出現時,Parallel.ForEach 也能及時地利用這些線程。PLINQ 只能經過WithDegreeOfParallelism 方法來要求固定的線程數,即:要求了幾個就是幾個,不會多也不會少。

場景二:順序數據 之 並行處理(使用 PLINQ 來維持數據順序)

當輸出的數據序列須要保持原始的順序時採用 PLINQ 的 AsOrdered 方法很是簡單高效。

示例代碼:

    public static void GrayscaleTransformation(IEnumerable<Frame> Movie)  
    {  
        var ProcessedMovie =  
            Movie  
            .AsParallel()  
            .AsOrdered()  
            .Select(frame => ConvertToGrayscale(frame));  
      
        foreach (var grayscaleFrame in ProcessedMovie)  
        {  
            // Movie frames will be evaluated lazily  
        }  
    }  

 理由:

1. Parallel.ForEach 實現起來須要繞一些彎路,首先你須要使用如下的重載在方法:

    public static ParallelLoopResult ForEach<TSource >(  
        IEnumerable<TSource> source,  
        Action<TSource, ParallelLoopState, Int64> body)  

 這個重載的 Action 多包含了 index  參數,這樣你在輸出的時候就能利用這個值來維持原先的序列順序。請看下面的例子:

    public static double [] PairwiseMultiply(double[] v1, double[] v2)  
    {  
        var length = Math.Min(v1.Length, v2.Lenth);  
        double[] result = new double[length];  
        Parallel.ForEach(v1, (element, loopstate, elementIndex) =>  
            result[elementIndex] = element * v2[elementIndex]);  
        return result;  
    }  

 
你可能已經意識到這裏有個明顯的問題:咱們使用了固定長度的數組。若是傳入的是 IEnumerable 那麼你有4個解決方案:

(1) 調用 IEnumerable.Count() 來獲取數據長度,而後用這個值實例化一個固定長度的數組,而後使用上例的代碼。

(2) The second option would be to materialize the original collection before using it; in the event that your input data set is prohibitively large, neither of the first two options will be feasible.(沒看懂貼原文)

(3) 第三種方式是採用返回一個哈希集合的方式,這種方式下一般須要至少2倍於傳入數據的內存,因此處理大數據時請慎用。

(4) 本身實現排序算法(保證傳入數據與傳出數據通過排序後次序一致)

2. 相比之下 PLINQ 的 AsOrdered 方法如此簡單,並且該方法能處理流式的數據,從而容許傳入數據是延遲實現的(lazy materialized)

場景三:流數據 之 並行處理(使用 PLINQ)

PLINQ 能輸出流數據,這個特性在一下場合很是有用:

1. 結果集不須要是一個完整的處理完畢的數組,即:任什麼時候間點下內存中僅保持數組中的部分信息

2. 你可以在一個單線程上遍歷輸出結果(就好像他們已經存在/處理完了)

示例:

    public static void AnalyzeStocks(IEnumerable<Stock> Stocks)  
    {  
        var StockRiskPortfolio =  
            Stocks  
            .AsParallel()  
            .AsOrdered()  
            .Select(stock => new { Stock = stock, Risk = ComputeRisk(stock)})  
            .Where(stockRisk => ExpensiveRiskAnalysis(stockRisk.Risk));  
      
        foreach (var stockRisk in StockRiskPortfolio)  
        {  
            SomeStockComputation(stockRisk.Risk);  
            // StockRiskPortfolio will be a stream of results  
        }  
    }  

 

這裏使用一個單線程的 foreach 來對 PLINQ 的輸出進行後續處理,一般狀況下 foreach 不須要等待 PLINQ 處理完全部數據就能開始運做。

PLINQ 也容許指定輸出緩存的方式,具體可參照 PLINQ 的 WithMergeOptions 方法,及 ParallelMergeOptions 枚舉

場景四:處理兩個集合(使用 PLINQ)

PLINQ 的 Zip 方法提供了同時遍歷兩個集合並進行結合元算的方法,而且它能夠與其餘查詢處理操做結合,實現很是複雜的機能。

示例:

    public static IEnumerable<T> Zipping<T>(IEnumerable<T> a, IEnumerable<T> b)  
    {  
        return  
            a  
            .AsParallel()  
            .AsOrdered()  
            .Select(element => ExpensiveComputation(element))  
            .Zip(  
                b  
                .AsParallel()  
                .AsOrdered()  
                .Select(element => DifferentExpensiveComputation(element)),  
                (a_element, b_element) => Combine(a_element,b_element));  
    }  

 示例中的兩個數據源可以並行處理,當雙方都有一個可用元素時提供給 Zip 進行後續處理(Combine)。

Parallel.ForEach 也能實現相似的 Zip 處理:

    public static IEnumerable<T> Zipping<T>(IEnumerable<T> a, IEnumerable<T> b)  
    {  
        var numElements = Math.Min(a.Count(), b.Count());  
        var result = new T[numElements];  
        Parallel.ForEach(a,  
            (element, loopstate, index) =>  
            {  
                var a_element = ExpensiveComputation(element);  
                var b_element = DifferentExpensiveComputation(b.ElementAt(index));  
                result[index] = Combine(a_element, b_element);  
            });  
        return result;  
    }  

 固然使用 Parallel.ForEach 後你就得本身確認是否要維持原始序列,而且要注意數組越界訪問的問題。

場景五:線程局部變量

Parallel.ForEach 提供了一個線程局部變量的重載,定義以下:

    public static ParallelLoopResult ForEach<TSource, TLocal>(  
        IEnumerable<TSource> source,  
        Func<TLocal> localInit,  
        Func<TSource, ParallelLoopState, TLocal,TLocal> body,  
        Action<TLocal> localFinally)  

 使用的示例:

    public static List<R> Filtering<T,R>(IEnumerable<T> source)  
    {  
        var results = new List<R>();  
        using (SemaphoreSlim sem = new SemaphoreSlim(1))  
        {  
            Parallel.ForEach(source,  
                () => new List<R>(),  
                (element, loopstate, localStorage) =>  
                {  
                    bool filter = filterFunction(element);  
                    if (filter)  
                        localStorage.Add(element);  
                    return localStorage;  
                },  
                (finalStorage) =>  
                {  
                    lock(myLock)  
                    {  
                        results.AddRange(finalStorage)  
                    };  
                });  
        }  
        return results;  
    }  

 線程局部變量有什麼優點呢?請看下面的例子(一個網頁抓取程序):

    public static void UnsafeDownloadUrls ()  
    {  
        WebClient webclient = new WebClient();  
        Parallel.ForEach(urls,  
            (url,loopstate,index) =>  
            {  
                webclient.DownloadFile(url, filenames[index] + ".dat");  
                Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url);  
            });  
    }  

 一般初版代碼是這麼寫的,可是運行時會報錯「System.NotSupportedException -> WebClient does not support concurrent I/O operations.」。這是由於多個線程沒法同時訪問同一個 WebClient 對象。因此咱們會把 WebClient 對象定義到線程中來:

    public static void BAD_DownloadUrls ()  
    {  
        Parallel.ForEach(urls,  
            (url,loopstate,index) =>  
            {  
                WebClient webclient = new WebClient();  
                webclient.DownloadFile(url, filenames[index] + ".dat");  
                Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url);  
            });  
    }  

 修改以後依然有問題,由於你的機器不是服務器,大量實例化的 WebClient 迅速達到你機器容許的虛擬鏈接上限數。線程局部變量能夠解決這個問題:

    public static void downloadUrlsSafe()  
    {  
        Parallel.ForEach(urls,  
            () => new WebClient(),  
            (url, loopstate, index, webclient) =>  
            {  
                webclient.DownloadFile(url, filenames[index]+".dat");  
                Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url);  
                return webclient;  
            },  
                (webclient) => { });  
    }  

 這樣的寫法保證了咱們能得到足夠的 WebClient 實例,同時這些 WebClient 實例彼此隔離僅僅屬於各自關聯的線程。

雖然 PLINQ 提供了 ThreadLocal<T> 對象來實現相似的功能:

    public static void downloadUrl()  
    {  
        var webclient = new ThreadLocal<WebClient>(()=> new WebClient ());  
        var res =  
            urls  
            .AsParallel()  
            .ForAll(  
                url =>  
                {  
                    webclient.Value.DownloadFile(url, host[url] +".dat"));  
                    Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url);  
                });  
    }  

 可是請注意:ThreadLocal<T> 相對而言開銷更大!

場景五:退出操做 (使用 Parallel.ForEach)

Parallel.ForEach 有個重載聲明以下,其中包含一個 ParallelLoopState 對象:

 

    public static ParallelLoopResult ForEach<TSource >(  
        IEnumerable<TSource> source,  
        Action<TSource, ParallelLoopState> body)  

ParallelLoopState.Stop() 提供了退出循環的方法,這種方式要比其餘兩種方法更快。這個方法通知循環不要再啓動執行新的迭代,並儘量快的推出循環。

ParallelLoopState.IsStopped 屬性可用來斷定其餘迭代是否調用了 Stop 方法。

示例:

    public static boolean FindAny<T,T>(IEnumerable<T> TSpace, T match) where T: IEqualityComparer<T>  
    {  
        var matchFound = false;  
        Parallel.ForEach(TSpace,  
            (curValue, loopstate) =>  
                {  
                    if (curValue.Equals(match) )  
                    {  
                        matchFound = true;  
                        loopstate.Stop();  
                    }  
                });  
        return matchFound;  
    }  

 ParallelLoopState.Break() 通知循環繼續執行本元素前的迭代,但不執行本元素以後的迭代。最前調用 Break 的起做用,並被記錄到 ParallelLoopState.LowestBreakIteration 屬性中。這種處理方式一般被應用在一個有序的查找處理中,好比你有一個排序過的數組,你想在其中查找匹配元素的最小 index,那麼可使用如下的代碼:

    public static int FindLowestIndex<T,T>(IEnumerable<T> TSpace, T match) where T: IEqualityComparer<T>  
    {  
        var loopResult = Parallel.ForEach(source,  
            (curValue, loopState, curIndex) =>  
            {  
                if (curValue.Equals(match))  
                {  
                    loopState.Break();  
                }  
             });  
        var matchedIndex = loopResult.LowestBreakIteration;  
        return matchedIndex.HasValue ? matchedIndex : -1;  
    }  

 雖然 PLINQ 也提供了退出的機制(cancellation token),但相對來講退出的時機並無 Parallel.ForEach 那麼及時。

相關文章
相關標籤/搜索