進程之間的線程同步

  Mutex類、Event類、SemaphoreSlim類和ReaderWriterLockSlim類等提供了多個進程之間的線程同步。數組

 一、WaitHandle 基類

  WaitHandle抽象類,用於等待一個信號的設置。能夠根據其派生類的不一樣,等待不一樣的信號。異步委託的BeginInvoke()方法返回一個實現了IAsycResult接口的對象。使用IAsycResult接口能夠用AsycWaitHandle屬性訪問WaitHandle基類。在調用WaitOne()方法時,線程會等待接收一個和等待句柄相關的信號:安全

static void Main(string[] args)
{
    Func<int> func = new Func<int>(
        () =>
        {
            Thread.Sleep(1500);
            return 1;
        });
    IAsyncResult ar = func.BeginInvoke(null, null);
    int count = 0;
    while (true)
    {
        Interlocked.Increment(ref count);
        Console.WriteLine("第{0}週期循環等待結果。", count);
        if (ar.AsyncWaitHandle.WaitOne(100, false))
        {
            Console.WriteLine("得到返回結果。");
            break;
        }
    }
    int result = func.EndInvoke(ar);
    Console.WriteLine("結果爲:{0}", result);
}

  使用WaitHandle基類能夠等待一個信號的出現(WaitHandle()方法)、等待多個對象都必須發出信號(WaitAll()方法)、等待多個對象中任一一個發出信號(WaitAny()方法)。其中WaitAll()方法和WaitAny()方法時WaitHandle類的靜態方法,接收一個WaitHandle參數數組。dom

  WaitHandle基類的SafeWaitHandle屬性,其中能夠將一個本機句柄賦予一個系統資源,等待該句柄,如I/O操做,或者自定義的句柄。異步

二、Mutex 類

  Mutex類繼承自WaitHandle類,提供跨多個進程同步訪問的一個類。相似於Monitor類,只能有一個線程擁有鎖定。在Mutex類的構造函數各參數含義:ide

  • initiallyOwned: 若是爲 true,則給予調用線程已命名的系統互斥體的初始所屬權(若是已命名的系統互斥體是經過此調用建立的);不然爲 false。
  • name:系統互斥體的名稱。 若是值爲 null,則 System.Threading.Mutex 是未命名的。
  • createdNew: 在此方法返回時,若是建立了局部互斥體(即,若是 name 爲 null 或空字符串)或指定的命名系統互斥體,則包含布爾值 true;若是指定的命名系統互斥體已存在,則爲false。 該參數未經初始化即被傳遞。
  • mutexSecurity: 一個 System.Security.AccessControl.MutexSecurity 對象,表示應用於已命名的系統互斥體的訪問控制安全性。

  互斥也能夠在另外一個進程中定義,操做系統可以識別有名稱的互斥,它由進程之間共享。若是沒有指定互斥的名稱,則不在不一樣的進程之間共享。該方法能夠檢測程序是否已運行,能夠禁止程序啓動兩次。函數

static void Main(string[] args)
{
    // ThreadingTimer();
    // TimersTimer();
    bool isCreateNew = false;
    Mutex mutex = new Mutex(false, "MyApp", out isCreateNew);//查詢是否已有互斥「MyApp」存在
    if(isCreateNew==false)
    {
        //已存在互斥
    }
}

  要打開已有互斥,可使用Mutex.OpenExisting()方法,不須要構造函數建立互斥時須要的相同.Net權限。可使用WaitOne()方法得到互斥的鎖定,成爲該互斥的擁有着。調用ReleaseMutex()方法釋放互斥:oop

if(mutex.WaitOne())//設置互斥鎖定
{
    try
    {
        //執行代碼
    }
    finally {
        mutex.ReleaseMutex();//釋放互斥
    }
}
else
{
    //出現問題
}

 三、Semaphore 類

  信號量是一種計數的互斥鎖定,能夠同時由多個線程使用。信號量可定義容許同時訪問受旗語鎖定保護的資源的線程個數。Semaphore和SemaphoreSlim兩個類具備信號量功能。Semaphore類能夠指定名稱,讓其在系統資源範圍內查找到,容許在不一樣的進程之間同步。Semaphore類是對較短等待時間進行優化了的輕型版本。優化

