3、並行編程 - Task同步機制。TreadLocal類、Lock、Interlocked、Synchronization、ConcurrentQueue以及Barrier等

在並行計算中,不可避免的會碰到多個任務共享變量,實例,集合。雖然task自帶了兩個方法:task.ContinueWith()和Task.Factory.ContinueWhenAll()來實現任務串行化,可是這些簡單的方法遠遠不能知足咱們實際的開發須要,從.net 4.0開始,類庫給咱們提供了不少的類來幫助咱們簡化並行計算中複雜的數據同步問題。html

1、隔離執行:不共享數據,讓每一個task都有一份本身的數據拷貝。

對數據共享問題處理的方式是「分離執行」,咱們經過把每一個Task執行完成後的各自計算的值進行最後的彙總,也就是說多個Task之間不存在數據共享了,各自作各自的事,徹底分離開來。spring

一、傳統方式

每一個Task執行時不存在數據共享了,每一個Task中計算本身值,最後咱們彙總每一個Task的Result。咱們能夠經過Task中傳遞的state參數來進行隔離執行:數據庫

int Sum = 0;
Task<int>[] tasks = new Task<int>[10];
for (int i = 0; i < 10; i++)
{
    tasks[i] = new Task<int>((obj) =>
    {
        var start = (int)obj;
        for (int j = 0; j < 1000; j++)
        {
            start = start + 1;
        }
        return start;
    }, Sum);
    tasks[i].Start();
}
Task.WaitAll(tasks);
for (var i = 0; i < 10; i++)
{
    Sum += tasks[i].Result;
}
Console.WriteLine("Expected value {0}, Parallel value: {1}", 10000, Sum);

二、ThreadLocal類

在.Net中提供了System.Threading.ThreadLocal來建立分離。編程

ThreadLocal是一種提供線程本地存儲的類型,它能夠給每一個線程一個分離的實例,來提供每一個線程單獨的數據結果。上面的程序咱們可使用TreadLocal:安全

int Sum = 0;
Task<int>[] tasks = new Task<int>[10];

var tl = new ThreadLocal<int>();
for (int i = 0; i < 10; i++)
{
    tasks[i] = new Task<int>((obj) =>
    {
        tl.Value = (int)obj;
        for (int j = 0; j < 1000; j++)
        {
            tl.Value++;
        }
        returntl.Value;
    }, Sum);
    tasks[i].Start();
}
Task.WaitAll(tasks);
for (var i = 0; i < 10; i++)
{
    Sum += tasks[i].Result;
}
Console.WriteLine("Expected value {0}, Parallel value: {1}", 10000, Sum);

可是咱們要注意的一點TreadLocal是針對每一個線程的,不是針對每一個Task的。一個Tread中可能有多個Task。併發

ThreadLocal類舉例:ui

static ThreadLocal<string> local;
static void Main()
{
    //建立ThreadLocal並提供默認值 
    local = new ThreadLocal<string>(() => "hehe");

    //修改TLS的線程 
    Thread th = new Thread(() =>
     {
         local.Value = "Mgen";
         Display();
     });

    th.Start();
    th.Join();
    Display();
}

//顯示TLS中數據值 
static void Display()
{
    Console.WriteLine("{0} {1}", Thread.CurrentThread.ManagedThreadId, local.Value);
}

2、同步類型:經過調整task的執行,有序的執行task

同步類型是一種用來調度Task訪問臨界區域的一種特殊類型。在.Net 4.0中提供了多種同步類型給咱們使用,主要分爲:輕量級的、重量級的和等待處理型的,在下面咱們會介紹經常使用的同步處理類型。spa

經常使用的同步類型

首先來看看.Net 4.0中常見的幾種同步類型以及處理的相關問題:操作系統

同步類型以及解決問題.net

  • lock關鍵字、Montor類、SpinLock類:有序訪問臨界區域
  • Interlocked類:數值類型的增長或則減小
  • Mutex類:交叉同步
  • WaitAll方法:同步多個鎖定(主要是Task之間的調度)
  • 申明性的同步(如Synchronization):使類中的全部的方法同步

