4、並行編程 - 並行LINQ(PLINQ) 的使用。AsParallel

用於對內存中的數據作並行運算,也就是說其只支持 LINQ to Object 的並行運算 html

1、AsParallel(並行化)

就是在集合後加個AsParallel()。算法

例如:spring

var numbers = Enumerable.Range(0, 100);
var result = numbers.AsParallel().AsOrdered().Where(i => i % 2 == 0);
foreach (var i in result)
   Console.WriteLine(i);

image

 

下面咱們模擬給ConcurrentDictionary灌入1500w條記錄,看看串行和並行效率上的差別,注意個人老爺機是2個硬件線程。mongodb

static void Main(string[] args)
{
    var dic = LoadData();

    Stopwatch watch = new Stopwatch();

    watch.Start();

    //串行執行
    var query1 = (from n in dic.Values
                  where n.Age > 20 && n.Age < 25
                  select n).ToList();

    watch.Stop();

    Console.WriteLine("串行計算耗費時間:{0}", watch.ElapsedMilliseconds);

    watch.Restart();

    var query2 = (from n in dic.Values.AsParallel()
                  where n.Age > 20 && n.Age < 25
                  select n).ToList();

    watch.Stop();

    Console.WriteLine("並行計算耗費時間:{0}", watch.ElapsedMilliseconds);

    Console.Read();
}

public static ConcurrentDictionary<int, Student> LoadData()
{
    ConcurrentDictionary<int, Student> dic = new ConcurrentDictionary<int, Student>();

    //預加載1500w條記錄
    Parallel.For(0, 15000000, (i) =>
    {
        var single = new Student()
        {
            ID = i,
            Name = "hxc" + i,
            Age = i % 151,
            CreateTime = DateTime.Now.AddSeconds(i)
        };
        dic.TryAdd(i, single);
    });

    return dic;
}

public class Student
{
    public int ID { get; set; }

    public string Name { get; set; }

    public int Age { get; set; }

    public DateTime CreateTime { get; set; }
}

 

orderby,sum(),average()等等這些聚合函數都是實現了並行化。編程

2、指定並行度

這個我在前面文章也說過,爲了避免讓並行計算佔用所有的硬件線程,或許可能要留一個線程作其餘事情。函數

var query2 = (from n in dic.Values.AsParallel().WithDegreeOfParallelism(Environment.ProcessorCount - 1)
where n.Age > 20 && n.Age < 25
                        orderby n.CreateTime descending
                        select n).ToList();

3、瞭解ParallelEnumerable類

首先這個類是Enumerable的並行版本,提供了不少用於查詢實現的一組方法,下圖爲ParallelEnumerable類的方法,記住他們都是並行的。spa

 image

ConcurrentBag<int> bag = new ConcurrentBag<int>();
 var list = ParallelEnumerable.Range
(0, 10000);
 list.ForAll((i) =>
 {
     bag.Add(i);
 });

 Console.WriteLine("bag集合中元素個數有:{0}", bag.Count);
 Console.WriteLine("list集合中元素個數總和爲:{0}", list.Sum());
 Console.WriteLine("list集合中元素最大值爲:{0}", list.Max());
 Console.WriteLine("list集合中元素第一個元素爲:{0}", list.FirstOrDefault());

4、plinq實現MapReduce算法

  mapReduce是一個很是流行的編程模型,用於大規模數據集的並行計算,很是的牛X啊,記得mongodb中就用到了這個玩意。pwa

  • map:  也就是「映射」操做,能夠爲每個數據項創建一個鍵值對,映射完後會造成一個鍵值對的集合。
  • reduce:「化簡」操做,咱們對這些巨大的「鍵值對集合「進行分組,統計等等。

下面我舉個例子,用Mapreduce來實現一個對age的分組統計。線程

static void Main(string[] args)
{
    List<Student> list = new List<Student>()
    {
        new Student(){ ID=1, Name="jack", Age=20},
        new Student(){ ID=1, Name="mary", Age=25},
        new Student(){ ID=1, Name="joe", Age=29},
        new Student(){ ID=1, Name="Aaron", Age=25},
    };

    //這裏咱們會對age創建一組鍵值對
    var map = list.AsParallel().ToLookup(i => i.Age, count => 1);

    //化簡統計
    var reduce = from IGrouping<int, int> singleMap
                 in map.AsParallel()
                 select new
                 {
                     Age = singleMap.Key,
                     Count = singleMap.Count()
                 };

    ///最後遍歷
    reduce.ForAll(i =>
    {
        Console.WriteLine("當前Age={0}的人數有:{1}人", i.Age, i.Count);
    });
}

public class Student
{
    public int ID { get; set; }

    public string Name { get; set; }

    public int Age { get; set; }

    public DateTime CreateTime { get; set; }
}

image

考慮一個簡單的例子,現有一個容量爲1000000的單詞集,須要咱們以降序列出其中出現次數超過100000的單詞(和其次數)。Map過程,使用PLINQ將集合按單詞分組,這裏使用了Lookup容器接口,它與Dictionary相似,可是提供的是鍵-值集映射;Reduce過程,使用PLINQ歸約查詢便可。code

image

某一次運行結果以下:

Word: you, Count: 142416Word: van, Count: 115816Word: next, Count: 110228

相關文章
相關標籤/搜索