static void Main()
{
    int taskCount = 6;
    int semaphoreCount = 3;
    Semaphore semaphore = new Semaphore(semaphoreCount, semaphoreCount, "Test");//建立計數爲3的信號量
    /* 第一個參數爲初始釋放的鎖定數,第二個參數爲可鎖定的總數。若是第一個參數小於第二個參數,其差值就是已分配線程的計量數。
     * 第三個參數爲信號指定的名稱,能讓它在不一樣的進程之間共享。
     */
    var tasks = new Task[taskCount];

    for (int i = 0; i < taskCount; i++)
    {
        tasks[i] = Task.Run(() => TaskMain(semaphore));//建立6個任務
    }

    Task.WaitAll(tasks);

    Console.WriteLine("All tasks finished");
}

//鎖定信號的任務
static void TaskMain(Semaphore semaphore)
{
    bool isCompleted = false;
    while (!isCompleted)//循環等待被釋放的信號量
    {
        if (semaphore.WaitOne(600))//最長等待600ms
        {
            try
            {
                Console.WriteLine("Task {0} locks the semaphore", Task.CurrentId);
                Thread.Sleep(2000);//2s後釋放信號
            }
            finally
            {
                Console.WriteLine("Task {0} releases the semaphore", Task.CurrentId);
                semaphore.Release();//釋放信號量
                isCompleted = true;
            }
        }
        else
        {
            //超過規定的等待時間,寫入一條超時等待的信息
            Console.WriteLine("Timeout for task {0}; wait again", Task.CurrentId);
        }
    }
}

  以上方法中,信號量計數爲3,所以最多隻有三個任務可得到鎖定,第4個及之後的任務必須等待。在解除鎖定時,任何狀況下必定要解除資源的鎖定。ui

四、Events 類

  事件也是一個系統範圍內資源同步的方法。主要由如下幾個類提供:ManualResetEvent、AutoResetEvent、ManualResetEventSlim、和CountdownEvent類。this

  ManualResetEventSlim類中,調用Set()方法能夠發出信號;調用Reset()方法可使重置爲無信號狀態。若是多個線程在等待向一個事件發出信號,並調用Set()方法,就釋放全部等待線程。若是一個線程剛剛調用了WiatOne()方法,但事件已發出信號,等待的線程就能夠繼續等待。

  AutoResetEvent類中,一樣能夠經過Set()方法發出信號、Reset()方法重置信號,可是該類是自動重置信號。若是一個線程在等待自動重置的事件發信號,當第一個線程的等待狀態結束時,該事件會自動變爲不發信號的狀態。即:若是多個線程在等待向事件發信號,只有一個線程結束其等待狀態,它不是等待事件最長的線程,而是優先級最高的線程。

//計算數據的類,使用ManualResetEventSlim類的示例
public class Calculator
{
    private ManualResetEventSlim mEvent;
    public int Result { get; private set; }
    public Calculator(ManualResetEventSlim ev)
    {
        this.mEvent = ev;
    }
    public void Calculation(int x, int y)
    {
        Console.WriteLine("Task {0} starts calculation", Task.CurrentId);
        Thread.Sleep(new Random().Next(3000));//隨機等待事件
        Result = x + y;//計算結果

        Console.WriteLine("Task {0} is ready", Task.CurrentId);
        mEvent.Set();//發出完成信號
    }
}
//外部調用的示例:
static void Main()
{
    const int taskCount = 10;

    ManualResetEventSlim[] mEvents = new ManualResetEventSlim[taskCount];

    WaitHandle[] waitHandles = new WaitHandle[taskCount];
    var calcs = new Calculator[taskCount];

    for (int i = 0; i < taskCount; i++)
    {
        int i1 = i;//目的是使後面要執行的Task沒必要等待執行完後才釋放i,讓for繼續
        mEvents[i] = new ManualResetEventSlim(false);//對應任務的事件對象發出信號
        waitHandles[i] = mEvents[i].WaitHandle;//ManualResetEvent類派生自WaitHandle類,但ManualResetEventSlim並非,所以須要保存其WaitHandle對象
        calcs[i] = new Calculator(mEvents[i]);

        Task.Run(() => calcs[i1].Calculation(i1 + 1, i1 + 3));
    }

    for (int i = 0; i < taskCount; i++)
    {
        int index = WaitHandle.WaitAny(waitHandles);//等待任何一個發出信號,並返回發出信號的索引
        if (index == WaitHandle.WaitTimeout)
        {
            Console.WriteLine("Timeout!!");
        }
        else
        {
            mEvents[index].Reset();//從新設置爲無信號狀態
            Console.WriteLine("finished task for {0}, result: {1}", index, calcs[index].Result);
        }
    }
}
View Code

  CountdownEvent類適用於:須要把一個工做任務分配到多個任務中,而後在各個任務結束後合併結果(不須要爲每一個任務單首創建事件對象)。每一個任務不須要同步。CountdownEvent類爲全部設置了事件的任務定義了一個初始數字,達到該計數後,就發出信號。