一、Lock鎖

其實最簡單同步類型的使用辦法就是使用lock關鍵字。在使用lock關鍵字時,首先咱們須要建立一個鎖定的object,並且這個object須要全部的task都能訪問,其次能咱們須要將咱們的臨界區域包含在lock塊中。咱們以前例子中代碼能夠這樣加上lock:

int Sum = 0;
Task[] tasks = new Task[10];

var obj = new Object(); for (int i = 0; i < 10; i++)
{
    tasks[i] = new Task(() =>
    {
        for (int j = 0; j < 1000; j++)
        {
            lock (obj)
            { Sum = Sum + 1; }
        }
    });
    tasks[i].Start();
}
Task.WaitAll(tasks);
Console.WriteLine("Expected value {0}, Parallel value: {1}", 10000, Sum);

其實lock關鍵字是使用Monitor的一種簡短的方式,lock關鍵字自動經過調用Monitor.Enter\Monitor.Exit方法來處理得到鎖以及釋放鎖。

二、Interlocked 聯鎖

Interlocked經過使用操做系統或則硬件的一些特性提供了一些列高效的靜態的同步方法。其中主要提供了這些方法:Exchange、Add、Increment、CompareExchange四種類型的多個方法的重載。咱們將上面的例子中使用Interlocked:

int Sum = 0;
Task[] tasks = new Task[10];
for (int i = 0; i < 10; i++)
{
    tasks[i] = new Task(() =>
    {
        for (int j = 0; j < 1000; j++)
        {
            Interlocked.Increment(ref Sum);
        }
    });
    tasks[i].Start();
}
Task.WaitAll(tasks);
Console.WriteLine("Expected value {0}, Parallel value: {1}", 10000, Sum);

三、Mutex互斥體

Mutex也是一個同步類型,在多個線程進行訪問的時候,它只向一個線程受權共享數據的獨立訪問。咱們能夠經過Mutex中的WaitOne方法來獲取Mutex的全部權,可是同時咱們要注意的是,咱們在一個線程中多少次調用過WaitOne方法,就須要調用多少次ReleaseMutex方法來釋放Mutex的佔有。上面的例子咱們經過Mutex這樣實現:

int Sum = 0;
Task[] tasks = new Task[10];

var mutex = new Mutex();
for (int i = 0; i < 10; i++)
{
    tasks[i] = new Task(() =>
    {
        for (int j = 0; j < 1000; j++)
        {
            bool lockAcquired = mutex.WaitOne(); try
            {
                Sum++;
            }
            finally
            {
                if (lockAcquired) mutex.ReleaseMutex();
            }
        }
    });
    tasks[i].Start();
}
Task.WaitAll(tasks);
Console.WriteLine("Expected value {0}, Parallel value: {1}", 10000, Sum);

3、申明性同步

咱們能夠經過使用Synchronization 特性來標識一個類,從而使一個類型的字段以及方法都實現同步化。在使用Synchronization 時,咱們須要將咱們的目標同步的類繼承於System.ContextBoundObject類型。咱們來看看以前的例子咱們同步標識Synchronization 的實現:

static void Main(string[] args)
{
    var sum = new SumClass();
    Task[] tasks = new Task[10];
    for (int i = 0; i < 10; i++)
    {
        tasks[i] = new Task(() =>
        {
            for (int j = 0; j < 1000; j++)
            {
                sum.Increment();
            }
        });
        tasks[i].Start();
    }
    Task.WaitAll(tasks);
    Console.WriteLine("Expected value {0}, Parallel value: {1}", 10000, sum.GetSum());
}

[Synchronization]
class SumClass : ContextBoundObject
{
    private int _Sum;

    public void Increment()
    {
        _Sum++;
    }

    public int GetSum()
    {
        return _Sum;
    }
}

4、併發集合

當多個線程對某個非線程安全容器併發地進行讀寫操做時,這些操做將致使不可預估的後果或者會致使報錯。爲了解決這個問題咱們可使用lock關鍵字或者Monitor類來給容器上鎖。但鎖的引入使得咱們的代碼更加複雜,同時也帶來了更多的同步消耗。而.NET Framework 4提供的線程安全且可拓展的併發集合可以使得咱們的並行代碼更加容易編寫,此外,鎖的使用次數的減小也減小了麻煩的死鎖與競爭條件的問題。.NET Framework 4主要提供了以下幾種併發集合:BlockingCollection,ConcurrentBag,ConcurrentDictionary,ConcurrentQueue,ConcurrentStack。這些集合經過使用一種叫作比較並交換(compare and swap, CAS)指令和內存屏障的技術來避免使用重量級的鎖。

在.Net 4.0中提供了不少併發的集合類型來讓咱們處理數據同步的集合的問題,這裏麪包括:

1.ConcurrentQueue:提供併發安全的隊列集合,以先進先出的方式進行操做;
2.ConcurrentStack:提供併發安全的堆棧集合,以先進後出的方式進行操做;
3.ConcurrentBag:提供併發安全的一種無序集合;
4.ConcurrentDictionary:提供併發安全的一種key-value類型的集合。

咱們在這裏只作ConcurrentQueue的一個嘗試,併發隊列是一種線程安全的隊列集合,咱們能夠經過Enqueue()進行排隊、TryDequeue()進行出隊列操做:

for (var j = 0; j < 10; j++)
{
    var queue = new ConcurrentQueue<int>();
    var count = 0;
    for (var i = 0; i < 1000; i++)
    {
        queue.Enqueue(i);
    }
    var tasks = new Task[10];
    for (var i = 0; i < tasks.Length; i++)
    {
        tasks[i] = new Task(() =>
        {
            while (queue.Count > 0)
            {
                int item;
                var isDequeue = queue.TryDequeue(out item);
                if (isDequeue) Interlocked.Increment(ref count);
            }

        });
        tasks[i].Start();
    }
    try
    {
        Task.WaitAll(tasks);
    }
    catch (AggregateException e)
    {
        e.Handle((ex) =>
        {
            Console.WriteLine("Exception Message:{0}", ex.Message);
            return true;
        });
    }
    Console.WriteLine("Dequeue items count :{0}", count);
}

5、Barrier(屏障同步)

barrier叫作屏障,就像下圖中的「紅色線」,若是咱們的屏障設爲4個task就認爲已經滿了的話,那麼執行中先到的task必須等待後到的task,通知方式也就是barrier.SignalAndWait(),屏障中線程設置操做爲new Barrier(4,(i)=>{})。SignalAndWait給咱們提供了超時的重載,爲了可以取消後續執行

image

//四個task執行
static Task[] tasks = new Task[4];

static Barrier barrier = null;

static void Main(string[] args)
{
    barrier = new Barrier(tasks.Length, (i) =>
     {
         Console.WriteLine("**********************************************************");
         Console.WriteLine("\n屏障中當前階段編號:{0}\n", i.CurrentPhaseNumber);
         Console.WriteLine("**********************************************************");
     });

    for (int j = 0; j < tasks.Length; j++)
    {
        tasks[j] = Task.Factory.StartNew((obj) =>
        {
            var single = Convert.ToInt32(obj);

            LoadUser(single);
            barrier.SignalAndWait();

            LoadProduct(single);
            barrier.SignalAndWait();

            LoadOrder(single);
            barrier.SignalAndWait();
        }, j);
    }

    Task.WaitAll(tasks);

    Console.WriteLine("指定數據庫中全部數據已經加載完畢!");

    Console.Read();
}

static void LoadUser(int num)
{
    Console.WriteLine("當前任務:{0}正在加載User部分數據!", num);
}

static void LoadProduct(int num)
{
    Console.WriteLine("當前任務:{0}正在加載Product部分數據!", num);
}

static void LoadOrder(int num)
{
    Console.WriteLine("當前任務:{0}正在加載Order部分數據!", num);
}

相關文章
相關標籤/搜索