//修改計算類
public class Calculator
{
    private CountdownEvent cEvent;
    public int Result { get; private set; }
    public Calculator(CountdownEvent ev)
    {
        this.cEvent = ev;
    }
    public void Calculation(int x, int y)
    {
        Console.WriteLine("Task {0} starts calculation", Task.CurrentId);
        Thread.Sleep(new Random().Next(3000));//隨機等待事件
        Result = x + y;//計算結果
        // signal the event—completed!
        Console.WriteLine("Task {0} is ready", Task.CurrentId);
        cEvent.Signal();//發出完成信號
    }
}
//修改方法調用
static void Main()
{
    const int taskCount = 10;
    CountdownEvent cEvent = new CountdownEvent(taskCount);

    WaitHandle[] waitHandles = new WaitHandle[taskCount];
    var calcs = new Calculator[taskCount];

    for (int i = 0; i < taskCount; i++)
    {
        int i1 = i;//目的是使後面要執行的Task沒必要等待執行後才釋放i,讓for能夠繼續
        calcs[i] = new Calculator(cEvent);//爲每一個任務都賦該事件通知類
        Task.Run(() => calcs[i1].Calculation(i1 + 1, i1 + 3));

    }

    cEvent.Wait();//等待一個事件的信號
    Console.WriteLine("all finished");
    for (int i = 0; i < taskCount; i++)
    {
        Console.WriteLine("task for {0}, result: {1}", i, calcs[i].Result);
    }
}
View Code

 五、Barrier 類

  Barrier類適用於:工做有多個任務分支,而且在全部任務執行完後須要合併的工做狀況。與CountdownEvent不一樣於,該類用於須要同步的參與者。在激活一個任務後,能夠動態的添加其餘參與者。在主參與者繼續以前,能夠等待全部其餘參與者完成工做。

static void Main()
{
    const int numberTasks = 2;
    const int partitionSize = 1000000;
    var data = new List<string>(FillData(partitionSize * numberTasks));
    var barrier = new Barrier(numberTasks + 1);//定義三個參與者:一個主參與者(分配任務者),兩個子參與者(被分配任務者)
    var tasks = new Task<int[]>[numberTasks];//兩個子參與者
    for (int i = 0; i < numberTasks; i++)
    {
        int jobNumber = i;
        tasks[i] = Task.Run(() => CalculationInTask(jobNumber, partitionSize, barrier, data));//啓動計算任務:能夠分開寫,以執行多個不一樣的任務。
    }
    barrier.SignalAndWait();// 主參與者以完成,等待子參與者所有完成。
    //合併兩個結果(LINQ)
    IEnumerable<int> resultCollection = tasks[0].Result.Zip(tasks[1].Result, (c1, c2) => { return c1 + c2; }).ToList();//當即求和

    char ch = 'a';
    int sum = 0;
    foreach (var x in resultCollection)
    {
        Console.WriteLine("{0}, count: {1}", ch++, x);//輸出結果
        sum += x;
    }
    Console.WriteLine("main finished {0}", sum);//統計過的字符串數量
    Console.WriteLine("remaining {0}, phase {1}", barrier.ParticipantsRemaining, barrier.CurrentPhaseNumber);//當前參與者信息
}

static int[] CalculationInTask(int jobNumber, int partitionSize, Barrier barrier, IList<string> coll)
{
    var data = new List<string>(coll);
    int start = jobNumber * partitionSize;//計算其實下標
    int end = start + partitionSize;//計算結束的位置
    Console.WriteLine("Task {0}: partition from {1} to {2}", Task.CurrentId, start, end);
    int[] charCount = new int[26];
    for (int j = start; j < end; j++)
    {
        char c = data[j][0];//獲取當前字符串的第一個字符
        charCount[c - 97]++;//對應字符的數量+1;
    }
    Console.WriteLine("Calculation completed from task {0}. {1} times a, {2} times z", Task.CurrentId, charCount[0], charCount[25]);//告知計算完成
    barrier.RemoveParticipant();//告知,減小一個參數者
    Console.WriteLine("Task {0} removed from barrier, remaining participants {1}", Task.CurrentId, barrier.ParticipantsRemaining);
    return charCount;//返回統計的結果
}

//用於填充一個字符串鏈表
public static IEnumerable<string> FillData(int size)
{
    var data = new List<string>(size);
    var r = new Random();
    for (int i = 0; i < size; i++)
    {
        data.Add(GetString(r));//得到一個隨機的字符串
    }
    return data;
}
private static string GetString(Random r)
{
    var sb = new StringBuilder(6);
    for (int i = 0; i < 6; i++)
    {
        sb.Append((char)(r.Next(26) + 97));//建立一個6個字符的隨機字符串
    }
    return sb.ToString();
}
View Code

 六、ReaderWriterLockSlim 類

  該類是使鎖定機制容許鎖定多個讀取器(而不是一個寫入器)訪問某個資源:若是沒有寫入器鎖定資源,那麼容許多個讀取器訪問資源,但只能有一個寫入器鎖定該資源。

  由它的屬性能夠讀取是否處於堵塞或不堵塞的鎖定,如EnterReadLock()和TryEnterReadLock()方法。也能夠得到其是否處於寫入鎖定或非鎖定狀態,如EnterWriteLock()和TryEnterWriteLock()方法。若是任務須要先讀取資源,以後寫入資源,可使用EnterUpgradeableReadLock()或TryEnterUpgradeableReadLock()方法獲取可升級的讀取鎖定。該鎖定能夠獲取寫入鎖定,而不須要釋放讀取鎖定。

class Program
{
    private static List<int> items = new List<int>() { 0, 1, 2, 3, 4, 5 };
    private static ReaderWriterLockSlim rwl = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);

    static void ReaderMethod(object reader)
    {
        try
        {
            rwl.EnterReadLock();
            for (int i = 0; i < items.Count; i++)
            {
                Console.WriteLine("reader {0}, loop: {1}, item: {2}", reader, i, items[i]);
                Thread.Sleep(40);
            }
        }
        finally
        {
            rwl.ExitReadLock();
        }
    }

    static void WriterMethod(object writer)
    {
        try
        {
            while (!rwl.TryEnterWriteLock(50))
            {
                Console.WriteLine("Writer {0} waiting for the write lock", writer);
                Console.WriteLine("current reader count: {0}", rwl.CurrentReadCount);
            }
            Console.WriteLine("Writer {0} acquired the lock", writer);
            for (int i = 0; i < items.Count; i++)
            {
                items[i]++;
                Thread.Sleep(50);
            }
            Console.WriteLine("Writer {0} finished", writer);
        }
        finally
        {
            rwl.ExitWriteLock();
        }
    }

    static void Main()
    {
        var taskFactory = new TaskFactory(TaskCreationOptions.LongRunning, TaskContinuationOptions.None);
        var tasks = new Task[6];
        tasks[0] = taskFactory.StartNew(WriterMethod, 1);
        tasks[1] = taskFactory.StartNew(ReaderMethod, 1);
        tasks[2] = taskFactory.StartNew(ReaderMethod, 2);
        tasks[3] = taskFactory.StartNew(WriterMethod, 2);
        tasks[4] = taskFactory.StartNew(ReaderMethod, 3);
        tasks[5] = taskFactory.StartNew(ReaderMethod, 4);

        for (int i = 0; i < 6; i++)
        {
            tasks[i].Wait();
        }
    }
}
View Code
相關文章
相關標籤/搜索